提交 f2d75848 编写于 作者: S Shiyuan Shang-Guan 提交者: Jinhui Yuan

use hostname as log_dir_path and get this_machine_id through ip_addr (#1287)

* use hostname as log_dir_path and get this_machine_id through ip_addr

* update by comment

* fix ParseThisMachineId

* fix bug

* rm TODO


Former-commit-id: a18f2912
上级 bf6a73f8
......@@ -7,8 +7,6 @@
#include <sys/sysinfo.h>
#endif
DEFINE_int64(this_machine_id, -1, "");
namespace oneflow {
#define DEFINE_ONEFLOW_STR2INT_CAST(dst_type, cast_func) \
......@@ -89,7 +87,9 @@ size_t GetAvailableCpuMemSize() {
}
// Returns the per-host log directory, "<FLAGS_log_dir>/<hostname>".
// Aborts through CHECK_EQ when the host name cannot be obtained.
std::string LogDir() {
  // The buffer is zero-filled and one byte larger than the length handed to
  // gethostname(): POSIX leaves NUL termination unspecified when the name is
  // truncated, so the spare byte guarantees a terminated string.
  char hostname[256] = {};
  CHECK_EQ(gethostname(hostname, sizeof(hostname) - 1), 0);
  return FLAGS_log_dir + "/" + std::string(hostname);
}
......
......@@ -26,7 +26,6 @@
#include "oneflow/core/common/meta_util.hpp"
DECLARE_string(log_dir);
DECLARE_int64(this_machine_id);
namespace std {
template<typename T0, typename T1>
......
......@@ -3,6 +3,15 @@
#include "oneflow/core/common/protobuf.h"
#include "oneflow/core/persistence/hadoop/hadoop_file_system.h"
#ifdef PLATFORM_POSIX
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <ifaddrs.h>
#endif // PLATFORM_POSIX
namespace oneflow {
int64_t JobDesc::piece_num_of_experiment_phase() const {
......@@ -108,6 +117,7 @@ JobDesc::JobDesc(const std::string& job_conf_filepath) {
ParseProtoFromTextFile(job_conf.other(), job_conf_.mutable_other());
}
SanityCheck();
ParseThisMachineId();
Init();
}
......@@ -148,8 +158,40 @@ void JobDesc::Init() {
// Verifies that every machine listed in the resource configuration carries an
// id equal to its position in the list (0, 1, 2, ...); aborts through CHECK_EQ
// on the first mismatch.
// The id of the local machine is not examined here: ParseThisMachineId()
// resolves this_machine_id_ and range-checks it against the machine count.
void JobDesc::SanityCheck() {
  int64_t machine_num = job_conf_.resource().machine_size();
  FOR_RANGE(int64_t, i, 0, machine_num) { CHECK_EQ(job_conf_.resource().machine(i).id(), i); }
}
// Resolves this_machine_id_ by matching the IPv4 addresses of the local
// network interfaces against the `addr` field of every machine in the
// resource configuration. The first interface (in getifaddrs() order) whose
// address is listed decides the id.
// Aborts when two machines share an address, when interface enumeration or
// address formatting fails, or when no local address matches any machine.
// Only implemented when PLATFORM_POSIX is defined.
void JobDesc::ParseThisMachineId() {
  this_machine_id_ = -1;
#ifdef PLATFORM_POSIX
  // Bind by const reference so the resource message is not copied when
  // resource() hands out a reference.
  const auto& resource_conf = job_conf_.resource();
  const int64_t machine_num = resource_conf.machine_size();

  HashMap<std::string, int64_t> ip_addr2machine_id;
  FOR_RANGE(int64_t, i, 0, machine_num) {
    // Addresses must be unique, otherwise the lookup below would be ambiguous.
    CHECK(ip_addr2machine_id.emplace(resource_conf.machine(i).addr(), i).second);
  }

  struct ifaddrs* ifaddr = nullptr;
  CHECK_EQ(getifaddrs(&ifaddr), 0);
  char addr[INET_ADDRSTRLEN] = {};
  for (struct ifaddrs* ifa = ifaddr; ifa != nullptr; ifa = ifa->ifa_next) {
    // Ignore interfaces without an address and everything that is not IPv4.
    if (ifa->ifa_addr == nullptr) { continue; }
    if (ifa->ifa_addr->sa_family != AF_INET) { continue; }
    PCHECK(inet_ntop(AF_INET, &(reinterpret_cast<struct sockaddr_in*>(ifa->ifa_addr)->sin_addr),
                     addr, sizeof(addr)));
    auto ip_addr2machine_id_it = ip_addr2machine_id.find(std::string(addr));
    if (ip_addr2machine_id_it != ip_addr2machine_id.end()) {
      this_machine_id_ = ip_addr2machine_id_it->second;
      break;
    }
  }
  freeifaddrs(ifaddr);

  CHECK_GE(this_machine_id_, 0);
  CHECK_LT(this_machine_id_, machine_num);
#else
  UNIMPLEMENTED();
#endif
}
void JobDesc::SplitDecodeOps() {
......
......@@ -59,6 +59,7 @@ class JobDesc final {
}
// Forwards reduce_group_size from the `other` section of the job configuration.
int64_t reduce_group_size() const {
  const auto& other_conf = job_conf_.other();
  return other_conf.reduce_group_size();
}
// Forwards cudnn_buf_limit_mbyte from the `other` section of the job configuration.
int64_t cudnn_buf_limit_mbyte() const {
  const auto& other_conf = job_conf_.other();
  return other_conf.cudnn_buf_limit_mbyte();
}
int32_t this_machine_id() const { return this_machine_id_; }
// Train conf
const std::string& MdSaveSnapshotsPath() const;
......@@ -83,10 +84,12 @@ class JobDesc final {
// Constructs a JobDesc from an already-built job configuration.
JobDesc(const JobConf1& job_conf_);
void Init();
// Checks that each configured machine's id equals its index in the resource list.
void SanityCheck();
// Sets this_machine_id_ by matching local IPv4 interface addresses against the
// configured machine addresses; aborts when no address matches.
void ParseThisMachineId();
void SplitDecodeOps();
void AddRecordLoadOps();

// The job configuration this object wraps.
JobConf1 job_conf_;
// Id of the local machine; assigned by ParseThisMachineId().
int64_t this_machine_id_;
};
} // namespace oneflow
......
......@@ -8,7 +8,7 @@ std::string MachineCtx::GetCtrlAddr(int64_t machine_id) const {
return mchn.addr() + ":" + std::to_string(mchn.port());
}
// Builds the machine context for the local process. The machine id is read
// from the global JobDesc, so Global<JobDesc> has to be available before this
// constructor runs. The resolved id is written to the INFO log.
MachineCtx::MachineCtx() : this_machine_id_(Global<JobDesc>::Get()->this_machine_id()) {
  LOG(INFO) << "this machine id: " << this_machine_id_;
}
......
......@@ -8,7 +8,6 @@ namespace oneflow {
class MachineCtx final {
public:
OF_DISALLOW_COPY_AND_MOVE(MachineCtx);
MachineCtx() = delete;
~MachineCtx() = default;
// Id of the machine this context was created for.
int64_t this_machine_id() const {
  return this_machine_id_;
}
......@@ -19,7 +18,7 @@ class MachineCtx final {
private:
friend class Global<MachineCtx>;
explicit MachineCtx(int64_t this_mchn_id);
MachineCtx();
int64_t this_machine_id_;
};
......
......@@ -178,16 +178,16 @@ class Oneflow final {
OF_DISALLOW_COPY_AND_MOVE(Oneflow);
~Oneflow() = default;
Oneflow(const std::string& job_conf_filepath, int64_t this_mchn_id);
Oneflow(const std::string& job_conf_filepath);
private:
std::unique_ptr<CtrlServer> ctrl_server_;
};
Oneflow::Oneflow(const std::string& job_conf_filepath, int64_t this_mchn_id) {
Oneflow::Oneflow(const std::string& job_conf_filepath) {
// New All Global
Global<JobDesc>::New(job_conf_filepath);
Global<MachineCtx>::New(this_mchn_id);
Global<MachineCtx>::New();
const MachineCtx* machine_ctx = Global<MachineCtx>::Get();
bool DoProfile =
machine_ctx->IsThisMachineMaster() && Global<JobDesc>::Get()->collect_act_event();
......@@ -264,10 +264,9 @@ int main(int argc, char** argv) {
google::InitGoogleLogging(argv[0]);
gflags::SetVersionString(BuildVersionString());
gflags::ParseCommandLineFlags(&argc, &argv, true);
CHECK_GE(FLAGS_this_machine_id, 0);
LocalFS()->RecursivelyCreateDirIfNotExist(LogDir());
RedirectStdoutAndStderrToGlogDir();
{ Oneflow flow(FLAGS_job_conf, FLAGS_this_machine_id); }
{ Oneflow flow(FLAGS_job_conf); }
CloseStdoutAndStderr();
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册