栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

flink从kafka读取数据并传到mysql数据库

flink从kafka读取数据并传到mysql数据库

记一次flink踩坑教训

上代码

public class FlinkKafkaConsumer1 {
    public static void main(String[] args) throws Exception{
        //1.获取环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //2.创建消费者
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"需要连接的hadoopIP:9092");
//        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
        FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>("first", new SimpleStringSchema(), properties);
        //3.消费者与对应的flink流关联
        DataStreamSource dataStream = env.addSource(kafkaConsumer);
        SingleOutputStreamOperator> dataSource = dataStream.flatMap(new FlatMapFunction>() {

                    @Override
                    public void flatMap(String value, Collector> out) throws Exception {
                        String[] file = value.split(",");
                        for ( String word : file ) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                })
                .keyBy(0)
                .sum(1);

        //连接数据库
        dataSource.addSink(new MyJDBCSink());
        dataSource.print();
        //4.执行操作
        env.execute();
    }

    private static class MyJDBCSink extends RichSinkFunction> {
        //定义sql连接、预编译器
        Connection conn = null;
        PreparedStatement insertStmt = null;
        PreparedStatement updateStmt = null;

        //初始化 、 创建连接 、 和 预编译语句
        @Override
        public void open(Configuration parameters) throws Exception {
            conn = DriverManager.getConnection("jdbc:mysql://IP:端口号/数据库名","数据库的账号","数据库的密码");
            insertStmt = conn.prepareStatement("INSERT INTO test_yuluo (word, number) VALUES (?, ?)");
            updateStmt = conn.prepareStatement("UPDATE test_yuluo SET word = ? WHERe number = ? ");
        }

        // 执行更新语句,注意不要留 super
        @Override
        public void invoke(Tuple2 value, Context context) throws Exception {

            updateStmt.setString(1, value.f0);
            updateStmt.setInt(2,value.f1);
            updateStmt.execute();


            // 如果刚才 update 语句没有更新,那么插入
            if ( updateStmt.getUpdateCount() == 0 ) {
                insertStmt.setString(1, value.f0);
                insertStmt.setInt(2,value.f1);
                insertStmt.execute();
            }
        }

        @Override
        public void close() throws Exception {
            insertStmt.close();
            updateStmt.close();
            conn.close();

        }
    }
}

properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,“需要连接的hadoopIP:9092”);
conn = DriverManager.getConnection(“jdbc:mysql://IP:端口号/数据库名”,“数据库的账号”,“数据库的密码”);
注意:如上位置应该替换成自己项目的
我是连接的服务器,项目在本地 ,需要在服务器中先在kafka的bin目录中启动,
./kafka-console-producer.sh --broker-list 需要连接的hadoopIP:9092 --topic 项目中kafka的topic
然后启动本地的项目就报错了,错误显示如下
java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset
然后我看了本地的hadoop版本,下载了一个hadoop包在本地,配置了环境变量和path,重新启动idea,重新启动项目,还是报这个错误,(本人电脑是Windows10 专业版),抱着试一试的态度,重启电脑,重启服务和项目,嗯 这个错误消失了,就感觉很离谱,报了另一个错误,如下


原因是下载的Hadoop中没有winutils.exe 文件,又找了这个文件放在hadoop的bin目录下,重新启动,嗯,项目已经跑通了
虽然是个小的demo,但是一个很小的bug 也导致自己耽搁了不少时间,仅此记录一下.
我把我用的版本的压缩包上传到网盘了,需要的自己取吧,里面的bin目录是3.1.0版本的,自己本地配置的就是这一套,省的你们自己在找了,直接下载解压配置环境变量就可以
环境变量配置就不说了,不会的就自己百度,环境变量把配置完之后记得把bin目录底下的 这两个文件扔到
C:WindowsSystem32 目录底下即可,希望大家都可以少走弯路少踩坑.

hadoop3.1.3压缩包

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/745703.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号