依赖
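<properties>
    <flink.version>1.12.5</flink.version>
    <scala.version>2.12</scala.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.hankcs</groupId>
        <artifactId>hanlp</artifactId>
        <version>portable-1.7.6</version>
    </dependency>
</dependencies>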
训练
negDir 和 posDir 分别是存放负面语料和正面语料的文件夹,里面的语料都是 txt 文件。
/**
 * Training job: reads negative and positive sample files, tags every non-empty
 * record with its sentiment label, and groups the samples by label inside
 * event-time session windows.
 *
 * NOTE(review): this listing is cut off inside the final reduce(...) call, so the
 * steps that follow the per-label aggregation (and the closing of main and of the
 * class) are not visible here.
 * NOTE(review): generic type arguments appear to have been stripped from this
 * listing (raw DataStream / FilterFunction / MapFunction / List / Map); restore
 * them from the original project before compiling.
 */
public class NlpTrain {
public static void main(String[] args) {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// NOTE(review): the single backslashes below form escape sequences (\n is a
// newline; \s and \p are not valid path separators). Presumably the doubled
// backslashes of a Windows path were lost in this listing - confirm.
String negDir="E:\nlp\sample\neg";
String posDir="E:\nlp\sample\pos";
// Negative samples: read the directory, attach timestamps/watermarks, drop empty
// records and wrap each remaining record in a TranObj carrying the negative label.
DataStream negData=env.readFile(FileRead.getDirTextInputFormat(negDir),negDir)
.assignTimestampsAndWatermarks(new WatermarkStrageWithTimestamp<>())
.filter(new FilterFunction() {
@Override
public boolean filter(String s) throws Exception {
// Keep only non-empty records.
return s.length()>0;
}
})
.map(new MapFunction() {
@Override
public TranObj map(String s) throws Exception {
return new TranObj("负向", s);
}
});
// Positive samples: same pipeline as above, but tagged with the positive label.
DataStream posData=env.readFile(FileRead.getDirTextInputFormat(posDir),posDir)
.assignTimestampsAndWatermarks(new WatermarkStrageWithTimestamp<>())
.filter(new FilterFunction() {
@Override
public boolean filter(String s) throws Exception {
// Keep only non-empty records.
return s.length()>0;
}
})
.map(new MapFunction() {
@Override
public TranObj map(String s) throws Exception {
return new TranObj("正向", s);
}
});;
// Merge both streams, partition by label, and collect each label's samples per
// session window (a window closes after a one-minute gap in event time).
negData.union(posData).keyBy(TranObj::getType)
.window(EventTimeSessionWindows.withGap(Time.minutes(1)))
// NOTE(review): the type arguments of AggregateFunction are garbled here; the
// method bodies below take TranObj as input, use a List as accumulator and
// return a Map from label to a list of samples.
.aggregate(new AggregateFunction, Map>>() {
@Override
public List createAccumulator() {
// Start every window with an empty list of samples.
return new ArrayList<>();
}
@Override
public List add(TranObj tranObj, List tranObjs) {
// Append the incoming sample to the accumulator and hand the same list back.
tranObjs.add(tranObj);
return tranObjs;
}
@Override
public Map> getResult(List tranObjs) {
// Build a single-entry map: label -> all sample texts in this window.
List list=new ArrayList<>();
// The label is taken from the first element; assumes the accumulator is
// non-empty and that all elements share one label (the stream is keyed by
// getType) - TODO confirm.
String key=tranObjs.get(0).getType();
for (TranObj obj : tranObjs){
list.add(obj.getSample());
}
Map> map=new HashMap<>();
map.put(key,list);
return map;
}
@Override
public List merge(List tranObjs, List acc1) {
// Combine two accumulators by appending the second into the first.
tranObjs.addAll(acc1);
return tranObjs;
}
// The per-label maps are then gathered into a non-keyed session window and reduced.
}).windowAll(EventTimeSessionWindows.withGap(Time.minutes(1)))
// NOTE(review): the listing ends in the middle of this reduce(...) call.
.reduce(new ReduceFunction
HanlpModel.java
/**
 * Helpers for training, persisting and loading a HanLP naive Bayes
 * text-classification model.
 */
public class HanlpModel {

    /**
     * Trains a naive Bayes classifier on the given corpus and serializes the
     * resulting model to {@code path}.
     *
     * @param map  training corpus, handed unchanged to {@code IClassifier.train}.
     *             NOTE(review): the generic parameters of this map are not visible
     *             in this listing; HanLP's {@code train(Map)} overload presumably
     *             expects category name to sample documents - confirm.
     * @param path file the trained model is written to
     * @throws IllegalStateException if the model could not be written to {@code path}
     */
    public static void trainNaiveBayesModel(Map map, String path) {
        IClassifier classifier = new NaiveBayesClassifier();
        classifier.train(map);
        NaiveBayesModel model = (NaiveBayesModel) classifier.getModel();
        // saveObjectTo reports failure through its boolean result instead of throwing;
        // surface it so a training run cannot silently end without a model file.
        if (!IOUtil.saveObjectTo(model, path)) {
            throw new IllegalStateException("Failed to save naive Bayes model to " + path);
        }
    }

    /**
     * Loads a model previously written by {@link #trainNaiveBayesModel(Map, String)}.
     *
     * @param path file to read the serialized model from
     * @return the deserialized model; presumably {@code null} when the file cannot
     *         be read, since the result of {@code IOUtil.readObjectFrom} is returned
     *         as-is - verify against HanLP
     */
    public static NaiveBayesModel getNaiveBayesModel(String path) {
        return (NaiveBayesModel) IOUtil.readObjectFrom(path);
    }
}
WatermarkStrageWithTimestamp.java
public class WatermarkStrageWithTimestampimplements WatermarkStrategy { @Override public WatermarkGenerator createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { return new BoundedOutOfOrdernessWatermarks<>(Duration.ofSeconds(10)); } @Override public TimestampAssigner createTimestampAssigner(TimestampAssignerSupplier.Context context) { return new TimestampAssigner () { @Override public long extractTimestamp(T t, long l) { return System.currentTimeMillis(); } }; } }
FileRead.java
/**
 * Factory for Flink text input formats that read a directory.
 */
public class FileRead {

    /**
     * Builds a {@code TextInputFormat} rooted at {@code dir}, configured with the
     * {@code recursive.file.enumeration} flag enabled and the UTF-8 charset.
     *
     * @param dir directory (or file) path to read from
     * @return the configured input format
     */
    public static TextInputFormat getDirTextInputFormat(String dir) {
        Configuration configuration = new Configuration();
        configuration.setBoolean("recursive.file.enumeration", true);

        // The former bare call to supportsMultiPaths() was dropped: it is a boolean
        // query whose result was discarded, so it configured nothing.
        TextInputFormat textInputFormat = new TextInputFormat(new Path(dir));
        textInputFormat.configure(configuration);
        textInputFormat.setCharsetName("UTF-8");
        return textInputFormat;
    }
}
测试
public class Test {
public static void main(String[] args) {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.readTextFile("E:\log\test.txt").filter(new FilterFunction() {
@Override
public boolean filter(String input) throws Exception {
return input.length()>0;
}
}).map(new MapFunction() {
@Override
public DataObj map(String input) throws Exception {
return new DataObj(input);
}
}).map(new MapFunction() {
@Override
public String map(DataObj dataObj) throws Exception {
IClassifier classifier = new NaiveBayesClassifier(HanlpModel.getNaiveBayesModel("E:\log\test.ser"));
return classifier.classify(dataObj.getContent())+":"+dataObj.getContent();
}
}).print();
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}



