Linux 64位操作系统
64bit JDK 1.8+
unzip rocketmq-all-4.4.0-bin-release.zip
tail -f /root/logs/rocketmqlogs/namesrv.log
启动Broker
修改runbroker.sh,runserver.sh
运行nohup ./mqbroker -n localhost:9876 &
查看日志
tail -f /root/logs/rocketmqlogs/broker.log
测试
发送消息
export NAMESRV_ADDR=localhost:9876
/usr/local/rocketmq/bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
接收消息
export NAMESRV_ADDR=localhost:9876
/usr/local/rocketmq/bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
关闭RocketMQ
/usr/local/rocketmq/bin/mqshutdown broker
/usr/local/rocketmq/bin/mqshutdown namesrv
原理介绍
解压后只需要rocketmq-console
修改rocketmq-console/src/main/resources目录下的配置文件
到根目录下执行打包
mvn clean package -Dmaven.test.skip=true
target目录下运行jar包
访问localhost:7777
RocketMQ消息基础
导入依赖
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.2</version>
</dependency>
消息发送
// Create the producer and assign it to a producer group.
DefaultMQProducer producer = new DefaultMQProducer("cg_producer_group");
// Tell the producer where the NameServer is.
producer.setNamesrvAddr("192.168.238.130:9876");
// The producer must be started before it can send.
producer.start();
try {
    // Build the message from topic, tag and body. The body is encoded explicitly as
    // UTF-8 so the bytes do not depend on the platform default charset.
    Message message = new Message("cgTopic", "cgTag",
            "RocketMQ Message Test".getBytes(java.nio.charset.StandardCharsets.UTF_8));
    // Send the message; the second argument is the send timeout in milliseconds.
    SendResult result = producer.send(message, 1000);
    System.out.println("result----------------------: " + result);
} finally {
    // Release the producer even when send() throws.
    producer.shutdown();
}
消息消费
// Create a push consumer that belongs to the given consumer group.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg_consumer_group");
// Tell the consumer where the NameServer is.
consumer.setNamesrvAddr("192.168.238.130:9876");
// Subscribe to the topic; the tag expression "*" matches every tag.
consumer.subscribe("cgTopic", "*");
// Register the callback that handles each delivered batch of messages.
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(
            List<org.apache.rocketmq.common.message.MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
        System.out.println("Message--------: " + msgs);
        // Report the batch as consumed.
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
// Start the consumer; messages are delivered to the listener from here on.
consumer.start();
微服务发送消息案例
订单微服务
导入依赖
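<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.2</version>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.4.0</version>
</dependency>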
修改配置文件
spring:
  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848
# rocketmq
rocketmq:
  name-server: 192.168.238.130:9876
  producer:
    group: shop-order # producer group
发送消息
@Autowired
private RocketMQTemplate rocketMQTemplate;

/**
 * Places an order for the product with the given id and publishes it to the
 * "order-topic" destination.
 *
 * <p>When the looked-up product has id 250, an {@code Order} carrying only id 250 is
 * returned and nothing is created or published (presumably a fallback marker from the
 * Feign client; confirm against the fallback implementation).
 *
 * @param pid product id taken from the request path
 * @return the created order, or the id-250 order described above
 */
@GetMapping("/order/prod/{pid}")
public Order order(@PathVariable("pid") Long pid) {
    // Look the product up through the Feign client.
    Product found = productFeignClient.findByPid(pid);
    if (found.getId() == 250L) {
        Order marker = new Order();
        return marker.setId(250L);
    }

    String productJson = JSON.toJSONString(found);
    log.info("查询{}号商品:{}", pid, productJson);

    Order placed = new Order().setNumber(1).setPid(pid).setUid(1L);
    log.info("订单信息{}", placed.toString());
    orderService.createOrder(placed);

    // Publish the order over RocketMQ.
    rocketMQTemplate.convertAndSend("order-topic", placed);
    return placed;
}
用户微服务
导入依赖
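<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.2</version>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.4.0</version>
</dependency>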
修改配置文件
spring:
  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848
# rocketmq
rocketmq:
  name-server: 192.168.238.130:9876
  producer:
    group: shop-order # producer group
实现接口
/**
 * RocketMQ listener that receives {@code Order} messages from "order-topic" and prints them.
 *
 * <p>The listener interface is parameterized with {@code Order}; with the raw interface
 * the {@code onMessage(Order)} method would not override anything and would not compile.
 */
@Service
@RocketMQMessageListener(
        // Consumer group name.
        consumerGroup = "shop-user",
        // Topic to consume from.
        topic = "order-topic",
        // Consume mode: concurrent (unordered) or orderly.
        consumeMode = ConsumeMode.CONCURRENTLY,
        // Message model: broadcasting or clustering.
        messageModel = MessageModel.CLUSTERING)
public class SmsService implements RocketMQListener<Order> {

    /**
     * Prints a notice and the received order.
     *
     * @param order the order payload delivered by the listener container
     */
    @Override
    public void onMessage(Order order) {
        System.out.println("接收到一个订单信息");
        System.out.println(order.toString());
    }
}
广播消费:每个消费者实例都会收到消息,也就是一条消息可以被每个消费者实例处理。
集群模式:一条消息只能被一个消费者实例消费。
普通消息
@Autowired
private RocketMQTemplate rocketMQTemplate;
可靠同步发送
/** Sends one message synchronously and prints the returned send result. */
@Test
public void testSyncSend() {
    final String destination = "test-topic-1:test-tag";
    final int timeoutMillis = 10000;
    SendResult outcome = rocketMQTemplate.syncSend(destination, "同步消息", timeoutMillis);
    System.out.println("运行结果" + outcome);
}
可靠异步发送
/**
 * Sends one message asynchronously and prints the outcome reported to the callback.
 *
 * <p>The test thread waits on a latch that the callback releases, for at most 10 seconds,
 * so the test ends as soon as the outcome is known instead of always sleeping 10 seconds.
 */
@Test
public void testAsyncSend() {
    final java.util.concurrent.CountDownLatch done = new java.util.concurrent.CountDownLatch(1);
    rocketMQTemplate.asyncSend("test-topic-1:test-tag", "异步消息", new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.println(sendResult);
            done.countDown();
        }

        @Override
        public void onException(Throwable throwable) {
            System.out.println(throwable);
            done.countDown();
        }
    });
    System.out.println("======================");
    try {
        // Keep the test thread alive until the callback has run (bounded by 10 s).
        done.await(10, java.util.concurrent.TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so the test runner can observe the interruption.
        Thread.currentThread().interrupt();
    }
}
单向发送
/** Sends one message in one-way mode; no send result is returned to the caller. */
@Test
public void testOneWay() {
    String destination = "test-topic-1:test-tag";
    String payload = "单向消息";
    rocketMQTemplate.sendOneWay(destination, payload);
}
顺序消息
// Sends one message through the one-way orderly API; the third argument is the hashKey.
@Test
public void testOneWayOrderly(){
// NOTE(review): the original note said the hashKey must not repeat. Per the rocketmq-spring
// javadoc the hashKey is used to select the queue (e.g. an order id) - confirm intended usage.
rocketMQTemplate.sendOneWayOrderly("test-topic-1:test-tag","单向顺序消息","xx");
}
同时还有同步顺序消息、异步顺序消息,都是在原方法名后加上Orderly,并额外传入hashKey参数;hashKey用于选择队列,需要保持先后顺序的一组消息应使用相同的hashKey(例如同一个订单号)。
事务消息
/**
 * JPA entity named "shop_txlog" that records a transaction id together with a date.
 * Accessors are generated by Lombok's {@code @Data}.
 */
@Data
@Entity(name = "shop_txlog")
public class TxLog {

    /** Transaction id; the entity's primary key. */
    @Id
    private String txId;

    /** Date stored alongside the transaction id. */
    private Date date;
}
dao
public interface TxLogDao extends JpaRepository<TxLog, String> { }
service
/**
 * Order service used by the transactional-message example: one method sends the
 * transactional message, the other performs the local database work.
 */
@Service
public class OrderServiceImplTest {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Autowired
    private TxLogDao txLogDao;

    @Autowired
    private OrderDao orderDao;

    /**
     * Sends the order as a transactional message to "tx_topic" under producer group
     * "tx_producer_group". A freshly generated UUID is attached as the "txId" header and
     * the order itself is passed along as the extra argument.
     *
     * @param order the order to send
     */
    public void createOrderBefore(Order order) {
        String transactionId = UUID.randomUUID().toString();
        rocketMQTemplate.sendMessageInTransaction(
                "tx_producer_group",
                "tx_topic",
                MessageBuilder.withPayload(order).setHeader("txId", transactionId).build(),
                order);
    }

    /**
     * Saves the order and a {@code TxLog} row for the given transaction id inside one
     * {@code @Transactional} method.
     *
     * @param txId  transaction id to record
     * @param order the order to persist
     */
    @Transactional
    public void createOrder(String txId, Order order) {
        orderDao.save(order);

        TxLog entry = new TxLog();
        entry.setDate(new Date());
        entry.setTxId(txId);
        txLogDao.save(entry);
    }
}
listen
/**
 * Transaction listener for producer group "tx_producer_group": runs the local transaction
 * for a transactional message and answers later status checks from the transaction log.
 */
@Service
@RocketMQTransactionListener(txProducerGroup = "tx_producer_group")
public class OrderServiceImplTestListener implements RocketMQLocalTransactionListener {

    @Autowired
    private OrderServiceImplTest orderServiceImplTest;

    @Autowired
    private TxLogDao txLogDao;

    /**
     * Executes the local transaction by creating the order.
     *
     * @param message the transactional message; its "txId" header carries the transaction id
     * @param o       the extra argument given when sending, expected to be the {@code Order}
     * @return COMMIT when the order was created, ROLLBACK when anything threw
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        String txId = (String) message.getHeaders().get("txId");
        try {
            Order order = (Order) o;
            orderServiceImplTest.createOrder(txId, order);
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            // Any failure in the local transaction rolls the message back.
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    /**
     * Answers a transaction status check by looking for the transaction log row.
     *
     * <p>Uses {@code Optional.isPresent()} rather than {@code get()}: {@code get()} throws
     * when no row exists, so the ROLLBACK branch could never be reached.
     *
     * @param message the message being checked; its "txId" header carries the transaction id
     * @return COMMIT when a log row exists for the id, ROLLBACK otherwise
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        String txId = (String) message.getHeaders().get("txId");
        if (txLogDao.findById(txId).isPresent()) {
            return RocketMQLocalTransactionState.COMMIT;
        }
        return RocketMQLocalTransactionState.ROLLBACK;
    }
}
controller
// In the controller, start the flow by calling createOrderBefore with the order.
orderServiceImplTest.createOrderBefore(order);



