提交 80d6c586 编写于 作者: D Daniel Warneke

Improved compatibility with Windows

上级 acca0ef8
......@@ -241,7 +241,7 @@ public final class GlobalConfiguration {
// load each xml file
for (File f : files) {
get().loadResource("file://" + f.getAbsolutePath());
get().loadResource(f);
}
// Store the path to the configuration directory itself
......@@ -253,10 +253,10 @@ public final class GlobalConfiguration {
/**
* Loads an XML document of key-values pairs.
*
* @param uri
* the URI pointing to the XML document
* @param file
* the XML document file
*/
private void loadResource(final String uri) {
private void loadResource(final File file) {
final DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance();
// Ignore comments in the XML file
......@@ -278,7 +278,7 @@ public final class GlobalConfiguration {
Document doc = null;
Element root = null;
doc = builder.parse(uri);
doc = builder.parse(file);
if (doc == null) {
LOG.warn("Cannot load configuration: doc is null");
......
......@@ -49,7 +49,7 @@ public class LocalFileStatus implements FileStatus {
*/
public LocalFileStatus(final File f, final FileSystem fs) {
this.file = f;
this.path = new Path(fs.getUri().getScheme() + ":" + f.getAbsolutePath());
this.path = new Path(fs.getUri().getScheme() + ":" + f.toURI().getPath());
}
/**
......
......@@ -162,10 +162,10 @@ public final class JobFileInputVertex extends AbstractJobInputVertex {
// Check if the path is valid
try {
final FileSystem fs = path.getFileSystem();
final FileStatus f = fs.getFileStatus(path);
final FileSystem fs = this.path.getFileSystem();
final FileStatus f = fs.getFileStatus(this.path);
if (f == null) {
throw new IOException(path.toString() + " led to a null object");
throw new IOException(this.path.toString() + " led to a null object");
}
} catch (IOException e) {
throw new IllegalConfigurationException("Cannot access file or directory: "
......
......@@ -15,6 +15,7 @@
package eu.stratosphere.nephele.checkpointing;
import java.io.File;
import java.io.IOException;
import org.apache.commons.logging.Log;
......@@ -40,8 +41,6 @@ public final class CheckpointUtils {
public static final String DISTRIBUTED_CHECKPOINT_PATH_KEY = "checkpoint.distributed.path";
public static final String DEFAULT_LOCAL_CHECKPOINT_PATH = "file:///tmp";
public static final String COMPLETED_CHECKPOINT_SUFFIX = "_final";
private static Path LOCAL_CHECKPOINT_PATH = null;
......@@ -60,8 +59,13 @@ public final class CheckpointUtils {
public static Path getLocalCheckpointPath() {
if (LOCAL_CHECKPOINT_PATH == null) {
LOCAL_CHECKPOINT_PATH = new Path(GlobalConfiguration.getString(LOCAL_CHECKPOINT_PATH_KEY,
DEFAULT_LOCAL_CHECKPOINT_PATH));
String localCheckpointPath = GlobalConfiguration.getString(LOCAL_CHECKPOINT_PATH_KEY, null);
if (localCheckpointPath == null) {
LOCAL_CHECKPOINT_PATH = new Path(new File(System.getProperty("java.io.tmpdir")).toURI());
} else {
LOCAL_CHECKPOINT_PATH = new Path(localCheckpointPath);
}
}
return LOCAL_CHECKPOINT_PATH;
......
......@@ -104,8 +104,6 @@ public class HardwareDescriptionFactory {
*/
private static float RUNTIME_MEMORY_THRESHOLD = 0.7f;
private static boolean isNativeCodeLoaded = false;
/**
* Private constructor, so class cannot be instantiated.
*/
......@@ -140,14 +138,14 @@ public class HardwareDescriptionFactory {
* Constructs a new hardware description object.
*
* @param numberOfCPUCores
* the number of CPU cores available to the JVM on the compute
* node
* the number of CPU cores available to the JVM on the compute
* node
* @param sizeOfPhysicalMemory
* the size of physical memory in bytes available on the compute
* node
* the size of physical memory in bytes available on the compute
* node
* @param sizeOfFreeMemory
* the size of free memory in bytes available to the JVM on the
* compute node
* the size of free memory in bytes available to the JVM on the
* compute node
* @return the hardware description object
*/
public static HardwareDescription construct(int numberOfCPUCores,
......@@ -160,8 +158,8 @@ public class HardwareDescriptionFactory {
/**
* Returns the size of free memory in bytes available to the JVM.
*
* @return the size of the free memory in bytes available to the JVM or
* <code>-1</code> if the size cannot be determined
* @return the size of the free memory in bytes available to the JVM or <code>-1</code> if the size cannot be
* determined
*/
private static long getSizeOfFreeMemory() {
......@@ -453,25 +451,49 @@ public class HardwareDescriptionFactory {
* @return the size of the physical memory in bytes or <code>-1</code> if
* the size could not be determined
*/
private static synchronized long getSizeOfPhysicalMemoryForWindows() {
private static long getSizeOfPhysicalMemoryForWindows() {
BufferedReader bi = null;
long sizeOfPhyiscalMemory = 0L;
try {
Process proc = Runtime.getRuntime().exec("wmic memorychip get capacity");
if (!isNativeCodeLoaded) {
// TODO: Get rid of absolute path here
System.load("C:\\Users\\warneke\\workspace\\stratosphere\\nephele\\nephele-server\\src\\main\\native\\HardwareDescriptionFactory.dll");
bi = new BufferedReader(
new InputStreamReader(proc.getInputStream()));
String line = bi.readLine();
if (line == null) {
return -1L;
}
isNativeCodeLoaded = true;
if (!line.startsWith("Capacity")) {
return -1L;
}
while ((line = bi.readLine()) != null) {
if (line.isEmpty()) {
continue;
}
line = line.replaceAll(" ", "");
sizeOfPhyiscalMemory += Long.parseLong(line);
}
} catch (Exception e) {
LOG.error(e);
return -1L;
} finally {
if (bi != null) {
try {
bi.close();
} catch (IOException ioe) {
}
}
}
return getPhysicalMemoryFromGlobalMemoryStatus();
return sizeOfPhyiscalMemory;
}
/**
* This is a wrapper to the native WIN32 call GlobalMemoryStatus. The
* wrapper extracts the size of the physical memory on a Windows platform in
* bytes.
*
* @return the size of the physical memory in bytes or <code>-1</code> if
* the size could not be determined
*/
private static native long getPhysicalMemoryFromGlobalMemoryStatus();
}
......@@ -281,7 +281,8 @@ public class TaskManager implements TaskOperationProtocol, PluginCommunicationPr
// Get the directory for storing temporary files
final String[] tmpDirPaths = GlobalConfiguration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(":");
ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(File.pathSeparator);
checkTempDirs(tmpDirPaths);
// Initialize the byte buffered channel manager
......@@ -1006,31 +1007,32 @@ public class TaskManager implements TaskOperationProtocol, PluginCommunicationPr
return tmp.requestData(data);
}
/**
* Checks, whether the given strings describe existing directories that are writable. If that is not
* the case, an exception is raised.
*
* @param tempDirs An array of strings which are checked to be paths to writable directories.
* @throws Exception Thrown, if any of the mentioned checks fails.
* @param tempDirs
* An array of strings which are checked to be paths to writable directories.
* @throws Exception
* Thrown, if any of the mentioned checks fails.
*/
private static final void checkTempDirs(String[] tempDirs) throws Exception
{
private static final void checkTempDirs(String[] tempDirs) throws Exception {
for (int i = 0; i < tempDirs.length; i++) {
final String dir = tempDirs[i];
if (dir == null) {
throw new Exception("Temporary file directory #" + (i+1) + " is null.");
throw new Exception("Temporary file directory #" + (i + 1) + " is null.");
}
final File f = new File(dir);
if (!f.exists()) {
throw new Exception("Temporary file directory #" + (i+1) + " does not exist.");
throw new Exception("Temporary file directory #" + (i + 1) + " does not exist.");
}
if (!f.isDirectory()) {
throw new Exception("Temporary file directory #" + (i+1) + " is not a directory.");
throw new Exception("Temporary file directory #" + (i + 1) + " is not a directory.");
}
if (!f.canWrite()) {
throw new Exception("Temporary file directory #" + (i+1) + " is not writable.");
throw new Exception("Temporary file directory #" + (i + 1) + " is not writable.");
}
}
}
......
......@@ -45,10 +45,10 @@ public final class EnvelopeConsumptionLog {
private static final int LOG_WINDOW_SIZE = 256 * 1024;
private static final int SIZE_OF_INTEGER = 4;
private static final AtomicEnumerator<String> TEMP_PATHS = AtomicEnumerator.get(
GlobalConfiguration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(":"));
System.getProperty("java.io.tmpdir")).split(File.pathSeparator));
private final File logFile;
......@@ -111,8 +111,7 @@ public final class EnvelopeConsumptionLog {
}
}
private static String constructFileName(final ExecutionVertexID vertexID)
{
private static String constructFileName(final ExecutionVertexID vertexID) {
return TEMP_PATHS.getNext() + File.separator + ENVELOPE_CONSUMPTION_LOG_PREFIX + vertexID;
}
......
......@@ -232,7 +232,7 @@ public class ExecutionGraphTest {
// input vertex
final JobFileInputVertex i1 = new JobFileInputVertex("Input 1", jg);
i1.setFileInputClass(FileLineReader.class);
i1.setFilePath(new Path("file://" + inputFile.getAbsolutePath()));
i1.setFilePath(new Path(inputFile.toURI()));
// task vertex
final JobTaskVertex t1 = new JobTaskVertex("Task 1", jg);
......@@ -241,7 +241,7 @@ public class ExecutionGraphTest {
// output vertex
final JobFileOutputVertex o1 = new JobFileOutputVertex("Output 1", jg);
o1.setFileOutputClass(FileLineWriter.class);
o1.setFilePath(new Path("file://" + ServerTestUtils.getRandomFilename()));
o1.setFilePath(new Path(new File(ServerTestUtils.getRandomFilename()).toURI()));
o1.setVertexToShareInstancesWith(i1);
i1.setVertexToShareInstancesWith(t1);
......@@ -459,7 +459,8 @@ public class ExecutionGraphTest {
// input vertex
final JobFileInputVertex i1 = new JobFileInputVertex("Input 1", jg);
i1.setFileInputClass(FileLineReader.class);
i1.setFilePath(new Path("file://" + inputFile.getAbsolutePath()));
System.out.println("URI " + inputFile.toURI());
i1.setFilePath(new Path(inputFile.toURI()));
// task vertex
final JobTaskVertex t1 = new JobTaskVertex("Task 1", jg);
......@@ -468,7 +469,7 @@ public class ExecutionGraphTest {
// output vertex
final JobFileOutputVertex o1 = new JobFileOutputVertex("Output 1", jg);
o1.setFileOutputClass(FileLineWriter.class);
o1.setFilePath(new Path("file://" + ServerTestUtils.getRandomFilename()));
o1.setFilePath(new Path(new File(ServerTestUtils.getRandomFilename()).toURI()));
// connect vertices
i1.connectTo(t1, ChannelType.INMEMORY, CompressionLevel.NO_COMPRESSION);
......@@ -570,11 +571,11 @@ public class ExecutionGraphTest {
// input vertex
final JobFileInputVertex i1 = new JobFileInputVertex("Input 1", jg);
i1.setFileInputClass(FileLineReader.class);
i1.setFilePath(new Path("file://" + inputFile1.getAbsolutePath()));
i1.setFilePath(new Path(inputFile1.toURI()));
i1.setNumberOfSubtasks(2);
final JobFileInputVertex i2 = new JobFileInputVertex("Input 2", jg);
i2.setFileInputClass(FileLineReader.class);
i2.setFilePath(new Path("file://" + inputFile2.getAbsolutePath()));
i2.setFilePath(new Path(inputFile2.toURI()));
i2.setNumberOfSubtasks(2);
// task vertex
......@@ -591,7 +592,7 @@ public class ExecutionGraphTest {
// output vertex
final JobFileOutputVertex o1 = new JobFileOutputVertex("Output 1", jg);
o1.setFileOutputClass(FileLineWriter.class);
o1.setFilePath(new Path("file://" + ServerTestUtils.getRandomFilename()));
o1.setFilePath(new Path(new File(ServerTestUtils.getRandomFilename()).toURI()));
o1.setNumberOfSubtasks(2);
i1.setVertexToShareInstancesWith(t1);
t1.setVertexToShareInstancesWith(t3);
......@@ -809,12 +810,12 @@ public class ExecutionGraphTest {
// input vertex
final JobFileInputVertex i1 = new JobFileInputVertex("Input 1", jg);
i1.setFileInputClass(FileLineReader.class);
i1.setFilePath(new Path("file://" + inputFile1.getAbsolutePath()));
i1.setFilePath(new Path(inputFile1.toURI()));
i1.setNumberOfSubtasks(4);
i1.setNumberOfSubtasksPerInstance(2);
final JobFileInputVertex i2 = new JobFileInputVertex("Input 2", jg);
i2.setFileInputClass(FileLineReader.class);
i2.setFilePath(new Path("file://" + inputFile2.getAbsolutePath()));
i2.setFilePath(new Path(inputFile2.toURI()));
i2.setNumberOfSubtasks(4);
i2.setNumberOfSubtasksPerInstance(2);
// task vertex
......@@ -837,12 +838,12 @@ public class ExecutionGraphTest {
// output vertex
final JobFileOutputVertex o1 = new JobFileOutputVertex("Output 1", jg);
o1.setFileOutputClass(FileLineWriter.class);
o1.setFilePath(new Path("file://" + ServerTestUtils.getRandomFilename()));
o1.setFilePath(new Path(new File(ServerTestUtils.getRandomFilename()).toURI()));
o1.setNumberOfSubtasks(4);
o1.setNumberOfSubtasksPerInstance(2);
final JobFileOutputVertex o2 = new JobFileOutputVertex("Output 2", jg);
o2.setFileOutputClass(FileLineWriter.class);
o2.setFilePath(new Path("file://" + ServerTestUtils.getRandomFilename()));
o2.setFilePath(new Path(new File(ServerTestUtils.getRandomFilename()).toURI()));
o2.setNumberOfSubtasks(4);
o2.setNumberOfSubtasksPerInstance(2);
o1.setVertexToShareInstancesWith(o2);
......@@ -937,7 +938,7 @@ public class ExecutionGraphTest {
// input vertex
final JobFileInputVertex input = new JobFileInputVertex(inputTaskName, jg);
input.setFileInputClass(SelfCrossInputTask.class);
input.setFilePath(new Path("file://" + inputFile1.getAbsolutePath()));
input.setFilePath(new Path(inputFile1.toURI()));
input.setNumberOfSubtasks(degreeOfParallelism);
// cross vertex
......@@ -948,13 +949,16 @@ public class ExecutionGraphTest {
// output vertex
final JobFileOutputVertex output = new JobFileOutputVertex(outputTaskName, jg);
output.setFileOutputClass(FileLineWriter.class);
output.setFilePath(new Path("file://" + ServerTestUtils.getRandomFilename()));
output.setFilePath(new Path(new File(ServerTestUtils.getRandomFilename()).toURI()));
output.setNumberOfSubtasks(degreeOfParallelism);
// connect vertices
input.connectTo(cross, ChannelType.INMEMORY, CompressionLevel.NO_COMPRESSION, 0, 0, DistributionPattern.POINTWISE);
input.connectTo(cross, ChannelType.NETWORK, CompressionLevel.NO_COMPRESSION, 1, 1, DistributionPattern.BIPARTITE);
cross.connectTo(output, ChannelType.INMEMORY, CompressionLevel.NO_COMPRESSION, 0, 0, DistributionPattern.POINTWISE);
input.connectTo(cross, ChannelType.INMEMORY, CompressionLevel.NO_COMPRESSION, 0, 0,
DistributionPattern.POINTWISE);
input.connectTo(cross, ChannelType.NETWORK, CompressionLevel.NO_COMPRESSION, 1, 1,
DistributionPattern.BIPARTITE);
cross.connectTo(output, ChannelType.INMEMORY, CompressionLevel.NO_COMPRESSION, 0, 0,
DistributionPattern.POINTWISE);
LibraryCacheManager.register(jobID, new String[0]);
......@@ -1059,7 +1063,7 @@ public class ExecutionGraphTest {
// input vertex
final JobFileInputVertex input1 = new JobFileInputVertex("Input 1", jg);
input1.setFileInputClass(FileLineReader.class);
input1.setFilePath(new Path("file://" + inputFile1.getAbsolutePath()));
input1.setFilePath(new Path(inputFile1.toURI()));
input1.setNumberOfSubtasks(degreeOfParallelism);
// forward vertex 1
......@@ -1080,13 +1084,16 @@ public class ExecutionGraphTest {
// output vertex
final JobFileOutputVertex output1 = new JobFileOutputVertex("Output 1", jg);
output1.setFileOutputClass(FileLineWriter.class);
output1.setFilePath(new Path("file://" + ServerTestUtils.getRandomFilename()));
output1.setFilePath(new Path(new File(ServerTestUtils.getRandomFilename()).toURI()));
output1.setNumberOfSubtasks(degreeOfParallelism);
// connect vertices
input1.connectTo(forward1, ChannelType.INMEMORY, CompressionLevel.NO_COMPRESSION, DistributionPattern.POINTWISE);
forward1.connectTo(forward2, ChannelType.INMEMORY, CompressionLevel.NO_COMPRESSION, DistributionPattern.POINTWISE);
forward2.connectTo(forward3, ChannelType.NETWORK, CompressionLevel.NO_COMPRESSION, DistributionPattern.POINTWISE);
input1.connectTo(forward1, ChannelType.INMEMORY, CompressionLevel.NO_COMPRESSION,
DistributionPattern.POINTWISE);
forward1.connectTo(forward2, ChannelType.INMEMORY, CompressionLevel.NO_COMPRESSION,
DistributionPattern.POINTWISE);
forward2.connectTo(forward3, ChannelType.NETWORK, CompressionLevel.NO_COMPRESSION,
DistributionPattern.POINTWISE);
forward3.connectTo(output1, ChannelType.INMEMORY, CompressionLevel.NO_COMPRESSION);
// setup instance sharing
......
......@@ -46,40 +46,40 @@ public class LocalInstanceManagerTest {
*/
@BeforeClass
public static void startDiscoveryService() {
final String configDir = ServerTestUtils.getConfigDir();
if(configDir == null) {
if (configDir == null) {
fail("Cannot locate configuration directory");
}
GlobalConfiguration.loadConfiguration(configDir);
final String address = GlobalConfiguration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
InetAddress bindAddress = null;
if(address != null) {
if (address != null) {
try {
bindAddress = InetAddress.getByName(address);
} catch(UnknownHostException e) {
} catch (UnknownHostException e) {
fail(e.getMessage());
}
}
try {
DiscoveryService.startDiscoveryService(bindAddress, 5555);
} catch(DiscoveryException e) {
} catch (DiscoveryException e) {
fail(e.getMessage());
}
}
/**
* Stops the discovery service after the tests.
*/
@AfterClass
public static void stopDiscoveryService() {
DiscoveryService.stopDiscoveryService();
}
/**
* Checks if the local instance manager reads the default correctly from the configuration file.
*/
......@@ -87,10 +87,10 @@ public class LocalInstanceManagerTest {
public void testInstanceTypeFromConfiguration() {
final String configDir = ServerTestUtils.getConfigDir();
if(configDir == null) {
if (configDir == null) {
fail("Cannot locate configuration directory");
}
final TestInstanceListener testInstanceListener = new TestInstanceListener();
LocalInstanceManager lm = null;
......@@ -107,8 +107,9 @@ public class LocalInstanceManagerTest {
assertEquals(160, defaultInstanceType.getDiskCapacity());
assertEquals(0, defaultInstanceType.getPricePerHour());
} catch(Exception e) {
Assert.fail("Instanciating LocalInstanceManager failed: "+e.getMessage());
} catch (Exception e) {
e.printStackTrace();
Assert.fail("Instantiation of LocalInstanceManager failed: " + e.getMessage());
} finally {
if (lm != null) {
......
......@@ -243,7 +243,7 @@ public class JobManagerITCase {
// input vertex
final JobFileInputVertex i1 = new JobFileInputVertex("Input 1", jg);
i1.setFileInputClass(FileLineReader.class);
i1.setFilePath(new Path("file://" + testDirectory));
i1.setFilePath(new Path(new File(testDirectory).toURI()));
// task vertex 1
final JobTaskVertex t1 = new JobTaskVertex("Task 1", jg);
......@@ -256,7 +256,7 @@ public class JobManagerITCase {
// output vertex
JobFileOutputVertex o1 = new JobFileOutputVertex("Output 1", jg);
o1.setFileOutputClass(FileLineWriter.class);
o1.setFilePath(new Path("file://" + outputFile.getAbsolutePath().toString()));
o1.setFilePath(new Path(outputFile.toURI()));
t1.setVertexToShareInstancesWith(i1);
t2.setVertexToShareInstancesWith(i1);
......@@ -272,7 +272,8 @@ public class JobManagerITCase {
}
// add jar
jg.addJar(new Path("file://" + ServerTestUtils.getTempDir() + File.separator + forwardClassName + ".jar"));
jg.addJar(new Path(new File(ServerTestUtils.getTempDir() + File.separator + forwardClassName + ".jar")
.toURI()));
// Create job client and launch job
jobClient = new JobClient(jg, configuration);
......@@ -351,7 +352,7 @@ public class JobManagerITCase {
// input vertex
final JobFileInputVertex i1 = new JobFileInputVertex("Input 1", jg);
i1.setFileInputClass(FileLineReader.class);
i1.setFilePath(new Path("file://" + inputFile.getAbsolutePath().toString()));
i1.setFilePath(new Path(inputFile.toURI()));
// task vertex 1
final JobTaskVertex t1 = new JobTaskVertex("Task with Exception", jg);
......@@ -360,7 +361,7 @@ public class JobManagerITCase {
// output vertex
JobFileOutputVertex o1 = new JobFileOutputVertex("Output 1", jg);
o1.setFileOutputClass(FileLineWriter.class);
o1.setFilePath(new Path("file://" + outputFile.getAbsolutePath().toString()));
o1.setFilePath(new Path(outputFile.toURI()));
t1.setVertexToShareInstancesWith(i1);
o1.setVertexToShareInstancesWith(i1);
......@@ -370,7 +371,8 @@ public class JobManagerITCase {
t1.connectTo(o1, ChannelType.INMEMORY, CompressionLevel.NO_COMPRESSION);
// add jar
jg.addJar(new Path("file://" + ServerTestUtils.getTempDir() + File.separator + exceptionClassName + ".jar"));
jg.addJar(new Path(new File(ServerTestUtils.getTempDir() + File.separator + exceptionClassName + ".jar")
.toURI()));
// Create job client and launch job
jobClient = new JobClient(jg, configuration);
......@@ -438,7 +440,7 @@ public class JobManagerITCase {
// input vertex
final JobFileInputVertex i1 = new JobFileInputVertex("Input 1", jg);
i1.setFileInputClass(FileLineReader.class);
i1.setFilePath(new Path("file://" + inputFile.getAbsolutePath().toString()));
i1.setFilePath(new Path(inputFile.toURI()));
// task vertex 1
final JobTaskVertex t1 = new JobTaskVertex("Task with Exception", jg);
......@@ -447,7 +449,7 @@ public class JobManagerITCase {
// output vertex
JobFileOutputVertex o1 = new JobFileOutputVertex("Output 1", jg);
o1.setFileOutputClass(FileLineWriter.class);
o1.setFilePath(new Path("file://" + outputFile.getAbsolutePath().toString()));
o1.setFilePath(new Path(outputFile.toURI()));
t1.setVertexToShareInstancesWith(i1);
o1.setVertexToShareInstancesWith(i1);
......@@ -529,7 +531,7 @@ public class JobManagerITCase {
// input vertex
final JobFileInputVertex i1 = new JobFileInputVertex("Input with broadcast writer", jg);
i1.setFileInputClass(BroadcastSourceTask.class);
i1.setFilePath(new Path("file://" + inputFile.getAbsolutePath().toString()));
i1.setFilePath(new Path(inputFile.toURI()));
// output vertex
JobFileOutputVertex o1 = new JobFileOutputVertex("Output 1", jg);
......@@ -537,7 +539,7 @@ public class JobManagerITCase {
o1.setNumberOfSubtasksPerInstance(receivers);
o1.setVertexToShareInstancesWith(i1);
o1.setFileOutputClass(FileLineWriter.class);
o1.setFilePath(new Path("file://" + outputFile.getAbsolutePath().toString()));
o1.setFilePath(new Path(outputFile.toURI()));
// connect vertices
try {
......@@ -547,7 +549,8 @@ public class JobManagerITCase {
}
// add jar
jg.addJar(new Path("file://" + ServerTestUtils.getTempDir() + File.separator + forwardClassName + ".jar"));
jg.addJar(new Path(new File(ServerTestUtils.getTempDir() + File.separator + forwardClassName + ".jar")
.toURI()));
// Create job client and launch job
jobClient = new JobClient(jg, configuration);
......@@ -623,7 +626,7 @@ public class JobManagerITCase {
// input vertex
final JobFileInputVertex i1 = new JobFileInputVertex("Input 1", jg);
i1.setFileInputClass(FileLineReader.class);
i1.setFilePath(new Path("file://" + inputFile.getAbsolutePath().toString()));
i1.setFilePath(new Path(inputFile.toURI()));
// task vertex 1
final JobTaskVertex t1 = new JobTaskVertex("Task 1", jg);
......@@ -636,7 +639,7 @@ public class JobManagerITCase {
// output vertex
JobFileOutputVertex o1 = new JobFileOutputVertex("Output 1", jg);
o1.setFileOutputClass(FileLineWriter.class);
o1.setFilePath(new Path("file://" + outputFile.getAbsolutePath().toString()));
o1.setFilePath(new Path(outputFile.toURI()));
t1.setVertexToShareInstancesWith(i1);
t2.setVertexToShareInstancesWith(i1);
......@@ -652,7 +655,8 @@ public class JobManagerITCase {
}
// add jar
jg.addJar(new Path("file://" + ServerTestUtils.getTempDir() + File.separator + forwardClassName + ".jar"));
jg.addJar(new Path(new File(ServerTestUtils.getTempDir() + File.separator + forwardClassName + ".jar")
.toURI()));
// Create job client and launch job
jobClient = new JobClient(jg, configuration);
......@@ -719,7 +723,7 @@ public class JobManagerITCase {
// input vertex
final JobFileInputVertex i1 = new JobFileInputVertex("Input with two Outputs", jg);
i1.setFileInputClass(DoubleSourceTask.class);
i1.setFilePath(new Path("file://" + inputFile.getAbsolutePath().toString()));
i1.setFilePath(new Path(inputFile.toURI()));
// task vertex 1
final JobTaskVertex t1 = new JobTaskVertex("Task with two Inputs", jg);
......@@ -728,7 +732,7 @@ public class JobManagerITCase {
// output vertex
JobFileOutputVertex o1 = new JobFileOutputVertex("Output 1", jg);
o1.setFileOutputClass(FileLineWriter.class);
o1.setFilePath(new Path("file://" + outputFile.getAbsolutePath().toString()));
o1.setFilePath(new Path(outputFile.toURI()));
t1.setVertexToShareInstancesWith(i1);
o1.setVertexToShareInstancesWith(i1);
......@@ -739,7 +743,7 @@ public class JobManagerITCase {
t1.connectTo(o1, ChannelType.INMEMORY, CompressionLevel.NO_COMPRESSION);
// add jar
jg.addJar(new Path("file://" + jarFile.getAbsolutePath()));
jg.addJar(new Path(jarFile.toURI()));
// Create job client and launch job
jobClient = new JobClient(jg, configuration);
......@@ -799,12 +803,12 @@ public class JobManagerITCase {
// input vertex
final JobFileInputVertex i1 = new JobFileInputVertex(jg);
i1.setFileInputClass(FileLineReader.class);
i1.setFilePath(new Path("file://" + inputFile.getAbsolutePath().toString()));
i1.setFilePath(new Path(inputFile.toURI()));
// output vertex
JobFileOutputVertex o1 = new JobFileOutputVertex(jg);
o1.setFileOutputClass(FileLineWriter.class);
o1.setFilePath(new Path("file://" + outputFile.getAbsolutePath().toString()));
o1.setFilePath(new Path(outputFile.toURI()));
o1.setVertexToShareInstancesWith(i1);
......@@ -812,7 +816,7 @@ public class JobManagerITCase {
i1.connectTo(o1, ChannelType.INMEMORY, CompressionLevel.NO_COMPRESSION);
// add jar
jg.addJar(new Path("file://" + jarFile.getAbsolutePath()));
jg.addJar(new Path(jarFile.toURI()));
// Create job client and launch job
jobClient = new JobClient(jg, configuration);
......@@ -890,12 +894,12 @@ public class JobManagerITCase {
// input vertex 1
final JobFileInputVertex i1 = new JobFileInputVertex("Input 1", jg);
i1.setFileInputClass(FileLineReader.class);
i1.setFilePath(new Path("file://" + inputFile1.getAbsolutePath().toString()));
i1.setFilePath(new Path(inputFile1.toURI()));
// input vertex 2
final JobFileInputVertex i2 = new JobFileInputVertex("Input 2", jg);
i2.setFileInputClass(FileLineReader.class);
i2.setFilePath(new Path("file://" + inputFile2.getAbsolutePath().toString()));
i2.setFilePath(new Path(inputFile2.toURI()));
// union task
final JobTaskVertex u1 = new JobTaskVertex("Union", jg);
......@@ -904,7 +908,7 @@ public class JobManagerITCase {
// output vertex
JobFileOutputVertex o1 = new JobFileOutputVertex("Output", jg);
o1.setFileOutputClass(FileLineWriter.class);
o1.setFilePath(new Path("file://" + outputFile.getAbsolutePath().toString()));
o1.setFilePath(new Path(outputFile.toURI()));
i1.setVertexToShareInstancesWith(o1);
i2.setVertexToShareInstancesWith(o1);
......@@ -916,7 +920,7 @@ public class JobManagerITCase {
u1.connectTo(o1, ChannelType.INMEMORY, CompressionLevel.NO_COMPRESSION);
// add jar
jg.addJar(new Path("file://" + jarFile.getAbsolutePath()));
jg.addJar(new Path(jarFile.toURI()));
// Create job client and launch job
jobClient = new JobClient(jg, configuration);
......@@ -1033,14 +1037,14 @@ public class JobManagerITCase {
// input vertex 1
final JobFileInputVertex i1 = new JobFileInputVertex("Input 1", jg);
i1.setFileInputClass(FileLineReader.class);
i1.setFilePath(new Path("file://" + inputFile1.getAbsolutePath().toString()));
i1.setFilePath(new Path(inputFile1.toURI()));
i1.setNumberOfSubtasks(numberOfSubtasks);
i1.setNumberOfSubtasksPerInstance(numberOfSubtasks);
// input vertex 2
final JobFileInputVertex i2 = new JobFileInputVertex("Input 2", jg);
i2.setFileInputClass(FileLineReader.class);
i2.setFilePath(new Path("file://" + inputFile2.getAbsolutePath().toString()));
i2.setFilePath(new Path(inputFile2.toURI()));
i2.setNumberOfSubtasks(numberOfSubtasks);
i2.setNumberOfSubtasksPerInstance(numberOfSubtasks);
......@@ -1053,7 +1057,7 @@ public class JobManagerITCase {
// output vertex
JobFileOutputVertex o1 = new JobFileOutputVertex("Output", jg);
o1.setFileOutputClass(FileLineWriter.class);
o1.setFilePath(new Path("file://" + outputFile.getAbsolutePath().toString()));
o1.setFilePath(new Path(outputFile.toURI()));
o1.setNumberOfSubtasks(numberOfSubtasks);
o1.setNumberOfSubtasksPerInstance(numberOfSubtasks);
......@@ -1067,7 +1071,7 @@ public class JobManagerITCase {
f1.connectTo(o1, ChannelType.NETWORK, CompressionLevel.NO_COMPRESSION, DistributionPattern.BIPARTITE);
// add jar
jg.addJar(new Path("file://" + jarFile.getAbsolutePath()));
jg.addJar(new Path(jarFile.toURI()));
// Create job client and launch job
jobClient = new JobClient(jg, configuration);
......
......@@ -25,8 +25,6 @@ import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;
import eu.stratosphere.nephele.configuration.ConfigConstants;
import eu.stratosphere.nephele.configuration.GlobalConfiguration;
import eu.stratosphere.nephele.instance.InstanceType;
import eu.stratosphere.nephele.instance.InstanceTypeDescription;
import eu.stratosphere.nephele.jobmanager.JobManagerITCase;
......@@ -131,14 +129,13 @@ public final class ServerTestUtils {
}
/**
* Reads the path to the directory for temporary files from the configuration and returns it.
* Returns the path to the directory for temporary files.
*
* @return the path to the directory for temporary files
*/
public static String getTempDir() {
return GlobalConfiguration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(":")[0];
return System.getProperty("java.io.tmpdir");
}
/**
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册