Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions pso_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,11 @@
x2 = x[1]
return x1**4 - 2*x2*x1**2 + x2**2 + x1**2 - 2*x1 + 5

lb = [-3, -1]
ub = [2, 6]
bounds = [(-3, 2), (-1, 6)]

xopt1, fopt1 = pso(myfunc, lb, ub)
xopt1, fopt1 = pso(myfunc, bounds)

print('The optimum is at:')

Check failure on line 17 in pso_examples.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Define a constant instead of duplicating this literal 'The optimum is at:' 3 times.

See more on https://sonarcloud.io/project/issues?id=eggzec_pyswarm&issues=AZ2guMw30EzrWDwSmZU4&open=AZ2guMw30EzrWDwSmZU4&pullRequest=6
print(' {}'.format(xopt1))
print('Optimal function value:')
print(' myfunc: {}'.format(fopt1))
Expand All @@ -29,7 +28,7 @@
x2 = x[1]
return [-(x1 + 0.25)**2 + 0.75*x2]

xopt2, fopt2 = pso(myfunc, lb, ub, f_ieqcons=mycon)
xopt2, fopt2 = pso(myfunc, bounds, f_ieqcons=mycon)

print('The optimum is at:')
print(' {}'.format(xopt2))
Expand All @@ -46,22 +45,22 @@
print(' Deflection <= 0.25 inches')
def weight(x, *args):
H, d, t = x # all in inches
B, rho, E, P = args

Check warning on line 48 in pso_examples.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace the unused local variable "E" with "_".

See more on https://sonarcloud.io/project/issues?id=eggzec_pyswarm&issues=AZ2guMw30EzrWDwSmZU6&open=AZ2guMw30EzrWDwSmZU6&pullRequest=6

Check warning on line 48 in pso_examples.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace the unused local variable "P" with "_".

See more on https://sonarcloud.io/project/issues?id=eggzec_pyswarm&issues=AZ2guMw30EzrWDwSmZU5&open=AZ2guMw30EzrWDwSmZU5&pullRequest=6
return rho*2*np.pi*d*t*np.sqrt((B/2)**2 + H**2)

def stress(x, *args):
H, d, t = x # all in inches
B, rho, E, P = args

Check warning on line 53 in pso_examples.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace the unused local variable "rho" with "_".

See more on https://sonarcloud.io/project/issues?id=eggzec_pyswarm&issues=AZ2guMw30EzrWDwSmZU7&open=AZ2guMw30EzrWDwSmZU7&pullRequest=6

Check warning on line 53 in pso_examples.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace the unused local variable "E" with "_".

See more on https://sonarcloud.io/project/issues?id=eggzec_pyswarm&issues=AZ2guMw30EzrWDwSmZU8&open=AZ2guMw30EzrWDwSmZU8&pullRequest=6
return (P*np.sqrt((B/2)**2 + H**2))/(2*t*np.pi*d*H)

def buckling_stress(x, *args):
H, d, t = x # all in inches
B, rho, E, P = args

Check warning on line 58 in pso_examples.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace the unused local variable "P" with "_".

See more on https://sonarcloud.io/project/issues?id=eggzec_pyswarm&issues=AZ2guMw30EzrWDwSmZU-&open=AZ2guMw30EzrWDwSmZU-&pullRequest=6

Check warning on line 58 in pso_examples.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace the unused local variable "rho" with "_".

See more on https://sonarcloud.io/project/issues?id=eggzec_pyswarm&issues=AZ2guMw30EzrWDwSmZU9&open=AZ2guMw30EzrWDwSmZU9&pullRequest=6
return (np.pi**2*E*(d**2 + t**2))/(8*((B/2)**2 + H**2))

def deflection(x, *args):
H, d, t = x # all in inches
B, rho, E, P = args

Check warning on line 63 in pso_examples.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace the unused local variable "rho" with "_".

See more on https://sonarcloud.io/project/issues?id=eggzec_pyswarm&issues=AZ2guMw30EzrWDwSmZU_&open=AZ2guMw30EzrWDwSmZU_&pullRequest=6
return (P*np.sqrt((B/2)**2 + H**2)**3)/(2*t*np.pi*d*H**2*E)

