
Idea+maven+spring-cloud Project Setup Series -- 8: Integrating Kafka

Preface:
This article builds on a project that has already integrated Nacos. If you still need to integrate Nacos, see:
https://blog.csdn.net/l123lgx/article/details/121624988

It also assumes Kafka is already deployed on a server. If you still need to deploy Kafka, see: https://blog.csdn.net/l123lgx/article/details/122047659

1 Introduction to Kafka:
The Spring for Apache Kafka (spring-kafka) project applies core Spring concepts to the development of Kafka-based messaging solutions.

Official documentation: https://docs.spring.io/spring-kafka/docs/current/reference/html/
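
As a quick taste of those concepts, here is a minimal sketch of the two core abstractions: KafkaTemplate for sending and @KafkaListener for consuming. The topic "demo-topic" and group "demo-group" are illustrative placeholders, and the sketch assumes Spring Boot auto-configuration rather than the manual configuration built below:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

// Minimal sketch: spring-kafka exposes sending through KafkaTemplate and
// consuming through @KafkaListener, both managed as ordinary Spring beans.
@Component
public class DemoMessaging {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public DemoMessaging(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void publish(String message) {
        // Hypothetical topic name, for illustration only
        kafkaTemplate.send("demo-topic", message);
    }

    @KafkaListener(topics = "demo-topic", groupId = "demo-group")
    public void onMessage(String message) {
        System.out.println("received: " + message);
    }
}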

2 Integrating Kafka with Spring Cloud:

2.1 Add the spring-kafka dependency:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.6.0</version>
</dependency>
2.2 Kafka configuration in Nacos:

spring:
  kafka:
     # Kafka broker address
     bootstrap-servers: kafkaip:9092
     # Consumer configuration
     consumer:
        # Whether listener containers start automatically (read by KafkaConfig below)
        autoStartup: false
        # Consumer group
        group-id: consumer-1
        properties:
          # Where to start consuming when there is no committed offset; earliest here
          auto-offset-reset: earliest
          # Whether consumer offsets are committed automatically; disabled here
          enable-auto-commit: false
          # Heartbeat interval
          heartbeat-interval-ms: 2000
          # Auto-commit interval, used only when enable-auto-commit is true
          auto-commit-interval-ms: 500
          # Maximum number of records returned per poll
          max-poll-records: 10
          # Session timeout
          session-timeout-ms: 6000
          # SASL authentication (PLAINTEXT transport, no TLS encryption)
          security-protocol: SASL_PLAINTEXT
          sasl-mechanism: PLAIN
          # JAAS login module carrying the username and password
          sasl-jaas-config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="12345678";
     # Producer configuration
     producer:
        # Producer client id
        client-id: producer-1
        properties:
            # Number of retries on failure
            retries: 3
            # Batch size in bytes
            batch-size: 16384
            # 32 MB send buffer
            buffer-memory: 33554432
            # acks=all: a send succeeds only after all in-sync replicas have written the record
            acks: all
            # SASL authentication (PLAINTEXT transport, no TLS encryption)
            security-protocol: SASL_PLAINTEXT
            sasl-mechanism: PLAIN
            sasl-jaas-config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="12345678";
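
Before wiring these settings into Spring beans, it can be worth checking them directly against the broker. Below is a minimal sketch using Kafka's AdminClient, where the address and SASL credentials simply mirror the placeholder values in the configuration above:

import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

// Connectivity smoke test, assuming the same broker address and SASL
// credentials as the Nacos configuration above (placeholder values).
public class KafkaConnectivityCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafkaip:9092");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"admin\" password=\"12345678\";");
        try (AdminClient admin = AdminClient.create(props)) {
            // listTopics() forces a round trip to the cluster, so it fails
            // fast if the address or credentials are wrong
            System.out.println("topics: " + admin.listTopics().names().get());
        }
    }
}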

2.3 Kafka configuration class KafkaConfig.java:

import com.google.common.collect.Maps;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.Map;


@Configuration
@EnableKafka
public class KafkaConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.properties.enable-auto-commit}")
    private Boolean autoCommit;

    @Value("${spring.kafka.consumer.properties.auto-commit-interval-ms}")
    private Integer autoCommitInterval;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.properties.max-poll-records}")
    private Integer maxPollRecords;

    @Value("${spring.kafka.consumer.properties.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${spring.kafka.consumer.properties.security-protocol:PLAINTEXT}")
    private String securityProtocol;

    @Value("${spring.kafka.consumer.properties.sasl-mechanism:GSSAPI}")
    private String saslMechanism;

    @Value("${spring.kafka.consumer.properties.sasl-jaas-config:null}")
    private String saslJaasConfig;
    @Value("${spring.kafka.consumer.autoStartup:false}")
    private boolean autoStartup;

    @Value("${spring.kafka.producer.properties.retries}")
    private Integer retries;

    @Value("${spring.kafka.producer.properties.batch-size}")
    private Integer batchSize;

    @Value("${spring.kafka.producer.properties.buffer-memory}")
    private Integer bufferMemory;

    @Value("${spring.kafka.producer.properties.acks}")
    private String acks;

    @Value("${spring.kafka.producer.client-id}")
    private String clientId;

    @Value("${spring.kafka.producer.properties.security-protocol:PLAINTEXT}")
    private String securityProtocolProducer;

    @Value("${spring.kafka.producer.properties.sasl-mechanism:GSSAPI}")
    private String saslMechanismProducer;

    @Value("${spring.kafka.producer.properties.sasl-jaas-config:null}")
    private String saslJaasConfigProducer;

    @Value("${spring.cric.bi.environment:dev}")
    private String environment;
    
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = Maps.newHashMap();
        props.put(ProducerConfig.ACKS_CONFIG, acks);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
        // Idempotent producer: avoids duplicate records when retries kick in
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put("security.protocol", securityProtocolProducer);
        props.put("sasl.mechanism", saslMechanismProducer);
        props.put("sasl.jaas.config", saslJaasConfigProducer);
        return props;
    }

    
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplateCustomer() {
        return new KafkaTemplate<>(producerFactory());
    }
    
    @Bean
    public Map<String, Object> batchConsumerConfigs() {
        Map<String, Object> props = Maps.newHashMap();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        props.put("security.protocol", securityProtocol);
        props.put("sasl.mechanism", saslMechanism);
        props.put("sasl.jaas.config", saslJaasConfig);
        return props;
    }

    
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(batchConsumerConfigs()));
        // Batch consumption; the batch size is capped by ConsumerConfig.MAX_POLL_RECORDS_CONFIG above
        factory.setBatchListener(true);
        factory.setConcurrency(1);
        // Commit offsets manually, immediately after the listener acknowledges
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        // Uncomment to honor the autoStartup flag (keeps the listener stopped at startup):
        // if (!autoStartup) {
        //     factory.setAutoStartup(false);
        // }

        return factory;
    }
}
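
Since batchFactory uses AckMode.MANUAL_IMMEDIATE, a listener can also acknowledge through spring-kafka's Acknowledgment parameter instead of working with the Consumer directly. A minimal sketch under that assumption (the topic test1 matches the test below; the class itself is illustrative):

import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

// Alternative batch listener: with AckMode.MANUAL_IMMEDIATE the container
// commits the batch's offsets as soon as acknowledge() is called.
@Component
public class AckBatchListener {

    @KafkaListener(topics = "test1", containerFactory = "batchFactory")
    public void onBatch(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        records.forEach(r -> System.out.println("value = " + r.value()));
        // Commit the whole batch in one step
        ack.acknowledge();
    }
}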

3 Testing:
3.1 Test producer KafkaTestProducerController.java:

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;


@Slf4j
@RestController
public class KafkaTestProducerController {
    // 24-hour time format
    static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    @Autowired
    @Qualifier("kafkaTemplateCustomer")
    private KafkaTemplate<String, String> kafkaTemplate;


    @RequestMapping(value = "message/send", method = RequestMethod.GET, produces = "application/json;charset=utf-8")
    public Map<String, Object> send(@RequestParam String msg) {
        Map<String, Object> mapData = new HashMap<>();
        try {
            // Send a few records so the batch listener below receives more than one
            ListenableFuture<SendResult<String, String>> data = kafkaTemplate.send("test1", msg);
            ListenableFuture<SendResult<String, String>> data1 = kafkaTemplate.send("test1", msg);
            ListenableFuture<SendResult<String, String>> data2 = kafkaTemplate.send("test1", msg);
            // get() blocks until the broker acknowledges the send
            Object obj = data.get();
            mapData.put("success", true);
        } catch (Exception e) {
            log.error(e.getMessage());
            mapData.put("success", false);
            mapData.put("errorMsg", e.getMessage());
        }

        return mapData;
    }

}
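
Blocking on data.get() is fine for a smoke test, but sends can also be handled without blocking. A hedged sketch using the callback API of the ListenableFuture returned by send() in spring-kafka 2.6 (the class is illustrative, not part of the project):

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;

// Non-blocking alternative to data.get(): register success and failure
// callbacks on the ListenableFuture returned by send().
public class AsyncSendExample {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public AsyncSendExample(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendAsync(String msg) {
        kafkaTemplate.send("test1", msg).addCallback(
                result -> System.out.println(
                        "sent, offset = " + result.getRecordMetadata().offset()),
                ex -> System.err.println("send failed: " + ex.getMessage()));
    }
}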

3.2 Test consumer KafkaTestConsumer.java:

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.List;


@Slf4j
@Component
public class KafkaTestConsumer {

    @KafkaListener(topics = "test1", containerFactory = "batchFactory")
    public void articleConsumerWx(List<ConsumerRecord<String, String>> records, Consumer<String, String> consumer) {
        long time1 = System.currentTimeMillis();
        records.forEach(e -> System.out.println("e.value() = " + e.value()));
        // Commit offsets manually; commitSync() is the blocking alternative
        consumer.commitAsync();
        long time2 = System.currentTimeMillis();
        log.debug("Consumed {} records in {} s", records.size(), (time2 - time1) / 1000);
    }
}
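
The commented-out autoStartup block in batchFactory would keep this listener stopped at application startup. In that case the container can be started on demand through KafkaListenerEndpointRegistry, provided the @KafkaListener is given an id. A hedged sketch (the id "testListener" is hypothetical and would have to be added to the @KafkaListener above):

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

// Starting a listener on demand when autoStartup is false. Assumes the
// @KafkaListener above declares id = "testListener" (hypothetical id).
@Component
public class ListenerStarter {

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    public void startConsumer() {
        MessageListenerContainer container = registry.getListenerContainer("testListener");
        if (container != null && !container.isRunning()) {
            container.start();
        }
    }
}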

Reference: https://docs.spring.io/spring-kafka/docs/current/reference/html
