- 实验要求
- 安装配置anaconda+pycharm环境
- anaconda安装配置
- pycharm配置anaconda环境
- 数据预处理
- 数据集介绍
- 数据预处理
- Python操作Kafka
- 命令行运行
- pycharm运行
- 终止kafka环境
现阶段在学习大数据处理相关的项目,先通过厦门大学林子雨老师的案例教程学习Spark+Kafka构建实时分析Dashboard案例学习Kafka和Spark的处理, 地址:http://dblab.xmu.edu.cn/post/8274/
通过博客记录一下学习过程中遇到的各种问题。在本次实验(二)中并没有遇到什么问题,因此主要记录一下实验过程。
查看前一步操作:(一)安装及环境准备
实验要求| 所需知识储备 | 训练技能 | 任务清单 |
|---|---|---|
| 简单使用Python 了解Kafka的使用 | Python基本使用、Python操作 Kafka代码库kafka-python使用 | 利用Python预处理数据 Python操作Kafka |
由于希望能用anaconda管理python的环境,因此在教程之外又额外装了anaconda(pycharm已经在上一节安装过了)。
在官网下载对应的anaconda版本,我直接下的最新版,在下载目录运行
bash Anaconda3-5.3.1-Linux-x86_64.sh
进行安装,按q跳到协议最后,再需要输入几次yes来确认是否默认安装目录,是否同意把anaconda安装地址加入到环境变量,是否安装vscode(这步我第一次yes的时候失败了,报错vscode_inst:installVSCodeExtenstions…或许是因为没有权限,可参考博客试一下,我直接no了)
安装好anaconda后,conda create --name kafka python=3.7创建了3.7的python新环境,conda activate kafka激活后,再安装上一节要求过的环境包pip install kafka-python。
anaconda常用命令
pycharm配置anaconda环境运行pycharm之后,新建项目,选择已有的解释器,在弹出来的框里选择conda environment,选择刚刚建好的环境kafka文件夹下的python,此时pycharm也会自动识别出来python为3.7版本,具体步骤可参考
https://blog.csdn.net/weixin_42489341/article/details/101371732
本案例采用的数据集压缩包为data_format.zip,该数据集压缩包是淘宝2015年双11前6个月(包含双11)的交易数据(交易数据有偏移,但是不影响实验的结果),里面包含3个文件,分别是用户行为日志文件user_log.csv 、回头客训练集train.csv 、回头客测试集test.csv。在这个案例中只是用user_log.csv这个文件:
用户行为日志user_log.csv,日志中的字段定义如下:
- user_id | 买家id
- item_id | 商品id
- cat_id | 商品类别id
- merchant_id | 卖家id
- brand_id | 品牌id
- month | 交易时间:月
- day | 交易事件:日
- action | 行为,取值范围{0,1,2,3},0表示点击,1表示加入购物车,2表示购买,3表示关注商品
- age_range | 买家年龄分段:1表示年龄<18,2表示年龄在[18,24],3表示年龄在[25,29],4表示年龄在[30,34],5表示年龄在[35,39],6表示年龄在[40,49],7和8表示年龄>=50,0和NULL则表示未知
- gender | 性别:0表示女性,1表示男性,2和NULL表示未知
- province| 收获地址省份
这个案例实时统计每秒中男女生购物人数,因此针对每条购物日志,只需要获取gender,然后发送给Kafka,接下来Spark Streaming再接收gender进行处理。这将体现在代码里。
数据预处理本节使用Python对数据进行预处理,并将处理后的数据直接通过Kafka生产者发送给Kafka。
首先在新建好的项目里按照上一节说过的项目目录结构建好相应的文件夹,在scripts目录的producer.py里写好代码。
# coding: utf-8
import csv
import time
from kafka import KafkaProducer
# 实例化一个KafkaProducer示例,用于向Kafka投递消息
producer = KafkaProducer(bootstrap_servers='localhost:9092')
# 打开数据文件
csvfile = open("../data/user_log.csv","r")
# 生成一个可用于读取csv文件的reader
reader = csv.reader(csvfile)
for line in reader:
gender = line[9] # 性别在每行日志代码的第9个元素
if gender == 'gender':
continue # 去除第一行表头
time.sleep(0.1) # 每隔0.1秒发送一行数据
# 发送数据,topic为'sex'
producer.send('sex',line[9].encode('utf8'))
代码很好理解,首先是先实例化一个Kafka生产者。然后读取用户日志文件,每次读取一行,接着每隔0.1秒发送给Kafka,这样1秒发送10条购物日志。这里发送给Kafka的topic为’sex’。
再写一个KafkaConsumer测试数据是否投递成功,文件名为consumer.py。
from kafka import KafkaConsumer
consumer = KafkaConsumer('sex')
for msg in consumer:
print((msg.value).decode('utf8'))
Python操作Kafka
命令行运行
先开启Kafka,命令如下:
cd /usr/local/kafka # 安装目录 bin/zookeeper-server-start.sh config/zookeeper.properties & bin/kafka-server-start.sh config/server.properties
第二条和第三条命令需要分别开两个shell来运行。第二条命令加了“&”,zookeeper就会在后台运行,即使关闭了这个终端,zookeeper也会一直在后台运行。但是这样做,可能会导致忘记了还有zookeeper在后台运行。
在Kafka开启之后,即可开启(运行)KafkaProducer和KafkaConsumer。
打开一个shell,进入到项目目录下,激活刚刚创建的conda环境,运行py文件。
conda activate kafka python producer.py #启动生产者发送消息给Kafaka
再在Ubuntu中,打开另外一个命令行终端窗口shell,激活刚刚创建的conda环境,运行py文件。
conda activate kafka python consumer.py #启动消费者从Kafaka接收消息
接着consumer的shell中会输出一行又一行的数字。
pycharm运行在pycharm中,分别在producer.py文件和consumer.py中,右键Run ‘producer’和’consumer’,此时因为在新建项目时就已经设定好解释器是3.7的kafka环境了,因此直接就在该环境中运行,如果生产者和消费者运行成功,则在consumer底下的窗口会输出数字的信息。
- 用Ctrl-C停止生产者和消费者端
- 用Ctrl-C停止Kafka broker
- 用Ctrl-C停止ZooKeeper server



