// Custom Filter operator in Flink (Flink自定义Filter方法)
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamFilter;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
public class MyFilterDemo {
/**
 * Runs a local Flink job that reads text lines from a socket, drops every
 * line starting with {@code "error"}, and prints the remaining lines.
 *
 * <p>The filtering is wired in through {@code transform(...)} with the
 * hand-written {@code MyStreamFilter} operator rather than the built-in
 * {@code filter(...)} call.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if building or executing the job fails
 */
public static void main(String[] args) throws Exception {
    // Local environment with the web UI; the REST endpoint is bound to port 8081.
    Configuration configuration = new Configuration();
    configuration.setInteger("rest.port", 8081);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);

    // Source: text lines read from localhost:8888.
    DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

    // Typed as FilterFunction<String> so that filter(String) really overrides
    // the interface method; with the raw type the @Override does not compile.
    FilterFunction<String> filterFunction = new FilterFunction<String>() {
        @Override
        public boolean filter(String value) throws Exception {
            // Keep only lines that do NOT start with "error".
            return !value.startsWith("error");
        }
    };

    // Plug the custom operator into the stream; the output type is declared
    // explicitly as String.
    SingleOutputStreamOperator<String> filtered = lines.transform(
            "MyFilter", TypeInformation.of(String.class), new MyStreamFilter<>(filterFunction));

    filtered.print();
    env.execute();
}
private static class MyStreamFilter extends AbstractUdfStreamOperator> implements OneInputStreamOperator {
public MyStreamFilter(FilterFunction userFunction) {
super(userFunction);
}
@Override
public void processElement(StreamRecord element) throws Exception {
I in = element.getValue();
boolean flag = userFunction.filter(in);
if (flag) {
//输出
output.collect(element);
}
}
}



