栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

使用Apache Spark和Java将CSV解析为DataFrame / DataSet

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

使用Apache Spark和Java将CSV解析为DataFrame / DataSet

程序

  • 创建一个类(模式)以封装您的结构(方法B不是必需的,但是如果使用Java,它将使您的代码更易于阅读)

    public class Record implements Serializable {

    String department;
    String designation;
    long costToCompany;
    String state;
    // constructor , getters and setters
    }

  • 加载CVS(JSON)文件

        JavaSparkContext sc;    JavaRDD<String> data = sc.textFile("path/input.csv");    //JavaSQLContext sqlContext = new JavaSQLContext(sc); // For previous versions     SQLContext sqlContext = new SQLContext(sc); // In Spark 1.3 the Java API and Scala API have been unified    JavaRDD<Record> rdd_records = sc.textFile(data).map(      new Function<String, Record>() {          public Record call(String line) throws Exception {  // Here you can use JSON  // Gson gson = new Gson();  // gson.fromJson(line, Record.class);  String[] fields = line.split(",");  Record sd = new Record(fields[0], fields[1], fields[2].trim(), fields[3]);  return sd;          }    });

此时,您有2种方法:

A.SparkSQL

  • 注册一个表(使用您定义的模式类)
        JavaSchemaRDD table = sqlContext.applySchema(rdd_records, Record.class);    table.registerAsTable("record_table");    table.printSchema();
  • 用所需的查询分组查询表
        JavaSchemaRDD res = sqlContext.sql("      select department,designation,state,sum(costToCompany),count(*)       from record_table       group by department,designation,state    ");
  • 在这里,您还可以使用SQL方法执行所需的任何其他查询

火花

  • 使用复合密钥映射:
    Department
    Designation
    State
        JavaPairRDD<String, Tuple2<Long, Integer>> records_JPRDD =     rdd_records.mapToPair(new      PairFunction<Record, String, Tuple2<Long, Integer>>(){        public Tuple2<String, Tuple2<Long, Integer>> call(Record record){          Tuple2<String, Tuple2<Long, Integer>> t2 =new Tuple2<String, Tuple2<Long,Integer>>( record.Department + record.Designation + record.State, new Tuple2<Long, Integer>(record.costToCompany,1)          );          return t2;    }

});

  • 使用组合键,求和
    costToCompany
    列和按键累积记录数的reduceByKey
        JavaPairRDD<String, Tuple2<Long, Integer>> final_rdd_records =      records_JPRDD.reduceByKey(new Function2<Tuple2<Long, Integer>, Tuple2<Long,     Integer>, Tuple2<Long, Integer>>() {        public Tuple2<Long, Integer> call(Tuple2<Long, Integer> v1,        Tuple2<Long, Integer> v2) throws Exception { return new Tuple2<Long, Integer>(v1._1 + v2._1, v1._2+ v2._2);        }    });


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/435206.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号