引入依赖:
org.apache.kafka kafka-clients 2.8.0
创建一个Bean,这里的配置后面会解析。
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Properties;
@Slf4j
@Configuration
public class KafkaConfig {
@Bean
public KafkaProducer kafkaProducer() {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.138:9092");
// 消息确认机制配置
properties.put(ProducerConfig.ACKS_CONFIG, "all");
// 重试
properties.put(ProducerConfig.RETRIES_CONFIG, "0");
// 批次大小
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
// 多长时间发送一个批次
properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
// 缓冲
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
// 序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 序列化
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new KafkaProducer<>(properties);
}
}
发生消息:
@RestController
public class KafkaController {
@Autowired
private KafkaProducer kafkaProducer;
@GetMapping("/send")
public String sendMsg() throws InterruptedException {
for (int i = 0; i < 100; i++) {
ProducerRecord record = new ProducerRecord<>("test-topic", "这是第" + i + "条消息");
kafkaProducer.send(record);
Thread.sleep(200);
}
return "success";
}
}
开启主题的监听:
# 单节点监听 bin/kafka-console-consumer.sh --bootstrap-server 192.168.31.138:9092 --topic test-topic --from-beginning # 集群节点监听 bin/kafka-console-consumer.sh --bootstrap-server 192.168.31.138:9091,192.168.31.138:9092,192.168.31.138:9093 --topic test-topic --from-beginning二. 消息发送模式
从第一部分可以知道,发送消息就是将消息和Topic封装一个ProducerRecord对象,然后通过KafkaProducer的send方法发送。
这里有两种种方法,这些方法都是通过Future封装返回的,是可以拿到返回值的,其实都是异步执行的,第二个可以执行异步回调。
ProducerRecord同步发送:record = new ProducerRecord<>("test-topic", "hello world"); kafkaProducer.send(record);
因为这里使用Future做返回,所以可以通过get()方法阻塞,相当于同步。
ProducerRecord异步回调发送:record = new ProducerRecord<>("test-topic", "hello world"); Future future = kafkaProducer.send(record); Recordmetadata metadata = future.get();
回调就是在callback中执行业务
ProducerRecord三. 生产者构建流程record = new ProducerRecord<>("test-topic", "hello world"); kafkaProducer.send(record, (metadata, exception) -> { // TODO });
这里简单看一下KafkaProducer的初始化过程,从构造方法开始:
接着获取Client.id,若没有配置则由是producer-递增的数字
this.producerConfig = config; this.time = time; // 获取事物ID String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG); // 获取用户配置的client.id this.clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
设置生产者监控:
this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
设置分区器paritioner,这里可以设置自己的分区器,需要增加:partitioner.class,将自定义的分区器路径写入。
设置key和value的序列化器:
解析并实例化拦截器,这里可以自定义实现拦截器,过滤特定的消息。
构建RecordAccumulator,这个类是用来存放消息
到RecordAccumulator这个类里面看一下属性batches:
private final ConcurrentMap> batches;
它是一个 ConcurrentMap,key 是 TopicPartition 类,代表一个 topic 的一个 partition。value 是一个包含 ProducerBatch 的双端队列。等待 Sender 线程发送给 broker。
创建守护线程sender,用来监听发送消息:
在KafkaThread的可以看出这个线程是守护线程:
这里需要注意,KafkaProducer的主线程是用来往RecordAccumulator里面写消息,Sender守护线程是用来读取消息并发送到Kafka中的。
四. 消息发送大体流程上面的例子演示了消息的发送,这里简单看一下消息发送的大体流程:
先从send函数开始分析
首先方法会先进入拦截器集合ProducerInterceptors ,onSend方法是遍历拦截器onSend方法,拦截器的目的是将数据处理加工,Kafka本身并没有给出默认的拦截器的实现。如果需要使用拦截器功能,必须自己实现接口ProducerInterceptor并实现onSend方法。
接下来看一下doSend方法,首先先判断守护线程sender是否可用,接着判断要发送到topic的metadata是否可用:
序列化key和value
计算record对应的partition的值,这里如果定义了分区器会使用自定义,也可以指定算法获取该值。下面会介绍具体分区器实现
int partition = partition(record, serializedKey, serializedValue, cluster);
接着像accumulator中追加数据:
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
如果是新批次需要重新指定分区在追加数据:
缓冲满了之后,唤醒sender线程发送消息:
到这儿只是消息发送大体的流程,内部还有一大堆的代码,功力不够看着有点上头,等有时间在看吧!
消息在网络上传输,必须进行序列化转化为字节流。Kafka提供了默认的字符串序列化器StringSerializer,除此之外还有ByteArray、ByteBuffer、Bytes、Double、Integer、Long等。
在org.apache.kafka.common.serialization包下面可以看到默认实现的序列化器!
接下来看一下借助fastjson实现自定义序列化器:
@Data
@AllArgsConstructor
@NoArgsConstructor
public class UserInfo {
private String name;
private Integer age;
}
// 序列化器
public class CustomizeSerializer implements Serializer {
@Override
public byte[] serialize(String topic, UserInfo data) {
return JSON.toJSONBytes(data);
}
}
// 反序列化器
public class CustomizeDeserializer implements Deserializer {
@Override
public UserInfo deserialize(String topic, byte[] data) {
return JSON.parseObject(data, UserInfo.class);
}
}
配置使用:
// 序列化 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CustomizeSerializer.class); // 反序列化 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomizeDeserializer.class);
使用:
@Slf4j
@RestController
public class CustomizeController {
@Autowired
private KafkaProducer kafkaProducer;
private static final String TOPIC_NAME = "long-topic";
@GetMapping("c_send")
public String send(@RequestParam(value = "size", defaultValue = "20") Integer size) {
for (int i = 0; i < size; i++) {
ProducerRecord record = new ProducerRecord<>(TOPIC_NAME, 0, "user:" + i, new UserInfo("这是第" + i + "条消息", i));
kafkaProducer.send(record, (metadata, exception) -> {
log.info("partition: {}, topic: {}, offset: {}", metadata.partition(), metadata.topic(), metadata.offset());
});
}
return "success";
}
}
六. 拦截器
拦截器的使用场景:
按照某个规则过滤掉不符合要求的消息修改消息内容发送消相关统计
自定义拦截器,主要是实现ProducerInterceptor接口,这个接口中四个方法:
onSend:该方法封装进KafkaProducer.send方法中,即它运行在用户的主线程中。Producer确保在消息被序列化以及计算分区前调用该方法。注意:用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算onAcknowledgement:该方法会在消息从RecordAccumulator成功发送到Kafka Broker之后,或者在发送过程中失败时调用。并且通常都是在producer回调逻辑触发之前。注意:onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很复杂的逻辑,否则会拖慢producer的消息发送效率close:关闭interceptor,主要用于执行一些资源清理的工作configure:获取配置信息和初始化数据时调用
实例:
public class CustomizeProducer implements ProducerInterceptor{ @Override public ProducerRecord onSend(ProducerRecord record) { UserInfo value = record.value(); return value.getAge() % 2 == 0 ? new ProducerRecord<>( record.topic(), record.key(), new UserInfo(value.getName(), value.getAge() + 100)) : new ProducerRecord<>( record.topic(), record.key(), new UserInfo(value.getName(), value.getAge() + 60)); } @Override public void onAcknowledgement(Recordmetadata metadata, Exception exception) {} @Override public void close() {} @Override public void configure(Map configs) {} }
使用自定义拦截器:
注意:ProducerConfig.INTERCEPTOR_CLASSES_CONFIG对应的值是一个拦截器数组。
发送消息结果数据实体Recordmetadata,返回数据包好分区器、偏移量和主题。
测试发动100条消息数据:
@GetMapping("/send")
public String sendMsg(@RequestParam(value = "size", defaultValue = "100") Integer size) throws InterruptedException {
for (int i = 0; i < size; i++) {
ProducerRecord record = new ProducerRecord<>(TOPIC_NAME, "这是第" + i + "条消息");
kafkaProducer.send(record, (metadata, exception) -> {
log.info("partition: {}, topic: {}, offset: {}", metadata.partition(), metadata.topic(), metadata.offset());
});
}
return "success";
}
如果没有指定分区器系统会使用默认的默认的分区器:DefaultPartitioner(在ProducerConfig中有定义)
关于计算分区器的值有下面这几种情况:
指明partition的情况下,直接将指明的值作为partition的值
@GetMapping("/send")
public String sendMsg(@RequestParam(value = "size", defaultValue = "20") Integer size) throws InterruptedException {
for (int i = 0; i < size; i++) {
// 设置0分区发送数据
ProducerRecord record = new ProducerRecord<>(TOPIC_NAME, 0, "key:" + i, "这是第" + i + "条消息");
kafkaProducer.send(record, (metadata, exception) -> {
log.info("partition: {}, topic: {}, offset: {}", metadata.partition(), metadata.topic(), metadata.offset());
});
}
return "success";
}
没有指明partition值但有key的情况下,将key和hash值与topic的partition数进行取余得到partition的值
Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
partition和key都不存在的情况下,通过 stickyPartitionCache 的 partition 方法计算出分区。
7.1. 默认分区器系统里面支持如下几种:
DefaultPartitioner:默认分区策略,如果key也不存在,则会对可用分区进行轮询,如果没有指定分区,且存在key值,则会根据key的hash进行取模来选择分区。(需要注意这里和以前的版本实现不一样)
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions) {
if (keyBytes == null) {
return stickyPartitionCache.partition(topic, cluster);
}
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
这里看看一下StickyPartitionCache,这里面的具体实现:
private final ConcurrentMapindexCache; public StickyPartitionCache() { this.indexCache = new ConcurrentHashMap<>(); } public int partition(String topic, Cluster cluster) { Integer part = indexCache.get(topic); if (part == null) { return nextPartition(topic, cluster, -1); } return part; }
indexCache 是一个 ConcurrentHashMap 对象,对应的是 Topic -> Partition 的映射,如果该值不存在则调用 nextPartition 方法选择一个分区并缓存。
public int nextPartition(String topic, Cluster cluster, int prevPartition) {
List partitions = cluster.partitionsForTopic(topic);
Integer oldPart = indexCache.get(topic);
Integer newPart = oldPart;
// 但是分区是空或者与prevPartition相同
if (oldPart == null || oldPart == prevPartition) {
// 获取主题对应partition
List availablePartitions = cluster.availablePartitionsForTopic(topic);
// partition数量小于1
if (availablePartitions.size() < 1) {
// 获取一个线程安全的随机数
Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
// 取余返回新的partition
newPart = random % partitions.size();
} else if (availablePartitions.size() == 1) {
// partition数量等于1,返回对应的parition
newPart = availablePartitions.get(0).partition();
} else {
// partition数量大于1
while (newPart == null || newPart.equals(oldPart)) {
// 生成的新partition不为null,并且与当前的partion不相同
int random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
newPart = availablePartitions.get(random % availablePartitions.size()).partition();
}
}
// 如果当前的partition是null,将partition缓冲更新
if (oldPart == null) {
indexCache.putIfAbsent(topic, newPart);
} else {
// 否则替换旧的partition
indexCache.replace(topic, prevPartition, newPart);
}
// 最后返回主题对应的partition
return indexCache.get(topic);
}
// 最后返回主题对应的partition
return indexCache.get(topic);
}
这个过程其实比较简单的,这里其实也是粘性分区策略实现方式
UniformStickyPartitioner:粘性分区策略
参看DefaultPartitioner
RoundRobinPartitioner:轮询策略
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 获取主题对应的partition
List partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size(); // 数量
int nextValue = nextValue(topic); // 主题对应的新值
// 获取主题对应的可用的partition
List availablePartitions = cluster.availablePartitionsForTopic(topic);
if (!availablePartitions.isEmpty()) {
// 计算新的partition值
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// 没有可用的分区,给一个不可用的分区
return Utils.toPositive(nextValue) % numPartitions;
}
}
// 将主题对应的值+1
private int nextValue(String topic) {
AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> {
return new AtomicInteger(0);
});
return counter.getAndIncrement();
}
7.2. 自定义分区器
自定义CustomizePartition类,并且实现Partitioner接口,重写partition和close方法
public class CustomizePartition implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
return 0;
}
@Override
public void close() {}
@Override
public void configure(Map configs) {}
}
使用的时候只需要在配置文件中指定一下就可以:
acks:发送应答,默认值是1。batch.size:批量发送大小,默认是16384(16k)bootstrap.servers:服务器地址,多个服务器地址用逗号分割开buffer.memory:生产者最大可用的缓冲,默认是33554432(32M)client.id:生产者ID,默认是“”compression.type:压缩类型,默认是producer(未压缩),还有其他的配置类型:gzip(压缩率高,适合高内内存和CPU)/snappy(适合带宽敏感,压缩力度大)/lz4/sztdretries:失败重试次数,默认整型的最大值(2147483647)retry.backoff.ms:重试阻塞时间delivery.timeout.ms:传输时间connections.max.idle.ms:关闭空闲连接时间,默认是540000enable.idempotence:开启幂等,默认是false,max.in.flight.request.per.connection:单个连接上发送的未确认请求的最大连接数,默认是5interceptor.classes:拦截器,默认是无拦截器key.seriailzer:key的序列化器,默认是无value.seriailzer:value的序列化器,默认是无linger.ms:发送延迟时间,默认是0max.block.ms:阻塞时间,默认是一分钟(60000)max.request.size:最大请求字节大小,默认是1M(1048576)metric.reporters:自定义指标报告器partitioner.class:自定义分区器request.timeout.bytes:请求超时时间,默认30000receive.buffer.bytes:读取数据时使用TCP接收缓冲区(SO_RCVBUF)的大小,默认值32k(32768)。如果值是-1,将使用OS默认值send.buffer.bytes:发送数据时使用的TCP发送缓冲去(SO_SEDBUF)的大小,默认是是128k(131072)。如果值是-1,将使用OS的默认值



