栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

kafka查看topic中的数据(kafka是干嘛的)

kafka查看topic中的数据(kafka是干嘛的)


感谢尚硅谷教育对资源的无私开放 一.概念 1. 定义

Kafka是一个分布式的基于发布/订阅模式的消息队列,主要应用于大数据实时处理领域。

2.问题与解决方案 缓存/消峰

双十一期间,消息的发布速度远大于消息的消费(处理)速度,此时通过kafka可以接收发布的消息,消费者端按照自身的处理能力接收消息队列中的消息。

解耦

kafka提供接口,使不同渠道的消息发布者可以按照该接口将消息发布到队列中,消息的消费者根据接口订阅队列中的消息,不需要每个发布者针对不同的消费者而进行不同的处理。

异步处理

在appstore通过绑定支付宝买东西时,支付订单后就显示支付成功,但是扣钱的行为却是在一小时之后或者更久,这就是通过类似于kafka这种消息队列的异步处理,用户只需要支付订单,就会返回成功消息,而真实的扣钱行为却是在支付成功之后处理,从而提高用户体验。

3.消息队列的两种模式 点对点模式

发布者将消息发布到消息队列中,消费者接收消息,向队列确认收到,队列就会将该消息删除掉。

发布订阅模式

发布者将消息发布到队列,队列对消息进行分类,不同的消费者消费不同类别队列中的消息,接收消息后也不会删除队列中的消息。

4.结构

Consumer Group(CG)

消费者组。消费者组内每个消费者负责消费不同分区的数据(如果组内有三个消费者,topic主题队列有三个分区,则每个消费者,消费不同的分区的队列中的消息)。消费者组之间互不影响。

Broker

一台Kafka服务器就是一个broker。一个kafka集群由多个broker组成。一个broker可以容纳多个topic。

Topic

可以理解为一个队列,生产者和消费者面向的都是topic。

Partition

为了实现扩展性,一个非常大的topic可以分布到多个broker上分为多个分区。

Replica

副本。一个 topic 的每个分区都有若干个副本,一个 Leader 和若干个Follower。
生产者发送数据的对象,以及消费者消费数据的对象都是 Leader。
每个分区多个副本中的Follower实时从 Leader 中同步数据,保持和Leader 数据的同步。Leader 发生故障时,某个 Follower 会成为新的 Leader。

二.集群部署

zookeeper(xy36)
kafka(9co5)
linux版本jdk8(5d0o)

# 第一步:集群部署zookeeper
========================================start========================================
# 启动zookeeper前需要使服务器拥有java环境
# 添加可执行权限
chmod +x jdk-8u151-linux-x64.rpm
# 安装RPM软件包
rpm -ivh jdk-8u151-linux-x64.rpm
java -version
# 修改zookeeper/bin/zkEnv.sh文件,添加内容:
JAVA_HOME="/opt/1.8.0_151"
# 启动集群中所有服务器的zookeeper
./zkServer.sh start
# 配置zookeeper环境变量
vi /etc/profile.d/zookeeper.sh
# 添加内容:
==========star==========
export ZOOKEEPER_HOME=/zookeeper目录
export PATH=$PATH:$ZOOKEEPER_HOME/bin
==========end==========
source /etc/profile
# 创建文件夹
mkdir /opt/apache-zookeeper/zkdata
# zkdata目录下创建myid文件,文件内容为数字012...
# 修改zookeeper配置文件zoo.cfg
==========start==========
dataDir=/opt/apache-zookeeper/zkdata
# 0,1,2对应的是myid文件中的0,1,2,否则报错!!!!!!!!!!!!!!!!!!!
server.0=192.168.1.1:2888:3888
server.1=192.168.1.2:2888:3888
server.2=192.168.1.3:2888:3888
==========end==========
# 每个zookeeper配置文件都相同,只是myid中的内容不同
# 启动
./zkServer.sh start
./zkServer.sh status
# 批量启动的脚本
==========start==========
for host in 192.168.1.1 192.168.1.2 192.168.1.3
do
  ssh $host "source /etc/profile;/root/apps/zookeeper/bin/zkServer.sh start"
