未验证 提交 a561a618 编写于 作者: J JinyLeeChina 提交者: GitHub

[Feature][JsonSplit-api] api of processDefinition create/update (#5602)

* processDefinition create/update

* fix codeStyle

* fix codeStyle

* fix ut
Co-authored-by: NJinyLeeChina <297062848@qq.com>
上级 cc9e5d5d
......@@ -95,16 +95,18 @@ public class ProcessDefinitionController extends BaseController {
* @param loginUser login user
* @param projectName project name
* @param name process definition name
* @param json process definition json
* @param description description
* @param locations locations for nodes
* @param globalParams globalParams
* @param connects connects for nodes
* @param locations locations for nodes
* @param timeout timeout
* @param tenantCode tenantCode
* @param taskRelationJson relation json for nodes
* @return create result code
*/
@ApiOperation(value = "save", notes = "CREATE_PROCESS_DEFINITION_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "name", value = "PROCESS_DEFINITION_NAME", required = true, type = "String"),
@ApiImplicitParam(name = "processDefinitionJson", value = "PROCESS_DEFINITION_JSON", required = true, type = "String"),
@ApiImplicitParam(name = "locations", value = "PROCESS_DEFINITION_LOCATIONS", required = true, type = "String"),
@ApiImplicitParam(name = "connects", value = "PROCESS_DEFINITION_CONNECTS", required = true, type = "String"),
@ApiImplicitParam(name = "description", value = "PROCESS_DEFINITION_DESC", required = false, type = "String"),
......@@ -116,13 +118,16 @@ public class ProcessDefinitionController extends BaseController {
public Result createProcessDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName,
@RequestParam(value = "name", required = true) String name,
@RequestParam(value = "processDefinitionJson", required = true) String json,
@RequestParam(value = "locations", required = true) String locations,
@RequestParam(value = "connects", required = true) String connects,
@RequestParam(value = "description", required = false) String description) throws JsonProcessingException {
@RequestParam(value = "description", required = false) String description,
@RequestParam(value = "globalParams", required = false, defaultValue = "[]") String globalParams,
@RequestParam(value = "connects", required = false) String connects,
@RequestParam(value = "locations", required = false) String locations,
@RequestParam(value = "timeout", required = false, defaultValue = "0") int timeout,
@RequestParam(value = "tenantCode", required = true) String tenantCode,
@RequestParam(value = "taskRelationJson", required = true) String taskRelationJson) throws JsonProcessingException {
Map<String, Object> result = processDefinitionService.createProcessDefinition(loginUser, projectName, name, json,
description, locations, connects);
Map<String, Object> result = processDefinitionService.createProcessDefinition(loginUser, projectName, name, description, globalParams,
connects, locations, timeout, tenantCode, taskRelationJson);
return returnDataList(result);
}
......@@ -207,19 +212,21 @@ public class ProcessDefinitionController extends BaseController {
* @param loginUser login user
* @param projectName project name
* @param name process definition name
* @param id process definition id
* @param processDefinitionJson process definition json
* @param code process definition code
* @param description description
* @param locations locations for nodes
* @param globalParams globalParams
* @param connects connects for nodes
* @param locations locations for nodes
* @param timeout timeout
* @param tenantCode tenantCode
* @param taskRelationJson relation json for nodes
* @return update result code
*/
@ApiOperation(value = "updateProcessDefinition", notes = "UPDATE_PROCESS_DEFINITION_NOTES")
@ApiOperation(value = "update", notes = "UPDATE_PROCESS_DEFINITION_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "name", value = "PROCESS_DEFINITION_NAME", required = true, type = "String"),
@ApiImplicitParam(name = "id", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "processDefinitionJson", value = "PROCESS_DEFINITION_JSON", required = true, type = "String"),
@ApiImplicitParam(name = "code", value = "PROCESS_DEFINITION_CODE", required = true, dataType = "Long", example = "123456789"),
@ApiImplicitParam(name = "locations", value = "PROCESS_DEFINITION_LOCATIONS", required = true, type = "String"),
@ApiImplicitParam(name = "connects", value = "PROCESS_DEFINITION_CONNECTS", required = true, type = "String"),
@ApiImplicitParam(name = "description", value = "PROCESS_DEFINITION_DESC", required = false, type = "String"),
......@@ -232,15 +239,18 @@ public class ProcessDefinitionController extends BaseController {
public Result updateProcessDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName,
@RequestParam(value = "name", required = true) String name,
@RequestParam(value = "id", required = true) int id,
@RequestParam(value = "processDefinitionJson", required = true) String processDefinitionJson,
@RequestParam(value = "locations", required = false) String locations,
@RequestParam(value = "connects", required = false) String connects,
@RequestParam(value = "code", required = true) long code,
@RequestParam(value = "description", required = false) String description,
@RequestParam(value = "globalParams", required = false, defaultValue = "[]") String globalParams,
@RequestParam(value = "connects", required = false) String connects,
@RequestParam(value = "locations", required = false) String locations,
@RequestParam(value = "timeout", required = false, defaultValue = "0") int timeout,
@RequestParam(value = "tenantCode", required = true) String tenantCode,
@RequestParam(value = "taskRelationJson", required = true) String taskRelationJson,
@RequestParam(value = "releaseState", required = false, defaultValue = "OFFLINE") ReleaseState releaseState) {
Map<String, Object> result = processDefinitionService.updateProcessDefinition(loginUser, projectName, id, name,
processDefinitionJson, description, locations, connects);
Map<String, Object> result = processDefinitionService.updateProcessDefinition(loginUser, projectName, name, code, description, globalParams,
connects, locations, timeout, tenantCode, taskRelationJson);
// If the update fails, the result will be returned directly
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return returnDataList(result);
......@@ -248,7 +258,7 @@ public class ProcessDefinitionController extends BaseController {
// Judge whether to go online after editing,0 means offline, 1 means online
if (releaseState == ReleaseState.ONLINE) {
result = processDefinitionService.releaseProcessDefinition(loginUser, projectName, id, releaseState);
result = processDefinitionService.releaseProcessDefinition(loginUser, projectName, code, releaseState);
}
return returnDataList(result);
}
......@@ -342,14 +352,14 @@ public class ProcessDefinitionController extends BaseController {
*
* @param loginUser login user
* @param projectName project name
* @param processId process definition id
* @param code process definition code
* @param releaseState release state
* @return release result code
*/
@ApiOperation(value = "releaseProcessDefinition", notes = "RELEASE_PROCESS_DEFINITION_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "name", value = "PROCESS_DEFINITION_NAME", required = true, type = "String"),
@ApiImplicitParam(name = "processId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "code", value = "PROCESS_DEFINITION_CODE", required = true, dataType = "Long", example = "123456789"),
@ApiImplicitParam(name = "releaseState", value = "PROCESS_DEFINITION_CONNECTS", required = true, dataType = "ReleaseState"),
})
@PostMapping(value = "/release")
......@@ -358,10 +368,10 @@ public class ProcessDefinitionController extends BaseController {
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result releaseProcessDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName,
@RequestParam(value = "processId", required = true) int processId,
@RequestParam(value = "code", required = true) long code,
@RequestParam(value = "releaseState", required = true) ReleaseState releaseState) {
Map<String, Object> result = processDefinitionService.releaseProcessDefinition(loginUser, projectName, processId, releaseState);
Map<String, Object> result = processDefinitionService.releaseProcessDefinition(loginUser, projectName, code, releaseState);
return returnDataList(result);
}
......@@ -390,7 +400,7 @@ public class ProcessDefinitionController extends BaseController {
}
/**
* query datail of process definition by name
* query detail of process definition by name
*
* @param loginUser login user
* @param projectName project name
......
......@@ -270,6 +270,7 @@ public enum Status {
PROCESS_TASK_RELATION_NOT_EXIST(50033, "process task relation {0} does not exist", "工作流任务关系[{0}]不存在"),
PROCESS_TASK_RELATION_EXIST(50034, "process task relation is already exist, processCode:[{0}]", "工作流任务关系已存在, processCode:[{0}]"),
PROCESS_DAG_IS_EMPTY(50035, "process dag can not be empty", "工作流dag不能为空"),
CHECK_PROCESS_TASK_RELATION_ERROR(50036, "check process task relation error", "工作流任务关系参数错误"),
HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"),
/**
......
......@@ -40,20 +40,26 @@ public interface ProcessDefinitionService {
* @param loginUser login user
* @param projectName project name
* @param name process definition name
* @param processDefinitionJson process definition json
* @param desc description
* @param locations locations for nodes
* @param description description
* @param globalParams global params
* @param connects connects for nodes
* @param locations locations for nodes
* @param timeout timeout
* @param tenantCode tenantCode
* @param taskRelationJson relation json for nodes
* @return create result code
* @throws JsonProcessingException JsonProcessingException
*/
Map<String, Object> createProcessDefinition(User loginUser,
String projectName,
String name,
String processDefinitionJson,
String desc,
String description,
String globalParams,
String connects,
String locations,
String connects) throws JsonProcessingException;
int timeout,
String tenantCode,
String taskRelationJson) throws JsonProcessingException;
/**
* query process definition list
......@@ -141,19 +147,27 @@ public interface ProcessDefinitionService {
* @param loginUser login user
* @param projectName project name
* @param name process definition name
* @param id process definition id
* @param processDefinitionJson process definition json
* @param desc description
* @param locations locations for nodes
* @param code process definition code
* @param description description
* @param globalParams global params
* @param connects connects for nodes
* @param locations locations for nodes
* @param timeout timeout
* @param tenantCode tenantCode
* @param taskRelationJson relation json for nodes
* @return update result code
*/
Map<String, Object> updateProcessDefinition(User loginUser,
String projectName,
int id,
String name,
String processDefinitionJson, String desc,
String locations, String connects);
long code,
String description,
String globalParams,
String connects,
String locations,
int timeout,
String tenantCode,
String taskRelationJson);
/**
* verify process definition name unique
......@@ -184,13 +198,13 @@ public interface ProcessDefinitionService {
*
* @param loginUser login user
* @param projectName project name
* @param id process definition id
* @param code process definition code
* @param releaseState release state
* @return release result code
*/
Map<String, Object> releaseProcessDefinition(User loginUser,
String projectName,
int id,
long code,
ReleaseState releaseState);
/**
......@@ -299,6 +313,7 @@ public interface ProcessDefinitionService {
*/
Map<String, Object> deleteByProcessDefinitionIdAndVersion(User loginUser, String projectName,
int processDefinitionId, long version);
/**
* check has associated process definition
*
......
......@@ -56,11 +56,13 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
......@@ -69,6 +71,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.service.permission.PermissionCheck;
import org.apache.dolphinscheduler.service.process.ProcessService;
......@@ -159,27 +162,36 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
@Autowired
private SchedulerService schedulerService;
@Autowired
private TenantMapper tenantMapper;
/**
* create process definition
*
* @param loginUser login user
* @param projectName project name
* @param processDefinitionName process definition name
* @param processDefinitionJson process definition json
* @param desc description
* @param locations locations for nodes
* @param name process definition name
* @param description description
* @param globalParams global params
* @param connects connects for nodes
* @param locations locations for nodes
* @param timeout timeout
* @param tenantCode tenantCode
* @param taskRelationJson relation json for nodes
* @return create result code
*/
@Override
@Transactional(rollbackFor = Exception.class)
public Map<String, Object> createProcessDefinition(User loginUser,
String projectName,
String processDefinitionName,
String processDefinitionJson,
String desc,
String name,
String description,
String globalParams,
String connects,
String locations,
String connects) {
int timeout,
String tenantCode,
String taskRelationJson) {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
......@@ -190,34 +202,74 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return checkResult;
}
ProcessDefinition processDefinition = new ProcessDefinition();
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
Map<String, Object> checkProcessJson = checkProcessNodeList(processData, processDefinitionJson);
if (checkProcessJson.get(Constants.STATUS) != Status.SUCCESS) {
return checkProcessJson;
List<ProcessTaskRelationLog> taskRelationList = JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class);
Map<String, Object> checkRelationJson = checkTaskRelationList(taskRelationList, taskRelationJson);
if (checkRelationJson.get(Constants.STATUS) != Status.SUCCESS) {
return checkRelationJson;
}
Tenant tenant = tenantMapper.queryByTenantCode(tenantCode);
if (tenant == null) {
putMsg(result, Status.TENANT_NOT_EXIST);
return result;
}
long processDefinitionCode;
try {
long processDefinitionCode = SnowFlakeUtils.getInstance().nextId();
processDefinition.setCode(processDefinitionCode);
processDefinition.setVersion(1);
processDefinitionCode = SnowFlakeUtils.getInstance().nextId();
} catch (SnowFlakeException e) {
putMsg(result, Status.CREATE_PROCESS_DEFINITION);
return result;
}
int saveResult = processService.saveProcessDefinition(loginUser, project, processDefinitionName, desc,
locations, connects, processData, processDefinition, true);
int insertVersion = processService.saveProcessDefine(loginUser, project, name, description, globalParams,
locations, connects, timeout, tenant.getId(), processDefinitionCode, 0, true);
if (saveResult > 0) {
putMsg(result, Status.SUCCESS);
// return processDefinition object with ID
result.put(Constants.DATA_LIST, processDefinition.getId());
if (insertVersion > 0) {
int insertResult = processService.saveTaskRelation(loginUser, project.getCode(), processDefinitionCode, insertVersion, taskRelationList);
if (insertResult > 0) {
putMsg(result, Status.SUCCESS);
// return processDefinitionCode
result.put(Constants.DATA_LIST, processDefinitionCode);
} else {
putMsg(result, Status.CREATE_PROCESS_DEFINITION);
}
} else {
putMsg(result, Status.CREATE_PROCESS_DEFINITION);
}
return result;
}
private Map<String, Object> checkTaskRelationList(List<ProcessTaskRelationLog> taskRelationList, String taskRelationJson) {
Map<String, Object> result = new HashMap<>();
try {
if (taskRelationList == null || taskRelationList.isEmpty()) {
logger.error("task relation list is null");
putMsg(result, Status.DATA_IS_NOT_VALID, taskRelationJson);
return result;
}
// TODO check has cycle
// if (graphHasCycle(taskRelationList)) {
// logger.error("process DAG has cycle");
// putMsg(result, Status.PROCESS_NODE_HAS_CYCLE);
// return result;
// }
// check whether the task relation json is normal
for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) {
if (processTaskRelationLog.getPostTaskCode() == 0 || processTaskRelationLog.getPostTaskVersion() == 0) {
logger.error("the post_task_code or post_task_version can't be zero");
putMsg(result, Status.CHECK_PROCESS_TASK_RELATION_ERROR);
return result;
}
}
putMsg(result, Status.SUCCESS);
} catch (Exception e) {
result.put(Constants.STATUS, Status.REQUEST_PARAMS_NOT_VALID_ERROR);
result.put(Constants.MSG, e.getMessage());
}
return result;
}
/**
......@@ -364,22 +416,28 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
* @param loginUser login user
* @param projectName project name
* @param name process definition name
* @param id process definition id
* @param processDefinitionJson process definition json
* @param desc description
* @param locations locations for nodes
* @param code process definition code
* @param description description
* @param globalParams global params
* @param connects connects for nodes
* @param locations locations for nodes
* @param timeout timeout
* @param tenantCode tenantCode
* @param taskRelationJson relation json for nodes
* @return update result code
*/
@Override
public Map<String, Object> updateProcessDefinition(User loginUser,
String projectName,
int id,
String name,
String processDefinitionJson,
String desc,
long code,
String description,
String globalParams,
String connects,
String locations,
String connects) {
int timeout,
String tenantCode,
String taskRelationJson) {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
......@@ -389,16 +447,22 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return checkResult;
}
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
Map<String, Object> checkProcessJson = checkProcessNodeList(processData, processDefinitionJson);
if ((checkProcessJson.get(Constants.STATUS) != Status.SUCCESS)) {
return checkProcessJson;
List<ProcessTaskRelationLog> taskRelationList = JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class);
Map<String, Object> checkRelationJson = checkTaskRelationList(taskRelationList, taskRelationJson);
if (checkRelationJson.get(Constants.STATUS) != Status.SUCCESS) {
return checkRelationJson;
}
Tenant tenant = tenantMapper.queryByTenantCode(tenantCode);
if (tenant == null) {
putMsg(result, Status.TENANT_NOT_EXIST);
return result;
}
// TODO processDefinitionMapper.queryByCode
ProcessDefinition processDefinition = processService.findProcessDefineById(id);
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
// check process definition exists
if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, id);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
return result;
}
if (processDefinition.getReleaseState() == ReleaseState.ONLINE) {
......@@ -406,21 +470,17 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
putMsg(result, Status.PROCESS_DEFINE_NOT_ALLOWED_EDIT, processDefinition.getName());
return result;
}
if (!name.equals(processDefinition.getName())) {
// check whether the new process define name exist
ProcessDefinition definition = processDefinitionMapper.verifyByDefineName(project.getCode(), name);
if (definition != null) {
putMsg(result, Status.PROCESS_DEFINITION_NAME_EXIST, name);
return result;
}
}
ProcessData newProcessData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
int saveResult = processService.saveProcessDefinition(loginUser, project, name, desc,
locations, connects, newProcessData, processDefinition, true);
if (saveResult > 0) {
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition);
int insertVersion = processService.saveProcessDefine(loginUser, project, name, description, globalParams,
locations, connects, timeout, tenant.getId(), code, processDefinition.getId(), true);
if (insertVersion > 0) {
int insertResult = processService.saveTaskRelation(loginUser, project.getCode(), code, insertVersion, taskRelationList);
if (insertResult > 0) {
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition);
} else {
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
} else {
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
......@@ -536,13 +596,13 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
*
* @param loginUser login user
* @param projectName project name
* @param id process definition id
* @param code process definition code
* @param releaseState release state
* @return release result code
*/
@Override
@Transactional(rollbackFor = RuntimeException.class)
public Map<String, Object> releaseProcessDefinition(User loginUser, String projectName, int id, ReleaseState releaseState) {
public Map<String, Object> releaseProcessDefinition(User loginUser, String projectName, long code, ReleaseState releaseState) {
HashMap<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
......@@ -558,7 +618,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
ProcessDefinition processDefinition = processDefinitionMapper.selectById(id);
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
switch (releaseState) {
case ONLINE:
......@@ -587,7 +647,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
);
for (Schedule schedule : scheduleList) {
logger.info("set schedule offline, project id: {}, schedule id: {}, process definition id: {}", project.getId(), schedule.getId(), id);
logger.info("set schedule offline, project id: {}, schedule id: {}, process definition code: {}", project.getId(), schedule.getId(), code);
// set status
schedule.setReleaseState(ReleaseState.OFFLINE);
scheduleMapper.updateById(schedule);
......@@ -833,7 +893,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
processMeta,
processDefinitionName,
processDefinitionId);
}
/**
......@@ -847,13 +906,14 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
String importProcessParam) {
Map<String, Object> createProcessResult = null;
try {
createProcessResult = createProcessDefinition(loginUser
, currentProjectName,
processDefinitionName + "_import_" + DateUtils.getCurrentTimeStamp(),
importProcessParam,
processMeta.getProcessDefinitionDescription(),
processMeta.getProcessDefinitionLocations(),
processMeta.getProcessDefinitionConnects());
// TODO import
// createProcessResult = createProcessDefinition(loginUser
// , currentProjectName,
// processDefinitionName + "_import_" + DateUtils.getCurrentTimeStamp(),
// importProcessParam,
// processMeta.getProcessDefinitionDescription(),
// processMeta.getProcessDefinitionLocations(),
// processMeta.getProcessDefinitionConnects());
putMsg(result, Status.SUCCESS);
} catch (Exception e) {
logger.error("import process meta json data: {}", e.getMessage(), e);
......@@ -1049,13 +1109,14 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
try {
createProcessDefinition(loginUser
, targetProject.getName(),
subProcess.getName(),
subProcessJson,
subProcess.getDescription(),
subProcess.getLocations(),
subProcess.getConnects());
// TODO import subProcess
// createProcessDefinition(loginUser
// , targetProject.getName(),
// subProcess.getName(),
// subProcessJson,
// subProcess.getDescription(),
// subProcess.getLocations(),
// subProcess.getConnects());
logger.info("create sub process, project: {}, process name: {}", targetProject.getName(), subProcess.getName());
} catch (Exception e) {
......@@ -1424,14 +1485,17 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
processData.setTasks(taskNodeList);
String processDefinitionJson = JSONUtils.toJsonString(processData);
return createProcessDefinition(
loginUser,
targetProject.getName(),
processDefinition.getName() + "_copy_" + currentTimeStamp,
processDefinitionJson,
processDefinition.getDescription(),
locationsJN.toString(),
processDefinition.getConnects());
// TODO copy process
// return createProcessDefinition(
// loginUser,
// targetProject.getName(),
// processDefinition.getName() + "_copy_" + currentTimeStamp,
// processDefinitionJson,
// processDefinition.getDescription(),
// locationsJN.toString(),
// processDefinition.getConnects());
// TODO remove
return result;
}
}
......
......@@ -278,26 +278,6 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService
return processInstanceMapper.queryByTenantIdAndStatus(tenant.getId(), Constants.NOT_TERMINATED_STATES);
}
/**
* query tenant list
*
* @param tenantCode tenant code
* @return tenant list
*/
public Map<String, Object> queryTenantList(String tenantCode) {
Map<String, Object> result = new HashMap<>();
List<Tenant> resourceList = tenantMapper.queryByTenantCode(tenantCode);
if (CollectionUtils.isNotEmpty(resourceList)) {
result.put(Constants.DATA_LIST, resourceList);
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.TENANT_NOT_EXIST);
}
return result;
}
/**
* query tenant list
*
......
......@@ -73,26 +73,27 @@ public class ProcessDefinitionControllerTest {
@Test
public void testCreateProcessDefinition() throws Exception {
String json = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-36196\",\"name\""
+ ":\"ssh_test1\",\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"aa=\\\"1234\\\"\\"
+ "necho ${aa}\"},\"desc\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\""
+ ",\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},"
+ "\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":-1,\"timeout\":0}";
String locations = "{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}";
String json = "[{\"name\":\"\",\"pre_task_code\":0,\"pre_task_version\":0,\"post_task_code\":123456789,\"post_task_version\":1,"
+ "\"condition_type\":0,\"condition_params\":{}},{\"name\":\"\",\"pre_task_code\":123456789,\"pre_task_version\":1,"
+ "\"post_task_code\":123451234,\"post_task_version\":1,\"condition_type\":0,\"condition_params\":{}}]";
String projectName = "test";
String name = "dag_test";
String description = "desc test";
String globalParams = "[]";
String connects = "[]";
String locations = "[]";
int timeout = 0;
String tenantCode = "root";
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, 1);
Mockito.when(processDefinitionService.createProcessDefinition(user, projectName, name, json,
description, locations, connects)).thenReturn(result);
Mockito.when(processDefinitionService.createProcessDefinition(user, projectName, name, description, globalParams,
connects, locations, timeout, tenantCode, json)).thenReturn(result);
Result response = processDefinitionController.createProcessDefinition(user, projectName, name, json,
locations, connects, description);
Result response = processDefinitionController.createProcessDefinition(user, projectName, name, description, globalParams,
connects, locations, timeout, tenantCode, json);
Assert.assertEquals(Status.SUCCESS.getCode(), response.getCode().intValue());
}
......@@ -121,28 +122,28 @@ public class ProcessDefinitionControllerTest {
}
@Test
public void updateProcessDefinition() throws Exception {
String json = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-36196\",\"name\":\"ssh_test1\""
+ ",\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"aa=\\\"1234\\\"\\necho ${aa}\"}"
+ ",\"desc\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\""
+ ":\"1\",\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"taskInstancePriority\""
+ ":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":-1,\"timeout\":0}";
public void updateProcessDefinition() {
String json = "[{\"name\":\"\",\"pre_task_code\":0,\"pre_task_version\":0,\"post_task_code\":123456789,\"post_task_version\":1,"
+ "\"condition_type\":0,\"condition_params\":{}},{\"name\":\"\",\"pre_task_code\":123456789,\"pre_task_version\":1,"
+ "\"post_task_code\":123451234,\"post_task_version\":1,\"condition_type\":0,\"condition_params\":{}}]";
String locations = "{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}";
String projectName = "test";
String name = "dag_test";
String description = "desc test";
String connects = "[]";
int id = 1;
String globalParams = "[]";
int timeout = 0;
String tenantCode = "root";
long code = 123L;
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS);
result.put("processDefinitionId", 1);
Mockito.when(processDefinitionService.updateProcessDefinition(user, projectName, id, name, json,
description, locations, connects)).thenReturn(result);
Mockito.when(processDefinitionService.updateProcessDefinition(user, projectName, name, code, description, globalParams,
connects, locations, timeout, tenantCode, json)).thenReturn(result);
Result response = processDefinitionController.updateProcessDefinition(user, projectName, name, id, json,
locations, connects, description,ReleaseState.OFFLINE);
Result response = processDefinitionController.updateProcessDefinition(user, projectName, name, code, description, globalParams,
connects, locations, timeout, tenantCode, json, ReleaseState.OFFLINE);
Assert.assertEquals(Status.SUCCESS.getCode(), response.getCode().intValue());
}
......
......@@ -597,7 +597,7 @@ public class ProcessDefinitionServiceTest {
// project check auth success, processs definition online
putMsg(result, Status.SUCCESS, projectName);
Mockito.when(processDefineMapper.selectById(46)).thenReturn(getProcessDefinition());
Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(getProcessDefinition());
Map<String, Object> onlineRes = processDefinitionService.releaseProcessDefinition(
loginUser, "project_test1", 46, ReleaseState.ONLINE);
Assert.assertEquals(Status.SUCCESS, onlineRes.get(Constants.STATUS));
......@@ -1035,10 +1035,10 @@ public class ProcessDefinitionServiceTest {
+ " \"tenantId\": 1,\n"
+ " \"timeout\": 0\n"
+ "}";
Map<String, Object> updateResult = processDefinitionService.updateProcessDefinition(loginUser, projectName, 1, "test",
sqlDependentJson, "", "", "");
Map<String, Object> updateResult = processDefinitionService.updateProcessDefinition(loginUser, projectName, "test", 1,
"", "", "", "", 0, "root", sqlDependentJson);
Assert.assertEquals(Status.UPDATE_PROCESS_DEFINITION_ERROR, updateResult.get(Constants.STATUS));
Assert.assertEquals(Status.DATA_IS_NOT_VALID, updateResult.get(Constants.STATUS));
}
@Test
......@@ -1112,7 +1112,7 @@ public class ProcessDefinitionServiceTest {
processDefinition.setProjectId(2);
processDefinition.setTenantId(1);
processDefinition.setDescription("");
processDefinition.setCode(9999L);
processDefinition.setCode(46L);
return processDefinition;
}
......
......@@ -44,4 +44,12 @@ public interface ProcessTaskRelationLogMapper extends BaseMapper<ProcessTaskRela
@Param("processVersion") int processVersion,
@Param("taskCode") long taskCode,
@Param("taskVersion") long taskVersion);
/**
* batch insert process task relation
*
* @param taskRelationList taskRelationList
* @return int
*/
int batchInsert(@Param("taskRelationList") List<ProcessTaskRelationLog> taskRelationList);
}
......@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.ibatis.annotations.Param;
......@@ -65,4 +66,12 @@ public interface ProcessTaskRelationMapper extends BaseMapper<ProcessTaskRelatio
*/
int deleteByCode(@Param("projectCode") Long projectCode,
@Param("processCode") Long processCode);
/**
* batch insert process task relation
*
* @param taskRelationList taskRelationList
* @return int
*/
int batchInsert(@Param("taskRelationList") List<ProcessTaskRelationLog> taskRelationList);
}
......@@ -38,9 +38,9 @@ public interface TenantMapper extends BaseMapper<Tenant> {
/**
* query tenant by code
* @param tenantCode tenantCode
* @return tenant list
* @return tenant
*/
List<Tenant> queryByTenantCode(@Param("tenantCode") String tenantCode);
Tenant queryByTenantCode(@Param("tenantCode") String tenantCode);
/**
* tenant page
......
......@@ -29,8 +29,7 @@
WHERE process_definition_code = #{processCode}
and process_definition_version = #{processVersion}
</select>
<select id="queryByTaskRelationList"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog">
<select id="queryByTaskRelationList" resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog">
select
<include refid="baseSql"/>
from t_ds_process_task_relation_log
......@@ -39,4 +38,16 @@
and post_task_code = #{taskCode}
and post_task_version = #{taskVersion}
</select>
<insert id="batchInsert">
insert into t_ds_process_task_relation_log (`name`, process_definition_version, project_code, process_definition_code,
pre_task_code, pre_task_version, post_task_code, post_task_version, condition_type, condition_params, operator, operate_time,
create_time, update_time)
values
<foreach collection="taskRelationList" item="relation" separator=",">
(#{relation.name},#{relation.process_definition_version},#{relation.project_code},#{relation.process_definition_code},
#{relation.pre_task_code},#{relation.pre_task_version},#{relation.post_task_code},#{relation.post_task_version},
#{relation.condition_type},#{relation.condition_params},#{relation.operator},#{relation.operate_time},
#{relation.create_time},#{relation.update_time})
</foreach>
</insert>
</mapper>
......@@ -59,4 +59,14 @@
WHERE project_code = #{projectCode}
and process_definition_code = #{processCode}
</delete>
<insert id="batchInsert">
insert into t_ds_process_task_relation (`name`, process_definition_version, project_code, process_definition_code,
pre_task_code, pre_task_version, post_task_code, post_task_version, condition_type, condition_params, create_time, update_time)
values
<foreach collection="taskRelationList" item="relation" separator=",">
(#{relation.name},#{relation.process_definition_version},#{relation.project_code},#{relation.process_definition_code},
#{relation.pre_task_code},#{relation.pre_task_version},#{relation.post_task_code},#{relation.post_task_version},
#{relation.condition_type},#{relation.condition_params},#{relation.create_time},#{relation.update_time})
</foreach>
</insert>
</mapper>
......@@ -119,12 +119,10 @@ public class TenantMapperTest {
*/
@Test
public void testQueryByTenantCode() {
Tenant tenant = insertOne();
tenant.setTenantCode("ut code");
tenantMapper.updateById(tenant);
List<Tenant> tenantList = tenantMapper.queryByTenantCode("ut code");
Assert.assertEquals(1, tenantList.size());
Assert.assertNotNull(tenantMapper.queryByTenantCode("ut code"));
}
/**
......
......@@ -2225,6 +2225,69 @@ public class ProcessService {
/**
* save processDefinition (including create or update processDefinition)
*/
public int saveProcessDefine(User operator, Project project, String name, String description, String globalParams,
String locations, String connects, int timeout, int tenantId, long processDefinitionCode,
int processDefinitionId, Boolean isFromProcessDefine) {
ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog();
Integer version = processDefineLogMapper.queryMaxVersionForDefinition(processDefinitionCode);
int insertVersion = version == null || version == 0 ? 1 : version + 1;
processDefinitionLog.setUserId(operator.getId());
processDefinitionLog.setCode(processDefinitionCode);
processDefinitionLog.setVersion(insertVersion);
processDefinitionLog.setName(name);
processDefinitionLog.setFlag(Flag.YES);
processDefinitionLog.setReleaseState(isFromProcessDefine ? ReleaseState.OFFLINE : ReleaseState.ONLINE);
processDefinitionLog.setProjectCode(project.getCode());
processDefinitionLog.setDescription(description);
processDefinitionLog.setGlobalParams(globalParams);
processDefinitionLog.setLocations(locations);
processDefinitionLog.setConnects(connects);
processDefinitionLog.setTimeout(timeout);
processDefinitionLog.setTenantId(tenantId);
processDefinitionLog.setOperator(operator.getId());
Date now = new Date();
processDefinitionLog.setOperateTime(now);
processDefinitionLog.setUpdateTime(now);
processDefinitionLog.setCreateTime(now);
int insertLog = processDefineLogMapper.insert(processDefinitionLog);
int result;
if (0 == processDefinitionId) {
result = processDefineMapper.insert(processDefinitionLog);
} else {
processDefinitionLog.setId(processDefinitionId);
result = processDefineMapper.updateById(processDefinitionLog);
}
return (insertLog & result) > 0 ? insertVersion : 0;
}
/**
* save task relations
*/
public int saveTaskRelation(User operator, long projectCode, long processDefinitionCode, int processDefinitionVersion,
List<ProcessTaskRelationLog> taskRelationList) {
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
if (!processTaskRelationList.isEmpty()) {
processTaskRelationMapper.deleteByCode(projectCode, processDefinitionCode);
}
Date now = new Date();
taskRelationList.forEach(processTaskRelationLog -> {
processTaskRelationLog.setProjectCode(projectCode);
processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode);
processTaskRelationLog.setProcessDefinitionVersion(processDefinitionVersion);
processTaskRelationLog.setCreateTime(now);
processTaskRelationLog.setUpdateTime(now);
processTaskRelationLog.setOperator(operator.getId());
processTaskRelationLog.setOperateTime(now);
});
int result = processTaskRelationMapper.batchInsert(taskRelationList);
int resultLog = processTaskRelationLogMapper.batchInsert(taskRelationList);
return result & resultLog;
}
/**
* save processDefinition (including create or update processDefinition)
*/
@Deprecated
public int saveProcessDefinition(User operator, Project project, String name, String desc, String locations,
String connects, ProcessData processData, ProcessDefinition processDefinition,
Boolean isFromProcessDefine) {
......@@ -2240,6 +2303,7 @@ public class ProcessService {
/**
* save processDefinition
*/
@Deprecated
public ProcessDefinitionLog insertProcessDefinitionLog(User operator, Long processDefinitionCode, String processDefinitionName,
ProcessData processData, Project project,
String desc, String locations, String connects) {
......@@ -2280,6 +2344,7 @@ public class ProcessService {
/**
* handle task definition
*/
@Deprecated
public Map<String, TaskDefinition> handleTaskDefinition(User operator, Long projectCode, List<TaskNode> taskNodes, Boolean isFromProcessDefine) {
if (taskNodes == null) {
return null;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册