栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

PyFlink 有状态流处理实例 实时排行榜

PyFlink 有状态流处理实例 实时排行榜

01 UDAF 聚合函数的使用

自定义聚合函数(UDAF),将多条记录聚合成一条记录。其输入与输出是多对一的关系,即将多条输入记录聚合成一条输出值。

需要注意的是:当前聚合函数仅在流模式下的 GroupBy 聚合和 Group Window 聚合中支持通用的用户定义聚合功能;对于批处理模式,当前不支持该模式,需要使用向量化聚合函数。

1.1 UDAF 的处理逻辑

聚合函数的处理过程以累加器 accumulator 的为中心,累加器是一种中间数据结构,用于存储将多行输入计算出的最终聚合结果,即用来存储聚合的中间结果。

围绕累加器 accumulator,一个聚合任务还需要如下三个方法:

create_accumulator():用来初始化自定义的累加其 accumulator,将内部定义的变量赋值为空或者0。accumulate():定义根据输入更新 accumulator 的逻辑,主要是编写中间的逻辑代码,根据输入变量来更新输出中间变量。get_value():定义如何返回 accumulator 中存储的中间结果,作为UDAF的最终结果。

一个聚合处理过程如下图所示:

上例中,我们想要计算出饮品价目表中最高的价格,其中饮品价目表包含三个属性 (id, name, price) 和五条数据。

聚合处理过程中:首先,使用 create_accumulator() 为要处理的数据构造一个空累加器;然后,使用 accumulate() 方法根据输入的每条数据更新累加器中存储的中间结果;在所有数据都处理完成后,使用 get_value() 方法计算中间结果中最大值,即要返回的最终结果。

1.2 聚合函数的使用

如果要定义 Python 聚合函数, 可以通过继承 pyflink.table 中的基类 AggregateFunction,并实现 accumulate() 方法。 聚合函数的返回结果类型和累加器类型可以通过两种方式指定:

实现 get_result_type() 方法和 get_accumulator_type() 方法使用 udaf 装饰器封装函数实例并指明类型参数 result_type 和 accumulator_type

一个 UDAF 的使用示例如下:

定义 UDAF

class WeightedAvg(AggregateFunction):

    # 为了计算加权平均值,累加器需要存储所有已累计数据的加权和和计数。在本示例中,我们使用行对象作为累加器。
    def create_accumulator(self):
        # Row(sum, count)
        return Row(0, 0)

    def get_value(self, accumulator):
        if accumulator[1] == 0:
            return None
        else:
            return accumulator[0] / accumulator[1]

    def accumulate(self, accumulator, value, weight):
        accumulator[0] += value * weight
        accumulator[1] += weight
    
    # retract() 方法常应用在当前聚合操作之前存在可能生成收回消息的操作,例如组聚合、外部联接。
    def retract(self, accumulator, value, weight):
        accumulator[0] -= value * weight
        accumulator[1] -= weight
        
    def get_result_type(self):
        return DataTypes.BIGINT()
        
    def get_accumulator_type(self):
        return DataTypes.ROW([
            DataTypes.FIELD("f0", DataTypes.BIGINT()), 
            DataTypes.FIELD("f1", DataTypes.BIGINT())])

使用 UDAF

# 创建流处理环境
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

# 注册 UDAF 
# 可以在其中指定结果类型,定义中已经实现了`get_result_type()` 和 `get_accumulator_type()` 方法指明了类型,不需要在重复指明
# weighted_avg = udaf(WeightedAvg(), result_type=DataTypes.BIGINT(), accumulator_type=...)
weighted_avg = udaf(WeightedAvg())

t = table_env.from_elements([(1, 2, "Lee"),
                             (3, 4, "Jay"),
                             (5, 6, "Jay"),
                             (7, 8, "Lee")]).alias("value", "count", "name")

# 调用 UDAF
result = t.group_by(t.name).select(weighted_avg(t.value, t.count).alias("avg")).to_pandas()
print(result)

其他调用 UDAF 的方式

    Table API 中注册及调用 UDAF
