栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Linux、Windows安装ActiveMQ

Linux、Windows安装ActiveMQ

Linux、Windows安装ActiveMQ
  • 一、下载
  • 二、Linux安装
    • 2.1、下载方式
    • 2.2、测试
    • 2.3、配置
  • 三、Windows安装
  • 四、ActiveMQ集群
    • 4.1、为什么使用集群
    • 4.2、Master Slave 部署(主从)
      • 4.2.1、Share storage master slave(共享存储)
      • 4.2.2、Replicated LevelDB Store(使用ZooKeeper协调多个Broker)
      • 4.2.3、KahaDB

一、下载

下载地址:https://activemq.apache.org/components/classic/download/

选择相应的系统版本

二、Linux安装 2.1、下载方式
  • 下载tar.gz包到本地,然后上传到服务器
  • 通过wget下载

安装路径:/opt/home/mq

## 下载
wget https://dlcdn.apache.org/activemq/5.16.3/apache-activemq-5.16.3-bin.tar.gz


## 复制到其它服务器
scp -r /opt/home/mq/apache-activemq-5.16.3-bin.tar.gz root@192.168.0.5:/opt/home/mq/

## 解压
tar -zxvf apache-activemq-5.16.3-bin.tar.gz

## 切换到apache-activemq-5.16.3/bin/目录下
cd apache-activemq-5.16.3/bin/

## 启动
./activemq start

## 停止
./activemq stop

## 重启
./activemq restart

## 查看状态
./activemq status


## 检查端口是否使用
netstat -lnp | grep 8161
netstat -lnp | grep 61616
2.2、测试

浏览器访问:http://192.168.0.3:8161/

发现无法访问此网站,防火墙放行端口也无法访问

2.3、配置

这是因为 Activemq 默认只能当前服务器访问,如果需要外部访问,需要修改jetty.xml文件

## 修改apache-activemq-5.16.3/conf/jetty.xml文件
vim /opt/home/mq/apache-activemq-5.16.3/conf/jetty.xml

## 找到id为jettyPort的bean 将其中的127.0.0.1 改为 0.0.0.0

    
    
    


## 重启ActiveMQ
./activemq restart

再通过浏览器访问:http://192.168.0.3:8161/
默认用户名/密码:admin/admin

三、Windows安装

下载解压后,进入binwin64下,双击activemq.bat,启动

访问:http://127.0.0.1:8161
选择Manage ActiveMQ broker(管理ActiveMQ代理),输入账号和密码登陆。
默认用户名/密码:admin/admin

四、ActiveMQ集群

集群分为两种方式:

  • 伪集群:集群节点都搭在一台机器上

  • 真集群:集群节点分布在多台机器上

4.1、为什么使用集群
  • 实现高可用,以排除单点故障引起的服务中断。

  • 实现负载均衡,以提升效率为更多的客户提供服务。

4.2、Master Slave 部署(主从)

通过部署多个broker实例,选举产生一个master和多个slave,master宕机后由slave接管服务来达到高可用性。Master-Slave的方式虽然能解决多服务热备的高可用问题,但无法解决负载均衡和分布式的问题。Broker Cluster的部署方式刚好可以解决负载均衡的问题。一般两者结合使用。

这里主要介绍2种配置方案

  • Shared File System Master slave 模式
  • JDBC Store Master Slave 模式
4.2.1、Share storage master slave(共享存储)

此模式中Master和Slave的数据是共享的(相当于共享同一个数据库),当master失效后,slave会自动接管服务,无需手动进行数据的copy与同步,因为master存储数据之后,这些数据在任何时候对slave都是可见的。
master与slave之间,通过共享文件的“排他锁”或者分布式排他锁(ZooKeeper)来决定Broker的状态与角色,获取锁权限的Broker作为master,其它的Broker则作为slave。如果master失效,它必将失去锁权限,那么其它的slave将通过锁竞争来选举新master,没有获取锁权限的Broker作为slave,并等待锁的释放(间歇性尝试获取锁)。

1、Shared File System Master Slave模式(只适合单台主机部署,不适合多台主机部署)

这种方式是最常用的模式,架构简单,可靠实用。我们只需要一个SAN文件系统即可。使用文件系统来共享数据文件,多个Broker共享同一个文件系统。

  
       

2、JDBC Store Master Slave模式(适合多台主机部署)
数据存储用的是数据库(MySQL/Oracle等),相对于日志文件而言,JDBC Store通常认为是低效的。配置如下

  
	...
    
		 
		 
    
  
 

	
	
	
	
	



	
	
	
	
	

下载 mysql-connector-java-5.1.47.jar放到/opt/home/mq/apache-activemq-5.16.3/lib目录下
新建activemq数据库,

