提交 85b8fa40 编写于 作者: 薛            之            谦's avatar 薛 之 谦

Merge remote-tracking branch 'origin/ws-3.0.0' into ws-3.0.0

......@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.plugin.task.api.async;
import lombok.Data;
import lombok.NonNull;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
......@@ -26,25 +27,43 @@ import java.util.concurrent.TimeUnit;
@Data
public class AsyncTaskExecutionContext implements Delayed {
private final int taskInstanceId;
private final TaskExecutionContext taskExecutionContext;
private final @NonNull AsyncTaskExecuteFunction asyncTaskExecuteFunction;
private final AsyncTaskExecuteFunction asyncTaskExecuteFunction;
private final @NonNull AsyncTaskCallbackFunction asyncTaskCallbackFunction;
private final AsyncTaskCallbackFunction asyncTaskCallbackFunction;
public AsyncTaskExecutionContext(int taskInstanceId,
private long currentStartTime;
private int executeTimes;
private final long executeInterval;
public AsyncTaskExecutionContext(@NonNull TaskExecutionContext taskExecutionContext,
@NonNull AsyncTaskExecuteFunction asyncTaskExecuteFunction,
@NonNull AsyncTaskCallbackFunction asyncTaskCallbackFunction) {
this.taskInstanceId = taskInstanceId;
this.taskExecutionContext = taskExecutionContext;
this.asyncTaskExecuteFunction = asyncTaskExecuteFunction;
this.asyncTaskCallbackFunction = asyncTaskCallbackFunction;
this.currentStartTime = System.currentTimeMillis();
this.executeTimes = 0;
this.executeInterval = Math.max(asyncTaskExecuteFunction.getTaskExecuteInterval().toMillis(), 1000L);
}
public void refreshStartTime() {
currentStartTime = System.currentTimeMillis();
}
@Override
public long getDelay(TimeUnit unit) {
return unit.toSeconds(asyncTaskExecuteFunction.getTaskExecuteInterval().toMillis());
// The first time doesn't have delay
if (executeTimes == 0) {
executeTimes++;
return 0;
}
return unit.convert(currentStartTime + executeInterval - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
if (o == null) {
......
......@@ -19,11 +19,15 @@ package org.apache.dolphinscheduler.server.worker.runner;
import lombok.NonNull;
import lombok.experimental.UtilityClass;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.async.AsyncTaskCallbackFunction;
import org.apache.dolphinscheduler.plugin.task.api.async.AsyncTaskExecuteFunction;
import org.apache.dolphinscheduler.plugin.task.api.async.AsyncTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.async.AsyncTaskExecutionStatus;
import javax.annotation.Nullable;
import java.time.Duration;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
@UtilityClass
public class AsyncTaskDelayQueue {
......@@ -31,11 +35,12 @@ public class AsyncTaskDelayQueue {
private final DelayQueue<AsyncTaskExecutionContext> asyncTaskCheckDelayQueue = new DelayQueue<>();
public void addAsyncTask(@NonNull AsyncTaskExecutionContext asyncTaskExecutionContext) {
asyncTaskExecutionContext.refreshStartTime();
asyncTaskCheckDelayQueue.add(asyncTaskExecutionContext);
}
public @Nullable AsyncTaskExecutionContext pollAsyncTask() throws InterruptedException {
return asyncTaskCheckDelayQueue.poll(1, TimeUnit.MINUTES);
return asyncTaskCheckDelayQueue.take();
}
}
......@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.async.AsyncTaskCallbackFunction;
import org.apache.dolphinscheduler.plugin.task.api.async.AsyncTaskExecuteFunction;
......@@ -61,12 +62,13 @@ public class AsyncTaskLooper extends BaseDaemonThread {
if (asyncTaskExecutionContext == null) {
continue;
}
if (TaskExecutionContextCacheManager.getByTaskInstanceId(asyncTaskExecutionContext.getTaskInstanceId()) == null) {
final TaskExecutionContext taskExecutionContext = asyncTaskExecutionContext.getTaskExecutionContext();
if (TaskExecutionContextCacheManager.getByTaskInstanceId(taskExecutionContext.getTaskInstanceId()) == null) {
logger.warn("Cannot find the taskInstance from TaskExecutionContextCacheManager, the task may already been killed");
continue;
}
asyncCheckThreadPool.submit(() -> {
Thread.currentThread().setName(taskExecutionContext.getTaskLogName());
final AsyncTaskExecuteFunction asyncTaskExecuteFunction = asyncTaskExecutionContext.getAsyncTaskExecuteFunction();
final AsyncTaskCallbackFunction asyncTaskCallbackFunction = asyncTaskExecutionContext.getAsyncTaskCallbackFunction();
try {
......
......@@ -55,7 +55,7 @@ public class AsyncWorkerDelayTaskExecuteRunnable extends WorkerDelayTaskExecuteR
task.handle();
// submit the task to async task queue
asyncTaskExecutionContext = new AsyncTaskExecutionContext(
taskExecutionContext.getTaskInstanceId(),
taskExecutionContext,
task.getAsyncTaskExecuteFunction(),
new AsyncTaskCallbackFunctionImpl(this)
);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册