栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

spark streaming读取kafka内容并进行反序列化

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

spark streaming读取kafka内容并进行反序列化

import org.apache.spark.sql.ForeachWriter

import com.fasterxml.jackson.module.scala.DefaultScalaModule

import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

import com.fasterxml.jackson.databind.ObjectMapper

import com.fasterxml.jackson.databind.DeserializationFeature

object KafkaToPostgresql {

def main(args: Array[String]): Unit = {

val spark: SparkSession = SparkSession.builder

.appName(“kafka to postgresql”)

.master(“local[2]”)

.getOrCreate()

import spark.implicits._

val df = spark

.readStream

.format(“kafka”)

.option(“kafka.bootstrap.servers”, “xx.xx.xx.xx:9092”)

.option(“subscribe”, “topic-name”)

.option(“startingOffsets”,“latest”)

.load()

// 显示kafka报文格式

df.printSchema()

val rowDataset = df.selectExpr(“CAST(value AS STRING)”)

rowDataset.writeStream.foreach(new ForeachWriterRow{

override def process(record: Row): Unit = {

val mapper = new ObjectMapper()

mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, fals 《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》无偿开源 威信搜索公众号【编程进阶路】 e)

// 很重要

mapper.registerModule(DefaultScalaModule)

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/859556.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号