栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

kafka-12-Kafka消息时间戳kafka message timestamp

kafka-12-Kafka消息时间戳kafka message timestamp

参考Kafka消息时间戳(kafka message timestamp)

1 Kafka消息的时间戳

在消息中增加了一个时间戳字段和时间戳类型。
目前支持的时间戳类型有两种:CreateTime和LogAppendTime。
前者表示producer创建这条消息的时间;
后者表示broker接收到这条消息的时间(严格来说,是leader broker将这条消息写入到log的时间)。

2 客户端消息格式的变化

ProducerRecord:增加了timestamp字段,允许producer指定消息的时间戳,如果不指定的话使用producer客户端的当前时间。
ConsumerRecord:增加了timestamp字段,允许消费消息时获取到消息的时间戳。

2.1 获取时间戳
from kafka import KafkaConsumer
global false, null, true
false = null = true = ''

import time
def ms_tostamp(stampint):
    c = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(int(stampint/ 1000)))
    return c + "." + str(stampint)[-3:]



consumer = KafkaConsumer(bootstrap_servers=['10.80.62.52:9092'], auto_offset_reset='latest', group_id="group1")
consumer.subscribe('yourtest')
for msg in consumer:

    try:
        if msg.value is not None:
            print(ms_tostamp(msg.timestamp))
            data_json = msg.value.decode()
            print(data_json)
    except Exception as e:
        print(e)
print("finish")

输出
2022-02-22 14:32:26.291
aa

2.2 获取最新偏移量的消息时间
from kafka import KafkaConsumer, TopicPartition
global false, null, true
false = null = true = ''

import time
def ms_tostamp(stampint):
    c = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(int(stampint/ 1000)))
    return c + "." + str(stampint)[-3:]


topic = 'topicname'
consumer = KafkaConsumer(bootstrap_servers=['IP:9092'])

# (1)获取指定分区的最新偏移量
topic_offset_dict = consumer.end_offsets([TopicPartition(topic, 0)])
newoffset = topic_offset_dict[TopicPartition(topic, 0)]
print("最新偏移量",newoffset)
# (2)手动指定分区给consumer去消费
consumer.assign([TopicPartition(topic, 0)])
# (3)针对分区,从指定抓取的偏移量处开始消费
# newoffset-1是因为newoffset的数据还未进来,减去1才是最新的数据
consumer.seek(TopicPartition(topic, 0), newoffset-1)
for msg in consumer:
    try:
        if msg.value is not None:
            print("消息时间",ms_tostamp(msg.timestamp))
            print("本条offset",msg.offset)
            data_json = msg.value.decode()
            print("消息内容",data_json)
            break # 取到一条消息就退出
    except Exception as e:
        print(e)

print("finish")
consumer.close()

输出

最新偏移量 67661482
消息时间 2022-02-22 16:15:06.744
本条offset 67661481
消息内容 message
finish
3 监控指定topic的最新消息时间
from kafka import KafkaConsumer, TopicPartition
global false, null, true
false = null = true = ''

import time
def ms_tostamp(stampint):
    c = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(int(stampint/ 1000)))
    return c + "." + str(stampint)[-3:]




consumer = KafkaConsumer(bootstrap_servers=['10.26.10.113:9092'])
#方式一指定topic列表
topics = ['SB_01',
          'SB_02',
          'SB_03',
          'SB_50',
          'SB_20',
          'SB_30',
          'SB_08'
          ]
def get_topic_msgNewTime(topics):
    try:
        for topic in topics:
            # (1)获取指定分区的最新偏移量
            topic_offset_dict = consumer.end_offsets([TopicPartition(topic, 0)])
            newoffset = topic_offset_dict[TopicPartition(topic, 0)]
            # print("最新偏移量",newoffset)
            # (2)手动指定分区给consumer去消费
            consumer.assign([TopicPartition(topic, 0)])
            # (3)针对分区,从指定抓取的偏移量处开始消费
            # newoffset-1是因为newoffset的数据还未进来,减去1才是最新的数据
            consumer.seek(TopicPartition(topic, 0), newoffset-1)
            for msg in consumer:
                if msg.value is not None:
                    newtime_str = ms_tostamp(msg.timestamp)
                    print(topic,"最新消息时间",newtime_str)
                    # print("本条offset",msg.offset)
                    # data_json = msg.value.decode()
                    # print("消息内容",data_json)
                    break # 取到一条消息就退出
    except Exception as e:
        print(e)
get_topic_msgNewTime(topics)
consumer.close()
print("finish")

输出

SB_01 最新消息时间 2022-02-22 16:37:09.234
SB_02 最新消息时间 2022-02-22 16:34:39.021
SB_03 最新消息时间 2022-02-22 16:35:05.777
SB_50 最新消息时间 2022-02-22 16:30:42.775
SB_20 最新消息时间 2022-02-22 16:37:20.060
SB_30 最新消息时间 2022-02-22 16:37:20.337
SB_08 最新消息时间 2022-02-22 16:35:35.935
finish
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/745949.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号