# 注册 UDAF
table_env.create_temporary_function("weighted_avg", WeightedAvg())

# Table API 中调用注册的 UDAF
result = t.group_by(t.name).select(call("weighted_avg", t.value, t.count).alias("avg")).to_pandas()
print(result)
    SQL 中注册及调用 UDAF
# 注册 UDAF
table_env.create_temporary_view("source", t)

# SQL 中调用注册的 UDAF
result = table_env.sql_query(
    "SELECT weighted_avg(`value`, `count`) AS avg FROM source GROUP BY name").to_pandas()
print(result)
    在 GroupBy Window 聚合中使用 Python 聚合函数
tumble_window = Tumble.over(lit(1).hours) 
            .on(col("rowtime")) 
            .alias("w")

result = t.window(tumble_window) 
        .group_by(col('w'), col('name')) 
        .select("w.start, w.end, weighted_avg(value, count)") 
        .to_pandas()
print(result)

完整代码

from pyflink.common import Row
from pyflink.table import AggregateFunction, DataTypes, TableEnvironment, EnvironmentSettings
from pyflink.table.expressions import call
from pyflink.table.udf import udaf
from pyflink.table.expressions import col, lit
from pyflink.table.window import Tumble


class WeightedAvg(AggregateFunction):

    def create_accumulator(self):
        # Row(sum, count)
        return Row(0, 0)

    def get_value(self, accumulator):
        if accumulator[1] == 0:
            return None
        else:
            return accumulator[0] / accumulator[1]

    def accumulate(self, accumulator, value, weight):
        accumulator[0] += value * weight
        accumulator[1] += weight
    
    def retract(self, accumulator, value, weight):
        accumulator[0] -= value * weight
        accumulator[1] -= weight
        
    def get_result_type(self):
        return DataTypes.BIGINT()
        
    def get_accumulator_type(self):
        return DataTypes.ROW([
            DataTypes.FIELD("f0", DataTypes.BIGINT()), 
            DataTypes.FIELD("f1", DataTypes.BIGINT())])


env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
# the result type and accumulator type can also be specified in the udaf decorator:
# weighted_avg = udaf(WeightedAvg(), result_type=DataTypes.BIGINT(), accumulator_type=...)
weighted_avg = udaf(WeightedAvg())
t = table_env.from_elements([(1, 2, "Lee"),
                             (3, 4, "Jay"),
                             (5, 6, "Jay"),
                             (7, 8, "Lee")]).alias("value", "count", "name")

# call function "inline" without registration in Table API
result = t.group_by(t.name).select(weighted_avg(t.value, t.count).alias("avg")).to_pandas()
print(result)

# register function
table_env.create_temporary_function("weighted_avg", WeightedAvg())

# call registered function in Table API
result = t.group_by(t.name).select(call("weighted_avg", t.value, t.count).alias("avg")).to_pandas()
print(result)

# register table
table_env.create_temporary_view("source", t)

# call registered function in SQL
result = table_env.sql_query(
    "SELECT weighted_avg(`value`, `count`) AS avg FROM source GROUP BY name").to_pandas()
print(result)

# use the general Python aggregate function in GroupBy Window Aggregation
tumble_window = Tumble.over(lit(1).hours) 
            .on(col("rowtime")) 
            .alias("w")

result = t.window(tumble_window) 
        .group_by(col('w'), col('name')) 
        .select("w.start, w.end, weighted_avg(value, count)") 
        .to_pandas()
print(result)
1.3 聚合函数的视图 View

PyFlink 提供了更加高效的列表和字典存储结构 ListView 和 MapView,可以用于存储更大量的数据。

但是将 ListView 和 MapView 用于聚合操作是,累加器 accumulator 必须是 Row,且 ListView 和 MapView 必须是被声明在第一层。

使用方法入下所示:

from pyflink.table import ListView

