栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

都2022年了,不会还有人没学Activemq吧

都2022年了,不会还有人没学Activemq吧

目录

一、MQ系列产品

 二、为什么要使用mq

三、mq的特点

 四、mq的缺点

五、下载,解压,启动,开端口,访问

六、导入JMS依赖

七、废话少说,上queue入门代码

1.生产者

 2.消费者

3.监听器写法

 4.消费者消费特点

 八、topic

九、JMS组成 

JSM Message

         消息头

 消息体

消息属性 

十、持久化 


一、MQ系列产品

  1. kafka

编程语言:scala。

大数据领域的主流MQ。

  1. rabbitmq

编程语言:erlang

基于erlang语言,不好修改底层,不要查找问题的原因,不建议选用。

  1. rocketmq

编程语言:java

适用于大型项目。适用于集群。

  1. activemq

编程语言:java

适用于中小型项目。

 二、为什么要使用mq

1、系统之间接口耦合比较严重

2、面对大流量并发时,容易被冲垮

3、等待同步存在性能问题

使用mq的好处

  1. 异步。调用者无需等待。
  2. 解耦。解决了系统之间耦合调用的问题。
  3. 消峰。抵御洪峰流量,保护了主业务。

三、mq的特点
  1.  采用异步处理模式
  2. 应用系统之间解耦合

                发送者和接受者不必了解对方,只需要确认消息。

                发送者和接受者不必同时在线。

电商项目常用流程图: 

 四、mq的缺点

两个系统之间不能同步调用,不能实时回复,不能响应某个调用的回复。

五、下载,解压,启动,开端口,访问

启动命令

service activemq start

查看activemq状态

service activemq status

关闭activemq服务

service activemq stop

如果出现无法访问8186端口,可见另一博客

解决无法访问activemq的8161端口问题_阿狗哲哲的博客-CSDN博客一、查看端口是否已经开放查看8161和61616端口是否已经开放二、查看阿里云服务器安全组是否开启三、查看8161端口是否被其他进程占用四、查看activemq配置文件端口号是否为8161,以及是否运行外机访问activemq.xml未发现端口的配置,看到引入了外部的xml文件,因此查看jetty.xml发现配置文件中允许访问的ip只能为本机,将其修改为0.0.0.0,所有人都能访问问题解决!...https://blog.csdn.net/qq_52438590/article/details/123773605?spm=1001.2014.3001.5502

六、导入JMS依赖

  
  
    org.apache.activemq
    activemq-all
    5.15.9
  
  
  
    org.apache.xbean
    xbean-spring
    3.16
  

七、废话少说,上queue入门代码

1.生产者
package Jms_Queue;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class JmsProduce_queue {

    //  linux 上部署的activemq 的 IP 地址 + activemq 的端口号
    public static final String ACTIVEMQ_URL = "tcp://123.57.64.216:61616";
    // 目的地的名称
    public static final String QUEUE_NAME = "queue01";


    public static void main(String[] args) throws  Exception{
        // 1 按照给定的url创建连接工厂,这个构造器采用默认的用户名密码。该类的其他构造方法可以指定用户名和密码。
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 2 通过连接工厂,获得连接 connection 并启动访问。
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        // 3 创建会话session 。第一参数是是否开启事务, 第二参数是消息签收的方式
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        // 4 创建目的地(两种 :队列/主题)。Destination是Queue和Topic的父类
        Queue queue = session.createQueue(QUEUE_NAME);
        // 5 创建消息的生产者
        MessageProducer messageProducer = session.createProducer(queue);
        // 6 通过messageProducer 生产 3 条 消息发送到消息队列中
        for (int i = 1; i < 7 ; i++) {
            // 7  创建消息
            TextMessage textMessage = session.createTextMessage("msg--" + i);
            // 8  通过messageProducer发送给mq
            messageProducer.send(textMessage);
        }
        // 9 关闭资源
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("  **** 消息发送到MQ完成 ****");
    }
}

 2.消费者
package Jms_Queue;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class JmsConsumer {

    public static final String ACTIVEMQ_URL = "tcp://123.57.64.216:61616";
    public static final String QUEUE_NAME = "queue01";

    public static void main(String[] args) throws JMSException {
        // 1 按照给定的url创建连接工厂,这个构造器采用默认的用户名密码。该类的其他构造方法可以指定用户名和密码。
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 2 通过连接工厂,获得连接 connection 并启动访问。
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        // 3 创建会话session 。第一参数是是否开启事务, 第二参数是消息签收的方式
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        // 4 创建目的地(两种 :队列/主题)。Destination是Queue和Topic的父类
        Queue queue = session.createQueue(QUEUE_NAME);
        MessageConsumer consumer = session.createConsumer(queue);
        while (true){

            // reveive() 一直等待接收消息,在能够接收到消息之前将一直阻塞。 是同步阻塞方式 。和socket的accept方法类似的。
// reveive(Long time) : 等待n毫秒之后还没有收到消息,就是结束阻塞。
            // 因为消息发送者是 TextMessage,所以消息接受者也要是TextMessage
//            TextMessage message = (TextMessage)consumer.receive();
            TextMessage message = (TextMessage)consumer.receive(4000);
            if (null != message){
                System.out.println("****消费者的消息:"+message.getText());
            }else {
                break;
            }
        }
        // 9 关闭资源
        consumer.close();
        session.close();
        connection.close();
        System.out.println("  **** 消息接受完成 ****");
    }
}

3.监听器写法

 4.消费者消费特点

 八、topic

topic只需将queue换为topic即可

九、JMS组成 

JSM Message

         消息头

 textMessage.setJMSDestination(topic)

 这里可以指定每个消息的目的地

 textMessage.setJMSDeliveryMode(0);

 

textMessage.setJMSExpiration(1000);

如果timeToLive值等于0,则JMSExpiration被设为0,表示该消息永不过期。
            如果发送后,在消息过期时间之后还没有被发送到目的地,则该消息被清除。

textMessage.setJMSPriority(10);

 

 textMessage.setJMSMessageID("ABCD");

 消息体

 

消息属性 

如果需要除消息头字段之外的值,那么可以使用消息属性。他是识别/去重/重点标注等操作,非常有用的方法。

 

十、持久化 

queue持久化只需设置一个属性。

topic比较麻烦,topic默认就是非持久化的,因为生产者生产消息时,消费者也要在线,这样消费者才能消费到消息。

topic消息持久化,只要消费者向MQ服务器注册过,所有生产者发布成功的消息,该消费者都能收到,不管是MQ服务器宕机还是消费者不在线。

具体翻阅文档p31页。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/784434.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号