栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

ActiveMQ消息中间件

ActiveMQ消息中间件

         ActiveMQ消息中间件

文章目录
  • 1.ActiveMQ简介
  • 2.ActiveMQ下载和安装
    • 2.1 Windows下安装ActiveMQ
    • 2.2 Linux下安装ActiveMQ(略)
  • 3.控制台和常用配置
    • 3.1 控制台界面介绍
    • 3.2 修改日志文件目录
    • 3.3 修改端口号
    • 3.4 修改控制台用户名密码
    • 3.5 添加第三方访问ActiveMQ的账号密码
  • 4.Queue队列模式
    • 4.1 生产者
    • 4.2 消费者
  • 5.Topic 发布/订阅模式
    • 5.1 生产者
    • 5.2 消费者
  • 6.SpringBoot整合ActiveMQ
    • 6.1 搭建SpringBoot工程
    • 6.2 发送Queue消息
    • 6.3 接收Queue消息
    • 6.4 发送Topic消息
    • 6.5 接收Topic消息
  • 7.ActiveMQ集群(略)
    • 7.1 安装zookeeper

1.ActiveMQ简介

​ 消息队列中间件是分布式系统中重要的组件,主要简介应用解耦,异步消息,流量削锋等问题,实现高性能,高并发,可伸缩性和最终一致性架构。目前使用较多的消息队列有ActiveMQ,RabbitMQ,RocketMQ,Kafka.

​ 属于Apache组织,用Java程序写的。

​ 消息队列就是存消息的集合。

消息队列应用场景:

​ 以下介绍消息队列在实际应用中常见的使用场景。异步处理,应用解耦,流量削锋,消息通讯4个场景。

​ 1.异步处理

场景说明:用户注册后,需要发注册邮箱和注册短信。传统的做法有两种:1串行的方式 2并行方式

串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上3个任务全部完成后,返回给客户端。

并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信,以上3个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理时间。

引入消息队列后:

2.应用解耦

订单系统和库存系统分开。

订单系统-----消息队列----库存系统

3.流量削锋 秒杀或团抢活动中

​ 用户请求------消息队列-------秒杀业务处理

4.消息通讯:

​ 消息通讯是指,消息队列一般都高效的通讯机制,因此也可以用存的通讯。比如实现点对点 消息队列,或者聊天室。

点对点通讯:

​ 客户端A --------->消息队列 <--------客户端B

聊天室通讯:

​ 客户端A <--------->消息队列 < -------->客户端B

示例:

消息模型:

​ 发送消息的人 称为生产者

​ 接收消息的人 称为消费者

​ P2P:点对点 秒杀 点对点用得多

​ 每个消息只有一个消费者(一旦被消费,消息就不再在消息队列中)

​ 发送者和接收者之间在时间上没有依赖性,也就是说,当发送者发了消息之后,不管接收者有没有正在运

​ 行,它不会影响影响到消息被发送到队列。

​ 如果希望发送的每个消息都会被成功处理的话,那么需要P2P模式

发布/订阅模式:

​ 包含3个角色:主题(topic) 发布者(publisher) 订阅者 subscriber 多个发布者将消息发送到topic,系统将这些消息传递给多个订阅者。

​ 特点:

​ 每个消息有多个消费者

​ 发布者和订阅者直接有时间上的依赖性。针对某个主题的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。发消息的时候 人不在,后面就收不到了。因为已经发过了。

​ 为了消费消息,订阅者必须保持运行的状态。

如果希望发送的消息可以不被做任何处理,或者只被一个消息者处理,或者可以被多个消息者处理的话,那么可以采用发布/订阅模型。比如发个通知。显示库存数量,通知一下别人。

2.ActiveMQ下载和安装 2.1 Windows下安装ActiveMQ

1、打开浏览器,访问网址activemq.apache.org,如下图所示:

2、我选的版本为5.9.1,根据ActiveMQ需要安装的操作系统选择性下载对应的版本,这里我选择Windows版本,然后点击下载ZIP包,如下图所示:

3、下载完成以后,将zip文件解压到D盘下,解压后的目录结构如下图所示:

4、在启动ActiveMQ前,首先要确保服务器上已经安装和配置好JDK,并且JDK的版本要满足ActiveMQ的要求,如下图所示:

5、接下来我们进入到D:1program_soft1install_before0901ActiveMQapache-activemq-5.9.1bin,如下图所示:

