1、让键盘代替鼠标工作
alt+f4 关闭当前窗口 菜单键 / shift+f10 右键 enter 左键 win+d 桌面 shift+alt+numlock 开启鼠标键 右边的数字按键代替鼠标移动,ctrl 加速,alt 减速。 5是左键。 上下键在ctrl+tab选中工作页面后是滚动作用。
2、Airflow任务及时性校验优化
开发过程:
1、表 dag 存有每个 dag的调度时间 schedule interval ,值为 Crontab schedule 格式。
2、mysql居然连字符串分割函数split 都没有,本来还想 将crontab schedule 格式的字段分割出 小时 和 分钟 呢。只能取出来,用python分割了。
知识点:
点1. 知道 Airflow 哪张配置表存放DAG调度时间信息 ,是表 dag,字段 schedule_interval ,格式是 crontab schedule (* * * * *) .
点2. mysql 没有split 函数,只能直接取出 schedule_interval 值,在python文件里做分割,获取调度频率的 小时、分钟。
点3. mysql 读取两个字段,一个是 dag_id,一个是 schedule_interval ,fetchall方法 是元组 ((元组),(元组),…) 类型出来的,内部元组各有两个元素,为方便后面使用转换为字典类型。列表转换为 字典,用 dict()方法
点4. pymysql.connect(连接信息) 非3306端口的话,要指定端口号
# 打开数据库连接
db = pymysql.connect(host=airflow_host_str, user=airflow_user_str, password=airflow_password_str, port=airflow_port_int, database=airflow_database_str)
# 使用cursor()方法获取操作游标
cursor = db.cursor()
sql="select dag_id,schedule_interval from dag where dag_id"
print(sql)
# 执行SQL语句
cursor.execute(sql)
# 获取所有记录列表
results = cursor.fetchall()
print(results)
dag_schedule_time=dict(results)
print("dag_schedule_time={}".format(dag_schedule_time))
#使用
schedule_time = dag_schedule_time[dag_id].replace('"','') #去掉双引号
schedule_hour_minute = schedule_time.split(' ')
print("schedule_hour_minute[0]={}, type={}".format(schedule_hour_minute[0],type(schedule_hour_minute[0])))
schedule_hour = int(schedule_hour_minute[1],10)
schedule_minute = int(schedule_hour_minute[0], 10)
# 测试用:vim test_0323.py
crontab_str="30 1 * * *"
schedule_seq=crontab_str.split(" ")
print(schedule_seq)
second=schedule_seq[0]
hour=schedule_seq[1]
print(hour)
print(second)
3、开发设计过程
调度频率:每天
数据量:一天一条数据
需刷历史数据
方案:
数据存储形式 :dt 表
保存策略:7天分区
开发逻辑:shell脚本传参 是否初始化变量 is_initial ,控制初始化的数据(首分区)不用 Union all 前一天的分区数据,非首分区的需要Union all 前一天的分区数据。根据是否初始化,配置时间范围,传参,可以将初始化逻辑和每日调度逻辑整合到一起,方便维护,注意边缘日期数据要测试好,第一天,昨天,前天。要有数据统计日期字段,两种格式(yyyy-MM-dd)和(yyyyMMdd)。
#参数说明
#statdate: 分区日期
#statdate_history_start:重刷历史数据的起始日期,即数据开始日期
#statdate_xdays_ago:用于表分区范围的开始索引,case1--初始化时是 statdate_history_start ,case2--每日调度是 statdate
#statdate_xdays_1day_ago:用于 statdate=t+1的fieldx和statdate=t的fieldx对比,取不在t天的fieldx
#is_initial:标记,用于脚本逻辑判断,初始化时为1,每日调度为0
#statdate_1day_ago:用于每日调度的合并,is_initial=0 时起作用!
if [[ is_initial -eq 1 ]]; then
${beeline_cmd}
--hivevar statdate=${statdate}
--hivevar statdate_xdays_ago=${statdate_history_start}
--hivevar statdate_xdays_1day_ago=${statdate_history_start}
--hivevar is_initial=${is_initial}
--hivevar statdate_1day_ago=${statdate_1day_ago}
-f xxx_dt.sql
else
${beeline_cmd}
--hivevar statdate=${statdate}
--hivevar statdate_xdays_ago=${statdate}
--hivevar statdate_xdays_1day_ago=${statdate_1day_ago}
--hivevar is_initial=${is_initial}
--hivevar statdate_1day_ago=${statdate_1day_ago}
-f xxx_dt.sql
fi
statdate_history_start=20211222
是否初始化 is_initial 的逻辑:
--每日更新的
select
from_unixtime(unix_timestamp(tt1.statdate,'yyyyMMdd'),'yyyy-MM-dd') as balance_day
,tt1.statdate
from ...
union all
--合并前一天的
select
balance_day
,etl_updatetime
,...
from table
where statdate = '${statdate_1day_ago}'
and '0' = '${is_initial}'
将初始化逻辑和每日调度逻辑整合
--condition1. 简单日期聚合统计
select t1.statdate
,sum(xxx) as xxx_num
,sum(nvl(...,0)) as ..._amount
from (
select fieldx,statdate from t_tmp
) t1
left join (
select fieldx,xxx,...,statdate
from xxx_dt
where statdate between '${statdate_xdays_ago}' and '${statdate}'
) t2
on t1.fieldx = t2.fieldx
and t1.statdate = t2.statdate
group by t1.statdate
--condition2. 用于 statdate=t+1的fieldx和statdate=t的fieldx对比,取不在t天的fieldx
select t1.statdate
,t1.fieldx
from (
select fieldx,statdate
from xxx_dt
where statdate between '${statdate_xdays_ago}' and '${statdate}'
and ...
) t1
left join (
select fieldx,statdate
from xxx_dt
where statdate between '${statdate_xdays_1day_ago}' and '${statdate}'
and ...
) t2
on t1.fieldx = t2.fieldx
and t1.statdate = date_format(to_date(date_add( from_unixtime(unix_timestamp(t2.statdate,'yyyyMMdd'),'yyyy-MM-dd'),1)),'yyyyMMdd') --hive
--and t1.statdate = from_unixtime(unix_timestamp(to_date(date_add( from_unixtime(unix_timestamp(t2.statdate,'yyyyMMdd'),'yyyy-MM-dd'),1))),'yyyyMMdd') --impala
where t2.fieldx is null
4、日期运算后的格式转换
hive:
select date_format(to_date(date_add( from_unixtime(unix_timestamp('20220323','yyyyMMdd'),'yyyy-MM-dd'),1)),'yyyyMMdd'); --20220324
impala:
select from_unixtime( unix_timestamp( to_date(date_add( --return timestamp type from_unixtime(unix_timestamp('20220320','yyyyMMdd'),'yyyy-MM-dd') --retutn format string ,1)) --,'yyyy-MM-dd' ) ,'yyyyMMdd'); --20220321



