栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

使用Scala读取Parquet文件(Parquet4s的初次使用)

使用Scala读取Parquet文件(Parquet4s的初次使用)

文章目录
  • 前言
  • Local File
    • 专用
    • 通用
  • HDFS File

前言

之前介绍过的Eel可以读取Hive表,也能读取Parquet文件,但是Eel的ParquetySource并不支持读Parquet目录。下面要介绍的这个项目Parquet4s支持读取Parquet目录,还能和Akka进行集成。

Eel适合读Csv、Orc多种格式的场景,单论Parquet还是Parquet4s更专业

我使用sbt添加依赖

libraryDependencies ++= Seq(
  "com.github.mjakubowski84" %% "parquet4s-core" % "1.9.4",
  "org.apache.hadoop" % "hadoop-client" % yourHadoopVersion
)
Local File

下面的几行代码就展示了些Parquet并读Parquet的操作,用起来十分方便

专用

这里的字段用case class进行mapping,所以代码量很少

package org.nefu
import com.github.mjakubowski84.parquet4s.{ ParquetReader, ParquetWriter }
case class User(userId: String, name: String, created: java.sql.Timestamp)

object parquet4s {
  def main(args: Array[String]): Unit = {
	// data
    val users: Iterable[User] = Seq(
      User("1", "parquet", new java.sql.Timestamp(1L))
    )
    val path = "./datas/user/part0"

    // writing
    ParquetWriter.writeAndClose(path, users)

    // reading
    val parquetIterable = ParquetReader.read[User](path)
    try {
      parquetIterable.foreach(println)
    } finally parquetIterable.close()

  }
}

通用

如果读取很多schema不一样的数据需要写很多case calss,所以也有更通用的方案

package org.nefu

import java.time.{LocalDate, ZoneOffset}
import java.util.TimeZone
import com.github.mjakubowski84.parquet4s.{ParquetReader, ParquetWriter, RowParquetRecord, ValueCodecConfiguration}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, INT32, INT64}
import org.apache.parquet.schema.Type.Repetition.{OPTIONAL, REQUIRED}
import org.apache.parquet.schema.{LogicalTypeAnnotation, MessageType, OriginalType, Types}

import java.nio.file.Files

object WriteAndReadGenericApp extends App {

  val ID = "id"
  val Name = "name"
  val Birthday = "birthday"
  val SchemaName = "user_schema"
  val path = "./datas/person"
  val vcc = ValueCodecConfiguration(TimeZone.getTimeZone(ZoneOffset.UTC))

  val users = List(
    (1L, "Alice", LocalDate.of(2000, 1, 1)),
    (2L, "Bob", LocalDate.of(1980, 2, 28)),
    (3L, "Cecilia", LocalDate.of(1977, 3, 15))
  ).map { case (id, name, birthday) =>
    RowParquetRecord.empty
      .add(ID, id, vcc)
      .add(Name, name, vcc)
      .add(Birthday, birthday, vcc)
  }

  // write
  implicit val schema: MessageType = Types.buildMessage()
    .addField(Types.primitive(INT64, REQUIRED).as(LogicalTypeAnnotation.intType(64, true)).named(ID))
    .addField(Types.primitive(BINARY, OPTIONAL).as(LogicalTypeAnnotation.stringType()).named(Name))
    .addField(Types.primitive(INT32, OPTIONAL).as(LogicalTypeAnnotation.dateType()).named(Birthday))
    .named(SchemaName)

  ParquetWriter.writeAndClose(s"$path/users.parquet", users)

  //read
  val readData = ParquetReader.read[RowParquetRecord](path)
  try {
    readData.foreach { record =>
      val id = record.get[Long](ID, vcc)
      val name = record.get[String](Name, vcc)
      val birthday = record.get[LocalDate](Birthday, vcc)
      println(s"Person[$ID=$id,$Name=$name,$Birthday=$birthday]")
    }
  } finally readData.close()

}
HDFS File
  val hadoopConfiguration = new Configuration()
  hadoopConfiguration.addResource(new Path("/opt/hadoop/conf/hdfs-site.xml"))
  hadoopConfiguration.addResource(new Path("/opt/hadoop/conf/core-site.xml"))
  val path = "/user/jiajunbernoulli/persons/title=Mr"
  val parquetIterable = ParquetReader.read[RowParquetRecord](path, ParquetReader.Options(hadoopConf = hadoopConfiguration))
  val vcc = ValueCodecConfiguration(TimeZone.getTimeZone(ZoneOffset.UTC))
  try {
    parquetIterable.foreach(
      recode => {
        val name = recode.get("name", vcc)(ValueCodec.stringCodec)
        println(s"name:$name")
      })
  } finally parquetIterable.close()

同样也能用来进行filter等操作,详细内容可见

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/344297.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号