栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

【自用】RabbitMq 生产、发送消息、接收消息

【自用】RabbitMq 生产、发送消息、接收消息

队列配置
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

@Configuration
@Component
public class xxxxxRabbitMqConfig {

    // 交换机名
    @Value("${xxxxxxxxxx}")
    private String 交换机名;

    // 队列名
    @Value("${xxxxxxxxxx}")
    private String 队列名;

    // Routingkey
    @Value("${xxxxxxxx}")
    private String Routingkey;

    // 声明 死信队列交换机
    @Bean("exchange")(可以自定义名字)
    public DirectExchange exchange() {
        return new DirectExchange(channelOrgChangeExchange);
    }

    // 声明队列
    @Bean("queue") (可以自定义名字)
    public Queue queue() {     // 导包: org.springframework.amqp.core
        return new Queue(队列名字, true);
    }

    // 绑定交换机和队列
    @Bean(绑定需要和前面定义名字关联)
    public Binding queuebBindingX(@Qualifier("queue") Queue queue,
                                  @Qualifier("exchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(routingKey);
    }
}
消息发送
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import com.alibaba.fastjson.JSON;

--------------------------------------------------------------	
 	@Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private RedisTemplate redisTemplate;
---------------------------------------------------------------
	//可以自定义发送消息实体 赋值转为json字符串
	String sendMsg = JSON.toJSONString("发送消息实体");
	
	CorrelationData correlationData = new CorrelationData();
                // 绑定消息发送确认回调方法
                rabbitTemplate.set/confirm/iCallback(/confirm/iCallback);
                // 发送消息之前将消息存入 redis中 k消息id v发送消息
                archiveMsg(correlationData.getId(), sendMsg);
                rabbitTemplate.convertAndSend(交换机名字, routingkey, sendMsg, correlationData);
                log.info("mq发送完成:消息[{}]", sendMsg);

    private final RabbitTemplate.ConfirmCallback confirmCallback = (CorrelationData correlationData, boolean ack, String cause) -> {
        // mQ 发消息唯一id
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            // 发送成功
            if (!StringUtils.isEmpty(id)) {
                redisTemplate.delete(id);
            }
        } else {
            // 发送失败
            repeatMsg(id);
        }
    };
// 发送消息存储Redis
    private void archiveMsg(String id, String msg) {
        if (com.alibaba.druid.util.StringUtils.isEmpty(id)) {
            log.error("rabbitmq收到未知的空消息!");
            return;
        }
        log.info("消息存档,消息Id[{}]", id);
        redisTemplate.opsForSet().add(id, msg);
    }


    // 消息发送失败重新发送
    private void repeatMsg(String id) {
        if (com.alibaba.druid.util.StringUtils.isEmpty(id)) {
            log.error("rabbitmq收到未知的空消息!");
            return;
        }
        // redis中获取到失败的消息
        String msg = redisTemplate.opsForValue().get(id).toString();
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(id);
        // 绑定消息发送确认回调方法
        rabbitTemplate.set/confirm/iCallback(/confirm/iCallback);
        // 重新发送消息
        rabbitTemplate.convertAndSend(交换机, routingkey, msg);
    }
消息接收
```java
	@RabbitHandler
    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "队列名字", durable = "true"), exchange =
    @Exchange(value = "交换机名字"), key = "key名字"))
    public void 方法名(@Payload String msg, @Headers Map headers, Channel channel)
            throws Exception {
        String msgId = String.valueOf(headers.get("id"));
        long deliveryTag = (long) headers.get(AmqpHeaders.DELIVERY_TAG);
        log.info("接mq通知消息,msgId:{},消息:{},headers:{}", msgId, msg, headers);
        
        try {
             自定义接收实体 = JSON.parseObject(msg, 转换类型.class);
            log.info("实体对象:{}", 自定义接收实体);
        } catch (Exception e) {
            channel.basicNack(deliveryTag, false, false);
            return;
        }
        if (null != 接收实体) {
            
			//接收到实体处理
            } catch (Exception e) {
                log.info("处理异常");
            }
        }
    }
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/758416.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号