栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

flink消费kafka写到mysql

flink消费kafka写到mysql

main方法:消费kafka中的数据,并通过自定义的Sink实现类写入mysql

import akka.japi.tuple.Tuple4;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.Serializable;
import java.util.Properties;



public class KafkaToMysql implements Serializable {

	//定义内部类,和需要写入的表结构一致
    static class user {

        final String name;
        final String gender;
        final String phoneNumber;
        final Integer age;

        public user(String name, Integer age, String gender, String phoneNumber) {
            this.name = name;
            this.age = age;
            this.gender = gender;
            this.phoneNumber = phoneNumber;
        }


    }

    public static void main(String[] args) throws Exception {

		//kafka相关配置
        String topic = "mykafka";
        Properties kafkaConf = new Properties();
        kafkaConf.put(ConsumerConfig.GROUP_ID_CONFIG,"kafkaTest1");
        kafkaConf.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.0.27.21:9092");
        kafkaConf.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
        kafkaConf.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
        kafkaConf.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 3000);
        kafkaConf.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
        kafkaConf.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
        kafkaConf.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaConf.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        //获取流执行环境
        StreamExecutionEnvironment envs = StreamExecutionEnvironment.getExecutionEnvironment();

		//添加kafka source
        DataStreamSource mykafka = envs.addSource(new FlinkKafkaConsumer(topic, new SimpleStringSchema(), kafkaConf));
		//打印数据
        mykafka.print();
         */

		//内部类转换数据格式
		//如果直接通过流返回Tuple4则会报错,当涉及Java泛型时,lambda方法不能为自动类型提取提供足够的信息
		
        DataStream> stream = mykafka.map(new MapFunction>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Tuple4 map(String value) throws Exception {
                String[] strings = value.split(",");
                return new Tuple4(strings[0],Integer.parseInt(strings[1]),strings[2],strings[3]);
            }
        });    
        stream.addSink(new MysqlImpl());
        envs.execute();
    }
}

mysql Sink 实现类

import akka.japi.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;



public class MysqlImpl extends RichSinkFunction> {
        private Connection connection;
        private PreparedStatement preparedStatement;
        String username = "root";
        String password = "12345678";
        String drivername = "com.mysql.jdbc.Driver";   //配置改成自己的配置
        String dburl = "jdbc:mysql://localhost:3306/test_local";

    @Override
    public void invoke(Tuple4 value) throws Exception {
        //SinkFunction.super.invoke(value);
        Class.forName(drivername);
        connection = DriverManager.getConnection(dburl, username, password);
        String sql = "insert into user (name ,age,gender,phone_number) values(?,?,?,?)"; //假设mysql 有4列 name ,age,gender,phone_number
        preparedStatement = connection.prepareStatement(sql);
        preparedStatement.setString(2, value.t1());
        preparedStatement.setInt(1, value.t2());
        preparedStatement.setString(3, value.t3());
        preparedStatement.setString(3, value.t4());
        preparedStatement.executeUpdate();
        if (preparedStatement != null) {
            preparedStatement.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    
    @Override
    public void invoke(Tuple4 value, Context context) throws Exception {
        //SinkFunction.super.invoke(value, context);
        Class.forName(drivername);
        connection = DriverManager.getConnection(dburl, username, password);
        String sql = "insert into user (name ,age,gender,phone_number) values(?,?,?,?)"; //假设mysql 有3列 id,num,price
        preparedStatement = connection.prepareStatement(sql);
        preparedStatement.setString(1, value.t1());
        preparedStatement.setInt(2, value.t2());
        preparedStatement.setString(3, value.t3());
        preparedStatement.setString(4, value.t4());
        preparedStatement.execute();
        if (preparedStatement != null) {
            preparedStatement.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

}


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/780259.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号