本篇参考了知乎文章《基于kafka实现延迟队列》,实现了类似 RocketMQ 的18级延迟消息队列功能,并加入了对任意延迟时间的支持(通过在多个延迟队列之间转发实现)。
1. Kafka操作工具类——KafkaManager
本类实现了对Kafka的连接和基本操作。
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartitionInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@Component
public class KafkaManager {
@Autowired
private KafkaTemplate kafkaTemplate;
private AdminClient adminClient;
@Value("${spring.kafka.bootstrap-servers}")
private String springKafkaBootstrapServers;
@PostConstruct
private void initAdminClient() {
Map props = new HashMap<>(1);
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, springKafkaBootstrapServers);
adminClient = KafkaAdminClient.create(props);
}
public CreateTopicsResult createTopic(Collection newTopics) {
return adminClient.createTopics(newTopics);
}
public void deleteTopic(Collection topics) {
adminClient.deleteTopics(topics);
}
public String getTopicInfo(Collection topics) {
AtomicReference info = new AtomicReference<>("");
try {
adminClient.describeTopics(topics).all().get().forEach((topic, description) -> {
for (TopicPartitionInfo partition : description.partitions()) {
info.set(info + partition.toString() + "n");
}
});
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return info.get();
}
public List getAllTopic() {
try {
return adminClient.listTopics().listings().get().stream().map(TopicListing::name).collect(Collectors.toList());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return new ArrayList<>();
}
public void sendMessage(String topic, String key, String message) {
kafkaTemplate.send(topic, key, message);
}
}
2. 延迟消息Pojo类——DelayMessage
本类包装了延迟消息的一些参数属性
/**
 * Envelope for a delayed message as it travels through the delay topics.
 *
 * <p>{@code expire} is the epoch-millisecond time at which the message may leave the
 * delay level it currently sits in; {@code expire2} is the epoch-millisecond time at
 * which it should finally be delivered to the target topic.
 */
public class DelayMessage {
    /** Target topic the payload is delivered to once the delay has elapsed. */
    private String topic;
    /** Record key used when delivering to the target topic. */
    private String key;
    /** Message payload. */
    private String data;
    /** Expiry (epoch ms) of the current delay level. */
    private long expire;
    /** Final delivery time (epoch ms). */
    private long expire2;

    /** No-arg constructor so that JSON libraries can instantiate the class via setters. */
    public DelayMessage() {
    }

    public DelayMessage(String topic, String key, String data, long expire, long expire2) {
        this.topic = topic;
        this.key = key;
        this.data = data;
        this.expire = expire;
        this.expire2 = expire2;
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    public String getData() {
        return data;
    }

    public void setData(String data) {
        this.data = data;
    }

    public long getExpire() {
        return expire;
    }

    public void setExpire(long expire) {
        this.expire = expire;
    }

    public long getExpire2() {
        return expire2;
    }

    public void setExpire2(long expire2) {
        this.expire2 = expire2;
    }
}
3. 延迟队列消费者——DelayConsumer
本类实现了对延迟队列的消费处理。
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
/**
 * Consumer for a single delay-level topic.
 *
 * <p>A dedicated thread polls the topic and hands each record to
 * {@link DelayManager#sendAndWait(DelayMessage)}. When a record is not yet due, the
 * partition is paused and rewound to that record, and the thread waits until the
 * periodic timer resumes the paused partitions and wakes it up again.
 *
 * <p>All access to the {@link KafkaConsumer} happens while holding {@code lock},
 * because the timer thread and the polling thread both touch it.
 */
public class DelayConsumer {
    private KafkaConsumer<String, String> consumer;
    private final DelayManager delayManager;
    private final int idx;
    private final int t;
    /** Poll timeout in milliseconds. */
    private final int t2;
    /** Period, in milliseconds, of the timer that resumes paused partitions. */
    private final int interval;
    private final String servers;
    private final Object lock = new Object();
    private final String topic;
    private Thread thread;

    /**
     * @param idx     index of the delay level this consumer serves
     * @param t       delay of the level in seconds; levels of at most 5 s are re-checked
     *                every 500 ms, longer ones every 1000 ms
     * @param topic   name of the delay topic to consume
     * @param servers Kafka bootstrap servers
     * @param dm      manager that decides whether a message is due and forwards it
     */
    public DelayConsumer(int idx, int t, String topic, String servers,
                         DelayManager dm) {
        this.idx = idx;
        this.topic = topic;
        this.t = t;
        this.interval = t <= 5 ? 500 : 1000;
        this.servers = servers;
        this.t2 = 200;
        this.delayManager = dm;
    }

    /**
     * Subscribes to the delay topic, starts the periodic wake-up timer and the polling
     * thread. {@link #initConsumer()} must have been called first.
     */
    void initTimer() {
        List<String> topics = Collections.singletonList(topic);
        consumer.subscribe(topics);
        Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                synchronized (lock) {
                    if (DelayManager.exit) {
                        return;
                    }
                    // Un-pause everything and let the polling thread re-check due times.
                    consumer.resume(consumer.paused());
                    lock.notify();
                }
            }
        }, 0, interval);
        thread = new Thread(this::loop);
        thread.start();
    }

    /**
     * Polling loop: forwards due messages and commits their offsets one by one; on the
     * first message that is not yet due, pauses and rewinds the partition and waits for
     * the timer. Runs until {@code DelayManager.exit} is set or the thread is interrupted.
     */
    private void loop() {
        do {
            synchronized (lock) {
                try {
                    ConsumerRecords<String, String> consumerRecords =
                            consumer.poll(Duration.ofMillis(t2));
                    if (consumerRecords.isEmpty()) {
                        lock.wait();
                        continue;
                    }
                    boolean timed = false;
                    for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                        TopicPartition topicPartition =
                                new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
                        String value = consumerRecord.value();
                        DelayMessage dm = null;
                        try {
                            dm = JSON.parseObject(value, DelayMessage.class);
                        } catch (Exception ex) {
                            // Unparseable record: dm stays null, sendAndWait returns false
                            // and the offset is committed so the record is skipped.
                            ex.printStackTrace();
                        }
                        if (delayManager.sendAndWait(dm)) {
                            // Not due yet: stop fetching and rewind so this record is
                            // delivered again after the next resume.
                            consumer.pause(Collections.singletonList(topicPartition));
                            consumer.seek(topicPartition, consumerRecord.offset());
                            timed = true;
                            break;
                        } else {
                            // Handled (forwarded or skipped): commit the next offset.
                            Map<TopicPartition, OffsetAndMetadata> offsets =
                                    Collections.singletonMap(topicPartition,
                                            new OffsetAndMetadata(consumerRecord.offset() + 1));
                            consumer.commitSync(offsets);
                        }
                    }
                    if (timed) {
                        lock.wait();
                    }
                } catch (InterruptedException ex) {
                    // Treat interruption as a request to stop: restore the flag and leave
                    // the loop instead of polling with a cleared interrupt status.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        } while (!DelayManager.exit);
    }

    /** Builds the underlying {@link KafkaConsumer} with manual offset commits. */
    void initConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "d");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "5000");
        consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
    }
}
4. 延迟消息Manager——DelayManager
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
 * Entry point of the delay queue: creates one topic and one {@link DelayConsumer} per
 * delay level at startup and routes messages through those levels until they are due.
 *
 * <p>A message with an arbitrary delay is first placed in the largest level that does not
 * exceed the requested delay; when that level expires the remaining delay is re-evaluated
 * and the message is either forwarded to a smaller level or delivered to its target topic.
 */
