Spark Streaming: consuming a Kafka topic from a specified timestamp
The idea: use KafkaConsumer.offsetsForTimes to translate a wall-clock time into a starting offset for every partition, then hand those offsets to createDirectStream.
import scala.collection.mutable
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.joda.time.format.DateTimeFormat
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

def getOffsetByTimestamp(kafkaParams: collection.Map[String, Object], time: String, topic: String): mutable.HashMap[TopicPartition, Long] = {
  val consumer = new KafkaConsumer[String, String](new java.util.HashMap[String, Object](kafkaParams.asJava))
  val fetchTime = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").parseMillis(time)
  // maps each (topic, partition) to the timestamp we want to start from
  val timestampToSearch: java.util.Map[TopicPartition, java.lang.Long] = new java.util.HashMap[TopicPartition, java.lang.Long]()
  // collects each partition's resolved starting offset
  val partitionOffset = new mutable.HashMap[TopicPartition, Long]
  // partitionsFor returns a List[PartitionInfo] covering every partition of the topic
  val partitionInfos = consumer.partitionsFor(topic)
  for (partitionInfo <- partitionInfos.asScala) {
    val tp = new TopicPartition(partitionInfo.topic(), partitionInfo.partition())
    timestampToSearch.put(tp, fetchTime)
  }
  // for each partition, resolve the earliest offset whose timestamp is >= fetchTime
  val topicPartitionToOffsetAndTimestamp = consumer.offsetsForTimes(timestampToSearch)
  for ((tp, offsetAndTimestamp) <- topicPartitionToOffsetAndTimestamp.asScala) {
    // offsetsForTimes returns null for a partition with no message at or after fetchTime
    if (offsetAndTimestamp != null) {
      partitionOffset += tp -> offsetAndTimestamp.offset()
    }
  }
  consumer.close()
  partitionOffset
}
// Build a direct stream whose starting offsets come from the timestamp lookup above.
val messages: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams, getOffsetByTimestamp(kafkaParams, startTime, topic)))
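For context, a minimal driver that wires these pieces together might look like the sketch below. The broker address, group id, topic name, and start time are placeholder assumptions, not values from the original post.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.kafka.common.serialization.StringDeserializer

val conf = new SparkConf().setAppName("kafka-from-timestamp")
val ssc = new StreamingContext(conf, Seconds(10))

// Placeholder connection settings -- adjust for your cluster.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "demo-group",
  "enable.auto.commit" -> (false: java.lang.Boolean))

val topic = "demo-topic" // hypothetical topic name
val topics = Seq(topic)
val startTime = "2020-01-01 00:00:00" // must match the yyyy-MM-dd HH:mm:ss pattern above

// ... build `messages` with createDirectStream as shown above, then:
// messages.map(_.value()).print()
// ssc.start(); ssc.awaitTermination()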
Spark Streaming: writing to HDFS
Each micro-batch is saved under a per-day directory; the custom output format defined below controls the file names.
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.dstream.DStream

val value: DStream[String] = ...
// Pair each record with an empty value so the pair-RDD save API applies;
// repartition(1) forces a single output file per batch.
value.repartition(1).map((_, "")).foreachRDD { rdd =>
  val simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
  val dir: String = simpleDateFormat.format(new Date()) // one subdirectory per day
  // hdfsPath is an application-specific subdirectory defined elsewhere
  rdd.partitionBy(new HashPartitioner(1))
    .saveAsHadoopFile("hdfs://localhost:8020/test/" + hdfsPath + "/" + dir + "/",
      classOf[String], classOf[String], classOf[selfOutputformat])
}
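One caveat: foreachRDD fires on every batch interval, so quiet periods produce empty output files. A minimal guard, reusing the definitions above, is to skip empty batches (rdd.isEmpty() is cheap, since it only has to look for a first element):

value.repartition(1).map((_, "")).foreachRDD { rdd =>
  // Skip batches with no records so no empty part files are written.
  if (!rdd.isEmpty()) {
    val dir = new SimpleDateFormat("yyyy-MM-dd").format(new Date())
    rdd.partitionBy(new HashPartitioner(1))
      .saveAsHadoopFile("hdfs://localhost:8020/test/" + hdfsPath + "/" + dir + "/",
        classOf[String], classOf[String], classOf[selfOutputformat])
  }
}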
selfOutputformat: the custom MultipleTextOutputFormat
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapred.{InvalidJobConfException, JobConf}
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.hadoop.mapreduce.security.TokenCache
case class selfOutputformat() extends MultipleTextOutputFormat[Any, Any] {
  val currentTime: Date = new Date()
  val formatter = new SimpleDateFormat("yyyy-MM-dd-HHmmss")
  val dateString: String = formatter.format(currentTime)

  // Customize the name of each saved file.
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = {
    // key and value are the RDD pair's (key, value); name is the default part file name, e.g. part-00000.
    // Here the file name is the format's creation timestamp plus ".txt"; adapt as needed.
    dateString + ".txt"
  }
  override def checkOutputSpecs(ignored: FileSystem, job: JobConf): Unit = {
    val name: String = job.get(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR)
    var outDir: Path = if (name == null) null else new Path(name)
    // Fail if the job has reduce tasks but no output directory is configured.
    if (outDir == null && job.getNumReduceTasks != 0) {
      throw new InvalidJobConfException("Output directory not set in JobConf.")
    }
    if (outDir != null) {
      // Normalize the output directory and store it back into the job configuration.
      val fs: FileSystem = outDir.getFileSystem(job)
      outDir = fs.makeQualified(outDir)
      outDir = new Path(job.getWorkingDirectory, outDir)
      job.set(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR, outDir.toString)
      TokenCache.obtainTokensForNamenodes(job.getCredentials, Array[Path](outDir), job)
      // Unlike FileOutputFormat.checkOutputSpecs, the fs.exists(outDir) check is deliberately
      // omitted here, so repeated batches can write into an already-existing directory
      // without a FileAlreadyExistsException.
    }
  }
}
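Because the driver pairs each record with an empty string rather than a null, the underlying TextOutputFormat still writes the key/value separator, so every line ends with a trailing tab. One way to suppress it: MultipleTextOutputFormat also lets you override generateActualValue, and TextOutputFormat's writer omits the separator when the value is null or a NullWritable. The sketch below (reusing the imports above) shows this; the class name is hypothetical, not from the original post.

// Hypothetical variant: same timestamp-based file naming, but the empty value is
// dropped so each output line contains only the record text (no trailing tab).
class selfOutputformatNoTab extends MultipleTextOutputFormat[Any, Any] {
  private val dateString = new SimpleDateFormat("yyyy-MM-dd-HHmmss").format(new Date())

  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    dateString + ".txt"

  // Returning null makes TextOutputFormat emit the key alone, with no separator.
  override def generateActualValue(key: Any, value: Any): Any = null
}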