Commit b8d0a0aa authored by Till Rohrmann

Reworked local cluster start. TaskManager watches JobManager and tries reregistration in case of disconnect. Introduced akka.ask.timeout config parameter to configure akka timeouts.
Parent 26c77948
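The heart of the rework is that the TaskManager now death-watches the JobManager actor and re-runs its registration when it receives a Terminated message (see the TaskManager.scala hunks at the end of this diff). A minimal sketch of that watch-and-reregister pattern in Akka classic actors; the actor and message names are illustrative, not Flink's:

    import akka.actor.{Actor, Terminated}

    // Hypothetical worker that registers at a master and re-registers on disconnect.
    class Worker(masterPath: String) extends Actor {
      override def preStart(): Unit = tryRegister()

      // Resolve the master by path each time, since a dead ActorRef stays dead.
      def tryRegister(): Unit = context.actorSelection(masterPath) ! "register"

      def receive: Receive = {
        case "ack" =>
          // Death-watch the master so we get a Terminated message on disconnect.
          context.watch(sender())
        case Terminated(master) =>
          // Master became unreachable: try the whole registration again.
          tryRegister()
      }
    }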
@@ -33,6 +33,7 @@ import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
@@ -63,6 +64,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.RequestRunningJobs$;
 import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobs;
 import org.apache.flink.util.StringUtils;
+import scala.concurrent.duration.FiniteDuration;
 /**
  * Implementation of a simple command line fronted for executing programs.
@@ -511,7 +513,7 @@ public class CliFrontend {
        }
        Iterable<ExecutionGraph> jobs = AkkaUtils.<RunningJobs>ask(jobManager,
-           RequestRunningJobs$.MODULE$).asJavaIterable();
+           RequestRunningJobs$.MODULE$, getAkkaTimeout()).asJavaIterable();
        ArrayList<ExecutionGraph> runningJobs = null;
        ArrayList<ExecutionGraph> scheduledJobs = null;
@@ -632,7 +634,7 @@ public class CliFrontend {
          return 1;
        }
-       AkkaUtils.ask(jobManager, new CancelJob(jobId));
+       AkkaUtils.ask(jobManager, new CancelJob(jobId), getAkkaTimeout());
        return 0;
      }
      catch (Throwable t) {
@@ -756,7 +758,8 @@ public class CliFrontend {
      }
      return JobManager.getJobManager(jobManagerAddress,
-         ActorSystem.create("CliFrontendActorSystem", AkkaUtils.getDefaultActorSystemConfig()));
+         ActorSystem.create("CliFrontendActorSystem", AkkaUtils
+             .getDefaultActorSystemConfig()),getAkkaTimeout());
    }
@@ -815,6 +818,13 @@ public class CliFrontend {
      }
      return GlobalConfiguration.getConfiguration();
    }
+
+   protected FiniteDuration getAkkaTimeout(){
+     Configuration config = getGlobalConfiguration();
+     return new FiniteDuration(config.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT,
+         ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS);
+   }
    public static List<Tuple2<String, String>> getDynamicProperties(String dynamicPropertiesEncoded) {
      List<Tuple2<String, String>> ret = new ArrayList<Tuple2<String, String>>();
...
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.io.PrintStream;
 import java.net.InetSocketAddress;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
@@ -53,6 +54,7 @@ import com.google.common.base.Preconditions;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironmentFactory;
+import scala.concurrent.duration.FiniteDuration;
 /**
  * Encapsulates the functionality necessary to submit a program to a remote cluster.
@@ -301,12 +303,15 @@ public class Client {
    String hostname = configuration.getString(ConfigConstants
        .JOB_MANAGER_IPC_ADDRESS_KEY, null);
+
+   FiniteDuration timeout = new FiniteDuration(configuration.getInteger(ConfigConstants
+       .AKKA_ASK_TIMEOUT, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS);
    if(hostname == null){
      throw new ProgramInvocationException("Could not find hostname of job manager.");
    }
    try {
-     JobClient.uploadJarFiles(jobGraph, hostname, client);
+     JobClient.uploadJarFiles(jobGraph, hostname, client, timeout);
    }catch(IOException e){
      throw new ProgramInvocationException("Could not upload blobs.", e);
    }
@@ -317,7 +322,7 @@ public class Client {
      return JobClient.submitJobAndWait(jobGraph, printStatusDuringExecution, client);
    }
    else {
-     SubmissionResponse response =JobClient.submitJobDetached(jobGraph, client);
+     SubmissionResponse response =JobClient.submitJobDetached(jobGraph, client, timeout);
      if(response instanceof SubmissionFailure){
        SubmissionFailure failure = (SubmissionFailure) response;
...
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.InetSocketAddress;
 import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
@@ -38,6 +39,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.messages.JobManagerMessages.RequestRunningJobs$;
 import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobs;
+import scala.concurrent.duration.FiniteDuration;
 public class JobsInfoServlet extends HttpServlet {
@@ -51,11 +53,15 @@ public class JobsInfoServlet extends HttpServlet {
   private final Configuration config;
   private final ActorSystem system;
+  private final FiniteDuration timeout;
   public JobsInfoServlet(Configuration flinkConfig) {
     this.config = flinkConfig;
     system = ActorSystem.create("JobsInfoServletActorSystem",
         AkkaUtils.getDefaultActorSystemConfig());
+    this.timeout = new FiniteDuration(flinkConfig.getInteger(ConfigConstants
+        .AKKA_ASK_TIMEOUT, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS);
   }
   @Override
@@ -67,10 +73,11 @@ public class JobsInfoServlet extends HttpServlet {
     int jmPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
         ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
-    ActorRef jm = JobManager.getJobManager(new InetSocketAddress(jmHost, jmPort), system);
+    ActorRef jm = JobManager.getJobManager(new InetSocketAddress(jmHost, jmPort), system,
+        timeout);
     Iterator<ExecutionGraph> graphs = AkkaUtils.<RunningJobs>ask(jm,
-        RequestRunningJobs$.MODULE$).asJavaIterable().iterator();
+        RequestRunningJobs$.MODULE$, timeout).asJavaIterable().iterator();
     resp.setStatus(HttpServletResponse.SC_OK);
...
@@ -95,7 +95,7 @@ public class WebInterfaceServer {
       throw new FileNotFoundException("Cannot start web interface server because the web " +
           "root dir " + WEB_ROOT_DIR + " is not included in the jar.");
     }
     String tmpDirPath = config.getString(ConfigConstants.WEB_TMP_DIR_KEY,
         ConfigConstants.DEFAULT_WEB_TMP_DIR);
@@ -155,7 +155,8 @@ public class WebInterfaceServer {
     ServletContextHandler servletContext = new ServletContextHandler(ServletContextHandler.SESSIONS);
     servletContext.setContextPath("/");
     servletContext.addServlet(new ServletHolder(new PactJobJSONServlet(uploadDir)), "/pactPlan");
-    servletContext.addServlet(new ServletHolder(new JobsInfoServlet(nepheleConfig)), "/jobsInfo");
+    servletContext.addServlet(new ServletHolder(new JobsInfoServlet(nepheleConfig)),
+        "/jobsInfo");
     servletContext.addServlet(new ServletHolder(new PlanDisplayServlet(jobManagerWebPort)), "/showPlan");
     servletContext.addServlet(new ServletHolder(new JobsServlet(uploadDir, tmpDir, "launch.html")), "/jobs");
     servletContext.addServlet(new ServletHolder(new JobSubmissionServlet(nepheleConfig, uploadDir, planDumpDir)),
...
@@ -43,8 +43,10 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 import org.powermock.reflect.Whitebox;
 import scala.Tuple2;
+import scala.concurrent.duration.FiniteDuration;
 import java.io.IOException;
+import java.util.concurrent.TimeUnit;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
@@ -105,7 +107,7 @@ public class ClientTest {
   @Test
   public void shouldSubmitToJobClient() throws ProgramInvocationException, IOException {
     when(mockJobClient.submitJobDetached(any(JobGraph.class),
-        any(ActorRef.class))).thenReturn(mockSubmissionSuccess);
+        any(ActorRef.class), any(FiniteDuration.class))).thenReturn(mockSubmissionSuccess);
     Client out = new Client(configMock, getClass().getClassLoader());
     out.run(program.getPlanWithJars(), -1, false);
@@ -118,7 +120,7 @@ public class ClientTest {
   @Test(expected = ProgramInvocationException.class)
   public void shouldThrowException() throws Exception {
     when(mockJobClient.submitJobDetached(any(JobGraph.class),
-        any(ActorRef.class))).thenReturn(mockSubmissionFailure);
+        any(ActorRef.class), any(FiniteDuration.class))).thenReturn(mockSubmissionFailure);
     Client out = new Client(configMock, getClass().getClassLoader());
     out.run(program.getPlanWithJars(), -1, false);
...
@@ -346,6 +346,11 @@ public final class ConfigConstants {
    * Log level for akka
    */
   public static final String AKKA_LOG_LEVEL = "akka.loglevel";
+
+  /**
+   * Timeout for all blocking calls
+   */
+  public static final String AKKA_ASK_TIMEOUT = "akka.ask.timeout";
   // ----------------------------- Miscellaneous ----------------------------
@@ -594,6 +599,8 @@ public final class ConfigConstants {
   public static String DEFAULT_AKKA_FRAMESIZE = "10485760b";
   public static String DEFAULT_AKKA_LOG_LEVEL = "OFF";
+
+  public static int DEFAULT_AKKA_ASK_TIMEOUT = 100;
   // ----------------------------- LocalExecution ----------------------------
...
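Every call site in this commit derives a FiniteDuration from these two constants in the same way, with the configured value interpreted as a number of seconds. A sketch of that conversion, with the constant lookup inlined for illustration:

    import java.util.concurrent.TimeUnit
    import scala.concurrent.duration.FiniteDuration

    // Stands in for config.getInteger(AKKA_ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT);
    // 100 is the default introduced above, interpreted as seconds.
    val configuredSeconds: Int = 100
    val timeout = new FiniteDuration(configuredSeconds, TimeUnit.SECONDS)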
@@ -80,7 +80,7 @@ case $STARTSTOP in
     rotateLogFile $out
     echo Starting job manager
-    $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "$FLINK_JM_CLASSPATH" org.apache.flink.runtime.jobmanager.JobManager --configDir "$FLINK_CONF_DIR" > "$out" 2>&1 < /dev/null &
+    $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "$FLINK_JM_CLASSPATH" org.apache.flink.runtime.jobmanager.JobManager --executionMode $EXECUTIONMODE --configDir "$FLINK_CONF_DIR" > "$out" 2>&1 < /dev/null &
     echo $! > $pid
   ;;
...
@@ -57,6 +57,6 @@ if not defined FOUND (
 echo Starting Flink job manager. Webinterface by default on http://localhost:8081/.
 echo Don't close this batch window. Stop job manager by pressing Ctrl+C.
-java %JVM_ARGS% %log_setting% -cp %FLINK_JM_CLASSPATH% org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster --configDir %FLINK_CONF_DIR% > "%out%" 2>&1
+java %JVM_ARGS% %log_setting% -cp %FLINK_JM_CLASSPATH% org.apache.flink.runtime.jobmanager.JobManager --executionMode local --configDir %FLINK_CONF_DIR% > "%out%" 2>&1
 endlocal
@@ -27,10 +27,13 @@ import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
 import static org.apache.flink.runtime.execution.ExecutionState.RUNNING;
 import static org.apache.flink.runtime.execution.ExecutionState.SCHEDULED;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import akka.dispatch.OnComplete;
 import akka.pattern.Patterns;
+import akka.util.Timeout;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -51,6 +54,7 @@ import org.slf4j.Logger;
 import com.google.common.base.Preconditions;
 import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 /**
  * A single execution of a vertex. While an {@link ExecutionVertex} can be executed multiple times (for recovery,
@@ -78,6 +82,9 @@ public class Execution {
   private static final Logger LOG = ExecutionGraph.LOG;
   private static final int NUM_CANCEL_CALL_TRIES = 3;
+
+  public static FiniteDuration timeout = new FiniteDuration(ConfigConstants
+      .DEFAULT_AKKA_ASK_TIMEOUT, TimeUnit.SECONDS);
   // --------------------------------------------------------------------------------------------
@@ -273,7 +280,7 @@ public class Execution {
     Instance instance = slot.getInstance();
     Future<Object> deployAction = Patterns.ask(instance.getTaskManager(),
-        new TaskManagerMessages.SubmitTask(deployment),AkkaUtils.FUTURE_TIMEOUT());
+        new TaskManagerMessages.SubmitTask(deployment), new Timeout(timeout));
     deployAction.onComplete(new OnComplete<Object>(){
@@ -583,7 +590,7 @@ public class Execution {
     Future<Object> cancelResult = AkkaUtils.retry(slot.getInstance().getTaskManager(), new
         TaskManagerMessages.CancelTask(attemptId), NUM_CANCEL_CALL_TRIES,
-        AkkaUtils.globalExecutionContext());
+        AkkaUtils.globalExecutionContext(), timeout);
     cancelResult.onComplete(new OnComplete<Object>(){
...
...@@ -103,7 +103,6 @@ public class ExecutionGraph { ...@@ -103,7 +103,6 @@ public class ExecutionGraph {
private final long[] stateTimestamps; private final long[] stateTimestamps;
private final Object progressLock = new Object(); private final Object progressLock = new Object();
private int nextVertexToFinish; private int nextVertexToFinish;
......
@@ -44,6 +44,7 @@ import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.util.ExceptionUtils;
+import scala.concurrent.duration.FiniteDuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -78,14 +79,19 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
   private final DiscardBufferPool discardBufferPool;
+
+  private final FiniteDuration timeout;
   // -----------------------------------------------------------------------------------------------------------------
   public ChannelManager(ActorRef channelLookup, InstanceConnectionInfo connectionInfo, int numNetworkBuffers,
-      int networkBufferSize, NetworkConnectionManager networkConnectionManager) throws IOException {
+      int networkBufferSize, NetworkConnectionManager networkConnectionManager,
+      FiniteDuration timeout) throws IOException {
     this.channelLookup= channelLookup;
     this.connectionInfo = connectionInfo;
+    this.timeout = timeout;
     try {
       this.globalBufferPool = new GlobalBufferPool(numNetworkBuffers, networkBufferSize);
     } catch (Throwable e) {
@@ -378,7 +384,7 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
     try{
       lookupResponse = AkkaUtils.<JobManagerMessages.ConnectionInformation>ask(channelLookup,
          new JobManagerMessages.LookupConnectionInformation(connectionInfo, jobID,
-             sourceChannelID)).response();
+             sourceChannelID), timeout).response();
     }catch(IOException ioe) {
       throw ioe;
     }
...
@@ -64,6 +64,7 @@ import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.StringUtils;
 import org.eclipse.jetty.io.EofException;
+import scala.concurrent.duration.FiniteDuration;
 public class JobmanagerInfoServlet extends HttpServlet {
@@ -75,11 +76,13 @@ public class JobmanagerInfoServlet extends HttpServlet {
   /** Underlying JobManager */
   private final ActorRef jobmanager;
   private final ActorRef archive;
+  private final FiniteDuration timeout;
-  public JobmanagerInfoServlet(ActorRef jobmanager, ActorRef archive) {
+  public JobmanagerInfoServlet(ActorRef jobmanager, ActorRef archive, FiniteDuration timeout) {
     this.jobmanager = jobmanager;
     this.archive = archive;
+    this.timeout = timeout;
   }
@@ -92,14 +95,15 @@ public class JobmanagerInfoServlet extends HttpServlet {
     try {
       if("archive".equals(req.getParameter("get"))) {
         List<ExecutionGraph> archivedJobs = new ArrayList<ExecutionGraph>(AkkaUtils
-            .<ArchivedJobs>ask(archive,RequestArchivedJobs$.MODULE$).asJavaCollection());
+            .<ArchivedJobs>ask(archive,RequestArchivedJobs$.MODULE$, timeout)
+            .asJavaCollection());
         writeJsonForArchive(resp.getWriter(), archivedJobs);
       }
       else if("job".equals(req.getParameter("get"))) {
         String jobId = req.getParameter("job");
         JobResponse response = AkkaUtils.ask(archive,
-            new RequestJob(JobID.fromHexString(jobId)));
+            new RequestJob(JobID.fromHexString(jobId)), timeout);
         if(response instanceof JobFound){
           ExecutionGraph archivedJob = ((JobFound)response).executionGraph();
@@ -113,7 +117,7 @@ public class JobmanagerInfoServlet extends HttpServlet {
         String groupvertexId = req.getParameter("groupvertex");
         JobResponse response = AkkaUtils.ask(archive,
-            new RequestJob(JobID.fromHexString(jobId)));
+            new RequestJob(JobID.fromHexString(jobId)), timeout);
         if(response instanceof JobFound && groupvertexId != null){
           ExecutionGraph archivedJob = ((JobFound)response).executionGraph();
@@ -126,9 +130,9 @@ public class JobmanagerInfoServlet extends HttpServlet {
       }
       else if("taskmanagers".equals(req.getParameter("get"))) {
         int numberOfTaskManagers = AkkaUtils.<Integer>ask(jobmanager,
-            RequestNumberRegisteredTaskManager$.MODULE$);
+            RequestNumberRegisteredTaskManager$.MODULE$, timeout);
         int numberOfRegisteredSlots = AkkaUtils.<Integer>ask(jobmanager,
-            RequestTotalNumberOfSlots$.MODULE$);
+            RequestTotalNumberOfSlots$.MODULE$, timeout);
         resp.getWriter().write("{\"taskmanagers\": " + numberOfTaskManagers +", " +
             "\"slots\": "+numberOfRegisteredSlots+"}");
@@ -136,7 +140,7 @@ public class JobmanagerInfoServlet extends HttpServlet {
       else if("cancel".equals(req.getParameter("get"))) {
         String jobId = req.getParameter("job");
         AkkaUtils.<CancellationResponse>ask(jobmanager,
-            new CancelJob(JobID.fromHexString(jobId)));
+            new CancelJob(JobID.fromHexString(jobId)), timeout);
       }
       else if("updates".equals(req.getParameter("get"))) {
         String jobId = req.getParameter("job");
@@ -146,7 +150,7 @@ public class JobmanagerInfoServlet extends HttpServlet {
       }
       else{
         Iterable<ExecutionGraph> runningJobs = AkkaUtils.<RunningJobs>ask
-            (jobmanager, RequestRunningJobs$.MODULE$).asJavaIterable();
+            (jobmanager, RequestRunningJobs$.MODULE$, timeout).asJavaIterable();
         writeJsonForJobs(resp.getWriter(), runningJobs);
       }
@@ -324,7 +328,7 @@ public class JobmanagerInfoServlet extends HttpServlet {
     // write accumulators
     AccumulatorResultsResponse response = AkkaUtils.ask(jobmanager,
-        new RequestAccumulatorResults(graph.getJobID()));
+        new RequestAccumulatorResults(graph.getJobID()), timeout);
     if(response instanceof AccumulatorResultsFound){
       Map<String, Object> accMap = ((AccumulatorResultsFound)response).asJavaMap();
@@ -417,7 +421,7 @@ public class JobmanagerInfoServlet extends HttpServlet {
     try {
       Iterable<ExecutionGraph> graphs = AkkaUtils.<RunningJobs>ask(jobmanager,
-          RequestRunningJobs$.MODULE$).asJavaIterable();
+          RequestRunningJobs$.MODULE$, timeout).asJavaIterable();
       //Serialize job to json
       wrt.write("{");
@@ -439,7 +443,7 @@ public class JobmanagerInfoServlet extends HttpServlet {
       wrt.write("],");
-      JobResponse response = AkkaUtils.ask(jobmanager, new RequestJob(jobId));
+      JobResponse response = AkkaUtils.ask(jobmanager, new RequestJob(jobId), timeout);
       if(response instanceof JobFound){
         ExecutionGraph graph = ((JobFound)response).executionGraph();
...
@@ -46,6 +46,7 @@ import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
 /**
  * A Servlet that displays the Configuration in the web interface.
@@ -59,13 +60,15 @@ public class SetupInfoServlet {
   private static final Logger LOG = LoggerFactory.getLogger(SetupInfoServlet.class);
-  private Configuration globalC;
-  private ActorRef jobmanager;
+  final private Configuration globalC;
+  final private ActorRef jobmanager;
+  final private FiniteDuration timeout;
-  public SetupInfoServlet(ActorRef jm) {
+  public SetupInfoServlet(ActorRef jm, FiniteDuration timeout) {
     globalC = GlobalConfiguration.getConfiguration();
     this.jobmanager = jm;
+    this.timeout = timeout;
   }
   @Override
@@ -104,7 +107,7 @@ public class SetupInfoServlet {
   private void writeTaskmanagers(HttpServletResponse resp) throws IOException {
     List<Instance> instances = new ArrayList<Instance>(AkkaUtils.<RegisteredTaskManagers>ask
-        (jobmanager, RequestRegisteredTaskManagers$.MODULE$).asJavaCollection());
+        (jobmanager, RequestRegisteredTaskManagers$.MODULE$, timeout).asJavaCollection());
     Collections.sort(instances, INSTANCE_SORTER);
...
@@ -40,6 +40,7 @@ import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.handler.HandlerList;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
+import scala.concurrent.duration.FiniteDuration;
 /**
@@ -63,6 +64,11 @@ public class WebInfoServer {
    */
   private final Server server;
+  /**
+   * Timeout for akka requests
+   */
+  private final FiniteDuration timeout;
+
   /**
    * Port for info server
    */
@@ -78,10 +84,12 @@ public class WebInfoServer {
    * Thrown, if the server setup failed for an I/O related reason.
    */
   public WebInfoServer(Configuration config, ActorRef jobmanager,
-      ActorRef archive) throws IOException {
+      ActorRef archive, FiniteDuration timeout) throws IOException {
     this.port = config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
         ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
+    this.timeout = timeout;
     // if no explicit configuration is given, use the global configuration
     if (config == null) {
       config = GlobalConfiguration.getConfiguration();
@@ -122,9 +130,10 @@ public class WebInfoServer {
     ServletContextHandler servletContext = new ServletContextHandler(ServletContextHandler.SESSIONS);
     servletContext.setContextPath("/");
     servletContext.addServlet(new ServletHolder(new JobmanagerInfoServlet(jobmanager,
-        archive)), "/jobsInfo");
+        archive, timeout)), "/jobsInfo");
     servletContext.addServlet(new ServletHolder(new LogfileInfoServlet(logDirFiles)), "/logInfo");
-    servletContext.addServlet(new ServletHolder(new SetupInfoServlet(jobmanager)), "/setupInfo");
+    servletContext.addServlet(new ServletHolder(new SetupInfoServlet(jobmanager, timeout)),
+        "/setupInfo");
     servletContext.addServlet(new ServletHolder(new MenuServlet()), "/menu");
...
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
+import scala.concurrent.duration.FiniteDuration;
 public class TaskInputSplitProvider implements InputSplitProvider {
@@ -34,18 +35,22 @@ public class TaskInputSplitProvider implements InputSplitProvider {
   private final JobID jobId;
   private final JobVertexID vertexId;
+  private final FiniteDuration timeout;
-  public TaskInputSplitProvider(ActorRef jobManager, JobID jobId, JobVertexID vertexId) {
+  public TaskInputSplitProvider(ActorRef jobManager, JobID jobId, JobVertexID vertexId,
+      FiniteDuration timeout) {
     this.jobManager = jobManager;
     this.jobId = jobId;
     this.vertexId = vertexId;
+    this.timeout = timeout;
   }
   @Override
   public InputSplit getNextInputSplit() {
     try {
       TaskManagerMessages.NextInputSplit nextInputSplit = AkkaUtils.ask(jobManager,
-          new JobManagerMessages.RequestNextInputSplit(jobId, vertexId));
+          new JobManagerMessages.RequestNextInputSplit(jobId, vertexId), timeout);
       return nextInputSplit.inputSplit();
     }
...
@@ -30,9 +30,7 @@ import scala.concurrent.{ExecutionContext, Future, Await}
 import scala.concurrent.duration._
 object AkkaUtils {
-  implicit val FUTURE_TIMEOUT: Timeout = 100 minute
-  implicit val AWAIT_DURATION: FiniteDuration = 1 minute
-  implicit val FUTURE_DURATION: FiniteDuration = 1 minute
+  val DEFAULT_TIMEOUT: FiniteDuration = 1 minute
   val INF_TIMEOUT = 21474835 seconds
@@ -122,34 +120,27 @@ object AkkaUtils {
     ConfigFactory.parseString(getDefaultActorSystemConfigString)
   }
-  def getChild(parent: ActorRef, child: String)(implicit system: ActorSystem): ActorRef = {
-    Await.result(system.actorSelection(parent.path / child).resolveOne(), AWAIT_DURATION)
+  def getChild(parent: ActorRef, child: String)(implicit system: ActorSystem, timeout:
+  FiniteDuration): ActorRef = {
+    Await.result(system.actorSelection(parent.path / child).resolveOne()(timeout), timeout)
   }
-  def getReference(path: String)(implicit system: ActorSystem): ActorRef = {
-    Await.result(system.actorSelection(path).resolveOne(), AWAIT_DURATION)
+  def getReference(path: String)(implicit system: ActorSystem, timeout: FiniteDuration): ActorRef
+  = {
+    Await.result(system.actorSelection(path).resolveOne()(timeout), timeout)
   }
   @throws(classOf[IOException])
-  def ask[T](actorSelection: ActorSelection, msg: Any): T = {
-    ask(actorSelection, msg, FUTURE_TIMEOUT, FUTURE_DURATION)
-  }
-
-  @throws(classOf[IOException])
-  def ask[T](actor: ActorRef, msg: Any): T = {
-    ask(actor, msg, FUTURE_TIMEOUT, FUTURE_DURATION)
-  }
-
-  @throws(classOf[IOException])
-  def ask[T](actorSelection: ActorSelection, msg: Any, timeout: Timeout, duration: Duration): T = {
+  def ask[T](actorSelection: ActorSelection, msg: Any)(implicit timeout: FiniteDuration): T
+  = {
     val future = Patterns.ask(actorSelection, msg, timeout)
-    Await.result(future, duration).asInstanceOf[T]
+    Await.result(future, timeout).asInstanceOf[T]
   }
   @throws(classOf[IOException])
-  def ask[T](actor: ActorRef, msg: Any, timeout: Timeout, duration: Duration): T = {
+  def ask[T](actor: ActorRef, msg: Any)(implicit timeout: FiniteDuration): T = {
     val future = Patterns.ask(actor, msg, timeout)
-    Await.result(future, duration).asInstanceOf[T]
+    Await.result(future, timeout).asInstanceOf[T]
   }
   def askInf[T](actor: ActorRef, msg: Any): T = {
@@ -174,8 +165,8 @@ object AkkaUtils {
   }
   def retry(target: ActorRef, message: Any, tries: Int)(implicit executionContext:
-  ExecutionContext): Future[Any] = {
-    (target ? message) recoverWith{
+  ExecutionContext, timeout: FiniteDuration): Future[Any] = {
+    (target ? message)(timeout) recoverWith{
       case t: Throwable =>
         if(tries > 0){
          retry(target, message, tries-1)
...
...
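The net effect of the AkkaUtils change is that every ask-style helper now takes its timeout as an implicit FiniteDuration supplied by the caller, instead of the removed global constants. A rough, simplified sketch of the new shape (the real helpers also declare @throws(classOf[IOException])); callers such as JobManager and FlinkMiniCluster in later hunks declare `implicit val timeout` once and every call in scope picks it up:

    import akka.actor.ActorRef
    import akka.pattern.Patterns
    import scala.concurrent.Await
    import scala.concurrent.duration.FiniteDuration

    // Ask `actor` for a reply of type T, waiting at most `timeout` for it.
    def ask[T](actor: ActorRef, msg: Any)(implicit timeout: FiniteDuration): T = {
      val future = Patterns.ask(actor, msg, timeout.toMillis)
      Await.result(future, timeout).asInstanceOf[T]
    }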
@@ -35,13 +35,14 @@ import org.apache.flink.runtime.messages.JobClientMessages.{SubmitJobDetached, S
 import org.apache.flink.runtime.messages.JobManagerMessages._
 import scala.concurrent.Await
-import scala.concurrent.duration.Duration
+import scala.concurrent.duration.{FiniteDuration, Duration}
-class JobClient(jobManagerURL: String) extends Actor with ActorLogMessages with ActorLogging{
+class JobClient(jobManagerURL: String, timeout: FiniteDuration) extends Actor with ActorLogMessages
+with ActorLogging{
   import context._
-  val jobManager = AkkaUtils.getReference(jobManagerURL)
+  val jobManager = AkkaUtils.getReference(jobManagerURL)(system, timeout)
   override def receiveWithLogMessages: Receive = {
     case SubmitJobDetached(jobGraph) =>
@@ -120,15 +121,16 @@ object JobClient{
   }
-  def submitJobDetached(jobGraph: JobGraph, jobClient: ActorRef): SubmissionResponse = {
-    import AkkaUtils.FUTURE_TIMEOUT
-    val response = jobClient ? SubmitJobDetached(jobGraph)
-    Await.result(response.mapTo[SubmissionResponse],AkkaUtils.FUTURE_DURATION)
+  def submitJobDetached(jobGraph: JobGraph, jobClient: ActorRef)(implicit timeout: FiniteDuration):
+  SubmissionResponse = {
+    val response = (jobClient ? SubmitJobDetached(jobGraph))(timeout)
+    Await.result(response.mapTo[SubmissionResponse],timeout)
   }
   @throws(classOf[IOException])
-  def uploadJarFiles(jobGraph: JobGraph, hostname: String, jobClient: ActorRef): Unit = {
+  def uploadJarFiles(jobGraph: JobGraph, hostname: String, jobClient: ActorRef)(implicit timeout:
+  FiniteDuration): Unit = {
     val port = AkkaUtils.ask[Int](jobClient, RequestBlobManagerPort)
     val serverAddress = new InetSocketAddress(hostname, port)
...
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmanager
 import java.io.File
 import java.net.{InetSocketAddress}
+import java.util.concurrent.TimeUnit
 import akka.actor._
 import akka.pattern.Patterns
@@ -28,10 +29,11 @@ import com.google.common.base.Preconditions
 import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration, Configuration}
 import org.apache.flink.core.io.InputSplitAssigner
 import org.apache.flink.runtime.blob.BlobServer
-import org.apache.flink.runtime.executiongraph.{ExecutionJobVertex, ExecutionGraph}
+import org.apache.flink.runtime.executiongraph.{Execution, ExecutionJobVertex, ExecutionGraph}
 import org.apache.flink.runtime.io.network.ConnectionInfoLookupResponse
 import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
 import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
+import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.{JobException, ActorLogMessages}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
@@ -52,7 +54,10 @@ import scala.concurrent.duration._
 class JobManager(val configuration: Configuration) extends
 Actor with ActorLogMessages with ActorLogging with WrapAsScala {
   import context._
-  import AkkaUtils.FUTURE_TIMEOUT
+  implicit val timeout = FiniteDuration(configuration.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT,
+    ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS)
+
+  Execution.timeout = timeout;
   log.info("Starting job manager.")
@@ -329,7 +334,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
     case RequestJobStatus(jobID) => {
       currentJobs.get(jobID) match {
         case Some((executionGraph,_)) => sender() ! CurrentJobStatus(jobID, executionGraph.getState)
-        case None => archive ? RequestJobStatus(jobID) pipeTo sender()
+        case None => (archive ? RequestJobStatus(jobID))(timeout) pipeTo sender()
       }
     }
@@ -344,7 +349,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
     case RequestJob(jobID) => {
       currentJobs.get(jobID) match {
         case Some((eg, _)) => sender() ! JobFound(jobID, eg)
-        case None => (archive ? RequestJob(jobID))(timeout) pipeTo sender()
+        case None => (archive ? RequestJob(jobID))(timeout) pipeTo sender()
       }
     }
@@ -384,6 +389,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
 }
 object JobManager {
+  import ExecutionMode._
   val LOG = LoggerFactory.getLogger(classOf[JobManager])
   val FAILURE_RETURN_CODE = 1
   val JOB_MANAGER_NAME = "jobmanager"
@@ -392,19 +398,34 @@ object JobManager {
   val PROFILER_NAME = "profiler"
   def main(args: Array[String]): Unit = {
-    val (hostname, port, configuration) = parseArgs(args)
+    val (hostname, port, configuration, executionMode) = parseArgs(args)
     val jobManagerSystem = AkkaUtils.createActorSystem(hostname, port, configuration)
     startActor(Props(new JobManager(configuration) with WithWebServer))(jobManagerSystem)
+
+    if(executionMode.equals(LOCAL)){
+      TaskManager.startActorWithConfiguration(hostname, configuration, true)(jobManagerSystem)
+    }
     jobManagerSystem.awaitTermination()
+    println("Shutting down.")
   }
-  def parseArgs(args: Array[String]): (String, Int, Configuration) = {
+  def parseArgs(args: Array[String]): (String, Int, Configuration, ExecutionMode) = {
     val parser = new scopt.OptionParser[JobManagerCLIConfiguration]("jobmanager") {
       head("flink jobmanager")
       opt[String]("configDir") action { (x, c) => c.copy(configDir = x) } text ("Specify " +
         "configuration directory.")
+      opt[String]("executionMode") optional() action { (x, c) =>
+        if(x.equals("local")){
+          c.copy(executionMode = LOCAL)
+        }else{
+          c.copy(executionMode = CLUSTER)
+        }
+      } text {
+        "Specify execution mode of job manager"
+      }
     }
     parser.parse(args, JobManagerCLIConfiguration()) map {
@@ -419,7 +440,7 @@ object JobManager {
       val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
         ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
-      (hostname, port, configuration)
+      (hostname, port, configuration, config.executionMode)
     } getOrElse {
       LOG.error("CLI Parsing failed. Usage: " + parser.usage)
       sys.exit(FAILURE_RETURN_CODE)
@@ -456,19 +477,23 @@ object JobManager {
     s"akka.tcp://flink@${address}/user/${JOB_MANAGER_NAME}"
   }
-  def getProfiler(jobManager: ActorRef)(implicit system: ActorSystem): ActorRef = {
+  def getProfiler(jobManager: ActorRef)(implicit system: ActorSystem, timeout: FiniteDuration):
+  ActorRef = {
     AkkaUtils.getChild(jobManager, PROFILER_NAME)
   }
-  def getEventCollector(jobManager: ActorRef)(implicit system: ActorSystem): ActorRef = {
+  def getEventCollector(jobManager: ActorRef)(implicit system: ActorSystem, timeout:
+  FiniteDuration): ActorRef = {
     AkkaUtils.getChild(jobManager, EVENT_COLLECTOR_NAME)
   }
-  def getArchivist(jobManager: ActorRef)(implicit system: ActorSystem): ActorRef = {
+  def getArchivist(jobManager: ActorRef)(implicit system: ActorSystem, timeout: FiniteDuration):
+  ActorRef = {
     AkkaUtils.getChild(jobManager, ARCHIVE_NAME)
   }
-  def getJobManager(address: InetSocketAddress)(implicit system: ActorSystem): ActorRef = {
+  def getJobManager(address: InetSocketAddress)(implicit system: ActorSystem, timeout:
+  FiniteDuration): ActorRef = {
     AkkaUtils.getReference(getAkkaURL(address.getHostName + ":" + address.getPort))
   }
 }
@@ -18,5 +18,11 @@
 package org.apache.flink.runtime.jobmanager
-case class JobManagerCLIConfiguration(configDir: String = null) {
-}
+object ExecutionMode extends Enumeration{
+  type ExecutionMode = Value
+  val LOCAL = Value
+  val CLUSTER = Value
+}
+
+case class JobManagerCLIConfiguration(configDir: String = null, executionMode: ExecutionMode
+.ExecutionMode = ExecutionMode.CLUSTER) {}
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.jobmanager.web.WebInfoServer
 trait WithWebServer extends Actor {
   that: JobManager =>
-  val webServer = new WebInfoServer(configuration,self, archive)
+  val webServer = new WebInfoServer(configuration,self, archive, timeout)
   webServer.start()
   abstract override def postStop(): Unit = {
...
@@ -18,6 +18,8 @@
 package org.apache.flink.runtime.minicluster
+import java.util.concurrent.TimeUnit
+
 import akka.pattern.ask
 import akka.actor.{ActorRef, ActorSystem}
 import org.apache.flink.api.common.io.FileOutputFormat
@@ -27,11 +29,15 @@ import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegistere
 import org.apache.flink.runtime.util.EnvironmentInformation
 import org.slf4j.LoggerFactory
+import scala.concurrent.duration.FiniteDuration
 import scala.concurrent.{Future, Await}
 abstract class FlinkMiniCluster(userConfiguration: Configuration) {
   import FlinkMiniCluster._
+
+  implicit val timeout = FiniteDuration(userConfiguration.getInteger(ConfigConstants
+    .AKKA_ASK_TIMEOUT, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS)
   val configuration = generateConfiguration(userConfiguration)
   val jobManagerActorSystem = startJobManagerActorSystem()
@@ -92,19 +98,18 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration) {
   }
   def awaitTermination(): Unit = {
-    taskManagerActorSystems foreach { _.awaitTermination(AkkaUtils.AWAIT_DURATION)}
-    jobManagerActorSystem.awaitTermination(AkkaUtils.AWAIT_DURATION)
+    taskManagerActorSystems foreach { _.awaitTermination()}
+    jobManagerActorSystem.awaitTermination()
   }
   def waitForTaskManagersToBeRegistered(): Unit = {
-    implicit val timeout = AkkaUtils.FUTURE_TIMEOUT
     implicit val executionContext = AkkaUtils.globalExecutionContext
     val futures = taskManagerActors map {
-      _ ? NotifyWhenRegisteredAtJobManager
+      taskManager => (taskManager ? NotifyWhenRegisteredAtJobManager)(timeout)
     }
-    Await.ready(Future.sequence(futures), AkkaUtils.AWAIT_DURATION)
+    Await.ready(Future.sequence(futures), timeout)
   }
 }
...
@@ -118,40 +118,4 @@ FlinkMiniCluster(userConfiguration){
 object LocalFlinkMiniCluster{
   val LOG = LoggerFactory.getLogger(classOf[LocalFlinkMiniCluster])
-  val FAILURE_RETURN_CODE = 1
-
-  def main(args: Array[String]): Unit = {
-    val configuration = parseArgs(args)
-
-    val cluster = new LocalFlinkMiniCluster(configuration)
-    cluster.awaitTermination()
-  }
-
-  def parseArgs(args: Array[String]): Configuration = {
-    val parser = new OptionParser[LocalFlinkMiniClusterConfiguration]("LocalFlinkMiniCluster") {
-      head("LocalFlinkMiniCluster")
-      opt[String]("configDir") action { (value, config) => config.copy(configDir = value) } text
-      {"Specify configuration directory."}
-    }
-
-    parser.parse(args, LocalFlinkMiniClusterConfiguration()) map {
-      config =>{
-        GlobalConfiguration.loadConfiguration(config.configDir)
-
-        val configuration = GlobalConfiguration.getConfiguration
-
-        if(config.configDir != null && new File(config.configDir).isDirectory){
-          configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, config.configDir + "/..")
-        }
-
-        configuration
-      }
-    } getOrElse{
-      LOG.error("CLI parsing failed. Usage: " + parser.usage)
-      sys.exit(FAILURE_RETURN_CODE)
-    }
-  }
-
-  case class LocalFlinkMiniClusterConfiguration(val configDir: String = "")
 }
@@ -69,8 +69,9 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
   extends Actor with ActorLogMessages with ActorLogging with DecorateAsScala with WrapAsScala {
   import context._
-  import AkkaUtils.FUTURE_TIMEOUT
-  import taskManagerConfig._
+  import taskManagerConfig.{timeout => tmTimeout, _}
+  implicit val timeout = tmTimeout
   log.info(s"Starting task manager at ${self.path}.")
@@ -172,7 +173,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
       jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots)
     } else {
       log.error("TaskManager could not register at JobManager.");
-      throw new RuntimeException("TaskManager could not register at JobManager");
+      self ! PoisonPill
     }
   }
@@ -182,6 +183,8 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
       currentJobManager = sender()
       instanceID = id
+      context.watch(currentJobManager)
       log.info(s"TaskManager successfully registered at JobManager ${
         currentJobManager.path
           .toString
@@ -247,7 +250,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
       case None =>
     }
-    val splitProvider = new TaskInputSplitProvider(currentJobManager, jobID, vertexID)
+    val splitProvider = new TaskInputSplitProvider(currentJobManager, jobID, vertexID, timeout)
     val env = new RuntimeEnvironment(task, tdd, userCodeClassLoader, memoryManager,
       ioManager, splitProvider, currentJobManager)
@@ -356,14 +359,19 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
        log.error(s"Cannot find task with ID ${executionID} to unregister.")
      }
    }
+   case Terminated(jobManager) => {
+     log.info(s"Job manager ${jobManager.path} is no longer reachable. Try to reregister.")
+     tryJobManagerRegistration()
+   }
 }
 def notifyExecutionStateChange(jobID: JobID, executionID: ExecutionAttemptID,
                                executionState: ExecutionState,
                                optionalError: Throwable): Unit = {
   log.info(s"Update execution state to ${executionState}.")
-  val futureResponse = currentJobManager ? UpdateTaskExecutionState(new TaskExecutionState
-    (jobID, executionID, executionState, optionalError))
+  val futureResponse = (currentJobManager ? UpdateTaskExecutionState(new TaskExecutionState
+    (jobID, executionID, executionState, optionalError)))(timeout)
   val receiver = this.self
@@ -402,7 +410,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
   }
   channelManager = Some(new ChannelManager(currentJobManager, connectionInfo, numBuffers,
-    bufferSize, connectionManager))
+    bufferSize, connectionManager, timeout))
 } catch {
   case ioe: IOException =>
     log.error(ioe, "Failed to instantiate ChannelManager.")
@@ -412,7 +420,8 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
 def setupLibraryCacheManager(blobPort: Int): Unit = {
   if(blobPort > 0){
-    val address = new InetSocketAddress(currentJobManager.path.address.host.get, blobPort)
+    val address = new InetSocketAddress(currentJobManager.path.address.host.getOrElse
+      ("localhost"), blobPort)
     libraryCacheManager = new BlobLibraryCacheManager(new BlobCache(address), cleanupInterval)
   }else{
     libraryCacheManager = new FallbackLibraryCacheManager
@@ -598,8 +607,11 @@ object TaskManager {
     .LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
     ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000
+  val timeout = FiniteDuration(configuration.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT,
+    ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS)
   val taskManagerConfig = TaskManagerConfiguration(numberOfSlots, memorySize, pageSize,
-    tmpDirs, cleanupInterval, memoryLoggingIntervalMs, profilingInterval)
+    tmpDirs, cleanupInterval, memoryLoggingIntervalMs, profilingInterval, timeout)
   (connectionInfo, jobManagerURL, taskManagerConfig, networkConnectionConfiguration)
 }
...
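Two TaskManager changes above work together: context.watch(currentJobManager) after a successful registration, and a Terminated handler that calls tryJobManagerRegistration(). A minimal sketch of that deathwatch-and-reregister loop in plain Akka; all names and the master path are illustrative, and the real commit resolves the JobManager from its configured Akka URL.

import akka.actor.{Actor, ActorIdentity, ActorLogging, Identify, Terminated}

case object Register

class Worker(masterPath: String) extends Actor with ActorLogging {

  override def preStart(): Unit = tryRegistration()

  // Resolve a fresh ActorRef on every attempt: a restarted master is a new
  // actor incarnation, and a previously watched ref stays terminated forever.
  def tryRegistration(): Unit = context.actorSelection(masterPath) ! Identify(None)

  def receive = {
    case ActorIdentity(_, Some(master)) =>
      context.watch(master) // deathwatch delivers Terminated on disconnect
      master ! Register
    case ActorIdentity(_, None) =>
      log.warning(s"Master at $masterPath is not reachable yet.")
    case Terminated(master) =>
      log.info(s"Master ${master.path} is no longer reachable. Trying to reregister.")
      tryRegistration()
  }
}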
@@ -18,7 +18,10 @@
 package org.apache.flink.runtime.taskmanager
+import scala.concurrent.duration.FiniteDuration
 case class TaskManagerConfiguration(numberOfSlots: Int, memorySize: Long, pageSize: Int,
                                     tmpDirPaths: Array[String], cleanupInterval: Long,
                                     memoryLogggingIntervalMs: Option[Long],
-                                    profilingInterval: Option[Long])
+                                    profilingInterval: Option[Long],
+                                    timeout: FiniteDuration)
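The new timeout field is filled from the akka.ask.timeout parameter the commit introduces. A sketch of that wiring, using only the constants visible in the diff (the helper name is ours):

import java.util.concurrent.TimeUnit
import scala.concurrent.duration.FiniteDuration
import org.apache.flink.configuration.{ConfigConstants, Configuration}

// Reads akka.ask.timeout (in seconds), falling back to the default,
// exactly as in the object TaskManager hunk above.
def askTimeout(configuration: Configuration): FiniteDuration =
  FiniteDuration(
    configuration.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT,
      ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT),
    TimeUnit.SECONDS)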
@@ -32,6 +32,7 @@ import akka.actor.UntypedActor;
 import akka.japi.Creator;
 import akka.pattern.Patterns;
 import akka.testkit.JavaTestKit;
+import akka.util.Timeout;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -73,6 +74,8 @@ public class TaskManagerTest {
   private static ActorSystem system;
+  private static Timeout timeout = new Timeout(1, TimeUnit.MINUTES);
   @BeforeClass
   public static void setup(){
     system = ActorSystem.create("TestActorSystem", TestingUtils.testConfig());
@@ -178,7 +181,7 @@ public class TaskManagerTest {
     expectMsgEquals(new TaskOperationResult(eid1, true));
     Future<Object> response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid1),
-      AkkaUtils.FUTURE_TIMEOUT());
+      timeout);
     Await.ready(response, d);
     assertEquals(ExecutionState.CANCELED, t1.getExecutionState());
@@ -197,7 +200,7 @@ public class TaskManagerTest {
     expectMsgEquals(new TaskOperationResult(eid2, true));
     response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid2),
-      AkkaUtils.FUTURE_TIMEOUT());
+      timeout);
     Await.ready(response, d);
     assertEquals(ExecutionState.CANCELED, t2.getExecutionState());
@@ -336,13 +339,13 @@ public class TaskManagerTest {
     // we get to the check, so we need to guard the check
     if (t1 != null) {
       Future<Object> response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid1),
-        AkkaUtils.FUTURE_TIMEOUT());
+        timeout);
       Await.ready(response, d);
     }
     if (t2 != null) {
       Future<Object> response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid2),
-        AkkaUtils.FUTURE_TIMEOUT());
+        timeout);
       Await.ready(response, d);
       assertEquals(ExecutionState.FINISHED, t2.getExecutionState());
     }
@@ -424,7 +427,7 @@ public class TaskManagerTest {
     if (t2 != null) {
       Future<Object> response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid2),
-        AkkaUtils.FUTURE_TIMEOUT());
+        timeout);
       Await.ready(response, d);
     }
@@ -434,7 +437,7 @@ public class TaskManagerTest {
       expectMsgEquals(new TaskOperationResult(eid1, true));
     }
     Future<Object> response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid1),
-      AkkaUtils.FUTURE_TIMEOUT());
+      timeout);
     Await.ready(response, d);
   }
@@ -538,7 +541,7 @@ public class TaskManagerTest {
     ActorRef taskManager = TestingUtils.startTestingTaskManagerWithConfiguration("localhost", cfg, system);
     Future<Object> response = Patterns.ask(taskManager, NotifyWhenRegisteredAtJobManager$.MODULE$,
-      AkkaUtils.FUTURE_TIMEOUT());
+      timeout);
     try {
       FiniteDuration d = new FiniteDuration(2, TimeUnit.SECONDS);
...
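In the test above, all asks share one fixed akka.util.Timeout field instead of the removed AkkaUtils.FUTURE_TIMEOUT. Note that the ask timeout and the Await deadline are independent knobs, as the test's one-minute Timeout versus its two-second FiniteDuration shows. A sketch of the same shape; the probe message and helper name are illustrative:

import java.util.concurrent.TimeUnit
import akka.actor.ActorRef
import akka.pattern.Patterns
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration.FiniteDuration

val timeout = new Timeout(1, TimeUnit.MINUTES)

def awaitRemoval(taskManager: ActorRef, probe: AnyRef): Unit = {
  // The Timeout bounds how long the actor may take to reply...
  val response = Patterns.ask(taskManager, probe, timeout)
  // ...while Await.ready blocks the test thread, here on a shorter deadline.
  val d = new FiniteDuration(2, TimeUnit.SECONDS)
  Await.ready(response, d)
}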
@@ -37,6 +37,8 @@ import scala.concurrent.duration._
 @RunWith(classOf[JUnitRunner])
 class JobManagerITCase(_system: ActorSystem) extends TestKit(_system) with ImplicitSender with
   WordSpecLike with Matchers with BeforeAndAfterAll {
+  implicit val timeout = 1 minute
   def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig))
   override def afterAll: Unit = {
...
@@ -23,9 +23,8 @@ import java.util.concurrent.TimeUnit;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
-import akka.dispatch.ExecutionContexts;
 import akka.pattern.Patterns;
-import com.amazonaws.http.ExecutionContext;
+import akka.util.Timeout;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -116,7 +115,7 @@ public abstract class CancellingTestBase {
     boolean jobSuccessfullyCancelled = false;
     Future<Object> result = Patterns.ask(client, new JobClientMessages.SubmitJobAndWait
-      (jobGraph, false), AkkaUtils.FUTURE_TIMEOUT());
+      (jobGraph, false), new Timeout(AkkaUtils.DEFAULT_TIMEOUT()));
     actorSystem.scheduler().scheduleOnce(new FiniteDuration(msecsTillCanceling,
       TimeUnit.MILLISECONDS), client, new JobManagerMessages.CancelJob(jobGraph.getJobID()),
@@ -125,7 +124,7 @@ public abstract class CancellingTestBase {
       throw new IllegalStateException("Job restarted");
     try {
-      Await.result(result, AkkaUtils.AWAIT_DURATION());
+      Await.result(result, AkkaUtils.DEFAULT_TIMEOUT());
     } catch (JobExecutionException exception) {
       if (!exception.isJobCanceledByUser()) {
         throw new IllegalStateException("Job Failed.");
...