栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

RabbitMQ 简单模式

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

RabbitMQ 简单模式

配置RabbitMQ

@Configuration
public class RabbitMQConfig {

    @Bean
    public ConnectionFactory getFactory() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/zwe-api");
        return factory;
    }
}

生产者:

@Log
@Component
public class SimpleMessageTask {


    @Resource
    ConnectionFactory faction;

    @Resource
    PersonalMessageDao personalMessageDao;


    public void send(String topic) {
        try (Connection connection = faction.newConnection();
             Channel channel = connection.createChannel();
             ) {
            channel.queueDeclare(topic, true, false, false, null);
            String msg = "Hello Ja7";
            // 此处可设置设置消息头
            HashMap hashMap = new HashMap();
            hashMap.put("id", "10000");
            AMQP.BasicProperties build = new AMQP.BasicProperties().builder().headers(hashMap).build();
            channel.basicPublish("", topic, build, msg.getBytes("UTF-8"));
        } catch (Exception e) {
            e.printStackTrace();
            throw new ZweApiException("消息队列发布失败");
        }
    }

消费之

@Log
@Component
public class SimpleMessageTask {

    @Resource
    ConnectionFactory faction;

    public void receive(String topic) {
        int i = 0; // ack应答后才会执行下一个
        try (Connection connection = faction.newConnection();
             Channel channel = connection.createChannel()
        ) {
            channel.queueDeclare(topic, true, false, false, null);
            // 循环将队列里的消息全部读完
            while (true) { 
                GetResponse getResponse = channel.basicGet(topic, false);
                if (getResponse != null) {
                    AMQP.BasicProperties props = getResponse.getProps();
                    Map headers = props.getHeaders();
                    // 拿到头信息
                    String id = headers.get("id").toString();

                    // 消息体
                    byte[] body = getResponse.getBody();
                    String msg = new String(body, "UTF-8");
                    
                    // 业务代码

                    
                    // ack应答
                    long deliveryTag = getResponse.getEnvelope().getDeliveryTag();
                    channel.basicAck(deliveryTag, false);

                    i++;
                } else {
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            throw new ZweApiException("接收消息失败");
        }
    }

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/763508.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号