Commit d58ebdc7 authored by willzhang4a58

remove tensorflow in file system

Parent: 427488b0
@@ -255,7 +255,7 @@ void Graph<NodeType, EdgeType>::ToDotWithFilePath(
   if (env->IsDirectory(dir_name).code() != tensorflow::error::OK) {
     TF_CHECK_OK(env->RecursivelyCreateDir(dir_name));
   }
-  PersistentOutStream out_stream(fs::GetLocalFS(), file_path);
+  PersistentOutStream out_stream(LocalFS(), file_path);
   ToDotWithStream(out_stream);
 }
......
@@ -230,7 +230,7 @@ void Compiler::GenPlanFile(const std::string& plan_filepath) {
 void Compiler::Plan2DotFile(const Plan& plan) {
   const std::string file_path = LogDir() + "/dot/plan.dot";
-  PersistentOutStream out_stream(fs::GetLocalFS(), file_path);
+  PersistentOutStream out_stream(LocalFS(), file_path);
   out_stream << "digraph {\n";
   HashSet<int64_t> regst_desc_ids;
   for (const TaskProto& task_proto : plan.task()) {
......
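The two hunks above are pure call-site updates: the local file-system accessor moves out of namespace `fs` and is renamed from `fs::GetLocalFS()` to `LocalFS()`. A self-contained sketch of the resulting pattern follows; `FakeLocalFileSystem`, `AppendToFile`, and the stream body are hypothetical stand-ins, since this diff only shows that `PersistentOutStream` takes the target file system as its first constructor argument.

```cpp
#include <cstdio>
#include <string>

namespace oneflow {
namespace fs {
// Minimal stand-in for the abstract interface in
// oneflow/core/persistence/file_system.h.
class FileSystem {
 public:
  virtual ~FileSystem() = default;
  virtual void AppendToFile(const std::string& path,
                            const std::string& data) = 0;
};

class FakeLocalFileSystem final : public FileSystem {
 public:
  void AppendToFile(const std::string& path,
                    const std::string& data) override {
    std::printf("append %zu bytes to %s\n", data.size(), path.c_str());
  }
};
}  // namespace fs

// After this commit, LocalFS() is a free function at oneflow scope
// (previously fs::GetLocalFS() inside namespace fs).
fs::FileSystem* LocalFS() {
  static fs::FileSystem* local_fs = new fs::FakeLocalFileSystem;
  return local_fs;
}

// Stand-in for PersistentOutStream: the point is that the target file
// system is injected through the constructor rather than being implicit.
class PersistentOutStream {
 public:
  PersistentOutStream(fs::FileSystem* fs, const std::string& file_path)
      : fs_(fs), file_path_(file_path) {}
  PersistentOutStream& operator<<(const std::string& s) {
    fs_->AppendToFile(file_path_, s);
    return *this;
  }

 private:
  fs::FileSystem* fs_;
  std::string file_path_;
};
}  // namespace oneflow

int main() {
  oneflow::PersistentOutStream out(oneflow::LocalFS(), "plan.dot");
  out << "digraph {\n" << "}\n";
  return 0;
}
```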
@@ -27,18 +27,4 @@ void JobDesc::ToProto(JobDescProto* proto) const {
   *(proto->mutable_placement()) = placement_;
 }
-fs::FileSystem* JobDesc::GetGlobalFS() const {
-  const GlobalFSConf& gfs_conf = job_conf_.global_fs_conf();
-  if (gfs_conf.has_localfs_conf()) {
-    CHECK_EQ(resource_.machine().size(), 1);
-    return fs::GetLocalFS();
-  } else if (gfs_conf.has_hdfs_conf()) {
-    // static fs::FileSystem* fs = new
-    // fs::HadoopFileSystem(gfs_conf.hdfs_conf()); return fs;
-  } else {
-    UNEXPECTED_RUN();
-  }
-  return nullptr;
-}
 
 }  // namespace oneflow
@@ -58,7 +58,6 @@ class JobDesc final {
   size_t SizeOfOneDataId() const {
     return job_conf_.max_data_id_length() * sizeof(char);
   }
-  fs::FileSystem* GetGlobalFS() const;
 
  private:
   JobDesc() = default;
......
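With the method gone from `JobDesc`, global file-system selection becomes a free function defined in file_system.cpp (next hunks), and every call site sheds the singleton hop. A fragment excerpted from the `Snapshot::GetOutStream` hunk further down, not a standalone program:

```cpp
// Before this commit, callers went through the job-config singleton:
//   PersistentOutStream out_stream(JobDesc::Singleton()->GetGlobalFS(),
//                                  file_path);
// After, the free-function accessor is used directly:
PersistentOutStream out_stream(GlobalFS(), file_path);
```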
#include "oneflow/core/persistence/file_system.h"
#include <errno.h>
#include "oneflow/core/common/str_util.h"
#include "oneflow/core/job/job_desc.h"
#include "oneflow/core/persistence/posix/posix_file_system.h"
#include "oneflow/core/persistence/windows/windows_file_system.h"
@@ -278,15 +279,37 @@ Status ErrnoToStatus(int err_number) {
   return ret;
 }
 
-FileSystem* GetLocalFS() {
+struct GlobalFSConstructor {
+  GlobalFSConstructor() {
+    const GlobalFSConf& gfs_conf =
+        JobDesc::Singleton()->job_conf().global_fs_conf();
+    if (gfs_conf.has_localfs_conf()) {
+      CHECK_EQ(JobDesc::Singleton()->resource().machine().size(), 1);
+      gfs = LocalFS();
+    } else if (gfs_conf.has_hdfs_conf()) {
+      // static fs::FileSystem* fs = new
+      // fs::HadoopFileSystem(gfs_conf.hdfs_conf()); return fs;
+    } else {
+      UNEXPECTED_RUN();
+    }
+  }
+  FileSystem* gfs;
+};
+
+}  // namespace fs
+
+fs::FileSystem* LocalFS() {
 #ifdef PLATFORM_POSIX
-  static FileSystem* fs = new PosixFileSystem;
+  static fs::FileSystem* fs = new fs::PosixFileSystem;
 #elif PLATFORM_WINDOWS
-  static FileSystem* fs = new WindowsFileSystem;
+  static fs::FileSystem* fs = new fs::WindowsFileSystem;
 #endif
   return fs;
 }
 
-}  // namespace fs
+fs::FileSystem* GlobalFS() {
+  static fs::GlobalFSConstructor gfs_constructor;
+  return gfs_constructor.gfs;
+}
 
 }  // namespace oneflow
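Both accessors rely on function-local statics: `LocalFS()` builds the platform file system once, and `GlobalFS()` wraps the config-dependent choice in a `GlobalFSConstructor` whose constructor body runs exactly once, on first call (thread-safe since C++11). A minimal, self-contained sketch of the pattern, with `ConfiguredForLocalFS()` standing in for the `JobDesc::Singleton()->job_conf().global_fs_conf()` check:

```cpp
#include <cassert>
#include <cstdio>

namespace {
struct FileSystem {};  // stand-in for fs::FileSystem

FileSystem* LocalFS() {
  static FileSystem* fs = new FileSystem;  // initialized on first use
  return fs;
}

// Hypothetical stand-in for the job-config query in the commit.
bool ConfiguredForLocalFS() { return true; }

struct GlobalFSConstructor {
  GlobalFSConstructor() {
    if (ConfiguredForLocalFS()) {
      gfs = LocalFS();
    } else {
      gfs = nullptr;  // the HDFS branch is still commented out upstream
    }
    std::printf("global file system selected exactly once\n");
  }
  FileSystem* gfs;
};
}  // namespace

FileSystem* GlobalFS() {
  static GlobalFSConstructor ctor;  // constructor body runs exactly once
  return ctor.gfs;
}

int main() {
  assert(GlobalFS() == GlobalFS());  // same instance on every call
  return 0;
}
```

One observation on the real hunk: the `has_hdfs_conf()` branch leaves `gfs` unset (the `HadoopFileSystem` line is still commented out), so selecting HDFS would hand back an indeterminate pointer; the sketch nulls it explicitly instead.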
@@ -230,10 +230,11 @@ void TryUpdateStatus(Status* current_status, const Status& new_status);
 Status ErrnoToStatus(int err_number);
-FileSystem* GetLocalFS();
 
 }  // namespace fs
 
+fs::FileSystem* LocalFS();
+fs::FileSystem* GlobalFS();
+
 #define FS_CHECK_OK(val) CHECK_EQ(val, fs::Status::OK);
 
 }  // namespace oneflow
......
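The header now exposes both accessors at `oneflow` scope plus the `FS_CHECK_OK` convenience macro, which replaces `TF_CHECK_OK` throughout the rest of this commit. A self-contained sketch of how the macro reads at a call site; the `CHECK_EQ` below approximates the glog-style macro the real code uses, `CreateDir` is a dummy, and `Status` is written as an `enum class` here while the real `fs::Status` may be a plain enum:

```cpp
#include <cstdio>
#include <cstdlib>

namespace oneflow {
namespace fs {
enum class Status { OK, NOT_FOUND };
}  // namespace fs

// Simplified stand-in for the glog-style CHECK_EQ.
#define CHECK_EQ(a, b)                        \
  do {                                        \
    if ((a) != (b)) {                         \
      std::fprintf(stderr, "check failed\n"); \
      std::abort();                           \
    }                                         \
  } while (0)

// As in the header above. (The real #define carries a trailing ';',
// which merely yields a harmless empty statement at call sites.)
#define FS_CHECK_OK(val) CHECK_EQ(val, fs::Status::OK)

fs::Status CreateDir(const char* /*path*/) { return fs::Status::OK; }

void Demo() {
  FS_CHECK_OK(CreateDir("/tmp/snapshots"));  // aborts unless Status::OK
}
}  // namespace oneflow

int main() {
  oneflow::Demo();
  return 0;
}
```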
@@ -2,13 +2,13 @@
 namespace oneflow {
 
-PersistentInStream::PersistentInStream(const std::string& file_path,
+PersistentInStream::PersistentInStream(fs::FileSystem* fs,
+                                       const std::string& file_path,
                                        uint64_t offset) {
-  tensorflow::Env* env_ = tensorflow::Env::Default();
-  TF_CHECK_OK(env_->FileExists(file_path));
-  TF_CHECK_OK(env_->NewRandomAccessFile(file_path, &file_));
+  FS_CHECK_OK(fs->FileExists(file_path));
+  FS_CHECK_OK(fs->NewRandomAccessFile(file_path, &file_));
   offset_ = offset;
-  TF_CHECK_OK(env_->GetFileSize(file_path, &file_size_));
+  FS_CHECK_OK(fs->GetFileSize(file_path, &file_size_));
   if (offset < file_size_) {
     is_eof_ = false;
   } else {
@@ -22,14 +22,8 @@ PersistentInStream& PersistentInStream::Read(char* s, size_t n) {
     is_eof_ = true;
     offset_ += n;
     return *this;
   };
-  tensorflow::StringPiece result;
-  if (file_->Read(offset_, n, &result, s).code() == tensorflow::error::OK) {
-    CHECK(result.size() == n);
-  } else {
-    is_eof_ = true;
-  }
-  CHECK(result.data() == s);
+  FS_CHECK_OK(file_->Read(offset_, n, s));
   offset_ += n;
   return *this;
 }
......
 #ifndef ONEFLOW_CORE_PERSISTENCE_PERSISTENT_IN_STREAM_H_
 #define ONEFLOW_CORE_PERSISTENCE_PERSISTENT_IN_STREAM_H_
 
 #include "oneflow/core/common/util.h"
-#include "tensorflow/core/platform/env.h"
+#include "oneflow/core/persistence/file_system.h"
 
 namespace oneflow {
@@ -12,7 +11,8 @@ class PersistentInStream final {
   PersistentInStream() = delete;
   ~PersistentInStream() = default;
 
-  PersistentInStream(const std::string& file_path, uint64_t offset);
+  PersistentInStream(fs::FileSystem*, const std::string& file_path,
+                     uint64_t offset);
 
   template<typename T>
   PersistentInStream& operator>>(T& x) {
@@ -30,8 +30,8 @@ class PersistentInStream final {
   bool eof() const { return is_eof_; }
 
  private:
-  std::unique_ptr<tensorflow::RandomAccessFile> file_;
-  tensorflow::uint64 file_size_;
+  std::unique_ptr<fs::RandomAccessFile> file_;
+  uint64_t file_size_;
   uint64_t offset_;
   bool is_eof_;
 };
......
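Taken together, the stream now reads through whichever `fs::FileSystem` the caller injects, with `tensorflow::RandomAccessFile` and `tensorflow::uint64` replaced by `fs::RandomAccessFile` and `uint64_t`. A call-site fragment (not a standalone program; it mirrors `Snapshot::GetInStream` below, and `file_path`/`begin_pos` are assumed to be in scope):

```cpp
PersistentInStream in(GlobalFS(), file_path, /*offset=*/begin_pos);
int64_t value = 0;
in >> value;  // templated operator>> from the header above
char buf[128];
in.Read(buf, sizeof(buf));  // checked read; per the early-return branch in
                            // Read(), a read that would run past the end of
                            // the file sets eof() rather than aborting
```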
@@ -26,8 +26,7 @@ const char* Snapshot::concat_file_name_ = "all";
 const char* Snapshot::key_info_dir_name_ = "key_info";
 
 Snapshot::Snapshot(const std::string& snapshot_root_path) {
-  env_ = tensorflow::Env::Default();
-  TF_CHECK_OK(env_->IsDirectory(snapshot_root_path));
+  FS_CHECK_OK(GlobalFS()->IsDirectory(snapshot_root_path));
   root_path_ = snapshot_root_path;
   CheckAndConcat();
 }
@@ -35,13 +34,13 @@ Snapshot::Snapshot(const std::string& snapshot_root_path) {
 void Snapshot::CheckAndConcat() {
   // the children of the root path must be dirs, not files
   std::vector<std::string> sub_dir_names;
-  TF_CHECK_OK(env_->GetChildren(root_path_, &sub_dir_names));
+  FS_CHECK_OK(GlobalFS()->GetChildren(root_path_, &sub_dir_names));
   for (std::string sub_dir_name : sub_dir_names) {
     std::string sub_dir = JoinPath(root_path_, sub_dir_name);
-    TF_CHECK_OK(env_->IsDirectory(sub_dir));
+    FS_CHECK_OK(GlobalFS()->IsDirectory(sub_dir));
     // for the children of the sub_dir
     std::vector<std::string> file_names;
-    TF_CHECK_OK(env_->GetChildren(sub_dir, &file_names));
+    FS_CHECK_OK(GlobalFS()->GetChildren(sub_dir, &file_names));
     CHECK_NE(file_names.size(), 0);
     std::string concat_file_path = JoinPath(sub_dir, concat_file_name_);
     // for condition after concat
@@ -49,7 +48,7 @@ void Snapshot::CheckAndConcat() {
     // "all"
     if (file_names.size() == 1) {
       std::string file_path = JoinPath(sub_dir, file_names[0]);
-      TF_CHECK_OK(env_->FileExists(file_path));
+      FS_CHECK_OK(GlobalFS()->FileExists(file_path));
       CHECK_EQ(file_names[0], concat_file_name_);
       continue;
     }
@@ -61,9 +60,9 @@ void Snapshot::CheckAndConcat() {
     //
     // first: check key_info
     std::string key_info_dir_path = JoinPath(sub_dir, key_info_dir_name_);
-    TF_CHECK_OK(env_->IsDirectory(key_info_dir_path));
+    FS_CHECK_OK(GlobalFS()->IsDirectory(key_info_dir_path));
     std::vector<std::string> key_info_subs;
-    TF_CHECK_OK(env_->GetChildren(key_info_dir_path, &key_info_subs));
+    FS_CHECK_OK(GlobalFS()->GetChildren(key_info_dir_path, &key_info_subs));
     int32_t part_num = -1;
     for (std::string sub_file_name : key_info_subs) {
       if (sub_file_name.length() > 6 && sub_file_name.substr(0, 5) == "total") {
@@ -77,35 +76,34 @@ void Snapshot::CheckAndConcat() {
     for (size_t i = 0; i < part_num; ++i) {
       std::string done_file_path =
           JoinPath(key_info_dir_path, "done_" + std::to_string(i));
-      TF_CHECK_OK(env_->FileExists(done_file_path));
+      FS_CHECK_OK(GlobalFS()->FileExists(done_file_path));
     }
-    tensorflow::int64 undeletefiles, undeletedirs;
-    TF_CHECK_OK(env_->DeleteRecursively(key_info_dir_path, &undeletefiles,
-                                        &undeletedirs));
+    int64_t undeletefiles, undeletedirs;
+    FS_CHECK_OK(GlobalFS()->DeleteRecursively(key_info_dir_path,
+                                              &undeletefiles, &undeletedirs));
     // concat
-    std::unique_ptr<tensorflow::WritableFile> concat_file;
-    TF_CHECK_OK(env_->NewWritableFile(concat_file_path, &concat_file));
+    std::unique_ptr<fs::WritableFile> concat_file;
+    FS_CHECK_OK(GlobalFS()->NewWritableFile(concat_file_path, &concat_file));
     for (int32_t i = 0; i < part_num; ++i) {
       std::string file_path = JoinPath(sub_dir, std::to_string(i));
-      TF_CHECK_OK(env_->FileExists(file_path));
-      const tensorflow::uint64 batch_size = 64 * 1024 * 1024;
+      FS_CHECK_OK(GlobalFS()->FileExists(file_path));
+      const uint64_t batch_size = 64 * 1024 * 1024;
       char* scratch = new char[batch_size];
-      tensorflow::uint64 offset = 0;
-      std::unique_ptr<tensorflow::RandomAccessFile> file;
-      TF_CHECK_OK(env_->NewRandomAccessFile(file_path, &file));
-      tensorflow::uint64 file_size = 0;
-      TF_CHECK_OK(env_->GetFileSize(file_path, &file_size));
+      uint64_t offset = 0;
+      std::unique_ptr<fs::RandomAccessFile> file;
+      FS_CHECK_OK(GlobalFS()->NewRandomAccessFile(file_path, &file));
+      uint64_t file_size = 0;
+      FS_CHECK_OK(GlobalFS()->GetFileSize(file_path, &file_size));
       while (offset < file_size) {
-        tensorflow::StringPiece data;
-        tensorflow::uint64 n = std::min(batch_size, (file_size - offset));
-        TF_CHECK_OK(file->Read(offset, n, &data, scratch));
-        TF_CHECK_OK(concat_file->Append(data));
+        uint64_t n = std::min(batch_size, (file_size - offset));
+        FS_CHECK_OK(file->Read(offset, n, scratch));
+        FS_CHECK_OK(concat_file->Append(scratch, n));
        offset += n;
       }
-      TF_CHECK_OK(env_->DeleteFile(file_path));
+      FS_CHECK_OK(GlobalFS()->DeleteFile(file_path));
       free(scratch);
     }
-    TF_CHECK_OK(concat_file->Close());
+    FS_CHECK_OK(concat_file->Close());
   }
 }
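The concat step streams each part file into the combined `all` file in 64 MiB batches through the new `Read(offset, n, scratch)` / `Append(buf, n)` interfaces. One wrinkle the commit inherits rather than introduces: `scratch` is allocated with `new[]` but released with `free` (undefined behavior; it should be `delete[]`), and the buffer is reallocated for every part file. A self-contained sketch of the same loop with an RAII buffer hoisted out of the per-part loop; `InMemoryFS` and the helper name `CopyFileInBatches` are hypothetical, while the batch size and call shapes come from the hunk above:

```cpp
#include <algorithm>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// In-memory stand-in for the file-system calls used by the loop; the
// Read/Append shapes mirror fs::RandomAccessFile::Read and
// fs::WritableFile::Append from the diff.
struct InMemoryFS {
  std::map<std::string, std::string> files;
  uint64_t GetFileSize(const std::string& path) { return files[path].size(); }
  void Read(const std::string& path, uint64_t offset, uint64_t n, char* buf) {
    files[path].copy(buf, n, offset);
  }
  void Append(const std::string& path, const char* buf, uint64_t n) {
    files[path].append(buf, n);
  }
};

void CopyFileInBatches(InMemoryFS* fs, const std::string& src,
                       const std::string& dst) {
  const uint64_t kBatchSize = 64 * 1024 * 1024;  // 64 MiB, as in the commit
  std::vector<char> scratch(kBatchSize);         // RAII: freed automatically
  const uint64_t file_size = fs->GetFileSize(src);
  uint64_t offset = 0;
  while (offset < file_size) {
    const uint64_t n = std::min(kBatchSize, file_size - offset);
    fs->Read(src, offset, n, scratch.data());
    fs->Append(dst, scratch.data(), n);
    offset += n;
  }
}

int main() {
  InMemoryFS fs;
  fs.files["part-0"] = std::string(100, 'x');
  CopyFileInBatches(&fs, "part-0", "all");
  return fs.files["all"].size() == 100 ? 0 : 1;
}
```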
@@ -113,7 +111,8 @@ std::unique_ptr<PersistentInStream> Snapshot::GetInStream(
     const std::string& key, size_t begin_pos) const {
   std::string file_path =
       JoinPath(root_path_, MakeValidFileName(key), concat_file_name_);
-  PersistentInStream* ret = new PersistentInStream(file_path, begin_pos);
+  PersistentInStream* ret =
+      new PersistentInStream(GlobalFS(), file_path, begin_pos);
   return std::unique_ptr<PersistentInStream>(ret);
 }
@@ -122,8 +121,8 @@ std::unique_ptr<PersistentInStream> Snapshot::GetInStream(
     int64_t byte_size_of_each_dim) const {
   std::string file_path =
       JoinPath(root_path_, MakeValidFileName(key), concat_file_name_);
-  tensorflow::uint64 file_size = 0;
-  TF_CHECK_OK(env_->GetFileSize(file_path, &file_size));
+  uint64_t file_size = 0;
+  FS_CHECK_OK(GlobalFS()->GetFileSize(file_path, &file_size));
   CHECK_GT(file_size, 0);
   CHECK_EQ(file_size, dim_num * byte_size_of_each_dim);
   BalancedSplitter splitter = BalancedSplitter(dim_num, part_num);
@@ -134,25 +133,24 @@ std::unique_ptr<PersistentInStream> Snapshot::GetInStream(
 std::unique_ptr<PersistentOutStream> Snapshot::GetOutStream(
     const std::string& key, int32_t part_id, int32_t part_num) {
   std::string dir_path = JoinPath(root_path_, MakeValidFileName(key));
-  if (env_->IsDirectory(dir_path).code() == tensorflow::error::NOT_FOUND) {
-    TF_CHECK_OK(env_->CreateDir(dir_path));
+  if (GlobalFS()->IsDirectory(dir_path) == fs::Status::NOT_FOUND) {
+    FS_CHECK_OK(GlobalFS()->CreateDir(dir_path));
   }
-  TF_CHECK_OK(env_->IsDirectory(dir_path));
+  FS_CHECK_OK(GlobalFS()->IsDirectory(dir_path));
   std::string key_info_dir_path = JoinPath(dir_path, key_info_dir_name_);
-  if (env_->IsDirectory(key_info_dir_path).code()
-      == tensorflow::error::NOT_FOUND) {
-    TF_CHECK_OK(env_->CreateDir(key_info_dir_path));
+  if (GlobalFS()->IsDirectory(key_info_dir_path) == fs::Status::NOT_FOUND) {
+    FS_CHECK_OK(GlobalFS()->CreateDir(key_info_dir_path));
   }
-  TF_CHECK_OK(env_->IsDirectory(key_info_dir_path));
+  FS_CHECK_OK(GlobalFS()->IsDirectory(key_info_dir_path));
   if (part_id == 0) {
-    std::unique_ptr<tensorflow::WritableFile> part_num_file;
+    std::unique_ptr<fs::WritableFile> part_num_file;
     std::string part_num_file_path =
         JoinPath(key_info_dir_path, "total_" + std::to_string(part_num));
-    TF_CHECK_OK(env_->NewWritableFile(part_num_file_path, &part_num_file));
+    FS_CHECK_OK(
+        GlobalFS()->NewWritableFile(part_num_file_path, &part_num_file));
   }
   std::string file_path = JoinPath(dir_path, std::to_string(part_id));
-  PersistentOutStream* ret =
-      new PersistentOutStream(JobDesc::Singleton()->GetGlobalFS(), file_path);
+  PersistentOutStream* ret = new PersistentOutStream(GlobalFS(), file_path);
   return std::unique_ptr<PersistentOutStream>(ret);
 }
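GetOutStream repeats a small idiom twice: create the directory if `IsDirectory` reports `NOT_FOUND`, then re-check that the path now really is a directory, which fails fast when the path exists but is a regular file. A self-contained sketch of that idiom as a helper; the commit itself inlines the sequence, and every type below is a simplified stand-in:

```cpp
#include <cstdlib>
#include <set>
#include <string>

// Stand-ins for fs::Status / fs::FileSystem / the FS_CHECK_OK macro.
enum class Status { OK, NOT_FOUND };
#define FS_CHECK_OK(val)                       \
  do {                                         \
    if ((val) != Status::OK) std::abort();     \
  } while (0)

struct FakeFS {
  std::set<std::string> dirs;
  Status IsDirectory(const std::string& path) {
    return dirs.count(path) ? Status::OK : Status::NOT_FOUND;
  }
  Status CreateDir(const std::string& path) {
    dirs.insert(path);
    return Status::OK;
  }
};

// Hypothetical helper named here only for illustration.
void CreateDirIfMissing(FakeFS* fs, const std::string& path) {
  if (fs->IsDirectory(path) == Status::NOT_FOUND) {
    FS_CHECK_OK(fs->CreateDir(path));
  }
  FS_CHECK_OK(fs->IsDirectory(path));  // aborts if path is not a directory
}

int main() {
  FakeFS fs;
  CreateDirIfMissing(&fs, "snapshots/key_info");  // created on first call
  CreateDirIfMissing(&fs, "snapshots/key_info");  // no-op on second call
  return 0;
}
```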
@@ -160,10 +158,8 @@ void Snapshot::OnePartDone4Key(const std::string& key, const int32_t part_id) {
   std::string done_file_path =
       JoinPath(root_path_, MakeValidFileName(key), key_info_dir_name_,
                "done_" + std::to_string(part_id));
-  CHECK(env_->FileExists(done_file_path).code()
-        == tensorflow::error::NOT_FOUND);
-  PersistentOutStream out_stream(JobDesc::Singleton()->GetGlobalFS(),
-                                 done_file_path);
+  CHECK(GlobalFS()->FileExists(done_file_path) == fs::Status::NOT_FOUND);
+  PersistentOutStream out_stream(GlobalFS(), done_file_path);
 }
 
 }  // namespace oneflow
@@ -39,7 +39,6 @@ class Snapshot final {
   // 2. every part file has been completely written
   static const char* key_info_dir_name_;
   std::string root_path_;
-  tensorflow::Env* env_;
 };
 
 }  // namespace oneflow
......
@@ -7,13 +7,11 @@ namespace oneflow {
 void SnapshotMgr::Init() {
   LOG(INFO) << "SnapshotMgr Init";
   model_save_snapshots_path_ = JobDesc::Singleton()->md_save_snapshots_path();
-  tensorflow::Env* env = tensorflow::Env::Default();
-  if (env->IsDirectory(model_save_snapshots_path_).code()
-      != tensorflow::error::OK) {
-    TF_CHECK_OK(env->CreateDir(model_save_snapshots_path_));
+  if (GlobalFS()->IsDirectory(model_save_snapshots_path_) != fs::Status::OK) {
+    FS_CHECK_OK(GlobalFS()->CreateDir(model_save_snapshots_path_));
   }
   std::vector<std::string> result;
-  TF_CHECK_OK(env->GetChildren(model_save_snapshots_path_, &result));
+  FS_CHECK_OK(GlobalFS()->GetChildren(model_save_snapshots_path_, &result));
   CHECK_EQ(result.size(), 0);
   const std::string& load_path = JobDesc::Singleton()->md_load_snapshot_path();
   if (load_path != "") {
@@ -26,8 +24,7 @@ Snapshot* SnapshotMgr::GetWriteableSnapshot(int64_t snapshot_id) {
   if (it == snapshot_id2writeable_snapshot_.end()) {
     std::string snapshot_root_path = JoinPath(
         model_save_snapshots_path_, "snapshot_" + std::to_string(snapshot_id));
-    tensorflow::Env* env = tensorflow::Env::Default();
-    TF_CHECK_OK(env->CreateDir(snapshot_root_path));
+    FS_CHECK_OK(GlobalFS()->CreateDir(snapshot_root_path));
     std::unique_ptr<Snapshot> ret(new Snapshot(snapshot_root_path));
     auto emplace_ret =
         snapshot_id2writeable_snapshot_.emplace(snapshot_id, std::move(ret));
......
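GetWriteableSnapshot memoizes one Snapshot per id: on a miss it creates the snapshot directory through GlobalFS() and emplaces a freshly constructed Snapshot into the map. A simplified, self-contained sketch of that lookup-or-create shape; all types here are stand-ins, and only the map-plus-emplace structure comes from the hunk above:

```cpp
#include <cstdint>
#include <map>
#include <memory>
#include <string>

struct Snapshot {
  explicit Snapshot(std::string root) : root_path(std::move(root)) {}
  std::string root_path;
};

class SnapshotMgr {
 public:
  Snapshot* GetWriteableSnapshot(int64_t snapshot_id) {
    auto it = snapshot_id2writeable_snapshot_.find(snapshot_id);
    if (it == snapshot_id2writeable_snapshot_.end()) {
      // In the real code the directory is created through GlobalFS()
      // before the Snapshot object is constructed over it.
      std::string root = "snapshots/snapshot_" + std::to_string(snapshot_id);
      auto emplace_ret = snapshot_id2writeable_snapshot_.emplace(
          snapshot_id, std::make_unique<Snapshot>(root));
      it = emplace_ret.first;
    }
    return it->second.get();
  }

 private:
  std::map<int64_t, std::unique_ptr<Snapshot>> snapshot_id2writeable_snapshot_;
};

int main() {
  SnapshotMgr mgr;
  // Same object on repeated lookups for the same id.
  return mgr.GetWriteableSnapshot(7) == mgr.GetWriteableSnapshot(7) ? 0 : 1;
}
```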