提交 6d431ed0 编写于 作者: W willzhang4a58

redirect stdout stderr


Former-commit-id: 8df7c99f
上级 12723847
......@@ -55,11 +55,17 @@ void CtrlCommNet::Barrier(const std::string& barrier_name,
}
TryLockResult CtrlCommNet::TryLock(const std::string& name) {
if (done_names_.find(name) != done_names_.end()) {
return TryLockResult::kDone;
}
grpc::ClientContext client_ctx;
TryLockRequest request;
request.set_name(name);
TryLockResponse response;
GetResponsibleStub(name)->TryLock(&client_ctx, request, &response);
if (response.result() == TryLockResult::kDone) {
CHECK(done_names_.insert(name).second);
}
return response.result();
}
......
......@@ -31,13 +31,14 @@ class CtrlCommNet final {
std::unique_ptr<CtrlServer> ctrl_server_;
std::vector<std::unique_ptr<CtrlService::Stub>> stubs_;
HashSet<std::string> done_names_;
};
#define FILE_LINE_STR __FILE__ ":" OF_PP_STRINGIZE(__LINE__)
#define OF_BARRIER() CtrlCommNet::Singleton()->Barrier(FILE_LINE_STR)
#define OF_ONCE_GUARD(name, ...) \
#define OF_CALL_ONCE(name, ...) \
do { \
TryLockResult lock_ret = CtrlCommNet::Singleton()->TryLock(name); \
if (lock_ret == TryLockResult::kLocked) { \
......
......@@ -30,7 +30,7 @@ CtrlServer::CtrlServer(const std::string& server_addr) {
server_builder.RegisterService(grpc_service_.get());
cq_ = server_builder.AddCompletionQueue();
grpc_server_ = server_builder.BuildAndStart();
LOG(INFO) << "Server listening on " << server_addr;
LOG(INFO) << "CtrlServer listening on " << server_addr;
added_worker_calls_.clear();
plan_ = nullptr;
pending_plan_calls_.clear();
......@@ -41,7 +41,10 @@ void CtrlServer::PublishPlan(const Plan* plan) {
std::unique_lock<std::mutex> lck(plan_mtx_);
plan_ = plan;
if (plan_) {
for (CtrlCallIf* call : pending_plan_calls_) { call->SendResponse(); }
for (auto call : pending_plan_calls_) {
*(call->mut_response()->mutable_plan()) = *plan;
call->SendResponse();
}
pending_plan_calls_.clear();
} else {
CHECK(pending_plan_calls_.empty());
......
......@@ -39,7 +39,7 @@ class CtrlServer final {
// FetchPlan
std::mutex plan_mtx_;
const Plan* plan_;
std::list<CtrlCallIf*> pending_plan_calls_;
std::list<CtrlCall<FetchPlanRequest, FetchPlanResponse>*> pending_plan_calls_;
};
} // namespace oneflow
......
#include "oneflow/core/common/util.h"
#include <cfenv>
#include "oneflow/core/common/str_util.h"
namespace oneflow {
......@@ -45,4 +46,14 @@ double oneflow_cast(const std::string& s) {
COMMAND(feenableexcept(FE_ALL_EXCEPT & ~FE_INEXACT & ~FE_UNDERFLOW));
#endif
void RedirectStdoutAndStderrToGlogDir() {
PCHECK(freopen(JoinPath(LogDir(), "stdout").c_str(), "a+", stdout));
PCHECK(freopen(JoinPath(LogDir(), "stderr").c_str(), "a+", stderr));
}
void CloseStdoutAndStderr() {
PCHECK(fclose(stdout) == 0);
PCHECK(fclose(stderr) == 0);
}
} // namespace oneflow
......@@ -140,6 +140,9 @@ inline uint32_t NewRandomSeed() {
#define FOR_RANGE(type, i, begin, end) for (type i = begin; i < end; ++i)
void RedirectStdoutAndStderrToGlogDir();
void CloseStdoutAndStderr();
} // namespace oneflow
#endif // ONEFLOW_CORE_COMMON_UTIL_H_
......@@ -268,6 +268,7 @@ void Compiler::Plan2DotFile(const Plan& plan) {
int main(int argc, char** argv) {
google::InitGoogleLogging(argv[0]);
google::ParseCommandLineFlags(&argc, &argv, true);
oneflow::RedirectStdoutAndStderrToGlogDir();
LOG(INFO) << "Compile Start";
oneflow::JobConf job_conf;
oneflow::ParseProtoFromTextFile(FLAGS_job_conf_filepath, &job_conf);
......@@ -276,6 +277,7 @@ int main(int argc, char** argv) {
plan = oneflow::Compiler::Singleton()->Compile(job_conf);
oneflow::PrintProtoToTextFile(plan, FLAGS_plan_filepath);
oneflow::Compiler::DeleteSingleton();
oneflow::CloseStdoutAndStderr();
LOG(INFO) << "Compile Stop";
return 0;
}
......@@ -125,12 +125,14 @@ void Runtime::SendCmdMsg(const std::vector<const TaskProto*>& tasks,
int main(int argc, char** argv) {
google::InitGoogleLogging(argv[0]);
google::ParseCommandLineFlags(&argc, &argv, true);
oneflow::RedirectStdoutAndStderrToGlogDir();
LOG(INFO) << "Runtime Start";
oneflow::Plan plan;
oneflow::ParseProtoFromTextFile(FLAGS_plan_filepath, &plan);
oneflow::Runtime::NewSingleton();
oneflow::Runtime::Singleton()->Run(plan, FLAGS_this_machine_name);
oneflow::Runtime::DeleteSingleton();
oneflow::CloseStdoutAndStderr();
LOG(INFO) << "Runtime Stop";
return 0;
}
......@@ -90,7 +90,7 @@ void Scheduler::NewAllSingleton(const std::string& job_conf_filepath,
void Scheduler::SystemCall(const std::string& cmd) {
LOG(INFO) << "SystemCall: [" << cmd << "]";
PCHECK(std::system(cmd.c_str()) == 0);
CHECK_EQ(std::system(cmd.c_str()), 0);
}
} // namespace oneflow
......@@ -102,11 +102,13 @@ int main(int argc, char** argv, char** env) {
google::InitGoogleLogging(argv[0]);
google::ParseCommandLineFlags(&argc, &argv, true);
oneflow::LocalFS()->CreateDirIfNotExist(oneflow::LogDir());
oneflow::RedirectStdoutAndStderrToGlogDir();
LOG(INFO) << "Scheduler Start";
oneflow::Scheduler::NewSingleton();
oneflow::Scheduler::Singleton()->Process(FLAGS_job_conf_filepath,
FLAGS_this_machine_name, env);
oneflow::Scheduler::DeleteSingleton();
oneflow::CloseStdoutAndStderr();
LOG(INFO) << "Scheduler Stop";
return 0;
}
......@@ -34,7 +34,7 @@ void RecordKernel::Forward(
std::function<Blob*(const std::string&)> BnInOp2Blob) const {
int64_t parallel_id = reinterpret_cast<int64_t>(kernel_ctx.other);
const std::string& root_path = op()->op_conf().record_conf().record_path();
OF_ONCE_GUARD(root_path, GlobalFS()->CreateDirIfNotExist(root_path));
OF_CALL_ONCE(root_path, GlobalFS()->CreateDirIfNotExist(root_path));
for (const std::string& ibn : op()->input_bns()) {
const std::string& lbn = op()->Lbn4BnInOp(ibn);
const Blob* blob = BnInOp2Blob(ibn);
......@@ -44,9 +44,9 @@ void RecordKernel::Forward(
const std::string& op_name = parsed_lbn.first;
const std::string& bn_in_op = parsed_lbn.second;
std::string op_dir = JoinPath(root_path, op_name);
OF_ONCE_GUARD(op_dir, GlobalFS()->CreateDir(op_dir));
OF_CALL_ONCE(op_dir, GlobalFS()->CreateDir(op_dir));
std::string bn_in_op_dir = JoinPath(op_dir, bn_in_op);
OF_ONCE_GUARD(bn_in_op_dir, GlobalFS()->CreateDir(bn_in_op_dir));
OF_CALL_ONCE(bn_in_op_dir, GlobalFS()->CreateDir(bn_in_op_dir));
std::string file_path =
JoinPath(bn_in_op_dir, "part_" + std::to_string(parallel_id));
PersistentOutStream out_stream(GlobalFS(), file_path);
......
......@@ -23,10 +23,10 @@ std::unique_ptr<PersistentOutStream> Snapshot::GetOutStream(
const std::string& bn_in_op = parsed_lbn.second;
// op_name_dir
std::string op_name_dir = JoinPath(root_path_, op_name);
OF_ONCE_GUARD(op_name_dir, GlobalFS()->CreateDir(op_name_dir));
OF_CALL_ONCE(op_name_dir, GlobalFS()->CreateDir(op_name_dir));
// bn_in_op_tmp_dir
std::string bn_in_op_tmp_dir = JoinPath(op_name_dir, bn_in_op + "_tmp");
OF_ONCE_GUARD(bn_in_op_tmp_dir, GlobalFS()->CreateDir(bn_in_op_tmp_dir));
OF_CALL_ONCE(bn_in_op_tmp_dir, GlobalFS()->CreateDir(bn_in_op_tmp_dir));
// part_file
std::string part_file =
JoinPath(bn_in_op_tmp_dir, "part_" + std::to_string(part_id));
......@@ -36,14 +36,14 @@ std::unique_ptr<PersistentOutStream> Snapshot::GetOutStream(
void Snapshot::OnePartDone(const std::string& lbn, int32_t part_id,
int32_t part_num) {
std::string done_dir = JoinPath(root_path_, lbn + "_done");
OF_ONCE_GUARD(done_dir, GlobalFS()->CreateDir(done_dir));
OF_CALL_ONCE(done_dir, GlobalFS()->CreateDir(done_dir));
std::string done_file_path = JoinPath(done_dir, std::to_string(part_id));
CHECK_EQ(GlobalFS()->FileExists(done_file_path), false);
{ PersistentOutStream out_stream(GlobalFS(), done_file_path); }
if (GlobalFS()->ListDir(done_dir).size() == part_num) {
std::string concat_file = JoinPath(root_path_, lbn);
OF_ONCE_GUARD(concat_file, GlobalFS()->RecursivelyDeleteDir(done_dir);
ConcatLbnFile(lbn, part_num, concat_file));
OF_CALL_ONCE(concat_file, GlobalFS()->RecursivelyDeleteDir(done_dir);
ConcatLbnFile(lbn, part_num, concat_file));
}
}
......@@ -75,7 +75,7 @@ void Snapshot::ConcatLbnFile(const std::string& lbn, int32_t part_num,
}
GlobalFS()->DeleteDir(part_dir);
std::string done_dir = JoinPath(root_path_, "snapshot_done_tmp");
OF_ONCE_GUARD(done_dir, GlobalFS()->CreateDir(done_dir));
OF_CALL_ONCE(done_dir, GlobalFS()->CreateDir(done_dir));
{
PersistentOutStream out_stream(
GlobalFS(), JoinPath(done_dir, op_name + "_" + bn_in_op));
......@@ -83,8 +83,8 @@ void Snapshot::ConcatLbnFile(const std::string& lbn, int32_t part_num,
if (GlobalFS()->ListDir(done_dir).size()
== SnapshotMgr::Singleton()->num_of_model_blobs()) {
std::string done_file = JoinPath(root_path_, "snapshot_done");
OF_ONCE_GUARD(done_file, GlobalFS()->RecursivelyDeleteDir(done_dir);
{ PersistentOutStream out_stream(GlobalFS(), done_file); });
OF_CALL_ONCE(done_file, GlobalFS()->RecursivelyDeleteDir(done_dir);
{ PersistentOutStream out_stream(GlobalFS(), done_file); });
}
}
......
......@@ -9,9 +9,10 @@ SnapshotMgr::SnapshotMgr(const Plan& plan) {
num_of_model_blobs_ = 0;
if (JobDesc::Singleton()->is_train()) {
model_save_snapshots_path_ = JobDesc::Singleton()->md_save_snapshots_path();
OF_ONCE_GUARD(model_save_snapshots_path_,
GlobalFS()->CreateDirIfNotExist(model_save_snapshots_path_);
CHECK(GlobalFS()->IsDirEmpty(model_save_snapshots_path_)););
OF_CALL_ONCE(model_save_snapshots_path_, {
GlobalFS()->CreateDirIfNotExist(model_save_snapshots_path_);
CHECK(GlobalFS()->IsDirEmpty(model_save_snapshots_path_));
});
HashSet<std::string> model_blob_set;
for (const OperatorProto& op_proto : plan.op()) {
if (op_proto.op_conf().has_model_save_conf()) {
......@@ -34,8 +35,8 @@ Snapshot* SnapshotMgr::GetWriteableSnapshot(int64_t snapshot_id) {
if (it == snapshot_id2writeable_snapshot_.end()) {
std::string snapshot_root_path = JoinPath(
model_save_snapshots_path_, "snapshot_" + std::to_string(snapshot_id));
OF_ONCE_GUARD(snapshot_root_path,
GlobalFS()->CreateDirIfNotExist(snapshot_root_path));
OF_CALL_ONCE(snapshot_root_path,
GlobalFS()->CreateDirIfNotExist(snapshot_root_path));
std::unique_ptr<Snapshot> ret(new Snapshot(snapshot_root_path));
auto emplace_ret =
snapshot_id2writeable_snapshot_.emplace(snapshot_id, std::move(ret));
......
......@@ -5,9 +5,16 @@ declare -a hosts=("192.168.1.11" "192.168.1.13")
SCHEDULER_CMD='GLOG_logtostderr=0 GLOG_log_dir=./log GLOG_v=0 GLOG_logbuflevel=-1 nohup ./scheduler -job_conf_filepath=./job.prototxt'
set +e
for host in "${hosts[@]}"
do
ssh $USER@$host "/usr/sbin/fuser -k 9000/tcp 9001/tcp 9002/tcp 9003/tcp 9004/tcp"
done
set -e
for host in "${hosts[@]}"
do
ssh $USER@$host 'rm -rf ~/oneflow_temp && mkdir ~/oneflow_temp'
scp ./compiler ./runtime ./scheduler ./*.prototxt $USER@$host:~/oneflow_temp
ssh $USER@$host "cd ~/oneflow_temp; $SCHEDULER_CMD -this_machine_name=$host 1>stdout 2>stderr </dev/null &"
ssh $USER@$host "cd ~/oneflow_temp; $SCHEDULER_CMD -this_machine_name=$host &"
done
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册