[FLINK-6046] Either store serialized value or permanent blob key in...

[FLINK-6046] Either store serialized value or permanent blob key in ExecutionGraph and ExecutionJobVertex
Parent commit: 315badcf
......@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.configuration;
import org.apache.flink.annotation.PublicEvolving;
......
......@@ -96,13 +96,6 @@ public class JobManagerOptions {
key("jobmanager.archive.fs.dir")
.noDefaultValue();
/**
* The maximum size of the <tt>TaskDeploymentDescriptor</tt>'s serialized task and job
* information to still transmit them via RPC. Larger blobs may be offloaded to the BLOB server.
*/
public static final ConfigOption<Integer> TDD_OFFLOAD_MINSIZE = key("jobmanager.tdd.offload.minsize")
.defaultValue(1_024); // 1KiB by default
// ---------------------------------------------------------------------------------------------
private JobManagerOptions() {
......
......@@ -24,9 +24,11 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.types.Either;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -884,8 +886,8 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
*
* @return configuration
*/
public final Configuration getConfiguration() {
return blobServiceConfiguration;
public final int getMinOffloadingSize() {
return blobServiceConfiguration.getInteger(BlobServerOptions.OFFLOAD_MINSIZE);
}
/**
......@@ -941,4 +943,37 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
return new ArrayList<>(activeConnections);
}
}
/**
 * Serializes the given value and offloads it to the BlobServer if its size exceeds the minimum
 * offloading size of the BlobServer.
 *
 * @param value to serialize
 * @param jobId to which the value belongs
 * @param blobServer BLOB server to upload to, or <tt>null</tt> to always keep the serialized
 *                   value in-line
 * @param <T> type of the value to serialize
 * @return either the serialized value (if the BLOB server is unavailable, the serialized form
 *         is below the offloading threshold, or the upload failed) or the key of the uploaded
 *         permanent BLOB
 * @throws IOException if the value cannot be serialized
 */
public static <T> Either<SerializedValue<T>, PermanentBlobKey> tryOffload(
		T value,
		JobID jobId,
		@Nullable BlobServer blobServer) throws IOException {

	// serialize exactly once; the bytes are reused both for the size check and the upload
	final SerializedValue<T> serializedValue = new SerializedValue<>(value);

	if (blobServer == null || serializedValue.getByteArray().length < blobServer.getMinOffloadingSize()) {
		// reuse the already serialized value instead of serializing the value a second time
		return Either.Left(serializedValue);
	} else {
		try {
			final PermanentBlobKey permanentBlobKey =
				blobServer.putPermanent(jobId, serializedValue.getByteArray());
			return Either.Right(permanentBlobKey);
		} catch (IOException e) {
			// offloading is best-effort: fall back to transmitting the serialized value via RPC
			LOG.warn("Failed to offload value " + value + " for job " + jobId + " to BLOB store.", e);
			return Either.Left(serializedValue);
		}
	}
}
}
......@@ -27,7 +27,6 @@ import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.StoppingException;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
......@@ -58,17 +57,17 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.query.KvStateLocationRegistry;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.types.Either;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.SerializedValue;
......@@ -78,6 +77,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
......@@ -174,17 +174,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
/** Job specific information like the job id, job name, job configuration, etc. */
private final JobInformation jobInformation;
/** Serialized version of the job specific information. This is done to avoid multiple
* serializations of the same data when creating a TaskDeploymentDescriptor.
*/
private final SerializedValue<JobInformation> serializedJobInformation;
/**
* The key of the offloaded job information BLOB containing {@link #serializedJobInformation} or
* <tt>null</tt> if not offloaded.
*/
@Nullable
private final PermanentBlobKey jobInformationBlobKey;
private final Either<SerializedValue<JobInformation>, PermanentBlobKey> jobInformationOrBlobKey;
/** The executor which is used to execute futures. */
private final ScheduledExecutorService futureExecutor;
......@@ -245,10 +235,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
/** The total number of vertices currently in the execution graph */
private int numVerticesTotal;
/** Blob server reference for offloading large RPC messages. */
@Nullable
private final BlobServer blobServer;
// ------ Configuration of the Execution -------
/** Flag to indicate whether the scheduler may queue tasks for execution, or needs to be able
......@@ -290,6 +276,9 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
// ------ Fields that are only relevant for archived execution graphs ------------
private String jsonPlan;
@Nullable
private BlobServer blobServer;
// --------------------------------------------------------------------------------------------
// Constructors
// --------------------------------------------------------------------------------------------
......@@ -307,10 +296,21 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
SerializedValue<ExecutionConfig> serializedConfig,
Time timeout,
RestartStrategy restartStrategy,
SlotProvider slotProvider) {
SlotProvider slotProvider) throws IOException {
this(futureExecutor, ioExecutor, jobId, jobName, jobConfig, serializedConfig, timeout,
restartStrategy, slotProvider, null);
this(
new JobInformation(
jobId,
jobName,
serializedConfig,
jobConfig,
Collections.emptyList(),
Collections.emptyList()),
futureExecutor,
ioExecutor,
timeout,
restartStrategy,
slotProvider);
}
/**
......@@ -318,33 +318,41 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
*/
@VisibleForTesting
ExecutionGraph(
JobInformation jobInformation,
ScheduledExecutorService futureExecutor,
Executor ioExecutor,
JobID jobId,
String jobName,
Configuration jobConfig,
SerializedValue<ExecutionConfig> serializedConfig,
Time timeout,
RestartStrategy restartStrategy,
SlotProvider slotProvider,
@Nullable BlobServer blobServer) {
SlotProvider slotProvider) throws IOException {
this(
new JobInformation(
jobId,
jobName,
serializedConfig,
jobConfig,
Collections.emptyList(),
Collections.emptyList()),
jobInformation,
futureExecutor,
ioExecutor,
timeout,
restartStrategy,
new RestartAllStrategy.Factory(),
slotProvider);
}
@VisibleForTesting
ExecutionGraph(
JobInformation jobInformation,
ScheduledExecutorService futureExecutor,
Executor ioExecutor,
Time timeout,
RestartStrategy restartStrategy,
FailoverStrategy.Factory failoverStrategy,
SlotProvider slotProvider) throws IOException {
this(
jobInformation,
futureExecutor,
ioExecutor,
timeout,
restartStrategy,
failoverStrategy,
slotProvider,
ExecutionGraph.class.getClassLoader(),
blobServer
);
null);
}
public ExecutionGraph(
......@@ -356,21 +364,13 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
FailoverStrategy.Factory failoverStrategyFactory,
SlotProvider slotProvider,
ClassLoader userClassLoader,
@Nullable BlobServer blobServer) {
@Nullable BlobServer blobServer) throws IOException {
checkNotNull(futureExecutor);
this.jobInformation = Preconditions.checkNotNull(jobInformation);
// serialize the job information to do the serialisation work only once
try {
this.serializedJobInformation = new SerializedValue<>(jobInformation);
}
catch (IOException e) {
// this cannot happen because 'JobInformation' is perfectly serializable
// rethrow unchecked, because this indicates a bug, not a recoverable situation
throw new FlinkRuntimeException("Bug: Cannot serialize JobInformation", e);
}
this.jobInformationOrBlobKey = BlobServer.tryOffload(jobInformation, jobInformation.getJobId(), blobServer);
this.futureExecutor = Preconditions.checkNotNull(futureExecutor);
this.ioExecutor = Preconditions.checkNotNull(ioExecutor);
......@@ -405,50 +405,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
LOG.info("Job recovers via failover strategy: {}", failoverStrategy.getStrategyName());
this.blobServer = blobServer;
this.jobInformationBlobKey = tryOffLoadJobInformation();
}
/**
* Tries to store {@link #serializedJobInformation} and in the graph's {@link
* ExecutionGraph#blobServer} (if not <tt>null</tt>) so that RPC messages do not need to include
* it.
*
* @return the BLOB key of the uploaded job information or <tt>null</tt> if the upload failed
*/
@Nullable
private PermanentBlobKey tryOffLoadJobInformation() {
if (blobServer == null) {
return null;
}
// If the serialized job information inside serializedJobInformation is larger than this,
// we try to offload it to the BLOB server.
final int rpcOffloadMinSize =
blobServer.getConfiguration().getInteger(JobManagerOptions.TDD_OFFLOAD_MINSIZE);
if (serializedJobInformation.getByteArray().length > rpcOffloadMinSize) {
LOG.info("Storing job {} information at the BLOB server", getJobID());
// TODO: do not overwrite existing job info and thus speed up recovery?
try {
return blobServer.putPermanent(getJobID(), serializedJobInformation.getByteArray());
} catch (IOException e) {
LOG.warn("Failed to offload job " + getJobID() + " information data to BLOB store", e);
}
}
return null;
}
/**
* Returns the key of the offloaded job information BLOB containing {@link
* #serializedJobInformation}.
*
* @return the BLOB key or <tt>null</tt> if not offloaded
*/
@Nullable
public PermanentBlobKey getJobInformationBlobKey() {
return jobInformationBlobKey;
}
// --------------------------------------------------------------------------------------------
......@@ -633,8 +589,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
return slotProvider;
}
public SerializedValue<JobInformation> getSerializedJobInformation() {
return serializedJobInformation;
public Either<SerializedValue<JobInformation>, PermanentBlobKey> getJobInformationOrBlobKey() {
return jobInformationOrBlobKey;
}
@Override
......@@ -749,6 +705,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
return this.stateTimestamps[status.ordinal()];
}
@Nullable
public final BlobServer getBlobServer() {
return blobServer;
}
......@@ -843,8 +800,14 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
}
// create the execution job vertex and attach it to the graph
ExecutionJobVertex ejv =
new ExecutionJobVertex(this, jobVertex, 1, rpcCallTimeout, globalModVersion, createTimestamp);
ExecutionJobVertex ejv = new ExecutionJobVertex(
this,
jobVertex,
1,
rpcCallTimeout,
globalModVersion,
createTimestamp);
ejv.connectToPredecessors(this.intermediateResults);
ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
......
......@@ -103,24 +103,31 @@ public class ExecutionGraphBuilder {
final FailoverStrategy.Factory failoverStrategy =
FailoverStrategyLoader.loadFailoverStrategy(jobManagerConfig, log);
final JobInformation jobInformation = new JobInformation(
jobId,
jobName,
jobGraph.getSerializedExecutionConfig(),
jobGraph.getJobConfiguration(),
jobGraph.getUserJarBlobKeys(),
jobGraph.getClasspaths());
// create a new execution graph, if none exists so far
final ExecutionGraph executionGraph = (prior != null) ? prior :
new ExecutionGraph(
new JobInformation(
jobId,
jobName,
jobGraph.getSerializedExecutionConfig(),
jobGraph.getJobConfiguration(),
jobGraph.getUserJarBlobKeys(),
jobGraph.getClasspaths()),
futureExecutor,
ioExecutor,
timeout,
restartStrategy,
failoverStrategy,
slotProvider,
classLoader,
blobServer);
final ExecutionGraph executionGraph;
try {
executionGraph = (prior != null) ? prior :
new ExecutionGraph(
jobInformation,
futureExecutor,
ioExecutor,
timeout,
restartStrategy,
failoverStrategy,
slotProvider,
classLoader,
blobServer);
} catch (IOException e) {
throw new JobException("Could not create the ExecutionGraph.", e);
}
// set the basic properties
......
......@@ -46,8 +46,10 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.types.Either;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import javax.annotation.Nullable;
......@@ -131,6 +133,8 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
@Nullable
private PermanentBlobKey taskInformationBlobKey = null;
private Either<SerializedValue<TaskInformation>, PermanentBlobKey> taskInformationOrBlobKey = null;
private InputSplitAssigner splitAssigner;
/**
......@@ -147,12 +151,12 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
}
public ExecutionJobVertex(
ExecutionGraph graph,
JobVertex jobVertex,
int defaultParallelism,
Time timeout,
long initialGlobalModVersion,
long createTimestamp) throws JobException {
ExecutionGraph graph,
JobVertex jobVertex,
int defaultParallelism,
Time timeout,
long initialGlobalModVersion,
long createTimestamp) throws JobException {
if (graph == null || jobVertex == null) {
throw new NullPointerException();
......@@ -359,80 +363,29 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
return inputs;
}
public SerializedValue<TaskInformation> getSerializedTaskInformation() throws IOException {
public Either<SerializedValue<TaskInformation>, PermanentBlobKey> getTaskInformationOrBlobKey() throws IOException {
// only one thread should offload the task information, so let's also let only one thread
// serialize the task information!
synchronized (stateMonitor) {
if (null == serializedTaskInformation) {
int parallelism = getParallelism();
int maxParallelism = getMaxParallelism();
if (LOG.isDebugEnabled()) {
LOG.debug("Creating task information for " + generateDebugString());
}
serializedTaskInformation = new SerializedValue<>(
new TaskInformation(
jobVertex.getID(),
jobVertex.getName(),
parallelism,
maxParallelism,
jobVertex.getInvokableClassName(),
jobVertex.getConfiguration()));
taskInformationBlobKey = tryOffLoadTaskInformation();
}
}
return serializedTaskInformation;
}
/**
* Returns the key of the offloaded task information BLOB containing {@link
* #serializedTaskInformation}.
* <p>
* This may be true after the first call to {@link #getSerializedTaskInformation()}.
*
* @return the BLOB key or <tt>null</tt> if not offloaded
*/
@Nullable
public PermanentBlobKey getTaskInformationBlobKey() {
return taskInformationBlobKey;
}
/**
* Tries to store {@link #serializedTaskInformation} and in the graph's {@link
* ExecutionGraph#blobServer} (if not <tt>null</tt>) so that RPC messages do not need to include
* it.
*
* @return the BLOB key of the uploaded task information or <tt>null</tt> if the upload failed
*/
@Nullable
private PermanentBlobKey tryOffLoadTaskInformation() {
BlobServer blobServer = graph.getBlobServer();
if (blobServer == null) {
return null;
}
// If the serialized task information inside #serializedTaskInformation is larger than this,
// we try to offload it to the BLOB server.
final int rpcOffloadMinSize =
blobServer.getConfiguration().getInteger(JobManagerOptions.TDD_OFFLOAD_MINSIZE);
if (serializedTaskInformation.getByteArray().length > rpcOffloadMinSize) {
LOG.info("Storing task {} information at the BLOB server", getJobVertexId());
// TODO: do not overwrite existing task info and thus speed up recovery?
try {
return blobServer.putPermanent(getJobId(), serializedTaskInformation.getByteArray());
} catch (IOException e) {
LOG.warn("Failed to offload task " + getJobVertexId() + " information data to BLOB store", e);
if (taskInformationOrBlobKey == null) {
final BlobServer blobServer = graph.getBlobServer();
final TaskInformation taskInformation = new TaskInformation(
jobVertex.getID(),
jobVertex.getName(),
parallelism,
maxParallelism,
jobVertex.getInvokableClassName(),
jobVertex.getConfiguration());
taskInformationOrBlobKey = BlobServer.tryOffload(
taskInformation,
getJobId(),
blobServer);
}
}
return null;
return taskInformationOrBlobKey;
}
@Override
......
......@@ -46,8 +46,10 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.EvictingBoundedList;
import org.apache.flink.types.Either;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
......@@ -770,33 +772,32 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
consumedPartitions.add(new InputGateDeploymentDescriptor(resultId, partitionType, queueToRequest, partitions));
}
TaskDeploymentDescriptor.MaybeOffloaded<JobInformation> serializedJobInformation;
{
PermanentBlobKey jobInfoBlobKey = getExecutionGraph().getJobInformationBlobKey();
if (jobInfoBlobKey != null) {
serializedJobInformation =
new TaskDeploymentDescriptor.Offloaded<>(jobInfoBlobKey);
} else {
serializedJobInformation = new TaskDeploymentDescriptor.NonOffloaded<>(
getExecutionGraph().getSerializedJobInformation());
}
final Either<SerializedValue<JobInformation>, PermanentBlobKey> jobInformationOrBlobKey = getExecutionGraph().getJobInformationOrBlobKey();
final TaskDeploymentDescriptor.MaybeOffloaded<JobInformation> serializedJobInformation;
if (jobInformationOrBlobKey.isLeft()) {
serializedJobInformation = new TaskDeploymentDescriptor.NonOffloaded<>(jobInformationOrBlobKey.left());
} else {
serializedJobInformation = new TaskDeploymentDescriptor.Offloaded<>(jobInformationOrBlobKey.right());
}
TaskDeploymentDescriptor.MaybeOffloaded<TaskInformation> serializedTaskInformation;
{
PermanentBlobKey taskInfoBlobKey = jobVertex.getTaskInformationBlobKey();
if (taskInfoBlobKey != null) {
serializedTaskInformation = new TaskDeploymentDescriptor.Offloaded<>(taskInfoBlobKey);
} else {
try {
serializedTaskInformation = new TaskDeploymentDescriptor.NonOffloaded<>(
jobVertex.getSerializedTaskInformation());
} catch (IOException e) {
throw new ExecutionGraphException(
"Could not create a serialized JobVertexInformation for " +
jobVertex.getJobVertexId(), e);
}
}
final Either<SerializedValue<TaskInformation>, PermanentBlobKey> taskInformationOrBlobKey;
try {
taskInformationOrBlobKey = jobVertex.getTaskInformationOrBlobKey();
} catch (IOException e) {
throw new ExecutionGraphException(
"Could not create a serialized JobVertexInformation for " +
jobVertex.getJobVertexId(), e);
}
final TaskDeploymentDescriptor.MaybeOffloaded<TaskInformation> serializedTaskInformation;
if (taskInformationOrBlobKey.isLeft()) {
serializedTaskInformation = new TaskDeploymentDescriptor.NonOffloaded<>(taskInformationOrBlobKey.left());
} else {
serializedTaskInformation = new TaskDeploymentDescriptor.Offloaded<>(taskInformationOrBlobKey.right());
}
return new TaskDeploymentDescriptor(
......
......@@ -18,7 +18,6 @@
package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.IntCounter;
......@@ -36,6 +35,7 @@ import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.SimpleSlot;
......@@ -54,7 +54,7 @@ import org.apache.flink.runtime.operators.BatchTask;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
import org.slf4j.LoggerFactory;
......@@ -68,7 +68,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import static junit.framework.TestCase.assertNull;
import static junit.framework.TestCase.assertTrue;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
......@@ -78,7 +78,7 @@ import static org.junit.Assert.fail;
/**
* Tests for {@link ExecutionGraph} deployment.
*/
public class ExecutionGraphDeploymentTest {
public class ExecutionGraphDeploymentTest extends TestLogger {
/**
* BLOB server instance to use for the job graph (may be <tt>null</tt>).
......@@ -98,7 +98,7 @@ public class ExecutionGraphDeploymentTest {
* @param eg the execution graph that was created
*/
protected void checkJobOffloaded(ExecutionGraph eg) throws Exception {
assertNull(eg.getJobInformationBlobKey());
assertTrue(eg.getJobInformationOrBlobKey().isLeft());
}
/**
......@@ -109,7 +109,7 @@ public class ExecutionGraphDeploymentTest {
* @param jobVertexId job vertex ID
*/
protected void checkTaskOffloaded(ExecutionGraph eg, JobVertexID jobVertexId) throws Exception {
assertNull(eg.getJobVertex(jobVertexId).getTaskInformationBlobKey());
assertTrue(eg.getJobVertex(jobVertexId).getTaskInformationOrBlobKey().isLeft());
}
@Test
......@@ -141,17 +141,21 @@ public class ExecutionGraphDeploymentTest {
v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
final JobInformation expectedJobInformation = new DummyJobInformation(
jobId,
"some job");
ExecutionGraph eg = new ExecutionGraph(
expectedJobInformation,
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
jobId,
"some job",
new Configuration(),
new SerializedValue<>(new ExecutionConfig()),
AkkaUtils.getDefaultTimeout(),
new NoRestartStrategy(),
new RestartAllStrategy.Factory(),
new Scheduler(TestingUtils.defaultExecutionContext()),
ExecutionGraph.class.getClassLoader(),
blobServer);
checkJobOffloaded(eg);
List<JobVertex> ordered = Arrays.asList(v1, v2, v3, v4);
......@@ -387,12 +391,12 @@ public class ExecutionGraphDeploymentTest {
}
}
@Test
/**
* Tests that a blocking batch job fails if there are not enough resources left to schedule the
* succeeding tasks. This test case is related to [FLINK-4296] where finished producing tasks
* swallow the fail exception when scheduling a consumer task.
*/
@Test
public void testNoResourceAvailableFailure() throws Exception {
final JobID jobId = new JobID();
JobVertex v1 = new JobVertex("source");
......@@ -418,18 +422,22 @@ public class ExecutionGraphDeploymentTest {
TestingUtils.directExecutionContext()))));
}
final JobInformation jobInformation = new DummyJobInformation(
jobId,
"failing test job");
// execution graph that executes actions synchronously
ExecutionGraph eg = new ExecutionGraph(
jobInformation,
new DirectScheduledExecutorService(),
TestingUtils.defaultExecutor(),
jobId,
"failing test job",
new Configuration(),
new SerializedValue<>(new ExecutionConfig()),
AkkaUtils.getDefaultTimeout(),
new NoRestartStrategy(),
new RestartAllStrategy.Factory(),
scheduler,
ExecutionGraph.class.getClassLoader(),
blobServer);
checkJobOffloaded(eg);
eg.setQueuedSchedulingAllowed(false);
......@@ -495,17 +503,20 @@ public class ExecutionGraphDeploymentTest {
TestingUtils.directExecutionContext()))));
}
final JobInformation jobInformation = new DummyJobInformation(
jobId,
"some job");
// execution graph that executes actions synchronously
ExecutionGraph eg = new ExecutionGraph(
jobInformation,
new DirectScheduledExecutorService(),
TestingUtils.defaultExecutor(),
jobId,
"some job",
new Configuration(),
new SerializedValue<>(new ExecutionConfig()),
AkkaUtils.getDefaultTimeout(),
new NoRestartStrategy(),
new RestartAllStrategy.Factory(),
scheduler,
ExecutionGraph.class.getClassLoader(),
blobServer);
checkJobOffloaded(eg);
......
......@@ -18,8 +18,8 @@
package org.apache.flink.runtime.executiongraph;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.PermanentBlobCache;
import org.apache.flink.runtime.blob.VoidBlobStore;
......@@ -41,7 +41,7 @@ public class ExecutionGraphDeploymentWithBlobCacheTest extends ExecutionGraphDep
public void setupBlobServer() throws IOException {
Configuration config = new Configuration();
// always offload the serialized job and task information
config.setInteger(JobManagerOptions.TDD_OFFLOAD_MINSIZE, 0);
config.setInteger(BlobServerOptions.OFFLOAD_MINSIZE, 0);
blobServer = new BlobServer(config, new VoidBlobStore());
blobServer.start();
......
......@@ -19,12 +19,14 @@
package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.types.Either;
import org.apache.flink.util.SerializedValue;
import org.junit.After;
import org.junit.Before;
......@@ -36,7 +38,6 @@ import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
......@@ -53,7 +54,7 @@ public class ExecutionGraphDeploymentWithBlobServerTest extends ExecutionGraphDe
public void setupBlobServer() throws IOException {
Configuration config = new Configuration();
// always offload the serialized job and task information
config.setInteger(JobManagerOptions.TDD_OFFLOAD_MINSIZE, 0);
config.setInteger(BlobServerOptions.OFFLOAD_MINSIZE, 0);
blobServer = Mockito.spy(new BlobServer(config, new VoidBlobStore()));
seenHashes.clear();
......@@ -81,19 +82,21 @@ public class ExecutionGraphDeploymentWithBlobServerTest extends ExecutionGraphDe
@Override
protected void checkJobOffloaded(ExecutionGraph eg) throws Exception {
PermanentBlobKey jobInformationBlobKey = eg.getJobInformationBlobKey();
assertNotNull(jobInformationBlobKey);
Either<SerializedValue<JobInformation>, PermanentBlobKey> jobInformationOrBlobKey = eg.getJobInformationOrBlobKey();
assertTrue(jobInformationOrBlobKey.isRight());
// must not throw:
blobServer.getFile(eg.getJobID(), jobInformationBlobKey);
blobServer.getFile(eg.getJobID(), jobInformationOrBlobKey.right());
}
@Override
protected void checkTaskOffloaded(ExecutionGraph eg, JobVertexID jobVertexId) throws Exception {
PermanentBlobKey taskInformationBlobKey = eg.getJobVertex(jobVertexId).getTaskInformationBlobKey();
assertNotNull(taskInformationBlobKey);
Either<SerializedValue<TaskInformation>, PermanentBlobKey> taskInformationOrBlobKey = eg.getJobVertex(jobVertexId).getTaskInformationOrBlobKey();
assertTrue(taskInformationOrBlobKey.isRight());
// must not throw:
blobServer.getFile(eg.getJobID(), taskInformationBlobKey);
blobServer.getFile(eg.getJobID(), taskInformationOrBlobKey.right());
}
}
......@@ -133,9 +133,7 @@ public class FailoverRegionTest extends TestLogger {
AkkaUtils.getDefaultTimeout(),
new InfiniteDelayRestartStrategy(10),
new FailoverPipelinedRegionWithDirectExecutor(),
slotProvider,
ExecutionGraph.class.getClassLoader(),
null);
slotProvider);
eg.attachJobGraph(ordered);
......@@ -257,9 +255,7 @@ public class FailoverRegionTest extends TestLogger {
AkkaUtils.getDefaultTimeout(),
new InfiniteDelayRestartStrategy(10),
new RestartPipelinedRegionStrategy.Factory(),
scheduler,
ExecutionGraph.class.getClassLoader(),
null);
scheduler);
try {
eg.attachJobGraph(ordered);
}
......@@ -331,9 +327,7 @@ public class FailoverRegionTest extends TestLogger {
AkkaUtils.getDefaultTimeout(),
new InfiniteDelayRestartStrategy(10),
new FailoverPipelinedRegionWithDirectExecutor(),
scheduler,
ExecutionGraph.class.getClassLoader(),
null);
scheduler);
try {
eg.attachJobGraph(ordered);
}
......@@ -441,9 +435,7 @@ public class FailoverRegionTest extends TestLogger {
AkkaUtils.getDefaultTimeout(),
restartStrategy,
new FailoverPipelinedRegionWithDirectExecutor(),
scheduler,
ExecutionGraph.class.getClassLoader(),
null);
scheduler);
try {
eg.attachJobGraph(ordered);
}
......
......@@ -168,9 +168,7 @@ public class GlobalModVersionTest {
Time.seconds(10),
new InfiniteDelayRestartStrategy(),
new CustomStrategy(failoverStrategy),
slotProvider,
getClass().getClassLoader(),
null);
slotProvider);
JobVertex jv = new JobVertex("test vertex");
jv.setInvokableClass(NoOpInvokable.class);
......
......@@ -291,9 +291,7 @@ public class IndividualRestartsConcurrencyTest {
Time.seconds(10),
restartStrategy,
failoverStrategy,
slotProvider,
getClass().getClassLoader(),
null);
slotProvider);
JobVertex jv = new JobVertex("test vertex");
jv.setInvokableClass(NoOpInvokable.class);
......
......@@ -303,11 +303,13 @@ public class PipelinedRegionFailoverConcurrencyTest {
SlotProvider slotProvider,
int parallelism) throws Exception {
final JobInformation jobInformation = new DummyJobInformation(
jid,
"test job");
// build a simple execution graph with on job vertex, parallelism 2
final ExecutionGraph graph = new ExecutionGraph(
new DummyJobInformation(
jid,
"test job"),
jobInformation,
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
Time.seconds(10),
......
......@@ -89,10 +89,12 @@ public class RestartPipelinedRegionStrategyTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutor());
final JobInformation jobInformation = new DummyJobInformation(
jobId,
jobName);
ExecutionGraph eg = new ExecutionGraph(
new DummyJobInformation(
jobId,
jobName),
jobInformation,
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
AkkaUtils.getDefaultTimeout(),
......@@ -171,10 +173,12 @@ public class RestartPipelinedRegionStrategyTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutor());
final JobInformation jobInformation = new DummyJobInformation(
jobId,
jobName);
ExecutionGraph eg = new ExecutionGraph(
new DummyJobInformation(
jobId,
jobName),
jobInformation,
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
AkkaUtils.getDefaultTimeout(),
......@@ -258,10 +262,12 @@ public class RestartPipelinedRegionStrategyTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutor());
final JobInformation jobInformation = new DummyJobInformation(
jobId,
jobName);
ExecutionGraph eg = new ExecutionGraph(
new DummyJobInformation(
jobId,
jobName),
jobInformation,
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
AkkaUtils.getDefaultTimeout(),
......@@ -336,10 +342,12 @@ public class RestartPipelinedRegionStrategyTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4));
Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutor());
final JobInformation jobInformation = new DummyJobInformation(
jobId,
jobName);
ExecutionGraph eg = new ExecutionGraph(
new DummyJobInformation(
jobId,
jobName),
jobInformation,
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
AkkaUtils.getDefaultTimeout(),
......
......@@ -29,6 +29,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionEdge;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.jobgraph.JobGraph;
......@@ -140,10 +141,12 @@ public class RescalePartitionerTest extends TestLogger {
assertEquals(4, mapVertex.getParallelism());
assertEquals(2, sinkVertex.getParallelism());
final JobInformation jobInformation = new DummyJobInformation(
jobId,
jobName);
ExecutionGraph eg = new ExecutionGraph(
new DummyJobInformation(
jobId,
jobName),
jobInformation,
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
AkkaUtils.getDefaultTimeout(),
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
Please finish editing this message first!
To comment, please register.