From e85b82f8990a089e023fea4f0e44b7c2ae913f40 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Wed, 29 Oct 2014 18:23:50 +0100 Subject: [PATCH] Removed blocking call in Execution.deploySlot --- .../apache/flink/runtime/executiongraph/Execution.java | 8 +++++++- .../runtime/jobmanager/web/JobmanagerInfoServlet.java | 8 ++++---- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 139409b7d96..14e247fa731 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -33,6 +33,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import akka.dispatch.OnComplete; +import akka.pattern.Patterns; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; @@ -46,6 +47,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture; import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFutureAction; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.messages.TaskManagerMessages; import org.apache.flink.runtime.messages.TaskManagerMessages.TaskOperationResult; import org.apache.flink.util.ExceptionUtils; import org.slf4j.Logger; @@ -276,7 +278,11 @@ public class Execution { @Override public TaskOperationResult call() throws Exception { Instance instance = slot.getInstance(); - return instance.submitTask(deployment); +// return instance.submitTask(deployment); + + //TODO realize as an actor + return (TaskOperationResult)Patterns.ask(instance.getTaskManager(), new TaskManagerMessages + .SubmitTask(deployment), AkkaUtils.FUTURE_TIMEOUT()); } }, AkkaUtils.globalExecutionContext()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java index 9797808e3b3..f52da0d6148 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java @@ -125,13 +125,13 @@ public class JobmanagerInfoServlet extends HttpServlet { } } else if("taskmanagers".equals(req.getParameter("get"))) { - int numberOfTaskManagrs = AkkaUtils.ask(jobmanager, + int numberOfTaskManagers = AkkaUtils.ask(jobmanager, RequestNumberRegisteredTaskManager$.MODULE$); - int numberOfRegisteredSltos = AkkaUtils.ask(jobmanager, + int numberOfRegisteredSlots = AkkaUtils.ask(jobmanager, RequestTotalNumberOfSlots$.MODULE$); - resp.getWriter().write("{\"taskmanagers\": " + numberOfTaskManagrs +", " + - "\"slots\": "+numberOfRegisteredSltos+"}"); + resp.getWriter().write("{\"taskmanagers\": " + numberOfTaskManagers +", " + + "\"slots\": "+numberOfRegisteredSlots+"}"); } else if("cancel".equals(req.getParameter("get"))) { String jobId = req.getParameter("job"); -- GitLab