栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

都2022年了,不会还有人没学Activemq吧

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

都2022年了,不会还有人没学Activemq吧

目录

一、MQ系列产品

 二、为什么要使用mq

三、mq的特点

 四、mq的缺点

五、下载,解压,启动,开端口,访问

六、导入JMS依赖

七、废话少说,上queue入门代码

1.生产者

 2.消费者

3.监听器写法

 4.消费者消费特点

 八、topic

九、JMS组成 

JSM Message

         消息头

 消息体

消息属性 

十、持久化 


一、MQ系列产品

  1. kafka

编程语言:scala。

大数据领域的主流MQ。

  1. rabbitmq

编程语言:erlang

基于erlang语言,不好修改底层,不要查找问题的原因,不建议选用。

  1. rocketmq

编程语言:java

适用于大型项目。适用于集群。

  1. activemq

编程语言:java

适用于中小型项目。

 二、为什么要使用mq

1、系统之间接口耦合比较严重

2、面对大流量并发时,容易被冲垮

3、等待同步存在性能问题

使用mq的好处

  1. 异步。调用者无需等待。
  2. 解耦。解决了系统之间耦合调用的问题。
  3. 消峰。抵御洪峰流量,保护了主业务。

三、mq的特点
  1.  采用异步处理模式
  2. 应用系统之间解耦合

                发送者和接受者不必了解对方,只需要确认消息。

                发送者和接受者不必同时在线。

电商项目常用流程图: 

 四、mq的缺点

两个系统之间不能同步调用,不能实时回复,不能响应某个调用的回复。

五、下载,解压,启动,开端口,访问

启动命令

service activemq start

查看activemq状态

service activemq status

关闭activemq服务

service activemq stop

如果出现无法访问8186端口,可见另一博客

解决无法访问activemq的8161端口问题_阿狗哲哲的博客-CSDN博客一、查看端口是否已经开放查看8161和61616端口是否已经开放二、查看阿里云服务器安全组是否开启三、查看8161端口是否被其他进程占用四、查看activemq配置文件端口号是否为8161,以及是否运行外机访问activemq.xml未发现端口的配置,看到引入了外部的xml文件,因此查看jetty.xml发现配置文件中允许访问的ip只能为本机,将其修改为0.0.0.0,所有人都能访问问题解决!...https://blog.csdn.net/qq_52438590/article/details/123773605?spm=1001.2014.3001.5502

六、导入JMS依赖

  
  
    org.apache.activemq
    activemq-all
    5.15.9
  
  
  
    org.apache.xbean
    xbean-spring
    3.16
  

七、废话少说,上queue入门代码

1.生产者
package Jms_Queue;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class JmsProduce_queue {

    //  linux 上部署的activemq 的 IP 地址 + activemq 的端口号
    public static final String ACTIVEMQ_URL = "tcp://123.57.64.216:61616";
    // 目的地的名称
    public static final String QUEUE_NAME = "queue01";


    public static void main(String[] args) throws  Exception{
        // 1 按照给定的url创建连接工厂,这个构造器采用默认的用户名密码。该类的其他构造方法可以指定用户名和密码。
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 2 通过连接工厂,获得连接 connection 并启动访问。
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        // 3 创建会话session 。第一参数是是否开启事务, 第二参数是消息签收的方式
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        // 4 创建目的地(两种 :队列/主题)。Destination是Queue和Topic的父类
        Queue queue = session.createQueue(QUEUE_NAME);
        // 5 创建消息的生产者
        MessageProducer messageProducer = session.createProducer(queue);
        // 6 通过messageProducer 生产 3 条 消息发送到消息队列中
        for (int i = 1; i < 7 ; i++) {
            // 7  创建消息
            TextMessage textMessage = session.createTextMessage("msg--" + i);
            // 8  通过messageProducer发送给mq
            messageProducer.send(textMessage);
        }
        // 9 关闭资源
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("  **** 消息发送到MQ完成 ****");
    }
}

 2.消费者
package Jms_Queue;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class JmsConsumer {

    public static final String ACTIVEMQ_URL = "tcp://123.57.64.216:61616";
    public static final String QUEUE_NAME = "queue01";

    public static void main(String[] args) throws JMSException {
        // 1 按照给定的url创建连接工厂,这个构造器采用默认的用户名密码。该类的其他构造方法可以指定用户名和密码。
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 2 通过连接工厂,获得连接 connection 并启动访问。
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        // 3 创建会话session 。第一参数是是否开启事务, 第二参数是消息签收的方式
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        // 4 创建目的地(两种 :队列/主题)。Destination是Queue和Topic的父类
        Queue queue = session.createQueue(QUEUE_NAME);
        MessageConsumer consumer = session.createConsumer(queue);
        while (true){

            // reveive() 一直等待接收消息,在能够接收到消息之前将一直阻塞。 是同步阻塞方式 。和socket的accept方法类似的。
// reveive(Long time) : 等待n毫秒之后还没有收到消息,就是结束阻塞。
            // 因为消息发送者是 TextMessage,所以消息接受者也要是TextMessage
//            TextMessage message = (TextMessage)consumer.receive();
            TextMessage message = (TextMessage)consumer.receive(4000);
            if (null != message){
                System.out.println("****消费者的消息:"+message.getText());
            }else {
                break;
            }
        }
        // 9 关闭资源
        consumer.close();
        session.close();
        connection.close();
        System.out.println("  **** 消息接受完成 ****");
    }
}

3.监听器写法

 4.消费者消费特点

 八、topic

topic只需将queue换为topic即可

九、JMS组成 

JSM Message

         消息头

 textMessage.setJMSDestination(topic)

 这里可以指定每个消息的目的地

 textMessage.setJMSDeliveryMode(0);

 

textMessage.setJMSExpiration(1000);

如果timeToLive值等于0,则JMSExpiration被设为0,表示该消息永不过期。
            如果发送后,在消息过期时间之后还没有被发送到目的地,则该消息被清除。

textMessage.setJMSPriority(10);

 

 textMessage.setJMSMessageID("ABCD");

 消息体

 

消息属性 

如果需要除消息头字段之外的值,那么可以使用消息属性。他是识别/去重/重点标注等操作,非常有用的方法。

 

十、持久化 

queue持久化只需设置一个属性。

topic比较麻烦,topic默认就是非持久化的,因为生产者生产消息时,消费者也要在线,这样消费者才能消费到消息。

topic消息持久化,只要消费者向MQ服务器注册过,所有生产者发布成功的消息,该消费者都能收到,不管是MQ服务器宕机还是消费者不在线。

具体翻阅文档p31页。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/782354.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号