栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

RocketMQ快速入门

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

RocketMQ快速入门

 创建Topic
public class TopicDemo {
    public static void main(String[] args) throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer("IM");
        producer.setNamesrvAddr("172.16.55.185:9876");
        producer.start();

        
        producer.createTopic("broker_im", "im_topic", 8);
        System.out.println("创建topic成功");
        producer.shutdown();
}
发送同步消息
public class SyncProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("IM");
        producer.setNamesrvAddr("172.16.55.185:9876");
        producer.start();
        String msgStr = "用户A发送消息给用户B";
        Message msg = new Message("im_topic","SEND_MSG",
        msgStr.getBytes(RemotingHelper.DEFAULT_CHARSET));
        // 发送消息
        SendResult sendResult = producer.send(msg);
        System.out.println("消息状态:" + sendResult.getSendStatus());
        System.out.println("消息id:" + sendResult.getMsgId());
        System.out.println("消息queue:" + sendResult.getMessageQueue());
        System.out.println("消息offset:" + sendResult.getQueueOffset());
        producer.shutdown();
    }
}

打印结果:

消息状态:SEND_OK

消息id:AC1037A0307418B4AAC2374062400000

消息queue:MessageQueue [topic=im_topic, brokerName=broker_im, queueId=6]

消息offset:0

Message数据结构

发送异步消息
public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("IM");
        producer.setNamesrvAddr("172.16.55.185:9876");
        // 发送失败的重试次数
        producer.setRetryTimesWhenSendAsyncFailed(0);
        producer.start();
        String msgStr = "用户A发送消息给用户B";
        Message msg = new Message("im_topic", "SEND_MSG",
                msgStr.getBytes(RemotingHelper.DEFAULT_CHARSET));
        // 异步发送消息
        producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("消息状态:" + sendResult.getSendStatus());
                System.out.println("消息id:" + sendResult.getMsgId());
                System.out.println("消息queue:" + sendResult.getMessageQueue());
                System.out.println("消息offset:" + sendResult.getQueueOffset());
            }

            @Override
            public void onException(Throwable e) {
                System.out.println("发送失败!" + e);
            }
        });
        System.out.println("发送成功!");
        // producer.shutdown();
    }
}
 消费消息
public class ConsumerDemo {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("IM");
        consumer.setNamesrvAddr("172.16.55.185:9876");
        // 订阅topic,接收此Topic下的所有消息
        consumer.subscribe("haoke_im_topic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    try {
                        System.out.println(new String(msg.getBody(), "UTF-8"));
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("收到消息->" + msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}
其他消费方式
//完整匹配
consumer.subscribe("im_topic", "SEND_MSG");
//或匹配
consumer.subscribe("im_topic", "SEND_MSG || SEND_MSG1");
消息过滤器

        创建消息时定制消息,给消息加上一些额外的属性,利用关系特性对消息进行过滤!需要broker开启属性过滤配置,类似SQLwhere语句。

#加入到broker的配置文件中

enablePropertyFilter=true

重启broker使得配置生效!

发送消息到Topic
public class SyncProducerFilter {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("IM");
        producer.setNamesrvAddr("172.16.55.185:9876");
        producer.start();
        String msgStr = "美女001";
        Message msg = new Message("meinv_topic","SEND_MSG",
        msgStr.getBytes(RemotingHelper.DEFAULT_CHARSET));
        msg.putUserProperty("age", "18");
        msg.putUserProperty("sex", "女");
        // 发送消息
        SendResult sendResult = producer.send(msg);
        System.out.println("消息状态:" + sendResult.getSendStatus());
        System.out.println("消息id:" + sendResult.getMsgId());
        System.out.println("消息queue:" + sendResult.getMessageQueue());
        System.out.println("消息offset:" + sendResult.getQueueOffset());
        System.out.println(sendResult);
        producer.shutdown();
    }
}
消费方
public class ConsumerFilterDemo {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("IM");
        consumer.setNamesrvAddr("172.16.55.185:9876");
        // 订阅topic,接收此Topic下的所有消息
        consumer.subscribe("meinv_topic", MessageSelector.bySql("age>=20 AND
                sex = '女'"));
                consumer.registerMessageListener(new MessageListenerConcurrently() {
                    @Override
                    public ConsumeConcurrentlyStatus consumeMessage(List msgs,
                                                                    ConsumeConcurrentlyContext context) {
                        for (MessageExt msg : msgs) {
                            try {
                                System.out.println(new String(msg.getBody(), "UTF-8"));
                            } catch (UnsupportedEncodingException e) {
                                e.printStackTrace();
                            }
                        }
                        System.out.println("收到消息->" + msgs);
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                });
        consumer.start();
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/345271.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号