栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

【微服务/淘淘商城实践/SSM框架】06 商城后台管理系统 添加商品同步索引库 ActiveMQ消息队列&spring整合 Producer Consumer queue/topic

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

【微服务/淘淘商城实践/SSM框架】06 商城后台管理系统 添加商品同步索引库 ActiveMQ消息队列&spring整合 Producer Consumer queue/topic

1. ActiveMQ消息队列 介绍

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
主要特点:

  1. 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
  2. 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
  3. 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
  4. 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
  5. 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
  6. 支持通过JDBC和journal提供高速的消息持久化
  7. 从设计上保证了高性能的集群,客户端-服务器,点对点
  8. 支持Ajax
  9. 支持与Axis的整合
  10. 可以很容易得调用内嵌JMS provider,进行测试
ActiveMQ的消息形式

对于消息的传递有两种类型:
一种是点对点的,即一个生产者和一个消费者一一对应;
另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。
JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。
  · StreamMessage – Java原始值的数据流
  · MapMessage–一套名称-值对
  · TextMessage–一个字符串对象
  · ObjectMessage–一个序列化的 Java对象
  · BytesMessage–一个字节的数据流

安装 启动

第一步: 把ActiveMQ 的压缩包上传到Linux系统。
第二步:解压缩。
第三步:启动。
使用bin目录下的activemq命令启动:
[root@localhost bin]# ./activemq start
关闭:
[root@localhost bin]# ./activemq stop
查看状态:
[root@localhost bin]# ./activemq status

http://192.168.25.175:8161/admin/
访问不了,

[root@training bin]# cat /etc/sysconfig/network
NETWORKING=yes
HOSTNAME=training

修改host文件

[root@training bin]# cat /etc/hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
192.168.25.175 training
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
[root@training bin]# 
ActiveMQ的使用方法

2. Queue、Topic使用示例

taotao-search-service

testQueueProducer

ActiveMQConnectionFactory
Connection connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
Session
Queue session.createQueue(“test-queue”)
MessageProducer session.createProducer(queue)
TextMessage session.createTextMessage(“hello activemq222”)
producer.send(textMessage);

//queue
//Producer
@Test
public void testQueueProducer() throws Exception {
	//1.创建一个连接工厂对象ConnectionFactory对象。需要指定mq服务的ip及端口
	ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.175:61616");
	//2.使用ConnectionFactory创建一个连接Connection对象
	Connection connection = connectionFactory.createConnection();
	//3.开启连接。调用Connection对象的start方法
	connection.start();
	//4.使用Connection对象创建一个Session对象
	//第一个参数是是否开启事务,一般不使用事务。保证数据的最终一致,可以使用消息队列实现。
	//如果第一个参数为true,第二个参数自动忽略。如果不开启事务false,第二个参数为消息的应答模式。一般自动应答就可以。		
	Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
	//5.使用Session对象创建一个Destination对象,两种形式queue、topic。现在应该使用queue
	//参数就是消息队列的名称
	Queue queue = session.createQueue("test-queue");
	//6.使用Session对象创建一个Producer对象
	MessageProducer producer = session.createProducer(queue);
	//7.创建一个TextMessage对象
	
	TextMessage textMessage = session.createTextMessage("hello activemq222");
	//8.发送消息
	producer.send(textMessage);
	//9.关闭资源
	producer.close();
	session.close();
	connection.close();
	
}
testQueueConsumer

ActiveMQConnectionFactory
Connection connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
Session
Queue session.createQueue(“test-queue”)
MessageConsumer session.createConsumer(queue)
MessageListener consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {}

producer.send(textMessage);

	@Test
	public void testQueueConsumer() throws Exception {
		//创建一个连接工厂对象
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.175:61616");
		//使用连接工厂对象创建一个连接
		Connection connection = connectionFactory.createConnection();
		//开启连接
		connection.start();
		//使用连接对象创建一个Session对象
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//使用Session创建一个Destination,Destination应该和消息的发送端一致。
		Queue queue = session.createQueue("test-queue");
		//使用Session创建一个Consumer对象
		MessageConsumer consumer = session.createConsumer(queue);
		//向Consumer对象中设置一个MessageListener对象,用来接收消息
		consumer.setMessageListener(new MessageListener() {
			
			@Override
			public void onMessage(Message message) {
				//取消息的内容
				if (message instanceof TextMessage) {
					TextMessage textMessage = (TextMessage) message;
					try {
						String text = textMessage.getText();
						//打印消息内容
						System.out.println(text);
					} catch (JMSException e) {
						e.printStackTrace();
					}
				}
				
			}
		});
		//系统等待接收消息
		
		System.in.read();
		//关闭资源
		consumer.close();
		session.close();
		connection.close();
	}
