yarn查看日志:yarn logs -applicationId “任务Id”
统计hbase表的数据量:org.apache.hadoop.hbase.mapreduce.RowCounter ‘表名’
目前自己主要负责从impala读取数据,有些需要预计算,有些则是直接封装json写入hbase。
1.先是通过脚本将impala数据预计算或直接封装成json上传
1.1传入三个参数:开始时间、结束时间和hbase表名
1.2外置sql路径
biz_dt=$1
last_dt=$2
tablename=$3
sql_file_path="./sqlconfigfile/trade_rights_info_dt.sql"
source ./read_sqlfile.sh
sql_str=$(get_sql_str ${sql_file_path})
spark-submit --master yarn-cluster --num-executors 2 --executor-cores 4 --executor-memory 6G --class com.cgs.TradeRightsInfo twoFinAcctAnalysis-1.0-SNAPSHOT_20210817.jar ${biz_dt} ${last_dt} hdfs://nameservice1/sync/jsontohbase/${tablename/%_dt}_${last_dt}/"biz_dt"=${last_dt} ${sql_str}
2.通过spark代码读取impala数据
2.1此代码没有进行预计算,而是直接封装json,代码逻辑简单,有难度的可能就是通过业务设置rowkey。
package com.cgs
import com.edw.TradeRightsInfoBean
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.codehaus.jackson.map.ObjectMapper
//交易专区-交易权限(T-1)
//teradata表: nwf_data.int_d_nwf_trade_rights_info
//impala表: app_data.app_int_d_nwf_trade_rights_info_z
//hbase表: trade_rights_dt 每天一张表
//rowkey设计:资金账号后2位+资金账号+账户类型
//注释:
//1.只使用最新的数据(只需将最新的分区的数据刷入hbase即可)
//bizDate String(10)
//custCode String(19)
//custType String(1)
//kcbFlag String(1)
//cybFlag String(1)
//hgtFlag String(1)
//sgtFlag String(1)
//secuAcctFlag String(1)
//ydghAcctFlag String(1)
//gpzyAcctFlag String(1)
//rzghAcctFlag String(1)
//netAssetAvg20 DECIMAL(26,4)
//tradeAlmt DECIMAL(16,4)
object TradeRightsInfo {
def main(args: Array[String]): Unit = {
if(args.length != 4){
println(
"""
|com.cgs.TradeRights
|参数:
|startDt 开始日期
|endDt 结束日期
|outputPath 输出路径
|sqlStr 外置sql语句
""".stripMargin)
sys.exit()
}
val Array(startDt,endDt,outputPath,sqlStr) = args
val sparkconf = new SparkConf().setAppName(this.getClass.getName)
val sc = SparkContext.getOrCreate(sparkconf)
val hiveContext = new HiveContext(sc)
val conf = new Configuration()
//val sql =
// """
// |select
// | trim(nvl(biz_dt , "")) as biz_dt ,
// | trim(nvl(cust_cd , "")) as cust_cd ,
// | trim(nvl(cust_type , "")) as cust_type ,
// |trim(nvl(asset_acct,"")) as asset_acct,
// |trim(nvl(acct_flag,"")) as acct_flag,
// | trim(nvl(kcb_flag , "")) as kcb_flag ,
// | trim(nvl(cyb_flag , "")) as cyb_flag ,
// | trim(nvl(hgt_flag , "")) as hgt_flag ,
// | trim(nvl(sgt_flag , "")) as sgt_flag ,
// | trim(nvl(secu_acct_flag , "")) as secu_acct_flag,
// | trim(nvl(ydgh_flag , "")) as ydgh_flag,
// | trim(nvl(gpzy_flag , "")) as gpzy_flag,
// | trim(nvl(gzzhg_flag , "")) as gzzhg_flag,
// | cast(net_asset_avg_20 as DECIMAL(26,4)) as net_asset_avg_20,
// | cast(trade_almt as DECIMAL(16,4)) as trade_almt
// |from
// | app_data.app_int_d_nwf_trade_rights_info_z
// |where
// | biz_date>=$startDt and biz_date<=$endDt
// |""".stripMargin
val sql = sqlStr.replaceAll("@@"," ").trim()
.replaceFirst("\$startDt",startDt)
.replaceFirst("\$endDt",endDt)
val dataframe = hiveContext.sql(sql)
val dataRdd = dataframe.map(e => {
val biz_dt = e.getAs[String]("biz_dt")
val cust_cd = e.getAs[String]("cust_cd")
val cust_type = e.getAs[String]("cust_type")
val asset_acct = e.getAs[String]("asset_acct")
val acct_flag = e.getAs[String]("acct_flag")
val kcb_flag = e.getAs[String]("kcb_flag")
val cyb_flag = e.getAs[String]("cyb_flag")
val hgt_flag = e.getAs[String]("hgt_flag")
val sgt_flag = e.getAs[String]("sgt_flag")
val secu_acct_flag = e.getAs[String]("secu_acct_flag")
val ydgh_flag = e.getAs[String]("ydgh_flag")
val gpzy_flag = e.getAs[String]("gpzy_flag")
val gzzhg_flag = e.getAs[String]("gzzhg_flag")
val net_asset_avg_20 = e.getAs[java.math.BigDecimal]("net_asset_avg_20")
val trade_almt = e.getAs[java.math.BigDecimal]("trade_almt")
val tradeRightsInfoBean = new TradeRightsInfoBean(biz_dt, cust_cd, cust_type, kcb_flag, cyb_flag, hgt_flag, sgt_flag, secu_acct_flag, ydgh_flag, gpzy_flag, gzzhg_flag, net_asset_avg_20, trade_almt)
//资金账号后2位+资金账号+账户类型
val rowkeyStr = new StringBuilder()
.append(asset_acct.substring(asset_acct.length - 2))
.append(asset_acct)
.append(acct_flag)
val jsonStr = new ObjectMapper().writevalueAsString(tradeRightsInfoBean)
rowkeyStr.toString() + "|" + jsonStr
})
val path = new Path(outputPath)
val fileSystem = FileSystem.get(conf)
if(fileSystem.exists(path)){
fileSystem.delete(path,true)
}
dataRdd.saveAsTextFile(outputPath)
sc.stop()
}
}
3.然后通过脚本将hdfs文件刷到hbase。
#!/bin/bash
# 本脚本用于将两列(rowkey,value)的hdfs文件导入到hbase中
# 步骤:
# 1. 生成hfile文件,此过程指定hdfs文件的分隔符、列映射,源文件的地址,目标文件的地址,表名
# 2. 将hfile文件导入到hbase中,指定路径、表名
# 参数:
# 1. 日期 如20200514 2.增量/全量/字典表 z/q/d 3.表名 :
path=$(cd `dirname $0`;pwd)
cd $path
source ~/.bashrc
export JAVA_HOME=/usr/java/jdk1.8.0_181-cloudera
export Hbase_HOME=/opt/cloudera/parcels/CDH/lib/hbase
start_date=`date "+%Y%m%d"`
start_time=`date "+%H%M%S"`
if [[ $# -lt 3 ]] || [[ "$*" =~ "--help" ]];then
echo -e "USAGE: sh $0 20200519 z wealth_date n"
exit 1
fi
biz_dt=$1
incremental=$2
columnname=${biz_dt:6:8}
hbase_server_dir="/opt/cloudera/parcels/CDH/lib/hbase"
tn=$3
tablename=${tn/%_dt}
incremtn=${tn}
output_dir="/sync/hbase/hfile/${tablename}/biz_dt=${biz_dt}"
source_dir="/sync/jsontohbase/${tablename}/biz_dt=${biz_dt}"
#创建日志目录
log_dir="./log/${start_date}"
if [ ! -d $log_dir ];then
mkdir -p $log_dir
fi
log_file=${log_dir}/${tn}_${biz_dt}_${start_time}_$$.log
echo "传入的参数为:$0 $*" >> $log_file 2>&1
#-Dimporttsv.separator 为你要加载文件的分隔符
#-Dimporttsv.bulk.output 输出的Hfile的目录
#$tablename 表名,这里未指定namespace 就在default下
#$source_dir 存放要加载的文件的目录
hdfs dfs -rm -r $output_dir >> $log_file 2>&1
echo "$incremental"
if [[ "$incremental" = "q" ]];then
echo -e "`date` 开始全量生成$source_dir 的hfile文件,目标地址$output_dir,目标表 $tn n日志文件存放在$log_file 中" |tee -a $log_file |cat
HADOOP_CLASSPATH=`${Hbase_HOME}/bin/hbase classpath`:${Hbase_HOME}/lib/hbase/lib/*.jar ${HADOOP_HOME}/bin/hadoop jar ${hbase_server_dir}/hbase-server.jar importtsv
'-Dimporttsv.separator=|' -Dimporttsv.columns=Hbase_ROW_KEY,info:val
-Dimporttsv.bulk.output=${output_dir} ${tablename}
$source_dir >> $log_file 2>&1
elif [[ "$incremental" = "z" ]];then
if [[ $incremtn =~ "position_cycle_events" ]]; then
echo -e "`date` 开始增量生成$source_dir 的hfile文件,目标地址$output_dir,目标表 $incremtn n日志文件存放在$log_file 中" |tee -a $log_file |cat
HADOOP_CLASSPATH=`${Hbase_HOME}/bin/hbase classpath`:${Hbase_HOME}/lib/hbase/lib/*.jar ${HADOOP_HOME}/bin/hadoop jar ${hbase_server_dir}/hbase-server.jar importtsv
'-Dimporttsv.separator=|' -Dimporttsv.columns=Hbase_ROW_KEY,info:val,info:rownum
-Dimporttsv.bulk.output=${output_dir} $incremtn
$source_dir >> $log_file 2>&1
elif [[ $incremtn =~ "his_day_trade_info" ]]; then
echo -e "`date` 开始增量生成$source_dir 的hfile文件,目标地址$output_dir,目标表 $incremtn n日志文件存放在$log_file 中" |tee -a $log_file |cat
HADOOP_CLASSPATH=`${Hbase_HOME}/bin/hbase classpath`:${Hbase_HOME}/lib/hbase/lib/*.jar ${HADOOP_HOME}/bin/hadoop jar ${hbase_server_dir}/hbase-server.jar importtsv
'-Dimporttsv.separator=|' -Dimporttsv.columns=Hbase_ROW_KEY,info:val,info:rownum
-Dimporttsv.bulk.output=${output_dir} $incremtn
$source_dir >> $log_file 2>&1
elif [[ $incremtn =~ "his_trade_info" ]]; then
echo -e "`date` 开始增量生成$source_dir 的hfile文件,目标地址$output_dir,目标表 $incremtn n日志文件存放在$log_file 中" |tee -a $log_file |cat
HADOOP_CLASSPATH=`${Hbase_HOME}/bin/hbase classpath`:${Hbase_HOME}/lib/hbase/lib/*.jar ${HADOOP_HOME}/bin/hadoop jar ${hbase_server_dir}/hbase-server.jar importtsv
'-Dimporttsv.separator=|' -Dimporttsv.columns=Hbase_ROW_KEY,info:val,info:rownum
-Dimporttsv.bulk.output=${output_dir} $incremtn
$source_dir >> $log_file 2>&1
else
echo -e "`date` 开始增量生成$source_dir 的hfile文件,目标地址$output_dir,目标表 $incremtn n日志文件存放在$log_file 中" |tee -a $log_file |cat
HADOOP_CLASSPATH=`${Hbase_HOME}/bin/hbase classpath`:${Hbase_HOME}/lib/hbase/lib/*.jar ${HADOOP_HOME}/bin/hadoop jar ${hbase_server_dir}/hbase-server.jar importtsv
'-Dimporttsv.separator=|' -Dimporttsv.columns=Hbase_ROW_KEY,info:val
-Dimporttsv.bulk.output=${output_dir} $incremtn
$source_dir >> $log_file 2>&1
fi
fi
if [[ $? -eq 0 ]];then
echo -e "`date` $source_dir hfile文件生成完毕。 n开始导入到hbase $tn 中。。" |tee -a $log_file |cat
else
echo -e "`date` $source_dir hfile文件生成失败。。" |tee -a $log_file |cat
exit 1
fi
#completebulkload
if [[ "$incremental" = "q" ]]
then
HADOOP_CLASSPATH=`${Hbase_HOME}/bin/hbase classpath`:${Hbase_HOME}/lib/hbase/lib/*.jar
${HADOOP_HOME}/bin/hadoop jar ${hbase_server_dir}/hbase-server.jar
completebulkload
${output_dir} $tn >> $log_file 2>&1
else
HADOOP_CLASSPATH=`${Hbase_HOME}/bin/hbase classpath`:${Hbase_HOME}/lib/hbase/lib/*.jar
${HADOOP_HOME}/bin/hadoop jar ${hbase_server_dir}/hbase-server.jar
completebulkload
${output_dir} $incremtn >> $log_file 2>&1
fi
if [[ $? -eq 0 ]];then
echo -e "success.. `date` hdfs 文件 $source_dir 已成功导入到hbase $tn 中 " |tee -a $log_file |cat
else
echo -e "error!! `date` hdfs 文件 $source_dir 导入到hbase $tn 中失败 " |tee -a $log_file |cat
fi



