From b0fbce71030c7bd171f5ebd464b1a0d94961c270 Mon Sep 17 00:00:00 2001 From: mingliang Date: Wed, 19 Mar 2014 21:18:12 +0100 Subject: [PATCH] add test for delete process of tmp file and tested in cluster --- .../java/eu/stratosphere/api/common/Plan.java | 9 +- .../api/common/cache/DistributedCache.java | 25 ++-- .../translation/WrappingFunction.java | 6 + .../pact/runtime/cache/FileCache.java | 54 +++++---- .../cache/FileCacheDeleteValidationTest.java | 110 ++++++++++++++++++ .../DistributedCacheTest.java | 13 +++ 6 files changed, 183 insertions(+), 34 deletions(-) create mode 100644 stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/cache/FileCacheDeleteValidationTest.java diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/Plan.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/Plan.java index 04163f238b9..988d0914171 100644 --- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/Plan.java +++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/Plan.java @@ -13,20 +13,23 @@ package eu.stratosphere.api.common; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + import java.util.ArrayList; import java.util.Calendar; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map.Entry; +import java.util.Set; import eu.stratosphere.api.common.operators.GenericDataSink; import eu.stratosphere.api.common.operators.Operator; import eu.stratosphere.util.Visitable; import eu.stratosphere.util.Visitor; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkArgument; - /** * This class encapsulates a single stratosphere job (an instantiated data flow), together with some parameters. * Parameters include the name and a default degree of parallelism. 
The job is referenced by the data sinks, diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/cache/DistributedCache.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/cache/DistributedCache.java index db10d3bc741..a5b45b05dbb 100644 --- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/cache/DistributedCache.java +++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/cache/DistributedCache.java @@ -1,18 +1,27 @@ +/*********************************************************************************************************************** + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ **********************************************************************************************************************/ + package eu.stratosphere.api.common.cache; -import eu.stratosphere.configuration.ConfigConstants; import eu.stratosphere.configuration.Configuration; -import eu.stratosphere.configuration.GlobalConfiguration; import eu.stratosphere.core.fs.Path; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; - import java.io.File; import java.util.Set; -import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; /** @@ -29,8 +38,6 @@ public class DistributedCache { public final static String TMP_PREFIX = "tmp_"; - public final static int DEFAULT_BUFFER_SIZE = 8192; - private Map> cacheCopyTasks = new HashMap>(); public static void addCachedFile(String name, String filePath, Configuration conf) { @@ -60,10 +67,8 @@ public class DistributedCache { //The FutureTask.get() method will block until the file is ready. try { tmp = cacheCopyTasks.get(name).get(); - } catch (InterruptedException e) { - e.printStackTrace(); - } catch (ExecutionException e) { - e.printStackTrace(); + } catch (Exception e) { + throw new RuntimeException("Error while getting file from distributed cache", e); } return new File(tmp.toString()); } diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/translation/WrappingFunction.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/translation/WrappingFunction.java index 6ecb93d8681..c224a2d1267 100644 --- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/translation/WrappingFunction.java +++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/translation/WrappingFunction.java @@ -24,6 +24,7 @@ import eu.stratosphere.api.common.accumulators.Histogram; import eu.stratosphere.api.common.accumulators.IntCounter; import eu.stratosphere.api.common.accumulators.LongCounter; import 
eu.stratosphere.api.common.aggregators.Aggregator; +import eu.stratosphere.api.common.cache.DistributedCache; import eu.stratosphere.api.common.functions.AbstractFunction; import eu.stratosphere.api.common.functions.IterationRuntimeContext; import eu.stratosphere.api.common.functions.RuntimeContext; @@ -138,6 +139,11 @@ public abstract class WrappingFunction extends Abstr return list; } + + @Override + public DistributedCache getDistributedCache() { + return context.getDistributedCache(); + } } private static class WrappingIterationRuntimeContext extends WrappingRuntimeContext implements IterationRuntimeContext { diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java index 2dd32ac0cdc..80ed872f5c0 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java @@ -1,3 +1,16 @@ +/*********************************************************************************************************************** + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ **********************************************************************************************************************/ + package eu.stratosphere.pact.runtime.cache; import eu.stratosphere.api.common.cache.DistributedCache; @@ -10,6 +23,8 @@ import eu.stratosphere.core.fs.Path; import eu.stratosphere.core.fs.local.LocalFileSystem; import eu.stratosphere.nephele.jobgraph.JobID; import eu.stratosphere.nephele.taskmanager.runtime.ExecutorThreadFactory; +import eu.stratosphere.nephele.util.IOUtils; + import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; @@ -28,7 +43,7 @@ public class FileCache { private LocalFileSystem lfs = new LocalFileSystem(); - private Map, Boolean> active = new HashMap, Boolean>(); + private Map, Integer> count = new HashMap, Integer>(); private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10, ExecutorThreadFactory.INSTANCE); @@ -37,8 +52,13 @@ public class FileCache { */ public FutureTask createTmpFile(String name, String filePath, JobID jobID) { - synchronized (active) { - active.put(new ImmutablePair(jobID,name), true); + synchronized (count) { + Pair key = new ImmutablePair(jobID,name); + if (count.containsKey(key)) { + count.put(key, count.get(key) + 1); + } else { + count.put(key, 1); + } } CopyProcess cp = new CopyProcess(name, filePath, jobID); FutureTask copyTask = new FutureTask(cp); @@ -50,10 +70,7 @@ public class FileCache { * Leave a 5 seconds delay to clear the local file. 
*/ public void deleteTmpFile(String name, JobID jobID) { - synchronized (active) { - active.put(new ImmutablePair(jobID, name), false); - } - DeleteProcess dp = new DeleteProcess(name, jobID); + DeleteProcess dp = new DeleteProcess(name, jobID, count.get(new ImmutablePair(jobID,name))); executorService.schedule(dp, 5000L, TimeUnit.MILLISECONDS); } @@ -68,7 +85,7 @@ public class FileCache { try { this.executorService.awaitTermination(5000L, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { - e.printStackTrace(); + throw new RuntimeException("Error shutting down the file cache", e); } } } @@ -94,17 +111,10 @@ public class FileCache { Path distributedPath = new Path(filePath); FileSystem fs = distributedPath.getFileSystem(); FSDataInputStream fsInput = fs.open(distributedPath); - byte [] buffer = new byte[DistributedCache.DEFAULT_BUFFER_SIZE]; - int num = fsInput.read(buffer); - while (num != -1) { - lfsOutput.write(buffer, 0, num); - num = fsInput.read(buffer); - } - fsInput.close(); - lfsOutput.close(); + IOUtils.copyBytes(fsInput, lfsOutput); } } catch (IOException e1) { - e1.printStackTrace(); + throw new RuntimeException("Error copying a file from hdfs to the local fs", e1); } return tmp; } @@ -115,15 +125,17 @@ public class FileCache { private class DeleteProcess implements Runnable { private String name; private JobID jobID; + private int oldCount; - public DeleteProcess(String name, JobID jobID) { + public DeleteProcess(String name, JobID jobID, int c) { this.name = name; this.jobID = jobID; + this.oldCount = c; } public void run() { - synchronized (active) { - if (active.get(new ImmutablePair(jobID, name))) { + synchronized (count) { + if (count.get(new ImmutablePair(jobID, name)) != oldCount) { return; } } @@ -133,7 +145,7 @@ public class FileCache { lfs.delete(tmp, true); } } catch (IOException e1) { - e1.printStackTrace(); + throw new RuntimeException("Error deleting the file", e1); } } } diff --git 
a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/cache/FileCacheDeleteValidationTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/cache/FileCacheDeleteValidationTest.java new file mode 100644 index 00000000000..a6320627471 --- /dev/null +++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/cache/FileCacheDeleteValidationTest.java @@ -0,0 +1,110 @@ +/*********************************************************************************************************************** + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + **********************************************************************************************************************/ + +package eu.stratosphere.pact.runtime.cache; + +import com.google.common.base.Charsets; +import com.google.common.io.Files; + +import eu.stratosphere.core.fs.Path; +import eu.stratosphere.core.fs.local.LocalFileSystem; +import eu.stratosphere.nephele.jobgraph.JobID; +import junit.framework.Assert; + +import org.junit.Before; +import org.junit.Test; +import org.junit.After; + +import java.io.File; +import java.io.IOException; + +/** + * Test delete process of {@link FileCache}. The local cache file should not be deleted when another task comes within 5 seconds. 
+ */ +public class FileCacheDeleteValidationTest { + FileCache fileCache = new FileCache(); + LocalFileSystem lfs = new LocalFileSystem(); + + + String testFileContent = "Goethe - Faust: Der Tragoedie erster Teil\n" + "Prolog im Himmel.\n" + + "Der Herr. Die himmlischen Heerscharen. Nachher Mephistopheles. Die drei\n" + "Erzengel treten vor.\n" + + "RAPHAEL: Die Sonne toent, nach alter Weise, In Brudersphaeren Wettgesang,\n" + + "Und ihre vorgeschriebne Reise Vollendet sie mit Donnergang. Ihr Anblick\n" + + "gibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die unbegreiflich\n" + + "hohen Werke Sind herrlich wie am ersten Tag.\n" + + "GABRIEL: Und schnell und unbegreiflich schnelle Dreht sich umher der Erde\n" + + "Pracht; Es wechselt Paradieseshelle Mit tiefer, schauervoller Nacht. Es\n" + + "schaeumt das Meer in breiten Fluessen Am tiefen Grund der Felsen auf, Und\n" + + "Fels und Meer wird fortgerissen Im ewig schnellem Sphaerenlauf.\n" + + "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom Land\n" + + "aufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings umher.\n" + + "Da flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. 
Doch\n" + + "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.\n"; + + @Before + public void createTmpCacheFile() { + File f = new File(System.getProperty("java.io.tmpdir"), "cacheFile"); + try { + Files.write(testFileContent, f, Charsets.UTF_8); + } catch (IOException e) { + throw new RuntimeException("Error initializing the test", e); + } + } + + @Test + public void testFileReuseForNextTask() { + JobID jobID = new JobID(); + String filePath = "file://" + new Path(System.getProperty("java.io.tmpdir"), "cacheFile").toString(); + fileCache.createTmpFile("test_file", filePath, jobID); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted error", e); + } + fileCache.deleteTmpFile("test_file", jobID); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted error", e); + } + //new task comes after 1 second + try { + Assert.assertTrue("Local cache file should not be deleted when another task comes in 5 seconds!", lfs.exists(fileCache.getTempDir(jobID, "test_file"))); + } catch (IOException e) { + throw new RuntimeException("Interrupted error", e); + } + fileCache.createTmpFile("test_file", filePath, jobID); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted error", e); + } + fileCache.deleteTmpFile("test_file", jobID); + try { + Thread.sleep(7000); + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted error", e); + } + //no task comes in 7 seconds + try { + Assert.assertTrue("Local cache file should be deleted when no task comes in 5 seconds!", !lfs.exists(fileCache.getTempDir(jobID, "test_file"))); + } catch (IOException e) { + throw new RuntimeException("Interrupted error", e); + } + } + + @After + public void shutdown() { + fileCache.shutdown(); + } +} diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/distributedCache/DistributedCacheTest.java 
b/stratosphere-tests/src/test/java/eu/stratosphere/test/distributedCache/DistributedCacheTest.java index 07ffa0d6437..886581346b7 100644 --- a/stratosphere-tests/src/test/java/eu/stratosphere/test/distributedCache/DistributedCacheTest.java +++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/distributedCache/DistributedCacheTest.java @@ -1,3 +1,16 @@ +/*********************************************************************************************************************** + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + **********************************************************************************************************************/ + package eu.stratosphere.test.distributedCache; import eu.stratosphere.api.common.Plan; -- GitLab