1、下载安装包(地址戳这里)apache-activemq-5.16.3-bin.tar.gz并上传到云服务器。
2、解压:tar -zvxf apache-activemq-5.16.3-bin.tar.gz
3、相关配置文件
activemq.xml可以修改各协议连接的ip地址:
jetty.xml可以修改管理后台的ip及端口
jetty-realm.properties可查看账户名及密码
4、进入bin目录执行./activemq start命令启动activeMq,其他命令如下:
5、访问http://xxxx:8161/admin进入管理页面(xxxx为服务器ip,8161为管理后台默认端口)
注:如不能访问请开启防火墙端口,如是云服务器,只要在入方向开放对应端口就可以,更多安装及启动方式请 戳这里
二、springboot集成activemq
1、添加依赖包
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.8.0</version>
</dependency>
2、发布者
ActiveMqttClient(单例模式连接类)
package com.mq.server.busi.config;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.Session;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
/**
 * Process-wide holder of a single ActiveMQ {@link TopicConnection}.
 *
 * <p>Connection settings are read from the environment variables {@code ACTIVEMQ_USER},
 * {@code ACTIVEMQ_PASSWORD}, {@code ACTIVEMQ_HOST} and {@code ACTIVEMQ_PORT}; the literals
 * below are only used when the corresponding variable is unset.
 *
 * <p>If the connection cannot be established, the failure is reported on stderr and
 * {@link #getSession()} returns {@code null}.
 */
public class ActiveMqttClient {
    private static volatile ActiveMqttClient instance = null;
    private static TopicConnection connection = null;
    // NOTE(review): the fallback credentials and host look like demo values; set the
    // environment variables in any real deployment.
    private static final String USERNAME = env("ACTIVEMQ_USER", "admin");
    private static final String PASSWORD = env("ACTIVEMQ_PASSWORD", "password");
    private static final String HOST = env("ACTIVEMQ_HOST", "118.31.168.121");
    private static final int PORT = Integer.parseInt(env("ACTIVEMQ_PORT", "61616"));

    /**
     * Opens and starts the shared connection. The static field is only assigned once the
     * connection has started successfully, so a half-initialised connection is never
     * handed out; on failure the partially opened connection is closed again.
     */
    private ActiveMqttClient() {
        TopicConnection candidate = null;
        try {
            // Create the connection factory
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://" + HOST + ":" + PORT);
            // Create the connection
            candidate = connectionFactory.createTopicConnection(USERNAME, PASSWORD);
            // Start the connection
            candidate.start();
            connection = candidate;
        } catch (Exception e) {
            e.printStackTrace();
            closeQuietly(candidate);
        }
    }

    /**
     * Returns the singleton, creating it (and connecting to the broker) on first use.
     *
     * @return the shared client instance, never {@code null}
     */
    public static synchronized ActiveMqttClient getInstance() {
        if (instance == null) {
            instance = new ActiveMqttClient();
        }
        return instance;
    }

