记一次flink踩坑教训
上代码
public class FlinkKafkaConsumer1 {
public static void main(String[] args) throws Exception{
//1.获取环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//2.创建消费者
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"需要连接的hadoopIP:9092");
// properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>("first", new SimpleStringSchema(), properties);
//3.消费者与对应的flink流关联
DataStreamSource dataStream = env.addSource(kafkaConsumer);
SingleOutputStreamOperator> dataSource = dataStream.flatMap(new FlatMapFunction>() {
@Override
public void flatMap(String value, Collector> out) throws Exception {
String[] file = value.split(",");
for ( String word : file ) {
out.collect(new Tuple2<>(word, 1));
}
}
})
.keyBy(0)
.sum(1);
//连接数据库
dataSource.addSink(new MyJDBCSink());
dataSource.print();
//4.执行操作
env.execute();
}
private static class MyJDBCSink extends RichSinkFunction> {
//定义sql连接、预编译器
Connection conn = null;
PreparedStatement insertStmt = null;
PreparedStatement updateStmt = null;
//初始化 、 创建连接 、 和 预编译语句
@Override
public void open(Configuration parameters) throws Exception {
conn = DriverManager.getConnection("jdbc:mysql://IP:端口号/数据库名","数据库的账号","数据库的密码");
insertStmt = conn.prepareStatement("INSERT INTO test_yuluo (word, number) VALUES (?, ?)");
updateStmt = conn.prepareStatement("UPDATE test_yuluo SET word = ? WHERe number = ? ");
}
// 执行更新语句,注意不要留 super
@Override
public void invoke(Tuple2 value, Context context) throws Exception {
updateStmt.setString(1, value.f0);
updateStmt.setInt(2,value.f1);
updateStmt.execute();
// 如果刚才 update 语句没有更新,那么插入
if ( updateStmt.getUpdateCount() == 0 ) {
insertStmt.setString(1, value.f0);
insertStmt.setInt(2,value.f1);
insertStmt.execute();
}
}
@Override
public void close() throws Exception {
insertStmt.close();
updateStmt.close();
conn.close();
}
}
}
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,“需要连接的hadoopIP:9092”);
conn = DriverManager.getConnection(“jdbc:mysql://IP:端口号/数据库名”,“数据库的账号”,“数据库的密码”);
注意:如上位置应该替换成自己项目的
我是连接的服务器,项目在本地 ,需要在服务器中先在kafka的bin目录中启动,
./kafka-console-producer.sh --broker-list 需要连接的hadoopIP:9092 --topic 项目中kafka的topic
然后启动本地的项目就报错了,错误显示如下
java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset
然后我看了本地的hadoop版本,下载了一个hadoop包在本地,配置了环境变量和path,重新启动idea,重新启动项目,还是报这个错误,(本人电脑是Windows10 专业版),抱着试一试的态度,重启电脑,重启服务和项目,嗯 这个错误消失了,就感觉很离谱,报了另一个错误,如下
原因是下载的Hadoop中没有winutils.exe 文件,又找了这个文件放在hadoop的bin目录下,重新启动,嗯,项目已经跑通了
虽然是个小的demo,但是一个很小的bug 也导致自己耽搁了不少时间,仅此记录一下.
我把我用的版本的压缩包上传到网盘了,需要的自己取吧,里面的bin目录是3.1.0版本的,自己本地配置的就是这一套,省的你们自己在找了,直接下载解压配置环境变量就可以
环境变量配置就不说了,不会的就自己百度,环境变量把配置完之后记得把bin目录底下的 这两个文件扔到
C:WindowsSystem32 目录底下即可,希望大家都可以少走弯路少踩坑.
hadoop3.1.3压缩包



