栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

kafka-python的部分api详解

kafka-python的部分api详解

1.生产者属性
bootstrap.servers:该属性指定broker的地址清单,地址格式为host:port。清单里不需要包含所有的broker地址,生产者会从给定的broker中查找到其他的broker的信息。不过建议
    至少要提供两个broker信息,一旦其中一个宕机,生产者仍然能连接到集群上。
key_serializer (callable) – used to convert user-supplied keys to bytes If not None, called as f(key), should return bytes. Default: None.
value_serializer (callable) – used to convert user-supplied message values to bytes. If not None, called as f(value), should return bytes. Default: None.

上面的两个数值指定了键和值怎么序列化。默认是none。

在实例化了一个生成者对象之后,实例有一个config属性,返回的是当前生产者的默认配置如下:
producer.config
{'bootstrap_servers': ['10.0.102.204:9092'], 'client_id': 'kafka-python-producer-1', 'key_serializer': None, 'value_serializer': None, 'acks': 1, 'compression_type': None, 'retries': 0, 'batch_size': 16384, 'linger_ms': 0, 'partitioner': , 'buffer_memory': 33554432, 'connections_max_idle_ms': 600000, 'max_block_ms': 60000, 'max_request_size': 1048576, 'metadata_max_age_ms': 300000, 'retry_backoff_ms': 100, 'request_timeout_ms': 30000, 'receive_buffer_bytes': None, 'send_buffer_bytes': None, 'socket_options': [(6, 1, 1)], 'reconnect_backoff_ms': 50, 'max_in_flight_requests_per_connection': 5, 'security_protocol': 'PLAINTEXT', 'ssl_context': None, 'ssl_check_hostname': True, 'ssl_cafile': None, 'ssl_certfile': None, 'ssl_keyfile': None, 'ssl_crlfile': None, 'api_version': (0, 10), 'api_version_auto_timeout_ms': 2000, 'metric_reporters': [], 'metrics_num_samples': 2, 'metrics_sample_window_ms': 30000, 'selector': , 'sasl_mechanism': None, 'sasl_plain_username': None, 'sasl_plain_password': None}

data = producer.config
type(data)

data是一个字典对象,然后就可以使用修改字典的方法修改对应的属性。
从上面可以看到生成者有很多可以配置的参数,他们大部分的参数都有合理的默认值,所有没有必要修改它们。不过有几个参数在内存中使用,性能和可靠性方面对生产者影响比较大,下面会介绍这些参数。

acks,这个参数指定了必须有多少个分区副本收到消息,生产者才会认为消息写入是成功的。这个参数对消息丢失的可能性有重要影响。
    如果acks=0;生成者在成功写入消息之前不会等待任何来自服务器的响应。也就是说,如果当中出现了问题,导致服务器没有接收到消息,那么生产者是不知道的,消息也就是丢失了。
          不过,因为生成者不需要等待服务器响应,所以它可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。
    如果acks=1:只要集群的首领节点收到消息,生成者会收到一个来自服务器的成功响应。如果消息无法到达首领节点(比如首领节点崩溃,新的首领还没有被选举出来),生产者会收到
          一个错误响应,为了避免数据丢失,生产者会重发消息。不过,如果一个没有收到消息的节点成为新首领,消息还是会丢失。这个时候的吞吐量取决于使用的是同步发送还是异步
          发送。如果让客户端等待服务器的响应(通过调用future对象的get()方法),显然会增加延迟。如果客户端使用回调,延迟问题可以得到缓解,不过吞吐量还是会受发送中
          消息数量的限制。
    
    如果acks=all,只有当所有参与复制的节点全部收到消息时,生成者才会收到一个来自服务器的成功响应。这种模式是最安全的,它可以保证不止一个服务器收到消息,就算有服务器
          发生崩溃,整个集群仍然可以运作。不过,它的延迟比acks=1时更高,因为我们要等待不只一个服务器节点接收消息。
    