class ListViewConcatAggregateFunction(AggregateFunction):

    def get_value(self, accumulator):
        return accumulator[1].join(accumulator[0])

    def create_accumulator(self):
        return Row(ListView(), '')

    def accumulate(self, accumulator, *args):
        accumulator[1] = args[1]
        accumulator[0].add(args[0])

    def get_accumulator_type(self):
        return DataTypes.ROW([
            DataTypes.FIELD("f0", DataTypes.LIST_VIEW(DataTypes.STRING())),
            DataTypes.FIELD("f1", DataTypes.BIGINT())])

    def get_result_type(self):
        return DataTypes.STRING()
1.4 向量化聚合函数

前面我们已经提到当前聚合函数仅在流模式下的 GroupBy 聚合和 Group Window 聚合中支持通用的用户定义聚合功能;对于批处理模式,当前不支持该模式,需要使用向量化聚合函数。

PyFlink 中向量化聚合函数以一个或多个 pandas.Series 类型的参数作为输入,并返回一个标量值作为输出。

向量化聚合函数不支持部分聚合,而且一个组或者窗口内的所有数据, 在执行的过程中,会被同时加载到内存,所以需要确保所配置的内存大小足够容纳这些数据。

如下示例中,展示了如何定义一个自定义向量化聚合函数,并在 GroupBy Aggregation、GroupBy Window Aggregation、Over Window Aggregation 中使用该函数。

定义自定义向量化聚合函数

# func_type="pandas" 输入类型
@udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
def weighted_avg(value):
    return value.mean()

使用自定义向量化聚合函数

# 创建批处理环境
settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(settings)

weighted_avg = udaf(WeightedAvg())

my_table = table_env.from_elements([(1, 2, "Lee"),
                             (3, 4, "Jay"),
                             (5, 6, "Jay"),
                             (7, 8, "Lee")]).alias("value", "count", "name")

# 在 GroupBy Aggregation 中使用向量化聚合函数
my_table.group_by(my_table.name).select(my_table.name, weighted_avg(add(my_table.value)))


# 在 GroupBy Window Aggregation 中使用向量化聚合函数
tumble_window = Tumble.over(expr.lit(1).hours) 
            .on(expr.col("rowtime")) 
            .alias("w")

my_table.window(tumble_window) 
    .group_by("w") 
    .select("w.start, w.end, weighted_avg(value)")

# 在 Over Window Aggregation 中使用向量化聚合函数
table_env.create_temporary_function("weighted_avg", weighted_avg)
table_env.sql_query("""
    SELECT name,
        weighted_avg(value)
        over (PARTITION BY a ORDER BY rowtime
        ROWS BETWEEN UNBOUNDED preceding AND UNBOUNDED FOLLOWING)
    FROM MyTable""")
02 Kafka 连接器

Flink 的 Kafka 连接器提供从 Kafka topic 中消费和写入数据的能力

2.1 下载依赖包

为了使用Kafka连接器,使用构建自动化工具(如Maven或SBT)的项目和使用SQL JAR包的SQL Client项目都需要下载依赖项 flink-connector-kafka_2.11。

Kafka 连接器目前并不包含在 Flink 的二进制发行版中,请查阅 这里 了解如何在集群运行中引用 Kafka 连接器。

2.2 创建 Kafka 表

作业中加入上述依赖包之后,使用 SQL / Table API 的 Kafka table 可以按如下定义:

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3) metaDATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
)

Kafka 连接器的一些参数与含义如下:

connector:指定使用什么类型的连接器,这里应该是’kafka’。topic:Kafka 记录的 Topic 名。properties.bootstrap.servers:逗号分隔的 Kafka broker 列表。properties.group.id:Kafka source 的消费组 id。如果未指定消费组 ID,则会使用自动生成的 “KafkaSource-{tableIdentifier}” 作为消费组 ID。scan.startup.mode:Kafka consumer 的启动模式。有效值为:‘earliest-offset’,‘latest-offset’,‘group-offsets’,‘timestamp’ 和 ‘specific-offsets’。format:用来序列化或反序列化 Kafka 消息的格式。 请参阅 格式 页面以获取更多关于格式的细节和相关配置项。 注意:该配置项和 ‘value.format’ 二者必需其一。 03 实时排行榜

