在处理大数据时,会用到spark平台,对于一些对实时性要求不高,但数据量比较大的场景,我们可以考虑将生成数据存入hive表中。那么怎么将数据按时间存入hive表中,怎么将hive表中的数据直接读取出来并做处理呢?废话不多说,直接上代码!
package spark;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import systest.TestCaseResult;
import java.util.List;
public class SparkSqlHiveIO {
public static Dataset getRowDataSetFromHive(SparkSession sparkSession, String querySqlText){
Dataset rowDataset = null;
try {
rowDataset = sparkSession.sql(querySqlText);
} catch (Exception e) {
e.printStackTrace();
}
return rowDataset;
}
public static void insertTestResultToHive(SparkSession sparkSession, JavaRDD rstInfo, String insertSqlText) {
// SparkSession sparkSession = new EagleSpark().sparkSession;
try {
Dataset view = sparkSession.createDataframe(rstInfo,TestCaseResult.class);
view.createOrReplaceTempView("temp_view");
System.out.println("insertsqlText="+insertSqlText);
sparkSession.sql(insertSqlText);
} catch (Exception e) {
e.printStackTrace();
}
// }finally {
sparkSession1.close();
// }
}
public void insertTestResultListToHive(SparkSession sparkSession, List> testResultLoopList, String insertSqlText) {
for(int i=0;i
附spark的初始定义:
package spark;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
public class StudySpark {
public SparkSession sparkSession;
public StudySpark(String appName){
SparkConf conf = new SparkConf().setAppName(appName).set("spark.streaming.concurrentJobs", "3").set("spark.scheduler.mode", "FIFO");
this.sparkSession = SparkSession
.builder()
.config("spark.serializer","org.apache.spark.serializer.KryoSerializer")
.config(conf)
.enableHiveSupport()
.getOrCreate();
}
}
pom.xml文件
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.test.java.study</groupId>
    <artifactId>com.test.java.study</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.8</version>
        </dependency>
    </dependencies>
</project>