提交 e8edbcaf 编写于 作者: X Xingcan Cui 提交者: Aljoscha Krettek

[FLINK-8257] [conf] Unify the value checks for setParallelism()

上级 9e3bac39
......@@ -120,7 +120,7 @@ public abstract class Operator<OUT, O extends Operator<OUT, O>> extends DataSet<
*/
public O setParallelism(int parallelism) {
Preconditions.checkArgument(parallelism > 0 || parallelism == ExecutionConfig.PARALLELISM_DEFAULT,
"The parallelism of an operator must be at least 1.");
"The parallelism must be at least one, or ExecutionConfig.PARALLELISM_DEFAULT (use system default).");
this.parallelism = parallelism;
......
......@@ -59,7 +59,7 @@ implements GraphAnalytic<K, VV, EV, T> {
*/
public GraphAnalyticBase<K, VV, EV, T> setParallelism(int parallelism) {
Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
"The parallelism must be greater than zero.");
"The parallelism must be at least one, or ExecutionConfig.PARALLELISM_DEFAULT (use system default).");
this.parallelism = parallelism;
......
......@@ -18,6 +18,7 @@
package org.apache.flink.graph;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.graph.spargel.GatherFunction;
import org.apache.flink.util.Preconditions;
......@@ -77,7 +78,9 @@ public abstract class IterationConfiguration {
* @param parallelism The parallelism.
*/
public void setParallelism(int parallelism) {
Preconditions.checkArgument(parallelism > 0 || parallelism == -1, "The parallelism must be positive, or -1 (use default).");
Preconditions.checkArgument(
parallelism > 0 || parallelism == ExecutionConfig.PARALLELISM_DEFAULT,
"The parallelism must be at least one, or ExecutionConfig.PARALLELISM_DEFAULT (use system default).");
this.parallelism = parallelism;
}
......
......@@ -58,7 +58,7 @@ implements GraphAlgorithm<K, VV, EV, R> {
*/
public GraphAlgorithmWrappingBase<K, VV, EV, R> setParallelism(int parallelism) {
Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
"The parallelism must be greater than zero.");
"The parallelism must be at least one, or ExecutionConfig.PARALLELISM_DEFAULT (use system default).");
this.parallelism = parallelism;
......
......@@ -51,7 +51,7 @@ public class DataStreamSource<T> extends SingleOutputStreamOperator<T> {
@Override
public DataStreamSource<T> setParallelism(int parallelism) {
if (parallelism > 1 && !isParallel) {
if (parallelism != 1 && !isParallel) {
throw new IllegalArgumentException("Source: " + transformation.getId() + " is not a parallel source");
} else {
return (DataStreamSource<T>) super.setParallelism(parallelism);
......
......@@ -130,16 +130,13 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> {
}
/**
* Sets the parallelism for this operator. The degree must be 1 or more.
* Sets the parallelism for this operator.
*
* @param parallelism
* The parallelism for this operator.
* @return The operator with set parallelism.
*/
public SingleOutputStreamOperator<T> setParallelism(int parallelism) {
Preconditions.checkArgument(parallelism > 0,
"The parallelism of an operator must be at least 1.");
Preconditions.checkArgument(canBeParallel() || parallelism == 1,
"The parallelism of non parallel operator must be 1.");
......
......@@ -174,9 +174,6 @@ public abstract class StreamExecutionEnvironment {
* @param parallelism The parallelism
*/
public StreamExecutionEnvironment setParallelism(int parallelism) {
if (parallelism < 1) {
throw new IllegalArgumentException("parallelism must be at least one.");
}
config.setParallelism(parallelism);
return this;
}
......
......@@ -19,6 +19,7 @@
package org.apache.flink.streaming.api.transformations;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.typeinfo.TypeInformation;
......@@ -198,7 +199,9 @@ public abstract class StreamTransformation<T> {
* @param parallelism The new parallelism to set on this {@code StreamTransformation}.
*/
public void setParallelism(int parallelism) {
Preconditions.checkArgument(parallelism > 0, "Parallelism must be bigger than zero.");
Preconditions.checkArgument(
parallelism > 0 || parallelism == ExecutionConfig.PARALLELISM_DEFAULT,
"The parallelism must be at least one, or ExecutionConfig.PARALLELISM_DEFAULT (use system default).");
this.parallelism = parallelism;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册