ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
主要特点:
- 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
- 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
- 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
- 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
- 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
- 支持通过JDBC和journal提供高速的消息持久化
- 从设计上保证了高性能的集群,客户端-服务器,点对点
- 支持Ajax
- 支持与Axis的整合
- 可以很容易得调用内嵌JMS provider,进行测试
对于消息的传递有两种类型:
一种是点对点的,即一个生产者和一个消费者一一对应;
另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。
JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。
· StreamMessage – Java原始值的数据流
· MapMessage–一套名称-值对
· TextMessage–一个字符串对象
· ObjectMessage–一个序列化的 Java对象
· BytesMessage–一个字节的数据流
第一步: 把ActiveMQ 的压缩包上传到Linux系统。
第二步:解压缩。
第三步:启动。
使用bin目录下的activemq命令启动:
[root@localhost bin]# ./activemq start
关闭:
[root@localhost bin]# ./activemq stop
查看状态:
[root@localhost bin]# ./activemq status
http://192.168.25.175:8161/admin/
访问不了,
[root@training bin]# cat /etc/sysconfig/network NETWORKING=yes HOSTNAME=training
修改host文件
[root@training bin]# cat /etc/hosts 127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4 192.168.25.175 training ::1 localhost localhost.localdomain localhost6 localhost6.localdomain6 [root@training bin]#ActiveMQ的使用方法 2. Queue、Topic使用示例
taotao-search-service
testQueueProducerActiveMQConnectionFactory
Connection connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
Session
Queue session.createQueue(“test-queue”)
MessageProducer session.createProducer(queue)
TextMessage session.createTextMessage(“hello activemq222”)
producer.send(textMessage);
//queue
//Producer
@Test
public void testQueueProducer() throws Exception {
//1.创建一个连接工厂对象ConnectionFactory对象。需要指定mq服务的ip及端口
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.175:61616");
//2.使用ConnectionFactory创建一个连接Connection对象
Connection connection = connectionFactory.createConnection();
//3.开启连接。调用Connection对象的start方法
connection.start();
//4.使用Connection对象创建一个Session对象
//第一个参数是是否开启事务,一般不使用事务。保证数据的最终一致,可以使用消息队列实现。
//如果第一个参数为true,第二个参数自动忽略。如果不开启事务false,第二个参数为消息的应答模式。一般自动应答就可以。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.使用Session对象创建一个Destination对象,两种形式queue、topic。现在应该使用queue
//参数就是消息队列的名称
Queue queue = session.createQueue("test-queue");
//6.使用Session对象创建一个Producer对象
MessageProducer producer = session.createProducer(queue);
//7.创建一个TextMessage对象
TextMessage textMessage = session.createTextMessage("hello activemq222");
//8.发送消息
producer.send(textMessage);
//9.关闭资源
producer.close();
session.close();
connection.close();
}
testQueueConsumer
ActiveMQConnectionFactory
Connection connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
Session
Queue session.createQueue(“test-queue”)
MessageConsumer session.createConsumer(queue)
MessageListener consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {}
)
producer.send(textMessage);
@Test
public void testQueueConsumer() throws Exception {
//创建一个连接工厂对象
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.175:61616");
//使用连接工厂对象创建一个连接
Connection connection = connectionFactory.createConnection();
//开启连接
connection.start();
//使用连接对象创建一个Session对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//使用Session创建一个Destination,Destination应该和消息的发送端一致。
Queue queue = session.createQueue("test-queue");
//使用Session创建一个Consumer对象
MessageConsumer consumer = session.createConsumer(queue);
//向Consumer对象中设置一个MessageListener对象,用来接收消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
//取消息的内容
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
String text = textMessage.getText();
//打印消息内容
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
//系统等待接收消息
System.in.read();
//关闭资源
consumer.close();
session.close();
connection.close();
}
testTopicProducer 发完消息,自动结束
Topic topic = session.createTopic(“test-topic1”);
MessageProducer producer = session.createProducer(topic);
//topic
//Producer
@Test
public void testTopicProducer() throws Exception {
//创建一个连接工厂对象
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.175:61616");
//创建连接
Connection connection = connectionFactory.createConnection();
//开启连接
connection.start();
//创建Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建Destination,应该使用topic
Topic topic = session.createTopic("test-topic1");
//创建一个Producer对象
MessageProducer producer = session.createProducer(topic);
//创建一个TextMessage对象
TextMessage textMessage = session.createTextMessage("hello activemq topic23");
//发送消息
producer.send(textMessage);
//关闭资源
producer.close();
session.close();
connection.close();
}
testTopicConsumser 阻塞式 一直等消息发来
Topic topic = session.createTopic(“test-topic1”);
MessageConsumer consumer = session.createConsumer(topic);
consumer.setMessageListener(new MessageListener() {})
@Test
public void testTopicConsumser() throws Exception {
//创建一个连接工厂对象
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.175:61616");
//使用连接工厂对象创建一个连接
Connection connection = connectionFactory.createConnection();
//开启连接
connection.start();
//使用连接对象创建一个Session对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//使用Session创建一个Destination,Destination应该和消息的发送端一致。
Topic topic = session.createTopic("test-topic1");
//使用Session创建一个Consumer对象
MessageConsumer consumer = session.createConsumer(topic);
//向Consumer对象中设置一个MessageListener对象,用来接收消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
//取消息的内容
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
String text = textMessage.getText();
//打印消息内容
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
//系统等待接收消息
System.out.println("topic消费者3.。。。");
System.in.read();
//关闭资源
consumer.close();
session.close();
connection.close();
}
testTopicConsumser2 阻塞式 一直等消息发来
订阅同一个topic的消费者,可以同时收到消息
代码同上
截图示例
activeMQ admin
send a1 消息
testTopicConsumser 1,2
activeMQ admin 使用jsmTemplate 发送消息 taotao-manager-service
@Test
public void testJmsTemplate() throws Exception {
//初始化spring容器
ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml");
//从容器中获得JmsTemplate对象
JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class);
//从容器中获得Destination对象
Destination destination = (Destination) applicationContext.getBean("test-queue");
//发送消息
jmsTemplate.send(destination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage message = session.createTextMessage("spring activemq send queue message");
System.out.println("activemq:"+message.getText());
return message;
}
});
}
发送 testJmsTemplate
taotao-search-service 后台收到
MyMessageListener
public class MyMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
try {
//接收到消息
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
System.out.println(text);
} catch (Exception e) {
e.printStackTrace();
}
}
}
4. 添加商品同步索引库
ItemServiceImpl Producer Taotao-manager-service
Taotao-manager-server工程中发送消息。
当商品添加完成后发送一个TextMessage,包含一个商品id。
package com.taotao.service.impl;
import java.util.Date;
import java.util.List;
import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import com.taotao.common.pojo.EasyUIDataGridResult;
import com.taotao.common.pojo.TaotaoResult;
import com.taotao.common.utils.IDUtils;
import com.taotao.common.utils.JsonUtils;
import com.taotao.jedis.JedisClient;
import com.taotao.mapper.TbItemDescMapper;
import com.taotao.mapper.TbItemMapper;
import com.taotao.pojo.TbItem;
import com.taotao.pojo.TbItemDesc;
import com.taotao.pojo.TbItemExample;
import com.taotao.service.ItemService;
@Service
public class ItemServiceImpl implements ItemService {
@Autowired
private TbItemMapper itemMapper;
@Autowired
private TbItemDescMapper itemDescMapper;
@Autowired
private JmsTemplate jmsTemplate;
@Resource(name="itemAddtopic")
private Destination destination;
@Autowired
private JedisClient jedisClient;
@Value("${ITEM_INFO}")
private String ITEM_INFO;
@Value("${TIEM_EXPIRE}")
private Integer TIEM_EXPIRE;
@Override
public TaotaoResult addItem(TbItem item, String desc) {
//生成商品id
final long itemId = IDUtils.genItemId();
//补全item的属性
item.setId(itemId);
//商品状态,1-正常,2-下架,3-删除
item.setStatus((byte) 1);
item.setCreated(new Date());
item.setUpdated(new Date());
//向商品表插入数据
itemMapper.insert(item);
//创建一个商品描述表对应的pojo
TbItemDesc itemDesc = new TbItemDesc();
//补全pojo的属性
itemDesc.setItemId(itemId);
itemDesc.setItemDesc(desc);
itemDesc.setUpdated(new Date());
itemDesc.setCreated(new Date());
//向商品描述表插入数据
itemDescMapper.insert(itemDesc);
//向Activemq发送商品添加消息
jmsTemplate.send(destination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
//发送商品id
TextMessage textMessage = session.createTextMessage(itemId + "");
Topic topic= (Topic)destination;
System.out.println("message-topic-out:"+topic.getTopicName()+"-"+itemId);
return textMessage;
}
});
//返回结果
return TaotaoResult.ok();
}
}
ItemAddMessageListener Consumer Taotao-search-service
功能分析
1、接收消息。需要创建MessageListener接口的实现类。
2、取消息,取商品id。
3、根据商品id查询数据库。
4、创建一SolrInputdocument对象。
5、使用SolrServer对象写入索引库。
6、返回成功,返回TaotaoResult。
public class ItemAddMessageListener implements MessageListener{
@Autowired
private SearchItemMapper searchItemMapper;
@Autowired
private SolrServer solrServer;
@Override
public void onMessage(Message message) {
try {
//从消息中取商品id
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
System.out.println("activemq-re:"+text);
long itemId = Long.parseLong(text);
//根据商品id查询数据,取商品信息
//等待事务提交
Thread.sleep(1000);
SearchItem searchItem = searchItemMapper.getItemById(itemId);
//创建文档对象
SolrInputdocument document = new SolrInputdocument();
//向文档对象中添加域
document.addField("id", searchItem.getId());
document.addField("item_title", searchItem.getTitle());
document.addField("item_sell_point", searchItem.getSell_point());
document.addField("item_price", searchItem.getPrice());
document.addField("item_image", searchItem.getImage());
document.addField("item_category_name", searchItem.getCategory_name());
document.addField("item_desc", searchItem.getItem_desc());
//把文档对象写入索引库
solrServer.add(document);
//提交
solrServer.commit();
} catch (Exception e) {
e.printStackTrace();
}
}



