引入依赖:
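<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>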
订阅频道类:
package com.wty.redisdemo.service;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
/**
 * Thread that subscribes to a single Redis channel and stays blocked inside the
 * subscription call for as long as the subscription is active.
 */
public class SubThread extends Thread {

    /** Pool the subscribing connection is borrowed from. */
    private final JedisPool jedisPool;

    /** Listener that receives the pub/sub callbacks. */
    private final Subscriber subscriber = new Subscriber();

    /** Name of the channel this thread subscribes to. */
    private final String channel = "remote_detection_inference";

    /**
     * Creates the thread (named "SubThread"); nothing is borrowed from the pool
     * until the thread runs.
     *
     * @param jedisPool pool to take the subscribing connection from
     */
    public SubThread(JedisPool jedisPool) {
        super("SubThread");
        this.jedisPool = jedisPool;
    }

    /**
     * Announces the subscription, then borrows a connection and subscribes on it.
     * Any exception raised while borrowing or subscribing is printed, not rethrown.
     */
    @Override
    public void run() {
        System.out.println(String.format("订阅redis, 频道: %s, 线程将被阻塞", channel));
        Jedis connection = null;
        try {
            connection = jedisPool.getResource();
            // Subscribe with our listener on the configured channel.
            connection.subscribe(subscriber, channel);
        } catch (Exception failure) {
            System.out.println(String.format("订阅频道错误, %s", failure));
        } finally {
            release(connection);
        }
    }

    /** Closes the connection if one was actually obtained. */
    private static void release(Jedis connection) {
        if (connection == null) {
            return;
        }
        connection.close();
    }
}
发布消息类:
package com.wty.redisdemo.service;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
/**
 * Thread that reads lines from standard input and publishes each one to a Redis
 * channel until the user types {@code quit} or the input ends.
 */
public class Publisher extends Thread {

    /** Channel every console line is published to. */
    private static final String CHANNEL = "remote_detection_inference";

    /** Console input that stops the publisher. */
    private static final String QUIT_COMMAND = "quit";

    /** Pool the publishing connection is borrowed from. */
    private final JedisPool jedisPool;

    /**
     * @param jedisPool pool to take the publishing connection from
     */
    public Publisher(JedisPool jedisPool) {
        this.jedisPool = jedisPool;
    }

    /**
     * Publishes console lines to {@link #CHANNEL}.
     *
     * <p>The loop ends when {@code quit} is entered, when standard input reaches
     * end-of-stream, or when reading fails. The borrowed connection is always
     * closed on the way out.
     */
    @Override
    public void run() {
        // System.in is intentionally left open: it is shared by the whole process.
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        // try-with-resources guarantees the connection is closed on every exit path.
        try (Jedis jedis = jedisPool.getResource()) {
            while (true) {
                String line = reader.readLine();
                if (line == null) {
                    // End of input: there is nothing to publish, and a null
                    // message must not be handed to publish().
                    break;
                }
                if (QUIT_COMMAND.equals(line)) {
                    System.out.println(String.format("取消订阅"));
                    break;
                }
                jedis.publish(CHANNEL, line);
            }
        } catch (IOException e) {
            // A failing input stream will not recover; stop instead of
            // retrying the read forever.
            e.printStackTrace();
        }
    }
}
消息提示类:
package com.wty.redisdemo.service;
import redis.clients.jedis.JedisPubSub;
/**
 * Pub/sub listener that prints one line to standard output for each callback
 * it receives.
 */
public class Subscriber extends JedisPubSub {

    public Subscriber() {
    }

    /** Callback for a received message: prints the channel and the message. */
    @Override
    public void onMessage(String channel, String message) {
        report("接收订阅消息, 频道; %s, 收到消息: %s", channel, message);
    }

    /** Callback for a subscription: prints the channel and the subscription count. */
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        report("订阅redis频道成功, 频道: %s, 订阅的频道: %d", channel, subscribedChannels);
    }

    /** Callback for an unsubscription: prints the channel and the subscription count. */
    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
        report("退订复述,通道, 频道: %s, 订阅的频道: %d", channel, subscribedChannels);
    }

    /** Formats the template with the given arguments and prints it as one line. */
    private static void report(String template, Object... args) {
        System.out.println(String.format(template, args));
    }
}
测试类:
package com.wty.redisdemo.service;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
/**
 * Demo entry point: builds a connection pool for a local Redis server and
 * starts one subscriber thread and one publisher thread sharing that pool.
 */
public class testDemo {

    /** Address of the Redis server the demo connects to. */
    private static final String REDIS_HOST = "127.0.0.1";

    /** Port of the Redis server the demo connects to. */
    private static final int REDIS_PORT = 6379;

    /**
     * Creates the pool, prints the connection settings, then starts the
     * subscriber followed by the publisher.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        JedisPool pool = new JedisPool(new JedisPoolConfig(), REDIS_HOST, REDIS_PORT);
        System.out.println(String.format("Redis连接池启动, redis ip: %s, redis port: %d", REDIS_HOST, REDIS_PORT));

        // Subscriber is started first, then the publisher.
        new SubThread(pool).start();
        new Publisher(pool).start();
    }
}



