
Sentiment analysis with HanLP on Flink

Dependencies

<properties>
    <flink.version>1.12.5</flink.version>
    <scala.version>2.12</scala.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- HanLP portable build: self-contained, no external data package needed -->
    <dependency>
        <groupId>com.hankcs</groupId>
        <artifactId>hanlp</artifactId>
        <version>portable-1.7.6</version>
    </dependency>
</dependencies>

Training

negDir and posDir are the directories holding the negative and positive corpora, respectively; each corpus is a set of plain-text .txt files.
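The pipeline below relies on a TranObj POJO that the original post never shows. A minimal sketch of what it presumably looks like, with field names inferred from the getters used in the pipeline (this class is my reconstruction, not the author's code):

```java
// Hypothetical TranObj: pairs a sentiment label ("type") with one raw sample line.
class TranObj {
    private String type;    // e.g. "负向" (negative) or "正向" (positive)
    private String sample;  // one line of corpus text

    public TranObj() { }    // no-arg constructor, needed by Flink's POJO serializer

    public TranObj(String type, String sample) {
        this.type = type;
        this.sample = sample;
    }

    public String getType() { return type; }
    public void setType(String type) { this.type = type; }
    public String getSample() { return sample; }
    public void setSample(String sample) { this.sample = sample; }
}
```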

public class NlpTrain {
    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Backslashes in Windows paths must be escaped in Java string literals.
        String negDir = "E:\\nlp\\sample\\neg";
        String posDir = "E:\\nlp\\sample\\pos";
        DataStream<TranObj> negData = env.readFile(FileRead.getDirTextInputFormat(negDir), negDir)
                .assignTimestampsAndWatermarks(new WatermarkStrageWithTimestamp<>())
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String s) throws Exception {
                        return s.length() > 0;
                    }
                })
                .map(new MapFunction<String, TranObj>() {
                    @Override
                    public TranObj map(String s) throws Exception {
                        return new TranObj("负向", s);
                    }
                });
        DataStream<TranObj> posData = env.readFile(FileRead.getDirTextInputFormat(posDir), posDir)
                .assignTimestampsAndWatermarks(new WatermarkStrageWithTimestamp<>())
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String s) throws Exception {
                        return s.length() > 0;
                    }
                })
                .map(new MapFunction<String, TranObj>() {
                    @Override
                    public TranObj map(String s) throws Exception {
                        return new TranObj("正向", s);
                    }
                });
        negData.union(posData).keyBy(TranObj::getType)
                .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
                // Per label, collect all samples into a single-entry map: label -> samples.
                .aggregate(new AggregateFunction<TranObj, List<TranObj>, Map<String, List<String>>>() {
                    @Override
                    public List<TranObj> createAccumulator() {
                        return new ArrayList<>();
                    }

                    @Override
                    public List<TranObj> add(TranObj tranObj, List<TranObj> tranObjs) {
                        tranObjs.add(tranObj);
                        return tranObjs;
                    }

                    @Override
                    public Map<String, List<String>> getResult(List<TranObj> tranObjs) {
                        List<String> list = new ArrayList<>();
                        String key = tranObjs.get(0).getType();
                        for (TranObj obj : tranObjs) {
                            list.add(obj.getSample());
                        }
                        Map<String, List<String>> map = new HashMap<>();
                        map.put(key, list);
                        return map;
                    }

                    @Override
                    public List<TranObj> merge(List<TranObj> tranObjs, List<TranObj> acc1) {
                        tranObjs.addAll(acc1);
                        return tranObjs;
                    }
                }).windowAll(EventTimeSessionWindows.withGap(Time.minutes(1)))
                // Merge the per-label maps into one map holding both labels.
                .reduce(new ReduceFunction<Map<String, List<String>>>() {
                    @Override
                    public Map<String, List<String>> reduce(Map<String, List<String>> map1, Map<String, List<String>> map2) throws Exception {
                        for (String key : map1.keySet()) {
                            if (map2.containsKey(key)) {
                                map1.get(key).addAll(map2.get(key));
                            }
                        }
                        for (String key : map2.keySet()) {
                            if (!map1.containsKey(key)) {
                                map1.put(key, map2.get(key));
                            }
                        }
                        return map1;
                    }
                }).map(new MapFunction<Map<String, List<String>>, Map<String, String[]>>() {
                    @Override
                    public Map<String, String[]> map(Map<String, List<String>> stringListMap) throws Exception {
                        // Convert label -> List<String> into the label -> String[] shape
                        // that HanLP's classifier.train() expects, then train and persist.
                        Map<String, String[]> tranMap = new HashMap<>();
                        for (String k : stringListMap.keySet()) {
                            String[] arr = new String[stringListMap.get(k).size()];
                            tranMap.put(k, stringListMap.get(k).toArray(arr));
                        }
                        HanlpModel.trainNaiveBayesModel(tranMap, "E:\\log\\test.ser");
                        return tranMap;
                    }
                });

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

 HanlpModel.java

public class HanlpModel {
    public static void trainNaiveBayesModel(Map<String, String[]> map, String path) {
        IClassifier classifier = new NaiveBayesClassifier();
        classifier.train(map);
        NaiveBayesModel model = (NaiveBayesModel) classifier.getModel();
        IOUtil.saveObjectTo(model, path);
    }

    public static NaiveBayesModel getNaiveBayesModel(String path){
        return (NaiveBayesModel)IOUtil.readObjectFrom(path);
    }

}
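As far as I know, HanLP's IOUtil.saveObjectTo/readObjectFrom are thin wrappers over plain JDK object serialization, so the persistence round-trip that HanlpModel performs can be illustrated with the standard library alone (a sketch under that assumption; SerDemo and its method names are mine, not HanLP's):

```java
import java.io.*;
import java.util.*;

// Illustrates the save/load round-trip that HanlpModel delegates to IOUtil.
class SerDemo {
    static void save(Serializable obj, String path) throws IOException {
        // Write the object graph to disk with standard Java serialization.
        try (ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(path))) {
            out.writeObject(obj);
        }
    }

    static Object load(String path) throws IOException, ClassNotFoundException {
        // Read it back; the caller casts to the expected type.
        try (ObjectInputStream in = new ObjectInputStream(new FileInputStream(path))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        HashMap<String, String[]> corpus = new HashMap<>();
        corpus.put("正向", new String[]{"服务很好"});
        String path = File.createTempFile("model", ".ser").getPath();
        save(corpus, path);
        @SuppressWarnings("unchecked")
        HashMap<String, String[]> back = (HashMap<String, String[]>) load(path);
        System.out.println(Arrays.toString(back.get("正向")));
    }
}
```

Note the cast in getNaiveBayesModel above follows the same pattern: readObjectFrom returns Object, and the caller casts to NaiveBayesModel.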

WatermarkStrageWithTimestamp.java

public class WatermarkStrageWithTimestamp<T> implements WatermarkStrategy<T> {
    @Override
    public WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        // Allow events to arrive up to 10 seconds out of order.
        return new BoundedOutOfOrdernessWatermarks<>(Duration.ofSeconds(10));
    }

    @Override
    public TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
        return new TimestampAssigner<T>() {
            @Override
            public long extractTimestamp(T t, long l) {
                // The corpus files carry no event timestamps, so use the wall clock.
                return System.currentTimeMillis();
            }
        };
    }
}

FileRead.java 

public class FileRead {
    public static TextInputFormat getDirTextInputFormat(String dir) {
        Path path = new Path(dir);
        Configuration configuration = new Configuration();
        // Recurse into subdirectories when enumerating input files.
        configuration.setBoolean("recursive.file.enumeration", true);
        TextInputFormat textInputFormat = new TextInputFormat(path);
        textInputFormat.configure(configuration);
        textInputFormat.setCharsetName("UTF-8");
        return textInputFormat;
    }
}

Testing

public class Test {
    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.readTextFile("E:\\log\\test.txt").filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String input) throws Exception {
                return input.length() > 0;
            }
        }).map(new MapFunction<String, DataObj>() {
            @Override
            public DataObj map(String input) throws Exception {
                return new DataObj(input);
            }
        }).map(new MapFunction<DataObj, String>() {
            @Override
            public String map(DataObj dataObj) throws Exception {
                // Note: this reloads the model for every record; in practice,
                // load it once in a RichMapFunction's open() instead.
                IClassifier classifier = new NaiveBayesClassifier(HanlpModel.getNaiveBayesModel("E:\\log\\test.ser"));
                return classifier.classify(dataObj.getContent()) + ":" + dataObj.getContent();
            }
        }).print();
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
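Like TranObj, the DataObj class used above is not shown in the original. A minimal sketch, assuming it simply wraps the raw input line (my reconstruction, not the author's code):

```java
// Hypothetical DataObj: wraps one line of input text to be classified.
class DataObj {
    private String content;

    public DataObj() { }    // no-arg constructor for Flink's POJO serializer

    public DataObj(String content) {
        this.content = content;
    }

    public String getContent() { return content; }
    public void setContent(String content) { this.content = content; }
}
```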

Source: https://www.mshxw.com/it/326383.html (please credit www.mshxw.com when reposting)