Flink 采用批量 Async I/O 方式写入 HBase
逐条写入 HBase 太慢,因此改为批量方式:每 2000 条数据作为一个批次写入 HBase,以提高写入性能。
具体做法:设置一个 3 秒的翻滚窗口,先将窗口内的数据聚合成列表,再通过 Async I/O 以批量方式异步写入 HBase。
// Source stream of FdcData[RunData]; getDatas() is defined elsewhere (not shown in this snippet)
val RunDataDS: DataStream[FdcData[RunData]] = getDatas()
// Window function: emits all RunData records of one key/window as a single List,
// so the downstream async sink can write them to HBase in one batch.
class RunDataProcessWindowFunction extends ProcessWindowFunction[RunData, List[RunData], String, TimeWindow] {
  // Materialize the window's elements into an immutable List and forward it downstream.
  def process(key: String, context: Context, input: Iterable[RunData], out: Collector[List[RunData]]): Unit =
    out.collect(input.toList)
}
// 设置翻滚窗口,聚合数据
// Key by tool name, then collect each 3-second tumbling event-time window into a List.
// NOTE(review): event-time windows require a timestamp/watermark assigner upstream — not shown here, confirm.
val alarmRuleResultStream: DataStream[List[RunData]] = {
  val keyedRunData = RunDataDS.map(_.datas).keyBy(_.toolName)
  keyedRunData
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .process(new RunDataProcessWindowFunction())
}
// 异步IO 写入hbase
// Async I/O write to HBase: per-request timeout 6000 ms, at most 100 in-flight async requests.
// orderedWait preserves the input order of results; if order does not matter,
// unorderedWait would allow higher throughput.
AsyncDataStream
  .orderedWait(
    alarmRuleResultStream,
    new TestDataHbaseSink(ProjectConfig.Hbase_RUNDATA_TABLE),
    6000,
    TimeUnit.MILLISECONDS,
    100)
  .name("Hbase Sink")
自定义 Sink:TestDataHbaseSink 继承 RichAsyncFunction,在 open() 中创建 HBase 连接,并通过 BufferedMutator 获取写入句柄:
// BufferedMutator buffers Puts client-side; it is the HBase handle intended for batched writes
val table: BufferedMutator = connection.getBufferedMutator(TableName.valueOf(tableName))
完整代码如下,在 asyncInvoke 中实现批量写入 HBase:
import com.hzw.fdc.scalabean.{RunData, RunEventData}
import com.hzw.fdc.util.{ExceptionInfo, ProjectConfig}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}
import org.apache.hadoop.hbase.client.{BufferedMutator, Connection, ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HbaseConfiguration, TableName}
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
/**
 * Async Flink sink that batch-writes RunData records to HBase via a BufferedMutator.
 *
 * Input: one List[RunData] per invocation (pre-aggregated by the upstream window).
 * Output: a single status String per invocation, delivered through ResultFuture.
 */
class TestDataHbaseSink(tableName: String) extends RichAsyncFunction[List[RunData], String] {

  // HBase connection, created once in open() and reused for the operator's lifetime.
  var connection: Connection = _

  // FIX: original used classOf[WindowEndRunDataHbaseSink] (copy-paste error); log under this class.
  private val logger: Logger = LoggerFactory.getLogger(classOf[TestDataHbaseSink])

  /** Reads the global job configuration and opens the HBase connection. */
  override def open(parameters: Configuration): Unit = {
    // FIX: renamed the local val — it previously shadowed the method parameter `parameters`.
    val globalParams = getRuntimeContext.getExecutionConfig.getGlobalJobParameters.asInstanceOf[ParameterTool]
    ProjectConfig.getConfig(globalParams)

    // Create the HBase connection.
    // NOTE(review): the official HBase class is HBaseConfiguration (capital B);
    // `HbaseConfiguration` here and in the import at the top looks like a typo — confirm and fix both together.
    val conf = HbaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", ProjectConfig.Hbase_ZOOKEEPER_QUORUM)
    conf.set("hbase.zookeeper.property.clientPort", ProjectConfig.Hbase_ZOOKEEPER_PROPERTY_CLIENTPORT)
    connection = ConnectionFactory.createConnection(conf)
  }

  /**
   * Writes one batch to HBase: flushes every 2000 Puts, then writes the remainder.
   *
   * FIX: the original never called resultFuture.complete, so every invocation hit the
   * AsyncDataStream timeout (6000 ms) and stalled the pipeline. The future is now
   * completed on both the success and the failure path (failures are logged, matching
   * the original log-and-continue behavior).
   */
  override def asyncInvoke(runEventDataList: List[RunData], resultFuture: ResultFuture[String]): Unit = {
    // One mutator per invocation; closed in finally, which also flushes any buffered Puts.
    val table: BufferedMutator = connection.getBufferedMutator(TableName.valueOf(tableName))
    val puts: ListBuffer[Put] = new ListBuffer()
    try {
      for (runStartEvent <- runEventDataList) {
        try {
          val runid = s"${runStartEvent.toolName}--${runStartEvent.chamberName}--${runStartEvent.runStartTime}"
          // hashCode % 10 prefix salts the rowkey to spread writes across regions.
          val key = s"${runStartEvent.toolName}_${runStartEvent.chamberName}".hashCode % 10
          val put = new Put(Bytes.toBytes(s"${key}_${runStartEvent.toolName}_${runStartEvent.chamberName}_${runStartEvent.runStartTime}"))
          put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("RUN_ID"), Bytes.toBytes(runid))
          put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("TOOL_NAME"), Bytes.toBytes(runStartEvent.toolName))
          put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("CHAMBER_NAME"), Bytes.toBytes(runStartEvent.chamberName))
          put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("RUN_START_TIME"), Bytes.toBytes(runStartEvent.runStartTime))
          puts.append(put)
          // Flush every 2000 buffered Puts (buffer size check replaces the separate counter —
          // equivalent, since the counter was incremented per append and reset on clear).
          if (puts.size >= 2000) {
            table.mutate(puts.asJava)
            puts.clear()
          }
        } catch {
          // Per-record failures are logged and skipped so one bad record does not drop the batch.
          case ex: Exception => logger.warn(s" Exception:${ExceptionInfo.getExceptionInfo(ex)} data: $runStartEvent")
        }
      }
      // Write the final partial batch (< 2000 records).
      table.mutate(puts.asJava)
      resultFuture.complete(List(s"success: ${runEventDataList.size}"))
    } catch {
      case ex: Exception =>
        logger.warn(s" Exception:${ExceptionInfo.getExceptionInfo(ex)}")
        // Must complete even on failure, otherwise the async operator times out and backpressures.
        resultFuture.complete(List(s"failed: ${ExceptionInfo.getExceptionInfo(ex)}"))
    } finally {
      // BufferedMutator.close() flushes remaining buffered mutations and releases resources.
      table.close()
    }
  }

  /** Closes the HBase connection. Null guard covers the case where open() failed early. */
  override def close(): Unit = {
    if (connection != null) {
      connection.close()
    }
    super.close()
  }

  /** Null-safe string accessor: returns "" for null. Currently unused in this class. */
  def hasLength(str: String): String = {
    if (str != null) str else ""
  }
}



