栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

【Java从0到架构师】RocketMQ 使用 - 发送消息、接收消息

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

【Java从0到架构师】RocketMQ 使用 - 发送消息、接收消息

RocketMQ 消息中间件
  • 入门案例
    • NameServer 地址
  • 发送消息
    • 同步发送消息
    • 异步发送消息
    • 一次性发送消息
    • 生产者组、消息封装
  • 接收消息
    • 消费方式:推式消费、拉式消费
    • 消息方式:集群模式、广播模式
    • 消息消费位置
    • 消息确认

Java 从 0 到架构师目录:【Java从0到架构师】学习记录

入门案例

	org.apache.rocketmq
	rocketmq-client
	4.7.0

生产者:

public class ProducerDemo {
    public static void main(String[] args) throws Exception {
        //1 创建一个生产者
        DefaultMQProducer producer = new DefaultMQProducer("maoge");
        //2 指定NameServer的地址
        producer.setNamesrvAddr("192.168.52.128:9876");
        //3 启动生产者
        producer.start();
        for (int i = 0; i < 100; i++) {
            // 4 创建消息对象
            Message msg = new Message("01_hello", ("hello" + i).getBytes());
            // 5 发送消息
            SendResult result = producer.send(msg);
            System.out.println(result.getSendStatus());
            System.out.println("result.getMsgId() = " + result.getMsgId());
        }
        //6 关闭应用程序
        producer.shutdown();
    }
}

消费者:

public class ConsumerDemo {
    public static void main(String[] args) throws MQClientException {
        //1 创建一个消费者对象
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("maoge");
        // 设置NameServer的地址, 通过配置环境变量、JVM参数, 可不写死
        consumer.setNamesrvAddr("192.168.52.128:9876");
        //2 指定消费的主题
        consumer.subscribe("01_hello","*");
        //3 指定从哪个位置开始消费(只是对第一次启动消费的时候有效果)
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        //4 指定一个监听器, 并发消费消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt messageExt : list) {
                    System.out.println(Thread.currentThread().getName() + "收到消息:" + new String(messageExt.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //5 启动消费者
        consumer.start();
    }
}
NameServer 地址

在生产者或消费者找 NameServer 的地址不能写死(代码要在多环境运行),以下方式可以取代在代码中写死 consumer.setNamesrvAddr("192.168.52.128:9876");

  • 方法一:配置环境变量,NAMESRV_ADDR 的值为 IP地址
  • 方法二:通过 JVM 参数传递
发送消息 同步发送消息

同步,指的是在发送数据到消息中间件的时候,需要及时的返回一个结果到发送者

  • 这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知
异步发送消息

在异步发送消息的时候,将数据发送到消息中心,此时程序不会等待消息中心的返回结果,而是继续往下执行,当有结果返回的时候,通过异步通知的方式告诉消息生产者

  • 异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待 Broker 的响应
public class ProducerDemo {
    public static void main(String[] args) throws Exception {
        //1 创建一个生成者对象
        DefaultMQProducer producer = new DefaultMQProducer("async");
        //2 设置NameServer的地址
        // producer.setNamesrvAddr("192.168.52.128:9876");
        //3 启动生产者
        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        //4 创建消息Message
        //5 发送消息
        //创建一个计数器
        final CountDownLatch2 countDownLatch = new CountDownLatch2(5);
        for (int i = 0; i < 5; i++) {
            Message msg = new Message("02_async", "hello, NBA".getBytes());
            try {
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.println(Thread.currentThread().getName());
                        if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
                            System.out.println("消息发送成功" + JSON.toJSONString(sendResult));
                        }
                        countDownLatch.countDown();
                    }

                    //异常处理
                    @Override
                    public void onException(Throwable e) {
                        e.printStackTrace();
                        countDownLatch.countDown();
                    }
                });
            } catch (MQClientException e) {
                //补救措施  把对应的消息先保存到另外一个地方 MySQL, 自己到时候重新触发 发送消息
                e.printStackTrace();
            } catch (RemotingException e) {
                //补救措施
                e.printStackTrace();
            } catch (InterruptedException e) {
                //补救措施
                e.printStackTrace();
            }
        }
        // 如果使用异步操作, 需要等待接收完所有的异步返回结果之后, 再去关闭主线程
        countDownLatch.await();// 等待计数器归0
        //6 关闭生产者
        producer.shutdown();

    }
}
一次性发送消息

这种方式主要用在不特别关心发送结果的场景,例如日志发送

