栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

kafka添加安全验证配置

kafka添加安全验证配置

spring kafka 添加安全验证配置

综合考虑性能影响、管理成本、安全等级要求,接入便利程度。 

鉴权采用SASL+PLAINTEXT 方式。

每个集群会分配统一的访问账号及密码用于客户端访问。 
服务端配置: 1. config 目录添加kafka_server_jaas.conf 配置文件, 内容:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="7f8d9dsf789ds7ffsdfdsfu9"
user_admin="7f8d9dsf789ds7ffsdfdsfu9"
user_alice="xjfkddjfdssifds";
};
2. kafka-run-class.sh 添加
KAFKA_OPTS="$KAFKA_OPTS -Djava.security.auth.login.config=/home/finance/App/kafka_2.12-2.5.1/config/kafka_server_jaas.conf"
3. config/server.properties

添加

security.inter.broker.protocol=SASL_PLAINTEXT

sasl.mechanism.inter.broker.protocol=PLAIN

sasl.enabled.mechanisms=PLAIN

listener, advertised.listeners 添加对应SASL_PLAINTEXT 监听器

listeners=SASL_PLAINTEXT://10.193.196.112:9092

advertised.listeners=SASL_PLAINTEXT://10.193.196.112:9092
客户端接入改造

就是在java程序中将安全参数配置进来

生产者、消费者属性配置加入一下配置

props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=alice password=alice;");
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");

springboot 高版本估计有支持在properties文件中直接配置,此处没有验证。

完整配置示例

此方法,可以用KafkaProperties 获取properties配置文件中的参数后,再往里面添加新的参数

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.Map;


@Configuration
@EnableKafka
public class KafkaProducerConfig {
	@Autowired
	private KafkaProperties kafkaProperties;
	
	
	@Bean
	public KafkaListenerContainerFactory> kafkaListenerContainerFactory() {
		ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
		Map props = kafkaProperties.buildConsumerProperties();
		props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=user1 password=pass1;");
		props.put("security.protocol", "SASL_PLAINTEXT");
		props.put("sasl.mechanism", "PLAIN");
		factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
		factory.setConcurrency(2);
		factory.getContainerProperties().setPollTimeout(1500);
		return factory;
	}


	
	@Bean
	public KafkaTemplate kafkaTemplate() {
		Map props = kafkaProperties.buildProducerProperties();
		props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=user1 password=pass1;");
		props.put("security.protocol", "SASL_PLAINTEXT");
		props.put("sasl.mechanism", "PLAIN");
		return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
	}

}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/457408.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号