提交 8676280d 编写于 作者: M mingliang 提交者: Robert Metzger

change codestyles and clean up

上级 29c322d4
......@@ -205,7 +205,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
}
// add registered cache file into job configuration
for (Entry<String, String> e: program.getOriginalPactPlan().getCachedFile()) {
for (Entry<String, String> e: program.getOriginalPactPlan().getCachedFiles()) {
DistributedCache.addCachedFile(e.getKey(), e.getValue(), this.jobGraph.getJobConfiguration());
}
JobGraph graph = this.jobGraph;
......
......@@ -57,6 +57,8 @@ public class Plan implements Visitable<Operator> {
*/
protected int maxNumberMachines;
protected HashMap<String, String> cacheFile = new HashMap<String, String>();
// ------------------------------------------------------------------------
/**
......@@ -293,15 +295,19 @@ public class Plan implements Visitable<Operator> {
* @param filePath The files must be stored in a place that can be accessed from all workers (most commonly HDFS)
* @param name user defined name of that file
*/
public void registerCachedFile(String filePath, String name) {
this.cacheFile.put(name, filePath);
public void registerCachedFile(String filePath, String name) throws RuntimeException{
if (!this.cacheFile.containsKey(name)) {
this.cacheFile.put(name, filePath);
} else {
throw new RuntimeException("cache file " + name + "already exists!");
}
}
/**
* return the registered caches files
* @return Set of (name, filePath) pairs
*/
public Set<Entry<String,String>> getCachedFile() {
public Set<Entry<String,String>> getCachedFiles() {
return this.cacheFile.entrySet();
}
}
......@@ -256,5 +256,5 @@ public interface Environment {
*/
AccumulatorProtocol getAccumulatorProtocolProxy();
Map<String, FutureTask<Path>> getCopyTaskOfCacheFile();
Map<String, FutureTask<Path>> getCopyTask();
}
......@@ -14,7 +14,15 @@
package eu.stratosphere.nephele.execution;
import java.io.IOException;
import java.util.*;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.FutureTask;
......@@ -849,11 +857,11 @@ public class RuntimeEnvironment implements Environment, Runnable {
return accumulatorProtocolProxy;
}
public void setCopyTaskOfCacheFile(String name, FutureTask<Path> copyTask) {
public void addCopyTaskForCacheFile(String name, FutureTask<Path> copyTask) {
this.cacheCopyTasks.put(name, copyTask);
}
@Override
public Map<String, FutureTask<Path>> getCopyTaskOfCacheFile() {
public Map<String, FutureTask<Path>> getCopyTask() {
return this.cacheCopyTasks;
}
......
......@@ -22,9 +22,21 @@ import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.*;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.Map.Entry;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.FutureTask;
import eu.stratosphere.api.common.cache.DistributedCache;
import eu.stratosphere.core.fs.Path;
......
......@@ -1035,7 +1035,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
public RuntimeUDFContext createRuntimeContext(String taskName) {
Environment env = getEnvironment();
return new RuntimeUDFContext(taskName, env.getCurrentNumberOfSubtasks(), env.getIndexInSubtaskGroup(), env.getCopyTaskOfCacheFile());
return new RuntimeUDFContext(taskName, env.getCurrentNumberOfSubtasks(), env.getIndexInSubtaskGroup(), env.getCopyTask());
}
// --------------------------------------------------------------------------------------------
......
......@@ -53,7 +53,7 @@ public abstract class ChainedDriver<IT, OT> implements Collector<IT> {
this.udfContext = ((RegularPactTask<?, ?>) parent).createRuntimeContext(taskName);
} else {
Environment env = parent.getEnvironment();
this.udfContext = new RuntimeUDFContext(taskName, env.getCurrentNumberOfSubtasks(), env.getIndexInSubtaskGroup(), env.getCopyTaskOfCacheFile());
this.udfContext = new RuntimeUDFContext(taskName, env.getCurrentNumberOfSubtasks(), env.getIndexInSubtaskGroup(), env.getCopyTask());
}
setup(parent);
......
......@@ -275,7 +275,7 @@ public class MockEnvironment implements Environment {
}
@Override
public Map<String, FutureTask<Path>> getCopyTaskOfCacheFile() {
public Map<String, FutureTask<Path>> getCopyTask() {
return null;
}
......
......@@ -13,17 +13,20 @@ import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
import eu.stratosphere.util.Collector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.*;
import java.util.*;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.HashSet;
import java.util.Set;
/**
* Test the distributed cache via using the cache file to do a selection on the input
*/
public class distributedCacheTest extends TestBase2 {
public class DistributedCacheTest extends TestBase2 {
public static final String cacheData = "machen\n" + "zeit\n" + "heerscharen\n" + "keiner\n" + "meine\n"
+ "fuehr\n" + "triumph\n" + "kommst\n" + "frei\n" + "schaffen\n" + "gesinde\n"
......@@ -107,24 +110,10 @@ public class distributedCacheTest extends TestBase2 {
}
public void uploadToHDFS(String localFile) throws Exception {
Configuration conf=new Configuration();
conf.set("fs.default.name", "hdfs://192.168.2.102:54320");
FileSystem hdfs=FileSystem.get(conf);
Path src =new Path(localFile);
Path dst =new Path("/");
hdfs.copyFromLocalFile(src, dst);
}
@Override
protected void preSubmit() throws Exception {
textPath = createTempFile("count.txt", WordCountData.COUNTS);
cachePath = createTempFile("cache.txt", cacheData);
// uploadToHDFS(cachePath);
resultPath = getTempDirPath("result");
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册