diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java index afc9b9a0363751409ed834dbbce7abf15558280a..807c5218caaa0d6e17eb0603285dd6896ec2a8a2 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java @@ -61,6 +61,7 @@ public class TaskExecutionContextBuilder { taskExecutionContext.setResources(taskInstance.getResources()); taskExecutionContext.setDelayTime(taskInstance.getDelayTime()); taskExecutionContext.setVarPool(taskInstance.getVarPool()); + taskExecutionContext.setDryRun(taskInstance.getDryRun()); return this; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index eaa7cc40a3ed20386cd8e49d6a36f1082138abc9..d1f1c39717c964f6a91ec4606b455639c04aba1c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -32,7 +32,6 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.RetryerUtils; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; @@ -41,8 +40,6 @@ import org.apache.dolphinscheduler.server.worker.cache.ResponceCache; import org.apache.dolphinscheduler.server.worker.plugin.TaskPluginManager; import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; import org.apache.dolphinscheduler.service.alert.AlertClientService; -import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext; import org.apache.dolphinscheduler.spi.exception.PluginNotFoundException; import org.apache.dolphinscheduler.spi.task.AbstractTask; @@ -113,11 +110,6 @@ public class TaskExecuteThread implements Runnable, Delayed { private TaskPluginManager taskPluginManager; - /** - * process database access - */ - protected ProcessService processService; - /** * constructor * @@ -130,7 +122,6 @@ public class TaskExecuteThread implements Runnable, Delayed { this.taskExecutionContext = taskExecutionContext; this.taskCallbackService = taskCallbackService; this.alertClientService = alertClientService; - this.processService = SpringApplicationContext.getBean(ProcessService.class); } public TaskExecuteThread(TaskExecutionContext taskExecutionContext, @@ -141,7 +132,6 @@ public class TaskExecuteThread implements Runnable, Delayed { this.taskCallbackService = taskCallbackService; this.alertClientService = alertClientService; this.taskPluginManager = taskPluginManager; - this.processService = SpringApplicationContext.getBean(ProcessService.class); } @Override @@ -167,8 +157,7 @@ public class TaskExecuteThread implements Runnable, Delayed { } logger.info("the task begins to execute. task instance id: {}", taskExecutionContext.getTaskInstanceId()); - TaskInstance taskInstance = processService.findTaskInstanceById(taskExecutionContext.getTaskInstanceId()); - int dryRun = taskInstance.getDryRun(); + int dryRun = taskExecutionContext.getDryRun(); // copy hdfs/minio file to local if (dryRun == Constants.DRY_RUN_FLAG_NO) { downloadResource(taskExecutionContext.getExecutePath(), @@ -294,10 +283,7 @@ public class TaskExecuteThread implements Runnable, Delayed { if (task != null) { try { task.cancelApplication(true); - TaskInstance taskInstance = processService.findTaskInstanceById(taskExecutionContext.getTaskInstanceId()); - if (taskInstance != null) { - ProcessUtils.killYarnJob(taskExecutionContext); - } + ProcessUtils.killYarnJob(taskExecutionContext); } catch (Exception e) { logger.error(e.getMessage(), e); } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/entity/TaskExecutionContext.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/entity/TaskExecutionContext.java index 9f73d82906dd7089ab74dc5965ddfb61a42d9eb8..609566ab5266b984623e3ba7059970e693145034 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/entity/TaskExecutionContext.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/entity/TaskExecutionContext.java @@ -232,6 +232,11 @@ public class TaskExecutionContext implements Serializable { */ private String varPool; + /** + * dry run flag + */ + private int dryRun; + /** * business param */ @@ -552,6 +557,14 @@ public class TaskExecutionContext implements Serializable { this.sqoopTaskExecutionContext = sqoopTaskExecutionContext; } + public int getDryRun() { + return dryRun; + } + + public void setDryRun(int dryRun) { + this.dryRun = dryRun; + } + @Override public String toString() { return "TaskExecutionContext{" @@ -579,6 +592,7 @@ public class TaskExecutionContext implements Serializable { + ", projectCode=" + projectCode + ", taskParams='" + taskParams + '\'' + ", envFile='" + envFile + '\'' + + ", dryRun='" + dryRun + '\'' + ", definedParams=" + definedParams + ", taskAppId='" + taskAppId + '\'' + ", taskTimeoutStrategy=" + taskTimeoutStrategy