栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

python中rabbitmq的简单使用--Windows

python中rabbitmq的简单使用--Windows

开始准备

首先需要安装rabbitmq和erlang,安装方式很简单

  • 其中需要注意的是版本问题 我暂时用的3.8.5和23.0
    https://www.rabbitmq.com/changelog.html
  • 还需要添加一下erlang
  • python中需要安装pika库

代码中的参数可能不一样,那就是版本问题

send.py
生产者

import random
import pika

# credentials = pika.PlainCredentials("","")
# 新建连接,rabbitmq安装在本地则hostname为'localhost'
hostname = 'localhost'
parameters = pika.ConnectionParameters(hostname)
connection = pika.BlockingConnection(parameters)

# 创建通道
channel = connection.channel()

# 声明一个队列,生产者和消费者都要声明一个相同的队列, 用来防止万一某一方挂了,另一方能正常运行
channel.queue_declare(queue='hello') # durable=True队列持久化

number = random.randint(1, 1000)
body = 'hello world:%s' % number
# 交换机; 队列名,写明将消息发往哪个队列; 消息内容
# routing_key在使用匿名交换机的时候才需要指定,表示发送到哪个队列
channel.basic_publish(exchange="", routing_key="task_queue", body=message,
                      properties=pika.BasicProperties(delivery_mode=2   # 使消息持久化,
                                                      )
                      )
print(" [x] Sent %s" % body)
connection.close()

reveive.py
消费者

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
import pika

hostname = 'localhost'
parameters = pika.ConnectionParameters(hostname)
connection = pika.BlockingConnection(parameters)

# 创建通道
channel = connection.channel()
# channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):
    time.sleep(10)
    print(" [x] Received %r" % (body,))


# 告诉rabbitmq使用callback来接收信息
channel.basic_consume(queue="hello",on_message_callback=callback,auto_ack=True)

# 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理,按ctrl+c退出
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

消费者中需要注意 channel.basic_consume中的auto_ack,用于是否需要确认
确认就是在callback中加一个 ch.basic_ack(delivery_tag=method.delivery_tag)

recv_safe.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time

import pika

hostname = 'localhost'
parameters = pika.ConnectionParameters(hostname)
connection = pika.BlockingConnection(parameters)

# 创建通道
channel = connection.channel()

channel.queue_declare(queue='hello') # durable=True队列持久化


def callback(ch, method, properties, body):
    time.sleep(10)
    print(" [x] Received %r" % (body,))
    ch.basic_ack(delivery_tag=method.delivery_tag)
    # ack 确认


# 告诉rabbitmq使用callback来接收信息
channel.basic_consume(queue="hello", on_message_callback=callback,
                      auto_ack=False)  # auto_ack  True生产者发出之后就不管了,不需要确认  False

# 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理,按ctrl+c退出
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

队列持久化,需要在声明队列时将属性durable设置为True
消息持久化,需要将basic_publish中的properties属性设置pika.BasicProperties(delivery_mode=2 )

如果队列关闭,消息也会关闭

这个是在本地使用的,如果连接其他需要用户认证和一个类似白名单的,具体之后再加

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/663226.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号