diff --git a/pso_examples.py b/pso_examples.py index c946788..f88df6f 100644 --- a/pso_examples.py +++ b/pso_examples.py @@ -10,10 +10,9 @@ def myfunc(x): x2 = x[1] return x1**4 - 2*x2*x1**2 + x2**2 + x1**2 - 2*x1 + 5 -lb = [-3, -1] -ub = [2, 6] +bounds = [(-3, 2), (-1, 6)] -xopt1, fopt1 = pso(myfunc, lb, ub) +xopt1, fopt1 = pso(myfunc, bounds) print('The optimum is at:') print(' {}'.format(xopt1)) @@ -29,7 +28,7 @@ def mycon(x): x2 = x[1] return [-(x1 + 0.25)**2 + 0.75*x2] -xopt2, fopt2 = pso(myfunc, lb, ub, f_ieqcons=mycon) +xopt2, fopt2 = pso(myfunc, bounds, f_ieqcons=mycon) print('The optimum is at:') print(' {}'.format(xopt2)) @@ -75,9 +74,8 @@ def mycons(x, *args): E = 30000 # kpsi P = 66 # lb (force) args = (B, rho, E, P) -lb = [10, 1, 0.01] -ub = [30, 3, 0.25] -xopt4, fopt4 = pso(weight, lb, ub, f_ieqcons=mycons, args=args) +bounds = [(10, 30), (1, 3), (0.01, 0.25)] +xopt4, fopt4 = pso(weight, bounds, f_ieqcons=mycons, args=args) print('The optimum is at:') print(' {}'.format(xopt4)) diff --git a/pyswarm/pso.py b/pyswarm/pso.py index 1811cd9..e013c5f 100644 --- a/pyswarm/pso.py +++ b/pyswarm/pso.py @@ -16,10 +16,9 @@ def _cons_ieqcons_wrapper(ieqcons, args, kwargs, x): def _cons_f_ieqcons_wrapper(f_ieqcons, args, kwargs, x): return np.array(f_ieqcons(x, *args, **kwargs)) -def pso(func, lb, ub, ieqcons=[], f_ieqcons=None, args=(), kwargs={}, - swarmsize=100, omega=0.5, phip=0.5, phig=0.5, maxiter=100, - minstep=1e-8, minfunc=1e-8, debug=False, processes=1, - particle_output=False): +def pso(func, bounds, ieqcons=[], f_ieqcons=None, args=(), kwargs={}, + swarmsize=100, omega=0.5, phip=0.5, phig=0.5, maxiter=100, + minstep=1e-12, minfunc=1e-12, debug=False): """ Perform a particle swarm optimization (PSO) @@ -27,10 +26,8 @@ def pso(func, lb, ub, ieqcons=[], f_ieqcons=None, args=(), kwargs={}, ========== func : function The function to be minimized - lb : array - The lower bounds of the design variable(s) - ub : array - The upper bounds of the design variable(s) + bounds: tuple array + The bounds of the design variable(s). In form [(lower, upper), ..., (lower, upper)] Optional ======== @@ -87,148 +84,123 @@ def pso(func, lb, ub, ieqcons=[], f_ieqcons=None, args=(), kwargs={}, The objective values at each position in p """ - - assert len(lb)==len(ub), 'Lower- and upper-bounds must be the same length' + lower_bound, upper_bound = [], [] + for variable_bounds in bounds: + lower_bound.append(variable_bounds[0]) + upper_bound.append(variable_bounds[1]) + + assert len(lower_bound) == len(upper_bound), 'Lower- and upper-bounds must be the same length' assert hasattr(func, '__call__'), 'Invalid function handle' - lb = np.array(lb) - ub = np.array(ub) - assert np.all(ub>lb), 'All upper-bound values must be greater than lower-bound values' - - vhigh = np.abs(ub - lb) + lower_bound = np.array(lower_bound) + upper_bound = np.array(upper_bound) + assert np.all(upper_bound > lower_bound), 'All upper-bound values must be greater than lower-bound values' + + vhigh = np.abs(upper_bound - lower_bound) vlow = -vhigh - # Initialize objective function - obj = partial(_obj_wrapper, func, args, kwargs) - # Check for constraint function(s) ######################################### + obj = lambda x: func(x, *args, **kwargs) if f_ieqcons is None: if not len(ieqcons): if debug: print('No constraints given.') - cons = _cons_none_wrapper + cons = lambda x: np.array([0]) else: if debug: print('Converting ieqcons to a single constraint function') - cons = partial(_cons_ieqcons_wrapper, ieqcons, args, kwargs) + cons = lambda x: np.array([y(x, *args, **kwargs) for y in ieqcons]) else: if debug: print('Single constraint function given in f_ieqcons') - cons = partial(_cons_f_ieqcons_wrapper, f_ieqcons, args, kwargs) - is_feasible = partial(_is_feasible_wrapper, cons) - - # Initialize the multiprocessing module if necessary - if processes > 1: - import multiprocessing - mp_pool = multiprocessing.Pool(processes) - + cons = lambda x: np.array(f_ieqcons(x, *args, **kwargs)) + + def is_feasible(x): + check = np.all(cons(x) >= 0) + return check + # Initialize the particle swarm ############################################ S = swarmsize - D = len(lb) # the number of dimensions each particle has + D = len(lower_bound) # the number of dimensions each particle has x = np.random.rand(S, D) # particle positions v = np.zeros_like(x) # particle velocities p = np.zeros_like(x) # best particle positions - fx = np.zeros(S) # current particle function values - fs = np.zeros(S, dtype=bool) # feasibility of each particle - fp = np.ones(S)*np.inf # best particle function values + fp = np.zeros(S) # best particle function values g = [] # best swarm position - fg = np.inf # best swarm position starting value - - # Initialize the particle's position - x = lb + x*(ub - lb) + fg = 1e100 # artificial best swarm position starting value + + for i in range(S): + # Initialize the particle's position + x[i, :] = lower_bound + x[i, :] * (upper_bound - lower_bound) + + # Initialize the particle's best known position + p[i, :] = x[i, :] + + # Calculate the objective's value at the current particle's + fp[i] = obj(p[i, :]) - # Calculate objective and constraints for each particle - if processes > 1: - fx = np.array(mp_pool.map(obj, x)) - fs = np.array(mp_pool.map(is_feasible, x)) - else: - for i in range(S): - fx[i] = obj(x[i, :]) - fs[i] = is_feasible(x[i, :]) - - # Store particle's best position (if constraints are satisfied) - i_update = np.logical_and((fx < fp), fs) - p[i_update, :] = x[i_update, :].copy() - fp[i_update] = fx[i_update] - - # Update swarm's best position - i_min = np.argmin(fp) - if fp[i_min] < fg: - fg = fp[i_min] - g = p[i_min, :].copy() - else: # At the start, there may not be any feasible starting point, so just # give it a temporary "best" point since it's likely to change - g = x[0, :].copy() - - # Initialize the particle's velocity - v = vlow + np.random.rand(S, D)*(vhigh - vlow) - + if i == 0: + g = p[0, :].copy() + + # If the current particle's position is better than the swarm's, + # update the best swarm position + if fp[i] < fg and is_feasible(p[i, :]): + fg = fp[i] + g = p[i, :].copy() + + # Initialize the particle's velocity + v[i, :] = vlow + np.random.rand(D) * (vhigh - vlow) + # Iterate until termination criterion met ################################## - it = 1 - while it <= maxiter: + iteration_termination = 1 + while iteration_termination <= maxiter: rp = np.random.uniform(size=(S, D)) rg = np.random.uniform(size=(S, D)) + for i in range(S): - # Update the particles velocities - v = omega*v + phip*rp*(p - x) + phig*rg*(g - x) - # Update the particles' positions - x = x + v - # Correct for bound violations - maskl = x < lb - masku = x > ub - x = x*(~np.logical_or(maskl, masku)) + lb*maskl + ub*masku - - # Update objectives and constraints - if processes > 1: - fx = np.array(mp_pool.map(obj, x)) - fs = np.array(mp_pool.map(is_feasible, x)) - else: - for i in range(S): - fx[i] = obj(x[i, :]) - fs[i] = is_feasible(x[i, :]) - - # Store particle's best position (if constraints are satisfied) - i_update = np.logical_and((fx < fp), fs) - p[i_update, :] = x[i_update, :].copy() - fp[i_update] = fx[i_update] - - # Compare swarm's best position with global best position - i_min = np.argmin(fp) - if fp[i_min] < fg: - if debug: - print('New best for swarm at iteration {:}: {:} {:}'\ - .format(it, p[i_min, :], fp[i_min])) - - p_min = p[i_min, :].copy() - stepsize = np.sqrt(np.sum((g - p_min)**2)) - - if np.abs(fg - fp[i_min]) <= minfunc: - print('Stopping search: Swarm best objective change less than {:}'\ - .format(minfunc)) - if particle_output: - return p_min, fp[i_min], p, fp - else: - return p_min, fp[i_min] - elif stepsize <= minstep: - print('Stopping search: Swarm best position change less than {:}'\ - .format(minstep)) - if particle_output: - return p_min, fp[i_min], p, fp - else: - return p_min, fp[i_min] - else: - g = p_min.copy() - fg = fp[i_min] + # Update the particle's velocity + v[i, :] = omega * v[i, :] + phip * rp[i, :] * (p[i, :] - x[i, :]) + \ + phig * rg[i, :] * (g - x[i, :]) + + # Update the particle's position, correcting lower and upper bound + # violations, then update the objective function value + x[i, :] = x[i, :] + v[i, :] + mark1 = x[i, :] < lower_bound + mark2 = x[i, :] > upper_bound + x[i, mark1] = lower_bound[mark1] + x[i, mark2] = upper_bound[mark2] + fx = obj(x[i, :]) + + # Compare particle's best position (if constraints are satisfied) + if fx < fp[i] and is_feasible(x[i, :]): + p[i, :] = x[i, :].copy() + fp[i] = fx + + # Compare swarm's best position to current particle's position + # (Can only get here if constraints are satisfied) + if fx < fg: + if debug: + print('New best for swarm at iteration {:}: {:} {:}'.format(iteration_termination, x[i, :], fx)) + + tmp = x[i, :].copy() + stepsize = np.sqrt(np.sum((g - tmp) ** 2)) + if np.abs(fg - fx) <= minfunc: + print('Stopping search: Swarm best objective change less than {:}'.format(minfunc)) + return tmp, fx + elif stepsize <= minstep: + print('Stopping search: Swarm best position change less than {:}'.format(minstep)) + return tmp, fx + else: + g = tmp.copy() + fg = fx if debug: - print('Best after iteration {:}: {:} {:}'.format(it, g, fg)) - it += 1 + print('Best after iteration {:}: {:} {:}'.format(iteration_termination, g, fg)) + iteration_termination += 1 print('Stopping search: maximum iterations reached --> {:}'.format(maxiter)) - + if not is_feasible(g): print("However, the optimization couldn't find a feasible design. Sorry") - if particle_output: - return g, fg, p, fp - else: - return g, fg + return g, fg