生产者:
package com.atguigu.util;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.postgresql.core.QueryExecutor;
import org.postgresql.util.HostSpec;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Properties;
import java.util.concurrent.TimeoutException;
public class Producer {
//队列名称
public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.company.queue";
// public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.department.queue";
// public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.position.queue";
// public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.rank.queue";
// public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.employee.queue";
// public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.university.queue";
// public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.taginfo.queue";
// public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.district.queue";
// public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.bank.queue";
// public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.category.queue";
// public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.stordoc.queue";
// public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.measdoc.queue";
// public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.mattaxes.queue";
// public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.customer.queue";
// public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.material.queue";
// public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.bankaccbas.queue";
// public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.supplier.queue";
//发消息
public static void main(String[] args) throws IOException, TimeoutException {
//创建一个连接工厂,该连接工厂其实就对应着我们访问http://182.92.210.39:15672/网站之后的rabbitmq,从这个工厂里可以获取队列
ConnectionFactory factory = new ConnectionFactory() {
public QueryExecutor openConnectionImpl(HostSpec[] hostSpecs, String s, String s1, Properties properties) throws SQLException {
return null;
}
};
//工厂IP连接RabbitMQ的队列
factory.setHost("10.106.11.37");
//用户名
factory.setUsername("admin");
//密码
factory.setPassword("123456");
//创建连接
//Connection connection = factory.newConnection();
Connection connection = factory.newConnection();
//获取信道,通过这个信道可以连接交换机Exchange,然后再连接队列Queue
Channel channel = (connection).createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//要发送到hello队列中的消息
// String message = "hello world";
String message = "{"table_name":"md_company", "type":"insert", "time_now":"2022-05-09 11:20:10", "id":"2", "tenant_id":"2", "code":"2", "name" :"2", "short_name":"2", "bank_account":"2", "bank_name":"2", "tax_payer_no":"2", "legal_person":"2", "tel":"2", "established_date":"2", "business_license":"2", "is_region":"2" , "is_bm_payment":"2", "province_code":"2", "city_code":"2" , "address":"2" , "company_type_code":"2", "is_used":"2", "create_user":"2", "create_dept":"2", "create_time":"2022-05-09 11:20:10", "update_user":"2", "update_time":"2022-05-09 11:20:18", "status":"2", "is_deleted":"2", "edge_form_id":"2" , "data_type":"2","exchange_status":"2"}";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
//如果消息成功的发送到了hello队列中,那么会输出这句代码
System.out.println("消息发送完毕");
}
}
消费者:
package com.atguigu.util;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Command-line consumer that subscribes to the RabbitMQ queue named by
 * {@link #QUEUE_NAME} and prints the body of every delivered message.
 */
public class Consumer {
    /** Name of the queue this consumer reads from. */
    public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.company.queue";

    /**
     * Connects to the broker, makes sure the queue exists, and registers an
     * auto-acknowledging consumer on it.
     *
     * @param args ignored
     * @throws IOException if connecting, declaring the queue or subscribing fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("10.106.11.37");
        // NOTE(review): credentials are hard-coded; consider supplying them externally.
        factory.setUsername("admin");
        factory.setPassword("123456");

        // The connection and channel are intentionally left open: deliveries arrive
        // asynchronously after main returns, for as long as the connection lives.
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // Declare the queue with the same arguments the Producer uses (durable,
        // non-exclusive, non-auto-delete) so the consumer can be started first
        // without basicConsume failing on a missing queue.
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        // Invoked for each delivered message; decode explicitly as UTF-8 rather than
        // with the platform default charset.
        DeliverCallback deliverCallback = (consumerTag, message) ->
            System.out.println(
                new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8));

        // Invoked when the subscription is cancelled.
        CancelCallback cancelCallback = consumerTag ->
            System.out.println("消息消费被中断");

        // Second argument true = automatic acknowledgement.
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}



