Commit f15ff9b0 authored by Till Rohrmann

Removed legacy protocol classes.

Parent a9cc0a84
......@@ -21,10 +21,11 @@ package org.apache.flink.api.avro;
import java.io.File;
import java.net.InetSocketAddress;
import org.apache.flink.runtime.minicluster.NepheleMiniCluster;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.junit.Assert;
import org.junit.Test;
......@@ -38,19 +39,21 @@ public class AvroExternalJarProgramITCase {
@Test
public void testExternalProgram() {
NepheleMiniCluster testMiniCluster = null;
LocalFlinkMiniCluster testMiniCluster = null;
try {
testMiniCluster = new NepheleMiniCluster();
testMiniCluster.setTaskManagerNumSlots(4);
testMiniCluster.start();
testMiniCluster = new LocalFlinkMiniCluster(null);
Configuration config = new Configuration();
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
testMiniCluster.start(config);
String jarFile = JAR_FILE;
String testData = getClass().getResource(TEST_DATA_FILE).toString();
PackagedProgram program = new PackagedProgram(new File(jarFile), new String[] { testData });
Client c = new Client(new InetSocketAddress("localhost", testMiniCluster.getJobManagerRpcPort()), new Configuration(), program.getUserCodeClassLoader());
Client c = new Client(new InetSocketAddress("localhost", testMiniCluster.getJobManagerRPCPort()),
new Configuration(), program.getUserCodeClassLoader());
c.run(program, 4, true);
}
catch (Throwable t) {
......
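For orientation, the replacement idiom in this and the next touched file, as one self-contained sketch (hypothetical test scaffolding; the null constructor argument and the config key mirror the diff above):

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;

public class MiniClusterStartSketch {
    public static void main(String[] args) throws Exception {
        // The slot count no longer goes through a setter on the cluster;
        // it now travels inside the Configuration handed to start().
        Configuration config = new Configuration();
        config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);

        LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(null);
        cluster.start(config);
    }
}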
......@@ -19,11 +19,12 @@ package org.apache.flink.streaming.util;
import java.net.InetSocketAddress;
import org.apache.flink.runtime.minicluster.NepheleMiniCluster;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -33,7 +34,7 @@ public class ClusterUtil {
public static final String CANNOT_EXECUTE_EMPTY_JOB = "Cannot execute empty job";
/**
* Executes the given JobGraph locally, on a NepheleMiniCluster
* Executes the given JobGraph locally, on a FlinkMiniCluster
*
* @param jobGraph
* jobGraph
......@@ -47,19 +48,19 @@ public class ClusterUtil {
Configuration configuration = jobGraph.getJobConfiguration();
NepheleMiniCluster exec = new NepheleMiniCluster();
exec.setMemorySize(memorySize);
exec.setNumTaskManager(1);
exec.setTaskManagerNumSlots(degreeOfPrallelism);
LocalFlinkMiniCluster exec = new LocalFlinkMiniCluster(null);
configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize);
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_SLOTS, degreeOfParallelism);
if (LOG.isInfoEnabled()) {
LOG.info("Running on mini cluster");
}
try {
exec.start();
exec.start(configuration);
Client client = new Client(new InetSocketAddress("localhost",
exec.getJobManagerRpcPort()), configuration, ClusterUtil.class.getClassLoader());
Client client = new Client(new InetSocketAddress("localhost", exec.getJobManagerRPCPort()),
configuration, ClusterUtil.class.getClassLoader());
client.run(jobGraph, true);
} catch (ProgramInvocationException e) {
if (e.getMessage().contains("GraphConversionException")) {
......
......@@ -57,7 +57,6 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.iterative.convergence.WorksetEmptyConvergenceCriterion;
import org.apache.flink.runtime.iterative.io.FakeOutputTask;
import org.apache.flink.runtime.iterative.task.IterationHeadPactTask;
import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask;
import org.apache.flink.runtime.iterative.task.IterationSynchronizationSinkTask;
......
......@@ -19,7 +19,7 @@
package org.apache.flink.api.common.operators;
import org.apache.flink.types.TypeInformation;
import org.apache.flink.api.common.typeinfo.TypeInformation;
/**
* A class for holding information about an operator, such as input/output TypeInformation.
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.core.protocols;
/**
* Marker interface for all protocols which use Nephele's IPC subsystem.
*/
public interface VersionedProtocol {
}
......@@ -24,8 +24,6 @@ import java.util.Map;
import java.util.Set;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.protocols.VersionedProtocol;
/**
* Utility class which provides various methods for dynamic class loading.
......@@ -37,49 +35,6 @@ public final class ClassUtils {
*/
private ClassUtils() {}
/**
* Searches for a protocol class by its name and attempts to load it.
*
* @param className
* the name of the protocol class
* @return an instance of the protocol class
* @throws ClassNotFoundException
* thrown if no class with such a name can be found
*/
public static Class<? extends VersionedProtocol> getProtocolByName(final String className)
throws ClassNotFoundException {
if (!className.contains("Protocol")) {
System.out.println(className);
throw new ClassNotFoundException("Only use this method for protocols!");
}
return (Class<? extends VersionedProtocol>) Class.forName(className, true, getClassLoader()).asSubclass(VersionedProtocol.class);
}
/**
* Searches for a record class by its name and attempts to load it.
*
* @param className
* the name of the record class
* @return an instance of the record class
* @throws ClassNotFoundException
* thrown if no class with such a name can be found
*/
@SuppressWarnings("unchecked")
public static Class<? extends IOReadableWritable> getRecordByName(final String className)
throws ClassNotFoundException {
//
// Class<?> clazz = Class.forName(className, true, getClassLoader());
// if (IOReadableWritable.class.isAssignableFrom(clazz)) {
// return clazz.asSubclass(IOReadableWritable.class);
// } else {
// return (Class<? extends IOReadableWritable>) clazz;
// }
//
return (Class<? extends IOReadableWritable>) Class.forName(className, true, getClassLoader());
}
/**
* Searches for a file system class by its name and attempts to load it.
*
......
......@@ -283,6 +283,15 @@ public class JobClient {
}
}
public int getRecommendedPollingInterval(){
try {
return AkkaUtils.<Integer>ask(jobManager, JobManagerMessages.RequestPollingInterval$.MODULE$);
}catch(IOException ioe){
throw new RuntimeException("Could not request recommended polling interval from job " +
"manager.", ioe);
}
}
/**
* Writes the given error message to the log and throws it in an {@link IOException}.
*
......
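The method added above is the general pattern that replaces the deleted RPC protocols in this commit: a blocking, typed ask against the JobManager actor. A minimal sketch of the same call, assuming an ActorRef overload of AkkaUtils.ask matching the usage in JobClient above:

import java.io.IOException;
import akka.actor.ActorRef;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.messages.JobManagerMessages;

public final class PollingIntervalSketch {
    // Blocks until the JobManager replies or AkkaUtils' timeout expires.
    public static int recommendedPollingInterval(ActorRef jobManager) throws IOException {
        return AkkaUtils.<Integer>ask(jobManager,
                JobManagerMessages.RequestPollingInterval$.MODULE$);
    }
}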
......@@ -33,7 +33,6 @@ import akka.actor.ActorRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.runtime.event.job.RecentJobEvent;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.util.EnvironmentInformation;
......@@ -74,7 +73,7 @@ public class JobmanagerInfoServlet extends HttpServlet {
// writeJsonForArchivedJobGroupvertex(resp.getWriter(), jobmanager.getArchive().getJob(JobID.fromHexString(jobId)), JobVertexID.fromHexString(groupvertexId));
// }
// else if("taskmanagers".equals(req.getParameter("get"))) {
resp.getWriter().write("{\"taskmanagers\": " + jobmanager.getNumberOfTaskManagers() +", \"slots\": "+jobmanager.getTotalNumberOfRegisteredSlots()+"}");
// resp.getWriter().write("{\"taskmanagers\": " + jobmanager.getNumberOfTaskManagers() +", \"slots\": "+jobmanager.getTotalNumberOfRegisteredSlots()+"}");
// }
// else if("cancel".equals(req.getParameter("get"))) {
// String jobId = req.getParameter("job");
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.protocols;
import java.io.IOException;
import org.apache.flink.core.protocols.VersionedProtocol;
import org.apache.flink.runtime.accumulators.AccumulatorEvent;
import org.apache.flink.runtime.jobgraph.JobID;
/**
* The accumulator protocol is implemented by the job manager. TaskManagers
* use it to send the collected accumulators, and JobClients use it to fetch
* the final accumulator results after the job has ended.
*/
public interface AccumulatorProtocol extends VersionedProtocol {
/**
* Report accumulators that were collected in a task. Called by Task
* Manager, after the user code was executed but before the task status
* update is reported.
*/
void reportAccumulatorResult(AccumulatorEvent accumulatorEvent)
throws IOException;
/**
* Get the final accumulator results. Called by JobClient after the job
* ended.
*/
AccumulatorEvent getAccumulatorResults(JobID jobID) throws IOException;
}
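After this commit the accumulator report travels as the ReportAccumulatorResult message (declared in the JobManagerMessages hunk further down) instead of this RPC call. A hedged sketch of the fire-and-forget replacement:

import akka.actor.ActorRef;
import org.apache.flink.runtime.accumulators.AccumulatorEvent;
import org.apache.flink.runtime.messages.JobManagerMessages;

public final class AccumulatorReportSketch {
    // tell() is one-way, matching the old void reportAccumulatorResult(...).
    public static void report(ActorRef jobManager, AccumulatorEvent event) {
        jobManager.tell(new JobManagerMessages.ReportAccumulatorResult(event),
                ActorRef.noSender());
    }
}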
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.protocols;
import java.io.IOException;
import org.apache.flink.core.protocols.VersionedProtocol;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.io.network.ConnectionInfoLookupResponse;
import org.apache.flink.runtime.io.network.channels.ChannelID;
import org.apache.flink.runtime.jobgraph.JobID;
/**
* The channel lookup protocol can be used to resolve the ID of an output channel to all recipients which shall receive
* the data of this channel.
*
*/
public interface ChannelLookupProtocol extends VersionedProtocol {
/**
* Retrieves all recipients of the data for the given <code>sourceChannelID</code>.
*
* @param caller
* the {@link InstanceConnectionInfo} object of the task manager which calls this method
* @param jobID
* the ID of the job the channel ID belongs to
* @param sourceChannelID
* the ID of the channel to resolve
* @return the lookup response containing the connection info and a return code
* @throws IOException
* thrown if an error occurs during the IPC call
*/
ConnectionInfoLookupResponse lookupConnectionInfo(InstanceConnectionInfo caller, JobID jobID,
ChannelID sourceChannelID) throws IOException;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.protocols;
import java.io.IOException;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.protocols.VersionedProtocol;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
/**
* The input split provider protocol is used to facilitate RPC calls related to the lazy split assignment.
*/
public interface InputSplitProviderProtocol extends VersionedProtocol {
InputSplit requestNextInputSplit(JobID jobID, JobVertexID vertex, ExecutionAttemptID executionAttempt) throws IOException;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.protocols;
import java.io.IOException;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
/**
* The job manager protocol is implemented by the job manager and offers functionality
* to task managers that allows them to register themselves, send heartbeat messages,
* or report the results of a task execution.
*/
public interface JobManagerProtocol extends ServiceDiscoveryProtocol {
/**
* Sends a heart beat to the job manager.
*
* @param taskManagerId The ID identifying the task manager.
* @throws IOException Thrown if an error occurs during this remote procedure call.
*/
boolean sendHeartbeat(InstanceID taskManagerId) throws IOException;
/**
* Registers a task manager at the JobManager.
*
* @param instanceConnectionInfo the information the job manager requires to connect to the instance's task manager
* @param hardwareDescription a hardware description with details on the instance's compute resources.
* @param numberOfSlots The number of task slots that the TaskManager provides.
*
* @return The ID under which the TaskManager is registered. Null, if the JobManager does not register the TaskManager.
*/
InstanceID registerTaskManager(InstanceConnectionInfo instanceConnectionInfo, HardwareDescription hardwareDescription, int numberOfSlots) throws IOException;
/**
* Reports an update of a task's execution state to the job manager. This method returns true if the state was
* correctly registered. If it returns false, the calling task manager should cancel its execution of the task.
*
* @param taskExecutionState The new task execution state.
* @return True if everything is all right, false if the caller should cancel the task execution.
*
* @throws IOException Thrown, if an error occurs during this remote procedure call
*/
boolean updateTaskExecutionState(TaskExecutionState taskExecutionState) throws IOException;
}
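For comparison, updateTaskExecutionState maps onto the UpdateTaskExecutionState message handled by the new JobManager actor further down in this diff, which replies with the boolean directly. A rough before/after sketch:

import akka.actor.ActorRef;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;

public final class StateUpdateSketch {
    // Before: boolean ok = jobManagerProtocol.updateTaskExecutionState(state);
    // After (sketch): the same request/reply, expressed as an actor ask.
    public static boolean updateState(ActorRef jobManager, TaskExecutionState state)
            throws java.io.IOException {
        return AkkaUtils.<Boolean>ask(jobManager,
                new JobManagerMessages.UpdateTaskExecutionState(state));
    }
}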
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.protocols;
import java.io.IOException;
import org.apache.flink.core.protocols.VersionedProtocol;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.taskmanager.TaskOperationResult;
/**
* The task submission protocol is implemented by the task manager and allows the job manager
* to submit and cancel tasks, as well as to query the task manager for cached libraries and submit
* these if necessary.
*/
public interface TaskOperationProtocol extends VersionedProtocol {
TaskOperationResult submitTask(TaskDeploymentDescriptor task) throws IOException;
TaskOperationResult cancelTask(ExecutionAttemptID executionId) throws IOException;
void killTaskManager() throws IOException;
}
......@@ -44,34 +44,34 @@ object AkkaUtils {
actorSystem
}
def getChild(parent: ActorRef, child: String)(implicit system: ActorSystem): ActorRef = {
Await.result(system.actorSelection(parent.path / child).resolveOne(), AWAIT_DURATION)
}
def getConfigString(host: String, port: Int, configuration: Configuration): String = {
val transportHeartbeatInterval = configuration.getString(ConfigConstants.AKKA_TRANSPORT_HEARTBEAT_INTERVAL,
val transportHeartbeatInterval = configuration.getString(ConfigConstants.
AKKA_TRANSPORT_HEARTBEAT_INTERVAL,
ConfigConstants.DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL)
val transportHeartbeatPause = configuration.getString(ConfigConstants.AKKA_TRANSPORT_HEARTBEAT_PAUSE,
val transportHeartbeatPause = configuration.getString(ConfigConstants.
AKKA_TRANSPORT_HEARTBEAT_PAUSE,
ConfigConstants.DEFAULT_AKKA_TRANSPORT_HEARTBEAT_PAUSE)
val transportThreshold = configuration.getDouble(ConfigConstants.AKKA_TRANSPORT_THRESHOLD,
ConfigConstants.DEFAULT_AKKA_TRANSPORT_THRESHOLD)
val watchHeartbeatInterval = configuration.getString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL,
ConfigConstants.DEFAULT_AKKA_WATCH_HEARTBEAT_INTERVAL)
val watchHeartbeatInterval = configuration.getString(ConfigConstants
.AKKA_WATCH_HEARTBEAT_INTERVAL, ConfigConstants.DEFAULT_AKKA_WATCH_HEARTBEAT_INTERVAL)
val watchHeartbeatPause = configuration.getString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE,
ConfigConstants.DEFAULT_AKKA_WATCH_HEARTBEAT_PAUSE)
val watchThreshold = configuration.getDouble(ConfigConstants.AKKA_WATCH_THRESHOLD,
ConfigConstants.DEFAULT_AKKA_WATCH_THRESHOLD)
val akkaTCPTimeout = configuration.getString(ConfigConstants.AKKA_TCP_TIMEOUT,
ConfigConstants.DEFAULT_AKKA_TCP_TIMEOUT)
val akkaFramesize = configuration.getString(ConfigConstants.AKKA_FRAMESIZE, ConfigConstants.DEFAULT_AKKA_FRAMESIZE)
val akkaFramesize = configuration.getString(ConfigConstants.AKKA_FRAMESIZE,
ConfigConstants.DEFAULT_AKKA_FRAMESIZE)
val akkaThroughput = configuration.getInteger(ConfigConstants.AKKA_DISPATCHER_THROUGHPUT,
ConfigConstants.DEFAULT_AKKA_DISPATCHER_THROUGHPUT)
val lifecycleEvents = configuration.getBoolean(ConfigConstants.AKKA_LOG_LIFECYCLE_EVENTS,
ConfigConstants.DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS)
val logLifecycleEvents = if(lifecycleEvents) "on" else "off"
val logLifecycleEvents = if (lifecycleEvents) "on" else "off"
val configString = s"""akka.remote.transport-failure-detector.heartbeat-interval = $transportHeartbeatInterval
val configString = s"""akka.remote.transport-failure-detector.heartbeat-interval =
$transportHeartbeatInterval
|akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $transportHeartbeatPause
|akka.remote.transport-failure-detector.threshold = $transportThreshold
|akka.remote.watch-failure-detector.heartbeat-interval = $watchHeartbeatInterval
......@@ -113,12 +113,16 @@ object AkkaUtils {
""".stripMargin
}
def getChild(parent: ActorRef, child: String)(implicit system: ActorSystem): ActorRef = {
Await.result(system.actorSelection(parent.path / child).resolveOne(), AWAIT_DURATION)
}
def getReference(path: String): ActorRef = {
Await.result(defaultActorSystem.actorSelection(path).resolveOne(), AWAIT_DURATION)
}
@throws(classOf[IOException])
def ask[T](actorSelection: ActorSelection, msg: Any): T ={
def ask[T](actorSelection: ActorSelection, msg: Any): T = {
val future = Patterns.ask(actorSelection, msg, FUTURE_TIMEOUT)
Await.result(future, AWAIT_DURATION).asInstanceOf[T]
}
......
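The keys read in getConfigString feed the HOCON string that configures the actor system. A small sketch of the round trip, assuming only the createActorSystem signature that appears in the JobManager hunk below (host and port are illustrative):

import akka.actor.ActorSystem;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;

public final class ActorSystemSketch {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        // getConfigString maps this flag to an "on"/"off" Akka
        // lifecycle-logging setting.
        config.setBoolean(ConfigConstants.AKKA_LOG_LIFECYCLE_EVENTS, true);

        ActorSystem system = AkkaUtils.createActorSystem("localhost", 6123, config);
        system.awaitTermination();
    }
}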
......@@ -26,9 +26,11 @@ import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.event.job._
import org.apache.flink.runtime.executiongraph._
import org.apache.flink.runtime.jobgraph.{JobStatus, JobID}
import org.apache.flink.runtime.messages.ArchiveMessages.{ArchiveExecutionGraph, ArchiveJobEvent, ArchiveEvent}
import org.apache.flink.runtime.messages.ArchiveMessages.{ArchiveExecutionGraph, ArchiveJobEvent,
ArchiveEvent}
import org.apache.flink.runtime.messages.EventCollectorMessages._
import org.apache.flink.runtime.messages.ExecutionGraphMessages.{JobStatusFound, JobNotFound, JobStatusChanged, ExecutionStateChanged}
import org.apache.flink.runtime.messages.ExecutionGraphMessages.{JobStatusFound, JobNotFound,
JobStatusChanged, ExecutionStateChanged}
import org.apache.flink.runtime.messages.JobManagerMessages.RequestJobStatus
import org.apache.flink.runtime.messages.JobResult
import org.apache.flink.runtime.messages.JobResult.JobProgressResult
......@@ -36,7 +38,9 @@ import scala.collection.convert.{WrapAsScala}
import scala.concurrent.Future
import scala.concurrent.duration._
class EventCollector(val timerTaskInterval: Int) extends Actor with ActorLogMessages with ActorLogging with WrapAsScala {
class EventCollector(val timerTaskInterval: Int) extends Actor with ActorLogMessages with
ActorLogging with WrapAsScala {
import context.dispatcher
import AkkaUtils.FUTURE_TIMEOUT
......@@ -50,11 +54,11 @@ class EventCollector(val timerTaskInterval: Int) extends Actor with ActorLogMess
val jobInformation = collection.mutable.HashMap[JobID, (String, Boolean, Long)]()
override def preStart():Unit = {
override def preStart(): Unit = {
startArchiveExpiredEvent()
}
override def postStop(): Unit ={
override def postStop(): Unit = {
collectedEvents.clear()
recentJobs.clear()
recentExecutionGraphs.clear()
......@@ -62,52 +66,48 @@ class EventCollector(val timerTaskInterval: Int) extends Actor with ActorLogMess
jobInformation.clear()
}
def startArchiveExpiredEvent():Unit = {
val schedulerDuration = FiniteDuration(2*timerTaskInterval, SECONDS)
context.system.scheduler.schedule(schedulerDuration, schedulerDuration, self, ArchiveExpiredEvents)
def startArchiveExpiredEvent(): Unit = {
val schedulerDuration = FiniteDuration(2 * timerTaskInterval, SECONDS)
context.system.scheduler.schedule(schedulerDuration, schedulerDuration, self,
ArchiveExpiredEvents)
}
override def receiveWithLogMessages: Receive = {
case ArchiveExpiredEvents => {
val currentTime = System.currentTimeMillis()
collectedEvents.retain {
(jobID, events) =>
val (outdatedElements, currentElements) = events.partition {
event => event.getTimestamp + timerTaskInterval < currentTime
}
collectedEvents.retain { (jobID, events) =>
val (outdatedElements, currentElements) = events.partition { event =>
event.getTimestamp + timerTaskInterval < currentTime
}
outdatedElements foreach (archiveEvent(jobID, _))
currentElements.nonEmpty
outdatedElements foreach (archiveEvent(jobID, _))
currentElements.nonEmpty
}
recentJobs.retain {
(jobID, recentJobEvent) =>
import JobStatus._
val status = recentJobEvent.getJobStatus
// only remove jobs which have stopped running
if ((status == FINISHED || status == CANCELED || status == FAILED) &&
recentJobEvent.getTimestamp + timerTaskInterval < currentTime) {
archiveJobEvent(jobID, recentJobEvent)
archiveExecutionGraph(jobID, recentExecutionGraphs.remove(jobID).get)
jobInformation.remove(jobID)
false
} else {
true
}
recentJobs.retain { (jobID, recentJobEvent) =>
import JobStatus._
val status = recentJobEvent.getJobStatus
// only remove jobs which have stopped running
if ((status == FINISHED || status == CANCELED || status == FAILED) &&
recentJobEvent.getTimestamp + timerTaskInterval < currentTime) {
archiveJobEvent(jobID, recentJobEvent)
archiveExecutionGraph(jobID, recentExecutionGraphs.remove(jobID).get)
jobInformation.remove(jobID)
false
} else {
true
}
}
}
case RequestJobProgress(jobID) => {
sender() ! JobProgressResult(JobResult.SUCCESS, null, collectedEvents.getOrElse(jobID, List()))
sender() ! JobProgressResult(JobResult.SUCCESS, null, collectedEvents.getOrElse(jobID,
List()))
}
case RequestRecentJobEvents => {
sender() ! RecentJobs(recentJobs.values.toList)
}
case RegisterJob(executionGraph, profilingAvailable, submissionTimestamp) => {
val jobID = executionGraph.getJobID
......@@ -115,8 +115,8 @@ class EventCollector(val timerTaskInterval: Int) extends Actor with ActorLogMess
executionGraph.registerJobStatusListener(self)
jobInformation += jobID ->(executionGraph.getJobName, profilingAvailable, submissionTimestamp)
}
case ExecutionStateChanged(jobID, vertexID, subtask, executionID, newExecutionState, optionalMessage) => {
case ExecutionStateChanged(jobID, vertexID, subtask, executionID, newExecutionState,
optionalMessage) => {
val timestamp = System.currentTimeMillis()
recentExecutionGraphs.get(jobID) match {
......@@ -125,19 +125,18 @@ class EventCollector(val timerTaskInterval: Int) extends Actor with ActorLogMess
val taskName = if (vertex != null) vertex.getJobVertex.getName else "(null)"
val totalNumberOfSubtasks = if (vertex != null) vertex.getParallelism else -1
val vertexEvent = new VertexEvent(timestamp, vertexID, taskName, totalNumberOfSubtasks, subtask, executionID,
newExecutionState, optionalMessage)
val vertexEvent = new VertexEvent(timestamp, vertexID, taskName, totalNumberOfSubtasks,
subtask, executionID, newExecutionState, optionalMessage)
val events = collectedEvents.getOrElse(jobID, List())
val executionStateChangeEvent = new ExecutionStateChangeEvent(timestamp, vertexID, subtask,
executionID, newExecutionState)
val executionStateChangeEvent = new ExecutionStateChangeEvent(timestamp, vertexID,
subtask, executionID, newExecutionState)
collectedEvents += jobID -> (executionStateChangeEvent :: vertexEvent :: events)
case None =>
log.warning(s"Could not find execution graph with jobID ${jobID}.")
}
}
case JobStatusChanged(executionGraph, newJobStatus, optionalMessage) => {
val jobID = executionGraph.getJobID()
......@@ -147,36 +146,35 @@ class EventCollector(val timerTaskInterval: Int) extends Actor with ActorLogMess
val currentTime = System.currentTimeMillis()
val (jobName, isProfilingEnabled, submissionTimestamp) = jobInformation(jobID)
recentJobs.put(jobID, new RecentJobEvent(jobID, jobName, newJobStatus, isProfilingEnabled, submissionTimestamp,
currentTime))
recentJobs.put(jobID, new RecentJobEvent(jobID, jobName, newJobStatus, isProfilingEnabled,
submissionTimestamp, currentTime))
val events = collectedEvents.getOrElse(jobID, List())
collectedEvents += jobID -> ((new JobEvent(currentTime, newJobStatus, optionalMessage)) :: events)
collectedEvents += jobID -> ((new JobEvent(currentTime, newJobStatus,
optionalMessage)) :: events)
}
case ProcessProfilingEvent(profilingEvent) => {
val events = collectedEvents.getOrElse(profilingEvent.getJobID, List())
collectedEvents += profilingEvent.getJobID -> (profilingEvent :: events)
}
case RegisterArchiveListener(actorListener) => {
context.watch(actorListener)
archiveListeners += actorListener
}
case Terminated(terminatedListener) => {
archiveListeners -= terminatedListener
}
case RequestJobStatus(jobID) => {
recentJobs.get(jobID) match {
case Some(recentJobEvent) => sender() ! JobStatusFound(jobID, recentJobEvent.getJobStatus)
case None =>
val responses = archiveListeners map { archivist => archivist ? RequestJobStatus(jobID) filter {
case _: JobStatusFound => true
case _ => false
}}
val noResponse = akka.pattern.after(AkkaUtils.FUTURE_DURATION, context.system.scheduler){
val responses = archiveListeners map { archivist => archivist ?
RequestJobStatus(jobID) filter {
case _: JobStatusFound => true
case _ => false
}
}
val noResponse = akka.pattern.after(AkkaUtils.FUTURE_DURATION, context.system.scheduler) {
Future.successful(JobNotFound(jobID))
}
......@@ -186,19 +184,19 @@ class EventCollector(val timerTaskInterval: Int) extends Actor with ActorLogMess
}
private def archiveEvent(jobID: JobID, event: AbstractEvent): Unit = {
for(listener <- archiveListeners){
for (listener <- archiveListeners) {
listener ! ArchiveEvent(jobID, event)
}
}
private def archiveJobEvent(jobID: JobID, event: RecentJobEvent): Unit = {
for(listener <- archiveListeners){
for (listener <- archiveListeners) {
listener ! ArchiveJobEvent(jobID, event)
}
}
private def archiveExecutionGraph(jobID: JobID, graph: ExecutionGraph): Unit = {
for(listener <- archiveListeners){
for (listener <- archiveListeners) {
listener ! ArchiveExecutionGraph(jobID, graph)
}
}
......
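startArchiveExpiredEvent above is the standard self-scheduling pattern: the actor periodically sends itself ArchiveExpiredEvents and prunes collected events on receipt. The same wiring from the Java side of Akka's scheduler API, as a sketch (mirroring the MODULE$ idiom JobClient uses for Scala case objects):

import java.util.concurrent.TimeUnit;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import org.apache.flink.runtime.messages.EventCollectorMessages;
import scala.concurrent.duration.FiniteDuration;

public final class SelfSchedulingSketch {
    public static void schedulePruning(ActorSystem system, ActorRef collector,
            int timerTaskIntervalInSeconds) {
        // Mirrors startArchiveExpiredEvent: twice the task interval, used
        // both as the initial delay and as the period.
        FiniteDuration d = new FiniteDuration(2 * timerTaskIntervalInSeconds,
                TimeUnit.SECONDS);
        system.scheduler().schedule(d, d, collector,
                EventCollectorMessages.ArchiveExpiredEvents$.MODULE$,
                system.dispatcher(), ActorRef.noSender());
    }
}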
......@@ -27,7 +27,8 @@ import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration, Con
import org.apache.flink.core.io.InputSplitAssigner
import org.apache.flink.runtime.accumulators.AccumulatorEvent
import org.apache.flink.runtime.io.network.ConnectionInfoLookupResponse
import org.apache.flink.runtime.messages.ExecutionGraphMessages.{JobStatusFound, JobStatusResponse, JobStatusChanged}
import org.apache.flink.runtime.messages.ExecutionGraphMessages.{JobStatusFound,
JobStatusChanged}
import org.apache.flink.runtime.messages.JobResult
import org.apache.flink.runtime.messages.JobResult.{JobCancelResult, JobSubmissionResult}
import org.apache.flink.runtime.{JobException, ActorLogMessages}
......@@ -49,12 +50,15 @@ import org.slf4j.LoggerFactory
import scala.collection.convert.WrapAsScala
import scala.concurrent.Future
class JobManager(val archiveCount: Int, val profiling: Boolean, val recommendedPollingInterval: Int) extends Actor with
ActorLogMessages with ActorLogging with WrapAsScala {
class JobManager(val archiveCount: Int, val profiling: Boolean, val recommendedPollingInterval:
Int) extends Actor with ActorLogMessages with ActorLogging with WrapAsScala {
import context._
def profilerProps: Props = Props(classOf[JobManagerProfiler])
def archiveProps: Props = Props(classOf[MemoryArchivist], archiveCount)
def eventCollectorProps: Props = Props(classOf[EventCollector], recommendedPollingInterval)
val profiler = profiling match {
......@@ -87,8 +91,8 @@ ActorLogMessages with ActorLogging with WrapAsScala {
override def receiveWithLogMessages: Receive = {
case RegisterTaskManager(connectionInfo, hardwareInformation, numberOfSlots) => {
val taskManager = sender()
val instanceID = instanceManager.registerTaskManager(taskManager, connectionInfo, hardwareInformation,
numberOfSlots)
val instanceID = instanceManager.registerTaskManager(taskManager, connectionInfo,
hardwareInformation, numberOfSlots)
context.watch(taskManager);
taskManager ! AcknowledgeRegistration(instanceID)
}
......@@ -111,7 +115,8 @@ ActorLogMessages with ActorLogging with WrapAsScala {
log.info(s"Received job ${jobGraph.getJobID} (${jobGraph.getName}).")
val executionGraph = currentJobs.getOrElseUpdate(jobGraph.getJobID(), new ExecutionGraph(jobGraph.getJobID,
val executionGraph = currentJobs.getOrElseUpdate(jobGraph.getJobID(),
new ExecutionGraph(jobGraph.getJobID,
jobGraph.getName, jobGraph.getJobConfiguration))
val userCodeLoader = LibraryCacheManager.getClassLoader(jobGraph.getJobID)
......@@ -126,13 +131,15 @@ ActorLogMessages with ActorLogging with WrapAsScala {
executionGraph.addUserCodeJarFile(_)
}
log.debug(s"Running master initialization of job ${jobGraph.getJobID} (${jobGraph.getName}).")
log.debug(s"Running master initialization of job ${jobGraph.getJobID} (${jobGraph
.getName}).")
try {
for (vertex <- jobGraph.getVertices) {
val executableClass = vertex.getInvokableClassName
if (executableClass == null || executableClass.length == 0) {
throw new JobException(s"The vertex ${vertex.getID} (${vertex.getName}) has no invokable class.")
throw new JobException(s"The vertex ${vertex.getID} (${vertex.getName}) has no " +
s"invokable class.")
}
vertex.initializeOnMaster(userCodeLoader)
......@@ -146,11 +153,13 @@ ActorLogMessages with ActorLogging with WrapAsScala {
// topological sorting of the job vertices
val sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources
log.debug(s"Adding ${sortedTopology.size()} vertices from job graph ${jobGraph.getJobID} (${jobGraph.getName}).")
log.debug(s"Adding ${sortedTopology.size()} vertices from job graph ${jobGraph
.getJobID} (${jobGraph.getName}).")
executionGraph.attachJobGraph(sortedTopology)
log.debug(s"Successfully created execution graph from job graph ${jobGraph.getJobID} (${jobGraph.getName}).")
log.debug(s"Successfully created execution graph from job graph ${jobGraph.getJobID} " +
s"(${jobGraph.getName}).")
eventCollector ! RegisterJob(executionGraph, false, System.currentTimeMillis())
......@@ -163,17 +172,17 @@ ActorLogMessages with ActorLogging with WrapAsScala {
success = true
JobSubmissionResult(JobResult.SUCCESS, null)
}
}catch{
} catch {
case t: Throwable =>
log.error(t, "Job submission failed.")
JobSubmissionResult(JobResult.ERROR, StringUtils.stringifyException(t))
}finally{
if(!success){
} finally {
if (!success) {
this.currentJobs.remove(jobGraph.getJobID)
try{
try {
LibraryCacheManager.unregister(jobGraph.getJobID)
}catch{
} catch {
case e: IllegalStateException =>
case t: Throwable =>
log.error(t, "Error while de-registering job at library cache manager.")
......@@ -187,7 +196,7 @@ ActorLogMessages with ActorLogging with WrapAsScala {
currentJobs.get(jobID) match {
case Some(executionGraph) =>
Future{
Future {
executionGraph.cancel()
}
JobCancelResult(JobResult.SUCCESS, null)
......@@ -202,7 +211,8 @@ ActorLogMessages with ActorLogging with WrapAsScala {
currentJobs.get(taskExecutionState.getJobID) match {
case Some(executionGraph) => sender() ! executionGraph.updateState(taskExecutionState)
case None => log.error(s"Cannot find execution graph for ID ${taskExecutionState.getJobID} to change state to" +
case None => log.error(s"Cannot find execution graph for ID ${taskExecutionState
.getJobID} to change state to" +
s" ${taskExecutionState.getExecutionState}.")
sender() ! false
}
......@@ -211,7 +221,7 @@ ActorLogMessages with ActorLogging with WrapAsScala {
case RequestNextInputSplit(jobID, vertexID) => {
val nextInputSplit = currentJobs.get(jobID) match {
case Some(executionGraph) => executionGraph.getJobVertex(vertexID) match {
case vertex: ExecutionJobVertex => vertex.getSplitAssigner match{
case vertex: ExecutionJobVertex => vertex.getSplitAssigner match {
case splitAssigner: InputSplitAssigner => splitAssigner.getNextInputSplit(null)
case _ =>
log.error(s"No InputSplitAssigner for vertex ID ${vertexID}.")
......@@ -232,9 +242,10 @@ ActorLogMessages with ActorLogging with WrapAsScala {
case JobStatusChanged(executionGraph, newJobStatus, optionalMessage) => {
val jobID = executionGraph.getJobID
log.info(s"Status of job ${jobID} (${executionGraph.getJobName}) changed to ${newJobStatus}${optionalMessage}.")
log.info(s"Status of job ${jobID} (${executionGraph.getJobName}) changed to " +
s"${newJobStatus}${optionalMessage}.")
if(Set(JobStatus.FINISHED, JobStatus.CANCELED, JobStatus.FAILED) contains newJobStatus){
if (Set(JobStatus.FINISHED, JobStatus.CANCELED, JobStatus.FAILED) contains newJobStatus) {
// send final job status to job termination listeners
jobTerminationListener.get(jobID) foreach {
listeners =>
......@@ -244,20 +255,20 @@ ActorLogMessages with ActorLogging with WrapAsScala {
}
currentJobs.remove(jobID)
try{
try {
LibraryCacheManager.unregister(jobID)
}catch{
} catch {
case t: Throwable =>
log.error(t, s"Could not properly unregister job ${jobID} form the library cache.")
}
}
}
case LookupConnectionInformation(connectionInformation, jobID, sourceChannelID) =>{
case LookupConnectionInformation(connectionInformation, jobID, sourceChannelID) => {
currentJobs.get(jobID) match {
case Some(executionGraph) =>
sender() ! ConnectionInformation(executionGraph.lookupConnectionInfoAndDeployReceivers(connectionInformation,
sourceChannelID))
sender() ! ConnectionInformation(executionGraph.lookupConnectionInfoAndDeployReceivers
(connectionInformation, sourceChannelID))
case None =>
log.error(s"Cannot find execution graph for job ID ${jobID}.")
sender() ! ConnectionInformation(ConnectionInfoLookupResponse.createReceiverNotFound())
......@@ -265,7 +276,8 @@ ActorLogMessages with ActorLogging with WrapAsScala {
}
case ReportAccumulatorResult(accumulatorEvent) => {
accumulatorManager.processIncomingAccumulators(accumulatorEvent.getJobID, accumulatorEvent.getAccumulators
accumulatorManager.processIncomingAccumulators(accumulatorEvent.getJobID,
accumulatorEvent.getAccumulators
(LibraryCacheManager.getClassLoader(accumulatorEvent.getJobID)))
}
......@@ -283,10 +295,10 @@ ActorLogMessages with ActorLogging with WrapAsScala {
}
case RequestJobStatusWhenTerminated(jobID) => {
if(currentJobs.contains(jobID)){
if (currentJobs.contains(jobID)) {
val listeners = jobTerminationListener.getOrElse(jobID, Set())
jobTerminationListener += jobID -> (listeners + sender())
}else{
} else {
eventCollector.tell(RequestJobStatus(jobID), sender())
}
}
......@@ -302,7 +314,7 @@ ActorLogMessages with ActorLogging with WrapAsScala {
eventCollector.tell(RequestRecentJobEvents, sender())
}
case msg:RequestJobProgress => {
case msg: RequestJobProgress => {
eventCollector forward msg
}
......@@ -318,7 +330,7 @@ ActorLogMessages with ActorLogging with WrapAsScala {
}
}
object JobManager{
object JobManager {
val LOG = LoggerFactory.getLogger(classOf[JobManager])
val FAILURE_RETURN_CODE = 1
val JOB_MANAGER_NAME = "jobmanager"
......@@ -326,26 +338,26 @@ object JobManager{
val ARCHIVE_NAME = "archive"
val PROFILER_NAME = "profiler"
def main(args: Array[String]):Unit = {
def main(args: Array[String]): Unit = {
val (hostname, port, configuration) = initialize(args)
val (jobManagerSystem,_) = startActorSystemAndActor(hostname, port, configuration)
val (jobManagerSystem, _) = startActorSystemAndActor(hostname, port, configuration)
jobManagerSystem.awaitTermination()
}
def initialize(args: Array[String]):(String, Int, Configuration) = {
val parser = new scopt.OptionParser[JobManagerCLIConfiguration]("jobmanager"){
def initialize(args: Array[String]): (String, Int, Configuration) = {
val parser = new scopt.OptionParser[JobManagerCLIConfiguration]("jobmanager") {
head("flink jobmanager")
opt[String]("configDir") action { (x, c) =>
c.copy(configDir = x)
} text("Specify configuration directory.")
} text ("Specify configuration directory.")
}
parser.parse(args, JobManagerCLIConfiguration()) map {
config =>
GlobalConfiguration.loadConfiguration(config.configDir)
val configuration = GlobalConfiguration.getConfiguration()
if(config.configDir != null && new File(config.configDir).isDirectory){
if (config.configDir != null && new File(config.configDir).isDirectory) {
configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, config.configDir + "/..")
}
......@@ -360,7 +372,8 @@ object JobManager{
}
}
def startActorSystemAndActor(hostname: String, port: Int, configuration: Configuration): (ActorSystem, ActorRef) = {
def startActorSystemAndActor(hostname: String, port: Int, configuration: Configuration):
(ActorSystem, ActorRef) = {
implicit val actorSystem = AkkaUtils.createActorSystem(hostname, port, configuration)
(actorSystem, (startActor _).tupled(parseConfiguration(configuration)))
}
......@@ -369,19 +382,23 @@ object JobManager{
val archiveCount = configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_ARCHIVE_COUNT,
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT)
val profilingEnabled = configuration.getBoolean(ProfilingUtils.PROFILE_JOB_KEY, true)
val recommendedPollingInterval = configuration.getInteger(ConfigConstants.JOBCLIENT_POLLING_INTERVAL_KEY,
val recommendedPollingInterval = configuration.getInteger(ConfigConstants
.JOBCLIENT_POLLING_INTERVAL_KEY,
ConfigConstants.DEFAULT_JOBCLIENT_POLLING_INTERVAL)
(archiveCount, profilingEnabled, recommendedPollingInterval)
}
def startActorWithConfiguration(configuration: Configuration)(implicit actorSystem: ActorSystem) = {
def startActorWithConfiguration(configuration: Configuration)(implicit actorSystem:
ActorSystem) = {
(startActor _).tupled(parseConfiguration(configuration))
}
def startActor(archiveCount: Int, profilingEnabled: Boolean, recommendedPollingInterval: Int)(implicit actorSystem:
def startActor(archiveCount: Int, profilingEnabled: Boolean, recommendedPollingInterval: Int)
(implicit actorSystem:
ActorSystem): ActorRef = {
actorSystem.actorOf(Props(classOf[JobManager], archiveCount, profilingEnabled, recommendedPollingInterval),
actorSystem.actorOf(Props(classOf[JobManager], archiveCount, profilingEnabled,
recommendedPollingInterval),
JOB_MANAGER_NAME)
}
......
......@@ -30,8 +30,8 @@ import org.apache.flink.runtime.messages.JobManagerMessages.RequestJobStatus
import scala.collection.convert.DecorateAsJava
import scala.collection.mutable.ListBuffer
class MemoryArchivist(private val max_entries: Int) extends Actor with ActorLogMessages with ActorLogging with
DecorateAsJava {
class MemoryArchivist(private val max_entries: Int) extends Actor with ActorLogMessages with
ActorLogging with DecorateAsJava {
/**
* The map which stores all collected events until they are either
* fetched by the client or discarded.
......@@ -44,11 +44,12 @@ DecorateAsJava {
val oldJobs = collection.mutable.HashMap[JobID, RecentJobEvent]()
/**
* Map of execution graphs belonging to recently started jobs with the time stamp of the last received job event.
* Map of execution graphs belonging to recently started jobs with the time stamp of the last
* received job event.
*/
val graphs = collection.mutable.HashMap[JobID, ExecutionGraph]()
val lru = collection.mutable.Queue[JobID]()
override def receiveWithLogMessages: Receive = {
......@@ -79,11 +80,11 @@ DecorateAsJava {
}
def cleanup(jobID: JobID): Unit = {
if(!lru.contains(jobID)){
if (!lru.contains(jobID)) {
lru.enqueue(jobID)
}
while(lru.size > max_entries){
while (lru.size > max_entries) {
val removedJobID = lru.dequeue()
collectedEvents.remove(removedJobID)
oldJobs.remove(removedJobID)
......
......@@ -26,13 +26,18 @@ import org.apache.flink.runtime.profiling.types.ProfilingEvent
import scala.collection.convert.{WrapAsScala, DecorateAsJava}
object EventCollectorMessages extends DecorateAsJava with WrapAsScala{
object EventCollectorMessages extends DecorateAsJava with WrapAsScala {
case class ProcessProfilingEvent(profilingEvent: ProfilingEvent)
case class RegisterArchiveListener(listener: ActorRef)
case class RequestJobProgress(jobID: JobID)
case class RegisterJob(executionGraph: ExecutionGraph, profilingAvailable: Boolean, submissionTimestamp: Long)
case class RecentJobs(jobs: List[RecentJobEvent]){
case class RegisterJob(executionGraph: ExecutionGraph, profilingAvailable: Boolean,
submissionTimestamp: Long)
case class RecentJobs(jobs: List[RecentJobEvent]) {
def this(_jobs: java.util.List[RecentJobEvent]) = {
this(_jobs.toList)
}
......@@ -42,12 +47,14 @@ object EventCollectorMessages extends DecorateAsJava with WrapAsScala{
}
}
case class JobEvents(jobs: List[AbstractEvent]){
case class JobEvents(jobs: List[AbstractEvent]) {
def asJavaList: java.util.List[AbstractEvent] = {
jobs.asJava
}
}
case object ArchiveExpiredEvents
case object RequestRecentJobEvents
}
......@@ -23,13 +23,20 @@ import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGra
import org.apache.flink.runtime.jobgraph.{JobStatus, JobVertexID, JobID}
object ExecutionGraphMessages {
case class ExecutionStateChanged(jobID: JobID, vertexID: JobVertexID, subtask: Int, executionID: ExecutionAttemptID,
case class ExecutionStateChanged(jobID: JobID, vertexID: JobVertexID, subtask: Int,
executionID: ExecutionAttemptID,
newExecutionState: ExecutionState, optionalMessage: String)
case class JobStatusChanged(executionGraph: ExecutionGraph, newJobStatus: JobStatus, optionalMessage: String)
sealed trait JobStatusResponse{
case class JobStatusChanged(executionGraph: ExecutionGraph, newJobStatus: JobStatus,
optionalMessage: String)
sealed trait JobStatusResponse {
def jobID: JobID
};
case class JobStatusFound(jobID: JobID, status: JobStatus) extends JobStatusResponse
case class JobNotFound(jobID: JobID) extends JobStatusResponse
}
......@@ -27,11 +27,15 @@ object JobResult extends Enumeration with DecorateAsJava {
type JobResult = Value
val SUCCESS, ERROR = Value
case class JobProgressResult(returnCode: JobResult, description: String, events: List[AbstractEvent]){
case class JobProgressResult(returnCode: JobResult, description: String,
events: List[AbstractEvent]) {
def asJavaList: java.util.List[AbstractEvent] = {
events.asJava
}
}
case class JobCancelResult(returnCode: JobResult, description: String)
case class JobSubmissionResult(returnCode: JobResult, description: String)
}
......@@ -27,20 +27,36 @@ import org.apache.flink.runtime.profiling.impl.types.ProfilingDataContainer
import org.apache.flink.runtime.taskmanager.TaskExecutionState
object JobManagerMessages {
case class SubmitJob(jobGraph: JobGraph)
case class CancelJob(jobID: JobID)
case class UpdateTaskExecutionState(taskExecutionState: TaskExecutionState)
case class RequestNextInputSplit(jobID: JobID, vertexID: JobVertexID)
case class LookupConnectionInformation(caller: InstanceConnectionInfo, jobID: JobID, sourceChannelID: ChannelID)
case class LookupConnectionInformation(caller: InstanceConnectionInfo, jobID: JobID,
sourceChannelID: ChannelID)
case class ConnectionInformation(response: ConnectionInfoLookupResponse)
case class ReportAccumulatorResult(accumulatorEvent: AccumulatorEvent)
case class RequestAccumulatorResult(jobID: JobID)
case class RegisterJobStatusListener(jobID: JobID)
case class RequestJobStatusWhenTerminated(jobID: JobID)
case class RequestJobStatus(jobID: JobID)
case object RequestInstances
case object RequestNumberRegisteredTaskManager
case object RequestAvailableSlots
case object RequestPollingInterval
}
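These case classes and objects are the client-facing surface that replaces the deleted protocol interfaces. As one example (a sketch reusing the ask idiom from JobClient above), submitting a job and reading the typed reply declared in the JobResult hunk:

import akka.actor.ActorRef;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.JobResult;

public final class SubmitJobSketch {
    public static void submit(ActorRef jobManager, JobGraph jobGraph)
            throws java.io.IOException {
        JobResult.JobSubmissionResult result =
                AkkaUtils.<JobResult.JobSubmissionResult>ask(jobManager,
                        new JobManagerMessages.SubmitJob(jobGraph));
        // returnCode() is JobResult.SUCCESS or ERROR; on error, description()
        // carries the stringified failure cause.
        System.out.println(result.returnCode() + " " + result.description());
    }
}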
......@@ -21,7 +21,11 @@ package org.apache.flink.runtime.messages
import org.apache.flink.runtime.instance.{InstanceConnectionInfo, InstanceID, HardwareDescription}
object RegistrationMessages {
case class RegisterTaskManager(connectionInfo: InstanceConnectionInfo, hardwareDescription: HardwareDescription,
numberOfSlots: Int)
case class RegisterTaskManager(connectionInfo: InstanceConnectionInfo,
hardwareDescription: HardwareDescription,
numberOfSlots: Int)
case class AcknowledgeRegistration(instanceID: InstanceID)
}
......@@ -18,5 +18,5 @@
package org.apache.flink.runtime.taskmanager
case class NetworkConnectionConfiguration(numBuffers: Int, bufferSize: Int, numInThreads: Int, numOutThreads: Int,
lowWaterMark: Int, highWaterMark: Int)
case class NetworkConnectionConfiguration(numBuffers: Int, bufferSize: Int, numInThreads: Int,
numOutThreads: Int, lowWaterMark: Int, highWaterMark: Int)
\ No newline at end of file
......@@ -35,21 +35,23 @@ import org.apache.flink.runtime.profiling.impl.{EnvironmentThreadSet, InstancePr
import scala.concurrent.duration.FiniteDuration
class TaskManagerProfiler(val instancePath: String, val reportInterval: Int) extends Actor with ActorLogMessages with
ActorLogging {
class TaskManagerProfiler(val instancePath: String, val reportInterval: Int) extends Actor with
ActorLogMessages with ActorLogging {
import context.dispatcher
val tmx = ManagementFactory.getThreadMXBean
val instanceProfiler = new InstanceProfiler(instancePath)
val listeners = scala.collection.mutable.Set[ActorRef]()
val environments = scala.collection.mutable.HashMap[ExecutionAttemptID, RuntimeEnvironment]()
val monitoredThreads = scala.collection.mutable.HashMap[RuntimeEnvironment, EnvironmentThreadSet]()
val monitoredThreads = scala.collection.mutable.HashMap[RuntimeEnvironment,
EnvironmentThreadSet]()
var monitoringScheduler: Option[Cancellable] = None
if(tmx.isThreadContentionMonitoringSupported){
if (tmx.isThreadContentionMonitoringSupported) {
tmx.setThreadContentionMonitoringEnabled(true)
}else{
} else {
throw new ProfilingException("The thread contention monitoring is not supported.")
}
......@@ -66,14 +68,14 @@ ActorLogging {
case RegisterProfilingListener => {
listeners += sender()
if(monitoringScheduler.isEmpty){
if (monitoringScheduler.isEmpty) {
startMonitoring
}
}
case UnregisterProfilingListener => {
listeners -= sender()
if(listeners.isEmpty){
if (listeners.isEmpty) {
stopMonitoring
}
}
......@@ -83,15 +85,15 @@ ActorLogging {
val profilingDataContainer = new ProfilingDataContainer()
for((env, set) <- monitoredThreads){
for ((env, set) <- monitoredThreads) {
val threadProfilingData = set.captureCPUUtilization(env.getJobID, tmx, timestamp)
if(threadProfilingData != null){
if (threadProfilingData != null) {
profilingDataContainer.addProfilingData(threadProfilingData)
}
if(monitoredThreads.nonEmpty){
val instanceProfilingData = try{
if (monitoredThreads.nonEmpty) {
val instanceProfilingData = try {
Some(instanceProfiler.generateProfilingData(timestamp))
} catch {
case e: ProfilingException => {
......@@ -100,10 +102,12 @@ ActorLogging {
}
}
instanceProfilingData foreach { profilingDataContainer.addProfilingData(_) }
instanceProfilingData foreach {
profilingDataContainer.addProfilingData(_)
}
if(!profilingDataContainer.isEmpty){
for(listener <- listeners){
if (!profilingDataContainer.isEmpty) {
for (listener <- listeners) {
listener ! ReportProfilingData(profilingDataContainer)
}
}
......@@ -119,7 +123,8 @@ ActorLogging {
environments.get(executionID) match {
case Some(environment) =>
newExecutionState match {
case RUNNING => registerMainThreadForCPUProfiling(environment, vertexID, subtaskIndex, executionID)
case RUNNING => registerMainThreadForCPUProfiling(environment, vertexID,
subtaskIndex, executionID)
case FINISHED | CANCELING | CANCELED | FAILED =>
unregisterMainThreadFromCPUProfiling(environment)
case _ =>
......@@ -132,26 +137,32 @@ ActorLogging {
def startMonitoring(): Unit = {
val interval = new FiniteDuration(reportInterval, TimeUnit.MILLISECONDS)
val delay = new FiniteDuration((reportInterval* Math.random()).toLong, TimeUnit.MILLISECONDS)
monitoringScheduler = Some(context.system.scheduler.schedule(delay, interval, self, ProfileTasks))
val delay = new FiniteDuration((reportInterval * Math.random()).toLong, TimeUnit.MILLISECONDS)
monitoringScheduler = Some(context.system.scheduler.schedule(delay, interval, self,
ProfileTasks))
}
def stopMonitoring(): Unit = {
monitoringScheduler.foreach { _.cancel() }
monitoringScheduler.foreach {
_.cancel()
}
monitoringScheduler = None
}
def registerMainThreadForCPUProfiling(environment: RuntimeEnvironment, vertexID: JobVertexID, subtask: Int,
def registerMainThreadForCPUProfiling(environment: RuntimeEnvironment, vertexID: JobVertexID,
subtask: Int,
executionID: ExecutionAttemptID): Unit = {
monitoredThreads += environment -> new EnvironmentThreadSet(tmx, environment.getExecutingThread, vertexID,
monitoredThreads += environment -> new EnvironmentThreadSet(tmx,
environment.getExecutingThread, vertexID,
subtask, executionID)
}
def unregisterMainThreadFromCPUProfiling(environment: RuntimeEnvironment): Unit = {
monitoredThreads.remove(environment) match {
case Some(set) =>
if(set.getMainThread != environment.getExecutingThread){
log.error(s"The thread ${environment.getExecutingThread.getName} is not the main thread of this environment.")
if (set.getMainThread != environment.getExecutingThread) {
log.error(s"The thread ${environment.getExecutingThread.getName} is not the main thread" +
s" of this environment.")
}
case None =>
}
......
......@@ -39,7 +39,6 @@ import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.TestingUtils;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.AllocatedSlot;
......@@ -51,6 +50,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.operators.RegularPactTask;
import org.apache.flink.runtime.taskmanager.TaskOperationResult;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
......
......@@ -29,13 +29,13 @@ import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.TestingUtils;
import org.apache.flink.runtime.instance.AllocatedSlot;
import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
......
......@@ -31,7 +31,6 @@ import akka.actor.Status;
import akka.actor.UntypedActor;
import akka.japi.Creator;
import akka.testkit.JavaTestKit;
import org.apache.flink.runtime.TestingUtils;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheProfileResponse;
......@@ -43,6 +42,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.messages.TaskManagerMessages;
import org.apache.flink.runtime.taskmanager.TaskOperationResult;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
......
......@@ -24,7 +24,6 @@ import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import akka.actor.ActorRef;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.AllocatedSlot;
import org.apache.flink.runtime.instance.Instance;
......@@ -33,7 +32,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
import org.apache.flink.runtime.protocols.TaskOperationProtocol;
import org.junit.Test;
......
......@@ -30,7 +30,7 @@ import java.net.InetAddress;
import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import org.apache.flink.runtime.TestingUtils;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.junit.AfterClass;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobgraph;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.net.ServerSocket;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.ExecutionMode;
import org.apache.flink.runtime.jobmanager.JobManager;
public class JobManagerTestUtils {
public static final JobManager startJobManager(int numSlots) throws Exception {
return startJobManager(1, numSlots);
}
public static final JobManager startJobManager(int numTaskManagers, int numSlotsPerTaskManager) throws Exception {
return startJobManager(numTaskManagers, numSlotsPerTaskManager, null);
}
public static final JobManager startJobManager(int numTaskManagers, int numSlotsPerTaskManager, Configuration additionalParams) throws Exception {
Configuration cfg = new Configuration();
cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, getAvailablePort());
cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10);
cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
cfg.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers);
cfg.setInteger(ConfigConstants.TASK_MANAGER_HEARTBEAT_INTERVAL_KEY, 500);
cfg.setInteger(ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY, 2000);
if (additionalParams != null) {
cfg.addAll(additionalParams);
}
GlobalConfiguration.includeConfiguration(cfg);
JobManager jm = new JobManager(ExecutionMode.LOCAL);
// we need to wait until the taskmanager is registered
// max time is 5 seconds
long deadline = System.currentTimeMillis() + 5000;
while (jm.getNumberOfSlotsAvailableToScheduler() < numTaskManagers * numSlotsPerTaskManager &&
System.currentTimeMillis() < deadline)
{
Thread.sleep(10);
}
assertEquals(numTaskManagers * numSlotsPerTaskManager, jm.getNumberOfSlotsAvailableToScheduler());
return jm;
}
public static int getAvailablePort() throws IOException {
for (int i = 0; i < 50; i++) {
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(0);
int port = serverSocket.getLocalPort();
if (port != 0) {
return port;
}
} finally {
if (serverSocket != null) {
serverSocket.close();
}
}
}
throw new IOException("could not find free port");
}
public static void waitForTaskThreadsToBeTerminated() throws InterruptedException {
Thread[] threads = new Thread[Thread.activeCount()];
Thread.enumerate(threads);
for (Thread t : threads) {
if (t == null) {
continue;
}
ThreadGroup tg = t.getThreadGroup();
if (tg != null && tg.getName() != null && tg.getName().equals("Task Threads")) {
t.join();
}
}
}
}
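A minimal usage sketch for the helpers above (the test class name and the jm.shutdown() call are assumptions for illustration, not part of this commit):

// Hypothetical sketch: starts a local JobManager with 2 task managers x 2 slots,
// then tears it down. Assumes the pre-actor JobManager exposes shutdown().
import org.apache.flink.runtime.jobgraph.JobManagerTestUtils;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.junit.Test;

public class JobManagerStartupSketch {

    @Test
    public void startAndStopLocalJobManager() throws Exception {
        // blocks until all 2 * 2 slots are registered with the scheduler
        JobManager jm = JobManagerTestUtils.startJobManager(2, 2);
        try {
            // submit a JobGraph against jm here, as the scheduler tests do
        }
        finally {
            jm.shutdown();
            // ensure no "Task Threads" linger into the next test
            JobManagerTestUtils.waitForTaskThreadsToBeTerminated();
        }
    }
}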
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobmanager.tasks;
import org.apache.flink.runtime.io.network.api.RecordReader;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.types.IntegerRecord;
public final class AgnosticBinaryReceiver extends AbstractInvokable {
private RecordReader<IntegerRecord> reader1;
private RecordReader<IntegerRecord> reader2;
@Override
public void registerInputOutput() {
reader1 = new RecordReader<IntegerRecord>(this, IntegerRecord.class);
reader2 = new RecordReader<IntegerRecord>(this, IntegerRecord.class);
}
@Override
public void invoke() throws Exception {
while (reader1.next() != null);
while (reader2.next() != null);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobmanager.tasks;
import org.apache.flink.runtime.io.network.api.RecordReader;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.types.IntegerRecord;
public final class AgnosticReceiver extends AbstractInvokable {
private RecordReader<IntegerRecord> reader;
@Override
public void registerInputOutput() {
reader = new RecordReader<IntegerRecord>(this, IntegerRecord.class);
}
@Override
public void invoke() throws Exception {
while (reader.next() != null);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobmanager.tasks;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
/**
 * An invokable that blocks in its invoke() method until it is interrupted,
 * e.g. by task cancellation.
 */
public class BlockingNoOpInvokable extends AbstractInvokable {
@Override
public void registerInputOutput() {}
@Override
public void invoke() throws Exception {
Object o = new Object();
synchronized (o) {
o.wait();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobmanager.tasks;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
/**
* An invokable that does nothing.
*/
public class NoOpInvokable extends AbstractInvokable {
@Override
public void registerInputOutput() {}
@Override
public void invoke() {}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobmanager.tasks;
import org.apache.flink.runtime.io.network.api.RecordReader;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.types.IntegerRecord;
public final class Receiver extends AbstractInvokable {
private RecordReader<IntegerRecord> reader;
@Override
public void registerInputOutput() {
reader = new RecordReader<IntegerRecord>(this, IntegerRecord.class);
}
@Override
public void invoke() throws Exception {
IntegerRecord i1 = reader.next();
IntegerRecord i2 = reader.next();
IntegerRecord i3 = reader.next();
if (i1.getValue() != 42 || i2.getValue() != 1337 || i3 != null) {
throw new Exception("Wrong Data Received");
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobmanager.tasks;
import org.apache.flink.runtime.io.network.api.RecordWriter;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.types.IntegerRecord;
public final class Sender extends AbstractInvokable {
private RecordWriter<IntegerRecord> writer;
@Override
public void registerInputOutput() {
writer = new RecordWriter<IntegerRecord>(this);
}
@Override
public void invoke() throws Exception {
try {
writer.initializeSerializers();
writer.emit(new IntegerRecord(42));
writer.emit(new IntegerRecord(1337));
writer.flush();
}
finally {
writer.clearBuffers();
}
}
}
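For orientation, a sketch of how Sender and Receiver are wired into a pointwise job; it uses only APIs that appear elsewhere in this commit (the class name is hypothetical):

// Sketch: builds the same two-vertex pointwise JobGraph as the Scala ITCases below.
import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.tasks.Receiver;
import org.apache.flink.runtime.jobmanager.tasks.Sender;

public class PointwiseJobSketch {

    public static JobGraph buildPointwiseJob(int parallelism) {
        AbstractJobVertex sender = new AbstractJobVertex("Sender");
        AbstractJobVertex receiver = new AbstractJobVertex("Receiver");

        sender.setInvokableClass(Sender.class);
        receiver.setInvokableClass(Receiver.class);
        sender.setParallelism(parallelism);
        receiver.setParallelism(parallelism);

        // each Receiver subtask reads the two records emitted by its Sender subtask
        receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
        return new JobGraph("Pointwise job", sender, receiver);
    }
}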
......@@ -55,7 +55,6 @@ import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.protocols.AccumulatorProtocol;
import org.apache.flink.types.Record;
import org.apache.flink.util.MutableObjectIterator;
......
......@@ -37,7 +37,6 @@ import akka.testkit.JavaTestKit;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.TestingUtils;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.deployment.ChannelDeploymentDescriptor;
......@@ -54,13 +53,12 @@ import org.apache.flink.runtime.io.network.channels.ChannelID;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.TestingTaskManagerMessages;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.RegistrationMessages;
import org.apache.flink.runtime.messages.TaskManagerMessages;
import org.apache.flink.runtime.testingActors.TestingActors;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.types.IntegerRecord;
import org.apache.flink.runtime.taskmanager.TestingTaskManagerMessages.NotifyWhenTaskRemoved;
import org.junit.AfterClass;
import org.junit.BeforeClass;
......@@ -179,7 +177,7 @@ public class TaskManagerTest {
expectMsgEquals(new TaskOperationResult(eid1, true));
Future<Object> response = Patterns.ask(tm, new NotifyWhenTaskRemoved(eid1),
Future<Object> response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid1),
AkkaUtils.FUTURE_TIMEOUT());
Await.ready(response, d);
......@@ -198,7 +196,7 @@ public class TaskManagerTest {
tm.tell(new TaskManagerMessages.CancelTask(eid2), getRef());
expectMsgEquals(new TaskOperationResult(eid2, true));
response = Patterns.ask(tm, new NotifyWhenTaskRemoved(eid2),
response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid2),
AkkaUtils.FUTURE_TIMEOUT());
Await.ready(response, d);
......@@ -341,13 +339,13 @@ public class TaskManagerTest {
// wait until the tasks are done. rare thread races may cause the tasks to be done before
// we get to the check, so we need to guard the check
if (t1 != null) {
Future<Object> response = Patterns.ask(tm, new NotifyWhenTaskRemoved(eid1),
Future<Object> response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid1),
AkkaUtils.FUTURE_TIMEOUT());
Await.ready(response, d);
}
if (t2 != null) {
Future<Object> response = Patterns.ask(tm, new NotifyWhenTaskRemoved(eid2),
Future<Object> response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid2),
AkkaUtils.FUTURE_TIMEOUT());
Await.ready(response, d);
assertEquals(ExecutionState.FINISHED, t2.getExecutionState());
......@@ -430,7 +428,7 @@ public class TaskManagerTest {
expectMsgEquals(new TaskOperationResult(eid2, true));
if (t2 != null) {
Future<Object> response = Patterns.ask(tm, new NotifyWhenTaskRemoved(eid2),
Future<Object> response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid2),
AkkaUtils.FUTURE_TIMEOUT());
Await.ready(response, d);
}
......@@ -440,7 +438,7 @@ public class TaskManagerTest {
tm.tell(new TaskManagerMessages.CancelTask(eid1), getRef());
expectMsgEquals(new TaskOperationResult(eid1, true));
}
Future<Object> response = Patterns.ask(tm, new NotifyWhenTaskRemoved(eid1),
Future<Object> response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid1),
AkkaUtils.FUTURE_TIMEOUT());
Await.ready(response, d);
}
......@@ -541,7 +539,7 @@ public class TaskManagerTest {
String akkaURL = jm.path().toSerializationFormat();
cfg.setString(ConfigConstants.JOB_MANAGER_AKKA_URL, akkaURL);
ActorRef taskManager = TestingActors.startTestingTaskManagerWithConfiguration("localhost", cfg, system);
ActorRef taskManager = TestingUtils.startTestingTaskManagerWithConfiguration("localhost", cfg, system);
Future<Object> response = Patterns.ask(taskManager, TaskManagerMessages.NotifyWhenRegisteredAtMaster$.MODULE$,
AkkaUtils.FUTURE_TIMEOUT());
......
......@@ -40,7 +40,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.protocols.AccumulatorProtocol;
import org.apache.flink.util.ExceptionUtils;
import org.junit.Test;
import org.mockito.Matchers;
......
......@@ -35,7 +35,7 @@
<logger name="org.apache.flink.runtime.client.JobClient" level="OFF"/>
<logger name="org.apache.flink.runtime.taskmanager.Task" level="OFF"/>
<logger name="org.apache.flink.runtime.jobmanager.JobManager" level="OFF"/>
<logger name="org.apache.flink.runtime.testingActors" level="OFF"/>
<logger name="org.apache.flink.runtime.testingUtils" level="OFF"/>
<logger name="org.apache.flink.runtime.executiongraph.ExecutionGraph" level="OFF"/>
<logger name="org.apache.flink.runtime.jobmanager.EventCollector" level="OFF"/>
<logger name="org.apache.flink.runtime.instance.InstanceManager" level="OFF"/>
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobmanager
import akka.actor.ActorSystem
import akka.testkit.{ImplicitSender, TestKit}
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager
import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, DistributionPattern,
AbstractJobVertex}
import org.apache.flink.runtime.jobmanager.Tasks.{Receiver, Sender}
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup
import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusFound
import org.apache.flink.runtime.messages.JobManagerMessages.{RequestJobStatusWhenTerminated,
SubmitJob}
import org.apache.flink.runtime.messages.JobResult
import org.apache.flink.runtime.messages.JobResult.JobSubmissionResult
import org.apache.flink.runtime.testingUtils.TestingUtils
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
import scala.concurrent.duration._
class CoLocationConstraintITCase(_system: ActorSystem) extends TestKit(_system) with
ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig))
override def afterAll: Unit = {
TestKit.shutdownActorSystem(system)
}
/**
* This job runs in N slots with N senders and N receivers. Unless slot sharing is used,
* it cannot complete.
*/
"The JobManager actor" must {
"support colocation constraints and slot sharing" in {
val num_tasks = 31
val sender = new AbstractJobVertex("Sender")
val receiver = new AbstractJobVertex("Receiver")
sender.setInvokableClass(classOf[Sender])
receiver.setInvokableClass(classOf[Receiver])
sender.setParallelism(num_tasks)
receiver.setParallelism(num_tasks)
receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
val sharingGroup = new SlotSharingGroup(sender.getID, receiver.getID)
sender.setSlotSharingGroup(sharingGroup)
receiver.setSlotSharingGroup(sharingGroup)
receiver.setStrictlyCoLocatedWith(sender)
val jobGraph = new JobGraph("Pointwise job", sender, receiver)
val cluster = TestingUtils.startTestingCluster(num_tasks)
val jm = cluster.getJobManager
try {
LibraryCacheManager.register(jobGraph.getJobID, Array[String]())
within(1 second) {
jm ! SubmitJob(jobGraph)
expectMsg(JobSubmissionResult(JobResult.SUCCESS, null))
jm ! RequestJobStatusWhenTerminated(jobGraph.getJobID)
expectMsg(JobStatusFound(jobGraph.getJobID, JobStatus.FINISHED))
}
} finally {
cluster.stop()
}
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobmanager
import akka.actor.ActorSystem
import akka.testkit.{ImplicitSender, TestKit}
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager
import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, DistributionPattern,
AbstractJobVertex}
import org.apache.flink.runtime.jobmanager.Tasks.{AgnosticBinaryReceiver, Receiver}
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup
import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusFound
import org.apache.flink.runtime.messages.JobManagerMessages.{RequestJobStatusWhenTerminated,
SubmitJob}
import org.apache.flink.runtime.messages.JobResult
import org.apache.flink.runtime.messages.JobResult.JobSubmissionResult
import org.apache.flink.runtime.taskmanager.TaskManagerTest.Sender
import org.apache.flink.runtime.testingUtils.TestingUtils
import org.scalatest.{Matchers, WordSpecLike, BeforeAndAfterAll}
import scala.concurrent.duration._
class SlotSharingITCase(_system: ActorSystem) extends TestKit(_system) with ImplicitSender with
WordSpecLike with Matchers with BeforeAndAfterAll {
def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig))
override def afterAll: Unit = {
TestKit.shutdownActorSystem(system)
}
"The JobManager actor" must {
"support slot sharing for forward job" in {
val num_tasks = 31
val sender = new AbstractJobVertex("Sender")
val receiver = new AbstractJobVertex("Receiver")
sender.setInvokableClass(classOf[Sender])
receiver.setInvokableClass(classOf[Receiver])
sender.setParallelism(num_tasks)
receiver.setParallelism(num_tasks)
receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
val sharingGroup = new SlotSharingGroup(sender.getID, receiver.getID)
sender.setSlotSharingGroup(sharingGroup)
receiver.setSlotSharingGroup(sharingGroup)
val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
val cluster = TestingUtils.startTestingCluster(num_tasks)
val jm = cluster.getJobManager
try {
LibraryCacheManager.register(jobGraph.getJobID, Array[String]())
within(1 second) {
jm ! SubmitJob(jobGraph)
expectMsg(new JobSubmissionResult(JobResult.SUCCESS, null))
jm ! RequestJobStatusWhenTerminated(jobGraph.getJobID)
expectMsg(JobStatusFound(jobGraph.getJobID, JobStatus.FINISHED))
}
} finally {
cluster.stop()
}
}
/**
* This job runs in N slots with 2 * N senders and N receivers. Unless slot sharing is used,
* it cannot complete.
*/
"support jobs with two inputs and slot sharing" in {
val num_tasks = 11
val sender1 = new AbstractJobVertex("Sender1")
val sender2 = new AbstractJobVertex("Sender2")
val receiver = new AbstractJobVertex("Receiver")
sender1.setInvokableClass(classOf[Sender])
sender2.setInvokableClass(classOf[Sender])
receiver.setInvokableClass(classOf[AgnosticBinaryReceiver])
sender1.setParallelism(num_tasks)
sender2.setParallelism(num_tasks)
receiver.setParallelism(num_tasks)
val sharingGroup = new SlotSharingGroup(sender1.getID, sender2.getID, receiver.getID)
sender1.setSlotSharingGroup(sharingGroup)
sender2.setSlotSharingGroup(sharingGroup)
receiver.setSlotSharingGroup(sharingGroup)
receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE)
receiver.connectNewDataSetAsInput(sender2, DistributionPattern.BIPARTITE)
val jobGraph = new JobGraph("Bipartite job", sender1, sender2, receiver)
val cluster = TestingUtils.startTestingCluster(num_tasks)
val jm = cluster.getJobManager
try {
LibraryCacheManager.register(jobGraph.getJobID, Array[String]())
within(1 second) {
jm ! SubmitJob(jobGraph)
expectMsg(JobSubmissionResult(JobResult.SUCCESS, null))
jm ! RequestJobStatusWhenTerminated(jobGraph.getJobID)
expectMsg(JobStatusFound(jobGraph.getJobID, JobStatus.FINISHED))
}
} finally {
cluster.stop()
}
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobmanager
import org.apache.flink.runtime.io.network.api.{RecordReader, RecordWriter}
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable
import org.apache.flink.runtime.types.IntegerRecord
object Tasks {
class BlockingNoOpInvokable extends AbstractInvokable {
override def registerInputOutput(): Unit = {}
override def invoke(): Unit = {
val o = new Object()
o.synchronized{
o.wait()
}
}
}
class NoOpInvokable extends AbstractInvokable{
override def registerInputOutput(): Unit = {}
override def invoke(): Unit = {}
}
class Sender extends AbstractInvokable{
var writer: RecordWriter[IntegerRecord] = _
override def registerInputOutput(): Unit = {
writer = new RecordWriter[IntegerRecord](this)
}
override def invoke(): Unit = {
try{
writer.initializeSerializers()
writer.emit(new IntegerRecord(42))
writer.emit(new IntegerRecord(1337))
writer.flush()
}finally{
writer.clearBuffers()
}
}
}
class Receiver extends AbstractInvokable {
var reader: RecordReader[IntegerRecord] = _
override def registerInputOutput(): Unit = {
reader = new RecordReader[IntegerRecord](this, classOf[IntegerRecord])
}
override def invoke(): Unit = {
val i1 = reader.next()
val i2 = reader.next()
val i3 = reader.next()
if(i1.getValue != 42 || i2.getValue != 1337 || i3 != null){
throw new Exception("Wrong data received.")
}
}
}
class AgnosticReceiver extends AbstractInvokable {
var reader: RecordReader[IntegerRecord] = _
override def registerInputOutput(): Unit = {
reader = new RecordReader[IntegerRecord](this, classOf[IntegerRecord])
}
override def invoke(): Unit = {
while(reader.next() != null){}
}
}
class AgnosticBinaryReceiver extends AbstractInvokable {
var reader1: RecordReader[IntegerRecord] = _
var reader2: RecordReader[IntegerRecord] = _
override def registerInputOutput(): Unit = {
reader1 = new RecordReader[IntegerRecord](this, classOf[IntegerRecord])
reader2 = new RecordReader[IntegerRecord](this, classOf[IntegerRecord])
}
override def invoke(): Unit = {
while(reader1.next() != null){}
while(reader2.next() != null){}
}
}
class ExceptionSender extends AbstractInvokable{
var writer: RecordWriter[IntegerRecord] = _
override def registerInputOutput(): Unit = {
writer = new RecordWriter[IntegerRecord](this)
}
override def invoke(): Unit = {
writer.initializeSerializers()
throw new Exception("Test exception")
}
}
class SometimesExceptionSender extends AbstractInvokable {
var writer: RecordWriter[IntegerRecord] = _
override def registerInputOutput(): Unit = {
writer = new RecordWriter[IntegerRecord](this)
}
override def invoke(): Unit = {
writer.initializeSerializers()
if(Math.random() < 0.05){
throw new Exception("Test exception")
}else{
val o = new Object()
o.synchronized(o.wait())
}
}
}
class ExceptionReceiver extends AbstractInvokable {
override def registerInputOutput(): Unit = {
new RecordReader[IntegerRecord](this, classOf[IntegerRecord])
}
override def invoke(): Unit = {
throw new Exception("Test exception")
}
}
class InstantiationErrorSender extends AbstractInvokable{
throw new RuntimeException("Test exception in constructor")
override def registerInputOutput(): Unit = {
}
override def invoke(): Unit = {
}
}
class SometimesInstantiationErrorSender extends AbstractInvokable{
if(Math.random < 0.05){
throw new RuntimeException("Test exception in constructor")
}
override def registerInputOutput(): Unit = {
new RecordWriter[IntegerRecord](this)
}
override def invoke(): Unit = {
val o = new Object()
o.synchronized(o.wait())
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.taskmanager
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
object TestingTaskManagerMessages {
case class NotifyWhenTaskRemoved(executionID: ExecutionAttemptID)
}
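The updated Java TaskManagerTest waits on this kind of message via Patterns.ask; a condensed, self-contained sketch of that pattern (the helper class and method names are hypothetical):

// Sketch: blocks until the testing task manager confirms removal of a task,
// mirroring the Patterns.ask calls in TaskManagerTest above.
import akka.actor.ActorRef;
import akka.pattern.Patterns;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

public class TaskRemovalSync {

    public static void waitForTaskRemoved(ActorRef taskManager, ExecutionAttemptID executionId,
            FiniteDuration timeout) throws Exception {
        Future<Object> response = Patterns.ask(taskManager,
                new TestingTaskManagerMessages.NotifyWhenTaskRemoved(executionId),
                AkkaUtils.FUTURE_TIMEOUT());
        // the TestingTaskManager replies once the task has been unregistered
        Await.ready(response, timeout);
    }
}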
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.testingActors
import akka.actor.{Props, ActorSystem}
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.taskmanager.{TestingTaskManager, TaskManager}
object TestingActors {
def startTestingTaskManagerWithConfiguration(hostname: String, config: Configuration)(implicit system: ActorSystem) ={
val (connectionInfo, jobManagerURL, numberOfSlots, memorySize, pageSize, tmpDirPaths, networkConnectionConfig,
memoryUsageLogging, profilingInterval) = TaskManager.parseConfiguration(hostname, config);
system.actorOf(Props(new TaskManager(connectionInfo, jobManagerURL, numberOfSlots,
memorySize, pageSize, tmpDirPaths, networkConnectionConfig, memoryUsageLogging,
profilingInterval) with TestingTaskManager))
}
}
......@@ -16,15 +16,16 @@
* limitations under the License.
*/
package org.apache.flink.runtime.jobmanager
package org.apache.flink.runtime.testingUtils
import akka.actor.{Props, ActorSystem}
import akka.actor.{ActorSystem, Props}
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.jobmanager.JobManager
import org.apache.flink.runtime.minicluster.FlinkMiniCluster
import org.apache.flink.runtime.net.NetUtils
import org.apache.flink.runtime.taskmanager.TaskManager
class TestingCluster extends FlinkMiniCluster{
class TestingCluster extends FlinkMiniCluster {
override def getConfiguration(userConfig: Configuration): Configuration = {
val cfg = new Configuration()
cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost")
......@@ -36,16 +37,21 @@ class TestingCluster extends FlinkMiniCluster{
}
override def startJobManager(system: ActorSystem, config: Configuration) = {
val (archiveCount, profiling, recommendedPollingInterval) = JobManager.parseConfiguration(config)
system.actorOf(Props(new JobManager(archiveCount, profiling, recommendedPollingInterval) with TestingJobManager),
val (archiveCount, profiling, recommendedPollingInterval) =
JobManager.parseConfiguration(config)
system.actorOf(Props(new JobManager(archiveCount, profiling, recommendedPollingInterval) with
TestingJobManager),
JobManager.JOB_MANAGER_NAME)
}
override def startTaskManager(system: ActorSystem, config: Configuration, index: Int) = {
val (connectionInfo, jobManagerURL, numberOfSlots, memorySize, pageSize, tmpDirPaths, networkConnectionConfig,
memoryUsageLogging, profilingInterval) = TaskManager.parseConfiguration(FlinkMiniCluster.HOSTNAME, config)
system.actorOf(Props(new TaskManager(connectionInfo, jobManagerURL, numberOfSlots, memorySize, pageSize,
tmpDirPaths, networkConnectionConfig, memoryUsageLogging, profilingInterval)),
TaskManager.TASK_MANAGER_NAME + index)
val (connectionInfo, jobManagerURL, numberOfSlots, memorySize, pageSize, tmpDirPaths,
networkConnectionConfig, memoryUsageLogging, profilingInterval) =
TaskManager.parseConfiguration(FlinkMiniCluster.HOSTNAME, config)
system.actorOf(Props(new TaskManager(connectionInfo, jobManagerURL, numberOfSlots,
memorySize, pageSize, tmpDirPaths, networkConnectionConfig, memoryUsageLogging,
profilingInterval)), TaskManager.TASK_MANAGER_NAME + index)
}
}
......@@ -16,12 +16,14 @@
* limitations under the License.
*/
package org.apache.flink.runtime.jobmanager
package org.apache.flink.runtime.testingUtils
import akka.pattern.{ask, pipe}
import org.apache.flink.runtime.ActorLogMessages
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.jobmanager.TestingJobManagerMessages.{ExecutionGraphNotFound, ExecutionGraphFound, RequestExecutionGraph}
import org.apache.flink.runtime.jobmanager.EventCollector
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{ExecutionGraphNotFound, ExecutionGraphFound, RequestExecutionGraph}
import scala.concurrent.Future
import scala.concurrent.duration._
......@@ -29,7 +31,7 @@ trait TestingEventCollector extends ActorLogMessages {
self: EventCollector =>
import context.dispatcher
import AkkaUtils.FUTURE_TIMEOUT
import org.apache.flink.runtime.akka.AkkaUtils.FUTURE_TIMEOUT
abstract override def receiveWithLogMessages: Receive = {
receiveTestingMessages orElse super.receiveWithLogMessages
......
......@@ -16,18 +16,22 @@
* limitations under the License.
*/
package org.apache.flink.runtime.jobmanager
package org.apache.flink.runtime.testingUtils
import akka.actor.Props
import org.apache.flink.runtime.ActorLogMessages
import org.apache.flink.runtime.jobmanager.TestingJobManagerMessages.{ExecutionGraphFound, RequestExecutionGraph}
import org.apache.flink.runtime.jobmanager.{EventCollector, JobManager, MemoryArchivist}
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{ExecutionGraphFound,
RequestExecutionGraph}
trait TestingJobManager extends ActorLogMessages {
self: JobManager =>
override def archiveProps = Props(new MemoryArchivist(archiveCount) with TestingMemoryArchivist)
override def eventCollectorProps = Props(new EventCollector(recommendedPollingInterval) with TestingEventCollector)
override def eventCollectorProps = Props(new EventCollector(recommendedPollingInterval) with
TestingEventCollector)
abstract override def receiveWithLogMessages: Receive = {
receiveTestingMessages orElse super.receiveWithLogMessages
......@@ -36,7 +40,7 @@ trait TestingJobManager extends ActorLogMessages {
def receiveTestingMessages: Receive = {
case RequestExecutionGraph(jobID) =>
currentJobs.get(jobID) match {
case Some(executionGraph) => sender() ! ExecutionGraphFound(jobID,executionGraph)
case Some(executionGraph) => sender() ! ExecutionGraphFound(jobID, executionGraph)
case None => eventCollector.tell(RequestExecutionGraph(jobID), sender())
}
}
......
......@@ -16,17 +16,22 @@
* limitations under the License.
*/
package org.apache.flink.runtime.jobmanager
package org.apache.flink.runtime.testingUtils
import org.apache.flink.runtime.executiongraph.ExecutionGraph
import org.apache.flink.runtime.jobgraph.JobID
object TestingJobManagerMessages {
case class RequestExecutionGraph(jobID: JobID)
sealed trait ResponseExecutionGraph{
sealed trait ResponseExecutionGraph {
def jobID: JobID
}
case class ExecutionGraphFound(jobID: JobID, executionGraph: ExecutionGraph) extends ResponseExecutionGraph
case class ExecutionGraphFound(jobID: JobID, executionGraph: ExecutionGraph) extends
ResponseExecutionGraph
case class ExecutionGraphNotFound(jobID: JobID) extends ResponseExecutionGraph
}
......@@ -16,10 +16,11 @@
* limitations under the License.
*/
package org.apache.flink.runtime.jobmanager
package org.apache.flink.runtime.testingUtils
import org.apache.flink.runtime.ActorLogMessages
import org.apache.flink.runtime.jobmanager.TestingJobManagerMessages.{ExecutionGraphNotFound, ExecutionGraphFound, RequestExecutionGraph}
import org.apache.flink.runtime.jobmanager.MemoryArchivist
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{ExecutionGraphNotFound, ExecutionGraphFound, RequestExecutionGraph}
trait TestingMemoryArchivist extends ActorLogMessages {
self: MemoryArchivist =>
......
......@@ -16,14 +16,14 @@
* limitations under the License.
*/
package org.apache.flink.runtime.taskmanager
package org.apache.flink.runtime.testingUtils
import akka.actor.ActorRef
import org.apache.flink.runtime.ActorLogMessages
import org.apache.flink.runtime.taskmanager.TaskManager
import org.apache.flink.runtime.{ActorLogMessages}
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
import org.apache.flink.runtime.jobmanager.TestingTaskManagerMessages.{ResponseRunningTasks, RequestRunningTasks}
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.{NotifyWhenTaskRemoved, ResponseRunningTasks, RequestRunningTasks}
import org.apache.flink.runtime.messages.TaskManagerMessages.UnregisterTask
import org.apache.flink.runtime.taskmanager.TestingTaskManagerMessages.NotifyWhenTaskRemoved
trait TestingTaskManager extends ActorLogMessages {
self: TaskManager =>
......
......@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.flink.runtime.jobmanager
package org.apache.flink.runtime.testingUtils
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
import org.apache.flink.runtime.taskmanager.Task
......@@ -24,6 +24,7 @@ import org.apache.flink.runtime.taskmanager.Task
import scala.collection.convert.DecorateAsJava
object TestingTaskManagerMessages extends DecorateAsJava{
case class NotifyWhenTaskRemoved(executionID: ExecutionAttemptID)
case object RequestRunningTasks
case class ResponseRunningTasks(tasks: Map[ExecutionAttemptID, Task]){
def asJava: java.util.Map[ExecutionAttemptID, Task] = tasks.asJava
......
......@@ -16,11 +16,15 @@
* limitations under the License.
*/
package org.apache.flink.runtime
package org.apache.flink.runtime.testingUtils
import akka.actor.{Props, ActorSystem}
import com.typesafe.config.ConfigFactory
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.core.io.IOReadableWritable
import org.apache.flink.runtime.akka.serialization.IOReadableWritableSerializer
import org.apache.flink.runtime.minicluster.FlinkMiniCluster
import org.apache.flink.runtime.taskmanager.TaskManager
object TestingUtils {
val testConfig = ConfigFactory.parseString(getDefaultTestingActorSystemConfigString)
......@@ -44,4 +48,24 @@ object TestingUtils {
|}
""".stripMargin
}
def startTestingTaskManagerWithConfiguration(hostname: String, config: Configuration)
(implicit system: ActorSystem) = {
val (connectionInfo, jobManagerURL, numberOfSlots, memorySize, pageSize, tmpDirPaths,
networkConnectionConfig, memoryUsageLogging, profilingInterval) =
TaskManager.parseConfiguration(hostname, config);
system.actorOf(Props(new TaskManager(connectionInfo, jobManagerURL, numberOfSlots,
memorySize, pageSize, tmpDirPaths, networkConnectionConfig, memoryUsageLogging,
profilingInterval) with TestingTaskManager))
}
def startTestingCluster(numSlots: Int): FlinkMiniCluster = {
val config = new Configuration()
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
val cluster = new TestingCluster
cluster.start(config)
cluster
}
}
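A hedged Java sketch of the cluster lifecycle these helpers provide; Scala objects compile to static forwarders, so the calls are assumed reachable from Java tests as well (the class name is hypothetical):

// Sketch: start a testing cluster with 4 slots, obtain the JobManager actor,
// and stop the cluster afterwards -- the same lifecycle as the ITCases above.
import akka.actor.ActorRef;
import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;

public class TestingClusterLifecycleSketch {

    public static void run() throws Exception {
        FlinkMiniCluster cluster = TestingUtils.startTestingCluster(4);
        try {
            ActorRef jobManager = cluster.getJobManager();
            // send SubmitJob / RequestJobStatusWhenTerminated to jobManager here
        }
        finally {
            cluster.stop();
        }
    }
}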
......@@ -21,21 +21,23 @@ package org.apache.flink.test.cancelling;
import java.util.Iterator;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.messages.JobResult;
import org.apache.flink.runtime.messages.JobResult.JobProgressResult;
import org.apache.flink.runtime.messages.JobResult.JobSubmissionResult;
import org.apache.flink.runtime.messages.JobResult.JobCancelResult;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.Plan;
import org.apache.flink.runtime.minicluster.NepheleMiniCluster;
import org.apache.flink.compiler.DataStatistics;
import org.apache.flink.compiler.PactCompiler;
import org.apache.flink.compiler.plan.OptimizedPlan;
import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator;
import org.apache.flink.runtime.client.AbstractJobResult;
import org.apache.flink.runtime.client.JobCancelResult;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.client.JobProgressResult;
import org.apache.flink.runtime.client.JobSubmissionResult;
import org.apache.flink.runtime.event.job.AbstractEvent;
import org.apache.flink.runtime.event.job.JobEvent;
import org.apache.flink.runtime.jobgraph.JobGraph;
......@@ -64,7 +66,7 @@ public abstract class CancellingTestBase {
// --------------------------------------------------------------------------------------------
protected NepheleMiniCluster executor;
protected LocalFlinkMiniCluster executor;
protected int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
......@@ -79,10 +81,11 @@ public abstract class CancellingTestBase {
@Before
public void startCluster() throws Exception {
verifyJvmOptions();
this.executor = new NepheleMiniCluster();
this.executor.setDefaultOverwriteFiles(true);
this.executor.setTaskManagerNumSlots(taskManagerNumSlots);
this.executor.start();
this.executor = new LocalFlinkMiniCluster(null);
Configuration config = new Configuration();
config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, true);
this.executor.start(config);
}
@After
......@@ -110,10 +113,11 @@ public abstract class CancellingTestBase {
long cancelTime = -1L;
final JobClient client = this.executor.getJobClient(jobGraph);
final JobSubmissionResult submissionResult = client.submitJob();
if (submissionResult.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
throw new IllegalStateException(submissionResult.getDescription());
if (submissionResult.returnCode() != JobResult.SUCCESS()) {
throw new IllegalStateException(submissionResult.description());
}
final int interval = client.getRecommendedPollingInterval();
final long sleep = interval * 1000L;
......@@ -142,8 +146,8 @@ public abstract class CancellingTestBase {
throw new IllegalStateException("Return value of cancelJob is null!");
}
if (jcr.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
throw new IllegalStateException(jcr.getDescription());
if (jcr.returnCode() != JobResult.SUCCESS()) {
throw new IllegalStateException(jcr.description());
}
// Save when the cancel request has been issued
......@@ -164,14 +168,14 @@ public abstract class CancellingTestBase {
throw new IllegalStateException("Returned job progress is unexpectedly null!");
}
if (jobProgressResult.getReturnCode() == AbstractJobResult.ReturnCode.ERROR) {
if (jobProgressResult.returnCode() == JobResult.ERROR()) {
throw new IllegalStateException("Could not retrieve job progress: "
+ jobProgressResult.getDescription());
+ jobProgressResult.description());
}
boolean exitLoop = false;
final Iterator<AbstractEvent> it = jobProgressResult.getEvents();
final Iterator<AbstractEvent> it = jobProgressResult.asJavaList().iterator();
while (it.hasNext()) {
final AbstractEvent event = it.next();
......
......@@ -29,7 +29,7 @@ public class WordCountITCase extends JavaProgramTestBase {
public WordCountITCase(){
setDegreeOfParallelism(4);
setNumTaskManager(2);
setNumTaskManagers(2);
setTaskManagerNumSlots(2);
}
......
......@@ -22,7 +22,9 @@ import java.io.File;
import java.io.FileWriter;
import org.apache.flink.client.RemoteExecutor;
import org.apache.flink.runtime.minicluster.NepheleMiniCluster;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.test.testdata.KMeansData;
import org.junit.Assert;
import org.junit.Test;
......@@ -34,7 +36,7 @@ public class PackagedProgramEndToEndITCase {
@Test
public void testEverything() {
NepheleMiniCluster cluster = new NepheleMiniCluster();
LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(null);
File points = null;
File clusters = null;
......@@ -59,11 +61,12 @@ public class PackagedProgramEndToEndITCase {
String jarPath = "target/maven-test-jar.jar";
// run KMeans
cluster.setNumTaskManager(2);
cluster.setTaskManagerNumSlots(2);
cluster.start();
Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 2);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
cluster.start(config);
RemoteExecutor ex = new RemoteExecutor("localhost", cluster.getJobManagerRpcPort());
RemoteExecutor ex = new RemoteExecutor("localhost", cluster.getJobManagerRPCPort());
ex.executeJar(jarPath,
"org.apache.flink.test.util.testjar.KMeansForTest",
......
......@@ -18,9 +18,9 @@
package org.apache.flink.test.util;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.junit.Assert;
import org.apache.flink.runtime.minicluster.NepheleMiniCluster;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.jobgraph.JobGraph;
......@@ -119,7 +119,7 @@ public abstract class FailingTestBase extends RecordAPITestBase {
// reference to the timeout thread
private final Thread timeoutThread;
// cluster to submit the job to.
private final NepheleMiniCluster executor;
private final LocalFlinkMiniCluster executor;
// job graph of the failing job (submitted first)
private final JobGraph failingJob;
// job graph of the working job (submitted after return from failing job)
......@@ -128,7 +128,8 @@ public abstract class FailingTestBase extends RecordAPITestBase {
private volatile Exception error;
public SubmissionThread(Thread timeoutThread, NepheleMiniCluster executor, JobGraph failingJob, JobGraph job) {
public SubmissionThread(Thread timeoutThread, LocalFlinkMiniCluster executor, JobGraph failingJob,
JobGraph job) {
this.timeoutThread = timeoutThread;
this.executor = executor;
this.failingJob = failingJob;
......
......@@ -60,9 +60,9 @@ under the License.
<module>flink-compiler</module>
<module>flink-examples</module>
<module>flink-clients</module>
<!--<module>flink-tests</module>-->
<!--<module>flink-test-utils</module>-->
<!--<module>flink-addons</module>-->
<module>flink-tests</module>
<module>flink-test-utils</module>
<module>flink-addons</module>
<module>flink-quickstart</module>
<module>flink-dist</module>
</modules>
......
......@@ -43,7 +43,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
......