done
==========end==========
========================================end========================================




# 第一步:集群部署kafka 
========================================start========================================
# 解压到/opt/module目录下
tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/module/
cd /opt/module
# 修改文件名为kafka
mv kafka_2.12-3.0.0/ kafka
# 修改server.properties文件
cd /opt/module/kafka/config
vim server.properties
==========start==========
(修改处一)
#broker 的全局唯一编号,集群内不能重复,只能是数字(跟zookeeper配置文件中的server0,server1对应吧,别他么又错了,又要看日志找问题)。
broker.id=0
# 处理网络请求的线程数量
num.network.threads=3
# 用来处理磁盘 IO 的线程数量
num.io.threads=8
# 发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
# 接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
# 请求套接字的缓冲区大小
socket.request.max.bytes=104857600
(修改处二)
# kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
log.dirs=/opt/module/kafka/datas
# topic 在当前 broker 上的分区个数
num.partitions=1
# 用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
# 每个 topic 创建时的副本数,默认时 1 个副本
offsets.topic.replication.factor=1
# segment 文件保留的最长时间,超时将被删除
log.retention.hours=168
# 每个 segment 文件的大小,默认最大 1G
log.segment.bytes=1073741824
# 检查过期数据的时间,默认 5 分钟检查一次是否数据过期
log.retention.check.interval.ms=300000
(修改处三)
# 配置连接 Zookeeper 集群地址
zookeeper.connect=192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181
(修改处四)
# 配置监听器(监听各自ip的)
listeners=PLAINTEXT://192.168.159.1:9092
==========end==========
# 在/etc/profile.d/my_env.sh 文件中增加 kafka 环境变量配置
sudo vim /etc/profile.d/my_env.sh
==========start==========
# KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
==========end==========
# 刷新环境变量
source /etc/profile
# 启动kafka
cd /opt/module/kafka
bin/kafka-server-start.sh -daemon config/server.properties
# 关闭kafka
bin/kafka-server-stop.sh
========================================end========================================

集群启停脚本kf.sh

#! /bin/bash
case $1 in
"start"){
 for i in 192.168.159.10 192.168.159.11 192.168.159.13
 do
 echo " --------启动 $i Kafka-------"
 ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
 done
};;
"stop"){
 for i in 192.168.159.10 192.168.159.11 192.168.159.13
 do
 echo " --------停止 $i Kafka-------"
 ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "
 done
};;
esac

启停命令

chmod +x kf.sh
kf.sh start
kf.sh stop

注意:停止 Kafka 集群时,一定要等 Kafka 所有节点进程全部停止后再停止 Zookeeper集群。因为 Zookeeper 集群当中记录着 Kafka 集群相关信息,Zookeeper 集群一旦先停止,Kafka 集群就没有办法再获取停止进程的信息,只能手动杀死 Kafka 进程。

集群部署中遇到的问题 问题一:org.apache.kafka.common.KafkaException: Socket server failed to bind to 114.115.219.189:9092: Cannot assign requested address.

解决方案:
由于监听别的ip的9092端口导致的,修改kafka的server.properties文件:
listeners=PLAINTEXT://本机IP:9092

问题二:afka.common.InconsistentClusterIdException: The Cluster ID VLPFfY-WQkWmK7X9_C1aHQ doesn’t match stored clusterId Some(0TVA9OzOTzCGMTqY50sBKA) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.

当kafka异常关闭时,日志记录就会出现异常,会把当时的情况记录到meta.properties文件中,重新启动时此文件会对启动造成影响。
解决方案:
删除kafka日志目录下meta.properties文件

三.命令行操作

基础架构

