2. 生产者发送消息的基本实现org.apache.kafka kafka-clients 2.4.1
//消息的发送方
public class MyProducer {
private final static String TOPIC_NAME = "my-replicated-topic";
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.31.167.10:9092,10.31.167.10:9093,10.31.167.10:9094");
//把发送的key从字符串序列化为字节数组
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//把发送消息value从字符串序列化为字节数组
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Producer producer = new KafkaProducer(props);
Order order = new Order((long) i, i);
ProducerRecord producerRecord = new ProducerRecord(TOPIC_NAME
, order.getOrderId().toString(), JSON.toJSONString(order));
//等待消息发送成功的同步阻塞方法
Recordmetadata metadata = producer.send(producerRecord).get();
//=====阻塞=======
System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-"
+ metadata.partition() + "|offset-" + metadata.offset());
}
}
3. 发送消息到指定分区上
ProducerRecord4. 未指定分区,则会通过业务key的hash运算,算出消息往哪个分区上发producerRecord = new ProducerRecord (TOPIC_NAME , 0, order.getOrderId().toString(), JSON.toJSONString(order));
//未指定发送分区,具体发送的分区计算公式:hash(key)%partitionNum ProducerRecord5. 同步发送producerRecord = new ProducerRecord (TOPIC_NAME , order.getOrderId().toString(), JSON.toJSONString(order));
//等待消息发送成功的同步阻塞方法
Recordmetadata metadata = producer.send(producerRecord).get();
System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-"
+ metadata.partition() + "|offset-" + metadata.offset());
6. 异步发送消息
直接执行下面的业务逻辑。可以提供callback,让broker异步的调用callback,告知生产者,消息发送的结果
//要发送5条消息
Order order = new Order((long) i, i);
//指定发送分区
ProducerRecord producerRecord = new ProducerRecord(TOPIC_NAME
, 0, order.getOrderId().toString(), JSON.toJSONString(order));
//异步回调方式发送消息
producer.send(producerRecord, new Callback() {
public void onCompletion(Recordmetadata metadata, Exception exception) {
if (exception != null) {
System.err.println("发送消息失败:" + exception.getStackTrace());
}
if (metadata != null) {
System.out.println("异步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-"
+ metadata.partition() + "|offset-" + metadata.offset());
}
}
});
8. 其他一些细节
二、消费者 1. 消费者消费消息的基本实现发送会默认会重试3次,每次间隔100ms
发送的消息会先进入到本地缓冲区(32mb),kakfa会跑一个线程,该线程去缓冲区中取16k的数据,发送到kafka,如果到10毫秒数据没取满16k,也会发送一次。
public class MyConsumer {
private final static String TOPIC_NAME = "my-replicated-topic";
private final static String CONSUMER_GROUP_NAME = "testGroup";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.31.167.10:9092,10.31.167.10:9093,10.31.167.10:9094");
// 消费分组名
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//创建一个消费者的客户端
KafkaConsumer consumer = new KafkaConsumer(props);
// 消费者订阅主题列表
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord record : records) {
System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),
record.offset(), record.key(), record.value());
}
}
}
}
2. 自动提交offset
自动: 自动提交会丢消息: 因为如果消费者还没消费完poll下来的消息就自动提交了偏移量,那么此时消费者挂了,于是下一个消费者会从已提交的offset的下一个位置开始消费消息。之前未被消费的消息就丢失掉了。消费者poll到消息后默认情况下,会自动向broker的_consumer_offsets主题提交当前主题-分区消费的偏移量。
// 是否自动提交offset,默认就是true props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // 自动提交offset的间隔时间 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");3. 手动提交offset
设置手动提交参数在消费完消息后进行手动提交 1. 手动同步提交
if (records.count() > 0) {
// 手动同步提交offset,当前线程会阻塞直到offset提交成功
// 一般使用同步提交,因为提交之后一般也没有什么逻辑代码了
consumer.commitSync();
}
2. 手动异步提交
if (records.count() > 0) {
// 手动异步提交offset,当前线程提交offset不会阻塞,可以继续处理后面的程序逻辑
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map offsets, Exception exception) {
if (exception != null) {
System.err.println("Commit failed for " + offsets);
System.err.println("Commit failed exception: " + exception.getStackTrace());
}
}
});
}
4. 消费者poll消息的过程
- 消费者建立了与broker之间的长连接,开始poll消息
默认一次poll500条消息
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
- 可以根据消费速度的快慢来设置,因为如果两次poll的时间如果超出了30s的时间间隔,kafka会认为其消费能力过弱,将其踢出消费组。将分区分配给其他消费者。可以通过这个值进行设置:
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
- 如果每隔1s内没有poll到任何消息,则继续去poll消息,循环往复,直到poll到消息。如果超出了1s,则此次长轮询结束。
ConsumerRecordsrecords = consumer.poll(Duration.ofMillis(1000));
- 消费者发送心跳的时间间隔
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
- kafka如果超过10秒没有收到消费者的心跳,则会把消费者踢出消费组,进行rebalance,把分区分配给其他消费者。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);5. 指定分区消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));6. 消息回溯消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));7. 指定offset消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);8. 从指定时间点消费
List9. 新消费组的消费偏移量topicPartitions = consumer.partitionsFor(TOPIC_NAME); //从1小时前开始消费 long fetchDataTime = new Date().getTime() - 1000 * 60 * 60; Map map = new HashMap<>(); for (PartitionInfo par : topicPartitions) { map.put(new TopicPartition(TOPIC_NAME, par.partition()), fetchDataTime); } Map parMap = consumer.offsetsForTimes(map); for (Map.Entry entry : parMap.entrySet()) { TopicPartition key = entry.getKey(); OffsetAndTimestamp value = entry.getValue(); if (key == null || value == null) continue; Long offset = value.offset(); System.out.println("partition-" + key.partition() + "|offset-" + offset); System.out.println(); //根据消费里的timestamp确定offset if (value != null) { consumer.assign(Arrays.asList(key)); consumer.seek(key, offset); } }
- 当消费主题的是一个新的消费组,或者指定offset的消费方式,offset不存在,那么应该如何消费?latest(默认) :只消费自己启动之后发送到主题的消息earliest:第一次从头开始消费,以后按照消费offset记录继续消费,这个需要区别于consumer.seekToBeginning(每次都从头开始消费)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");二、Springboot中使用Kafka 1. 引入依赖
2. 配置文件org.springframework.kafka spring-kafka
server:
port: 8080
spring:
kafka:
bootstrap-servers: 172.16.253.21:9093
producer: # 生产者
retries: 3 # 设置大于0的值,则客户端会将发送失败的记录重新发送
batch-size: 16384
buffer-memory: 33554432
acks: 1
# 指定消息key和消息体的编解码方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: default-group
enable-auto-commit: false
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
max-poll-records: 500
listener:
# 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
# RECORD
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
# BATCH
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
# TIME
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
# COUNT
# TIME | COUNT 有一个条件满足时提交
# COUNT_TIME
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
# MANUAL
# 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种
# MANUAL_IMMEDIATE
ack-mode: MANUAL_IMMEDIATE
redis:
host: 172.16.253.21
3. 消息生产者
发送消息到指定topic
@RestController
public class KafkaController {
private final static String TOPIC_NAME = "my-replicated-topic";
@Autowired
private KafkaTemplate kafkaTemplate;
@RequestMapping("/send")
public void send() {
kafkaTemplate.send(TOPIC_NAME, 0, "key", "this is a msg");
}
}
4. 消息消费者
- 设置消费组,消费指定topic
@KafkaListener(topics = "my-replicated-topic",groupId = "MyGroup1") public void listenGroup(ConsumerRecordrecord, Acknowledgment ack) { String value = record.value(); System.out.println(value); System.out.println(record); //手动提交offset ack.acknowledge(); }
- 设置消费组、多topic、指定分区、指定偏移量消费及设置消费者个数。
@KafkaListener(groupId = "testGroup", topicPartitions = {
@TopicPartition(topic = "topic1", partitions = {"0", "1"}),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
},concurrency = "3")//concurrency就是同组下的消费者个数,就是并发消费数,建议小于等于分区总数
public void listenGroup(ConsumerRecord record, Acknowledgment ack) {
String value = record.value();
System.out.println(value);
System.out.println(record);
//手动提交offset
ack.acknowledge();
}
四、Kafka集群Controller、Rebalance和HW
1. Controller
Kafka集群中的broker在zk中创建临时序号节点,序号最小的节点(最先创建的节点)将作为集群的controller,负责管理整个集群中的所有分区和副本的状态:
- 当某个分区的leader副本出现故障时,由控制器负责为该分区选举新的leader副本当检测到某个分区的ISR集合发⽣变化时,由控制器负责通知所有broker更新其元数据信息当使⽤kafka-topics.sh脚本为某个topic增加分区数量时,同样还是由控制器负责让新分区被其他节点感知到。
前提是:消费者没有指明分区消费。当消费组里消费者和分区的关系发生变化,那么就会触发rebalance机制。这个机制会重新调整消费者消费哪个分区。在触发rebalance机制之前,消费者消费哪个分区有三种策略:
- range:通过公式来计算某个消费者消费哪个分区
- 轮询:大家轮着消费
- ticky:在触发了rebalance后,在消费者消费的原分区不变的基础上进行调整。
- HW俗称⾼⽔位,HighWatermark的缩写,取⼀个partition对应的ISR中最⼩的LEO(log-end-offset)作为HW(木桶效应,最短板),consumer最多只能消费到HW所在的位置每个replica都有HW,leader和follower各⾃负责更新⾃⼰的HW的状态对于leader新写⼊的消息,consumer不能⽴刻消费,leader会等待该消息被所有ISR中的replicas同步后更新HW,此时消息才能被consumer消费这样就保证了如果leader所在的broker失效,该消息仍然可以从新选举的leader中获取。
- 发送方:ack设置成all或者-1,把min.insync.replicas配置成分区备份数消费方:把自动提交改成手动提交(消费完成之后在手动提交)
保证消费消息的幂等性:
- mysql中加业务id,并设置为主键,唯一使用redis或者zk分布式锁
- 发送方:确保消息是按顺序发送的,ack不能设置成0,关闭重试,使用同步发送,等待成功发送,再发送下一条。消费方:消息是发送到一个分区中,只能有一个消费组的消费者来接收消息,因此会牺牲性能
消息积压会导致很多问题,比如磁盘被打满、生产端发消息导致kafka性能过慢,就容易出现服务雪崩,就需要有相应的手段:
- 提升一个消费者的消费能力:在一个消费者中启动多个线程,让多个线程同时消费充分利用个服务器的CPU资源:若方案一不够,则启动多个消费者,部署在不同的服务器上让一个消费者去把收到的消息发往另一个topic上,另一个topic设置多个分区和多个消费者,进行具体的业务消费
延迟队列是为了解决任务推迟执行的问题,消息进入延迟队列之后暂时不能被消费,等超过了设定的时间才能被消费者进行消费
延迟队列的应用场景:在订单创建成功后如果超过30分钟没有付款,则需要取消订单,此时可用延时队列来实现
- 创建多个topic,每个topic表示延时的间隔
topic_5s: 延时5s执行的队列topic_1m: 延时1分钟执行的队列topic_30m: 延时30分钟执行的队列 消息生产者,发送消息到相应的topic,并带上消息的发送时间消费者订阅相应的topic,消费时轮询消费整个topic中的消息
消费者消费消息时判断消息的创建时间和当前时间是否超过30分钟(前提是订单没支付)如果是:去数据库中修改订单状态为已取消如果否︰记录当前消息的offset,并不再继续消费之后的消息。等待1分钟后,再次向kafka拉取该offset及之后的消息,继续进行判断,以此反复。
官网下载压缩包
http://www.kafka-eagle.org/
安装jdk,配置环境变量,也注意把KE_HOME也配好解压缩后修改配置文件 system-config.properties
# 配置zk cluster1.zk.list=172.16.253.35:2181 # 配置mysql kafka.eagle.driver=com.mysql.cj.jdbc.Driver kafka.eagle.url=jdbc:mysql://172.16.253.22:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull kafka.eagle.username=root kafka.eagle.password=123456
进入到bin目录,为ke.sh增加可执行的权限
chmod +x ke.sh
启动kafka-eagle
./ke.sh start



