未验证 提交 c01dba9e 编写于 作者: W Wenjun Ruan 提交者: GitHub

Merge pull request #48 from ruanwenjun/dev_wenjun_fixAsyncDelay

Add async logger to worker logger file
......@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.plugin.task.api.async;
import lombok.Data;
import lombok.NonNull;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
......@@ -26,16 +27,16 @@ import java.util.concurrent.TimeUnit;
@Data
public class AsyncTaskExecutionContext implements Delayed {
private final int taskInstanceId;
private final TaskExecutionContext taskExecutionContext;
private final @NonNull AsyncTaskExecuteFunction asyncTaskExecuteFunction;
private final AsyncTaskExecuteFunction asyncTaskExecuteFunction;
private final @NonNull AsyncTaskCallbackFunction asyncTaskCallbackFunction;
private final AsyncTaskCallbackFunction asyncTaskCallbackFunction;
public AsyncTaskExecutionContext(int taskInstanceId,
public AsyncTaskExecutionContext(@NonNull TaskExecutionContext taskExecutionContext,
@NonNull AsyncTaskExecuteFunction asyncTaskExecuteFunction,
@NonNull AsyncTaskCallbackFunction asyncTaskCallbackFunction) {
this.taskInstanceId = taskInstanceId;
this.taskExecutionContext = taskExecutionContext;
this.asyncTaskExecuteFunction = asyncTaskExecuteFunction;
this.asyncTaskCallbackFunction = asyncTaskCallbackFunction;
}
......
......@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.async.AsyncTaskCallbackFunction;
import org.apache.dolphinscheduler.plugin.task.api.async.AsyncTaskExecuteFunction;
......@@ -61,12 +62,13 @@ public class AsyncTaskLooper extends BaseDaemonThread {
if (asyncTaskExecutionContext == null) {
continue;
}
if (TaskExecutionContextCacheManager.getByTaskInstanceId(asyncTaskExecutionContext.getTaskInstanceId()) == null) {
final TaskExecutionContext taskExecutionContext = asyncTaskExecutionContext.getTaskExecutionContext();
if (TaskExecutionContextCacheManager.getByTaskInstanceId(taskExecutionContext.getTaskInstanceId()) == null) {
logger.warn("Cannot find the taskInstance from TaskExecutionContextCacheManager, the task may already been killed");
continue;
}
asyncCheckThreadPool.submit(() -> {
Thread.currentThread().setName(taskExecutionContext.getTaskLogName());
final AsyncTaskExecuteFunction asyncTaskExecuteFunction = asyncTaskExecutionContext.getAsyncTaskExecuteFunction();
final AsyncTaskCallbackFunction asyncTaskCallbackFunction = asyncTaskExecutionContext.getAsyncTaskCallbackFunction();
try {
......
......@@ -55,7 +55,7 @@ public class AsyncWorkerDelayTaskExecuteRunnable extends WorkerDelayTaskExecuteR
task.handle();
// submit the task to async task queue
asyncTaskExecutionContext = new AsyncTaskExecutionContext(
taskExecutionContext.getTaskInstanceId(),
taskExecutionContext,
task.getAsyncTaskExecuteFunction(),
new AsyncTaskCallbackFunctionImpl(this)
);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册