未验证 提交 41c422a1 编写于 作者: qq_22305325's avatar qq_22305325 提交者: GitHub

Rank info bootstrap (#4248)

* add CtrlConf Proto

* add HostListBootStrapClient

* add HostListBootStrapServer

* del OfOnceCall in host_list_boot_strap_client

* add BootStrapServer/Client

* Update control.proto

del rank2ctrl_addr

* add InitConfFromEnvDesc

* add log

* optimize code

* add CHECK

* InitCtrlConfFromEnvDesc

* del useless args def

* CtrlBootstrap

* minor fix

* refactor CtrlServer/CtrlClient with ProcessCtx

* RankInfoBootstrap

* fix bug and optimize

* minor optimize

* minor fix

* use WorkerProcessInfo

* minor optimize
Co-authored-by: lixinqi <lixinqi0703106@163.com>
Co-authored-by: oneflow-ci-bot <69100618+oneflow-ci-bot@users.noreply.github.com>
上级 d5341c9c
......@@ -5,6 +5,7 @@ import "oneflow/core/actor/act_event.proto";
// Request message for the LoadServer control call (sent by RpcClient::LoadServer).
message LoadServerRequest {
// Address string supplied by the caller; RpcClient::LoadServer also prints it in its log lines.
required string addr = 1;
// Rank of the sending process. Left at -1 when the caller does not set it;
// RankInfoBootstrapServer::OnLoadServer requires 0 <= rank < world_size.
optional int64 rank = 2 [default = -1];
}
message LoadServerResponse {
......
......@@ -15,75 +15,143 @@ limitations under the License.
*/
#include <map>
#include "oneflow/core/control/ctrl_bootstrap.h"
#include "oneflow/core/control/worker_process_info.pb.h"
#include "oneflow/core/control/host_list_bootstrap_server.h"
#include "oneflow/core/control/host_list_bootstrap_client.h"
#include "oneflow/core/control/rank_info_bootstrap_server.h"
#include "oneflow/core/control/rank_info_bootstrap_client.h"
namespace oneflow {
// Releases the bootstrap endpoints explicitly: the client first, then the server
// (the reverse of the order in which the constructor creates them).
HostListCtrlBootstrap::~HostListCtrlBootstrap() {
  bootstrap_client_ = nullptr;
  bootstrap_server_ = nullptr;
}
// Creates the host-list bootstrap server and client from env_desc, calls the
// client's Barrier, then caches this machine's address, rank and world size.
HostListCtrlBootstrap::HostListCtrlBootstrap(const EnvDesc& env_desc) : CtrlBootstrap() {
  // Server first, client second.
  bootstrap_server_ = std::make_shared<HostListBootstrapServer>(env_desc);
  bootstrap_client_ = std::make_shared<HostListBootstrapClient>(env_desc);
  // The barrier name is derived from this file's name and the current line number.
  bootstrap_client_->Barrier(__FILE__ ":" OF_PP_STRINGIZE(__LINE__));
  host_ = bootstrap_server_->this_machine_addr();
  // The rank is the machine id that env_desc associates with this host.
  rank_ = env_desc.GetMachineId(host_);
  world_size_ = env_desc.TotalMachineNum();
}
Maybe<void> HostListCtrlBootstrap::InitProcessCtx(int64_t port, ProcessCtx* ret_process_ctx) {
std::vector<ProcessCtx> rank2process_ctx;
Maybe<void> CtrlBootstrap::InitProcessCtx(int64_t port, ProcessCtx* ret_process_ctx) {
std::vector<WorkerProcessInfo> worker_process_info_list;
if (rank() == 0) {
ProcessCtx process_ctx;
WorkerProcessInfo worker_process_info;
{
process_ctx.set_rank(rank());
Address* addr = process_ctx.mutable_ctrl_addr()->Add();
addr->set_host(host());
addr->set_port(port);
worker_process_info.set_rank(rank());
worker_process_info.set_port(port);
JUST(SetCurrentHostByMaster(&worker_process_info));
}
rank2process_ctx.push_back(process_ctx);
worker_process_info_list.push_back(worker_process_info);
for (int64_t world_rank = 1; world_rank < world_size(); ++world_rank) {
std::string key = std::string("GetProcessCtx") + std::to_string(world_rank);
ProcessCtx cur_process_ctx;
bootstrap_client_->PullMasterKV(key, &cur_process_ctx);
CHECK_EQ_OR_RETURN(world_rank, rank2process_ctx.size());
CHECK_EQ_OR_RETURN(world_rank, cur_process_ctx.rank());
rank2process_ctx.push_back(cur_process_ctx);
std::string key = std::string("GetWorkerProcessInfo") + std::to_string(world_rank);
WorkerProcessInfo cur_work_process_info;
mut_bootstrap_client()->PullMasterKV(key, &cur_work_process_info);
CHECK_EQ_OR_RETURN(world_rank, worker_process_info_list.size());
CHECK_EQ_OR_RETURN(world_rank, cur_work_process_info.rank());
worker_process_info_list.push_back(cur_work_process_info);
}
} else {
std::string key = std::string("GetProcessCtx") + std::to_string(rank());
ProcessCtx cur_process_ctx;
std::string key = std::string("GetWorkerProcessInfo") + std::to_string(rank());
WorkerProcessInfo cur_work_process_info;
{
cur_process_ctx.set_rank(rank());
Address* addr = cur_process_ctx.mutable_ctrl_addr()->Add();
addr->set_host(host());
addr->set_port(port);
cur_work_process_info.set_rank(rank());
cur_work_process_info.set_port(port);
JUST(SetCurrentHostByWorker(&cur_work_process_info));
}
bootstrap_client_->PushMasterKV(key, cur_process_ctx);
mut_bootstrap_client()->PushMasterKV(key, cur_work_process_info);
}
bootstrap_client_->Barrier(__FILE__ ":" OF_PP_STRINGIZE(__LINE__));
mut_bootstrap_client()->Barrier(__FILE__ ":" OF_PP_STRINGIZE(__LINE__));
if (rank() == 0) {
ret_process_ctx->set_rank(rank());
ret_process_ctx->mutable_ctrl_addr()->Clear();
for (const auto& process_ctx : rank2process_ctx) {
CHECK_EQ_OR_RETURN(process_ctx.ctrl_addr_size(), 1);
*ret_process_ctx->mutable_ctrl_addr()->Add() = process_ctx.ctrl_addr(0);
for (const auto& worker_process_info : worker_process_info_list) {
Address* addr = ret_process_ctx->mutable_ctrl_addr()->Add();
if (worker_process_info.has_host()) { addr->set_host(worker_process_info.host()); }
addr->set_port(worker_process_info.port());
JUST(SetHostByMaster(addr, worker_process_info.rank()));
}
bootstrap_client_->PushMasterKV("BroadcastProcessCtx", *ret_process_ctx);
mut_bootstrap_client()->PushMasterKV("BroadcastProcessCtx", *ret_process_ctx);
} else {
bootstrap_client_->PullMasterKV("BroadcastProcessCtx", ret_process_ctx);
mut_bootstrap_client()->PullMasterKV("BroadcastProcessCtx", ret_process_ctx);
ret_process_ctx->set_rank(rank());
}
bootstrap_client_->Barrier(__FILE__ ":" OF_PP_STRINGIZE(__LINE__));
mut_bootstrap_client()->Barrier(__FILE__ ":" OF_PP_STRINGIZE(__LINE__));
LOG(INFO) << "\n" << ret_process_ctx->DebugString();
return Maybe<void>::Ok();
}
// Creates the host-list bootstrap server and client from env_desc, calls the
// client's Barrier, then caches this machine's address, rank and world size.
HostListCtrlBootstrap::HostListCtrlBootstrap(const EnvDesc& env_desc) : CtrlBootstrap() {
  // Server first, client second; the destructor releases them in the opposite order.
  bootstrap_server_ = std::make_shared<HostListBootstrapServer>(env_desc);
  bootstrap_client_ = std::make_shared<HostListBootstrapClient>(env_desc);
  // The barrier name is derived from this file's name and the current line number.
  bootstrap_client_->Barrier(__FILE__ ":" OF_PP_STRINGIZE(__LINE__));
  host_ = bootstrap_server_->this_machine_addr();
  // The rank is the machine id that env_desc associates with this host.
  rank_ = env_desc.GetMachineId(host_);
  world_size_ = env_desc.TotalMachineNum();
}
// Releases the bootstrap endpoints explicitly: the client first, then the server
// (the reverse of the order in which the constructor creates them).
HostListCtrlBootstrap::~HostListCtrlBootstrap() {
  bootstrap_client_ = nullptr;
  bootstrap_server_ = nullptr;
}
// Host-list variant of the master-side host fix-up: intentionally a no-op that
// always succeeds. In this mode both SetCurrentHostByMaster and
// SetCurrentHostByWorker already store the host in WorkerProcessInfo, so neither
// `addr` nor `world_rank` is consulted here.
Maybe<void> HostListCtrlBootstrap::SetHostByMaster(Address* addr, int64_t world_rank) const {
return Maybe<void>::Ok();
}
// Records this machine's host (as determined in the constructor) in
// `worker_process_info`. Always succeeds.
Maybe<void> HostListCtrlBootstrap::SetCurrentHostByMaster(
    WorkerProcessInfo* worker_process_info) const {
  // host() simply returns host_, so the member is used directly.
  worker_process_info->set_host(host_);
  return Maybe<void>::Ok();
}
// Records this machine's host (as determined in the constructor) in
// `worker_process_info`. Always succeeds.
Maybe<void> HostListCtrlBootstrap::SetCurrentHostByWorker(
    WorkerProcessInfo* worker_process_info) const {
  // host() simply returns host_, so the member is used directly.
  worker_process_info->set_host(host_);
  return Maybe<void>::Ok();
}
// Returns a non-owning pointer to this object's host-list bootstrap server, typed as BootstrapServer*.
BootstrapServer* HostListCtrlBootstrap::mut_bootstrap_server() { return bootstrap_server_.get(); }
// Returns a non-owning pointer to this object's host-list bootstrap client, typed as BootstrapClient*.
BootstrapClient* HostListCtrlBootstrap::mut_bootstrap_client() { return bootstrap_client_.get(); }
// Keeps a copy of bootstrap_conf, creates the rank-info bootstrap server and
// client, calls the client's Barrier, then caches the master host, this
// process's rank and the world size taken from the configuration.
RankInfoCtrlBootstrap::RankInfoCtrlBootstrap(const BootstrapConf& bootstrap_conf)
    : CtrlBootstrap(), bootstrap_conf_(bootstrap_conf) {
  // Server first, client second; the destructor releases them in the opposite order.
  bootstrap_server_ = std::make_shared<RankInfoBootstrapServer>(bootstrap_conf);
  bootstrap_client_ = std::make_shared<RankInfoBootstrapClient>(bootstrap_conf);
  // The barrier name is derived from this file's name and the current line number.
  bootstrap_client_->Barrier(__FILE__ ":" OF_PP_STRINGIZE(__LINE__));
  const auto& master_addr = bootstrap_conf.master_addr();
  master_host_ = master_addr.host();
  rank_ = bootstrap_conf.rank();
  world_size_ = bootstrap_conf.world_size();
}
// Releases the bootstrap endpoints explicitly: the client first, then the server
// (the reverse of the order in which the constructor creates them).
RankInfoCtrlBootstrap::~RankInfoCtrlBootstrap() {
  bootstrap_client_ = nullptr;
  bootstrap_server_ = nullptr;
}
// Fills in `addr`'s host for the process with rank `world_rank` when the address
// does not carry one yet. The host is looked up in the bootstrap server's
// rank-to-host table. Returns an error if that table is unavailable, its size
// differs from world_size(), or `world_rank` is out of range.
Maybe<void> RankInfoCtrlBootstrap::SetHostByMaster(Address* addr, int64_t world_rank) const {
  if (!addr->has_host()) {
    // The process did not report a host itself: fall back to the table kept by the server.
    const auto& rank2host = JUST(bootstrap_server_->rank2host());
    CHECK_EQ_OR_RETURN(rank2host.size(), world_size());
    CHECK_GE_OR_RETURN(world_rank, 0);
    CHECK_LT_OR_RETURN(world_rank, rank2host.size());
    addr->set_host(rank2host.at(world_rank));
  }
  return Maybe<void>::Ok();
}
// Stores the master's own host in `worker_process_info`. Must be called on
// rank 0; returns an error otherwise. The host configured in bootstrap_conf_
// takes precedence; without it the host of master_addr is used.
Maybe<void> RankInfoCtrlBootstrap::SetCurrentHostByMaster(
    WorkerProcessInfo* worker_process_info) const {
  CHECK_EQ_OR_RETURN(rank(), 0);
  const std::string& chosen_host =
      bootstrap_conf_.has_host() ? bootstrap_conf_.host() : master_host_;
  worker_process_info->set_host(chosen_host);
  return Maybe<void>::Ok();
}
// Stores this worker's host in `worker_process_info` when bootstrap_conf_
// provides one; otherwise the host is left unset. Must be called on a non-zero
// rank; returns an error otherwise.
Maybe<void> RankInfoCtrlBootstrap::SetCurrentHostByWorker(
    WorkerProcessInfo* worker_process_info) const {
  CHECK_NE_OR_RETURN(rank(), 0);
  // No configured host: leave the field empty.
  if (!bootstrap_conf_.has_host()) { return Maybe<void>::Ok(); }
  worker_process_info->set_host(bootstrap_conf_.host());
  return Maybe<void>::Ok();
}
// Returns a non-owning pointer to this object's rank-info bootstrap server, typed as BootstrapServer*.
BootstrapServer* RankInfoCtrlBootstrap::mut_bootstrap_server() { return bootstrap_server_.get(); }
// Returns a non-owning pointer to this object's rank-info bootstrap client, typed as BootstrapClient*.
BootstrapClient* RankInfoCtrlBootstrap::mut_bootstrap_client() { return bootstrap_client_.get(); }
} // namespace oneflow
......@@ -23,17 +23,25 @@ limitations under the License.
namespace oneflow {
class ProcessCtx;
class WorkerProcessInfo;
class BootstrapServer;
class BootstrapClient;
class CtrlBootstrap {
public:
virtual ~CtrlBootstrap() {}
virtual Maybe<void> InitProcessCtx(int64_t port, ProcessCtx* process_ctx) = 0;
Maybe<void> InitProcessCtx(int64_t port, ProcessCtx* process_ctx);
protected:
virtual int64_t rank() const = 0;
virtual int64_t world_size() const = 0;
virtual std::string host() const = 0;
virtual Maybe<void> SetHostByMaster(Address*, int64_t world_rank) const = 0;
virtual Maybe<void> SetCurrentHostByMaster(WorkerProcessInfo*) const = 0;
virtual Maybe<void> SetCurrentHostByWorker(WorkerProcessInfo*) const = 0;
virtual BootstrapServer* mut_bootstrap_server() = 0;
virtual BootstrapClient* mut_bootstrap_client() = 0;
CtrlBootstrap() = default;
};
......@@ -46,13 +54,19 @@ class HostListCtrlBootstrap final : public CtrlBootstrap {
explicit HostListCtrlBootstrap(const EnvDesc& env_desc);
~HostListCtrlBootstrap() override;
Maybe<void> InitProcessCtx(int64_t port, ProcessCtx* process_ctx) override;
private:
std::string host() const override { return host_; }
int64_t rank() const override { return rank_; }
int64_t world_size() const override { return world_size_; }
std::string host() const { return host_; }
Maybe<void> SetHostByMaster(Address*, int64_t world_rank) const override;
Maybe<void> SetCurrentHostByMaster(WorkerProcessInfo*) const override;
Maybe<void> SetCurrentHostByWorker(WorkerProcessInfo*) const override;
BootstrapServer* mut_bootstrap_server() override;
BootstrapClient* mut_bootstrap_client() override;
// Uses shared_ptr and forward declaration to avoid `#include ...`
std::shared_ptr<HostListBootstrapServer> bootstrap_server_;
std::shared_ptr<HostListBootstrapClient> bootstrap_client_;
......@@ -62,6 +76,35 @@ class HostListCtrlBootstrap final : public CtrlBootstrap {
int64_t world_size_;
};
class RankInfoBootstrapServer;
class RankInfoBootstrapClient;
// CtrlBootstrap implementation that is configured from a BootstrapConf
// (master address, rank, world size, optional host) instead of an EnvDesc.
class RankInfoCtrlBootstrap final : public CtrlBootstrap {
public:
// Copies `bootstrap_conf` and creates the rank-info bootstrap server and client.
explicit RankInfoCtrlBootstrap(const BootstrapConf& bootstrap_conf);
// Releases the client, then the server.
~RankInfoCtrlBootstrap() override;
private:
// Rank and world size as given in the BootstrapConf.
int64_t rank() const override { return rank_; }
int64_t world_size() const override { return world_size_; }
// Fills in a missing host on the address of rank `world_rank` from the server's rank-to-host table.
Maybe<void> SetHostByMaster(Address*, int64_t world_rank) const override;
// Rank 0 only: sets the host to the configured host, or to the master address host if none is configured.
Maybe<void> SetCurrentHostByMaster(WorkerProcessInfo*) const override;
// Non-zero ranks only: sets the host if one is configured, otherwise leaves it unset.
Maybe<void> SetCurrentHostByWorker(WorkerProcessInfo*) const override;
// Non-owning accessors for the endpoints below.
BootstrapServer* mut_bootstrap_server() override;
BootstrapClient* mut_bootstrap_client() override;
// Uses shared_ptr and forward declaration to avoid `#include ...`
std::shared_ptr<RankInfoBootstrapServer> bootstrap_server_;
std::shared_ptr<RankInfoBootstrapClient> bootstrap_client_;
// Host part of bootstrap_conf.master_addr().
std::string master_host_;
// Private copy of the configuration passed to the constructor.
BootstrapConf bootstrap_conf_;
int64_t rank_;
int64_t world_size_;
};
} // namespace oneflow
#endif // ONEFLOW_CORE_CONTROL_CTRL_BOOTSTRAP_H_
......@@ -10,3 +10,11 @@ message ProcessCtx {
repeated Address ctrl_addr = 1;
required int64 rank = 2;
}
// Configuration for rank-info based bootstrap (consumed by RankInfoCtrlBootstrap,
// RankInfoBootstrapServer and RankInfoBootstrapClient).
message BootstrapConf {
// Address of the master (rank 0): rank 0 listens on its port, and every client connects to host:port.
required Address master_addr = 1;
// Rank of this process; rank 0 acts as the master.
required int64 rank = 2;
// Total number of processes taking part in the bootstrap.
required int64 world_size = 3;
// Optional host of this process. When absent, rank 0 falls back to master_addr.host and
// other ranks leave their host to be filled in by the master.
optional string host = 4;
// NOTE(review): presumably the port handed to CtrlServer(int ctrl_port); its use is not
// visible here - confirm, including how the default of -1 is handled.
optional int64 ctrl_port = 5 [default = -1];
}
......@@ -47,6 +47,7 @@ class CtrlCall final : public CtrlCallIf {
CtrlRequest<ctrl_method>* mut_request() { return &request_; }
CtrlResponse<ctrl_method>* mut_response() { return &response_; }
grpc::ServerContext* mut_server_ctx() { return &server_ctx_; }
const grpc::ServerContext& server_ctx() const { return server_ctx_; }
grpc::ServerAsyncResponseWriter<CtrlResponse<ctrl_method>>* mut_responder() {
return &responder_;
}
......
......@@ -14,6 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
#include "oneflow/core/control/ctrl_server.h"
#include "oneflow/core/control/ctrl_bootstrap.pb.h"
#include "oneflow/core/actor/act_event_logger.h"
#include "oneflow/core/job/profiler.h"
#include "oneflow/core/job/env_desc.h"
......@@ -21,21 +22,30 @@ limitations under the License.
namespace oneflow {
CtrlServer::CtrlServer() : RpcServer(), port_(0) {
CtrlServer::CtrlServer(int ctrl_port) : RpcServer(), port_(ctrl_port) {
Init();
grpc::ServerBuilder server_builder;
server_builder.SetMaxMessageSize(INT_MAX);
server_builder.AddListeningPort("0.0.0.0:0", grpc::InsecureServerCredentials(), &port_);
int bound_port = 0;
server_builder.AddListeningPort("0.0.0.0:" + std::to_string(port_),
grpc::InsecureServerCredentials(), &bound_port);
grpc_service_.reset(new CtrlService::AsyncService);
server_builder.RegisterService(grpc_service_.get());
cq_ = server_builder.AddCompletionQueue();
grpc_server_ = server_builder.BuildAndStart();
CHECK_NE(port(), 0);
if (port() != 0) {
CHECK_EQ(port(), bound_port) << "Port " << port() << " is unavailable";
} else {
port_ = bound_port;
CHECK_NE(port(), 0);
}
LOG(INFO) << "CtrlServer listening on "
<< "0.0.0.0:" + std::to_string(port());
loop_thread_ = std::thread(&CtrlServer::HandleRpcs, this);
}
// Default constructor: delegates to CtrlServer(int ctrl_port) with ctrl_port = 0.
// NOTE(review): with 0 the delegated constructor appears to adopt whatever port ends up
// bound instead of insisting on a fixed one - confirm against CtrlServer(int).
CtrlServer::CtrlServer() : CtrlServer(0) {}
void CtrlServer::OnLoadServer(CtrlCall<CtrlMethod::kLoadServer>* call) {
call->SendResponse();
EnqueueRequest<CtrlMethod::kLoadServer>();
......
......@@ -26,6 +26,8 @@ class CtrlServer final : public RpcServer {
~CtrlServer() override {}
CtrlServer();
// port may be configured in bootstrap_conf
CtrlServer(int ctrl_port);
int64_t port() const { return port_; }
......
/*
Copyright 2020 The OneFlow Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "oneflow/core/control/rank_info_bootstrap_client.h"
namespace oneflow {
// Creates the stub for the master given in `bootstrap_conf` and sends it a
// LoadServer request that carries the master host and this process's rank.
RankInfoBootstrapClient::RankInfoBootstrapClient(const BootstrapConf& bootstrap_conf) {
  // Capacity is reserved for world_size entries, although only the master stub is added here.
  stubs_.reserve(bootstrap_conf.world_size());
  const auto& master_addr = bootstrap_conf.master_addr();
  const std::string master_endpoint =
      master_addr.host() + ":" + std::to_string(master_addr.port());
  stubs_.push_back(CtrlService::NewStub(master_endpoint));

  LoadServerRequest load_request;
  load_request.set_addr(master_addr.host());
  load_request.set_rank(bootstrap_conf.rank());
  // The master stub is the one and only element at this point.
  LoadServer(load_request, stubs_.front().get());
}
} // namespace oneflow
/*
Copyright 2020 The OneFlow Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#ifndef ONEFLOW_CORE_CONTROL_RANK_INFO_BOOTSTRAP_CLIENT_H_
#define ONEFLOW_CORE_CONTROL_RANK_INFO_BOOTSTRAP_CLIENT_H_
#include "oneflow/core/control/bootstrap_client.h"
#include "oneflow/core/control/ctrl_bootstrap.pb.h"
#include "oneflow/core/job/env_desc.h"
namespace oneflow {
// Bootstrap client for rank-info based bootstrap: on construction it connects to the
// master address from a BootstrapConf and announces this process's rank to it.
class RankInfoBootstrapClient final : public BootstrapClient {
 public:
  OF_DISALLOW_COPY_AND_MOVE(RankInfoBootstrapClient);
  ~RankInfoBootstrapClient() override = default;
  // `explicit` prevents a BootstrapConf from being converted into a client implicitly,
  // which would open a connection as a hidden side effect.
  explicit RankInfoBootstrapClient(const BootstrapConf& bootstrap_conf);
};
} // namespace oneflow
#endif // ONEFLOW_CORE_CONTROL_RANK_INFO_BOOTSTRAP_CLIENT_H_
/*
Copyright 2020 The OneFlow Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "oneflow/core/control/rank_info_bootstrap_server.h"
#include "oneflow/core/actor/act_event_logger.h"
#include "oneflow/core/job/profiler.h"
#include "grpc/grpc_posix.h"
namespace oneflow {
namespace {
// Extracts the host from a peer URI of the form "ipv4:<host>:<port>".
// Aborts (CHECK) when the URI contains no ':' or when the part before the first
// ':' is not "ipv4". The host is the text between the first and the last ':'.
std::string GetHostFromUri(const std::string& uri) {
  const size_t first_delimiter_pos = uri.find(':');
  CHECK_NE(first_delimiter_pos, std::string::npos);
  const std::string protocol_family = uri.substr(0, first_delimiter_pos);
  CHECK_EQ(protocol_family, "ipv4");
  const size_t host_begin = first_delimiter_pos + 1;
  const size_t last_delimiter_pos = uri.rfind(':');
  // When the URI holds a single ':', the unsigned length wraps around and substr
  // returns everything after the scheme.
  return uri.substr(host_begin, last_delimiter_pos - host_begin);
}
} // namespace
// Builds and starts the gRPC bootstrap server of this process and launches the
// thread that runs HandleRpcs.
//
// Rank 0 (the master) listens on bootstrap_conf.master_addr().port(); the process
// aborts if that exact port could not be bound. Every other rank requests port 0
// and the process aborts if no port was bound at all.
RankInfoBootstrapServer::RankInfoBootstrapServer(const BootstrapConf& bootstrap_conf)
    : BootstrapServer(), port_(0), world_size_(bootstrap_conf.world_size()) {
  Init();
  const bool is_master = (bootstrap_conf.rank() == 0);
  const int requested_port = (is_master ? bootstrap_conf.master_addr().port() : 0);
  grpc::ServerBuilder server_builder;
  server_builder.SetMaxMessageSize(INT_MAX);
  // port_ receives the port that was actually bound once BuildAndStart() has run.
  server_builder.AddListeningPort("0.0.0.0:" + std::to_string(requested_port),
                                  grpc::InsecureServerCredentials(), &port_);
  grpc_service_.reset(new CtrlService::AsyncService);
  server_builder.RegisterService(grpc_service_.get());
  cq_ = server_builder.AddCompletionQueue();
  grpc_server_ = server_builder.BuildAndStart();
  if (is_master) {
    CHECK_EQ(requested_port, port()) << "Port " << requested_port << " is unavailable";
  } else {
    // Previously unchecked: a worker whose bind failed kept port_ == 0 and carried
    // on with a server nobody could reach. Fail fast instead.
    CHECK_NE(port(), 0) << "RankInfoBootstrapServer failed to bind a listening port";
  }
  LOG(INFO) << "RankInfoBootstrapServer listening on "
            << "0.0.0.0:" + std::to_string(port());
  loop_thread_ = std::thread(&RankInfoBootstrapServer::HandleRpcs, this);
}
// Returns the rank-to-host table that OnLoadServer fills in.
// Aborts (CHECK_NOTNULL) if the table has not been created yet, i.e. before the
// first LoadServer request has been handled.
Maybe<const std::vector<std::string>&> RankInfoBootstrapServer::rank2host() const {
  auto* host_table = CHECK_NOTNULL(rank2host_.get());
  return *host_table;
}
// Handles a LoadServer request: records the host of the connecting peer under
// the rank carried by the request, answers the call and enqueues the next
// LoadServer request. Aborts (CHECK) if the rank is outside [0, world_size_).
void RankInfoBootstrapServer::OnLoadServer(CtrlCall<CtrlMethod::kLoadServer>* call) {
  const int64_t rank = call->request().rank();
  CHECK_GE(rank, 0);
  CHECK_LT(rank, world_size_);
  // The table is created lazily on the first request, with one slot per rank.
  if (rank2host_ == nullptr) {
    rank2host_ = std::make_shared<std::vector<std::string>>(world_size_);
  }
  // The host is taken from the peer URI of the connection, not from the request payload.
  std::string peer_host = GetHostFromUri(call->server_ctx().peer());
  rank2host_->at(rank) = std::move(peer_host);
  call->SendResponse();
  EnqueueRequest<CtrlMethod::kLoadServer>();
}
} // namespace oneflow
/*
Copyright 2020 The OneFlow Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#ifndef ONEFLOW_CORE_CONTROL_RANK_INFO_BOOTSTRAP_SERVER_H_
#define ONEFLOW_CORE_CONTROL_RANK_INFO_BOOTSTRAP_SERVER_H_
#include "oneflow/core/control/bootstrap_server.h"
#include "oneflow/core/control/ctrl_bootstrap.pb.h"
#include "oneflow/core/job/env_desc.h"
#include "oneflow/core/common/maybe.h"
namespace oneflow {
// Bootstrap server for rank-info based bootstrap. In addition to the BootstrapServer
// behaviour it remembers, for every rank that sends a LoadServer request, the host the
// request came from.
class RankInfoBootstrapServer final : public BootstrapServer {
 public:
  OF_DISALLOW_COPY_AND_MOVE(RankInfoBootstrapServer);
  ~RankInfoBootstrapServer() override = default;
  // `explicit` prevents a BootstrapConf from being converted into a server implicitly,
  // which would start a listening server as a hidden side effect.
  explicit RankInfoBootstrapServer(const BootstrapConf& bootstrap_conf);

  // Port the server is listening on.
  int64_t port() const { return port_; }
  // Rank-to-host table; aborts if no LoadServer request has been handled yet.
  // NOTE(review): the table is written in OnLoadServer and read here without any
  // locking visible in this class - confirm callers only read it after all ranks
  // have registered.
  Maybe<const std::vector<std::string>&> rank2host() const;

 private:
  // Stores the peer host under the request's rank, then answers the call.
  void OnLoadServer(CtrlCall<CtrlMethod::kLoadServer>* call) override;

  int port_;
  const int64_t world_size_;
  // use std::shared_ptr as std::optional: null until the first LoadServer request arrives.
  std::shared_ptr<std::vector<std::string>> rank2host_;
};
} // namespace oneflow
#endif // ONEFLOW_CORE_CONTROL_RANK_INFO_BOOTSTRAP_SERVER_H_
......@@ -179,18 +179,22 @@ void RpcClient::EraseCount(const std::string& k) {
}
void RpcClient::LoadServer(const std::string& server_addr, CtrlService::Stub* stub) {
LoadServerRequest request;
request.set_addr(server_addr);
return LoadServer(request, stub);
}
void RpcClient::LoadServer(const LoadServerRequest& request, CtrlService::Stub* stub) {
int32_t retry_idx = 0;
for (; retry_idx < max_retry_num; ++retry_idx) {
grpc::ClientContext client_ctx;
LoadServerRequest request;
request.set_addr(server_addr);
LoadServerResponse response;
grpc::Status st = stub->CallMethod<CtrlMethod::kLoadServer>(&client_ctx, request, &response);
if (st.error_code() == grpc::StatusCode::OK) {
LOG(INFO) << "LoadServer " << server_addr << " Successful at " << retry_idx << " times";
LOG(INFO) << "LoadServer " << request.addr() << " Successful at " << retry_idx << " times";
break;
} else if (st.error_code() == grpc::StatusCode::UNAVAILABLE) {
LOG(INFO) << "LoadServer " << server_addr << " Failed at " << retry_idx << " times"
LOG(INFO) << "LoadServer " << request.addr() << " Failed at " << retry_idx << " times"
<< " error_code " << st.error_code() << " error_message " << st.error_message();
std::this_thread::sleep_for(std::chrono::seconds(sleep_seconds));
continue;
......
......@@ -68,6 +68,7 @@ class RpcClient {
protected:
RpcClient() = default;
void LoadServer(const std::string& server_addr, CtrlService::Stub* stub);
void LoadServer(const LoadServerRequest& request, CtrlService::Stub* stub);
void PushMasterKV(const std::string& k, std::function<void(std::string*)> VSetter);
void PullMasterKV(const std::string& k, std::function<void(const std::string&)> VGetter);
CtrlService::Stub* GetMasterStub() { return stubs_[0].get(); }
......
syntax = "proto2";
package oneflow;
// Per-process information exchanged through the master while the ProcessCtx is built
// (see CtrlBootstrap::InitProcessCtx).
message WorkerProcessInfo {
// Rank of the reporting process.
required int64 rank = 1;
// Control port of the reporting process.
required int64 port = 2;
// Host of the reporting process. Optional: when absent, the master fills in the host
// itself via SetHostByMaster.
optional string host = 3;
}
......@@ -83,8 +83,10 @@ Maybe<void> EnvGlobalObjectsScope::Init(const EnvProto& env_proto) {
Global<EnvDesc>::New(env_proto);
Global<CtrlServer>::New();
Global<ProcessCtx>::New();
JUST(HostListCtrlBootstrap(*Global<EnvDesc>::Get())
.InitProcessCtx(Global<CtrlServer>::Get()->port(), Global<ProcessCtx>::Get()));
// Use CHECK_JUST instead of JUST to avoid a deadlock: otherwise the process may block in
// ~CtrlBootstrap.
CHECK_JUST(HostListCtrlBootstrap(*Global<EnvDesc>::Get())
.InitProcessCtx(Global<CtrlServer>::Get()->port(), Global<ProcessCtx>::Get()));
Global<CtrlClient>::New(*Global<ProcessCtx>::Get());
int64_t this_mchn_id = Global<ProcessCtx>::Get()->rank();
Global<MachineCtx>::New(this_mchn_id);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册