栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

杂记

杂记

1、让键盘代替鼠标工作

alt+f4 关闭当前窗口
菜单键 / shift+f10 右键
enter 左键
win+d 桌面
shift+alt+numlock 开启鼠标键
右边的数字按键代替鼠标移动,ctrl 加速,alt 减速。
5是左键。
上下键在ctrl+tab选中工作页面后是滚动作用。

2、Airflow任务及时性校验优化
开发过程:
1、表 dag 存有每个 dag的调度时间 schedule interval ,值为 Crontab schedule 格式。
2、mysql居然连字符串分割函数split 都没有,本来还想 将crontab schedule 格式的字段分割出 小时 和 分钟 呢。只能取出来,用python分割了。
知识点:
点1. 知道 Airflow 哪张配置表存放DAG调度时间信息 ,是表 dag,字段 schedule_interval ,格式是 crontab schedule (* * * * *) .
点2. mysql 没有split 函数,只能直接取出 schedule_interval 值,在python文件里做分割,获取调度频率的 小时、分钟。
点3. mysql 读取两个字段,一个是 dag_id,一个是 schedule_interval ,fetchall方法 是元组 ((元组),(元组),…) 类型出来的,内部元组各有两个元素,为方便后面使用转换为字典类型。列表转换为 字典,用 dict()方法
点4. pymysql.connect(连接信息) 非3306端口的话,要指定端口号

    # 打开数据库连接
    db = pymysql.connect(host=airflow_host_str, user=airflow_user_str, password=airflow_password_str, port=airflow_port_int, database=airflow_database_str)
    # 使用cursor()方法获取操作游标
    cursor = db.cursor()

    sql="select dag_id,schedule_interval from dag where dag_id"
    print(sql)
    # 执行SQL语句
    cursor.execute(sql)
    # 获取所有记录列表
    results = cursor.fetchall()
    print(results)
    dag_schedule_time=dict(results)
    print("dag_schedule_time={}".format(dag_schedule_time))

	#使用
    schedule_time = dag_schedule_time[dag_id].replace('"','') #去掉双引号
    schedule_hour_minute = schedule_time.split(' ')
    print("schedule_hour_minute[0]={}, type={}".format(schedule_hour_minute[0],type(schedule_hour_minute[0])))
    schedule_hour = int(schedule_hour_minute[1],10)
    schedule_minute = int(schedule_hour_minute[0], 10)
# 测试用:vim test_0323.py
crontab_str="30 1 * * *"
schedule_seq=crontab_str.split(" ")
print(schedule_seq)
second=schedule_seq[0]
hour=schedule_seq[1]
print(hour)
print(second)

3、开发设计过程
调度频率:每天
数据量:一天一条数据
需刷历史数据
方案:
数据存储形式 :dt 表
保存策略:7天分区
开发逻辑:shell脚本传参 是否初始化变量 is_initial ,控制初始化的数据(首分区)不用 Union all 前一天的分区数据,非首分区的需要Union all 前一天的分区数据。根据是否初始化,配置时间范围,传参,可以将初始化逻辑和每日调度逻辑整合到一起,方便维护,注意边缘日期数据要测试好,第一天,昨天,前天。要有数据统计日期字段,两种格式(yyyy-MM-dd)和(yyyyMMdd)。

#参数说明
#statdate: 分区日期
#statdate_history_start:重刷历史数据的起始日期,即数据开始日期
#statdate_xdays_ago:用于表分区范围的开始索引,case1--初始化时是 statdate_history_start ,case2--每日调度是 statdate
#statdate_xdays_1day_ago:用于 statdate=t+1的fieldx和statdate=t的fieldx对比,取不在t天的fieldx
#is_initial:标记,用于脚本逻辑判断,初始化时为1,每日调度为0
#statdate_1day_ago:用于每日调度的合并,is_initial=0 时起作用!

	if [[ is_initial -eq 1 ]]; then
		${beeline_cmd} 
				--hivevar statdate=${statdate} 
				--hivevar statdate_xdays_ago=${statdate_history_start} 
				--hivevar statdate_xdays_1day_ago=${statdate_history_start} 
				--hivevar is_initial=${is_initial} 
				--hivevar statdate_1day_ago=${statdate_1day_ago} 
				-f xxx_dt.sql
	else 
		${beeline_cmd} 
				--hivevar statdate=${statdate} 
				--hivevar statdate_xdays_ago=${statdate} 
				--hivevar statdate_xdays_1day_ago=${statdate_1day_ago} 
				--hivevar is_initial=${is_initial} 
				--hivevar statdate_1day_ago=${statdate_1day_ago} 
				-f xxx_dt.sql
	fi
	
statdate_history_start=20211222

是否初始化 is_initial 的逻辑:

--每日更新的
select 
	from_unixtime(unix_timestamp(tt1.statdate,'yyyyMMdd'),'yyyy-MM-dd') as balance_day
	,tt1.statdate
from ...
union all 
--合并前一天的
select 
	balance_day
	,etl_updatetime
	,...
from table
where statdate = '${statdate_1day_ago}'
and '0' = '${is_initial}'

将初始化逻辑和每日调度逻辑整合

--condition1. 简单日期聚合统计
	select t1.statdate 
		,sum(xxx) as xxx_num
		,sum(nvl(...,0)) as ..._amount
	from (
		select fieldx,statdate from t_tmp
	) t1 
	left join (
		select fieldx,xxx,...,statdate
		from xxx_dt
		where statdate between '${statdate_xdays_ago}' and '${statdate}'
	) t2
	on t1.fieldx = t2.fieldx
	and t1.statdate = t2.statdate 
	group by t1.statdate
	
--condition2. 用于 statdate=t+1的fieldx和statdate=t的fieldx对比,取不在t天的fieldx
	select t1.statdate
		,t1.fieldx
	from (
		select fieldx,statdate
		from xxx_dt
		where statdate between '${statdate_xdays_ago}' and '${statdate}'
		and ...
	) t1 
	left join (
		select fieldx,statdate
		from xxx_dt
		where statdate between '${statdate_xdays_1day_ago}' and '${statdate}'
		and ...
	) t2 
	on t1.fieldx = t2.fieldx
	and t1.statdate = date_format(to_date(date_add( from_unixtime(unix_timestamp(t2.statdate,'yyyyMMdd'),'yyyy-MM-dd'),1)),'yyyyMMdd') --hive
	--and t1.statdate = from_unixtime(unix_timestamp(to_date(date_add( from_unixtime(unix_timestamp(t2.statdate,'yyyyMMdd'),'yyyy-MM-dd'),1))),'yyyyMMdd') --impala
	where t2.fieldx is null 

4、日期运算后的格式转换
hive:
select date_format(to_date(date_add( from_unixtime(unix_timestamp('20220323','yyyyMMdd'),'yyyy-MM-dd'),1)),'yyyyMMdd'); --20220324
impala:
select from_unixtime( unix_timestamp( to_date(date_add( --return timestamp type from_unixtime(unix_timestamp('20220320','yyyyMMdd'),'yyyy-MM-dd') --retutn format string ,1)) --,'yyyy-MM-dd' ) ,'yyyyMMdd'); --20220321

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/774473.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号