def mycons(x, *args):
Expand All @@ -75,9 +74,8 @@
E = 30000 # kpsi
P = 66 # lb (force)
args = (B, rho, E, P)
lb = [10, 1, 0.01]
ub = [30, 3, 0.25]
xopt4, fopt4 = pso(weight, lb, ub, f_ieqcons=mycons, args=args)
bounds = [(10, 30), (1, 3), (0.01, 0.25)]
xopt4, fopt4 = pso(weight, bounds, f_ieqcons=mycons, args=args)

print('The optimum is at:')
print(' {}'.format(xopt4))
Expand Down
212 changes: 92 additions & 120 deletions pyswarm/pso.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
def _is_feasible_wrapper(func, x):
return np.all(func(x)>=0)

def _cons_none_wrapper(x):

Check warning on line 10 in pyswarm/pso.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove the unused function parameter "x".

See more on https://sonarcloud.io/project/issues?id=eggzec_pyswarm&issues=AZ2guMza0EzrWDwSmZVA&open=AZ2guMza0EzrWDwSmZVA&pullRequest=6
return np.array([0])

def _cons_ieqcons_wrapper(ieqcons, args, kwargs, x):
Expand All @@ -16,21 +16,18 @@
def _cons_f_ieqcons_wrapper(f_ieqcons, args, kwargs, x):
return np.array(f_ieqcons(x, *args, **kwargs))

def pso(func, lb, ub, ieqcons=[], f_ieqcons=None, args=(), kwargs={},
swarmsize=100, omega=0.5, phip=0.5, phig=0.5, maxiter=100,
minstep=1e-8, minfunc=1e-8, debug=False, processes=1,
particle_output=False):
def pso(func, bounds, ieqcons=[], f_ieqcons=None, args=(), kwargs={},

Check failure on line 19 in pyswarm/pso.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this function to reduce its Cognitive Complexity from 46 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=eggzec_pyswarm&issues=AZ2guMza0EzrWDwSmZVC&open=AZ2guMza0EzrWDwSmZVC&pullRequest=6
swarmsize=100, omega=0.5, phip=0.5, phig=0.5, maxiter=100,
minstep=1e-12, minfunc=1e-12, debug=False):

Check warning on line 21 in pyswarm/pso.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Function "pso" has 14 parameters, which is greater than the 13 authorized.

See more on https://sonarcloud.io/project/issues?id=eggzec_pyswarm&issues=AZ2guMza0EzrWDwSmZVB&open=AZ2guMza0EzrWDwSmZVB&pullRequest=6
"""
Perform a particle swarm optimization (PSO)

Parameters
==========
func : function
The function to be minimized
lb : array
The lower bounds of the design variable(s)
ub : array
The upper bounds of the design variable(s)
bounds: tuple array
The bounds of the design variable(s). In form [(lower, upper), ..., (lower, upper)]

Optional
========
Expand Down Expand Up @@ -87,148 +84,123 @@
The objective values at each position in p

"""

assert len(lb)==len(ub), 'Lower- and upper-bounds must be the same length'
lower_bound, upper_bound = [], []
for variable_bounds in bounds:
lower_bound.append(variable_bounds[0])
upper_bound.append(variable_bounds[1])

assert len(lower_bound) == len(upper_bound), 'Lower- and upper-bounds must be the same length'
assert hasattr(func, '__call__'), 'Invalid function handle'
lb = np.array(lb)
ub = np.array(ub)
assert np.all(ub>lb), 'All upper-bound values must be greater than lower-bound values'
vhigh = np.abs(ub - lb)
lower_bound = np.array(lower_bound)
upper_bound = np.array(upper_bound)
assert np.all(upper_bound > lower_bound), 'All upper-bound values must be greater than lower-bound values'

vhigh = np.abs(upper_bound - lower_bound)
vlow = -vhigh

# Initialize objective function
obj = partial(_obj_wrapper, func, args, kwargs)

