无论使用Kafka作为队列,消息总线,还是数据存储平台,总是需要通过创建一个生产者写入数据到Kafka,一个消费者从Kafka读取数据,或一个应用程序承担生产者和消费者两个角色。
例如,在信用卡交易处理系统中,会有一个客户端应用程序,可能是一个在线商店,负责在付款完成后立即将每笔交易发送给Kafka。 另一个应用程序负责根据规则引擎立即检查该事务,并确定该事务是被批准还是被拒绝。 然后,批准/拒绝响应可以写回Kafka,并且响应可以传播回交易发起的在线商店。 第三个应用程序可以从Kafka读取事务和审批状态,并将它们存储在一个数据库中,分析师可以稍后审查决策,也许还可以改进规则引擎。
Apache Kafka提供了内置的客户端api,开发者可以在开发与Kafka交互的应用程序时使用。
在本章中,我们将学习如何使用Kafka生产者,从概述其设计和组件开始。 我们将展示如何创建KafkaProducer和ProducerRecord对象,如何向Kafka发送消息,以及如何处理Kafka可能返回的错误。 然后,我们将回顾用于控制生产者行为的最重要的配置选项。 最后,我们将深入研究如何使用不同的分区方法和序列化器,以及如何编写自己的序列化器和分区器。
在第四章中,我们将学习Kafka的消费者客户端和从Kafka读取数据。
3.1 生产者(Producer)概述第三方客户端
除了内置的客户端,Kafka还有一个二进制有线协议(binary wire protocol)。 这意味着应用程序可以简单地通过发送正确的字节序列到Kafka的网络端口来从Kafka读取消息或向Kafka写入消息。 有多种客户端在不同的编程语言中实现Kafka的有线协议,提供了简单的方法,不仅在Java应用程序中使用Kafka,还在c++、Python、Go等语言中使用。 这些客户端不是Apache Kafka项目的一部分,但是在项目wiki中维护了一个非java客户端列表。 有线协议和外部客户端不在本章的讨论范围之内。
应用程序可能需要向Kafka写入消息的原因有很多:记录用户活动以进行审计或分析,记录指标,存储日志消息,记录智能设备的信息,与其他应用程序异步通信,在写入数据库之前缓冲信息,等等。
这些不同的用例也暗示了不同的需求:每个消息都是关键的吗?或者我们能容忍消息的丢失吗? 我们对意外重复的消息没有问题吗? 我们是否需要支持严格的延迟或吞吐量要求?
在前面介绍的信用卡事务处理示例中,我们可以看到,永远不要丢失任何消息或重复任何消息是至关重要的。 延迟应该很低,但是可以容忍超过500毫秒的延迟,吞吐量应该非常高——我们期望每秒处理100万条消息。
另一个用例可能是存储网站的点击信息。 在这种情况下,可以容忍一些消息丢失或重复; 只要不影响用户体验,延迟可以很高。 换句话说,我们不介意消息花几秒钟到达Kafka,只要下一个页面在用户点击链接后立即加载。 吞吐量将取决于网站访问量。
不同的需求会影响你使用生产者API向Kafka写消息的方式和使用的配置。
虽然生产者API非常简单,但当我们发送数据时,在生产者内部还有更多内容。 向Kafka发送数据的主要步骤如图3-1所示。
图3-1 Kafka生产者组件的概述
我们通过创建一个ProducerRecord开始向Kafka生产消息,消息必须包含我们想要发送记录的主题和一个值。 还可以指定一个键、一个分区、一个时间戳和/或一组头文件。 一旦发送了ProducerRecord,生产者要做的第一件事就是将键和值对象序列化为字节数组,以便通过网络发送它们。
接下来,如果我们没有显式地指定分区,数据将被发送到分区器。 分区程序将为我们选择一个分区,通常基于ProducerRecord键。 一旦选择了分区,生产者就知道该记录将发送到哪个主题和分区。 然后将记录添加到一批记录中,这些记录也将被发送到相同的主题和分区。 一个单独的线程负责将这些批记录发送到相应的Kafka broker。
当 broker接收到消息时,它将返回一个响应。 如果消息成功写入到Kafka,它将返回一个Recordmetadata对象,其中包含主题、分区和分区中记录的偏移量。 如果 broker未能写入消息,它将返回一个错误。 当生产者收到错误时,可能会在放弃并返回错误之前重试几次发送消息。
3.2 构建一个kafka Producer向Kafka写入消息的第一步是创建一个生产者对象,其中包含你想要传递给生产者的属性。Kafka producer有三个强制属性:
bootstrap.servers
broker的host:port列表,生产者将使用它们建立到Kafka集群的初始连接。 这个列表不需要包含所有的 broker,因为生产者将在初始连接之后获得更多信息。 但是建议至少包含两个 broker,这样在一个 broker宕机的情况下,生产者仍然能够连接到集群。
key.serializer
一个类的名字,用于序列化我们将生产到Kafka的记录的键。 Kafka broker期望字节数组作为消息的键和值。 但是,生产者接口允许(使用参数化类型)将任何Java对象作为键和值发送。 这使得代码非常易读,但这也意味着生产者必须知道如何将这些对象转换为字节数组。 key.serializer应该被设置为实现org.apache.kafka.common.serialization.Serializer接口的类名。 生产者将使用这个类将key对象序列化为字节数组。 Kafka客户端包包括ByteArraySerializer(它并没有做很多事情),String Serializer, IntegerSerializer,以及更多序列化器,所以如果你使用普通类型,没有必要实现自己的序列化器。设置key.serializer是必需的,即使你打算只发送值,但你可以使用Void类型的键和VoidSerializer。
value.serializer
一个类的名字,用来序列化我们将生产到Kafka的记录的值。将key. serializer设置为将消息键对象序列化为字节数组的类的名称,同样地,将value. serializer设置为将消息值对象序列化的类。
下面的代码片段展示了如何通过设置强制参数和使用默认值来创建一个新的生产者:
Properties kafkaProps = new Properties(); //我们从一个Properties对象开始。
kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
//因为我们计划使用字符串作为消息键和值,所以我们使用内置的StringSerializer。
kafkaProps.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
//在这里,我们通过设置适当的键和值类型并传递Properties对象来创建一个新的生产者。
producer = new KafkaProducer(kafkaProps);
这样一个简单的接口,很明显,对生产者行为的大部分控制都是通过设置正确的配置属性来完成的。 Apache Kafka文档涵盖了所有的配置选项,我们将在本章的后面讨论重要的选项。
实例化生产者之后,就可以开始发送消息了。 有三种主要的发送消息的方法:
“即发即弃” Fire-and-forget
我们向服务器发送消息,并不真正关心它是否成功到达。 大多数时候,它会成功到达,因为Kafka是高可用的,并且生产者会重试自动发送消息。 然而,在不可检索的错误或超时的情况下,消息将会丢失,应用程序将不会得到任何关于此的信息或异常。
同步发送 Synchronous send
从技术上讲,Kafka的生产者总是异步的——我们发送一个消息,然后send()方法返回一个Future对象。 但是,我们使用get()来等待Future,并在发送下一条记录之前查看send()是否成功。
异步发送 Asynchronous send
我们用一个回调函数调用send()方法,当它收到来自Kafka broker的响应时触发该回调函数。
在下面的示例中,我们将看到如何使用这些方法发送消息,以及如何处理可能发生的不同类型的错误。
虽然本章中的所有例子都是单线程的,但生产者对象可以被多个线程使用来发送消息。
3.3 发送消息到Kafka发送消息的最简单方法如下:
//生产者接受ProducerRecord对象,因此我们首先创建一个。ProducerRecord有多个构造函数,我们将在后面讨论。在这里,我们使用一个需要我们发送数据的主题的名称,它总是一个字符串,以及我们发送给Kafka的键和值,在这种情况下也是字符串。键和值的类型必须匹配key serializer和value serializer对象。 ProducerRecordrecord = new ProducerRecord<>("CustomerCountry", "Precision Products", "France"); try { //我们使用生产者对象的send()方法来发送ProducerRecord。正如我们在图3-1的生产者架构图中所看到的,消息将被放置在一个缓冲区中,并将在一个单独的线程中发送到代理。send()方法返回一个带有Recordmetadata的Java Future对象,但是由于我们简单地忽略了返回的值,因此我们无法知道消息是否成功发送。当可以接受静默删除消息时,可以使用这种发送消息的方法。在生产应用程序中通常不是这样。 producer.send(record); } catch (Exception e) { //虽然我们忽略了在发送消息给Kafka broker或broker本身时可能发生的错误,但如果生产者在发送消息给Kafka之前遇到了错误,我们仍然可能会得到一个异常。例如,当它未能序列化消息时,可能会出现SerializationException;如果缓冲区已满,则会出现BufferExhaustedException或TimeoutException异常;如果发送线程被中断,则会出现InterruptException。 e.printStackTrace(); }
同步发送消息
同步发送消息很简单,当Kafka响应生产请求出错,或发送重试次数耗尽时,仍然允许生产者捕捉异常。 其中涉及的主要权衡是性能。 根据Kafka集群的繁忙程度,broker响应生成请求的时间从2毫秒到几秒不等。 如果您同步发送消息,发送线程将花费这段时间等待,不做任何其他事情,甚至不发送额外的消息。 这导致性能非常差,因此,同步发送通常不会用于生产应用程序(但在代码示例中非常常见)。
同步发送消息的最简单方法如下:
ProducerRecordrecord = new ProducerRecord<>("CustomerCountry", "Precision Products", "France"); try { //这里,我们使用Future.get()来等待来自Kafka的回复。如果记录没有成功发送给Kafka,这个方法将抛出一个异常。如果没有错误,我们将获得一个Recordmetadata对象,我们可以使用该对象检索写入消息的偏移量和其他元数据。 producer.send(record).get(); } catch (Exception e) { //如果在将记录发送到Kafka之前或发送过程中有任何错误,我们将遇到异常。在本例中,我们只输出遇到的任何异常。 e.printStackTrace(); }
KafkaProducer有两种类型的错误。 可重试错误是那些可以通过再次发送消息来解决的错误。 例如,可以解决连接错误,因为连接可能会重新建立。 当为分区选出一个新的leader并刷新客户端元数据时,可以解决“not leader for partition”错误。 KafkaProducer可以被配置为自动重试这些错误,所以应用程序代码将获得可重试的异常,只有当重试的次数被耗尽,错误没有得到解决。 有些错误无法通过重试来解决——例如,“消息大小太大”。 在这些情况下,KafkaProducer不会尝试重试,而是立即返回异常。
异步发送消息
假设我们的应用程序和Kafka集群之间的网络往返时间是10毫秒。 如果我们在发送每条消息后等待回复,发送100条消息大约需要1秒。 另一方面,如果我们只是发送所有的消息,而不等待任何回复,那么发送100条消息几乎不会花任何时间。 在大多数情况下,我们真的不需要一个响应----Kafka在记录被写入后,返回主题、分区和偏移量,这通常不是发送应用程序所需要的。另一方面,我们需要知道,当我们无法发送消息完全可以抛出一个异常,记录一个错误, 或者将消息写入“errors”文件以供以后分析。
为了异步发送消息并仍然处理错误场景,生产者支持在发送记录时添加回调函数。 下面是一个如何使用回调函数的例子:
//要使用回调,需要实现org.apache.kafka. clients.producer.Callback接口,这个接口中有一个方法onCompletion()
private class DemoProducerCallback implements Callback {
@Override
public void onCompletion(Recordmetadata recordmetadata, Exception e) {
if (e != null) {
//如果Kafka返回一个错误,onCompletion()将有一个非空异常。这里我们通过打印来“处理”它,但是生产代码可能会有更健壮的错误处理功能。
e.printStackTrace();
}
}
}
ProducerRecord record = new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA");
producer.send(record, new DemoProducerCallback()); //我们在发送记录时传递一个Callback对象。
警告
回调在生产者的主线程中执行。 这保证了当我们将两个消息依次发送到同一个分区时,它们的回调将按照发送顺序执行。 但这也意味着回调应该相当快,以避免延迟生产者和阻止其他消息发送。 不建议在回调中执行阻塞操作。 相反,应该使用另一个线程并发地执行任何阻塞操作。



