栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

java Kafka使用

java Kafka使用

消息队列的应用场景, 异步处理, 系统解耦,日志处理
常用消息队列比较

特性				ActiveMQ	RabbitMQ	Kafka				RocketMQ
所属				Apache		Mozilla		Apache				Apache/Ali
成熟度				成熟		成熟		成熟				比较成熟
生产者-消费者模式	支持		支持		支持				支持
发布-订阅			支持		支持		支持				支持
REQUEST-REPLY		支持		支持		-					支持
API完备性			高			高			高					低(静态配置)
多语言支持			支持 		语言无关	支持 				支持
单机呑吐量			万级 		万级		十万级				十万级(最高)
消息延迟			-			微秒级		毫秒级				-
可用性				高(主从)	高(主从)	非常高(分布式)		高
消息丢失			-			低			理论上不会丢失		-
消息重复			-			可控制		理论上会有重复		-
事务				支持		不支持		支持				支持
文档的完备性		    高			高			高					中
首次部署难度		    -			低			中					高

不论成成熟度、社区、性能、可靠性,Kafka都是非常好的一款产品

集群环境搭建

http://kafka.apache.org/downloads下载地址
Kafka计划使用内嵌的KRaft替代ZooKeeper,是一个非常大的进步,因为像ES之类的分布式系统,这种集群meta信息的同步,都是自循环的,而且更快。
2.8以上版本在config目录下,多了一个叫做kraft的目录,里面包含着一套新的配置文件,可以直接摒弃对ZK的依赖。
但是kraft不完善,目前不要在线上环境开启这个功能,还是用ZK

三台kafka,三台zookeeper
tar -zxvf kafka_2.12-3.0.0.tgz
修改 server.properties
# 指定broker的id
broker.id=0
# 指定Kafka数据的位置
log.dirs=/kafka/data
# 配置zk的三个节点
zookeeper.connect=ip1:2181,ip2:2181,ip3:2181
复制到另外服务器
scp -r srcpath ip:path
修改另外服务器的broker.id分别为1和2
如果需要远程访问修改
advertised.listeners=PLAINTEXT://ip:9194

配置KAFKA_HOME环境变量
vim /etc/profile
export KAFKA_HOME=/opt/kafka_2.12-3.0.0
export PATH=:$PATH:${KAFKA_HOME}
source /etc/profile

# 启动Kafka
nohup kafka-server-start.sh ../config/server.properties &
常用命令
 创建topic,test的主题
 kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1 --topic test
 查看目前Kafka中的主题
 kafka-topics.sh --list --bootstrap-server localhost:9092
 删除主题
 kafka-topics.sh --bootstrap-server localhost:2181 --delete --topic test
 创建消息
 kafka-console-producer.sh --broker-list localhost:9092 --topic test
 消费消息(即查看)
 kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

可以使用Kafka Tools图形界面连接kafka

kafka主要概念

zookeeper:
ZK用来管理和协调broker,并且存储了Kafka的元数据(例如:有多少topic、partition、consumer),Kafka正在逐步想办法将ZooKeeper剥离,自己来管理自己

broker:
Kafka实例,一个Kafka的集群通常由多个broker组成

主题Topic:
主题是一个逻辑概念,用于生产者发布数据,消费者拉取数据
Kafka中的主题必须要有标识符,而且是唯一的,没有数量上的限制

分区Partition:
主题被分为多个分区

producer:
生产者负责将数据推送给broker的topic
生产者分区写入策略
1.轮询分区策略
2.随机分区策略
3.按key分区分配策略
4.自定义分区策略
实现 my implements Partitioner
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, my.class.getName());

consumer:
消费者负责从broker的topic中拉取数据

consumer group:
一个消费者组可以包含多个消费者
一个消费者组有一个唯一的ID
组内的消费者一起消费主题的数据(即一个数据只能被同一个消费者组中一个消费者消费一次)
消费者组Rebalance机制:
是Kafka中确保Consumer group下所有的consumer如何分配订阅topic每个分区的机制
消费者客户端参数partition.asssignment.strategy可以配置多个分配策略
触发的时机有:
1.消费者组中consumer的个数发生变化,
2.订阅的topic个数发生变化
3.订阅的topic分区数发生变化
发生Rebalance时会对consumer group产生非常严重的影响,Rebalance的过程中所有的消费者都将停止工作,直到Rebalance完成。
消费者分区分配策略:
Range范围分配策略是Kafka默认的分配策略,将分区数平均分给消费者(如1,2给消费者1和3,4给消费者2)
配置消费者的partition.assignment.strategy为org.apache.kafka.clients.consumer.RangeAssignor
RoundRobin轮询策略,轮询方式逐个将分区分配给每个消费者
配置消费者的partition.assignment.strategy为org.apache.kafka.clients.consumer.RoundRobinAssignor
Stricky粘性分配策略
1.分区分配尽可能均匀
2.在发生rebalance的时候,分区的分配尽可能与上一次分配保持相同
org.apache.kafka.clients.consumer.strategyStickyAssignor
副本Replicas:
副本可以确保某个服务器出现故障时,确保数据依然可用

