栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

java操作kafka

java操作kafka

springboot整合kafka:
导入依赖:

org.apache.kafka
kafka-clients
2.1.0


org.springframework.kafka
spring-kafka
2.4.1.RELEASE

注意导入的版本,如果导入版本不合适可能会引起冲突,导致项目无法启动;
接收数据导入kafka:
1.根据自己的情况对接收数据进行序列化,kafka有自己的序列化可供适使用,由于通常使用实体类接收数据,此处以实体类简介:
编写一个序列化工具类,进行实体类的序列化:
public class SerializerUtils implements Serializer {
@Override
public void configure(Map map, boolean b) {

}

@Override
public byte[] serialize(String s, T t) {
    return JSON.toJSonBytes(t);
}

@Override
public void close() {

}

}
重写serialize方法;
对需要存储的实体类数据使用上面的工具类转化位byte数组;
新建ProducerRecord
ProducerRecord producerRecord = new ProducerRecord<>(topic, 1, bytes);
新建producer:

public KafkaProducer createProducer() {
String jaasTemplate = “org.apache.kafka.common.security.scram.ScramLoginModule required username=”%s" password="%s";";

    String jaasCfg = String.format(jaasTemplate, "production_testing_rw", "production_testing_rw");
    Properties properties = new Properties();

    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    权限校验
    properties.setProperty("security.protocol", "SASL_PLAINTEXT");
    properties.setProperty("sasl.mechanism", "SCRAM-SHA-256");

    properties.setProperty("sasl.jaas.config", jaasCfg);
    producer = new KafkaProducer(properties);
    return producer;
}

}
发送消息至kafka:
try {
Future future = this.createProducer().send(producerRecord);
future.get();
}catch (Exception e){
e.printStackTrace();//连接错误、No Leader错误都可以通过重试解决;消息太大这类错误kafkaProducer不会进行任何重试,直接抛出异常
}
发送END;;;
从kafka取出刚才的消息:
监听自己所需要取的数据topic;
取出数据并进行反序列化;
public class DeSerializerUtils implements Deserializer {
@Override
public void configure(Map configs, boolean isKey) {

}

@Override
public T deserialize(String topic, byte[] data) {
    return JSON.parseObject(data, T.class);
}

@Override
public void close() {

}

}
存入相应数据库;

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/630311.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号