- 环境
- 解析
- 源码下载
| 组件 | 版本 |
|---|---|
| scala | 2.12 |
| netcat | * |
| flink | 1.13.3 |
自定义Filter函数,继承 flink 的 FilterFunction 方法
package com.z.demo.filter
import org.apache.commons.lang3.StringUtils
import org.apache.flink.api.common.functions.FilterFunction
object NullFilter extends FilterFunction[String]{
override def filter(value: String): Boolean = {
//判断是否为null,或者空字符串""
if(StringUtils.isEmpty(value)){
false //false 的数据不传递到下一个算子
}else{
true
}
}
}
具体使用如下
val resultStream = inputStream
.filter(NullFilter)//filter算子,使用自定义的Filter类,过滤为空数据
...
...
源码下载
https://download.csdn.net/download/sinat_25528181/44038825



