提交 c65f7a3e 编写于 作者: W Wenjun Ruan

Add async logger to worker logger file

上级 b11f517d
...@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.plugin.task.api.async; ...@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.plugin.task.api.async;
import lombok.Data; import lombok.Data;
import lombok.NonNull; import lombok.NonNull;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import java.util.concurrent.Delayed; import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
...@@ -26,16 +27,16 @@ import java.util.concurrent.TimeUnit; ...@@ -26,16 +27,16 @@ import java.util.concurrent.TimeUnit;
@Data @Data
public class AsyncTaskExecutionContext implements Delayed { public class AsyncTaskExecutionContext implements Delayed {
private final int taskInstanceId; private final TaskExecutionContext taskExecutionContext;
private final @NonNull AsyncTaskExecuteFunction asyncTaskExecuteFunction; private final AsyncTaskExecuteFunction asyncTaskExecuteFunction;
private final @NonNull AsyncTaskCallbackFunction asyncTaskCallbackFunction; private final AsyncTaskCallbackFunction asyncTaskCallbackFunction;
public AsyncTaskExecutionContext(int taskInstanceId, public AsyncTaskExecutionContext(@NonNull TaskExecutionContext taskExecutionContext,
@NonNull AsyncTaskExecuteFunction asyncTaskExecuteFunction, @NonNull AsyncTaskExecuteFunction asyncTaskExecuteFunction,
@NonNull AsyncTaskCallbackFunction asyncTaskCallbackFunction) { @NonNull AsyncTaskCallbackFunction asyncTaskCallbackFunction) {
this.taskInstanceId = taskInstanceId; this.taskExecutionContext = taskExecutionContext;
this.asyncTaskExecuteFunction = asyncTaskExecuteFunction; this.asyncTaskExecuteFunction = asyncTaskExecuteFunction;
this.asyncTaskCallbackFunction = asyncTaskCallbackFunction; this.asyncTaskCallbackFunction = asyncTaskCallbackFunction;
} }
......
...@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.worker.runner; ...@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.async.AsyncTaskCallbackFunction; import org.apache.dolphinscheduler.plugin.task.api.async.AsyncTaskCallbackFunction;
import org.apache.dolphinscheduler.plugin.task.api.async.AsyncTaskExecuteFunction; import org.apache.dolphinscheduler.plugin.task.api.async.AsyncTaskExecuteFunction;
...@@ -61,12 +62,13 @@ public class AsyncTaskLooper extends BaseDaemonThread { ...@@ -61,12 +62,13 @@ public class AsyncTaskLooper extends BaseDaemonThread {
if (asyncTaskExecutionContext == null) { if (asyncTaskExecutionContext == null) {
continue; continue;
} }
final TaskExecutionContext taskExecutionContext = asyncTaskExecutionContext.getTaskExecutionContext();
if (TaskExecutionContextCacheManager.getByTaskInstanceId(asyncTaskExecutionContext.getTaskInstanceId()) == null) { if (TaskExecutionContextCacheManager.getByTaskInstanceId(taskExecutionContext.getTaskInstanceId()) == null) {
logger.warn("Cannot find the taskInstance from TaskExecutionContextCacheManager, the task may already been killed"); logger.warn("Cannot find the taskInstance from TaskExecutionContextCacheManager, the task may already been killed");
continue; continue;
} }
asyncCheckThreadPool.submit(() -> { asyncCheckThreadPool.submit(() -> {
Thread.currentThread().setName(taskExecutionContext.getTaskLogName());
final AsyncTaskExecuteFunction asyncTaskExecuteFunction = asyncTaskExecutionContext.getAsyncTaskExecuteFunction(); final AsyncTaskExecuteFunction asyncTaskExecuteFunction = asyncTaskExecutionContext.getAsyncTaskExecuteFunction();
final AsyncTaskCallbackFunction asyncTaskCallbackFunction = asyncTaskExecutionContext.getAsyncTaskCallbackFunction(); final AsyncTaskCallbackFunction asyncTaskCallbackFunction = asyncTaskExecutionContext.getAsyncTaskCallbackFunction();
try { try {
......
...@@ -55,7 +55,7 @@ public class AsyncWorkerDelayTaskExecuteRunnable extends WorkerDelayTaskExecuteR ...@@ -55,7 +55,7 @@ public class AsyncWorkerDelayTaskExecuteRunnable extends WorkerDelayTaskExecuteR
task.handle(); task.handle();
// submit the task to async task queue // submit the task to async task queue
asyncTaskExecutionContext = new AsyncTaskExecutionContext( asyncTaskExecutionContext = new AsyncTaskExecutionContext(
taskExecutionContext.getTaskInstanceId(), taskExecutionContext,
task.getAsyncTaskExecuteFunction(), task.getAsyncTaskExecuteFunction(),
new AsyncTaskCallbackFunctionImpl(this) new AsyncTaskCallbackFunctionImpl(this)
); );
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册