-- 如果存在数据库删除
DROP DATAbase IF EXISTS `activemq`;
-- 创建数据库
CREATE DATAbase `activemq` CHARACTER SET 'utf8mb4' COLLATE 'utf8mb4_general_ci';

-- 使用activemq数据库
USE activemq;

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for activemq_acks
-- ----------------------------
DROP TABLE IF EXISTS `activemq_acks`;
CREATE TABLE `activemq_acks`  (
  `CONTAINER` varchar(250) CHARACTER SET latin1 COLLATE latin1_swedish_ci NOT NULL,
  `SUB_DEST` varchar(250) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL,
  `CLIENT_ID` varchar(250) CHARACTER SET latin1 COLLATE latin1_swedish_ci NOT NULL,
  `SUB_NAME` varchar(250) CHARACTER SET latin1 COLLATE latin1_swedish_ci NOT NULL,
  `SELECTOR` varchar(250) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL,
  `LAST_ACKED_ID` bigint(20) NULL DEFAULT NULL,
  `PRIORITY` bigint(20) NOT NULL DEFAULT 5,
  `XID` varchar(250) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL,
  PRIMARY KEY (`CONTAINER`, `CLIENT_ID`, `SUB_NAME`, `PRIORITY`) USING BTREE,
  INDEX `ACTIVEMQ_ACKS_XIDX`(`XID`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = latin1 COLLATE = latin1_swedish_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of activemq_acks
-- ----------------------------

-- ----------------------------
-- Table structure for activemq_lock
-- ----------------------------
DROP TABLE IF EXISTS `activemq_lock`;
CREATE TABLE `activemq_lock`  (
  `ID` bigint(20) NOT NULL,
  `TIME` bigint(20) NULL DEFAULT NULL,
  `BROKER_NAME` varchar(250) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL,
  PRIMARY KEY (`ID`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = latin1 COLLATE = latin1_swedish_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of activemq_lock
-- ----------------------------
INSERT INTO `activemq_lock` VALUES (1, NULL, NULL);

-- ----------------------------
-- Table structure for activemq_msgs
-- ----------------------------
DROP TABLE IF EXISTS `activemq_msgs`;
CREATE TABLE `activemq_msgs`  (
  `ID` bigint(20) NOT NULL,
  `CONTAINER` varchar(250) CHARACTER SET latin1 COLLATE latin1_swedish_ci NOT NULL,
  `MSGID_PROD` varchar(250) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL,
  `MSGID_SEQ` bigint(20) NULL DEFAULT NULL,
  `EXPIRATION` bigint(20) NULL DEFAULT NULL,
  `MSG` blob NULL,
  `PRIORITY` bigint(20) NULL DEFAULT NULL,
  `XID` varchar(250) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL,
  PRIMARY KEY (`ID`) USING BTREE,
  INDEX `ACTIVEMQ_MSGS_MIDX`(`MSGID_PROD`, `MSGID_SEQ`) USING BTREE,
  INDEX `ACTIVEMQ_MSGS_CIDX`(`CONTAINER`) USING BTREE,
  INDEX `ACTIVEMQ_MSGS_EIDX`(`EXPIRATION`) USING BTREE,
  INDEX `ACTIVEMQ_MSGS_PIDX`(`PRIORITY`) USING BTREE,
  INDEX `ACTIVEMQ_MSGS_XIDX`(`XID`) USING BTREE,
  INDEX `ACTIVEMQ_MSGS_IIDX`(`ID`, `XID`, `CONTAINER`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = latin1 COLLATE = latin1_swedish_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of activemq_msgs
-- ----------------------------

SET FOREIGN_KEY_CHECKS = 1;

数据表说明:

ACTIVEMQ_MSGS
-- 消息表,缺省表名ACTIVEMQ_MSGS,Queue和Topic都存在里面
-- ID:自增的数据库主键
-- CONTAINER:消息的Destination
-- MSGID_PROD:消息发送者的主键
-- MSG_SEQ:是发送消息的顺序,MSGID_PROD+MSG_SEQ可以组成JMS的MessageID
-- EXPIRATION:消息的过期时间,存储的是从1970-01-01到现在的毫秒数
-- MSG:消息本体的Java序列化对象的二进制数据
-- PRIORITY:优先级,从0-9,数值越大优先级越高

ACTIVEMQ_ACKS
-- 用于存储订阅关系,如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存
-- CONTAINER: 消息的Destination
-- SUB_DEST: 如果是使用Static集群, 这个字段会有集群其他系统的信息
-- CLIENT_ID : 每个订阅者都必须有一个唯一的客户端ID用以区分
-- SUB_NAME: 订阅者名称
-- SELECTOR: 选择器,可以选择只消费满足条件的消息, 条件可以用自定义属性实现, 可支持多属性AND和OR操作
-- LAST_ACKED_ID: 记录消费过的ID
-- ACTIVEMQ_ACKS表存储订阅的消息和最后一个持久订阅接收的消息ID

ACTIVEMQ_LOCK
-- 在集群环境中才有用, 只有一个Broker可以获得消息, 称为Master Broker, 其他的只能作为备份等待Master Broker不可用, 才可能成为下一个Master Broker, 这个表用于记录哪个Broker是当前的Master Broker.

启动activemq,如果启动报错,尝试将数据库编码改为latin1 或者ASCII

./activemq start

测试代码

public class Producer {
    private static final String ACTIVEMQ_URL = "failover:(tcp://192.168.0.3:61616,tcp://192.168.0.4:61616,tcp://192.168.0.5:61616)?randomize=true";
    private static final String ACTIVEMQ_QUEUE_NAME = "queue-test";

    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
        MessageProducer producer = session.createProducer(queue);
        for (int i = 0; i < 10; i++) {
            TextMessage textMessage = session.createTextMessage("***发送消息顺序: " + i + "***发送消息优先级:" + i);
            textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 5000L);//设置延迟时间
           // producer.send(textMessage);
            producer.send(queue, textMessage);
        }
        System.out.println("消息发送完成");
        producer.close();
        session.close();
        connection.close();
    }
}
public class Consumer {
    private static final String ACTIVEMQ_URL = "failover:(tcp://192.168.0.3:61616,tcp://192.168.0.4:61616,tcp://192.168.0.5:61616)?randomize=true";
    private static final String ACTIVEMQ_QUEUE_NAME = "queue-test";

    public static void main(String[] args) throws JMSException, IOException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
        MessageConsumer consumer = session.createConsumer(queue);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if (message instanceof TextMessage) {
                    try {
                        TextMessage textMessage = (TextMessage) message;
                        String messageText = textMessage.getText();
                        System.out.println("***消费者接收消息: " + messageText);
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }

                }
            }
        });

        //保证控制台不灭
        System.in.read();
    }
}

