这个实践接着前面的总结,结合可用的时间,进行一定的升级。
内容 1 背景在跑批时,由于目标库的空间不足,导致大约40%的结果没有存入。并且我发现,由于没有做交付的确认(向目标库查询),程序默认的把未交付的过程数据都删掉了,所以要找出并重跑这部分。
在前面的设计中,以已经总结了如何通过设计来避免类似的问题,以下则是从完成这个任务的角度去看,如何进行改造
2 过程验证项目元数据方法的灵活和复用
我们现在设定:流程文件的编号仅仅代表了创建时的顺序,而不代表执行的先后顺序。也就是说,流程是可以并行启动的。
-
1 先将原来的一个执行文件夹拷贝,重新命名为ner_processing_local_v2
-
2 将另一个文件夹下的任务元数据拷贝(cp -r)到ner_processing_local_v2的任务元数据下面
find . -name '*.pkl'| xargs -I '{}' cp {} /home/ztm2/ner_processing_local_v2/schema_001/tasks -
3 手工修改,给任务元数据增加记录的最小和最大的id。
# 手工修改
for tem_pkl_file in pkl_file_list1:
min_rec_id = int(tem_pkl_file.split('_')[-1])
max_rec_id = min_rec_id + 10000
tem_task_meta = fs.from_pickle(tem_pkl_file, schema_dict['tasks_path'])
tem_task_meta['min_rec_id'] = min_rec_id
tem_task_meta['max_rec_id'] = max_rec_id
fs.to_pickle(tem_task_meta, schema_dict['tasks_path'])
当前的流程(存放在process_pys下面):
| 序号 | 名称 | 流程文件名 | 解释 |
|---|---|---|---|
| 1 | 启动流程 | proc_001.py | 手动或自动的生成若干按id范围编号的子任务 |
| 2 | 数据预处理(公司) | proc_002.py | 按公司的模式对数据进行预处理 |
| 3 | 数据预处理(人名) | proc_003.py | 按人名的模式对数据进行预处理 |
| 4 | 预测1-公司 | proc_004.py | 识别预处理后的数据中的公司实体,去重后使用英文逗号分割 |
| 5 | 预测2-人名 | proc_005.py | 识别预处理后的数据中的人名实体,去重后使用英文逗号分割 |
| 6 | 后处理 | proc_006.py | 按对应的任务需要,将结果处理为可输出的模式 |
| 7 | 回写 | proc_007.py | 根据任务对应的库,将结果回写到生产库 |
| 8 | 删除 | proc_008.py | 扫描input和各process文件夹下的任务,并到任务元数据下进行查询,如果状态为交付,则删除文件;可以连带删除input文件或只删除流程文件 |
本次需要修改的主要是:
- 1 【预测1-公司】将处理的任务参数化,按任务头获取任务,避免开两个容器
- 2 【预测2-人名】将处理的任务参数化,按任务头获取任务,避免开两个容器
- 3 【回写】如果插入命令失败,不更新is_fetched状态
为了修补这次任务,需要增加的流程为
| 序号 | 名称 | 流程文件名 | 解释 |
|---|---|---|---|
| 1 | 交付查询扫描 | proc_009.py | 按照任务的起始id和终止id,向目标库查询记录数量(rec_num),更新到任务元数据中。如果rec_num=0,将is_fetched置为0 |
| 序号 | 名字 | 文件名 | 作用 |
|---|---|---|---|
| 1 | 检查并修复任务 | processes_003.conf | 按照任务的起始id和终止id到目标库中搜索,对未交付的任务重新生成数据(从本地库中提取) |
流程配置的内容
# 扫描结果表 [proc_009] # 憾地结果 target_table1 =TABLE A # 信数结果 target_table2 = TABLE B # 根据fetch字段结果,重新向本地库拉取数据 [proc_010] mode=manual # 憾地表 source_table1 = TABLE A cols1 = COLS A db1 = SOME DB # 信数表 source_table2 = TABLE B cols2 = COLS B db2 = SOME DB4 总结
这版根据之前的总结做了一些改进,总体还是可以的。不过这个临时的工作项目天然存在一个很大的缺陷:一个流程里包了2个任务分支,A和B,原则上,一个流程在业务上是单一的,业务上的差别应该体现在流程配置文件(processes_xxx.conf)中。
Pros:
- 1 这次实验明确了流程配置文件的作用(业务适配)
- 2 丰富了流程文件的一些规范
- 1 运行模式: Auto/Force
- 2 流动模式: 根据Last和Current的差集驱动运行
- 3 业务专一:每个流程只负责一个业务,功能的多样性可以通过参数控制。换句话说,流程是跟着功能走。流程配置文件才跟着业务走。
- 4 shard参数:每个流程必须要带shard参数(以及max_shards)。这个只是对任务进行分片(无论是文件形式还是数据库形式),可以叠加在其他筛选规则上。
- 5 筛选规则:每个流程,【需要有】按照任务名进行筛选的预置规则。这样一方面允许了流程对业务是【不敏感】的,流程只关心实现某个功能,只不过你可以要求某个流程(以容器方式)只处理某一种。这是比shard更高一层的并行
- 6 总数校验:每个流程都需要将自己的任务总数汇报,以确保调度者可以大致的了解到流程的工作状态。(是否正常运行)
- 3 明确了后续的复用方式
- 1 建立某个新的文件夹
- 2 使用容器映射的方式共享py文件和配置文件
- 3 使用jinja的方式生成这个docker命令
- 4 整合到agent上,以后都问agent
Cons:
还是花了一些时间成本的
- 1 本次实验的代码要花一些时间整合
- 2 本次实验的代码只有一个固定应用场景
- 3 连通性测试还是要前置,架不住各种miss
| 序号 | 名字 | 修改 |
|---|---|---|
| 1 | 项目元数据 | 给每个项目增加最小min_rec_id和最大max_rec_id记录ID |
| 2 | 关键字参数名称修改 | 将关键字参数改的更短。schema_name --> sname, processes_name --> pfile |
| 3 | 增加记录映射元数据 | 项目文件夹将固定一个集合映射,存放某个任务对应的记录id集合 |
高可靠要求:要么不运行,运行必然是对的
增加的规则
| 序号 | 规则 | 解释 |
|---|---|---|
| 1 | 流程步不应该存在任务分支 | 如果A、B是不同的任务,就不要放在一个流程步里,哪怕很相似。流程步允许参数调整功能,而不是做分支。(proc_009中的tem参数本来不应该存在) |
| 2 | 流动规则 | 流程步中固定run_mode参数,运行模式分为 auto/force, auto会尽量避免重跑,force会强制重跑。 |
| 3 | 总数校验规则 | 对于每个步骤的总量进行校验 |
| 4 | shard规则 | 每个流程步都要有shard参数,用于确认分片。可以和筛选规则联合使用。 |
每个流程步都需要做的就是校验差集(决定了流动)
连通性测试还是要前置,架不住各种miss
5 Next一个大的改进是通过docker的方式映射新的工作文件夹,共享py文件和配置。



