栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

day04kafka

day04kafka

一. 复习

kafka使用分布式公平架构,主节点:kafka controllere (负责存储和管理) 从节点:kafka broker(负责存储)如果主节点挂掉,会依赖zk重新选举。
kafka的数据安全是依赖副本机制
leader和follwer是topic下的part的主节点和从节点,而controller和broker是集群的

二.topic管理:创建与列举、

/export/server/kafka_2.12-2.4.1/bin

    创建:kafka-topics.sh --create --topic bigdata01 --partitions 3 --replication-factor 2 --bootstrap-server node1:9092,node2:9092,node3:9092(–topic topic名称, --partitions分区个数,–replication-factor副本,–bootstrap-server node1:9092,node2:9092,node3:9092 服务端地址进程端口9092)列举:–list:kafka-topics.sh --list --bootstrap-server node1:9092,node2:9092,node3:9092查看:–describe :kafka-topics.sh --describe --topic bigdata01 --bootstrap-server node1:9092,node2:9092,node3:9092
    删除:delete:kafka-topics.sh --delete --topic bigdata02 --bootstrap-server node1:9092,node2:9092,node3:9092修改topic:–alter:kafka-topics.sh --alter --topic bigdata02 分区/副本/属性 --bootstrap-server node1:9092,node2:9092,node3:9092
三.生产者及消费者测试
    命令行生产者: kafka-console-producer.sh --topic bigdata01 --broker-list node1:9092,node2:9092,node3:9092命令行消费者: kafka-console-consumer.sh --topic bigdata01 --bootstrap-server node1:9092,node2:9092,node3:9092
    从头开始消费:kafka-console-consumer.sh --topic bigdata01 --bootstrap-server node1:9092,node2:9092,node3:9092 --from-beginning(默认从最新的位置开始消费,–from-beginning从最早的位置开始消费)只要生产者不断生产,消费者就能实时的消费到topic中的数据
四.kafka集群压力测试

    创建topic:bigdata :kafka-topics.sh --create --topic bigdata --partitions 2 --replication-factor 2 --bootstrap-server node1:9092,node2:9092,node3:9092

    生产测试: kafka-producer-perf-test.sh --topic bigdata --num-records 100000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node1:9092,node2:9092,node3:9092 acks=1

    消费测试: kafka-consumer-perf-test.sh --topic bigdata --broker-list node1:9092,node2:9092,node3:9092 --fetch-size 1048576 --messages 100000

五.kafka API
    API分类
    1.1high level API:高级API,基于simpleAPI做了封装,让用户开发更加方便,但是由于封装了底层的API有很多东西不可控,无法控制数据安全;offset 自动存储zookeeper中,不需要自己管理。
    1.2simpleAPI:简单API。自定义控制所有的消费和生产,保障数据安全。生产者API:生产数据到kafka
    2.1导入maven依赖:


aliyun
http://maven.aliyun.com/nexus/content/groups/public/






org.apache.kafka
kafka-clients
2.4.1


2.2 代码

package bigdata.hlzq.com.kafka.produce;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

//生产者数据到kafka
public class KafkaProducerTestClient {
    public static void main(String[] args) {
        //构建连接
        //构建一个配置对象
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");//服务端地址
        props.put("acks","all");//生产者写入网络部丢失的原因ack+重试机制:写入一条到kafka分区,kafka会返回一个ack确认,如果没有返回重新发送ack的值: 0不用等待 1:等待写入到lead就返回 all:等待所有副本
        //定义写入kafka的类型
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //构建连接对象加载配置
        KafkaProducer producer = new KafkaProducer<>(props);
        //实现操作
        for (int i=1;i<=100;i++){
            //调用连接对象的方法,指定topic key决定分区规则
            producer.send(new ProducerRecord("bigdata01",i+"",i+"itc"));
            //不给key在同一个分区
            producer.send(new ProducerRecord("bigdata01",i+"hh"));
            //指定分区
            producer.send(new ProducerRecord("bigdata01", 1,i+"",i+"itc"));
        }
        //释放连接
        producer.close();
    }
}
    消费者
    代码:
import java.util.Properties;
import java.util.function.Consumer;

