栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 系统运维 > 运维 > Linux

SpringBoot整合RocketMQ搭建集群详细步骤

Linux 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

SpringBoot整合RocketMQ搭建集群详细步骤

文章目录
  • RocketMQ
    • 1、重试策略
      • 1.1 producer端重试
  • 1.2 consumer端重试
      • 1.2.1 exception
    • 2、RocketMQ的集群
      • 2.1 集群模式
    • 2.2 搭建2m2s集群
      • 搭建集群:
    • 3、SprinBoot整合RocketMQ
      • 3.1、导入依赖
      • 3.2、编写application.properties配置文件
      • 3.3、生产者发送消息
      • 3.4、消费消息
      • 3.5、编写启动类
      • 3.6、编写测试用例
      • 3.7、测试
      • 3.8、事务消息
        • 3.8.1、定义TransactionListenerImpl
        • 3.8.2、定义生产者
        • 3.8.3、消费者(没有变化)
        • 3.8.4、编写测试用例
        • 3.8.5、测试


RocketMQ 1、重试策略

在消息的发送和消费过程中,都有可能出现错误,如网络异常等,出现了错误就需要进行错误重试,这种消息的重
试需要分2种,分别是producer端重试和consumer端重试。

1.1 producer端重试

生产者端的消息失败,也就是Producer往MQ上发消息没有发送成功,比如网络抖动导致生产者发送消息到MQ失
败。

package cn.itcast.rocketmq;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class SyncProducer {
	public static void main(String[] args) throws Exception {
		DefaultMQProducer producer = new DefaultMQProducer("HAOKE_IM");
		producer.setNamesrvAddr("172.16.55.185:9876");
		//消息发送失败时,重试3次
		producer.setRetryTimesWhenSendFailed(3);
		producer.start();
		String msgStr = "用户A发送消息给用户B";
		Message msg = new Message("haoke_im_topic","SEND_MSG",
		msgStr.getBytes(RemotingHelper.DEFAULT_CHARSET));
		// 发送消息,并且指定超时时间
		SendResult sendResult = producer.send(msg, 1000);
		System.out.println("消息状态:" + sendResult.getSendStatus());
		System.out.println("消息id:" + sendResult.getMsgId());
		System.out.println("消息queue:" + sendResult.getMessageQueue());
		System.out.println("消息offset:" + sendResult.getQueueOffset());
		System.out.println(sendResult);
		producer.shutdown();
	}
}
1.2 consumer端重试

消费者端的失败,分为2种情况,一个是exception,一个是timeout。

1.2.1 exception

消息正常的到了消费者,结果消费者发生异常,处理失败了。例如反序列化失败,消息数据本身无法处理(例如话
费充值,当前消息的手机号被注销,无法充值)等。
消息的状态:

package org.apache.rocketmq.client.consumer.listener;
public enum ConsumeConcurrentlyStatus {
	
	CONSUME_SUCCESS,
	
	RECONSUME_LATER;
}

可以看到,消息的状态分为成功或者失败。如果返回的状态为失败会怎么样呢?
在启动broker的日志中可以看到这样的信息:

INFO main - messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h
2h

这个表示了,如果消息消费失败,那么消息将会在1s、5s、10s后重试,一直到2h后不再重试。
其实,有些时候并不需要重试这么多次,一般重试3~5次即可。这个时候就可以通过msg.getReconsumeTimes()获
取重试次数进行控制。

package cn.itcast.rocketmq;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.io.UnsupportedEncodingException;
import java.util.List;
public class ConsumerDemo {
	public static void main(String[] args) throws Exception {
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("HAOKE_IM");
		consumer.setNamesrvAddr("172.16.55.185:9876");
		// 订阅topic,接收此Topic下的所有消息
		consumer.subscribe("my-test-topic", "*");
		consumer.registerMessageListener(new MessageListenerConcurrently() {
		@Override
		public ConsumeConcurrentlyStatus consumeMessage(List msgs,
		ConsumeConcurrentlyContext context) {
			for (MessageExt msg : msgs) {
			try {
				System.out.println(new String(msg.getBody(), "UTF-8"));
			} catch (UnsupportedEncodingException e) {
				e.printStackTrace();
			}
			}
			System.out.println("收到消息->" + msgs);
			if(msgs.get(0).getReconsumeTimes() >= 3){
				// 重试3次后,不再进行重试
				return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
			}
				return ConsumeConcurrentlyStatus.RECONSUME_LATER;
			}
		});
		consumer.start();
	}
}