本实例使用 Flink 的有状态流处理和滑动窗口,实现实时点击量排行榜。

该实例统计过去 1 分钟内,点击量最高的男女用户各 10 名及其具体的点击数,同时每隔 1 秒(实时)更新统计结果,等到排行榜数据并将结果同步到 kafka 中。

3.1 构建数据模拟器

首先,我们需要模拟实时产生的用户操作数据。

本实例中,我们编写一个 data_producer.py 的脚本,实时随机产生用户操作数据,并批量写入到 Kafka 中

每条写入 kafka 的用户操作数据包含如下字段:

{
    "ts": "2020-01-01 01:01:01",  # 当前时间
    "name": "刘备",  # 从根据性别随机产生的 50 个姓名里随机选择
    "sex": "男",  # 性别,60%概率为“男”,40%概率为“女”
    "action": "click",  # 动作,90%概率为“click”,10%概率为“scroll”
    "is_delete": 0,  # 是否要丢弃,90%概率为“0”(不丢弃),10%概率为1“丢弃”
}
构建候选用户组

我们创建一个用户类,生成候选用户组,并能够随机获取用户信息。

该类用于在向 Kafka 批量写入用户操作数据时随机生成用户信息。

seed = 2020  # 设置随机数种子,保证每次运行的结果都一样
num_users = 50  # 为了使得最后的结果不至于太平均,只初始化了 50 个用户,该 50 个用户有不同的概率来产生上面的数据
fake = Faker(locale='en_US') # fake 第三方库生成随机用户名称
Faker.seed(seed)
random.seed(seed)

class UserGroup:
    def __init__(self):
        # 为指定数量的用户分配不同的出现概率,每次按概率分布获取用户姓名
        self.users = [self.gen_male() if random.random() < 0.6 else self.gen_female() for _ in range(num_users)]
        prob = np.cumsum(np.random.uniform(1, 100, num_users))  # 用户点击次数的累加
        self.prob = prob / prob.max()  # 点击次数归一化,转换成点击率
    
    # 静态方法生成男性用户信息和女性用户信息
    @staticmethod
    def gen_male():
        return {'name': fake.name_male(), 'sex': 'male'}

    @staticmethod
    def gen_female():
        return {'name': fake.name_female(), 'sex': 'female'}

    # 获取随机用户信息
    def get_user(self):
        r = random.random()  # 生成一个 0~1的随机数
        index = np.searchsorted(self.prob, r)
        return self.users[index]
生成用户操作数据

使用 Kafka 生产者产生用户操作数据

max_msg_per_second = 20  # 每秒钟的最大消息数
run_seconds = 3600  # 脚本最长运行时间,防止无限写入 kafka
topic = "user_action"  # kafka topic
bootstrap_servers = ['localhost:9092']

def write_data():
    group = UserGroup()
    start_time = datetime.now()
    # 初始化 kafka 生产者
    producer = KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        value_serializer=lambda x: dumps(x).encode('utf-8')
    )

    # 生产用户操作数据,并发送到 kafka
    while True:
        # 创建用户操作数据
        now = datetime.now()
        user = group.get_user()
        cur_data = {
            "ts": now.strftime("%Y-%m-%d %H:%M:%S"),
            "name": user['name'],
            "sex": user['sex'],
            "action": 'click' if random.random() < 0.9 else 'scroll',  # 用户的操作
            "is_delete": 0 if random.random() < 0.9 else 1  # 10% 的概率丢弃这条数据
        }
        # 将数据写入 kafka topic
        producer.send(topic, value=cur_data)

        # 终止条件
        if (now - start_time).seconds > run_seconds:
            break

        # 停止时间
        sleep(1 / max_msg_per_second)
查看用户操作数据

使用 Kafka 消费者查看已经写入的用户操作数据

消费者的初始化参数:

