map：String 转为 Tuple2
public class test {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("jiangtao_demo").setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(conf);
//并行集合生成JavaRDD
JavaRDD lines = jsc.parallelize(Arrays.asList("pandas pip", "numpy", "pip", "pip", "pip"));
//map 输出包含newTail的元素
//这个是给每个元素尾部加newTail
JavaRDD mapResult = lines.map(new Function() {
@Override
public String call(String o) throws Exception {
return o.concat("newTail");
}
});
//下面这个是给每个元素String转为Tuple2元组类型的
//map算子输入与输出一对一,输出结果为
// [(pandas, 1),(pip, 1)]
// [(numpy, 1)]
// [(pip, 1)]
// [(pip, 1)]
// [(pip, 1)]
JavaRDD>> maprdd = lines.map(new Function>>() {
public Iterable> call(String line) throws Exception {
String[] fields = line.split(" ");
ArrayList> al = new ArrayList>();
for (int i = 0; i < fields.length; i++) {
al.add(new Tuple2(fields[i], 1));
}
return al;
}
});
maprdd.foreach(s -> {
System.out.println(s);
});
}
}
flatMap
/**
 * Demonstrates the Spark {@code flatMap} transformation. Each input element may produce zero or
 * more output elements, and all of them are flattened into a single RDD.
 */
public class test {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("jiangtao_demo").setMaster("local");
        // JavaSparkContext is Closeable; try-with-resources stops the context even if a job fails.
        try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
            JavaRDD<String> lines = jsc.parallelize(Arrays.asList("pandas pip", "numpy", "pip", "pip", "pip"));

            // flatMap is one-to-many: call returns an Iterator over the space-separated words of a
            // line, and all words are flattened into one RDD of Strings:
            // pandas
            // pip
            // numpy
            // pip
            // pip
            // pip
            JavaRDD<String> flatMapResult = lines.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterator<String> call(String line) throws Exception {
                    return Arrays.asList(line.split(" ")).iterator();
                }
            });

            // With master "local" the foreach runs in this JVM, so output appears on this console.
            // A lambda is used rather than System.out::println, which would capture the
            // non-serializable System.out stream into the serialized closure.
            flatMapResult.foreach(s -> {
                System.out.println(s);
            });
        }
    }
}
flatMapToPair
public class test {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("jiangtao_demo").setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD lines = jsc.parallelize(Arrays.asList("pandas pip", "numpy", "pip", "pip", "pip"));
// flatMapToPair
// 注意返回值,返回的是list,里面封装了tuple2
// (pandas,1)
// (pip,1)
// (numpy,1)
// (pip,1)
// (pip,1)
// (pip,1)
JavaPairRDD result1 = lines.flatMapToPair(new PairFlatMapFunction() {
@Override
public Iterator> call(String line) throws Exception {
String[] fields = line.split(" ");
ArrayList> al = new ArrayList>();
for (int i = 0; i < fields.length; i++) {
al.add(new Tuple2(fields[i], 1));
}
return al.iterator();
}
});
result1.foreach(s -> {
System.out.println(s);
});
}
}