public class ProducerDemo {
    public static void main(String[] args) throws Exception {
        //1 创建一个生成者对象
        DefaultMQProducer producer = new DefaultMQProducer("03_oneway");
        //2 设置NameServer的地址
        // producer.setNamesrvAddr("192.168.52.128:9876");
        //3 启动生产者
        producer.start();
        //4 创建消息Message
        //5 发送消息
        //创建一个计数器
        for (int i = 0; i < 5; i++) {
            // Tag: 消息标签, 用来进行消息分类  支付消息--> wx支付,zfb支付
            // Key: 业务消息  订单消息: key: 订单id/订单号
            Message msg = new Message("03_oneway", "TagB", i + "", "hello, NBA".getBytes());
            producer.sendOneway(msg);//日志, 大数据应用
        }
        //6 关闭生产者
        producer.shutdown();
    }
}
生产者组、消息封装

生产者组
同一类 Producer 的集合,这类 Producer 发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则 Broker 服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。

消息封装

  • Tag:用来给消息进行标记
    通过 Tag 对消息进行分类,把不同类型的消息交给不同的消费者进行消费
  • Key:可以设置消息的一个唯一ID,用于区分每个消息的标志、业务ID
    在管理控制台,可以通过 Key 进行消息的查询跟踪
接收消息 消费方式:推式消费、拉式消费

推式消费:消息中心主动推送消息给消费者消费(需要绑定推送事件)

public class PushConsumerDemo {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("type");
        consumer.setNamesrvAddr("192.168.52.128:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        //订阅主题, 后面可以指定表达式Tag, 根据tag进行消息过滤
        consumer.subscribe("01_hello","*");
        //注册消息消费监听 有消息就会触发consumeMessage方法
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
                System.out.println("接收消息线程: "+Thread.currentThread().getName());
                for (MessageExt msg : msgs) {
                    System.out.println("接收到的消息:" + new String(msg.getBody()));
                }
                //确定消息消费从
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动消费者
        consumer.start();
    }
}

拉式消费:消费者主动去消费中心拉取消息(默认 5 秒拉取 1 次)

public class PullConsumerDemo {
    public static void main(String[] args) throws Exception {
        //创建一个拉取消息的消费者对象
        DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("pullType2");
        //设置对应的NameServer地址
        consumer.setNamesrvAddr("192.168.52.128:9876");
        //设置从那里开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        //订阅对应的Topic
        consumer.subscribe("01_hello","*");
        //启动消费者
        consumer.start();
        while (true) {
            //开始获取消息
            List msgs = consumer.poll();
            //处理消息结果
            for (MessageExt msg : msgs) {
                System.out.println("接收到的消息:" + new String(msg.getBody()));
            }
        }
    }
}
消息方式:集群模式、广播模式

集群模式:对于同一个消费者组里面的多个消费者,每个消费者消费的消息都是不一样的,相当于消费者的负载均衡

设置消费者的模式:

consumer.setMessageModel(MessageModel.CLUSTERING); // 集群模式

广播模式:每个消费者都会接受全部的消息,所有消费者消费的数据都是一样的

  • 一般用于对于消息需要多个其他业务进行处理

设置消费者的模式:

consumer.setMessageModel(MessageModel.BROADCASTING); // 广播模式
消息消费位置

在指定消费的 pos 位置的时候,会优先获取服务端记录的上次消费点,所以该参数只有在服务端没有对应的消费者的记录的时候有效,一般情况是第一次启动的消费者有效。

  • CONSUME_FROM_FIRST_OFFSET:从最开始的位置消费,会消费该 Topic 下面所有的有效的数据,过期的数据会删除掉
  • CONSUME_FROM_LAST_OFFSET:
    1 如果该 Topic 的数据都是最近的数据,没有过期数据,则从最开始的位置消费
    2 如果该 Topic 的数据存在过期的数据,则从最后的位置开始消费,只会消费新加入的数据
  • CONSUME_FROM_TIMESTAMP
    根据指定的时间戳进行消费,配合 consumer.setConsumeTimestamp("20200612083300"); 从指定的时间开始消费;如果不指定,则默认从半个小时前的数据开始消费
消息确认

拉式消费:可以通过 consumer.setAutoCommit(false); 设置是否自动提交;

  • 如果设置为手动提交,需要使用 consumer.commitSync(); 方法进行手动提交
  • 对于未提交的操作,Topic 中的订阅的偏移量不会发生改变,下次消费的时候会继续消费改数据

推式消费:

  • 返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 状态表示消费成功
  • 返回 ConsumeConcurrentlyStatus.RECONSUME_LATER 表示消费不成功,会放入到重试队列
  • 默认重试采用服务端重试(重试次数 16 次)
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/343305.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号