6、根据服务器上操作系统的版本,选择进入到win32还是win64,这里选择进入win64目录,然后双击activemq.bat,这时activemq就启动起来了,成功启动以后打印的日志如下图所示:

7、打开浏览器,输入http://localhost:8161/admin/ ,弹出一个windows安全提示框,提示输入activemq的用户名和密码,如下图所示:

2.2 Linux下安装ActiveMQ(略) 3.控制台和常用配置 3.1 控制台界面介绍

控制台地址:http://localhost:8161/admin 端口号:8161

用户名:admin

地 址:admin

简单介绍下主要的导航栏:

​ Queues: 队列的消息

​ Topics: 主题方式消息

Subscribers: 消息订阅监控查询

(1)Queue队列

​ Name:队列名称

​ Number Of Pending Message: 等待消费的消息数量

​ Number Of Consumers: 消费者的数量

​ Messages Enqueued: 进来的消息

​ Messages Dequeued: 出去的消息

​ Views: 详情

​ Operations:操作

(2)Topics主题:发布订阅模式的消息

Name: 消息队列

Number Of Consumers: 消费者数量

Messages Enqueued: 进来的消息

Messages Dequeued: 出去的消息

Operations: 操作

3.2 修改日志文件目录

启动后的ActiveMQ的数据位于:$activemq_home/data/目录内

启动后的ActiveMQ运行日志位于:$activemq_home/data/目录内的activemq.log文件

如果需要改activemq的日志配置可以通过修改$activemq_home/conf/log4j.properties

log4j.appender.audit.file=${activemq.base}/data/audit.log

日志默认存在audit.log文件中

3.3 修改端口号

在jetty.xml文件中:

  //修改端口号
3.4 修改控制台用户名密码

​ 在jetty-realm.properties属性文件中:

​ 格式:用户名:密码,角色

admin: admin, admin   //修改admin用户的用户名和密码  超级管理员
 user: user, user     //修改user用户的用户名和密码   普通用户
3.5 添加第三方访问ActiveMQ的账号密码

​ Java代码对于ActiveMQ来说,就是第三方

​ Java程序连接ActiveMQ的时候,所用到的用户名和密码


	
		
			
		
	

具体账号和密码在credentials.properties文件中

activemq.username=system   //第三方访问ActiveMQ的账号
activemq.password=manager  //第三方访问ActiveMQ的密码
4.Queue队列模式

添加ActiveMQ依赖


    org.apache.activemq
    activemq-all
    5.15.8

4.1 生产者
package com.tangguanlin.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;

public class QueueProducer {

    private static String username = "system";
    private static String password = "manager";
    private static String url = "tcp://127.0.0.1:61616"; //第三方连接的端口号

    
    public static void main(String[] args) throws JMSException {

        
        ActiveMQConnectionFactory connectionFactory = new             
                                 ActiveMQConnectionFactory(username, password, url);

        //2.使用连接工厂创建一个连接对象
        Connection connection = connectionFactory.createConnection();

        //3.开启连接
        connection.start();

        
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //5.使用会话对象创建目标对象,包含queue和topic(一对一 和 一对多)
        Queue queue = session.createQueue("队列模式-test2");

        //6.使用会话对象创建生产者对象----绑定队列
        MessageProducer producer = session.createProducer(queue);

        //7.使用会话对象创建一个消息对象
       TextMessage textMessage =  session.createTextMessage("张cc2-8");

       //8.发送消息
        producer.send(textMessage);

        //9.关闭资源
        producer.close();
        session.close();;
        connection.close();
    }
}
4.2 消费者
package com.tangguanlin.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;

public class QueueReceiver {

    private static String username = "system";
    private static String password = "manager";
    private static String url = "tcp://127.0.0.1:61616"; //第三方连接的端口号

