ActiveMQ 是 Apache 出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个
完全支持 JMS1.1 和 J2EE 1.4 规范的 JMS Provider 实现,尽管 JMS 规范出台已经是很久
的事情了,但是 JMS 在当今的 J2EE 应用中间仍然扮演着特殊的地位。
“消息”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;
也可以更复杂,可能包含嵌入对象。
“消息队列”是在消息的传输过程中保存消息的容器。
5 常用消息服务应用 5.1ActiveMQActiveMQ 是 Apache 出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持 JMS1.1 和 J2EE 1.4 规范的 JMS Provider 实现。
5.2RabbitMQRabbitMQ 是一个在 AMQP 基础上完成的,可复用的企业消息系统。他遵循 Mozilla Public License 开源协议。开发语言为 Erlang。
5.3RocketMQ由阿里巴巴定义开发的一套消息队列应用服务。
二、消息服务的应用场景消息队列的主要特点是异步处理,主要目的是减少请求响应时间和解耦。所以主要的使用场景就是将比较耗时而且**不需要即时(同步)**返回结果的操作作为消息放入消息队列。同时由于使用了消息队列,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响,即解耦和。
用户注册流程:
1)注册处理以及写数据库
2)发送注册成功的手机短信
3)发送注册成功的邮件信息
如果用消息中间件:则可以创建两个线程来做这些事情,直接发送消息给消息中间件,然后让邮件服务和短信服务自己去消息中间件里面去取消息,然后取到消息后再自己做对应的业务操作。就是这么方便
5.2应用的解耦 5.2.1 订单处理生成订单流程:
1)在购物车中点击结算
2)完成支付
3)创建订单
4)调用库存系统
订单完成后,订单系统并不去直接调用库存系统,而是发送消息到消息中间件,写入一个订单信息。库存系统自己去消息中间件上去获取,然后做发货处理,并更新库存,这样能够实现互联网型应用追求的快这一个属性。而库存系统读取订单后库存应用这个操作也是非常快的,所以有消息中间件对解耦来说也是一个不错的方向。
5.3流量的削峰 5.3.1 秒杀功能秒杀流程:
1)用户点击秒杀
2)发送请求到秒杀应用
3)在请求秒杀应用之前将请求放入到消息队列
4)秒杀应用从消息队列中获取请求并处理。
比如,系统举行秒杀活动,热门商品。流量蜂拥而至 100 件商品,10 万人挤进来怎么办?10 万秒杀的操作,放入消息队列。秒杀应用处理消息队列中的 10 万个请求中的前 100个,其他的打回,通知失败。流量峰值控制在消息队列处,秒杀应用不会瞬间被怼死.
三、 JMS 1 什么是 JMSJMS(Java Messaging Service)是 Java 平台上有关面向消息中间件的技术规范,它便于消息系统中的 Java 应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口,简化企业应用的开发。
2 JMS 模型 2.1点对点模型(Point To Point)生产者发送一条消息到 queue,只有一个消费者能收到。
发布者发送到 topic 的消息,只有订阅了 topic 的订阅者才会收到消息。
ActiveMQ 官网: http://activemq.apache.org
ActiveMQ5.10.x 以上版本必须使用 JDK1.8 才能正常使用。
ActiveMQ5.9.x 及以下版本使用 JDK1.7 即可正常使用。
tar -zxf apache-activemq-5.9.0-bin.tar.gz
ls -al apache-activemq-5.9.0/bin
如果权限不足,则无法执行,需要修改文件权限:
chmod 755 activemq
cp -r apache-activemq-5.9.0 /usr/local/activemq -r
6 启动 ActiveMQ/usr/local/activemq/bin/activemq start
ps aux | grep activemq
见到下述内容即代表启动成功
使用浏览器访问 ActiveMQ 管理应用, 地址如下:
http://ip:8161/admin/
用户名: admin
密码: admin
ActiveMQ 使用的是 jetty 提供 HTTP 服务.启动稍慢,建议短暂等待再访问测试.见到如下界面代表服务启动成功
首先关闭防火墙:
如果service iptables stop关闭防火墙,没有成功!
是因为centos7不能关闭防火墙!
所以,这里要使用
systemctl stop firewalld,然后就可以访问了。
修改 ActiveMQ 配置文件: /usr/local/activemq/conf/jetty.xml
配置文件修改完毕,保存并重新启动 ActiveMQ 服务。
7.4修改用户名和密码修改 conf/users.properties 配置文件.内容为: 用户名=密码
保存并重启 ActiveMQ 服务即可.
/usr/local/activemq/bin/activemq restart
9 关闭 ActiveMQ/usr/local/activemq/bin/activemq stop
10 配置文件 activemq.xml配置文件中,配置的是 ActiveMQ 的核心配置信息. 是提供服务时使用的配置. 可以修改启动的访问端口. 即 java 编程中访问 ActiveMQ 的访问端口.
默认端口为 61616.
使用协议是: tcp 协议.
修改端口后, 保存并重启 ActiveMQ 服务即可.
从它的目录来说,还是很简单的:
- bin 存放的是脚本文件
- conf 存放的是基本配置文件
- data 存放的是日志文件
- docs 存放的是说明文档
- examples 存放的是简单的实例
- lib 存放的是 activemq 所需 jar 包
- webapps 用于存放项目的目录
目的地,JMS Provider(消息中间件)负责维护,用于对 Message 进行管理的对象。MessageProducer 需要指定 Destination 才能发送消息,MessageReceiver 需要指定 Destination才能接收消息。
2 Producer消息生成者,负责发送 Message 到目的地。
3 Consumer | Receiver消息消费者,负责从目的地中消费【处理|监听|订阅】Message。
4 Message消息,消息封装一次通信的内容。
六、 ActiveMQ 应用 1 ActiveMQ 常用 API 简介下述 API 都是接口类型,由定义在 javax.jms 包中. 是 JMS 标准接口定义.
1.1ConnectionFactory链接工厂, 用于创建链接的工厂类型.
1.2Connection链接. 用于建立访问 ActiveMQ 连接的类型, 由链接工厂创建.
1.3Session会话, 一次持久有效有状态的访问. 由链接创建.
1.4Destination & Queue目的地, 用于描述本次访问 ActiveMQ 的消息访问目的地. 即 ActiveMQ 服务中的具体队列. 由会话创建.interface Queue extends Destination
1.5MessageProducer消息生成者, 在一次有效会话中, 用于发送消息给ActiveMQ 服务的工具. 由会话创建.
1.6MessageConsumer消息消费者【消息订阅者,消息处理者】, 在一次有效会话中, 用于从 ActiveMQ 服务中获取消息的工具. 由会话创建.
1.7Message消息, 通过消息生成者向 ActiveMQ 服务发送消息时使用的数据载体对象或消息消费者从 ActiveMQ 服务中获取消息时使用的数据载体对象. 是所有消息【文本消息,对象消息等】具体类型的顶级接口. 可以通过会话创建或通过会话从 ActiveMQ 服务中获取.
2 JMS-HelloWorld 2.1处理文本消息 2.1.1 创建消息生产者 2.1.1.1 创建工程 2.1.1.2 修改 POM 文件添加 ActiveMQ 坐标
2.1.1.3 编写消息的生产者4.0.0 com.bjsxt mq-producer 1.0-SNAPSHOT org.apache.activemq activemq-all 5.9.0
package com.bjsxt;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.transport.stomp.JmsframeTranslator;
import org.apache.activemq.transport.stomp.Stomp;
import javax.jms.*;
public class HelloWorldProducer {
public void sendHelloWorldActiveMQ(String msgTest){
//定义链接工厂
ConnectionFactory connectionFactory = null;
//定义链接对象
Connection connection = null;
//定义会话
Session session = null;
//定义消息目的地
Destination destination = null;
//定义消息的发送者
MessageProducer producer = null;
//定义消息
Message message = null;
try {
connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.70.151:61616");
//创建连接对象
connection = connectionFactory.createConnection();
//启动连接
connection.start();
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//创建目的地,目的地名称即队列的名称。消息的消费者需要通过此名称访问对应的队列
destination = session.createQueue("helloworld-destination");
//创建消息的生产者
producer = session.createProducer(destination);
//创建消息对象
message = session.createTextMessage(msgTest);
//发送消息
producer.send(message);
}catch (Exception e){
e.printStackTrace();
}finally {
//回收消息发送者资源
if (producer != null) {
try {
producer.close();
}catch (JMSException e){
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (session != null) {
try {
session.close();
}catch (JMSException e){
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (connection == null) {
try {
connection.close();
}catch (JMSException e){
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}
2.1.2 创建消息消费者
2.1.2.1 创建工程
2.1.2.2 修改 POM 文件添加 ActiveMQ 坐标
2.1.2.3 编写消息的消费者4.0.0 org.example mq-consumer 1.0-SNAPSHOT org.apache.activemq activemq-all 5.9.0
package com.bjsxt;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class HelloWorldConsumer {
public void readHelloWorldActiveMQ(){
//定义链接工厂
ConnectionFactory connectionFactory = null;
//定义链接对象
Connection connection = null;
//定义会话
Session session = null;
//定义消息目的地
Destination destination = null;
//定义消息的消费者
MessageConsumer consumer = null;
//定义消息
Message message = null;
try {
connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.70.151:61616");
//创建连接对象
connection = connectionFactory.createConnection();
//启动连接
connection.start();
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//创建目的地,目的地名称即队列的名称。消息的消费者需要通过此名称访问对应的队列
destination = session.createQueue("helloworld-destination");
//创建消息的消费者
consumer = session.createConsumer(destination);
//创建消息对象
message = consumer.receive();
//处理消息
String msg = ((TextMessage)message).getText();
System.out.println("从 ActiveMQ 服务中获取n" +
"的文本信息"+msg);
}catch (Exception e){
e.printStackTrace();
}finally {
//回收消息发送者资源
if (consumer != null) {
try {
consumer.close();
}catch (JMSException e){
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (session != null) {
try {
session.close();
}catch (JMSException e){
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (connection == null) {
try {
connection.close();
}catch (JMSException e){
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}
2.1.3 测试
2.1.3.1 Producer
public class Test {
public static void main(String[] args) {
HelloWorldProducer producer = new HelloWorldProducer();
producer.sendHelloWorldActiveMQ("HelloWorld");
}
}
public class Test {
public static void main(String[] args) {
HelloWorldConsumer consumer = new HelloWorldConsumer();
consumer.readHelloWorldActiveMQ();
}
}
public class Users implements Serializable {
private int userid;
private String username;
private int userage;
public int getUserid() {
return userid;
}
public void setUserid(int userid) {
this.userid = userid;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public int getUserage() {
return userage;
}
public void setUserage(int userage) {
this.userage = userage;
}
@Override
public String toString() {
return "Users{" +
"userid=" + userid +
", username='" + username + ''' +
", userage=" + userage +
'}';
}
}
2.2.2 创建生产者
public class HelloWorldProducer2 {
public void sendHelloWorldActiveMQ(Users users){
//定义链接工厂
ConnectionFactory connectionFactory = null;
//定义链接对象
Connection connection = null;
//定义会话
Session session = null;
//目的地
Destination destination = null;
//定义消息的发送者
MessageProducer producer = null;
//定义消息
Message message = null;
try {
connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.70.151:61616");
//创建连接对象
connection = connectionFactory.createConnection();
//启动连接
connection.start();
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//创建目的地,目的地名称即队列的名称。消息的消费者需要通过此名称访问对应的队列
destination = session.createQueue("my-users");
//创建消息的生产者
producer = session.createProducer(destination);
//创建消息对象
message = session.createObjectMessage(users);
//发送消息
producer.send(message);
}catch (Exception e){
e.printStackTrace();
}finally {
//回收消息发送者资源
if (producer != null) {
try {
producer.close();
}catch (JMSException e){
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (session != null) {
try {
session.close();
}catch (JMSException e){
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (connection == null) {
try {
connection.close();
}catch (JMSException e){
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}
2.2.3 定义消息消费者
public class HelloWorldConsumer2 {
public void readHelloWorldActiveMQ(){
//定义链接工厂
ConnectionFactory connectionFactory = null;
//定义链接对象
Connection connection = null;
//定义会话
Session session = null;
//定义消息目的地
Destination destination = null;
//定义消息的消费者
MessageConsumer consumer = null;
//定义消息
Message message = null;
try {
connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.70.151:61616");
//创建连接对象
connection = connectionFactory.createConnection();
//启动连接
connection.start();
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//创建目的地,目的地名称即队列的名称。消息的消费者需要通过此名称访问对应的队列
destination = session.createQueue("my-users");
//创建消息的消费者
consumer = session.createConsumer(destination);
//创建消息对象
message = consumer.receive();
//处理消息
ObjectMessage objMessage = (ObjectMessage)message;
Users users = (Users)objMessage.getObject();
System.out.println(users);
}catch (Exception e){
e.printStackTrace();
}finally {
//回收消息发送者资源
if (consumer != null) {
try {
consumer.close();
}catch (JMSException e){
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (session != null) {
try {
session.close();
}catch (JMSException e){
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (connection == null) {
try {
connection.close();
}catch (JMSException e){
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}
producer,Test
public class Test {
public static void main(String[] args) {
Users users = new Users();
users.setUserid(1);
users.setUserage(20);
users.setUsername("zhangsan");
HelloWorldProducer2 producer2 = new HelloWorldProducer2();
producer2.sendHelloWorldActiveMQ(users);
}
}
Consumer,Test
public class Test {
public static void main(String[] args) {
HelloWorldConsumer2 consumer2 = new HelloWorldConsumer2();
consumer2.readHelloWorldActiveMQ();
}
}
队列服务监听使用的观察者设计模式
3.1创建消息生产者public class HelloWorldProducer3 {
public void sendHelloWorldActiveMQ(Users users){
//定义链接工厂
ConnectionFactory connectionFactory = null;
//定义链接对象
Connection connection = null;
//定义会话
Session session = null;
//目的地
Destination destination = null;
//定义消息的发送者
MessageProducer producer = null;
//定义消息
Message message = null;
try {
connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.70.151:61616");
//创建连接对象
connection = connectionFactory.createConnection();
//启动连接
connection.start();
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//创建目的地,目的地名称即队列的名称。消息的消费者需要通过此名称访问对应的队列
destination = session.createQueue("my-destination");
//创建消息的生产者
producer = session.createProducer(destination);
//创建消息对象
message = session.createTextMessage(msgTest);
//发送消息
producer.send(message);
}catch (Exception e){
e.printStackTrace();
}finally {
//回收消息发送者资源
if (producer != null) {
try {
producer.close();
}catch (JMSException e){
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (session != null) {
try {
session.close();
}catch (JMSException e){
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (connection == null) {
try {
connection.close();
}catch (JMSException e){
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}
3.2消息消费者
public class HelloWorldConsumer3 {
public void readHelloWorldActiveMQ(){
//定义链接工厂
ConnectionFactory connectionFactory = null;
//定义链接对象
Connection connection = null;
//定义会话
Session session = null;
//定义消息目的地
Destination destination = null;
//定义消息的消费者
MessageConsumer consumer = null;
//定义消息
Message message = null;
try {
connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.70.151:61616");
//创建连接对象
connection = connectionFactory.createConnection();
//启动连接
connection.start();
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//创建目的地,目的地名称即队列的名称。消息的消费者需要通过此名称访问对应的队列
destination = session.createQueue("my-destination");
//创建消息的消费者
consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
//ActiveMQ 回调的方法。通过该方法将消息传递到 consumer
@Override
public void onMessage(Message message) {
//处理消息
String msg = null;
try {
msg = ((TextMessage)message).getText();
} catch (JMSException e) {
e.printStackTrace();
}
System.out.println("从 ActiveMQ 服务中获取n" +
"的文本信息"+msg);
}
});
}catch (Exception e){
e.printStackTrace();
}
}
}
consumer,Test
public class Test {
public static void main(String[] args) {
HelloWorldConsumer3 consumer3 = new HelloWorldConsumer3();
consumer3.readHelloWorldActiveMQ();
}
}
Producer,Test
public class Test {
public static void main(String[] args) {
HelloWorldProducer3 producer3 = new HelloWorldProducer3();
producer3.sendHelloWorldActiveMQ("Hello World");
}
}
消息生产者(发布)将消息发布到 topic 中,同时有多个消息消费者(订阅)消费该消息。
和点对点方式不同,发布到 topic 的消息会被所有订阅者消费。
当生产者发布消息,不管是否有消费者。都不会保存消息
一定要先有消息的消费者,后有消息的生产者。
public class HelloWorldProducerTopic {
public void sendHelloWorldActiveMQ(String msgTest){
//定义链接工厂
ConnectionFactory connectionFactory = null;
//定义链接对象
Connection connection = null;
//定义会话
Session session = null;
//定义消息目的地
Destination destination = null;
//定义消息的发送者
MessageProducer producer = null;
//定义消息
Message message = null;
try {
connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.70.151:61616");
//创建连接对象
connection = connectionFactory.createConnection();
//启动连接
connection.start();
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//创建目的地,目的地名称即队列的名称。消息的消费者需要通过此名称访问对应的队列
destination = session.createTopic("test-topic");
//创建消息的生产者
producer = session.createProducer(destination);
//创建消息对象
message = session.createTextMessage(msgTest);
//发送消息
producer.send(message);
}catch (Exception e){
e.printStackTrace();
}finally {
//回收消息发送者资源
if (producer != null) {
try {
producer.close();
}catch (JMSException e){
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (session != null) {
try {
session.close();
}catch (JMSException e){
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (connection == null) {
try {
connection.close();
}catch (JMSException e){
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}
4.3创建消费者
为了测试效果,这里复制三份
public class HelloWorldConsumerTopic1 implements Runnable{
public void readHelloWorldActiveMQ(){
//定义链接工厂
ConnectionFactory connectionFactory = null;
//定义链接对象
Connection connection = null;
//定义会话
Session session = null;
//定义消息目的地
Destination destination = null;
//定义消息的消费者
MessageConsumer consumer = null;
//定义消息
Message message = null;
try {
connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.70.151:61616");
//创建连接对象
connection = connectionFactory.createConnection();
//启动连接
connection.start();
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//创建目的地,目的地名称即队列的名称。消息的消费者需要通过此名称访问对应的队列
destination = session.createTopic("test-topic");
//创建消息的消费者
consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
//ActiveMQ 回调的方法。通过该方法将消息传递到 consumer
@Override
public void onMessage(Message message) {
//处理消息
String msg = null;
try {
msg = ((TextMessage)message).getText();
} catch (JMSException e) {
e.printStackTrace();
}
System.out.println("从 ActiveMQ 服务中获取---topic1n" + "的文本信息"+msg);
}
});
}catch (Exception e){
e.printStackTrace();
}
}
@Override
public void run() {
this.readHelloWorldActiveMQ();
}
}
Consumer,,,,test
public class Test {
public static void main(String[] args) {
HelloWorldConsumerTopic1 topic1 = new HelloWorldConsumerTopic1();
Thread t1 = new Thread(topic1);
t1.start();
HelloWorldConsumerTopic2 topic2 = new HelloWorldConsumerTopic2();
Thread t2 = new Thread(topic2);
t2.start();
HelloWorldConsumerTopic3 topic3 = new HelloWorldConsumerTopic3();
Thread t3 = new Thread(topic3);
t3.start();
}
}
Producer,Test
public class Test {
public static void main(String[] args) {
HelloWorldProducerTopic topic = new HelloWorldProducerTopic();
topic.sendHelloWorldActiveMQ("Hello Topic");
}
}
先运行消费者,再运行生产者
4.0.0 com.bjsxt parent 0.0.1-SNAPSHOT pom 5.9.0 4.5 4.1.6.RELEASE 5.9.0 4.10.3 2.9.0 4.12 4.1.3.RELEASE 3.2.8 1.2.2 5.1.32 1.6.4 1.0.9 1.2 2.5 2.2 2.0 0.10 2.5.4 2.4.2 3.3 1.3.1 org.apache.activemq activemq-all ${activemq.version} org.apache.xbean xbean-spring ${xbean.version} org.springframework spring-jms ${jms.version} org.apache.activemq activemq-pool ${activemq-pool.version} org.apache.activemq activemq-jms-pool ${activemq-pool.version} org.apache.solr solr-solrj ${solrj.version} redis.clients jedis ${jedis.version} junit junit ${junit.version} org.slf4j slf4j-log4j12 ${slf4j.version} org.mybatis mybatis ${mybatis.version} org.mybatis mybatis-spring ${mybatis.spring.version} mysql mysql-connector-java ${mysql.version} com.alibaba druid ${druid.version} org.springframework spring-context ${spring.version} org.springframework spring-beans ${spring.version} org.springframework spring-webmvc ${spring.version} org.springframework spring-jdbc ${spring.version} org.springframework spring-aspects ${spring.version} jstl jstl ${jstl.version} javax.servlet servlet-api ${servlet-api.version} provided javax.servlet jsp-api ${jsp-api.version} provided commons-fileupload commons-fileupload ${commons-fileupload.version} commons-net commons-net ${commons-net.version} com.fasterxml.jackson.core jackson-databind ${jackson.version} src/main/java ***.xml ** @Component(value="myListener") public class MyMessageListener implements MessageListener{ @Autowired private UserService userService; @Override public void onMessage(Message message) { //处理消息 ObjectMessage objMessage = (ObjectMessage)message; Users user=null; try { user = (Users)objMessage.getObject(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } this.userService.showUser(user); } }
练习项目源码:https://gitee.com/cutelili/active-mq/tree/master/