4.2.2、Replicated LevelDB Store(使用ZooKeeper协调多个Broker)

基于复制的LevelDB Store模式是ActiveMQ 5.9以后新增的特性,这是ActiveMQ全力打造的HA存储引擎。 一般都使用这种方式。由于利用zk 进行配置管理,可以方便监控,同时配置也相对简单。

使用ZooKeeper(集群)注册所有的ActiveMQ Broker。只有其中的一个Broker可以对外提供服务(也就是Master节点),其他的Broker处于待机状态,被视为Slave。如果Master因故障而不能提供服务,则利用ZooKeeper的内部选举机制会从Slave中选举出一个Broker充当Master节点,继续对外提供服务。

由于基于ZooKeeper(通常ZooKeeper集群至少需要3个实例,才能保证ZooKeeper本身的高可用性),所以Broker最低需要3个。
警告

Warning
The LevelDB store has been deprecated and is no longer supported or recommended for use. The recommended store is KahaDB
意思是:
LevelDB存储已被弃用,不再支持或建议使用。推荐的使用KahaDB
参考:http://activemq.apache.org/replicated-leveldb-store.html

4.2.3、KahaDB

KahaDB 是一个基于文件的持久性数据库,它位于使用它的消息代理本地。它已针对快速持久性进行了优化。它是自ActiveMQ 5.4以来的默认存储机制。KahaDB 使用更少的文件描述符并提供比其前身AMQ 消息存储更快的恢复。
配置


    
      
    
 

KahaDB 属性

