附上依赖
org.apache.spark spark-core_2.112.4.5
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
//setMaster是我在window环境直接运行写的,所以指定了是local模式
// 如果是打包上传到集群就不要再写这个了
.setMaster("local[*]")
//赋予程序一个名字,给Spark程序赋予一个名字,如果可以提交到yarn上,再8088上可以看见
.setAppName("WordCount")
//spark core的运行环境创建一下
val sc = new SparkContext(conf)
//读取指定路径的文件,生成一个RDD(里面的数据按行读取)
val textRDD: RDD[String] = sc.textFile("data/test.txt")
//切分数据,我们将每一行的数据按逗号切分开之后,flatMap算子会将最终函数返回值的集合类型切分开
val wordRDD = textRDD.flatMap(_.split(","))
//根据单词本身进行分组,单词作为key,相同的单词放进一个元组中
val groupRDD = wordRDD.groupBy(word => word)
//得到上面的结果后离结果就很相近了,可以看一下上一步的结果是(hello,(hello,hello))这种模式
//我们可以使用map方法获取到元组中的第一个元素作为key,第二个元素的长度作为value
val kvDS = groupRDD.map(kv => kv._1 + "," + kv._2.size)
//foreach是一个action算子,里面传入一个println方法,将结果打印在控制台
kvDS.foreach(println)
}
}
通常来说,我们写spark程序除了需要指定输入路径,还需要指定输出路径
在spark程序中如果指定的输出路径已经存在,那么程序也是会直接失败的
所以我们首先需要判断这个路径是不是已经存在了,存在了就先将这个路径删除掉
那么最终结果会生成几个文件?
这里还是引用了mapreduce中的切片规则,文件大小到达128m就是一个切片,也就是spark中的一个分区
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
//setMaster是我在window环境直接运行写的,所以指定了是local模式
// 如果是打包上传到集群就不要再写这个了
.setMaster("local[*]")
//赋予程序一个名字,给Spark程序赋予一个名字,如果可以提交到yarn上,再8088上可以看见
.setAppName("WordCount")
//spark core的运行环境创建一下
val sc = new SparkContext(conf)
//读取指定路径的文件,生成一个RDD(里面的数据按行读取)
val textRDD: RDD[String] = sc.textFile("data/test.txt")
//切分数据,我们将每一行的数据按逗号切分开之后,flatMap算子会将最终函数返回值的集合类型切分开
val wordRDD = textRDD.flatMap(_.split(","))
//根据单词本身进行分组,单词作为key,相同的单词放进一个元组中
val groupRDD = wordRDD.groupBy(word => word)
//得到上面的结果后离结果就很相近了,可以看一下上一步的结果是(hello,(hello,hello))这种模式
//我们可以使用map方法获取到元组中的第一个元素作为key,第二个元素的长度作为value
val kvDS = groupRDD.map(kv => kv._1 + "," + kv._2.size)
//和mapreduce中的那套一样,导入的包注意都是org.apache.hadoop下面的
val cf: Configuration = new Configuration()
val fs: FileSystem = FileSystem.get(cf)
val path: Path = new Path("data/result")
// 判断输出路径是否存在 存在则删除
if (fs.exists(path)) {
fs.delete(path, true)
}
//foreach是一个action算子,里面传入一个println方法,将结果打印在控制台
kvDS.saveAsTextFile("data/result.txt")
}
}



