栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

RabbitMq生产者消费者代码

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

RabbitMq生产者消费者代码

生产者:
package com.atguigu.util; 


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.postgresql.core.QueryExecutor;
import org.postgresql.util.HostSpec;

import java.io.IOException;
import java.sql.SQLException;
import java.util.Properties;
import java.util.concurrent.TimeoutException;


public class Producer {


    //队列名称

    public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.company.queue";
//    public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.department.queue";
//    public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.position.queue";
//    public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.rank.queue";
//    public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.employee.queue";
//    public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.university.queue";
//    public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.taginfo.queue";
//    public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.district.queue";
//    public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.bank.queue";
//    public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.category.queue";
//    public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.stordoc.queue";
//    public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.measdoc.queue";
//    public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.mattaxes.queue";
//    public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.customer.queue";
//    public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.material.queue";
//    public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.bankaccbas.queue";
//    public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.supplier.queue";


    //发消息
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建一个连接工厂,该连接工厂其实就对应着我们访问http://182.92.210.39:15672/网站之后的rabbitmq,从这个工厂里可以获取队列
        ConnectionFactory factory = new ConnectionFactory() {

            public QueryExecutor openConnectionImpl(HostSpec[] hostSpecs, String s, String s1, Properties properties) throws SQLException {
                return null;
            }
        };

        //工厂IP连接RabbitMQ的队列
        factory.setHost("10.106.11.37");
        //用户名
        factory.setUsername("admin");
        //密码
        factory.setPassword("123456");

        //创建连接
        //Connection connection =  factory.newConnection();
        Connection connection = factory.newConnection();
        //获取信道,通过这个信道可以连接交换机Exchange,然后再连接队列Queue
        Channel channel = (connection).createChannel();

        
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        //要发送到hello队列中的消息
       // String message = "hello world";
        String message = "{"table_name":"md_company", "type":"insert", "time_now":"2022-05-09 11:20:10", "id":"2", "tenant_id":"2", "code":"2", "name" :"2", "short_name":"2", "bank_account":"2", "bank_name":"2", "tax_payer_no":"2", "legal_person":"2", "tel":"2", "established_date":"2", "business_license":"2", "is_region":"2" , "is_bm_payment":"2", "province_code":"2", "city_code":"2" , "address":"2" , "company_type_code":"2", "is_used":"2", "create_user":"2", "create_dept":"2", "create_time":"2022-05-09 11:20:10", "update_user":"2", "update_time":"2022-05-09 11:20:18", "status":"2", "is_deleted":"2", "edge_form_id":"2" , "data_type":"2","exchange_status":"2"}";
        
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

        //如果消息成功的发送到了hello队列中,那么会输出这句代码
        System.out.println("消息发送完毕");
    }

}

消费者:

package com.atguigu.util; 


import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class Consumer {
    //消费者要获取哪个队列中的消息
    public static final String QUEUE_NAME="data.etl.syn.rs.mdm.company.queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("10.106.11.37");
        //用户名
        factory.setUsername("admin");
        //密码
        factory.setPassword("123456");

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        //如果能成功接收到消息会调用的回调函数
        DeliverCallback deliverCallback=(consumerTag, message)->{
            System.out.println(new String(message.getBody()));
        };

        //如果取消从消息队列中获取消息时会调用的回调函数
        CancelCallback cancelCallback= consumerTag->{
            System.out.println("消息消费被中断");
        };

        
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }

}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/873969.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号