From 7ddfb16a047c6de2e8a7f7be6ac43bf66e248e39 Mon Sep 17 00:00:00 2001 From: Fabian Hueske Date: Tue, 21 Dec 2010 01:04:59 +0100 Subject: [PATCH] - moved filesystem classes into designated package --- .../util/filesystem/ExternalDFSProvider.java | 46 ++++++ .../util/filesystem/FilesystemProvider.java | 33 ++++ .../test/util/filesystem/HDFSProvider.java | 149 ++++++++++++++++++ .../test/util/filesystem/LocalFSProvider.java | 124 +++++++++++++++ .../test/util/filesystem/MiniDFSProvider.java | 117 ++++++++++++++ 5 files changed, 469 insertions(+) create mode 100644 pact/pact-tests/src/test/java/eu/stratosphere/pact/test/util/filesystem/ExternalDFSProvider.java create mode 100644 pact/pact-tests/src/test/java/eu/stratosphere/pact/test/util/filesystem/FilesystemProvider.java create mode 100644 pact/pact-tests/src/test/java/eu/stratosphere/pact/test/util/filesystem/HDFSProvider.java create mode 100644 pact/pact-tests/src/test/java/eu/stratosphere/pact/test/util/filesystem/LocalFSProvider.java create mode 100644 pact/pact-tests/src/test/java/eu/stratosphere/pact/test/util/filesystem/MiniDFSProvider.java diff --git a/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/util/filesystem/ExternalDFSProvider.java b/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/util/filesystem/ExternalDFSProvider.java new file mode 100644 index 00000000000..e3daa74c9bc --- /dev/null +++ b/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/util/filesystem/ExternalDFSProvider.java @@ -0,0 +1,46 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.pact.test.util.filesystem; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +public class ExternalDFSProvider extends HDFSProvider { + public ExternalDFSProvider(String configDir) { + super(configDir); + } + + public void start() throws Exception { + Configuration config = new Configuration(false); + config.addResource(new Path(configDir + "/hadoop-default.xml")); + config.addResource(new Path(configDir + "/hadoop-site.xml")); + + hdfs = FileSystem.get(config); + } + + public void stop() { + try { + hdfs.close(); + } catch (IOException e) { + e.printStackTrace(); + } + hdfs = null; + } + +} diff --git a/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/util/filesystem/FilesystemProvider.java b/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/util/filesystem/FilesystemProvider.java new file mode 100644 index 00000000000..f8ae781c282 --- /dev/null +++ b/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/util/filesystem/FilesystemProvider.java @@ -0,0 +1,33 @@ +package eu.stratosphere.pact.test.util.filesystem; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +public interface FilesystemProvider { + + public void start() throws Exception; + + public void stop(); + + public boolean createFile(String fileName, String fileContent) throws IOException; + + public boolean copyFile(String localFile, String hdfsFile) throws IOException; + + public boolean createDir(String dirName) throws IOException; + + public boolean delete(String path, boolean recursive) throws IOException; + + public OutputStream getOutputStream(String file) throws IOException; + + public InputStream getInputStream(String file) throws IOException; + + public String getTempDirPath(); + + public String[] listFiles(String dir) throws IOException; + + public boolean isDir(String file) throws IOException; + + public String getURIPrefix(); + +} diff --git a/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/util/filesystem/HDFSProvider.java b/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/util/filesystem/HDFSProvider.java new file mode 100644 index 00000000000..ffe796091e0 --- /dev/null +++ b/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/util/filesystem/HDFSProvider.java @@ -0,0 +1,149 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.pact.test.util.filesystem; + +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.util.ArrayList; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +public abstract class HDFSProvider implements FilesystemProvider { + + protected FileSystem hdfs; + + protected final String configDir; + + abstract public void start() throws Exception; + + abstract public void stop(); + + protected HDFSProvider(String config) { + this.configDir = config; + } + + public FileSystem getFileSystem() { + return hdfs; + } + + public String getConfigDir() { + return configDir; + } + + public OutputStream getOutputStream(String file) throws IOException { + return hdfs.create(new org.apache.hadoop.fs.Path(file)); + } + + public InputStream getInputStream(String file) throws IOException { + return hdfs.open(new org.apache.hadoop.fs.Path(file)); + } + + public String getTempDirPath() { + return "/user/"+hdfs.getHomeDirectory().getName(); + } + + public boolean createDir(String dirName) throws IOException { + if(!getFileSystem().mkdirs(new Path(dirName))) { + return false; + } + + while (!getFileSystem().exists(new Path(dirName))) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + return true; + } + + public boolean delete(String path, boolean recursive) throws IOException { + if(!getFileSystem().delete(new Path(path), recursive)) { + return false; + } + + while (getFileSystem().exists(new Path(path))) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + return true; + } + + public boolean createFile(String fileName, String fileContent) throws IOException { + if(getFileSystem().exists(new Path(fileName))) { + return false; + } + + OutputStream os = this.getOutputStream(fileName); + Writer wr = new OutputStreamWriter(os); + wr.write(fileContent); + wr.close(); + while (!getFileSystem().exists(new Path(fileName))) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + return true; + } + + public boolean copyFile(String localFile, String hdfsFile) throws IOException { + if(getFileSystem().exists(new Path(hdfsFile))) { + return false; + } + + FileReader fr = new FileReader(new File(localFile)); + OutputStream os = this.getOutputStream(hdfsFile); + Writer wr = new OutputStreamWriter(os); + + while (fr.ready()) { + wr.write(fr.read()); + } + wr.close(); + fr.close(); + + return true; + } + + public String[] listFiles(String dir) throws IOException { + FileStatus[] fss = hdfs.listStatus(new Path(dir)); + ArrayList fileList = new ArrayList(fss.length); + for(FileStatus fs : fss) { + fileList.add(fs.getPath().toString()); + } + return fileList.toArray(new String[1]); + } + + public boolean isDir(String file) throws IOException { + return hdfs.getFileStatus(new Path(file)).isDir(); + } + + public String getURIPrefix() { + return hdfs.getUri().toString(); + } + +} diff --git a/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/util/filesystem/LocalFSProvider.java b/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/util/filesystem/LocalFSProvider.java new file mode 100644 index 00000000000..1d4fa17062c --- /dev/null +++ b/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/util/filesystem/LocalFSProvider.java @@ -0,0 +1,124 @@ +package eu.stratosphere.pact.test.util.filesystem; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +public class LocalFSProvider implements FilesystemProvider { + + public boolean createDir(String dirName) throws IOException { + File f = new File(dirName); + if(f.exists()) { + return false; + } + + return f.mkdir(); + } + + public boolean createFile(String fileName, String fileContent) throws IOException { + File f = new File(fileName); + if(f.exists()) { + return false; + } + + FileWriter fw = new FileWriter(f); + fw.write(fileContent); + fw.close(); + + return true; + } + + public boolean copyFile(String source, String target) throws IOException { + File t = new File(target); + if(t.exists()) { + return false; + } + + File s = new File(source); + + FileWriter tw = new FileWriter(t); + FileReader sr = new FileReader(s); + + char[] buffer = new char[1024]; + + int copiedBytes = sr.read(buffer); + while(copiedBytes > -1) { + tw.write(buffer, 0, copiedBytes); + copiedBytes = sr.read(buffer); + } + + sr.close(); + tw.close(); + + return true; + } + + public boolean delete(String path, boolean recursive) throws IOException { + File f = new File(path); + + if(f.isDirectory() && recursive) { + for(String c : f.list()) { + this.delete(c,true); + } + f.delete(); + + return true; + } else if(f.isDirectory() && !recursive) { + return false; + } else { + f.delete(); + + return true; + } + } + + public InputStream getInputStream(String file) throws IOException { + return new FileInputStream(file); + } + + public OutputStream getOutputStream(String file) throws IOException { + return new FileOutputStream(file); + } + + public String getTempDirPath() { + return System.getProperty("java.io.tmpdir"); + } + + + + @Override + public void start() throws Exception { + } + + @Override + public void stop() { + } + + @Override + public boolean isDir(String file) throws IOException { + return (new File(file)).isDirectory(); + } + + @Override + public String[] listFiles(String dir) throws IOException { + File f = new File(dir); + + if(!f.isDirectory()){ + return null; + } else { + return f.list(); + } + } + + @Override + public String getURIPrefix() { + return "file://"; + } + + +} diff --git a/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/util/filesystem/MiniDFSProvider.java b/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/util/filesystem/MiniDFSProvider.java new file mode 100644 index 00000000000..05e7f8c2834 --- /dev/null +++ b/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/util/filesystem/MiniDFSProvider.java @@ -0,0 +1,117 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.pact.test.util.filesystem; + +import java.io.File; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.MiniDFSCluster; + +import eu.stratosphere.pact.test.util.FileWriter; + +/** + * @author Erik Nijkamp + */ +public class MiniDFSProvider extends HDFSProvider { + private MiniDFSCluster cluster; + + private final static int NAME_NODE_PORT = 9000; + + public MiniDFSProvider() { + super(getTempDir() + "/config"); + } + + public void start() throws Exception { + deleteDir(getTempDir()); + + // create dirs + new FileWriter().dir(getTempDir()); + new FileWriter().dir(getLogDir()); + new FileWriter().dir(getDfsDir()); + new FileWriter().dir(getConfigDir()); + + // set system properies needed by the MiniDFSCluster + System.setProperty("hadoop.log.dir", getLogDir()); + System.setProperty("test.build.data", getDfsDir()); + + // init hdfs cluster + cluster = new MiniDFSCluster(NAME_NODE_PORT, new Configuration(), 1, true, true, null, null); + hdfs = cluster.getFileSystem(); + + // write hdfs config files needed by nephele in temp folder + new FileWriter().dir(getConfigDir()).file("hadoop-site.xml").write( + "", + "", "", " ", + " fs.default.name", + " " + "hdfs://localhost:" + cluster.getNameNodePort() + "", " ", + " ", " dfs.name.dir", " " + getDfsDir() + "/name", + " ", " ", " dfs.data.dir", + " " + getDfsDir() + "/data", " ", "").close().file( + "hadoop-default.xml").write("", + "", "", "") + .close(); + } + + public void stop() { + + try { + hdfs.close(); + } catch (IOException e1) { + e1.printStackTrace(); + } + + cluster.shutdown(); + + deleteDir(getTempDir()); + + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + + } + } + + private static String getTempDir() { + return System.getProperty("user.dir") + "/tmp"; + } + + private static String getDfsDir() { + return getTempDir() + "/dfs"; + } + + private static String getLogDir() { + return getTempDir() + "/logs"; + } + + private boolean deleteDir(String dir) { + return deleteDir(new File(dir)); + } + + private boolean deleteDir(File dir) { + if (dir.isDirectory()) { + String[] childrens = dir.list(); + for (String child : childrens) { + boolean success = deleteDir(new File(dir, child)); + if (!success) { + return false; + } + } + } + + return dir.delete(); + } +} -- GitLab