1.topic
参数描述
–bootstrap-server 连接的 Kafka Broker 主机名称和端口号
–topic 操作的 topic 名称
–create创建主题
–delete删除主题
–alter修改主题
–list查看所有主题
–describe查看主题详细描述
–partitions 设置分区数
–replication-factor设置分区副本
–config 更新系统默认的配置
cd /opt/module/kafka/bin
# 查看操作主题命令参数
./kafka-topics.sh
# 查看当前服务器中的所有 topic
./kafka-topics.sh --bootstrap-server 192.168.159.10:9092,192.168.159.11:9092,192.168.159.13:9092 --list
# 查看指定topic的详情
./kafka-topics.sh --bootstrap-server 192.168.159.10:9092,192.168.159.11:9092,192.168.159.13:9092 --topic first --describe
# 创建test主题
./kafka-topics.sh --bootstrap-server 192.168.159.10:9092,192.168.159.11:9092,192.168.159.13:9092 --create --partitions 1 --replication-factor 3 --topic test
# 修改分区数(分区数只能增加,不能减少)
./kafka-topics.sh --bootstrap-server 192.168.159.10:9092,192.168.159.11:9092,192.168.159.13:9092 --alter --partitions 2 --topic test
# 删除test主题
./kafka-topics.sh --bootstrap-server 192.168.159.10:9092,192.168.159.11:9092,192.168.159.13:9092 --delete --topic test
2.producer
参数描述
–bootstrap-server 连接的 Kafka Broker 主机名称和端口号
–topic 操作的 topic 名称
# 查看操作生产者命令参数
./kafka-console-producer.sh
# 生产者发送消息
./kafka-console-producer.sh --bootstrap-server 192.168.159.10:9092,192.168.159.11:9092,192.168.159.13:9092 --topic first
3.consumer
参数描述
–bootstrap-server 连接的 Kafka Broker 主机名称和端口号
–topic 操作的 topic 名称
–from-beginning从头开始消费
–group 指定消费者组名称
# 查看操作消费者命令参数
./kafka-console-consumer.sh
## 消费消息
./kafka-console-consumer.sh --bootstrap-server 192.168.159.10:9092,192.168.159.11:9092,192.168.159.13:9092 --topic first
# 把主题中所有的数据都读取出来(包括历史数据)
./kafka-console-consumer.sh --bootstrap-server 192.168.159.10:9092,192.168.159.11:9092,192.168.159.13:9092 --from-beginning --topic first
遇到个无法解决的问题:第一次时producer发送消息,consumer可以接收到,关闭consumer后,consumer再接收数据,就无法接收到。 四.Kafka 生产者 1.生产者发送数据到kafka集群的流程

生产者发送流程
在消息发送的过程中,涉及到main 线程和 Sender 线程。main 线程中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。

2.异步发送 创建 Kafka 生产者,采用异步的方式发送到 Kafka Broker

pom.xml


	org.apache.kafka
	kafka-clients
	3.0.0

Producer1.java

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;


public class Producer1 {

    public static void main(String[] args) {
        // 1.创建发送者对象
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.159.10:9092,192.168.159.11:9092,192.168.159.13:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        KafkaProducer producer = new KafkaProducer(properties);
        // 2.发送数据到kafka集群
        for (int i = 0; i < 5; i++) {
            // producer.send(new ProducerRecord("topic1", "value:" + i));
            // 回调函数会在 producer 收到 ack 时调用,为异步调用(send()方法后加.get(),变为同步发送)
            producer.send(new ProducerRecord<>("topic1", "值:" + i), (metadata, exception) -> {
                        if (exception == null) {
                            System.out.println("主题:" + metadata.topic() + ",分区:" + metadata.partition());
                        }
                    }
            );
        }
        // 3.关闭资源
        producer.close();
    }

}

Kafka分区好处

合理使用存储资源。100T的数据通过三个分区存储,每个分区对应的服务器只需要33T。
提高并行度。生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。

生产者发送消息时的默认分区策略

默认的分区器DefaultPartitioner

