Apache Kafka 是一个分布式发布 - 订阅消息系统和一个强大的消息队列,可以处理大量的数据,并使你能够将消息从一个端点传递到另一个端点。 Kafka 适合离线和在线消息消费。 Kafka 消息保留在磁盘上,并在群集内复制以防止数据丢失。 Kafka 构建在 Zookeeper 同步服务之上。 它与 Apache Storm 和 Spark 非常好地集成,用于实时流式数据分析。
Kafka 是一个分布式消息队列,具有高性能、持久化、多副本备份、横向扩展能力。生产者往队列里写消息,消费者从队列里取消息进行业务逻辑。一般在架构设计中起到解耦、削峰、异步处理的作用。
相关术语
Broker
Kafka集群包含多个服务器,每个服务器节点就叫Broker
Topic
Kafka消息分为不同的主题,按主题分开投递和处理
Partition
每个Topic将数据分为1个或N个分区来存储
Producer
消息生产者,负责投递消息
Consumer
消息消费者,负责接收消息
Consumer Group
消费者组,每个消费者属于特定的组
Leader
Kafka集群中会选取一个Broker,负责写数据,然后广播同步给其他Broker,挂掉后会重新选举
Follower
跟随Leader,负责读数据,Leader的数据会同步给Follower
Replica
Broker的副本,当前Broker挂掉后可以代替
Kafka安装和配置1)下载kafka
https://kafka.apache.org/downloads
本例使用的版本是:kafka_2.11-2.4.1.tgz
2) 解压并修改目录
tar -xvf kafka_2.11-2.4.1.tgz mv kafka_2.11-2.4.1 kafka
3) 配置
修改server.properties关键配置、其他的保持默认:
# 监听本机的9092端口 listeners=PLAINTEXT://:9092 # zk的ip和端口 zookeeper.connect=localhost:2181
4) 启动kafka
需要先启动zookeeper,kafka提供了内置的zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
执行bin目录中的脚本
bin/kafka-server-start.sh config/server.properties
如果出现Not enough space是因为堆内存不足,可以修改kafka-server-start.sh脚本,将"-Xmx1G -Xms1G"改为"-Xmx512M -Xms512M"
需要后台执行可以加入-daemon命令
bin/kafka-server-start.sh -daemon config/server.properties
日志在logs/server.log中查看
5) 停止kafka
bin/kafka-server-stop.sh config/server.propertiesKafka的基本使用
执行bin目录中的脚本
1) 创建主题
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
–create 创建topic–zookeeper zk的地址–replication-factor 副本数,默认是1–partitions 分区数–topic 主题名称
2) 查看主题
bin/kafka-topics.sh --list --zookeeper localhost:2181
3) 发送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
4) 消费消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginningSpringBoot整合Kafka
1) 修改kafka配置
# 允许外部IP连接 listeners=PLAINTEXT://0.0.0.0:9092 # 为外部提供服务的代理地址,就是当前服务的IP advertised.listeners=PLAINTEXT://192.168.7.188:9092
2) 重新启动Kafka
3) 创建SpringBoot项目
4) 添加依赖
org.springframework.kafka spring-kafka
5) 配置文件
# Kafka集群地址 spring.kafka.bootstrap-servers=192.168.7.188:9092 # 生产者配置 # 重试次数 spring.kafka.producer.retries=0 # 批量大小 spring.kafka.producer.batch-size=16384 # 提交延时 spring.kafka.producer.properties.linger.ms=0 # 生产端缓冲区大小 spring.kafka.producer.buffer-memory = 33554432 # Kafka提供的序列化和反序列化类 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer # 消费者配置 # 默认的消费组ID spring.kafka.consumer.group-id=0 # 是否自动提交offset spring.kafka.consumer.enable-auto-commit=true spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.auto-commit-interval=100 # 设置获取一次数据的最大值,默认值为1M spring.kafka.consumer.properties.max.partition.fetch.bytes=2097152 # Kafka提供的序列化和反序列化类 spring.kafka.consumer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.consumer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
参考官网的kafka配置:
https://docs.spring.io/spring-boot/docs/2.0.4.RELEASE/reference/htmlsingle/#common-application-properties
6) 配置类
@Configuration
public class KafkaConfig {
@Bean
public NewTopic newTopic(){
return new NewTopic("hello-topic",1, (short) 1);
}
}
7) 生产者
@RestController
public class KafkaProducerController {
@Autowired
private KafkaTemplate kafkaTemplate;
@RequestMapping("/kafka/sendMessage")
public String sendMessage(String msg){
kafkaTemplate.send("hello-topic",msg);
return "发送成功:" + msg;
}
}
8) 消费者
@Component
public class KafkaMessageConsumer {
@KafkaListener(topics = {"hello-topic"})
public void consumeMessage(ConsumerRecord,?> record){
System.out.println("Get Message:" + record.value());
}
}
9) 浏览器测试



