栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java > SpringBoot

spring kafka之如何批量给topic加前缀

SpringBoot 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

spring kafka之如何批量给topic加前缀

前言

最近业务开发部门给我们部门提了一个需求,因为他们开发环境和测试环境共用一套kafka,他们希望我们部门能帮他们实现自动给kafka的topic加上环境前缀,比如开发环境,则topic为dev_topic,测试环境,则topic为test_topic,他们kafka客户端是使用spring-kafka。一开始接到这个需求的时候,我心里是拒绝的,为啥开发环境和测试环境不分别部署一套kafka,还要那么麻烦。但老大都答应接这个需求了,作为小罗罗也只能接了

实现思路 1、生产者端

可以通过生产者拦截器,来给topic加前缀

2、实现步骤

a、编写一个生产者拦截器

@Slf4j
public class KafkaProducerInterceptor implements ProducerInterceptor {



    
    @Override
    public ProducerRecord onSend(ProducerRecord record) {
 log.info("原始topic:{}",record.topic());
 return new ProducerRecord(TOPIC_KEY_PREFIX + record.topic(),
  record.partition(),record.timestamp(),record.key(), record.value());
    }




    
    @Override
    public void onAcknowledgement(Recordmetadata metadata, Exception exception) {
      log.info("实际topic:{}",metadata.topic());
    }


    
    @Override
    public void close() {
    }


    
    @Override
    public void configure(Map configs) {

    }

b、配置拦截器

kafka:
    producer:
      # 生产者拦截器配置
      properties:
 interceptor.classes: com.github.lybgeek.kafka.producer.interceptor.KafkaProducerInterceptor

c、测试

2、消费者端

这个就稍微有点难搞了,因为业务开发部门他们是直接用@KafkaListener的注解,形如下

 @KafkaListener(id = "msgId",topics = {Constant.TOPIC})

像这种也没啥好的办法,就只能通过源码了,通过源码可以发现在如下地方

KafkaListenerAnnotationBeanPostProcessor#postProcessAfterInitialization

会把@KafkaListener的值赋值给消费者,如果对spring有了解的朋友,可能会知道postProcessAfterInitialization是spring后置处理器的方法,主要用来bean初始化后的一些操作,既然我们知道@KafkaListener会在bean初始化后再进行赋值,那我们就可以在bean初始化前,修改掉@KafkaListener的值。具体实现如下

@Component
public class KafkaListenerFactoryBeanPostProcesser implements BeanFactoryPostProcessor {

    @SneakyThrows
    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {

 List packageNames = AutoConfigurationPackages.get(beanFactory);

 for (String packageName : packageNames) {
     Reflections reflections = new Reflections(new ConfigurationBuilder()
      .forPackages(packageName) // 指定路径URL
      .addScanners(new SubTypesScanner()) // 添加子类扫描工具
      .addScanners(new FieldAnnotationsScanner()) // 添加 属性注解扫描工具
      .addScanners(new MethodAnnotationsScanner() ) // 添加 方法注解扫描工具
      .addScanners(new MethodParameterScanner() ) // 添加方法参数扫描工具
     );

     Set methodSet = reflections.getMethodsAnnotatedWith(KafkaListener.class);
     if(!CollectionUtils.isEmpty(methodSet)){
  for (Method method : methodSet) {
      KafkaListener kafkaListener = method.getAnnotation(KafkaListener.class);
      changeTopics(kafkaListener);
  }
     }
 }

    }


    private void changeTopics(KafkaListener kafkaListener) throws Exception{
 InvocationHandler invocationHandler = Proxy.getInvocationHandler(kafkaListener);
 Field memberValuesField = invocationHandler.getClass().getDeclaredField("memberValues");
 memberValuesField.setAccessible(true);
 Map memberValues = (Map)memberValuesField.get(invocationHandler);
 String[] topics = (String[])memberValues.get("topics");
 System.out.println("修改前topics:" + Lists.newArrayList(topics));
 for (int i = 0; i < topics.length; i++) {
     topics[i] = Constant.TOPIC_KEY_PREFIX + topics[i];
 }
 memberValues.put("topics", topics);
 System.out.println("修改后topics:" + Lists.newArrayList(kafkaListener.topics()));

    }
}

测试


总结

虽然实现了动态修改topic,但我还是觉得topic不要随便改变,有条件的话,kafka还是得基于物理环境隔离,其次真的客观条件不允许,要动态变更topic,则需做好topic动态变更宣导以及相关wiki的编写,不然很容易掉坑

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号