属性默认注释
archiveCorruptedIndexfalse如果true,启动时发现的损坏索引将被存档(不删除)。
archiveDataLogsfalse如果true, 会将消息数据日志移动到存档目录而不是将其删除。
checkForCorruptJournalFilesfalse如果true,将在启动时检查损坏的日志文件并尝试恢复它们。
checkpointInterval5000检查点日志之前的时间(毫秒)。
checksumJournalFilestrue为日志文件创建校验和。需要存在校验和才能使持久性适配器能够检测损坏的日志文件。ActiveMQ 5.9.0之前:默认为false.
cleanupInterval30000确定哪些日志文件(如果有)可以从消息存储中删除的连续检查之间的间隔(以毫秒为单位)。符合条件的期刊文件是指没有未完成的参考文献。
compactAcksAfterNoGC10从ActiveMQ 5.14.0 开始:当启用确认压缩功能时,此值控制必须完成多少存储 GC 循环,并且在触发压缩逻辑以可能将分布在日志文件中的旧确认压缩到新日志之前没有其他文件被清理文件。设置的值越低,压缩可能发生得越快,如果它经常运行,这可能会影响性能。
compactAcksIgnoresStoreGrowthfalse从ActiveMQ 5.14.0 开始:当启用确认压缩功能时,此值控制压缩是在存储仍在增长时运行还是仅在存储停止增长时才运行(由于空闲或达到存储限制)。如果启用压缩,无论商店是否仍有空间或处于活动状态,都会运行压缩,这会降低整体性能但会更快地回收空间。
concurrentStoreAndDispatchQueuestrue允许将 Queue 消息分派给感兴趣的客户端,以与消息存储同时发生。
concurrentStoreAndDispatchTopicsfalse允许将 Topic 消息分派给感兴趣的客户端,与消息存储同时发生。不建议启用此属性。
directoryactivemq-data用于存储消息存储数据和日志文件的目录的路径。
directoryArchivenull定义将数据日志移动到的目录,当它们包含的所有消息都已被使用时。
enableAckCompactiontrue从ActiveMQ 5.14.0 开始:此设置控制存储是否对仅包含消息确认的旧日志日志文件执行定期压缩。通过将这些较旧的确认压缩到新的日志日志文件中,可以删除较旧的文件以释放空间并允许消息存储继续运行而不会达到存储大小限制。
enableIndexWriteAsyncfalse如果true,则索引异步更新。
enableJournalDiskSyncstrue确保每个日志写入后跟一个磁盘同步(JMS 持久性要求)。自ActiveMQ 5.14.0 起,此属性已弃用。从ActiveMQ 5.14.0 开始:请参阅journalDiskSyncStrategy。
ignoreMissingJournalfilesfalse如果true,则忽略丢失日志文件的报告。
indexCacheSize10000内存中缓存的索引页数。
indexDirectory从ActiveMQ 5.10.0 开始:如果设置,则配置 KahaDB 索引文件(db.data和 db.redo)的存储位置。如果未设置,则索引文件存储在directory属性指定的目录中 。
indexWriteBatchSize1000批量写入的索引数。
journalDiskSyncInterval1000何时执行磁盘同步的时间间隔(毫秒) journalDiskSyncStrategy=periodic。仅当自上次磁盘同步后日志发生写入或日志滚动到新日志文件时,才会执行同步。
journalDiskSyncStrategyalways从ActiveMQ 5.14.0 开始:此设置配置磁盘同步策略。可用同步策略的列表是(按照降低安全性和提高性能的顺序):always确保每个日志写入后跟一个磁盘同步(JMS 持久性要求)。这是最安全的选项,但也是最慢的,因为它需要在每次写入消息后进行同步。这相当于已弃用的属性 enableJournalDiskSyncs=true。periodic磁盘将按设定的时间间隔(如果发生写入)而不是在每次日志写入后同步,这将减少磁盘上的负载并提高吞吐量。滚动到新的日志文件时,磁盘也将同步。默认间隔为 1 秒。默认间隔提供了非常好的性能,同时比 never磁盘同步,因为数据丢失被限制为最多 1 秒的价值。请参阅journalDiskSyncInterval更改磁盘同步的频率。never永远不会显式调用同步,由操作系统刷新到磁盘。这相当于设置了 deprecated 属性enableJournalDiskSyncs=false。这是最快的选项,但最不安全,因为无法保证数据何时刷新到磁盘。因此,消息丢失可能会在代理失败时发生。
journalMaxFileLength32mb设置消息数据日志最大大小的提示。
maxAsyncJobs10000将排队等待存储的异步消息的最大数量(应与并发 MessageProducers 的数量相同)。
preallocationScopeentire_journal从ActiveMQ 5.14.0 开始:此设置配置日志数据文件的预分配方式。默认策略在第一次使用时使用 appender 线程预分配日志文件。 entire_journal_async将在单独的线程中提前使用预分配。none禁用预分配。在 SSD 上,使用 entire_journal_async可避免在首次使用时延迟等待预分配的写入。注意:在 HDD 上,额外的磁盘线程争用会产生负面影响。因此使用默认值。
preallocationStrategysparse_file从ActiveMQ 5.12.0 开始:此设置配置代理将如何在需要新日志文件时尝试预分配日志文件。sparse_file- 设置文件长度,但不填充任何数据。os_kernel_copy- 将预分配委托给操作系统。zeros - 每个预分配的日志文件只包含0x00整个内容。
storeOpenWireVersion11确定编组到 KahaDB 日志的 OpenWire 命令的版本。ActiveMQ 5.12.0之前:默认值为6. 代理的某些功能取决于存储在来自较新协议修订版的 OpenWire 命令中的信息,如果存储版本设置为较低的值,这些功能可能无法正常工作。来自高于 5.9.0 的代理版本的 KahaDB 存储在许多情况下仍可由代理读取,但会导致代理继续使用旧的存储版本,这意味着新功能可能无法按预期工作。对于在ActiveMQ 5.9.0之前的版本中创建的 KahaDB 存储,必须手动设置storeOpenWireVersion="6"以便无错误地启动代理。
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/433916.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号