目的:在 Spring Boot 项目中,作为消费者订阅 Redis 中指定的频道(topic),并接收其中发布的消息。
一、导入依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
二、yml配置redis信息
三、编写配置类
编写监听配置类 RedisMessageListenerConfig
package com.setch.crodigy.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
/**
 * Spring configuration for consuming Redis pub/sub messages.
 *
 * <p>Declares the listener container, the adapter that hands incoming messages
 * to {@link RedisRecevier}, and a {@link StringRedisTemplate}.
 */
@Configuration
public class RedisMessageListenerConfig {

    /**
     * Builds the listener container and subscribes the adapter to the pattern
     * held in {@code Constants.REDIS_IOT_PUBLISH_TOPIC}.
     *
     * @param connectionFactory connection factory the container listens on
     * @param listenerAdapter   adapter that receives every matching message
     * @return the configured container
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
            MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // More listeners can be registered here, each with its own channel or pattern.
        // Redis pattern subscriptions use glob-style matching, e.g.
        // new PatternTopic("/sys/*/*/thing/service/property/set").
        container.addMessageListener(listenerAdapter, new PatternTopic(Constants.REDIS_IOT_PUBLISH_TOPIC));
        return container;
    }

    /**
     * Wraps the receiver in a listener adapter.
     *
     * @param receiver subscriber that processes each message
     * @return adapter delegating to {@code receiver}
     */
    @Bean
    public MessageListenerAdapter listenerAdapter(RedisRecevier receiver) {
        System.out.println("消息适配器1");
        // The listener method name must refer to a public method that actually exists
        // on the delegate; RedisRecevier only declares onMessage(Message, byte[]).
        return new MessageListenerAdapter(receiver, "onMessage");
    }

    /**
     * Exposes a string-based Redis template on the same connection factory.
     *
     * @param connectionFactory connection factory backing the template
     * @return a new {@link StringRedisTemplate}
     */
    @Bean
    StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
        return new StringRedisTemplate(connectionFactory);
    }
}
四、编写消息处理类
package com.setch.crodigy.redis;
import cn.hutool.json.JSON;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
/**
 * Redis pub/sub subscriber that extracts the {@code payload} field from each
 * incoming JSON message and forwards it to MQTT.
 */
@RequiredArgsConstructor
@Component
public class RedisRecevier implements MessageListener {

    /**
     * Character appended to every forwarded payload (0x04, ASCII EOT).
     * NOTE(review): presumably the frame terminator the MQTT consumer expects — confirm.
     */
    private static final char PAYLOAD_TERMINATOR = 0x04;

    private final MqttGateway mqttGateway;

    /**
     * Handles one Redis message: parses the body as a JSON object, reads its
     * {@code payload} entry, appends the terminator character and publishes the
     * result to {@code Constants.CRODIGY_RECEIVE_TOPIC}.
     *
     * <p>Messages without a {@code payload} entry are skipped instead of
     * failing with a {@link NullPointerException}.
     *
     * @param message the Redis message; its body is decoded as UTF-8 JSON
     * @param pattern the pattern that matched the channel (unused)
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        // Decode once with an explicit charset so the result does not depend on the platform default.
        String body = new String(message.getBody(), StandardCharsets.UTF_8);
        JSONObject jsonObject = JSONUtil.parseObj(body);
        System.out.println("**********************"+jsonObject+"**************************");

        // The payload entry carries the MQTT command body to be sent downstream.
        Object objPayload = jsonObject.getByPath("payload");
        if (objPayload == null) {
            System.err.println("Redis message has no payload field; nothing forwarded to MQTT");
            return;
        }

        String strPayload = JSONUtil.toJsonStr(objPayload) + PAYLOAD_TERMINATOR;
        mqttGateway.sendToMqtt(strPayload, Constants.CRODIGY_RECEIVE_TOPIC);
        System.out.println(objPayload);
    }
}
至此,已完成从 Redis 指定频道(topic)中接收消息并进行处理;本例中是取出消息里的 payload,重新封装后发送到 MQTT 的 topic 中。



