提交 e85b82f8 编写于 作者: T Till Rohrmann

Removed blocking call in Execution.deploySlot

上级 09241209
......@@ -33,6 +33,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import akka.dispatch.OnComplete;
import akka.pattern.Patterns;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
......@@ -46,6 +47,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFutureAction;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.messages.TaskManagerMessages;
import org.apache.flink.runtime.messages.TaskManagerMessages.TaskOperationResult;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
......@@ -276,7 +278,11 @@ public class Execution {
@Override
public TaskOperationResult call() throws Exception {
Instance instance = slot.getInstance();
return instance.submitTask(deployment);
// return instance.submitTask(deployment);
//TODO realize as an actor
return (TaskOperationResult)Patterns.ask(instance.getTaskManager(), new TaskManagerMessages
.SubmitTask(deployment), AkkaUtils.FUTURE_TIMEOUT());
}
}, AkkaUtils.globalExecutionContext());
......
......@@ -125,13 +125,13 @@ public class JobmanagerInfoServlet extends HttpServlet {
}
}
else if("taskmanagers".equals(req.getParameter("get"))) {
int numberOfTaskManagrs = AkkaUtils.<Integer>ask(jobmanager,
int numberOfTaskManagers = AkkaUtils.<Integer>ask(jobmanager,
RequestNumberRegisteredTaskManager$.MODULE$);
int numberOfRegisteredSltos = AkkaUtils.<Integer>ask(jobmanager,
int numberOfRegisteredSlots = AkkaUtils.<Integer>ask(jobmanager,
RequestTotalNumberOfSlots$.MODULE$);
resp.getWriter().write("{\"taskmanagers\": " + numberOfTaskManagrs +", " +
"\"slots\": "+numberOfRegisteredSltos+"}");
resp.getWriter().write("{\"taskmanagers\": " + numberOfTaskManagers +", " +
"\"slots\": "+numberOfRegisteredSlots+"}");
}
else if("cancel".equals(req.getParameter("get"))) {
String jobId = req.getParameter("job");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册