提交 026311ae 编写于 作者: A Aljoscha Krettek 提交者: Stephan Ewen

[APIs] Add ExecutionConfig

上级 fb96f12a
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common;
import java.io.Serializable;
/**
 * A configuration object for tuning the behaviour of the system, such as whether the
 * closure cleaner is used, the default degree of parallelism, and the number of
 * execution retries.
 *
 * <p>All mutating methods return {@code this}, so calls can be chained.
 */
public class ExecutionConfig implements Serializable {

    // Pinned explicitly so the serialized form does not depend on the compiler-computed default.
    private static final long serialVersionUID = 1L;

    /** Key for storing this config in the job configuration. */
    public static final String CONFIG_KEY = "runtime.config";

    /** Whether the ClosureCleaner is applied to user code functions; enabled by default. */
    private boolean useClosureCleaner = true;

    /** Default degree of parallelism; {@code -1} means "use the environment's default". */
    private int degreeOfParallelism = -1;

    /** Number of re-executions of failed tasks; {@code -1} means "use the system default". */
    private int numberOfExecutionRetries = -1;

    // Reserved for future use: a "force generic serializer" switch and an "object reuse"
    // switch, each with fluent enable/disable methods and an is...Enabled() query.

    /**
     * Enables the ClosureCleaner. This analyzes user code functions and sets fields to null
     * that are not used. This will in most cases make closures or anonymous inner classes
     * serializable that were not serializable due to some Scala or Java implementation
     * artifact. User code must be serializable because it needs to be sent to worker nodes.
     *
     * @return This config, to allow call chaining.
     */
    public ExecutionConfig enableClosureCleaner() {
        useClosureCleaner = true;
        return this;
    }

    /**
     * Disables the ClosureCleaner.
     *
     * @return This config, to allow call chaining.
     * @see #enableClosureCleaner()
     */
    public ExecutionConfig disableClosureCleaner() {
        useClosureCleaner = false;
        return this;
    }

    /**
     * Returns whether the ClosureCleaner is enabled.
     *
     * @return {@code true} if the ClosureCleaner is enabled, {@code false} otherwise.
     * @see #enableClosureCleaner()
     */
    public boolean isClosureCleanerEnabled() {
        return useClosureCleaner;
    }

    /**
     * Gets the degree of parallelism with which operations are executed by default.
     * Operations can individually override this value to use a specific degree of
     * parallelism. Other operations may need to run with a different degree of parallelism -
     * for example, calling a reduce operation over the entire set will eventually insert an
     * operation that runs non-parallel (degree of parallelism of one).
     *
     * @return The degree of parallelism used by operations, unless they override that value.
     *         This method returns {@code -1} if the environment's default parallelism
     *         should be used.
     */
    public int getDegreeOfParallelism() {
        return degreeOfParallelism;
    }

    /**
     * Sets the default degree of parallelism (DOP) for operations. Setting a DOP of x here
     * will cause all operators (such as join, map, reduce) to run with x parallel instances.
     *
     * <p>This method overrides the default parallelism of the environment. The local
     * execution environment uses by default a value equal to the number of hardware
     * contexts (CPU cores / threads). When executing the program via the command line client
     * from a JAR file, the default degree of parallelism is the one configured for that setup.
     *
     * <p>Values below one are rejected, so the initial {@code -1} ("use default") cannot be
     * restored through this method.
     *
     * @param degreeOfParallelism The degree of parallelism; must be at least one.
     * @return This config, to allow call chaining.
     * @throws IllegalArgumentException If {@code degreeOfParallelism} is smaller than one.
     */
    public ExecutionConfig setDegreeOfParallelism(int degreeOfParallelism) {
        if (degreeOfParallelism < 1) {
            throw new IllegalArgumentException("Degree of parallelism must be at least one.");
        }
        this.degreeOfParallelism = degreeOfParallelism;
        return this;
    }

    /**
     * Gets the number of times the system will try to re-execute failed tasks. A value
     * of {@code -1} indicates that the system default value (as defined in the configuration)
     * should be used.
     *
     * @return The number of times the system will try to re-execute failed tasks.
     */
    public int getNumberOfExecutionRetries() {
        return numberOfExecutionRetries;
    }

