工具类第一弹、Rabbitmq监听器
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.zip.GZIPInputStream;
@Component
public class WorkFlowReceiveListener {
private static final Logger log = LoggerFactory.getLogger(WorkFlowReceiveListener.class);
private ThreadPoolManager executor = ThreadPoolManager.getPoolA();
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = "work_flow_exchange", type = ExchangeTypes.TOPIC),//交换器名称
key = {"work_flow_routingkey"},//路由名称
value = @Queue("crt_work_flow_queue")//队列名
))
public void receive(byte[] compressedStr, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
executor.exec(new SendMessageRunnable(compressedStr, channel, tag));
}
class SendMessageRunnable implements Runnable {
private byte[] compressedStr;
private Channel channel;
private long tag;
public SendMessageRunnable(byte[] compressedStr, Channel channel, long tag) {
this.compressedStr = compressedStr;
this.channel = channel;
this.tag = tag;
}
@Override
public void run() {
//测试使用ip过滤
InetAddress address = null;//获取的是本地的IP地址
try {
address = InetAddress.getLocalHost();
} catch (UnknownHostException e) {
e.printStackTrace();
}
String hostAddress = address.getHostAddress();
log.info("本机ip地址为:" + hostAddress);
try {
//接受消息 进行解压
String message = new String(compressedStr);
message = gunzip(message, "UTF-8");
log.info("MQ回调信息:{}", message);
//接收到的结果
JSONObject result = JSONUtil.toBean(message, JSONObject.class);
} catch (Exception e) {
e.printStackTrace();
log.error("MQ回调处理业务逻辑异常{}", e.toString());
} finally {
try {
channel.basicAck(tag, false);
} catch (IOException e) {
log.error("MQ回调处理业务逻辑异常{}", e.toString());
}
}
}
}
private static String gunzip(String compressedStr,String charset) {
if (compressedStr == null) {
return null;
}
ByteArrayOutputStream out = new ByteArrayOutputStream();
ByteArrayInputStream in = null;
GZIPInputStream ginzip = null;
byte[] compressed = null;
String decompressed = null;
try {
compressed = new sun.misc.base64Decoder()
.decodeBuffer(compressedStr);
in = new ByteArrayInputStream(compressed);
ginzip = new GZIPInputStream(in);
byte[] buffer = new byte[1024];
int offset = -1;
while ((offset = ginzip.read(buffer)) != -1) {
out.write(buffer, 0, offset);
}
decompressed = out.toString(charset);
} catch (IOException e) {
e.printStackTrace();
} finally {
if (ginzip != null) {
try {
ginzip.close();
} catch (IOException e) {
}
}
if (in != null) {
try {
in.close();
} catch (IOException e) {
}
}
if (out != null) {
try {
out.close();
} catch (IOException e) {
}
}
}
return decompressed;
}
}



