流程配置参数
必填 消息体数据结构发送模式异常
可重试不可重试 拦截器分区器
流程一个正常的生产逻辑需要具备以下几个步骤:
- 配置生产者客户端参数及创建相应的生产者实例构建待发送的消息发送消息关闭生产者实例
send->interceptor->serializer(必须)->partitioner
配置参数 必填- bootstrap.servers:格式为host1:port1,host2:port2,可以设置一个或多个地址,中间以逗号隔开,此参数的默认值为“”;注意这里并非需要所有的broker地址,因为生产者会从给定的broker里查找到其他broker的信息。不过建议至少要设置两个以上的broker地址信息,当其中任意一个宕机时,生产者仍然可以连接到 Kafka集群上key.serializer 和 value.serializer:broker 端接收的消息必须以字节数组(byte[])的形式存在。这里必须填写序列化器的全限定名
public class ProducerRecord发送模式{ private final String topic;//必填 private final Integer partition; private final Headers headers; private final K key; private final V value;//必填 private final Long timestamp; }
发后即忘(fire-and-forget)同步(sync)异步(async)
//发后即忘 Futuresend(ProducerRecord ) //同步(伪) Future send(ProducerRecord ).get() //异步 Future send(ProducerRecord ,new Callback(){...})
Recordmetadata对象里包含了消息的一些元数据信息,比如当前消息的主题、分区号、分区中的偏移量(offset)、时间戳等。
异常 可重试NetworkException:网络异常,这个有可能是由于网络瞬时故障而导致的异常,可以通过重试解决LeaderNotAvailableException:分区的leader副本不可用,这个异常通常发生在leader副本下线而新的 leader 副本选举完成之前,重试之后可以重新恢复UnknownTopicOrPartitionExceptionNotEnoughReplicasExceptionNotCoordinatorException
对于可重试的异常,如果配置了重试次数,那么只要在规定的重试次数内自行恢复了,就不会抛出异常。默认值为0。
不可重试RecordTooLargeException:发送的消息太大,KafkaProducer对此不会进行任何重试,直接抛出异常
拦截器可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。
package org.apache.kafka.clients.producer;
//实现该接口,然后传入Properties中
//多个拦截器时用逗号分隔,拦截链会按照顺序来执行
public interface ProducerInterceptor{
//在将消息序列化和计算分区之前会调用
public ProducerRecord onSend(ProducerRecord record);
public void onAcknowledgement(Recordmetadata metadata,Exception exception);
public void close();
}
KafkaProducer 会在消息被应答(Acknowledgement)之前或消息发送失败时调用生产者拦截器的 onAcknowledgement()方法,优先于用户设定的 Callback 之前执行。这个方法运行在Producer 的 I/O 线程中,所以这个方法中实现的代码逻辑越简单越好,否则会影响消息的发送速度。
分区器根据key计算分区号:
如果 key 不为 null,那么默认的分区器会对 key 进行哈希(采用MurmurHash2算法,具备高运算性能及低碰撞率),最终根据得到的哈希值来计算分区号,拥有相同key的消息会被写入同一个分区如果key为null,那么消息将会以轮询的方式发往主题内的各个可用分区。
在不改变主题分区数量的情况下,key与分区之间的映射可以保持不变。不过,一旦主题中增加了分区,那么就难以保证key与分区之间的映射关系了。



