- 1.rabbit mq和mqtt
- 1.1mq
- 1.2mqtt
- 2.集成rabbitmq收发消息
- 2.1引入pom文件增加配置文件
- 2.2参数说明
- 2.2.1交换机
- 2.2.2队列
- 2.2.3消息
- 2.3发送消息
- 2.4接收消息
- 3.集成mqtt收发消息
- 3.1引入pom文件增加配置文件
- 3.2发送消息
- 3.3接收消息
- 4.github源码地址
之前介绍过rabbitmq的安装和底层实现有兴趣可以查阅之前的博客。
重要的说明:
- 生产者 producer:生产者向队列发送消息
- 消费者 consumer:消费者从队列获取消息
- 交换机 exchange:生产者将消息发给交换机,由交换机分发消息
- 通道 channel:信道,用于通信
- 队列 queue:储存消息的地方
- 路由键 RoutingKey:指定当前消息被谁接受
- 绑定key BindingKey:指定当前Exchange下,什么样的路由键会被下派到当前绑定的Queue中
今天抽象型的说一下mq各部分的运行机制:
交换机: 队列与交换机绑定,然后指定路由键,这样消息发送到交换机,交换机根据路由键去投递到匹配的队列中。
- Direct Exchange 直连交换机
需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。 - Fanout Exchange 扇形交换机
一种不使用路由键的交换机,收到的消息直接投递到绑定的全部队列上,一个队列一份。 - Topic Exchange 主题交换机
路由键匹配,他的路由键支持通配符比如"test/",可以匹配到多个路由键,假设交换机收到两条消息路由键是“test/1”“test/2”,那么交换机会发送给路由键是"test/"的交换机一份。
“#”会匹配之后多级的目录,“”只能匹配后一级目录。
test/ : test/1
test/# : test/1/2/3/4/5/6/7/8/9/10
队列:
- 简单队列:Hello World
一个生产者对应一个消费者没有中间商赚差价(交换机)。 - work模式:Work queus
一个生产者多个消费者,消息只会被一个消费者处理,同一条消息不会被重复消费,也没有中间商赚差价(交换机)。 - 订阅模式:Publish/Subscribe
类似于mqtt协议的主题订阅,消息先到交换机,交换机再将消息分配给已经绑定的消费者。 - 路由模式:Routing
和订阅模式极度相同,队列可以设置多个key绑定交换机任何一个匹配上了都可以接收消息,比如key为 test/get、test/put,test/get和test/put任何一个来消息了路由器都会投递到队列中。 - 通配符模式:Topics
和交换机的同通配符差不多,只是这次通配符用在了队列绑定交换机的key上 - RPC(没啥用)
MQTT是当前物联网使用比较多的一个协议,他是以主题订阅的形式进行工作,类似于八婆传瞎话,某一个八婆在某个小团体中说别人的坏话,然后你不在它的小团里不行,它说别人坏话的时候你不在也听不见。
(mqttclient订阅某一个主题,然后所有订阅这个主题的用户都可以收发消息,某个客户端发送消息其他所有在线用户会同步收到,即时消息不会存储,不在线收不到消息,即使下线又上线之前不在线的消息也不会收到)
注意:一定要现有队列才可以发送消息,发送消息并不会创建交换机和队列,可以先让消费者启动也就是RabbitListener他会根据配置创建。
2.1引入pom文件增加配置文件POM文件
org.springframework.boot spring-boot-starter-amqp 2.1.6.RELEASE
porperties
spring:
#rabbitmq
rabbitmq:
port: 5672
username: 账号
password: 密码
virtual-host: /
host: ip
listener:
simple:
## auto表示自动完成这次消费,manual表示需要手动告知完成消费
acknowledge-mode: manual
## listener开几个线程处理数据
concurrency: 1
## linstener 最大开几个线程
max-concurrency: 1
## 一次拿几条数据
prefetch: 1
# 开启重试,重试5次 间隔1秒
retry:
# 开启消费者(程序出现异常)重试机制,默认开启并一直重试
enabled: true
# 最大重试次数
max-attempts: 5
# 重试间隔时间(毫秒)
initial-interval: 1000
# 是否进入死信队列 true是 false不是
default-requeue-rejected: false
2.2参数说明
2.2.1交换机
@AliasFor("name")
String value() default ""; // 交换机名称(两个都是)
@AliasFor("value")
String name() default ""; // 交换机名称(两个都是)
String type() default "direct"; // 交换机类型
String durable() default "true"; // 是否是持久化的,即使rabbitmq重启,交换机是否存在
String autoDelete() default "false"; // 当没有队列绑定交换机自动销毁
String internal() default "false"; // 是否为内部交换机,内部交换机只能路由交换机到交换机
String ignoreDeclarationExceptions() default "false"; // 忽略声明异常
String delayed() default "false"; // 是否开启延迟消息,需要使用延迟消息插件
Argument[] arguments() default {}; // 结构化参数,发送消息的时候,额外设置消息的参数(也就是header信息)
String declare() default "true"; // 是否有管理员
String[] admins() default {}; // 返回应该声明此组件的管理bean名称列表。默认情况下,所有管理员都将声明它
2.2.2队列
@AliasFor("name")
String value() default ""; // 队列名称(两个都是)
@AliasFor("value")
String name() default ""; // 队列名称(两个都是)
String durable() default ""; // 是否是持久化的,即使rabbitmq重启,队列是否存在
String exclusive() default ""; // 是否是排他队列,是否只在第一次创建它的Connection中有效,当Connection关闭,该Queue也会被删除
String autoDelete() default ""; // 当没有消费者自动销毁队列
String ignoreDeclarationExceptions() default "false"; // 忽略声明异常
Argument[] arguments() default {}; // 结构化参数,发送消息的时候,额外设置消息的参数(也就是header信息)
String declare() default "true"; // 是否有管理员
String[] admins() default {}; // 返回应该声明此组件的管理bean名称列表。默认情况下,所有管理员都将声明它
2.2.3消息
- 设置exchange为持久化之后,并不能保证消息不丢失,因为此时发送往exchange中的消息并不是持久化的,需要配置delivery_mode=2指明message为持久的。FanoutExchange 发送的消息默认就是持久化
@Component
public class SendMessage implements CommandLineRunner {
private int index = 0;
private RabbitTemplate rabbitTemplate;
public SendMessage(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@Override
public void run(String... args) throws InterruptedException {
//测试使用代码
Boolean where = true;
while (where) {
System.out.println("Sending message...");
try {
rabbitTemplate.convertAndSend("test_exchange", "", "发送消息" + index);
} catch (Exception e) {
e.printStackTrace();
}
index++;
Thread.sleep(1000);
}
}
}
2.4接收消息
@Component
public class ReceiveMessage {
@Autowired
private MqttClient mqttClient;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "test_queue", durable = "true"),
exchange = @Exchange(name = "test_exchange", type = "fanout"),
key = ""))
@RabbitHandler
private void receive1(Message message, Channel channel) {
//消费者操作
try {
receiveMessage(new String(message.getBody()));
//告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 正常消费队列,第二个参数的意思是小于该ack的消息是否等待全部完成后再一次性签收
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);// 将消息重新放回队列
} catch (Exception e) {
e.printStackTrace();
}
}
public void receiveMessage(String message) {
// 1.接收AMQP消息
Thread t = Thread.currentThread();
String name = t.getName();
System.out.println("name=" + name);
System.out.println("-------------------接收消息-------------------");
System.out.println(message);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 2.发送mqtt 消息
String mqttTopic = "test";
MqttMessage msg = new MqttMessage();
String msgStr = message;
msg.setPayload(msgStr.getBytes());//设置消息内容
msg.setQos(0);//设置消息发送质量,可为0,1,2.
msg.setRetained(false);//服务器是否保存最后一条消息,若保存,client再次上线时,将再次受到上次发送的最后一条消息。
try {
mqttClient.publish(mqttTopic, msg);//设置消息的topic,并发送
} catch (MqttException e) {
e.printStackTrace();
}
}
}
3.集成mqtt收发消息
3.1引入pom文件增加配置文件
org.eclipse.paho
org.eclipse.paho.client.mqttv3
${mqttv3.version}
#mqtt mqtt: clientid: mqtt-system host: 182.92.101.122 username: root pwd: hydf@8888 completionTimeout: 30000
@Configuration
@RefreshScope
public class MqttConfig {
@Value("${mqtt.host}")
private String mqttHost;
@Value("${mqtt.username}")
private String mqttUserName;
@Value("${mqtt.clientid}")
private String clientId;
@Value("${mqtt.pwd}")
private String mqttPwd;
@Value("${mqtt.completionTimeout}")
private Integer completionTimeout;
@Bean
public MqttClient mqttClient() throws MqttException {
//1.初始化mqtt参数
MqttConnectOptions mOptions = new MqttConnectOptions();
mOptions.setAutomaticReconnect(true);//断开后,是否自动连接
mOptions.setCleanSession(false);//是否清空客户端的连接记录。若为true,则断开后,broker将自动清除该客户端连接信息
mOptions.setConnectionTimeout(completionTimeout);//设置超时时间,单位为秒
mOptions.setUserName(mqttUserName);//设置用户名。跟Client ID不同。用户名可以看做权限等级
mOptions.setPassword(mqttPwd.toCharArray());//设置登录密码
mOptions.setKeepAliveInterval(60);//心跳时间,单位为秒。即多长时间确认一次Client端是否在线
mOptions.setMaxInflight(10);//允许同时发送几条消息(未收到broker确认信息)
//2.创建mqtt客户端
MqttClient client = null;
try {
client = new MqttClient("tcp://"+mqttHost, clientId,null);
client.connect(mOptions);//连接broker
client.setCallback(mqttCallback);//设置回调
} catch (MqttException e) {
e.printStackTrace();
}
// 订阅主题
client.subscribe("test");
return client;
}
static MqttCallback mqttCallback = new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
System.out.println("MQTT Lost");
}
@Override
public void messageArrived(String topic, MqttMessage message){
System.out.println("收到消息!");
System.out.println(new String(message.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("MQTT delivery Complete ");
}
};
}
3.2发送消息
// 2.发送mqtt 消息
String mqttTopic = "test";
MqttMessage msg = new MqttMessage();
String msgStr = message;
msg.setPayload(msgStr.getBytes());//设置消息内容
msg.setQos(0);//设置消息发送质量,可为0,1,2.
msg.setRetained(false);//服务器是否保存最后一条消息,若保存,client再次上线时,将再次受到上次发送的最后一条消息。
try {
mqttClient.publish(mqttTopic, msg);//设置消息的topic,并发送
} catch (MqttException e) {
e.printStackTrace();
}
3.3接收消息
接收消息代码,在3.1中初始化client的时候绑定的回调
4.github源码地址https://github.com/1142235090/frame



