filter的例子
例1:Python 版本筛选的例子
lines = sc.textFile("README.md")
pythonLines = lines.filter(lambda line: "Python" in line)
例2:Scala 版本筛选的例子
val lines = sc.textFile("README.md") // 创建一个叫lines的RDD
val pythonLines = lines.filter(line => line.contains("Python"))
如果你对例1 和例2 中的lambda 或者=> 语法不熟悉,可以把它们理解为Python 和 Scala 中定义内联函数的简写方法。当你在这些语言中使用Spark 时,你也可以单独定义一个函数,然后把函数名传给Spark。比如,Python 中可以这样做:
def hasPython(line):
return "Python" in line
pythonLines = lines.filter(hasPython)
在Java 中向Spark 传递函数也是可行的,但是在这种情况下,我们必须把函数定义为
实现了Function 接口的类。例如:
JavaRDDpythonLines = lines.filter( new Function () { Boolean call(String line) { return line.contains("Python"); } } );
注意参数类型,前面是输入类型,后面是返回类型,这里是filter函数,需要用到Boolean类型的返回值来进行过滤
flatMap的小例子
public class test {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("jiangtao_demo").setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD lines = jsc.parallelize(Arrays.asList("pandas pip", "numpy", "pip", "pip", "pip"));
JavaRDD flatMapResult = lines.flatMap(new FlatMapFunction() {
@Override
public Iterator call(String line) throws Exception {
return Arrays.asList(line.toString().split(" ")).iterator();
}
});
flatMapResult.foreach(s -> {
System.out.println(s);
});
}
}
最终输出的是这样的,在这flatMap函数需要得到一个Iterator迭代器类型的返回值,之后会将其拆分成单个元素
// 与输入是一对多的关系,返回的是可迭代的lis // pandas // pip // numpy // pip // pip // pip
map的例子
public class test {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("jiangtao_demo").setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD lines = jsc.parallelize(Arrays.asList("pandas pip", "numpy", "pip", "pip", "pip"));
JavaRDD>> maprdd = lines.map(new Function>>() {
public Iterable> call(String line) throws Exception {
String[] fields = line.split(" ");
ArrayList> al = new ArrayList>();
for (int i = 0; i < fields.length; i++) {
al.add(new Tuple2(fields[i], 1));
}
return al;
}
});
}
}
将每行String类型转为Iterable
// [(pandas, 1),(pip, 1)] // [(numpy, 1)] // [(pip, 1)] // [(pip, 1)] // [(pip, 1)]
flatMapToPair的例子
是将每行元素拆分后 再转为键值对类型的RDD
JavaPairRDDresult1 = lines.flatMapToPair(new PairFlatMapFunction () { @Override public Iterator > call(String line) throws Exception { String[] fields = line.split(" "); ArrayList > al = new ArrayList >(); for (int i = 0; i < fields.length; i++) { al.add(new Tuple2 (fields[i], 1)); } return al.iterator(); } });
转后的结果为
(pandas,1) (pip,1) (numpy,1) (pip,1) (pip,1) (pip,1)lambda 简写语法
filter的例子
Java 8 提供了类似Python 和Scala 的lambda 简写语法。下面就是一个使用这种语法的
代码的例子:
JavaRDDpythonLines = lines.filter(line -> line.contains("Python"));
flatMap的例子
JavaRDDflatMapResult = lines.flatMap( line->Arrays.asList(line.toString().split(" ")).iterator());
map的例子
JavaRDD>> maprdd = lines.map(line -> { String[] fields = line.toString().split(" "); ArrayList > al = new ArrayList >(); for (int i = 0; i < fields.length; i++) { al.add(new Tuple2(fields[i], 1)); } return al; });
flatMap例子 return al.iterator();有红色波浪线但是运行是正常的,IDEA 中
JavaPairRDD参考result1 = lines.flatMapToPair(line-> { String[] fields = line.toString().split(" "); ArrayList > al = new ArrayList<>(); for (int i = 0; i < fields.length; i++) { al.add(new Tuple2<>(fields[i], 1)); } return al.iterator(); });



