栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

KAFKA 知乎

KAFKA 知乎

KAFKA module
import csv
from kafka import KafkaProducer
import time

def main():
    """Publish the first group of CSV answers to the Kafka topic ``txt``.

    Rows are read from ``D:/QQfile/answer_results.csv``. Column 0 of each row
    is compared with column 0 of the next row (the original code stored that
    value in a variable called ``date``); column 1 is collected for as long as
    the two match. The index of the first row whose successor differs is
    printed, then every collected value is sent as one UTF-8 message per
    second with newline characters removed.
    """
    # Producer module.
    producer = KafkaProducer(bootstrap_servers=['121.196.222.214:9092'])
    try:
        with open('D:/QQfile/answer_results.csv', 'r', encoding='utf8') as fp:
            # Blank lines come back from csv.reader as empty lists; drop them
            # so the column lookups below cannot raise IndexError.
            rows = [row for row in csv.reader(fp) if row]

        answers = []
        pos = 0
        # NOTE(review): the row at ``pos`` (the last row of the first group)
        # is only compared, never collected. Kept as in the original --
        # confirm whether that row should be sent as well.
        for index, (current, following) in enumerate(zip(rows, rows[1:])):
            if current[0] != following[0]:
                pos = index
                break
            answers.append(current[1])
        print(pos)

        for text in answers:
            time.sleep(1)
            # Strip embedded newlines. The page this code was copied from lost
            # the backslash, which turned this into a removal of every "n".
            payload = text.replace('\n', '').encode('utf-8')
            producer.send("txt", payload)
            print(payload)

        # send() only queues the record; block until everything is delivered
        # so messages are not dropped when the process exits.
        producer.flush()
    finally:
        producer.close()


if __name__ == '__main__':
    main()
KAFKA消费者
import findspark
findspark.init('/root/Downloads/spark-2.4.7-bin-hadoop2.7')
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition


def start():
    """Run a streaming token count over the Kafka topic ``txt``.

    Builds a Spark context named ``txt``, opens a direct Kafka stream against
    the broker ``121.196.222.214:9092``, splits element 1 of every record on
    single spaces, counts the occurrences of each token per batch, prints the
    counts and saves them under the ``/usr/wordcount/w`` prefix. Blocks until
    the streaming context terminates.
    """
    conf = SparkConf()
    conf.set('spark.cores.max', 2)
    conf.set('spark.io.compression.codec', 'snappy')

    spark_context = SparkContext(appName='txt', conf=conf)
    # Second argument is the batch duration passed to StreamingContext.
    streaming_context = StreamingContext(spark_context, 15)

    kafka_params = {"metadata.broker.list": "121.196.222.214:9092"}
    records = KafkaUtils.createDirectStream(
        streaming_context, ['txt'], kafkaParams=kafka_params
    )

    # Each record is indexed at position 1 to obtain the text to tokenize.
    tokens = records.flatMap(lambda record: record[1].split(' '))
    pairs = tokens.map(lambda token: (token, 1))
    counts = pairs.reduceByKey(lambda left, right: left + right)

    counts.pprint()
    counts.saveAsTextFiles("/usr/wordcount/w")

    streaming_context.start()
    streaming_context.awaitTermination()


if __name__ == '__main__':
    start()
import csv
from kafka import KafkaProducer
import time

def main():
    """Publish the first group of CSV answers to the Kafka topic ``txt``.

    Rows are read from ``D:/QQfile/answer_results.csv``. Column 0 of each row
    is compared with column 0 of the next row (the original code stored that
    value in a variable called ``date``); column 1 is collected for as long as
    the two match. The index of the first row whose successor differs is
    printed, then every collected value is sent as one UTF-8 message per
    second with newline characters removed.
    """
    # Producer module.
    producer = KafkaProducer(bootstrap_servers=['121.196.222.214:9092'])
    try:
        with open('D:/QQfile/answer_results.csv', 'r', encoding='utf8') as fp:
            # Blank lines come back from csv.reader as empty lists; drop them
            # so the column lookups below cannot raise IndexError.
            rows = [row for row in csv.reader(fp) if row]

        answers = []
        pos = 0
        # NOTE(review): the row at ``pos`` (the last row of the first group)
        # is only compared, never collected. Kept as in the original --
        # confirm whether that row should be sent as well.
        for index, (current, following) in enumerate(zip(rows, rows[1:])):
            if current[0] != following[0]:
                pos = index
                break
            answers.append(current[1])
        print(pos)

        for text in answers:
            time.sleep(1)
            # Strip embedded newlines. The page this code was copied from lost
            # the backslash, which turned this into a removal of every "n".
            payload = text.replace('\n', '').encode('utf-8')
            producer.send("txt", payload)
            print(payload)

        # send() only queues the record; block until everything is delivered
        # so messages are not dropped when the process exits.
        producer.flush()
    finally:
        producer.close()


if __name__ == '__main__':
    main()

JSON转换

import json

# Convert lines shaped like "(label, count)" into a JSON document of the form
# {day: {label_name: count}} and write it to result_mode.json.
filename = 'C:/Users/2020-01-20.txt'
day = '2020-01-20'

# Characters removed from every line before splitting: parentheses, the
# trailing newline and single quotes. The page this script was copied from
# dropped the backslashes, leaving replace('n', '') (which deleted every
# letter "n") and an unterminated quote that made the file unparseable.
_strip_table = str.maketrans('', '', "()\n'")

# Labels '0' and '1' get a name; every other label falls back to 'negative'.
# NOTE(review): 'postive' is the spelling the original script emitted; readers
# of result_mode.json may rely on that key -- confirm before correcting it.
_label_names = {'0': 'neutral', '1': 'postive'}

dict_day = {}
with open(filename, 'r', encoding='UTF-8') as fp:
    for line in fp:
        cleaned = line.translate(_strip_table)
        if not cleaned.strip():
            # Blank lines carry no record; the original raised IndexError here.
            continue
        fields = [field.strip() for field in cleaned.split(',')]
        label = _label_names.get(fields[0], 'negative')
        # A later line with the same label overwrites the earlier count.
        dict_day[label] = int(fields[1])

dict_all = {day: dict_day}
with open("result_mode.json", "w") as out:
    out.write(json.dumps(dict_all, indent=4))

JSON转换WORDCOUNTS

import json

# Convert lines shaped like "(word, count)" into a JSON document of the form
# {day: {word: count}} and write it to result.json.
filename = 'word.txt'
day = '2020-01-20'

# Characters removed from every line before splitting: parentheses, the
# trailing newline and single quotes. The page this script was copied from
# dropped the backslashes, leaving replace('n', '') (which deleted every
# letter "n") and an unterminated quote that made the file unparseable.
_strip_table = str.maketrans('', '', "()\n'")

dict_day = {}
with open(filename, 'r', encoding='UTF-8') as fp:
    for line in fp:
        cleaned = line.translate(_strip_table)
        if not cleaned.strip():
            # Blank lines carry no record; the original raised IndexError here.
            continue
        # Split on the LAST comma: the count is the final field, and the word
        # itself may contain commas (e.g. "hello,"), which made the original
        # split(',') pick the wrong field and fail in int().
        word, _, count = cleaned.rpartition(',')
        # A later line with the same word overwrites the earlier count.
        dict_day[word.strip()] = int(count.strip())

dict_all = {day: dict_day}
with open("result.json", "w") as out:
    out.write(json.dumps(dict_all, indent=4))
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/707670.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号