// Main class: consumes records from Kafka and sinks them into MySQL through a custom sink implementation class.
import akka.japi.tuple.Tuple4;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.io.Serializable;
import java.util.Properties;
public class KafkaToMysql implements Serializable {
//定义内部类,和需要写入的表结构一致
static class user {
final String name;
final String gender;
final String phoneNumber;
final Integer age;
public user(String name, Integer age, String gender, String phoneNumber) {
this.name = name;
this.age = age;
this.gender = gender;
this.phoneNumber = phoneNumber;
}
}
public static void main(String[] args) throws Exception {
//kafka相关配置
String topic = "mykafka";
Properties kafkaConf = new Properties();
kafkaConf.put(ConsumerConfig.GROUP_ID_CONFIG,"kafkaTest1");
kafkaConf.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.0.27.21:9092");
kafkaConf.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
kafkaConf.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
kafkaConf.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 3000);
kafkaConf.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
kafkaConf.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
kafkaConf.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
kafkaConf.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
//获取流执行环境
StreamExecutionEnvironment envs = StreamExecutionEnvironment.getExecutionEnvironment();
//添加kafka source
DataStreamSource mykafka = envs.addSource(new FlinkKafkaConsumer(topic, new SimpleStringSchema(), kafkaConf));
//打印数据
mykafka.print();
*/
//内部类转换数据格式
//如果直接通过流返回Tuple4则会报错,当涉及Java泛型时,lambda方法不能为自动类型提取提供足够的信息
DataStream> stream = mykafka.map(new MapFunction>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple4 map(String value) throws Exception {
String[] strings = value.split(",");
return new Tuple4(strings[0],Integer.parseInt(strings[1]),strings[2],strings[3]);
}
});
stream.addSink(new MysqlImpl());
envs.execute();
}
}
// MySQL sink implementation class
import akka.japi.tuple.Tuple4; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; public class MysqlImpl extends RichSinkFunction> { private Connection connection; private PreparedStatement preparedStatement; String username = "root"; String password = "12345678"; String drivername = "com.mysql.jdbc.Driver"; //配置改成自己的配置 String dburl = "jdbc:mysql://localhost:3306/test_local"; @Override public void invoke(Tuple4 value) throws Exception { //SinkFunction.super.invoke(value); Class.forName(drivername); connection = DriverManager.getConnection(dburl, username, password); String sql = "insert into user (name ,age,gender,phone_number) values(?,?,?,?)"; //假设mysql 有4列 name ,age,gender,phone_number preparedStatement = connection.prepareStatement(sql); preparedStatement.setString(2, value.t1()); preparedStatement.setInt(1, value.t2()); preparedStatement.setString(3, value.t3()); preparedStatement.setString(3, value.t4()); preparedStatement.executeUpdate(); if (preparedStatement != null) { preparedStatement.close(); } if (connection != null) { connection.close(); } } @Override public void invoke(Tuple4 value, Context context) throws Exception { //SinkFunction.super.invoke(value, context); Class.forName(drivername); connection = DriverManager.getConnection(dburl, username, password); String sql = "insert into user (name ,age,gender,phone_number) values(?,?,?,?)"; //假设mysql 有3列 id,num,price preparedStatement = connection.prepareStatement(sql); preparedStatement.setString(1, value.t1()); preparedStatement.setInt(2, value.t2()); preparedStatement.setString(3, value.t3()); preparedStatement.setString(4, value.t4()); preparedStatement.execute(); if (preparedStatement != null) { preparedStatement.close(); } if (connection != null) 
{ connection.close(); } } }



