From 433b41dd7f31c816cbaed79b88919b9325835b17 Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Tue, 7 Apr 2020 13:29:20 +0800 Subject: [PATCH] refactor-worker merge to dev bug fix --- .../common/utils/HadoopUtils.java | 3 +- .../dao/entity/TaskInstance.java | 18 ++++++- .../builder/TaskExecutionContextBuilder.java | 1 + .../server/entity/TaskExecutionContext.java | 14 +++++ .../consumer/TaskPriorityQueueConsumer.java | 53 ++++++++++++++++--- .../worker/runner/TaskExecuteThread.java | 4 +- .../service/process/ProcessService.java | 12 ++++- pom.xml | 43 +++++++-------- 8 files changed, 110 insertions(+), 38 deletions(-) diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java index 2863738c9..3dd548df8 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java @@ -91,8 +91,7 @@ public class HadoopUtils implements Closeable { */ private void initHdfsPath() { - String hdfsPath = PropertyUtils.getString(resourceUploadPath); - Path path = new Path(hdfsPath); + Path path = new Path(resourceUploadPath); try { if (!fs.exists(path)) { diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java index c43fd3da3..243f2af61 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java @@ -29,6 +29,7 @@ import com.baomidou.mybatisplus.annotation.TableName; import java.io.Serializable; import java.util.Date; +import java.util.List; /** * task instance @@ -47,6 +48,8 @@ public class TaskInstance implements Serializable { */ private String name; + + /** * task type */ @@ -205,8 +208,12 @@ public class TaskInstance implements Serializable { private String executorName; + @TableField(exist = false) + private List resources; + + - public void init(String host,Date startTime,String executePath){ + public void init(String host,Date startTime,String executePath){ this.host = host; this.startTime = startTime; this.executePath = executePath; @@ -446,6 +453,15 @@ public class TaskInstance implements Serializable { || this.getState().typeIsCancel() || (this.getState().typeIsFailure() && !taskCanRetry()); } + + public List getResources() { + return resources; + } + + public void setResources(List resources) { + this.resources = resources; + } + /** * determine if you can try again * @return can try result diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java index fc60e8836..0809f3112 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java @@ -50,6 +50,7 @@ public class TaskExecutionContextBuilder { taskExecutionContext.setTaskJson(taskInstance.getTaskJson()); taskExecutionContext.setWorkerGroup(taskInstance.getWorkerGroup()); taskExecutionContext.setHost(taskInstance.getHost()); + taskExecutionContext.setResources(taskInstance.getResources()); return this; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java index bba4b36c5..264055f90 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java @@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; import java.util.Date; +import java.util.List; import java.util.Map; /** @@ -166,6 +167,10 @@ public class TaskExecutionContext implements Serializable{ */ private String workerGroup; + /** + * resources full name + */ + private List resources; /** * sql TaskExecutionContext @@ -433,6 +438,14 @@ public class TaskExecutionContext implements Serializable{ this.dependenceTaskExecutionContext = dependenceTaskExecutionContext; } + public List getResources() { + return resources; + } + + public void setResources(List resources) { + this.resources = resources; + } + @Override public String toString() { return "TaskExecutionContext{" + @@ -462,6 +475,7 @@ public class TaskExecutionContext implements Serializable{ ", taskTimeoutStrategy=" + taskTimeoutStrategy + ", taskTimeout=" + taskTimeout + ", workerGroup='" + workerGroup + '\'' + + ", resources=" + resources + ", sqlTaskExecutionContext=" + sqlTaskExecutionContext + ", dataxTaskExecutionContext=" + dataxTaskExecutionContext + ", dependenceTaskExecutionContext=" + dependenceTaskExecutionContext + diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index 4aaf90163..6c1be97d0 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -22,6 +22,8 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.UdfType; import org.apache.dolphinscheduler.common.model.TaskNode; +import org.apache.dolphinscheduler.common.process.ResourceInfo; +import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.datax.DataxParameters; import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters; import org.apache.dolphinscheduler.common.task.sql.SqlParameters; @@ -29,10 +31,8 @@ import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.utils.EnumUtils; import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; -import org.apache.dolphinscheduler.dao.entity.DataSource; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.dao.entity.Tenant; -import org.apache.dolphinscheduler.dao.entity.UdfFunc; +import org.apache.dolphinscheduler.common.utils.TaskParametersUtils; +import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; import org.apache.dolphinscheduler.server.entity.*; import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher; @@ -47,7 +47,12 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; +import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * TaskUpdateQueue consumer @@ -127,6 +132,12 @@ public class TaskPriorityQueueConsumer extends Thread{ protected TaskExecutionContext getTaskExecutionContext(int taskInstanceId){ TaskInstance taskInstance = processService.getTaskInstanceDetailByTaskId(taskInstanceId); + // task type + TaskType taskType = TaskType.valueOf(taskInstance.getTaskType()); + + // task node + TaskNode taskNode = JSONObject.parseObject(taskInstance.getTaskJson(), TaskNode.class); + Integer userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId(); Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId); @@ -145,14 +156,14 @@ public class TaskPriorityQueueConsumer extends Thread{ taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue); taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode()); taskInstance.setExecutePath(getExecLocalPath(taskInstance)); + taskInstance.setResources(getResourceFullNames(taskNode)); + SQLTaskExecutionContext sqlTaskExecutionContext = new SQLTaskExecutionContext(); DataxTaskExecutionContext dataxTaskExecutionContext = new DataxTaskExecutionContext(); ProcedureTaskExecutionContext procedureTaskExecutionContext = new ProcedureTaskExecutionContext(); - TaskType taskType = TaskType.valueOf(taskInstance.getTaskType()); - TaskNode taskNode = JSONObject.parseObject(taskInstance.getTaskJson(), TaskNode.class); // SQL task if (taskType == TaskType.SQL){ setSQLTaskRelation(sqlTaskExecutionContext, taskNode); @@ -171,7 +182,6 @@ public class TaskPriorityQueueConsumer extends Thread{ } - return TaskExecutionContextBuilder.get() .buildTaskInstanceRelatedInfo(taskInstance) .buildProcessInstanceRelatedInfo(taskInstance.getProcessInstance()) @@ -270,4 +280,33 @@ public class TaskPriorityQueueConsumer extends Thread{ } return false; } + + + /** + * create project resource files + */ + private List getResourceFullNames(TaskNode taskNode){ + + Set resourceIdsSet = new HashSet<>(); + AbstractParameters baseParam = TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams()); + + if (baseParam != null) { + List projectResourceFiles = baseParam.getResourceFilesList(); + if (projectResourceFiles != null) { + Stream resourceInfotream = projectResourceFiles.stream().map(resourceInfo -> resourceInfo.getId()); + resourceIdsSet.addAll(resourceInfotream.collect(Collectors.toSet())); + + } + } + + Integer[] resourceIds = resourceIdsSet.toArray(new Integer[resourceIdsSet.size()]); + + List resources = processService.listResourceByIds(resourceIds); + + List resourceFullNames = resources.stream() + .map(resourceInfo -> resourceInfo.getFullName()) + .collect(Collectors.toList()); + + return resourceFullNames; + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index 4d8600d65..84ff23a9e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -83,11 +83,9 @@ public class TaskExecuteThread implements Runnable { // task node TaskNode taskNode = JSONObject.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class); - // get resource files - List resourceFiles = createProjectResFiles(taskNode); // copy hdfs/minio file to local downloadResource(taskExecutionContext.getExecutePath(), - resourceFiles, + taskExecutionContext.getResources(), taskExecutionContext.getTenantCode(), logger); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index fc67ae580..0aa6a3f60 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -1487,7 +1487,7 @@ public class ProcessService { * @return tenant code */ public String queryTenantCodeByResName(String resName,ResourceType resourceType){ - return resourceMapper.queryTenantCodeByResourceName(resName,resourceType.ordinal()); + return resourceMapper.queryTenantCodeByResourceName(resName, resourceType.ordinal()); } /** @@ -1766,4 +1766,14 @@ public class ProcessService { } + /** + * list resources by ids + * @param resIds resIds + * @return resource list + */ + public List listResourceByIds(Integer[] resIds){ + return resourceMapper.listResourceByIds(resIds); + } + + } diff --git a/pom.xml b/pom.xml index 7a0d3978e..1bad82a31 100644 --- a/pom.xml +++ b/pom.xml @@ -691,25 +691,25 @@ **/alert/utils/JSONUtilsTest.java **/alert/utils/MailUtilsTest.java **/alert/utils/PropertyUtilsTest.java - **/api/controller/AccessTokenControllerTest.java - **/api/controller/AlertGroupControllerTest.java - **/api/controller/DataAnalysisControllerTest.java - **/api/controller/DataSourceControllerTest.java - **/api/controller/ExecutorControllerTest.java - **/api/controller/LoggerControllerTest.java - **/api/controller/LoginControllerTest.java - **/api/controller/MonitorControllerTest.java - **/api/controller/ProcessDefinitionControllerTest.java - **/api/controller/ProcessInstanceControllerTest.java - **/api/controller/ProjectControllerTest.java - **/api/controller/QueueControllerTest.java - **/api/controller/ResourcesControllerTest.java - **/api/controller/SchedulerControllerTest.java - **/api/controller/TaskInstanceControllerTest.java - **/api/controller/TaskRecordControllerTest.java - **/api/controller/TenantControllerTest.java - **/api/controller/UsersControllerTest.java - **/api/controller/WorkerGroupControllerTest.java + + + + + + + + + + + + + + + + + + + **/api/dto/resources/filter/ResourceFilterTest.java **/api/dto/resources/visitor/ResourceTreeVisitorTest.java **/api/enums/testGetEnum.java @@ -783,11 +783,6 @@ **/common/utils/StringUtilsTest.java **/common/utils/TaskParametersUtilsTest.java **/common/ConstantsTest.java - - - - - **/dao/mapper/AccessTokenMapperTest.java **/dao/mapper/AlertGroupMapperTest.java **/dao/mapper/AlertMapperTest.java -- GitLab