目录
一、MQ系列产品
二、为什么要使用mq
三、mq的特点
四、mq的缺点
五、下载,解压,启动,开端口,访问
六、导入JMS依赖
七、废话少说,上queue入门代码
1.生产者
2.消费者
3.监听器写法
4.消费者消费特点
八、topic
九、JMS组成
JSM Message
消息头
消息体
消息属性
十、持久化
一、MQ系列产品
- kafka
编程语言:scala。
大数据领域的主流MQ。
- rabbitmq
编程语言:erlang
基于erlang语言,不好修改底层,不要查找问题的原因,不建议选用。
- rocketmq
编程语言:java
适用于大型项目。适用于集群。
- activemq
编程语言:java
适用于中小型项目。
二、为什么要使用mq
1、系统之间接口耦合比较严重
2、面对大流量并发时,容易被冲垮
3、等待同步存在性能问题
使用mq的好处
- 异步。调用者无需等待。
- 解耦。解决了系统之间耦合调用的问题。
- 消峰。抵御洪峰流量,保护了主业务。
三、mq的特点
- 采用异步处理模式
- 应用系统之间解耦合
发送者和接受者不必了解对方,只需要确认消息。
发送者和接受者不必同时在线。
电商项目常用流程图:
四、mq的缺点
两个系统之间不能同步调用,不能实时回复,不能响应某个调用的回复。
五、下载,解压,启动,开端口,访问
启动命令
service activemq start
查看activemq状态
service activemq status
关闭activemq服务
service activemq stop
如果出现无法访问8186端口,可见另一博客
解决无法访问activemq的8161端口问题_阿狗哲哲的博客-CSDN博客一、查看端口是否已经开放查看8161和61616端口是否已经开放二、查看阿里云服务器安全组是否开启三、查看8161端口是否被其他进程占用四、查看activemq配置文件端口号是否为8161,以及是否运行外机访问activemq.xml未发现端口的配置,看到引入了外部的xml文件,因此查看jetty.xml发现配置文件中允许访问的ip只能为本机,将其修改为0.0.0.0,所有人都能访问问题解决!...https://blog.csdn.net/qq_52438590/article/details/123773605?spm=1001.2014.3001.5502
六、导入JMS依赖
org.apache.activemq
activemq-all
5.15.9
org.apache.xbean
xbean-spring
3.16
七、废话少说,上queue入门代码
1.生产者
package Jms_Queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProduce_queue {
// linux 上部署的activemq 的 IP 地址 + activemq 的端口号
public static final String ACTIVEMQ_URL = "tcp://123.57.64.216:61616";
// 目的地的名称
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws Exception{
// 1 按照给定的url创建连接工厂,这个构造器采用默认的用户名密码。该类的其他构造方法可以指定用户名和密码。
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 2 通过连接工厂,获得连接 connection 并启动访问。
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 3 创建会话session 。第一参数是是否开启事务, 第二参数是消息签收的方式
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
// 4 创建目的地(两种 :队列/主题)。Destination是Queue和Topic的父类
Queue queue = session.createQueue(QUEUE_NAME);
// 5 创建消息的生产者
MessageProducer messageProducer = session.createProducer(queue);
// 6 通过messageProducer 生产 3 条 消息发送到消息队列中
for (int i = 1; i < 7 ; i++) {
// 7 创建消息
TextMessage textMessage = session.createTextMessage("msg--" + i);
// 8 通过messageProducer发送给mq
messageProducer.send(textMessage);
}
// 9 关闭资源
messageProducer.close();
session.close();
connection.close();
System.out.println(" **** 消息发送到MQ完成 ****");
}
}
1.生产者
package Jms_Queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProduce_queue {
// linux 上部署的activemq 的 IP 地址 + activemq 的端口号
public static final String ACTIVEMQ_URL = "tcp://123.57.64.216:61616";
// 目的地的名称
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws Exception{
// 1 按照给定的url创建连接工厂,这个构造器采用默认的用户名密码。该类的其他构造方法可以指定用户名和密码。
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 2 通过连接工厂,获得连接 connection 并启动访问。
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 3 创建会话session 。第一参数是是否开启事务, 第二参数是消息签收的方式
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
// 4 创建目的地(两种 :队列/主题)。Destination是Queue和Topic的父类
Queue queue = session.createQueue(QUEUE_NAME);
// 5 创建消息的生产者
MessageProducer messageProducer = session.createProducer(queue);
// 6 通过messageProducer 生产 3 条 消息发送到消息队列中
for (int i = 1; i < 7 ; i++) {
// 7 创建消息
TextMessage textMessage = session.createTextMessage("msg--" + i);
// 8 通过messageProducer发送给mq
messageProducer.send(textMessage);
}
// 9 关闭资源
messageProducer.close();
session.close();
connection.close();
System.out.println(" **** 消息发送到MQ完成 ****");
}
}
2.消费者
package Jms_Queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsConsumer {
public static final String ACTIVEMQ_URL = "tcp://123.57.64.216:61616";
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException {
// 1 按照给定的url创建连接工厂,这个构造器采用默认的用户名密码。该类的其他构造方法可以指定用户名和密码。
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 2 通过连接工厂,获得连接 connection 并启动访问。
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 3 创建会话session 。第一参数是是否开启事务, 第二参数是消息签收的方式
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
// 4 创建目的地(两种 :队列/主题)。Destination是Queue和Topic的父类
Queue queue = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(queue);
while (true){
// reveive() 一直等待接收消息,在能够接收到消息之前将一直阻塞。 是同步阻塞方式 。和socket的accept方法类似的。
// reveive(Long time) : 等待n毫秒之后还没有收到消息,就是结束阻塞。
// 因为消息发送者是 TextMessage,所以消息接受者也要是TextMessage
// TextMessage message = (TextMessage)consumer.receive();
TextMessage message = (TextMessage)consumer.receive(4000);
if (null != message){
System.out.println("****消费者的消息:"+message.getText());
}else {
break;
}
}
// 9 关闭资源
consumer.close();
session.close();
connection.close();
System.out.println(" **** 消息接受完成 ****");
}
}
3.监听器写法
4.消费者消费特点
八、topic
topic只需将queue换为topic即可
九、JMS组成
JSM Message
消息头
| textMessage.setJMSDestination(topic) | 这里可以指定每个消息的目的地 |
| textMessage.setJMSDeliveryMode(0); |
|
| textMessage.setJMSExpiration(1000); | 如果timeToLive值等于0,则JMSExpiration被设为0,表示该消息永不过期。 |
| textMessage.setJMSPriority(10); |
|
| textMessage.setJMSMessageID("ABCD"); |
|
消息体
消息属性
如果需要除消息头字段之外的值,那么可以使用消息属性。他是识别/去重/重点标注等操作,非常有用的方法。
十、持久化
queue持久化只需设置一个属性。
topic比较麻烦,topic默认就是非持久化的,因为生产者生产消息时,消费者也要在线,这样消费者才能消费到消息。
topic消息持久化,只要消费者向MQ服务器注册过,所有生产者发布成功的消息,该消费者都能收到,不管是MQ服务器宕机还是消费者不在线。
具体翻阅文档p31页。



