多重处理通过传递任务(包括
check_one和
data)到工作进程
mp.SimpleQueue。与
Queue.Queues不同,放入s中的所有内容都
mp.SimpleQueue必须是可挑选的。
Queue.Queues不可选:
import multiprocessing as mpimport Queuedef foo(queue): passpool=mp.Pool()q=Queue.Queue()pool.map(foo,(q,))
产生此异常:
UnpickleableError: Cannot pickle <type 'thread.lock'> objects
您的
datainclude包括
packages一个Queue.Queue。这可能是问题的根源。
这是一个可能的解决方法:该方法
Queue有两个用途:
- 找出大概的大小(通过调用
qsize
) - 存储结果供以后检索。
qsize我们可以使用而不是在多个进程之间共享值,而无需调用
mp.Value。
除了将结果存储在队列中之外,我们可以(并且应该)仅从调用返回值
check_one。该
pool.map在自己制作的队列中收集结果,并返回结果的返回值
pool.map。
例如:
import multiprocessing as mpimport Queueimport randomimport logging# logger=mp.log_to_stderr(logging.DEBUG)logger = logging.getLogger(__name__)qsize = mp.Value('i', 1)def check_one(args): total, package, version = args i = qsize.value logger.info('r[{0:.1%} - {1}, {2} / {3}]'.format( i / float(total), package, i, total)) new_version = random.randrange(0,100) qsize.value += 1 if new_version > version: return (package, version, new_version, None) else: return Nonedef update(): logger.info('Searching for updates') set_len=10 data = ( (set_len, 'project-{0}'.format(i), random.randrange(0,100)) for i in range(set_len) ) pool = mp.Pool() results = pool.map(check_one, data) pool.close() pool.join() for result in results: if result is None: continue package, version, new_version, json = result txt = 'A new release is avaiable for {0}: {1!s} (old {2}), update'.format( package, new_version, version) logger.info(txt) logger.info('Updating finished successfully')if __name__=='__main__': logging.basicConfig(level=logging.DEBUG) update()