    //接收消息
    public static void main(String[] args) throws JMSException, IOException {

        
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(username, password, url);

        //2.使用连接工厂创建一个连接对象
        Connection connection = connectionFactory.createConnection();

        //3.开启连接
        connection.start();

        
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //5.使用会话对象创建目标对象,包含queue和topic(一对一 和 一对多)
        Queue queue = session.createQueue("队列模式-test2");

        //6.使用会话对象创建消费者对象----绑定队列
        MessageConsumer consumer = session.createConsumer(queue);

        //7.向consumer对象中设置一个messageListener对象,用来接收消息
        consumer.setMessageListener(new MessageListener() {
                                        @Override
                                        public void onMessage(Message message) {
                                            if(message instanceof  TextMessage){
                                                TextMessage textMessage = 
                                                                    (TextMessage)message;
                                                try{
                                         System.out.println("接收到的消息 "+               
                                                         textMessage.getText());
                                                }catch (JMSException e){
                                                    e.printStackTrace();
                                                }
                                            }
                                        }
                                    }
        );

        System.in.read();
        //8.释放资源
        consumer.close();
        session.close();;
        connection.close();
    }
}
5.Topic 发布/订阅模式 5.1 生产者
package com.tangguanlin.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class TopicProducer {

    private static String username = "system";
    private static String password = "manager";
    private static String url = "tcp://127.0.0.1:61616"; //第三方连接的端口号

    
    public static void main(String[] args) throws JMSException {
        
        ActiveMQConnectionFactory connectionFactory = new 
                                     ActiveMQConnectionFactory(username, password, url);

        //2.使用连接工厂创建一个连接对象
        Connection connection = connectionFactory.createConnection();

        //3.开启连接
        connection.start();

        
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //5.使用会话对象创建目标对象,包含queue和topic(一对一 和 一对多)   创建一个主题
        Topic topic = session.createTopic("发布/订阅模式1");

        //6.使用会话对象创建生产者对象----绑定topic   主题
        MessageProducer producer = session.createProducer(topic);

        //7.使用会话对象创建一个消息对象
    TextMessage textMessage =  session.createTextMessage("hello,topic 发布/订阅模式 内容2");

       //8.发送消息
        producer.send(textMessage);

        //9.关闭资源
        producer.close();
        session.close();;
        connection.close();
    }
}
5.2 消费者
package com.tangguanlin.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
import java.sql.SQLOutput;

public class TopicReceiver {

    private static String username = "system";
    private static String password = "manager";
    private static String url = "tcp://127.0.0.1:61616"; //第三方连接的端口号

    public static void main(String[] args) throws JMSException, IOException {
        
        ActiveMQConnectionFactory connectionFactory = new   
                                      ActiveMQConnectionFactory(username, password, url);

        //2.使用连接工厂创建一个连接对象
        Connection connection = connectionFactory.createConnection();

        //3.开启连接
        connection.start();

        
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //5.使用会话对象创建目标对象,包含queue和topic(一对一 和 一对多)   创建一个主题
        Topic topic = session.createTopic("发布/订阅模式1");

        //6.使用会话对象创建消费者对象----绑定队列
        MessageConsumer consumer = session.createConsumer(topic);

        consumer.setMessageListener(new MessageListener() {
                                        @Override
                                        public void onMessage(Message message) {
                                            if(message instanceof  TextMessage){
                                                TextMessage textMessage = 
                                                                   (TextMessage)message;
                                                try{
                                       System.out.println("订阅者接收到的消
                                                            息:"+textMessage.getText());
                                                }catch (JMSException e){
                                                    e.printStackTrace();
                                                }
                                            }
                                        }
                                    }
        );

        System.in.read(); //程序等待接收用户消息

        //7.释放资源
        consumer.close();
        session.close();
        connection.close();
    }
}
6.SpringBoot整合ActiveMQ 6.1 搭建SpringBoot工程

1.新建maven工程

2.添加依赖


    org.springframework.boot
    spring-boot-starter-parent
    2.1.1.RELEASE




	org.springframework.boot
	spring-boot-starter-web

3.启动类

package com.tangguanlin;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class OrderApplication {
    public static void main(String[] args) {
        SpringApplication.run(OrderApplication.class);
    }
}

4.业务逻辑类—添加订单controller

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;

@RestController
public class OrderController {

    @GetMapping("addOrder")
    public Object addOrder(){
        System.out.println("下单成功...");
        Map map = new HashMap();
        map.put("result","添加订单成功");
        return map;
    }
}
6.2 发送Queue消息

配置文件: application.yml

spring:
  activemq:
    user: system
    password: manager
    broker-url: tcp://127.0.0.1:61616
    pool:
      enabled: true
      max-connects: 20

server:
  port: 8081

添加依赖:


    org.springframework.boot
    spring-boot-starter-activemq


    org.apache.activemq
    activemq-pool


    org.messaginghub
    pooled-jms

发送Queue消息:

package com.tangguanlin.controller;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

@RestController
public class OrderController {

    @Resource
    private JmsMessagingTemplate jmsMessagingTemplate;

