optimizers.py
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
import scipy.stats as stats

class Optimizer:
    """Abstract base class; subclasses implement a concrete optimizer."""

    def __init__(self, *args, **kwargs):
        pass

    def setup(self, cost_function):
        raise NotImplementedError("Must be implemented in subclass.")

    def reset(self):
        raise NotImplementedError("Must be implemented in subclass.")

    def obtain_solution(self, *args, **kwargs):
        raise NotImplementedError("Must be implemented in subclass.")

class CEMOptimizer(Optimizer):
    """Cross-entropy method (CEM) optimizer. A usage sketch appears at the
    bottom of this file."""

    def __init__(self,
                 sol_dim,
                 max_iters,
                 popsize,
                 num_elites,
                 cost_function,
                 upper_bound=None,
                 lower_bound=None,
                 epsilon=0.001,
                 alpha=0.25):
        """Creates an instance of this class.

        Arguments:
            sol_dim (int): The dimensionality of the problem space.
            max_iters (int): The maximum number of iterations to perform during
                optimization.
            popsize (int): The number of candidate solutions sampled at every
                iteration.
            num_elites (int): The number of top solutions used to refit the
                sampling distribution for the next iteration.
            cost_function (callable): Maps a batch of candidate solutions, shaped
                (popsize, sol_dim), to one cost per candidate.
            upper_bound (np.ndarray): An array of upper bounds on the solution.
            lower_bound (np.ndarray): An array of lower bounds on the solution.
            epsilon (float): A minimum variance. If the maximum variance drops
                below epsilon, optimization is stopped.
            alpha (float): Controls how much of the previous mean and variance is
                kept for the next iteration:
                next_mean = alpha * old_mean + (1 - alpha) * elite_mean,
                and similarly for the variance. For example, with alpha = 0.25 the
                update leans heavily toward the elite statistics while retaining a
                quarter of the previous distribution.
        """
        super().__init__()
        self.sol_dim, self.max_iters, self.popsize, self.num_elites = sol_dim, max_iters, popsize, num_elites
        self.ub, self.lb = upper_bound, lower_bound
        self.epsilon, self.alpha = epsilon, alpha
        self.cost_function = cost_function
        if num_elites > popsize:
            raise ValueError(
                "Number of elites must be at most the population size.")

    def reset(self):
        pass
    def obtain_solution(self,
                        init_mean,
                        init_var,
                        query_action=None,
                        hor=None,
                        iters=None):
        """Optimizes the cost function using the provided initial candidate
        distribution.

        Arguments:
            init_mean (np.ndarray): The mean of the initial candidate distribution.
            init_var (np.ndarray): The variance of the initial candidate distribution.
            query_action (np.ndarray): If given, it is tiled across the population
                and prepended to every sample before the cost function is called
                with reachability=True.
            hor: Accepted for interface compatibility; currently unused.
            iters (int): Overrides self.max_iters when given.

        Returns:
            The mean of the final candidate distribution; additionally the minimum
            cost of the last population when query_action is given.
        """
        mean, var, t = init_mean, init_var, 0
        # Standard truncated normal clipped at two standard deviations; its
        # samples are rescaled below to the current mean and constrained variance.
        X = stats.truncnorm(
            -2, 2, loc=np.zeros_like(mean), scale=np.ones_like(var))
        if iters is None:
            iters = self.max_iters
        while (t < iters) and np.max(var) > self.epsilon:
            # Shrink the sampling variance near the bounds so that samples within
            # two standard deviations of the mean stay inside [lb, ub].
            lb_dist, ub_dist = mean - self.lb[:len(init_mean)], self.ub[:len(
                init_mean)] - mean
            constrained_var = np.minimum(
                np.minimum(np.square(lb_dist / 2), np.square(ub_dist / 2)),
                var)
            samples = X.rvs(size=[self.popsize, len(init_mean)]) * np.sqrt(
                constrained_var) + mean
            samples = samples.astype(np.float32)
            if query_action is None:
                costs = self.cost_function(samples)
            else:
                costs = self.cost_function(
                    np.hstack((np.tile(query_action, (len(samples), 1)),
                               samples)),
                    reachability=True)
            # Refit the distribution to the lowest-cost candidates, smoothed with
            # the previous iteration's statistics via alpha.
            elites = samples[np.argsort(costs)][:self.num_elites]
            new_mean = np.mean(elites, axis=0)
            new_var = np.var(elites, axis=0)
            mean = self.alpha * mean + (1 - self.alpha) * new_mean
            var = self.alpha * var + (1 - self.alpha) * new_var
            t += 1
        if query_action is None:
            return mean
        else:
            # Note: assumes the loop above ran at least once, so costs is defined.
            return mean, costs.min()
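

if __name__ == "__main__":
    # Minimal usage sketch (not part of the original module). The quadratic
    # cost below is a hypothetical stand-in for whatever cost function is used
    # in practice; it must accept a (popsize, sol_dim) batch and return one
    # cost per candidate. All names and values here are illustrative only.
    def quadratic_cost(samples):
        target = np.array([0.5, -0.3], dtype=np.float32)
        return np.sum(np.square(samples - target), axis=1)

    optimizer = CEMOptimizer(
        sol_dim=2,
        max_iters=50,
        popsize=400,
        num_elites=40,
        cost_function=quadratic_cost,
        upper_bound=np.ones(2),
        lower_bound=-np.ones(2))
    solution = optimizer.obtain_solution(
        init_mean=np.zeros(2), init_var=0.25 * np.ones(2))
    print("CEM solution:", solution)  # should approach [0.5, -0.3]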