public class TopicDemo {
public static void main(String[] args) throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("IM");
producer.setNamesrvAddr("172.16.55.185:9876");
producer.start();
producer.createTopic("broker_im", "im_topic", 8);
System.out.println("创建topic成功");
producer.shutdown();
}
发送同步消息
/**
 * Sends one message to {@code im_topic} synchronously and prints the send result.
 */
public class SyncProducer {

    /**
     * Starts a producer, sends a single message tagged {@code SEND_MSG}, prints the
     * status, message id, target queue and queue offset, then shuts the producer down.
     *
     * @param args unused
     * @throws Exception if the producer cannot start or the send fails
     */
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("IM");
        producer.setNamesrvAddr("172.16.55.185:9876");
        producer.start();
        try {
            String msgStr = "用户A发送消息给用户B";
            Message msg = new Message("im_topic", "SEND_MSG",
                    msgStr.getBytes(RemotingHelper.DEFAULT_CHARSET));

            // Blocks until the send result is available.
            SendResult sendResult = producer.send(msg);
            System.out.println("消息状态:" + sendResult.getSendStatus());
            System.out.println("消息id:" + sendResult.getMsgId());
            System.out.println("消息queue:" + sendResult.getMessageQueue());
            System.out.println("消息offset:" + sendResult.getQueueOffset());
        } finally {
            // Release the client even when the send throws.
            producer.shutdown();
        }
    }
}
打印结果:
消息状态:SEND_OK
消息id:AC1037A0307418B4AAC2374062400000
消息queue:MessageQueue [topic=im_topic, brokerName=broker_im, queueId=6]
消息offset:0
Message数据结构
发送异步消息
public class AsyncProducer {
/**
 * Sends one message to {@code im_topic} asynchronously, waits for the callback
 * to report the outcome, and then shuts the producer down.
 *
 * <p>The original left {@code shutdown()} commented out so the callback could
 * still run, which meant the producer was never released. A latch now holds
 * the main thread until either callback has fired (or a timeout elapses).
 *
 * @param args unused
 * @throws Exception if the producer cannot start, the send cannot be issued,
 *                   or the wait is interrupted
 */
public static void main(String[] args) throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("IM");
    producer.setNamesrvAddr("172.16.55.185:9876");
    // Number of retries after a failed asynchronous send.
    producer.setRetryTimesWhenSendAsyncFailed(0);
    producer.start();

    // Counted down by whichever callback fires, success or failure.
    final java.util.concurrent.CountDownLatch done =
            new java.util.concurrent.CountDownLatch(1);
    try {
        String msgStr = "用户A发送消息给用户B";
        Message msg = new Message("im_topic", "SEND_MSG",
                msgStr.getBytes(RemotingHelper.DEFAULT_CHARSET));

        // Asynchronous send: the call returns at once and the result arrives in the callback.
        producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                try {
                    System.out.println("消息状态:" + sendResult.getSendStatus());
                    System.out.println("消息id:" + sendResult.getMsgId());
                    System.out.println("消息queue:" + sendResult.getMessageQueue());
                    System.out.println("消息offset:" + sendResult.getQueueOffset());
                } finally {
                    done.countDown();
                }
            }

            @Override
            public void onException(Throwable e) {
                try {
                    System.out.println("发送失败!" + e);
                } finally {
                    done.countDown();
                }
            }
        });
        // Printed right after the request is handed off; the real outcome is
        // whatever the callbacks above report.
        System.out.println("发送成功!");

        // Bounded wait so the demo cannot hang forever if no callback arrives.
        if (!done.await(10, java.util.concurrent.TimeUnit.SECONDS)) {
            System.out.println("等待发送结果超时!");
        }
    } finally {
        producer.shutdown();
    }
}
}
消费消息
/**
 * Push consumer that prints every message published to {@code im_topic}.
 */
public class ConsumerDemo {

    /**
     * Subscribes to all tags of {@code im_topic}, registers a concurrent listener
     * that prints each message body as UTF-8 text, and starts the consumer.
     *
     * @param args unused
     * @throws Exception if the subscription or consumer start-up fails
     */
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("IM");
        consumer.setNamesrvAddr("172.16.55.185:9876");
        // Subscribe to the topic the producer samples publish to; "*" accepts every tag.
        consumer.subscribe("im_topic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    // Charset constant instead of the "UTF-8" name: no checked exception to handle.
                    System.out.println(new String(msg.getBody(),
                            java.nio.charset.StandardCharsets.UTF_8));
                }
                System.out.println("收到消息->" + msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}
其他消费方式
// Exact match: subscribe with the single expression SEND_MSG (the tag used by the producer samples)
consumer.subscribe("im_topic", "SEND_MSG");
// OR match: "||" separates alternatives, here SEND_MSG or SEND_MSG1
consumer.subscribe("im_topic", "SEND_MSG || SEND_MSG1");
消息过滤器
创建消息时可以为消息添加一些自定义属性,消费端再利用类似SQL中where子句的条件表达式,按这些属性对消息进行过滤。该功能需要broker开启属性过滤配置。
# Add this line to the broker's configuration file to turn on property-based message filtering
enablePropertyFilter=true
重启broker使得配置生效!
发送消息到Topic
public class SyncProducerFilter {
/**
 * Sends one message carrying the user properties {@code age} and {@code sex}
 * to {@code meinv_topic} synchronously and prints the send result.
 *
 * @param args unused
 * @throws Exception if the producer cannot start or the send fails
 */
public static void main(String[] args) throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("IM");
    producer.setNamesrvAddr("172.16.55.185:9876");
    producer.start();
    try {
        String msgStr = "美女001";
        Message msg = new Message("meinv_topic", "SEND_MSG",
                msgStr.getBytes(RemotingHelper.DEFAULT_CHARSET));
        // Custom properties that a consumer-side selector can filter on.
        msg.putUserProperty("age", "18");
        msg.putUserProperty("sex", "女");

        // Blocks until the send result is available.
        SendResult sendResult = producer.send(msg);
        System.out.println("消息状态:" + sendResult.getSendStatus());
        System.out.println("消息id:" + sendResult.getMsgId());
        System.out.println("消息queue:" + sendResult.getMessageQueue());
        System.out.println("消息offset:" + sendResult.getQueueOffset());
        System.out.println(sendResult);
    } finally {
        // Release the client even when the send throws.
        producer.shutdown();
    }
}
}
消费方
public class ConsumerFilterDemo {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("IM");
consumer.setNamesrvAddr("172.16.55.185:9876");
// 订阅topic,接收此Topic下的所有消息
consumer.subscribe("meinv_topic", MessageSelector.bySql("age>=20 AND
sex = '女'"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
try {
System.out.println(new String(msg.getBody(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
System.out.println("收到消息->" + msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}