偏移量offset:
offset记录着下一条将要发送给Consumer的消息的序号
默认Kafka将offset存储在ZooKeeper中
在一个分区中,消息是按顺序存储,每次在分区消费都有一个递增的id。这个就是偏移量offset
auto.offset.reset参数指定了在没有偏移量可提交时或者请求的偏移量在broker上不存在时,消费者如何读取
earliest:消费者会从分区的开始位置读取数据
latest:消费者会从分区的末尾开始读取数据
enable.auto.commit参数true,false让消费者基于任务调度自动提交偏移量,也可以在代码里手动提交偏移量
auto.commit.interval.ms:此参数与enable.auto.commit有直接的联系,如果选择了自动提交偏移量,可以通过此参数配置提交的频度,默认值是每5秒钟提交一次

producer的ACKs参数
acks参数指定了在集群中有多少个分区副本收到消息,kafka producer才会认为消息是被写入成功。
有三种值可以设置,分别是0,1,和all.
acks=0是kafkaProducer在客户端,只要把消息发送出去,不管那条数据有没有在Partition Leader上落到磁盘,就直接认为这个消息发送成功
acks=1是Partition Leader接收到消息而且写入本地磁盘了,就认为成功了,不管其他的Follower有没有同步过去这条消息了
acks=all是Partition Leader接收到消息之后,还必须要求ISR列表里跟Leader保持同步的那些Follower都要把消息同步过去,才能认为这条消息是写入成功了

Kafka-Eagle简介

是一款结合了目前Kafka监控工具的特点,重新研发的一块开源免费的Kafka集群优秀的监控工具
官网地址:https://www.kafka-eagle.org/

在启动Kafka的脚本前,添加export JMX_PORT=9988

上传解压,配置环境变量vim /etc/profile
export KE_HOME=/opt/kafkaeagle-bin-2.0.9
export PATH=$PATH:$KE_HOME/bin
配置vim conf/system-config.properties
#修改第4行,配置kafka集群别名
kafka.eagle.zk.cluster.alias=cluster1
#修改第5行,配置ZK集群地址
cluster1.zk.list=localhost:2181
#修改第64行,打开图标统计
efak.metrics.charts=true
efak.metrics.retain=15
#修改第122行,开启mysql

启动
ke.sh start
java操作

    org.apache.kafka
    kafka_2.13
    3.0.0

版本最好和kafka对应
package w;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.Recordmetadata;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.Future;

public class kafka {

    public static void main(String[] args) throws Exception {
        //product( );
       consumer();
    }
    
    public static void product( ) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.200.128:9092");//多个逗号隔开
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        //ProducerConfig.RETRIES_CONFIG重试次数
        KafkaProducer kafkaProducer = new KafkaProducer
                (props);
        for (int i = 0; i < 1; i++) {
            Future test = kafkaProducer.send(new ProducerRecord("test", "first" + i));
            System.out.println(test.get());
            System.out.println("send "+i+" ok");
        }
        kafkaProducer.close();
    }

    
    public static void consumer( ) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.200.128:9092");
        props.put("group.id", "test");
        //消费者自动提交offset值
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms",  "1000");
        props.put("auto.offset.reset",  "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer kafkaConsumer = new KafkaConsumer
                (props);
        //订阅下要消费的topic
        kafkaConsumer.subscribe(Arrays.asList("test"));
        while (true) {
            ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord record : consumerRecords) {
                System.out.println("消费的数据为:" + record.value());
            }
        }

        // 手动提交offset值
        //props.put("enable.auto.commit", "false");
        //将所有已接收的记录标记为已提交kafkaConsumer.commitSync();
        //消费完每个分区之后手动提交每个分区offset


        //指定分区数据进行消费,主题与分区订阅只能二选一,当手动管理消费分区时,即使GroupID是一样的,Kafka的组协调器都将不再起作用

    }

}


class KafkaCustomPartitioner implements Partitioner {
    @Override
    public void configure(Map configs) {
    }

    @Override
    public int partition(String topic, Object arg1, byte[] keyBytes, Object arg3, byte[] arg4, Cluster cluster) {
        List partitions = cluster.partitionsForTopic(topic);
        int partitionNum = partitions.size();
        Random random = new Random();
        int partition = random.nextInt(partitionNum);
        return partition;
    }

    @Override
    public void close() {
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/710333.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号