提交 ca1cac8f 编写于 作者: W willzhang4a58

snapshot done

上级 7af9992a
......@@ -73,7 +73,7 @@ class Runtime final {
RuntimeCtx::Singleton()->set_this_machine_name(this_machine_name);
KernelMgr::Singleton()->InitFromPlan(plan);
RdmaCommNet::Init();
SnapshotMgr::Singleton()->Init();
SnapshotMgr::Singleton()->Init(plan);
ActorMsgBus::Singleton()->Init();
ThreadMgr::Singleton();
}
......
......@@ -17,9 +17,13 @@ void FileSystem::CreateDirIfNotExist(const std::string& dirname) {
}
// True exactly when GetChildrenNumOfDir(dirname) evaluates to zero.
bool FileSystem::IsDirEmpty(const std::string& dirname) {
  const size_t child_count = GetChildrenNumOfDir(dirname);
  return child_count == 0;
}
size_t FileSystem::GetChildrenNumOfDir(const std::string& dirname) {
std::vector<std::string> result;
FS_CHECK_OK(GetChildren(dirname, &result));
return result.size() == 0;
return result.size();
}
std::string FileSystem::TranslateName(const std::string& name) const {
......@@ -42,20 +46,8 @@ bool FileSystem::FilesExist(const std::vector<std::string>& files,
return result;
}
Status FileSystem::DeleteRecursively(const std::string& dirname,
int64_t* undeleted_files,
int64_t* undeleted_dirs) {
CHECK_NOTNULL(undeleted_files);
CHECK_NOTNULL(undeleted_dirs);
*undeleted_files = 0;
*undeleted_dirs = 0;
// Make sure that dirname exists;
Status exists_status = FileExists(dirname);
if (exists_status != Status::OK) {
(*undeleted_dirs)++;
return exists_status;
}
Status FileSystem::DeleteRecursively(const std::string& dirname) {
FS_CHECK_OK(FileExists(dirname));
std::deque<std::string> dir_q; // Queue for the BFS
std::vector<std::string> dir_list; // List of all dirs discovered
dir_q.push_back(dirname);
......@@ -72,10 +64,7 @@ Status FileSystem::DeleteRecursively(const std::string& dirname,
// GetChildren might fail if we don't have appropriate permissions.
Status s = GetChildren(dir, &children);
TryUpdateStatus(&ret, s);
if (s != Status::OK) {
(*undeleted_dirs)++;
continue;
}
FS_CHECK_OK(s);
for (const std::string& child : children) {
const std::string child_path = JoinPath(dir, child);
// If the child is a directory add it to the queue, otherwise delete it.
......@@ -86,7 +75,7 @@ Status FileSystem::DeleteRecursively(const std::string& dirname,
// unimplemented.
Status del_status = DeleteFile(child_path);
TryUpdateStatus(&ret, del_status);
if (del_status != Status::OK) { (*undeleted_files)++; }
CHECK_EQ(del_status, Status::OK);
}
}
}
......@@ -98,7 +87,7 @@ Status FileSystem::DeleteRecursively(const std::string& dirname,
// unimplemented.
Status s = DeleteDir(dir);
TryUpdateStatus(&ret, s);
if (s != Status::OK) { (*undeleted_dirs)++; }
FS_CHECK_OK(s);
}
return ret;
}
......
......@@ -170,6 +170,7 @@ class FileSystem {
void CreateDirIfNotExist(const std::string& dirname);
bool IsDirEmpty(const std::string& dirname);
size_t GetChildrenNumOfDir(const std::string& dirname);
// Creates the specified directory and all the necessary
// subdirectories.
......@@ -193,9 +194,7 @@ class FileSystem {
// * PERMISSION_DENIED - dirname or some descendant is not writable
// * UNIMPLEMENTED - Some underlying functions (like Delete) are not
// implemented
virtual Status DeleteRecursively(const std::string& dirname,
int64_t* undeleted_files,
int64_t* undeleted_dirs);
virtual Status DeleteRecursively(const std::string& dirname);
// Stores the size of `fname` in `*file_size`.
virtual Status GetFileSize(const std::string& fname, uint64_t* file_size) = 0;
......
#ifndef LIBHDFS_HDFS_H
#define LIBHDFS_HDFS_H
#ifndef ONEFLOW_CORE_PERSISTENCE_HADOOP_HDFS_H_
#define ONEFLOW_CORE_PERSISTENCE_HADOOP_HDFS_H_
#include <errno.h> /* for EINTERNAL, etc. */
#include <fcntl.h> /* for O_RDONLY, O_WRONLY */
......@@ -886,7 +886,7 @@ void hadoopRzBufferFree(hdfsFile file, struct hadoopRzBuffer *buffer);
#endif
#undef LIBHDFS_EXTERNAL
#endif /*LIBHDFS_HDFS_H*/
#endif /*ONEFLOW_CORE_PERSISTENCE_HADOOP_HDFS_H_*/
/**
* vim: ts=4: sw=4: et
......
#include "oneflow/core/persistence/snapshot.h"
#include "oneflow/core/common/balanced_splitter.h"
#include "oneflow/core/common/str_util.h"
#include "oneflow/core/job/job_desc.h"
#include "oneflow/core/operator/operator.h"
#include "oneflow/core/persistence/snapshot_manager.h"
namespace oneflow {
......@@ -56,15 +56,10 @@ void Snapshot::OnePartDone(const std::string& lbn, int32_t part_id,
std::string done_file_path = JoinPath(done_dir, std::to_string(part_id));
CHECK_EQ(GlobalFS()->FileExists(done_file_path), fs::Status::NOT_FOUND);
{ PersistentOutStream out_stream(GlobalFS(), done_file_path); }
std::vector<std::string> done_files;
GlobalFS()->GetChildren(done_dir, &done_files);
if (done_files.size() == part_num) {
if (GlobalFS()->GetChildrenNumOfDir(done_dir) == part_num) {
std::string concat_file = JoinPath(root_path_, lbn);
OF_ONCE_GUARD(concat_file, int64_t undeleted_files = 0;
int64_t undeleted_dirs = 0;
FS_CHECK_OK(GlobalFS()->DeleteRecursively(
done_dir, &undeleted_files, &undeleted_dirs));
CHECK_EQ(undeleted_files, 0); CHECK_EQ(undeleted_dirs, 0);
OF_ONCE_GUARD(concat_file,
FS_CHECK_OK(GlobalFS()->DeleteRecursively(done_dir));
ConcatLbnFile(lbn, part_num, concat_file));
}
}
......@@ -97,9 +92,19 @@ void Snapshot::ConcatLbnFile(const std::string& lbn, int32_t part_num,
}
}
FS_CHECK_OK(GlobalFS()->DeleteDir(part_dir));
std::string done_dir = JoinPath(root_path_, op_name, "done");
std::string done_dir = JoinPath(root_path_, "snapshot_done_tmp");
OF_ONCE_GUARD(done_dir, FS_CHECK_OK(GlobalFS()->CreateDir(done_dir)));
PersistentOutStream out_stream(GlobalFS(), JoinPath(done_dir, bn_in_op));
{
PersistentOutStream out_stream(
GlobalFS(), JoinPath(done_dir, op_name + "_" + bn_in_op));
}
if (GlobalFS()->GetChildrenNumOfDir(done_dir)
== SnapshotMgr::Singleton()->num_of_model_blobs()) {
std::string done_file = JoinPath(root_path_, "snapshot_done");
OF_ONCE_GUARD(done_file,
FS_CHECK_OK(GlobalFS()->DeleteRecursively(done_dir));
{ PersistentOutStream out_stream(GlobalFS(), done_file); });
}
}
} // namespace oneflow
......@@ -4,7 +4,7 @@
namespace oneflow {
void SnapshotMgr::Init() {
void SnapshotMgr::Init(const Plan& plan) {
LOG(INFO) << "SnapshotMgr Init";
model_save_snapshots_path_ = JobDesc::Singleton()->md_save_snapshots_path();
OF_ONCE_GUARD(model_save_snapshots_path_,
......@@ -14,6 +14,16 @@ void SnapshotMgr::Init() {
if (load_path != "") {
readable_snapshot_ptr_.reset(new Snapshot(load_path));
}
HashSet<std::string> model_blob_set;
for (const OperatorProto& op_proto : plan.op()) {
if (op_proto.op_conf().has_model_save_conf()) {
for (const std::string& lbn :
op_proto.op_conf().model_save_conf().lbns()) {
model_blob_set.insert(lbn);
}
}
}
num_of_model_blobs_ = model_blob_set.size();
}
Snapshot* SnapshotMgr::GetWriteableSnapshot(int64_t snapshot_id) {
......
......@@ -18,13 +18,16 @@ class SnapshotMgr {
// Non-owning view of the snapshot held by readable_snapshot_ptr_; yields
// nullptr while that pointer holds nothing.
const Snapshot* GetReadableSnapshot() {
  const Snapshot* readable = readable_snapshot_ptr_.get();
  return readable;
}
void Init();
void Init(const Plan& plan);
// Value stored in num_of_model_blobs_, which Init(const Plan&) sets to the
// number of distinct lbns listed by the plan's model-save op confs.
size_t num_of_model_blobs() const {
  return num_of_model_blobs_;
}
private:
SnapshotMgr() = default;
std::string model_save_snapshots_path_;
std::unique_ptr<const Snapshot> readable_snapshot_ptr_;
HashMap<int64_t, std::unique_ptr<Snapshot>> snapshot_id2writeable_snapshot_;
size_t num_of_model_blobs_;
};
} // namespace oneflow
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册