栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

rabbitMQ使用

rabbitMQ使用

一、声明队列交换机绑定关系
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class RabbitMQConfigTest {
	
	
	public static final String TEST_QUEUE = "test.queue";
	
	
	public static final String TEST_EXCHANGE = "test.exchange";
	
	
	public static final String TEST_KEY = "TEST.NONE" ;
	
	
	@Bean(TEST_QUEUE)
	public Queue testQueue() {
		return new Queue(TEST_QUEUE);
	}

	
	@Bean(TEST_EXCHANGE )
	public TopicExchange emailTodoTopicExchange() {
		return new TopicExchange(TEST_EXCHANGE , true, false);
	}
	
	
	
	@Bean
	public Binding bindingTestQueue(@Qualifier(TEST_QUEUE) Queue queue,
	                                @Qualifier(TEST_EXCHANGE ) TopicExchange exchange) {
		return BindingBuilder.bind(queue).to(exchange).with(TEST_KEY);
	}
	
	
	@Bean
	public Jackson2JsonMessageConverter testConverter() {
		return new Jackson2JsonMessageConverter();
	}
	
}
二、创建消费者
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.io.IOException;



@Component
@Slf4j
public class MqTest {
	
	@Autowired
	private StringRedisTemplate stringRedisTemplate;
	
	
	@RabbitListener(queues = "test.queue")
	public void sendMail(ActivitiEventVO event, Channel channel, Message message) throws IOException {
		// 获取消息体中的幂等性KEY
		String businessKey = event.getBusinessKey();
		// 创造redis缓存KEY,为了记录重试次数
		String key = "TEST_QUEUE" + ":" + businessKey;
		Long count = stringRedisTemplate.opsForValue().increment(key, 1);
		
		try {
			// todo 业务代码 切记出现异常需要抛出,否则不会重试,例如下方注释代码:
			if ("5".equals(businessKey) || "2".equals(businessKey)) {
				log.error("消费失败:{}", businessKey);
				throw new RuntimeException();
			}
		} catch (Exception e) {
			e.printStackTrace();
			// 这里的10次与后续配置文件中的重试次数相同
			if (count != null && count >= 10) {
				log.error("重试次数达限删除");
				// 重试次数超限处理
				basicAck("追加提取流程邮件通知", event, key, channel, message.getMessageProperties().getDeliveryTag());
				return;
			} else {
				throw new RuntimeException();
			}
			
		}
		
		// 正常确认消费消息
		channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
		// 删除redis中缓存的key
		stringRedisTemplate.delete(key);
		log.info("消费成功:{}", businessKey);
	}
	
	
	public void basicAck(String type, ActivitiEventVO event, String key, Channel channel, long var1) throws IOException {
		try {
			// 删除redis中缓存的key
			stringRedisTemplate.delete(key);
			if ("5".equals(event.getBusinessKey())) {
				log.error("模拟确认失败:{}", event.getBusinessKey());
				throw new RuntimeException();
			}
			// 确认消费消息
			channel.basicAck(var1, false);
			
			// todo 可视业务作出警报处理等,例如发邮件报警

		} catch (Exception e) {
			log.error("消息消费确认失败");
			e.printStackTrace();
			try {
				//出现异常,不删除队列信息,信息重新放置在队列中
				channel.basicNack(var1, false, true);
			} catch (IOException ioException) {
				log.error("重新放置在队列失败");
				ioException.printStackTrace();
			}
		}
	}
}
三、配置yml文件
spring:
  rabbitmq:
    addresses: 172.168.0.1:5672 #RabbitMq的服务器地址
    username: root #RabbitMq的服务器的登录名
    password: root #RabbitMq的服务器的登录密码
    listener:
      simple: 
        retry:
          enabled: true #是否开启消费者重试(为false时关闭消费者重试,这时消费端代码异常会一直重复收到消息)
          max-attempts: 10 #最大重试次数
          initial-interval: 10000 #重试间隔时间(单位毫秒) 
          max-interval: 30000 #重试最大时间间隔(单位毫秒) 
          multiplier: 2 #应用于上一次重试间隔的乘法器,例如上次间隔10秒,本次间隔20秒,但上限不能超过重试最大时间间隔。
        acknowledge-mode: manual #手动确认消息
        prefetch: 1 #消息预读数量 1表示每次从队列中读取一条消息
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/327190.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号