public class DefaultPartitioner implements Partitioner {
自定义分区器,取消使用默认分区器

需求:发送者发送的数据中包含name,则将该数据放到0分区,包含age,则将该数据放到1分区,其他放到2分区。

第一步:创建Partitioner接口的实现类

MyPartition.java

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;


public class MyPartition implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        int partiton = 2;
        String content = value.toString();
        if (content != null){
            if (content.contains("name")){
                partiton = 0;
            }else if (content.contains("age")){
                partiton = 1;
            }
        }
        return partiton;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map configs) {

    }
}
第二步:发送者指定分区器

Producer1.java

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;


public class Producer1 {

    public static void main(String[] args) {
        // 1.创建发送者对象
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.159.10:9092,192.168.159.11:9092,192.168.159.13:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // value值为自定义分区器类的全类名
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "MyPartition");
        KafkaProducer producer = new KafkaProducer(properties);
        // 2.发送数据到kafka集群
        for (int i = 0; i < 5; i++) {
            //producer.send(new ProducerRecord("topic1", "value:" + i));
            // 回调函数会在 producer 收到 ack 时调用,为异步调用
            producer.send(new ProducerRecord<>("topic1", "b", "fdshjkfsdf:" + i), (metadata, exception) -> {
                        if (exception == null) {
                            System.out.println("主题:" + metadata.topic() + ",分区:" + metadata.partition());
                        }
                    }
            );
        }
        // 3.关闭资源
        producer.close();
    }

}

3.生产者提高吞吐量的方式

生产者如何提高吞吐量
设置缓冲区的大小
设置从缓存区发往broker的每批次的大小(batch.size)
设置从缓存区发往broker的等待时间的大小(linger.time)
设置压缩每批次发送的数据,使不改变每批次大小的条件下一次发送更多的数据
Producer1.java

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;


public class Producer1 {

    public static void main(String[] args) {
        // 1.创建发送者对象
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.159.10:9092,192.168.159.11:9092,192.168.159.13:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // value值为自定义分区器类的全类名
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "MyPartition");
        // batch.size:批次大小,默认 16K
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // linger.ms:等待时间,默认 0
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // RecordAccumulator:缓冲区大小,默认 32M:buffer.memory
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
        // compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstd
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
        KafkaProducer producer = new KafkaProducer(properties);
        // 2.发送数据到kafka集群
        for (int i = 0; i < 5; i++) {
            //producer.send(new ProducerRecord("topic1", "value:" + i));
            // 回调函数会在 producer 收到 ack 时调用,为异步调用
            producer.send(new ProducerRecord<>("topic1", "b", "fdshjkfsdf:" + i), (metadata, exception) -> {
                        if (exception == null) {
                            System.out.println("主题:" + metadata.topic() + ",分区:" + metadata.partition());
                        }
                    }
            );
        }
        // 3.关闭资源
        producer.close();
    }

}
4.发送的数据的可靠性

生产者将数据通过broker发送给消费者时,可能会发生相关问题。
在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景。

ack应答级别 acks=0

生产者发送完数据后就应答。

acks=1

生产者发送过来的数据,Leader收到数据后应答。

acks=-1(all)

生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答。

设置ack与重试次数:

// 设置 acks
 properties.put(ProducerConfig.ACKS_CONFIG, "all");
 // 重试次数 retries,默认是 int 最大值,2147483647
 properties.put(ProducerConfig.RETRIES_CONFIG, 3);

问题一:
可能会出现follower同步leader的数据时,follower挂掉了,导致一直没有应答,但是leader和其他follower是已经接收到数据了的。
解决办法:
Leader维护了一个动态的ISR,意为和Leader保持同步的Follower+Leader集合(leader:0,isr:0,1,2)。
如果Follower长时间(默认30s)未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR,这样就不用等长期联系不上或者已经故障的节点。
问题二:
如果分区副本设置为1个,或者ISR里应答的最小副本数量设置为1,即leader:0,isr:0,这样和和ack=1的效果是一样的,仍然有丢数的风险。
结论:
数据完全可靠条件 = ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2(副本数量=leader+follower的数量)

