- 1 生产者
- 2 消费者
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.ScramMechanism;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.Recordmetadata;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.LoggerFactory;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class KafkaProducerTest {
static {
//设置log级别
LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
List loggerList = loggerContext.getLoggerList();
loggerList.forEach(logger -> logger.setLevel(Level.WARN));
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.80.130:9092");
// leader将等待所有副本同步完成后,才确认发送成功 取值:all, -1 , 1, 0
props.put(ProducerConfig.ACKS_CONFIG, "-1");
// 限制单条消息大小,限制发送请求大小10M,默认值1048576 -> 1M
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024 * 1024 * 10);
// 批量处理请求数3M, 默认值16384 -> 16k
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024 * 1024 * 3);
// 批处理等待时间1s,请求没有达到batch.size,将最多等待这么长时间然后批处理发送,默认值0
// props.put(ProducerConfig.LINGER_MS_CONFIG, 1000);
// 设置失败重试次数。
props.put(ProducerConfig.RETRIES_CONFIG, 3);
// 等待请求响应的最长时间,如果超时时间内未收到响应,则会重试或者返回失败,默认值:30000 -> 30s
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
// Kafka消息的序列化方式。
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// kafka安全认证相关配置,kafka没有配置认证授权可忽略
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
props.put(SaslConfigs.SASL_MECHANISM, ScramMechanism.SCRAM_SHA_256.mechanismName());
props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username="writer" password="writer";");
KafkaProducer producer = new KafkaProducer<>(props);
for (int i = 0; i < 5; i++) {
Recordmetadata test = producer.send(new ProducerRecord<>("test", "{"data":"test" + i + ""}")).get();
System.out.println(test.offset() + "--------" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
}
producer.close();
}
}
关于生产者常用的配置参数如下:
| 属性 | 默认值 | 描述 |
|---|---|---|
| key.serializer | 默认为消息key的class | 消息key序列化使用的类,class类型 |
| value.serializer | 默认为消息value的class | 消息value序列化使用的类,class类型 |
| acks | 3.0.0 版本,默认值为all,等效于-1 3.0.0之前版本,默认值为1 | producer需要server接收到数据之后发出的确认接收的信号,此项配置就是指procuder需要多少个这样的确认信号。此配置实际上代表了数据备份的可用性。以下设置为常用选项: (1)acks=0:设置为0表示producer不需要等待来自服务器的任何确认。消息将立即加到buffer并认为已经发送。没有任何保障可以保证此种情况下broker已经成功接收数据,同时重试配置不会发生作用(因为客户端不知道是否失败),每条消息回馈的offset会总是设置为-1; (2)acks=1:这意味着至少要等待leader已经成功将数据写入本地log,但是并没有等待所有follower是否成功写入。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失; (3)acks=all:这意味着leader需要等待所有副本都成功写入日志,这种策略会保证只要有一个副本存活就不会丢失数据。这是最强的保证。和acks=-1效果相同。 |
| bootstrap.servers | ”“ | kafka集群的连接地址,这个列表格式: host1:port1,host2:port2,… 。这些 server 仅仅是用于初始化的连接,以发现集群所有成员关系(可能会动态的变化),这个列表不需要包含所有的 servers(你可能想要不止一个server,尽管这样,可能某个server宕机了)。如果没有 server 在这个列表出现,则发送数据会一直失败,直到列表可用。 |
| buffer.memory | 33554432 | producer可以用来缓存数据的内存大小。如果数据产生速度大于向broker发送的速度,producer会阻塞或者抛出异常。这项设置将和producer能够使用的总内存相关,但并不是一个硬性的限制,因为不是producer使用的所有内存都是用于缓存。一些额外的内存会用于压缩(如果引入压缩机制),同样还有一些用于维护请求。 |
| max.request.size | 1048576 | 请求的最大字节数。此设置将限制producer在单个请求中批处理发送的记录个数,以避免发送大量请求。 |
| batch.size | 16384 | producer将试图批处理消息记录,以减少请求次数。这将改善client与server之间的性能。这项配置控制默认的批量处理消息字节数。不会试图处理大于这个字节数的消息字节数。发送到brokers的请求将包含多个批量处理,其中会包含对每个partition的一个请求。较小的批量处理数值比较少用,并且可能降低吞吐量(0则会仅用批量处理)。较大的批量处理数值将会浪费更多内存空间,这样就需要分配特定批量处理数值的内存大小。 |
| linger.ms | 0 | producer把在请求传输之间到达的消息组合到一个批处理请求中。通常来说,这只有在记录产生速度大于发送速度的时候才能发生。然而,在某些条件下,客户端也可能希望减少请求的数量。这项设置将通过增加小的延迟来实现这一点,即producer不会立即发送记录,而是等待给定的延迟,从而可以将发送的记录批处理在一起,这些消息记录可以批量发送处理。这可以认为是TCP种Nagle的算法类似。这项设置设定了批量处理的更高的延迟边界:一旦我们达到batch.size,它将会立即发送而不管这项设置,但是如果我们为该分区累积的字节数少于此数量,我们将“逗留”指定的时间,等待更多记录显示。这个设置默认为0,即没有延迟。例如,设定linger.ms=5将会减少请求的发送数,但是发送的记录会增加5毫秒的延迟。 |
| retries | 0 | 重试次数,设置大于0的值,如果数据发送失败,将使客户端重新发送这条数据。注意,这些重试与客户端接收到发送错误时的重试没有什么不同。允许重试将潜在的改变数据的顺序,如果这两个消息记录都是发送到同一个partition,则第一个消息失败第二个发送成功,则第二条消息会比第一条消息出现要早。 |
| request.timeout.ms | 30000 | 配置客户端等待请求响应的最长时间。如果在超时时间过去之前未收到响应,则客户端将在必要时重新发送请求,或者在重试次数用尽时使请求失败。 |
| delivery.timeout.ms | 120000 | 调用send()返回成功或失败的时间上限。这限制了记录在发送前延迟的总时间、等待代理确认的时间(如果预期)以及可重试发送失败的允许时间。如果遇到不可恢复的错误,重试次数已用尽,或者记录添加到达到较早交付到期期限的批次中,则生产者可能会报告未能在此配置之前发送记录。此配置的值应大于或等于request.timeout.ms和linger.ms之和。 |
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ScramMechanism;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
public class KafkaConsumerTest {
static {
//设置log级别
LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
List loggerList = loggerContext.getLoggerList();
loggerList.forEach(logger -> logger.setLevel(Level.WARN));
}
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.80.130:9092");
//消费者超过该值没有返回心跳,服务端判断消费者处于非存活状态,服务端将消费者从Consumer Group移除并触发Rebalance,默认3s。
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000 * 8);
//获取消息后提交偏移量的最大时间(默认5分钟),超时服务端会认为消费者失效,触发Rebalance,并且提交offset会失败
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 10000);
//是否自动提交offset
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
//当分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
//每次Poll的最大数量。注意该值不要改得太大,如果Poll太多数据,而不能在下次Poll之前消费完,则会触发一次负载均衡,产生卡顿。默认50
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
//消息的反序列化方式。
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
//属于同一个组的消费实例,会负载消费消息。
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_test");
// kafka安全认证相关配置,kafka没有配置认证授权可忽略
props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
props.put(SaslConfigs.SASL_MECHANISM, ScramMechanism.SCRAM_SHA_256.mechanismName());
props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username="reader" password="reader";");
KafkaConsumer consumer = new KafkaConsumer<>(props);
// 订阅主题
consumer.subscribe(Collections.singletonList("test"));
while (true) {
ConsumerRecords consumerRecords = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord consumerRecord : consumerRecords) {
System.out.println("offset: " + consumerRecord.offset() + "----- message: " + consumerRecord.value());
}
// 提交offset
consumer.commitSync();
}
}
}
关于生产者常用的配置参数如下:
| 属性 | 默认值 | 描述 |
|---|---|---|
| group.id | null | 消费者组名称,如果多个consumer设置同一个group.id,那么它们属于同一个Consumer Group。如果consumer 使用subscribe(topic)订阅或者基于偏移量管理测策略,则需要设置此值。 |
| key.deserializer | 消息key反序列化的类,class类型 | |
| value.deserializer | 消息value反序列化的类,class类型 | |
| heartbeat.interval.ms | 3000 | 消费者协调员之间的心跳间隔时间。心跳用于确保消费者会话保持活跃,并在新消费者加入或离开群组时促进重平衡rebalance。该值必须设置为低于session.timeout.ms,但通常应设置为不高于该值的1/3。它可以调整得更低,以控制正常情况下再平衡的预期时间。 |
| max.poll.interval.ms | 300000 | poll()方式获取消息后提交偏移量的最大时间(默认5分钟),超时服务端会认为消费者失效,触发Rebalance,并且提交offset会失败 |
| enable.auto.commit | true | 是否自动提交offset |
| auto.offset.reset | latest | 如果Kafka中没有初始偏移量,配置消费策略,取值如下: earliest: 从最开始的位置开始消费 latest: 消费者启用后,从最新的位置开始消费 none: 如果没有消费者组之前的偏移量,抛出异常 有疑问的同学可以参考这篇博客: Kafka auto.offset.reset值详解 |
| max.poll.records | 500 | 每次poll()返回的最大数量。consumer缓存消息记录,记录数达到此值或者最新offset返回。 |
| session.timeout.ms | 10000 (10 seconds) | worker定期向broker发送心跳信号,以表明其活跃程度。如果在此会话超时过期之前broker未收到心跳,则broker将从group中删除该worker并启动重平衡。 |



