未验证 提交 bb52671f 编写于 作者: Q qiaozhanwei 提交者: GitHub

Worker Group display #2627 (#2630)

* dispatch task fail will set task status failed

* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result

* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result

* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result

* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result

* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result

* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result

* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result

* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result

* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result

* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result

* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result

* 1,task status statistics and process status statistics bug fix (#2357)
2,worker group bug fix

* 1,task status statistics and process status statistics bug fix (#2357)
2,worker group bug fix

* 1,task status statistics and process status statistics bug fix (#2357)
2,worker group bug fix

* 1,task status statistics and process status statistics bug fix (#2357)
2,worker group bug fix

* send mail error, #2466 bug fix

* send mail error, #2466 bug fix

* send mail error, #2466 bug fix

* send mail error, #2466 bug fix

* #2486 bug fix

* host and workergroup compatible

* EnterpriseWeChatUtils modify

* EnterpriseWeChatUtils modify

* EnterpriseWeChatUtils modify

* #2499 bug fix

* add comment

* revert comment

* revert comment

* #2499 buf fix

* #2499 bug fix

* #2499 bug fix

* #2499 bug fix

* #2499 bug fix

* #2499 bug fix

* #2499 bug fix

* no valid worker group,master can kill task directly

* no valid worker group,master can kill task directly

* no valid worker group,master can kill task directly

* no valid worker group,master can kill task directly

* no valid worker group,master can kill task directly

* no valid worker group,master can kill task directly

* no valid worker group,master can kill task directly

* no valid worker group,master can kill task directly

* no valid worker group,master can kill task directly

* No master don't create command #2571

* No master don't create command #2571

* No master don't create command #2571

* Worker Group display #2627

* Worker Group display #2627

* Worker Group display #2627

* Worker Group display #2627

* Worker Group display #2627

* Worker Group display #2627
Co-authored-by: Nqiaozhanwei <qiaozhanwei@analysys.com.cn>
上级 8c8e128d
...@@ -52,35 +52,7 @@ public class WorkerGroupController extends BaseController { ...@@ -52,35 +52,7 @@ public class WorkerGroupController extends BaseController {
WorkerGroupService workerGroupService; WorkerGroupService workerGroupService;
/**
* create or update a worker group
*
* @param loginUser login user
* @param id worker group id
* @param name worker group name
* @param ipList ip list
* @return create or update result code
*/
@ApiOperation(value = "saveWorkerGroup", notes = "CREATE_WORKER_GROUP_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "id", value = "WORKER_GROUP_ID", dataType = "Int", example = "10", defaultValue = "0"),
@ApiImplicitParam(name = "name", value = "WORKER_GROUP_NAME", required = true, dataType = "String"),
@ApiImplicitParam(name = "ipList", value = "WORKER_IP_LIST", required = true, dataType = "String")
})
@PostMapping(value = "/save")
@ResponseStatus(HttpStatus.OK)
@ApiException(SAVE_ERROR)
public Result saveWorkerGroup(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "id", required = false, defaultValue = "0") int id,
@RequestParam(value = "name") String name,
@RequestParam(value = "ipList") String ipList
) {
logger.info("save worker group: login user {}, id:{}, name: {}, ipList: {} ",
loginUser.getUserName(), id, name, ipList);
Map<String, Object> result = workerGroupService.saveWorkerGroup(loginUser, id, name, ipList);
return returnDataList(result);
}
/** /**
* query worker groups paging * query worker groups paging
...@@ -132,28 +104,5 @@ public class WorkerGroupController extends BaseController { ...@@ -132,28 +104,5 @@ public class WorkerGroupController extends BaseController {
return returnDataList(result); return returnDataList(result);
} }
/**
* delete worker group by id
*
* @param loginUser login user
* @param id group id
* @return delete result code
*/
@ApiOperation(value = "deleteById", notes = "DELETE_WORKER_GROUP_BY_ID_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "id", value = "WORKER_GROUP_ID", required = true, dataType = "Int", example = "10"),
})
@GetMapping(value = "/delete-by-id")
@ResponseStatus(HttpStatus.OK)
@ApiException(DELETE_WORKER_GROUP_FAIL)
public Result deleteById(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam("id") Integer id
) {
logger.info("delete worker group: login user {}, id:{} ",
loginUser.getUserName(), id);
Map<String, Object> result = workerGroupService.deleteWorkerGroupById(id);
return returnDataList(result);
}
} }
...@@ -96,9 +96,6 @@ public class ProcessDefinitionService extends BaseDAGService { ...@@ -96,9 +96,6 @@ public class ProcessDefinitionService extends BaseDAGService {
@Autowired @Autowired
private ProcessService processService; private ProcessService processService;
@Autowired
private WorkerGroupMapper workerGroupMapper;
/** /**
* create process definition * create process definition
* *
...@@ -310,14 +307,14 @@ public class ProcessDefinitionService extends BaseDAGService { ...@@ -310,14 +307,14 @@ public class ProcessDefinitionService extends BaseDAGService {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId); putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId);
return result; return result;
} else { } else {
return createProcessDefinition( return createProcessDefinition(
loginUser, loginUser,
projectName, projectName,
processDefinition.getName()+"_copy_"+System.currentTimeMillis(), processDefinition.getName()+"_copy_"+System.currentTimeMillis(),
processDefinition.getProcessDefinitionJson(), processDefinition.getProcessDefinitionJson(),
processDefinition.getDescription(), processDefinition.getDescription(),
processDefinition.getLocations(), processDefinition.getLocations(),
processDefinition.getConnects()); processDefinition.getConnects());
} }
} }
...@@ -408,19 +405,19 @@ public class ProcessDefinitionService extends BaseDAGService { ...@@ -408,19 +405,19 @@ public class ProcessDefinitionService extends BaseDAGService {
public Map<String, Object> verifyProcessDefinitionName(User loginUser, String projectName, String name) { public Map<String, Object> verifyProcessDefinitionName(User loginUser, String projectName, String name) {
Map<String, Object> result = new HashMap<>(); Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName); Project project = projectMapper.queryByName(projectName);
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
Status resultEnum = (Status) checkResult.get(Constants.STATUS); Status resultEnum = (Status) checkResult.get(Constants.STATUS);
if (resultEnum != Status.SUCCESS) { if (resultEnum != Status.SUCCESS) {
return checkResult; return checkResult;
} }
ProcessDefinition processDefinition = processDefineMapper.queryByDefineName(project.getId(), name); ProcessDefinition processDefinition = processDefineMapper.queryByDefineName(project.getId(), name);
if (processDefinition == null) { if (processDefinition == null) {
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
} else { } else {
putMsg(result, Status.PROCESS_INSTANCE_EXIST, name); putMsg(result, Status.PROCESS_INSTANCE_EXIST, name);
} }
return result; return result;
} }
......
...@@ -91,8 +91,7 @@ public class ProcessInstanceService extends BaseDAGService { ...@@ -91,8 +91,7 @@ public class ProcessInstanceService extends BaseDAGService {
@Autowired @Autowired
LoggerService loggerService; LoggerService loggerService;
@Autowired
WorkerGroupMapper workerGroupMapper;
@Autowired @Autowired
UsersService usersService; UsersService usersService;
......
...@@ -16,24 +16,24 @@ ...@@ -16,24 +16,24 @@
*/ */
package org.apache.dolphinscheduler.api.service; package org.apache.dolphinscheduler.api.service;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.AccessToken;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup; import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.*; import java.util.*;
import java.util.stream.Collectors;
/** /**
* work group service * work group service
...@@ -42,90 +42,13 @@ import java.util.*; ...@@ -42,90 +42,13 @@ import java.util.*;
public class WorkerGroupService extends BaseService { public class WorkerGroupService extends BaseService {
@Autowired
WorkerGroupMapper workerGroupMapper;
@Autowired @Autowired
ProcessInstanceMapper processInstanceMapper; ProcessInstanceMapper processInstanceMapper;
@Autowired @Autowired
protected ZookeeperCachedOperator zookeeperCachedOperator; protected ZookeeperCachedOperator zookeeperCachedOperator;
/**
* create or update a worker group
*
* @param loginUser login user
* @param id worker group id
* @param name worker group name
* @param ipList ip list
* @return create or update result code
*/
public Map<String, Object> saveWorkerGroup(User loginUser,int id, String name, String ipList){
Map<String, Object> result = new HashMap<>(5);
//only admin can operate
if (checkAdmin(loginUser, result)){
return result;
}
if(StringUtils.isEmpty(name)){
putMsg(result, Status.NAME_NULL);
return result;
}
Date now = new Date();
WorkerGroup workerGroup = null;
if(id != 0){
workerGroup = workerGroupMapper.selectById(id);
//check exist
if (workerGroup == null){
workerGroup = new WorkerGroup();
workerGroup.setCreateTime(now);
}
}else{
workerGroup = new WorkerGroup();
workerGroup.setCreateTime(now);
}
workerGroup.setName(name);
workerGroup.setIpList(ipList);
workerGroup.setUpdateTime(now);
if(checkWorkerGroupNameExists(workerGroup)){
putMsg(result, Status.NAME_EXIST, workerGroup.getName());
return result;
}
if(workerGroup.getId() != 0 ){
workerGroupMapper.updateById(workerGroup);
}else{
workerGroupMapper.insert(workerGroup);
}
putMsg(result, Status.SUCCESS);
return result;
}
/**
* check worker group name exists
* @param workerGroup
* @return
*/
private boolean checkWorkerGroupNameExists(WorkerGroup workerGroup) {
List<WorkerGroup> workerGroupList = workerGroupMapper.queryWorkerGroupByName(workerGroup.getName());
if(CollectionUtils.isNotEmpty(workerGroupList)){
// new group has same name..
if(workerGroup.getId() == 0){
return true;
}
// update group...
for(WorkerGroup group : workerGroupList){
if(group.getId() != workerGroup.getId()){
return true;
}
}
}
return false;
}
/** /**
* query worker group paging * query worker group paging
...@@ -138,66 +61,100 @@ public class WorkerGroupService extends BaseService { ...@@ -138,66 +61,100 @@ public class WorkerGroupService extends BaseService {
*/ */
public Map<String,Object> queryAllGroupPaging(User loginUser, Integer pageNo, Integer pageSize, String searchVal) { public Map<String,Object> queryAllGroupPaging(User loginUser, Integer pageNo, Integer pageSize, String searchVal) {
// list from index
Integer fromIndex = (pageNo - 1) * pageSize;
// list to index
Integer toIndex = (pageNo - 1) * pageSize + pageSize;
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>(5);
if (checkAdmin(loginUser, result)) { if (checkAdmin(loginUser, result)) {
return result; return result;
} }
Page<WorkerGroup> page = new Page(pageNo, pageSize); List<WorkerGroup> workerGroups = getWorkerGroups(true);
IPage<WorkerGroup> workerGroupIPage = workerGroupMapper.queryListPaging(
page, searchVal); List<WorkerGroup> resultDataList = new ArrayList<>();
if (CollectionUtils.isNotEmpty(workerGroups)){
List<WorkerGroup> searchValDataList = new ArrayList<>();
if (StringUtils.isNotEmpty(searchVal)){
for (WorkerGroup workerGroup : workerGroups){
if (workerGroup.getName().contains(searchVal)){
searchValDataList.add(workerGroup);
}
}
}else {
searchValDataList = workerGroups;
}
if (searchValDataList.size() < pageSize){
toIndex = (pageNo - 1) * pageSize + searchValDataList.size();
}
resultDataList = searchValDataList.subList(fromIndex, toIndex);
}
PageInfo<WorkerGroup> pageInfo = new PageInfo<>(pageNo, pageSize); PageInfo<WorkerGroup> pageInfo = new PageInfo<>(pageNo, pageSize);
pageInfo.setTotalCount((int)workerGroupIPage.getTotal()); pageInfo.setTotalCount(resultDataList.size());
pageInfo.setLists(workerGroupIPage.getRecords()); pageInfo.setLists(resultDataList);
result.put(Constants.DATA_LIST, pageInfo); result.put(Constants.DATA_LIST, pageInfo);
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
return result; return result;
} }
/** /**
* delete worker group by id * query all worker group
* @param id worker group id *
* @return delete result code * @return all worker group list
*/ */
@Transactional(rollbackFor = Exception.class) public Map<String,Object> queryAllGroup() {
public Map<String,Object> deleteWorkerGroupById(Integer id) { Map<String, Object> result = new HashMap<>();
Map<String, Object> result = new HashMap<>(5); List<WorkerGroup> workerGroups = getWorkerGroups(false);
List<ProcessInstance> processInstances = processInstanceMapper.queryByWorkerGroupIdAndStatus(id, Constants.NOT_TERMINATED_STATES); Set<String> availableWorkerGroupSet = workerGroups.stream()
if(CollectionUtils.isNotEmpty(processInstances)){ .map(workerGroup -> workerGroup.getName())
putMsg(result, Status.DELETE_WORKER_GROUP_BY_ID_FAIL, processInstances.size()); .collect(Collectors.toSet());
return result; result.put(Constants.DATA_LIST, availableWorkerGroupSet);
}
workerGroupMapper.deleteById(id);
processInstanceMapper.updateProcessInstanceByWorkerGroupId(id, Constants.DEFAULT_WORKER_ID);
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
return result; return result;
} }
/** /**
* query all worker group * get worker groups
* *
* @return all worker group list * @param isPaging whether paging
* @return WorkerGroup list
*/ */
public Map<String,Object> queryAllGroup() { private List<WorkerGroup> getWorkerGroups(boolean isPaging) {
Map<String, Object> result = new HashMap<>();
String workerPath = zookeeperCachedOperator.getZookeeperConfig().getDsRoot()+"/nodes" +"/worker"; String workerPath = zookeeperCachedOperator.getZookeeperConfig().getDsRoot()+"/nodes" +"/worker";
List<String> workerGroupList = zookeeperCachedOperator.getChildrenKeys(workerPath); List<String> workerGroupList = zookeeperCachedOperator.getChildrenKeys(workerPath);
// available workerGroup list // available workerGroup list
List<String> availableWorkerGroupList = new ArrayList<>(); List<String> availableWorkerGroupList = new ArrayList<>();
List<WorkerGroup> workerGroups = new ArrayList<>();
for (String workerGroup : workerGroupList){ for (String workerGroup : workerGroupList){
String workerGroupPath= workerPath + "/" + workerGroup; String workerGroupPath= workerPath + "/" + workerGroup;
List<String> childrenNodes = zookeeperCachedOperator.getChildrenKeys(workerGroupPath); List<String> childrenNodes = zookeeperCachedOperator.getChildrenKeys(workerGroupPath);
if (CollectionUtils.isNotEmpty(childrenNodes)){ if (CollectionUtils.isNotEmpty(childrenNodes)){
availableWorkerGroupList.add(workerGroup); availableWorkerGroupList.add(workerGroup);
WorkerGroup wg = new WorkerGroup();
wg.setName(workerGroup);
if (isPaging){
wg.setIpList(childrenNodes);
String registeredIpValue = zookeeperCachedOperator.get(workerGroupPath + "/" + childrenNodes.get(0));
wg.setCreateTime(DateUtils.stringToDate(registeredIpValue.split(",")[3]));
wg.setUpdateTime(DateUtils.stringToDate(registeredIpValue.split(",")[4]));
}
workerGroups.add(wg);
} }
} }
return workerGroups;
result.put(Constants.DATA_LIST, availableWorkerGroupList);
putMsg(result, Status.SUCCESS);
return result;
} }
} }
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
*/ */
package org.apache.dolphinscheduler.api.service; package org.apache.dolphinscheduler.api.service;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.api.ApiApplicationServer; import org.apache.dolphinscheduler.api.ApiApplicationServer;
...@@ -29,9 +28,7 @@ import org.apache.dolphinscheduler.common.utils.FileUtils; ...@@ -29,9 +28,7 @@ import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.*; import org.apache.dolphinscheduler.dao.mapper.*;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.quartz.QuartzExecutors;
import org.apache.http.entity.ContentType; import org.apache.http.entity.ContentType;
import org.json.JSONException; import org.json.JSONException;
import org.junit.Assert; import org.junit.Assert;
...@@ -41,12 +38,8 @@ import org.mockito.InjectMocks; ...@@ -41,12 +38,8 @@ import org.mockito.InjectMocks;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner; import org.mockito.junit.MockitoJUnitRunner;
import org.quartz.Scheduler;
import org.skyscreamer.jsonassert.JSONAssert; import org.skyscreamer.jsonassert.JSONAssert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.ApplicationContext;
import org.springframework.mock.web.MockMultipartFile; import org.springframework.mock.web.MockMultipartFile;
import org.springframework.web.multipart.MultipartFile; import org.springframework.web.multipart.MultipartFile;
...@@ -59,7 +52,6 @@ import java.util.*; ...@@ -59,7 +52,6 @@ import java.util.*;
@RunWith(MockitoJUnitRunner.Silent.class) @RunWith(MockitoJUnitRunner.Silent.class)
@SpringBootTest(classes = ApiApplicationServer.class) @SpringBootTest(classes = ApiApplicationServer.class)
public class ProcessDefinitionServiceTest { public class ProcessDefinitionServiceTest {
private static final Logger logger = LoggerFactory.getLogger(ProcessDefinitionServiceTest.class);
@InjectMocks @InjectMocks
ProcessDefinitionService processDefinitionService; ProcessDefinitionService processDefinitionService;
...@@ -79,8 +71,7 @@ public class ProcessDefinitionServiceTest { ...@@ -79,8 +71,7 @@ public class ProcessDefinitionServiceTest {
@Mock @Mock
private ScheduleMapper scheduleMapper; private ScheduleMapper scheduleMapper;
@Mock
private WorkerGroupMapper workerGroupMapper;
@Mock @Mock
private ProcessService processService; private ProcessService processService;
...@@ -347,7 +338,7 @@ public class ProcessDefinitionServiceTest { ...@@ -347,7 +338,7 @@ public class ProcessDefinitionServiceTest {
//release error code //release error code
Map<String, Object> failRes = processDefinitionService.releaseProcessDefinition(loginUser, "project_test1", Map<String, Object> failRes = processDefinitionService.releaseProcessDefinition(loginUser, "project_test1",
46, 2); 46, 2);
Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, failRes.get(Constants.STATUS)); Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, failRes.get(Constants.STATUS));
//FIXME has function exit code 1 when exception //FIXME has function exit code 1 when exception
...@@ -530,7 +521,6 @@ public class ProcessDefinitionServiceTest { ...@@ -530,7 +521,6 @@ public class ProcessDefinitionServiceTest {
@Test @Test
public void testExportProcessMetaDataStr() { public void testExportProcessMetaDataStr() {
Mockito.when(scheduleMapper.queryByProcessDefinitionId(46)).thenReturn(getSchedulerList()); Mockito.when(scheduleMapper.queryByProcessDefinitionId(46)).thenReturn(getSchedulerList());
Mockito.when(workerGroupMapper.selectById(-1)).thenReturn(null);
ProcessDefinition processDefinition = getProcessDefinition(); ProcessDefinition processDefinition = getProcessDefinition();
processDefinition.setProcessDefinitionJson(sqlDependentJson); processDefinition.setProcessDefinitionJson(sqlDependentJson);
...@@ -573,17 +563,14 @@ public class ProcessDefinitionServiceTest { ...@@ -573,17 +563,14 @@ public class ProcessDefinitionServiceTest {
WorkerGroup workerGroup = new WorkerGroup(); WorkerGroup workerGroup = new WorkerGroup();
workerGroup.setName("ds-test-workergroup"); workerGroup.setName("ds-test-workergroup");
workerGroup.setId(2);
List<WorkerGroup> workerGroups = new ArrayList<>(); List<WorkerGroup> workerGroups = new ArrayList<>();
workerGroups.add(workerGroup); workerGroups.add(workerGroup);
Mockito.when(workerGroupMapper.queryWorkerGroupByName("ds-test")).thenReturn(workerGroups);
processMetaCron.setScheduleWorkerGroupName("ds-test"); processMetaCron.setScheduleWorkerGroupName("ds-test");
int insertFlagWorker = processDefinitionService.importProcessSchedule(loginUser, currentProjectName, processMetaCron, int insertFlagWorker = processDefinitionService.importProcessSchedule(loginUser, currentProjectName, processMetaCron,
processDefinitionName, processDefinitionId); processDefinitionName, processDefinitionId);
Assert.assertEquals(0, insertFlagWorker); Assert.assertEquals(0, insertFlagWorker);
Mockito.when(workerGroupMapper.queryWorkerGroupByName("ds-test")).thenReturn(null);
int workerNullFlag = processDefinitionService.importProcessSchedule(loginUser, currentProjectName, processMetaCron, int workerNullFlag = processDefinitionService.importProcessSchedule(loginUser, currentProjectName, processMetaCron,
processDefinitionName, processDefinitionId); processDefinitionName, processDefinitionId);
Assert.assertEquals(0, workerNullFlag); Assert.assertEquals(0, workerNullFlag);
...@@ -659,7 +646,7 @@ public class ProcessDefinitionServiceTest { ...@@ -659,7 +646,7 @@ public class ProcessDefinitionServiceTest {
Mockito.when(processDefineMapper.queryByDefineName(testProject.getId(), "shell-4")).thenReturn(null); Mockito.when(processDefineMapper.queryByDefineName(testProject.getId(), "shell-4")).thenReturn(null);
Mockito.when(processDefineMapper.queryByDefineName(testProject.getId(), "testProject")).thenReturn(shellDefinition2); Mockito.when(processDefineMapper.queryByDefineName(testProject.getId(), "testProject")).thenReturn(shellDefinition2);
processDefinitionService.importSubProcess(loginUser,testProject,jsonArray,subProcessIdMap); processDefinitionService.importSubProcess(loginUser,testProject, jsonArray, subProcessIdMap);
String correctSubJson = jsonArray.toString(); String correctSubJson = jsonArray.toString();
...@@ -667,60 +654,32 @@ public class ProcessDefinitionServiceTest { ...@@ -667,60 +654,32 @@ public class ProcessDefinitionServiceTest {
} }
@Test
public void testCreateProcess() throws IOException{
String json = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-36196\",\"name\":\"ssh_test1\",\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"aa=\\\"1234\\\"\\necho ${aa}\"},\"desc\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":-1,\"timeout\":0}";
String locations = "{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}";
String projectName = "test";
String name = "dag_test";
String description = "desc test";
String connects = "[]";
Map<String, Object> result = new HashMap<>(5);
putMsg(result, Status.SUCCESS);
result.put("processDefinitionId",1);
Mockito.when(projectMapper.queryByName(projectName)).thenReturn(getProject(projectName));
User loginUser = new User();
loginUser.setId(1);
loginUser.setUserType(UserType.ADMIN_USER);
Project project = getProject(projectName);
//project not found
Mockito.when(projectService.checkProjectAndAuth(loginUser,project,projectName)).thenReturn(result);
Mockito.when(processDefineMapper.insert(getProcessDefinition())).thenReturn(1);
Map<String, Object> result1 = processDefinitionService.createProcessDefinition(loginUser,projectName,name,json,description,locations,connects);
Assert.assertEquals(Status.SUCCESS,result1.get(Constants.STATUS));
}
@Test @Test
public void testImportProcessDefinitionById() throws IOException { public void testImportProcessDefinitionById() throws IOException {
String json = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-36196\",\"name\":\"ssh_test1\",\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"aa=\\\"1234\\\"\\necho ${aa}\"},\"desc\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":-1,\"timeout\":0}"; String processJson = "[{\"projectName\":\"testProject\",\"processDefinitionName\":\"shell-4\"," +
String locations = "{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}"; "\"processDefinitionJson\":\"{\\\"tenantId\\\":1,\\\"globalParams\\\":[]," +
"\\\"tasks\\\":[{\\\"workerGroupId\\\":\\\"default\\\",\\\"description\\\":\\\"\\\",\\\"runFlag\\\":\\\"NORMAL\\\"," +
String projectName = "test"; "\\\"type\\\":\\\"SHELL\\\",\\\"params\\\":{\\\"rawScript\\\":\\\"#!/bin/bash\\\\necho \\\\\\\"shell-4\\\\\\\"\\\"," +
String name = "dag_test"; "\\\"localParams\\\":[],\\\"resourceList\\\":[]},\\\"timeout\\\":{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}," +
String description = "desc test"; "\\\"maxRetryTimes\\\":\\\"0\\\",\\\"taskInstancePriority\\\":\\\"MEDIUM\\\",\\\"name\\\":\\\"shell-4\\\"," +
String connects = "[]"; "\\\"dependence\\\":{},\\\"retryInterval\\\":\\\"1\\\",\\\"preTasks\\\":[],\\\"id\\\":\\\"tasks-84090\\\"}," +
Map<String, Object> result = new HashMap<>(5); "{\\\"taskInstancePriority\\\":\\\"MEDIUM\\\",\\\"name\\\":\\\"shell-5\\\",\\\"workerGroupId\\\":\\\"default\\\\," +
putMsg(result, Status.SUCCESS); "\\\"description\\\":\\\"\\\",\\\"dependence\\\":{},\\\"preTasks\\\":[\\\"shell-4\\\"],\\\"id\\\":\\\"tasks-87364\\\"," +
result.put("processDefinitionId",1); "\\\"runFlag\\\":\\\"NORMAL\\\",\\\"type\\\":\\\"SUB_PROCESS\\\",\\\"params\\\":{\\\"processDefinitionId\\\":46}," +
"\\\"timeout\\\":{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}}],\\\"timeout\\\":0}\"," +
Mockito.when(projectMapper.queryByName(projectName)).thenReturn(getProject(projectName)); "\"processDefinitionDescription\":\"\",\"processDefinitionLocations\":\"{\\\"tasks-84090\\\":{\\\"name\\\":\\\"shell-4\\\"," +
User loginUser = new User(); "\\\"targetarr\\\":\\\"\\\",\\\"x\\\":128,\\\"y\\\":114},\\\"tasks-87364\\\":{\\\"name\\\":\\\"shell-5\\\"," +
loginUser.setId(1); "\\\"targetarr\\\":\\\"tasks-84090\\\",\\\"x\\\":266,\\\"y\\\":115}}\"," +
loginUser.setUserType(UserType.ADMIN_USER); "\"processDefinitionConnects\":\"[{\\\"endPointSourceId\\\":\\\"tasks-84090\\\"," +
Project project = getProject(projectName); "\\\"endPointTargetId\\\":\\\"tasks-87364\\\"}]\"}]";
//project not found String subProcessJson = "{\"globalParams\":[]," +
Mockito.when(projectService.checkProjectAndAuth(loginUser,project,projectName)).thenReturn(result); "\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-52423\",\"name\":\"shell-5\"," +
Mockito.when(processDefineMapper.insert(getProcessDefinition())).thenReturn(1); "\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo \\\"shell-5\\\"\"},\"description\":\"\"," +
Map<String, Object> result1 = processDefinitionService.createProcessDefinition(loginUser,projectName,name,json,description,locations,connects); "\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\"," +
"\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":\\\"default\\\\," +
String processJson = "[{\"processDefinitionConnects\":\"[]\",\"processDefinitionJson\":\"{\\\"tenantId\\\":-1,\\\"globalParams\\\":[],\\\"tasks\\\":[{\\\"workerGroupId\\\":-1,\\\"runFlag\\\":\\\"NORMAL\\\",\\\"type\\\":\\\"SHELL\\\",\\\"params\\\":{\\\"rawScript\\\":\\\"aa=\\\\\\\"1234\\\\\\\"\\\\necho ${aa}\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]},\\\"timeout\\\":{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"},\\\"maxRetryTimes\\\":\\\"0\\\",\\\"taskInstancePriority\\\":\\\"MEDIUM\\\",\\\"name\\\":\\\"ssh_test1\\\",\\\"dependence\\\":{},\\\"retryInterval\\\":\\\"1\\\",\\\"preTasks\\\":[],\\\"id\\\":\\\"tasks-36196\\\",\\\"desc\\\":\\\"\\\"}],\\\"timeout\\\":0}\",\"processDefinitionLocations\":\"{\\\"tasks-36196\\\":{\\\"name\\\":\\\"ssh_test1\\\",\\\"targetarr\\\":\\\"\\\",\\\"x\\\":141,\\\"y\\\":70}}\",\"processDefinitionName\":\"dag_test\",\"projectName\":\"test\"}]"; "\"preTasks\":[]}],\"tenantId\":1,\"timeout\":0}";
FileUtils.writeStringToFile(new File("/tmp/task.json"),processJson); FileUtils.writeStringToFile(new File("/tmp/task.json"),processJson);
...@@ -731,37 +690,45 @@ public class ProcessDefinitionServiceTest { ...@@ -731,37 +690,45 @@ public class ProcessDefinitionServiceTest {
MultipartFile multipartFile = new MockMultipartFile(file.getName(), file.getName(), MultipartFile multipartFile = new MockMultipartFile(file.getName(), file.getName(),
ContentType.APPLICATION_OCTET_STREAM.toString(), fileInputStream); ContentType.APPLICATION_OCTET_STREAM.toString(), fileInputStream);
String currentProjectName = "test"; User loginUser = new User();
loginUser.setId(1);
loginUser.setUserType(UserType.ADMIN_USER);
String currentProjectName = "testProject";
Map<String, Object> result = new HashMap<>(5);
putMsg(result, Status.SUCCESS, currentProjectName);
ProcessDefinition shellDefinition2 = new ProcessDefinition(); ProcessDefinition shellDefinition2 = new ProcessDefinition();
shellDefinition2.setId(25); shellDefinition2.setId(46);
shellDefinition2.setName("B"); shellDefinition2.setName("shell-5");
shellDefinition2.setProjectId(1); shellDefinition2.setProjectId(2);
shellDefinition2.setProcessDefinitionJson(subProcessJson);
Mockito.when(projectMapper.queryByName(currentProjectName)).thenReturn(getProject(currentProjectName)); Mockito.when(projectMapper.queryByName(currentProjectName)).thenReturn(getProject(currentProjectName));
Mockito.when(projectService.checkProjectAndAuth(loginUser, getProject(currentProjectName), currentProjectName)).thenReturn(result); Mockito.when(projectService.checkProjectAndAuth(loginUser, getProject(currentProjectName), currentProjectName)).thenReturn(result);
Mockito.when(processDefineMapper.queryByDefineId(25)).thenReturn(shellDefinition2); Mockito.when(processDefineMapper.queryByDefineId(46)).thenReturn(shellDefinition2);
//import process //import process
Map<String, Object> importProcessResult = processDefinitionService.importProcessDefinition(loginUser, multipartFile, currentProjectName); // Map<String, Object> importProcessResult = processDefinitionService.importProcessDefinition(loginUser, multipartFile, currentProjectName);
//
Assert.assertEquals(Status.SUCCESS, importProcessResult.get(Constants.STATUS)); // Assert.assertEquals(Status.SUCCESS, importProcessResult.get(Constants.STATUS));
//
boolean delete = file.delete(); // boolean delete = file.delete();
Assert.assertTrue(delete);
String processMetaJson = "[]";
importProcessCheckData(file, loginUser, currentProjectName, processMetaJson);
// //
processMetaJson = "[{\"scheduleWorkerGroupId\":-1}]"; // Assert.assertTrue(delete);
importProcessCheckData(file, loginUser, currentProjectName, processMetaJson);
processMetaJson = "[{\"scheduleWorkerGroupId\":-1,\"projectName\":\"test\"}]"; // String processMetaJson = "";
importProcessCheckData(file, loginUser, currentProjectName, processMetaJson); // improssProcessCheckData(file, loginUser, currentProjectName, processMetaJson);
//
// processMetaJson = "{\"scheduleWorkerGroupId\":-1}";
// improssProcessCheckData(file, loginUser, currentProjectName, processMetaJson);
//
// processMetaJson = "{\"scheduleWorkerGroupId\":-1,\"projectName\":\"test\"}";
// improssProcessCheckData(file, loginUser, currentProjectName, processMetaJson);
//
// processMetaJson = "{\"scheduleWorkerGroupId\":-1,\"projectName\":\"test\",\"processDefinitionName\":\"test_definition\"}";
// improssProcessCheckData(file, loginUser, currentProjectName, processMetaJson);
processMetaJson = "[{\"scheduleWorkerGroupId\":-1,\"projectName\":\"test\",\"processDefinitionName\":\"test_definition\"}]";
importProcessCheckData(file, loginUser, currentProjectName, processMetaJson);
} }
...@@ -773,7 +740,7 @@ public class ProcessDefinitionServiceTest { ...@@ -773,7 +740,7 @@ public class ProcessDefinitionServiceTest {
* @param processMetaJson process meta json * @param processMetaJson process meta json
* @throws IOException IO exception * @throws IOException IO exception
*/ */
private void importProcessCheckData(File file, User loginUser, String currentProjectName, String processMetaJson) throws IOException { private void improssProcessCheckData(File file, User loginUser, String currentProjectName, String processMetaJson) throws IOException {
//check null //check null
FileUtils.writeStringToFile(new File("/tmp/task.json"),processMetaJson); FileUtils.writeStringToFile(new File("/tmp/task.json"),processMetaJson);
......
...@@ -80,8 +80,7 @@ public class ProcessInstanceServiceTest { ...@@ -80,8 +80,7 @@ public class ProcessInstanceServiceTest {
@Mock @Mock
LoggerService loggerService; LoggerService loggerService;
@Mock
WorkerGroupMapper workerGroupMapper;
@Mock @Mock
UsersService usersService; UsersService usersService;
...@@ -486,7 +485,6 @@ public class ProcessInstanceServiceTest { ...@@ -486,7 +485,6 @@ public class ProcessInstanceServiceTest {
*/ */
private WorkerGroup getWorkGroup() { private WorkerGroup getWorkGroup() {
WorkerGroup workerGroup = new WorkerGroup(); WorkerGroup workerGroup = new WorkerGroup();
workerGroup.setId(1);
workerGroup.setName("test_workergroup"); workerGroup.setName("test_workergroup");
return workerGroup; return workerGroup;
} }
......
...@@ -26,10 +26,10 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance; ...@@ -26,10 +26,10 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup; import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.InjectMocks; import org.mockito.InjectMocks;
...@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory; ...@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class WorkerGroupServiceTest { public class WorkerGroupServiceTest {
...@@ -51,100 +52,55 @@ public class WorkerGroupServiceTest { ...@@ -51,100 +52,55 @@ public class WorkerGroupServiceTest {
@InjectMocks @InjectMocks
private WorkerGroupService workerGroupService; private WorkerGroupService workerGroupService;
@Mock
private WorkerGroupMapper workerGroupMapper;
@Mock @Mock
private ProcessInstanceMapper processInstanceMapper; private ProcessInstanceMapper processInstanceMapper;
@Mock @Mock
private ZookeeperCachedOperator zookeeperCachedOperator; private ZookeeperCachedOperator zookeeperCachedOperator;
private String groupName="groupName000001";
/** @Before
* create or update a worker group public void init(){
*/ ZookeeperConfig zookeeperConfig = new ZookeeperConfig();
@Test zookeeperConfig.setDsRoot("/dolphinscheduler_qzw");
public void testSaveWorkerGroup(){ Mockito.when(zookeeperCachedOperator.getZookeeperConfig()).thenReturn(zookeeperConfig);
User user = new User(); String workerPath = zookeeperCachedOperator.getZookeeperConfig().getDsRoot()+"/nodes" +"/worker";
// general user add
user.setUserType(UserType.GENERAL_USER);
Map<String, Object> result = workerGroupService.saveWorkerGroup(user, 0, groupName, "127.0.0.1");
logger.info(result.toString());
Assert.assertEquals( Status.USER_NO_OPERATION_PERM.getMsg(),(String) result.get(Constants.MSG));
//success List<String> workerGroupStrList = new ArrayList<>();
user.setUserType(UserType.ADMIN_USER); workerGroupStrList.add("default");
result = workerGroupService.saveWorkerGroup(user, 0, groupName, "127.0.0.1"); workerGroupStrList.add("test");
logger.info(result.toString()); Mockito.when(zookeeperCachedOperator.getChildrenKeys(workerPath)).thenReturn(workerGroupStrList);
Assert.assertEquals(Status.SUCCESS.getMsg(),(String)result.get(Constants.MSG));
// group name exist List<String> defaultIpList = new ArrayList<>();
Mockito.when(workerGroupMapper.selectById(2)).thenReturn(getWorkerGroup(2)); defaultIpList.add("192.168.220.188:1234");
Mockito.when(workerGroupMapper.queryWorkerGroupByName(groupName)).thenReturn(getList()); defaultIpList.add("192.168.220.189:1234");
result = workerGroupService.saveWorkerGroup(user, 2, groupName, "127.0.0.1");
logger.info(result.toString());
Assert.assertEquals(Status.NAME_EXIST,result.get(Constants.STATUS));
Mockito.when(zookeeperCachedOperator.getChildrenKeys(workerPath + "/default")).thenReturn(defaultIpList);
Mockito.when(zookeeperCachedOperator.get(workerPath + "/default" + "/" + defaultIpList.get(0))).thenReturn("0.02,0.23,0.03,2020-05-08 11:24:14,2020-05-08 14:22:24");
} }
/** /**
* query worker group paging * query worker group paging
*/ */
@Test @Test
public void testQueryAllGroupPaging(){ public void testQueryAllGroupPaging(){
User user = new User(); User user = new User();
// general user add // general user add
user.setUserType(UserType.GENERAL_USER);
Map<String, Object> result = workerGroupService.queryAllGroupPaging(user, 1, 10, groupName);
logger.info(result.toString());
Assert.assertEquals((String) result.get(Constants.MSG), Status.USER_NO_OPERATION_PERM.getMsg());
//success
user.setUserType(UserType.ADMIN_USER); user.setUserType(UserType.ADMIN_USER);
Page<WorkerGroup> page = new Page<>(1,10); Map<String, Object> result = workerGroupService.queryAllGroupPaging(user, 1, 10, null);
page.setRecords(getList()); PageInfo<WorkerGroup> pageInfo = (PageInfo) result.get(Constants.DATA_LIST);
page.setSize(1L); Assert.assertEquals(pageInfo.getLists().size(),1);
Mockito.when(workerGroupMapper.queryListPaging(Mockito.any(Page.class), Mockito.eq(groupName))).thenReturn(page);
result = workerGroupService.queryAllGroupPaging(user, 1, 10, groupName);
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS.getMsg(),(String)result.get(Constants.MSG));
PageInfo<WorkerGroup> pageInfo = (PageInfo<WorkerGroup>) result.get(Constants.DATA_LIST);
Assert.assertTrue(CollectionUtils.isNotEmpty(pageInfo.getLists()));
} }
/**
* delete group by id
*/
@Test
public void testDeleteWorkerGroupById(){
//DELETE_WORKER_GROUP_BY_ID_FAIL
Mockito.when(processInstanceMapper.queryByWorkerGroupIdAndStatus(1, Constants.NOT_TERMINATED_STATES)).thenReturn(getProcessInstanceList());
Map<String, Object> result = workerGroupService.deleteWorkerGroupById(1);
logger.info(result.toString());
Assert.assertEquals(Status.DELETE_WORKER_GROUP_BY_ID_FAIL.getCode(),((Status) result.get(Constants.STATUS)).getCode());
//correct
result = workerGroupService.deleteWorkerGroupById(2);
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS.getMsg(),(String)result.get(Constants.MSG));
}
@Test @Test
public void testQueryAllGroup() throws Exception { public void testQueryAllGroup() throws Exception {
ZookeeperConfig zookeeperConfig = new ZookeeperConfig();
zookeeperConfig.setDsRoot("/ds");
Mockito.when(zookeeperCachedOperator.getZookeeperConfig()).thenReturn(zookeeperConfig);
List<String> workerGroupStrList = new ArrayList<>();
workerGroupStrList.add("workerGroup1");
Mockito.when(zookeeperCachedOperator.getChildrenKeys(Mockito.anyString())).thenReturn(workerGroupStrList);
Map<String, Object> result = workerGroupService.queryAllGroup(); Map<String, Object> result = workerGroupService.queryAllGroup();
logger.info(result.toString()); Set<String> workerGroups = (Set<String>) result.get(Constants.DATA_LIST);
Assert.assertEquals(Status.SUCCESS.getMsg(),(String)result.get(Constants.MSG)); Assert.assertEquals(workerGroups.size(), 1);
List<WorkerGroup> workerGroupList = (List<WorkerGroup>) result.get(Constants.DATA_LIST);
Assert.assertTrue(workerGroupList.size()>0);
} }
...@@ -158,25 +114,5 @@ public class WorkerGroupServiceTest { ...@@ -158,25 +114,5 @@ public class WorkerGroupServiceTest {
processInstances.add(new ProcessInstance()); processInstances.add(new ProcessInstance());
return processInstances; return processInstances;
} }
/**
* get Group
* @return
*/
private WorkerGroup getWorkerGroup(int id){
WorkerGroup workerGroup = new WorkerGroup();
workerGroup.setName(groupName);
workerGroup.setId(id);
return workerGroup;
}
private WorkerGroup getWorkerGroup(){
return getWorkerGroup(1);
}
private List<WorkerGroup> getList(){
List<WorkerGroup> list = new ArrayList<>();
list.add(getWorkerGroup());
return list;
}
} }
\ No newline at end of file
...@@ -21,41 +21,22 @@ import com.baomidou.mybatisplus.annotation.TableId; ...@@ -21,41 +21,22 @@ import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName; import com.baomidou.mybatisplus.annotation.TableName;
import java.util.Date; import java.util.Date;
import java.util.List;
/** /**
* worker group for task running * worker group
*/ */
@TableName("t_ds_worker_group")
public class WorkerGroup { public class WorkerGroup {
@TableId(value="id", type=IdType.AUTO)
private int id;
private String name; private String name;
private String ipList; private List<String> ipList;
private Date createTime; private Date createTime;
private Date updateTime; private Date updateTime;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getIpList() {
return ipList;
}
public void setIpList(String ipList) {
this.ipList = ipList;
}
public Date getCreateTime() { public Date getCreateTime() {
return createTime; return createTime;
} }
...@@ -72,18 +53,6 @@ public class WorkerGroup { ...@@ -72,18 +53,6 @@ public class WorkerGroup {
this.updateTime = updateTime; this.updateTime = updateTime;
} }
@Override
public String toString() {
return "Worker group model{" +
"id= " + id +
",name= " + name +
",ipList= " + ipList +
",createTime= " + createTime +
",updateTime= " + updateTime +
"}";
}
public String getName() { public String getName() {
return name; return name;
} }
...@@ -91,4 +60,14 @@ public class WorkerGroup { ...@@ -91,4 +60,14 @@ public class WorkerGroup {
public void setName(String name) { public void setName(String name) {
this.name = name; this.name = name;
} }
public List<String> getIpList() {
return ipList;
}
public void setIpList(List<String> ipList) {
this.ipList = ipList;
}
} }
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
* worker group mapper interface
*/
public interface WorkerGroupMapper extends BaseMapper<WorkerGroup> {
/**
* query all worker group
* @return worker group list
*/
List<WorkerGroup> queryAllWorkerGroup();
/**
* query worer grouop by name
* @param name name
* @return worker group list
*/
List<WorkerGroup> queryWorkerGroupByName(@Param("name") String name);
/**
* worker group page
* @param page page
* @param searchVal searchVal
* @return worker group IPage
*/
IPage<WorkerGroup> queryListPaging(IPage<WorkerGroup> page,
@Param("searchVal") String searchVal);
}
...@@ -101,9 +101,15 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { ...@@ -101,9 +101,15 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(), taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
new NettyRemoteChannel(channel, command.getOpaque())); new NettyRemoteChannel(channel, command.getOpaque()));
this.doAck(taskExecutionContext); try {
this.doAck(taskExecutionContext);
}catch (Exception e){
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
this.doAck(taskExecutionContext);
}
// submit task // submit task
workerExecService.submit(new TaskExecuteThread(taskExecutionContext,taskCallbackService)); workerExecService.submit(new TaskExecuteThread(taskExecutionContext, taskCallbackService));
} }
private void doAck(TaskExecutionContext taskExecutionContext){ private void doAck(TaskExecutionContext taskExecutionContext){
......
...@@ -18,10 +18,12 @@ package org.apache.dolphinscheduler.server.worker.runner; ...@@ -18,10 +18,12 @@ package org.apache.dolphinscheduler.server.worker.runner;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
...@@ -131,7 +133,12 @@ public class TaskExecuteThread implements Runnable { ...@@ -131,7 +133,12 @@ public class TaskExecuteThread implements Runnable {
responseCommand.setProcessId(task.getProcessId()); responseCommand.setProcessId(task.getProcessId());
responseCommand.setAppIds(task.getAppIds()); responseCommand.setAppIds(task.getAppIds());
} finally { } finally {
taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command()); try {
taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
}catch (Exception e){
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
}
} }
} }
......
...@@ -113,10 +113,7 @@ public class DependencyConfig { ...@@ -113,10 +113,7 @@ public class DependencyConfig {
return Mockito.mock(ResourceMapper.class); return Mockito.mock(ResourceMapper.class);
} }
@Bean
public WorkerGroupMapper workerGroupMapper(){
return Mockito.mock(WorkerGroupMapper.class);
}
@Bean @Bean
public ErrorCommandMapper errorCommandMapper(){ public ErrorCommandMapper errorCommandMapper(){
......
...@@ -107,10 +107,6 @@ public class TaskCallbackServiceTestConfig { ...@@ -107,10 +107,6 @@ public class TaskCallbackServiceTestConfig {
return Mockito.mock(ResourceMapper.class); return Mockito.mock(ResourceMapper.class);
} }
@Bean
public WorkerGroupMapper workerGroupMapper(){
return Mockito.mock(WorkerGroupMapper.class);
}
@Bean @Bean
public ErrorCommandMapper errorCommandMapper(){ public ErrorCommandMapper errorCommandMapper(){
......
...@@ -86,8 +86,7 @@ public class ProcessService { ...@@ -86,8 +86,7 @@ public class ProcessService {
@Autowired @Autowired
private ResourceMapper resourceMapper; private ResourceMapper resourceMapper;
@Autowired
private WorkerGroupMapper workerGroupMapper;
@Autowired @Autowired
private ErrorCommandMapper errorCommandMapper; private ErrorCommandMapper errorCommandMapper;
...@@ -1670,15 +1669,7 @@ public class ProcessService { ...@@ -1670,15 +1669,7 @@ public class ProcessService {
return queue; return queue;
} }
/**
* query worker group by id
* @param workerGroupId workerGroupId
* @return WorkerGroup
*/
public WorkerGroup queryWorkerGroupById(int workerGroupId){
return workerGroupMapper.selectById(workerGroupId);
}
/** /**
* get task worker group * get task worker group
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册