传统的通过服务调用让其它系统感知事件发生
通过消息中间件解耦服务调用
1.2 消息中间件的好处- 解耦;
- 异步;
- 削峰;
- 横向扩展;
- 安全可靠;
- 顺序保证;
- …
什么是中间件?
非底层操作系统软件,非业务应用软件,不是直接给最终用户使用的,不能直接给客户带来价值的软件统称为中间件。
什么是消息中间件?
关注于数据的发送和接受,利用高效可靠的异步消息传递机制集成分布式系统。
什么是JMS?
Java消息服务(Java Message Service)即JMS,是一个Java平台中关于面向消息中间件的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
什么是AMQP
AMQP(Advanced message queuing protocol)是一个提供统一消息服务的应用层标准协议,基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等 条件的限制。
JMS和AMQP对比
常见消息中间件对比
ActiveMQ
ActiveMQ是Apache出品,最流行的,能力强劲的开源消息中间件。ActiveMQ是一个完全支持JMS1.1和J2EE1.4规范的JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
RabbitMQ
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
Kafka
Kakfa是一种高吞吐量的分布式发布订阅消息系统,是一个分布式的、分区的、可靠的分布式日志存储服务。它通过一种独一无二的设计提供了一个消息系统的功能。
综合评价
二、JMS规范什么是JMS?
2.1 JMS相关概念Java消息服务(Java Message Service)即JMS,是一个Java平台中关于面向消息中间件的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
- 提供者:实现JMS规范的消息中间件服务器;
- 客户端:发送或接收消息的应用程序;
- 生产者/发布者:创建并发送消息的客户端;
- 消费者/订阅者:接收并处理消息的客户端;
- 消息:应用程序之间传递的数据内容;
- 消息模式: 在客户端之间传递消息的方式,JMS中定义了主题和队列两种模式。
- 客户端包括生产者和消费者;
- 队列中的消息只能被一个消费者消费;
- 消费者可以随时消费队列中的消息;
- 客户端包括发布者和订阅者;
- 主题 中的消息被所有订阅者消费;
- 消费者不能消费订阅之前就发送到主题中的消息;
- ConnectionFactory:用于创建连接到消息中间件的连接工厂;
- Connection:代表了应用程序和消息服务器之间的通信链路;
- Destination:指消息发布和接收的地点,包括队列或主题;
- Session:表示一个单线程的上下文,用于发送和接收消息;
- MessageConsumer:由会话创建,用于接收发送到目标的消息;
- MessageProducer:由会话创建,用于发送消息到目标;
- Message:是在消费者和生产者之间传送的对象,消息头,一组消息属性,一个消息体;
- 下载安装包
- 直接启动 activemq.bat
- 使用服务启动 installservice.bat
Apache Active MQ官网
下载地址
windows版本,选择第一个版本,下载,解压即可。
选择bin/win64/activemq.bat 右键选择管理员运行即可。
注意:如果这里启动有错误,记得在conf/activemq.xml中修改配置如下即可。
启动成功之后,在网页输入http://localhost:8161/admin/。账号为:admin,密码为:admin。即可
3.2 Linux平台安装activeMQ- 下载并 解压安装包
- 启动
- 创建生产者;
- 创建消费者;
- 创建发布者;
- 创建订阅者
在IDEA开发工具中新建一个maven项目,在pom.xml引入依赖包。
阿里云maven仓库
4.0.0 org.example maven_demo01 1.0-SNAPSHOT org.apache.activemq activemq-all 5.9.1
队列模式下的生产者
package com.lcz.jms_demo.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Producer {
//定义两个常量
private static final String url = "tcp://127.0.0.1:61616";
private static final String queueName = "queue_test";
public static void main(String[] args) throws JMSException {
//1.创建connectionfactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
//2.创建connection
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建一个目标
Destination destination = session.createQueue(queueName);
//6.创建一个生产者
MessageProducer producer = session.createProducer(destination);
for (int i=0;i<100;i++){
//7.创建消息
TextMessage textMessage = session.createTextMessage("test" + i);
//8.发布消息
producer.send(textMessage);
System.out.println("发送消息"+textMessage.getText());
}
//9.关闭连接
connection.close();
}
}
队列模式下的消费者
package com.lcz.jms_demo.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Consumer {
//定义两个常量
private static final String url = "tcp://127.0.0.1:61616";
private static final String queueName = "queue_test";
public static void main(String[] args) throws JMSException {
//1.创建connectionfactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
//2.创建connection
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建一个目标
Destination destination = session.createQueue(queueName);
//6.创建一个消费者
MessageConsumer consumer = session.createConsumer(destination);
//7.创建一个监听器
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接收消息"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//8.关闭连接
// connection.close();
}
}
(2)使用JMS接口规范连接ActiveMQ-主题模式的消息模式
类似与上述,不过主题模式是分布和订阅,消费者只有先订阅主题,才能收到消息。
消费者
package com.lcz.jms_demo.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Consumer {
//定义两个常量
private static final String url = "tcp://127.0.0.1:61616";
private static final String topicName = "topic_test";
public static void main(String[] args) throws JMSException {
//1.创建connectionfactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
//2.创建connection
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建一个目标
Destination destination = session.createTopic(topicName);
//6.创建一个消费者
MessageConsumer consumer = session.createConsumer(destination);
//7.创建一个监听器
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接收消息"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//8.关闭连接
// connection.close();
}
}
生产者
package com.lcz.jms_demo.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Producer {
//定义两个常量
private static final String url = "tcp://127.0.0.1:61616";
private static final String topicName = "topic_test";
public static void main(String[] args) throws JMSException {
//1.创建connectionfactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
//2.创建connection
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建一个目标
Destination destination = session.createTopic(topicName);
//6.创建一个生产者
MessageProducer producer = session.createProducer(destination);
for (int i=0;i<100;i++){
//7.创建消息
TextMessage textMessage = session.createTextMessage("test" + i);
//8.发布消息
producer.send(textMessage);
System.out.println("发送消息"+textMessage.getText());
}
//9.关闭连接
connection.close();
}
}
4.2 Spring JMS代码演练
(1)Spring JMS理论
- ConnectionFactory:用于管理连接的连接工厂;
- JmsTemplate:用于发送和接收消息的模板类;
- MessageListener:消息监听器;
ConnectionFactory
- 一个Spring为我们提供的连接池;
- JmsTemplate每次发消息都会重新创建连接,会话和productor。
- spring中提供了SingleConnectionFactory和CachingConnectionFactory。
JmsTemplate
- 是spring提供的,只需向spring容器内注册这个类就可以使用JmsTemplate方便的操作jms;
- JmsTemplate类是线程安全的,可以在整个应用范围使用。
MessageListerner
- 实现一个onMessage方法,该方法只接收一个Message参数。
官网地址
消息队列主要分为点对点(Queue)模式和订阅(Topic)模式两种
主要角色有生产者、消息、队列、消费者
如图所示点对点模式: 生产者Producer将生产出来的消息塞入到队列Queue中,消费者Consumer从队列中取出消息并消费,被消费完的消息不会存在在队列中,一条消息只会被一个消费者消费一次;
本地启动avitvemq服务。
b.pom.xml依赖包c.配置文件application.properties4.0.0 org.springframework.boot spring-boot-starter-parent 2.6.1 com.lcz spring_demo19 0.0.1-SNAPSHOT spring_demo19 Demo project for Spring Boot 1.8 org.springframework.boot spring-boot-starter-activemq org.messaginghub pooled-jms org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-test test org.springframework.boot spring-boot-maven-plugin
#整合jms测试,安装在别的机器,防火墙和端口号记得开放 spring.activemq.broker-url=tcp://127.0.0.1:61616 #集群配置 #spring.activemq.broker-url=failover:(tcp://localhost:61616,tcp://localhost:61617) spring.activemq.user=admin spring.activemq.password=admin #下列配置要增加依赖 spring.activemq.pool.enabled=true spring.activemq.pool.max-connections=100d.spring boot启动类启用JMS
package com.lcz.spring_demo19;
import javax.jms.ConnectionFactory;
import javax.jms.Queue;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
@EnableJms
@SpringBootApplication
public class SpringDemo19Application {
public static void main(String[] args) {
SpringApplication.run(SpringDemo19Application.class, args);
}
@Bean
public Queue queue(){
return new ActiveMQQueue("common.queue");
}
}
e.生产消息接口ProduceService.java
package com.lcz.spring_demo19.service;
import javax.jms.Destination;
public interface ProduceService {
public void sendMessage(Destination destination,final String message);
public void sendMessage(final String message);
}
f.实现生产消息接口ProduceServiceImpl.java
package com.lcz.spring_demo19.service.impl;
import com.lcz.spring_demo19.service.ProduceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;
import javax.jms.Destination;
import javax.jms.Queue;
@Service
public class ProduceServiceImpl implements ProduceService {
//注入指定消息队列 (在启动类中有注入哦!)
@Autowired
private Queue queue;
@Autowired
//用来发送消息到broker的对象
private JmsMessagingTemplate jmsMessagingTemplate;
@Override
public void sendMessage(Destination destination, String message) {
jmsMessagingTemplate.convertAndSend(destination,message);
System.out.println("发送信息指定目标:"+message);
}
@Override
public void sendMessage(String message) {
jmsMessagingTemplate.convertAndSend(this.queue,message);
System.out.println("发送信息默认目标:"+message);
}
}
g.控制类访问
package com.lcz.spring_demo19.controller;
import com.lcz.spring_demo19.service.ProduceService;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.jms.Destination;
@RestController
@RequestMapping("/spring_jms")
public class JmsController {
@Autowired
private ProduceService produceService;
@GetMapping("/order")
public Object order(String msg){
Destination destination = new ActiveMQQueue("order.queue");
produceService.sendMessage(destination,msg);
return "发送成功!";
}
@GetMapping("/common")
public Object common(String msg){
produceService.sendMessage(msg);
return "发送成功";
}
}
h.消费者消费类
package com.lcz.spring_demo19.jms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class CommonConsumer {
@JmsListener(destination = "common.queue")
public void receiveQueue(String text){
System.out.println("CommonConsumer收到的报文消息为:"+text);
}
}
package com.lcz.spring_demo19.jms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class OrderConsumer {
@JmsListener(destination = "order.queue")
public void receiveQueue(String text){
System.out.println("OrderConsumer收到报文消息为:"+text);
}
}
i.结果展示
(3) SpringBoot2.x整合ActiveMQ实战之订阅消息总之: 消息队列并不能提高系统的运行速度(如果想提高速度,还是需要用到多线程等方式),消息队列作为中间件的作用是降低应用间的耦合,在高并发、高流量的情况下保证服务端的稳定,保证业务流程的顺畅和数据的完整(请求不丢失)。
生产者产生一条消息message放入一个topic中,该topic已经三个消费者订阅了,那么被放入topic中的这条消息,就会同时被这三个消费者取走(当然他们必须都处于在线状态),并进行“消费”。其实就类似现实生活中的手机接收推送。
使用场景
发布订阅模式下,当发布者消息量很大时,显然单个订阅者的处理能力是不足的。实际上现实场景中是多个订阅者节点组成一个订阅组负载均衡消费topic消息即分组订阅,这样订阅者很容易实现消费能力线性扩展。可以看成是一个topic下有多个Queue,每个Queue是点对点的方式,Queue之间是发布订阅方式。a.properties配置文件
需要加入配置文件,支持发布订阅模型,默认只支持点对点
#整合jms测试,安装在别的机器,防火墙和端口号记得开放 spring.activemq.broker-url=tcp://127.0.0.1:61616 #集群配置 #spring.activemq.broker-url=failover:(tcp://localhost:61616,tcp://localhost:61617) spring.activemq.user=admin spring.activemq.password=admin #下列配置要增加依赖 spring.activemq.pool.enabled=true spring.activemq.pool.max-connections=100 # 支持发布订阅模型,默认只支持点对点 #default point to point spring.jms.pub-sub-domain=trueb.spring boot启动类添加
@Bean
public Topic topic(){
return new ActiveMQTopic("common.topic");
}
c.生产消息的接口
public void publish(String msg);
d.生产消息的实现类
//注入订阅的
@Autowired
private Topic topic;
@Override
public void publish(String msg) {
jmsMessagingTemplate.convertAndSend(this.topic,msg);
}
e.消费者类
package com.lcz.spring_demo19.jms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class TopicSub {
//实时监听这个消息队列
@JmsListener(destination = "common.topic")
public void receiveTopic1(String text){
System.out.println("TopicSub 消费者1 收到的消息为:"+text);
}
@JmsListener(destination = "common.topic")
public void receiveTopic2(String text){
System.out.println("TopicSub 消费者2 收到的消息为:"+text);
}
@JmsListener(destination = "common.topic")
public void receiveTopic3(String text){
System.out.println("TopicSub 消费者3 收到的消息为:"+text);
}
}
f.控制器类
@GetMapping("topic")
public Object topic(String msg) {
//调用方法
produceService.publish(msg);
return "消息发送成功!!!";
}
g.结果展示
(4)SpringBoot2.x整合ActiveMQ使其同时支持两种模式
我们用发布订阅模式操作时,点对点的就不起作用:
如何两个同时都可以?
a. 在配置文件里面,注释掉 #spring.jms.pub-sub-domain=true
b.在启动类添加
//需要给topic定义独立的JmsListenerContainer
@Bean
public JmsListenerContainerFactory> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setPubSubDomain(true);
bean.setConnectionFactory(activeMQConnectionFactory);
return bean;
}
c.在消费者类@JmsListener如果不指定独立的containerFactory的话是只能消费queue消息!
package com.lcz.spring_demo19.jms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class TopicSub {
//实时监听这个消息队列
@JmsListener(destination = "common.topic", containerFactory="jmsListenerContainerTopic")
public void receiveTopic1(String text){
System.out.println("TopicSub 消费者1 收到的消息为:"+text);
}
@JmsListener(destination = "common.topic", containerFactory="jmsListenerContainerTopic")
public void receiveTopic2(String text){
System.out.println("TopicSub 消费者2 收到的消息为:"+text);
}
@JmsListener(destination = "common.topic", containerFactory="jmsListenerContainerTopic")
public void receiveTopic3(String text){
System.out.println("TopicSub 消费者3 收到的消息为:"+text);
}
}
结果展示:
注意:
默认消费者并不会消费订阅发布类型的消息,这是由于springboot默认采用的是p2p模式进行消息的监听!



