生产环境基本都是采用认证集群的方式,认证集群搭建方式参见之前的博文(认证集群搭建)。接下来基于 Spring Kafka 编写生产消息代码。
首先配置文件:
kafka:
producer:
security:
# Security protocol (must match the secured cluster's configuration)
protocol: SASL_PLAINTEXT
# SASL mechanism (must match the secured cluster's configuration)
sasl-mechanism: SCRAM-SHA-256
# Username (must match the secured cluster's configuration)
username: xxxxx
# Password (must match the secured cluster's configuration)
password: xxxxx
# Cluster bootstrap addresses (host:port list)
bootstrap-servers: IP1:PORT1,IP2:PORT2,IP3:PORT3
# Topic the producer publishes to
topic: centerm-cluster-topic
配置代码:
@Configuration
public class KafkaProducerConfig {
@Value("${kafka.producer.bootstrap-servers}")
private String bootstrapServers;
@Value("${kafka.producer.security.username}")
private String username;
@Value("${kafka.producer.security.password}")
private String pWord;
@Value("${kafka.producer.security.protocol}")
private String protocol;
@Value("${kafka.producer.security.sasl-mechanism}")
private String mechanism;
private static final String P_NAME = "password";
private static final Integer PRODUCER_CONFIGS_COUNT = 3;
@Bean
public ProducerFactory producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map producerConfigs() {
Map props = new HashMap<>(PRODUCER_CONFIGS_COUNT);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class);
// 认证
props.put("security.protocol", protocol);
props.put("sasl.mechanism", mechanism);
props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=""
+ username + "" " + P_NAME + "="" + pWord + "";");
return props;
}
@Bean
public KafkaTemplate kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
发送消息类:
/**
 * Thin sending facade over {@link KafkaTemplate} offering asynchronous
 * (callback-based) and synchronous (blocking, with timeout) publishing.
 * Non-primitive values are serialized to JSON before sending.
 */
@Component
public class KafkaSender {

    /** Maximum time to wait for a synchronous send, in seconds. */
    private static final int SEND_TIME_OUT_SECONDS = 10;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Sends a record asynchronously; the optional callback is notified of the
     * outcome.
     *
     * @param topic             destination topic
     * @param key               record key (may be null; stringified if present)
     * @param value             payload; JSON-serialized unless a String/number
     * @param kafkaSendCallback success/failure callback, or null for fire-and-forget
     */
    public void sendAsync(final String topic, final Object key, final Object value, KafkaSendCallback kafkaSendCallback) {
        final ProducerRecord<String, String> record = createProducerRecord(topic, key, value);
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(record);
        if (kafkaSendCallback != null) {
            future.addCallback(kafkaSendCallback);
        }
    }

    /**
     * Sends a record and blocks up to {@link #SEND_TIME_OUT_SECONDS} seconds
     * for the broker acknowledgement, reporting the result to the handler.
     *
     * @param topic              destination topic
     * @param key                record key (may be null)
     * @param value              payload; JSON-serialized unless a String/number
     * @param kafkaSenderHandler receives handleSuccess/handleFailure; must not be null
     */
    public void sendSync(final String topic, final Object key, final Object value, KafkaSenderHandler kafkaSenderHandler) {
        final ProducerRecord<String, String> record = createProducerRecord(topic, key, value);
        try {
            // Constant name fixed: was declared SEND_TIME_OUT_SEConDS but
            // referenced as SEND_TIME_OUT_SECONDS — did not compile.
            kafkaTemplate.send(record).get(SEND_TIME_OUT_SECONDS, TimeUnit.SECONDS);
            kafkaSenderHandler.handleSuccess(value);
        }
        catch (InterruptedException e) {
            // Restore the interrupt status so callers can observe it.
            Thread.currentThread().interrupt();
            kafkaSenderHandler.handleFailure(value, record, e);
        }
        catch (ExecutionException | TimeoutException e) {
            kafkaSenderHandler.handleFailure(value, record, e);
        }
    }

    /**
     * Builds a String/String record: primitives and Strings pass through via
     * String.valueOf, everything else is JSON-serialized.
     */
    private ProducerRecord<String, String> createProducerRecord(final String topic, final Object key, final Object value) {
        String valueString;
        if (value instanceof String || value instanceof Integer || value instanceof Long
                || value instanceof Double || value instanceof Float) {
            valueString = String.valueOf(value);
        } else {
            valueString = JSONObject.toJSONString(value);
        }
        if (key != null) {
            String keyString = String.valueOf(key);
            return new ProducerRecord<>(topic, keyString, valueString);
        } else {
            return new ProducerRecord<>(topic, valueString);
        }
    }
}



