- 为什么需要RocketMQ中间件?什么是中间件,就像redis一样,也是个中间件,基于内存的存储中间件。为什么需要内存存储中间件,学过JVM知道,JVM的内存常常是有限的,需要一个外部内部,所以就出现了Redis,充当内存,即中间件。那么RocketMQ是什么?也可以认为像redis一样是一个内存中间件,只不过是用不同数据结构来设计的,用了队列来设计装载消息文本。首先需要下载rocketMQ解压到云服务器端,部署在端口运行。(此处省略部署流程,和redis一样道理)如何使用?理解几个对象。nameserver是用来确定我们部署在服务器上MQ的地址(localhost:8085)的,broker是用来装数据的,MQ本身。product是生产者,发送消息(存入)到MQ内存。Consume是消费者,消费(读取)MQ内存中的数据。------------------详解几个重要对象------结构图,非常重要!------product是生产者----------生产去的地址producer.setNamesrvAddr("localhost:9876");;确定放入的组DefaultMQProducer producer = new DefaultMQProducer("group1");发送消息的同步、异步、单向:--------Consume是消费者-------消费要去的地址consumer.setNamesrvAddr("localhost:9876");确定消费的组DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");确定消费的分类和标签consumer.subscribe("酒店topic", "维也纳Tag");
创建一个RocketMQListener,继承RocketMQListener<**>接口,
重写onMessage(String message)方法。这个方法是专门消费消息的,
queue的消息都会被输入这个方法中。
设置消费模式(消息的分配模式,维度为topic):----------广播模式:consumer.setMessageModel(MessageModel.BROADCASTING); 集群模式:consumer.setMessageModel(MessageModel.CLUSTERING); 设置消费方式(把消息读出来的方式)------------pull:主动消费型,每次都由程序员写的代码直接拉取消息,若MQ没有消息,则返回null。push(常用):被动消费型,注册消息监听器,只要MQ来了消息,就会消费。消息监听器的其它作用:顺序消费:consumer.registerMessageListener(new MessageListenerOrderly() {.... 常用在对顺序有要求的场景,如对商家群里商家发送的话:牛,奶,不要钱--->奶,牛,不要钱。但怎么保证顺序消费?那就是让生产者按顺序存入一个queue中(只求顺序不求相连),Rocket MQ给我们提供了MessageQueueSelector接口,可以自己重写里面的接口,实现自己的算法。单线程消费。并发消费:相对的是并发的消息监听器MessageListenerConcurrently)常用在对顺序要求不大时,如一般通知:你的外卖已送达、新人买菜优惠已到账、有奖调查....。多线程消费。---------------事务:-------------为什么需要事务消息?当完成支付订单时,就自动发订单消息给酒店前台。但是如果并没有完成支付,就发送订单消息给酒店,这是不被允许的。所以需要事务消息,即让发送的消息成为事务的原子之一。需要发送的消息:事务消息监听器:@RocketMQTransactionListener(txProducerGroup = "美团group")
该注解用于定义一个执行本地事务execute和消息回查check的实现类。
继承RocketMQLocalTransactionListener接口,重写executeLocalTransaction(执行本地事务)方法、checkLocalTransaction(检查本地事务状态)方法,可以return RocketMQTransactionState.COMMIT/ROLLBACK。
例子:
--------------常见问题:--------------
rocketMQ有什么用?流量削峰,让写消息与读消息分开。支持集群扩容、高可用、数据0丢失。
消息是否会被超时删除?不会,消息事实上都会被存储在commitlog中,queue上存储的commitlog的物理地址。只有删除commitlog,消息才会被永久删除。不存在超时删除这种情况。当消息被消费后,默认48小时删除不再使用的commitlog文件。若消息不被消费,会进行次重试队列%RETRY%messageConsumerGroupTest,重试完还没被消息,就会进入死信队列%DLQ%+consumergroup。
rocketMQ怎么解决消息堆积问题?首先明白什么是消息堆积?消息没有被消息,就不会删除而产生堆积。为什么会堆积?生产者生产太多、消费者消费太少。1、如果消费者太少,可以自己上线更多生产者,把暂时没用的消息消费掉。如何优化高吞吐量下的性能?同一个group(消费者的订阅),多机部署,并行消费。任何一台beoker宕机怎么办?集群是master和slave模式,主从都有消息,保证了MQ的高可用性。消费者端如何保证数据不丢失(确定MQ的数据被消费)?在消费后进行手动确认ack(消费之后MQ会返回一个响应,HTTP头中的ack是确认位,ack=1说明确认已经连接)如何保证消息不被重复消费?


