在Flink流式程序设计中,经常需要与外部系统进行交互,很多时候外部系统的性能会成为任务整体吞吐的瓶颈。通常的解决方案是提高任务并发度,以增加对外部系统的并发访问;但这样会给Flink带来额外的资源管理负载,并导致整体CPU利用率不高。
对于Flink与外部存储交互的场景,可以通过Flink异步IO和单并发度多线程两种机制,在不提高任务并发度的前提下提升任务吞吐能力,从而提高整体资源利用率。
一 Flink异步IO
对于Flink程序,与外部系统的交互通常实现为同步请求:发送一个请求后,必须等到收到响应才能继续处理下一条数据。很多情况下,这种等待占据了函数的绝大部分时间,一旦外部系统出现性能瓶颈,任务的吞吐能力就会大幅下降。Flink提供了异步IO机制:发送请求之后无需等待结果返回即可继续发送下一个请求,查询结果异步返回,并在返回之后自动进入下一个算子的计算,从而减轻外部系统性能对整个计算任务的影响,提高整体吞吐和资源利用率。
示例代码:
public class AsyncHbase extends RichAsyncFunction{ private transient HbaseClient client; private transient ExecutorService executorService; @Override public void asyncInvoke(String input, ResultFuture resultFuture) throws Exception { try { executorService.submit(() -> { // submit query ObjectMapper mapper=new ObjectMapper(); String imei = null; try { imei = (String)mapper.readValue(input, HashMap.class).get("user_id"); } catch (IOException e) { e.printStackTrace(); } String user = client.query(imei); //System.out.println("----------"+user); resultFuture.complete(Collections.singletonList(user)); }); } catch (Exception e) { //log.error("get from redis fail", e); throw new RuntimeException("get from mysql fail", e); } } @Override public void open(Configuration parameters) throws Exception { super.open(parameters); client = new HbaseClient(); client.init("hdp_teu_dpd:hdp_teu_dpd_flink_iotest"); //线程池大小 executorService = Executors.newFixedThreadPool(30); } //异步客户端 @Override public void close() throws Exception { super.close(); executorService.shutdown(); } }
/**
 * Minimal HBase lookup helper: opens one table and reads the {@code cf:user} column by row key.
 *
 * <p>NOTE(review): a single {@code HTable} is kept per client instance. HBase's API
 * documentation describes {@code HTable} as not safe for concurrent use; if {@link #query} is
 * called from several threads at once, confirm this and switch to one table handle per thread.
 */
public class HbaseClient {

    private static final String ZK_QUORUM = "10.162.12.102:2181";

    // Instance state: previously the configuration was a static field overwritten by every
    // instance's init().
    private Configuration conf;
    private HTable htable;

    /**
     * Connects to the given table using the fixed ZooKeeper quorum.
     *
     * @param tableName name of the HBase table to open
     * @throws IOException if the table cannot be opened
     */
    public void init(String tableName) throws IOException {
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", ZK_QUORUM);
        htable = new HTable(conf, Bytes.toBytes(tableName));
    }

    /**
     * Reads column {@code cf:user} of the row with the given key.
     *
     * @param id row key
     * @return the column value, or an empty string when the key is null, the client is not
     *         initialised, the cell is missing, or the read fails (never null, because Flink
     *         records must not be null)
     */
    public String query(String id) {
        // Explicit guard instead of relying on a swallowed NullPointerException.
        if (id == null || htable == null) {
            return "";
        }
        try {
            byte[] family = Bytes.toBytes("cf");
            byte[] qualifier = Bytes.toBytes("user");
            Get get = new Get(Bytes.toBytes(id));
            get.addColumn(family, qualifier);
            Result result = htable.get(get);
            String user = Bytes.toString(result.getValue(family, qualifier));
            return user == null ? "" : user;
        } catch (Exception e) {
            // Best effort: a failed read is reported and mapped to an empty result.
            e.printStackTrace();
            return "";
        }
    }
}
Flink异步IO的实现需要外部系统提供异步client;对于不支持异步client的系统,可以采用多线程机制替代实现(上面的示例即通过线程池包装同步的HBase查询)。
二 单并发度多线程
Flink异步IO主要针对从外部系统读取数据的场景;对于写数据的场景,可以在Sink端采用多线程的方式实现。
示例代码:
/**
 * Sink that hands records to a pool of consumer threads through a bounded queue.
 *
 * <p>{@link #invoke} only enqueues; the consumer threads started in {@link #open} take records
 * from the queue. On every checkpoint, {@link #snapshotState} waits on a {@link CyclicBarrier}
 * shared with the consumers, so the checkpoint does not proceed until all of them have arrived
 * at the barrier as well.
 */
public class SinkToMySQLMultiThread extends RichSinkFunction<Data> implements CheckpointedFunction {

    private static final Logger LOG = LoggerFactory.getLogger(SinkToMySQLMultiThread.class);

    /** Default number of consumer (client) threads. */
    private static final int DEFAULT_CLIENT_THREAD_NUM = 5;
    /** Default capacity of the buffer queue between the sink and the consumers. */
    private static final int DEFAULT_QUEUE_CAPACITY = 5000;

    // Runtime-only state, created in open().
    private transient LinkedBlockingQueue<Data> bufferQueue;
    private transient CyclicBarrier clientBarrier;
    private transient ThreadPoolExecutor threadPoolExecutor;

    /**
     * Creates the buffer queue, the barrier and the consumer threads.
     *
     * @param parameters Flink configuration passed through to {@code super.open}
     * @throws Exception if {@code super.open} fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Fixed-size pool with exactly one thread per consumer.
        this.threadPoolExecutor = new ThreadPoolExecutor(
                DEFAULT_CLIENT_THREAD_NUM, DEFAULT_CLIENT_THREAD_NUM,
                0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        // Bounded queue: put() blocks the task thread when the consumers fall behind.
        this.bufferQueue = new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY);
        // The barrier has to hold all consumer threads plus the thread calling snapshotState.
        this.clientBarrier = new CyclicBarrier(DEFAULT_CLIENT_THREAD_NUM + 1);
        for (int i = 0; i < DEFAULT_CLIENT_THREAD_NUM; i++) {
            // One client object per thread, so no thread shares another's mutable state.
            threadPoolExecutor.execute(new MultiThreadConsumerClient(bufferQueue, clientBarrier));
        }
    }

    /**
     * Appends the record to the tail of the buffer queue, blocking while the queue is full.
     *
     * @param value   record to write
     * @param context sink context (unused)
     * @throws Exception if the thread is interrupted while waiting for queue space
     */
    @Override
    public void invoke(Data value, Context context) throws Exception {
        bufferQueue.put(value);
    }

    /**
     * Waits at the barrier until all consumer threads have arrived there too.
     *
     * @param functionSnapshotContext checkpoint context (unused)
     * @throws Exception if the wait is interrupted or the barrier is broken
     */
    @Override
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        LOG.info("snapshotState : 所有的 client 准备 flush !!!");
        clientBarrier.await();
    }

    /** No state to restore. */
    @Override
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
    }

    /**
     * Stops the consumer threads so they do not outlive the task.
     *
     * <p>Records still sitting in the buffer queue are not flushed here.
     * NOTE(review): this assumes recovery replays from the last completed checkpoint — confirm
     * before using this sink with a bounded input.
     *
     * @throws Exception if {@code super.close} fails
     */
    @Override
    public void close() throws Exception {
        if (threadPoolExecutor != null) {
            // The consumers block in poll()/await(); interrupting them ends their loops.
            threadPoolExecutor.shutdownNow();
        }
        super.close();
    }
}
public class MultiThreadConsumerClient implements Runnable {
private Logger LOG = LoggerFactory.getLogger(MultiThreadConsumerClient.class);
private static String jdbcUrl = "jdbc:mysql://nightfury-test.db.58dns.org:23832/dbwww58com_nightfury?useUnicode=true&characterEncoding=utf8&autoReconnect=true&failOverReadonly=false";
private static String username = "nightfury_user";
private static String password = "6d436b6156ebe38b";
private static String driver = "com.mysql.jdbc.Driver";
private linkedBlockingQueue bufferQueue;
private CyclicBarrier barrier;
private Connection connection = null;
private PreparedStatement ps = null;
public MultiThreadConsumerClient(
linkedBlockingQueue bufferQueue, CyclicBarrier barrier) {
this.bufferQueue = bufferQueue;
this.barrier = barrier;
}
@Override
public void run() {
try {
//1.加载驱动
Class.forName(driver);
//2.创建连接
connection = DriverManager.getConnection(jdbcUrl, username, password);
String sql = "insert into mutiflinktest(id,behavior,category_id,item_id)values(?,?,?,?);";
//3.获得执行语句
ps = connection.prepareStatement(sql);
int batchSize = 0;
Data entity;
while (true) {
// 从 bufferQueue 的队首消费数据,并设置 timeout
entity = bufferQueue.poll(50, TimeUnit.MILLISECONDS);
// entity != null 表示 bufferQueue 有数据
if (entity != null) {
System.out.println(Thread.currentThread().getName());
System.out.println(batchSize);
// 执行 client 消费数据的逻辑
dobatch(entity);
batchSize++;
if (batchSize > 5) {
ps.executeBatch();
batchSize = 0;
}
} else {
// entity == null 表示 bufferQueue 中已经没有数据了,
// 且 barrier wait 大于 0 表示当前正在执行 Checkpoint,
// client 需要执行 flush,保证 Checkpoint 之前的数据都消费完成
//System.out.println(barrier.getNumberWaiting());
if (barrier.getNumberWaiting() > 0) {
LOG.info("MultiThreadConsumerClient 执行 flush, " + "当前 wait 的线程数:" + barrier.getNumberWaiting());
// client 执行 flush 操作,防止丢数据
ps.executeBatch();
batchSize = 0;
barrier.await();
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
// client 消费数据的逻辑
private void dobatch(Data entity) throws Exception {
//4.批量插入
ps.setInt(1, entity.getUser_id());
ps.setString(2, entity.getBehavior());
ps.setString(3, entity.getCategory_id());
ps.setString(4, entity.getItem_id());
ps.addBatch();
}
/**
 * Job entry point: reads strings from Kafka, maps each to a {@code Data} record and writes the
 * records through {@code SinkToMySQLMultiThread}, with parallelism 1 and a checkpoint every
 * three minutes.
 */
public class MultiThreadMysqlFlink {

    /**
     * Builds and runs the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(1000 * 60 * 3);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "xxx:9092");
        props.setProperty("zookeeper.connect", "xxxx:2181/58_kafka_cluster");
        props.setProperty("group.id", "flink-kafka_sync_v1");

        FlinkKafkaConsumer011<String> consumer =
                new FlinkKafkaConsumer011<>("hdp_teu_dpd_flink", new SimpleStringSchema(), props);

        // The type arguments are required: against the raw RichMapFunction, map(String) does not
        // override the inherited method.
        DataStream<Data> stream = env.addSource(consumer).map(new RichMapFunction<String, Data>() {
            @Override
            public Data map(String input) throws Exception {
                // Every record gets the same fixed field values; the input text is not parsed.
                Data data = new Data();
                data.setUser_id(1);
                data.setBehavior("");
                data.setCategory_id("");
                data.setItem_id("");
                return data;
            }
        });

        stream.addSink(new SinkToMySQLMultiThread());
        env.execute("MultiThreadMysqlFlink");
    }
}



