1、每天利用python执行hive sql脚本
2、执行前检测依赖表是否生成
3、依赖表如果生成成功会touch一个success文件到hdfs目录
4、循环检测一定次数,失败抛出异常
上代码:#coding:utf-8
#@auth: lgy
import datetime
import subprocess
import time
import sys
#获取昨天日期
def get_yesterday(format="%Y%m%d"):
today = datetime.date.today()
yesterday = today + datetime.timedelta(days=-1)
return yesterday.strftime(format)
#检测文件
def work():
yesterday = get_yesterday()
error_count = 0
#需要检测的hdfs路径
hdfs_paths = ['hdfs://dt=%s/_SUCCESS'%yesterday,
'hdfs://dt=%s/_SUCCESS'%yesterday]
#循环检测
for hdfs_path in hdfs_paths:
#检测语句
filexistchk = "hdfs dfs -test -e " + hdfs_path + ";echo $?"
while 1:
#执行检测
filexistchk_output = subprocess.Popen(filexistchk, shell=True, stdout=subprocess.PIPE).communicate()
#如果存在
if '1' not in str(filexistchk_output[0]):
print (hdfs_path+" is exists!")
break
else:
error_count += 1
if error_count==50:
#50次没有检测出来 抛出异常
raise Exception("依赖表生成失败!")
now_t = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print (hdfs_path+" %s not exists"%(now_t))
time.sleep(10*60)
#如果没有异常 执行hive脚本
execute_hive_path='/'
execute_hive="/app/hive/bin/hive --hivevar yesterday=%s -f %s.sql"%(yesterday,execute_hive_path+"/"+'file_name')
print(execute_hive)
#子进程执行hive
p = subprocess.Popen(execute_hive, shell=True,stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
#循环读取子进程执行情况,读取子进程返回结果
while p.poll()==None:
print(p.stdout.readline())
#如果返回code为失败
if(p.returncode!=0):
errorInfo = p.stdout.read()
print("%s execute failed,error info: %s"%('file_name', errorInfo))
sys.exit(1) #有错误退出
else:
print("%s execute success!")
sys.exit(0) #无错误退出



