Following up on note #28: for large-scale problems, evaluating each individual's fitness value demands substantial compute and therefore time, and optimizing a large population is itself slow and laborious. Fortunately, someone has built Ray, a parallel acceleration framework for Python: easy to pick up, effort-saving, and with a clear speedup.
An introduction to the Ray framework can be found at the link below.
Modern Parallel and Distributed Python: A Quick Tutorial on Ray
pip install pytest-runner
pip install ray
pip install ray[default]

Getting started with Ray
Correct usage:
import time
import ray

ray.init()
start = time.time()

@ray.remote
def wait_time():
    time.sleep(2)

results_id = []
for i in range(5):
    results_id.append(wait_time.remote())
ray.get(results_id)
ray.shutdown()
print("Elapsed: {}s".format(time.time()-start))
# Elapsed: 4.921686410903931s
Incorrect usage:
import time
import ray

ray.init()
start = time.time()

@ray.remote
def wait_time():
    time.sleep(2)

for i in range(5):
    ray.get(wait_time.remote())  # blocks on each task before submitting the next
ray.shutdown()
print("Elapsed: {}s".format(time.time()-start))
# Elapsed: 12.869806051254272s
The reason is that ray.get() blocks until its argument is ready. So submit all tasks first, collect their object references in a list, and only then fetch them with a single ray.get().
Correct usage:
ray.init()

def a():
    return 1

@ray.remote
def b():
    return a()

c = ray.get(b.remote())
ray.shutdown()
print(c)
Incorrect usage:
ray.init()

@ray.remote
def b():
    return a()

def a():  # defined after b is decorated, so the remote b cannot find it
    return 1

c = ray.get(b.remote())
ray.shutdown()
print(c)
Also correct:
ray.init()

@ray.remote
def b():
    return ray.get(a.remote())

@ray.remote
def a():
    return 1

c = ray.get(b.remote())
ray.shutdown()
print(c)
# 1
Definition order matters to Ray: if function b calls function a, and a is not decorated with @ray.remote, then a must be defined before b.
Getting started with Ray Actors

Actors extend the Ray API from functions (tasks) to classes. An actor is essentially a stateful worker (or a service). When a new actor is instantiated, a new worker is created, and methods of the actor are scheduled on that specific worker and can access and mutate the state of that worker.
Correct usage:
@ray.remote(num_cpus=2, num_gpus=0.5)  # specify compute resources
class Counter(object):
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

# Create an actor from this class.
counter = Counter.remote()
Calling the actor and its methods:
obj_ref = counter.increment.remote()
assert ray.get(obj_ref) == 1
Method calls on different actor instances run in parallel, but method calls on the same actor instance are executed serially, in the order they were issued.
# Create ten Counter actors.
counters = [Counter.remote() for _ in range(10)]

# Increment each Counter once and get the results. These tasks all happen in
# parallel.
results = ray.get([c.increment.remote() for c in counters])
print(results)  # prints [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

# Increment the first Counter five times. These tasks are executed serially
# and share state.
results = ray.get([counters[0].increment.remote() for _ in range(5)])
print(results)  # prints [2, 3, 4, 5, 6]
For further details, see the official tutorial and Zhihu.
Accelerating CMA-ES with Ray

Unfortunately, with the parameter configuration below, the run actually took ten-odd seconds longer than in note #28. Without deeper profiling, the likely culprit is that the fitness function is too cheap: Ray's per-task overhead outweighs the parallel savings.
import cma, ray, time
import numpy as np
import matplotlib.pyplot as plt

ray.init(num_cpus=10)

@ray.remote
def fitness(x):
    x = np.array(x)
    time.sleep(0.0001)
    return ((x - np.pi)**2).sum()

es = cma.CMAEvolutionStrategy(x0=[0.]*30,
                              sigma0=0.1,
                              inopts={'popsize': 500})
tick = time.time()
log = []
for _ in range(200):
    solutions = es.ask()
    fit = []
    results = [fitness.remote(solution) for solution in solutions]
    for result in results:
        fit.append(ray.get(result))
    log.append([min(fit), max(fit)])
    es.tell(solutions, fit)
print('time:', time.time()-tick)
print('best solution', solutions[np.argmin(fit)])
ray.shutdown()
'''
log = np.array(log)
plt.plot(log[:,0])
plt.plot(log[:,1])
plt.grid(); plt.xlabel('iteration'); plt.ylabel('fitness value')
plt.legend(['min fit in pop', 'max fit in pop'])
plt.show()
'''