@Component
public class DelayManager implements CommandLineRunner {
    /** When false, messages are delivered as soon as their first level expires. */
    private final boolean redirect = true;
    final List<DelayConsumer> consumers = new ArrayList<>();
    @Autowired
    private KafkaManager kafkaManager;
    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;
    /** Delay levels in seconds, ascending (required by the binary search). */
    private final int[] delayTimes = new int[]{1, 5, 10, 30, 60, 120, 180, 240, 300, 360,
            420, 480, 540, 600, 1200, 1800, 3600, 7200};
    /** Stop flag read by the consumer and timer threads; volatile for cross-thread visibility. */
    static volatile boolean exit = false;

    /** Creates the delay topics and starts one consumer per delay level. */
    @Override
    public void run(String... args) {
        List<NewTopic> list = new ArrayList<>();
        int n = delayTimes.length;
        for (int i = 0; i < n; i++) {
            list.add(new NewTopic(topicName(i), 1, (short) 1));
        }
        kafkaManager.createTopic(list);
        for (int i = 0; i < n; i++) {
            DelayConsumer dc = new DelayConsumer(i, delayTimes[i], topicName(i), servers, this);
            dc.initConsumer();
            dc.initTimer();
            consumers.add(dc);
            System.out.println("add consumer:" + i);
        }
    }

