提交 2b188193 编写于 作者: Y Yi Zhu 提交者: Will Zhang

Update FileSystem interfaces (#349)

* change interfaces of FileSystem

* updt interfaces and posix_file_system

* updt posix_file_system

* updt filesystem interfaces

* remove Status

* updt filesystem interfaces

* resolve conflict

* un-unploaded conflicts

* fix bug in record_kernel, replace LOG with PLOG

* replace createdir with createdirifnotexist in snapshot

* remove createdirifnotexist

* updt .gitignore

* remove predict output

* roll back

* fix bug

* fix for review
上级 82582f71
......@@ -4,5 +4,6 @@
/examples/*/runtime
/examples/*/*log*
/examples/*/*snapshot*
/examples/*/predict_result
*plan*
core.*
......@@ -175,8 +175,7 @@ class KernelUtil<DeviceType::kCPU, T> final {
ctx->cpu_stream()->SendWork([=]() {
int64_t byte_size_of_each_dim = num_in_each_dim * sizeof(T);
std::string file_path = JoinPath(model_dir, bn_in_op);
uint64_t file_size = 0;
FS_CHECK_OK(GlobalFS()->GetFileSize(file_path, &file_size));
uint64_t file_size = GlobalFS()->GetFileSize(file_path);
CHECK_EQ(file_size, dim_num * byte_size_of_each_dim);
BalancedSplitter splitter = BalancedSplitter(dim_num, part_num);
int64_t begin_pos = splitter.At(part_id).begin() * byte_size_of_each_dim;
......
......@@ -9,45 +9,21 @@ namespace oneflow {
namespace fs {
OF_DEFINE_ENUM_TO_OSTREAM_FUNC(Status);
void FileSystem::CreateDirIfNotExist(const std::string& dirname) {
if (IsDirectory(dirname) == Status::OK) { return; }
FS_CHECK_OK(CreateDir(dirname));
if (IsDirectory(dirname)) { return; }
CreateDir(dirname);
}
bool FileSystem::IsDirEmpty(const std::string& dirname) {
return GetChildrenNumOfDir(dirname) == 0;
}
size_t FileSystem::GetChildrenNumOfDir(const std::string& dirname) {
std::vector<std::string> result;
FS_CHECK_OK(GetChildren(dirname, &result));
return result.size();
return ListDir(dirname).empty();
}
std::string FileSystem::TranslateName(const std::string& name) const {
return CleanPath(name);
}
bool FileSystem::FilesExist(const std::vector<std::string>& files,
std::vector<Status>* status) {
bool result = true;
for (const auto& file : files) {
Status s = FileExists(file);
result &= (s == Status::OK);
if (status != nullptr) {
status->push_back(s);
} else if (!result) {
// Return early since there is no need to check other files.
return false;
}
}
return result;
}
Status FileSystem::DeleteRecursively(const std::string& dirname) {
FS_CHECK_OK(FileExists(dirname));
void FileSystem::RecursivelyDeleteDir(const std::string& dirname) {
CHECK(FileExists(dirname));
std::deque<std::string> dir_q; // Queue for the BFS
std::vector<std::string> dir_list; // List of all dirs discovered
dir_q.push_back(dirname);
......@@ -55,27 +31,21 @@ Status FileSystem::DeleteRecursively(const std::string& dirname) {
// Do a BFS on the directory to discover all the sub-directories. Remove all
// children that are files along the way. Then cleanup and remove the
// directories in reverse order.;
Status ret = Status::OK;
while (!dir_q.empty()) {
std::string dir = dir_q.front();
dir_q.pop_front();
dir_list.push_back(dir);
std::vector<std::string> children;
// GetChildren might fail if we don't have appropriate permissions.
Status s = GetChildren(dir, &children);
TryUpdateStatus(&ret, s);
FS_CHECK_OK(s);
std::vector<std::string> children = ListDir(dir);
for (const std::string& child : children) {
const std::string child_path = JoinPath(dir, child);
// If the child is a directory add it to the queue, otherwise delete it.
if (IsDirectory(child_path) == Status::OK) {
if (IsDirectory(child_path)) {
dir_q.push_back(child_path);
} else {
// Delete file might fail because of permissions issues or might be
// unimplemented.
Status del_status = DeleteFile(child_path);
TryUpdateStatus(&ret, del_status);
CHECK_EQ(del_status, Status::OK);
DeleteFile(child_path);
}
}
}
......@@ -85,20 +55,16 @@ Status FileSystem::DeleteRecursively(const std::string& dirname) {
for (const std::string& dir : dir_list) {
// Delete dir might fail because of permissions issues or might be
// unimplemented.
Status s = DeleteDir(dir);
TryUpdateStatus(&ret, s);
FS_CHECK_OK(s);
DeleteDir(dir);
}
return ret;
}
Status FileSystem::RecursivelyCreateDir(const std::string& dirname) {
void FileSystem::RecursivelyCreateDir(const std::string& dirname) {
std::string remaining_dir = dirname;
std::vector<std::string> sub_dirs;
while (!remaining_dir.empty()) {
Status status = FileExists(remaining_dir);
if (status == Status::OK) { break; }
if (status != Status::NOT_FOUND) { return status; }
bool status = FileExists(remaining_dir);
if (status) { break; }
// Basename returns "" for / ending dirs.
if (remaining_dir[remaining_dir.length() - 1] != '/') {
sub_dirs.push_back(Basename(remaining_dir));
......@@ -113,160 +79,8 @@ Status FileSystem::RecursivelyCreateDir(const std::string& dirname) {
std::string built_path = remaining_dir;
for (const std::string& sub_dir : sub_dirs) {
built_path = JoinPath(built_path, sub_dir);
Status status = CreateDir(built_path);
if (status != Status::OK && status != Status::ALREADY_EXISTS) {
return status;
}
}
return Status::OK;
}
void TryUpdateStatus(Status* current_status, const Status& new_status) {
if (*current_status == Status::OK) { *current_status = new_status; }
}
Status ErrnoToStatus(int err_number) {
Status ret;
switch (err_number) {
case 0: ret = Status::OK; break;
case EINVAL: // Invalid argument
case ENAMETOOLONG: // Filename too long
case E2BIG: // Argument list too long
case EDESTADDRREQ: // Destination address required
case EDOM: // Mathematics argument out of domain of function
case EFAULT: // Bad address
case EILSEQ: // Illegal byte sequence
case ENOPROTOOPT: // Protocol not available
case ENOSTR: // Not a STREAM
case ENOTSOCK: // Not a socket
case ENOTTY: // Inappropriate I/O control operation
case EPROTOTYPE: // Protocol wrong type for socket
case ESPIPE: // Invalid seek
ret = Status::INVALID_ARGUMENT;
break;
case ETIMEDOUT: // Connection timed out
case ETIME: // Timer expired
ret = Status::DEADLINE_EXCEEDED;
break;
case ENODEV: // No such device
case ENOENT: // No such file or directory
case ENXIO: // No such device or address
case ESRCH: // No such process
ret = Status::NOT_FOUND;
break;
case EEXIST: // File exists
case EADDRNOTAVAIL: // Address not available
case EALREADY: // Connection already in progress
ret = Status::ALREADY_EXISTS;
break;
case EPERM: // Operation not permitted
case EACCES: // Permission denied
case EROFS: // Read only file system
ret = Status::PERMISSION_DENIED;
break;
case ENOTEMPTY: // Directory not empty
case EISDIR: // Is a directory
case ENOTDIR: // Not a directory
case EADDRINUSE: // Address already in use
case EBADF: // Invalid file descriptor
case EBUSY: // Device or resource busy
case ECHILD: // No child processes
case EISCONN: // Socket is connected
#if !defined(_WIN32)
case ENOTBLK: // Block device required
#endif
case ENOTCONN: // The socket is not connected
case EPIPE: // Broken pipe
#if !defined(_WIN32)
case ESHUTDOWN: // Cannot send after transport endpoint shutdown
#endif
case ETXTBSY: // Text file busy
ret = Status::FAILED_PRECONDITION;
break;
case ENOSPC: // No space left on device
#if !defined(_WIN32)
case EDQUOT: // Disk quota exceeded
#endif
case EMFILE: // Too many open files
case EMLINK: // Too many links
case ENFILE: // Too many open files in system
case ENOBUFS: // No buffer space available
case ENODATA: // No message is available on the STREAM read queue
case ENOMEM: // Not enough space
case ENOSR: // No STREAM resources
#if !defined(_WIN32)
case EUSERS: // Too many users
#endif
ret = Status::RESOURCE_EXHAUSTED;
break;
case EFBIG: // File too large
case EOVERFLOW: // Value too large to be stored in data type
case ERANGE: // Result too large
ret = Status::OUT_OF_RANGE;
break;
case ENOSYS: // Function not implemented
case ENOTSUP: // Operation not supported
case EAFNOSUPPORT: // Address family not supported
#if !defined(_WIN32)
case EPFNOSUPPORT: // Protocol family not supported
#endif
case EPROTONOSUPPORT: // Protocol not supported
#if !defined(_WIN32)
case ESOCKTNOSUPPORT: // Socket type not supported
#endif
case EXDEV: // Improper link
ret = Status::UNIMPLEMENTED;
break;
case EAGAIN: // Resource temporarily unavailable
case ECONNREFUSED: // Connection refused
case ECONNABORTED: // Connection aborted
case ECONNRESET: // Connection reset
case EINTR: // Interrupted function call
#if !defined(_WIN32)
case EHOSTDOWN: // Host is down
#endif
case EHOSTUNREACH: // Host is unreachable
case ENETDOWN: // Network is down
case ENETRESET: // Connection aborted by network
case ENETUNREACH: // Network unreachable
case ENOLCK: // No locks available
case ENOLINK: // Link has been severed
#if !defined(_WIN32)
case ENONET: // Machine is not on the network
#endif
ret = Status::UNAVAILABLE;
break;
case EDEADLK: // Resource deadlock avoided
#if !defined(_WIN32)
case ESTALE: // Stale file handle
#endif
ret = Status::ABORTED;
break;
case ECANCELED: // Operation cancelled
ret = Status::CANCELLED;
break;
// NOTE: If you get any of the following (especially in a
// reproducible way) and can propose a better mapping,
// please email the owners about updating this mapping.
case EBADMSG: // Bad message
case EIDRM: // Identifier removed
case EINPROGRESS: // Operation in progress
case EIO: // I/O Status
case ELOOP: // Too many levels of symbolic links
case ENOEXEC: // Exec format Status
case ENOMSG: // No message of the desired type
case EPROTO: // Protocol Status
#if !defined(_WIN32)
case EREMOTE: // Object is remote
#endif
ret = Status::UNKNOWN;
break;
default: {
ret = Status::UNKNOWN;
break;
}
CreateDir(built_path);
}
return ret;
}
struct GlobalFSConstructor {
......
......@@ -12,28 +12,6 @@ namespace oneflow {
namespace fs {
enum class Status {
OK = 0,
CANCELLED,
UNKNOWN,
INVALID_ARGUMENT,
DEADLINE_EXCEEDED,
NOT_FOUND,
ALREADY_EXISTS,
PERMISSION_DENIED,
UNAUTHENTICATED,
RESOURCE_EXHAUSTED,
FAILED_PRECONDITION,
ABORTED,
OUT_OF_RANGE,
UNIMPLEMENTED,
INTERNAL,
UNAVAILABLE,
DATA_LOSS,
};
OF_DECLARE_ENUM_TO_OSTREAM_FUNC(Status);
// A file abstraction for randomly reading the contents of a file.
class RandomAccessFile {
public:
......@@ -41,19 +19,11 @@ class RandomAccessFile {
RandomAccessFile() = default;
virtual ~RandomAccessFile() = default;
// Reads up to `n` bytes from the file starting at `offset`.
//
// Sets `*result` to the data that was read (including if fewer
// than `n` bytes were successfully read).
//
// On OK returned status: `n` bytes have been stored in `*result`.
// On non-OK returned status: `[0..n]` bytes have been stored in `*result`.
//
// Returns `OUT_OF_RANGE` if fewer than n bytes were stored in `*result`
// because of EOF.
// Reads `n` bytes from the file starting at `offset`.
// Sets `*result` to the data that was read.
//
// Safe for concurrent use by multiple threads.
virtual Status Read(uint64_t offset, size_t n, char* result) const = 0;
virtual void Read(uint64_t offset, size_t n, char* result) const = 0;
private:
};
......@@ -69,16 +39,12 @@ class WritableFile {
virtual ~WritableFile() = default;
// Append 'data' to the file.
virtual Status Append(const char* data, size_t n) = 0;
virtual void Append(const char* data, size_t n) = 0;
// Close the file.
//
// Flush() and de-allocate resources associated with this file
//
// Typical return codes (not guaranteed to be exhaustive):
// * OK
// * Other codes, as returned from Flush()
virtual Status Close() = 0;
virtual void Close() = 0;
// Flushes the file and optionally syncs contents to filesystem.
//
......@@ -90,7 +56,7 @@ class WritableFile {
// eventually flush the contents. If the OS or machine crashes
// after a successful flush, the contents may or may not be
// persisted, depending on the implementation.
virtual Status Flush() = 0;
virtual void Flush() = 0;
private:
};
......@@ -103,15 +69,13 @@ class FileSystem {
// specified name.
//
// On success, stores a pointer to the new file in
// *result and returns OK. On failure stores NULL in *result and
// returns non-OK. If the file does not exist, returns a non-OK
// status.
// *result. On failure stores NULL in *result.
//
// The returned file may be concurrently accessed by multiple threads.
//
// The ownership of the returned RandomAccessFile is passed to the caller
// and the object should be deleted when is not used.
virtual Status NewRandomAccessFile(
virtual void NewRandomAccessFile(
const std::string& fname, std::unique_ptr<RandomAccessFile>* result) = 0;
// Creates an object that writes to a new file with the specified
......@@ -119,89 +83,66 @@ class FileSystem {
//
// Deletes any existing file with the same name and creates a
// new file. On success, stores a pointer to the new file in
// *result and returns OK. On failure stores NULL in *result and
// returns non-OK.
// *result. On failure stores NULL in *result.
//
// The returned file will only be accessed by one thread at a time.
//
// The ownership of the returned WritableFile is passed to the caller
// and the object should be deleted when is not used.
virtual Status NewWritableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result) = 0;
virtual void NewWritableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result) = 0;
// Creates an object that either appends to an existing file, or
// writes to a new file (if the file does not exist to begin with).
//
// On success, stores a pointer to the new file in *result and
// returns OK. On failure stores NULL in *result and returns
// non-OK.
// On success, stores a pointer to the new file in *result.
// On failure stores NULL in *result.
//
// The returned file will only be accessed by one thread at a time.
//
// The ownership of the returned WritableFile is passed to the caller
// and the object should be deleted when is not used.
virtual Status NewAppendableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result) = 0;
// Returns OK if the named path exists and NOT_FOUND otherwise.
virtual Status FileExists(const std::string& fname) = 0;
virtual void NewAppendableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result) = 0;
// Returns true if all the listed files exist, false otherwise.
// if status is not null, populate the vector with a detailed status
// for each file.
virtual bool FilesExist(const std::vector<std::string>& files,
std::vector<Status>* status);
// Returns true if the named path exists and false otherwise.
virtual bool FileExists(const std::string& fname) = 0;
// Returns the immediate children in the given directory.
// Store the immediate children in the `dir` in `result`.
//
// The returned paths are relative to 'dir'.
virtual Status GetChildren(const std::string& dir,
std::vector<std::string>* result) = 0;
virtual std::vector<std::string> ListDir(const std::string& dir) = 0;
// Deletes the named file.
virtual Status DeleteFile(const std::string& fname) = 0;
virtual void DeleteFile(const std::string& fname) = 0;
// Creates the specified directory.
// Typical return codes:
// * OK - successfully created the directory.
// * ALREADY_EXISTS - directory with name dirname already exists.
// * PERMISSION_DENIED - dirname is not writable.
virtual Status CreateDir(const std::string& dirname) = 0;
virtual void CreateDir(const std::string& dirname) = 0;
void CreateDirIfNotExist(const std::string& dirname);
bool IsDirEmpty(const std::string& dirname);
size_t GetChildrenNumOfDir(const std::string& dirname);
// Creates the specified directory and all the necessary
// subdirectories.
// Typical return codes:
// * OK - successfully created the directory and sub directories, even if
// they were already created.
// * PERMISSION_DENIED - dirname or some subdirectory is not writable.
virtual Status RecursivelyCreateDir(const std::string& dirname);
virtual void RecursivelyCreateDir(const std::string& dirname);
// Deletes the specified directory.
virtual Status DeleteDir(const std::string& dirname) = 0;
virtual void DeleteDir(const std::string& dirname) = 0;
// Deletes the specified directory and all subdirectories and files
// underneath it. undeleted_files and undeleted_dirs stores the number of
// files and directories that weren't deleted (unspecified if the return
// status is not OK).
// files and directories that weren't deleted.
//
// REQUIRES: undeleted_files, undeleted_dirs to be not null.
// Typical return codes:
// * OK - dirname exists and we were able to delete everything underneath.
// * NOT_FOUND - dirname doesn't exist
// * PERMISSION_DENIED - dirname or some descendant is not writable
// * UNIMPLEMENTED - Some underlying functions (like Delete) are not
// implemented
virtual Status DeleteRecursively(const std::string& dirname);
virtual void RecursivelyDeleteDir(const std::string& dirname);
// Stores the size of `fname` in `*file_size`.
virtual Status GetFileSize(const std::string& fname, uint64_t* file_size) = 0;
virtual uint64_t GetFileSize(const std::string& fname) = 0;
// Overwrites the target if it exists.
virtual Status RenameFile(const std::string& src,
const std::string& target) = 0;
virtual void RenameFile(const std::string& old_name,
const std::string& new_name) = 0;
// Translate an URI to a filename for the FileSystem implementation.
//
......@@ -211,39 +152,14 @@ class FileSystem {
virtual std::string TranslateName(const std::string& name) const;
// Returns whether the given path is a directory or not.
//
// Typical return codes (not guaranteed exhaustive):
// * OK - The path exists and is a directory.
// * FAILED_PRECONDITION - The path exists and is not a directory.
// * NOT_FOUND - The path entry does not exist.
// * PERMISSION_DENIED - Insufficient permissions.
// * UNIMPLEMENTED - The file factory doesn't support directories.
virtual Status IsDirectory(const std::string& fname) = 0;
virtual bool IsDirectory(const std::string& fname) = 0;
protected:
FileSystem() = default;
};
// If `current_status` is OK, stores `new_status` into `current_status`.
// If `current_status` is NOT OK, preserves the current status,
void TryUpdateStatus(Status* current_status, const Status& new_status);
Status ErrnoToStatus(int err_number);
#define FS_RETURN_IF_ERR(val) \
{ \
const Status _ret_if_err = val; \
if (_ret_if_err != Status::OK) { \
PLOG(WARNING); \
return _ret_if_err; \
} \
}
} // namespace fs
// file system check status is ok
#define FS_CHECK_OK(val) CHECK_EQ(val, fs::Status::OK);
fs::FileSystem* LocalFS();
fs::FileSystem* GlobalFS();
......
......@@ -26,7 +26,7 @@ class LibHDFS {
}
// The status, if any, from failure to load.
Status status() { return status_; }
bool status() { return status_; }
std::function<hdfsFS(hdfsBuilder*)> hdfsBuilderConnect;
std::function<hdfsBuilder*()> hdfsNewBuilder;
......@@ -51,7 +51,7 @@ class LibHDFS {
private:
void LoadAndBind();
Status status_;
bool status_;
void* handle_ = nullptr;
};
......@@ -63,35 +63,34 @@ class HadoopFileSystem final : public FileSystem {
HadoopFileSystem(const HdfsConf&);
Status NewRandomAccessFile(
const std::string& fname,
std::unique_ptr<RandomAccessFile>* result) override;
void NewRandomAccessFile(const std::string& fname,
std::unique_ptr<RandomAccessFile>* result) override;
Status NewWritableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result) override;
void NewWritableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result) override;
Status NewAppendableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result) override;
void NewAppendableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result) override;
Status FileExists(const std::string& fname) override;
bool FileExists(const std::string& fname) override;
Status GetChildren(const std::string& dir,
std::vector<std::string>* result) override;
std::vector<std::string> ListDir(const std::string& dir) override;
Status DeleteFile(const std::string& fname) override;
void DeleteFile(const std::string& fname) override;
Status CreateDir(const std::string& dirname) override;
void CreateDir(const std::string& dirname) override;
Status DeleteDir(const std::string& dirname) override;
void DeleteDir(const std::string& dirname) override;
Status GetFileSize(const std::string& fname, uint64_t* file_size) override;
uint64_t GetFileSize(const std::string& fname) override;
Status RenameFile(const std::string& src, const std::string& target) override;
void RenameFile(const std::string& old_name,
const std::string& new_name) override;
Status IsDirectory(const std::string& fname) override;
bool IsDirectory(const std::string& fname) override;
private:
Status Connect(hdfsFS* fs);
bool Connect(hdfsFS* fs);
std::string namenode_;
LibHDFS* hdfs_;
};
......
......@@ -37,8 +37,8 @@ int32_t PersistentInStream::Read(char* s, size_t n) {
PersistentInStream::PersistentInStream(fs::FileSystem* fs,
const std::string& file_path,
uint64_t offset) {
FS_CHECK_OK(fs->NewRandomAccessFile(file_path, &file_));
FS_CHECK_OK(fs->GetFileSize(file_path, &file_size_));
fs->NewRandomAccessFile(file_path, &file_);
file_size_ = fs->GetFileSize(file_path);
cur_file_pos_ = offset;
buffer_ = new char[buffer_size_ + 1];
cur_buf_begin_ = buffer_;
......
......@@ -4,13 +4,13 @@ namespace oneflow {
PersistentOutStream::PersistentOutStream(fs::FileSystem* fs,
const std::string& file_path) {
FS_CHECK_OK(fs->NewWritableFile(file_path, &file_));
fs->NewWritableFile(file_path, &file_);
}
PersistentOutStream::~PersistentOutStream() { FS_CHECK_OK(file_->Close()); }
PersistentOutStream::~PersistentOutStream() { file_->Close(); }
PersistentOutStream& PersistentOutStream::Write(const char* s, size_t n) {
FS_CHECK_OK(file_->Append(s, n));
file_->Append(s, n);
return *this;
}
......
......@@ -27,7 +27,7 @@ class PosixRandomAccessFile : public RandomAccessFile {
: fname_(fname), fd_(fd) {}
~PosixRandomAccessFile() override { close(fd_); }
Status Read(uint64_t offset, size_t n, char* result) const override {
void Read(uint64_t offset, size_t n, char* result) const override {
char* dst = result;
while (n > 0) {
ssize_t r = pread(fd_, dst, n, static_cast<off_t>(offset));
......@@ -36,14 +36,15 @@ class PosixRandomAccessFile : public RandomAccessFile {
n -= r;
offset += r;
} else if (r == 0) {
return Status::OUT_OF_RANGE;
PLOG(FATAL) << "Read EOF";
return;
} else if (errno == EINTR || errno == EAGAIN) {
// Retry
} else {
return ErrnoToStatus(errno);
PLOG(FATAL) << "Fail to read file " << fname_;
return;
}
}
return Status::OK;
}
};
......@@ -60,134 +61,126 @@ class PosixWritableFile : public WritableFile {
if (file_ != nullptr) { fclose(file_); }
}
Status Append(const char* data, size_t n) override {
size_t r = fwrite(data, sizeof(char), n, file_);
if (r != n) { return ErrnoToStatus(errno); }
return Status::OK;
void Append(const char* data, size_t n) override {
if (fwrite(data, sizeof(char), n, file_) != n) {
PLOG(FATAL) << "Fail to append to file " << fname_;
}
}
Status Close() override {
Status s = Status::OK;
if (fclose(file_) != 0) { s = ErrnoToStatus(errno); }
void Close() override {
Flush();
if (fclose(file_) != 0) { PLOG(FATAL) << "Fail to close file " << fname_; }
file_ = nullptr;
return s;
}
Status Flush() override {
if (fflush(file_) != 0) { return ErrnoToStatus(errno); }
return Status::OK;
void Flush() override {
if (fflush(file_) != 0) { PLOG(FATAL) << "Fail to flush file " << fname_; }
}
};
Status PosixFileSystem::NewRandomAccessFile(
void PosixFileSystem::NewRandomAccessFile(
const std::string& fname, std::unique_ptr<RandomAccessFile>* result) {
std::string translated_fname = TranslateName(fname);
Status s = Status::OK;
int fd = open(translated_fname.c_str(), O_RDONLY);
if (fd < 0) {
s = ErrnoToStatus(errno);
PLOG(FATAL) << "Fail to open file " << fname;
} else {
result->reset(new PosixRandomAccessFile(fname, fd));
}
return s;
CHECK_NOTNULL(result->get());
}
Status PosixFileSystem::NewWritableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result) {
void PosixFileSystem::NewWritableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result) {
std::string translated_fname = TranslateName(fname);
Status s = Status::OK;
FILE* f = fopen(translated_fname.c_str(), "w");
if (f == nullptr) {
s = ErrnoToStatus(errno);
PLOG(FATAL) << "Fail to open file " << fname;
} else {
result->reset(new PosixWritableFile(translated_fname, f));
}
return s;
CHECK_NOTNULL(result->get());
}
Status PosixFileSystem::NewAppendableFile(
const std::string& fname, std::unique_ptr<WritableFile>* result) {
void PosixFileSystem::NewAppendableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result) {
std::string translated_name = TranslateName(fname);
Status s = Status::OK;
FILE* f = fopen(translated_name.c_str(), "a");
if (f == nullptr) {
s = ErrnoToStatus(errno);
PLOG(FATAL) << "Fail to open file " << fname;
} else {
result->reset(new PosixWritableFile(translated_name, f));
}
return s;
CHECK_NOTNULL(result->get());
}
Status PosixFileSystem::FileExists(const std::string& fname) {
if (access(TranslateName(fname).c_str(), F_OK) == 0) { return Status::OK; }
return Status::NOT_FOUND;
bool PosixFileSystem::FileExists(const std::string& fname) {
if (access(TranslateName(fname).c_str(), F_OK) == 0) { return true; }
return false;
}
Status PosixFileSystem::GetChildren(const std::string& dir,
std::vector<std::string>* result) {
std::vector<std::string> PosixFileSystem::ListDir(const std::string& dir) {
std::string translated_dir = TranslateName(dir);
result->clear();
std::vector<std::string> result;
DIR* d = opendir(translated_dir.c_str());
if (d == nullptr) { return ErrnoToStatus(errno); }
if (d == nullptr) {
PLOG(FATAL) << "Fail to open dir " << dir;
return result;
}
struct dirent* entry;
while ((entry = readdir(d)) != nullptr) {
if (strcmp(entry->d_name, ".") == 0 || strcmp(entry->d_name, "..") == 0) {
continue;
}
result->push_back(entry->d_name);
result.push_back(entry->d_name);
}
closedir(d);
return Status::OK;
return result;
}
Status PosixFileSystem::DeleteFile(const std::string& fname) {
Status s = Status::OK;
if (unlink(TranslateName(fname).c_str()) != 0) { s = ErrnoToStatus(errno); }
return s;
void PosixFileSystem::DeleteFile(const std::string& fname) {
if (unlink(TranslateName(fname).c_str()) != 0) {
PLOG(FATAL) << "Fail to delete file " << fname;
}
}
Status PosixFileSystem::CreateDir(const std::string& dirname) {
Status s = Status::OK;
void PosixFileSystem::CreateDir(const std::string& dirname) {
if (mkdir(TranslateName(dirname).c_str(), 0755) != 0) {
s = ErrnoToStatus(errno);
PLOG(FATAL) << "Fail to create dir " << dirname;
}
return s;
}
Status PosixFileSystem::DeleteDir(const std::string& dirname) {
Status s = Status::OK;
if (rmdir(TranslateName(dirname).c_str()) != 0) { s = ErrnoToStatus(errno); }
return s;
void PosixFileSystem::DeleteDir(const std::string& dirname) {
if (rmdir(TranslateName(dirname).c_str()) != 0) {
PLOG(FATAL) << "Fail to delete dir " << dirname;
}
}
Status PosixFileSystem::GetFileSize(const std::string& fname,
uint64_t* file_size) {
Status s = Status::OK;
uint64_t PosixFileSystem::GetFileSize(const std::string& fname) {
struct stat sbuf;
if (stat(TranslateName(fname).c_str(), &sbuf) != 0) {
*file_size = 0;
s = ErrnoToStatus(errno);
PLOG(FATAL) << "Fail to load statistics of " << fname;
return 0;
} else {
*file_size = sbuf.st_size;
return sbuf.st_size;
}
return s;
}
Status PosixFileSystem::RenameFile(const std::string& src,
const std::string& target) {
Status s = Status::OK;
if (rename(TranslateName(src).c_str(), TranslateName(target).c_str()) != 0) {
s = ErrnoToStatus(errno);
void PosixFileSystem::RenameFile(const std::string& old_name,
const std::string& new_name) {
if (rename(TranslateName(old_name).c_str(), TranslateName(new_name).c_str())
!= 0) {
PLOG(FATAL) << "Fail to rename file from " << old_name << " to "
<< new_name;
}
return s;
}
Status PosixFileSystem::IsDirectory(const std::string& fname) {
bool PosixFileSystem::IsDirectory(const std::string& fname) {
struct stat sbuf;
if (stat(TranslateName(fname).c_str(), &sbuf) == 0 && S_ISDIR(sbuf.st_mode)) {
return Status::OK;
return true;
} else {
return ErrnoToStatus(errno);
return false;
}
}
......
......@@ -15,32 +15,31 @@ class PosixFileSystem final : public FileSystem {
PosixFileSystem() = default;
~PosixFileSystem() = default;
Status NewRandomAccessFile(
const std::string& fname,
std::unique_ptr<RandomAccessFile>* result) override;
void NewRandomAccessFile(const std::string& fname,
std::unique_ptr<RandomAccessFile>* result) override;
Status NewWritableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result) override;
void NewWritableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result) override;
Status NewAppendableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result) override;
void NewAppendableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result) override;
Status FileExists(const std::string& fname) override;
bool FileExists(const std::string& fname) override;
Status GetChildren(const std::string& dir,
std::vector<std::string>* result) override;
std::vector<std::string> ListDir(const std::string& dir) override;
Status DeleteFile(const std::string& fname) override;
void DeleteFile(const std::string& fname) override;
Status CreateDir(const std::string& dirname) override;
void CreateDir(const std::string& dirname) override;
Status DeleteDir(const std::string& dirname) override;
void DeleteDir(const std::string& dirname) override;
Status GetFileSize(const std::string& fname, uint64_t* file_size) override;
uint64_t GetFileSize(const std::string& fname) override;
Status RenameFile(const std::string& src, const std::string& target) override;
void RenameFile(const std::string& old_name,
const std::string& new_name) override;
Status IsDirectory(const std::string& fname) override;
bool IsDirectory(const std::string& fname) override;
private:
};
......
......@@ -7,7 +7,7 @@
namespace oneflow {
Snapshot::Snapshot(const std::string& snapshot_root_path) {
FS_CHECK_OK(GlobalFS()->IsDirectory(snapshot_root_path));
CHECK(GlobalFS()->IsDirectory(snapshot_root_path));
root_path_ = snapshot_root_path;
}
......@@ -23,11 +23,10 @@ std::unique_ptr<PersistentOutStream> Snapshot::GetOutStream(
const std::string& bn_in_op = parsed_lbn.second;
// op_name_dir
std::string op_name_dir = JoinPath(root_path_, op_name);
OF_ONCE_GUARD(op_name_dir, FS_CHECK_OK(GlobalFS()->CreateDir(op_name_dir)));
OF_ONCE_GUARD(op_name_dir, GlobalFS()->CreateDir(op_name_dir));
// bn_in_op_tmp_dir
std::string bn_in_op_tmp_dir = JoinPath(op_name_dir, bn_in_op + "_tmp");
OF_ONCE_GUARD(bn_in_op_tmp_dir,
FS_CHECK_OK(GlobalFS()->CreateDir(bn_in_op_tmp_dir)));
OF_ONCE_GUARD(bn_in_op_tmp_dir, GlobalFS()->CreateDir(bn_in_op_tmp_dir));
// part_file
std::string part_file =
JoinPath(bn_in_op_tmp_dir, "part_" + std::to_string(part_id));
......@@ -37,14 +36,13 @@ std::unique_ptr<PersistentOutStream> Snapshot::GetOutStream(
void Snapshot::OnePartDone(const std::string& lbn, int32_t part_id,
int32_t part_num) {
std::string done_dir = JoinPath(root_path_, lbn + "_done");
OF_ONCE_GUARD(done_dir, FS_CHECK_OK(GlobalFS()->CreateDir(done_dir)));
OF_ONCE_GUARD(done_dir, GlobalFS()->CreateDir(done_dir));
std::string done_file_path = JoinPath(done_dir, std::to_string(part_id));
CHECK_EQ(GlobalFS()->FileExists(done_file_path), fs::Status::NOT_FOUND);
CHECK_EQ(GlobalFS()->FileExists(done_file_path), false);
{ PersistentOutStream out_stream(GlobalFS(), done_file_path); }
if (GlobalFS()->GetChildrenNumOfDir(done_dir) == part_num) {
if (GlobalFS()->ListDir(done_dir).size() == part_num) {
std::string concat_file = JoinPath(root_path_, lbn);
OF_ONCE_GUARD(concat_file,
FS_CHECK_OK(GlobalFS()->DeleteRecursively(done_dir));
OF_ONCE_GUARD(concat_file, GlobalFS()->RecursivelyDeleteDir(done_dir);
ConcatLbnFile(lbn, part_num, concat_file));
}
}
......@@ -63,31 +61,29 @@ void Snapshot::ConcatLbnFile(const std::string& lbn, int32_t part_num,
std::unique_ptr<fs::RandomAccessFile> part_file;
std::string part_file_path =
JoinPath(part_dir, "part_" + std::to_string(i));
FS_CHECK_OK(GlobalFS()->NewRandomAccessFile(part_file_path, &part_file));
uint64_t part_file_size = 0;
FS_CHECK_OK(GlobalFS()->GetFileSize(part_file_path, &part_file_size));
GlobalFS()->NewRandomAccessFile(part_file_path, &part_file);
uint64_t part_file_size = GlobalFS()->GetFileSize(part_file_path);
uint64_t offset = 0;
while (offset < part_file_size) {
uint64_t n = std::min(buffer_size, part_file_size - offset);
FS_CHECK_OK(part_file->Read(offset, n, buffer));
part_file->Read(offset, n, buffer);
out_stream.Write(buffer, n);
offset += n;
}
FS_CHECK_OK(GlobalFS()->DeleteFile(part_file_path));
GlobalFS()->DeleteFile(part_file_path);
}
}
FS_CHECK_OK(GlobalFS()->DeleteDir(part_dir));
GlobalFS()->DeleteDir(part_dir);
std::string done_dir = JoinPath(root_path_, "snapshot_done_tmp");
OF_ONCE_GUARD(done_dir, FS_CHECK_OK(GlobalFS()->CreateDir(done_dir)));
OF_ONCE_GUARD(done_dir, GlobalFS()->CreateDir(done_dir));
{
PersistentOutStream out_stream(
GlobalFS(), JoinPath(done_dir, op_name + "_" + bn_in_op));
}
if (GlobalFS()->GetChildrenNumOfDir(done_dir)
if (GlobalFS()->ListDir(done_dir).size()
== SnapshotMgr::Singleton()->num_of_model_blobs()) {
std::string done_file = JoinPath(root_path_, "snapshot_done");
OF_ONCE_GUARD(done_file,
FS_CHECK_OK(GlobalFS()->DeleteRecursively(done_dir));
OF_ONCE_GUARD(done_file, GlobalFS()->RecursivelyDeleteDir(done_dir);
{ PersistentOutStream out_stream(GlobalFS(), done_file); });
}
}
......
......@@ -19,32 +19,31 @@ class WindowsFileSystem final : public FileSystem {
WindowsFileSystem() = default;
~WindowsFileSystem() = default;
Status NewRandomAccessFile(
void NewRandomAccessFile(
const std::string& fname,
std::unique_ptr<RandomAccessFile>* result) override;
Status NewWritableFile(const std::string& fname,
void NewWritableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result) override;
Status NewAppendableFile(const std::string& fname,
void NewAppendableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result) override;
Status FileExists(const std::string& fname) override;
bool FileExists(const std::string& fname) override;
Status GetChildren(const std::string& dir,
std::vector<std::string>* result) override;
std::vector<std::string> ListDir(const std::string& dir) override;
Status DeleteFile(const std::string& fname) override;
void DeleteFile(const std::string& fname) override;
Status CreateDir(const std::string& dirname) override;
void CreateDir(const std::string& dirname) override;
Status DeleteDir(const std::string& dirname) override;
void DeleteDir(const std::string& dirname) override;
Status GetFileSize(const std::string& fname, uint64_t* file_size) override;
uint64_t GetFileSize(const std::string& fname) override;
Status RenameFile(const std::string& src, const std::string& target) override;
void RenameFile(const std::string& old_name, const std::string& new_name) override;
Status IsDirectory(const std::string& fname) override;
bool IsDirectory(const std::string& fname) override;
static std::wstring Utf8ToWideChar(const std::string& utf8str) {
int size_required = MultiByteToWideChar(CP_UTF8, 0, utf8str.c_str(),
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册