DolphinScheduler upgrade code transformation (upgrade-dolphinscheduler)
2022-06-23 05:48:00 【Zhun Xiaozhao】
DolphinScheduler data upgrade code transformation
Background
Last week I documented a data migration that used DolphinScheduler's built-in upgrade code: after the upgrade completed, the related tables were batch-updated by hand. When we continued the migration today, we found that stored procedure tasks do not lend themselves to batch updates. The method parameter can be intercepted, but the number and names of its parameters vary and are hard to derive, so they would have to be hard-coded, which only adds follow-up work. So we went to the source and transformed the upgrade code instead.
Upgrade code transformation
The approach: manually fix one task, verify that it displays correctly, compare it against the tasks that do not display correctly, and then splice in the correctly displaying parameters through code.
Test
Preparation
● Export the database used by 1.2.1.
● Back up the database used by 2.0.5; once the backup is complete, drop all of its tables.
● Import the table structure and data exported from 1.2.1.
Run the upgrade-dolphinscheduler.sh script.

Log errors
Two errors were reported. They did not break the upgrade, and no impact has been found so far, but it is still better to fix the code and rerun than to keep staring at errors.
Class cast error
2022-04-22 21:39:18.577 ERROR 1488 --- [ main] o.a.d.dao.upgrade.UpgradeDao : update process definition json workergroup error
java.lang.ClassCastException: com.fasterxml.jackson.databind.node.IntNode cannot be cast to com.fasterxml.jackson.databind.node.ObjectNode
at org.apache.dolphinscheduler.dao.upgrade.UpgradeDao.updateProcessDefinitionJsonWorkerGroup(UpgradeDao.java:171) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT]
at org.apache.dolphinscheduler.dao.upgrade.UpgradeDao.upgradeDolphinSchedulerWorkerGroup(UpgradeDao.java:135) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT]
at org.apache.dolphinscheduler.dao.upgrade.DolphinSchedulerManager.upgradeDolphinScheduler(DolphinSchedulerManager.java:107) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT]
at org.apache.dolphinscheduler.dao.upgrade.shell.UpgradeDolphinScheduler$UpgradeRunner.run(UpgradeDolphinScheduler.java:55) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT]
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:791) [spring-boot-2.5.6.jar:2.5.6]
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:775) [spring-boot-2.5.6.jar:2.5.6]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:345) [spring-boot-2.5.6.jar:2.5.6]
at org.springframework.boot.builder.SpringApplicationBuilder.run(SpringApplicationBuilder.java:143) [spring-boot-2.5.6.jar:2.5.6]
at org.apache.dolphinscheduler.dao.upgrade.shell.UpgradeDolphinScheduler.main(UpgradeDolphinScheduler.java:39) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT]
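The cast fails because a 1.2.x process definition json stores workerGroupId as a bare integer, which Jackson parses as an IntNode, not an ObjectNode. A minimal, self-contained sketch (the json snippet here is made up for illustration) reproduces the crash:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class WorkerGroupCastDemo {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // in the old json a task carries a numeric workerGroupId
        ObjectNode task = (ObjectNode) mapper.readTree("{\"workerGroupId\": 2}");
        JsonNode node = task.path("workerGroupId");
        System.out.println(node.getClass().getSimpleName()); // prints: IntNode
        // this mirrors what the original upgrade code did, and is what throws:
        ObjectNode broken = (ObjectNode) node; // ClassCastException at runtime
        System.out.println(broken);
    }
}
```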
Null pointer
java.lang.NullPointerException: null
at org.apache.dolphinscheduler.dao.upgrade.UpgradeDao.updateProcessDefinitionJsonResourceList(UpgradeDao.java:214) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT]
at org.apache.dolphinscheduler.dao.upgrade.UpgradeDao.upgradeDolphinSchedulerResourceList(UpgradeDao.java:143) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT]
at org.apache.dolphinscheduler.dao.upgrade.DolphinSchedulerManager.upgradeDolphinScheduler(DolphinSchedulerManager.java:109) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT]
at org.apache.dolphinscheduler.dao.upgrade.shell.UpgradeDolphinScheduler$UpgradeRunner.run(UpgradeDolphinScheduler.java:55) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT]
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:791) [spring-boot-2.5.6.jar:2.5.6]
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:775) [spring-boot-2.5.6.jar:2.5.6]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:345) [spring-boot-2.5.6.jar:2.5.6]
at org.springframework.boot.builder.SpringApplicationBuilder.run(SpringApplicationBuilder.java:143) [spring-boot-2.5.6.jar:2.5.6]
at org.apache.dolphinscheduler.dao.upgrade.shell.UpgradeDolphinScheduler.main(UpgradeDolphinScheduler.java:39) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT]
2022-04-22 22:40:39.070 ERROR 1902 --- [ main] o.a.d.dao.upgrade.UpgradeDao : update process definition json resource list error
java.lang.NullPointerException: null
at org.apache.dolphinscheduler.dao.upgrade.UpgradeDao.updateProcessDefinitionJsonResourceList(UpgradeDao.java:218) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT]
at org.apache.dolphinscheduler.dao.upgrade.UpgradeDao.upgradeDolphinSchedulerResourceList(UpgradeDao.java:144) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT]
at org.apache.dolphinscheduler.dao.upgrade.DolphinSchedulerManager.upgradeDolphinScheduler(DolphinSchedulerManager.java:109) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT]
at org.apache.dolphinscheduler.dao.upgrade.shell.UpgradeDolphinScheduler$UpgradeRunner.run(UpgradeDolphinScheduler.java:55) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT]
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:791) [spring-boot-2.5.6.jar:2.5.6]
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:775) [spring-boot-2.5.6.jar:2.5.6]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:345) [spring-boot-2.5.6.jar:2.5.6]
at org.springframework.boot.builder.SpringApplicationBuilder.run(SpringApplicationBuilder.java:143) [spring-boot-2.5.6.jar:2.5.6]
at org.apache.dolphinscheduler.dao.upgrade.shell.UpgradeDolphinScheduler.main(UpgradeDolphinScheduler.java:39) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT]
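Both NPEs come down to the same thing: ObjectNode.get() returns null when a field is absent, so calling toString() on params or resourceList blows up for tasks that have no params (for example sub-process nodes) or no resource list. A minimal sketch of the guard pattern that the fixed code below uses (the json is hypothetical):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class ResourceListGuardDemo {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // a task that carries no params field at all
        ObjectNode task = (ObjectNode) mapper.readTree("{\"type\": \"SHELL\"}");
        ObjectNode param = (ObjectNode) task.get("params"); // null: get() returns null for missing fields
        if (param != null) { // this guard is what the fix adds
            JsonNode resourceList = param.get("resourceList");
            if (resourceList != null && !resourceList.isEmpty()) {
                System.out.println(resourceList); // only reached when toString() etc. are safe
            }
        }
    }
}
```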
Modify the code and verify again

No error is reported in the log

[[email protected] script]$ sh upgrade-dolphinscheduler.sh >> update.log
[[email protected] script]$
[[email protected] script]$ grep ERR update.log
[[email protected] script]$
Verify on the page
DataX tasks display normally.
Stored procedures display normally (the call statement is spliced).
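For reference, the splicing for stored procedure tasks rewrites the method field into a runnable call statement built from the localParams names. A small sketch of the target format, mirroring (not reproducing) the loop-and-trim logic in splitProcessDefinitionJson below; the procedure name and parameter names are hypothetical:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class CallSqlSpliceDemo {
    public static void main(String[] args) {
        String method = "update_report";                          // hypothetical procedure name
        List<String> props = Arrays.asList("biz_date", "tenant"); // hypothetical localParams names
        // one ${...} placeholder per local parameter, comma separated
        String callSQL = "{call " + method + "("
                + props.stream().map(p -> "${" + p + "}").collect(Collectors.joining(","))
                + ")}";
        System.out.println(callSQL); // {call update_report(${biz_date},${tenant})}
    }
}
```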
Code
UpgradeDao
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.dao.upgrade;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.IntNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.*;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ScriptRunner;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.spi.enums.DbType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import javax.sql.DataSource;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;
import java.util.stream.Collectors;
public abstract class UpgradeDao {
public static final Logger logger = LoggerFactory.getLogger(UpgradeDao.class);
private static final String T_VERSION_NAME = "t_escheduler_version";
private static final String T_NEW_VERSION_NAME = "t_ds_version";
protected final DataSource dataSource;
protected UpgradeDao(DataSource dataSource) {
this.dataSource = dataSource;
}
protected abstract String initSqlPath();
protected abstract DbType getDbType();
public void initSchema() {
// Execute the dolphinscheduler full sql
runInitSql(getDbType());
}
/**
 * run init sql to init db schema
 *
 * @param dbType db type
 */
private void runInitSql(DbType dbType) {
String sqlFile = String.format("dolphinscheduler_%s.sql",dbType.getDescp());
Resource mysqlSQLFilePath = new ClassPathResource("sql/" + sqlFile);
try (Connection conn = dataSource.getConnection()) {
// Execute the dolphinscheduler_ddl.sql script to create the table structure of dolphinscheduler
ScriptRunner initScriptRunner = new ScriptRunner(conn, true, true);
Reader initSqlReader = new InputStreamReader(mysqlSQLFilePath.getInputStream());
initScriptRunner.runScript(initSqlReader);
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
}
}
public abstract boolean isExistsTable(String tableName);
public abstract boolean isExistsColumn(String tableName, String columnName);
public String getCurrentVersion(String versionName) {
String sql = String.format("select version from %s", versionName);
Connection conn = null;
ResultSet rs = null;
PreparedStatement pstmt = null;
String version = null;
try {
conn = dataSource.getConnection();
pstmt = conn.prepareStatement(sql);
rs = pstmt.executeQuery();
if (rs.next()) {
version = rs.getString(1);
}
return version;
} catch (SQLException e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + sql, e);
} finally {
ConnectionUtils.releaseResource(rs, pstmt, conn);
}
}
public void upgradeDolphinScheduler(String schemaDir) {
upgradeDolphinSchedulerDDL(schemaDir, "dolphinscheduler_ddl.sql");
upgradeDolphinSchedulerDML(schemaDir);
}
/**
 * upgrade DolphinScheduler worker group
 * ds-1.3.0 modify the worker group for process definition json
 */
public void upgradeDolphinSchedulerWorkerGroup() {
updateProcessDefinitionJsonWorkerGroup();
}
/**
 * upgrade DolphinScheduler resource list
 * ds-1.3.2 modify the resource list for process definition json
 */
public void upgradeDolphinSchedulerResourceList() {
updateProcessDefinitionJsonResourceList();
}
/**
 * upgrade DolphinScheduler to 2.0.0
 */
public void upgradeDolphinSchedulerTo200(String schemaDir) {
processDefinitionJsonSplit();
upgradeDolphinSchedulerDDL(schemaDir, "dolphinscheduler_ddl_post.sql");
}
/**
 * updateProcessDefinitionJsonWorkerGroup
 */
protected void updateProcessDefinitionJsonWorkerGroup() {
WorkerGroupDao workerGroupDao = new WorkerGroupDao();
ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao();
Map<Integer, String> replaceProcessDefinitionMap = new HashMap<>();
try {
Map<Integer, String> oldWorkerGroupMap = workerGroupDao.queryAllOldWorkerGroup(dataSource.getConnection());
Map<Integer, String> processDefinitionJsonMap = processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection());
for (Map.Entry<Integer, String> entry : processDefinitionJsonMap.entrySet()) {
ObjectNode jsonObject = JSONUtils.parseObject(entry.getValue());
ArrayNode tasks = JSONUtils.parseArray(jsonObject.get("tasks").toString());
for (int i = 0; i < tasks.size(); i++) {
ObjectNode task = (ObjectNode) tasks.path(i);
//ObjectNode workerGroupNode = (ObjectNode) task.path("workerGroupId");
//lw 20220422 modify for "IntNode cannot be cast to com.fasterxml.jackson.databind.node.ObjectNode"
// read as a plain JsonNode: path() returns MissingNode (never null) when the field
// is absent, and canConvertToInt() is false for anything but a numeric node
JsonNode workerGroupNode = task.path("workerGroupId");
int workerGroupId = -1;
if (workerGroupNode.canConvertToInt()) {
workerGroupId = workerGroupNode.asInt(-1);
}
if (workerGroupId == -1) {
task.put("workerGroup", "default");
} else {
task.put("workerGroup", oldWorkerGroupMap.get(workerGroupId));
}
}
jsonObject.remove("task");
jsonObject.put("tasks", tasks);
replaceProcessDefinitionMap.put(entry.getKey(), jsonObject.toString());
}
if (replaceProcessDefinitionMap.size() > 0) {
processDefinitionDao.updateProcessDefinitionJson(dataSource.getConnection(), replaceProcessDefinitionMap);
}
} catch (Exception e) {
logger.error("update process definition json workergroup error", e);
}
}
protected void updateProcessDefinitionJsonResourceList() {
ResourceDao resourceDao = new ResourceDao();
ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao();
Map<Integer, String> replaceProcessDefinitionMap = new HashMap<>();
try {
Map<String, Integer> resourcesMap = resourceDao.listAllResources(dataSource.getConnection());
Map<Integer, String> processDefinitionJsonMap = processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection());
for (Map.Entry<Integer, String> entry : processDefinitionJsonMap.entrySet()) {
ObjectNode jsonObject = JSONUtils.parseObject(entry.getValue());
ArrayNode tasks = JSONUtils.parseArray(jsonObject.get("tasks").toString());
for (int i = 0; i < tasks.size(); i++) {
ObjectNode task = (ObjectNode) tasks.get(i);
if(task != null) {
//lw 20220422 add
ObjectNode param = (ObjectNode) task.get("params");
if (param != null) {
JsonNode resourceJsonNode = param.get("resourceList");
if (resourceJsonNode != null && !resourceJsonNode.isEmpty()) {
//lw 20220422 add
List<ResourceInfo> resourceList = JSONUtils.toList(param.get("resourceList").toString(), ResourceInfo.class);
ResourceInfo mainJar = JSONUtils.parseObject(param.get("mainJar").toString(), ResourceInfo.class);
if (mainJar != null && mainJar.getId() == 0) {
String fullName = mainJar.getRes().startsWith("/") ? mainJar.getRes() : String.format("/%s", mainJar.getRes());
if (resourcesMap.containsKey(fullName)) {
mainJar.setId(resourcesMap.get(fullName));
param.put("mainJar", JSONUtils.parseObject(JSONUtils.toJsonString(mainJar)));
}
}
if (CollectionUtils.isNotEmpty(resourceList)) {
List<ResourceInfo> newResourceList = resourceList.stream().map(resInfo -> {
String fullName = resInfo.getRes().startsWith("/") ? resInfo.getRes() : String.format("/%s", resInfo.getRes());
if (resInfo.getId() == 0 && resourcesMap.containsKey(fullName)) {
resInfo.setId(resourcesMap.get(fullName));
}
return resInfo;
}).collect(Collectors.toList());
param.put("resourceList", JSONUtils.parseObject(JSONUtils.toJsonString(newResourceList)));
}
}
}
task.put("params", param);
}
}
jsonObject.remove("tasks");
jsonObject.put("tasks", tasks);
replaceProcessDefinitionMap.put(entry.getKey(), jsonObject.toString());
}
if (replaceProcessDefinitionMap.size() > 0) {
processDefinitionDao.updateProcessDefinitionJson(dataSource.getConnection(), replaceProcessDefinitionMap);
}
} catch (Exception e) {
logger.error("update process definition json resource list error", e);
}
}
private void upgradeDolphinSchedulerDML(String schemaDir) {
String schemaVersion = schemaDir.split("_")[0];
Resource sqlFilePath = new ClassPathResource(String.format("sql/upgrade/%s/%s/dolphinscheduler_dml.sql", schemaDir, getDbType().name().toLowerCase()));
if (!sqlFilePath.exists()) {
logger.info("No dml file {}, returning", sqlFilePath);
return;
}
logger.info("sqlSQLFilePath" + sqlFilePath);
Connection conn = null;
PreparedStatement pstmt = null;
try {
conn = dataSource.getConnection();
conn.setAutoCommit(false);
// Execute the upgraded dolphinscheduler dml
ScriptRunner scriptRunner = new ScriptRunner(conn, false, true);
Reader sqlReader = new InputStreamReader(sqlFilePath.getInputStream());
scriptRunner.runScript(sqlReader);
if (isExistsTable(T_VERSION_NAME)) {
// Change version in the version table to the new version
String upgradeSQL = String.format("update %s set version = ?", T_VERSION_NAME);
pstmt = conn.prepareStatement(upgradeSQL);
pstmt.setString(1, schemaVersion);
pstmt.executeUpdate();
} else if (isExistsTable(T_NEW_VERSION_NAME)) {
// Change version in the version table to the new version
String upgradeSQL = String.format("update %s set version = ?", T_NEW_VERSION_NAME);
pstmt = conn.prepareStatement(upgradeSQL);
pstmt.setString(1, schemaVersion);
pstmt.executeUpdate();
}
conn.commit();
} catch (FileNotFoundException e) {
try {
conn.rollback();
} catch (SQLException e1) {
logger.error(e1.getMessage(), e1);
}
logger.error(e.getMessage(), e);
throw new RuntimeException("sql file not found ", e);
} catch (IOException e) {
try {
conn.rollback();
} catch (SQLException e1) {
logger.error(e1.getMessage(), e1);
}
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
} catch (Exception e) {
try {
if (null != conn) {
conn.rollback();
}
} catch (SQLException e1) {
logger.error(e1.getMessage(), e1);
}
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
} finally {
ConnectionUtils.releaseResource(pstmt, conn);
}
}
/**
 * upgradeDolphinScheduler DDL
 *
 * @param schemaDir schemaDir
 */
private void upgradeDolphinSchedulerDDL(String schemaDir, String scriptFile) {
Resource sqlFilePath = new ClassPathResource(String.format("sql/upgrade/%s/%s/%s", schemaDir, getDbType().name().toLowerCase(), scriptFile));
Connection conn = null;
PreparedStatement pstmt = null;
try {
conn = dataSource.getConnection();
String dbName = conn.getCatalog();
logger.info(dbName);
conn.setAutoCommit(true);
// Execute the dolphinscheduler ddl.sql for the upgrade
ScriptRunner scriptRunner = new ScriptRunner(conn, true, true);
Reader sqlReader = new InputStreamReader(sqlFilePath.getInputStream());
scriptRunner.runScript(sqlReader);
} catch (FileNotFoundException e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql file not found ", e);
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
} finally {
ConnectionUtils.releaseResource(pstmt, conn);
}
}
/**
 * update version
 *
 * @param version version
 */
public void updateVersion(String version) {
// Change version in the version table to the new version
String versionName = T_VERSION_NAME;
if (!SchemaUtils.isAGreatVersion("1.2.0", version)) {
versionName = "t_ds_version";
}
String upgradeSQL = String.format("update %s set version = ?", versionName);
PreparedStatement pstmt = null;
Connection conn = null;
try {
conn = dataSource.getConnection();
pstmt = conn.prepareStatement(upgradeSQL);
pstmt.setString(1, version);
pstmt.executeUpdate();
} catch (SQLException e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + upgradeSQL, e);
} finally {
ConnectionUtils.releaseResource(pstmt, conn);
}
}
/**
 * upgrade DolphinScheduler to 2.0.0, json split
 */
private void processDefinitionJsonSplit() {
ProjectDao projectDao = new ProjectDao();
ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao();
ScheduleDao scheduleDao = new ScheduleDao();
JsonSplitDao jsonSplitDao = new JsonSplitDao();
try {
// execute project
Map<Integer, Long> projectIdCodeMap = projectDao.queryAllProject(dataSource.getConnection());
projectDao.updateProjectCode(dataSource.getConnection(), projectIdCodeMap);
// execute process definition code
List<ProcessDefinition> processDefinitions = processDefinitionDao.queryProcessDefinition(dataSource.getConnection());
processDefinitionDao.updateProcessDefinitionCode(dataSource.getConnection(), processDefinitions, projectIdCodeMap);
// execute schedule
Map<Integer, Long> allSchedule = scheduleDao.queryAllSchedule(dataSource.getConnection());
Map<Integer, Long> processIdCodeMap = processDefinitions.stream().collect(Collectors.toMap(ProcessDefinition::getId, ProcessDefinition::getCode));
scheduleDao.updateScheduleCode(dataSource.getConnection(), allSchedule, processIdCodeMap);
// json split
Map<Integer, String> processDefinitionJsonMap = processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection());
List<ProcessDefinitionLog> processDefinitionLogs = new ArrayList<>();
List<ProcessTaskRelationLog> processTaskRelationLogs = new ArrayList<>();
List<TaskDefinitionLog> taskDefinitionLogs = new ArrayList<>();
Map<Integer, Map<Long, Map<String, Long>>> processTaskMap = new HashMap<>();
splitProcessDefinitionJson(processDefinitions, processDefinitionJsonMap, processDefinitionLogs, processTaskRelationLogs, taskDefinitionLogs, processTaskMap);
convertDependence(taskDefinitionLogs, projectIdCodeMap, processTaskMap);
// execute json split
jsonSplitDao.executeJsonSplitProcessDefinition(dataSource.getConnection(), processDefinitionLogs);
jsonSplitDao.executeJsonSplitProcessTaskRelation(dataSource.getConnection(), processTaskRelationLogs);
jsonSplitDao.executeJsonSplitTaskDefinition(dataSource.getConnection(), taskDefinitionLogs);
} catch (Exception e) {
logger.error("json split error", e);
}
}
private void splitProcessDefinitionJson(List<ProcessDefinition> processDefinitions,
Map<Integer, String> processDefinitionJsonMap,
List<ProcessDefinitionLog> processDefinitionLogs,
List<ProcessTaskRelationLog> processTaskRelationLogs,
List<TaskDefinitionLog> taskDefinitionLogs,
Map<Integer, Map<Long, Map<String, Long>>> processTaskMap) throws Exception {
Map<Integer, ProcessDefinition> processDefinitionMap = processDefinitions.stream()
.collect(Collectors.toMap(ProcessDefinition::getId, processDefinition -> processDefinition));
Date now = new Date();
for (Map.Entry<Integer, String> entry : processDefinitionJsonMap.entrySet()) {
if (entry.getValue() == null) {
throw new Exception("processDefinitionJson is null");
}
ObjectNode jsonObject = JSONUtils.parseObject(entry.getValue());
ProcessDefinition processDefinition = processDefinitionMap.get(entry.getKey());
if (processDefinition != null) {
processDefinition.setTenantId(jsonObject.get("tenantId") == null ? -1 : jsonObject.get("tenantId").asInt());
processDefinition.setTimeout(jsonObject.get("timeout").asInt());
processDefinition.setGlobalParams(jsonObject.get("globalParams").toString());
} else {
throw new Exception("It can't find processDefinition, please check !");
}
Map<String, Long> taskIdCodeMap = new HashMap<>();
Map<String, List<String>> taskNamePreMap = new HashMap<>();
Map<String, Long> taskNameCodeMap = new HashMap<>();
Map<Long, Map<String, Long>> processCodeTaskNameCodeMap = new HashMap<>();
List<TaskDefinitionLog> taskDefinitionLogList = new ArrayList<>();
ArrayNode tasks = JSONUtils.parseArray(jsonObject.get("tasks").toString());
for (int i = 0; i < tasks.size(); i++) {
ObjectNode task = (ObjectNode) tasks.path(i);
ObjectNode param = (ObjectNode) task.get("params");
TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog();
String taskType = task.get("type").asText();
if (param != null) {
JsonNode resourceJsonNode = param.get("resourceList");
if (resourceJsonNode != null && !resourceJsonNode.isEmpty()) {
List<ResourceInfo> resourceList = JSONUtils.toList(param.get("resourceList").toString(), ResourceInfo.class);
List<Integer> resourceIds = resourceList.stream().map(ResourceInfo::getId).collect(Collectors.toList());
taskDefinitionLog.setResourceIds(StringUtils.join(resourceIds, Constants.COMMA));
} else {
taskDefinitionLog.setResourceIds(StringUtils.EMPTY);
}
if (TaskType.SUB_PROCESS.getDesc().equals(taskType)) {
JsonNode jsonNodeDefinitionId = param.get("processDefinitionId");
if (jsonNodeDefinitionId != null) {
param.put("processDefinitionCode", processDefinitionMap.get(jsonNodeDefinitionId.asInt()).getCode());
param.remove("processDefinitionId");
}
}
//lw 20220422 Data migration and transformation start
if (TaskType.DATAX.getDesc().equals(taskType)) {
param.put("customConfig", 0);
param.put("xms", 1);
param.put("xmx", 1);
param.put("waitStartTimeout", task.get("waitStartTimeout"));
param.put("switchResult", task.get("switchResult"));
}
// stored procedure
if (TaskType.PROCEDURE.getDesc().equals(taskType)) {
param.put("waitStartTimeout", task.get("waitStartTimeout"));
param.put("switchResult", task.get("switchResult"));
String method = param.get("method").asText();
JsonNode localParamsJsonNode = param.get("localParams");
// splice "{call method(${p1},${p2},...)}" from the localParams property names
String callSQL = "{call " + method + "(${";
if (localParamsJsonNode != null && !localParamsJsonNode.isEmpty()) {
List<Property> localParams = JSONUtils.toList(localParamsJsonNode.toString(), Property.class);
for (Property pro : localParams) {
callSQL += (pro.getProp() + "},${");
}
// drop the trailing ",${" left by the loop and close the call
callSQL = callSQL.substring(0, callSQL.length() - 3) + ")}";
} else {
// no localParams: the trim above would eat the "(", so emit "{call method()}" directly
callSQL = "{call " + method + "()}";
}
param.put("method", callSQL);
}
//lw 20220422 Data migration and transformation end
param.put("conditionResult", task.get("conditionResult"));
param.put("dependence", task.get("dependence"));
taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(param));
}
TaskTimeoutParameter timeout = JSONUtils.parseObject(JSONUtils.toJsonString(task.get("timeout")), TaskTimeoutParameter.class);
if (timeout != null) {
taskDefinitionLog.setTimeout(timeout.getInterval());
taskDefinitionLog.setTimeoutFlag(timeout.getEnable() ? TimeoutFlag.OPEN : TimeoutFlag.CLOSE);
taskDefinitionLog.setTimeoutNotifyStrategy(timeout.getStrategy());
}
String desc = task.get("description") != null ? task.get("description").asText() :
task.get("desc") != null ? task.get("desc").asText() : "";
taskDefinitionLog.setDescription(desc);
taskDefinitionLog.setFlag(Constants.FLOWNODE_RUN_FLAG_NORMAL.equals(task.get("runFlag").asText()) ? Flag.YES : Flag.NO);
taskDefinitionLog.setTaskType(taskType);
taskDefinitionLog.setFailRetryInterval(TaskType.SUB_PROCESS.getDesc().equals(taskType) ? 1 : task.get("retryInterval").asInt());
taskDefinitionLog.setFailRetryTimes(TaskType.SUB_PROCESS.getDesc().equals(taskType) ? 0 : task.get("maxRetryTimes").asInt());
taskDefinitionLog.setTaskPriority(JSONUtils.parseObject(JSONUtils.toJsonString(task.get("taskInstancePriority")), Priority.class));
String name = task.get("name").asText();
taskDefinitionLog.setName(name);
taskDefinitionLog.setWorkerGroup(task.get("workerGroup") == null ? "default" : task.get("workerGroup").asText());
long taskCode = CodeGenerateUtils.getInstance().genCode();
taskDefinitionLog.setCode(taskCode);
taskDefinitionLog.setVersion(Constants.VERSION_FIRST);
taskDefinitionLog.setProjectCode(processDefinition.getProjectCode());
taskDefinitionLog.setUserId(processDefinition.getUserId());
taskDefinitionLog.setEnvironmentCode(-1);
taskDefinitionLog.setDelayTime(0);
taskDefinitionLog.setOperator(1);
taskDefinitionLog.setOperateTime(now);
taskDefinitionLog.setCreateTime(now);
taskDefinitionLog.setUpdateTime(now);
taskDefinitionLogList.add(taskDefinitionLog);
taskIdCodeMap.put(task.get("id").asText(), taskCode);
List<String> preTasks = JSONUtils.toList(task.get("preTasks").toString(), String.class);
taskNamePreMap.put(name, preTasks);
taskNameCodeMap.put(name, taskCode);
}
convertConditions(taskDefinitionLogList, taskNameCodeMap);
taskDefinitionLogs.addAll(taskDefinitionLogList);
processDefinition.setLocations(convertLocations(processDefinition.getLocations(), taskIdCodeMap));
ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog(processDefinition);
processDefinitionLog.setOperator(1);
processDefinitionLog.setOperateTime(now);
processDefinitionLog.setUpdateTime(now);
processDefinitionLogs.add(processDefinitionLog);
handleProcessTaskRelation(taskNamePreMap, taskNameCodeMap, processDefinition, processTaskRelationLogs);
processCodeTaskNameCodeMap.put(processDefinition.getCode(), taskNameCodeMap);
processTaskMap.put(entry.getKey(), processCodeTaskNameCodeMap);
}
}
public void convertConditions(List<TaskDefinitionLog> taskDefinitionLogList, Map<String, Long> taskNameCodeMap) throws Exception {
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogList) {
if (TaskType.CONDITIONS.getDesc().equals(taskDefinitionLog.getTaskType())) {
ObjectMapper objectMapper = new ObjectMapper();
ObjectNode taskParams = JSONUtils.parseObject(taskDefinitionLog.getTaskParams());
// reset conditionResult
ObjectNode conditionResult = (ObjectNode) taskParams.get("conditionResult");
List<String> successNode = JSONUtils.toList(conditionResult.get("successNode").toString(), String.class);
List<Long> nodeCode = new ArrayList<>();
successNode.forEach(node -> nodeCode.add(taskNameCodeMap.get(node)));
conditionResult.set("successNode", objectMapper.readTree(objectMapper.writeValueAsString(nodeCode)));
List<String> failedNode = JSONUtils.toList(conditionResult.get("failedNode").toString(), String.class);
nodeCode.clear();
failedNode.forEach(node -> nodeCode.add(taskNameCodeMap.get(node)));
conditionResult.set("failedNode", objectMapper.readTree(objectMapper.writeValueAsString(nodeCode)));
// reset dependItemList
ObjectNode dependence = (ObjectNode) taskParams.get("dependence");
ArrayNode dependTaskList = JSONUtils.parseArray(JSONUtils.toJsonString(dependence.get("dependTaskList")));
for (int i = 0; i < dependTaskList.size(); i++) {
ObjectNode dependTask = (ObjectNode) dependTaskList.path(i);
ArrayNode dependItemList = JSONUtils.parseArray(JSONUtils.toJsonString(dependTask.get("dependItemList")));
for (int j = 0; j < dependItemList.size(); j++) {
ObjectNode dependItem = (ObjectNode) dependItemList.path(j);
JsonNode depTasks = dependItem.get("depTasks");
dependItem.put("depTaskCode", taskNameCodeMap.get(depTasks.asText()));
dependItem.remove("depTasks");
dependItemList.set(j, dependItem);
}
dependTask.put("dependItemList", dependItemList);
dependTaskList.set(i, dependTask);
}
dependence.put("dependTaskList", dependTaskList);
taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(taskParams));
}
}
}
private String convertLocations(String locations, Map<String, Long> taskIdCodeMap) {
if (StringUtils.isBlank(locations)) {
return locations;
}
Map<String, ObjectNode> locationsMap = JSONUtils.parseObject(locations, new TypeReference<Map<String, ObjectNode>>() {
});
if (locationsMap == null) {
return locations;
}
ArrayNode jsonNodes = JSONUtils.createArrayNode();
for (Map.Entry<String, ObjectNode> entry : locationsMap.entrySet()) {
ObjectNode nodes = JSONUtils.createObjectNode();
nodes.put("taskCode", taskIdCodeMap.get(entry.getKey()));
ObjectNode oldNodes = entry.getValue();
nodes.put("x", oldNodes.get("x").asInt());
nodes.put("y", oldNodes.get("y").asInt());
jsonNodes.add(nodes);
}
return jsonNodes.toString();
}
public void convertDependence(List<TaskDefinitionLog> taskDefinitionLogs,
Map<Integer, Long> projectIdCodeMap,
Map<Integer, Map<Long, Map<String, Long>>> processTaskMap) {
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
if (TaskType.DEPENDENT.getDesc().equals(taskDefinitionLog.getTaskType())) {
ObjectNode taskParams = JSONUtils.parseObject(taskDefinitionLog.getTaskParams());
ObjectNode dependence = (ObjectNode) taskParams.get("dependence");
ArrayNode dependTaskList = JSONUtils.parseArray(JSONUtils.toJsonString(dependence.get("dependTaskList")));
for (int i = 0; i < dependTaskList.size(); i++) {
ObjectNode dependTask = (ObjectNode) dependTaskList.path(i);
ArrayNode dependItemList = JSONUtils.parseArray(JSONUtils.toJsonString(dependTask.get("dependItemList")));
for (int j = 0; j < dependItemList.size(); j++) {
ObjectNode dependItem = (ObjectNode) dependItemList.path(j);
dependItem.put("projectCode", projectIdCodeMap.get(dependItem.get("projectId").asInt()));
int definitionId = dependItem.get("definitionId").asInt();
Map<Long, Map<String, Long>> processCodeTaskNameCodeMap = processTaskMap.get(definitionId);
if (processCodeTaskNameCodeMap == null) {
logger.warn("We can't find processDefinition [{}], please check it is not exist, remove this dependence", definitionId);
dependItemList.remove(j);
continue;
}
Optional<Map.Entry<Long, Map<String, Long>>> mapEntry = processCodeTaskNameCodeMap.entrySet().stream().findFirst();
if (mapEntry.isPresent()) {
Map.Entry<Long, Map<String, Long>> processCodeTaskNameCodeEntry = mapEntry.get();
dependItem.put("definitionCode", processCodeTaskNameCodeEntry.getKey());
String depTasks = dependItem.get("depTasks").asText();
long taskCode = "ALL".equals(depTasks) || processCodeTaskNameCodeEntry.getValue() == null ? 0L : processCodeTaskNameCodeEntry.getValue().get(depTasks);
dependItem.put("depTaskCode", taskCode);
}
dependItem.remove("projectId");
dependItem.remove("definitionId");
dependItem.remove("depTasks");
dependItemList.set(j, dependItem);
}
dependTask.put("dependItemList", dependItemList);
dependTaskList.set(i, dependTask);
}
dependence.put("dependTaskList", dependTaskList);
taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(taskParams));
}
}
}
private void handleProcessTaskRelation(Map<String, List<String>> taskNamePreMap,
Map<String, Long> taskNameCodeMap,
ProcessDefinition processDefinition,
List<ProcessTaskRelationLog> processTaskRelationLogs) {
Date now = new Date();
for (Map.Entry<String, List<String>> entry : taskNamePreMap.entrySet()) {
List<String> entryValue = entry.getValue();
if (CollectionUtils.isNotEmpty(entryValue)) {
for (String preTaskName : entryValue) {
ProcessTaskRelationLog processTaskRelationLog = setProcessTaskRelationLog(processDefinition, now);
processTaskRelationLog.setPreTaskCode(taskNameCodeMap.get(preTaskName));
processTaskRelationLog.setPreTaskVersion(Constants.VERSION_FIRST);
processTaskRelationLog.setPostTaskCode(taskNameCodeMap.get(entry.getKey()));
processTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST);
processTaskRelationLogs.add(processTaskRelationLog);
}
} else {
ProcessTaskRelationLog processTaskRelationLog = setProcessTaskRelationLog(processDefinition, now);
processTaskRelationLog.setPreTaskCode(0);
processTaskRelationLog.setPreTaskVersion(0);
processTaskRelationLog.setPostTaskCode(taskNameCodeMap.get(entry.getKey()));
processTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST);
processTaskRelationLogs.add(processTaskRelationLog);
}
}
}
private ProcessTaskRelationLog setProcessTaskRelationLog(ProcessDefinition processDefinition, Date now) {
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog();
processTaskRelationLog.setProjectCode(processDefinition.getProjectCode());
processTaskRelationLog.setProcessDefinitionCode(processDefinition.getCode());
processTaskRelationLog.setProcessDefinitionVersion(processDefinition.getVersion());
processTaskRelationLog.setConditionType(ConditionType.NONE);
processTaskRelationLog.setConditionParams("{}");
processTaskRelationLog.setOperator(1);
processTaskRelationLog.setOperateTime(now);
processTaskRelationLog.setCreateTime(now);
processTaskRelationLog.setUpdateTime(now);
return processTaskRelationLog;
}
}
upgrade-dolphinscheduler.sh
[[email protected] script]$ cat upgrade-dolphinscheduler.sh
#!/bin/bash
DOLPHINSCHEDULER_HOME=/home/dolphinscheduler/app/dolphinscheduler/
export JAVA_HOME=/usr/local/java/jdk1.8.0_151
export DATABASE_TYPE=${DATABASE_TYPE:-"mysql"}
export SPRING_PROFILES_ACTIVE=${SPRING_PROFILES_ACTIVE:-"default"}
export SPRING_PROFILES_ACTIVE="${SPRING_PROFILES_ACTIVE},${DATABASE_TYPE}"
export DOLPHINSCHEDULER_CONF_DIR=$DOLPHINSCHEDULER_HOME/conf
export DOLPHINSCHEDULER_LIB_JARS=$DOLPHINSCHEDULER_HOME/lib/*
export DOLPHINSCHEDULER_SQL_DIR=$DOLPHINSCHEDULER_HOME/sql
export DOLPHINSCHEDULER_OPTS="-server -Xms64m -Xmx64m -Xss512k -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:LargePageSizeInBytes=64m -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70"
export STOP_TIMEOUT=5
CLASS=org.apache.dolphinscheduler.dao.upgrade.shell.UpgradeDolphinScheduler
exec_command="$DOLPHINSCHEDULER_OPTS -classpath $DOLPHINSCHEDULER_SQL_DIR:$DOLPHINSCHEDULER_CONF_DIR:$DOLPHINSCHEDULER_LIB_JARS $CLASS"
cd $DOLPHINSCHEDULER_HOME
$JAVA_HOME/bin/java $exec_command
[[email protected] script]$
Explanation
create-dolphinscheduler.sh checks whether the version table exists: if it does, it performs the upgrade; if not, it initializes the schema.
upgrade-dolphinscheduler.sh performs the upgrade directly.