1.2.2、timeout
比如由于网络原因导致消息压根就没有从MQ到消费者上,那么在RocketMQ内部会不断的尝试发送这条消息,直
至发送成功为止!
也就是说,服务端没有接收到消息的反馈,既不是成功也不是失败,这个时候定义为超时。

2、RocketMQ的集群 2.1 集群模式

在RocketMQ中,集群的部署模式是比较多的,有以下几种:

  • 单个Master

    这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用,不建议线上环境使用。

  • 多Master模式

    一个集群无Slave,全是Master,例如2个Master或者3个Master
    单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。

  • 多Master多Slave模式,异步复制

    每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟,毫秒级。
    优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,因为Master宕机后,消费者仍然可以从Slave消费,此过程对应用透明,不需要人工干预。性能同多Master模式几乎一样。
    缺点:Master宕机,磁盘损坏情况,会丢失少量消息。

  • 多Master多Slave模式,同步双写

    每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,主备都写成功,向应用返回成功。
    优点:数据与服务都无单点,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高。
    缺点:性能比异步复制模式略低,大约低10%左右。

    2.2 搭建2m2s集群

    下面通过docker搭建2master+2slave的集群。
    克隆新的镜像,配置如下:

    vi /etc/hostname

    vi /etc/sysconfig/network-scripts/ifcfg-ens32(修改IPADDR为192.168.208.131)

    vi /etc/hosts

    reboot # 重启linux服务器

搭建集群:

出现错误:

Error response from daemon: Get “https://registry-1.docker.io/v2/”: dial tcp: lookup registry-1.docker.io on 192.168.208.2:53: read udp 192.168.208.131:44902->192.168.208.2:53: i/o timeout

解决方法:

vi /etc/resolv.conf
添加 nameserver 8.8.8.8

