
Spark learning

Spark Streaming: consuming a Kafka topic starting from a given timestamp
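The idea is to create a standalone KafkaConsumer, use its offsetsForTimes API to translate a wall-clock timestamp into a starting offset for every partition of the topic, and then hand that offset map to KafkaUtils.createDirectStream via the Subscribe consumer strategy.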

import scala.collection.mutable
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.joda.time.format.DateTimeFormat

def getOffsetByTimestamp(kafkaParams: collection.Map[String, Object], time: String, topic: String): mutable.HashMap[TopicPartition, Long] = {
  val consumer = new KafkaConsumer[String, String](new java.util.HashMap[String, Object](kafkaParams.asJava))
  val fetchTime = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").parseMillis(time)

  // map each (topic, partition) to the target timestamp
  val timestampToSearch: java.util.Map[TopicPartition, java.lang.Long] = new java.util.HashMap[TopicPartition, java.lang.Long]()
  // holds each partition and its resolved starting offset
  val partitionOffset = new mutable.HashMap[TopicPartition, Long]

  // fetch the partition metadata for the topic; returns a java.util.List[PartitionInfo]
  val partitionInfos = consumer.partitionsFor(topic)
  for (partitionInfo <- partitionInfos.asScala) {
    val tp = new TopicPartition(partitionInfo.topic(), partitionInfo.partition())
    timestampToSearch.put(tp, fetchTime)
  }

  // resolve each timestamp to the earliest offset whose record timestamp is >= it
  val topicPartitionToOffsetAndTimestamp = consumer.offsetsForTimes(timestampToSearch)
  for ((tp, offsetAndTimestamp) <- topicPartitionToOffsetAndTimestamp.asScala) {
    // offsetsForTimes returns null for a partition with no record at or after
    // the requested time; skip those partitions instead of throwing an NPE
    if (offsetAndTimestamp != null) {
      partitionOffset += tp -> offsetAndTimestamp.offset()
    }
  }
  consumer.close()
  partitionOffset
}


import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

// ssc, topics, topic, kafkaParams and startTime are assumed to be defined
// in the enclosing scope (see the driver sketch below)
val messages: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams, getOffsetByTimestamp(kafkaParams, startTime, topic)))

Spark Streaming: writing to HDFS

val value: DStream[String] = ...

// pair each line with an empty string so the Hadoop pair-RDD save API can be used
value.repartition(1).map((_, "")).foreachRDD(rdd => {
  // one output directory per day, e.g. .../2021-06-01/
  val date: Date = new Date()
  val simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
  val dir: String = simpleDateFormat.format(date)
  // hdfsPath is assumed to be defined elsewhere in the job
  rdd.partitionBy(new HashPartitioner(1))
    .saveAsHadoopFile("hdfs://localhost:8020/test/" + hdfsPath + "/" + dir + "/",
      classOf[String], classOf[String], classOf[selfOutputformat])
})
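
Note the two single-partition steps: repartition(1) collapses each micro-batch into one partition, and HashPartitioner(1) keeps it that way through partitionBy, so each batch is written by a single task and produces a single file under the day's directory. The file name itself comes from the selfOutputformat class shown next; drop the repartitioning if parallel writes (and multiple part files per batch) are acceptable.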

selfOutputformat

import java.text.SimpleDateFormat
import java.util.Date

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapred.{InvalidJobConfException, JobConf}
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.hadoop.mapreduce.security.TokenCache

class selfOutputformat extends MultipleTextOutputFormat[Any, Any] {

  // timestamp captured when the output format is instantiated; used to name the file
  val currentTime: Date = new Date()
  val formatter = new SimpleDateFormat("yyyy-MM-dd-HHmmss")
  val dateString: String = formatter.format(currentTime)

  // customize the name of the saved file
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = {
    // key and value are the (key, value) pairs from the RDD; name is the
    // default part file name such as part-00000
    // here the file name is the instantiation timestamp; adjust as needed
    dateString + ".txt"
  }

  override def checkOutputSpecs(ignored: FileSystem, job: JobConf): Unit = {
    val name: String = job.get(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR)
    var outDir: Path = if (name == null) null else new Path(name)
    // if there are reduce tasks but no output directory is set, fail fast
    if (outDir == null && job.getNumReduceTasks != 0) {
      throw new InvalidJobConfException("Output directory not set in JobConf.")
    }
    // when an output directory is configured, qualify it and collect delegation tokens
    if (outDir != null) {
      val fs: FileSystem = outDir.getFileSystem(job)
      outDir = fs.makeQualified(outDir)
      outDir = new Path(job.getWorkingDirectory, outDir)
      job.set(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR, outDir.toString)
      TokenCache.obtainTokensForNamenodes(job.getCredentials, Array[Path](outDir), job)
      // the directory-existence check performed by the stock FileOutputFormat is
      // omitted here, so an existing output directory no longer raises an error
    }
  }
}
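
For reference, the existence check that the stock checkOutputSpecs performs (and that this override omits) is essentially the following; this is a sketch from memory of Hadoop's org.apache.hadoop.mapred.FileOutputFormat, not an exact quotation:

// restoring this inside the `if (outDir != null)` branch brings back the
// stock behaviour of failing when the output directory already exists
if (fs.exists(outDir)) {
  throw new org.apache.hadoop.mapred.FileAlreadyExistsException(
    "Output directory " + outDir + " already exists")
}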