栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Kafka从入门到学废(六)——Spring框架下认证集群生产消息

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Kafka从入门到学废(六)——Spring框架下认证集群生产消息

生产环境基本都是采用认证集群的方式,认证集群搭建方式参见之前的博文(认证集群搭建)。接下来基于Spring kafka编写生产消息代码

首先配置文件:

kafka:
  producer:
    security:
      # 加密协议(和认证集群搭建时候的配置内容对应)
      protocol: SASL_PLAINTEXT
      # 加密方式(和认证集群搭建时候的配置内容对应)
      sasl-mechanism: SCRAM-SHA-256
      # 访问用户名(和认证集群搭建时候的配置内容对应)
      username: xxxxx
      # 访问密码(和认证集群搭建时候的配置内容对应)
      password: xxxxx
    # 集群地址端口
    bootstrap-servers: IP1:PORT1,IP2:PORT2,IP3:PORT3
    # 生产者生产消息的topic
    topic: centerm-cluster-topic

配置代码:

@Configuration
public class KafkaProducerConfig {
    @Value("${kafka.producer.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${kafka.producer.security.username}")
    private String username;
    @Value("${kafka.producer.security.password}")
    private String pWord;
    @Value("${kafka.producer.security.protocol}")
    private String protocol;
    @Value("${kafka.producer.security.sasl-mechanism}")
    private String mechanism;

    private static final String P_NAME = "password";


    private static final Integer PRODUCER_CONFIGS_COUNT = 3;

    @Bean
    public ProducerFactory producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public Map producerConfigs() {
        Map props = new HashMap<>(PRODUCER_CONFIGS_COUNT);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class);

        // 认证
        props.put("security.protocol", protocol);
        props.put("sasl.mechanism", mechanism);
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=""
            + username + "" " + P_NAME + "="" + pWord + "";");
        return props;
    }

    @Bean
    public KafkaTemplate kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

发送消息类:

@Component
public class KafkaSender {
    @Autowired
    private KafkaTemplate kafkaTemplate;

    
    public void sendAsync(final String topic, final Object key, final Object value, KafkaSendCallback kafkaSendCallback) {
        final ProducerRecord record = createProducerRecord(topic, key, value);
        ListenableFuture> future = kafkaTemplate.send(record);

        if(kafkaSendCallback != null) {
            future.addCallback(kafkaSendCallback);
        }
    }

    private static final Integer SEND_TIME_OUT_SEConDS = 10;
    
    public void sendSync(final String topic, final Object key, final Object value, KafkaSenderHandler kafkaSenderHandler) {
        final ProducerRecord record = createProducerRecord(topic, key, value);
        try {
            kafkaTemplate.send(record).get(SEND_TIME_OUT_SECONDS, TimeUnit.SECONDS);
            kafkaSenderHandler.handleSuccess(value);
        }
        catch (ExecutionException e) {
            kafkaSenderHandler.handleFailure(value, record, e);
        }
        catch (TimeoutException | InterruptedException e) {
            kafkaSenderHandler.handleFailure(value, record, e);
        }
    }

    
    private ProducerRecord createProducerRecord(final String topic, final Object key, final Object value) {
        String valueString;
        if(value instanceof String || value instanceof Integer || value instanceof Long
                || value instanceof Double || value instanceof Float) {
            valueString = String.valueOf(value);
        } else {
            valueString = JSONObject.toJSONString(value);
        }

        if(key != null) {
            String keyString = String.valueOf(key);
            return new ProducerRecord<>(topic, keyString, valueString);
        } else {
            return new ProducerRecord<>(topic, valueString);
        }
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/318371.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号