- 1 采集系统介绍
- 2 采集系统搭建
- 2.1 配置
- 2.2 启动
- 3 提交 Connector
- 3.1 提交 Connector
- 3.2 Connector其他REST API
- 4 测试
1 采集系统介绍
对于数据的抽取通常会搭建专业的数据采集系统来完成各种源数据的抽取。
采集系统的执⾏流程如下:
搭建步骤如下:
- 配置Kafka-Connecter(kafka-to-hdfs)
- 部署采集系统
- 部署web前端
启动Kafka - Connector
先新建一个kafka-to-hdfs目录, 把lib目录复制到kafka-to-hdfs内, 目录结构如下图:
.
`-- kafka-to-hdfs
`-- lib
|-- kafka-to-hdfs.jar
|-- .....
在kafka ( 项目中需要上传到kafka容器中 ) 的安装目录新建 plugins目录把kafka-to-hdfs目录复制进去, 最终目录结构如下图:
. ( kafka目录 ) |-- LICENSE |-- NOTICE |-- bin |-- config |-- libs |-- logs |-- plugins | `-- kafka-to-hdfs | `-- lib | |-- kafka-to-hdfs.jar | |-- ...... |-- site-docs
在config目录 找到connect-distributed.properties, 修改4项目配置, 新增加一项目配置, 详细修改如下:
## key.converter 与 value.converter 默认值 # key.converter=org.apache.kafka.connect.json.JsonConverter # value.converter=org.apache.kafka.connect.json.JsonConverter ## 修改为: key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter ## 是否开启json转换 # key.converter.schemas.enable=true # value.converter.schemas.enable=true ## 修改为: key.converter.schemas.enable=false value.converter.schemas.enable=false ## 新增配置, 指定自定义Connector的配置, 这里是以插件的方式, 这里使用的是绝对路径 plugin.path=/opt/kafka_2.11-2.2.0/plugins/kafka-to-hdfs
以上配置完成, 接下来是启动及测试
2.2 启动启动命令如下:
# 进行kafka安装目录, 执行下面命令启动 ./bin/connect-distributed.sh ./config/connect-distributed.properties # 启动过程中没有发现报错说明启动成功, 可以使用ctrl + c 结束, 然后后台启动 # 后台启动方式 nohup $KAFKA_HOME/bin/connect-distributed.sh $KAFKA_HOME/config/connect-distributed.properties >$KAFKA_HOME/logs/output_distributed 2>&1 &3 提交 Connector 3.1 提交 Connector
提交接口: http://localhost:8083/connectors
请求类型: POST
参数类型: JSON
{
"name": "visit_topic",
"config": {
"connector.class": "cn.itcast.bigdata.common.kafka_to_hdfs.hdfs.HdfsSinkConnector",
"tasks.max": "1",
"topics": "visit_topic",
"hdfs.url": "hdfs://xc-online-hadoop:9000/",
"flush.size": "100",
"expression":"yyyy-MM-dd",
"hdfs.path": "/user/hive/external/data_course/course_visit_source/",
"name": "visit_topic"
}
}
3.2 Connector其他REST API
注意这里是本机测试, ip为: localhost, 你测试时以你的为准
查询所有 Connector:
# 接口 http://localhost:8083/connectors # 请求方试 GET # 参数 无
查询指定 Connector 状态
# 接口
http://localhost:8083/connectors/{name}/status
# 请求方试
GET
# 参数 - Path Val
指定Connector的名字
查询指定 Connector 配置参数
# 接口
http://localhost:8083/connectors/{name}/config
# 请求方试
GET
# 参数 - Path Val
指定Connector的名字
删除指定 Connector
# 接口
http://localhost:8083/connectors/{name}
# 请求方试
DELETE
# 参数 - Path Val
指定Connector的名字
4 测试
测试方式:
js埋点 --> 数据发送到采集服务器 --> 采集服务器数据发送kafka --> kafka-connect监听topic --> 数据持久化到hdfs
流程图如下:
效果如下图:
最终hdfs上保存js埋点获取的数据, 如下图所示:



