Commit 7ddfb16a authored by Fabian Hueske

- moved filesystem classes into designated package

Parent 717a3775
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.test.util.filesystem;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * Provides access to an externally running HDFS installation, configured
 * through the Hadoop configuration files found in the given directory.
 */
public class ExternalDFSProvider extends HDFSProvider {

    public ExternalDFSProvider(String configDir) {
        super(configDir);
    }

    public void start() throws Exception {
        // load the cluster connection settings from the external config files
        Configuration config = new Configuration(false);
        config.addResource(new Path(configDir + "/hadoop-default.xml"));
        config.addResource(new Path(configDir + "/hadoop-site.xml"));

        hdfs = FileSystem.get(config);
    }

    public void stop() {
        try {
            hdfs.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        hdfs = null;
    }
}
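/*
 * Illustrative sketch, not part of this commit: connecting to an already
 * running HDFS installation. The configuration directory below is a
 * hypothetical example; it must contain the cluster's hadoop-default.xml
 * and hadoop-site.xml.
 *
 *   HDFSProvider provider = new ExternalDFSProvider("/etc/hadoop/conf");
 *   provider.start();
 *   // ... run tests against provider.getFileSystem() ...
 *   provider.stop();
 */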
package eu.stratosphere.pact.test.util.filesystem;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**
 * Abstraction over the file systems used by the integration tests
 * (local file system, mini DFS cluster, or an external HDFS installation).
 */
public interface FilesystemProvider {

    // lifecycle
    public void start() throws Exception;
    public void stop();

    // file and directory operations
    public boolean createFile(String fileName, String fileContent) throws IOException;
    public boolean copyFile(String localFile, String hdfsFile) throws IOException;
    public boolean createDir(String dirName) throws IOException;
    public boolean delete(String path, boolean recursive) throws IOException;

    // stream access
    public OutputStream getOutputStream(String file) throws IOException;
    public InputStream getInputStream(String file) throws IOException;

    // file system information
    public String getTempDirPath();
    public String[] listFiles(String dir) throws IOException;
    public boolean isDir(String file) throws IOException;
    public String getURIPrefix();
}
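/*
 * Illustrative sketch, not part of this commit: how a test could drive any
 * FilesystemProvider implementation through the interface alone. The choice
 * of LocalFSProvider and the "provider-demo" path are hypothetical examples.
 */
public class FilesystemProviderUsageSketch {
    public static void main(String[] args) throws Exception {
        FilesystemProvider provider = new LocalFSProvider();
        provider.start();
        String dir = provider.getTempDirPath() + "/provider-demo";
        provider.createDir(dir);
        provider.createFile(dir + "/hello.txt", "hello world");
        // list the directory we just populated
        for (String entry : provider.listFiles(dir)) {
            System.out.println(entry);
        }
        provider.delete(dir, true);
        provider.stop();
    }
}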
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.test.util.filesystem;

import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.ArrayList;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * Base class for HDFS-backed filesystem providers. Subclasses are responsible
 * for obtaining the {@link FileSystem} instance in {@link #start()}.
 */
public abstract class HDFSProvider implements FilesystemProvider {

    protected FileSystem hdfs;

    protected final String configDir;

    public abstract void start() throws Exception;

    public abstract void stop();

    protected HDFSProvider(String config) {
        this.configDir = config;
    }

    public FileSystem getFileSystem() {
        return hdfs;
    }

    public String getConfigDir() {
        return configDir;
    }

    public OutputStream getOutputStream(String file) throws IOException {
        return hdfs.create(new Path(file));
    }

    public InputStream getInputStream(String file) throws IOException {
        return hdfs.open(new Path(file));
    }

    public String getTempDirPath() {
        return "/user/" + hdfs.getHomeDirectory().getName();
    }

    public boolean createDir(String dirName) throws IOException {
        if (!getFileSystem().mkdirs(new Path(dirName))) {
            return false;
        }
        // wait until the new directory is visible
        while (!getFileSystem().exists(new Path(dirName))) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return true;
    }

    public boolean delete(String path, boolean recursive) throws IOException {
        if (!getFileSystem().delete(new Path(path), recursive)) {
            return false;
        }
        // wait until the path is actually gone
        while (getFileSystem().exists(new Path(path))) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return true;
    }

    public boolean createFile(String fileName, String fileContent) throws IOException {
        if (getFileSystem().exists(new Path(fileName))) {
            return false;
        }
        OutputStream os = this.getOutputStream(fileName);
        Writer wr = new OutputStreamWriter(os);
        wr.write(fileContent);
        wr.close();
        // wait until the new file is visible
        while (!getFileSystem().exists(new Path(fileName))) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return true;
    }

    public boolean copyFile(String localFile, String hdfsFile) throws IOException {
        if (getFileSystem().exists(new Path(hdfsFile))) {
            return false;
        }
        FileReader fr = new FileReader(new File(localFile));
        OutputStream os = this.getOutputStream(hdfsFile);
        Writer wr = new OutputStreamWriter(os);
        // copy character by character; read() returns -1 at end of file
        int c;
        while ((c = fr.read()) != -1) {
            wr.write(c);
        }
        wr.close();
        fr.close();
        return true;
    }

    public String[] listFiles(String dir) throws IOException {
        FileStatus[] fss = hdfs.listStatus(new Path(dir));
        ArrayList<String> fileList = new ArrayList<String>(fss.length);
        for (FileStatus fs : fss) {
            fileList.add(fs.getPath().toString());
        }
        // size the target array to the list; new String[1] would leave a
        // spurious null element for an empty directory
        return fileList.toArray(new String[fileList.size()]);
    }

    public boolean isDir(String file) throws IOException {
        return hdfs.getFileStatus(new Path(file)).isDir();
    }

    public String getURIPrefix() {
        return hdfs.getUri().toString();
    }
}
package eu.stratosphere.pact.test.util.filesystem;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**
 * FilesystemProvider implementation backed by the local file system.
 */
public class LocalFSProvider implements FilesystemProvider {

    public boolean createDir(String dirName) throws IOException {
        File f = new File(dirName);
        if (f.exists()) {
            return false;
        }
        return f.mkdir();
    }

    public boolean createFile(String fileName, String fileContent) throws IOException {
        File f = new File(fileName);
        if (f.exists()) {
            return false;
        }
        FileWriter fw = new FileWriter(f);
        fw.write(fileContent);
        fw.close();
        return true;
    }

    public boolean copyFile(String source, String target) throws IOException {
        File t = new File(target);
        if (t.exists()) {
            return false;
        }
        File s = new File(source);
        FileWriter tw = new FileWriter(t);
        FileReader sr = new FileReader(s);
        // copy in 1 KiB chunks until the source is exhausted
        char[] buffer = new char[1024];
        int copiedChars = sr.read(buffer);
        while (copiedChars > -1) {
            tw.write(buffer, 0, copiedChars);
            copiedChars = sr.read(buffer);
        }
        sr.close();
        tw.close();
        return true;
    }

    public boolean delete(String path, boolean recursive) throws IOException {
        File f = new File(path);
        if (f.isDirectory() && recursive) {
            // recurse with the children's full paths, not just their names
            for (String child : f.list()) {
                this.delete(new File(f, child).getPath(), true);
            }
            return f.delete();
        } else if (f.isDirectory() && !recursive) {
            return false;
        } else {
            return f.delete();
        }
    }

    public InputStream getInputStream(String file) throws IOException {
        return new FileInputStream(file);
    }

    public OutputStream getOutputStream(String file) throws IOException {
        return new FileOutputStream(file);
    }

    public String getTempDirPath() {
        return System.getProperty("java.io.tmpdir");
    }

    @Override
    public void start() throws Exception {
        // nothing to set up for the local file system
    }

    @Override
    public void stop() {
        // nothing to tear down
    }

    @Override
    public boolean isDir(String file) throws IOException {
        return (new File(file)).isDirectory();
    }

    @Override
    public String[] listFiles(String dir) throws IOException {
        File f = new File(dir);
        if (!f.isDirectory()) {
            return null;
        } else {
            return f.list();
        }
    }

    @Override
    public String getURIPrefix() {
        return "file://";
    }
}
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.test.util.filesystem;

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;

import eu.stratosphere.pact.test.util.FileWriter;

/**
 * Starts an in-process HDFS cluster (MiniDFSCluster) for testing and writes
 * the matching Hadoop configuration files into a temporary directory.
 *
 * @author Erik Nijkamp
 */
public class MiniDFSProvider extends HDFSProvider {

    private MiniDFSCluster cluster;

    private final static int NAME_NODE_PORT = 9000;

    public MiniDFSProvider() {
        super(getTempDir() + "/config");
    }

    public void start() throws Exception {
        deleteDir(getTempDir());

        // create dirs
        new FileWriter().dir(getTempDir());
        new FileWriter().dir(getLogDir());
        new FileWriter().dir(getDfsDir());
        new FileWriter().dir(getConfigDir());

        // set system properties needed by the MiniDFSCluster
        System.setProperty("hadoop.log.dir", getLogDir());
        System.setProperty("test.build.data", getDfsDir());

        // init hdfs cluster
        cluster = new MiniDFSCluster(NAME_NODE_PORT, new Configuration(), 1, true, true, null, null);
        hdfs = cluster.getFileSystem();

        // write hdfs config files needed by nephele in temp folder
        new FileWriter().dir(getConfigDir()).file("hadoop-site.xml").write(
            "<?xml version=\"1.0\" encoding=\"UTF-8\"?>",
            "<?xml-stylesheet href=\"configuration.xsl\" type=\"text/xsl\"?>",
            "<configuration>",
            "  <property>",
            "    <name>fs.default.name</name>",
            "    <value>" + "hdfs://localhost:" + cluster.getNameNodePort() + "</value>",
            "  </property>",
            "  <property>",
            "    <name>dfs.name.dir</name>",
            "    <value>" + getDfsDir() + "/name</value>",
            "  </property>",
            "  <property>",
            "    <name>dfs.data.dir</name>",
            "    <value>" + getDfsDir() + "/data</value>",
            "  </property>",
            "</configuration>")
            .close()
            .file("hadoop-default.xml").write(
            "<?xml version=\"1.0\" encoding=\"UTF-8\"?>",
            "<?xml-stylesheet href=\"configuration.xsl\" type=\"text/xsl\"?>",
            "<configuration>",
            "</configuration>")
            .close();
    }

    public void stop() {
        try {
            hdfs.close();
        } catch (IOException e1) {
            e1.printStackTrace();
        }
        cluster.shutdown();
        deleteDir(getTempDir());
        // give the cluster some time to release its ports and file handles
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            // ignored; we are shutting down anyway
        }
    }

    private static String getTempDir() {
        return System.getProperty("user.dir") + "/tmp";
    }

    private static String getDfsDir() {
        return getTempDir() + "/dfs";
    }

    private static String getLogDir() {
        return getTempDir() + "/logs";
    }

    private boolean deleteDir(String dir) {
        return deleteDir(new File(dir));
    }

    private boolean deleteDir(File dir) {
        if (dir.isDirectory()) {
            String[] children = dir.list();
            for (String child : children) {
                boolean success = deleteDir(new File(dir, child));
                if (!success) {
                    return false;
                }
            }
        }
        return dir.delete();
    }
}
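/*
 * Illustrative sketch, not part of this commit: the typical lifecycle of the
 * mini cluster in a test. The path and file content are hypothetical examples.
 */
public class MiniDFSProviderUsageSketch {
    public static void main(String[] args) throws Exception {
        HDFSProvider provider = new MiniDFSProvider();
        provider.start(); // boots the in-process HDFS cluster
        provider.createFile("/demo/input.txt", "some test data");
        System.out.println(provider.getURIPrefix()); // e.g. hdfs://localhost:<port>
        provider.stop(); // closes the file system and shuts the cluster down
    }
}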