1.引入依赖
org.springframework.kafka spring-kafka 2.7.8 com.alibaba fastjson 1.2.78
2.kafka配置信息
sobev.kafka.ip=xxx.x.x.x
sobev.kafka.port=9092
sobev.business.topic.name=sobevBusiness
spring.kafka.bootstrap-servers=${sobev.kafka.ip}:${sobev.kafka.port}
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.auto.offset.reset=earliest
spring.kafka.consumer.auto-commit-interval=1000
spring.kafka.consumer.enable-auto-commit=true
3.发送消息工具类
@Component
@Slf4j
public class KafkaUtils {
@Autowired
private KafkaTemplate kafkaTemplate;
public void send(String topicName, String jsonMessage) {
kafkaTemplate.send(topicName, jsonMessage);
}
}
4.注册kafka消息监听器接收消息
由于存在多种消息发送渠道和消息类型,因此需要多个监听器监听不同渠道的不同类型,但是不着急,不用急着把所有类型都写一个Listener
//不同的渠道不同的消息类型都设为单独的类型,由不同consumer消费
public class GroupIdMappingUtils {
public static List getAllGroupIds() {
List groupIds = new ArrayList<>();
for (ChannelType channelType : ChannelType.values()) {
for (MessageType messageType : MessageType.values()) {
groupIds.add(channelType.getCodeEn() + "." + messageType.getCodeEn());
}
}
return groupIds;
}
}
定义一个kafka消息监听器
@Component
//prototype类型 意味着每次请求类都将返回一个新的类
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
public class KafkaReceiver {
@Autowired
private TaskPendingHolder taskPendingHolder;
@KafkaListener(topics = "#{'${sobev.business.topic.name}'}")
public void consumer(ConsumerRecord, String> consumerRecord, @Header(KafkaHeaders.GROUP_ID) String topicGroupId){
Optional kafkaMessage = Optional.of(consumerRecord.value());
if(kafkaMessage.isPresent()){
String s = kafkaMessage.get();
TaskInfo taskInfo = JSON.parseObject(s, TaskInfo.class);
if((taskInfo.getSendChannel() + "." + taskInfo.getMsgType()).equals(topicGroupId)){
System.out.println(topicGroupId + " :" + s);
//根据当前 topicGroupId 路由到不同的线程池处理
taskPendingHolder.route(topicGroupId).execute(具体任务...)
}
}
}
}
根据prototype的特性,注册多个监听器
Service
public class KafkaReceiverStarter {
@Autowired
ApplicationContext applicationContext;
@Autowired
private TaskPendingHolder taskPendingHolder;
private static List groupIds = GroupIdMappingUtils.getAllGroupIds();
private static Integer idx = 0;
@PostConstruct
public void init(){
for (int i = 0; i < groupIds.size(); i++) {
applicationContext.getBean(KafkaReceiver.class);
}
}
@Bean
public static KafkaListenerAnnotationBeanPostProcessor.AnnotationEnhancer groupIdEnhancer(){
return (attrs, element) -> {
//设置@KafkaListener内的groupId属性
attrs.put("groupId", groupIds.get(idx++));
return attrs;
};
}
}
定义线程池持有容器
@Component
public class TaskPendingHolder {
private Map taskPendingHolder = new HashMap<>(32);
private static List groupIds = GroupIdMappingUtils.getAllGroupIds();
@PostConstruct
public void init() {
for (String groupId : groupIds) {
//自定义线程池生成器 根据groupId生成不同的线程池
MyExecutor executor = MyThreadPoolConfig.getExecutor(groupId);
taskPendingHolder.put(groupId, executor);
}
}
public ExecutorService route(String groupId) {
return taskPendingHolder.get(groupId);
}
}
未完。。。
参考于github austin 消息推送平台