    /**
     * Creates a new transacted topic session on the shared connection.
     *
     * <p>Because the session is transacted, callers must call {@code session.commit()} for
     * sent messages to be delivered and for received messages to be acknowledged. The
     * acknowledge mode argument is ignored by JMS for transacted sessions.
     *
     * @return a new session, or {@code null} if the connection is unavailable or the
     *         session could not be created
     */
    public TopicSession getSession() {
        if (connection == null) {
            // The constructor already reported why connecting failed.
            System.err.println("ActiveMQ connection is not available; cannot create a session");
            return null;
        }
        try {
            return connection.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /** Closes the given connection if non-null, reporting (not propagating) any failure. */
    private static void closeQuietly(TopicConnection target) {
        if (target == null) {
            return;
        }
        try {
            target.close();
        } catch (Exception closeFailure) {
            closeFailure.printStackTrace();
        }
    }

    /**
     * Reads an environment variable.
     *
     * @param key name of the environment variable
     * @param defaultValue value returned when the variable is not set
     * @return the variable's value, or {@code defaultValue} when it is unset
     */
    private static String env(String key, String defaultValue) {
        String rc = System.getenv(key);
        if (rc == null) {
            return defaultValue;
        }
        return rc;
    }
}
MqServerBusiServiceImpl(发送消息)
package com.mq.server.busi.service.impl;
import com.mq.server.busi.config.ActiveMqttClient;
import com.mq.server.busi.service.IMqServerBusiService;
import org.springframework.stereotype.Service;
import javax.jms.MapMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
/**
 * Publishes messages to the {@code topic-test} topic through the shared
 * {@link ActiveMqttClient} connection.
 */
@Service
public class MqServerBusiServiceImpl implements IMqServerBusiService {
    /** Text sent when the caller supplies no message of its own. */
    private static final String DEFAULT_TEXT = "发送消息:测试activemq用mqtt协议以Topic主题发布和订阅方式发送消息";

    private ActiveMqttClient activeMqttClient = ActiveMqttClient.getInstance();

    /**
     * Sends one {@code MapMessage} whose {@code "text"} entry carries the given message.
     *
     * <p>A fresh session and publisher are created per call and closed before returning.
     * Failures are reported on stderr and not propagated.
     *
     * @param msg text to publish; when {@code null} or empty the built-in demo text is sent
     */
    @Override
    public void sendMessage(String msg) {
        TopicSession session = activeMqttClient.getSession();
        if (session == null) {
            System.err.println("No ActiveMQ session available; message was not sent");
            return;
        }
        TopicPublisher publisher = null;
        try {
            // Create the topic
            Topic topic = session.createTopic("topic-test");
            publisher = session.createPublisher(topic);
            String text = (msg == null || msg.isEmpty()) ? DEFAULT_TEXT : msg;
            MapMessage message = session.createMapMessage();
            message.setString("text", text);
            publisher.send(message);
            // Explicit commit: the message is only delivered if the session is transacted and committed.
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release broker-side resources; closing a transacted session rolls back
            // any work that was not committed.
            if (publisher != null) {
                try {
                    publisher.close();
                } catch (Exception closeFailure) {
                    closeFailure.printStackTrace();
                }
            }
            try {
                session.close();
            } catch (Exception closeFailure) {
                closeFailure.printStackTrace();
            }
        }
    }
}
3、消费者
package com.sys.server.busi.config; import org.apache.activemq.command.ActiveMQBytesMessage; import org.apache.activemq.command.ActiveMQMapMessage; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.stereotype.Component; import javax.jms.*; import java.util.Map; @Component public class ActiveMqttConsume implements ApplicationListener{ private ActiveMqttClient activeMqttClient = ActiveMqttClient.getInstance(); public void reveiveMessage() { try { TopicSession session = activeMqttClient.getSession(); //创建topic Topic topic = session.createTopic("topic-test"); MessageConsumer consumer = session.createConsumer(topic); do { Message msg = consumer.receive(); if (msg instanceof TextMessage) { String body = ((TextMessage) msg).getText(); System.out.println(body); } if (msg instanceof ActiveMQMapMessage) { Map body = ((ActiveMQMapMessage) msg).getContentMap(); System.out.println(body); } if (msg instanceof ActiveMQBytesMessage) { String body = ((ActiveMQBytesMessage) msg).readUTF(); System.out.println(body); } else { System.out.println("Unexpected message type: " + msg.getClass()); } } while (true); } catch (Exception e) { e.printStackTrace(); } } @Override public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) { this.reveiveMessage(); } }
启动服务验证
PS:大家可以参考部署那一块examples下面的demo,里面有各个协议各种语言的demo;另外,activeMq也可以换成apollo,apollo 是 ActiveMQ的子工程,是 ActiveMQ的下一代消息代理,是一个更快、更可靠、更容易维护的消息代理。