# Check for constraint function(s) #########################################
obj = lambda x: func(x, *args, **kwargs)
if f_ieqcons is None:
if not len(ieqcons):
if debug:
print('No constraints given.')
cons = _cons_none_wrapper
cons = lambda x: np.array([0])
else:
if debug:
print('Converting ieqcons to a single constraint function')
cons = partial(_cons_ieqcons_wrapper, ieqcons, args, kwargs)
cons = lambda x: np.array([y(x, *args, **kwargs) for y in ieqcons])
else:
if debug:
print('Single constraint function given in f_ieqcons')
cons = partial(_cons_f_ieqcons_wrapper, f_ieqcons, args, kwargs)
is_feasible = partial(_is_feasible_wrapper, cons)

# Initialize the multiprocessing module if necessary
if processes > 1:
import multiprocessing
mp_pool = multiprocessing.Pool(processes)

cons = lambda x: np.array(f_ieqcons(x, *args, **kwargs))

def is_feasible(x):
check = np.all(cons(x) >= 0)
return check

# Initialize the particle swarm ############################################
S = swarmsize
D = len(lb) # the number of dimensions each particle has
D = len(lower_bound) # the number of dimensions each particle has
x = np.random.rand(S, D) # particle positions

Check warning on line 124 in pyswarm/pso.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Use a "numpy.random.Generator" here instead of this legacy function.

See more on https://sonarcloud.io/project/issues?id=eggzec_pyswarm&issues=AZ2guMza0EzrWDwSmZVD&open=AZ2guMza0EzrWDwSmZVD&pullRequest=6
v = np.zeros_like(x) # particle velocities
p = np.zeros_like(x) # best particle positions
fx = np.zeros(S) # current particle function values
fs = np.zeros(S, dtype=bool) # feasibility of each particle
fp = np.ones(S)*np.inf # best particle function values
fp = np.zeros(S) # best particle function values
g = [] # best swarm position
fg = np.inf # best swarm position starting value

# Initialize the particle's position
x = lb + x*(ub - lb)
fg = 1e100 # artificial best swarm position starting value

for i in range(S):
# Initialize the particle's position
x[i, :] = lower_bound + x[i, :] * (upper_bound - lower_bound)

# Initialize the particle's best known position
p[i, :] = x[i, :]

# Calculate the objective's value at the current particle's
fp[i] = obj(p[i, :])

# Calculate objective and constraints for each particle
if processes > 1:
fx = np.array(mp_pool.map(obj, x))
fs = np.array(mp_pool.map(is_feasible, x))
else:
for i in range(S):
fx[i] = obj(x[i, :])
fs[i] = is_feasible(x[i, :])

# Store particle's best position (if constraints are satisfied)
i_update = np.logical_and((fx < fp), fs)
p[i_update, :] = x[i_update, :].copy()
fp[i_update] = fx[i_update]

# Update swarm's best position
i_min = np.argmin(fp)
if fp[i_min] < fg:
fg = fp[i_min]
g = p[i_min, :].copy()
else:
# At the start, there may not be any feasible starting point, so just
# give it a temporary "best" point since it's likely to change
g = x[0, :].copy()

# Initialize the particle's velocity
v = vlow + np.random.rand(S, D)*(vhigh - vlow)

if i == 0:
g = p[0, :].copy()

# If the current particle's position is better than the swarm's,
# update the best swarm position
if fp[i] < fg and is_feasible(p[i, :]):
fg = fp[i]
g = p[i, :].copy()

# Initialize the particle's velocity
v[i, :] = vlow + np.random.rand(D) * (vhigh - vlow)

Check warning on line 153 in pyswarm/pso.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Use a "numpy.random.Generator" here instead of this legacy function.

See more on https://sonarcloud.io/project/issues?id=eggzec_pyswarm&issues=AZ2guMza0EzrWDwSmZVE&open=AZ2guMza0EzrWDwSmZVE&pullRequest=6

# Iterate until termination criterion met ##################################
it = 1
while it <= maxiter:
iteration_termination = 1
while iteration_termination <= maxiter:
rp = np.random.uniform(size=(S, D))

Check warning on line 158 in pyswarm/pso.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Use a "numpy.random.Generator" here instead of this legacy function.

