1 - Spring Boot part: application entry point, controller, and service:
@SpringBootApplication
public class WebApplication {

    /** Entry point: boots the embedded Spring container for this demo. */
    public static void main(String[] args) {
        SpringApplication.run(WebApplication.class, args);
    }
}
@RestController
public class DemoController {

    private final DemoService demoService;

    /**
     * Constructor injection (preferred over field {@code @Autowired}): the
     * dependency is explicit, final, and the class is testable without a
     * Spring context.
     */
    public DemoController(DemoService demoService) {
        this.demoService = demoService;
    }

    /**
     * Triggers an asynchronous text-to-Parquet conversion.
     *
     * @param path   input text file to read
     * @param outPut output location for the Parquet files
     */
    @GetMapping("/write")
    public void demo(@RequestParam("path") String path, @RequestParam("outPut") String outPut) {
        // NOTE(review): this endpoint has a side effect; GET handlers are
        // conventionally safe/idempotent — consider @PostMapping. Kept as GET
        // here to preserve the existing API.
        demoService.test(path, outPut);
    }
}
@Slf4j
@Service
public class DemoService {
public void test(String path,String outPut){
log.info("test~~~");
new Thread(new Runnable() {
@Override
public void run() {
ParquetUtils parquetUtils=new ParquetUtils();
parquetUtils.write(path,outPut);
}
}).start();
}
}
2 - Spark part: obtaining the singleton SparkSession
public class SingleSpark {

    // volatile is what makes double-checked locking safe here: it guarantees
    // the fully-constructed SparkSession is visible to every thread.
    private static volatile SparkSession sparkSession = null;

    // Not instantiable — static accessor only.
    private SingleSpark() {}

    /** Lazily creates (once) and returns the process-wide SparkSession. */
    public static SparkSession getInstance() {
        // Copy the volatile field into a local so the common fast path reads
        // it only once.
        SparkSession session = sparkSession;
        if (session == null) {
            synchronized (SingleSpark.class) {
                session = sparkSession;
                if (session == null) {
                    session = SparkSession.builder().getOrCreate();
                    sparkSession = session;
                }
            }
        }
        return session;
    }
}
3 - Finally, the logic the service invokes is implemented in the Scala package: it simply reads a local text file and writes it out as a Parquet file via Spark SQL.
class ParquetUtils extends Serializable {

  /**
   * Reads a comma-separated text file, writes it out as Parquet, then reads
   * the Parquet back, registers it as a temp view, and runs a sample query.
   *
   * @param path   input text file; each line is expected as "col01,col02"
   * @param outPut directory the Parquet files are written to
   */
  def write(path: String, outPut: String): Unit = {
    val sparkSession = SingleSpark.getInstance()
    val sc = sparkSession.sparkContext

    val schema = StructType(
      Array(
        StructField("col01", StringType),
        StructField("col02", StringType)
      )
    )

    val rowRdd = sc.textFile(path).map { line =>
      // split with limit -1 keeps trailing empty fields ("a," -> ["a", ""]);
      // lift guards short lines so a malformed record cannot abort the job
      // with ArrayIndexOutOfBoundsException.
      val fields = line.split(",", -1)
      Row(fields.lift(0).getOrElse(""), fields.lift(1).getOrElse(""))
    }

    // BUGFIX: the SparkSession method is createDataFrame (capital F);
    // `createDataframe` does not exist and fails to compile.
    val df = sparkSession.createDataFrame(rowRdd, schema)
    df.show()
    df.write.parquet(outPut)

    // Parquet files can also be used to create a temporary view and then
    // used in SQL statements.
    val outDF = sparkSession.read.parquet(outPut)
    outDF.createOrReplaceTempView("parquetFile")
    val col01DF = sparkSession.sql("SELECT col01 FROM parquetFile")
    col01DF.show()
  }
}