栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

建模杂谈系列81 项目元数据实践3

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

建模杂谈系列81 项目元数据实践3

说明

这个实践接着前面的总结,结合可用的时间,进行一定的升级。

内容 1 背景

在跑批时,由于目标库的空间不足,导致大约40%的结果没有存入。并且我发现,由于没有做交付的确认(向目标库查询),程序默认的把未交付的过程数据都删掉了,所以要找出并重跑这部分。

在前面的设计中,以已经总结了如何通过设计来避免类似的问题,以下则是从完成这个任务的角度去看,如何进行改造

2 过程

验证项目元数据方法的灵活和复用

我们现在设定:流程文件的编号仅仅代表了创建时的顺序,而不代表执行的先后顺序。也就是说,流程是可以并行启动的。

  • 1 先将原来的一个执行文件夹拷贝,重新命名为ner_processing_local_v2

  • 2 将另一个文件夹下的任务元数据拷贝(cp -r)到ner_processing_local_v2的任务元数据下面
    find . -name '*.pkl'| xargs -I '{}' cp {} /home/ztm2/ner_processing_local_v2/schema_001/tasks

  • 3 手工修改,给任务元数据增加记录的最小和最大的id。

# 手工修改
for tem_pkl_file in pkl_file_list1:
    min_rec_id = int(tem_pkl_file.split('_')[-1])
    max_rec_id = min_rec_id + 10000
    tem_task_meta = fs.from_pickle(tem_pkl_file, schema_dict['tasks_path'])
    tem_task_meta['min_rec_id'] = min_rec_id
    tem_task_meta['max_rec_id'] = max_rec_id
    fs.to_pickle(tem_task_meta, schema_dict['tasks_path'])

当前的流程(存放在process_pys下面):

序号名称流程文件名解释
1启动流程proc_001.py手动或自动的生成若干按id范围编号的子任务
2数据预处理(公司)proc_002.py按公司的模式对数据进行预处理
3数据预处理(人名)proc_003.py按人名的模式对数据进行预处理
4预测1-公司proc_004.py识别预处理后的数据中的公司实体,去重后使用英文逗号分割
5预测2-人名proc_005.py识别预处理后的数据中的人名实体,去重后使用英文逗号分割
6后处理proc_006.py按对应的任务需要,将结果处理为可输出的模式
7回写proc_007.py根据任务对应的库,将结果回写到生产库
8删除proc_008.py扫描input和各process文件夹下的任务,并到任务元数据下进行查询,如果状态为交付,则删除文件;可以连带删除input文件或只删除流程文件

本次需要修改的主要是:

  • 1 【预测1-公司】将处理的任务参数化,按任务头获取任务,避免开两个容器
  • 2 【预测2-人名】将处理的任务参数化,按任务头获取任务,避免开两个容器
  • 3 【回写】如果插入命令失败,不更新is_fetched状态

为了修补这次任务,需要增加的流程为

序号名称流程文件名解释
1交付查询扫描proc_009.py按照任务的起始id和终止id,向目标库查询记录数量(rec_num),更新到任务元数据中。如果rec_num=0,将is_fetched置为0
3 流程配置
序号名字文件名作用
1检查并修复任务processes_003.conf按照任务的起始id和终止id到目标库中搜索,对未交付的任务重新生成数据(从本地库中提取)

流程配置的内容

# 扫描结果表
[proc_009]
# 憾地结果
target_table1 =TABLE A
# 信数结果
target_table2 = TABLE B

# 根据fetch字段结果,重新向本地库拉取数据
[proc_010]
mode=manual
# 憾地表
source_table1 = TABLE A
cols1 = COLS A
db1 = SOME DB
# 信数表
source_table2 = TABLE B
cols2 = COLS B
db2 = SOME DB

4 总结

这版根据之前的总结做了一些改进,总体还是可以的。不过这个临时的工作项目天然存在一个很大的缺陷:一个流程里包了2个任务分支,A和B,原则上,一个流程在业务上是单一的,业务上的差别应该体现在流程配置文件(processes_xxx.conf)中。

Pros:

  • 1 这次实验明确了流程配置文件的作用(业务适配)
  • 2 丰富了流程文件的一些规范
    • 1 运行模式: Auto/Force
    • 2 流动模式: 根据Last和Current的差集驱动运行
    • 3 业务专一:每个流程只负责一个业务,功能的多样性可以通过参数控制。换句话说,流程是跟着功能走。流程配置文件才跟着业务走。
    • 4 shard参数:每个流程必须要带shard参数(以及max_shards)。这个只是对任务进行分片(无论是文件形式还是数据库形式),可以叠加在其他筛选规则上。
    • 5 筛选规则:每个流程,【需要有】按照任务名进行筛选的预置规则。这样一方面允许了流程对业务是【不敏感】的,流程只关心实现某个功能,只不过你可以要求某个流程(以容器方式)只处理某一种。这是比shard更高一层的并行
    • 6 总数校验:每个流程都需要将自己的任务总数汇报,以确保调度者可以大致的了解到流程的工作状态。(是否正常运行)
  • 3 明确了后续的复用方式
    • 1 建立某个新的文件夹
    • 2 使用容器映射的方式共享py文件和配置文件
    • 3 使用jinja的方式生成这个docker命令
    • 4 整合到agent上,以后都问agent

Cons:

还是花了一些时间成本的

  • 1 本次实验的代码要花一些时间整合
  • 2 本次实验的代码只有一个固定应用场景
  • 3 连通性测试还是要前置,架不住各种miss
结构
序号名字修改
1项目元数据给每个项目增加最小min_rec_id和最大max_rec_id记录ID
2关键字参数名称修改将关键字参数改的更短。schema_name --> sname, processes_name --> pfile
3增加记录映射元数据项目文件夹将固定一个集合映射,存放某个任务对应的记录id集合

高可靠要求:要么不运行,运行必然是对的

增加的规则

序号规则解释
1流程步不应该存在任务分支如果A、B是不同的任务,就不要放在一个流程步里,哪怕很相似。流程步允许参数调整功能,而不是做分支。(proc_009中的tem参数本来不应该存在)
2流动规则流程步中固定run_mode参数,运行模式分为 auto/force, auto会尽量避免重跑,force会强制重跑。
3总数校验规则对于每个步骤的总量进行校验
4shard规则每个流程步都要有shard参数,用于确认分片。可以和筛选规则联合使用。

每个流程步都需要做的就是校验差集(决定了流动)

连通性测试还是要前置,架不住各种miss

5 Next

一个大的改进是通过docker的方式映射新的工作文件夹,共享py文件和配置。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/461838.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号