
Big Data Learning Tutorial (SD Edition), Part 10: Kafka

Kafka: a tool for buffering and storing streaming data

Distributed; acts as a buffer for rate limiting and peak shaving; enables asynchronous processing; written in Scala

A message queue based on the publish-subscribe model: one-to-many, and data is not deleted just because consumers have pulled it

1. Kafka Architecture
  1. Producer: produces messages

  2. Broker: a Kafka server; the brokers together form the cluster

    • Topic: a category of messages
    • Partition: a partition of a topic
    • Leader / Follower: the replicas of a partition
  3. Consumer: consumes messages

    • Consumer group: within a group, each topic partition is consumed by only one consumer, which raises consumption throughput
  4. ZooKeeper: registration center

    • Manages Kafka cluster metadata

    • Before version 0.9, consumer offsets were stored in ZooKeeper

Since Kafka 0.9, offsets are stored inside Kafka itself, in the internal __consumer_offsets topic

2. Kafka Installation
  1. Download the installation package
  2. Unpack the archive
  3. Edit the configuration file

server.properties

broker.id=0
delete.topic.enable=true
# kafka data dir
log.dirs=/opt/module/kafka_2.11-0.10.2.0/logs
# zk
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181
  4. Distribute Kafka to the other nodes and change broker.id on each of them
  5. Start ZooKeeper
  6. Start Kafka
bin/kafka-server-start.sh -daemon /opt/module/kafka_2.11-0.10.2.0/config/server.properties
  7. Check the processes with jps
------hadoop102-------
2790 QuorumPeerMain
3850 Kafka
3931 Jps
------hadoop103-------
2635 Jps
2572 Kafka
1599 QuorumPeerMain
------hadoop104-------
2608 Jps
2545 Kafka
1574 QuorumPeerMain
3. Kafka Shell
3.1 Topic

The replication factor must be <= the number of brokers

# create topic
bin/kafka-topics.sh --create --zookeeper hadoop102:2181 --topic test2 --partitions 3 --replication-factor 1
# delete topic
 bin/kafka-topics.sh --delete --topic test1  --zookeeper hadoop102:2181
# topic list 
bin/kafka-topics.sh --list --zookeeper hadoop102:2181
# topic describe
bin/kafka-topics.sh --describe --topic test2 --zookeeper hadoop102:2181
3.2 Producer
bin/kafka-console-producer.sh --topic test2 --broker-list hadoop102:9092
3.3 Consumer
bin/kafka-console-consumer.sh --topic test2 --bootstrap-server hadoop102:9092
4. Reinstalling Kafka

To change the Kafka data directory (log.dirs):

  1. Delete the logs/ directory
  2. Create a data/ directory
  3. Edit the configuration file
  4. Delete the ZooKeeper zkData/version-2 directory
  5. Restart ZooKeeper
  6. Restart Kafka
5. Kafka File Storage
  1. One topic -> multiple partitions
  2. One partition -> multiple segments
  3. One segment -> an xxx.log file plus an xxx.index file
6. Kafka Producer
  • Partitioning strategy
  1. If a partition is specified, the record goes to that partition
  2. If no partition is specified but a key is, the partition is hash(key) % number of partitions
  3. If neither is specified, records are spread across partitions round-robin
  • Reliability of produced data
  1. ISR mechanism: a write counts as successful only when the leader and all followers in the ISR have replicated it; if a follower does not send a fetch request for longer than a threshold, the leader removes it from the ISR

  2. Acks mechanism (see the producer config sketch after this section)

    • 0: do not wait for the leader or followers; data may be lost
    • 1: return as soon as the leader has written; data may still be lost
    • -1 | all: return only after the leader and all followers in the ISR have written; data loss is extremely unlikely, but duplicates are possible
  • Consistency of consumed data
  1. LEO: the largest offset within each replica
  2. HW (high watermark): the largest offset visible to consumers = the smallest LEO in the ISR

When the leader fails, the new leader truncates everything beyond the HW so that replica data stays consistent.

  • Exactly once cannot span sessions (without transactions)

Exactly once = At least once + idempotence (0.11)
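
A minimal sketch of the reliability settings above, assuming a 0.11+ client (idempotence does not exist in 0.10.x) and the same hadoop102:9092 / test2 used elsewhere in this tutorial; the class name is only for illustration:

package com.ipinyou.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ReliableProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // acks = -1 | all: wait until the ISR leader and all followers have written the record
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // Idempotent producer: At Least Once + idempotence = Exactly Once within a session
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<>("test2", "a0"));
        producer.close();
    }
}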

7. Kafka Consumer
  • Consumption model

    Consumers pull data from the broker; the downside is that a poll may return no data

  • Partition assignment strategy

    A rebalance is triggered whenever the number of consumers in a group changes, and partitions are reassigned (a config sketch follows this list)

    1. RoundRobin: pools all TopicAndPartitions of the group's subscriptions and assigns them round-robin across the group
    2. Range: assigns partitions per topic [the default strategy]
  • Offset

An offset is keyed by Group + Topic + Partition
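
A hedged sketch of switching the assignment strategy via partition.assignment.strategy (RoundRobinAssignor and RangeAssignor are the standard client classes; the rest mirrors the ConsumerDemo further below and the class name is illustrative):

package com.ipinyou.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;

import java.util.Arrays;
import java.util.Properties;

public class AssignmentStrategyDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "bigdata2");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // RangeAssignor is the default; switch the whole group to round-robin assignment
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test2"));
        // ... poll in a loop exactly as in the ConsumerDemo below
        consumer.close();
    }
}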

8. Efficient Reads and Writes in Kafka
  1. Sequential disk writes, which save seek time
  2. Zero-copy (together with the OS page cache)
9. Kafka Transactions

Producer transactions can span sessions: a globally unique, user-defined Transaction ID is introduced, which ultimately makes Exactly Once possible (sketched below)
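
A minimal sketch of the transactional producer API (0.11+ client; the Transaction ID value and class name are illustrative):

package com.ipinyou.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class TransactionalProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Globally unique, user-defined Transaction ID; lets the broker recover transaction state across sessions
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-tx-id");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();
        try {
            producer.beginTransaction();
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("test2", "tx-" + i));
            }
            producer.commitTransaction();
        } catch (Exception e) {
            // Abort so that none of the records in the transaction become visible to read_committed consumers
            producer.abortTransaction();
        } finally {
            producer.close();
        }
    }
}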

10. Kafka API
10.1 Producer API

Sending is asynchronous and unrelated to the ack setting; it involves two threads, main and sender, and each record passes through Interceptors -> Serializer -> Partitioner

  1. Simple producer
package com.ipinyou.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            kafkaProducer.send(new ProducerRecord<>("test2", "a" + i));
        }
        kafkaProducer.close();
    }
}
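
Because send() returns a Future<RecordMetadata>, a synchronous send can be sketched by blocking on that future inside the loop above (this also needs an import of org.apache.kafka.clients.producer.RecordMetadata, and main must declare or catch the checked exceptions thrown by get()):

            // Blocking on the Future turns the asynchronous send into a synchronous one
            RecordMetadata meta = kafkaProducer.send(new ProducerRecord<>("test2", "a" + i)).get();
            System.out.println(meta.partition() + "--" + meta.offset());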
  2. Producer with a callback
package com.ipinyou.kafka.producer;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class CallBackProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            kafkaProducer.send(new ProducerRecord<>("test2", "a:" + i), (metadata, exception) -> {
                if (exception == null) {
                    System.out.println(metadata.partition() + "--" + metadata.offset());
                }
            });
        }
        kafkaProducer.close();
    }
}
  3. Custom partitioner
package com.ipinyou.kafka.partitioner;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class DemoPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Business logic goes here; this demo always routes every record to partition 2
        return 2;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map configs) {

    }
}

// Register the custom partitioner in the producer program
 props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.ipinyou.kafka.partitioner.DemoPartitioner");
10.2 Consumer API
  • Simple consumer
package com.ipinyou.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class ConsumerDemo {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "bigdata2");
        // Reset the consumer offset: this setting only takes effect when the group id changes or the stored offset is no longer valid
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
        kafkaConsumer.subscribe(Arrays.asList("test2"));

        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.key() + ":" + record.value());
            }
        }

//        kafkaConsumer.close();
    }
}

By default offsets are committed automatically, which can lead to messages being lost or consumed more than once

Manually committing offsets, whether synchronously or asynchronously, can still lead to duplicate consumption (a sketch follows)
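
A minimal sketch of manual commits, reusing the ConsumerDemo configuration above with auto-commit turned off (commitSync and commitAsync are the standard client calls):

        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // ...
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.key() + ":" + record.value());
            }
            // Synchronous commit: blocks and retries until the offsets are committed
            kafkaConsumer.commitSync();
            // Or asynchronous commit: returns immediately, failures are reported to the callback
            // kafkaConsumer.commitAsync((offsets, exception) -> {
            //     if (exception != null) {
            //         exception.printStackTrace();
            //     }
            // });
        }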

  • Custom offset storage

Wrap consuming the data and saving/committing the offset in one transactional operation; handling rebalances requires a ConsumerRebalanceListener (sketched below)
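
A hedged sketch of plugging a ConsumerRebalanceListener into subscribe(); it needs imports of org.apache.kafka.clients.consumer.ConsumerRebalanceListener, org.apache.kafka.common.TopicPartition and java.util.Collection, and the "external store" helpers named in the comments are hypothetical:

        kafkaConsumer.subscribe(Arrays.asList("test2"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Before the partitions are taken away, save the current offsets to the external store
                // e.g. commitOffsetsExternally(currentOffsets);  // hypothetical helper
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // After the new assignment, seek each partition to the offset recorded in the external store
                for (TopicPartition partition : partitions) {
                    // kafkaConsumer.seek(partition, readOffsetExternally(partition));  // hypothetical helper
                }
            }
        });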

10.3 Interceptor API

Before each record is sent to the broker, prepend a timestamp to its value; after sending finishes, print the number of successes and failures

package com.ipinyou.kafka.interceptor;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

public class DemoInterceptor implements ProducerInterceptor {

    private int success;
    private int error;

    @Override
    public ProducerRecord onSend(ProducerRecord record) {
        ProducerRecord producerRecord = new ProducerRecord<>(record.topic(), record.partition(), record.key(), System.currentTimeMillis() + "_" + record.value());
        return producerRecord;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (metadata != null) {
            success++;
        } else {
            error++;
        }
    }

    @Override
    public void close() {
        System.out.println("success:" + success);
        System.out.println("error:" + error);
    }

    @Override
    public void configure(Map configs) {

    }
}
  • Register the interceptor in the producer program
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Arrays.asList("com.ipinyou.kafka.interceptor.DemoInterceptor"));

11. Kafka Monitoring

Using Kafka Eagle

  1. Modify kafka-server-start.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-server -Xms8G -Xmx8G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
    export JMX_PORT="9999"
fi
  2. Distribute the modified start script

  3. Download and unpack the Eagle installation package

  4. Configure the Eagle environment variables and source the profile

export KE_HOME=/opt/module/eagle-web-1.3.7
export PATH=$PATH:$KE_HOME/bin
  5. Edit the Eagle configuration file system-config.properties

SQLite is used here to store Eagle's own data; connecting to MySQL kept failing

######################################
# multi zookeeper & kafka cluster list
######################################
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=hadoop102:2181,hadoop103:2181,hadoop104:2181


######################################
# broker size online list
######################################
cluster1.kafka.eagle.broker.size=20

######################################
# zk client thread limit
######################################
kafka.zk.limit.size=25

######################################
# kafka eagle webui port
######################################
kafka.eagle.webui.port=8048

######################################
# kafka offset storage
######################################
cluster1.kafka.eagle.offset.storage=kafka

######################################
# kafka metrics, 15 days by default
######################################
kafka.eagle.metrics.charts=true
kafka.eagle.metrics.retain=15


######################################
# kafka sql topic records max
######################################
kafka.eagle.sql.topic.records.max=5000
kafka.eagle.sql.fix.error=false

######################################
# delete kafka topic token
######################################
kafka.eagle.topic.token=keadmin

######################################
# kafka sasl authenticate
######################################
cluster1.kafka.eagle.sasl.enable=false
cluster1.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
cluster1.kafka.eagle.sasl.mechanism=SCRAM-SHA-256
cluster1.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-eagle";
cluster1.kafka.eagle.sasl.client.id=
cluster1.kafka.eagle.sasl.cgroup.enable=false
cluster1.kafka.eagle.sasl.cgroup.topics=

cluster2.kafka.eagle.sasl.enable=false
cluster2.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
cluster2.kafka.eagle.sasl.mechanism=PLAIN
cluster2.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-eagle";
cluster2.kafka.eagle.sasl.client.id=
cluster2.kafka.eagle.sasl.cgroup.enable=false
cluster2.kafka.eagle.sasl.cgroup.topics=

######################################
# kafka ssl authenticate
######################################
cluster3.kafka.eagle.ssl.enable=false
cluster3.kafka.eagle.ssl.protocol=SSL
cluster3.kafka.eagle.ssl.truststore.location=
cluster3.kafka.eagle.ssl.truststore.password=
cluster3.kafka.eagle.ssl.keystore.location=
cluster3.kafka.eagle.ssl.keystore.password=
cluster3.kafka.eagle.ssl.key.password=
cluster3.kafka.eagle.ssl.cgroup.enable=false
cluster3.kafka.eagle.ssl.cgroup.topics=

######################################
# kafka sqlite jdbc driver address
######################################
kafka.eagle.driver=org.sqlite.JDBC
kafka.eagle.url=jdbc:sqlite:/opt/module/eagle-web-1.4.8/db/ke.db
kafka.eagle.username=root
kafka.eagle.password=123456

######################################
# kafka mysql jdbc driver address
######################################

#kafka.eagle.driver=com.mysql.jdbc.Driver
#kafka.eagle.url=jdbc:mysql://hadoop102:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
#kafka.eagle.username=root
#kafka.eagle.password=root
  6. After ZooKeeper and Kafka are up, start Eagle
bin/ke.sh start
  7. After startup, the end of the output shows the UI address and the account/password
http://192.168.10.102:8048/ke
Account:admin
Password:123456

12. Kafka and Flume
12.1 Log -> Kafka

Monitor a simulated log (here, a netcat source) and write it to Kafka; this can be done with a KafkaSink, or directly with a KafkaChannel, in which case no sink is needed

netcat2kafka2.conf

a1.sources = r1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1

a1.channels=c1
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c1.kafka.topic = test2
a1.channels.c1.kafka.consumer.group.id = bigdata3

Start the agent

bin/flume-ng agent -n a1 -c conf -f jobs/netcat2kafka2.conf
12.2 Log -> Kafka, routed to different topics

Monitor a simulated log and, depending on the content of each event, write it to a different Kafka topic

  1. Define a Flume interceptor
package com.ipinyou.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class TopicInterceptor implements Interceptor {

    private List<Event> eventList;

    @Override
    public void initialize() {
        eventList = new ArrayList<>();
    }

    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        String body = new String(event.getBody());
        if (body.contains("Error") || body.contains("Exception")) {
            headers.put("topic", "error");
        } else {
            headers.put("topic", "normal");
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        eventList.clear();
        for (Event event : list) {
            eventList.add(intercept(event));
        }
        return eventList;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new TopicInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}
  2. Configure the collection job

kafka-topic.conf

a1.sources = r1
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop102
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.ipinyou.flume.interceptor.TopicInterceptor$Builder

a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000

a1.sinks = k1
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = error
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
  3. Start Flume
 bin/flume-ng agent -n a1 -c conf/ -f jobs/kafka-topic.conf