    /**
     * Sets the number of times that failed tasks are re-executed. A value of zero
     * effectively disables fault tolerance. A value of {@code -1} indicates that the system
     * default value (as defined in the configuration) should be used.
     *
     * @param numberOfExecutionRetries The number of times the system will try to re-execute
     *                                 failed tasks; must be non-negative or {@code -1}.
     * @return This config, to allow call chaining.
     * @throws IllegalArgumentException If {@code numberOfExecutionRetries} is smaller
     *                                  than {@code -1}.
     */
    public ExecutionConfig setNumberOfExecutionRetries(int numberOfExecutionRetries) {
        if (numberOfExecutionRetries < -1) {
            throw new IllegalArgumentException("The number of execution retries must be non-negative, or -1 (use system default)");
        }
        this.numberOfExecutionRetries = numberOfExecutionRetries;
        return this;
    }
}
...@@ -29,6 +29,7 @@ import java.util.List; ...@@ -29,6 +29,7 @@ import java.util.List;
import java.util.UUID; import java.util.UUID;
import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.Validate;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan; import org.apache.flink.api.common.Plan;
...@@ -95,9 +96,7 @@ public abstract class ExecutionEnvironment { ...@@ -95,9 +96,7 @@ public abstract class ExecutionEnvironment {
private final List<Tuple2<String, DistributedCacheEntry>> cacheFile = new ArrayList<Tuple2<String, DistributedCacheEntry>>(); private final List<Tuple2<String, DistributedCacheEntry>> cacheFile = new ArrayList<Tuple2<String, DistributedCacheEntry>>();
private int degreeOfParallelism = -1; private ExecutionConfig config = new ExecutionConfig();
private int numberOfExecutionRetries = -1;
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
...@@ -110,7 +109,22 @@ public abstract class ExecutionEnvironment { ...@@ -110,7 +109,22 @@ public abstract class ExecutionEnvironment {
protected ExecutionEnvironment() { protected ExecutionEnvironment() {
this.executionId = UUID.randomUUID(); this.executionId = UUID.randomUUID();
} }
/**
 * Replaces the config object held by this environment.
 *
 * @param config The new config object; it is checked with {@code Validate.notNull}
 *               before being stored.
 */
public void setConfig(ExecutionConfig config) {
    // Validate.notNull hands back its argument, so the check and the assignment can be combined.
    this.config = Validate.notNull(config);
}
/**
 * Returns the config object currently held by this environment.
 *
 * @return The config object.
 */
public ExecutionConfig getConfig() {
    return this.config;
}
/** /**
* Gets the degree of parallelism with which operation are executed by default. Operations can * Gets the degree of parallelism with which operation are executed by default. Operations can
* individually override this value to use a specific degree of parallelism via * individually override this value to use a specific degree of parallelism via
...@@ -123,7 +137,7 @@ public abstract class ExecutionEnvironment { ...@@ -123,7 +137,7 @@ public abstract class ExecutionEnvironment {
* returns {@code -1}, if the environments default parallelism should be used. * returns {@code -1}, if the environments default parallelism should be used.
*/ */
public int getDegreeOfParallelism() { public int getDegreeOfParallelism() {
return degreeOfParallelism; return config.getDegreeOfParallelism();
} }
/** /**
...@@ -139,11 +153,7 @@ public abstract class ExecutionEnvironment { ...@@ -139,11 +153,7 @@ public abstract class ExecutionEnvironment {
* @param degreeOfParallelism The degree of parallelism * @param degreeOfParallelism The degree of parallelism
*/ */
public void setDegreeOfParallelism(int degreeOfParallelism) { public void setDegreeOfParallelism(int degreeOfParallelism) {
if (degreeOfParallelism < 1) { config.setDegreeOfParallelism(degreeOfParallelism);
throw new IllegalArgumentException("Degree of parallelism must be at least one.");
}
this.degreeOfParallelism = degreeOfParallelism;
} }
/** /**
...@@ -154,10 +164,7 @@ public abstract class ExecutionEnvironment { ...@@ -154,10 +164,7 @@ public abstract class ExecutionEnvironment {
* @param numberOfExecutionRetries The number of times the system will try to re-execute failed tasks. * @param numberOfExecutionRetries The number of times the system will try to re-execute failed tasks.
*/ */
public void setNumberOfExecutionRetries(int numberOfExecutionRetries) { public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
if (numberOfExecutionRetries < -1) { config.setNumberOfExecutionRetries(numberOfExecutionRetries);
throw new IllegalArgumentException("The number of execution retries must be non-negative, or -1 (use system default)");
}
this.numberOfExecutionRetries = numberOfExecutionRetries;
} }
/** /**
...@@ -168,7 +175,7 @@ public abstract class ExecutionEnvironment { ...@@ -168,7 +175,7 @@ public abstract class ExecutionEnvironment {
* @return The number of times the system will try to re-execute failed tasks. * @return The number of times the system will try to re-execute failed tasks.
*/ */
public int getNumberOfExecutionRetries() { public int getNumberOfExecutionRetries() {
return numberOfExecutionRetries; return config.getNumberOfExecutionRetries();
} }
/** /**
...@@ -742,8 +749,7 @@ public abstract class ExecutionEnvironment { ...@@ -742,8 +749,7 @@ public abstract class ExecutionEnvironment {
if (getDegreeOfParallelism() > 0) { if (getDegreeOfParallelism() > 0) {
plan.setDefaultParallelism(getDegreeOfParallelism()); plan.setDefaultParallelism(getDegreeOfParallelism());
} }
plan.setNumberOfExecutionRetries(this.numberOfExecutionRetries);
try { try {
registerCachedFilesWithPlan(plan); registerCachedFilesWithPlan(plan);
} catch (Exception e) { } catch (Exception e) {
......
...@@ -92,8 +92,8 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { ...@@ -92,8 +92,8 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
* Returns the execution environment associated with the current DataSet. * Returns the execution environment associated with the current DataSet.
* @return associated execution environment * @return associated execution environment
*/ */
def getExecutionEnvironment: ExecutionEnvironment = new ExecutionEnvironment(set def getExecutionEnvironment: ExecutionEnvironment =
.getExecutionEnvironment) new ExecutionEnvironment(set.getExecutionEnvironment)
/** /**
* Returns the underlying Java DataSet. * Returns the underlying Java DataSet.
...@@ -110,11 +110,13 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { ...@@ -110,11 +110,13 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
* *
* @param f the closure to clean * @param f the closure to clean
* @param checkSerializable whether or not to immediately check <tt>f</tt> for serializability * @param checkSerializable whether or not to immediately check <tt>f</tt> for serializability
* @throws <tt>SparkException<tt> if <tt>checkSerializable</tt> is set but <tt>f</tt> is not * @throws InvalidProgramException if <tt>checkSerializable</tt> is set but <tt>f</tt>
* serializable * is not serializable
*/ */
private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = { private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
ClosureCleaner.clean(f, checkSerializable) if (set.getExecutionEnvironment.getConfig.isClosureCleanerEnabled) {
ClosureCleaner.clean(f, checkSerializable)
}
f f
} }
......
...@@ -20,7 +20,7 @@ package org.apache.flink.api.scala ...@@ -20,7 +20,7 @@ package org.apache.flink.api.scala
import java.util.UUID import java.util.UUID
import org.apache.commons.lang3.Validate import org.apache.commons.lang3.Validate
import org.apache.flink.api.common.JobExecutionResult import org.apache.flink.api.common.{ExecutionConfig, JobExecutionResult}
import org.apache.flink.api.java.io._ import org.apache.flink.api.java.io._
import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeinfo.BasicTypeInfo import org.apache.flink.api.common.typeinfo.BasicTypeInfo
...@@ -28,7 +28,8 @@ import org.apache.flink.api.java.typeutils.{ValueTypeInfo, TupleTypeInfoBase} ...@@ -28,7 +28,8 @@ import org.apache.flink.api.java.typeutils.{ValueTypeInfo, TupleTypeInfoBase}
import org.apache.flink.api.scala.operators.ScalaCsvInputFormat import org.apache.flink.api.scala.operators.ScalaCsvInputFormat
import org.apache.flink.core.fs.Path import org.apache.flink.core.fs.Path
import org.apache.flink.api.java.{ExecutionEnvironment => JavaEnv, CollectionEnvironment} import org.apache.flink.api.java.{ExecutionEnvironment => JavaEnv,
CollectionEnvironment}
import org.apache.flink.api.common.io.{InputFormat, FileInputFormat} import org.apache.flink.api.common.io.{InputFormat, FileInputFormat}
import org.apache.flink.api.java.operators.DataSource import org.apache.flink.api.java.operators.DataSource
...@@ -59,6 +60,19 @@ import scala.reflect.ClassTag ...@@ -59,6 +60,19 @@ import scala.reflect.ClassTag
* be created. * be created.
*/ */
class ExecutionEnvironment(javaEnv: JavaEnv) { class ExecutionEnvironment(javaEnv: JavaEnv) {
/**
 * Passes the given config object on to the wrapped Java environment.
 *
 * @param config the config object to set
 */
def setConfig(config: ExecutionConfig): Unit =
  javaEnv.setConfig(config)
/**
 * Fetches the config object from the wrapped Java environment.
 *
 * @return the config object of the wrapped Java environment
 */
def getConfig: ExecutionConfig = javaEnv.getConfig
/** /**
* Sets the degree of parallelism (DOP) for operations executed through this environment. * Sets the degree of parallelism (DOP) for operations executed through this environment.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册