一、Scala 的基本依赖设置（Maven pom.xml）
二、场景测试

<!-- NOTE(review): the original XML markup of this pom.xml was destroyed during
     extraction (all tags stripped). The structure below is reconstructed from
     the surviving values — verify against the original project before use.
     In particular, `hbase.version` = 2.11 looks like a Scala binary-version
     suffix reused for the HBase connector artifact; confirm this is intended. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.ssjt</groupId>
    <artifactId>ods_flink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <scala.version>2.11</scala.version>
        <flink.version>1.11.1</flink.version>
        <hbase.version>2.11</hbase.version>
        <scope.type>provided</scope.type>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-parquet_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hbase_${hbase.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-metrics-prometheus_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>ods_flink</finalName>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <resources>
            <resource>
                <directory>src/main/resource</directory>
                <includes>
                    <include>**/*.*</include>
                </includes>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>${scala.version}.8</scalaVersion>
                    <args>
                        <arg>-target:jvm-1.8</arg>
                    </args>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-resources-plugin</artifactId>
                <version>3.0.2</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <createDependencyReducedPom>false</createDependencyReducedPom>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <!-- NOTE(review): original mainClass was truncated to "com.ssjt" — fill in the full class name -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.ssjt</mainClass>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                            </transformers>
                            <filters>
                                <filter>
                                    <artifact>*:*:*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
1. 写出到文件系统（datagen 源 → filesystem 分区表）
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
/**
 * Demo job: generates random rows with the `datagen` connector and streams
 * them as JSON into a filesystem table partitioned by date/hour/minute.
 *
 * Fixes applied in review:
 *  - Removed the `'timeZone'` option from both DDLs: it is not a supported
 *    option of the `datagen` or `filesystem` connector, and Flink validates
 *    connector options strictly (unknown keys raise a ValidationException).
 *  - Repaired the sink path, whose separators were lost
 *    (was 'D:companyprojectods_flinkdataf_test') — forward slashes work on
 *    Windows; TODO confirm the actual target directory.
 *  - Normalized mangled SQL keyword casing (PARTITIonED / SELECt) and gave
 *    `local_time` explicit TIMESTAMP(3) precision to match LOCALTIMESTAMP.
 */
object FileTest {
  def main(args: Array[String]): Unit = {
    // Create the streaming execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Checkpointing is mandatory here (original note: 必须设定): the streaming
    // filesystem sink only finalizes/commits in-progress files on checkpoints.
    env.enableCheckpointing(60 * 1000)
    val tableEnv = StreamTableEnvironment.create(env)

    // Source table: random 5-char `iid` strings at 10k rows/sec, plus a
    // computed local timestamp column.
    val createSourceTable =
      """
        |CREATE TABLE sourceTable (
        |  `iid` STRING COMMENT '',
        |  `local_time` AS LOCALTIMESTAMP
        |) WITH (
        |  'connector' = 'datagen',
        |  'rows-per-second' = '10000',
        |  'fields.iid.length' = '5'
        |)
        |""".stripMargin
    tableEnv.executeSql(createSourceTable)

    // Sink table: JSON files on the local filesystem, partitioned by
    // day (dt) / hour (dh) / minute (dm). Files roll at 1 MB; partitions are
    // committed 1 h after creation and marked with a _SUCCESS file.
    val createFileTable =
      """
        |CREATE TABLE test_fs_table_2 (
        |  iid STRING,
        |  local_time TIMESTAMP(3),
        |  dt STRING,
        |  dh STRING,
        |  dm STRING
        |) PARTITIONED BY (dt, dh, dm) WITH (
        |  'connector' = 'filesystem',
        |  'path' = 'D:/company/project/ods_flink/data/f_test',
        |  'format' = 'json',
        |  'sink.rolling-policy.file-size' = '1MB',
        |  'sink.partition-commit.delay' = '1 h',
        |  'sink.partition-commit.policy.kind' = 'success-file'
        |)
        |""".stripMargin
    tableEnv.executeSql(createFileTable)

    // Continuous INSERT: derive the partition columns from `local_time`.
    val insertSql =
      """
        |INSERT INTO test_fs_table_2
        |SELECT
        |  iid,
        |  local_time,
        |  DATE_FORMAT(local_time, 'yyyyMMdd'),
        |  DATE_FORMAT(local_time, 'HH'),
        |  DATE_FORMAT(local_time, 'mm')
        |FROM sourceTable
        |""".stripMargin
    tableEnv.executeSql(insertSql)
  }
}



