栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

生产遇到问题大全

生产遇到问题大全

yarn查看日志:yarn logs -applicationId “任务Id”
统计hbase表的数据量:org.apache.hadoop.hbase.mapreduce.RowCounter ‘表名’

目前自己主要负责从impala读取数据,有些需要预计算,有些则是直接封装json写入hbase。
1.先是通过脚本将impala数据预计算或直接封装成json上传
1.1传入三个参数:开始时间、结束时间和hbase表名
1.2外置sql路径

biz_dt=$1
last_dt=$2
tablename=$3

sql_file_path="./sqlconfigfile/trade_rights_info_dt.sql"

source ./read_sqlfile.sh

sql_str=$(get_sql_str ${sql_file_path})

spark-submit --master yarn-cluster --num-executors 2 --executor-cores 4 --executor-memory 6G --class com.cgs.TradeRightsInfo twoFinAcctAnalysis-1.0-SNAPSHOT_20210817.jar ${biz_dt} ${last_dt} hdfs://nameservice1/sync/jsontohbase/${tablename/%_dt}_${last_dt}/"biz_dt"=${last_dt} ${sql_str}

2.通过spark代码读取impala数据
2.1此代码没有进行预计算,而是直接封装json,代码逻辑简单,有难度的可能就是通过业务设置rowkey。

package com.cgs

import com.edw.TradeRightsInfoBean
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.codehaus.jackson.map.ObjectMapper

//交易专区-交易权限(T-1)
//teradata表: nwf_data.int_d_nwf_trade_rights_info
//impala表: app_data.app_int_d_nwf_trade_rights_info_z
//hbase表: trade_rights_dt		每天一张表
//rowkey设计:资金账号后2位+资金账号+账户类型
//注释:
//1.只使用最新的数据(只需将最新的分区的数据刷入hbase即可)


//bizDate 	    	String(10)
//custCode 	    	String(19)
//custType 	    	String(1)
//kcbFlag 			String(1)
//cybFlag 			String(1)
//hgtFlag 			String(1)
//sgtFlag 			String(1)
//secuAcctFlag 		String(1)
//ydghAcctFlag 		String(1)
//gpzyAcctFlag 		String(1)
//rzghAcctFlag  	String(1)
//netAssetAvg20 	DECIMAL(26,4)
//tradeAlmt 		DECIMAL(16,4)

object TradeRightsInfo {
  def main(args: Array[String]): Unit = {
    if(args.length != 4){
      println(
        """
          |com.cgs.TradeRights
          |参数:
          |startDt    开始日期
          |endDt      结束日期
          |outputPath 输出路径
          |sqlStr        外置sql语句
        """.stripMargin)
      sys.exit()
    }

    val Array(startDt,endDt,outputPath,sqlStr) = args

    val sparkconf = new SparkConf().setAppName(this.getClass.getName)
    val sc = SparkContext.getOrCreate(sparkconf)
    val hiveContext = new HiveContext(sc)

    val conf = new Configuration()

    //val sql =
    // """
    //   |select
    //   | trim(nvl(biz_dt 	   , "")) as biz_dt 	  ,
    //   | trim(nvl(cust_cd 	   , "")) as cust_cd 	  ,
    //   | trim(nvl(cust_type 	   , "")) as cust_type 	  ,
    //   |trim(nvl(asset_acct,"")) as asset_acct,
    //   |trim(nvl(acct_flag,"")) as acct_flag,
    //   | trim(nvl(kcb_flag 	   , "")) as kcb_flag 	  ,
    //   | trim(nvl(cyb_flag 	   , "")) as cyb_flag 	  ,
    //   | trim(nvl(hgt_flag 	   , "")) as hgt_flag 	  ,
    //   | trim(nvl(sgt_flag 	   , "")) as sgt_flag 	  ,
    //   | trim(nvl(secu_acct_flag , "")) as secu_acct_flag,
    //   | trim(nvl(ydgh_flag , "")) as ydgh_flag,
    //   | trim(nvl(gpzy_flag , "")) as gpzy_flag,
    //   | trim(nvl(gzzhg_flag , "")) as gzzhg_flag,
    //   | cast(net_asset_avg_20 as	DECIMAL(26,4)) as net_asset_avg_20,
    //   | cast(trade_almt 	as	DECIMAL(16,4)) as trade_almt
    //   |from
    //   | app_data.app_int_d_nwf_trade_rights_info_z
    //   |where
    //   | biz_date>=$startDt and biz_date<=$endDt
    //   |""".stripMargin

    val sql = sqlStr.replaceAll("@@"," ").trim()
      .replaceFirst("\$startDt",startDt)
      .replaceFirst("\$endDt",endDt)

    val dataframe = hiveContext.sql(sql)

    val dataRdd = dataframe.map(e => {
      val biz_dt = e.getAs[String]("biz_dt")
      val cust_cd = e.getAs[String]("cust_cd")
      val cust_type = e.getAs[String]("cust_type")
      val asset_acct = e.getAs[String]("asset_acct")
      val acct_flag = e.getAs[String]("acct_flag")
      val kcb_flag = e.getAs[String]("kcb_flag")
      val cyb_flag = e.getAs[String]("cyb_flag")
      val hgt_flag = e.getAs[String]("hgt_flag")
      val sgt_flag = e.getAs[String]("sgt_flag")
      val secu_acct_flag = e.getAs[String]("secu_acct_flag")
      val ydgh_flag = e.getAs[String]("ydgh_flag")
      val gpzy_flag = e.getAs[String]("gpzy_flag")
      val gzzhg_flag = e.getAs[String]("gzzhg_flag")
      val net_asset_avg_20 = e.getAs[java.math.BigDecimal]("net_asset_avg_20")
      val trade_almt = e.getAs[java.math.BigDecimal]("trade_almt")

      val tradeRightsInfoBean = new TradeRightsInfoBean(biz_dt, cust_cd, cust_type, kcb_flag, cyb_flag, hgt_flag, sgt_flag, secu_acct_flag, ydgh_flag, gpzy_flag, gzzhg_flag, net_asset_avg_20, trade_almt)

      //资金账号后2位+资金账号+账户类型
      val rowkeyStr = new StringBuilder()
        .append(asset_acct.substring(asset_acct.length - 2))
        .append(asset_acct)
        .append(acct_flag)

      val jsonStr = new ObjectMapper().writevalueAsString(tradeRightsInfoBean)

      rowkeyStr.toString() + "|" + jsonStr
    })

    val path = new Path(outputPath)
    val fileSystem = FileSystem.get(conf)
    if(fileSystem.exists(path)){
      fileSystem.delete(path,true)
    }

    dataRdd.saveAsTextFile(outputPath)

    sc.stop()
  }
}