    private String topicName(int idx) {
        return "delay-" + idx;
    }

    /**
     * Returns the index of the largest delay level that does not exceed {@code seconds},
     * clamped to the first level for values below it.
     */
    private int levelIndexFor(int seconds) {
        int idx = Arrays.binarySearch(delayTimes, seconds);
        if (idx < 0) {
            // binarySearch returns -(insertionPoint) - 1; the level below is insertionPoint - 1.
            idx = -idx - 2;
        }
        return Math.max(idx, 0);
    }

    /**
     * Sends a message that is delivered to {@code topic} after {@code delay} seconds.
     * The delay does not have to match one of the fixed levels. A delay shorter than the
     * smallest level (including zero or negative values) is delivered immediately.
     *
     * @param topic target topic
     * @param key   record key for the target topic, may be {@code null}
     * @param data  message payload
     * @param delay delay in seconds
     */
    public void sendDelay(String topic, String key, String data, int delay) {
        if (delay < delayTimes[0]) {
            kafkaManager.sendMessage(topic, key, data);
            return;
        }
        int next = levelIndexFor(delay);
        long now = System.currentTimeMillis();
        // 1000L keeps the arithmetic in long so large delays cannot overflow int.
        long expire = now + delayTimes[next] * 1000L;
        long expire2 = now + delay * 1000L;
        DelayMessage dm = new DelayMessage(topic, key, data, expire, expire2);
        sendDelayMessage(dm, next);
    }

    private void sendDelayMessage(DelayMessage dm, int idx) {
        kafkaManager.sendMessage(topicName(idx), null, JSON.toJSONString(dm));
    }

    /**
     * Processes a message read from a delay topic.
     *
     * @param dm the message, or {@code null} if it could not be parsed
     * @return {@code true} if the message's current level has not expired yet and the
     *         caller must wait and retry; {@code false} if the message was handled
     *         (delivered, forwarded to another level, or skipped because it was null)
     */
    boolean sendAndWait(DelayMessage dm) {
        if (dm == null) {
            return false;
        }
        long now = System.currentTimeMillis();
        if (now < dm.getExpire()) {
            return true;
        }
        long delay = dm.getExpire2() - now;
        if (delay < 1000 || !redirect) {
            // Less than one second left (or redirecting disabled): deliver to the target topic.
            kafkaManager.sendMessage(dm.getTopic(), dm.getKey(), dm.getData());
        } else {
            // Still time left: forward to the largest level that fits the remaining delay.
            int next = levelIndexFor((int) (delay / 1000));
            dm.setExpire(now + delayTimes[next] * 1000L);
            sendDelayMessage(dm, next);
        }
        return false;
    }

    /**
     * Sends a message delayed by one of the fixed delay levels.
     *
     * @param topic target topic
     * @param key   record key for the target topic, may be {@code null}
     * @param data  message payload
     * @param level 1-based delay level
     * @throws IllegalArgumentException if {@code level} is outside the available levels
     */
    public void sendDelayonLevel(String topic, String key, String data, int level) {
        if (level < 1 || level > delayTimes.length) {
            throw new IllegalArgumentException(
                    "level must be between 1 and " + delayTimes.length + ", got " + level);
        }
        long now = System.currentTimeMillis();
        int next = level - 1;
        // delayTimes holds seconds; convert to milliseconds before adding to the timestamp.
        long expire = now + delayTimes[next] * 1000L;
        DelayMessage dm = new DelayMessage(topic, key, data,
                expire, expire);
        sendDelayMessage(dm, next);
    }

    /** Sets the stop flag observed by the consumer threads and timers. */
    public void shutdown() {
        exit = true;
    }

    /** Clears the stop flag. */
    public void on() {
        exit = false;
    }
}
5.使用方法:
延迟指定的秒数,秒数可以在固定级别内,也可以不在:
// 注意:DelayManager 是 Spring 组件,应通过依赖注入获取(直接 new 不会注入 kafkaManager,调用时会空指针)
@Autowired
private DelayManager dm;
dm.sendDelay(String topic, String key, String data, int delay);
或者
延迟固定的延迟级别:
dm.sendDelayonLevel(String topic, String key, String data, int level);