#	创建2个master
#	nameserver1
docker create -p 9876:9876 --name rmqserver01 
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" 
-e "JAVA_OPTS=-Duser.home=/opt" 
-v /usr/local/develop/rmq/rmqserver01/logs:/opt/logs 
-v /usr/local/develop/rmq/rmqserver01/store:/opt/store 
foxiswho/rocketmq:server-4.3.2
#	nameserver2
docker create -p 9877:9876 --name rmqserver02 
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" 
-e "JAVA_OPTS=-Duser.home=/opt" 
-v /usr/local/develop/rmq/rmqserver02/logs:/opt/logs 
-v /usr/local/develop/rmq/rmqserver02/store:/opt/store 
foxiswho/rocketmq:server-4.3.2
#	创建第1个master broker
#	master broker01
docker create --net host --name rmqbroker01 
-e "JAVA_OPTS=-Duser.home=/opt" 
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" 
-e "autoCreateTopicEnable=true" 
-v /usr/local/develop/rmq/rmqbroker01/conf/broker.conf:/etc/rocketmq/broker.conf 
-v /usr/local/develop/rmq/rmqbroker01/logs:/opt/logs 
-v /usr/local/develop/rmq/rmqbroker01/store:/opt/store 
foxiswho/rocketmq:broker-4.3.2
#	配置broker.conf文件内容如下(需要先在linux主机上创建)
namesrvAddr=192.168.208.131:9876;192.168.208.131:9877
brokerClusterName=ItcastCluster
brokerName=broker01
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
brokerIP1=192.168.208.131
brokerIp2=192.168.208.131
listenPort=10911
#	创建第2个master broker
#	master broker02
docker create --net host --name rmqbroker02 
-e "JAVA_OPTS=-Duser.home=/opt" 
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" 
-e "autoCreateTopicEnable=true" 
-v /usr/local/develop/rmq/rmqbroker02/conf/broker.conf:/etc/rocketmq/broker.conf 
-v /usr/local/develop/rmq/rmqbroker02/logs:/opt/logs 
-v /usr/local/develop/rmq/rmqbroker02/store:/opt/store 
foxiswho/rocketmq:broker-4.3.2
#	master broker02(broker.conf文件需要先创建好)
namesrvAddr=192.168.208.131:9876;192.168.208.131:9877
brokerClusterName=ItcastCluster
brokerName=broker02
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
brokerIP1=192.168.208.131
brokerIp2=192.168.208.131
listenPort=10811
#	创建第1个slave broker
#	slave broker01
docker create --net host --name rmqbroker03 
-e "JAVA_OPTS=-Duser.home=/opt" 
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" 
-e "autoCreateTopicEnable=true" 
-v /usr/local/develop/rmq/rmqbroker03/conf/broker.conf:/etc/rocketmq/broker.conf 
-v /usr/local/develop/rmq/rmqbroker03/logs:/opt/logs 
-v /usr/local/develop/rmq/rmqbroker03/store:/opt/store 
foxiswho/rocketmq:broker-4.3.2
#	slave broker01
namesrvAddr=192.168.208.131:9876;192.168.208.131:9877
brokerClusterName=ItcastCluster
brokerName=broker01
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
brokerIP1=192.168.208.131
brokerIp2=192.168.208.131
listenPort=10711
#创建第2个slave broker
#slave broker01
docker create --net host --name rmqbroker04 
-e "JAVA_OPTS=-Duser.home=/opt" 
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" 
-e "autoCreateTopicEnable=true" 
-v /usr/local/develop/rmq/rmqbroker04/conf/broker.conf:/etc/rocketmq/broker.conf 
-v /usr/local/develop/rmq/rmqbroker04/logs:/opt/logs 
-v /usr/local/develop/rmq/rmqbroker04/store:/opt/store 
foxiswho/rocketmq:broker-4.3.2
#slave broker02
namesrvAddr=192.168.208.131:9876;192.168.208.131:9877
brokerClusterName=ItcastCluster
brokerName=broker02
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
brokerIP1=192.168.208.131
brokerIp2=192.168.208.131
listenPort=10611

部署RocketMQ的管理工具:

#	拉取镜像
docker pull styletang/rocketmq-console-ng:1.0.0
#	创建并启动容器
docker run -e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.208.131:9876;192.168.208.131:9877 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8082:8080 -t -d styletang/rocketmq-console-ng:1.0.0
#	启动容器
docker start rmqserver01 rmqserver02
docker start rmqbroker01 rmqbroker02 rmqbroker03 rmqbroker04
3、SprinBoot整合RocketMQ 3.1、导入依赖


4.0.0

	org.springframework.boot
	spring-boot-starter-parent
	2.1.0.RELEASE

com.zrj.rocketmq
rocketmq-test
1.0-SNAPSHOT

	
		org.springframework.boot
		spring-boot-starter
	
	
		org.springframework.boot
		spring-boot-starter-test
		test
	
	
		org.apache.rocketmq
		rocketmq-spring-boot-starter
		2.0.1
	
	
		org.apache.rocketmq
		rocketmq-client
		4.3.2
	


	
	
		
			org.apache.maven.plugins
			maven-compiler-plugin
			3.2
			
				1.8
				1.8
				UTF-8
			
		
	


说明:rocketmq-spring-boot-starter的依赖包是不能直接从中央仓库下载的,需要自己通过源码install到本
地仓库的。

#源码地址(或者使用资料中的源码)
  https://github.com/apache/rocketmq-spring
#进入源码目录,执行如下命令
  mvn clean install

