读取canal-json的数据到hbase中
(1)导包
org.apache.flink : flink-connector-hbase_2.11 : 1.11.2
(2)代码实现
object Demo10toHbase {

  /**
   * Reads canal-json change-log records for the `student` table from Kafka
   * and mirrors them into HBase through the Flink SQL HBase connector.
   *
   * Pipeline: Kafka (canal-json source) -> canal_student -> hbase_sink (HBase table `student`).
   */
  def main(args: Array[String]): Unit = {
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val bsSettings = EnvironmentSettings
      .newInstance()
      .useBlinkPlanner() // use the Blink planner
      .inStreamingMode() // streaming mode
      .build()
    // streaming Table environment
    val bsTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)
    // val configuration = new Configuration()
    // // drop rows whose primary key is null instead of failing the job
    // configuration.setString("table.exec.sink.not-null-enforcer", "drop")
    // configuration.setString("table.dynamic-table-options.enabled", "true")
    // bsTableEnv.getConfig.addConfiguration(configuration)

    // Source: Kafka topic carrying canal-json change events; parse errors are skipped.
    bsTableEnv.executeSql(
      """
        |
        |CREATE TABLE canal_student (
        | id STRING,
        | name STRING,
        | age BIGINT,
        | gender STRING,
        | clazz STRING
        |) WITH (
        | 'connector' = 'kafka',
        | 'topic' = 'student.student',
        | 'properties.bootstrap.servers' = 'master:9092',
        | 'properties.group.id' = 'sdfg',
        | 'format' = 'canal-json',
        | 'scan.startup.mode' = 'earliest-offset',
        | 'canal-json.ignore-parse-errors' = 'true'
        |)
        |
      """.stripMargin)

    // Sink: HBase table `student`. The connector maps each column family to a
    // ROW type whose nested fields are the column qualifiers, so `info` must be
    // declared as a typed ROW<...> — a bare ROW is invalid and cannot accept
    // the ROW(name, age, gender, clazz) produced by the INSERT below.
    bsTableEnv.executeSql(
      """
        |
        |CREATE TABLE hbase_sink (
        | id STRING,
        | info ROW<name STRING, age BIGINT, gender STRING, clazz STRING>,
        | PRIMARY KEY (id) NOT ENFORCED
        |) WITH (
        | 'connector' = 'hbase-1.4',
        | 'table-name' = 'student',
        | 'zookeeper.quorum' = 'master:2181'
        |)
        |
      """.stripMargin)

    // Continuously upsert the change-log into HBase: rowkey = id,
    // column family `info` holds name/age/gender/clazz.
    bsTableEnv.executeSql(
      """
        |insert into hbase_sink
        |select id,ROW(name,age,gender,clazz) as info from canal_student
        |
      """.stripMargin)
  }
}



