栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Python 直接操作 Hive 和 HDFS 的实例代码

Python 直接操作 Hive 和 HDFS 的实例代码

from hdfs import InsecureClient

# Create a WebHDFS client for the given URL; no user name is passed here.
lian = InsecureClient(url='http://master:9870')

# Show the entries directly under the HDFS root directory.
root_entries = lian.list('/')
print(root_entries)

# Write a UTF-8 encoded greeting to /niubi/shang2. overwrite=True is passed so
# that an already existing file is replaced (per the original author's note,
# overwrite=False raises when the file already exists).
greeting_bytes = '世界你好,我来了'.encode('utf-8')
lian.write(hdfs_path='/niubi/shang2', overwrite=True, data=greeting_bytes)
niubi_entries = lian.list('/niubi')
print(niubi_entries)

# Read the file back. read() is used as a context manager; the bytes are taken
# from the reader's ``data`` attribute and decoded from UTF-8 so that the
# Chinese text prints correctly.
with lian.read('/niubi/shang2') as f:
    file_bytes = f.data
    print(file_bytes.decode('utf-8'))

from pyhive import hive
import thrift
import sasl
import thrift_sasl
# Smoke test for a Hive connection through PyHive: list the tables of the test
# database, then run a few partition-filtered sample queries and print the
# rows each one returns.
print("hello dog")
conn = hive.Connection(host='master', port=10000, database='hive_dlp_data_test',auth='NONE')
try:
    cursor = conn.cursor()
    cursor.execute('show tables')
    print(cursor.fetchall())
    print("hello AAA")

    # Fully qualified table name, up to 6 rows of one (date, hour) partition.
    cursor.execute("select * from hive_dlp_data_test.sink_dlp_hive where ts_date='20211021' and ts_hour='19' limit 6")
    print(cursor.fetchall())
    print("BBBB")

    # Same partition through the unqualified table name; this relies on the
    # database chosen when the connection was opened.
    sample_query = "SELECt * FROM sink_dlp_hive WHERe ts_date = '20211021' and ts_hour = '19' LIMIT 2"
    cursor.execute(sample_query)
    print(cursor.fetchall())

    print("CCC")
    cursor.execute(sample_query)
    print(cursor.fetchall())

    print("DDD test count :")
    cursor.execute(sample_query)
    # The original script executed this statement but never fetched its
    # result, so nothing was shown after the label above.
    # NOTE(review): the label says "count" but the statement selects rows;
    # the query text is kept unchanged - confirm whether COUNT(*) was meant.
    print(cursor.fetchall())
finally:
    # Release the server-side session even if one of the queries fails.
    conn.close()

#!/usr/bin/python
# -*- coding: UTF-8 -*-



import pandas as pd
from pyhive import hive
import thrift
import sasl
import thrift_sasl
# Target database and table, shared by the rest of this script.
database_name = "hive_dlp_data_test"
table_name = "sink_dlp_hive"

# One connection and one cursor are opened here and reused below.
conn = hive.Connection(
    host='master',
    port=10000,
    database=database_name,
    auth='NONE',
)
cursor = conn.cursor()

# Print the result of SHOW TABLES, then of SHOW DATABASES.
for label, statement in (("tables:", 'show tables'), ("databases:", 'show databases')):
    cursor.execute(statement)
    print(label, cursor.fetchall())

def fetch_data(database_name, table_name, ts_date, ts_hour, limit=None):
    """Print the rows of one (ts_date, ts_hour) partition of ``table_name``.

    Debug variant: it prints progress markers, the column names reported by
    ``describe extended`` and the fetched rows, and always returns ``None``.
    The module-level ``cursor`` is used for both statements.

    Args:
        database_name: Only printed; the queries use ``table_name`` unqualified.
        table_name: Table to describe and to select from.
        ts_date: Value compared (as a quoted string) with the ``ts_date`` column.
        ts_hour: Value compared (as a quoted string) with the ``ts_hour`` column.
        limit: Optional row limit; a falsy value means no LIMIT clause.

    Returns:
        None, both on success and when an exception was caught and printed.
    """
    print("database_name:",database_name)
    try:
        print("BBB")
        describe_sql = 'describe extended ' + table_name
        cursor.execute(describe_sql)
        print("CCCC")
        # First field of every DESCRIBE row is collected as a column name.
        colnames = [row[0] for row in cursor.fetchall()]
        print("DDDD",colnames)

        limit_clause = (' LIMIT ' + str(limit)) if limit else ''
        select_query = "".join((
            "SELECT * FROM ", table_name,
            " WHERe ts_date = '", ts_date, "'",
            " and ts_hour = '", ts_hour, "'",
            limit_clause,
        ))
        print("EEEE",select_query)
        print("FFFF")

        cursor.execute(select_query)
        print("#####data###get data")
        print(cursor.fetchall())
        print("GGGG")
    except Exception as e:
        # Best effort: report the problem and carry on with the next call.
        print("except ERROR :",e)
    return None


