栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Spring纯Java配置集成kafka代码实例

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Spring纯Java配置集成kafka代码实例

这篇文章主要介绍了Spring纯Java配置集成kafka代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下

KafkaConfig.java

package com.niugang.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndmetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;

import com.niugang.controller.SenderConttoller;


@Configuration
@EnableKafka
public class KafkaConfig {

  private Logger logger = LoggerFactory.getLogger(KafkaConfig.class);

  @Bean
  public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
    // 偏移量提交方式
    // factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.COUNT);
    // 异步提交偏移量(默认就是true)
    // factory.getContainerProperties().setSyncCommits(true);
    //回调函数经常用于记录提交错误
    
    factory.setConsumerFactory(consumerFactory());
    return factory;
  }

  
  @Bean
  public ConsumerFactory consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerProps());
  }

  
  @Bean
  public ProducerFactory producerFactory() {
    return new DefaultKafkaProducerFactory<>(senderProps());
  }

  
  @Bean
  public KafkaTemplate kafkaTemplate() {
    return new KafkaTemplate(producerFactory());
  }

  
  @Bean
  public ConsumerListener listener() {
    return new ConsumerListener();
  }

  
  private Map consumerProps() {
    Map props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka_group_1");
    
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
    
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return props;
  }

  
  private Map senderProps() {
    Map props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    
    props.put(ProducerConfig.RETRIES_CONFIG, 0);
    
    props.put(ProducerConfig.ACKS_CONFIG, "1");
    
    // 以下配置当缓存数量达到16kb,就会触发网络请求,发送消息
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    // 每条消息在缓存中的最长时间,如果超过这个时间就会忽略batch.size的限制,由客户端立即将消息发送出去
    props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    // key的序列化方式
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // value序列化方式
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return props;
  }
}

ConsumerListener.java

package com.niugang.config;

import org.springframework.kafka.annotation.KafkaListener;


public class ConsumerListener {
  
  @KafkaListener(id = "foo", topics = "annotated1")
  public void listen1(String foo) {
    System.out.println("接收消息为:"+foo);
  }
}

源码:https://gitee.com/niugangxy/kafka/tree/master/kafka-spring-boot

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持考高分网。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/135466.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号