3.2、编写application.properties配置文件
# Spring boot application
spring.application.name = itcast-rocketmq
spring.rocketmq.nameServer=172.16.55.185:9876
spring.rocketmq.producer.group=my-group
3.3、生产者发送消息
@Component
public class SpringProducer {
	@Autowired
	private RocketMQTemplate rocketMQTemplate;
	
	public void sendMsg(String topic, String msg){
		this.rocketMQTemplate.convertAndSend(topic, msg);
	}
}
3.4、消费消息
@Component
@RocketMQMessageListener(topic = "my-topic",
consumerGroup = "haoke-consumer",
selectorExpression = "*")
public class SpringConsumer implements RocketMQListener {
	@Override
	public void onMessage(String msg) {
		System.out.println("接收到消息 -> " + msg);
	}
}
3.5、编写启动类
@SpringBootApplication
public class MyApplication {
	public static void main(String[] args) {
		SpringApplication.run(MyApplication.class, args);
	}
}
3.6、编写测试用例
@RunWith(SpringRunner.class)
@SpringBootTest
public class TestSpringRocketMQ {

	@Autowired
	private SpringProducer springProducer;
	
	@Test
	public void testSendMsg(){
		this.springProducer.sendMsg("my-topic", "第一个Spring消息");
	}
}
3.7、测试

    先启动springboot,再运行测试用例,即可看到消费者接收到生产者发送的消息。

3.8、事务消息 3.8.1、定义TransactionListenerImpl
@RocketMQTransactionListener(txProducerGroup = "myTransactionGroup")
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
	private static Map STATE_MAP = new
	HashMap<>();
	@Override
	public RocketMQLocalTransactionState executeLocalTransaction(Message message,
	Object o) {
		String transId =(String)message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
		try {
			System.out.println("执行操作1");
			Thread.sleep(500);
			System.out.println("执行操作2");
			Thread.sleep(800);
			STATE_MAP.put(transId, RocketMQLocalTransactionState.COMMIT);
			return RocketMQLocalTransactionState.COMMIT;
		} catch (Exception e) {
			e.printStackTrace();
		}
			STATE_MAP.put(transId, RocketMQLocalTransactionState.ROLLBACK);
			return RocketMQLocalTransactionState.ROLLBACK;
	}
	@Override
	public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
		String transId = (String)message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
		System.out.println("回查消息 -> transId = " + transId + ", state = " + STATE_MAP.get(transId));
		return STATE_MAP.get(transId);
	}
}
3.8.2、定义生产者
@Component
public class SpringTransactionProducer {

	@Autowired
	private RocketMQTemplate rocketMQTemplate;
	
	
	public void sendMsg(String topic, String msg) {
		Message message = MessageBuilder.withPayload(msg).build();
		// myTransactionGroup要和 @RocketMQTransactionListener(txProducerGroup = "myTransactionGroup")定义的一致
		this.rocketMQTemplate.sendMessageInTransaction("myTransactionGroup", topic, message, null);
		System.out.println("发送消息成功");
	}
}
3.8.3、消费者(没有变化)
@Component
@RocketMQMessageListener(topic = "my-topic",
consumerGroup = "haoke-consumer",
selectorExpression = "*")
public class SpringConsumer implements RocketMQListener {
	@Override
	public void onMessage(String msg) {
		System.out.println("接收到消息 -> " + msg);
	}
}
3.8.4、编写测试用例
@RunWith(SpringRunner.class)
@SpringBootTest
public class TestSpringRocketMQ {

	@Autowired
	private SpringProducer springProducer;
	
	@Autowired
	private SpringTransactionProducer springTransactionProducer;
	
	@Test
	public void testSendMsg(){
		this.springProducer.sendMsg("my-topic", "第2个Spring消息");
	}
	@Test
	public void testSendMsg2(){
		this.springTransactionProducer.sendMsg("my-topic", "第3个Spring消息");
	}
}
3.8.5、测试

测试结果与非Spring使用,结果一致。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/866045.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号