public class KafkaCons {
    public static void main(String[] args) {
        // 构建消费者连接
        //构建配置对象
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms","1000");
        props.setProperty("key.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer consumer = new KafkaConsumer(props);
        // 订阅消费处理
        consumer.subscribe(Arrays.asList("bigdata01"));
        //消费者是不停的
        while (true){
            ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecordrecord:records){
                System.out.println("offset:"+record.offset()+"   "+"topic:"+record.topic()+"  "+"part:"+record.partition()+"  "+"value:"+record.value());

            }
        }
    }
}
六. 生产分区规则
    数据存储在那个分区的规则,
 //调用连接对象的方法,指定topic key决定分区规则
            producer.send(new ProducerRecord("bigdata01",i+"",i+"itc"));
            //不给key在同一个分区
            producer.send(new ProducerRecord("bigdata01",i+"hh"));
            //指定分区
            producer.send(new ProducerRecord("bigdata01", 1,i+"",i+"itc"));

默认分区器defaultpartitioner
如果指定了key:按照key的hash取余分区的个数,来写入对应的分区:只要key一样就会进入同一个分区,容易导致数据倾斜
黏性分区器:实现少批次多数据:一个批次的数据都存放在一个分区。(判断缓存中是否有这个topic的分区连接,如果有直接使用,没有随机写入一个分区,并放入缓存)
轮循分区:数据分配相对均衡,批次多,每个批次数据量少,性能差。(不用)

    分区规则:先判断有没有指定分区,指定分区就写入指定的分区,调用分区器在判断有没有指定key如果有key按照key类似于hash取余的方式计算分区,没有的话使用黏性分区自定义开发生产分区器
package bigdata.hlzq.com.kafka.userpart;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;
import java.util.Random;

//自定义分区器
public class UserPartition implements Partitioner {
  
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
       //获取topic的分区个数
        Integer integer = cluster.partitionCountForTopic(topic);
        //构建随机分区随机值
        Random random = new Random();
        int i = random.nextInt(integer);
        return i;
    }

    @Override
    public void close() {
    //释放资源连接
    }

    @Override
    public void configure(Map configs) {
     //获取配置
    }
}
在生产者代码中指定分区配置
 props.put("partitioner.class", "bigdata.hlzq.com.kafka.userpart.UserPartition");
七.消费者消费安全问题
    第一次消费规则:有属性决定
    auto.offset.reset =latest | earliest |none
    latest:默认的值,从topic每个分区的最新的位置开始消费
    earliest:从最早的位置开始消费,每个分区的offset为0开始消费
    none:如果是第一次消费,这个属性为none,kafka会抛出异常,如果不是第一次消费这 不起作用第二次消费:根据上一次消费的offset位置+1继续进行消费消费者怎么知道上一次消费的位置:
    在自己内存中记录offset的值,下次直接请求上一次消费的位置;
    consumer offset:表示当前消费者组已经消费到的位置
    commit offset:表示下一次要消费的位置,=consumer offset+1只有一个消费者,消费者故障,原来内存的offset没了,消费者怎么知道上一次的消费位置:
    数据重复和丢失,offset只放在内存,内存数据丢失offset丢失。
    将offset持久化并且共享。
    kafka将每个消费者消费的commit offset主动记录在一个topic中:_ _consumer_offsets,如果下次消费者没有给定请求的offset,kafka根据自己记录的offset来提供消费的位置。

    自动提交的问题 :消费成功并处理成功提交,如果消费或者处理失败不提交手动提交topic的offset:offset是分区级别的,提交是按照topic级别的。按分区提交:
    消费级别:consumer.seek(offset)自定义数据的消费、consumer.subcribe(topic)普通的发布订阅、consumer.assign(parition)限制每个消费者消费分区消费分配策略:一个消费者组消费的过程中,一个分区的数据只能有某一个消费者消费;一个消费者可以消费多个分区的数据。
    消费策略:范围分配:rangeassignor默认的分配策略 partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor
    轮询分配:2.0之前 RoundRobinAssignor
    黏性分配:StickyAssignor2.0之后
    7.1默认分配rangeassignor:每个消费者消费一定的分区,尽量实现分区均分给不同的消费者,如果不能均分,优先分配给编号小的消费者(负载不均衡)

    7.2 轮询分配:2.0之前 RoundRobinAssignor:按照topic的名称和分区编号排序,轮训分配给每个消费者
    轮训不均衡,以及上面的没有考虑消费者故障
    7.3 黏性分配:StickyAssignor2.0之后:类似于轮训分配,尽量的将分区均衡分配给消费者,如果某个消费者故障,尽量的避免网络传输
    如果消费者故障:其他的消费者重新分配所有分区
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/746065.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号