- Preface
- Examples
- Local CSV File
- Hive
- Parquet
- ORC
- Afterword
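A recent project required reading Hive tables from Scala, but the Spark and Hive APIs are heavyweight and did not perform well enough for this use case. After some research I found a lower-level framework, Eel.
To keep things easy to follow, let's start by reading a simple CSV file.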
Local CSV File
I created a small CSV file for this:
school_number,student_name
1,Bernoulli
2,Newton
3,Leibniz
After adding the dependency in sbt with libraryDependencies += "io.eels" % "eel-core_2.12" % "1.2.4", the code to read the file is:
import io.eels.{DataTypeRule, SchemaInferrer}
import io.eels.component.csv.CsvSource
import io.eels.schema.{IntType, StringType}
import java.nio.file.Paths
object CsvReaderDemo {
  def main(args: Array[String]): Unit = {
    val csvFilePath = Paths.get("./datas/students.csv")
    // Print the schema; every field defaults to String
    CsvSource(csvFilePath).toDataStream()
      .schema.fields.foreach(f => println(f))
    // Manually map field names to types
    val fieldTypeMapping = Seq(
      DataTypeRule("school_number", IntType.Signed),
      DataTypeRule("student_name", StringType))
    val schemaInferrer = SchemaInferrer(
      StringType, // default type
      fieldTypeMapping)
    // Print the data
    CsvSource(csvFilePath).withSchemaInferrer(schemaInferrer)
      .toDataStream()
      .filter(_.values(0).toString.toInt > 0) // filter by index
      .filter(_.get("student_name").equals("Leibniz")) // filter by field name
      // .filter("student_name", _ == "Leibniz") // could also be written like this
      .collect
      .foreach(row => println(row))
  }
}
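Running it prints the schema fields first, then the rows that pass both filters (with the sample file above, only the Leibniz row).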
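To make the role of the field-to-type mapping more concrete, here is a minimal plain-Scala sketch of the same idea: every column defaults to a string unless a rule names it. This is not Eel's implementation; `ColumnType`, `TypeRule`, and `TypeRuleSketch` are hypothetical names invented for illustration, and the sketch assumes a simple CSV with no quoted or escaped commas.

```scala
// Plain-Scala illustration only; none of these names come from Eel.
sealed trait ColumnType
case object IntColumn extends ColumnType
case object StringColumn extends ColumnType

// Hypothetical stand-in for a (field name -> type) rule.
final case class TypeRule(fieldName: String, columnType: ColumnType)

object TypeRuleSketch {
  // Convert one raw CSV cell according to the type chosen for its column.
  def convert(raw: String, columnType: ColumnType): Any = columnType match {
    case IntColumn    => raw.trim.toInt
    case StringColumn => raw
  }

  // Parse CSV text with a header line; columns without a rule use `default`.
  def parse(csv: String, default: ColumnType, rules: Seq[TypeRule]): Seq[Map[String, Any]] = {
    val lines  = csv.split("\n").map(_.trim).filter(_.nonEmpty).toSeq
    val header = lines.head.split(",").map(_.trim).toSeq
    val types  = header.map { name =>
      rules.find(_.fieldName == name).map(_.columnType).getOrElse(default)
    }
    lines.tail.map { line =>
      val cells = line.split(",", -1).toSeq
      header.zip(types).zip(cells).map {
        case ((name, tpe), cell) => name -> convert(cell, tpe)
      }.toMap
    }
  }

  // The sample file from this post, inlined as a string.
  val sample: String =
    """school_number,student_name
      |1,Bernoulli
      |2,Newton
      |3,Leibniz""".stripMargin

  // Only school_number gets a rule; student_name falls back to the default.
  val rows: Seq[Map[String, Any]] =
    parse(sample, StringColumn, Seq(TypeRule("school_number", IntColumn)))

  // Same two predicates as the Eel pipeline: a numeric check, then a name check.
  val matches: Seq[Map[String, Any]] =
    rows.filter(_("school_number").asInstanceOf[Int] > 0)
        .filter(_("student_name") == "Leibniz")
}
```

With a typed column the numeric filter can compare integers directly instead of going through `toString.toInt`, which is the main practical benefit of declaring the mapping up front.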
Hive
Reading from Hive requires a Hive Metastore (HMS) client:
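import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient
val hiveConf = new HiveConf()
hiveConf.set(ConfVars.METASTOREURIS.varname, "thrift://localhost:9083")
implicit val hiveMetaStoreClient = new HiveMetaStoreClient(hiveConf)
Parquet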
Prepare the data:
-- Create the table
CREATE EXTERNAL TABLE IF NOT EXISTS `eel_test.person` (
  `NAME` string,
  `AGE` int,
  `SALARY` decimal(38,5),
  `CREATION_TIME` timestamp)
PARTITIONED BY (`title` string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION '/client/eel_test/persons';
-- Insert the data
INSERT INTO eel_test.person PARTITION (title='Mr') SELECT 'Fred', 50, 50000.99, CURRENT_TIMESTAMP();
INSERT INTO eel_test.person PARTITION (title='Mr') SELECT 'Gary', 50, 20000.34, CURRENT_TIMESTAMP();
INSERT INTO eel_test.person PARTITION (title='Mr') SELECT 'Alice', 50, 99999.98, CURRENT_TIMESTAMP();
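Because the table is partitioned by `title`, it helps to picture where the data files end up. The sketch below assumes Hive's default layout, in which each partition is stored in a `key=value` subdirectory of the table location. `PartitionPathSketch` and the file name `000000_0` used in the usage note are hypothetical, and the sketch ignores the escaping Hive applies to special characters in partition values.

```scala
// Plain-Scala illustration of the default key=value partition layout.
// Nothing here calls Hive or Eel; the object name is made up for this post.
object PartitionPathSketch {
  // Build the directory for one partition: <table location>/<key>=<value>/...
  def partitionDir(tableLocation: String, partition: Seq[(String, String)]): String =
    (tableLocation.stripSuffix("/") +: partition.map { case (k, v) => s"$k=$v" }).mkString("/")

  // Recover the partition key/value pairs from a path under the table location.
  // Segments without '=' (directories, file names) are skipped.
  def partitionValues(path: String): Map[String, String] =
    path.split("/").toSeq.flatMap { segment =>
      segment.split("=", 2) match {
        case Array(key, value) if key.nonEmpty => Some(key -> value)
        case _                                 => None
      }
    }.toMap
}
```

For the table above, `PartitionPathSketch.partitionDir("/client/eel_test/persons", Seq("title" -> "Mr"))` gives `/client/eel_test/persons/title=Mr`, and `partitionValues` applied to a file path such as `/client/eel_test/persons/title=Mr/000000_0` recovers `title -> Mr`.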
Code to read the data:
// Arguments are the database name and the table name created above
HiveSource("eel_test", "person")
  .toDataStream()
  .collect
  .foreach(row => println(row))
ORC
Prepare the data:
-- Create the table
CREATE EXTERNAL TABLE IF NOT EXISTS eel_test.test_orc(
  id int,
  name string
)
PARTITIONED BY (create_date string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS ORC
LOCATION '/client/eel_test/test_orc';
-- Insert the data
INSERT INTO eel_test.test_orc PARTITION(create_date='20210101')
SELECT 1, 'Alice'
UNION SELECT 2, 'Fred'
UNION SELECT 3, 'Gary';
Code to read the data:
// Arguments are the database name and the table name created above
HiveSource("eel_test", "test_orc")
  .toDataStream()
  .collect
  .foreach(row => println(row))
Afterword
Eel can also read directly from HDFS. For more examples, see the project's GitHub repository.



