From 3655dee5b5feee46eaadeaae221fa8f358b90340 Mon Sep 17 00:00:00 2001 From: Zohar Mizrahi Date: Sun, 9 Apr 2017 12:11:57 +0300 Subject: [PATCH] [FLINK-6177] Add support for "Distributed Cache" in streaming applications This closes #3741. --- .../StreamExecutionEnvironment.java | 48 +++++++++++++++ .../api/graph/StreamingJobGraphGenerator.java | 6 ++ .../scala/StreamExecutionEnvironment.scala | 48 +++++++++++++++ .../DistributedCacheTest.java | 60 +++++++++---------- 4 files changed, 130 insertions(+), 32 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index e8279454dfc..aad3a4b74c3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -33,6 +33,7 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.functions.StoppableFunction; import org.apache.flink.api.common.io.FileInputFormat; @@ -46,6 +47,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.TextInputFormat; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.typeutils.MissingTypeInfo; import org.apache.flink.api.java.typeutils.PojoTypeInfo; @@ -136,6 +138,8 @@ public abstract class StreamExecutionEnvironment { /** The time characteristic used by the data streams. */ private TimeCharacteristic timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC; + protected final List> cacheFile = new ArrayList<>(); + // -------------------------------------------------------------------------------------------- // Constructor and Properties @@ -148,6 +152,13 @@ public abstract class StreamExecutionEnvironment { return config; } + /** + * Get the list of cached files that were registered for distribution among the task managers. + */ + public List> getCachedFiles() { + return cacheFile; + } + /** * Sets the parallelism for operations executed through this environment. * Setting a parallelism of x here will cause all operators (such as map, @@ -1774,4 +1785,41 @@ public abstract class StreamExecutionEnvironment { protected static void resetContextEnvironment() { contextEnvironmentFactory = null; } + + /** + * Registers a file at the distributed cache under the given name. The file will be accessible + * from any user-defined function in the (distributed) runtime under a local path. Files + * may be local files (as long as all relevant workers have access to it), or files in a distributed file system. + * The runtime will copy the files temporarily to a local cache, if needed. + * + *

The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via + * {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and provides access + * {@link org.apache.flink.api.common.cache.DistributedCache} via + * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}. + * + * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path") + * @param name The name under which the file is registered. + */ + public void registerCachedFile(String filePath, String name) { + registerCachedFile(filePath, name, false); + } + + /** + * Registers a file at the distributed cache under the given name. The file will be accessible + * from any user-defined function in the (distributed) runtime under a local path. Files + * may be local files (as long as all relevant workers have access to it), or files in a distributed file system. + * The runtime will copy the files temporarily to a local cache, if needed. + * + *

The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via + * {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and provides access + * {@link org.apache.flink.api.common.cache.DistributedCache} via + * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}. + * + * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path") + * @param name The name under which the file is registered. + * @param executable flag indicating whether the file should be executable + */ + public void registerCachedFile(String filePath, String name, boolean executable) { + this.cacheFile.add(new Tuple2<>(name, new DistributedCache.DistributedCacheEntry(filePath, executable))); + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index aa0f08d20b9..b3a6cf82601 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -29,6 +29,7 @@ import java.util.Map.Entry; import org.apache.commons.lang3.StringUtils; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; @@ -151,6 +152,11 @@ public class StreamingJobGraphGenerator { configureCheckpointing(); + // add registered cache file into job configuration + for (Tuple2 e : streamGraph.getEnvironment().getCachedFiles()) { + DistributedCache.writeFileInfoToConfig(e.f0, e.f1, jobGraph.getJobConfiguration()); + } + // set the ExecutionConfig last when it has been finalized try { jobGraph.setExecutionConfig(streamGraph.getExecutionConfig()); diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala index 0b2587ecee2..742baf93154 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala @@ -49,6 +49,11 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { */ def getConfig = javaEnv.getConfig + /** + * Gets cache files. + */ + def getCachedFiles = javaEnv.getCachedFiles + /** * Sets the parallelism for operations executed through this environment. * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run @@ -668,6 +673,49 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { } f } + + + /** + * Registers a file at the distributed cache under the given name. The file will be accessible + * from any user-defined function in the (distributed) runtime under a local path. Files + * may be local files (as long as all relevant workers have access to it), or files in a + * distributed file system. The runtime will copy the files temporarily to a local cache, + * if needed. + *

