Shell Scripts
Full Import
#!/usr/bin/env bash
biz_date=20210101
biz_fmt_date=2021-01-01
dw_parent_dir=/data/dw/ods/one_make/full_imp
workhome=/opt/sqoop/one_make
full_imp_tables=${workhome}/full_import_tables.txt
mkdir -p ${workhome}/log
orcl_srv=oracle.bigdata.cn
orcl_port=1521
orcl_sid=helowin
orcl_user=ciss
orcl_pwd=123456
sqoop_import_params="sqoop import -Dmapreduce.job.user.classpath.first=true --outdir ${workhome}/java_code --as-avrodatafile"
sqoop_jdbc_params="--connect jdbc:oracle:thin:@${orcl_srv}:${orcl_port}:${orcl_sid} --username ${orcl_user} --password ${orcl_pwd}"
# load hadoop/sqoop env
source /etc/profile
while read p; do
# parallel execution import
${sqoop_import_params} ${sqoop_jdbc_params} --target-dir ${dw_parent_dir}/${p}/${biz_date} --table ${p^^} -m 1 &
cur_time=`date "+%F %T"`
echo "${cur_time}: ${sqoop_import_params} ${sqoop_jdbc_params} --target-dir ${dw_parent_dir}/${p}/${biz_date} --table ${p} -m 1 &" >> ${workhome}/log/${biz_fmt_date}_full_imp.log
sleep 30
done < ${full_imp_tables}
# If MR's Uber mode is enabled, -Dmapreduce.job.user.classpath.first=true must be added to avoid class conflicts
# --outdir: directory where Sqoop writes its generated output files, such as the MR Java code
# ${p^^}: bash parameter expansion that uppercases the table name
# cur_time=`date "+%F %T"`: captures the current timestamp
# sleep 30: staggers submissions so the parallel imports do not exhaust memory
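The list file full_import_tables.txt is read one table name per line, and ${p^^} (a bash 4+ expansion) uppercases each name before it reaches --table. A quick sketch with a hypothetical table name:
p=ciss_base_areas   # hypothetical line from full_import_tables.txt
echo "${p^^}"       # prints CISS_BASE_AREAS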
Incremental Import
#!/usr/bin/env bash
# Be careful when writing the shell script, especially the SQL condition: a stray space inside it will make the command fail (see the sanity check after this script)
biz_date=20210101
biz_fmt_date=2021-01-01
dw_parent_dir=/data/dw/ods/one_make/incr_imp
workhome=/opt/sqoop/one_make
incr_imp_tables=${workhome}/incr_import_tables.txt
orcl_srv=oracle.bigdata.cn
orcl_port=1521
orcl_sid=helowin
orcl_user=ciss
orcl_pwd=123456
mkdir -p ${workhome}/log
sqoop_condition_params="--where '${biz_fmt_date}'=to_char(CREATE_TIME,'yyyy-mm-dd')"
sqoop_import_params="sqoop import -Dmapreduce.job.user.classpath.first=true --outdir ${workhome}/java_code --as-avrodatafile"
sqoop_jdbc_params="--connect jdbc:oracle:thin:@${orcl_srv}:${orcl_port}:${orcl_sid} --username ${orcl_user} --password ${orcl_pwd}"
# load hadoop/sqoop env
source /etc/profile
while read p; do
# clean old directory in HDFS
hdfs dfs -rm -r ${dw_parent_dir}/${p}/${biz_date}
# parallel execution import
${sqoop_import_params} ${sqoop_jdbc_params} --target-dir ${dw_parent_dir}/${p}/${biz_date} --table ${p^^} ${sqoop_condition_params} -m 1 &
cur_time=`date "+%F %T"`
echo "${cur_time}: ${sqoop_import_params} ${sqoop_jdbc_params} --target-dir ${dw_parent_dir}/${p}/${biz_date} --table ${p} ${sqoop_condition_params} -m 1 &" >> ${workhome}/log/${biz_fmt_date}_incr_imp.log
sleep 30
done < ${incr_imp_tables}
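Because the --where value is expanded without re-quoting, it must contain no spaces. Echoing the expanded condition is a cheap way to verify it before a real run; a sketch using the values above:
biz_fmt_date=2021-01-01
sqoop_condition_params="--where '${biz_fmt_date}'=to_char(CREATE_TIME,'yyyy-mm-dd')"
echo ${sqoop_condition_params}
# --where '2021-01-01'=to_char(CREATE_TIME,'yyyy-mm-dd')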
Execution
cd /opt/sqoop/one_make
# update biz_date and biz_fmt_date before running
sh -x full_import_tables.sh
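To keep a long run alive after the terminal closes, the script can be backgrounded with nohup and the log tailed; a sketch, with the log path following the script above:
nohup sh full_import_tables.sh > /dev/null 2>&1 &
tail -f /opt/sqoop/one_make/log/2021-01-01_full_imp.log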
Python Scripts
Full Import
#!/usr/bin/env python
# @Time : 2021/7/14 15:34
# @desc :
__coding__ = "utf-8"
__author__ = "itcast"
import os
import subprocess
import datetime
import time
import logging
biz_date = '20210101'
biz_fmt_date = '2021-01-01'
dw_parent_dir = '/data/dw/ods/one_make/full_imp'
workhome = '/opt/sqoop/one_make'
full_imp_tables = workhome + '/full_import_tables.txt'
if not os.path.exists(workhome + '/log'):
    os.makedirs(workhome + '/log')
orcl_srv = 'oracle.bigdata.cn'
orcl_port = '1521'
orcl_sid = 'helowin'
orcl_user = 'ciss'
orcl_pwd = '123456'
sqoop_import_params = 'sqoop import -Dmapreduce.job.user.classpath.first=true --outdir %s/java_code --as-avrodatafile' % workhome
sqoop_jdbc_params = '--connect jdbc:oracle:thin:@%s:%s:%s --username %s --password %s' % (orcl_srv, orcl_port, orcl_sid, orcl_user, orcl_pwd)
# load hadoop/sqoop env
subprocess.call("source /etc/profile", shell=True)
print('executing...')
# read file
fr = open(full_imp_tables)
for line in fr.readlines():
    tblName = line.rstrip('\n')
    # parallel execution import, mirroring the shell version:
    # ${sqoop_import_params} ${sqoop_jdbc_params} --target-dir ${dw_parent_dir}/${p}/${biz_date} --table ${p^^} -m 1 &
    sqoopimportCommand = '''
    %s %s --target-dir %s/%s/%s --table %s -m 1 &
    ''' % (sqoop_import_params, sqoop_jdbc_params, dw_parent_dir, tblName, biz_date, tblName.upper())
    # the trailing '&' backgrounds the import, so call() returns at once
    subprocess.call(sqoopimportCommand, shell=True)
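    # note: logging.basicConfig below only takes effect on its first call; later
    # iterations are no-ops, so the configuration could be hoisted above the loop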
    # the shell version captured cur_time=`date "+%F %T"`; logging adds its own timestamp
    logging.basicConfig(level=logging.INFO,  # log level
                        filename='%s/log/%s_full_imp.log' % (workhome, biz_fmt_date),
                        # filemode: 'w' rewrites the log on every run, 'a' appends (the default)
                        filemode='a',
                        # log record format
                        format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s')
    logging.info(sqoopimportCommand)
    time.sleep(15)  # stagger submissions, like sleep 30 in the shell script
fr.close()
Incremental Import
#!/usr/bin/env python
# @Time : 2021/7/20 15:19
# @desc :
__coding__ = "utf-8"
__author__ = "itcast"
import os
import subprocess
import datetime
import time
import logging
biz_date = '20210101'
biz_fmt_date = '2021-01-01'
dw_parent_dir = '/data/dw/ods/one_make/incr_imp'
workhome = '/opt/sqoop/one_make'
incr_imp_tables = workhome + '/incr_import_tables.txt'
if not os.path.exists(workhome + '/log'):
    os.makedirs(workhome + '/log')
orcl_srv = 'oracle.bigdata.cn'
orcl_port = '1521'
orcl_sid = 'helowin'
orcl_user = 'ciss'
orcl_pwd = '123456'
sqoop_condition_params = "--where \"'%s'=to_char(CREATE_TIME,'yyyy-mm-dd')\"" % biz_fmt_date
sqoop_import_params = 'sqoop import -Dmapreduce.job.user.classpath.first=true --outdir %s/java_code --as-avrodatafile' % workhome
sqoop_jdbc_params = '--connect jdbc:oracle:thin:@%s:%s:%s --username %s --password %s' % (orcl_srv, orcl_port, orcl_sid, orcl_user, orcl_pwd)
# load hadoop/sqoop env
subprocess.call("source /etc/profile", shell=True)
print('executing...')
# read file
fr = open(incr_imp_tables)
for line in fr.readlines():
    tblName = line.rstrip('\n')
    # clean the old directory in HDFS before re-importing
    hdfs_command = 'hdfs dfs -rm -r %s/%s/%s' % (dw_parent_dir, tblName, biz_date)
    subprocess.call(hdfs_command, shell=True)
    # parallel execution import, mirroring the shell version:
    # ${sqoop_import_params} ${sqoop_jdbc_params} --target-dir ${dw_parent_dir}/${p}/${biz_date} --table ${p^^} ${sqoop_condition_params} -m 1 &
    sqoopimportCommand = '''
    %s %s --target-dir %s/%s/%s --table %s %s -m 1 &
    ''' % (sqoop_import_params, sqoop_jdbc_params, dw_parent_dir, tblName, biz_date, tblName.upper(), sqoop_condition_params)
    subprocess.call(sqoopimportCommand, shell=True)
    logging.basicConfig(level=logging.INFO,
                        filename='%s/log/%s_incr_imp.log' % (workhome, biz_fmt_date),
                        filemode='a',
                        format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s')
    logging.info(sqoopimportCommand)
    time.sleep(15)  # stagger submissions, like sleep 30 in the shell script
fr.close()
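Assuming the two Python versions are saved in the workhome as full_import_tables.py and incr_import_tables.py (hypothetical names), they are launched the same way as the shell scripts:
cd /opt/sqoop/one_make
# update biz_date and biz_fmt_date before running
python full_import_tables.py
python incr_import_tables.py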