栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

记录学习Spark+Kafka构建实时分析Dashboard案例中遇到的问题(三)Spark Streaming实时处理数据

记录学习Spark+Kafka构建实时分析Dashboard案例中遇到的问题(三)Spark Streaming实时处理数据

目录
    • 实验要求
    • pyspark
      • pyspark.streaming
    • Spark和Kafka的组合使用
      • 整合
      • 使用
    • 问题
      • spark streaming+kafka成功运行后,换数据再次运行时,消费者无打印输出
      • kafka消费者拉不出数据
      • kafka单机重启topic丢失问题排查

现阶段在学习大数据处理相关的项目,先通过厦门大学林子雨老师的案例教程学习Spark+Kafka构建实时分析Dashboard案例学习Kafka和Spark的处理, 地址:http://dblab.xmu.edu.cn/post/8274/

通过博客记录一下学习过程中遇到的各种问题。

查看前一步操作:(二)数据处理和Python操作Kafka

实验要求
所需知识储备训练技能任务清单
1.使用Scala/Python编写Spark Streaming程序
2.Kafka原理
1.编写Spark Streaming序
2.熟悉Spark/pySpark操作Kafka
1.Spark Streaming实时处理Kafka数据
2.将处理后的结果发送给Kafka
pyspark

SparkContext意义:主入口点
SparkContext作用:连接Spark集群

SparkConf作用:创建SparkContext前得使用SparkConf进行配置,以键值对形式进行

  1. 创建SparkContext
  • 连接到Spark“集群”:local、standalone、yarn、mesos
  • 通过SparkContext来创建RDD、广播变量到集群
  1. 创建SparkContext前还需要创建SparkConf对象
  2. SparkConf().setAppName(appname).setMaster(‘local’)这个设置高于系统设置
  3. pyspark.SparkContext连接到Spark‘集群’即master(local[单机]、Standalone[标准]、yarn(hadoop上)、mesos),创建RDD,广播变量到集群
 conf = SparkConf().setAppName(appname).setMaster('local')
 sc   = SparkContext(Conf=Conf)
pyspark.streaming

参考博客

核心组件:

  • Streaming Context
  • Dstream(离散流)

Streaming Context是Spark Streaming功能的主要入口点,生成Streaming Context之前需要生成SparkContext,SparkContext可以理解为申请Spark集群的计算资源,Streaming Context可以理解为申请Spark Streaming的计算资源。
Streaming Context表示与Spark集群的连接,可用于创建DStream各种输入源。它可以来自现有的SparkContext。创建和转换DStreams后,可分别使用context.start()和context.stop()启动和停止流计算。context.awaitTermination()允许当前线程等待处理结束(手动结束或因为错误而结束)。

class pyspark.streaming.StreamingContext(sparkContext, batchDuration=None, jssc=None)
Spark和Kafka的组合使用 整合

2种整合方式:

  • receiver createStream
  • direct createDirectStream(更推荐)

spark读取kafka数据 createStream和createDirectStream的区别
spark streaming全介绍
pyspark整合Spark Streaming与Kafka
kafka-0.8.x和kafka-0.10.0.0版本KafkaUtils.createDirectStream方法的比较
kafka消费组

使用

按照教程分别修改/usr/local/spark/conf/spark-env.sh文件和/usr/local/spark/bin/pyspark文件,其中python路径填写anaconda里自己新建的环境路径(envs下寻找),对应自己的python版本。

spark-submit提交py文件时报错TypeError: namedtuple() missing 3 required keyword-only arguments: 'rename', 'defaults', and 'module',spark2.1版本以下不支持python3.6(我的是3.7,应该更不兼容了),因此换成python3.5的环境

安装了kafkatool工具可视化查看topic和consumer情况。

问题 spark streaming+kafka成功运行后,换数据再次运行时,消费者无打印输出

已排查:
将reader改成enumerate:换回去仍无输出
换数据:变成原数据原代码仍无输出
编码问题:除了producer.send里必加encode以外,别的地方依然无影响

在排查:
封装成Kafka消息的topic跟上次一致,用的原来打包过的文件:查看topic里的数据,已经变成新数据生成的情况了
是否有可能topic里写入数据了,但非空则无法输出?
隔日后的情况:topic里也没有正确输出,在spark streaming连kafka数据时就出了问题,submit提交程序时没有将结果写入新的主题,尝试直接打印到屏幕时没有输出,因此应该是没有spark的处理结果,在ctrl-c结束程序后的很短时间内会出现很少的错误数据,不ctrl-c就没有输出;
间或报错Current offset 88926 for partition [count,0] out of range; reset offset to 2

解决:
不知道是哪个起作用,倾向于第一个。

1.线程数不够
没加setMaster(“local[2]”),参考博客

SparkConf conf = new SparkConf().setAppName("SparkStreamingPollDataFromFlume").setMaster("local[2]");

配置SparkConf:

  • 至少2条线程:因为Spark Streaming应用程序在运行的时候,至少有一条线程用于不断的循环接收数据,并且至少有一条线程用于处理接受的数据(否则的话无法有线程用于处理数据,随着时间的推移,内存和磁盘都会不堪重负)

2.kafka队列中有太多积压得信息未被读取
重新创建kafka消费者,消耗掉队列积压的消息,参考博客

./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic wordsendertest --from-beginning
kafka消费者拉不出数据

原因:客户端版本与服务端不一致。参考博客。

kafka单机重启topic丢失问题排查

原因:zookeeper和kafka数据和日志存放目录在tmp下

解决:修改数据目录和日志目录(主要是数据目录)和kafka的配置文件,kafka本身的server.properties。参考博客。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/350545.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号