1. Kafka Architecture
Kafka is a tool for buffering and storing streaming data.
Distributed; written in Scala; used for buffering, rate limiting / peak shaving, and asynchronous decoupling.
A message queue based on the publish-subscribe model: one-to-many, and data is not deleted after consumers pull it.
- Producer: produces messages
- Broker: a Kafka server; several brokers form the Kafka cluster
- Topic: a category of messages
- Partition: each topic is split into partitions
- Leader / Follower: replicas of a partition
- Consumer: consumes messages
  - Consumer group: within a group, each partition of a topic is consumed by only one consumer; groups increase consumption capacity
- ZooKeeper: registry for cluster information
  - manages Kafka cluster metadata
  - before version 0.9, consumer offsets were stored in ZooKeeper
  - since version 0.9, offsets are stored inside Kafka itself (the internal __consumer_offsets topic)
2. Kafka Installation
- Download the installation package
- Extract the archive
- Edit the configuration file
server.properties
broker.id=0
delete.topic.enable=true
# kafka data dir
log.dirs=/opt/module/kafka_2.11-0.10.2.0/logs
# zk
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181
- Distribute Kafka to the other nodes and change broker.id on each of them
- Start ZooKeeper
- Start Kafka
bin/kafka-server-start.sh -daemon /opt/module/kafka_2.11-0.10.2.0/config/server.properties
- Check the processes with jps
------hadoop102-------
2790 QuorumPeerMain
3850 Kafka
3931 Jps
------hadoop103-------
2635 Jps
2572 Kafka
1599 QuorumPeerMain
------hadoop104-------
2608 Jps
2545 Kafka
1574 QuorumPeerMain
3. Kafka Shell
3.1 topic
The replication factor must be <= the number of brokers.
# create topic
bin/kafka-topics.sh --create --zookeeper hadoop102:2181 --topic test2 --partitions 3 --replication-factor 1
# delete topic
bin/kafka-topics.sh --delete --topic test1 --zookeeper hadoop102:2181
# topic list
bin/kafka-topics.sh --list --zookeeper hadoop102:2181
# topic describe
bin/kafka-topics.sh --describe --topic test2 --zookeeper hadoop102:2181
3.2 producer
bin/kafka-console-producer.sh --topic test2 --broker-list hadoop102:9092
3.3 consumer
bin/kafka-console-consumer.sh --topic test2 --bootstrap-server hadoop102:9092
4. Kafka Reinstall
Change the Kafka data directory (log.dirs):
- Delete the logs/ directory
- Create a data/ directory
- Edit the configuration file
- Delete the zkData/version-2 directory used by ZooKeeper
- Restart ZooKeeper
- Restart Kafka
- One Topic -> multiple Partitions
- One Partition -> multiple Segments
- One Segment -> an xxx.log file and an xxx.index file
- Partitioning strategy (see the sketch after this list)
  - If a partition number is specified in the record, use that partition
  - Otherwise, if a key is specified, use hash(key) % number of partitions
  - Otherwise, assign partitions round-robin
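A minimal sketch of those three rules. The class and method names are invented for illustration; Kafka's real DefaultPartitioner uses a murmur2 hash of the key rather than Arrays.hashCode.
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
// Illustrative sketch of the partition-selection rules above (not Kafka's actual implementation).
public class PartitionChoiceSketch {
    private final AtomicInteger counter = new AtomicInteger();
    public int choose(Integer explicitPartition, byte[] keyBytes, int numPartitions) {
        if (explicitPartition != null) {
            return explicitPartition;                                        // 1. partition set on the record
        }
        if (keyBytes != null) {
            return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions; // 2. hash(key) % partition count
        }
        return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;     // 3. no key: round-robin
    }
}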
- Reliability of produced data
  - ISR mechanism: a write is considered successful only after the leader and all followers in the ISR have replicated it; if a follower does not send a fetch request for longer than a threshold, the leader removes it from the ISR.
  - acks mechanism (see the config snippet after this list)
    - 0: do not wait for the leader or followers; data may be lost
    - 1: return as soon as the leader has written the data; data may be lost if the leader fails before followers sync
    - -1 | all: return only after the leader and all ISR followers have written the data; data loss is very unlikely, but duplicates are possible
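A hedged example of picking the acks level on the producer side; the values below are illustrative and the snippet reuses the props object from the producer examples in section 10.1. ProducerConfig.ACKS_CONFIG and ProducerConfig.RETRIES_CONFIG are the standard client settings.
// Reliability-related producer settings (illustrative values):
props.put(ProducerConfig.ACKS_CONFIG, "all");   // -1/all: wait for the leader and all ISR followers
props.put(ProducerConfig.RETRIES_CONFIG, 3);    // retry transient failures; with acks=all this can produce duplicates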
- Consistency of consumed data
  - LEO: the largest offset in each replica
  - HW (high watermark): the largest offset visible to consumers = the smallest LEO in the ISR (example below)
  When the leader fails, the new leader truncates data beyond the HW so that the replicas stay consistent.
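For example (illustrative numbers): if the three replicas in the ISR have LEOs of 7, 5 and 6, then HW = 5; consumers can only see data up to offset 5, and if the leader fails, anything beyond offset 5 on the surviving replicas is truncated before the new leader takes over.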
- Exactly once based on idempotence alone cannot span producer sessions
  Exactly once = At least once + idempotence (introduced in 0.11, enabled as shown in the snippet below)
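Idempotence is switched on with a single producer setting; a hedged snippet reusing the props object from the producer examples in section 10.1.
// 0.11+: the broker de-duplicates retried batches from the same producer session
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);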
7. Kafka Consumers
- Consumption model
  pull: the consumer pulls data from the broker; drawback: a poll may return no data, so poll() takes a timeout to avoid busy-waiting
- Partition assignment strategy (configured as shown below)
  Reassignment is triggered whenever the number of consumers in the group changes.
  - RoundRobin: pools all TopicAndPartitions of the group and assigns them round-robin
  - Range: assigns partitions topic by topic (the default)
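The strategy is chosen on the consumer side through the standard partition.assignment.strategy setting; a hedged snippet reusing the consumer props from section 10.2, with Kafka's built-in RoundRobinAssignor class.
// choose RoundRobin instead of the default Range assignor
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        "org.apache.kafka.clients.consumer.RoundRobinAssignor");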
- Offset
  A committed offset is identified by (consumer group, topic, partition); see the lookup sketch below.
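A small hedged sketch of that key: given the kafkaConsumer built in section 10.2 (group bigdata2), the committed offset for one topic-partition can be read back like this. It assumes additional imports of org.apache.kafka.common.TopicPartition and org.apache.kafka.clients.consumer.OffsetAndMetadata.
// the committed offset is stored per (group, topic, partition)
TopicPartition tp = new TopicPartition("test2", 0);
OffsetAndMetadata committed = kafkaConsumer.committed(tp);
System.out.println(committed == null ? "no committed offset" : "offset=" + committed.offset());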
8. Kafka High-Performance Reads and Writes
- Sequential disk writes, which save seek time
- Zero-copy (together with the OS page cache)
10. Kafka API
10.1 Producer API
Producer transactions span sessions: a globally unique, user-defined transactional id is introduced, which ultimately gives exactly-once semantics (sketch below).
Sending is asynchronous and independent of acks; it involves two threads, main and sender, and every record passes through Interceptors -> Serializer -> Partitioner.
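A hedged sketch of the transactional producer API (0.11+). "tx-demo-1" is an illustrative transactional.id, and props is assumed to be configured as in the simple producer below.
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-demo-1"); // globally unique, user defined
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("test2", "a0"));
    producer.commitTransaction();            // all records in the transaction become visible atomically
} catch (Exception e) {
    producer.abortTransaction();             // discard everything sent since beginTransaction()
} finally {
    producer.close();
}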
- Simple producer
package com.ipinyou.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class ProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
        // send 10 records to topic test2; send() is asynchronous
        for (int i = 0; i < 10; i++) {
            kafkaProducer.send(new ProducerRecord<>("test2", "a" + i));
        }
        // close() flushes any records still buffered in the sender thread
        kafkaProducer.close();
    }
}
- Producer with callback
package com.ipinyou.kafka.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class CallBackProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            // the callback runs once the broker has acknowledged (or rejected) the record
            kafkaProducer.send(new ProducerRecord<>("test2", "a:" + i), (metadata, exception) -> {
                if (exception == null) {
                    System.out.println(metadata.partition() + "--" + metadata.offset());
                }
            });
        }
        kafkaProducer.close();
    }
}
- Custom partitioner
package com.ipinyou.kafka.partitioner;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
public class DemoPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // business logic goes here; for the demo every record goes to partition 2
        return 2;
    }
    @Override
    public void close() {
    }
    @Override
    public void configure(Map<String, ?> configs) {
    }
}
Register it in the producer program:
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.ipinyou.kafka.partitioner.DemoPartitioner");
10.2 Consumer API
- Simple consumer
package com.ipinyou.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class ConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "bigdata2");
        // reset to the earliest offset; only takes effect when the group id changes or the committed offset is invalid
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
        kafkaConsumer.subscribe(Arrays.asList("test2"));
        while (true) {
            // poll blocks for at most 100 ms when no data is available
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.key() + ":" + record.value());
            }
        }
        // kafkaConsumer.close();
    }
}
Offsets are auto-committed by default, which can lead to lost or duplicated consumption.
Committing offsets manually, whether synchronously or asynchronously, can still cause duplicated consumption (sketch below).
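A hedged sketch of manual commits, reusing the props and kafkaConsumer from the simple consumer above, with auto-commit turned off.
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // turn off auto commit
while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.key() + ":" + record.value());
    }
    kafkaConsumer.commitSync();      // synchronous: blocks until the offsets are committed
    // kafkaConsumer.commitAsync();  // asynchronous alternative: does not block, failures surface only in the callback
}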
- Custom offset storage: wrap consuming the data and saving/committing the offset in a single transaction; handling rebalances requires a ConsumerRebalanceListener
10.3 Interceptor API
Add a timestamp to each record before it is sent to the broker, and print the number of successful and failed sends afterwards.
package com.ipinyou.kafka.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
public class DemoInterceptor implements ProducerInterceptor<String, String> {
    private int success;
    private int error;
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // prepend a timestamp to the value before the record is serialized and sent
        return new ProducerRecord<>(record.topic(), record.partition(), record.key(),
                System.currentTimeMillis() + "_" + record.value());
    }
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // exception == null means the broker acknowledged the record
        if (exception == null) {
            success++;
        } else {
            error++;
        }
    }
    @Override
    public void close() {
        System.out.println("success:" + success);
        System.out.println("error:" + error);
    }
    @Override
    public void configure(Map<String, ?> configs) {
    }
}
- Add this in the producer program
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Arrays.asList("com.ipinyou.kafka.interceptor.DemoInterceptor"));
11. Kafka Monitoring
Using Kafka Eagle
- Modify kafka-server-start.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-server -Xms8G -Xmx8G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
export JMX_PORT="9999"
fi
- Distribute the modified start script
- Download and extract the Eagle package
- Configure the Eagle environment variables and source the profile
export KE_HOME=/opt/module/eagle-web-1.3.7
export PATH=$PATH:$KE_HOME/bin
- Edit the Eagle configuration file system-config.properties
SQLite is used here to store Eagle's data; the MySQL connection could never be established.
######################################
# multi zookeeper & kafka cluster list
######################################
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=hadoop102:2181,hadoop103:2181,hadoop104:2181

######################################
# broker size online list
######################################
cluster1.kafka.eagle.broker.size=20

######################################
# zk client thread limit
######################################
kafka.zk.limit.size=25

######################################
# kafka eagle webui port
######################################
kafka.eagle.webui.port=8048

######################################
# kafka offset storage
######################################
cluster1.kafka.eagle.offset.storage=kafka

######################################
# kafka metrics, 15 days by default
######################################
kafka.eagle.metrics.charts=true
kafka.eagle.metrics.retain=15

######################################
# kafka sql topic records max
######################################
kafka.eagle.sql.topic.records.max=5000
kafka.eagle.sql.fix.error=false

######################################
# delete kafka topic token
######################################
kafka.eagle.topic.token=keadmin

######################################
# kafka sasl authenticate
######################################
cluster1.kafka.eagle.sasl.enable=false
cluster1.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
cluster1.kafka.eagle.sasl.mechanism=SCRAM-SHA-256
cluster1.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-eagle";
cluster1.kafka.eagle.sasl.client.id=
cluster1.kafka.eagle.sasl.cgroup.enable=false
cluster1.kafka.eagle.sasl.cgroup.topics=
cluster2.kafka.eagle.sasl.enable=false
cluster2.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
cluster2.kafka.eagle.sasl.mechanism=PLAIN
cluster2.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-eagle";
cluster2.kafka.eagle.sasl.client.id=
cluster2.kafka.eagle.sasl.cgroup.enable=false
cluster2.kafka.eagle.sasl.cgroup.topics=

######################################
# kafka ssl authenticate
######################################
cluster3.kafka.eagle.ssl.enable=false
cluster3.kafka.eagle.ssl.protocol=SSL
cluster3.kafka.eagle.ssl.truststore.location=
cluster3.kafka.eagle.ssl.truststore.password=
cluster3.kafka.eagle.ssl.keystore.location=
cluster3.kafka.eagle.ssl.keystore.password=
cluster3.kafka.eagle.ssl.key.password=
cluster3.kafka.eagle.ssl.cgroup.enable=false
cluster3.kafka.eagle.ssl.cgroup.topics=

######################################
# kafka sqlite jdbc driver address
######################################
kafka.eagle.driver=org.sqlite.JDBC
kafka.eagle.url=jdbc:sqlite:/opt/module/eagle-web-1.4.8/db/ke.db
kafka.eagle.username=root
kafka.eagle.password=123456

######################################
# kafka mysql jdbc driver address
######################################
#kafka.eagle.driver=com.mysql.jdbc.Driver
#kafka.eagle.url=jdbc:mysql://hadoop102:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
#kafka.eagle.username=root
#kafka.eagle.password=root
- Start ZooKeeper and Kafka, then start Eagle
bin/ke.sh start
- After startup, the UI address and the account/password are printed at the end
http://192.168.10.102:8048/ke
Account: admin
Password: 123456
12. Kafka and Flume
12.1 Log -> Kafka
Tail simulated logs into Kafka: either a KafkaSink can be used, or a KafkaChannel can be used directly without any sink.
netcat2kafka2.conf
a1.sources = r1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1

a1.channels = c1
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c1.kafka.topic = test2
a1.channels.c1.kafka.consumer.group.id = bigdata3
Start the agent
bin/flume-ng agent -n a1 -c conf -f jobs/netcat2kafka2.conf
12.2 Log -> Different Kafka Topics
Tail simulated logs and route events to different Kafka topics depending on the log content.
- Define a Flume interceptor
package com.ipinyou.flume.interceptor;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class TopicInterceptor implements Interceptor {
    private List<Event> eventList;
    @Override
    public void initialize() {
        eventList = new ArrayList<>();
    }
    @Override
    public Event intercept(Event event) {
        // route by content: the "topic" header tells the KafkaSink which topic to write to
        Map<String, String> headers = event.getHeaders();
        String body = new String(event.getBody());
        if (body.contains("Error") || body.contains("Exception")) {
            headers.put("topic", "error");
        } else {
            headers.put("topic", "normal");
        }
        return event;
    }
    @Override
    public List<Event> intercept(List<Event> list) {
        eventList.clear();
        for (Event event : list) {
            eventList.add(intercept(event));
        }
        return eventList;
    }
    @Override
    public void close() {
    }
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TopicInterceptor();
        }
        @Override
        public void configure(Context context) {
        }
    }
}
- Configure the collection job
kafka-topic.conf
a1.sources = r1
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop102
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.ipinyou.flume.interceptor.TopicInterceptor$Builder

a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000

a1.sinks = k1
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = error
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
- Start Flume
bin/flume-ng agent -n a1 -c conf/ -f jobs/kafka-topic.conf



