DolphinScheduler upgrade code modification - UpgradeDolphinScheduler
2022-06-23 03:52:00 【韧小钊】
DolphinScheduler data upgrade code modification
Background
Last week I wrote up a data migration that used DolphinScheduler's built-in upgrade code; after the upgrade finished I manually batch-updated the affected tables. Continuing the migration today, I found that stored-procedure tasks do not lend themselves to batch updates: the method parameter can be extracted and rewritten, but the number and names of its arguments cannot be derived generically and would have to be hard-coded, which only adds follow-up work. So I went back to the source and modified the upgrade code instead.
Modifying the upgrade code
The idea behind the change: manually fix one task and test it, compare it against a task that fails to display correctly, and then have the upgrade code assemble the parameters into the form that displays correctly.
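To make that comparison repeatable, a small helper can diff the two param payloads field by field. This is a minimal sketch using plain Jackson, not part of the DolphinScheduler codebase; the class name and the two inline JSON strings are illustrative placeholders for a hand-fixed task and a broken one:

```
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Iterator;

public class TaskParamDiff {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // placeholders: paste the params JSON of a hand-fixed task and a broken task here
        JsonNode good = mapper.readTree("{\"method\":\"{call demo(${p1})}\",\"localParams\":[]}");
        JsonNode bad = mapper.readTree("{\"method\":\"demo\"}");
        Iterator<String> fields = good.fieldNames();
        while (fields.hasNext()) {
            String field = fields.next();
            if (!bad.has(field)) {
                System.out.println("missing in broken task: " + field);
            } else if (!bad.get(field).equals(good.get(field))) {
                System.out.println(field + " differs: " + good.get(field) + " vs " + bad.get(field));
            }
        }
    }
}
```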
Testing
Preparation
● Export the database used by 1.2.1.
● Back up the database used by 2.0.5, then drop all of its tables.
● Import the table structure and data exported from 1.2.1.
Run the upgrade-dolphinscheduler.sh script

Errors in the log
Two errors were reported. They did not seem to do any harm, at least nothing broke in testing so far, but I fixed the code and reran the upgrade anyway; errors in the log are hard to look at.
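Both failures come from assumptions in the stock upgrade code: workerGroupId is stored in the old JSON as a plain number, which Jackson parses as an IntNode while the code casts it to ObjectNode, and tasks whose params carry no resourceList (or no mainJar) hit a null dereference. The cast failure is easy to reproduce with plain Jackson; a standalone sketch, independent of DolphinScheduler:

```
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class CastRepro {
    public static void main(String[] args) throws Exception {
        ObjectNode task = (ObjectNode) new ObjectMapper().readTree("{\"workerGroupId\":1}");
        // workerGroupId is a numeric literal, so Jackson materializes an IntNode;
        // casting it to ObjectNode throws the ClassCastException seen in the log
        ObjectNode workerGroup = (ObjectNode) task.path("workerGroupId");
        System.out.println(workerGroup);
    }
}
```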
Class cast exception
```
2022-04-22 21:39:18.577 ERROR 1488 --- [ main] o.a.d.dao.upgrade.UpgradeDao : update process definition json workergroup error
java.lang.ClassCastException: com.fasterxml.jackson.databind.node.IntNode cannot be cast to com.fasterxml.jackson.databind.node.ObjectNode
at org.apache.dolphinscheduler.dao.upgrade.UpgradeDao.updateProcessDefinitionJsonWorkerGroup(UpgradeDao.java:171) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT]
at org.apache.dolphinscheduler.dao.upgrade.UpgradeDao.upgradeDolphinSchedulerWorkerGroup(UpgradeDao.java:135) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT]
at org.apache.dolphinscheduler.dao.upgrade.DolphinSchedulerManager.upgradeDolphinScheduler(DolphinSchedulerManager.java:107) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT]
at org.apache.dolphinscheduler.dao.upgrade.shell.UpgradeDolphinScheduler$UpgradeRunner.run(UpgradeDolphinScheduler.java:55) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT]
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:791) [spring-boot-2.5.6.jar:2.5.6]
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:775) [spring-boot-2.5.6.jar:2.5.6]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:345) [spring-boot-2.5.6.jar:2.5.6]
at org.springframework.boot.builder.SpringApplicationBuilder.run(SpringApplicationBuilder.java:143) [spring-boot-2.5.6.jar:2.5.6]
at org.apache.dolphinscheduler.dao.upgrade.shell.UpgradeDolphinScheduler.main(UpgradeDolphinScheduler.java:39) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT]
```
Null pointer exception
```
java.lang.NullPointerException: null
at org.apache.dolphinscheduler.dao.upgrade.UpgradeDao.updateProcessDefinitionJsonResourceList(UpgradeDao.java:214) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT]
at org.apache.dolphinscheduler.dao.upgrade.UpgradeDao.upgradeDolphinSchedulerResourceList(UpgradeDao.java:143) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT]
at org.apache.dolphinscheduler.dao.upgrade.DolphinSchedulerManager.upgradeDolphinScheduler(DolphinSchedulerManager.java:109) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT]
at org.apache.dolphinscheduler.dao.upgrade.shell.UpgradeDolphinScheduler$UpgradeRunner.run(UpgradeDolphinScheduler.java:55) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT]
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:791) [spring-boot-2.5.6.jar:2.5.6]
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:775) [spring-boot-2.5.6.jar:2.5.6]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:345) [spring-boot-2.5.6.jar:2.5.6]
at org.springframework.boot.builder.SpringApplicationBuilder.run(SpringApplicationBuilder.java:143) [spring-boot-2.5.6.jar:2.5.6]
at org.apache.dolphinscheduler.dao.upgrade.shell.UpgradeDolphinScheduler.main(UpgradeDolphinScheduler.java:39) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT]
```
```
2022-04-22 22:40:39.070 ERROR 1902 --- [ main] o.a.d.dao.upgrade.UpgradeDao : update process definition json resource list error
java.lang.NullPointerException: null
at org.apache.dolphinscheduler.dao.upgrade.UpgradeDao.updateProcessDefinitionJsonResourceList(UpgradeDao.java:218) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT]
at org.apache.dolphinscheduler.dao.upgrade.UpgradeDao.upgradeDolphinSchedulerResourceList(UpgradeDao.java:144) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT]
at org.apache.dolphinscheduler.dao.upgrade.DolphinSchedulerManager.upgradeDolphinScheduler(DolphinSchedulerManager.java:109) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT]
at org.apache.dolphinscheduler.dao.upgrade.shell.UpgradeDolphinScheduler$UpgradeRunner.run(UpgradeDolphinScheduler.java:55) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT]
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:791) [spring-boot-2.5.6.jar:2.5.6]
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:775) [spring-boot-2.5.6.jar:2.5.6]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:345) [spring-boot-2.5.6.jar:2.5.6]
at org.springframework.boot.builder.SpringApplicationBuilder.run(SpringApplicationBuilder.java:143) [spring-boot-2.5.6.jar:2.5.6]
at org.apache.dolphinscheduler.dao.upgrade.shell.UpgradeDolphinScheduler.main(UpgradeDolphinScheduler.java:39) [dolphinscheduler-dao-2.0.5-SNAPSHOT.jar:2.0.5-SNAPSHOT]
```

Fix the code and verify again

No errors in the log

```
[dolphinscheduler@host script]$ sh upgrade-dolphinscheduler.sh >> update.log
[dolphinscheduler@host script]$
[dolphinscheduler@host script]$ grep ERR update.log
[dolphinscheduler@host script]$
```
Verify in the web UI
DataX tasks display correctly
Stored procedures (call statement assembled)
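The assembled statement follows JDBC CallableStatement syntax, {call name(${p1},${p2})}. Below is a distilled, standalone version of the conversion that splitProcessDefinitionJson performs in the code further down; the procedure and parameter names are hypothetical:

```
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class CallSqlDemo {
    // build "{call proc(${p1},${p2})}" from the 1.2.x method name and localParams props
    static String buildCallSql(String method, List<String> props) {
        return "{call " + method + "("
                + props.stream().map(p -> "${" + p + "}").collect(Collectors.joining(","))
                + ")}";
    }

    public static void main(String[] args) {
        // hypothetical names; in the real migration these come from the task's params JSON
        System.out.println(buildCallSql("etl_user_sync", Arrays.asList("p_date", "p_batch")));
        // -> {call etl_user_sync(${p_date},${p_batch})}
    }
}
```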
Code
UpgradeDao
```
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.dao.upgrade;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.IntNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.*;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ScriptRunner;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.spi.enums.DbType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import javax.sql.DataSource;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;
import java.util.stream.Collectors;
public abstract class UpgradeDao {
public static final Logger logger = LoggerFactory.getLogger(UpgradeDao.class);
private static final String T_VERSION_NAME = "t_escheduler_version";
private static final String T_NEW_VERSION_NAME = "t_ds_version";
protected final DataSource dataSource;
protected UpgradeDao(DataSource dataSource) {
this.dataSource = dataSource;
}
protected abstract String initSqlPath();
protected abstract DbType getDbType();
public void initSchema() {
// Execute the dolphinscheduler full sql
runInitSql(getDbType());
}
/**
 * run init sql to init db schema
 * @param dbType db type
 */
private void runInitSql(DbType dbType) {
String sqlFile = String.format("dolphinscheduler_%s.sql",dbType.getDescp());
Resource mysqlSQLFilePath = new ClassPathResource("sql/" + sqlFile);
try (Connection conn = dataSource.getConnection()) {
// Execute the dolphinscheduler_ddl.sql script to create the table structure of dolphinscheduler
ScriptRunner initScriptRunner = new ScriptRunner(conn, true, true);
Reader initSqlReader = new InputStreamReader(mysqlSQLFilePath.getInputStream());
initScriptRunner.runScript(initSqlReader);
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
}
}
public abstract boolean isExistsTable(String tableName);
public abstract boolean isExistsColumn(String tableName, String columnName);
public String getCurrentVersion(String versionName) {
String sql = String.format("select version from %s", versionName);
Connection conn = null;
ResultSet rs = null;
PreparedStatement pstmt = null;
String version = null;
try {
conn = dataSource.getConnection();
pstmt = conn.prepareStatement(sql);
rs = pstmt.executeQuery();
if (rs.next()) {
version = rs.getString(1);
}
return version;
} catch (SQLException e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + sql, e);
} finally {
ConnectionUtils.releaseResource(rs, pstmt, conn);
}
}
public void upgradeDolphinScheduler(String schemaDir) {
upgradeDolphinSchedulerDDL(schemaDir, "dolphinscheduler_ddl.sql");
upgradeDolphinSchedulerDML(schemaDir);
}
/**
 * upgrade DolphinScheduler worker group
 * ds-1.3.0 modify the worker group for process definition json
 */
public void upgradeDolphinSchedulerWorkerGroup() {
updateProcessDefinitionJsonWorkerGroup();
}
/**
 * upgrade DolphinScheduler resource list
 * ds-1.3.2 modify the resource list for process definition json
 */
public void upgradeDolphinSchedulerResourceList() {
updateProcessDefinitionJsonResourceList();
}
/**
 * upgrade DolphinScheduler to 2.0.0
 */
public void upgradeDolphinSchedulerTo200(String schemaDir) {
processDefinitionJsonSplit();
upgradeDolphinSchedulerDDL(schemaDir, "dolphinscheduler_ddl_post.sql");
}
/**
 * updateProcessDefinitionJsonWorkerGroup
 */
protected void updateProcessDefinitionJsonWorkerGroup() {
WorkerGroupDao workerGroupDao = new WorkerGroupDao();
ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao();
Map<Integer, String> replaceProcessDefinitionMap = new HashMap<>();
try {
Map<Integer, String> oldWorkerGroupMap = workerGroupDao.queryAllOldWorkerGroup(dataSource.getConnection());
Map<Integer, String> processDefinitionJsonMap = processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection());
for (Map.Entry<Integer, String> entry : processDefinitionJsonMap.entrySet()) {
ObjectNode jsonObject = JSONUtils.parseObject(entry.getValue());
ArrayNode tasks = JSONUtils.parseArray(jsonObject.get("tasks").toString());
for (int i = 0; i < tasks.size(); i++) {
ObjectNode task = (ObjectNode) tasks.path(i);
//ObjectNode workerGroupNode = (ObjectNode) task.path("workerGroupId");
//lw 20220422 modify for "IntNode cannot be cast to com.fasterxml.jackson.databind.node.ObjectNode"
// workerGroupId is stored as a plain number, so read it as a generic JsonNode;
// path() returns a MissingNode (never null) when the field is absent
JsonNode workerGroupNode = task.path("workerGroupId");
int workerGroupId = -1;
if (workerGroupNode.canConvertToInt()) {
workerGroupId = workerGroupNode.asInt(-1);
}
if (workerGroupId == -1) {
task.put("workerGroup", "default");
} else {
task.put("workerGroup", oldWorkerGroupMap.get(workerGroupId));
}
}
jsonObject.remove("tasks");
jsonObject.put("tasks", tasks);
replaceProcessDefinitionMap.put(entry.getKey(), jsonObject.toString());
}
if (replaceProcessDefinitionMap.size() > 0) {
processDefinitionDao.updateProcessDefinitionJson(dataSource.getConnection(), replaceProcessDefinitionMap);
}
} catch (Exception e) {
logger.error("update process definition json workergroup error", e);
}
}
protected void updateProcessDefinitionJsonResourceList() {
ResourceDao resourceDao = new ResourceDao();
ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao();
Map<Integer, String> replaceProcessDefinitionMap = new HashMap<>();
try {
Map<String, Integer> resourcesMap = resourceDao.listAllResources(dataSource.getConnection());
Map<Integer, String> processDefinitionJsonMap = processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection());
for (Map.Entry<Integer, String> entry : processDefinitionJsonMap.entrySet()) {
ObjectNode jsonObject = JSONUtils.parseObject(entry.getValue());
ArrayNode tasks = JSONUtils.parseArray(jsonObject.get("tasks").toString());
for (int i = 0; i < tasks.size(); i++) {
ObjectNode task = (ObjectNode) tasks.get(i);
if(task != null) {
//lw 20220422 add
ObjectNode param = (ObjectNode) task.get("params");
if (param != null) {
JsonNode resourceJsonNode = param.get("resourceList");
if (resourceJsonNode != null && !resourceJsonNode.isEmpty()) {
//lw 20220422 add
List<ResourceInfo> resourceList = JSONUtils.toList(resourceJsonNode.toString(), ResourceInfo.class);
// guard: tasks such as shell can carry a resourceList without a mainJar,
// which would otherwise NPE on param.get("mainJar").toString()
JsonNode mainJarNode = param.get("mainJar");
ResourceInfo mainJar = mainJarNode == null ? null : JSONUtils.parseObject(mainJarNode.toString(), ResourceInfo.class);
if (mainJar != null && mainJar.getId() == 0) {
String fullName = mainJar.getRes().startsWith("/") ? mainJar.getRes() : String.format("/%s", mainJar.getRes());
if (resourcesMap.containsKey(fullName)) {
mainJar.setId(resourcesMap.get(fullName));
param.put("mainJar", JSONUtils.parseObject(JSONUtils.toJsonString(mainJar)));
}
}
if (CollectionUtils.isNotEmpty(resourceList)) {
List<ResourceInfo> newResourceList = resourceList.stream().map(resInfo -> {
String fullName = resInfo.getRes().startsWith("/") ? resInfo.getRes() : String.format("/%s", resInfo.getRes());
if (resInfo.getId() == 0 && resourcesMap.containsKey(fullName)) {
resInfo.setId(resourcesMap.get(fullName));
}
return resInfo;
}).collect(Collectors.toList());
param.put("resourceList", JSONUtils.parseObject(JSONUtils.toJsonString(newResourceList)));
}
}
}
task.put("params", param);
}
}
jsonObject.remove("tasks");
jsonObject.put("tasks", tasks);
replaceProcessDefinitionMap.put(entry.getKey(), jsonObject.toString());
}
if (replaceProcessDefinitionMap.size() > 0) {
processDefinitionDao.updateProcessDefinitionJson(dataSource.getConnection(), replaceProcessDefinitionMap);
}
} catch (Exception e) {
logger.error("update process definition json resource list error", e);
}
}
private void upgradeDolphinSchedulerDML(String schemaDir) {
String schemaVersion = schemaDir.split("_")[0];
Resource sqlFilePath = new ClassPathResource(String.format("sql/upgrade/%s/%s/dolphinscheduler_dml.sql", schemaDir, getDbType().name().toLowerCase()));
if (!sqlFilePath.exists()) {
logger.info("No dml file {}, returning", sqlFilePath);
return;
}
logger.info("sqlSQLFilePath" + sqlFilePath);
Connection conn = null;
PreparedStatement pstmt = null;
try {
conn = dataSource.getConnection();
conn.setAutoCommit(false);
// Execute the upgraded dolphinscheduler dml
ScriptRunner scriptRunner = new ScriptRunner(conn, false, true);
Reader sqlReader = new InputStreamReader(sqlFilePath.getInputStream());
scriptRunner.runScript(sqlReader);
if (isExistsTable(T_VERSION_NAME)) {
// Change version in the version table to the new version
String upgradeSQL = String.format("update %s set version = ?", T_VERSION_NAME);
pstmt = conn.prepareStatement(upgradeSQL);
pstmt.setString(1, schemaVersion);
pstmt.executeUpdate();
} else if (isExistsTable(T_NEW_VERSION_NAME)) {
// Change version in the version table to the new version
String upgradeSQL = String.format("update %s set version = ?", T_NEW_VERSION_NAME);
pstmt = conn.prepareStatement(upgradeSQL);
pstmt.setString(1, schemaVersion);
pstmt.executeUpdate();
}
conn.commit();
} catch (FileNotFoundException e) {
try {
conn.rollback();
} catch (SQLException e1) {
logger.error(e1.getMessage(), e1);
}
logger.error(e.getMessage(), e);
throw new RuntimeException("sql file not found ", e);
} catch (IOException e) {
try {
conn.rollback();
} catch (SQLException e1) {
logger.error(e1.getMessage(), e1);
}
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
} catch (Exception e) {
try {
if (null != conn) {
conn.rollback();
}
} catch (SQLException e1) {
logger.error(e1.getMessage(), e1);
}
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
} finally {
ConnectionUtils.releaseResource(pstmt, conn);
}
}
/**
 * upgradeDolphinScheduler DDL
 *
 * @param schemaDir schemaDir
 */
private void upgradeDolphinSchedulerDDL(String schemaDir, String scriptFile) {
Resource sqlFilePath = new ClassPathResource(String.format("sql/upgrade/%s/%s/%s", schemaDir, getDbType().name().toLowerCase(), scriptFile));
Connection conn = null;
PreparedStatement pstmt = null;
try {
conn = dataSource.getConnection();
String dbName = conn.getCatalog();
logger.info(dbName);
conn.setAutoCommit(true);
// Execute the dolphinscheduler ddl.sql for the upgrade
ScriptRunner scriptRunner = new ScriptRunner(conn, true, true);
Reader sqlReader = new InputStreamReader(sqlFilePath.getInputStream());
scriptRunner.runScript(sqlReader);
} catch (FileNotFoundException e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql file not found ", e);
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
} finally {
ConnectionUtils.releaseResource(pstmt, conn);
}
}
/**
 * update version
 *
 * @param version version
 */
public void updateVersion(String version) {
// Change version in the version table to the new version
String versionName = T_VERSION_NAME;
if (!SchemaUtils.isAGreatVersion("1.2.0", version)) {
versionName = "t_ds_version";
}
String upgradeSQL = String.format("update %s set version = ?", versionName);
PreparedStatement pstmt = null;
Connection conn = null;
try {
conn = dataSource.getConnection();
pstmt = conn.prepareStatement(upgradeSQL);
pstmt.setString(1, version);
pstmt.executeUpdate();
} catch (SQLException e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + upgradeSQL, e);
} finally {
ConnectionUtils.releaseResource(pstmt, conn);
}
}
/**
 * upgrade DolphinScheduler to 2.0.0, json split
 */
private void processDefinitionJsonSplit() {
ProjectDao projectDao = new ProjectDao();
ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao();
ScheduleDao scheduleDao = new ScheduleDao();
JsonSplitDao jsonSplitDao = new JsonSplitDao();
try {
// execute project
Map<Integer, Long> projectIdCodeMap = projectDao.queryAllProject(dataSource.getConnection());
projectDao.updateProjectCode(dataSource.getConnection(), projectIdCodeMap);
// execute process definition code
List<ProcessDefinition> processDefinitions = processDefinitionDao.queryProcessDefinition(dataSource.getConnection());
processDefinitionDao.updateProcessDefinitionCode(dataSource.getConnection(), processDefinitions, projectIdCodeMap);
// execute schedule
Map<Integer, Long> allSchedule = scheduleDao.queryAllSchedule(dataSource.getConnection());
Map<Integer, Long> processIdCodeMap = processDefinitions.stream().collect(Collectors.toMap(ProcessDefinition::getId, ProcessDefinition::getCode));
scheduleDao.updateScheduleCode(dataSource.getConnection(), allSchedule, processIdCodeMap);
// json split
Map<Integer, String> processDefinitionJsonMap = processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection());
List<ProcessDefinitionLog> processDefinitionLogs = new ArrayList<>();
List<ProcessTaskRelationLog> processTaskRelationLogs = new ArrayList<>();
List<TaskDefinitionLog> taskDefinitionLogs = new ArrayList<>();
Map<Integer, Map<Long, Map<String, Long>>> processTaskMap = new HashMap<>();
splitProcessDefinitionJson(processDefinitions, processDefinitionJsonMap, processDefinitionLogs, processTaskRelationLogs, taskDefinitionLogs, processTaskMap);
convertDependence(taskDefinitionLogs, projectIdCodeMap, processTaskMap);
// execute json split
jsonSplitDao.executeJsonSplitProcessDefinition(dataSource.getConnection(), processDefinitionLogs);
jsonSplitDao.executeJsonSplitProcessTaskRelation(dataSource.getConnection(), processTaskRelationLogs);
jsonSplitDao.executeJsonSplitTaskDefinition(dataSource.getConnection(), taskDefinitionLogs);
} catch (Exception e) {
logger.error("json split error", e);
}
}
private void splitProcessDefinitionJson(List<ProcessDefinition> processDefinitions,
Map<Integer, String> processDefinitionJsonMap,
List<ProcessDefinitionLog> processDefinitionLogs,
List<ProcessTaskRelationLog> processTaskRelationLogs,
List<TaskDefinitionLog> taskDefinitionLogs,
Map<Integer, Map<Long, Map<String, Long>>> processTaskMap) throws Exception {
Map<Integer, ProcessDefinition> processDefinitionMap = processDefinitions.stream()
.collect(Collectors.toMap(ProcessDefinition::getId, processDefinition -> processDefinition));
Date now = new Date();
for (Map.Entry<Integer, String> entry : processDefinitionJsonMap.entrySet()) {
if (entry.getValue() == null) {
throw new Exception("processDefinitionJson is null");
}
ObjectNode jsonObject = JSONUtils.parseObject(entry.getValue());
ProcessDefinition processDefinition = processDefinitionMap.get(entry.getKey());
if (processDefinition != null) {
processDefinition.setTenantId(jsonObject.get("tenantId") == null ? -1 : jsonObject.get("tenantId").asInt());
processDefinition.setTimeout(jsonObject.get("timeout").asInt());
processDefinition.setGlobalParams(jsonObject.get("globalParams").toString());
} else {
throw new Exception("It can't find processDefinition, please check !");
}
Map<String, Long> taskIdCodeMap = new HashMap<>();
Map<String, List<String>> taskNamePreMap = new HashMap<>();
Map<String, Long> taskNameCodeMap = new HashMap<>();
Map<Long, Map<String, Long>> processCodeTaskNameCodeMap = new HashMap<>();
List<TaskDefinitionLog> taskDefinitionLogList = new ArrayList<>();
ArrayNode tasks = JSONUtils.parseArray(jsonObject.get("tasks").toString());
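// walk every task in the old JSON: migrate its params, then build the 2.0 task
// definition and record id/name -> code mappings for locations and relations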
for (int i = 0; i < tasks.size(); i++) {
ObjectNode task = (ObjectNode) tasks.path(i);
ObjectNode param = (ObjectNode) task.get("params");
TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog();
String taskType = task.get("type").asText();
if (param != null) {
JsonNode resourceJsonNode = param.get("resourceList");
if (resourceJsonNode != null && !resourceJsonNode.isEmpty()) {
List<ResourceInfo> resourceList = JSONUtils.toList(param.get("resourceList").toString(), ResourceInfo.class);
List<Integer> resourceIds = resourceList.stream().map(ResourceInfo::getId).collect(Collectors.toList());
taskDefinitionLog.setResourceIds(StringUtils.join(resourceIds, Constants.COMMA));
} else {
taskDefinitionLog.setResourceIds(StringUtils.EMPTY);
}
if (TaskType.SUB_PROCESS.getDesc().equals(taskType)) {
JsonNode jsonNodeDefinitionId = param.get("processDefinitionId");
if (jsonNodeDefinitionId != null) {
param.put("processDefinitionCode", processDefinitionMap.get(jsonNodeDefinitionId.asInt()).getCode());
param.remove("processDefinitionId");
}
}
//lw 20220422 data migration modification start
// DataX tasks: fill in fields that 2.0.x expects but 1.2.x never stored
if (TaskType.DATAX.getDesc().equals(taskType)) {
param.put("customConfig", 0);
param.put("xms", 1);
param.put("xmx", 1);
param.put("waitStartTimeout", task.get("waitStartTimeout"));
param.put("switchResult", task.get("switchResult"));
}
// stored procedures: rebuild the call statement from the method name and localParams,
// e.g. method "my_proc" with params p1,p2 becomes {call my_proc(${p1},${p2})}
if (TaskType.PROCEDURE.getDesc().equals(taskType)) {
param.put("waitStartTimeout", task.get("waitStartTimeout"));
param.put("switchResult", task.get("switchResult"));
String method = param.get("method").asText();
JsonNode localParamsJsonNode = param.get("localParams");
StringBuilder callSQL = new StringBuilder("{call " + method + "(");
if (localParamsJsonNode != null && !localParamsJsonNode.isEmpty()) {
List<Property> localParams = JSONUtils.toList(localParamsJsonNode.toString(), Property.class);
// join the placeholders so a procedure without parameters still yields a valid call
callSQL.append(localParams.stream().map(pro -> "${" + pro.getProp() + "}").collect(Collectors.joining(",")));
}
callSQL.append(")}");
param.put("method", callSQL.toString());
}
//lw 20220422 data migration modification end
param.put("conditionResult", task.get("conditionResult"));
param.put("dependence", task.get("dependence"));
taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(param));
}
TaskTimeoutParameter timeout = JSONUtils.parseObject(JSONUtils.toJsonString(task.get("timeout")), TaskTimeoutParameter.class);
if (timeout != null) {
taskDefinitionLog.setTimeout(timeout.getInterval());
taskDefinitionLog.setTimeoutFlag(timeout.getEnable() ? TimeoutFlag.OPEN : TimeoutFlag.CLOSE);
taskDefinitionLog.setTimeoutNotifyStrategy(timeout.getStrategy());
}
String desc = task.get("description") != null ? task.get("description").asText() :
task.get("desc") != null ? task.get("desc").asText() : "";
taskDefinitionLog.setDescription(desc);
taskDefinitionLog.setFlag(Constants.FLOWNODE_RUN_FLAG_NORMAL.equals(task.get("runFlag").asText()) ? Flag.YES : Flag.NO);
taskDefinitionLog.setTaskType(taskType);
taskDefinitionLog.setFailRetryInterval(TaskType.SUB_PROCESS.getDesc().equals(taskType) ? 1 : task.get("retryInterval").asInt());
taskDefinitionLog.setFailRetryTimes(TaskType.SUB_PROCESS.getDesc().equals(taskType) ? 0 : task.get("maxRetryTimes").asInt());
taskDefinitionLog.setTaskPriority(JSONUtils.parseObject(JSONUtils.toJsonString(task.get("taskInstancePriority")), Priority.class));
String name = task.get("name").asText();
taskDefinitionLog.setName(name);
taskDefinitionLog.setWorkerGroup(task.get("workerGroup") == null ? "default" : task.get("workerGroup").asText());
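// generate a brand-new unique code for the task; the old integer id is only kept
// in taskIdCodeMap so that convertLocations can rewrite the canvas coordinates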
long taskCode = CodeGenerateUtils.getInstance().genCode();
taskDefinitionLog.setCode(taskCode);
taskDefinitionLog.setVersion(Constants.VERSION_FIRST);
taskDefinitionLog.setProjectCode(processDefinition.getProjectCode());
taskDefinitionLog.setUserId(processDefinition.getUserId());
taskDefinitionLog.setEnvironmentCode(-1);
taskDefinitionLog.setDelayTime(0);
taskDefinitionLog.setOperator(1);
taskDefinitionLog.setOperateTime(now);
taskDefinitionLog.setCreateTime(now);
taskDefinitionLog.setUpdateTime(now);
taskDefinitionLogList.add(taskDefinitionLog);
taskIdCodeMap.put(task.get("id").asText(), taskCode);
List<String> preTasks = JSONUtils.toList(task.get("preTasks").toString(), String.class);
taskNamePreMap.put(name, preTasks);
taskNameCodeMap.put(name, taskCode);
}
convertConditions(taskDefinitionLogList, taskNameCodeMap);
taskDefinitionLogs.addAll(taskDefinitionLogList);
processDefinition.setLocations(convertLocations(processDefinition.getLocations(), taskIdCodeMap));
ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog(processDefinition);
processDefinitionLog.setOperator(1);
processDefinitionLog.setOperateTime(now);
processDefinitionLog.setUpdateTime(now);
processDefinitionLogs.add(processDefinitionLog);
handleProcessTaskRelation(taskNamePreMap, taskNameCodeMap, processDefinition, processTaskRelationLogs);
processCodeTaskNameCodeMap.put(processDefinition.getCode(), taskNameCodeMap);
processTaskMap.put(entry.getKey(), processCodeTaskNameCodeMap);
}
}
public void convertConditions(List<TaskDefinitionLog> taskDefinitionLogList, Map<String, Long> taskNameCodeMap) throws Exception {
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogList) {
if (TaskType.CONDITIONS.getDesc().equals(taskDefinitionLog.getTaskType())) {
ObjectMapper objectMapper = new ObjectMapper();
ObjectNode taskParams = JSONUtils.parseObject(taskDefinitionLog.getTaskParams());
// reset conditionResult
ObjectNode conditionResult = (ObjectNode) taskParams.get("conditionResult");
List<String> successNode = JSONUtils.toList(conditionResult.get("successNode").toString(), String.class);
List<Long> nodeCode = new ArrayList<>();
successNode.forEach(node -> nodeCode.add(taskNameCodeMap.get(node)));
conditionResult.set("successNode", objectMapper.readTree(objectMapper.writeValueAsString(nodeCode)));
List<String> failedNode = JSONUtils.toList(conditionResult.get("failedNode").toString(), String.class);
nodeCode.clear();
failedNode.forEach(node -> nodeCode.add(taskNameCodeMap.get(node)));
conditionResult.set("failedNode", objectMapper.readTree(objectMapper.writeValueAsString(nodeCode)));
// reset dependItemList
ObjectNode dependence = (ObjectNode) taskParams.get("dependence");
ArrayNode dependTaskList = JSONUtils.parseArray(JSONUtils.toJsonString(dependence.get("dependTaskList")));
for (int i = 0; i < dependTaskList.size(); i++) {
ObjectNode dependTask = (ObjectNode) dependTaskList.path(i);
ArrayNode dependItemList = JSONUtils.parseArray(JSONUtils.toJsonString(dependTask.get("dependItemList")));
for (int j = 0; j < dependItemList.size(); j++) {
ObjectNode dependItem = (ObjectNode) dependItemList.path(j);
JsonNode depTasks = dependItem.get("depTasks");
dependItem.put("depTaskCode", taskNameCodeMap.get(depTasks.asText()));
dependItem.remove("depTasks");
dependItemList.set(j, dependItem);
}
dependTask.put("dependItemList", dependItemList);
dependTaskList.set(i, dependTask);
}
dependence.put("dependTaskList", dependTaskList);
taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(taskParams));
}
}
}
private String convertLocations(String locations, Map<String, Long> taskIdCodeMap) {
if (StringUtils.isBlank(locations)) {
return locations;
}
Map<String, ObjectNode> locationsMap = JSONUtils.parseObject(locations, new TypeReference<Map<String, ObjectNode>>() {
});
if (locationsMap == null) {
return locations;
}
ArrayNode jsonNodes = JSONUtils.createArrayNode();
for (Map.Entry<String, ObjectNode> entry : locationsMap.entrySet()) {
ObjectNode nodes = JSONUtils.createObjectNode();
nodes.put("taskCode", taskIdCodeMap.get(entry.getKey()));
ObjectNode oldNodes = entry.getValue();
nodes.put("x", oldNodes.get("x").asInt());
nodes.put("y", oldNodes.get("y").asInt());
jsonNodes.add(nodes);
}
return jsonNodes.toString();
}
public void convertDependence(List<TaskDefinitionLog> taskDefinitionLogs,
Map<Integer, Long> projectIdCodeMap,
Map<Integer, Map<Long, Map<String, Long>>> processTaskMap) {
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
if (TaskType.DEPENDENT.getDesc().equals(taskDefinitionLog.getTaskType())) {
ObjectNode taskParams = JSONUtils.parseObject(taskDefinitionLog.getTaskParams());
ObjectNode dependence = (ObjectNode) taskParams.get("dependence");
ArrayNode dependTaskList = JSONUtils.parseArray(JSONUtils.toJsonString(dependence.get("dependTaskList")));
for (int i = 0; i < dependTaskList.size(); i++) {
ObjectNode dependTask = (ObjectNode) dependTaskList.path(i);
ArrayNode dependItemList = JSONUtils.parseArray(JSONUtils.toJsonString(dependTask.get("dependItemList")));
for (int j = 0; j < dependItemList.size(); j++) {
ObjectNode dependItem = (ObjectNode) dependItemList.path(j);
dependItem.put("projectCode", projectIdCodeMap.get(dependItem.get("projectId").asInt()));
int definitionId = dependItem.get("definitionId").asInt();
Map<Long, Map<String, Long>> processCodeTaskNameCodeMap = processTaskMap.get(definitionId);
if (processCodeTaskNameCodeMap == null) {
logger.warn("We can't find processDefinition [{}], please check it is not exist, remove this dependence", definitionId);
dependItemList.remove(j);
// after remove(j) the next element shifts into slot j, so step the index back
j--;
continue;
}
Optional<Map.Entry<Long, Map<String, Long>>> mapEntry = processCodeTaskNameCodeMap.entrySet().stream().findFirst();
if (mapEntry.isPresent()) {
Map.Entry<Long, Map<String, Long>> processCodeTaskNameCodeEntry = mapEntry.get();
dependItem.put("definitionCode", processCodeTaskNameCodeEntry.getKey());
String depTasks = dependItem.get("depTasks").asText();
long taskCode = "ALL".equals(depTasks) || processCodeTaskNameCodeEntry.getValue() == null ? 0L : processCodeTaskNameCodeEntry.getValue().get(depTasks);
dependItem.put("depTaskCode", taskCode);
}
dependItem.remove("projectId");
dependItem.remove("definitionId");
dependItem.remove("depTasks");
dependItemList.set(j, dependItem);
}
dependTask.put("dependItemList", dependItemList);
dependTaskList.set(i, dependTask);
}
dependence.put("dependTaskList", dependTaskList);
taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(taskParams));
}
}
}
private void handleProcessTaskRelation(Map<String, List<String>> taskNamePreMap,
Map<String, Long> taskNameCodeMap,
ProcessDefinition processDefinition,
List<ProcessTaskRelationLog> processTaskRelationLogs) {
Date now = new Date();
for (Map.Entry<String, List<String>> entry : taskNamePreMap.entrySet()) {
List<String> entryValue = entry.getValue();
if (CollectionUtils.isNotEmpty(entryValue)) {
for (String preTaskName : entryValue) {
ProcessTaskRelationLog processTaskRelationLog = setProcessTaskRelationLog(processDefinition, now);
processTaskRelationLog.setPreTaskCode(taskNameCodeMap.get(preTaskName));
processTaskRelationLog.setPreTaskVersion(Constants.VERSION_FIRST);
processTaskRelationLog.setPostTaskCode(taskNameCodeMap.get(entry.getKey()));
processTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST);
processTaskRelationLogs.add(processTaskRelationLog);
}
} else {
ProcessTaskRelationLog processTaskRelationLog = setProcessTaskRelationLog(processDefinition, now);
processTaskRelationLog.setPreTaskCode(0);
processTaskRelationLog.setPreTaskVersion(0);
processTaskRelationLog.setPostTaskCode(taskNameCodeMap.get(entry.getKey()));
processTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST);
processTaskRelationLogs.add(processTaskRelationLog);
}
}
}
private ProcessTaskRelationLog setProcessTaskRelationLog(ProcessDefinition processDefinition, Date now) {
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog();
processTaskRelationLog.setProjectCode(processDefinition.getProjectCode());
processTaskRelationLog.setProcessDefinitionCode(processDefinition.getCode());
processTaskRelationLog.setProcessDefinitionVersion(processDefinition.getVersion());
processTaskRelationLog.setConditionType(ConditionType.NONE);
processTaskRelationLog.setConditionParams("{}");
processTaskRelationLog.setOperator(1);
processTaskRelationLog.setOperateTime(now);
processTaskRelationLog.setCreateTime(now);
processTaskRelationLog.setUpdateTime(now);
return processTaskRelationLog;
}
}
```

upgrade-dolphinscheduler.sh
```
[dolphinscheduler@host script]$ cat upgrade-dolphinscheduler.sh
#!/bin/bash
DOLPHINSCHEDULER_HOME=/home/dolphinscheduler/app/dolphinscheduler/
export JAVA_HOME=/usr/local/java/jdk1.8.0_151
export DATABASE_TYPE=${DATABASE_TYPE:-"mysql"}
export SPRING_PROFILES_ACTIVE=${SPRING_PROFILES_ACTIVE:-"default"}
export SPRING_PROFILES_ACTIVE="${SPRING_PROFILES_ACTIVE},${DATABASE_TYPE}"
export DOLPHINSCHEDULER_CONF_DIR=$DOLPHINSCHEDULER_HOME/conf
export DOLPHINSCHEDULER_LIB_JARS=$DOLPHINSCHEDULER_HOME/lib/*
export DOLPHINSCHEDULER_SQL_DIR=$DOLPHINSCHEDULER_HOME/sql
export DOLPHINSCHEDULER_OPTS="-server -Xms64m -Xmx64m -Xss512k -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:LargePageSizeInBytes=64m -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70"
export STOP_TIMEOUT=5
CLASS=org.apache.dolphinscheduler.dao.upgrade.shell.UpgradeDolphinScheduler
exec_command="$DOLPHINSCHEDULER_OPTS -classpath $DOLPHINSCHEDULER_SQL_DIR:$DOLPHINSCHEDULER_CONF_DIR:$DOLPHINSCHEDULER_LIB_JARS $CLASS"
cd $DOLPHINSCHEDULER_HOME
$JAVA_HOME/bin/java $exec_command
[dolphinscheduler@host script]$
```
Notes
create-dolphinscheduler.sh checks whether the version table exists: if it does, it runs the upgrade; if not, it initializes the schema.
upgrade-dolphinscheduler.sh runs the upgrade directly.
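The existence check is just database metadata. A minimal sketch of how such a check can be done over JDBC; this is illustrative only, with placeholder connection settings, not the actual script implementation:

```
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;

public class VersionTableCheck {
    public static void main(String[] args) throws Exception {
        // placeholder URL and credentials
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/dolphinscheduler", "ds", "ds");
             ResultSet rs = conn.getMetaData().getTables(conn.getCatalog(), null, "t_ds_version", null)) {
            if (rs.next()) {
                System.out.println("t_ds_version exists: run the upgrade path");
            } else {
                System.out.println("no version table: initialize the schema");
            }
        }
    }
}
```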