未验证 提交 c4375a54 编写于 作者: L LiuBodong 提交者: GitHub

[Fix][Task] Task lost resource information (#6292) (#6295)



Co-authored-by: liubodong <liubodong>
上级 35312fd4
......@@ -20,26 +20,43 @@ package org.apache.dolphinscheduler.common.process;
* resource info
*/
public class ResourceInfo {
/**
* res the name of the resource that was uploaded
*/
private int id;
/**
* res id of the resource that was uploaded
*/
private int id;
public int getId() {
return id;
}
private String res;
public void setId(int id) {
this.id = id;
}
/**
* full name of the resource that was uploaded
*/
private String resourceName;
private String res;
public ResourceInfo() {
// do nothing, void constructor
}
public String getRes() {
return res;
}
public int getId() {
return id;
}
public void setRes(String res) {
this.res = res;
}
public void setId(int id) {
this.id = id;
}
public String getRes() {
return res;
}
public void setRes(String res) {
this.res = res;
}
public String getResourceName() {
return resourceName;
}
public void setResourceName(String resourceName) {
this.resourceName = resourceName;
}
}
......@@ -30,6 +30,7 @@ import static org.apache.dolphinscheduler.common.Constants.YYYY_MM_DD_HH_MM_SS;
import static java.util.stream.Collectors.toSet;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType;
......@@ -1527,10 +1528,78 @@ public class ProcessService {
TaskDefinition taskDefinition = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(
taskInstance.getTaskCode(),
taskInstance.getTaskDefinitionVersion());
updateTaskDefinitionResources(taskDefinition);
taskInstance.setTaskDefine(taskDefinition);
return taskInstance;
}
/**
* Update {@link ResourceInfo} information in {@link TaskDefinition}
*
* @param taskDefinition the given {@link TaskDefinition}
*/
private void updateTaskDefinitionResources(TaskDefinition taskDefinition) {
Map<String, Object> taskParameters = JSONUtils.parseObject(
taskDefinition.getTaskParams(),
new TypeReference<Map<String, Object>>() { });
if (taskParameters != null) {
// if contains mainJar field, query resource from database
// Flink, Spark, MR
if (taskParameters.containsKey("mainJar")) {
Object mainJarObj = taskParameters.get("mainJar");
ResourceInfo mainJar = JSONUtils.parseObject(
JSONUtils.toJsonString(mainJarObj),
ResourceInfo.class);
ResourceInfo resourceInfo = updateResourceInfo(mainJar);
if (resourceInfo != null) {
taskParameters.put("mainJar", resourceInfo);
}
}
// update resourceList information
if (taskParameters.containsKey("resourceList")) {
String resourceListStr = JSONUtils.toJsonString(taskParameters.get("resourceList"));
List<ResourceInfo> resourceInfos = JSONUtils.toList(resourceListStr, ResourceInfo.class);
List<ResourceInfo> updatedResourceInfos = resourceInfos
.stream()
.map(this::updateResourceInfo)
.filter(Objects::nonNull)
.collect(Collectors.toList());
taskParameters.put("resourceList", updatedResourceInfos);
}
// set task parameters
taskDefinition.setTaskParams(JSONUtils.toJsonString(taskParameters));
}
}
/**
* update {@link ResourceInfo} by given original ResourceInfo
*
* @param res origin resource info
* @return {@link ResourceInfo}
*/
private ResourceInfo updateResourceInfo(ResourceInfo res) {
ResourceInfo resourceInfo = null;
// only if mainJar is not null and does not contains "resourceName" field
if (res != null) {
int resourceId = res.getId();
if (resourceId <= 0) {
logger.error("invalid resourceId, {}", resourceId);
return null;
}
resourceInfo = new ResourceInfo();
// get resource from database, only one resource should be returned
Resource resource = getResourceById(resourceId);
resourceInfo.setId(resourceId);
resourceInfo.setRes(resource.getFileName());
resourceInfo.setResourceName(resource.getFullName());
if (logger.isInfoEnabled()) {
logger.info("updated resource info {}",
JSONUtils.toJsonString(resourceInfo));
}
}
return resourceInfo;
}
/**
* get id list by task state
*
......
......@@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.service.process;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID;
import static org.mockito.ArgumentMatchers.any;
import org.apache.dolphinscheduler.common.Constants;
......@@ -32,6 +31,8 @@ import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
......@@ -41,6 +42,8 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
......@@ -51,6 +54,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ResourceMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
......@@ -58,10 +62,13 @@ import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtilsTest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.Assert;
import org.junit.Test;
......@@ -70,6 +77,7 @@ import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.powermock.reflect.Whitebox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -108,6 +116,8 @@ public class ProcessServiceTest {
private ProcessTaskRelationMapper processTaskRelationMapper;
@Mock
private ProcessDefinitionLogMapper processDefineLogMapper;
@Mock
private ResourceMapper resourceMapper;
@Test
public void testCreateSubCommand() {
......@@ -477,4 +487,121 @@ public class ProcessServiceTest {
processService.changeOutParam(taskInstance);
}
@Test
public void testUpdateTaskDefinitionResources() throws Exception {
TaskDefinition taskDefinition = new TaskDefinition();
String taskParameters = "{\n"
+ " \"mainClass\": \"org.apache.dolphinscheduler.SparkTest\",\n"
+ " \"mainJar\": {\n"
+ " \"id\": 1\n"
+ " },\n"
+ " \"deployMode\": \"cluster\",\n"
+ " \"resourceList\": [\n"
+ " {\n"
+ " \"id\": 3\n"
+ " },\n"
+ " {\n"
+ " \"id\": 4\n"
+ " }\n"
+ " ],\n"
+ " \"localParams\": [],\n"
+ " \"driverCores\": 1,\n"
+ " \"driverMemory\": \"512M\",\n"
+ " \"numExecutors\": 2,\n"
+ " \"executorMemory\": \"2G\",\n"
+ " \"executorCores\": 2,\n"
+ " \"appName\": \"\",\n"
+ " \"mainArgs\": \"\",\n"
+ " \"others\": \"\",\n"
+ " \"programType\": \"JAVA\",\n"
+ " \"sparkVersion\": \"SPARK2\",\n"
+ " \"dependence\": {},\n"
+ " \"conditionResult\": {\n"
+ " \"successNode\": [\n"
+ " \"\"\n"
+ " ],\n"
+ " \"failedNode\": [\n"
+ " \"\"\n"
+ " ]\n"
+ " },\n"
+ " \"waitStartTimeout\": {}\n"
+ "}";
taskDefinition.setTaskParams(taskParameters);
Map<Integer, Resource> resourceMap =
Stream.of(1, 3, 4)
.map(i -> {
Resource resource = new Resource();
resource.setId(i);
resource.setFileName("file" + i);
resource.setFullName("/file" + i);
return resource;
})
.collect(
Collectors.toMap(
Resource::getId,
resource -> resource)
);
for (Integer integer : Arrays.asList(1, 3, 4)) {
Mockito.when(resourceMapper.selectById(integer))
.thenReturn(resourceMap.get(integer));
}
Whitebox.invokeMethod(processService,
"updateTaskDefinitionResources",
taskDefinition);
String taskParams = taskDefinition.getTaskParams();
SparkParameters sparkParameters = JSONUtils.parseObject(taskParams, SparkParameters.class);
ResourceInfo mainJar = sparkParameters.getMainJar();
Assert.assertEquals(1, mainJar.getId());
Assert.assertEquals("file1", mainJar.getRes());
Assert.assertEquals("/file1", mainJar.getResourceName());
Assert.assertEquals(2, sparkParameters.getResourceList().size());
ResourceInfo res1 = sparkParameters.getResourceList().get(0);
ResourceInfo res2 = sparkParameters.getResourceList().get(1);
Assert.assertEquals(3, res1.getId());
Assert.assertEquals("file3", res1.getRes());
Assert.assertEquals("/file3", res1.getResourceName());
Assert.assertEquals(4, res2.getId());
Assert.assertEquals("file4", res2.getRes());
Assert.assertEquals("/file4", res2.getResourceName());
}
@Test
public void testUpdateResourceInfo() throws Exception {
// test if input is null
ResourceInfo resourceInfoNull = null;
ResourceInfo updatedResourceInfo1 = Whitebox.invokeMethod(processService,
"updateResourceInfo",
resourceInfoNull);
Assert.assertNull(updatedResourceInfo1);
// test if resource id less than 1
ResourceInfo resourceInfoVoid = new ResourceInfo();
ResourceInfo updatedResourceInfo2 = Whitebox.invokeMethod(processService,
"updateResourceInfo",
resourceInfoVoid);
Assert.assertNull(updatedResourceInfo2);
// test normal situation
ResourceInfo resourceInfoNormal = new ResourceInfo();
resourceInfoNormal.setId(1);
Resource resource = new Resource();
resource.setId(1);
resource.setFileName("test.txt");
resource.setFullName("/test.txt");
Mockito.when(resourceMapper.selectById(1)).thenReturn(resource);
ResourceInfo updatedResourceInfo3 = Whitebox.invokeMethod(processService,
"updateResourceInfo",
resourceInfoNormal);
Assert.assertEquals(1, updatedResourceInfo3.getId());
Assert.assertEquals("test.txt", updatedResourceInfo3.getRes());
Assert.assertEquals("/test.txt", updatedResourceInfo3.getResourceName());
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册