#!/usr/bin/python
# -*- coding: UTF-8 -*-
# Test: consume data from Kafka with PyFlink and write it to local HDFS
from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode
from pyflink.table import StreamTableEnvironment, TableConfig
s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_parallelism(2)
# Checkpointing must be enabled (interval in milliseconds): the filesystem sink
# only commits part files on checkpoints, so without it no data is ever written out
s_env.enable_checkpointing(3000)
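# Optional hardening (a sketch; these values are assumptions, not part of the
# original job): pin the checkpoint mode and bound checkpoint duration.
checkpoint_config = s_env.get_checkpoint_config()
checkpoint_config.set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)
checkpoint_config.set_checkpoint_timeout(60000)  # abort checkpoints running longer than 60 s
checkpoint_config.set_min_pause_between_checkpoints(1000)  # at least 1 s between checkpoints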
st_env = StreamTableEnvironment.create(s_env, TableConfig())
st_env.use_catalog("default_catalog")
st_env.use_database("default_database")
print("AAAAAAAAAA")
sourceKafkaDdl = """
create table sourceKafka(
agent_send_timestamp timestamp comment 'client-side timestamp',
collector_recv_timestamp timestamp comment 'collector server timestamp',
raw_message varchar comment 'raw log line',
line_no bigint comment 'line number (file sources only)',
ip varchar(255) comment 'IP address',
source varchar(255) comment 'file path (file sources only)',
logtype varchar(255) comment 'log type',
hostname varchar(255) comment 'host name',
appname varchar(255) comment 'application tag',
file_id varchar(255) comment 'unique file identifier',
context_id bigint comment 'context identifier',
raw_message_length bigint comment 'length in bytes'
) comment 'continuously reads records from Kafka'
with(
'connector' = 'kafka',
'topic' = 'raw_message',
'properties.group.id' = 'rizhiyiTohdfsGroup1',
'properties.bootstrap.servers' = '*:9092,*:9092',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
)
"""
st_env.execute_sql(sourceKafkaDdl)
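# Note (assumption, not in the original DDL): the JSON format fails the job on
# unparsable records by default; a more tolerant source could add
#   'json.fail-on-missing-field' = 'false',
#   'json.ignore-parse-errors' = 'true'
# to the WITH clause so malformed records are skipped instead of failing the job.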
st_env.from_path("sourceKafka").print_schema()
#st_env.from_path("sourceKafka").to_pandas().get("id")
print("BBBBBBBBBBB, try sink table")
# 3. 创建 sink 表
st_env.execute_sql("""
CREATE TABLE sink_hdfs_test (
agent_send_timestamp timestamp,
collector_recv_timestamp timestamp,
raw_message varchar,
line_no bigint,
ip varchar(255),
source varchar(255),
logtype varchar(255),
hostname varchar(255),
appname varchar(255),
file_id varchar(255),
context_id bigint,
raw_message_length bigint
) WITH (
'connector' = 'filesystem',
'path' = 'hdfs://master:9000/data/*t/testdata2',
'format' = 'csv'
)
""")
print("CCCCCCCCC")
resultQuery = st_env.sql_query("select * from sourceKafka where appname='system'")
#print("schemma:",resultQuery.get_schema())
#print("check data:",resultQuery.to_pandas())
print("DDDDDDDDDDD")
resultQuery.insert_into("sink_hdfs_test")
print("EEEEE")
st_env.execute("pyflink-kafka-hdfs")
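# To run on a cluster (a sketch; the script file name is an assumption):
#   flink run -py pyflink_kafka_to_hdfs.py
# The Kafka connector jar (e.g. flink-sql-connector-kafka for the matching
# Flink version) must be on the Flink classpath.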