asyncio.Queue不是线程安全的,因此您不能直接在多个线程中直接使用它。相反,您可以使用
janus,它是提供线程感知
asyncio队列的第三方库:
import asyncioimport threadingimport janusdef threaded(squeue): import time while True: time.sleep(2) squeue.put_nowait(time.time()) print(squeue.qsize())@asyncio.coroutinedef async(aqueue): while True: time = yield from aqueue.get() print(time)loop = asyncio.get_event_loop()queue = janus.Queue(loop=loop)asyncio.Task(asyncio.ensure_future(queue.async_q))threading.Thread(target=threaded, args=(queue.sync_q,)).start()loop.run_forever()
还有
aioprocessing(全披露:我写了它),它还提供了进程安全的队列(作为副作用,线程安全的队列),但是如果您不尝试使用,那就太过分了
multiprocessing。
编辑
正如在其他答案中指出的那样,对于简单的用例,您也可以使用
loop.call_soon_threadsafe添加到队列中。



