[FLINK-8171] [flip6] Remove work arounds from Flip6LocalStreamEnvironment

It is no longer needed to wait for the registration of task managers and
to not use slot sharing when submitting jobs to the Flip-6 MiniCluster.
Therefore, we can remove these work arounds from the
Flip6LocalStreamEnvironment.

Adapt comment in RestClusterClient

This closes #5101.
上级 b5db8d90
......@@ -98,7 +98,7 @@ public class RestClusterClient extends ClusterClient {
protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
log.info("Submitting job.");
try {
// temporary hack for FLIP-6 since slot-sharing isn't implemented yet
// we have to enable queued scheduling because slot will be allocated lazily
jobGraph.setAllowQueuedScheduling(true);
submitJob(jobGraph);
} catch (JobSubmissionException e) {
......
......@@ -20,12 +20,10 @@ package org.apache.flink.streaming.api.environment;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.streaming.api.graph.StreamGraph;
......@@ -86,9 +84,6 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
StreamGraph streamGraph = getStreamGraph();
streamGraph.setJobName(jobName);
// TODO - temp fix to enforce restarts due to a bug in the allocation protocol
streamGraph.getExecutionConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 5));
JobGraph jobGraph = streamGraph.getJobGraph();
jobGraph.setAllowQueuedScheduling(true);
......@@ -99,16 +94,9 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
// add (and override) the settings with what the user defined
configuration.addAll(this.conf);
// Currently we do not reuse slot anymore,
// so we need to sum up the parallelism of all vertices
int slotsCount = 0;
for (JobVertex jobVertex : jobGraph.getVertices()) {
slotsCount += jobVertex.getParallelism();
}
MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
.setConfiguration(configuration)
.setNumSlotsPerTaskManager(slotsCount)
.setNumSlotsPerTaskManager(jobGraph.getMaximumParallelism())
.build();
if (LOG.isInfoEnabled()) {
......@@ -116,9 +104,9 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
}
MiniCluster miniCluster = new MiniCluster(cfg);
try {
miniCluster.start();
miniCluster.waitUntilTaskManagerRegistrationsComplete();
return miniCluster.runJobBlocking(jobGraph);
}
finally {
......
......@@ -18,7 +18,6 @@
package org.apache.flink.streaming.api.environment;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.TestLogger;
......@@ -63,23 +62,11 @@ public class LocalStreamEnvironmentITCase extends TestLogger {
// ------------------------------------------------------------------------
private static void addSmallBoundedJob(StreamExecutionEnvironment env, int parallelism) {
DataStream<Long> stream = env
.generateSequence(1, 100)
.setParallelism(parallelism)
.slotSharingGroup("group_1");
DataStream<Long> stream = env.generateSequence(1, 100).setParallelism(parallelism);
stream
.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long value) {
return false;
}
})
.setParallelism(parallelism)
.filter(ignored -> false).setParallelism(parallelism)
.startNewChain()
.slotSharingGroup("group_2")
.print()
.setParallelism(parallelism);
.print().setParallelism(parallelism);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册