栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Schema上传备份的shell/python脚本

Schema上传备份的shell/python脚本

直接上代码。Shell 脚本:
#!/usr/bin/env bash
# Upload the Avro schema (.avsc) files to HDFS and keep a tar.gz backup of them.

# Local working directory and the HDFS directory that receives the schema files.
workhome="/opt/sqoop/one_make"
hdfs_schema_dir="/data/dw/ods/one_make/avsc"

# Business date in compact (yyyymmdd) and dashed (yyyy-mm-dd) form.
biz_date="20210101"
biz_fmt_date="2021-01-01"

# Derived names: local archive, archive path on HDFS, and the log file.
local_schema_backup_filename="schema_${biz_date}.tar.gz"
hdfs_schema_backup_filename="${hdfs_schema_dir}/avro_schema_${biz_date}.tar.gz"
log_file="${workhome}/log/upload_avro_schema_${biz_fmt_date}.log"

# Append one timestamped line to the log file.
#
# Arguments: the message words; "$*" joins them into a single string.
# Output:    a line of the form "YYYY-MM-DD HH:MM:SS <message>" appended to ${log_file}.
log() {
    # Keep the timestamp local so calling log() does not leak a global variable.
    local cur_time
    cur_time=$(date "+%F %T")
    # Quote the redirect target: an unquoted path containing whitespace
    # makes bash fail with "ambiguous redirect".
    echo "${cur_time} $*" >> "${log_file}"
}

# Reload the system-wide environment, then work from workhome.
source /etc/profile
# Stop when workhome is unavailable: the archive step later in this script
# uses paths relative to it (./java_code/*.avsc and the local tar.gz), so
# continuing from another directory would package or upload the wrong files.
cd "${workhome}" || { echo "Cannot change directory to ${workhome}" >&2; exit 1; }

# Step 1: make sure the HDFS directory for the schema files exists.
#
#  hadoop fs [generic options] [-test -[defsz] <path>]
# -test -[defsz] <path> :
#   Answer various questions about <path>, with result via exit status.
#     -d  return 0 if <path> is a directory.
#     -e  return 0 if <path> exists.
#     -f  return 0 if <path> is a file.
#     -s  return 0 if file <path> is greater than zero bytes in size.
#     -z  return 0 if file <path> is zero bytes in size, else return 1.
log "Check if the HDFS Avro schema directory ${hdfs_schema_dir}..."

# `-test -e` exits 0 when the path exists and non-zero otherwise;
# create the directory (with parents) only when it is missing.
if ! hdfs dfs -test -e "${hdfs_schema_dir}" > /dev/null; then
    log "Path: ${hdfs_schema_dir} is not exists. Create a new one."
    log "hdfs dfs -mkdir -p ${hdfs_schema_dir}"
    hdfs dfs -mkdir -p "${hdfs_schema_dir}"
fi

# Step 2: upload the .avsc schema files unless they are already on HDFS.
log "Check if the file ${hdfs_schema_dir}/CISS4_CISS_base_AREAS.avsc has uploaded to the HFDS..."

