消息队列的应用场景, 异步处理, 系统解耦,日志处理
常用消息队列比较
特性 ActiveMQ RabbitMQ Kafka RocketMQ 所属 Apache Mozilla Apache Apache/Ali 成熟度 成熟 成熟 成熟 比较成熟 生产者-消费者模式 支持 支持 支持 支持 发布-订阅 支持 支持 支持 支持 REQUEST-REPLY 支持 支持 - 支持 API完备性 高 高 高 低(静态配置) 多语言支持 支持 语言无关 支持 支持 单机呑吐量 万级 万级 十万级 十万级(最高) 消息延迟 - 微秒级 毫秒级 - 可用性 高(主从) 高(主从) 非常高(分布式) 高 消息丢失 - 低 理论上不会丢失 - 消息重复 - 可控制 理论上会有重复 - 事务 支持 不支持 支持 支持 文档的完备性 高 高 高 中 首次部署难度 - 低 中 高
不论成成熟度、社区、性能、可靠性,Kafka都是非常好的一款产品
集群环境搭建http://kafka.apache.org/downloads下载地址
Kafka计划使用内嵌的KRaft替代ZooKeeper,是一个非常大的进步,因为像ES之类的分布式系统,这种集群meta信息的同步,都是自循环的,而且更快。
2.8以上版本在config目录下,多了一个叫做kraft的目录,里面包含着一套新的配置文件,可以直接摒弃对ZK的依赖。
但是kraft不完善,目前不要在线上环境开启这个功能,还是用ZK
三台kafka,三台zookeeper
tar -zxvf kafka_2.12-3.0.0.tgz
修改 server.properties
# 指定broker的id
broker.id=0
# 指定Kafka数据的位置
log.dirs=/kafka/data
# 配置zk的三个节点
zookeeper.connect=ip1:2181,ip2:2181,ip3:2181
复制到另外服务器
scp -r srcpath ip:path
修改另外服务器的broker.id分别为1和2
如果需要远程访问修改
advertised.listeners=PLAINTEXT://ip:9194
配置KAFKA_HOME环境变量
vim /etc/profile
export KAFKA_HOME=/opt/kafka_2.12-3.0.0
export PATH=:$PATH:${KAFKA_HOME}
source /etc/profile
# 启动Kafka
nohup kafka-server-start.sh ../config/server.properties &
常用命令
创建topic,test的主题
kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1 --topic test
查看目前Kafka中的主题
kafka-topics.sh --list --bootstrap-server localhost:9092
删除主题
kafka-topics.sh --bootstrap-server localhost:2181 --delete --topic test
创建消息
kafka-console-producer.sh --broker-list localhost:9092 --topic test
消费消息(即查看)
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
可以使用Kafka Tools图形界面连接kafka
kafka主要概念zookeeper:
ZK用来管理和协调broker,并且存储了Kafka的元数据(例如:有多少topic、partition、consumer),Kafka正在逐步想办法将ZooKeeper剥离,自己来管理自己
broker:
Kafka实例,一个Kafka的集群通常由多个broker组成
主题Topic:
主题是一个逻辑概念,用于生产者发布数据,消费者拉取数据
Kafka中的主题必须要有标识符,而且是唯一的,没有数量上的限制
分区Partition:
主题被分为多个分区
producer:
生产者负责将数据推送给broker的topic
生产者分区写入策略
1.轮询分区策略
2.随机分区策略
3.按key分区分配策略
4.自定义分区策略
实现 my implements Partitioner
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, my.class.getName());
consumer:
消费者负责从broker的topic中拉取数据
consumer group:
一个消费者组可以包含多个消费者
一个消费者组有一个唯一的ID
组内的消费者一起消费主题的数据(即一个数据只能被同一个消费者组中一个消费者消费一次)
消费者组Rebalance机制:
是Kafka中确保Consumer group下所有的consumer如何分配订阅topic每个分区的机制
消费者客户端参数partition.asssignment.strategy可以配置多个分配策略
触发的时机有:
1.消费者组中consumer的个数发生变化,
2.订阅的topic个数发生变化
3.订阅的topic分区数发生变化
发生Rebalance时会对consumer group产生非常严重的影响,Rebalance的过程中所有的消费者都将停止工作,直到Rebalance完成。
消费者分区分配策略:
Range范围分配策略是Kafka默认的分配策略,将分区数平均分给消费者(如1,2给消费者1和3,4给消费者2)
配置消费者的partition.assignment.strategy为org.apache.kafka.clients.consumer.RangeAssignor
RoundRobin轮询策略,轮询方式逐个将分区分配给每个消费者
配置消费者的partition.assignment.strategy为org.apache.kafka.clients.consumer.RoundRobinAssignor
Stricky粘性分配策略
1.分区分配尽可能均匀
2.在发生rebalance的时候,分区的分配尽可能与上一次分配保持相同
org.apache.kafka.clients.consumer.strategyStickyAssignor
副本Replicas:
副本可以确保某个服务器出现故障时,确保数据依然可用
偏移量offset:
offset记录着下一条将要发送给Consumer的消息的序号
默认Kafka将offset存储在ZooKeeper中
在一个分区中,消息是按顺序存储,每次在分区消费都有一个递增的id。这个就是偏移量offset
auto.offset.reset参数指定了在没有偏移量可提交时或者请求的偏移量在broker上不存在时,消费者如何读取
earliest:消费者会从分区的开始位置读取数据
latest:消费者会从分区的末尾开始读取数据
enable.auto.commit参数true,false让消费者基于任务调度自动提交偏移量,也可以在代码里手动提交偏移量
auto.commit.interval.ms:此参数与enable.auto.commit有直接的联系,如果选择了自动提交偏移量,可以通过此参数配置提交的频度,默认值是每5秒钟提交一次
producer的ACKs参数
acks参数指定了在集群中有多少个分区副本收到消息,kafka producer才会认为消息是被写入成功。
有三种值可以设置,分别是0,1,和all.
acks=0是kafkaProducer在客户端,只要把消息发送出去,不管那条数据有没有在Partition Leader上落到磁盘,就直接认为这个消息发送成功
acks=1是Partition Leader接收到消息而且写入本地磁盘了,就认为成功了,不管其他的Follower有没有同步过去这条消息了
acks=all是Partition Leader接收到消息之后,还必须要求ISR列表里跟Leader保持同步的那些Follower都要把消息同步过去,才能认为这条消息是写入成功了
是一款结合了目前Kafka监控工具的特点,重新研发的一块开源免费的Kafka集群优秀的监控工具
官网地址:https://www.kafka-eagle.org/
在启动Kafka的脚本前,添加export JMX_PORT=9988 上传解压,配置环境变量vim /etc/profile export KE_HOME=/opt/kafkaeagle-bin-2.0.9 export PATH=$PATH:$KE_HOME/bin 配置vim conf/system-config.properties #修改第4行,配置kafka集群别名 kafka.eagle.zk.cluster.alias=cluster1 #修改第5行,配置ZK集群地址 cluster1.zk.list=localhost:2181 #修改第64行,打开图标统计 efak.metrics.charts=true efak.metrics.retain=15 #修改第122行,开启mysql 启动 ke.sh startjava操作
版本最好和kafka对应 package w; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.Recordmetadata; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; import java.time.Duration; import java.util.*; import java.util.concurrent.Future; public class kafka { public static void main(String[] args) throws Exception { //product( ); consumer(); } public static void product( ) throws Exception { Properties props = new Properties(); props.put("bootstrap.servers", "192.168.200.128:9092");//多个逗号隔开 props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //ProducerConfig.RETRIES_CONFIG重试次数 KafkaProducer org.apache.kafka kafka_2.13 3.0.0 kafkaProducer = new KafkaProducer (props); for (int i = 0; i < 1; i++) { Future test = kafkaProducer.send(new ProducerRecord ("test", "first" + i)); System.out.println(test.get()); System.out.println("send "+i+" ok"); } kafkaProducer.close(); } public static void consumer( ) { Properties props = new Properties(); props.put("bootstrap.servers", "192.168.200.128:9092"); props.put("group.id", "test"); //消费者自动提交offset值 props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("auto.offset.reset", "earliest"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer kafkaConsumer = new KafkaConsumer (props); //订阅下要消费的topic kafkaConsumer.subscribe(Arrays.asList("test")); while (true) { ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord record : consumerRecords) { System.out.println("消费的数据为:" + record.value()); } } // 手动提交offset值 //props.put("enable.auto.commit", "false"); //将所有已接收的记录标记为已提交kafkaConsumer.commitSync(); //消费完每个分区之后手动提交每个分区offset //指定分区数据进行消费,主题与分区订阅只能二选一,当手动管理消费分区时,即使GroupID是一样的,Kafka的组协调器都将不再起作用 } } class KafkaCustomPartitioner implements Partitioner { @Override public void configure(Map configs) { } @Override public int partition(String topic, Object arg1, byte[] keyBytes, Object arg3, byte[] arg4, Cluster cluster) { List partitions = cluster.partitionsForTopic(topic); int partitionNum = partitions.size(); Random random = new Random(); int partition = random.nextInt(partitionNum); return partition; } @Override public void close() { } }