    @GetMapping("addOrder")
    public Object addOrder(){

        //下单业务逻辑

        //更新库存,发送消息到ActiveMQ
        ActiveMQQueue queue = new ActiveMQQueue("库存队列1");
        jmsMessagingTemplate.convertAndSend(queue,"商品ID:1001,数量:1");

        Map map = new HashMap();
        map.put("result","添加订单成功");
        return map;
    }
}
6.3 接收Queue消息

1.启动类

package com.tangguanlin;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class StoreApplication {
    public static void main(String[] args) {
        SpringApplication.run(StoreApplication.class);
    }
}

2.接收消息

package com.tangguanlin.controller;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class StoreController {

    //从MQ中获取消息 更新库存
    @JmsListener(destination = "库存队列1")
    public void updateStore(String message){
        System.out.println("库存系统接收到的消息"+message);

        //去数据库中更新库存......
    }
}
6.4 发送Topic消息
package com.tangguanlin.controller;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

@RestController
public class OrderController {

    @Resource
    private JmsMessagingTemplate jmsMessagingTemplate;
    
    
    @GetMapping("addOrder2")
    public Object addOrder2(){

        //下单逻辑

        //更新库存,发送消息到ActiveMQ
        ActiveMQTopic topic = new ActiveMQTopic("库存通知1");
        jmsMessagingTemplate.convertAndSend(topic,"通知内容,哈哈哈");

        Map map = new HashMap();
        map.put("result","添加订单成功2");
        return map;
    }
}
6.5 接收Topic消息

开启topic 发布/订阅模式

package com.tangguanlin.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import javax.jms.ConnectionFactory;

@Configuration
public class ActivemqConfig {

    @Bean
    public JmsListenerContainerFactory topicLisenerContainer(ConnectionFactory activeMQConnectionFactory){
        DefaultJmsListenerContainerFactory topicListenerContainer = new DefaultJmsListenerContainerFactory();
        //把发布/订阅模式 激活 默认是关闭的
        topicListenerContainer.setPubSubDomain(true);
        topicListenerContainer.setConnectionFactory(activeMQConnectionFactory);
        return topicListenerContainer;
    }
}

StoreController库存类

package com.tangguanlin.controller;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class StoreController {
    
    @JmsListener(destination = "库存通知1",containerFactory = "topicLisenerContainer")
    public void updateStore2(String message){
        System.out.println("库存系统接收到的消息"+message);

        //去数据库中更新库存......
    }
}
7.ActiveMQ集群(略)

​ 使用zookeeper+ActiveMQ实现主从和集群

1.主从模式

​ 主从模式是一种高可用解决方案。在zookeeper中注册若干ActiveMQ Broker,其中只有一个Broker提供对外服务(Master),其他Broker处于待机状态。当Master出现故障导致宕机时,通过zookeeper内部的选举机制,选举一台从机替代主机机箱对外提供服务。

​ zookeeper: 分发器 管理activeMQ主从机制

​ activeMQ: 一主一备 一主两备

​ 3台 zookeeper + 3台 activeMQ(一主两备)

7.1 安装zookeeper

搭建伪集群,在同一个Linux中安装3个zookeeper实例。使用不同的端口实现同时启动。端口分配如下:

​ 主机 服务端口 投票端口 选举端口

192.168.159.130 2181 2181 3881

192.168.159.130 2182 2182 3882

192.168.159.130 2183 2183 3883

eStore2(String message){
System.out.println(“库存系统接收到的消息”+message);

    //去数据库中更新库存......
}

}



# 7.ActiveMQ集群(略)

​     使用zookeeper+ActiveMQ实现主从和集群

1.主从模式

​       主从模式是一种高可用解决方案。在zookeeper中注册若干ActiveMQ Broker,其中只有一个Broker提供对外服务(Master),其他Broker处于待机状态。当Master出现故障导致宕机时,通过zookeeper内部的选举机制,选举一台从机替代主机机箱对外提供服务。

​      zookeeper: 分发器  管理activeMQ主从机制

​        activeMQ: 一主一备    一主两备

​      3台 zookeeper  +  3台 activeMQ(一主两备)



## 7.1 安装zookeeper

 搭建伪集群,在同一个Linux中安装3个zookeeper实例。使用不同的端口实现同时启动。端口分配如下:

​          主机                      服务端口             投票端口          选举端口

192.168.159.130             2181                   2181                  3881

192.168.159.130             2182                   2182                  3882

192.168.159.130             2183                   2183                  3883

















转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/326741.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号