# CISS4_CISS_base_AREAS.avsc serves as the marker: when this one file is
# missing on HDFS, every local .avsc file is uploaded.
if ! hdfs dfs -test -e "${hdfs_schema_dir}/CISS4_CISS_base_AREAS.avsc" > /dev/null; then
    log "Upload all the .avsc schema file."
    log "hdfs dfs -put ${workhome}/java_code/*.avsc ${hdfs_schema_dir}"
    # The glob stays outside the quotes so the shell still expands it.
    hdfs dfs -put "${workhome}"/java_code/*.avsc "${hdfs_schema_dir}"
fi

# Step 3: package the schema files into a local tar.gz archive.
log "Check if the backup tar.gz file has generated in the local server..."
# Build the archive only when it does not exist yet in the current directory.
[ -e "${local_schema_backup_filename}" ] || {
    log "package and compress the schema files"
    log "tar -czf ${local_schema_backup_filename} ./java_code/*.avsc"
    tar -czf "${local_schema_backup_filename}" ./java_code/*.avsc
}

# Step 4: upload the local archive to HDFS unless a copy is already there.
log "Check if the backup tar.gz file has upload to the HDFS..."
if ! hdfs dfs -test -e "${hdfs_schema_backup_filename}" > /dev/null; then
    log "upload the schema package file to HDFS"
    log "hdfs dfs -put ${local_schema_backup_filename} ${hdfs_schema_backup_filename}"
    hdfs dfs -put "${local_schema_backup_filename}" "${hdfs_schema_backup_filename}"
fi

代码拆解:avro文件会存储schema信息

Python 脚本
#!/usr/bin/env python
# @Time : 2021/7/20 15:46
# @desc : Upload the Avro schema (.avsc) files to HDFS and back them up as a tar.gz archive.
# NOTE(review): __coding__ is an ordinary module-level variable; it is not a
# PEP 263 source-encoding declaration ("# -*- coding: utf-8 -*-") -- confirm
# which of the two was intended.
__coding__ = "utf-8"
__author__ = "itcast"

# import pyhdfs
import logging
import os

# Local working directory and the HDFS directory that receives the schema files.
workhome = '/opt/sqoop/one_make'
hdfs_schema_dir = '/data/dw/ods/one_make/avsc'

# Business date in compact (yyyymmdd) and dashed (yyyy-mm-dd) form.
biz_date = '20210101'
biz_fmt_date = '2021-01-01'

# Derived names: local archive, archive path on HDFS, and the log file.
local_schema_backup_filename = 'schema_{date}.tar.gz'.format(date=biz_date)
hdfs_schema_backup_filename = '{base}/avro_schema_{date}.tar.gz'.format(
    base=hdfs_schema_dir, date=biz_date)
log_file = '{home}/log/upload_avro_schema_{date}.log'.format(
    home=workhome, date=biz_fmt_date)

# Logging setup: append INFO-and-above records to log_file. Each record
# carries the timestamp, the source path, the line number and the level.
_logging_options = {
    'filename': log_file,
    'filemode': 'a',
    'level': logging.INFO,
    'format': '%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s',
}
logging.basicConfig(**_logging_options)

# Prepare the process environment: import the variables exported by
# /etc/profile and make workhome the current directory.
#
# os.system() runs each command in its own short-lived child shell, so a
# `source` or `cd` issued through it never affects this process or the
# commands started later. Both changes are therefore applied to the Python
# process itself.
import subprocess  # needed only for this setup step

try:
    # Let bash source the profile, then dump the resulting environment
    # NUL-separated so values that contain newlines survive the round trip.
    _profile_env = subprocess.check_output(
        ['bash', '-c', 'source /etc/profile >/dev/null 2>&1; env -0'],
        universal_newlines=True)
except (OSError, ValueError, subprocess.CalledProcessError) as exc:
    # Best effort: continue with the inherited environment.
    logging.warning('Could not load /etc/profile: %s', exc)
else:
    for _entry in _profile_env.split('\0'):
        _name, _sep, _value = _entry.partition('=')
        if _sep and _name:
            os.environ[_name] = _value

# The archive step later in this script uses paths relative to workhome
# (./java_code/*.avsc and the local tar.gz file).
os.chdir(workhome)

# Step 1: make sure the HDFS directory for the schema files exists.
#
#  hadoop fs [generic options] [-test -[defsz] <path>]
# -test -[defsz] <path> :
#   Answer various questions about <path>, with result via exit status.
#     -d  return 0 if <path> is a directory.
#     -e  return 0 if <path> exists.
#     -f  return 0 if <path> is a file.
#     -s  return 0 if file <path> is greater than zero bytes in size.
#     -z  return 0 if file <path> is zero bytes in size, else return 1.
logging.info('Check if the HDFS Avro schema directory %s...', hdfs_schema_dir)
# Shell equivalent: hdfs dfs -test -e ${hdfs_schema_dir} > /dev/null
commStatus = os.system('hdfs dfs -test -e %s > /dev/null' % hdfs_schema_dir)
# Compare the status by value: `is not 0` tests object identity, which only
# works by accident of integer caching and raises a SyntaxWarning on
# Python 3.8+.
if commStatus != 0:
    logging.info('Path: %s is not exists. Create a new one.', hdfs_schema_dir)
    logging.info('hdfs dfs -mkdir -p %s', hdfs_schema_dir)
    os.system('hdfs dfs -mkdir -p %s' % hdfs_schema_dir)

# Step 2: upload the .avsc schema files unless they are already on HDFS.
# CISS4_CISS_base_AREAS.avsc serves as the marker: when this one file is
# missing on HDFS, every local .avsc file is uploaded.
logging.info('Check if the file %s/CISS4_CISS_base_AREAS.avsc has uploaded to the HFDS...', hdfs_schema_dir)
commStatus = os.system('hdfs dfs -test -e %s/CISS4_CISS_base_AREAS.avsc > /dev/null' % hdfs_schema_dir)
# Compare the status by value; `is not 0` would test object identity.
if commStatus != 0:
    logging.info('Upload all the .avsc schema file.')
    logging.info('hdfs dfs -put %s/java_code/*.avsc %s', workhome, hdfs_schema_dir)
    # The command goes through the shell so that the *.avsc glob is expanded.
    os.system('hdfs dfs -put %s/java_code/*.avsc %s' % (workhome, hdfs_schema_dir))

# Step 3: package the schema files into a local tar.gz archive.
logging.info('Check if the backup tar.gz file has generated in the local server...')
# Test for the archive directly rather than spawning a shell for `[ -e ... ]`;
# the path is resolved against the current working directory either way.
if not os.path.exists(local_schema_backup_filename):
    logging.info('package and compress the schema files')
    logging.info('tar -czf %s ./java_code/*.avsc', local_schema_backup_filename)
    # The command goes through the shell so that the *.avsc glob is expanded.
    os.system('tar -czf %s ./java_code/*.avsc' % local_schema_backup_filename)

# Step 4: upload the local archive to HDFS unless a copy is already there.
logging.info('Check if the backup tar.gz file has upload to the HDFS...')
commStatus = os.system('hdfs dfs -test -e %s > /dev/null' % hdfs_schema_backup_filename)
# Compare the status by value; `is not 0` would test object identity.
if commStatus != 0:
    logging.info('upload the schema package file to HDFS')
    logging.info('hdfs dfs -put %s %s', local_schema_backup_filename, hdfs_schema_backup_filename)
    os.system('hdfs dfs -put %s %s' % (local_schema_backup_filename, hdfs_schema_backup_filename))

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/742949.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号