diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/async/AsyncTaskExecutionContext.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/async/AsyncTaskExecutionContext.java index b3f05335e80318d3a0112cf6756e42573df5e20e..4a2ffa29be5cb96a88c47fe0e16b13f4d367749e 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/async/AsyncTaskExecutionContext.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/async/AsyncTaskExecutionContext.java @@ -33,20 +33,29 @@ public class AsyncTaskExecutionContext implements Delayed { private final AsyncTaskCallbackFunction asyncTaskCallbackFunction; + private long currentStartTime; + private final long executeInterval; + public AsyncTaskExecutionContext(@NonNull TaskExecutionContext taskExecutionContext, @NonNull AsyncTaskExecuteFunction asyncTaskExecuteFunction, @NonNull AsyncTaskCallbackFunction asyncTaskCallbackFunction) { this.taskExecutionContext = taskExecutionContext; this.asyncTaskExecuteFunction = asyncTaskExecuteFunction; this.asyncTaskCallbackFunction = asyncTaskCallbackFunction; + this.currentStartTime = System.currentTimeMillis(); + this.executeInterval = Math.max(asyncTaskExecuteFunction.getTaskExecuteInterval().toMillis(), 1000L); + } + + public void refreshStartTime() { + currentStartTime = System.currentTimeMillis(); } @Override public long getDelay(TimeUnit unit) { - long intervalSeconds = Math.max(asyncTaskExecuteFunction.getTaskExecuteInterval().getSeconds(), 1); - return unit.convert(intervalSeconds, TimeUnit.SECONDS); + return unit.convert(currentStartTime + executeInterval - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } + @Override public int compareTo(Delayed o) { if (o == null) { diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/AsyncTaskDelayQueue.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/AsyncTaskDelayQueue.java index 562057d31dfb5a9fd88227751155cdfd26734e6b..10b759c242ec73498295a6b1e6de76960f90c062 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/AsyncTaskDelayQueue.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/AsyncTaskDelayQueue.java @@ -19,11 +19,15 @@ package org.apache.dolphinscheduler.server.worker.runner; import lombok.NonNull; import lombok.experimental.UtilityClass; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.async.AsyncTaskCallbackFunction; +import org.apache.dolphinscheduler.plugin.task.api.async.AsyncTaskExecuteFunction; import org.apache.dolphinscheduler.plugin.task.api.async.AsyncTaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.async.AsyncTaskExecutionStatus; import javax.annotation.Nullable; +import java.time.Duration; import java.util.concurrent.DelayQueue; -import java.util.concurrent.TimeUnit; @UtilityClass public class AsyncTaskDelayQueue { @@ -31,11 +35,12 @@ public class AsyncTaskDelayQueue { private final DelayQueue asyncTaskCheckDelayQueue = new DelayQueue<>(); public void addAsyncTask(@NonNull AsyncTaskExecutionContext asyncTaskExecutionContext) { + asyncTaskExecutionContext.refreshStartTime(); asyncTaskCheckDelayQueue.add(asyncTaskExecutionContext); } public @Nullable AsyncTaskExecutionContext pollAsyncTask() throws InterruptedException { - return asyncTaskCheckDelayQueue.poll(1, TimeUnit.MINUTES); + return asyncTaskCheckDelayQueue.take(); } }