buffer.memory: 该参数用来设置生成者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。如果应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足。
          这个时候send()方法调用要么被阻塞,要么抛出异常,取决于如何设置block.on.buffer.full参数(0.9.0.0之后的版本中被替换为max.block.ms,表示在抛出异常之
          前可以阻塞一段时间)
    
compression.type: 默认情况下,消息发送不会被压缩。该参数可以设置为snappy, gzip或lz4,它指定了消息被发送给broker之前使用哪一种压缩算法进行压缩。snappy压缩,
          占用较少的CPU,却能提供较好的性能和相当可观的压缩比,如果比较关注性能和网络带宽,可以使用这种算法。gzip压缩算法一般会占用较多的CPU,但会提供更高的压缩比,
          所以如果网络带宽比较有限,可以使用这种算法。使用压缩可以降低网络传输开销和存储开销。

retries:生成者从服务器收到的错误有可能是临时性的错误。在这种情况下,retries参数的值决定了生成者可以重发消息的次数,如果达到这个次数,生产者会放弃重试并返回错误。
         默认情况下,生产者会在每次重试之间等待100ms,不过可以通过retry.backoff.ms参数来个改变这个时间间隔。
    
batch.size:当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,安装字节数计算。当批次被填满时,批次里所
         有的消息都会被发送出去。不过生产者并不一定都会等到批次填满才发送,半满的批次,甚至只包含一个消息的批次也有可能被发送。所以就算把批次带下设置得很大,也不会
         造成延迟,只是会占用更多的内存而已。但如果设置得太小,因为生产者需要更频繁地发送消息,会增加一些额外的开销。
    
linger.ms :该参数指定了生产者在发送批次之前等待更多消息加入批次的时间。kafka生产者会在批次填满或者linger.ms达到上限时把批次发送出去。默认情况下,只要有可用的线程,生产者就会把消息发送出去,就算批次里只有一个消息。把linger.ms设置成比0大的数,让生产者在发送批次之前等待一会儿,使更多的消息加入到这个批次。虽然会增加延迟,但是会提升吞吐量(因为一次发送更多的消息,每个消息的开销变小了)

client.id: 该参数可以是任意字符串,服务器会用它来识别消息的来源,还可以用在日志和配额指标里。
    
max.in.flight.requests.per.connection: 该参数指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设置
           为1可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。
    
timeout.ms: 指定了broker等待同步副本返回消息确认的时间,与acks的配置相匹配----如果在指定时间内没有收到同步副本的确认,那么broker就会返回一个错误。
request.timeout.ms: 指定了生产者在发送数据时等待服务器返回响应的时间。
metadata.fetch.timeout.ms:指定了生产者在获取元数据时等待服务器返回响应的时间。如果等待响应超时,那么生产者要么重试发送数据,要么返回一个错误。
    
max.block.ms: 该参数指定了在调用send()方法或使用partitionFor()方法获取元数据时生产者阻塞时间,当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞,
           在阻塞时间达到max.block.ms,生产者会抛出异常。
    
max.request.size: 该参数用于控制生成者发送的请求大小。它可以指能发送的单个消息的最大值,也可以指单个请求里所有消息总的大小。例如,假设这个值为1MB,那么可以发送的
           单个最大消息为1MB,或者生产者可以在单个请求里发送一个批次,该批次包含了1000个消息,每个消息大小为1KB。另外,broker对可接收的消息最大值也有自己的限制,
           所以两边的配置最好可以匹配,避免生产者发送的消息被broker拒绝。
    
receive.buffer.bytes和send.buffer.bytes:这两个参数分别指定了TCP socket接收和发送数据包的缓冲区大小。如果它们被设置为-1.就是用操作系统的默认值。如果生产者或
           消费者与broker处于不同的数据中心,那么可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。

2.消费者属性

1. fetch.min.bytes:该属性指定了消费者从服务器获取记录的最小字节数。broker在收到消费者的数据请求时,如果可用的数据量小于fetch.min.bytes 指定的大小,那么它会等到
有足够的可用数据时才把它返回给消费者。这样可以降低消费者和broker的工作负载,因为它们在主题不是很活跃的时候(或者一天里的低谷时段)就不需要来来回回地处理消息。如果没有
很多可用数据,但消费者的CPU 使用率却很高,那么就需要把该属性的值设得比默认值大。如果消费者的数量比较多,把该属性的值设置得大一点可以降低broker 的工作负载。

