springboot整合kafka:
导入依赖:
org.apache.kafka
kafka-clients
2.1.0
org.springframework.kafka
spring-kafka
2.4.1.RELEASE
注意导入的版本,如果导入版本不合适可能会引起冲突,导致项目无法启动;
接收数据导入kafka:
1.根据自己的情况对接收数据进行序列化,kafka有自己的序列化可供适使用,由于通常使用实体类接收数据,此处以实体类简介:
编写一个序列化工具类,进行实体类的序列化:
public class SerializerUtils implements Serializer {
@Override
public void configure(Map
}
@Override
public byte[] serialize(String s, T t) {
return JSON.toJSonBytes(t);
}
@Override
public void close() {
}
}
重写serialize方法;
对需要存储的实体类数据使用上面的工具类转化位byte数组;
新建ProducerRecord
ProducerRecord
新建producer:
public KafkaProducer createProducer() {
String jaasTemplate = “org.apache.kafka.common.security.scram.ScramLoginModule required username=”%s" password="%s";";
String jaasCfg = String.format(jaasTemplate, "production_testing_rw", "production_testing_rw");
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
权限校验
properties.setProperty("security.protocol", "SASL_PLAINTEXT");
properties.setProperty("sasl.mechanism", "SCRAM-SHA-256");
properties.setProperty("sasl.jaas.config", jaasCfg);
producer = new KafkaProducer(properties);
return producer;
}
}
发送消息至kafka:
try {
Future future = this.createProducer().send(producerRecord);
future.get();
}catch (Exception e){
e.printStackTrace();//连接错误、No Leader错误都可以通过重试解决;消息太大这类错误kafkaProducer不会进行任何重试,直接抛出异常
}
发送END;;;
从kafka取出刚才的消息:
监听自己所需要取的数据topic;
取出数据并进行反序列化;
public class DeSerializerUtils implements Deserializer {
@Override
public void configure(Map
}
@Override
public T deserialize(String topic, byte[] data) {
return JSON.parseObject(data, T.class);
}
@Override
public void close() {
}
}
存入相应数据库;



