栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

seatunnel(海量数据处理工具)实现HDFS导入Clickhouse

seatunnel(海量数据处理工具)实现HDFS导入Clickhouse

文章目录

介绍快速开始案例1:HDFS导入Clickhouse

ref: https://interestinglab.github.io/seatunnel-docs/#/zh-cn/v1/

介绍

seatunnel 是一个非常易用,高性能、支持实时流式和离线批处理的海量数据处理产品,架构于Apache Spark 和 Apache Flink之上。

为什么需要seatunnel ?

让Spark的使用更简单,更高效。简化开发

特性

简单易用,灵活配置,无需开发模块化和插件化,易于扩展支持利用SQL做数据处理和聚合 快速开始

参见:https://interestinglab.github.io/seatunnel-docs/#/zh-cn/v1/quick-start

案例1:HDFS导入Clickhouse

在HDFS中存储的日志格式如下, 是很常见的Nginx日志

10.41.1.28 github.com 114.250.140.241 0.001s "127.0.0.1:80" [26/Oct/2018:03:09:32 +0800] "GET /InterestingLab/seatunnel HTTP/1.1" 200 0 "-" - "Dalvik/2.1.0 (Linux; U; Android 7.1.1; OPPO R11 Build/NMF26X)" "196" "-" "mainpage" "443" "-" "172.16.181.129"

CK建表

CREATE TABLE default.access_log
(
    `ip` String,
    `hostname` String,
    `remote_addr` String,
    `datetime` DateTime,
    `status` UInt32
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(datetime)
ORDER BY datetime
SETTINGS index_granularity = 8192

仅需要编写一个seatunnel Pipeline的配置文件即可完成数据的导入config/hdfs2ck.conf

spark {
  spark.app.name = "seatunnel"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
}
input {
    hdfs {
        path = "hdfs://hadoop104:9000/rowlog/accesslog"
        result_table_name = "access_log"
        format = "text"
    }
}
filter {
    # 使用正则解析原始日志
    grok {
        source_field = "raw_message"
        pattern = "%{IP:ha_ip}\s%{NOTSPACE:domain}\s%{IP:remote_addr}\s%{NUMBER:request_time}s\s"%{data:upstream_ip}"\s\[%{HTTPDATE:timestamp}\]\s"%{NOTSPACE:method}\s%{data:url}\s%{NOTSPACE:http_ver}"\s%{NUMBER:status}\s%{NUMBER:body_bytes_send}\s%{data:referer}\s%{NOTSPACE:cookie_info}\s"%{data:user_agent}"\s%{data:uid}\s%{data:session_id}\s"%{data:pool}"\s"%{data:tag2}"\s%{data:tag3}\s%{data:tag4}"
    }
    # 将"dd/MMM/yyyy:HH:mm:ss Z"格式的数据转换为
    # "yyyy/MM/dd HH:mm:ss"格式的数据
    date {
        source_field = "timestamp"
        target_field = "datetime"
        source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"
        target_time_format = "yyyy/MM/dd HH:mm:ss"
    }
    # 使用SQL筛选关注的字段,并对字段进行处理
    # 甚至可以通过过滤条件过滤掉不关心的数据
    sql {
        table_name = "access"
        sql = "select ha_ip as ip, domain as hostname, remote_addr, datetime, bigint(status) from access"
    }
}
output {
    clickhouse {
        host = "hadoop104:8123"
        database = "default"
        table = "access_log"
        fields = ["ip", "hostname", "remote_addr", "datetime", "status"]
        username = "default"
        password = ""
    }
}

运行seatunnel,即可将数据写入ClickHouse

./bin/start-seatunnel.sh --config config/hdfs2ck.conf -e client -m 'local[2]'

CK结果展示

hadoop104 :) select * from access_log;

┌─ip─────────┬─hostname───┬─remote_addr─────┬────────────datetime─┬─status─┐
│ 10.41.1.28 │ github.com │ 114.250.140.241 │ 2018-10-26 03:09:32 │    200 │
│ 10.41.1.29 │ github.com │ 114.250.140.241 │ 2018-10-26 03:09:32 │    200 │
│ 10.41.1.30 │ github.com │ 114.250.140.241 │ 2018-10-26 03:09:32 │    200 │
│ 10.41.1.31 │ github.com │ 114.250.140.241 │ 2018-10-26 03:09:32 │    200 │
│ 10.41.1.32 │ github.com │ 114.250.140.241 │ 2018-10-26 03:09:32 │    200 │
│ 10.41.1.33 │ github.com │ 114.250.140.241 │ 2018-10-26 03:09:32 │    200 │
└────────────┴────────────┴─────────────────┴─────────────────────┴────────┘
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/707621.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号