Commit f726e552 authored by Till Rohrmann

Made ExecutionGraph, Execution, ExecutionJobVertex, ExecutionVertex, AllocatedSlot, Instance, CoLocationConstraint, SharedSlot and SlotSharingGroupAssignment serializable. Integrated Kryo to be used to serialize Akka messages.
Parent 8d414d7e
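
The whole commit applies one recurring pattern: mark a runtime class Serializable, pin a serialVersionUID, and declare members that cannot cross the wire (locks, actor refs, schedulers) transient. A minimal sketch of that pattern with a hypothetical class and field names, not code from the commit:

    import java.io.Serializable;

    public class ExampleVertex implements Serializable {

        static final long serialVersionUID = 42L;

        // plain data is written and restored by default serialization
        private final int subTaskIndex;

        // runtime-only state is skipped; it comes back as null/default
        // and must be re-created after deserialization
        private transient Object stateMonitor = new Object();

        public ExampleVertex(int subTaskIndex) {
            this.subTaskIndex = subTaskIndex;
        }
    }
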
......@@ -578,21 +578,21 @@ public final class ConfigConstants {
// ------------------------------ Akka Values ------------------------------
- public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "5000 ms";
+ public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "1000 s";
- public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_PAUSE = "100 s";
+ public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_PAUSE = "6000 s";
public static double DEFAULT_AKKA_TRANSPORT_THRESHOLD = 300.0;
public static String DEFAULT_AKKA_WATCH_HEARTBEAT_INTERVAL = "5000 ms";
- public static String DEFAULT_AKKA_WATCH_HEARTBEAT_PAUSE = "100 s";
+ public static String DEFAULT_AKKA_WATCH_HEARTBEAT_PAUSE = "1 min";
- public static double DEFAULT_AKKA_WATCH_THRESHOLD = 300.0;
+ public static double DEFAULT_AKKA_WATCH_THRESHOLD = 12;
- public static String DEFAULT_AKKA_TCP_TIMEOUT = "15 s";
+ public static String DEFAULT_AKKA_TCP_TIMEOUT = "100 s";
- public static int DEFAULT_AKKA_DISPATCHER_THROUGHPUT = 10;
+ public static int DEFAULT_AKKA_DISPATCHER_THROUGHPUT = 15;
public static boolean DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS = false;
......
......@@ -146,25 +146,34 @@ public class AccumulatorEvent implements Serializable {
}
private void writeObject(java.io.ObjectOutputStream out) throws IOException{
-   ByteArrayOutputStream baos = new ByteArrayOutputStream();
-   ObjectOutputStream oos = new ObjectOutputStream(baos);
    out.writeObject(jobID);
-   oos.writeInt(accumulators.size());
-   for(Map.Entry<String, Accumulator<?, ?>> entry: this.accumulators.entrySet()){
-     oos.writeUTF(entry.getKey());
-     oos.writeUTF(entry.getValue().getClass().getName());
-     entry.getValue().write(oos);
-   }
-   oos.flush();
-   oos.close();
-   baos.close();
-   byte[] buffer = baos.toByteArray();
+   byte[] buffer = null;
+   if(accumulators != null) {
+     ByteArrayOutputStream baos = new ByteArrayOutputStream();
+     ObjectOutputStream oos = new ObjectOutputStream(baos);
+     oos.writeInt(accumulators.size());
+     for (Map.Entry<String, Accumulator<?, ?>> entry : this.accumulators.entrySet()) {
+       oos.writeUTF(entry.getKey());
+       oos.writeUTF(entry.getValue().getClass().getName());
+       entry.getValue().write(oos);
+     }
+     oos.flush();
+     oos.close();
+     baos.close();
+     buffer = baos.toByteArray();
+   }else if(serializedData != null){
+     buffer = serializedData;
+   }else{
+     throw new RuntimeException("The AccumulatorEvent's accumulator is null and there is " +
+       "no serialized data attached to it.");
+   }
out.writeInt(buffer.length);
out.write(buffer);
......
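
The rewritten writeObject keeps accumulators in byte form: live accumulators are flattened into a buffer, and an already-serialized buffer is passed through untouched. The matching readObject would read the buffer back and defer deserialization until a suitable class loader is available; a hedged sketch, assuming the jobID, serializedData and accumulators fields are non-final:

    private void readObject(java.io.ObjectInputStream in)
            throws IOException, ClassNotFoundException {
        jobID = (JobID) in.readObject();

        int length = in.readInt();
        serializedData = new byte[length];
        in.readFully(serializedData);  // keep the raw bytes ...

        accumulators = null;           // ... and deserialize lazily, once the
                                       // user-code class loader is known
    }
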
......@@ -27,6 +27,7 @@ import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
import static org.apache.flink.runtime.execution.ExecutionState.RUNNING;
import static org.apache.flink.runtime.execution.ExecutionState.SCHEDULED;
import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
......@@ -74,7 +75,9 @@ import scala.concurrent.duration.FiniteDuration;
* occasional double-checking to ensure that the state after a completed call is as expected, and trigger correcting
* actions if it is not. Many actions are also idempotent (like canceling).
*/
- public class Execution {
+ public class Execution implements Serializable {
+
+   static final long serialVersionUID = 42L;
private static final AtomicReferenceFieldUpdater<Execution, ExecutionState> STATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(Execution.class, ExecutionState.class, "state");
......
......@@ -60,6 +60,7 @@ import static akka.dispatch.Futures.future;
public class ExecutionGraph implements Serializable {
static final long serialVersionUID = 42L;
private static final AtomicReferenceFieldUpdater<ExecutionGraph, JobStatus> STATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(ExecutionGraph.class, JobStatus.class, "state");
......@@ -76,7 +77,7 @@ public class ExecutionGraph implements Serializable {
private final String jobName;
/** The job configuration that was originally attached to the JobGraph. */
- private final Configuration jobConfiguration;
+ private transient final Configuration jobConfiguration;
/** The classloader of the user code. */
private final ClassLoader userClassLoader;
......@@ -88,22 +89,24 @@ public class ExecutionGraph implements Serializable {
private final List<ExecutionJobVertex> verticesInCreationOrder;
/** All intermediate results that are part of this graph */
- private final ConcurrentHashMap<IntermediateDataSetID, IntermediateResult> intermediateResults;
+ private transient final ConcurrentHashMap<IntermediateDataSetID, IntermediateResult>
+   intermediateResults;
/** The currently executed tasks, for callbacks */
- private final ConcurrentHashMap<ExecutionAttemptID, Execution> currentExecutions;
+ private transient final ConcurrentHashMap<ExecutionAttemptID, Execution> currentExecutions;
- private final Map<ChannelID, ExecutionEdge> edges = new HashMap<ChannelID, ExecutionEdge>();
+ private transient final Map<ChannelID, ExecutionEdge> edges = new HashMap<ChannelID,
+   ExecutionEdge>();
- private final List<BlobKey> requiredJarFiles;
+ private transient final List<BlobKey> requiredJarFiles;
- private final List<ActorRef> jobStatusListenerActors;
+ private transient final List<ActorRef> jobStatusListenerActors;
- private final List<ActorRef> executionListenerActors;
+ private transient final List<ActorRef> executionListenerActors;
private final long[] stateTimestamps;
- private final Object progressLock = new Object();
+ private transient final Object progressLock = new Object();
private int nextVertexToFinish;
......@@ -116,7 +119,7 @@ public class ExecutionGraph implements Serializable {
private volatile Throwable failureCause;
- private Scheduler scheduler;
+ private transient Scheduler scheduler;
private boolean allowQueuedScheduling = true;
......
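
All of these transient fields are null after default Java deserialization, because field initializers and constructors do not run for deserialized objects. The usual idiom is to re-create them in a readObject hook, which only works if the transient fields are not final; a sketch of that idiom, not taken from the commit:

    private void readObject(java.io.ObjectInputStream in)
            throws IOException, ClassNotFoundException {
        in.defaultReadObject();  // restores the non-transient fields

        // re-create the runtime-only state that was skipped on the sender
        this.currentExecutions = new ConcurrentHashMap<ExecutionAttemptID, Execution>();
        this.edges = new HashMap<ChannelID, ExecutionEdge>();
        this.progressLock = new Object();
    }
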
......@@ -18,6 +18,7 @@
package org.apache.flink.runtime.executiongraph;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
......@@ -39,12 +40,13 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.slf4j.Logger;
- public class ExecutionJobVertex {
+ public class ExecutionJobVertex implements Serializable {
+
+   static final long serialVersionUID = 42L;
/** Use the same log for all ExecutionGraph classes */
private static final Logger LOG = ExecutionGraph.LOG;
- private final Object stateMonitor = new Object();
+ private transient final Object stateMonitor = new Object();
private final ExecutionGraph graph;
......@@ -52,9 +54,9 @@ public class ExecutionJobVertex {
private final ExecutionVertex[] taskVertices;
- private final IntermediateResult[] producedDataSets;
+ private transient final IntermediateResult[] producedDataSets;
- private final List<IntermediateResult> inputs;
+ private transient final List<IntermediateResult> inputs;
private final int parallelism;
......@@ -68,7 +70,7 @@ public class ExecutionJobVertex {
private final InputSplit[] inputSplits;
- private InputSplitAssigner splitAssigner;
+ private transient InputSplitAssigner splitAssigner;
public ExecutionJobVertex(ExecutionGraph graph, AbstractJobVertex jobVertex, int defaultParallelism) throws JobException {
......
......@@ -22,6 +22,7 @@ import static org.apache.flink.runtime.execution.ExecutionState.CANCELED;
import static org.apache.flink.runtime.execution.ExecutionState.FAILED;
import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
......@@ -48,7 +49,8 @@ import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableExceptio
* The ExecutionVertex is a parallel subtask of the execution. It may be executed once, or several times, each of
* which time it spawns an {@link Execution}.
*/
- public class ExecutionVertex {
+ public class ExecutionVertex implements Serializable {
+
+   static final long serialVersionUID = 42L;
@SuppressWarnings("unused")
private static final Logger LOG = ExecutionGraph.LOG;
......@@ -59,9 +61,9 @@ public class ExecutionVertex {
private final ExecutionJobVertex jobVertex;
- private final IntermediateResultPartition[] resultPartitions;
+ private transient final IntermediateResultPartition[] resultPartitions;
- private final ExecutionEdge[][] inputEdges;
+ private transient final ExecutionEdge[][] inputEdges;
private final int subTaskIndex;
......
......@@ -18,6 +18,7 @@
package org.apache.flink.runtime.instance;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
......@@ -28,7 +29,9 @@ import org.apache.flink.runtime.jobmanager.scheduler.Locality;
/**
* An allocated slot is the unit in which resources are allocated on instances.
*/
- public class AllocatedSlot {
+ public class AllocatedSlot implements Serializable {
+
+   static final long serialVersionUID = 42L;
private static final AtomicIntegerFieldUpdater<AllocatedSlot> STATUS_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(AllocatedSlot.class, "status");
......
......@@ -18,6 +18,7 @@
package org.apache.flink.runtime.instance;
import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
......@@ -32,13 +33,15 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener;
/**
* An taskManager represents a resource a {@link org.apache.flink.runtime.taskmanager.TaskManager} runs on.
*/
- public class Instance {
+ public class Instance implements Serializable {
+
+   static final long serialVersionUID = 42L;
/** The lock on which to synchronize allocations and failure state changes */
- private final Object instanceLock = new Object();
+ private transient final Object instanceLock = new Object();
/** The actor ref to the task manager represented by this taskManager. */
- private final ActorRef taskManager;
+ private transient final ActorRef taskManager;
/** The instance connection information for the data transfer. */
private final InstanceConnectionInfo connectionInfo;
......@@ -53,14 +56,14 @@ public class Instance {
private final int numberOfSlots;
/** A list of available slot positions */
- private final Queue<Integer> availableSlots;
+ private transient final Queue<Integer> availableSlots;
/** Allocated slots on this taskManager */
private final Set<AllocatedSlot> allocatedSlots = new HashSet<AllocatedSlot>();
/** A listener to be notified upon new slot availability */
- private SlotAvailabilityListener slotAvailabilityListener;
+ private transient SlotAvailabilityListener slotAvailabilityListener;
/**
* Time when last heat beat has been received from the task manager running on this taskManager.
......
......@@ -23,7 +23,11 @@ import org.apache.flink.runtime.instance.Instance;
import com.google.common.base.Preconditions;
- public class CoLocationConstraint {
+ import java.io.Serializable;
+
+ public class CoLocationConstraint implements Serializable {
+
+   static final long serialVersionUID = 42L;
private final CoLocationGroup group;
......
......@@ -18,6 +18,7 @@
package org.apache.flink.runtime.jobmanager.scheduler;
import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;
......@@ -30,7 +31,9 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
* methods may only be called from within the synchronization scope of
* it associated SlotSharingGroupAssignment.
*/
- class SharedSlot {
+ class SharedSlot implements Serializable {
+
+   static final long serialVersionUID = 42L;
private final AllocatedSlot allocatedSlot;
......
......@@ -18,6 +18,7 @@
package org.apache.flink.runtime.jobmanager.scheduler;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
......@@ -38,11 +39,13 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.slf4j.Logger;
- public class SlotSharingGroupAssignment {
+ public class SlotSharingGroupAssignment implements Serializable {
+
+   static final long serialVersionUID = 42L;
private static final Logger LOG = Scheduler.LOG;
- private final Object lock = new Object();
+ private transient final Object lock = new Object();
/** All slots currently allocated to this sharing group */
private final Set<SharedSlot> allSlots = new LinkedHashSet<SharedSlot>();
......
......@@ -584,9 +584,9 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
}
// Report accumulators to JobManager
- env.getAccumulator().tell(new JobManagerMessages.ReportAccumulatorResult(new
-   AccumulatorEvent(env.getJobID(), AccumulatorHelper.copy(accumulators))),
-   ActorRef.noSender());
+ JobManagerMessages.ReportAccumulatorResult accResult = new JobManagerMessages.ReportAccumulatorResult(new
+   AccumulatorEvent(env.getJobID(), AccumulatorHelper.copy(accumulators)));
+ env.getAccumulator().tell(accResult, ActorRef.noSender());
// We also clear the accumulators, since stub instances might be reused
// (e.g. in iterations) and we don't want to count twice. This may not be
......
......@@ -21,6 +21,7 @@ package org.apache.flink.runtime.taskmanager;
import akka.actor.ActorRef;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
......@@ -36,13 +37,16 @@ public class TaskInputSplitProvider implements InputSplitProvider {
private final JobVertexID vertexId;
private final ExecutionAttemptID executionID;
private final FiniteDuration timeout;
public TaskInputSplitProvider(ActorRef jobManager, JobID jobId, JobVertexID vertexId,
-   FiniteDuration timeout) {
+   ExecutionAttemptID executionID, FiniteDuration timeout) {
this.jobManager = jobManager;
this.jobId = jobId;
this.vertexId = vertexId;
this.executionID = executionID;
this.timeout = timeout;
}
......@@ -50,7 +54,8 @@ public class TaskInputSplitProvider implements InputSplitProvider {
public InputSplit getNextInputSplit() {
try {
TaskManagerMessages.NextInputSplit nextInputSplit = AkkaUtils.ask(jobManager,
-   new JobManagerMessages.RequestNextInputSplit(jobId, vertexId), timeout);
+   new JobManagerMessages.RequestNextInputSplit(jobId, vertexId, executionID),
+   timeout);
return nextInputSplit.inputSplit();
}
......
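
AkkaUtils.ask wraps Akka's request/reply pattern; getNextInputSplit blocks the task thread until the JobManager answers or the timeout fires. Using only stock Akka 2.x calls, the same round trip looks roughly like this (a sketch, not Flink's implementation):

    import akka.actor.ActorRef;
    import akka.pattern.Patterns;
    import akka.util.Timeout;
    import scala.concurrent.Await;
    import scala.concurrent.Future;
    import scala.concurrent.duration.FiniteDuration;

    public final class AskExample {
        // send a message and block until the single reply arrives
        public static Object askSync(ActorRef target, Object message,
                FiniteDuration timeout) throws Exception {
            Future<Object> reply = Patterns.ask(target, message, new Timeout(timeout));
            return Await.result(reply, timeout);
        }
    }
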
......@@ -30,11 +30,11 @@ trait ActorLogMessages {
override def isDefinedAt(x: Any): Boolean = _receiveWithLogMessages.isDefinedAt(x)
override def apply(x: Any):Unit = {
log.debug(s"Received message $x from ${self.sender}.")
log.debug("Received message {} from {}.", x, self.sender)
val start = System.nanoTime()
_receiveWithLogMessages(x)
val duration = (System.nanoTime() - start) / 1000000
log.debug(s"Handled message $x in $duration ms from ${self.sender}.")
log.debug(s"Handled message {} in {} ms from {}.", x, duration, self.sender)
}
}
......
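
The logging change swaps Scala string interpolation for Akka's placeholder template. With {} placeholders the arguments are only formatted when the level is enabled, whereas an interpolated string is built on every call; the same rule applies to SLF4J in the Java parts of the code base:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LoggingExample {

        private static final Logger LOG = LoggerFactory.getLogger(LoggingExample.class);

        void handle(Object message, Object sender) {
            // cheap when DEBUG is off: nothing is formatted unless needed
            LOG.debug("Received message {} from {}.", message, sender);

            // anti-pattern: the concatenation runs even with DEBUG disabled
            LOG.debug("Received message " + message + " from " + sender + ".");
        }
    }
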
......@@ -23,7 +23,6 @@ import java.util.concurrent.Callable
import akka.actor.{ActorSelection, ActorRef, ActorSystem}
import akka.pattern.{Patterns, ask => akkaAsk}
import akka.util.Timeout
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import scala.concurrent.{ExecutionContext, Future, Await}
......@@ -81,19 +80,17 @@ object AkkaUtils {
val configString =
s"""
|akka {
| loglevel = "$logLevel"
| stdout-loglevel = "$logLevel"
| loglevel = $logLevel
| stdout-loglevel = $logLevel
|
| log-dead-letters = $logLifecycleEvents
| log-dead-letters-during-shutdown = $logLifecycleEvents
|
- | extensions = ["com.romix.akka.serialization.kryo.KryoSerializationExtension$$"]
|
| remote {
| transport-failure-detector{
- |   acceptable-heartbeat-pause = $transportHeartbeatPause
- |   threshold = $transportThreshold
+ |   heartbeat-interval = $transportHeartbeatInterval
+ |   threshold = $transportThreshold
| }
|
| watch-failure-detector{
......@@ -107,41 +104,19 @@ object AkkaUtils {
| hostname = $host
| port = $port
| connection-timeout = $akkaTCPTimeout
- | maximum-frame-size = $akkaFramesize
+ | maximum-frame-size = ${akkaFramesize}
| }
| }
|
| log-remote-lifecycle-events = $logLifecycleEvents
|
| }
|
| actor{
| default-dispatcher{
- |     throughput = $akkaThroughput
- |   }
- |
- |   kryo{
- |     type = "nograph"
- |     idstrategy = "default"
- |     serializer-pool-size = 16
- |     buffer-size = 4096
- |     max-buffer-size = -1
- |     use-manifests = false
- |     compression = off
- |     implicit-registration-logging = true
- |     kryo-trace = true
- |     kryo-custom-serializer-init = "org.apache.flink.runtime.akka.KryoInitializer"
- |   }
- |
- |   serialize-messages = on
- |
- |   serializers{
- |     kryo = "com.romix.akka.serialization.kryo.KryoSerializer"
- |   }
- |
- |   serialization-bindings {
+ |     throughput = ${akkaThroughput}
+ |   }
+ | }
|
|}
""".stripMargin
......@@ -149,7 +124,7 @@ object AkkaUtils {
}
def getDefaultActorSystemConfigString: String = {
s"""
"""
|akka {
| daemonic = on
|
......@@ -158,7 +133,8 @@ object AkkaUtils {
| logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
| stdout-loglevel = "WARNING"
| jvm-exit-on-fatal-error = off
- | log-config-on-start = off
+ | log-config-on-start = on
+ | serialize-messages = on
|
| actor {
| provider = "akka.remote.RemoteActorRefProvider"
......@@ -167,18 +143,195 @@ object AkkaUtils {
| remote{
| netty{
| tcp{
- |       port = 0
+ |       transport-class = "akka.remote.transport.netty.NettyTransport"
+ |       tcp-nodelay = on
+ |
+ |       port = 0
+ |       maximum-frame-size = 1MB
+ |       execution-pool-size = 4
| }
| }
| }
|}
""".stripMargin
}
// scalastyle:off line.size.limit
def getKryoSerializerString: String = {
"""
|akka {
|
| extensions = ["com.romix.akka.serialization.kryo.KryoSerializationExtension$"]
|
| actor{
| kryo{
| type = "graph"
| idstrategy = "incremental"
| serializer-pool-size = 16
| buffer-size = 4096
| max-buffer-size = -1
| use-manifests = false
| compression = off
| implicit-registration-logging = true
| kryo-trace = false
| kryo-custom-serializer-init = "org.apache.flink.runtime.akka.KryoInitializer"
| }
|
|
| serializers{
| kryo = "com.romix.akka.serialization.kryo.KryoSerializer"
| java = "akka.serialization.JavaSerializer"
| }
|
| serialization-bindings {
| "java.io.Serializable" = java
|
| "java.lang.Throwable" = java
| "akka.event.Logging$Error" = java
| "java.lang.Integer" = kryo
| "java.lang.Long" = kryo
| "java.lang.Float" = kryo
| "java.lang.Double" = kryo
| "java.lang.Boolean" = kryo
| "java.lang.Short" = kryo
|
| "scala.Tuple2" = kryo
| "scala.Tuple3" = kryo
| "scala.Tuple4" = kryo
| "scala.Tuple5" = kryo
| "scala.Tuple6" = kryo
| "scala.Tuple7" = kryo
| "scala.Tuple8" = kryo
| "scala.Tuple9" = kryo
| "scala.Tuple10" = kryo
| "scala.Tuple11" = kryo
| "scala.Tuple12" = kryo
| "scala.collection.BitSet" = kryo
| "scala.collection.SortedSet" = kryo
| "scala.util.Left" = kryo
| "scala.util.Right" = kryo
| "scala.collection.SortedMap" = kryo
| "scala.Int" = kryo
| "scala.Long" = kryo
| "scala.Float" = kryo
| "scala.Double" = kryo
| "scala.Boolean" = kryo
| "scala.Short" = kryo
| "java.lang.String" = kryo
| "scala.Option" = kryo
| "scala.collection.immutable.Map" = kryo
| "scala.collection.Traversable" = kryo
| "scala.runtime.BoxedUnit" = kryo
|
| "akka.actor.SystemGuardian$RegisterTerminationHook$" = kryo
| "akka.actor.Address" = kryo
| "akka.actor.Terminated" = kryo
| "akka.actor.LocalActorRef" = kryo
| "akka.actor.RepointableActorRef" = kryo
| "akka.actor.Identify" = kryo
| "akka.actor.ActorIdentity" = kryo
| "akka.actor.PoisonPill$" = kryo
| "akka.actor.SystemGuardian$TerminationHook$" = kryo
| "akka.actor.SystemGuardian$TerminationHookDone$" = kryo
| "akka.actor.AddressTerminated" = kryo
| "akka.actor.Status$Failure" = kryo
| "akka.remote.RemoteWatcher$ReapUnreachableTick$" = kryo
| "akka.remote.RemoteWatcher$HeartbeatTick$" = kryo
| "akka.remote.ReliableDeliverySupervisor$GotUid" = kryo
| "akka.remote.EndpointWriter$AckIdleCheckTimer$" = kryo
| "akka.remote.EndpointWriter$StoppedReading" = kryo
| "akka.remote.ReliableDeliverySupervisor$Ungate$" = kryo
| "akka.remote.EndpointWriter$StopReading" = kryo
| "akka.remote.EndpointWriter$OutboundAck" = kryo
| "akka.remote.Ack" = kryo
| "akka.remote.SeqNo" = kryo
| "akka.remote.EndpointWriter$FlushAndStop$" = kryo
| "akka.remote.ReliableDeliverySupervisor$AttemptSysMsgRedelivery$" = kryo
| "akka.remote.RemoteWatcher$WatchRemote" = kryo
| "akka.remote.RemoteWatcher$UnwatchRemote" = kryo
| "akka.remote.RemoteWatcher$RewatchRemote" = kryo
| "akka.remote.RemoteWatcher$Rewatch" = kryo
| "akka.remote.RemoteWatcher$Heartbeat$" = kryo
| "akka.remote.RemoteWatcher$HeartbeatRsp" = kryo
| "akka.remote.EndpointWriter$FlushAndStopTimeout$" = kryo
| "akka.remote.RemoteWatcher$ExpectedFirstHeartbeat" = kryo
| "akka.remote.transport.Transport$InvalidAssociationException" = kryo
| "akka.dispatch.sysmsg.Terminate" = kryo
| "akka.dispatch.sysmsg.Unwatch" = kryo
| "akka.dispatch.sysmsg.Watch" = kryo
| "akka.dispatch.sysmsg.DeathWatchNotification" = kryo
|
| "org.apache.flink.runtime.messages.ArchiveMessages$ArchiveExecutionGraph" = kryo
| "org.apache.flink.runtime.messages.ArchiveMessages$ArchivedJobs" = kryo
|
| "org.apache.flink.runtime.messages.ExecutionGraphMessages$ExecutionStateChanged" = kryo
| "org.apache.flink.runtime.messages.ExecutionGraphMessages$JobStatusChanged" = kryo
|
| "org.apache.flink.runtime.messages.JobClientMessages$SubmitJobAndWait" = kryo
| "org.apache.flink.runtime.messages.JobClientMessages$SubmitJobDetached" = kryo
|
| "org.apache.flink.runtime.messages.JobManagerMessages$SubmitJob" = kryo
| "org.apache.flink.runtime.messages.JobManagerMessages$SubmissionSuccess" = kryo
| "org.apache.flink.runtime.messages.JobManagerMessages$SubmissionFailure" = kryo
| "org.apache.flink.runtime.messages.JobManagerMessages$CancellationSuccess" = kryo
| "org.apache.flink.runtime.messages.JobManagerMessages$CancellationFailure" = kryo
| "org.apache.flink.runtime.messages.JobManagerMessages$CancelJob" = kryo
| "org.apache.flink.runtime.messages.JobManagerMessages$UpdateTaskExecutionState" = kryo
| "org.apache.flink.runtime.messages.JobManagerMessages$RequestNextInputSplit" = kryo
| "org.apache.flink.runtime.messages.JobManagerMessages$LookupConnectionInformation" = kryo
| "org.apache.flink.runtime.messages.JobManagerMessages$ConnectionInformation" = kryo
| "org.apache.flink.runtime.messages.JobManagerMessages$ReportAccumulatorResult" = kryo
| "org.apache.flink.runtime.messages.JobManagerMessages$RequestAccumulatorResults" = kryo
| "org.apache.flink.runtime.messages.JobManagerMessages$AccumulatorResultsFound" = kryo
| "org.apache.flink.runtime.messages.JobManagerMessages$AccumulatorResultsNotFound" = kryo
| "org.apache.flink.runtime.messages.JobManagerMessages$RequestJobStatus" = kryo
| "org.apache.flink.runtime.messages.JobManagerMessages$CurrentJobStatus" = kryo
| "org.apache.flink.runtime.messages.JobManagerMessages$RequestNumberRegisteredTaskManager$" = kryo
| "org.apache.flink.runtime.messages.JobManagerMessages$RequestTotalNumberOfSlots$" = kryo
| "org.apache.flink.runtime.messages.JobManagerMessages$RequestBlobManagerPort$" = kryo
| "org.apache.flink.runtime.messages.JobManagerMessages$RequestFinalJobStatus" = kryo
| "org.apache.flink.runtime.messages.JobManagerMessages$JobResultSuccess" = kryo
| "org.apache.flink.runtime.messages.JobManagerMessages$JobResultCanceled" = kryo
| "org.apache.flink.runtime.messages.JobManagerMessages$JobResultFailed" = kryo
| "org.apache.flink.runtime.messages.JobManagerMessages$RequestRunningJobs$" = kryo
| "org.apache.flink.runtime.messages.JobManagerMessages$RunningJobs" = kryo
| "org.apache.flink.runtime.messages.JobManagerMessages$RequestJob" = kryo
| "org.apache.flink.runtime.messages.JobManagerMessages$JobFound" = kryo
| "org.apache.flink.runtime.messages.JobManagerMessages$JobNotFound" = kryo
| "org.apache.flink.runtime.messages.JobManagerMessages$RequestRegisteredTaskManagers$" = kryo
| "org.apache.flink.runtime.messages.JobManagerMessages$RegisteredTaskManagers" = kryo
|
| "org.apache.flink.runtime.messages.JobManagerProfilerMessages$ReportProfilingData" = kryo
|
| "org.apache.flink.runtime.messages.TaskManagerMessages$NotifyWhenRegisteredAtJobManager$" = kryo
| "org.apache.flink.runtime.messages.TaskManagerMessages$RegisterAtJobManager$" = kryo
| "org.apache.flink.runtime.messages.TaskManagerMessages$CancelTask" = kryo
| "org.apache.flink.runtime.messages.TaskManagerMessages$SubmitTask" = kryo
| "org.apache.flink.runtime.messages.TaskManagerMessages$NextInputSplit" = kryo
| "org.apache.flink.runtime.messages.TaskManagerMessages$UnregisterTask" = kryo
| "org.apache.flink.runtime.messages.TaskManagerMessages$TaskOperationResult" = kryo
| "org.apache.flink.runtime.messages.TaskManagerMessages$Heartbeat" = kryo
| "org.apache.flink.runtime.messages.TaskManagerMessages$RegisteredAtJobManager$" = kryo
| "org.apache.flink.runtime.messages.TaskManagerMessages$RegisterAtJobManager$" = kryo
| "org.apache.flink.runtime.messages.TaskManagerMessages$SendHeartbeat$" = kryo
| "org.apache.flink.runtime.messages.TaskManagerMessages$LogMemoryUsage$" = kryo
|
| "org.apache.flink.runtime.messages.TaskManagerProfilerMessages$MonitorTask" = kryo
| "org.apache.flink.runtime.messages.TaskManagerProfilerMessages$UnmonitorTask" = kryo
| "org.apache.flink.runtime.messages.TaskManagerProfilerMessages$RegisterProfilingListener$" = kryo
| "org.apache.flink.runtime.messages.TaskManagerProfilerMessages$UnregisterProfilingListener$" = kryo
| "org.apache.flink.runtime.messages.TaskManagerProfilerMessages$ProfileTasks$" = kryo
|
| "org.apache.flink.runtime.messages.RegistrationMessages$RegisterTaskManager" = kryo
| "org.apache.flink.runtime.messages.RegistrationMessages$AcknowledgeRegistration" = kryo
| }
| }
|}
""".stripMargin
}
// scalastyle:on line.size.limit
def getDefaultActorSystemConfig = {
ConfigFactory.parseString(getDefaultActorSystemConfigString)
}
......
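
With serialize-messages = on, even local messages pass through the configured serializers, which is how these bindings get exercised in tests. Which serializer Akka resolves for a given message can be checked with the standard Serialization extension; a sketch, assuming the Kryo extension above is on the classpath and configString holds one of the config blocks from this file:

    import akka.actor.ActorSystem;
    import akka.serialization.Serialization;
    import akka.serialization.SerializationExtension;
    import akka.serialization.Serializer;
    import com.typesafe.config.ConfigFactory;

    public final class SerializerCheck {

        public static void printSerializerFor(String configString, Object message) {
            ActorSystem system = ActorSystem.create("check",
                    ConfigFactory.parseString(configString));
            try {
                Serialization serialization = SerializationExtension.get(system);
                // resolved through the serialization-bindings section
                Serializer serializer = serialization.findSerializerFor(message);
                System.out.println(message.getClass().getName()
                        + " -> " + serializer.getClass().getName());
            } finally {
                system.shutdown();  // Akka 2.3-era shutdown call
            }
        }
    }
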
......@@ -18,9 +18,204 @@
package org.apache.flink.runtime.akka
- import com.esotericsoftware.kryo.Kryo
+ import java.net.Inet4Address
+ import com.esotericsoftware.kryo.serializers.JavaSerializer
+ import com.esotericsoftware.kryo.{Serializer, Kryo}
import org.apache.flink.api.common.accumulators.Accumulator
import org.apache.flink.core.fs.FileInputSplit
import org.apache.flink.core.io.{LocatableInputSplit, GenericInputSplit}
import org.apache.flink.runtime.accumulators.AccumulatorEvent
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor
import org.apache.flink.runtime.execution.ExecutionState
import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph}
import org.apache.flink.runtime.instance.{Instance, InstanceID, HardwareDescription,
InstanceConnectionInfo}
import org.apache.flink.runtime.io.network.{RemoteReceiver, ConnectionInfoLookupResponse}
import org.apache.flink.runtime.io.network.channels.ChannelID
import org.apache.flink.runtime.jobgraph.{JobVertexID, JobStatus, JobID, JobGraph}
import org.apache.flink.runtime.messages.ArchiveMessages.{ArchivedJobs, RequestArchivedJobs,
ArchiveExecutionGraph}
import org.apache.flink.runtime.messages.ExecutionGraphMessages.{ExecutionStateChanged,
JobStatusChanged}
import org.apache.flink.runtime.messages.JobClientMessages.{SubmitJobDetached, SubmitJobAndWait}
import org.apache.flink.runtime.messages.JobManagerMessages._
import org.apache.flink.runtime.messages.JobManagerProfilerMessages.ReportProfilingData
import org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration,
RegisterTaskManager}
import org.apache.flink.runtime.messages.TaskManagerMessages._
import org.apache.flink.runtime.messages.TaskManagerProfilerMessages._
import org.apache.flink.runtime.profiling.impl.types.ProfilingDataContainer
import org.apache.flink.runtime.taskmanager.{Task, TaskExecutionState}
class KryoInitializer {
- def cystomize(kryo: Kryo): Unit = {
+ def customize(kryo: Kryo): Unit = {
register(kryo)
}
def register(kryo: Kryo): Unit = {
def register(className: String): Unit = {
kryo.register(Class.forName(className))
}
def registerClass(clazz: Class[_], serializer: Serializer[_] = null): Unit = {
if(serializer != null){
kryo.register(clazz, serializer)
}else {
kryo.register(clazz)
}
}
register("scala.Some")
register("scala.None$")
register("scala.collection.immutable.Set$EmptySet$")
register("scala.runtime.BoxedUnit")
register("akka.actor.SystemGuardian$RegisterTerminationHook$")
register("akka.actor.Address")
register("akka.actor.Terminated")
register("akka.actor.LocalActorRef")
register("akka.actor.RepointableActorRef")
register("akka.actor.Identify")
register("akka.actor.ActorIdentity")
register("akka.actor.PoisonPill$")
register("akka.actor.AddressTerminated")
register("akka.actor.Status$Failure")
register("akka.remote.RemoteWatcher$ReapUnreachableTick$")
register("akka.remote.RemoteWatcher$HeartbeatTick$")
register("akka.remote.ReliableDeliverySupervisor$GotUid")
register("akka.remote.EndpointWriter$AckIdleCheckTimer$")
register("akka.remote.EndpointWriter$StoppedReading")
register("akka.remote.ReliableDeliverySupervisor$Ungate$")
register("akka.remote.EndpointWriter$StopReading")
register("akka.remote.EndpointWriter$OutboundAck")
register("akka.remote.Ack")
register("akka.remote.SeqNo")
register("akka.remote.RemoteWatcher$HeartbeatRsp")
register("akka.actor.SystemGuardian$TerminationHook$")
register("akka.actor.SystemGuardian$TerminationHookDone$")
register("akka.remote.EndpointWriter$FlushAndStop$")
register("akka.remote.RemoteWatcher$WatchRemote")
register("akka.remote.RemoteWatcher$UnwatchRemote")
register("akka.remote.RemoteWatcher$Rewatch")
register("akka.remote.RemoteWatcher$RewatchRemote")
register("akka.remote.ReliableDeliverySupervisor$AttemptSysMsgRedelivery$")
register("akka.remote.RemoteActorRef")
register("akka.remote.RemoteWatcher$Heartbeat$")
register("akka.remote.EndpointWriter$FlushAndStopTimeout$")
register("akka.remote.RemoteWatcher$ExpectedFirstHeartbeat")
register("akka.remote.transport.Transport$InvalidAssociationException")
register("akka.remote.transport.AkkaProtocolException")
register("akka.dispatch.sysmsg.Terminate")
register("akka.dispatch.sysmsg.Unwatch")
register("akka.dispatch.sysmsg.Watch")
register("akka.dispatch.sysmsg.DeathWatchNotification")
// register("java.util.Collections$UnmodifiableRandomAccessList")
//Register Flink messages
kryo.setDefaultSerializer(classOf[JavaSerializer])
//misc types
registerClass(classOf[JobID])
registerClass(classOf[JobVertexID])
registerClass(classOf[ExecutionAttemptID])
registerClass(classOf[InstanceID])
registerClass(classOf[ExecutionState])
registerClass(classOf[JobStatus])
registerClass(classOf[TaskExecutionState])
registerClass(classOf[InstanceConnectionInfo])
registerClass(classOf[HardwareDescription])
registerClass(classOf[Inet4Address])
registerClass(classOf[ChannelID])
registerClass(classOf[ConnectionInfoLookupResponse])
registerClass(classOf[RemoteReceiver])
registerClass(classOf[AccumulatorEvent], new JavaSerializer)
registerClass(classOf[Instance], new JavaSerializer())
registerClass(classOf[JobGraph], new JavaSerializer())
registerClass(classOf[TaskDeploymentDescriptor], new JavaSerializer())
registerClass(classOf[ExecutionGraph], new JavaSerializer())
registerClass(classOf[ProfilingDataContainer], new JavaSerializer)
registerClass(classOf[Task], new JavaSerializer)
registerClass(classOf[GenericInputSplit], new JavaSerializer)
registerClass(classOf[LocatableInputSplit], new JavaSerializer)
registerClass(classOf[FileInputSplit], new JavaSerializer)
registerClass(classOf[StackTraceElement])
registerClass(classOf[Array[StackTraceElement]])
//Archive messages
registerClass(classOf[ArchiveExecutionGraph])
registerClass(RequestArchivedJobs.getClass)
registerClass(classOf[ArchivedJobs])
//ExecutionGraph messages
registerClass(classOf[ExecutionStateChanged])
registerClass(classOf[JobStatusChanged])
//JobClient messages
registerClass(classOf[SubmitJobAndWait])
registerClass(classOf[SubmitJobDetached])
// JobManager messages
registerClass(classOf[SubmitJob])
registerClass(classOf[SubmissionSuccess])
registerClass(classOf[SubmissionFailure])
registerClass(classOf[CancelJob])
registerClass(classOf[UpdateTaskExecutionState])
registerClass(classOf[RequestNextInputSplit])
registerClass(classOf[LookupConnectionInformation])
registerClass(classOf[ConnectionInformation])
registerClass(classOf[ReportAccumulatorResult])
registerClass(classOf[RequestAccumulatorResults])
registerClass(classOf[AccumulatorResultsFound])
registerClass(classOf[AccumulatorResultsNotFound])
registerClass(classOf[RequestJobStatus])
registerClass(classOf[CurrentJobStatus])
registerClass(RequestNumberRegisteredTaskManager.getClass)
registerClass(RequestTotalNumberOfSlots.getClass)
registerClass(RequestBlobManagerPort.getClass)
registerClass(classOf[RequestFinalJobStatus])
registerClass(classOf[JobResultSuccess])
registerClass(classOf[JobResultCanceled])
registerClass(classOf[JobResultFailed])
registerClass(classOf[CancellationSuccess])
registerClass(classOf[CancellationFailure])
registerClass(RequestRunningJobs.getClass)
registerClass(classOf[RunningJobs])
registerClass(classOf[RequestJob])
registerClass(classOf[JobFound])
registerClass(classOf[JobNotFound])
registerClass(RequestRegisteredTaskManagers.getClass)
registerClass(classOf[RegisteredTaskManagers])
//JobManagerProfiler messages
registerClass(classOf[ReportProfilingData])
//Registration messages
registerClass(classOf[RegisterTaskManager])
registerClass(classOf[AcknowledgeRegistration])
//TaskManager messages
registerClass(classOf[CancelTask])
registerClass(classOf[SubmitTask])
registerClass(classOf[NextInputSplit])
registerClass(classOf[UnregisterTask])
registerClass(classOf[TaskOperationResult])
registerClass(NotifyWhenRegisteredAtJobManager.getClass)
registerClass(RegisterAtJobManager.getClass)
registerClass(RegisteredAtJobManager.getClass)
registerClass(SendHeartbeat.getClass)
registerClass(classOf[Heartbeat])
registerClass(LogMemoryUsage.getClass)
//TaskManagerProfiler messages
registerClass(classOf[MonitorTask])
registerClass(classOf[UnmonitorTask])
registerClass(RegisterProfilingListener.getClass)
registerClass(UnregisterProfilingListener.getClass)
registerClass(ProfileTasks.getClass)
}
}
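
One caveat with explicit registration as done here: Kryo assigns each registered class a numeric ID in registration order, so with the "incremental" idstrategy configured above, JobManager and TaskManager must perform exactly the same registrations in the same order, or the peers will decode IDs as the wrong classes. A standalone illustration of that mechanic, independent of Akka and Flink:

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.Registration;

    public class RegistrationOrder {
        public static void main(String[] args) {
            Kryo kryo = new Kryo();

            // IDs are handed out sequentially; swapping these two lines on only
            // one side of the wire makes the peers disagree on what an ID means
            Registration first  = kryo.register(java.util.UUID.class);
            Registration second = kryo.register(java.util.BitSet.class);

            System.out.println(first.getId());   // some ID n
            System.out.println(second.getId());  // n + 1
        }
    }
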
......@@ -108,7 +108,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
hardwareInformation, numberOfSlots)
// to be notified when the taskManager is no longer reachable
- // context.watch(taskManager);
+ context.watch(taskManager);
taskManager ! AcknowledgeRegistration(instanceID, libraryCacheManager.getBlobServerPort)
}
......@@ -246,7 +246,8 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
}else {
currentJobs.get(taskExecutionState.getJobID) match {
case Some((executionGraph, _)) =>
- sender() ! executionGraph.updateState(taskExecutionState)
+ val originalSender = sender()
+ originalSender ! executionGraph.updateState(taskExecutionState)
case None => log.error(s"Cannot find execution graph for ID ${taskExecutionState
.getJobID} to change state to ${taskExecutionState.getExecutionState}.")
sender() ! false
......@@ -254,24 +255,41 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
}
}
- case RequestNextInputSplit(jobID, vertexID) => {
+ case RequestNextInputSplit(jobID, vertexID, executionAttempt) => {
val nextInputSplit = currentJobs.get(jobID) match {
- case Some((executionGraph,_)) => executionGraph.getJobVertex(vertexID) match {
-   case vertex: ExecutionJobVertex => vertex.getSplitAssigner match {
-     case splitAssigner: InputSplitAssigner => splitAssigner.getNextInputSplit(null)
-     case _ =>
-       log.error(s"No InputSplitAssigner for vertex ID ${vertexID}.")
-       null
-   }
-   case _ =>
-     log.error(s"Cannot find execution vertex for vertex ID ${vertexID}.")
-     null
- }
+ case Some((executionGraph,_)) =>
+   val execution = executionGraph.getRegisteredExecutions().get(executionAttempt)
+   if(execution == null){
+     log.error("Can not find Execution for attempt " + executionAttempt)
+     null
+   }else{
+     val slot = execution.getAssignedResource
+     val host = if(slot != null){
+       slot.getInstance().getInstanceConnectionInfo.getHostname
+     }else{
+       null
+     }
+     executionGraph.getJobVertex(vertexID) match {
+       case vertex: ExecutionJobVertex => vertex.getSplitAssigner match {
+         case splitAssigner: InputSplitAssigner => splitAssigner.getNextInputSplit(host)
+         case _ =>
+           log.error(s"No InputSplitAssigner for vertex ID ${vertexID}.")
+           null
+       }
+       case _ =>
+         log.error(s"Cannot find execution vertex for vertex ID ${vertexID}.")
+         null
+     }
+   }
case None =>
log.error(s"Cannot find execution graph for job ID ${jobID}.")
null
}
log.debug("Send next input split {}.", nextInputSplit)
sender() ! NextInputSplit(nextInputSplit)
}
......@@ -323,8 +341,10 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
case LookupConnectionInformation(connectionInformation, jobID, sourceChannelID) => {
currentJobs.get(jobID) match {
case Some((executionGraph, _)) =>
- sender() ! ConnectionInformation(executionGraph.lookupConnectionInfoAndDeployReceivers
-   (connectionInformation, sourceChannelID))
+ val originalSender = sender()
+ originalSender ! ConnectionInformation(
+   executionGraph.lookupConnectionInfoAndDeployReceivers
+   (connectionInformation, sourceChannelID))
case None =>
log.error(s"Cannot find execution graph for job ID ${jobID}.")
sender() ! ConnectionInformation(ConnectionInfoLookupResponse.createReceiverNotFound())
......@@ -381,7 +401,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
case Terminated(taskManager) => {
log.info(s"Task manager ${taskManager.path} terminated.")
instanceManager.unregisterTaskManager(taskManager)
- // context.unwatch(taskManager)
+ context.unwatch(taskManager)
}
}
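
These hunks re-enable the previously commented-out death watch: the JobManager watches every registered TaskManager and receives a Terminated message when one stops or becomes unreachable. The watch/Terminated contract in the Java API of that Akka generation, as a rough sketch:

    import akka.actor.ActorRef;
    import akka.actor.Terminated;
    import akka.actor.UntypedActor;

    public class WatcherExample extends UntypedActor {

        public WatcherExample(ActorRef taskManager) {
            // subscribe to the lifecycle of the remote actor
            getContext().watch(taskManager);
        }

        @Override
        public void onReceive(Object message) {
            if (message instanceof Terminated) {
                // delivered once when the watched actor dies; clean up here
                Terminated t = (Terminated) message;
                System.out.println("Lost task manager " + t.getActor().path());
            } else {
                unhandled(message);
            }
        }
    }
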
......@@ -442,6 +462,7 @@ object JobManager {
parser.parse(args, JobManagerCLIConfiguration()) map {
config =>
GlobalConfiguration.loadConfiguration(config.configDir)
val configuration = GlobalConfiguration.getConfiguration()
if (config.configDir != null && new File(config.configDir).isDirectory) {
configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, config.configDir + "/..")
......
......@@ -19,7 +19,7 @@
package org.apache.flink.runtime.messages
import org.apache.flink.runtime.accumulators.AccumulatorEvent
- import org.apache.flink.runtime.executiongraph.ExecutionGraph
+ import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph}
import org.apache.flink.runtime.instance.{Instance, InstanceConnectionInfo}
import org.apache.flink.runtime.io.network.ConnectionInfoLookupResponse
import org.apache.flink.runtime.io.network.channels.ChannelID
......@@ -71,7 +71,8 @@ object JobManagerMessages {
* @param jobID
* @param vertexID
*/
- case class RequestNextInputSplit(jobID: JobID, vertexID: JobVertexID)
+ case class RequestNextInputSplit(jobID: JobID, vertexID: JobVertexID, executionAttempt:
+   ExecutionAttemptID)
/**
* Looks up the connection information of a task being the source of a channel specified by
......
......@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit
import akka.pattern.ask
import akka.actor.{ActorRef, ActorSystem}
import com.typesafe.config.{ConfigFactory, Config}
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
......@@ -61,21 +62,35 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration) {
def startTaskManager(index: Int)(implicit system: ActorSystem):
ActorRef
+ def getJobManagerAkkaConfigString(): String = {
+   val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants
+     .DEFAULT_JOB_MANAGER_IPC_PORT)
+
+   AkkaUtils.getConfigString(HOSTNAME, port, configuration)
+ }

def startJobManagerActorSystem(): ActorSystem = {
-   val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-     ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
-   AkkaUtils.createActorSystem(HOSTNAME, port, configuration)
+   val configString = getJobManagerAkkaConfigString()
+   val config = ConfigFactory.parseString(getJobManagerAkkaConfigString())
+   AkkaUtils.createActorSystem(config)
}

- def startTaskManagerActorSystem(index: Int): ActorSystem = {
+ def getTaskManagerAkkaConfigString(index: Int): String = {
    val port = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
      ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT)
-   AkkaUtils.createActorSystem(HOSTNAME, if(port != 0) port + index else port,
+   AkkaUtils.getConfigString(HOSTNAME, if(port != 0) port + index else port,
      configuration)
}

+ def startTaskManagerActorSystem(index: Int): ActorSystem = {
+   val config = ConfigFactory.parseString(getTaskManagerAkkaConfigString(index))
+   AkkaUtils.createActorSystem(config)
+ }

def getJobManager: ActorRef = {
  jobManagerActor
}
def getJobManager: ActorRef = {
jobManagerActor
}
......
......@@ -185,7 +185,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
currentJobManager = sender()
instanceID = id
- // context.watch(currentJobManager)
+ context.watch(currentJobManager)
log.info(s"TaskManager successfully registered at JobManager ${
currentJobManager.path
......@@ -261,7 +261,8 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
case None =>
}
- val splitProvider = new TaskInputSplitProvider(currentJobManager, jobID, vertexID, timeout)
+ val splitProvider = new TaskInputSplitProvider(currentJobManager, jobID, vertexID,
+   executionID, timeout)
val env = new RuntimeEnvironment(task, tdd, userCodeClassLoader, memoryManager,
ioManager, splitProvider, currentJobManager, bcVarManager)
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.testingUtils
import com.esotericsoftware.kryo.Kryo
import org.apache.flink.runtime.akka.KryoInitializer
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages._
class KryoTestingInitializer {
def customize(kryo: Kryo): Unit = {
val initializer = new KryoInitializer
initializer.customize(kryo)
kryo.register(classOf[RequestExecutionGraph])
kryo.register(classOf[ExecutionGraphFound])
kryo.register(classOf[ExecutionGraphNotFound])
kryo.register(classOf[WaitForAllVerticesToBeRunning])
kryo.register(classOf[AllVerticesRunning])
kryo.register(classOf[NotifyWhenJobRemoved])
kryo.register(classOf[NotifyWhenTaskRemoved])
kryo.register(RequestRunningTasks.getClass)
kryo.register(classOf[ResponseRunningTasks])
kryo.register(RequestBroadcastVariablesWithReferences.getClass)
kryo.register(classOf[ResponseBroadcastVariablesWithReferences])
kryo.register(classOf[CheckIfJobRemoved])
}
}
......@@ -37,6 +37,14 @@ class TestingCluster(userConfiguration: Configuration) extends FlinkMiniCluster(
cfg
}
override def getJobManagerAkkaConfigString(): String = {
super.getJobManagerAkkaConfigString() + TestingUtils.getTestingSerializationBindings
}
override def getTaskManagerAkkaConfigString(index: Int): String = {
super.getTaskManagerAkkaConfigString(index) + TestingUtils.getTestingSerializationBindings
}
override def startJobManager(implicit system: ActorSystem) = {
system.actorOf(Props(new JobManager(configuration) with TestingJobManager),
JobManager.JOB_MANAGER_NAME)
......
......@@ -84,12 +84,12 @@ trait TestingJobManager extends ActorLogMessages with WrapAsScala {
}
import context.dispatcher
- val f = Future.sequence(responses)
-
- val t = Await.result(f, timeout)
-
- sender() ! true
- // Future.fold(responses)(true)(_ & _) pipeTo sender()
+ // val f = Future.sequence(responses)
+ //
+ // val t = Await.result(f, timeout)
+ //
+ // sender() ! true
+ Future.fold(responses)(true)(_ & _) pipeTo sender()
}
}
......
......@@ -58,6 +58,37 @@ object TestingUtils {
""".stripMargin
}
// scalastyle:off line.size.limit
val getTestingSerializationBindings =
"""
|akka {
| actor {
| kryo{
| kryo-custom-serializer-init = "org.apache.flink.runtime.testingUtils.KryoTestingInitializer"
| }
|
| serialization-bindings {
| "org.apache.flink.runtime.testingUtils.TestingJobManagerMessages$RequestExecutionGraph" = kryo
| "org.apache.flink.runtime.testingUtils.TestingJobManagerMessages$ExecutionGraphFound" = kryo
| "org.apache.flink.runtime.testingUtils.TestingJobManagerMessages$ExecutionGraphNotFound" = kryo
| "org.apache.flink.runtime.testingUtils.TestingJobManagerMessages$WaitForAllVerticesToBeRunning" = kryo
| "org.apache.flink.runtime.testingUtils.TestingJobManagerMessages$AllVerticesRunning" = kryo
| "org.apache.flink.runtime.testingUtils.TestingJobManagerMessages$NotifyWhenJobRemoved" = kryo
|
| "org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages$NotifyWhenTaskRemoved" = kryo
| "org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages$RequestRunningTasks$" = kryo
| "org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages$ResponseRunningTasks" = kryo
| "org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages$RequestBroadcastVariablesWithReferences$" = kryo
| "org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages$ResponseBroadcastVariablesWithReferences" = kryo
| "org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages$CheckIfJobRemoved" = kryo
| }
| }
|}
""".stripMargin
// scalastyle:on line.size.limit
def startTestingTaskManagerWithConfiguration(hostname: String, config: Configuration)
(implicit system: ActorSystem) = {
val (connectionInfo, jobManagerURL, taskManagerConfig, networkConnectionConfig) =
......
......@@ -22,7 +22,7 @@ import akka.actor.{Props, ActorSystem, ActorRef}
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
import org.apache.flink.runtime.taskmanager.TaskManager
- import org.apache.flink.runtime.testingUtils.TestingTaskManager
+ import org.apache.flink.runtime.testingUtils.{TestingUtils, TestingTaskManager}
class ForkableFlinkMiniCluster(userConfiguration: Configuration) extends
LocalFlinkMiniCluster(userConfiguration) {
......@@ -52,6 +52,10 @@ LocalFlinkMiniCluster(userConfiguration) {
super.generateConfiguration(config)
}
override def getTaskManagerAkkaConfigString(index: Int): String = {
super.getTaskManagerAkkaConfigString(index) + TestingUtils.getTestingSerializationBindings
}
override def startTaskManager(index: Int)(implicit system: ActorSystem): ActorRef = {
val config = configuration.clone()
......
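
TestingCluster and ForkableFlinkMiniCluster splice the testing serialization bindings into the actor system by plain string concatenation with the base config string. That works because HOCON merges duplicate paths, with later definitions winning on conflicts; a small demonstration, unrelated to Flink itself:

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    public class HoconMerge {
        public static void main(String[] args) {
            String base    = "akka { actor { kryo { buffer-size = 4096 } } }";
            String testing = "akka { actor { kryo { kryo-trace = true } } }";

            // duplicate object paths are merged into one tree
            Config config = ConfigFactory.parseString(base + "\n" + testing);

            System.out.println(config.getInt("akka.actor.kryo.buffer-size"));    // 4096
            System.out.println(config.getBoolean("akka.actor.kryo.kryo-trace")); // true
        }
    }
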