From 8676280d551b8e15708b24b1d1caca352e77059a Mon Sep 17 00:00:00 2001 From: mingliang Date: Sun, 16 Mar 2014 20:38:32 +0100 Subject: [PATCH] change codestyles and clean up --- .../NepheleJobGraphGenerator.java | 2 +- .../java/eu/stratosphere/api/common/Plan.java | 12 ++++++-- .../nephele/execution/Environment.java | 2 +- .../nephele/execution/RuntimeEnvironment.java | 14 +++++++-- .../nephele/taskmanager/TaskManager.java | 16 ++++++++-- .../pact/runtime/task/RegularPactTask.java | 2 +- .../runtime/task/chaining/ChainedDriver.java | 2 +- .../runtime/test/util/MockEnvironment.java | 2 +- ...cheTest.java => DistributedCacheTest.java} | 29 ++++++------------- 9 files changed, 48 insertions(+), 33 deletions(-) rename stratosphere-tests/src/test/java/eu/stratosphere/test/distributedCache/{distributedCacheTest.java => DistributedCacheTest.java} (89%) diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java index a2eead06fed..b0ae0454bec 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java @@ -205,7 +205,7 @@ public class NepheleJobGraphGenerator implements Visitor { } // add registered cache file into job configuration - for (Entry e: program.getOriginalPactPlan().getCachedFile()) { + for (Entry e: program.getOriginalPactPlan().getCachedFiles()) { DistributedCache.addCachedFile(e.getKey(), e.getValue(), this.jobGraph.getJobConfiguration()); } JobGraph graph = this.jobGraph; diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/Plan.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/Plan.java index 317ec98797c..04163f238b9 100644 --- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/Plan.java +++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/Plan.java @@ -57,6 +57,8 @@ public class Plan implements Visitable { */ protected int maxNumberMachines; + protected HashMap cacheFile = new HashMap(); + // ------------------------------------------------------------------------ /** @@ -293,15 +295,19 @@ public class Plan implements Visitable { * @param filePath The files must be stored in a place that can be accessed from all workers (most commonly HDFS) * @param name user defined name of that file */ - public void registerCachedFile(String filePath, String name) { - this.cacheFile.put(name, filePath); + public void registerCachedFile(String filePath, String name) throws RuntimeException{ + if (!this.cacheFile.containsKey(name)) { + this.cacheFile.put(name, filePath); + } else { + throw new RuntimeException("cache file " + name + "already exists!"); + } } /** * return the registered caches files * @return Set of (name, filePath) pairs */ - public Set> getCachedFile() { + public Set> getCachedFiles() { return this.cacheFile.entrySet(); } } diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/Environment.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/Environment.java index 87d1c8bdd6f..03e9c3421da 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/Environment.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/Environment.java @@ -256,5 +256,5 @@ public interface Environment { */ AccumulatorProtocol getAccumulatorProtocolProxy(); - Map> getCopyTaskOfCacheFile(); + Map> getCopyTask(); } diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java index b02f65faa5e..86cbdc68cdf 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java @@ -14,7 +14,15 @@ package eu.stratosphere.nephele.execution; import java.io.IOException; -import java.util.*; +import java.util.ArrayDeque; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; +import java.util.Set; +import java.util.Map; +import java.util.HashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.FutureTask; @@ -849,11 +857,11 @@ public class RuntimeEnvironment implements Environment, Runnable { return accumulatorProtocolProxy; } - public void setCopyTaskOfCacheFile(String name, FutureTask copyTask) { + public void addCopyTaskForCacheFile(String name, FutureTask copyTask) { this.cacheCopyTasks.put(name, copyTask); } @Override - public Map> getCopyTaskOfCacheFile() { + public Map> getCopyTask() { return this.cacheCopyTasks; } diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java index 1aa2eda3be2..09c9c9e4591 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java @@ -22,9 +22,21 @@ import java.net.ServerSocket; import java.net.Socket; import java.net.SocketAddress; import java.net.UnknownHostException; -import java.util.*; -import java.util.concurrent.*; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.HashMap; import java.util.Map.Entry; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.FutureTask; import eu.stratosphere.api.common.cache.DistributedCache; import eu.stratosphere.core.fs.Path; diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/RegularPactTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/RegularPactTask.java index 69abca94d06..ec2b6ff0685 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/RegularPactTask.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/RegularPactTask.java @@ -1035,7 +1035,7 @@ public class RegularPactTask extends AbstractTask implem public RuntimeUDFContext createRuntimeContext(String taskName) { Environment env = getEnvironment(); - return new RuntimeUDFContext(taskName, env.getCurrentNumberOfSubtasks(), env.getIndexInSubtaskGroup(), env.getCopyTaskOfCacheFile()); + return new RuntimeUDFContext(taskName, env.getCurrentNumberOfSubtasks(), env.getIndexInSubtaskGroup(), env.getCopyTask()); } // -------------------------------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/chaining/ChainedDriver.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/chaining/ChainedDriver.java index 455ec5293f5..5e9bc4409c9 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/chaining/ChainedDriver.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/chaining/ChainedDriver.java @@ -53,7 +53,7 @@ public abstract class ChainedDriver implements Collector { this.udfContext = ((RegularPactTask) parent).createRuntimeContext(taskName); } else { Environment env = parent.getEnvironment(); - this.udfContext = new RuntimeUDFContext(taskName, env.getCurrentNumberOfSubtasks(), env.getIndexInSubtaskGroup(), env.getCopyTaskOfCacheFile()); + this.udfContext = new RuntimeUDFContext(taskName, env.getCurrentNumberOfSubtasks(), env.getIndexInSubtaskGroup(), env.getCopyTask()); } setup(parent); diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/MockEnvironment.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/MockEnvironment.java index 192ed6869b1..8e25082ded4 100644 --- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/MockEnvironment.java +++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/MockEnvironment.java @@ -275,7 +275,7 @@ public class MockEnvironment implements Environment { } @Override - public Map> getCopyTaskOfCacheFile() { + public Map> getCopyTask() { return null; } diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/distributedCache/distributedCacheTest.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/distributedCache/DistributedCacheTest.java similarity index 89% rename from stratosphere-tests/src/test/java/eu/stratosphere/test/distributedCache/distributedCacheTest.java rename to stratosphere-tests/src/test/java/eu/stratosphere/test/distributedCache/DistributedCacheTest.java index 5bd13ad7de6..07ffa0d6437 100644 --- a/stratosphere-tests/src/test/java/eu/stratosphere/test/distributedCache/distributedCacheTest.java +++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/distributedCache/DistributedCacheTest.java @@ -13,17 +13,20 @@ import eu.stratosphere.types.IntValue; import eu.stratosphere.types.Record; import eu.stratosphere.types.StringValue; import eu.stratosphere.util.Collector; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import java.io.*; -import java.util.*; +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.util.HashSet; +import java.util.Set; + /** * Test the distributed cache via using the cache file to do a selection on the input */ -public class distributedCacheTest extends TestBase2 { +public class DistributedCacheTest extends TestBase2 { public static final String cacheData = "machen\n" + "zeit\n" + "heerscharen\n" + "keiner\n" + "meine\n" + "fuehr\n" + "triumph\n" + "kommst\n" + "frei\n" + "schaffen\n" + "gesinde\n" @@ -107,24 +110,10 @@ public class distributedCacheTest extends TestBase2 { } - - public void uploadToHDFS(String localFile) throws Exception { - - Configuration conf=new Configuration(); - conf.set("fs.default.name", "hdfs://192.168.2.102:54320"); - FileSystem hdfs=FileSystem.get(conf); - Path src =new Path(localFile); - Path dst =new Path("/"); - hdfs.copyFromLocalFile(src, dst); - - } - - @Override protected void preSubmit() throws Exception { textPath = createTempFile("count.txt", WordCountData.COUNTS); cachePath = createTempFile("cache.txt", cacheData); -// uploadToHDFS(cachePath); resultPath = getTempDirPath("result"); } -- GitLab