- 写在前面,本篇示例是笔者实践所得,如果不当烦请及时沟通,共同学习。
- 1. java客户端访问kafka
- 1.1 maven依赖
- 1.2 基础
- 1.3 创建topic
- 1.3 生产者发送消息
- 1.4 消费消息
- 2、spring整合kafka
- 2.1 maven依赖
- 2.2 生产者配置
- 2.3 消费者配置
- 3、springBoot整合kafka
- 3.1 自动装配
- 3.2 自定义装配
- 3.3 验证整合结果
在上篇博文中我们介绍了kafka的框架的基础知识,这篇博文我们从java程序的使用角度去熟悉kafka框架。 1. java客户端访问kafka 1.1 maven依赖
org.apache.kafka kafka-clients 2.1.1 log4j log4j 1.2.17
注意: kafka-clients的版本主要和自己安装的kafaka版本一致 否则会报错 报错信息如下:
1.2 基础创建kafka生产者/管理员
@BeforeEach
public void createKafkaProduct(){
Properties prop = new Properties();
//连接指定的kafka集群kafka地址,多个地址用逗号分割
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx.xx.xx.xx:9092,xxx.xx.xx.xx:9093,xxx.xx.x.xxx:9094");
//消息序列化
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer .class);
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//省略部分kafka 生产者配置
//创建kafka生产者
kafkaProducer = new KafkaProducer(prop);
//创建kafka的管理客户端 用于创建topic
adminClient = KafkaAdminClient.create(prop);
}
创建kafka消费者
@BeforeEach
public void createKafkaConsumer(){
Properties prop = new Properties();
//连接指定的kafka集群kafka地址,多个地址用逗号分割
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx.xx.xx.xx:9092,xxx.xx.xx.xx:9093,xxx.xx.x.xxx:9094");
//消息反序列化
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
//从topic最开始的数据进行消费
prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
//设置kafka消费组id
prop.put(ConsumerConfig.GROUP_ID_CONFIG,"reader-1");
//省略部分kafka 消费者配置
// 创建kafka消费者
kafkaConsumer = new KafkaConsumer<>(prop);
}
1.3 创建topic
topic是消息存储载体,创建的topic可以制定名称,分区(提供吞吐量)、副本数量(故障转移高可靠性)
//创建分区
public void testTopic(){
//创建一个名为book,分区为2,副本数为3的topic 注意副本数量不能超过brokers否则报错
NewTopic topic = new NewTopic("book",2, (short) 4);
//adminClient在基础中已经进行了实例化 所以此处直接使用对象创建topic
CreateTopicsResult createTopicsResult = adminClient.createTopics(Lists.newArrayList(topic));
try {
createTopicsResult.all().get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
if (createTopicsResult.all().isDone()){
System.out.println("topic创建完毕");
}
}
需要注意副本数量不能超过brokers的数量 否则报如下错误
1.3 生产者发送消息发送的消息需要选择topic,如果该topic不存在则会自动创建
//生产消息
public void testProduct(){
//设置
String topic = "book";
Integer partition = 0;
Long timeStamp = System.currentTimeMillis();
String key = "余华";
String value = "兄弟";
//创建一个消息
ProducerRecord msg = new ProducerRecord(topic,partition,timeStamp,key,value);
//kafka发送消息
Future sendFturue = kafkaProducer.send(msg);
//获取异步结果
Object result = null;
try {
result = sendFturue.get(1000, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
System.out.println("kafka生产:"+result);
}
1.4 消费消息
public void testConsumer(){
// 订阅主题
kafkaConsumer.subscribe(Arrays.asList("book"));
Duration duration = Duration.ofMillis(10);
// 获取数据
while(true){
ConsumerRecords consumerRecords = kafkaConsumer.poll(duration);
// 解析并打印consumerRecords
for(ConsumerRecord consumerRecord:consumerRecords){
System.out.println("kafka消费:"+consumerRecord.key()+"---"+consumerRecord.value());
}
}
}
2、spring整合kafka
2.1 maven依赖
org.springframework.kafka spring-kafka 1.3.5.RELEASE
kafka的配置
################# kafka生产者配置 ################## , # brokers集群 kafka.producer.bootstrap.servers = ip:9092,ip:9093,ip:9094 kafka.producer.acks = all #发送失败重试次数 kafka.producer.retries = 3 kafka.producer.linger.ms = 10 #即32MB的批处理缓冲区 kafka.producer.buffer.memory = 40960 #批处理条数:当多个记录被发送到同一个分区时,生产者会尝试将记录合并到更少的请求中。这有助于客户端和服务器的性能 kafka.producer.batch.size = 4096 #默认topic kafka.producer.defaultTopic = book #消息序列化(key,value) kafka.producer.key.serializer = org.apache.kafka.common.serialization.StringSerializer kafka.producer.value.serializer = org.apache.kafka.common.serialization.StringSerializer ################# kafka消费者配置 ################## , kafka.consumer.bootstrap.servers = ip:9092,ip:9093,ip:9094 # 如果为true,消费者的偏移量将在后台定期提交 kafka.consumer.enable.auto.commit = true #如何设置为自动提交(enable.auto.commit=true),这里设置自动提交周期 kafka.consumer.auto.commit.interval.ms=1000 #order-beta 消费者群组ID,发布-订阅模式,即如果一个生产者,多个消费者都要消费,那么需要定义自己的群组,同一群组内的消费者只有一个能消费到消息 kafka.consumer.group.id = book_group #在使用Kafka的组管理时,用于检测消费者故障的超时 kafka.consumer.session.timeout.ms = 30000 #消息反序列化(key,value) kafka.consumer.key.deserializer = org.apache.kafka.common.serialization.StringDeserializer kafka.consumer.value.deserializer = org.apache.kafka.common.serialization.StringDeserializer # 消费消息的并发数 kafka.consumer.concurrency = 32.2 生产者配置
1. 设置配置信息
使用hashMap对象接收kafka的配置信息,为后面的DefaultKafkaProducerFactory 提供配置参数
2. ProducerFactory
顾名思义该对象是kafka整合spring后的一个工厂对象用于创建kafka的生产者Producer
3. kafkaTemplate实例
该类是我们业务层面最终使用的服务类,里面提供了很多消息发送的相关api。
4. kafka生产者消费监听
public class KafkaProductListenerService implements ProducerListener{ @Override public void onSuccess(String topic, Integer partition, String key, String value, Recordmetadata recordmetadata) { System.out.println("主题topic: "+topic+" , 分区: "+partition+", key:"+key+", value:"+value); System.out.println(JacksonUtil.beanToJson(recordmetadata)); } @Override public void onError(String topic, Integer partition, String key, String value, Exception e) { System.out.println("主题topic: "+topic+" , 分区: "+partition+", key"+key+", value"+value); System.out.println(JacksonUtil.beanToJson(e)); } @Override public boolean isInterestedInSuccess() { return true; } }
5. 生产者生产消息
@Test
public void testProductMsg() throw Exception {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
new String[]{"spring-beans.xml"});
context.start();
//获取KafkaTemplate dui
KafkaTemplate kafkaTemplate = (KafkaTemplate) context.getBean("kafkaTemplate");
//发送消息到默认topic
ListenableFuture send = kafkaTemplate.sendDefault("毛姆", "月亮和六便士");
//异步获取结果
Object result = send.get(1000, TimeUnit.SECONDS);
System.out.println("发送消息成功:"+result);
try {
//模拟消息消费
Thread.sleep(50*1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
2.3 消费者配置
1. 设置配置信息
同kafka消费者使用hashMap对象接收kafka的配置信息,为后面的DefaultKafkaConsumerFactory 提供配置参数
2. ConsumerFactory
顾名思义该对象是kafka整合spring后的一个工厂对象用于创建kafka的消费者者Consumer
3. 服务监容器配置
ContainerProperties 需要添加一个由我们实现的消息消费实现类KafkaConsumerService,同时在topics中可以用来声明 我们需要监听的主题集合。只有topics中指定的主题才会被监听类消费
book
public class KafkaConsumerService implements MessageListener{ @Override public void onMessage(ConsumerRecord data) { String topic = data.topic(); int partition = data.partition(); //此处可以采用Spring的dispatchServlet的类型 //根据不同主题,消费 if("book".equals(topic)){ //消费book类型的topic System.out.println("消费者消费主题topic: "+topic+" , 分区: "+partition+", key"+data.key()+", value"+data.value()); }else if("其他".equals(data.topic())){ //其他消费逻辑 } } }
4. 消费监听容器服务
3、springBoot整合kafka
org.springframework.boot spring-boot-starter-parent 2.3.5.RELEASE org.springframework.kafka spring-kafka 2.5.6.RELEASE
注意 springBoot的版本要和spring-kafka的版本相对应,否则会报如下错误:如果遇到报错信息是什么java.lang.NoClassDefFoundError: org/springframework/kafka/xxxx之类的话,基本就是版本问题了。
下面摘自官网springBoot和kafka版本对应关系。
使用了springboot 可以使用其自动装配或者自定义配置
在上述版本下spring自动装配使用的入口类为 KafkaAutoConfiguration 该类配置了 如下bean实例到spring容器中。
- KafkaAdmin 用于创建/修改/删除topic 的实例
- KafkaJaasLoginModuleInitializer JAAS登录权限控制
- KafkaTransactionManager kafka消息提交 的事务处理(消息发送成功 提交事务,否则回滚事务)
- ProducerFactory 消息生产者工厂对象
- ConsumerFactory 消息消费者工厂对象
- ProducerListener 消息生产者消息发送后的结果监听
- KafkaTemplate 我们熟悉的用于触发生产消息的spring模板类。
自动装配配置的优点是方便简单,但是缺点也很明显就是配置的bean已经固定了,如果我们需要进行一些特殊的设置可以通过如下的方式进行自定义配置替换掉springBoot中自动装配的kafka中的bean实例。
@Configuration
public class MyKafkaAutoConfiguration {
@Autowired
private KafkaProperties kafkaProperties;
@Bean
public DefaultKafkaProducerFactory kafkaProducerFactory() {
//进行自定义配置信息添加
return new DefaultKafkaProducerFactory<>(
this.kafkaProperties.buildProducerProperties());
}
@Bean
public KafkaTemplate kafkaTemplate(DefaultKafkaProducerFactory kafkaProducerFactory) {
//进行自定义配置信息添加
return new KafkaTemplate<>(kafkaProducerFactory, false);
}
}
3.3 验证整合结果
生产消息
@RestController
public class KafkaProductController {
@Autowired
private KafkaTemplate kafkaTemplate;
@GetMapping("/kafka/callbackOne/{message}")
public void sendMessage(@PathVariable("message") String callbackMessage) {
kafkaTemplate.send("book", callbackMessage).addCallback(success -> {
SendResult result = null;
if(success instanceof SendResult){
result = (SendResult)success;
}
// 消息发送到的topic
String topic = result.getRecordmetadata().topic();
// 消息发送到的分区
int partition = result.getRecordmetadata().partition();
// 消息在分区内的offset
long offset = result.getRecordmetadata().offset();
System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
}, failure -> {
System.out.println("发送消息失败:" + failure.getMessage());
});
}
}
消费消息
@Service
public class KafkaConsumerServiceTwo {
@KafkaListener(topics = {"book"}, groupId = "receiver")
public void registryReceiver(ConsumerRecord consumerRecords) {
System.out.println("模拟消息消费:"+consumerRecords.value());
}
}



