最近单位由于业务整改,需要重新设计一套任务调度中间件,便于各分系统根据任务优先级进行集中调度,详细需求如下:
-
高并发
-
低延迟
-
高可靠
-
根据任务优先级进行转发
由于工作业务需要,各分系统统一将任务消息发送到Kafka中,所以设计的任务调度中间件业务逻辑如下图所示:
代码如下所示:
-
xml文件中添加Kafka和RabbitMQ依赖包
org.apache.kafka kafka-clients2.5.1 org.springframework.boot spring-boot-starter-amqp
- 在application.yaml文件中添加RabbitMQ配置参数
spring:
rabbitmq:
publisher-/confirm/i-type: correlated #发送到交换器后触发回调,属性有三种确认类型,none是禁用发布确认
publisher-returns: true #是否开启消息发送到队列(Queue)后触发回调
template:
retry: # 消息发送失败重试相关配置
enabled: true
initial-interval: 3000
max-attempts: 3
max-interval: 10000
multiplier: 1
host: localhost
port: 5672
listener:
simple:
acknowledge-mode: auto # 消息接收确认,可选模式:NONE(不确认)、AUTO(自动确认)、MANUAL(手动确认)
# concurrency: 10 # 最小线程数量
# max-concurrency: 20 # 最大线程数量
prefetch: 1 # 每个消费者可能未完成的最大未确认消息数量,消费者执行耗时较长的话,建议 spring.rabbitmq.listener.simple.prefetch 设置为较小数值,让优先级越高的消息更快加入到消费者线程。
- 构建RabbitMQ配置文件
@Configuration
public class RabbitMQConfig {
private static final String EXCHANGE = "priority-exchange";
public static final String QUEUE = "priority-queue";
private static final String ROUTING_KEY = "priority.queue.#";
@Bean
Queue queue() {
Map args= new HashMap<>();
args.put("x-max-priority", 100);
return new Queue(QUEUE, false, false, false, args);
}
@Bean
TopicExchange exchange() {
return new TopicExchange(EXCHANGE);
}
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
}
-
RabbitMq生产者
public void sendPriorityMessage(String content, Integer priority) {
rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY_PREFIX + "test", content,
message -> {
message.getMessageProperties().setPriority(priority);
return message;
});
}
-
构建Kafka消费者
public ConsumercreateConsumer() { Properties props = new Properties(); // 指定Kafka服务的ip地址及端口 props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaAddress); // 指定group.id,Kafka中的消费者需要在消费者组里 props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); // 是否开启自动提交 props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // 消息key的反序列化器 props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer); // 消息value的反序列化器 props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer); // // 指定每次poll方法返回的记录数量,该方法仅针对手动提交有效 // props.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1"); // 设定session.timeout.ms超时时间防止出现rebalance props.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs); // props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000"); return new KafkaConsumer<>(props); }
-
构建数据通道将Kafka中的数据发送到RabbitMQ优先级队列中
public void KafkaMonitor() {
Consumer consumer = createConsumer();
kafkaSwitch.setSwitchOn(-1);
Long pollIntervalInt = Long.valueOf(pollInterval);
Collection collection = new ArrayList();
collection.add(topic1);
consumer.subscribe(collection);
int count = 0;
try {
while (true){
System.out.println("准备进行kafka消费,获取开关变量="+ kafkaSwitch.getSwitchOn());
log.info("准备进行kafka消费,获取开关变量="+ kafkaSwitch.getSwitchOn());
if (kafkaSwitch.getSwitchOn() == 1){
log.info("开始结束消费" + consumer.toString());
consumer.close();
log.info("消费已结束");
System.out.println("消费已结束");
kafkaSwitch.setSwitchOn(-1);
}else {
// 从Topic中拉取数据
ConsumerRecords records = consumer.poll(Duration.ofMillis(pollIntervalInt));
for (ConsumerRecord record : records){
count ++;
JSonObject jsonObject = JSONObject.parseObject(record.value());
if (jsonObject.containsKey("priority")){
sendPriorityMessage(record.value(), jsonObject.getInteger("priority"));
log.info("消费数据:" + "partition = " + record.partition() + ", offset = " + record.offset() + ", key = " + record.key() + " , value = " + record.value() + ", priority = " + jsonObject.getString("priority"));
System.out.println("消费数据:" + "partition = " + record.partition() + ", offset = " + record.offset() + ", key = " + record.key() + " , value = " + record.value() + ", priority = " + jsonObject.getString("priority"));
}else{
sendPriorityMessage(record.value(), 0);
log.info("消费数据:" + "partition = " + record.partition() + ", offset = " + record.offset() + ", key = " + record.key() + " , value = " + record.value() + ", priority = 0" );
System.out.println("消费数据:" + "partition = " + record.partition() + ", offset = " + record.offset() + ", key = " + record.key() + " , value = " + record.value() + ", priority = 0");
}
}
//控制往RabbitMq传输数据的速度,避免在RM中堆积过多数据
if (count >= Integer.getInteger(buffer)){
Thread.sleep(20000); //该数值应小于sessionTimeout数值,避免频繁rebalance
}
count = 0;
}
}
}catch (WakeupException e){
//消费停止
}catch (Exception e){
//消费异常,记录信息
System.out.println(e.toString());
log.error(e.toString());
}finally {
System.out.println("关闭消费者");
log.info("关闭消费者");
}
}
ps:
-
由于Kafka消费速度与RabbitMQ消费速度存在较大差异,为避免数据在RabbitMQ中过多堆积,应当适当控制Kafka消费速度,每当Kafka消费者往RabbitMQ中写入一定数量(Buffer)的数据时,设置Kafka消费者睡眠一段时间,需要注意的是,要保持Kafka消费者睡眠时间小于Kafka消费者SessionTimeOut时间,从而避免Kafka消费者群组频繁rebalance而造成kafka消费速度下降。
-
应当添加Kafka消费者控制器,控制打开/关闭消费者,在上述代码中设置kafkaSwitch对象,该对象代码如下所示,当kafkaSwitch.getSwitchOn() == 1时Kafka消费结束
public class KafkaSwitch {
private int switchOn = -1;
public int getSwitchOn() {
return switchOn;
}
public void setSwitchOn(int switchOn) {
this.switchOn = switchOn;
}
}
-
RabbitMQ消费者
@RabbitListener(id = "taskschedule", queues = "priority-queue", autoStartup = "false", concurrency = "20-50")
public void listen(String message) {
try{
jobSubmitService.PgsService(message);
System.out.println("从rabbitmq获取信息" + message);
} catch (Exception e){
log.error(e.getMessage());
}
}
ps:
-
为了提高RabbitMQ优先级队列消费速度,可以启动多个消费者进行消费,可以在@RabbitListener注解中进行配置,例如concurrency = "20-50",也可以在application.yaml文件中进行配置,如上述配置文件中concurrency、max-concurrency参数所示。
-
应当添加RabbitMQ消费者控制器,控制打开/关闭消费者,如下述代码所示
public void rabbitlistener(String consumerId, String startflag){
MessageListenerContainer consumer = rabbitListenerEndpointRegistry.getListenerContainer(consumerId);
if ("true".equals(startflag)){
consumer.start();
}else {
consumer.stop();
}
}
-
Feign+Hystrix熔断服务
@FeignClient(name = "name", fallback = JobSubmitServiceHystrixImpl.class)
public interface JobSubmitService {
@RequestMapping(value = "/JobSubmit", method = RequestMethod.POST)
public ResponseEntity
public ResponseEntity



