diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java index 8c0462d093d26986c917d6d5e5db1a68beef2c8a..eedcee15578554a32e4379108a07bf1573790eae 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java @@ -98,7 +98,7 @@ public class RestClusterClient extends ClusterClient { protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException { log.info("Submitting job."); try { - // temporary hack for FLIP-6 since slot-sharing isn't implemented yet + // we have to enable queued scheduling because slot will be allocated lazily jobGraph.setAllowQueuedScheduling(true); submitJob(jobGraph); } catch (JobSubmissionException e) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java index e276ac77e33a882e1f4ab270e90abe76e010982c..8720e7a9237cecdc2c9700057f845623aec660d0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java @@ -20,12 +20,10 @@ package org.apache.flink.streaming.api.environment; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.minicluster.MiniCluster; import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; import org.apache.flink.streaming.api.graph.StreamGraph; @@ -86,9 +84,6 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment { StreamGraph streamGraph = getStreamGraph(); streamGraph.setJobName(jobName); - // TODO - temp fix to enforce restarts due to a bug in the allocation protocol - streamGraph.getExecutionConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 5)); - JobGraph jobGraph = streamGraph.getJobGraph(); jobGraph.setAllowQueuedScheduling(true); @@ -99,16 +94,9 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment { // add (and override) the settings with what the user defined configuration.addAll(this.conf); - // Currently we do not reuse slot anymore, - // so we need to sum up the parallelism of all vertices - int slotsCount = 0; - for (JobVertex jobVertex : jobGraph.getVertices()) { - slotsCount += jobVertex.getParallelism(); - } - MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder() .setConfiguration(configuration) - .setNumSlotsPerTaskManager(slotsCount) + .setNumSlotsPerTaskManager(jobGraph.getMaximumParallelism()) .build(); if (LOG.isInfoEnabled()) { @@ -116,9 +104,9 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment { } MiniCluster miniCluster = new MiniCluster(cfg); + try { miniCluster.start(); - miniCluster.waitUntilTaskManagerRegistrationsComplete(); return miniCluster.runJobBlocking(jobGraph); } finally { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java index c05359812df5159225dd2291514179fad471c70a..f302eda6856fba0f7644666891cf6eaf824389d9 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.api.environment; -import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.util.TestLogger; @@ -63,23 +62,11 @@ public class LocalStreamEnvironmentITCase extends TestLogger { // ------------------------------------------------------------------------ private static void addSmallBoundedJob(StreamExecutionEnvironment env, int parallelism) { - DataStream stream = env - .generateSequence(1, 100) - .setParallelism(parallelism) - .slotSharingGroup("group_1"); + DataStream stream = env.generateSequence(1, 100).setParallelism(parallelism); stream - .filter(new FilterFunction() { - @Override - public boolean filter(Long value) { - return false; - } - }) - .setParallelism(parallelism) + .filter(ignored -> false).setParallelism(parallelism) .startNewChain() - .slotSharingGroup("group_2") - - .print() - .setParallelism(parallelism); + .print().setParallelism(parallelism); } }