
Kafka Configuration File

After many weekends of hands-on work, the Kafka parameters below have all been exercised in our production environment, so I now know exactly what each one does.

Official Kafka Configuration

If your production environment connects to Alibaba Cloud (Aliyun) Kafka, you must supply SSL credentials; refer to the Aliyun documentation for the extra SSL-related parameters.
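The exact parameters come from the Aliyun documentation; the fragment below is only a sketch of the usual SASL_SSL shape, with placeholder paths and credentials:

```yaml
# Sketch only: Aliyun Kafka instances are typically reached over SASL_SSL with
# the PLAIN mechanism. The truststore file, username and password below are
# placeholders; use the values issued in the Aliyun console.
spring:
  kafka:
    properties:
      security.protocol: SASL_SSL
      sasl.mechanism: PLAIN
      ssl.truststore.location: /path/to/kafka.client.truststore.jks
      ssl.truststore.password: changeit
      sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="YOUR_USERNAME" password="YOUR_PASSWORD";
```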

spring:
  # Note: these kafka properties are injected directly into the KafkaTemplate
  kafka:
    bootstrap-servers: 10.200.8.29:9092
    #https://kafka.apache.org/documentation/#producerconfigs
    producer:
      bootstrap-servers: 10.200.8.29:9092
      retries: 1 # number of times the producer retries a failed send
      batch-size: 16384 # maximum size in bytes of one batch (default 16 KB)
      buffer-memory: 314572800 # producer buffer memory (300 MB = 300*1024*1024)
      # acks=0: fire and forget; send once, no acknowledgement required
      # acks=1: only the leader must acknowledge the write
      # acks=all or -1: the leader waits until all in-sync replicas (ISR) have acknowledged
      acks: 1
      key-serializer: org.apache.kafka.common.serialization.StringSerializer # serializer for message keys
      value-serializer: org.apache.kafka.common.serialization.StringSerializer # serializer for message values
      # Enabling transactions requires acks=all; otherwise idempotence cannot be guaranteed
      #transaction-id-prefix: "COLA_TX"
      # Parameters without a dedicated Spring property go into the map below and are applied at initialization
      properties:
        # Custom interceptor; note the property name ends in "classes" (plural).
        # Interceptors run before the partitioner: the parcel is labelled before it is addressed.
        interceptor.classes: cn.com.controller.TimeInterceptor
        # Custom partitioner
        #partitioner.class: com.alibaba.cola.kafka.test.customer.inteceptor.MyPartitioner
        # Even if batch-size has not been reached, the batch is sent once this many milliseconds have elapsed
        linger.ms: 1000
        # Maximum request size, 200 MB = 200*1024*1024
        max.request.size: 209715200
        # Maximum time Producer.send() may block (115 seconds)
        max.block.ms: 115000
        # Controls how long the client waits for the response to a request.
        # If no response arrives before the timeout, the client resends the request if
        # necessary, or fails it once the retries are exhausted.
        # This value should be larger than replica.lag.time.max.ms (a broker setting) to
        # reduce the chance of duplicated messages caused by unnecessary producer retries.
        request.timeout.ms: 115000
        # Maximum time to wait for the send callback; commonly used with retries
        # (set retries to Integer.MAX_VALUE if the message absolutely must go out).
        # When exceeded: TimeoutException: Expiring 1 record(s) .. has passed since batch creation
        delivery.timeout.ms: 120000
 
 
    #https://kafka.apache.org/documentation/#consumerconfigs
    consumer:
      bootstrap-servers: 10.200.8.29:9092
      group-id: auto-dev # consumer group
      auto-offset-reset: earliest # earliest: start from the beginning; latest: only new messages (default: latest)
      enable-auto-commit: false # whether offsets are committed automatically
      auto-commit-interval: 1s # auto-commit frequency; only applies when enable-auto-commit=true
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 2
      properties:
        # If no heartbeat is received within this window, the consumer is removed
        # from the group and a rebalance is triggered
        session.timeout.ms: 120000
        # Maximum time between polls. If processing the fetched records takes longer
        # than this (default 5 minutes), the broker also considers the consumer dead,
        # evicts it and rebalances
        max.poll.interval.ms: 300000
        # Controls how long the client waits for the response to a request.
        # If no response arrives before the timeout, the client resends the
        # request if necessary, or fails it once the retries are exhausted.
        request.timeout.ms: 60000
        # Maximum amount of data per partition the server returns; it should be at
        # least as large as the broker's message.max.bytes, or a single large record
        # may not fit into one fetch
        max.partition.fetch.bytes: 1048576
        # Allow automatic topic creation on subscribe/assign (before 0.11 this had to be false)
        allow.auto.create.topics: true
 
    listener:
      # Takes effect only when enable.auto.commit is false; ignored when it is true
      # manual_immediate: commit immediately after Acknowledgment.acknowledge() is called manually
      ack-mode: manual_immediate
      missing-topics-fatal: true # if at least one topic does not exist: true = fail startup, false = ignore
      #type: single # single-record consumption; batch consumption also needs consumer.max-poll-records
      type: batch
      concurrency: 2 # one listener thread is created per unit of concurrency; threads beyond the partition count stay idle
 
    template:
      default-topic: "COLA"
Problems Encountered in Production

Problem 1: nested exception is org.apache.kafka.common.errors.RecordTooLargeException

Clearly the message payload is too large, so the maximum message size the Kafka broker accepts has to be raised.

Fix: raise the broker's maximum single-message size via the message.max.bytes parameter.

What I did: in our production environment each file is at most about 5 MB, so I set the broker parameter message.max.bytes=5*1024*1024.
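As a config fragment (note that 5*1024*1024 = 5242880), the broker-side change is a single line; the topic-level counterpart of this setting is called max.message.bytes:

```properties
# server.properties on each broker (message.max.bytes can also be changed at
# runtime as a dynamic, cluster-wide config). 5 MB = 5*1024*1024 = 5242880 bytes.
message.max.bytes=5242880
```

If single messages grow this large, check the consumer's max.partition.fetch.bytes as well: in the configuration above it is still at the 1 MB default, and for clients older than 0.10.2 the fetch size must be at least as large as the largest message.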

Official documentation:

##brokerConfiguration

message.max.bytes
The largest record batch size allowed by Kafka 
(after compression if compression is enabled). 
If this is increased and there are consumers 
older than 0.10.2, the consumers' fetch size 
must also be increased so that they can fetch
 record batches this large. In the latest message
 format version, records are always grouped into 
batches for efficiency. In previous message format
 versions, uncompressed records are not grouped
 into batches and this limit only applies to a
 single record in that case.
This can be set per topic with the topic
 level max.message.bytes config.

Type:	int
Default:	1048588 (1M)
Valid Values:	[0,...]
importance:	high
Update Mode:	cluster-wide
Problem 2: Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 101: org.apache.kafka.common.errors.DisconnectException.

Exception class: org.apache.kafka.common.errors.DisconnectException

Exception description: Server disconnected before a request could be completed.

[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO o.a.k.clients.FetchSessionHandler[442] - [Consumer clientId=consumer-1, groupId=auto-dev] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 101: org.apache.kafka.common.errors.DisconnectException.

Diagnosis: reads and writes were frequent and the Kafka broker was running at its load limit, so some consumer instances never managed to poll any data. Once the configured session.timeout.ms and request.timeout.ms elapsed, the consumer was kicked out of the group, which is why the log kept printing this error. (It is also a hint that message processing takes too long.)

Fix: raise the consumer parameters session.timeout.ms and request.timeout.ms.

What I did: in application.yaml I raised both session.timeout.ms and request.timeout.ms to 4 minutes, i.e. 240000 ms.
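Applied to the application.yaml above, the change looks like the sketch below; keep in mind that session.timeout.ms has to stay inside the range the broker allows via group.min.session.timeout.ms and group.max.session.timeout.ms (the broker defaults accommodate 240000 ms):

```yaml
spring:
  kafka:
    consumer:
      properties:
        # 4 minutes; must lie within the broker's allowed session-timeout range
        session.timeout.ms: 240000
        # 4 minutes; how long the client waits for a response before
        # resending the request or failing it
        request.timeout.ms: 240000
```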

Official documentation:

##ConsumerConfiguration

request.timeout.ms
The configuration controls the maximum amount 
of time the client will wait for the response of a request. 
If the response is not received before the timeout elapses 
the client will resend the request if necessary or 
fail the request if retries are exhausted.

Type:	int
Default:	30000 (30 seconds)
Valid Values:	[0,...]
importance:	medium
##ConsumerConfiguration

session.timeout.ms
The timeout used to detect client failures 
when using Kafka's group management facility. 
The client sends periodic heartbeats to indicate 
its liveness to the broker. If no heartbeats are 
received by the broker before the expiration of 
this session timeout, then the broker will 
remove this client from the group and initiate 
a rebalance. Note that the value must be 
in the allowable range as configured in the 
broker configuration by 
group.min.session.timeout.ms and group.max.session.timeout.ms.

Type:	int
Default:	45000 (45 seconds)
Valid Values:	
importance:	high
Problem 3: TimeoutException: Expiring 1 record(s) .. has passed since batch creation

Diagnosis: the records were handed to the producer, but Kafka could not process them in time; the broker's write throughput had hit a bottleneck.

What I did: attack it from two angles.

(1) Throttle the send rate in code, i.e. limit how often kafkaTemplate.send is called. As an experienced developer, don't ask me how to throttle a send rate; dropping a Thread.sleep into the loop does the job, doesn't it?

(2) Raise delivery.timeout.ms on the producer. By default the producer reports a timeout if Kafka has not responded within 2 minutes; the value is in milliseconds and can be increased. Note that it has a lower bound, not an upper one: it should be at least the sum of request.timeout.ms and linger.ms. See the official documentation quoted below.
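With the producer values above (request.timeout.ms=115000, linger.ms=1000), delivery.timeout.ms must be at least 115000 + 1000 = 116000 ms; raising it to five minutes, for example, would look like this:

```yaml
spring:
  kafka:
    producer:
      properties:
        # must be >= request.timeout.ms + linger.ms (115000 + 1000 = 116000 here)
        delivery.timeout.ms: 300000
```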

Official documentation:

##ProducerConfiguration

delivery.timeout.ms

An upper bound on the time to report success or 
failure after a call to send() returns. 
This limits the total time that a record will 
be delayed prior to sending, the time to await 
acknowledgement from the broker (if expected), 
and the time allowed for retriable send failures.
 The producer may report failure to send a record 
earlier than this config if either an unrecoverable
 error is encountered, the retries have been exhausted,
 or the record is added to a batch which reached 
an earlier delivery expiration deadline. 
The value of this config should be greater than 
or equal to the sum of request.timeout.ms and linger.ms.

Type:	int
Default:	120000 (2 minutes)
Valid Values:	[0,...]
importance:	medium

Reposted from www.mshxw.com; original URL: https://www.mshxw.com/it/701767.html