testTopicProducer 发完消息,自动结束

Topic topic = session.createTopic(“test-topic1”);
MessageProducer producer = session.createProducer(topic);

	//topic
	//Producer
	@Test
	public void testTopicProducer() throws Exception {
		//创建一个连接工厂对象
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.175:61616");
		//创建连接
		Connection connection = connectionFactory.createConnection();
		//开启连接
		connection.start();
		//创建Session
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//创建Destination,应该使用topic
		Topic topic = session.createTopic("test-topic1");
		//创建一个Producer对象
		MessageProducer producer = session.createProducer(topic);
		//创建一个TextMessage对象
		TextMessage textMessage = session.createTextMessage("hello activemq topic23");
		//发送消息
		producer.send(textMessage);
		//关闭资源
		producer.close();
		session.close();
		connection.close();
	}
testTopicConsumser 阻塞式 一直等消息发来

Topic topic = session.createTopic(“test-topic1”);
MessageConsumer consumer = session.createConsumer(topic);
consumer.setMessageListener(new MessageListener() {})

	@Test
	public void testTopicConsumser() throws Exception {
		//创建一个连接工厂对象
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.175:61616");
		//使用连接工厂对象创建一个连接
		Connection connection = connectionFactory.createConnection();
		//开启连接
		connection.start();
		//使用连接对象创建一个Session对象
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//使用Session创建一个Destination,Destination应该和消息的发送端一致。
		Topic topic = session.createTopic("test-topic1");
		//使用Session创建一个Consumer对象
		MessageConsumer consumer = session.createConsumer(topic);
		//向Consumer对象中设置一个MessageListener对象,用来接收消息
		consumer.setMessageListener(new MessageListener() {
			
			@Override
			public void onMessage(Message message) {
				//取消息的内容
				if (message instanceof TextMessage) {
					TextMessage textMessage = (TextMessage) message;
					try {
						String text = textMessage.getText();
						//打印消息内容
						System.out.println(text);
					} catch (JMSException e) {
						e.printStackTrace();
					}
				}
				
			}
		});
		//系统等待接收消息
		
		System.out.println("topic消费者3.。。。");
		System.in.read();
		//关闭资源
		consumer.close();
		session.close();
		connection.close();
	}
testTopicConsumser2 阻塞式 一直等消息发来

订阅同一个topic的消费者,可以同时收到消息
代码同上

截图示例
activeMQ admin

send a1 消息

testTopicConsumser 1,2

3. Activemq整合spring applicationContext-activemq.xml
	
	
		
	
	
	
		
	
	
	
		
	
	
	
		
	
	
		
	
activeMQ admin

使用jsmTemplate 发送消息 taotao-manager-service
	@Test
	public void testJmsTemplate() throws Exception {
		//初始化spring容器
		ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml");
		//从容器中获得JmsTemplate对象
		JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class);
		//从容器中获得Destination对象
		Destination destination = (Destination) applicationContext.getBean("test-queue");
		//发送消息
		jmsTemplate.send(destination, new MessageCreator() {
			
			@Override
			public Message createMessage(Session session) throws JMSException {
				TextMessage message = session.createTextMessage("spring activemq send queue message");
				
				System.out.println("activemq:"+message.getText());
				return message;
			}
		});
		
	}

发送 testJmsTemplate

运行结果 MyMessageListener taotao-search-service

taotao-search-service 后台收到

Spring配置监听 applicationContext-activemq.xml
	
	
		
	
	
	
		
	
	
	
		
	
	
		
	
	
	
	
	
		
		
		
	
	
	
	
		
		
		
	
MyMessageListener
public class MyMessageListener implements MessageListener {

