- Preface
- I. Example
- II. Process startup flow
- III. Writing to the database
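Preface
This article analyzes which database tables Flowable touches when it starts a process instance.
I. Example
Start a process instance using the key of the process definition: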
@Test
public void startProcessDefinition() {
Map<String, Object> variables = new HashMap<>();
variables.put("employee", "张三");
variables.put("nrOfHolidays", 3);
variables.put("description", "有事请假");
ProcessInstance processInstance =
runtimeService.startProcessInstanceByKey("holidayRequest", variables);
System.out.println("processInstance: " + JsonUtils.toJson(processInstance));
}
II. Process startup flow
1. Start directly from runtimeService.startProcessInstanceByKey
@Override
public ProcessInstance startProcessInstanceByKey(String processDefinitionKey, Map<String, Object> variables) {
return commandExecutor.execute(new StartProcessInstanceCmd(processDefinitionKey, null, null, variables));
}
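This creates a StartProcessInstanceCmd. An earlier article analyzed how CommandExecutor runs: it walks the interceptor chain in order and finally calls back into the Command's execute() method. So we go straight into the execute() method of StartProcessInstanceCmd.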
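To make the "interceptor chain, then callback into the command" idea concrete, here is a minimal plain-Java sketch. It is a simplified model, not Flowable's implementation: the types Command, Interceptor, and Chain and the interceptor names "log", "context", and "tx" are all hypothetical and chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class CommandChainSketch {

    /** A unit of work, loosely analogous to a command (hypothetical type). */
    interface Command<T> {
        T execute(List<String> trace);
    }

    /** One link of the chain; it can act before and after delegating (hypothetical type). */
    interface Interceptor {
        <T> T execute(Command<T> command, List<String> trace, Chain chain);
    }

    /** Walks the interceptors in order and finally invokes the command itself. */
    static final class Chain {
        private final List<Interceptor> interceptors;
        private final int index;

        Chain(List<Interceptor> interceptors, int index) {
            this.interceptors = interceptors;
            this.index = index;
        }

        <T> T proceed(Command<T> command, List<String> trace) {
            if (index == interceptors.size()) {
                // End of the chain: call back into the command.
                return command.execute(trace);
            }
            return interceptors.get(index).execute(command, trace, new Chain(interceptors, index + 1));
        }
    }

    /** Builds an interceptor that records when it is entered and left. */
    static Interceptor named(String name) {
        return new Interceptor() {
            @Override
            public <T> T execute(Command<T> command, List<String> trace, Chain chain) {
                trace.add(name + ":before");
                try {
                    return chain.proceed(command, trace);
                } finally {
                    trace.add(name + ":after");
                }
            }
        };
    }

    /** Runs one command through three illustrative interceptors and returns the call trace. */
    static List<String> run() {
        List<String> trace = new ArrayList<>();
        List<Interceptor> interceptors = List.of(named("log"), named("context"), named("tx"));
        Command<String> command = t -> {
            t.add("command");
            return "started";
        };
        String result = new Chain(interceptors, 0).proceed(command, trace);
        trace.add("result=" + result);
        return trace;
    }
}
```

Each interceptor wraps the rest of the chain, so work placed in a finally block (such as closing a context) runs after the command has finished, which is the property section III relies on.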
2. execute()
StartProcessInstanceCmd.java
@Override
public ProcessInstance execute(CommandContext commandContext) {
// Process engine configuration
ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration(commandContext);
processInstanceHelper = processEngineConfiguration.getProcessInstanceHelper();
// Load the process definition (backed by ACT_RE_PROCDEF)
ProcessDefinition processDefinition = getProcessDefinition(processEngineConfiguration, commandContext);
ProcessInstance processInstance = null;
if (hasFormData()) {
// The start request carries form data
processInstance = handleProcessInstanceWithForm(commandContext, processDefinition, processEngineConfiguration);
} else {
// Start the process instance
processInstance = startProcessInstance(processDefinition);
}
return processInstance;
}
3. startProcessInstance()
protected ProcessInstance startProcessInstance(ProcessDefinition processDefinition) {
return processInstanceHelper.createProcessInstance(processDefinition, businessKey, businessStatus, processInstanceName,
overrideDefinitionTenantId, predefinedProcessInstanceId, variables, transientVariables,
callbackId, callbackType, referenceId, referenceType, stageInstanceId, true);
}
4. createProcessInstance()
public ProcessInstance createProcessInstance(ProcessDefinition processDefinition, String businessKey, String businessStatus, String processInstanceName,
String overrideDefinitionTenantId, String predefinedProcessInstanceId, Map<String, Object> variables, Map<String, Object> transientVariables,
String callbackId, String callbackType, String referenceId, String referenceType, String stageInstanceId, boolean startProcessInstance) {
CommandContext commandContext = Context.getCommandContext();
if (Flowable5Util.isFlowable5ProcessDefinition(processDefinition, commandContext)) {
Flowable5CompatibilityHandler compatibilityHandler = Flowable5Util.getFlowable5CompatibilityHandler();
return compatibilityHandler.startProcessInstance(processDefinition.getKey(), processDefinition.getId(),
variables, transientVariables, businessKey, processDefinition.getTenantId(), processInstanceName);
}
// Do not start a process instance if the process definition is suspended
if (ProcessDefinitionUtil.isProcessDefinitionSuspended(processDefinition.getId())) {
throw new FlowableException("Cannot start process instance. Process definition " + processDefinition.getName() + " (id = " + processDefinition.getId() + ") is suspended");
}
// Get model from cache
// Resolve the parsed BPMN model and get its Process
Process process = ProcessDefinitionUtil.getProcess(processDefinition.getId());
if (process == null) {
throw new FlowableException("Cannot start process instance. Process model " + processDefinition.getName() + " (id = " + processDefinition.getId() + ") could not be found");
}
FlowElement initialFlowElement = process.getInitialFlowElement();
if (initialFlowElement == null) {
throw new FlowableException("No start element found for process definition " + processDefinition.getId());
}
// Create and start the process instance
return createAndStartProcessInstanceWithInitialFlowElement(processDefinition, businessKey, businessStatus, processInstanceName,
overrideDefinitionTenantId,
predefinedProcessInstanceId, initialFlowElement, process, variables, transientVariables,
callbackId, callbackType, referenceId, referenceType, stageInstanceId, startProcessInstance);
}
5. ProcessDefinitionUtil.getProcess()
Parses the deployed process file and wraps the result in a Process object.
public static Process getProcess(String processDefinitionId) {
if (Context.getCommandContext() == null) {
throw new FlowableException("Cannot get process model: no current command context is active");
} else if (CommandContextUtil.getProcessEngineConfiguration() == null) {
return Flowable5Util.getFlowable5CompatibilityHandler().getProcessDefinitionProcessObject(processDefinitionId);
} else {
DeploymentManager deploymentManager = CommandContextUtil.getProcessEngineConfiguration().getDeploymentManager();
// This will check the cache in the findDeployedProcessDefinitionById and resolveProcessDefinition method
// Resolve the process definition
ProcessDefinition processDefinitionEntity = deploymentManager.findDeployedProcessDefinitionById(processDefinitionId);
// Already resolved in the previous step; just get the Process here
return deploymentManager.resolveProcessDefinition(processDefinitionEntity).getProcess();
}
}
6. findDeployedProcessDefinitionById()
public ProcessDefinition findDeployedProcessDefinitionById(String processDefinitionId) {
if (processDefinitionId == null) {
throw new FlowableIllegalArgumentException("Invalid process definition id : null");
}
// first try the cache
ProcessDefinitionCacheEntry cacheEntry = processDefinitionCache.get(processDefinitionId);
ProcessDefinition processDefinition = cacheEntry != null ? cacheEntry.getProcessDefinition() : null;
if (processDefinition == null) {
processDefinition = processDefinitionEntityManager.findById(processDefinitionId);
if (processDefinition == null) {
throw new FlowableObjectNotFoundException("no deployed process definition found with id '" + processDefinitionId + "'", ProcessDefinition.class);
}
// Resolve the process definition
processDefinition = resolveProcessDefinition(processDefinition).getProcessDefinition();
}
return processDefinition;
}
7. resolveProcessDefinition()
public ProcessDefinitionCacheEntry resolveProcessDefinition(ProcessDefinition processDefinition) {
String processDefinitionId = processDefinition.getId();
String deploymentId = processDefinition.getDeploymentId();
ProcessDefinitionCacheEntry cachedProcessDefinition = processDefinitionCache.get(processDefinitionId);
if (cachedProcessDefinition == null) {
if (Flowable5Util.isFlowable5ProcessDefinition(processDefinition, processEngineConfiguration)) {
return Flowable5Util.getFlowable5CompatibilityHandler().resolveProcessDefinition(processDefinition);
}
DeploymentEntity deployment = deploymentEntityManager.findById(deploymentId);
deployment.setNew(false);
// Parse the deployment
deploy(deployment, null);
cachedProcessDefinition = processDefinitionCache.get(processDefinitionId);
if (cachedProcessDefinition == null) {
throw new FlowableException("deployment '" + deploymentId + "' didn't put process definition '" + processDefinitionId + "' in the cache");
}
}
return cachedProcessDefinition;
}
8. deploy()
The deployers list contains BpmnDeployer and AppDeployer.
public void deploy(DeploymentEntity deployment, Map<String, Object> deploymentSettings) {
for (EngineDeployer deployer : deployers) {
deployer.deploy(deployment, deploymentSettings);
}
}
Next, look at the deploy() method of BpmnDeployer:
@Override
public void deploy(EngineDeployment deployment, Map<String, Object> deploymentSettings) {
LOGGER.debug("Processing deployment {}", deployment.getName());
// The ParsedDeployment represents the deployment, the process definitions, and the BPMN
// resource, parse, and model associated with each process definition.
// Parse the deployed process files; this involves BpmnModel, BpmnParse, Process, ParsedDeployment, and so on
ParsedDeployment parsedDeployment = parsedDeploymentBuilderFactory
.getBuilderForDeploymentAndSettings(deployment, deploymentSettings)
.build();
...
}
This parses all the elements of the deployed process file. We then return to step 4, which creates and starts the process instance.
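The cache-first lookup in steps 6 to 8 follows a common pattern: check the cache, and on a miss load the record, run the deployer (which fills the cache as a side effect), then read the cache again. The sketch below models only that control flow in plain Java. The class, the string "models", and the definition id "holidayRequest:1:4" are hypothetical stand-ins, not Flowable types or data.

```java
import java.util.HashMap;
import java.util.Map;

public class DefinitionCacheSketch {

    // Stand-in for the parsed-definition cache: definition id -> parsed model.
    private final Map<String, String> cache = new HashMap<>();
    // Stand-in for the stored definitions: definition id -> deployment id.
    private final Map<String, String> database = new HashMap<>();
    // Counts how often the (expensive) parse step actually runs.
    int parseCount = 0;

    DefinitionCacheSketch() {
        database.put("holidayRequest:1:4", "deployment-1"); // hypothetical ids
    }

    /** Parses the deployment and, as a side effect, puts the result in the cache. */
    private void deploy(String deploymentId, String definitionId) {
        parseCount++;
        cache.put(definitionId, "parsed model of " + definitionId + " from " + deploymentId);
    }

    /** Returns the parsed model, parsing the deployment only on a cache miss. */
    String resolve(String definitionId) {
        String cached = cache.get(definitionId);
        if (cached == null) {
            String deploymentId = database.get(definitionId);
            if (deploymentId == null) {
                throw new IllegalArgumentException(
                        "no deployed process definition found with id '" + definitionId + "'");
            }
            deploy(deploymentId, definitionId);
            cached = cache.get(definitionId);
            if (cached == null) {
                throw new IllegalStateException(
                        "deployment '" + deploymentId + "' didn't put '" + definitionId + "' in the cache");
            }
        }
        return cached;
    }
}
```

Calling resolve() twice with the same id parses only once; the second call is served from the cache, which is why the comment in step 5 can say the definition was already resolved by the previous step.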
9. createAndStartProcessInstanceWithInitialFlowElement()
public ProcessInstance createAndStartProcessInstanceWithInitialFlowElement(ProcessDefinition processDefinition,
String businessKey, String businessStatus, String processInstanceName,
String overrideDefinitionTenantId, String predefinedProcessInstanceId,
FlowElement initialFlowElement, Process process,
Map<String, Object> variables, Map<String, Object> transientVariables,
String callbackId, String callbackType, String referenceId, String referenceType,
String stageInstanceId, boolean startProcessInstance) {
...
// Create and store the process instance execution
ExecutionEntity processInstance = processEngineConfiguration.getExecutionEntityManager()
.createProcessInstanceExecution(startInstanceBeforeContext.getProcessDefinition(), startInstanceBeforeContext.getPredefinedProcessInstanceId(),
startInstanceBeforeContext.getBusinessKey(), startInstanceBeforeContext.getBusinessStatus(),
startInstanceBeforeContext.getProcessInstanceName(), startInstanceBeforeContext.getCallbackId(),
startInstanceBeforeContext.getCallbackType(), startInstanceBeforeContext.getReferenceId(),
startInstanceBeforeContext.getReferenceType(), stageInstanceId, startInstanceBeforeContext.getTenantId(),
startInstanceBeforeContext.getInitiatorVariableName(), startInstanceBeforeContext.getInitialActivityId());
// Record the historic process instance
processEngineConfiguration.getHistoryManager().recordProcessInstanceStart(processInstance);
...
// Store the data objects declared in the process as variables
processInstance.setVariables(processDataObjects(process.getDataObjects()));
// Set the variables passed into the start command
// These are stored as process variables as well
if (startInstanceBeforeContext.getVariables() != null) {
for (String varName : startInstanceBeforeContext.getVariables().keySet()) {
processInstance.setVariable(varName, startInstanceBeforeContext.getVariables().get(varName));
}
}
// Create the first execution that will visit all the process definition elements
// Create and store the child execution
ExecutionEntity execution = processEngineConfiguration.getExecutionEntityManager().createChildExecution(processInstance);
execution.setCurrentFlowElement(startInstanceBeforeContext.getInitialFlowElement());
// Record the start of the first activity
processEngineConfiguration.getActivityInstanceEntityManager().recordActivityStart(execution);
if (startProcessInstance) {
// Puts the new ExecutionEntity onto the DefaultFlowableEngineAgenda
startProcessInstance(processInstance, commandContext, startInstanceBeforeContext.getVariables());
}
if (callbackId != null) {
callCaseInstanceStateChangeCallbacks(commandContext, processInstance, null, ProcessInstanceState.RUNNING);
}
if (processEngineConfiguration.getStartProcessInstanceInterceptor() != null) {
StartProcessInstanceAfterContext startInstanceAfterContext = new StartProcessInstanceAfterContext(processInstance, execution,
startInstanceBeforeContext.getVariables(), startInstanceBeforeContext.getTransientVariables(),
startInstanceBeforeContext.getInitialFlowElement(), startInstanceBeforeContext.getProcess(),
startInstanceBeforeContext.getProcessDefinition());
processEngineConfiguration.getStartProcessInstanceInterceptor().afterStartProcessInstance(startInstanceAfterContext);
}
return processInstance;
}
10. createProcessInstanceExecution()
Creates the execution instance.
@Override
public ExecutionEntity createProcessInstanceExecution(ProcessDefinition processDefinition, String predefinedProcessInstanceId, String businessKey,
String businessStatus, String processInstanceName, String callbackId, String callbackType, String referenceId, String referenceType,
String propagatedStageInstanceId, String tenantId, String initiatorVariableName, String startActivityId) {
// Create the execution entity
ExecutionEntity processInstanceExecution = dataManager.create();
// Setting the field values is omitted here
...
// Store in database
// Put the process instance into the insert cache (it is flushed to the database later)
insert(processInstanceExecution, false);
...
return processInstanceExecution;
}
In step 9 a new ExecutionEntity was put onto the DefaultFlowableEngineAgenda, so CommandInvoker keeps processing the agenda in a while loop until it is empty. That part is not analyzed further here.
protected void executeOperations(final CommandContext commandContext) {
FlowableEngineAgenda agenda = CommandContextUtil.getAgenda(commandContext);
while (!agenda.isEmpty()) {
Runnable runnable = agenda.getNextOperation();
executeOperation(commandContext, runnable);
}
}
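The loop above terminates only when no operation plans further work. A minimal plain-Java model of such an agenda, assuming a simple FIFO queue of Runnable operations, looks like this. AgendaSketch and the step names "start event", "sequence flow", and "user task" are hypothetical and only illustrate how one operation can schedule the next.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class AgendaSketch {

    // Pending operations, processed in the order they were planned.
    private final Deque<Runnable> operations = new ArrayDeque<>();
    // Records the order in which operations actually ran.
    final List<String> log = new ArrayList<>();

    /** Schedules an operation; it may be called while another operation is running. */
    void plan(Runnable operation) {
        operations.addLast(operation);
    }

    /** Same shape as the loop above: keep going until the agenda is empty. */
    void executeOperations() {
        while (!operations.isEmpty()) {
            Runnable next = operations.pollFirst();
            next.run();
        }
    }

    /** Plans one operation that plans the next one, and so on (illustrative steps). */
    static List<String> demo() {
        AgendaSketch agenda = new AgendaSketch();
        agenda.plan(() -> {
            agenda.log.add("start event");
            agenda.plan(() -> {
                agenda.log.add("sequence flow");
                agenda.plan(() -> agenda.log.add("user task"));
            });
        });
        agenda.executeOperations();
        return agenda.log;
    }
}
```

Only the first operation is planned up front; the others are added while the loop is already running, which is how a single start command can drive the process forward until it reaches a wait state.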
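III. Writing to the database
Once all database operations have been placed in the cache, the next step is writing them to the database. As analyzed in an earlier article, this happens in the finally block of CommandContextInterceptor: it calls commandContext.close(), which triggers the flush to the database.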
protected void flushInserts() {
// Nothing to do if no inserts are cached
if (insertedObjects.size() == 0) {
return;
}
// Handle in entity dependency order
// Write to the database in the defined order
for (Class<? extends Entity> entityClass : dbSqlSessionFactory.getInsertionOrder()) {
if (insertedObjects.containsKey(entityClass)) {
flushInsertEntities(entityClass, insertedObjects.get(entityClass).values());
insertedObjects.remove(entityClass);
}
}
// Next, in case of custom entities or we've screwed up and forgotten some entity
// Write any remaining custom entities to the database
if (insertedObjects.size() > 0) {
for (Class<? extends Entity> entityClass : insertedObjects.keySet()) {
flushInsertEntities(entityClass, insertedObjects.get(entityClass).values());
}
}
insertedObjects.clear();
}
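In this example the insertedObjects cache holds 10 entries, one for each of 10 database tables. They are written to the database one after another in the insertion order that Flowable defines.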
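The two-phase flush (known entity types in a fixed order first, leftovers afterwards) can be sketched in plain Java as follows. This is a simplified model under stated assumptions: entity types are plain strings, the order in INSERTION_ORDER is hypothetical, and "writing" just appends to a list. In Flowable the order comes from dbSqlSessionFactory.getInsertionOrder(), as shown in the code above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class InsertFlushSketch {

    // Hypothetical dependency order, used only for this illustration.
    static final List<String> INSERTION_ORDER = List.of("Execution", "Task", "Variable");

    /** Drains the cache: ordered types first, then whatever is left. */
    static List<String> flushInserts(Map<String, List<String>> insertedObjects) {
        List<String> written = new ArrayList<>();
        if (insertedObjects.isEmpty()) {
            return written;
        }
        // Phase 1: known entity types, in dependency order.
        for (String type : INSERTION_ORDER) {
            List<String> entities = insertedObjects.remove(type);
            if (entities != null) {
                written.addAll(entities);
            }
        }
        // Phase 2: custom or forgotten entity types, in whatever order the map yields.
        for (List<String> entities : insertedObjects.values()) {
            written.addAll(entities);
        }
        insertedObjects.clear();
        return written;
    }

    /** Caches inserts in an arbitrary order and returns the order they are flushed in. */
    static List<String> demo() {
        Map<String, List<String>> inserted = new LinkedHashMap<>();
        inserted.put("Variable", List.of("var:employee"));
        inserted.put("Custom", List.of("custom:1"));
        inserted.put("Task", List.of("task:approve"));
        inserted.put("Execution", List.of("exec:root", "exec:child"));
        return flushInserts(inserted);
    }
}
```

The order in which entities were cached does not matter; the fixed order decides the write sequence, so rows that others depend on (here, executions) are inserted first.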
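Summary:
The key tables involved in starting a process instance are:
ACT_HI_VARINST: historic variable instances
ACT_HI_IDENTITYLINK: historic identity links (user associations)
ACT_HI_TASKINST: historic task instances
ACT_HI_PROCINST: historic process instances
ACT_HI_ACTINST: historic activity instances (execution steps)
ACT_RU_EXECUTION: runtime executions
ACT_RU_ACTINST: runtime activity instances (execution steps)
ACT_RU_TASK: runtime tasks
ACT_RU_IDENTITYLINK: runtime identity links (user associations)
ACT_RU_VARIABLE: runtime variables
If timer jobs, events, and the like are configured, further tables are involved as well.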



