本文记录了Kafka在K8S上部署过程及Spring访问Kafka代码示例,包括了如下内容:
- 使用helm 3部署bitnami/kafka到Kubernetes 1.22环境
- 使用spring-kafka进行消息收发的简单示例
首先使用helm 3部署3节点ZooKeeper和3节点Kafka集群。如果要开户集群的数据持久化,需要提前在K8S上建立可用的PVC,但由于我的环境中还未搭建好NFS,就暂时配置两者的存储为临时存储作为演示,后续通过更改部署配置,也可以方便地切换为持久化存储。同时,我们使用K8S的NodePort方式对外发布Kafka的节点,以供开发调试使用。
搭建ZooKeeper集群部署3节点的zookeeper集群,关闭持久化存储,并设置服务发布方式为NodePort。NodePort是K8S一种服务发布方式,这样我们就可以通过任意K8S节点IP加指定端口的方式来访问对应的应用。
helm repo add bitnami https://charts.bitnami.com/bitnami helm install zookeeper-cluster --set replicaCount=3,persistence.enabled=false,service.type=NodePort bitnami/zookeeper
部署成功后会出现访问服务的相关提示,例如可以通过如下命令调用zookeeper客户端查询zookeeper内部数据。
export POD_NAME=$(kubectl get pods --namespace default -l "app.kubernetes.io/name=zookeeper,app.kubernetes.io/instance=zookeeper-cluster,app.kubernetes.io/component=zookeeper" -o jsonpath="{.items[0].metadata.name}")
kubectl exec -it $POD_NAME -- zkCli.sh
如果后续需要删除zookeeper的话,就可以使用如下命令:
helm delete zookeeper-cluster搭建Kafka集群
部署3节点的Kafka集群,关闭持久化存储,并设置服务发布方式为NodePort。关闭Kafka内置的ZooKeeper,配置为使用我们刚刚搭建ZooKeeper。
helm install kafka-cluster --set replicaCount=3,externalAccess.enabled=true,externalAccess.service.type=NodePort,externalAccess.service.port=9094,externalAccess.autoDiscovery.enabled=true,serviceAccount.create=true,persistence.enabled=false,zookeeper.enabled=false,externalZookeeper.servers=zookeeper-cluster:2181,rbac.create=true bitnami/kafka
部署成功后也会提示如何通过命令行来访问Kafka,例如通过如下命令可以得到一个Kafka客户端环境,然后在容器内对Kafka进行操作:
kubectl run kafka-cluster-client --restart='Never' --image docker.io/bitnami/kafka:2.8.1-debian-10-r73 --namespace default --command -- sleep infinity kubectl exec --tty -i kafka-cluster-client --namespace default -- bash
查询Kafka部署状态
[root@k8s-master1 ~]# kubectl get pods | grep kafka kafka-cluster-0 1/1 Running 0 22h kafka-cluster-1 1/1 Running 0 22h kafka-cluster-2 1/1 Running 0 22hSpring-kafka代码示例 概述
接下去就演示一下SpringBoot下对Kafka的代码访问示例。
生成项目结构可以去start.spring.io生成代码结构,添加Spring-Kafka的项目依赖。Pom文件如下:
配置文件4.0.0 org.springframework.boot spring-boot-starter-parent 2.6.1 cn.zewade.course kafka-demo 0.0.1-SNAPSHOT kafka-demo Demo project for Spring Boot 1.8 org.springframework.boot spring-boot-starter org.springframework.boot spring-boot-starter-web org.springframework.kafka spring-kafka org.projectlombok lombok com.alibaba fastjson 1.2.73 org.springframework.boot spring-boot-starter-test test org.springframework.kafka spring-kafka-test test org.springframework.boot spring-boot-maven-plugin
application.properties
spring.kafka.bootstrap-servers=${K8S_NODE_IP}:30361
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.max-poll-records=100
生产者代码
生产者相关代码,注入KafkaTemplate,生成NewTopic的Bean就会提前创建好Topic,这是为了避免后续消费者监听时报Topic不存在的警告。在生产环境中一般建议禁用自动创建Topic,改由管理员手工创建Topic。在sendUserMessage中,演示消息头和消息体的发送。
@Component
public class MessagingService {
@Autowired
KafkaTemplate kafkaTemplate;
@Bean
public NewTopic topic1() {
return TopicBuilder.name("user_sync")
.partitions(3)
.replicas(1)
.compact()
.build();
}
public void sendUserMessage(String topic, User user) throws IOException {
ProducerRecord producerRecord = new ProducerRecord<>(topic, JSON.toJSONString(user));
producerRecord.headers().add("type", user.getClass().getName().getBytes(StandardCharsets.UTF_8));
kafkaTemplate.send(producerRecord);
}
}
User.java
import lombok.Data;
@Data
public class User {
private Integer id;
private String userCode;
private String userName;
}
消费者代码
通过KafkaListener监听消息,注意不同的groupId指定了不同的消费者组,它们独立消费消息,所以同一消息会被两组各自消费。
@Component
@Slf4j
public class TopicMessageListener {
@KafkaListener(topics = "user_sync", groupId = "group1")
public void onLoginMessage(@Payload String message, @Header("type") String type) throws Exception {
User user = JSON.parseObject(message, getType(type));
log.info("received user message: {}, group: {}", user, "group1");
}
@KafkaListener(topics = "user_sync", groupId = "group2")
public void processLoginMessage(@Payload String message, @Header("type") String type) throws Exception {
User user = JSON.parseObject(message, getType(type));
log.info("received user message: {}, group: {}", user, "group2");
}
@SuppressWarnings("unchecked")
private static Class getType(String type) {
// TODO: use cache:
try {
return (Class) Class.forName(type);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
}
总结
通过helm 3和Spring对Kakfa进行了初步的实践,后续将尝试持久化存储操作和Spring-kafka更加深入的使用。
参考文档bitnami/kafka部署文档
Spring-kafka说明文档



