- 背景
- 升级代码改造
- 测试
- 准备工作
- 执行upgrade-dolphinscheduler.sh脚本
- 日志报错
- 修改代码再次验证
- 日志不报错
- 查看页面验证
- 代码
- UpgradeDao
- upgrade-dolphinscheduler.sh
- 说明
上周记录了一次数据迁移过程,就是使用海豚调度自带的升级代码,升级完成后,手动批量更新相关表,今天继续迁移的时候,发现存储过程不好批量更新,虽然可以截取更新method参数,但是参数个数名称不好取,只能写死,这样无疑增加了后续工作量。因此还是从代码入手,对升级代码进行改造。
升级代码改造升级思路就是手动改一个任务,测试没问题了,和无法正常显示的任务比较,然后通过代码拼接成可以正常显示的参数
● 将1.2.1使用的数据库导出。
● 对2.0.5使用的数据库进行备份,备份完成后,将库表全部删除。
● 导入1.2.1导出的表结构及数据。
虽然报了两个错,但并没有影响,至少目前测试下来,没发现有什么影响,不过还是修改代码再重跑下,错误看着难受呀
转换错
2022-04-22 21:39:18.577 ERROR 1488 --- [ main] o.a.d.dao.upgrade.UpgradeDao : update process definition json workergroup error java.lang.ClassCastException: com.fasterxml.jackson.databind.node.IntNode cannot be cast to com.fasterxml.jackson.databind.node.ObjectNode at org.apache.dolphinscheduler.dao.upgrade.UpgradeDao.updateProcessDefinitionJsonWorkerGroup(UpgradeDao.java:171) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT] at org.apache.dolphinscheduler.dao.upgrade.UpgradeDao.upgradeDolphinSchedulerWorkerGroup(UpgradeDao.java:135) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT] at org.apache.dolphinscheduler.dao.upgrade.DolphinSchedulerManager.upgradeDolphinScheduler(DolphinSchedulerManager.java:107) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT] at org.apache.dolphinscheduler.dao.upgrade.shell.UpgradeDolphinScheduler$UpgradeRunner.run(UpgradeDolphinScheduler.java:55) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT] at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:791) [spring-boot-2.5.6.jar:2.5.6] at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:775) [spring-boot-2.5.6.jar:2.5.6] at org.springframework.boot.SpringApplication.run(SpringApplication.java:345) [spring-boot-2.5.6.jar:2.5.6] at org.springframework.boot.builder.SpringApplicationBuilder.run(SpringApplicationBuilder.java:143) [spring-boot-2.5.6.jar:2.5.6] at org.apache.dolphinscheduler.dao.upgrade.shell.UpgradeDolphinScheduler.main(UpgradeDolphinScheduler.java:39) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT]
空指针
java.lang.NullPointerException: null
at org.apache.dolphinscheduler.dao.upgrade.UpgradeDao.updateProcessDefinitionJsonResourceList(UpgradeDao.java:214) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT]
at org.apache.dolphinscheduler.dao.upgrade.UpgradeDao.upgradeDolphinSchedulerResourceList(UpgradeDao.java:143) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT]
at org.apache.dolphinscheduler.dao.upgrade.DolphinSchedulerManager.upgradeDolphinScheduler(DolphinSchedulerManager.java:109) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT]
at org.apache.dolphinscheduler.dao.upgrade.shell.UpgradeDolphinScheduler$UpgradeRunner.run(UpgradeDolphinScheduler.java:55) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT]
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:791) [spring-boot-2.5.6.jar:2.5.6]
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:775) [spring-boot-2.5.6.jar:2.5.6]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:345) [spring-boot-2.5.6.jar:2.5.6]
at org.springframework.boot.builder.SpringApplicationBuilder.run(SpringApplicationBuilder.java:143) [spring-boot-2.5.6.jar:2.5.6]
at org.apache.dolphinscheduler.dao.upgrade.shell.UpgradeDolphinScheduler.main(UpgradeDolphinScheduler.java:39) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT]
```
```
2022-04-22 22:40:39.070 ERROR 1902 --- [ main] o.a.d.dao.upgrade.UpgradeDao : update process definition json resource list error
java.lang.NullPointerException: null
at org.apache.dolphinscheduler.dao.upgrade.UpgradeDao.updateProcessDefinitionJsonResourceList(UpgradeDao.java:218) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT]
at org.apache.dolphinscheduler.dao.upgrade.UpgradeDao.upgradeDolphinSchedulerResourceList(UpgradeDao.java:144) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT]
at org.apache.dolphinscheduler.dao.upgrade.DolphinSchedulerManager.upgradeDolphinScheduler(DolphinSchedulerManager.java:109) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT]
at org.apache.dolphinscheduler.dao.upgrade.shell.UpgradeDolphinScheduler$UpgradeRunner.run(UpgradeDolphinScheduler.java:55) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT]
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:791) [spring-boot-2.5.6.jar:2.5.6]
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:775) [spring-boot-2.5.6.jar:2.5.6]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:345) [spring-boot-2.5.6.jar:2.5.6]
at org.springframework.boot.builder.SpringApplicationBuilder.run(SpringApplicationBuilder.java:143) [spring-boot-2.5.6.jar:2.5.6]
at org.apache.dolphinscheduler.dao.upgrade.shell.UpgradeDolphinScheduler.main(UpgradeDolphinScheduler.java:39) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT]
修改代码再次验证
日志不报错
[dolphinscheduler@host1 script]$ sh upgrade-dolphinscheduler.sh >> update.log [dolphinscheduler@host1 script]$ [dolphinscheduler@host1 script]$ grep ERR update.log [dolphinscheduler@host1 script]$查看页面验证
datax正常显示
存储过程(语句已拼接)
package org.apache.dolphinscheduler.dao.upgrade;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.IntNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.*;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ScriptRunner;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.spi.enums.DbType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import javax.sql.DataSource;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;
import java.util.stream.Collectors;
public abstract class UpgradeDao {
public static final Logger logger = LoggerFactory.getLogger(UpgradeDao.class);
private static final String T_VERSION_NAME = "t_escheduler_version";
private static final String T_NEW_VERSION_NAME = "t_ds_version";
protected final DataSource dataSource;
protected UpgradeDao(DataSource dataSource) {
this.dataSource = dataSource;
}
protected abstract String initSqlPath();
protected abstract DbType getDbType();
public void initSchema() {
// Execute the dolphinscheduler full sql
runInitSql(getDbType());
}
private void runInitSql(DbType dbType) {
String sqlFile = String.format("dolphinscheduler_%s.sql",dbType.getDescp());
Resource mysqlSQLFilePath = new ClassPathResource("sql/" + sqlFile);
try (Connection conn = dataSource.getConnection()) {
// Execute the dolphinscheduler_ddl.sql script to create the table structure of dolphinscheduler
ScriptRunner initScriptRunner = new ScriptRunner(conn, true, true);
Reader initSqlReader = new InputStreamReader(mysqlSQLFilePath.getInputStream());
initScriptRunner.runScript(initSqlReader);
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
}
}
public abstract boolean isExistsTable(String tableName);
public abstract boolean isExistsColumn(String tableName, String columnName);
public String getCurrentVersion(String versionName) {
String sql = String.format("select version from %s", versionName);
Connection conn = null;
ResultSet rs = null;
PreparedStatement pstmt = null;
String version = null;
try {
conn = dataSource.getConnection();
pstmt = conn.prepareStatement(sql);
rs = pstmt.executeQuery();
if (rs.next()) {
version = rs.getString(1);
}
return version;
} catch (SQLException e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + sql, e);
} finally {
ConnectionUtils.releaseResource(rs, pstmt, conn);
}
}
public void upgradeDolphinScheduler(String schemaDir) {
upgradeDolphinSchedulerDDL(schemaDir, "dolphinscheduler_ddl.sql");
upgradeDolphinSchedulerDML(schemaDir);
}
public void upgradeDolphinSchedulerWorkerGroup() {
updateProcessDefinitionJsonWorkerGroup();
}
public void upgradeDolphinSchedulerResourceList() {
updateProcessDefinitionJsonResourceList();
}
public void upgradeDolphinSchedulerTo200(String schemaDir) {
processDefinitionJsonSplit();
upgradeDolphinSchedulerDDL(schemaDir, "dolphinscheduler_ddl_post.sql");
}
protected void updateProcessDefinitionJsonWorkerGroup() {
WorkerGroupDao workerGroupDao = new WorkerGroupDao();
ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao();
Map replaceProcessDefinitionMap = new HashMap<>();
try {
Map oldWorkerGroupMap = workerGroupDao.queryAllOldWorkerGroup(dataSource.getConnection());
Map processDefinitionJsonMap = processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection());
for (Map.Entry entry : processDefinitionJsonMap.entrySet()) {
ObjectNode jsonObject = JSONUtils.parseObject(entry.getValue());
ArrayNode tasks = JSONUtils.parseArray(jsonObject.get("tasks").toString());
for (int i = 0; i < tasks.size(); i++) {
ObjectNode task = (ObjectNode) tasks.path(i);
//ObjectNode workerGroupNode = (ObjectNode) task.path("workerGroupId");
//lw 20220422 modify for "IntNode cannot be cast to com.fasterxml.jackson.databind.node.ObjectNode"
IntNode workerGroupNode = (IntNode) task.path("workerGroupId");
int workerGroupId = -1;
if (workerGroupNode != null && workerGroupNode.canConvertToInt()) {
workerGroupId = workerGroupNode.asInt(-1);
}
if (workerGroupId == -1) {
task.put("workerGroup", "default");
} else {
task.put("workerGroup", oldWorkerGroupMap.get(workerGroupId));
}
}
jsonObject.remove("task");
jsonObject.put("tasks", tasks);
replaceProcessDefinitionMap.put(entry.getKey(), jsonObject.toString());
}
if (replaceProcessDefinitionMap.size() > 0) {
processDefinitionDao.updateProcessDefinitionJson(dataSource.getConnection(), replaceProcessDefinitionMap);
}
} catch (Exception e) {
logger.error("update process definition json workergroup error", e);
}
}
protected void updateProcessDefinitionJsonResourceList() {
ResourceDao resourceDao = new ResourceDao();
ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao();
Map replaceProcessDefinitionMap = new HashMap<>();
try {
Map resourcesMap = resourceDao.listAllResources(dataSource.getConnection());
Map processDefinitionJsonMap = processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection());
for (Map.Entry entry : processDefinitionJsonMap.entrySet()) {
ObjectNode jsonObject = JSONUtils.parseObject(entry.getValue());
ArrayNode tasks = JSONUtils.parseArray(jsonObject.get("tasks").toString());
for (int i = 0; i < tasks.size(); i++) {
ObjectNode task = (ObjectNode) tasks.get(i);
if(task != null) {//lw 20220422 add
ObjectNode param = (ObjectNode) task.get("params");
if (param != null) {
JsonNode resourceJsonNode = param.get("resourceList");
if (resourceJsonNode != null && !resourceJsonNode.isEmpty()) {//lw 20220422 add
List resourceList = JSONUtils.toList(param.get("resourceList").toString(), ResourceInfo.class);
ResourceInfo mainJar = JSONUtils.parseObject(param.get("mainJar").toString(), ResourceInfo.class);
if (mainJar != null && mainJar.getId() == 0) {
String fullName = mainJar.getRes().startsWith("/") ? mainJar.getRes() : String.format("/%s", mainJar.getRes());
if (resourcesMap.containsKey(fullName)) {
mainJar.setId(resourcesMap.get(fullName));
param.put("mainJar", JSONUtils.parseObject(JSONUtils.toJsonString(mainJar)));
}
}
if (CollectionUtils.isNotEmpty(resourceList)) {
List newResourceList = resourceList.stream().map(resInfo -> {
String fullName = resInfo.getRes().startsWith("/") ? resInfo.getRes() : String.format("/%s", resInfo.getRes());
if (resInfo.getId() == 0 && resourcesMap.containsKey(fullName)) {
resInfo.setId(resourcesMap.get(fullName));
}
return resInfo;
}).collect(Collectors.toList());
param.put("resourceList", JSONUtils.parseObject(JSONUtils.toJsonString(newResourceList)));
}
}
}
task.put("params", param);
}
}
jsonObject.remove("tasks");
jsonObject.put("tasks", tasks);
replaceProcessDefinitionMap.put(entry.getKey(), jsonObject.toString());
}
if (replaceProcessDefinitionMap.size() > 0) {
processDefinitionDao.updateProcessDefinitionJson(dataSource.getConnection(), replaceProcessDefinitionMap);
}
} catch (Exception e) {
logger.error("update process definition json resource list error", e);
}
}
private void upgradeDolphinSchedulerDML(String schemaDir) {
String schemaVersion = schemaDir.split("_")[0];
Resource sqlFilePath = new ClassPathResource(String.format("sql/upgrade/%s/%s/dolphinscheduler_dml.sql", schemaDir, getDbType().name().toLowerCase()));
if (!sqlFilePath.exists()) {
logger.info("No dml file {}, returning", sqlFilePath);
return;
}
logger.info("sqlSQLFilePath" + sqlFilePath);
Connection conn = null;
PreparedStatement pstmt = null;
try {
conn = dataSource.getConnection();
conn.setAutoCommit(false);
// Execute the upgraded dolphinscheduler dml
ScriptRunner scriptRunner = new ScriptRunner(conn, false, true);
Reader sqlReader = new InputStreamReader(sqlFilePath.getInputStream());
scriptRunner.runScript(sqlReader);
if (isExistsTable(T_VERSION_NAME)) {
// Change version in the version table to the new version
String upgradeSQL = String.format("update %s set version = ?", T_VERSION_NAME);
pstmt = conn.prepareStatement(upgradeSQL);
pstmt.setString(1, schemaVersion);
pstmt.executeUpdate();
} else if (isExistsTable(T_NEW_VERSION_NAME)) {
// Change version in the version table to the new version
String upgradeSQL = String.format("update %s set version = ?", T_NEW_VERSION_NAME);
pstmt = conn.prepareStatement(upgradeSQL);
pstmt.setString(1, schemaVersion);
pstmt.executeUpdate();
}
conn.commit();
} catch (FileNotFoundException e) {
try {
conn.rollback();
} catch (SQLException e1) {
logger.error(e1.getMessage(), e1);
}
logger.error(e.getMessage(), e);
throw new RuntimeException("sql file not found ", e);
} catch (IOException e) {
try {
conn.rollback();
} catch (SQLException e1) {
logger.error(e1.getMessage(), e1);
}
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
} catch (Exception e) {
try {
if (null != conn) {
conn.rollback();
}
} catch (SQLException e1) {
logger.error(e1.getMessage(), e1);
}
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
} finally {
ConnectionUtils.releaseResource(pstmt, conn);
}
}
private void upgradeDolphinSchedulerDDL(String schemaDir, String scriptFile) {
Resource sqlFilePath = new ClassPathResource(String.format("sql/upgrade/%s/%s/%s", schemaDir, getDbType().name().toLowerCase(), scriptFile));
Connection conn = null;
PreparedStatement pstmt = null;
try {
conn = dataSource.getConnection();
String dbName = conn.getCatalog();
logger.info(dbName);
conn.setAutoCommit(true);
// Execute the dolphinscheduler ddl.sql for the upgrade
ScriptRunner scriptRunner = new ScriptRunner(conn, true, true);
Reader sqlReader = new InputStreamReader(sqlFilePath.getInputStream());
scriptRunner.runScript(sqlReader);
} catch (FileNotFoundException e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql file not found ", e);
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
} finally {
ConnectionUtils.releaseResource(pstmt, conn);
}
}
public void updateVersion(String version) {
// Change version in the version table to the new version
String versionName = T_VERSION_NAME;
if (!SchemaUtils.isAGreatVersion("1.2.0", version)) {
versionName = "t_ds_version";
}
String upgradeSQL = String.format("update %s set version = ?", versionName);
PreparedStatement pstmt = null;
Connection conn = null;
try {
conn = dataSource.getConnection();
pstmt = conn.prepareStatement(upgradeSQL);
pstmt.setString(1, version);
pstmt.executeUpdate();
} catch (SQLException e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + upgradeSQL, e);
} finally {
ConnectionUtils.releaseResource(pstmt, conn);
}
}
private void processDefinitionJsonSplit() {
ProjectDao projectDao = new ProjectDao();
ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao();
ScheduleDao scheduleDao = new ScheduleDao();
JsonSplitDao jsonSplitDao = new JsonSplitDao();
try {
// execute project
Map projectIdCodeMap = projectDao.queryAllProject(dataSource.getConnection());
projectDao.updateProjectCode(dataSource.getConnection(), projectIdCodeMap);
// execute process definition code
List processDefinitions = processDefinitionDao.queryProcessDefinition(dataSource.getConnection());
processDefinitionDao.updateProcessDefinitionCode(dataSource.getConnection(), processDefinitions, projectIdCodeMap);
// execute schedule
Map allSchedule = scheduleDao.queryAllSchedule(dataSource.getConnection());
Map processIdCodeMap = processDefinitions.stream().collect(Collectors.toMap(ProcessDefinition::getId, ProcessDefinition::getCode));
scheduleDao.updateScheduleCode(dataSource.getConnection(), allSchedule, processIdCodeMap);
// json split
Map processDefinitionJsonMap = processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection());
List processDefinitionLogs = new ArrayList<>();
List processTaskRelationLogs = new ArrayList<>();
List taskDefinitionLogs = new ArrayList<>();
Map>> processTaskMap = new HashMap<>();
splitProcessDefinitionJson(processDefinitions, processDefinitionJsonMap, processDefinitionLogs, processTaskRelationLogs, taskDefinitionLogs, processTaskMap);
convertDependence(taskDefinitionLogs, projectIdCodeMap, processTaskMap);
// execute json split
jsonSplitDao.executeJsonSplitProcessDefinition(dataSource.getConnection(), processDefinitionLogs);
jsonSplitDao.executeJsonSplitProcessTaskRelation(dataSource.getConnection(), processTaskRelationLogs);
jsonSplitDao.executeJsonSplitTaskDefinition(dataSource.getConnection(), taskDefinitionLogs);
} catch (Exception e) {
logger.error("json split error", e);
}
}
private void splitProcessDefinitionJson(List processDefinitions,
Map processDefinitionJsonMap,
List processDefinitionLogs,
List processTaskRelationLogs,
List taskDefinitionLogs,
Map>> processTaskMap) throws Exception {
Map processDefinitionMap = processDefinitions.stream()
.collect(Collectors.toMap(ProcessDefinition::getId, processDefinition -> processDefinition));
Date now = new Date();
for (Map.Entry entry : processDefinitionJsonMap.entrySet()) {
if (entry.getValue() == null) {
throw new Exception("processDefinitionJson is null");
}
ObjectNode jsonObject = JSONUtils.parseObject(entry.getValue());
ProcessDefinition processDefinition = processDefinitionMap.get(entry.getKey());
if (processDefinition != null) {
processDefinition.setTenantId(jsonObject.get("tenantId") == null ? -1 : jsonObject.get("tenantId").asInt());
processDefinition.setTimeout(jsonObject.get("timeout").asInt());
processDefinition.setGlobalParams(jsonObject.get("globalParams").toString());
} else {
throw new Exception("It can't find processDefinition, please check !");
}
Map taskIdCodeMap = new HashMap<>();
Map> taskNamePreMap = new HashMap<>();
Map taskNameCodeMap = new HashMap<>();
Map> processCodeTaskNameCodeMap = new HashMap<>();
List taskDefinitionLogList = new ArrayList<>();
ArrayNode tasks = JSONUtils.parseArray(jsonObject.get("tasks").toString());
for (int i = 0; i < tasks.size(); i++) {
ObjectNode task = (ObjectNode) tasks.path(i);
ObjectNode param = (ObjectNode) task.get("params");
TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog();
String taskType = task.get("type").asText();
if (param != null) {
JsonNode resourceJsonNode = param.get("resourceList");
if (resourceJsonNode != null && !resourceJsonNode.isEmpty()) {
List resourceList = JSONUtils.toList(param.get("resourceList").toString(), ResourceInfo.class);
List resourceIds = resourceList.stream().map(ResourceInfo::getId).collect(Collectors.toList());
taskDefinitionLog.setResourceIds(StringUtils.join(resourceIds, Constants.COMMA));
} else {
taskDefinitionLog.setResourceIds(StringUtils.EMPTY);
}
if (TaskType.SUB_PROCESS.getDesc().equals(taskType)) {
JsonNode jsonNodeDefinitionId = param.get("processDefinitionId");
if (jsonNodeDefinitionId != null) {
param.put("processDefinitionCode", processDefinitionMap.get(jsonNodeDefinitionId.asInt()).getCode());
param.remove("processDefinitionId");
}
}
//lw 20220422 数据迁移改造 start
if (TaskType.DATAX.getDesc().equals(taskType)) {
param.put("customConfig", 0);
param.put("xms", 1);
param.put("xmx", 1);
param.put("waitStartTimeout", task.get("waitStartTimeout"));
param.put("switchResult", task.get("switchResult"));
}
//存储过程
if (TaskType.PROCEDURE.getDesc().equals(taskType)) {
param.put("waitStartTimeout", task.get("waitStartTimeout"));
param.put("switchResult", task.get("switchResult"));
String method = param.get("method").asText();
JsonNode localParamsJsonNode = param.get("localParams");
String callSQL ="{call "+method+"(${";
if (localParamsJsonNode != null && !localParamsJsonNode.isEmpty()) {
List resourceList = JSONUtils.toList(param.get("localParams").toString(), Property.class);
for(Property pro :resourceList) {
callSQL+=(pro.getProp()+"},${");
}
}
callSQL = callSQL.substring(0, callSQL.length()-3)+")}";
param.put("method", callSQL);
}
//lw 20220422 数据迁移改造 end
param.put("conditionResult", task.get("conditionResult"));
param.put("dependence", task.get("dependence"));
taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(param));
}
TaskTimeoutParameter timeout = JSONUtils.parseObject(JSONUtils.toJsonString(task.get("timeout")), TaskTimeoutParameter.class);
if (timeout != null) {
taskDefinitionLog.setTimeout(timeout.getInterval());
taskDefinitionLog.setTimeoutFlag(timeout.getEnable() ? TimeoutFlag.OPEN : TimeoutFlag.CLOSE);
taskDefinitionLog.setTimeoutNotifyStrategy(timeout.getStrategy());
}
String desc = task.get("description") != null ? task.get("description").asText() :
task.get("desc") != null ? task.get("desc").asText() : "";
taskDefinitionLog.setDescription(desc);
taskDefinitionLog.setFlag(Constants.FLOWNODE_RUN_FLAG_NORMAL.equals(task.get("runFlag").asText()) ? Flag.YES : Flag.NO);
taskDefinitionLog.setTaskType(taskType);
taskDefinitionLog.setFailRetryInterval(TaskType.SUB_PROCESS.getDesc().equals(taskType) ? 1 : task.get("retryInterval").asInt());
taskDefinitionLog.setFailRetryTimes(TaskType.SUB_PROCESS.getDesc().equals(taskType) ? 0 : task.get("maxRetryTimes").asInt());
taskDefinitionLog.setTaskPriority(JSONUtils.parseObject(JSONUtils.toJsonString(task.get("taskInstancePriority")), Priority.class));
String name = task.get("name").asText();
taskDefinitionLog.setName(name);
taskDefinitionLog.setWorkerGroup(task.get("workerGroup") == null ? "default" : task.get("workerGroup").asText());
long taskCode = CodeGenerateUtils.getInstance().genCode();
taskDefinitionLog.setCode(taskCode);
taskDefinitionLog.setVersion(Constants.VERSION_FIRST);
taskDefinitionLog.setProjectCode(processDefinition.getProjectCode());
taskDefinitionLog.setUserId(processDefinition.getUserId());
taskDefinitionLog.setEnvironmentCode(-1);
taskDefinitionLog.setDelayTime(0);
taskDefinitionLog.setOperator(1);
taskDefinitionLog.setOperateTime(now);
taskDefinitionLog.setCreateTime(now);
taskDefinitionLog.setUpdateTime(now);
taskDefinitionLogList.add(taskDefinitionLog);
taskIdCodeMap.put(task.get("id").asText(), taskCode);
List preTasks = JSONUtils.toList(task.get("preTasks").toString(), String.class);
taskNamePreMap.put(name, preTasks);
taskNameCodeMap.put(name, taskCode);
}
convertConditions(taskDefinitionLogList, taskNameCodeMap);
taskDefinitionLogs.addAll(taskDefinitionLogList);
processDefinition.setLocations(convertLocations(processDefinition.getLocations(), taskIdCodeMap));
ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog(processDefinition);
processDefinitionLog.setOperator(1);
processDefinitionLog.setOperateTime(now);
processDefinitionLog.setUpdateTime(now);
processDefinitionLogs.add(processDefinitionLog);
handleProcessTaskRelation(taskNamePreMap, taskNameCodeMap, processDefinition, processTaskRelationLogs);
processCodeTaskNameCodeMap.put(processDefinition.getCode(), taskNameCodeMap);
processTaskMap.put(entry.getKey(), processCodeTaskNameCodeMap);
}
}
public void convertConditions(List taskDefinitionLogList, Map taskNameCodeMap) throws Exception {
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogList) {
if (TaskType.CONDITIONS.getDesc().equals(taskDefinitionLog.getTaskType())) {
ObjectMapper objectMapper = new ObjectMapper();
ObjectNode taskParams = JSONUtils.parseObject(taskDefinitionLog.getTaskParams());
// reset conditionResult
ObjectNode conditionResult = (ObjectNode) taskParams.get("conditionResult");
List successNode = JSONUtils.toList(conditionResult.get("successNode").toString(), String.class);
List nodeCode = new ArrayList<>();
successNode.forEach(node -> nodeCode.add(taskNameCodeMap.get(node)));
conditionResult.set("successNode", objectMapper.readTree(objectMapper.writeValueAsString(nodeCode)));
List failedNode = JSONUtils.toList(conditionResult.get("failedNode").toString(), String.class);
nodeCode.clear();
failedNode.forEach(node -> nodeCode.add(taskNameCodeMap.get(node)));
conditionResult.set("failedNode", objectMapper.readTree(objectMapper.writeValueAsString(nodeCode)));
// reset dependItemList
ObjectNode dependence = (ObjectNode) taskParams.get("dependence");
ArrayNode dependTaskList = JSONUtils.parseArray(JSONUtils.toJsonString(dependence.get("dependTaskList")));
for (int i = 0; i < dependTaskList.size(); i++) {
ObjectNode dependTask = (ObjectNode) dependTaskList.path(i);
ArrayNode dependItemList = JSONUtils.parseArray(JSONUtils.toJsonString(dependTask.get("dependItemList")));
for (int j = 0; j < dependItemList.size(); j++) {
ObjectNode dependItem = (ObjectNode) dependItemList.path(j);
JsonNode depTasks = dependItem.get("depTasks");
dependItem.put("depTaskCode", taskNameCodeMap.get(depTasks.asText()));
dependItem.remove("depTasks");
dependItemList.set(j, dependItem);
}
dependTask.put("dependItemList", dependItemList);
dependTaskList.set(i, dependTask);
}
dependence.put("dependTaskList", dependTaskList);
taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(taskParams));
}
}
}
private String convertLocations(String locations, Map taskIdCodeMap) {
if (StringUtils.isBlank(locations)) {
return locations;
}
Map locationsMap = JSONUtils.parseObject(locations, new TypeReference
upgrade-dolphinscheduler.sh
[dolphinscheduler@host1 script]$ cat upgrade-dolphinscheduler.sh
#!/bin/bash
DOLPHINSCHEDULER_HOME=/home/dolphinscheduler/app/dolphinscheduler/
export JAVA_HOME=/usr/local/java/jdk1.8.0_151
export DATABASE_TYPE=${DATABASE_TYPE:-"mysql"}
export SPRING_PROFILES_ACTIVE=${SPRING_PROFILES_ACTIVE:-"default"}
export SPRING_PROFILES_ACTIVE="${SPRING_PROFILES_ACTIVE},${DATABASE_TYPE}"
export DOLPHINSCHEDULER_CONF_DIR=$DOLPHINSCHEDULER_HOME/conf
export DOLPHINSCHEDULER_LIB_JARS=$DOLPHINSCHEDULER_HOME/lib/*
export DOLPHINSCHEDULER_SQL_DIR=$DOLPHINSCHEDULER_HOME/sql
export DOLPHINSCHEDULER_OPTS="-server -Xms64m -Xmx64m -Xss512k -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:LargePageSizeInBytes=64m -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70"
export STOP_TIMEOUT=5
CLASS=org.apache.dolphinscheduler.dao.upgrade.shell.UpgradeDolphinScheduler
exec_command="$DOLPHINSCHEDULER_OPTS -classpath $DOLPHINSCHEDULER_SQL_DIR:$DOLPHINSCHEDULER_CONF_DIR:$DOLPHINSCHEDULER_LIB_JARS $CLASS"
cd $DOLPHINSCHEDULER_HOME
$JAVA_HOME/bin/java $exec_command
[dolphinscheduler@host1 script]$
说明
create-dolphinscheduler.sh会判断有没有version表,有更新,无则初始化;
upgrade-dolphinscheduler.sh直接执行更新操作。



