diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 8ca161d82e09a7e2c3331e70a1169480e8ae194b..6c7c1067949e94b7f42cf0796effdde018e47e29 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -766,4 +766,5 @@ public final class Constants { * dry run flag */ public static final int DRY_RUN_FLAG_NO = 0; + public static final int DRY_RUN_FLAG_YES = 1; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index 32520813b829b2482b1ec9bb3b892db2fdc52360..8dfaf34fbc5aec23b840451108425e4662fd7f98 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.worker.processor; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.Event; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskType; @@ -135,19 +136,21 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { taskExecutionContext.setHost(NetUtils.getAddr(workerConfig.getListenPort())); taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext)); - // local execute path - String execLocalPath = getExecLocalPath(taskExecutionContext); - logger.info("task instance local execute path : {}", execLocalPath); - taskExecutionContext.setExecutePath(execLocalPath); - - try { - FileUtils.createWorkDirIfAbsent(execLocalPath); - if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) { - OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode()); + if (Constants.DRY_RUN_FLAG_NO == taskExecutionContext.getDryRun()) { + // local execute path + String execLocalPath = getExecLocalPath(taskExecutionContext); + logger.info("task instance local execute path : {}", execLocalPath); + taskExecutionContext.setExecutePath(execLocalPath); + + try { + FileUtils.createWorkDirIfAbsent(execLocalPath); + if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) { + OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode()); + } + } catch (Throwable ex) { + logger.error("create execLocalPath: {}", execLocalPath, ex); + TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); } - } catch (Throwable ex) { - logger.error("create execLocalPath: {}", execLocalPath, ex); - TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); } taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(), diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index 084baf8a80a3e0c42b7234e126d7933a5021d73c..d8f108859110276759fef72cef8b821a0ca060bb 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -125,8 +125,16 @@ public class TaskExecuteThread implements Runnable, Delayed { @Override public void run() { - TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId(), taskExecutionContext.getProcessInstanceId()); + if (Constants.DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) { + responseCommand.setStatus(ExecutionStatus.SUCCESS.getCode()); + responseCommand.setEndTime(new Date()); + TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); + ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT); + taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command()); + return; + } + try { logger.info("script path : {}", taskExecutionContext.getExecutePath()); // check if the OS user exists @@ -146,13 +154,8 @@ public class TaskExecuteThread implements Runnable, Delayed { } logger.info("the task begins to execute. task instance id: {}", taskExecutionContext.getTaskInstanceId()); - int dryRun = taskExecutionContext.getDryRun(); // copy hdfs/minio file to local - if (dryRun == Constants.DRY_RUN_FLAG_NO) { - downloadResource(taskExecutionContext.getExecutePath(), - taskExecutionContext.getResources(), - logger); - } + downloadResource(taskExecutionContext.getExecutePath(), taskExecutionContext.getResources(), logger); taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath()); taskExecutionContext.setDefinedParams(getGlobalParamsMap()); @@ -177,31 +180,28 @@ public class TaskExecuteThread implements Runnable, Delayed { taskRequest.setTaskLogName(taskLogName); task = taskChannel.createTask(taskRequest); + // task init this.task.init(); + //init varPool this.task.getParameters().setVarPool(taskExecutionContext.getVarPool()); - if (dryRun == Constants.DRY_RUN_FLAG_NO) { - // task handle - this.task.handle(); + // task handle + this.task.handle(); - // task result process - if (this.task.getNeedAlert()) { - sendAlert(this.task.getTaskAlertInfo()); - } - responseCommand.setStatus(this.task.getExitStatus().getCode()); - } else { - responseCommand.setStatus(ExecutionStatus.SUCCESS.getCode()); - task.setExitStatusCode(Constants.EXIT_CODE_SUCCESS); + // task result process + if (this.task.getNeedAlert()) { + sendAlert(this.task.getTaskAlertInfo()); } + + responseCommand.setStatus(this.task.getExitStatus().getCode()); responseCommand.setEndTime(new Date()); responseCommand.setProcessId(this.task.getProcessId()); responseCommand.setAppIds(this.task.getAppIds()); responseCommand.setVarPool(JSONUtils.toJsonString(this.task.getParameters().getVarPool())); logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), this.task.getExitStatus()); } catch (Throwable e) { - logger.error("task scheduler failure", e); kill(); responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());