diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/async/AsyncTaskExecutionContext.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/async/AsyncTaskExecutionContext.java index 4841f7c128b9698bcac8a4b6e853fb6abc0affb8..b3f05335e80318d3a0112cf6756e42573df5e20e 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/async/AsyncTaskExecutionContext.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/async/AsyncTaskExecutionContext.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.plugin.task.api.async; import lombok.Data; import lombok.NonNull; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; @@ -26,16 +27,16 @@ import java.util.concurrent.TimeUnit; @Data public class AsyncTaskExecutionContext implements Delayed { - private final int taskInstanceId; + private final TaskExecutionContext taskExecutionContext; - private final @NonNull AsyncTaskExecuteFunction asyncTaskExecuteFunction; + private final AsyncTaskExecuteFunction asyncTaskExecuteFunction; - private final @NonNull AsyncTaskCallbackFunction asyncTaskCallbackFunction; + private final AsyncTaskCallbackFunction asyncTaskCallbackFunction; - public AsyncTaskExecutionContext(int taskInstanceId, + public AsyncTaskExecutionContext(@NonNull TaskExecutionContext taskExecutionContext, @NonNull AsyncTaskExecuteFunction asyncTaskExecuteFunction, @NonNull AsyncTaskCallbackFunction asyncTaskCallbackFunction) { - this.taskInstanceId = taskInstanceId; + this.taskExecutionContext = taskExecutionContext; this.asyncTaskExecuteFunction = asyncTaskExecuteFunction; this.asyncTaskCallbackFunction = asyncTaskCallbackFunction; } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/AsyncTaskLooper.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/AsyncTaskLooper.java index 1ed1cde2a52d90fca354bd889da61362ff448ceb..383205fdc9b6b85fe4a0daa73557e7b75f8814d8 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/AsyncTaskLooper.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/AsyncTaskLooper.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.worker.runner; import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; import org.apache.dolphinscheduler.common.thread.Stopper; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.plugin.task.api.async.AsyncTaskCallbackFunction; import org.apache.dolphinscheduler.plugin.task.api.async.AsyncTaskExecuteFunction; @@ -61,12 +62,13 @@ public class AsyncTaskLooper extends BaseDaemonThread { if (asyncTaskExecutionContext == null) { continue; } - - if (TaskExecutionContextCacheManager.getByTaskInstanceId(asyncTaskExecutionContext.getTaskInstanceId()) == null) { + final TaskExecutionContext taskExecutionContext = asyncTaskExecutionContext.getTaskExecutionContext(); + if (TaskExecutionContextCacheManager.getByTaskInstanceId(taskExecutionContext.getTaskInstanceId()) == null) { logger.warn("Cannot find the taskInstance from TaskExecutionContextCacheManager, the task may already been killed"); continue; } asyncCheckThreadPool.submit(() -> { + Thread.currentThread().setName(taskExecutionContext.getTaskLogName()); final AsyncTaskExecuteFunction asyncTaskExecuteFunction = asyncTaskExecutionContext.getAsyncTaskExecuteFunction(); final AsyncTaskCallbackFunction asyncTaskCallbackFunction = asyncTaskExecutionContext.getAsyncTaskCallbackFunction(); try { diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/AsyncWorkerDelayTaskExecuteRunnable.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/AsyncWorkerDelayTaskExecuteRunnable.java index fbc131b71dacd08a61e0e38957bb1ae5d4e415d3..d518c39879fdd05f122beab6dbb5eb2056879130 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/AsyncWorkerDelayTaskExecuteRunnable.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/AsyncWorkerDelayTaskExecuteRunnable.java @@ -55,7 +55,7 @@ public class AsyncWorkerDelayTaskExecuteRunnable extends WorkerDelayTaskExecuteR task.handle(); // submit the task to async task queue asyncTaskExecutionContext = new AsyncTaskExecutionContext( - taskExecutionContext.getTaskInstanceId(), + taskExecutionContext, task.getAsyncTaskExecuteFunction(), new AsyncTaskCallbackFunctionImpl(this) );