- 1.redis实现普通消息队列
- 1.1 实现原理
- 1.2 pom.xml
- 1.3 JedisUtils工具类
- 1.4 消息类
- 1.4 消息队列类
- 1.5 消息入队测试
- 1.5 消息出队测试
- 2.redis实现延迟消息队列
- 2.1 实现原理
- 2.2 pom.xml
- 2.2 JedisUtils工具类
- 2.3 消息类
- 2.4 延迟消息队列类
- 2.5 消息入队测试
- 2.6 消息出队测试
利用lpush/rpush和lpop/blpop(阻塞式)/rpop/brpop(阻塞式)来实现!注意:要保证先进先出,入队和出队需要在列表的两端进行(例如rpush入队、blpop出队)。
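1.2 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>distribute-lock</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.12.3</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.2.0</version>
            <type>jar</type>
            <scope>compile</scope>
        </dependency>
    </dependencies>
</project>
1.3 JedisUtils工具类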
package com.yl;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
/**
 * Hands out Jedis connections from a single, lazily created connection pool.
 *
 * <p>The host, port, timeout and password are hard-coded demo values.
 */
public class JedisUtils {
    private static JedisPool jedisPool = null;

    /**
     * Borrows a connection from the shared pool, creating the pool on first use.
     *
     * <p>The pool is constructed with the password, so the returned connection
     * needs no separate {@code auth} call.
     *
     * @return a pooled connection, or {@code null} when none could be obtained
     *         (the failure is printed to stderr)
     */
    public static Jedis getJedisObject() {
        JedisPool pool = getPool();
        try {
            // Borrow a connection from the pool.
            return pool.getResource();
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Returns the shared pool, building it exactly once.
     *
     * <p>Synchronized so that concurrent first callers cannot create two pools.
     */
    private static synchronized JedisPool getPool() {
        if (jedisPool == null) {
            GenericObjectPoolConfig config = new GenericObjectPoolConfig();
            // Maximum number of idle connections.
            config.setMaxIdle(400);
            // Maximum number of connections.
            config.setMaxTotal(2000);
            // Maximum wait for a free connection, in milliseconds.
            config.setMaxWaitMillis(300000);
            jedisPool = new JedisPool(config, "192.168.244.129", 6379, 30000, "root123");
        }
        return jedisPool;
    }
}
1.4 消息类
package com.yl;
/**
 * A queue message: an identifier plus an arbitrary payload.
 *
 * <p>Exposes a no-argument constructor and getter/setter pairs for both fields.
 */
public class Message {
    private String id;
    private Object object;

    /** @return the message identifier, or {@code null} if none has been set */
    public String getId() {
        return id;
    }

    /** @param id the message identifier */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the payload, or {@code null} if none has been set */
    public Object getObject() {
        return object;
    }

    /** @param object the payload carried by this message */
    public void setObject(Object object) {
        this.object = object;
    }

    /**
     * Renders the message as {@code Message{id='<id>', object=<object>}}.
     */
    @Override
    public String toString() {
        // The single quote must be escaped inside a char literal.
        return "Message{" +
                "id='" + id + '\'' +
                ", object=" + object +
                '}';
    }
}
1.4 消息队列类
package com.yl;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import redis.clients.jedis.Jedis;
import java.util.*;
/**
 * A plain message queue stored in a Redis list.
 *
 * <p>Producers append to the tail with {@code rpush} and consumers take from the
 * head with the blocking {@code blpop}, so messages are delivered in the order
 * they were enqueued.
 */
public class MessageQueue {
    /** One mapper shared by all calls instead of a new one per message. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private Jedis jedis;
    private String queue;

    /**
     * @param jedis the connection used for every queue operation
     * @param queue the Redis key of the list that holds the messages
     */
    public MessageQueue(Jedis jedis, String queue) {
        this.jedis = jedis;
        this.queue = queue;
    }

    /**
     * Assigns each message a fresh UUID, serializes it to JSON and pushes the
     * whole batch in a single command.
     *
     * <p>A message that cannot be serialized is reported on stderr and left out
     * of the batch. A {@code null} or empty list sends nothing.
     *
     * @param list the messages to enqueue; their ids are overwritten
     */
    public void queue(List<Message> list) {
        if (list == null || list.isEmpty()) {
            return;
        }
        List<String> payloads = new ArrayList<>(list.size());
        for (Message message : list) {
            // Give the message its identifier.
            message.setId(UUID.randomUUID().toString());
            // Serialize.
            try {
                payloads.add(OBJECT_MAPPER.writeValueAsString(message));
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
        }
        if (payloads.isEmpty()) {
            return;
        }
        System.out.println("message start send..."+new Date());
        // Append to the tail; the consumer pops from the head, which gives FIFO order.
        jedis.rpush(queue, payloads.toArray(new String[0]));
    }

    /**
     * Reads and prints messages until the current thread is interrupted.
     *
     * <p>Each read blocks for up to five seconds. A read that times out yields
     * no reply and the loop simply tries again.
     */
    public void cosume() {
        // Keep reading for as long as the current thread has not been interrupted.
        while (!Thread.currentThread().isInterrupted()) {
            // Blocking pop: waits up to the timeout for an element to arrive.
            List<String> reply = jedis.blpop(5, queue);
            if (reply == null || reply.isEmpty()) {
                // Timed out with nothing in the list.
                continue;
            }
            System.out.println("receive message"+reply.toString());
        }
    }
}
1.5 消息入队测试
package com.yl;
import redis.clients.jedis.Jedis;
import java.util.ArrayList;
import java.util.List;
/**
 * Producer demo for the plain queue: enqueues ten messages on "myQueue"
 * from a worker thread.
 */
public class Test1 {
    public static void main(String[] args) {
        Jedis connection = JedisUtils.getJedisObject();
        MessageQueue queue = new MessageQueue(connection, "myQueue");
        // Producer: build the batch, then hand it to the queue in one call.
        Thread producer = new Thread(() -> {
            List<Message> batch = new ArrayList<>();
            int index = 0;
            while (index < 10) {
                Message message = new Message();
                message.setObject("yl===message" + index);
                batch.add(message);
                index++;
            }
            queue.queue(batch);
        });
        producer.start();
    }
}
结果
1.5 消息出队测试
package com.yl;
import redis.clients.jedis.Jedis;
/**
 * Consumer demo for the plain queue: reads "myQueue" from a worker thread.
 */
public class Test2 {
    public static void main(String[] args) {
        Jedis connection = JedisUtils.getJedisObject();
        MessageQueue queue = new MessageQueue(connection, "myQueue");
        // Consumer: the worker runs the queue's read loop.
        Thread consumer = new Thread(() -> queue.cosume());
        consumer.start();
    }
}
结果
通过zset实现:消息入队时以“当前时间戳 + 延迟时间”作为score;消费者轮询读取score不大于当前时间戳的消息,因此消息只有在延迟时间到达之后才会被读取到!
2.2 pom.xml同上
2.2 JedisUtils工具类同上
2.3 消息类同上
2.4 延迟消息队列类
package com.yl;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import redis.clients.jedis.Jedis;
import java.util.Date;
import java.util.Set;
import java.util.UUID;
/**
 * A delayed message queue stored in a Redis sorted set.
 *
 * <p>Each message is stored with a score equal to the time at which it becomes
 * due (enqueue time plus the delay). The consumer polls for members whose score
 * is not greater than the current time.
 */
public class DelayMessageQueue {
    /** Delay applied by {@link #queue(Object)}, in milliseconds. */
    private static final long DEFAULT_DELAY_MILLIS = 5000;

    /** One mapper shared by all calls instead of a new one per message. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private Jedis jedis;
    private String queue;

    /**
     * @param jedis the connection used for every queue operation
     * @param queue the Redis key of the sorted set that holds the messages
     */
    public DelayMessageQueue(Jedis jedis, String queue) {
        this.jedis = jedis;
        this.queue = queue;
    }

    /**
     * Enqueues a message that becomes due five seconds from now.
     *
     * @param object the message to enqueue; must be a {@link Message}
     * @throws ClassCastException if {@code object} is not a {@link Message}
     */
    public void queue(Object object) {
        queue(object, DEFAULT_DELAY_MILLIS);
    }

    /**
     * Assigns the message a fresh UUID, serializes it to JSON and stores it with
     * a score of the current time plus {@code delayMillis}.
     *
     * <p>A message that cannot be serialized is reported on stderr and not stored.
     *
     * @param object      the message to enqueue; must be a {@link Message}
     * @param delayMillis how long from now, in milliseconds, the message becomes due
     * @throws ClassCastException if {@code object} is not a {@link Message}
     */
    public void queue(Object object, long delayMillis) {
        // Give the message its identifier.
        Message message = (Message) object;
        message.setId(UUID.randomUUID().toString());
        // Serialize.
        try {
            String payload = OBJECT_MAPPER.writeValueAsString(message);
            System.out.println("message start send..."+new Date());
            // Store the message; the score is the time at which it becomes due.
            jedis.zadd(queue, System.currentTimeMillis() + delayMillis, payload);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }

    /**
     * Polls for due messages and prints them until the current thread is interrupted.
     *
     * <p>When nothing is due the loop sleeps for 500 ms before polling again. A
     * message is handled only if this caller's {@code zrem} actually removed it.
     */
    public void cosume() {
        // Keep reading for as long as the current thread has not been interrupted.
        while (!Thread.currentThread().isInterrupted()) {
            // Fetch at most one member whose score lies between 0 and the current time.
            Set<String> due = jedis.zrangeByScore(queue, 0, System.currentTimeMillis(), 0, 1);
            if (due.isEmpty()) {
                try {
                    // Nothing is due yet: wait a little, then poll again.
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    // Restore the flag so the caller can see the interruption, then stop.
                    Thread.currentThread().interrupt();
                    break;
                }
                continue;
            }
            String next = due.iterator().next();
            // Handle the message only when the removal succeeded.
            if (jedis.zrem(queue, next) > 0) {
                // The message was claimed; process it here.
                try {
                    Message message = OBJECT_MAPPER.readValue(next, Message.class);
                    System.out.println("receive message:"+message + new Date());
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
2.5 消息入队测试
package com.yl;
import redis.clients.jedis.Jedis;
import java.util.ArrayList;
import java.util.List;
/**
 * Producer demo for the delayed queue: enqueues ten messages on "myDelayQueue"
 * from a worker thread, one call per message.
 */
public class Test3 {
    public static void main(String[] args) {
        Jedis connection = JedisUtils.getJedisObject();
        DelayMessageQueue queue = new DelayMessageQueue(connection, "myDelayQueue");
        // Producer: each message is enqueued individually.
        Thread producer = new Thread(() -> {
            int index = 0;
            while (index < 10) {
                Message message = new Message();
                message.setObject("yl===message" + index);
                queue.queue(message);
                index++;
            }
        });
        producer.start();
    }
}
结果
2.6 消息出队测试
package com.yl;
import redis.clients.jedis.Jedis;
import java.util.ArrayList;
import java.util.List;
/**
 * Consumer demo for the delayed queue: polls "myDelayQueue" from a worker thread.
 */
public class Test4 {
    public static void main(String[] args) {
        Jedis connection = JedisUtils.getJedisObject();
        DelayMessageQueue queue = new DelayMessageQueue(connection, "myDelayQueue");
        // Consumer: the worker runs the queue's polling loop.
        Thread consumer = new Thread(() -> queue.cosume());
        consumer.start();
    }
}
结果,5秒后才读取到消息



