栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink基础系列23-Sink之JDBC

Flink基础系列23-Sink之JDBC

文章目录
  • 一.Sink之JDBC概述
  • 二.pom文件配置
  • 三.MySQL配置
  • 四.编写Java代码
  • 五.运行Flink程序查看数据
  • 参考:

一.Sink之JDBC概述

Flink的Sink支持的数据库:

Bahir中支持的数据库:

从上两图可以看到,Flink的Sink并支持类似MySQL的这种关系型数据库,那么如果我需要通过Flink连接MySQL,该如何操作呢?

这个时候我们可以使用Flink Sink的JDBC连接。

二.pom文件配置

此处,我本地MySQL版本是 8.0.19


    mysql
    mysql-connector-java
    8.0.19

三.MySQL配置

新建数据库及表

CREATE DATAbase `flink_test` DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci;

CREATE TABLE `sensor_temp` (
  `id` varchar(32) NOT NULL,
  `temp` double NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
四.编写Java代码
package org.flink.sink;

import org.flink.beans.SensorReading;
import org.example.SourceTest4_UDF;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;



public class SinkTest4_Jdbc {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 从文件读取数据
//        DataStream inputStream = env.readTextFile("D:\Projects\BigData\FlinkTutorial\src\main\resources\sensor.txt");
//
//        // 转换成SensorReading类型
//        DataStream dataStream = inputStream.map(line -> {
//            String[] fields = line.split(",");
//            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
//        });

        DataStream dataStream = env.addSource(new SourceTest4_UDF.MySensorSource());

        dataStream.addSink(new MyJdbcSink());

        env.execute();
    }

    // 实现自定义的SinkFunction
    public static class MyJdbcSink extends RichSinkFunction {
        // 声明连接和预编译语句
        Connection connection = null;
        PreparedStatement insertStmt = null;
        PreparedStatement updateStmt = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/flink_test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai", "root", "123456");
            insertStmt = connection.prepareStatement("insert into sensor_temp (id, temp) values (?, ?)");
            updateStmt = connection.prepareStatement("update sensor_temp set temp = ? where id = ?");
        }

        // 每来一条数据,调用连接,执行sql
        @Override
        public void invoke(SensorReading value, Context context) throws Exception {
            // 直接执行更新语句,如果没有更新那么就插入
            updateStmt.setDouble(1, value.getTemperature());
            updateStmt.setString(2, value.getId());
            updateStmt.execute();
            if( updateStmt.getUpdateCount() == 0 ){
                insertStmt.setString(1, value.getId());
                insertStmt.setDouble(2, value.getTemperature());
                insertStmt.execute();
            }
        }

        @Override
        public void close() throws Exception {
            insertStmt.close();
            updateStmt.close();
            connection.close();
        }
    }
}
五.运行Flink程序查看数据
mysql> select * from sensor_temp;
+-----------+-------------------+
| id        | temp              |
+-----------+-------------------+
| sensor_3  | 65.31089123002162 |
| sensor_10 | 20.23454807781744 |
| sensor_4  | 79.87349739590283 |
| sensor_1  | 68.79742249825429 |
| sensor_2  |  44.1766638371653 |
| sensor_7  | 99.47000620947128 |
| sensor_8  |  68.7360059804266 |
| sensor_5  |  69.9135258264366 |
| sensor_6  | 38.85722751176939 |
| sensor_9  | 69.97758295030204 |
+-----------+-------------------+
10 rows in set (0.00 sec)

mysql>
参考:
  1. https://www.bilibili.com/video/BV1qy4y1q728
  2. https://ashiamd.github.io/docsify-notes/#/study/BigData/Flink/%E5%B0%9A%E7%A1%85%E8%B0%B7Flink%E5%85%A5%E9%97%A8%E5%88%B0%E5%AE%9E%E6%88%98-%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0?id=_521-%e4%bb%8e%e9%9b%86%e5%90%88%e8%af%bb%e5%8f%96%e6%95%b0%e6%8d%ae
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/354443.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号