栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Kafka集群K8S部署与开发实践

Kafka集群K8S部署与开发实践

Kafka集群K8S部署与开发实践 摘要

本文记录了Kafka在K8S上部署过程及Spring访问Kafka代码示例,包括了如下内容:

  • 使用helm 3部署bitnami/kafka到Kubernetes 1.22环境
  • 使用spring-kafka进行消息收发的简单示例
Kafka集群部署 概述

首先使用helm 3部署3节点ZooKeeper和3节点Kafka集群。如果要开户集群的数据持久化,需要提前在K8S上建立可用的PVC,但由于我的环境中还未搭建好NFS,就暂时配置两者的存储为临时存储作为演示,后续通过更改部署配置,也可以方便地切换为持久化存储。同时,我们使用K8S的NodePort方式对外发布Kafka的节点,以供开发调试使用。

搭建ZooKeeper集群

部署3节点的zookeeper集群,关闭持久化存储,并设置服务发布方式为NodePort。NodePort是K8S一种服务发布方式,这样我们就可以通过任意K8S节点IP加指定端口的方式来访问对应的应用。

helm repo add bitnami https://charts.bitnami.com/bitnami
helm install zookeeper-cluster --set replicaCount=3,persistence.enabled=false,service.type=NodePort bitnami/zookeeper

部署成功后会出现访问服务的相关提示,例如可以通过如下命令调用zookeeper客户端查询zookeeper内部数据。

export POD_NAME=$(kubectl get pods --namespace default -l "app.kubernetes.io/name=zookeeper,app.kubernetes.io/instance=zookeeper-cluster,app.kubernetes.io/component=zookeeper" -o jsonpath="{.items[0].metadata.name}")
kubectl exec -it $POD_NAME -- zkCli.sh

如果后续需要删除zookeeper的话,就可以使用如下命令:

helm delete zookeeper-cluster
搭建Kafka集群

部署3节点的Kafka集群,关闭持久化存储,并设置服务发布方式为NodePort。关闭Kafka内置的ZooKeeper,配置为使用我们刚刚搭建ZooKeeper。

helm install kafka-cluster --set replicaCount=3,externalAccess.enabled=true,externalAccess.service.type=NodePort,externalAccess.service.port=9094,externalAccess.autoDiscovery.enabled=true,serviceAccount.create=true,persistence.enabled=false,zookeeper.enabled=false,externalZookeeper.servers=zookeeper-cluster:2181,rbac.create=true bitnami/kafka

部署成功后也会提示如何通过命令行来访问Kafka,例如通过如下命令可以得到一个Kafka客户端环境,然后在容器内对Kafka进行操作:

kubectl run kafka-cluster-client --restart='Never' --image docker.io/bitnami/kafka:2.8.1-debian-10-r73 --namespace default --command -- sleep infinity
kubectl exec --tty -i kafka-cluster-client --namespace default -- bash

查询Kafka部署状态

[root@k8s-master1 ~]# kubectl get pods | grep kafka
kafka-cluster-0                                1/1     Running   0          22h
kafka-cluster-1                                1/1     Running   0          22h
kafka-cluster-2                                1/1     Running   0          22h
Spring-kafka代码示例 概述

接下去就演示一下SpringBoot下对Kafka的代码访问示例。

生成项目结构

可以去start.spring.io生成代码结构,添加Spring-Kafka的项目依赖。Pom文件如下:



	4.0.0
	
		org.springframework.boot
		spring-boot-starter-parent
		2.6.1
		 
	
	cn.zewade.course
	kafka-demo
	0.0.1-SNAPSHOT
	kafka-demo
	Demo project for Spring Boot
	
		1.8
	
	
		
			org.springframework.boot
			spring-boot-starter
		
		
			org.springframework.boot
			spring-boot-starter-web
		
		
			org.springframework.kafka
			spring-kafka
		
		
			org.projectlombok
			lombok
		

		
			com.alibaba
			fastjson
			1.2.73
		

		
			org.springframework.boot
			spring-boot-starter-test
			test
		
		
			org.springframework.kafka
			spring-kafka-test
			test
		
	

	
		
			
				org.springframework.boot
				spring-boot-maven-plugin
			
		
	



配置文件

application.properties

spring.kafka.bootstrap-servers=${K8S_NODE_IP}:30361
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.max-poll-records=100
生产者代码

生产者相关代码,注入KafkaTemplate,生成NewTopic的Bean就会提前创建好Topic,这是为了避免后续消费者监听时报Topic不存在的警告。在生产环境中一般建议禁用自动创建Topic,改由管理员手工创建Topic。在sendUserMessage中,演示消息头和消息体的发送。

@Component
public class MessagingService {

    @Autowired
    KafkaTemplate kafkaTemplate;

    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("user_sync")
                .partitions(3)
                .replicas(1)
                .compact()
                .build();
    }

    public void sendUserMessage(String topic, User user) throws IOException {
        ProducerRecord producerRecord = new ProducerRecord<>(topic, JSON.toJSONString(user));
        producerRecord.headers().add("type", user.getClass().getName().getBytes(StandardCharsets.UTF_8));
        kafkaTemplate.send(producerRecord);
    }
}

User.java

import lombok.Data;

@Data
public class User {

    private Integer id;
    private String userCode;
    private String userName;
}
消费者代码

通过KafkaListener监听消息,注意不同的groupId指定了不同的消费者组,它们独立消费消息,所以同一消息会被两组各自消费。

@Component
@Slf4j
public class TopicMessageListener {

    @KafkaListener(topics = "user_sync", groupId = "group1")
    public void onLoginMessage(@Payload String message, @Header("type") String type) throws Exception {
        User user = JSON.parseObject(message, getType(type));
        log.info("received user message: {}, group: {}", user, "group1");
    }

    @KafkaListener(topics = "user_sync", groupId = "group2")
    public void processLoginMessage(@Payload String message, @Header("type") String type) throws Exception {
        User user = JSON.parseObject(message, getType(type));
        log.info("received user message: {}, group: {}", user, "group2");
    }

    @SuppressWarnings("unchecked")
    private static  Class getType(String type) {
        // TODO: use cache:
        try {
            return (Class) Class.forName(type);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }
}
总结

通过helm 3和Spring对Kakfa进行了初步的实践,后续将尝试持久化存储操作和Spring-kafka更加深入的使用。

参考文档

bitnami/kafka部署文档

Spring-kafka说明文档

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/677140.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号