# Fetch a 2-row sample for every (day, hour) combination: days 20 and 21,
# hours 18 and 19. The date string is '202110' followed by the day number.
for day in range(20, 22):
    ts_date = '202110' + str(day)
    for hour in range(18, 20):
        # Hours are passed as two-character strings, so single digits get a
        # leading zero.
        hour_str = str(hour) if hour >= 10 else '0' + str(hour)
        print("AAA:",day, hour)
        fetch_data(database_name, table_name, ts_date, hour_str,limit=2)

# All combinations visited; close the Hive connection.
if conn:
    conn.close()

#!/usr/bin/python
# -*- coding: UTF-8 -*-

import pandas as pd
from pyhive import hive
import thrift
import sasl
import thrift_sasl

# Target database and table, shared by the rest of this script.
database_name = "hive_dlp_data_test"
table_name = "sink_dlp_hive"

# One connection and one cursor are opened here and reused below.
conn = hive.Connection(
    host='master',
    port=10000,
    database=database_name,
    auth='NONE',
)
cursor = conn.cursor()

# Start-up checks, run in order: each statement is executed and its complete
# result is printed after the given label.
startup_checks = (
    ("tables:", 'show tables'),
    ("databases:", 'show databases'),
    ("count test:", 'SELECt COUNT(*) FROM hive_dlp_data_test.sink_dlp_hive WHERe TS_DATE = 20211021 AND TS_HOUR = 18'),
)
for label, statement in startup_checks:
    cursor.execute(statement)
    print(label, cursor.fetchall())

def fetch_data(database_name, table_name, ts_date, ts_hour, limit=None):
    """Load one (ts_date, ts_hour) partition of ``table_name`` into a DataFrame.

    The module-level ``cursor`` is used to read the column names and the
    module-level ``conn`` is handed to pandas for the SELECT itself.

    Args:
        database_name: Only printed; the queries use ``table_name`` unqualified.
        table_name: Table to describe and to select from.
        ts_date: Partition date, e.g. ``'20211021'``.
        ts_hour: Partition hour as a two-character string, e.g. ``'08'``.
        limit: Optional maximum number of rows; a falsy value means no LIMIT.

    Returns:
        A ``pandas.DataFrame`` whose columns are renamed after the DESCRIBE
        output, or ``None`` when anything failed (the error is printed).
    """
    print("databases:",database_name)
    try:
        cursor.execute('describe extended ' + table_name)
        colnames = [row[0] for row in cursor.fetchall()]

        # The partition values are passed as DB-API parameters instead of
        # being spliced into the statement. The driver then quotes them as
        # string literals, so '08' is no longer sent as the bare number 08,
        # and the values cannot change the shape of the statement.
        select_query = ('SELECT * FROM ' + table_name
                        + ' WHERE ts_date = %(ts_date)s AND ts_hour = %(ts_hour)s')
        if limit:
            # int() rejects anything that is not a plain row count.
            select_query += ' LIMIT ' + str(int(limit))
        params = {'ts_date': str(ts_date), 'ts_hour': str(ts_hour)}
        print("select_query:",select_query, params)

        df = pd.read_sql_query(select_query, conn, params=params)
        print(type(df))
        # Only the first df.shape[1] names are used; presumably DESCRIBE
        # EXTENDED returns additional non-column rows after the real columns
        # - TODO confirm against the actual table.
        df.columns = colnames[:df.shape[1]]
        return df

    except Exception as e:
        print("ts_date, ts_hour:",ts_date, ts_hour)
        print("except ERROR:",e)
        # Explicit failure signal; callers must check for None before using
        # the result.
        return None


# Load every hourly partition (hours 00-23) for days 20 and 21 and report
# the shape of each resulting DataFrame. The date string is '202110' followed
# by the day number.
try:
    for day in range(20, 22):
        for hour in range(0, 24):
            # Hours are passed as two-character strings ('00' .. '23').
            hour_str = '{:02d}'.format(hour)
            print("date,hour:",day,hour)
            df = fetch_data(database_name, table_name, '202110' + str(day), hour_str)
            if df is None:
                # fetch_data yields None when the query failed; without this
                # guard df.shape would raise AttributeError and abort the
                # remaining partitions.
                print("result:",day, hour, "no data (query failed)")
                continue
            print("result:",day, hour, df.shape)
finally:
    # Close the connection even if the loop is interrupted by an error.
    if conn:
        conn.close()

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/729367.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号