	@Override
	public void onMessage(Message message) {
		try {
			//接收到消息
			TextMessage textMessage = (TextMessage) message;
			String text = textMessage.getText();
			System.out.println(text);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

}

4. 添加商品同步索引库 ItemServiceImpl Producer Taotao-manager-service

Taotao-manager-server工程中发送消息。
当商品添加完成后发送一个TextMessage,包含一个商品id。

package com.taotao.service.impl;

import java.util.Date;
import java.util.List;

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import com.taotao.common.pojo.EasyUIDataGridResult;
import com.taotao.common.pojo.TaotaoResult;
import com.taotao.common.utils.IDUtils;
import com.taotao.common.utils.JsonUtils;
import com.taotao.jedis.JedisClient;
import com.taotao.mapper.TbItemDescMapper;
import com.taotao.mapper.TbItemMapper;
import com.taotao.pojo.TbItem;
import com.taotao.pojo.TbItemDesc;
import com.taotao.pojo.TbItemExample;
import com.taotao.service.ItemService;


@Service
public class ItemServiceImpl implements ItemService {

	@Autowired
	private TbItemMapper itemMapper;
	@Autowired
	private TbItemDescMapper itemDescMapper;
	@Autowired
	private JmsTemplate jmsTemplate;
	@Resource(name="itemAddtopic")
	private Destination destination;
	@Autowired
	private JedisClient jedisClient;
	
	@Value("${ITEM_INFO}")
	private String ITEM_INFO;
	@Value("${TIEM_EXPIRE}")
	private Integer TIEM_EXPIRE;
	
	
	@Override
	public TaotaoResult addItem(TbItem item, String desc) {
		//生成商品id
		final long itemId = IDUtils.genItemId();
		//补全item的属性
		item.setId(itemId);
		//商品状态,1-正常,2-下架,3-删除
		item.setStatus((byte) 1);
		item.setCreated(new Date());
		item.setUpdated(new Date());
		//向商品表插入数据
		itemMapper.insert(item);
		//创建一个商品描述表对应的pojo
		TbItemDesc itemDesc = new TbItemDesc();
		//补全pojo的属性
		itemDesc.setItemId(itemId);
		itemDesc.setItemDesc(desc);
		itemDesc.setUpdated(new Date());
		itemDesc.setCreated(new Date());
		//向商品描述表插入数据
		itemDescMapper.insert(itemDesc);
		//向Activemq发送商品添加消息
		jmsTemplate.send(destination, new MessageCreator() {
			
			@Override
			public Message createMessage(Session session) throws JMSException {
				//发送商品id
				TextMessage textMessage = session.createTextMessage(itemId + "");
				Topic topic= (Topic)destination;
				System.out.println("message-topic-out:"+topic.getTopicName()+"-"+itemId);
				return textMessage;
			}
		});
		//返回结果
		return TaotaoResult.ok();
	}

	

}

ItemAddMessageListener Consumer Taotao-search-service

功能分析
1、接收消息。需要创建MessageListener接口的实现类。
2、取消息,取商品id。
3、根据商品id查询数据库。
4、创建一SolrInputdocument对象。
5、使用SolrServer对象写入索引库。
6、返回成功,返回TaotaoResult。

public class ItemAddMessageListener implements MessageListener{
	
	@Autowired
	private SearchItemMapper searchItemMapper;
	@Autowired
	private SolrServer solrServer;

	@Override
	public void onMessage(Message message) {
		try {
			//从消息中取商品id
			TextMessage textMessage = (TextMessage) message;
			
			String text = textMessage.getText();
			
			System.out.println("activemq-re:"+text);
			long itemId = Long.parseLong(text);
			//根据商品id查询数据,取商品信息
			//等待事务提交
			Thread.sleep(1000);
			SearchItem searchItem = searchItemMapper.getItemById(itemId);
			//创建文档对象
			SolrInputdocument document = new SolrInputdocument();
			//向文档对象中添加域
			document.addField("id", searchItem.getId());
			document.addField("item_title", searchItem.getTitle());
			document.addField("item_sell_point", searchItem.getSell_point());
			document.addField("item_price", searchItem.getPrice());
			document.addField("item_image", searchItem.getImage());
			document.addField("item_category_name", searchItem.getCategory_name());
			document.addField("item_desc", searchItem.getItem_desc());
			//把文档对象写入索引库
			solrServer.add(document);
			//提交
			solrServer.commit();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/667683.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号