+ * The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs + * via {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and + * provides access {@link org.apache.flink.api.common.cache.DistributedCache} via + * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}. + * + * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or + * "hdfs://host:port/and/path") + * @param name The name under which the file is registered. + */ + def registerCachedFile(filePath: String, name: String): Unit = { + javaEnv.registerCachedFile(filePath, name) + } + + + /** + * Registers a file at the distributed cache under the given name. The file will be accessible + * from any user-defined function in the (distributed) runtime under a local path. Files + * may be local files (as long as all relevant workers have access to it), or files in a + * distributed file system. The runtime will copy the files temporarily to a local cache, + * if needed. + *

+ * The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs + * via {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and + * provides access {@link org.apache.flink.api.common.cache.DistributedCache} via + * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}. + * + * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or + * "hdfs://host:port/and/path") + * @param name The name under which the file is registered. + * @param executable flag indicating whether the file should be executable + */ + def registerCachedFile(filePath: String, name: String, executable: Boolean): Unit = { + javaEnv.registerCachedFile(filePath, name, executable) + } } object StreamExecutionEnvironment { diff --git a/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java b/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java index 4fb2d952778..19bcf76d221 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java @@ -15,29 +15,26 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.test.distributedCache; -import java.io.BufferedReader; -import java.io.File; -import java.io.IOException; -import java.io.FileNotFoundException; -import java.io.FileReader; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +package org.apache.flink.test.distributedCache; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.configuration.Configuration; -import org.apache.flink.test.util.JavaProgramTestBase; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; import org.apache.flink.util.Collector; +import org.junit.Test; + +import static org.junit.Assert.assertTrue; + +import java.io.*; + +import java.util.*; -/** - * Tests the distributed cache by comparing a text file with a distributed copy. - */ -public class DistributedCacheTest extends JavaProgramTestBase { +public class DistributedCacheTest extends StreamingMultipleProgramsTestBase { public static final String data = "machen\n" + "zeit\n" @@ -45,33 +42,31 @@ public class DistributedCacheTest extends JavaProgramTestBase { + "keiner\n" + "meine\n"; - protected String textPath; - @Override - protected void preSubmit() throws Exception { - textPath = createTempFile("count.txt", data); + @Test + public void testStreamingDistributedCache() throws Exception { + String textPath = createTempFile("count.txt", data); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.registerCachedFile(textPath, "cache_test"); + env.readTextFile(textPath).flatMap(new WordChecker()); + env.execute(); } - @Override - protected void testProgram() throws Exception { + @Test + public void testBatchDistributedCache() throws Exception { + String textPath = createTempFile("count.txt", data); ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.registerCachedFile(textPath, "cache_test"); - - List> result = env - .readTextFile(textPath) - .flatMap(new WordChecker()) - .collect(); - - compareResultAsTuples(result, data); + env.readTextFile(textPath).flatMap(new WordChecker()).count(); } public static class WordChecker extends RichFlatMapFunction> { private static final long serialVersionUID = 1L; - private final Set wordList = new HashSet<>(); + private final List wordList = new ArrayList<>(); @Override - public void open(Configuration conf) throws FileNotFoundException, IOException { + public void open(Configuration conf) throws IOException { File file = getRuntimeContext().getDistributedCache().getFile("cache_test"); BufferedReader reader = new BufferedReader(new FileReader(file)); String tempString; @@ -83,9 +78,10 @@ public class DistributedCacheTest extends JavaProgramTestBase { @Override public void flatMap(String word, Collector> out) throws Exception { - if (wordList.contains(word)) { - out.collect(new Tuple1<>(word)); - } + assertTrue("Unexpected word in stream! wordFromStream: " + word + ", shouldBeOneOf: " + + wordList.toString(), wordList.contains(word)); + + out.collect(new Tuple1<>(word)); } } } -- GitLab