group_id:在高并发量情况下,则需要有多个消费者协作,此时消费进度由 group_id 统一。例如消费者A与消费者B,在初始化时使用同一个 group_id。在进行消费时,一条消息被消费者A消费后,在kafka中会被标记,如果这条消息被A消费后且正确 commit,则该消息不会再被B消费。auto_offset_reset:该参数指定消费者启动的时刻。通常情况下,消息队列中可能会有已经堆积的未消费消息,有时候需求是从上一次未消费的位置开始读(则该参数设置为earliest);有时候的需求为从当前时刻开始读之后产生的,之前产生的数据不再消费(则该参数设置为latest)。

# 读取 kafka 的用户操作数据并打印
def print_data():
    consumer = KafkaConsumer(
        topic,  # topic的名称
        group_id= 'group', 
        bootstrap_servers=bootstrap_servers,  # 指定kafka服务器
        auto_offset_reset='latest', 
    )
    
    for msg in consumer:
        print(msg.value.decode('utf-8').encode('utf-8').decode('unicode_escape'))
数据模拟生成完整代码 3.2 根据输入数据和输出结果创建输入输出表

本实例的数据来源于 kafka 并将处理结果也输出到 kafka,所以我们要创建 kafka 表并指定topic, kafka_servers, group_id 等必要参数如下:

kafka_servers = "localhost:9092"
kafka_consumer_group_id = "group1"  # group ID
source_topic = "user_action"  # 源数据
sink_topic = "click_rank"  # 结果

本实例的数据对象就是用户的操作数据,输入数据包含 name:姓名,sex:性别,action:操作,is_delete:删除状态,ts:点击时间共五个字段,创建源表如下:

source_ddl = """
    CREATE TABLE source (
        name VARCHAR,                
        sex VARCHAR,                 
        action VARCHAR,             
        is_delete BIGINT,            
        ts TIMESTAMP(3),             
        WATERMARK FOR ts AS ts - INTERVAL '5' SECOND -- 声明 ts 是事件时间属性,并且用 延迟 5 秒的策略来生成 watermark
    ) with (
        'connector' = 'kafka',
        'topic' = '{source_topic}',
        'properties.bootstrap.servers' = '{kafka_servers}',
        'properties.group.id' = '{kafka_consumer_group_id}',
        'scan.startup.mode' = 'latest-offset',
        'json.fail-on-missing-field' = 'false',
        'json.ignore-parse-errors' = 'true',
        'format' = 'json'
    )
"""

t_env.execute_sql(source_ddl)

本实例的统计结果包含 male_top10:点击量最高的 10 个男性用户,female_top10:点击量最高的 10 个女性用户,start_time:窗口开始时间,end_time:窗口结束时间 共四个字段,创建结果表如下:

sink_ddl = """
    CREATE TABLE sink (
        male_top10 STRING,        
        female_top10 STRING,      
        start_time TIMESTAMP(3),  
        end_time TIMESTAMP(3)      
    ) with (
        'connector' = 'kafka',
        'topic' = '{sink_topic}',
        'properties.bootstrap.servers' = '{kafka_servers}',
        'properties.group.id' = '{kafka_consumer_group_id}',
        'scan.startup.mode' = 'latest-offset',
        'json.fail-on-missing-field' = 'false',
        'json.ignore-parse-errors' = 'true',
        'format' = 'json'
    )
"""

t_env.execute_sql(sink_ddl)
3.3 编写用户自定义聚合函数 UDAF

在 PyFlink 中定义 UDAF 需要 Flink >= 1.12,使用 UDAF 可以将多行的标量值映射到新的标量值。

本例子中我们需要使用滑动窗口计算点击率前十的用户,使用向量化的 Python 聚合函数( Pandas UDAF )进行 windows 聚合,即在使用 UDAF 时,指定参数 func_type=“pandas”。

用于统计点击量最多的 10 个男性和女性的向量化聚合函数如下所示:

# 统计点击量最多的 10 个男人(只统计 sex=male、action=click 的数量,忽略 is_delete=1 的数据)
@udaf(result_type=DataTypes.STRING(), func_type="pandas")
def male_click_top10(name, sex):
    names = name[sex == 'male']
    return names.value_counts().iloc[:10].to_json()

