- Preface
- Local File
- Dedicated
- Generic
- HDFS File
The previously introduced Eel can read Hive tables as well as Parquet files, but Eel's ParquetSource does not support reading a Parquet directory. The project introduced below, Parquet4s, supports reading Parquet directories and also integrates with Akka.
Eel is the better fit when you need to read multiple formats such as CSV and ORC; for Parquet alone, Parquet4s is the more specialized choice.
I add the dependency with sbt:
libraryDependencies ++= Seq(
  "com.github.mjakubowski84" %% "parquet4s-core" % "1.9.4",
  "org.apache.hadoop" % "hadoop-client" % yourHadoopVersion
)
Local File
The few lines of code below show how to write Parquet and read it back; the library is very convenient to use.
Dedicated
Here the fields are mapped with a case class, so very little code is needed.
package org.nefu

import com.github.mjakubowski84.parquet4s.{ParquetReader, ParquetWriter}

case class User(userId: String, name: String, created: java.sql.Timestamp)

object parquet4s {
  def main(args: Array[String]): Unit = {
    // data
    val users: Iterable[User] = Seq(
      User("1", "parquet", new java.sql.Timestamp(1L))
    )
    val path = "./datas/user/part0"
    // writing
    ParquetWriter.writeAndClose(path, users)
    // reading
    val parquetIterable = ParquetReader.read[User](path)
    try {
      parquetIterable.foreach(println)
    } finally parquetIterable.close()
  }
}
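As noted in the preface, Parquet4s can read a whole Parquet directory, not just a single file. A minimal sketch of that, reusing the `User` case class defined above (the `./datas/users_dir` directory and the `part0`/`part1` file names are illustrative, not a Parquet4s convention): write two part files into the same directory, then read the directory in one call.

```scala
package org.nefu

import com.github.mjakubowski84.parquet4s.{ParquetReader, ParquetWriter}

object ParquetDirExample {
  def main(args: Array[String]): Unit = {
    val dir = "./datas/users_dir"
    // Write two separate part files into the same directory.
    ParquetWriter.writeAndClose(s"$dir/part0",
      Seq(User("1", "alice", new java.sql.Timestamp(1L))))
    ParquetWriter.writeAndClose(s"$dir/part1",
      Seq(User("2", "bob", new java.sql.Timestamp(2L))))
    // Reading the directory path yields the records of every part file.
    val all = ParquetReader.read[User](dir)
    try all.foreach(println)
    finally all.close()
  }
}
```

This is exactly the case Eel's ParquetSource cannot handle: the path passed to `ParquetReader.read` is a directory, and the records from all part files come back through a single iterable.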
Generic
If you read many datasets with different schemas, you would have to write a case class for each one, so there is also a more generic approach.
package org.nefu

import java.time.{LocalDate, ZoneOffset}
import java.util.TimeZone

import com.github.mjakubowski84.parquet4s.{ParquetReader, ParquetWriter, RowParquetRecord, ValueCodecConfiguration}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, INT32, INT64}
import org.apache.parquet.schema.Type.Repetition.{OPTIONAL, REQUIRED}
import org.apache.parquet.schema.{LogicalTypeAnnotation, MessageType, Types}

object WriteAndReadGenericApp extends App {
  val ID = "id"
  val Name = "name"
  val Birthday = "birthday"
  val SchemaName = "user_schema"
  val path = "./datas/person"
  val vcc = ValueCodecConfiguration(TimeZone.getTimeZone(ZoneOffset.UTC))

  val users = List(
    (1L, "Alice", LocalDate.of(2000, 1, 1)),
    (2L, "Bob", LocalDate.of(1980, 2, 28)),
    (3L, "Cecilia", LocalDate.of(1977, 3, 15))
  ).map { case (id, name, birthday) =>
    RowParquetRecord.empty
      .add(ID, id, vcc)
      .add(Name, name, vcc)
      .add(Birthday, birthday, vcc)
  }

  // write
  implicit val schema: MessageType = Types.buildMessage()
    .addField(Types.primitive(INT64, REQUIRED).as(LogicalTypeAnnotation.intType(64, true)).named(ID))
    .addField(Types.primitive(BINARY, OPTIONAL).as(LogicalTypeAnnotation.stringType()).named(Name))
    .addField(Types.primitive(INT32, OPTIONAL).as(LogicalTypeAnnotation.dateType()).named(Birthday))
    .named(SchemaName)
  ParquetWriter.writeAndClose(s"$path/users.parquet", users)

  // read
  val readData = ParquetReader.read[RowParquetRecord](path)
  try {
    readData.foreach { record =>
      val id = record.get[Long](ID, vcc)
      val name = record.get[String](Name, vcc)
      val birthday = record.get[LocalDate](Birthday, vcc)
      println(s"Person[$ID=$id,$Name=$name,$Birthday=$birthday]")
    }
  } finally readData.close()
}
HDFS File
import java.time.ZoneOffset
import java.util.TimeZone

import com.github.mjakubowski84.parquet4s.{ParquetReader, RowParquetRecord, ValueCodec, ValueCodecConfiguration}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val hadoopConfiguration = new Configuration()
hadoopConfiguration.addResource(new Path("/opt/hadoop/conf/hdfs-site.xml"))
hadoopConfiguration.addResource(new Path("/opt/hadoop/conf/core-site.xml"))
val path = "/user/jiajunbernoulli/persons/title=Mr"
val vcc = ValueCodecConfiguration(TimeZone.getTimeZone(ZoneOffset.UTC))
val parquetIterable = ParquetReader.read[RowParquetRecord](path, ParquetReader.Options(hadoopConf = hadoopConfiguration))
try {
  parquetIterable.foreach { record =>
    val name = record.get("name", vcc)(ValueCodec.stringCodec)
    println(s"name:$name")
  }
} finally parquetIterable.close()
The library can likewise be used for operations such as filtering; for details, see the Parquet4s documentation.
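As a sketch of filtering, the snippet below assumes the `Col` filter builder and its `===` operator from Parquet4s 1.x (treat the exact operator set as an assumption; check the version you use). It reuses the `User` case class and the path from the first example; the predicate is applied while reading, so non-matching data can be skipped.

```scala
import com.github.mjakubowski84.parquet4s.{Col, ParquetReader}

// Assumed Parquet4s 1.x Filter API: only rows whose userId
// column equals "1" are returned by the reader.
val filtered = ParquetReader.read[User]("./datas/user/part0", filter = Col("userId") === "1")
try filtered.foreach(println)
finally filtered.close()
```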