栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

aio

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

aio

aio_pika篇—实现收发功能 发送

publisher.py

import asyncio

from aio_pika import DeliveryMode, ExchangeType, Message, connect


async def main() -> None:
    """Publish 1000 persistent messages to the ``ai.litter`` direct exchange.

    Connects to the broker on 127.0.0.1:5672, declares the durable direct
    exchange ``ai.litter`` and publishes one message per second with the
    routing key ``pool_queue``, printing each body after it is sent.
    """
    # NOTE(review): the consumer script in this document connects to
    # virtualhost 'slife' while this one uses 'sli' - confirm which is intended.
    broker = await connect(
        host='127.0.0.1',
        port=5672,
        login='ai.litter',
        password='r7n2ApE2yYk3yVz',
        virtualhost='sli',
    )

    # Every message goes out under the same routing key.
    target_key = "pool_queue"

    async with broker:
        amqp_channel = await broker.channel()
        exchange = await amqp_channel.declare_exchange(
            "ai.litter", ExchangeType.DIRECT, durable=True,
        )

        for seq in range(1000):
            outgoing = Message(
                body=f'hello  - {seq}'.encode(),
                delivery_mode=DeliveryMode.PERSISTENT,
            )

            # Pause one second before each publish.
            await asyncio.sleep(1)
            await exchange.publish(outgoing, routing_key=target_key)

            print(f" [x] Sent {outgoing.body!r}")

# Run the publisher coroutine only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())

接收并发送
import asyncio

import aio_pika
from aio_pika import ExchangeType, Message
from aio_pika.abc import AbstractRobustConnection, AbstractIncomingMessage
from aio_pika.pool import Pool

from utils import receive_task


async def main() -> None:
    """Consume tasks from ``pool_queue`` using pooled connections and channels.

    Builds a connection pool (``max_size=2``) and a channel pool
    (``max_size=10``), then runs a single consumer task that hands every
    incoming message body to ``receive_task`` together with the local
    ``publish`` coroutine. The function returns only when the consumer
    task finishes.
    """
    # Inside a coroutine the running loop is the one to use; get_event_loop()
    # is discouraged here by the asyncio documentation.
    loop = asyncio.get_running_loop()

    async def get_connection() -> AbstractRobustConnection:
        """Open a robust connection to the broker on 127.0.0.1:5672."""
        return await aio_pika.connect_robust(host='127.0.0.1', port=5672, login='ai.litter',
                                             password='r7n2ApE2yYk3yVz', virtualhost='slife')

    connection_pool: Pool = Pool(get_connection, max_size=2, loop=loop)

    async def get_channel() -> aio_pika.Channel:
        """Open a new channel on a connection borrowed from the pool."""
        async with connection_pool.acquire() as connection:
            return await connection.channel()

    channel_pool: Pool = Pool(get_channel, max_size=10, loop=loop)

    async def consume() -> None:
        """Declare and bind ``pool_queue``, then process its messages."""
        async with channel_pool.acquire() as channel:
            await channel.set_qos(20)
            direct_exchange = await channel.declare_exchange(
                "ai.litter", ExchangeType.DIRECT, durable=True
            )
            queue_name = "pool_queue"

            # NOTE(review): the queue is declared non-durable although the
            # publisher script sends PERSISTENT messages - confirm intent.
            queue = await channel.declare_queue(
                queue_name, durable=False, auto_delete=False,
            )
            await queue.bind(direct_exchange, routing_key=queue_name)
            async with queue.iterator() as queue_iter:
                message: AbstractIncomingMessage
                async for message in queue_iter:
                    try:
                        print('task received, handling')
                        print(str(message.body))
                        await receive_task(message.body, publish_func=publish)
                    except Exception as e:
                        # Handler failed: reject without requeueing.
                        print('message nacked, exception=', e)
                        await message.nack(requeue=False)
                    else:
                        print('task finished')
                        try:
                            await message.ack()
                        except Exception as e:
                            # Was a bare ``except:``, which also caught
                            # asyncio.CancelledError and so swallowed task
                            # cancellation. Only ordinary errors reopen
                            # the channel now.
                            print('ack failed, reopening channel, exception=', e)
                            await channel.reopen()

    async def publish(message: bytes, queue_name: str) -> None:
        """Publish ``message`` through the auto-delete ``direct`` exchange.

        Declares the auto-delete queue ``queue_name`` and binds it to the
        exchange before publishing.
        """
        async with channel_pool.acquire() as channel:
            # NOTE(review): the routing key is fixed to "test_queue" and does
            # not depend on ``queue_name``; every queue declared here is bound
            # with the same key - confirm this is intended.
            routing_key = "test_queue"

            exchange = await channel.declare_exchange("direct", auto_delete=True)
            queue = await channel.declare_queue(queue_name, auto_delete=True)
            await queue.bind(exchange, routing_key)

            await exchange.publish(Message(message), routing_key)

    async with connection_pool, channel_pool:
        task = loop.create_task(consume())
        print('amqp consumer created, waiting for task...')

        await task


# Run the consumer only when this file is executed as a script.
# The ``import asyncio`` that was fused onto the end of the call (a syntax
# error) belongs with the imports at the top of this script.
if __name__ == "__main__":
    asyncio.run(main())

查看收到的内容

utils.py

from typing import Callable, Coroutine, Any
import asyncio
from asyncio import queues


async def receive_task(body: bytes, publish_func: Callable[[bytes, str], Coroutine[Any, Any, None]]):
    """Print a received message body and publish a reply to queue ``hello``.

    Args:
        body: Raw payload of the received message.
        publish_func: Coroutine function invoked as
            ``publish_func(payload, queue_name)`` to send the reply.

    Raises:
        Any exception raised by ``publish_func`` propagates to the caller.
    """
    print("aaa ----->  " + str(body))

    reply = 'good Job -- {}'.format(str(body)).encode()
    # Await the coroutine directly: asyncio.wait() rejects bare coroutines on
    # Python 3.11+ and, on older versions, kept publish errors inside the
    # wrapped task instead of raising them here.
    await publish_func(reply, 'hello')
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/768338.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号