5.数据去重

生产者发送数据时,leader和follower都接收到了,但是可能会出现leader挂掉了,就没有给producer应答,此时某个follower会成为新的leader,并且这个新的leader中已经含有了生产者发送的数据,但是由于之前没有给producer应答,producer会再次发送该数据给leader,但是leader已经有该数据了,导致发送的数据重复。

至少一次= ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2
精确一次:幂等性和事务 + 至少一次

幂等性

幂等性是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。
开启参数 enable.idempotence 即开启幂等性。默认为 true,false 关闭。
重复数据的判断标准:具有相同主键的消息提交时,Broker只会持久化一条。
PID:Kafka每次重启都会分配一个新的PID
Partition :分区号
Sequence Number:单调自增的数字。
幂等性只能保证的是在单会话、单分区、单主题内不重复。

生产者事务

事务解决了kafka服务器不小心关掉再重启后可能导致的数据重复问题。
开启事务,必须开启幂等性。
事务工作原理
PS:这工作原理听一遍很难记住啊!

kafka事务的API
// 1 初始化事务
void initTransactions();
// 2 开启事务
void beginTransaction() throws ProducerFencedException;
// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map offsets, String consumerGroupId) throws 
ProducerFencedException;
// 4 提交事务
void commitTransaction() throws ProducerFencedException;
// 5 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;
代码

使用步骤:

properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionid1");
// 初始化事务
producer.initTransactions();
// 开启事务
producer.beginTransaction();
try {
    // 2.发送数据到kafka集群
    producer.send();...............................
    // 提交事务
    producer.commitTransaction();
} catch (Exception e) {
    // 终止事务
    producer.abortTransaction();
} finally {
    // 3.关闭资源
    producer.close();
}

Producer1.java

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;


public class Producer1 {

    public static void main(String[] args) {

        // 1.创建发送者对象
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.159.130:9092,192.168.159.131:9092,192.168.159.133:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // value值为自定义分区器类的全类名
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "MyPartition");
        // batch.size:批次大小,默认 16K
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // linger.ms:等待时间,默认 0
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // RecordAccumulator:缓冲区大小,默认 32M:buffer.memory
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        // compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstd
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionid1");
        KafkaProducer producer = new KafkaProducer(properties);
        // 设置事务 id(必须),事务 id 任意起名
        // 初始化事务
        producer.initTransactions();
        // 开启事务
        producer.beginTransaction();

        try {
            // 2.发送数据到kafka集群
            for (int i = 0; i < 5; i++) {
                //producer.send(new ProducerRecord("topic1", "value:" + i));
                // 回调函数会在 producer 收到 ack 时调用,为异步调用
                producer.send(new ProducerRecord<>("topic1", "b", "trans:" + i), (metadata, exception) -> {
                            if (exception == null) {
                                System.out.println("主题:" + metadata.topic() + ",分区:" + metadata.partition());
                            }
                        }
                );
            }
            // 提交事务
            producer.commitTransaction();
        } catch (Exception e) {
            // 终止事务
            producer.abortTransaction();
        } finally {
            // 3.关闭资源
            producer.close();
        }
    }

}
6.数据有序

1)kafka在1.x版本之前保证数据单分区有序,条件为:
max.in.flight.requests.per.connection=1(不需要考虑是否开启幂等性)
2)kafka在1.x及以后版本保证数据单分区有序,条件为:
开启幂等性,max.in.flight.requests.per.connection需要设置小于等于5即可;
或者未开启幂等性,max.in.flight.requests.per.connection设置为1
因为在kafka1.x以后,启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据,无论如何都可以保证最近5个request的数据都是有序的。

五.Kafka Broker 1.Zookeeper 存储的 Kafka 信息有哪些

prettyZoo

2. Kafka Broker 总体工作流程

总体工作流程

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/771980.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号