From: https://liudongdong1.github.io/2021/07/19/yu-yan-kuang-jia/framework/middleware/kafka/kafka-python/
1. frameWorkKafka 属于 Apache 组织,是一个高性能跨语言分布式发布订阅消息队列系统 [7]。它的主要特点有:
以时间复杂度 O (1) 的方式提供消息持久化能力,并对大数据量能保证常数时间的访问性能;高吞吐率,单台服务器可以达到每秒几十万的吞吐速率;支持服务器间的消息分区,支持分布式消费,同时保证了每个分区内的消息顺序;轻量级,支持实时数据处理和离线数据处理两种方式。
Producer 使用 push 模式将消息发布到 broker,consumer 通过监听使用 pull 模式从 broker 订阅并消费消息。 多个 broker 协同工作,producer 和 consumer 部署在各个业务逻辑中。三者通过 zookeeper 管理协调请求和转发。这样就组成了一个高性能的分布式消息发布和订阅系统。
broker: 消息中间件处理节点,一个 Kafka 节点就是一个 broker,一个或者多个 Broker 可以组成一个 Kafka 集群 topic: 主题,Kafka 根据 topic 对消息进行归类,发布到 Kafka 集群的每条消息都需要指定一个 topicconsumergroup: 每个 Consumer 属于一个特定的 Consumer Group,一条消息可以发送到多个不同的Consumer Group,但是一个Consumer Group中只能有一个Consumer能够消费该消息Partion: 一个 topic 可以分为多个 partition,每个 partition 内部是有序的
cleaner-offset-checkpoint: 存了每个log的最后清理offsetmeta.properties: broker.id 信息 recovery-point-offset-checkpoint: 表示已经刷写到磁盘的记录。recoveryPoint 以下的数据都是已经刷 到磁盘上的了。replication-offset-checkpoint: 用来存储每个 replica 的 HighWatermark 的 (high watermark (HW),表示已经被 commited 的 message,HW 以下的数据都是各个 replicas 间同步的,一致的。) 1. KafkaConsumer
The consumer willtransparently handle the failure of servers in the Kafka cluster, and adapt as topic-partitions are created or migrate between brokers. It also interacts with the assigned kafka Group Coordinator node to allow multiple consumers to load balance consumption of topics (requires kafka >= 0.9.0.0).
- 最多一次:客户端收到消息后,在处理消息前自动提交,这样 kafka 就认为 consumer 已经消费过了,偏移量增加。enable.auto.commit 为 ture; enable.auto.commit 为 ture; client 不要调用 commitSync (),kafka 在特定的时间间隔内自动提交。最少一次:客户端收到消息,处理消息,再提交反馈。这样就可能出现消息处理完了,在提交反馈前,网络中断或者程序挂了,那么 kafka 认为这个消息还没有被 consumer 消费,产生重复消息推送。设置 enable.auto.commit 为 false; client 调用 commitSync (),增加消息偏移;正好一次:保证消息处理和提交反馈在同一个事务中,即有原子性。
from kafka import KafkaConsumer
consumer = KafkaConsumer('my_topic', group_id= 'group2', bootstrap_servers= ['localhost:9092'], consumer_timeout_ms=1000)
for msg in consumer:
print(msg)
第 1 个参数为 topic 的名称
group_id : 指定此消费者实例属于的组名,可以不指定
bootstrap_servers : 指定 kafka 服务器
若不指定 consumer_timeout_ms,默认一直循环等待接收,若指定,则超时返回,不再等待
consumer_timeout_ms : 毫秒数
from kafka import KafkaConsumer
from kafka import TopicPartition
consumer = KafkaConsumer(group_id= 'group2', bootstrap_servers= ['localhost:9092'])
consumer.assign([TopicPartition(topic= 'my_topic', partition= 0)]) #手动分配partition
for msg in consumer:
print(msg)
订阅多个 topic
from kafka import KafkaConsumer
consumer = KafkaConsumer(group_id= 'group2', bootstrap_servers= ['localhost:9092'])
consumer.subscribe(topics= ['my_topic', 'topic_1'])
for msg in consumer:
print(msg)
正则订阅一类 topic
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(group_id= 'group2', bootstrap_servers= ['localhost:9092'], value_deserializer=lambda m: json.loads(m.decode('ascii')))
consumer.subscribe(pattern= '^my.*')
for msg in consumer:
print(msg)
2. KafkaProducer
创建 ProducerRecord必须包含Topic和Value,key 和 partition 可选。然后,序列化key和value对象为ByteArray,并发送到网络。消息发送到partitioner。如果创建 ProducerRecord 时指定了 partition,此时 partitioner 啥也不用做,简单的返回指定的 partition 即可。如果未指定 partition,partitioner会基于ProducerRecord的key生成partition。producer 选择好 partition 后,增加 record 到对应 topic 和 partition 的 batch record。最后,专有线程负责发送batch record到合适的Kafka broker。
当 broker 收到消息时,它会返回一个应答(response)。如果消息成功写入Kafka,broker将返回Recordmetadata对象(包含topic,partition和offset);相反,broker 将返回 error。这时 producer 收到 error 会尝试重试发送消息几次,直到 producer 返回 error。
立即发送:只管发送消息到server端,不 care 消息是否成功发送。大部分情况下,这种发送方式会成功,因为 Kafka 自身具有高可用性,producer 会自动重试;但有时也会丢失消息;同步发送:通过send()方法发送消息,并返回Future对象。get()方法会等待Future对象,看send()方法是否成功;异步发送:通过带有回调函数的send()方法发送消息,当 producer 收到 Kafka broker 的 response 会触发回调函数
The producer consists of a pool of buffer space that holds records that haven’t yet been transmitted to the server as well as a background I/O thread that is responsible for turning these records into requests and transmitting them to the cluster.
bootstrap_servers – ‘host[:port]’ string (or list of ‘host[:port]’ strings) that the producer should contact to bootstrap initial cluster metadata. This does not have to be the full node list. It just needs to have at least one broker that will respond to a metadata API Request. Default port is 9092. If no servers are specified, will default to localhost:9092.send(topic, value=None**,** key=None**,** headers=None**,** partition=None**,** timestamp_ms=None**)**
a key to associate with the message. Can be used to determine which partition to send the message to. If partition is None (and producer’s partitioner config is left as default), then messages with the same key will be delivered to the same partition (but if key is None, partition is chosen randomly). Must be type bytes, or be serializable to bytes via configured key_serializer.
发送字符串类型的 key 和 value
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
future = producer.send('my_topic' , key= b'my_key', value= b'my_value', partition= 0)
result = future.get(timeout= 10) #等待单条消息发送完成或超时,经测试,必须有这个函数,不然发送不出去,或用time.sleep代替
print(result)
消费者收到的为字符串类型,就需要解码操作,key_deserializer= bytes.decode
from kafka import KafkaConsumer
consumer = KafkaConsumer(group_id= 'group2', bootstrap_servers= ['localhost:9092'], key_deserializer= bytes.decode, value_deserializer= bytes.decode)
consumer.subscribe(pattern= '^my.*')
for msg in consumer:
print(msg)
发送 msgpack 消息: msgpack 为 MessagePack 的简称,是高效二进制序列化类库,比 json 高效
producer = KafkaProducer(value_serializer=msgpack.dumps)
producer.send('msgpack-topic', {'key': 'value'})
可压缩消息发送,若消息过大,还可压缩消息发送,可选值为 ‘gzip’, ‘snappy’, ‘lz4’, or None
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'], compression_type='gzip')
future = producer.send('my_topic' , key= b'key_3', value= b'value_3', partition= 0)
future.get(timeout= 10)
3. KafkaAdminClient
4. KafkaClientThe KafkaAdminClient class will negotiate for the latest version of each message protocol format supported by both the kafka-python client library and the Kafka broker.
5. BrokerConnectionA network client for asynchronous request/response network I/O. This is an internal class used to implement the user-facing producer and consumer clients.
6. ClustermetadataInitialize a Kafka broker connection
A class to manage kafka cluster metadata. This class does not perform any IO. It simply updates internal state given API responses (metadataResponse, GroupCoordinatorResponse).
ConsumerRecord(topic='my_topic', partition=0, offset=4, timestamp=1529569531392, timestamp_type=0, key=b'my_value', value=None, checksum=None, serialized_key_size=8, serialized_value_size=-1)
topicpartitionoffset : 这条消息的偏移量 timestamp : 时间戳 timestamp_type : 时间戳类型 key : key 值,字节类型 value : value 值,字节类型 checksum : 消息的校验和 serialized_key_size : 序列化 key 的大小 serialized_value_size : 序列化 value 的大小,可以看到 value=None 时,大小为 - 1 7. Example
producer
from kafka import KafkaProducer
from kafka.errors import KafkaError
producer = KafkaProducer(bootstrap_servers=['broker1:1234'])
# Asynchronous by default
future = producer.send('my-topic', b'raw_bytes')
# Block for 'synchronous' sends
try:
record_metadata = future.get(timeout=10)
except KafkaError:
# Decide what to do if produce request failed...
log.exception()
pass
# Successful result returns assigned partition and offset
print (record_metadata.topic)
print (record_metadata.partition)
print (record_metadata.offset)
# produce keyed messages to enable hashed partitioning
producer.send('my-topic', key=b'foo', value=b'bar')
# encode objects via msgpack
producer = KafkaProducer(value_serializer=msgpack.dumps)
producer.send('msgpack-topic', {'key': 'value'})
# produce json messages
producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'))
producer.send('json-topic', {'key': 'value'})
# produce asynchronously
for _ in range(100):
producer.send('my-topic', b'msg')
def on_send_success(record_metadata):
print(record_metadata.topic)
print(record_metadata.partition)
print(record_metadata.offset)
def on_send_error(excp):
log.error('I am an errback', exc_info=excp)
# handle exception
# produce asynchronously with callbacks
producer.send('my-topic', b'raw_bytes').add_callback(on_send_success).add_errback(on_send_error)
# block until all async messages are sent
producer.flush()
# configure multiple retries
producer = KafkaProducer(retries=5)
consumer.py
from kafka import KafkaConsumer
# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer('my-topic',
group_id='my-group',
bootstrap_servers=['localhost:9092'])
for message in consumer:
# message value and key are raw bytes -- decode if necessary!
# e.g., for unicode: `message.value.decode('utf-8')`
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
# consume earliest available messages, don't commit offsets
KafkaConsumer(auto_offset_reset='earliest', enable_auto_commit=False)
# consume json messages
KafkaConsumer(value_deserializer=lambda m: json.loads(m.decode('ascii')))
# consume msgpack
KafkaConsumer(value_deserializer=msgpack.unpackb)
# StopIteration if no message after 1sec
KafkaConsumer(consumer_timeout_ms=1000)
# Subscribe to a regex topic pattern
consumer = KafkaConsumer()
consumer.subscribe(pattern='^awesome.*')
# Use multiple consumers in parallel w/ 0.9 kafka brokers
# typically you would run each on a different server / process / CPU
consumer1 = KafkaConsumer('my-topic',
group_id='my-group',
bootstrap_servers='my.server.com')
consumer2 = KafkaConsumer('my-topic',
group_id='my-group',
bootstrap_servers='my.server.com')
2. Application
.1. 日志分析平台
.2. 网站活动跟踪Kafka 性能高效,采集日志时业务无感知以及 Hadoop/ODPS 等离线仓库存储和 Storm/Spark 等实时在线分析对接的特性决定它非常适合作为 "日志收集中心"。
.3. 流计算通过消息队列 Kafka 版可以实时收集网站活动数据(包括用户浏览页面、搜索及其他行为等)。发布 - 订阅的模式可以根据不同的业务数据类型,将消息发布到不同的 Topic;还可通过订阅消息的实时投递,将消息流用于实时监控与业务分析或加载到 Hadoop、ODPS 等离线数据仓库系统进行离线处理。
.4. 数据中转枢纽股市走向分析、气象数据测控、网站用户行为分析等领域,由于数据产生快、实时性强、数据量大,所以很难统一采集并入库存储后再做处理,这便导致传统的数据处理架构不能满足需求。而大数据消息中间件 Kafka 以及 Storm/Samza/Spark 等流计算引擎的出现,可以根据业务需求对数据进行计算分析,最终把结果保存或者分发给需要的组件。
Resouce近年来 KV存储(Hbase)、搜索(ElasticSearch)、流式处理(Storm/Spark Streaming/Samza)、时序数据库(OpenTSDB)等专用系统应运而生,产生了同一份数据集需要被注入到多个专用系统内的需求。利用大数据消息中间件 Kafka 作为数据中转枢纽,同份数据可以被导入到不同专用系统中。
https://kafka-python.readthedocs.io/en/master/usage.html#kafkaconsumer



