提交 f31a55e0 编写于 作者: A Aljoscha Krettek

[FLINK-5808] Add proper checks in setParallelism()/setMaxParallelism()

Before, there where some checks in
StreamExecutionEnvironment.set(Max)Parallelism() but a user would
circumvent these if using the ExecutionConfig directly. Now, all checks
are moved to the ExecutionConfig.
上级 e4fbae36
......@@ -84,8 +84,9 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
public static final int PARALLELISM_UNKNOWN = -2;
/**
* The default lower bound for max parallelism if nothing was configured by the user. We have this so allow users
* some degree of scale-up in case they forgot to configure maximum parallelism explicitly.
* The default lower bound for max parallelism if nothing was configured by the user. We have
* this to allow users some degree of scale-up in case they forgot to configure maximum
* parallelism explicitly.
*/
public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 1 << 7;
......@@ -292,13 +293,18 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
* @param parallelism The parallelism to use
*/
public ExecutionConfig setParallelism(int parallelism) {
if (parallelism != PARALLELISM_UNKNOWN) {
if (parallelism < 1 && parallelism != PARALLELISM_DEFAULT) {
throw new IllegalArgumentException(
"Parallelism must be at least one, or ExecutionConfig.PARALLELISM_DEFAULT (use system default).");
}
this.parallelism = parallelism;
}
checkArgument(parallelism != PARALLELISM_UNKNOWN, "Cannot specify UNKNOWN_PARALLELISM.");
checkArgument(
parallelism >= 1 || parallelism == PARALLELISM_DEFAULT,
"Parallelism must be at least one, or ExecutionConfig.PARALLELISM_DEFAULT " +
"(use system default).");
checkArgument(
maxParallelism == -1 || parallelism <= maxParallelism,
"The specified parallelism must be smaller or equal to the maximum parallelism.");
checkArgument(
maxParallelism == -1 || parallelism != PARALLELISM_DEFAULT,
"Default parallelism cannot be specified when maximum parallelism is specified");
this.parallelism = parallelism;
return this;
}
......@@ -325,7 +331,18 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
*/
@PublicEvolving
public void setMaxParallelism(int maxParallelism) {
checkArgument(
parallelism != PARALLELISM_DEFAULT,
"A maximum parallelism can only be specified with an explicitly specified " +
"parallelism.");
checkArgument(maxParallelism > 0, "The maximum parallelism must be greater than 0.");
checkArgument(
maxParallelism >= parallelism,
"The maximum parallelism must be larger than the parallelism.");
checkArgument(
maxParallelism > 0 && maxParallelism <= UPPER_BOUND_MAX_PARALLELISM,
"maxParallelism is out of bounds 0 < maxParallelism <= " +
UPPER_BOUND_MAX_PARALLELISM + ". Found: " + maxParallelism);
this.maxParallelism = maxParallelism;
}
......
......@@ -168,9 +168,6 @@ public abstract class StreamExecutionEnvironment {
* @param parallelism The parallelism
*/
public StreamExecutionEnvironment setParallelism(int parallelism) {
if (parallelism < 1) {
throw new IllegalArgumentException("parallelism must be at least one.");
}
config.setParallelism(parallelism);
return this;
}
......@@ -184,11 +181,6 @@ public abstract class StreamExecutionEnvironment {
* @param maxParallelism Maximum degree of parallelism to be used for the program., with 0 < maxParallelism <= 2^15 - 1
*/
public StreamExecutionEnvironment setMaxParallelism(int maxParallelism) {
Preconditions.checkArgument(maxParallelism > 0 &&
maxParallelism <= ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM,
"maxParallelism is out of bounds 0 < maxParallelism <= " +
ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM + ". Found: " + maxParallelism);
config.setMaxParallelism(maxParallelism);
return this;
}
......
......@@ -36,7 +36,9 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.SplittableIterator;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.net.URL;
import java.util.Arrays;
......@@ -52,6 +54,10 @@ import static org.mockito.Mockito.mock;
public class StreamExecutionEnvironmentTest {
@Rule
public final ExpectedException exception = ExpectedException.none();
@Test
public void fromElementsWithBaseTypeTest1() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
......@@ -155,6 +161,53 @@ public class StreamExecutionEnvironmentTest {
contextEnv.getParallelism());
}
@Test
public void testMaxParallelismMustBeBiggerEqualParallelism() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(10);
exception.expect(IllegalArgumentException.class);
env.setMaxParallelism(5);
}
@Test
public void testParallelismMustBeSmallerEqualMaxParallelism() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(10);
env.setMaxParallelism(20);
exception.expect(IllegalArgumentException.class);
env.setParallelism(30);
}
@Test
public void testSetDefaultParallelismNotAllowedWhenMaxParallelismSpecified() {
final int defaultParallelism = 20;
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(defaultParallelism);
env.setParallelism(10);
env.setMaxParallelism(15);
exception.expect(IllegalArgumentException.class);
env.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT);
}
@Test
public void testSetMaxParallelismNotAllowedWithDefaultParallelism() {
final int defaultParallelism = 20;
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(defaultParallelism);
env.setParallelism(10);
env.setMaxParallelism(15);
exception.expect(IllegalArgumentException.class);
env.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT);
}
@Test
public void testParallelismBounds() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
......@@ -206,6 +259,7 @@ public class StreamExecutionEnvironmentTest {
Assert.assertEquals(-1, operator.getTransformation().getMaxParallelism());
// configured value after generating
env.setParallelism(21);
env.setMaxParallelism(42);
env.getStreamGraph().getJobGraph();
Assert.assertEquals(42, operator.getTransformation().getMaxParallelism());
......
......@@ -247,6 +247,7 @@ public class StreamGraphGeneratorTest {
public void testSetupOfKeyGroupPartitioner() {
int maxParallelism = 42;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setParallelism(12);
env.getConfig().setMaxParallelism(maxParallelism);
DataStream<Integer> source = env.fromElements(1, 2, 3);
......@@ -278,6 +279,7 @@ public class StreamGraphGeneratorTest {
int keyedResult2MaxParallelism = 17;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setParallelism(12);
env.getConfig().setMaxParallelism(globalMaxParallelism);
DataStream<Integer> source = env.fromElements(1, 2, 3);
......@@ -384,6 +386,7 @@ public class StreamGraphGeneratorTest {
DataStream<Integer> input1 = env.fromElements(1, 2, 3, 4).setMaxParallelism(128);
DataStream<Integer> input2 = env.fromElements(1, 2, 3, 4).setMaxParallelism(129);
env.setParallelism(12);
env.getConfig().setMaxParallelism(maxParallelism);
DataStream<Integer> keyedResult = input1.connect(input2).keyBy(
......
......@@ -298,8 +298,8 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
"localhost", cluster.getLeaderRPCPort());
env.setMaxParallelism(2 * PARALLELISM);
env.setParallelism(PARALLELISM);
env.setMaxParallelism(2 * PARALLELISM);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册