2 Message producer

Maven dependencies (pom.xml):

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.1.0.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.10.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.10.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.10.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.10.0</version>
</dependency>
<dependency>
    <groupId>joda-time</groupId>
    <artifactId>joda-time</artifactId>
    <version>2.8.1</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.73</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.34</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-dbcp2</artifactId>
    <version>2.1.1</version>
</dependency>
import com.alibaba.fastjson.JSON;
import com.quwan.domain.MessageEntity;
import lombok.extern.slf4j.Slf4j;
import org.joda.time.DateTime;
import org.springframework.stereotype.Component;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@Slf4j
@Component
public class MessageProducer {
    private static Properties getProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.112.192.96:9092");
        props.put("acks", "all");                 // wait for all ISR replicas to acknowledge
        props.put("retries", Integer.MAX_VALUE);  // retry count
        props.put("batch.size", 16384);           // batch size for sending
        props.put("buffer.memory", 102400);       // buffer size; tune to the machine's memory
        props.put("linger.ms", 1000);             // send delay; a batch goes out once either condition is met
        props.put("client.id", "producer-syn-1"); // producer id, useful for metrics
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
    public static void main(String[] args) throws Exception {
        KafkaProducer<String, String> producer = new KafkaProducer<>(getProps());
        for (int i = 30; i <= 100; i++) {
            MessageEntity message = new MessageEntity();
            message.setMessage("Message " + i);
            message.setTotalDate(DateTime.now().toString("yyyy-MM-dd HH:mm:ss"));
            message.setMessageId(System.currentTimeMillis());
            log.info(JSON.toJSONString(message));
            ProducerRecord<String, String> record = new ProducerRecord<>("flink_test_01",
                    System.currentTimeMillis() + "", JSON.toJSONString(message));
            Future<RecordMetadata> metadataFuture = producer.send(record);
            RecordMetadata recordMetadata;
            try {
                recordMetadata = metadataFuture.get();
                log.info("send succeeded!");
                log.info("topic:" + recordMetadata.topic());
                log.info("partition:" + recordMetadata.partition());
                log.info("offset:" + recordMetadata.offset());
            } catch (InterruptedException | ExecutionException e) {
                System.out.println("send failed!");
                e.printStackTrace();
            }
            Thread.sleep(10000);
        }
    }
}
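Blocking on metadataFuture.get() after every send() keeps only one request in flight and caps throughput. The Kafka client also accepts a completion callback for non-blocking sends; a minimal, self-contained sketch of that variant (the class name AsyncSendDemo and the trimmed-down config are illustrative, not from the original):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class AsyncSendDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.112.192.96:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record =
                new ProducerRecord<>("flink_test_01", "key-1", "{\"message\":\"async demo\"}");
        // onCompletion runs on the producer's I/O thread once the broker responds
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace();
            } else {
                System.out.println("sent to " + metadata.topic() + "-"
                        + metadata.partition() + "@" + metadata.offset());
            }
        });
        producer.flush(); // ensure the record is actually sent before exiting
        producer.close();
    }
}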
Verify the messages with the console consumer:

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flink_test_01 --from-beginning

3 Read data from Kafka and convert it into Message entities for writing
import com.alibaba.fastjson.JSON;
import com.quwan.domain.MessageEntity;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class KafkaToMysql {
    public static void main(String[] args) throws Exception {
        // Build the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the checkpoint trigger interval
        env.enableCheckpointing(5000);
        env.setParallelism(1);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "10.112.192.96:9092");
        properties.setProperty("group.id", "flink_consumer_01");
        properties.setProperty("auto.offset.reset", "earliest");
        // MessageDeSerializationSchema is a custom schema (sketched below) that hands
        // each Kafka record through with its key and value decoded as strings
        SingleOutputStreamOperator<MessageEntity> streamOperator = env.addSource(
                new FlinkKafkaConsumer<>("flink_test_01",
                        new MessageDeSerializationSchema(), properties)).setParallelism(1)
                .map(record -> {
                    MessageEntity message = null;
                    try {
                        System.out.println("consumerRecord" + record.value());
                        message = JSON.parseObject(record.value(), MessageEntity.class);
                        System.out.println(message.toString());
                    } catch (Exception e) {
                        // note: returning null here will fail downstream if a record is malformed
                        System.out.println("malformed record: " + record);
                    }
                    return message;
                });
        SingleOutputStreamOperator<List<MessageEntity>> process = streamOperator
                .timeWindowAll(Time.seconds(10))
                .process(new AllWindowFunction1());
        process.addSink(new SinkToMysql()).setParallelism(1).name("flink_test_01");
        env.execute("consumer start");
    }

    public static class AllWindowFunction1
            extends ProcessAllWindowFunction<MessageEntity, List<MessageEntity>, TimeWindow> {
        @Override
        public void process(Context context, Iterable<MessageEntity> iterable,
                            Collector<List<MessageEntity>> collector) throws Exception {
            if (iterable != null) {
                System.out.println("non-empty window: " + iterable);
                ArrayList<MessageEntity> list = Lists.newArrayList(iterable);
                if (list.size() > 0) {
                    System.out.println("records in the last 10s: " + list.size());
                    collector.collect(list);
                }
            } else {
                System.out.println("empty window");
            }
        }
    }
}
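The original post never shows MessageDeSerializationSchema. A minimal sketch of what it plausibly looks like, assuming it implements Flink's KafkaDeserializationSchema and simply decodes key and value as UTF-8 strings while keeping the ConsumerRecord wrapper (so record.value() works in the map above):

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.nio.charset.StandardCharsets;

public class MessageDeSerializationSchema
        implements KafkaDeserializationSchema<ConsumerRecord<String, String>> {

    @Override
    public boolean isEndOfStream(ConsumerRecord<String, String> nextElement) {
        return false; // the Kafka stream is unbounded
    }

    @Override
    public ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) {
        // Decode key/value bytes as UTF-8, keeping topic/partition/offset metadata
        return new ConsumerRecord<>(record.topic(), record.partition(), record.offset(),
                record.key() == null ? null : new String(record.key(), StandardCharsets.UTF_8),
                record.value() == null ? null : new String(record.value(), StandardCharsets.UTF_8));
    }

    @Override
    public TypeInformation<ConsumerRecord<String, String>> getProducedType() {
        return TypeInformation.of(new TypeHint<ConsumerRecord<String, String>>() {});
    }
}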
import lombok.Data;

@Data
public class MessageEntity {
private Long messageId;
private String message;
private String totalDate;
}
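For reference, a quick round-trip showing the JSON shape fastjson produces for this entity (the class name and field values here are made up for illustration):

import com.alibaba.fastjson.JSON;
import com.quwan.domain.MessageEntity;

public class MessageEntityJsonDemo {
    public static void main(String[] args) {
        MessageEntity m = new MessageEntity();
        m.setMessageId(1700000000000L); // made-up id
        m.setMessage("Message 30");
        m.setTotalDate("2024-01-01 12:00:00");
        // fastjson serializes fields alphabetically, e.g.:
        // {"message":"Message 30","messageId":1700000000000,"totalDate":"2024-01-01 12:00:00"}
        String json = JSON.toJSONString(m);
        MessageEntity back = JSON.parseObject(json, MessageEntity.class);
        System.out.println(json + " -> " + back.getMessage());
    }
}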
4 MySQL sink class
import com.quwan.domain.MessageEntity;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.List;

public class SinkToMysql extends RichSinkFunction<List<MessageEntity>> {
    PreparedStatement ps;
    BasicDataSource dataSource;
    private Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        dataSource = new BasicDataSource();
        connection = getConnection(dataSource);
        String sql = "insert into flink_test_01(message_id, message, total_date) values(?, ?, ?);";
        ps = this.connection.prepareStatement(sql);
    }

    @Override
    public void close() throws Exception {
        super.close();
        // Release the statement first, then the connection
        if (ps != null) {
            ps.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    @Override
    public void invoke(List<MessageEntity> value, Context context) throws Exception {
        // Iterate over the batch collected by the window function
        for (MessageEntity message : value) {
            ps.setLong(1, message.getMessageId());
            ps.setString(2, message.getMessage());
            System.out.println("message: " + message.getMessage());
            ps.setString(3, message.getTotalDate());
            ps.addBatch();
        }
        int[] count = ps.executeBatch(); // execute the whole batch at once
        System.out.println("inserted " + count.length + " rows");
    }

    private static Connection getConnection(BasicDataSource dataSource) {
        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
        dataSource.setUrl("jdbc:mysql://10.112.30.216:3306/hela?useUnicode=true&characterEncoding=utf8");
        dataSource.setUsername("root");
        dataSource.setPassword("123456");
        dataSource.setInitialSize(10);
        dataSource.setMaxTotal(50);
        dataSource.setMinIdle(2);
        Connection con = null;
        try {
            con = dataSource.getConnection();
        } catch (Exception e) {
            System.out.println("msg:" + e.getMessage());
        }
        return con;
    }
}
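For reference, a table definition matching the insert statement above; the original post does not show the DDL, so the column types and the surrogate key are assumptions:

CREATE TABLE flink_test_01 (
    id          BIGINT AUTO_INCREMENT PRIMARY KEY,  -- assumed surrogate key
    message_id  BIGINT       NOT NULL,
    message     VARCHAR(255) NOT NULL,
    total_date  VARCHAR(32)  NOT NULL               -- stored as the formatted string the producer writes
) DEFAULT CHARSET = utf8;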
Run output: (screenshots in the original post)