3.然后通过脚本将hdfs文件刷到hbase。

#!/bin/bash
# 本脚本用于将两列(rowkey,value)的hdfs文件导入到hbase中
# 步骤:
# 1. 生成hfile文件,此过程指定hdfs文件的分隔符、列映射,源文件的地址,目标文件的地址,表名
# 2. 将hfile文件导入到hbase中,指定路径、表名
# 参数:
# 1. 日期 如20200514 2.增量/全量/字典表 z/q/d  3.表名 :

path=$(cd `dirname $0`;pwd)
cd $path
source ~/.bashrc
export JAVA_HOME=/usr/java/jdk1.8.0_181-cloudera
export Hbase_HOME=/opt/cloudera/parcels/CDH/lib/hbase
start_date=`date "+%Y%m%d"`
start_time=`date "+%H%M%S"`
if [[ $# -lt 3 ]] || [[ "$*" =~ "--help" ]];then
  echo -e "USAGE: sh $0 20200519 z wealth_date n"
  exit 1
fi
biz_dt=$1
incremental=$2
columnname=${biz_dt:6:8}
hbase_server_dir="/opt/cloudera/parcels/CDH/lib/hbase"
tn=$3 
tablename=${tn/%_dt}
incremtn=${tn}

output_dir="/sync/hbase/hfile/${tablename}/biz_dt=${biz_dt}"
source_dir="/sync/jsontohbase/${tablename}/biz_dt=${biz_dt}"

#创建日志目录
log_dir="./log/${start_date}"
if [ ! -d $log_dir ];then
  mkdir -p $log_dir
fi

log_file=${log_dir}/${tn}_${biz_dt}_${start_time}_$$.log
echo "传入的参数为:$0 $*" >> $log_file 2>&1

#-Dimporttsv.separator 为你要加载文件的分隔符

#-Dimporttsv.bulk.output 输出的Hfile的目录

#$tablename 表名,这里未指定namespace 就在default下

#$source_dir 存放要加载的文件的目录

hdfs dfs -rm -r $output_dir >> $log_file 2>&1
echo "$incremental"
if [[ "$incremental" = "q" ]];then
  echo -e "`date` 开始全量生成$source_dir 的hfile文件,目标地址$output_dir,目标表 $tn n日志文件存放在$log_file 中" |tee -a $log_file |cat
  HADOOP_CLASSPATH=`${Hbase_HOME}/bin/hbase classpath`:${Hbase_HOME}/lib/hbase/lib/*.jar ${HADOOP_HOME}/bin/hadoop jar ${hbase_server_dir}/hbase-server.jar importtsv 
  '-Dimporttsv.separator=|' -Dimporttsv.columns=Hbase_ROW_KEY,info:val 
  -Dimporttsv.bulk.output=${output_dir} ${tablename} 
  $source_dir >> $log_file 2>&1
elif [[ "$incremental" = "z" ]];then
  if [[ $incremtn =~ "position_cycle_events" ]]; then
    echo -e "`date` 开始增量生成$source_dir 的hfile文件,目标地址$output_dir,目标表 $incremtn n日志文件存放在$log_file 中" |tee -a $log_file |cat
    HADOOP_CLASSPATH=`${Hbase_HOME}/bin/hbase classpath`:${Hbase_HOME}/lib/hbase/lib/*.jar ${HADOOP_HOME}/bin/hadoop jar ${hbase_server_dir}/hbase-server.jar importtsv 
    '-Dimporttsv.separator=|' -Dimporttsv.columns=Hbase_ROW_KEY,info:val,info:rownum 
    -Dimporttsv.bulk.output=${output_dir} $incremtn 
    $source_dir >> $log_file 2>&1
  elif [[ $incremtn =~ "his_day_trade_info" ]]; then
    echo -e "`date` 开始增量生成$source_dir 的hfile文件,目标地址$output_dir,目标表 $incremtn n日志文件存放在$log_file 中" |tee -a $log_file |cat
    HADOOP_CLASSPATH=`${Hbase_HOME}/bin/hbase classpath`:${Hbase_HOME}/lib/hbase/lib/*.jar ${HADOOP_HOME}/bin/hadoop jar ${hbase_server_dir}/hbase-server.jar importtsv 
    '-Dimporttsv.separator=|' -Dimporttsv.columns=Hbase_ROW_KEY,info:val,info:rownum 
    -Dimporttsv.bulk.output=${output_dir} $incremtn 
    $source_dir >> $log_file 2>&1
  elif [[ $incremtn =~ "his_trade_info" ]]; then
    echo -e "`date` 开始增量生成$source_dir 的hfile文件,目标地址$output_dir,目标表 $incremtn n日志文件存放在$log_file 中" |tee -a $log_file |cat
    HADOOP_CLASSPATH=`${Hbase_HOME}/bin/hbase classpath`:${Hbase_HOME}/lib/hbase/lib/*.jar ${HADOOP_HOME}/bin/hadoop jar ${hbase_server_dir}/hbase-server.jar importtsv 
    '-Dimporttsv.separator=|' -Dimporttsv.columns=Hbase_ROW_KEY,info:val,info:rownum 
    -Dimporttsv.bulk.output=${output_dir} $incremtn 
    $source_dir >> $log_file 2>&1
  else
    echo -e "`date` 开始增量生成$source_dir 的hfile文件,目标地址$output_dir,目标表 $incremtn n日志文件存放在$log_file 中" |tee -a $log_file |cat
    HADOOP_CLASSPATH=`${Hbase_HOME}/bin/hbase classpath`:${Hbase_HOME}/lib/hbase/lib/*.jar ${HADOOP_HOME}/bin/hadoop jar ${hbase_server_dir}/hbase-server.jar importtsv 
    '-Dimporttsv.separator=|' -Dimporttsv.columns=Hbase_ROW_KEY,info:val 
    -Dimporttsv.bulk.output=${output_dir} $incremtn 
    $source_dir >> $log_file 2>&1
  fi
fi
if [[ $? -eq 0 ]];then
  echo -e "`date` $source_dir hfile文件生成完毕。 n开始导入到hbase $tn 中。。" |tee -a $log_file |cat
else
  echo -e "`date` $source_dir hfile文件生成失败。。" |tee -a $log_file |cat
  exit 1
fi
#completebulkload
if [[ "$incremental" = "q" ]]
then
  HADOOP_CLASSPATH=`${Hbase_HOME}/bin/hbase classpath`:${Hbase_HOME}/lib/hbase/lib/*.jar 
  ${HADOOP_HOME}/bin/hadoop jar ${hbase_server_dir}/hbase-server.jar 
  completebulkload 
  ${output_dir} $tn >> $log_file 2>&1
else
  HADOOP_CLASSPATH=`${Hbase_HOME}/bin/hbase classpath`:${Hbase_HOME}/lib/hbase/lib/*.jar 
  ${HADOOP_HOME}/bin/hadoop jar ${hbase_server_dir}/hbase-server.jar 
  completebulkload 
  ${output_dir} $incremtn >> $log_file 2>&1
fi
if [[ $? -eq 0 ]];then
  echo -e "success.. `date` hdfs 文件 $source_dir 已成功导入到hbase $tn 中 " |tee -a $log_file |cat
else
  echo -e "error!! `date`  hdfs 文件 $source_dir 导入到hbase $tn 中失败 " |tee -a $log_file |cat
fi


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/283119.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号