2. fetch.max.wait.ms:我们通过fetch.min.bytes告诉Kafka ,等到有足够的数据时才把它返回给消费者。而fetch.max.wait.ms 则用于指定broker 的等待时间,默认是500ms
    ,如果没有足够的数据流入Kafka ,消费者获取最小数据量的要求就得不到满足,最终导致500ms 的延迟。如果要降低潜在的延迟(为了满足SLA ),可以把该参数值设置得小一些。
      如果fetch.max.wait.ms被设为lOOms ,并且fetch.min.bytes被设为1MB ,那么Kafka 在收到消费者的请求后,要么返回1MB 数据,要么在1OOms 后返回所有可用的数据,
      就看哪个条件先得到满足。

3. max.partition.fetch.bytes:该属性指定了服务器从每个分区里返回给消费者的最大字节数。它的默认值是1MB,也就是说kafkaconsumer.poll() 方法从每个分区里返回的记录最
      多不超过max.partition.fetch.bytes指定的字节。如果一个主题有20 个分区和5 个消费者,那么每个消费者需要至少4MB 的可用内存来接收记录。在为消费者分配内存时,
      可以给它们多分配一些,因为如果群组里有消费者发生崩愤,剩下的消费者需要处理更多的分区。max.partition.fetch.bytes的值必须比broker 能够接收的最大消息的字节
      数(通过max.message.size属性配置)大, 否则消费者可能无法读取这些消息,导致消费者一直挂起重试。在设置该属性时,另一个需要考虑的因素是消费者处理数据的时间。
      消费者需要频繁调用poll()方法来避免会话过期和发生分区再均衡,如果单次调用poll()返回的数据太多,消费者需要更多的时间来处理,可能无怯及时进行下一个轮询来避免会
      话过期。如果出现这种情况, 可以把max.partition.fetch.bytes值改小,或者延长会i舌过期时间。
4. session.timeout.ms:该属性指定了消费者在被认为死亡之前可以与服务器断开连接的时间,默认是3s 。如果消费者没有在session.timeout.ms 指定的时间内发送心跳给群组协调
      器,就被认为已经死亡,协调器就会触发再均衡,把它的分区分配给群组里的其他消费者。该属性与heartbeat.interval.ms紧密相关。heartbeat.interval.ms指定了poll()
      方法向协调器发送心跳的频率, session.timeout.ms 则指定了消费者可以多久不发送心跳。所以,一般需要同时修改这两个属性,heartbeat.interval.ms必须比
      session.timeout.ms 小, 一般是session.timeout.ms 的三分之一。如果session.timeout.ms 是3s ,那么heartbeat.interval.ms应该是1s 。
      把session.timeout.ms 值设得比默认值小,可以更快地检测和恢复崩愤的节点,不过长时间的轮询或垃圾收集可能导致非预期的再均衡。把该属性的值设
      置得大一些,可以减少意外的再均衡,不过检测节点崩愤-需要更长的时间。
5.auto.offset.reset: 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下(因消费者长时间失效,包含偏移量的记录已经过时井被删除)该作何处理。它的默认
     值是latest , 意思是说,在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)。另一个值是earlist,意思是说,在偏移量无效的情况下,
     消费者将从起始位置读取分区的记录。
6.enable.auto.commit: 该属性指定了消费者是否自动提交偏移量,默认值是true。为了尽量避免出现重复数据和数据丢失,可以把它设为false ,由自己控制何时提交偏移量。如果把
     它设为true ,还可以通过配置auto.commit.interval.ms属性来控制提交的频率。
7.partition.assignment.strategy: 我们知道,分区会被分配给群组里的消费者。PartitionAssignor根据给定的消费者和主题,决定哪些分区应该被分配给哪个消费者。
      Kafka 有两个默认的分配策略。
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/612584.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号