See more on https://sonarcloud.io/project/issues?id=eggzec_pyswarm&issues=AZ2guMza0EzrWDwSmZVF&open=AZ2guMza0EzrWDwSmZVF&pullRequest=6
rg = np.random.uniform(size=(S, D))

Check warning on line 159 in pyswarm/pso.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Use a "numpy.random.Generator" here instead of this legacy function.

See more on https://sonarcloud.io/project/issues?id=eggzec_pyswarm&issues=AZ2guMza0EzrWDwSmZVG&open=AZ2guMza0EzrWDwSmZVG&pullRequest=6
for i in range(S):

# Update the particles velocities
v = omega*v + phip*rp*(p - x) + phig*rg*(g - x)
# Update the particles' positions
x = x + v
# Correct for bound violations
maskl = x < lb
masku = x > ub
x = x*(~np.logical_or(maskl, masku)) + lb*maskl + ub*masku

# Update objectives and constraints
if processes > 1:
fx = np.array(mp_pool.map(obj, x))
fs = np.array(mp_pool.map(is_feasible, x))
else:
for i in range(S):
fx[i] = obj(x[i, :])
fs[i] = is_feasible(x[i, :])

# Store particle's best position (if constraints are satisfied)
i_update = np.logical_and((fx < fp), fs)
p[i_update, :] = x[i_update, :].copy()
fp[i_update] = fx[i_update]

# Compare swarm's best position with global best position
i_min = np.argmin(fp)
if fp[i_min] < fg:
if debug:
print('New best for swarm at iteration {:}: {:} {:}'\
.format(it, p[i_min, :], fp[i_min]))

p_min = p[i_min, :].copy()
stepsize = np.sqrt(np.sum((g - p_min)**2))

if np.abs(fg - fp[i_min]) <= minfunc:
print('Stopping search: Swarm best objective change less than {:}'\
.format(minfunc))
if particle_output:
return p_min, fp[i_min], p, fp
else:
return p_min, fp[i_min]
elif stepsize <= minstep:
print('Stopping search: Swarm best position change less than {:}'\
.format(minstep))
if particle_output:
return p_min, fp[i_min], p, fp
else:
return p_min, fp[i_min]
else:
g = p_min.copy()
fg = fp[i_min]
# Update the particle's velocity
v[i, :] = omega * v[i, :] + phip * rp[i, :] * (p[i, :] - x[i, :]) + \
phig * rg[i, :] * (g - x[i, :])

# Update the particle's position, correcting lower and upper bound
# violations, then update the objective function value
x[i, :] = x[i, :] + v[i, :]
mark1 = x[i, :] < lower_bound
mark2 = x[i, :] > upper_bound
x[i, mark1] = lower_bound[mark1]
x[i, mark2] = upper_bound[mark2]
fx = obj(x[i, :])

# Compare particle's best position (if constraints are satisfied)
if fx < fp[i] and is_feasible(x[i, :]):
p[i, :] = x[i, :].copy()
fp[i] = fx

# Compare swarm's best position to current particle's position
# (Can only get here if constraints are satisfied)
if fx < fg:
if debug:
print('New best for swarm at iteration {:}: {:} {:}'.format(iteration_termination, x[i, :], fx))

tmp = x[i, :].copy()
stepsize = np.sqrt(np.sum((g - tmp) ** 2))
if np.abs(fg - fx) <= minfunc:
print('Stopping search: Swarm best objective change less than {:}'.format(minfunc))
return tmp, fx
elif stepsize <= minstep:
print('Stopping search: Swarm best position change less than {:}'.format(minstep))
return tmp, fx
else:
g = tmp.copy()
fg = fx

if debug:
print('Best after iteration {:}: {:} {:}'.format(it, g, fg))
it += 1
print('Best after iteration {:}: {:} {:}'.format(iteration_termination, g, fg))
iteration_termination += 1

print('Stopping search: maximum iterations reached --> {:}'.format(maxiter))

if not is_feasible(g):
print("However, the optimization couldn't find a feasible design. Sorry")
if particle_output:
return g, fg, p, fp
else:
return g, fg
return g, fg