diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..3d110bdb2cb6712997868c19f316198df8ab6dda --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common; + +import java.io.Serializable; + +/** + * A configuration config for configuring behaviour of the system, such as whether to use + * the closure cleaner, object-reuse mode... + */ +public class ExecutionConfig implements Serializable { + + // Key for storing it in the Job Configuration + public static final String CONFIG_KEY = "runtime.config"; + + private boolean useClosureCleaner = true; + private int degreeOfParallelism = -1; + private int numberOfExecutionRetries = -1; + + // For future use... +// private boolean forceGenericSerializer = false; +// private boolean objectReuse = false; + + /** + * Enables the ClosureCleaner. This analyzes user code functions and sets fields to null + * that are not used. This will in most cases make closures or anonymous inner classes + * serializable that where not serializable due to some Scala or Java implementation artifact. + * User code must be serializable because it needs to be sent to worker nodes. + */ + public ExecutionConfig enableClosureCleaner() { + useClosureCleaner = true; + return this; + } + + /** + * Disables the ClosureCleaner. @see #enableClosureCleaner() + */ + public ExecutionConfig disableClosureCleaner() { + useClosureCleaner = false; + return this; + } + + /** + * Returns whether the ClosureCleaner is enabled. @see #enableClosureCleaner() + */ + public boolean isClosureCleanerEnabled() { + return useClosureCleaner; + } + + /** + * Gets the degree of parallelism with which operation are executed by default. Operations can + * individually override this value to use a specific degree of parallelism. + * Other operations may need to run with a different + * degree of parallelism - for example calling + * a reduce operation over the entire + * set will insert eventually an operation that runs non-parallel (degree of parallelism of one). + * + * @return The degree of parallelism used by operations, unless they override that value. This method + * returns {@code -1}, if the environments default parallelism should be used. + */ + + public int getDegreeOfParallelism() { + return degreeOfParallelism; + } + + /** + * Sets the degree of parallelism (DOP) for operations executed through this environment. + * Setting a DOP of x here will cause all operators (such as join, map, reduce) to run with + * x parallel instances. + *

+ * This method overrides the default parallelism for this environment. + * The local execution environment uses by default a value equal to the number of hardware + * contexts (CPU cores / threads). When executing the program via the command line client + * from a JAR file, the default degree of parallelism is the one configured for that setup. + * + * @param degreeOfParallelism The degree of parallelism + */ + + public ExecutionConfig setDegreeOfParallelism(int degreeOfParallelism) { + if (degreeOfParallelism < 1) { + throw new IllegalArgumentException("Degree of parallelism must be at least one."); + } + this.degreeOfParallelism = degreeOfParallelism; + return this; + } + + /** + * Gets the number of times the system will try to re-execute failed tasks. A value + * of {@code -1} indicates that the system default value (as defined in the configuration) + * should be used. + * + * @return The number of times the system will try to re-execute failed tasks. + */ + public int getNumberOfExecutionRetries() { + return numberOfExecutionRetries; + } + + /** + * Sets the number of times that failed tasks are re-executed. A value of zero + * effectively disables fault tolerance. A value of {@code -1} indicates that the system + * default value (as defined in the configuration) should be used. + * + * @param numberOfExecutionRetries The number of times the system will try to re-execute failed tasks. + */ + public ExecutionConfig setNumberOfExecutionRetries(int numberOfExecutionRetries) { + if (numberOfExecutionRetries < -1) { + throw new IllegalArgumentException("The number of execution retries must be non-negative, or -1 (use system default)"); + } + this.numberOfExecutionRetries = numberOfExecutionRetries; + return this; + } + + // These are for future use... +// public ExecutionConfig forceGenericSerializer() { +// forceGenericSerializer = true; +// return this; +// } +// +// public ExecutionConfig disableForceGenericSerializer() { +// forceGenericSerializer = false; +// return this; +// } +// +// public boolean isForceGenericSerializerEnabled() { +// return forceGenericSerializer; +// } +// +// public ExecutionConfig enableObjectReuse() { +// objectReuse = true; +// return this; +// } +// +// public ExecutionConfig disableObjectReuse() { +// objectReuse = false; +// return this; +// } +// +// public boolean isObjectReuseEnabled() { +// return objectReuse; +// } +} diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index 38046793df72a337ed88ff710f319122d01dd2a8..c19e9aa7c9a1d8cfa76d22faaf667c848d3ec006 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.UUID; import org.apache.commons.lang3.Validate; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.Plan; @@ -95,9 +96,7 @@ public abstract class ExecutionEnvironment { private final List> cacheFile = new ArrayList>(); - private int degreeOfParallelism = -1; - - private int numberOfExecutionRetries = -1; + private ExecutionConfig config = new ExecutionConfig(); // -------------------------------------------------------------------------------------------- @@ -110,7 +109,22 @@ public abstract class ExecutionEnvironment { protected ExecutionEnvironment() { this.executionId = UUID.randomUUID(); } - + + /** + * Sets the config object. + */ + public void setConfig(ExecutionConfig config) { + Validate.notNull(config); + this.config = config; + } + + /** + * Gets the config object. + */ + public ExecutionConfig getConfig() { + return config; + } + /** * Gets the degree of parallelism with which operation are executed by default. Operations can * individually override this value to use a specific degree of parallelism via @@ -123,7 +137,7 @@ public abstract class ExecutionEnvironment { * returns {@code -1}, if the environments default parallelism should be used. */ public int getDegreeOfParallelism() { - return degreeOfParallelism; + return config.getDegreeOfParallelism(); } /** @@ -139,11 +153,7 @@ public abstract class ExecutionEnvironment { * @param degreeOfParallelism The degree of parallelism */ public void setDegreeOfParallelism(int degreeOfParallelism) { - if (degreeOfParallelism < 1) { - throw new IllegalArgumentException("Degree of parallelism must be at least one."); - } - - this.degreeOfParallelism = degreeOfParallelism; + config.setDegreeOfParallelism(degreeOfParallelism); } /** @@ -154,10 +164,7 @@ public abstract class ExecutionEnvironment { * @param numberOfExecutionRetries The number of times the system will try to re-execute failed tasks. */ public void setNumberOfExecutionRetries(int numberOfExecutionRetries) { - if (numberOfExecutionRetries < -1) { - throw new IllegalArgumentException("The number of execution retries must be non-negative, or -1 (use system default)"); - } - this.numberOfExecutionRetries = numberOfExecutionRetries; + config.setNumberOfExecutionRetries(numberOfExecutionRetries); } /** @@ -168,7 +175,7 @@ public abstract class ExecutionEnvironment { * @return The number of times the system will try to re-execute failed tasks. */ public int getNumberOfExecutionRetries() { - return numberOfExecutionRetries; + return config.getNumberOfExecutionRetries(); } /** @@ -742,8 +749,7 @@ public abstract class ExecutionEnvironment { if (getDegreeOfParallelism() > 0) { plan.setDefaultParallelism(getDegreeOfParallelism()); } - plan.setNumberOfExecutionRetries(this.numberOfExecutionRetries); - + try { registerCachedFilesWithPlan(plan); } catch (Exception e) { diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala index 13d1f08aa0a6b9d4916f080f8d3b8b76ef08cad4..69bcfde9a0d774bffe38abd69454c63380b62732 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala @@ -92,8 +92,8 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { * Returns the execution environment associated with the current DataSet. * @return associated execution environment */ - def getExecutionEnvironment: ExecutionEnvironment = new ExecutionEnvironment(set - .getExecutionEnvironment) + def getExecutionEnvironment: ExecutionEnvironment = + new ExecutionEnvironment(set.getExecutionEnvironment) /** * Returns the underlying Java DataSet. @@ -110,11 +110,13 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { * * @param f the closure to clean * @param checkSerializable whether or not to immediately check f for serializability - * @throws SparkException if checkSerializable is set but f is not - * serializable + * @throws InvalidProgramException if checkSerializable is set but f + * is not serializable */ private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = { - ClosureCleaner.clean(f, checkSerializable) + if (set.getExecutionEnvironment.getConfig.isClosureCleanerEnabled) { + ClosureCleaner.clean(f, checkSerializable) + } f } diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala index e756e78fc5b8172f0f07ac128b061bfe3683f535..43f86091ef0a2b6eec7154a7ab24a7e5c0a046cd 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala @@ -20,7 +20,7 @@ package org.apache.flink.api.scala import java.util.UUID import org.apache.commons.lang3.Validate -import org.apache.flink.api.common.JobExecutionResult +import org.apache.flink.api.common.{ExecutionConfig, JobExecutionResult} import org.apache.flink.api.java.io._ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeinfo.BasicTypeInfo @@ -28,7 +28,8 @@ import org.apache.flink.api.java.typeutils.{ValueTypeInfo, TupleTypeInfoBase} import org.apache.flink.api.scala.operators.ScalaCsvInputFormat import org.apache.flink.core.fs.Path -import org.apache.flink.api.java.{ExecutionEnvironment => JavaEnv, CollectionEnvironment} +import org.apache.flink.api.java.{ExecutionEnvironment => JavaEnv, +CollectionEnvironment} import org.apache.flink.api.common.io.{InputFormat, FileInputFormat} import org.apache.flink.api.java.operators.DataSource @@ -59,6 +60,19 @@ import scala.reflect.ClassTag * be created. */ class ExecutionEnvironment(javaEnv: JavaEnv) { + /** + * Sets the config object. + */ + def setConfig(config: ExecutionConfig): Unit = { + javaEnv.setConfig(config) + } + + /** + * Gets the config object. + */ + def getConfig: ExecutionConfig = { + javaEnv.getConfig + } /** * Sets the degree of parallelism (DOP) for operations executed through this environment.