# 统计点击量最多的 10 个女人(只统计 sex=female、action=click 的数量,忽略 is_delete=1 的数据)
@udaf(result_type=DataTypes.STRING(), func_type="pandas")
def female_click_top10(name, sex, action, is_delete):
    names = name[sex == 'female']
    return names.value_counts().iloc[:10].to_json()
3.4 流处理完整代码

除了上述源表和结果表的创建,以及定义 UDAF 聚合函数,流处理过程中要需要完成如下任务:

创建流处理环境指定 kafka 依赖注册 UDAF使用 UDAF 完成流处理任务 3.5 打印实时排行榜

完成流处理任务之后,实时排行结果被写入到 kafka 的 click_rank topic 中,我们从该 topic 中读取用户操作数据并打印

"""
读取 kafka 的用户操作数据并打印
"""
from kafka import KafkaConsumer
from reprint import output
import json

topic = 'click_rank'
bootstrap_servers = ['localhost:9092']
group_id = 'group1'

def sink_output():
    consumer = KafkaConsumer(
        topic,  
        group_id=group_id,  
        bootstrap_servers=bootstrap_servers, 
        auto_offset_reset='latest',  
    )

    with output(output_type="list", initial_len=22, interval=0) as output_lines:
        # 初始化打印行     
        for i in range(14):
            if i==0 :
                output_lines[i] = '=== 窗口时间 ==='
            elif i==2 :
                output_lines[i] = '=== 男 ==='
            elif i == 8 :
                output_lines[i] = '=== 女 ==='
            else:
                output_lines[i] = 'name click'

        for msg in consumer:
            # 解析结果
            data = json.loads(msg.value)
            male_rank = json.loads(data['male_top10'])
            female_rank = json.loads(data['female_top10'])
            start_time = data['start_time']
            end_time = data['end_time']
            output_lines[1] = f'开始时间{start_time:6s} 结束时间{end_time}'

            # 逐行打印
            for i in range(5):
                if i < len(male_rank):
                    name = list(male_rank.keys())[i]
                    value = list(male_rank.values())[i]
                    output_lines[i+3] = f'{name:6s} {value}'
                else:
                    output_lines[i+3] = ''
            
            for i in range(5):
                if i < len(female_rank):
                    name = list(female_rank.keys())[i]
                    value = list(female_rank.values())[i]
                    output_lines[i+9] = f'{name:6s} {value}'
                else:
                    output_lines[i+9] = ''
                        
if __name__ == "__main__":
    sink_output()
3.6 运行实例

首先我们使用 docker 按照如下容器编排创建一个 kafka,同时构建一个 zookeeper 与 kafka 结合一起使用,用于管理 kafka 的 broker,以及实现负载均衡。

version: "3.5"
services:
  zookeeper:
    image: zookeeper:3.6.2
    ports:
      - "2181:2181"                        ## 对外暴露的 zookeeper 端口号
    container_name: zookeeper
  kafka:
    image: wurstmeister/kafka:2.13-2.6.0
    volumes:
      - /etc/localtime:/etc/localtime      ## kafka 镜像和宿主机器之间时间保持一致
    ports:
      - "9092:9092"                        ## 对外暴露的 kafka 端口号
    depends_on:
      - zookeeper
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_BROKER_ID: 1
      KAFKA_LOG_RETENTION_HOURS: 120
      KAFKA_MESSAGE_MAX_BYTES: 10000000
      KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000
      KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 60000
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_DELETE_RETENTION_MS: 1000
      KAFKA_CREATE_TOPICS: "stream-in:1:1,stream-out:1:1"      ## 自动创建 topics
    container_name: kafka

1 启动容器环境

docker-compose up -d

2 运行数据模拟程序

python data_producer.py

3 运行流处理任务程序

flink run -m localhost:8081 -python ranklist.py

4 运行排行榜打印程序

python data_comsumer.py

5 运行结果

参考资料

Flink 官方文档:向量化聚合函数官方文档

Flink 官方文档:Apache Kafka SQL 连接器

PyFlink 从入门到精通

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/745406.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号