未验证 提交 b6be9bce 编写于 作者: qq_22305325's avatar qq_22305325 提交者: GitHub

Add ctrl conf (#4235)

* add CtrlConf Proto

* add HostListBootStrapClient

* add HostListBootStrapServer

* del OfOnceCall in host_list_boot_strap_client

* add BootStrapServer/Client

* Update control.proto

del rank2ctrl_addr

* add InitConfFromEnvDesc

* add log

* optimize code

* add CHECK

* InitCtrlConfFromEnvDesc

* del useless args def

* CtrlBootstrap

* minor fix
Co-authored-by: Nlixinqi <lixinqi0703106@163.com>
Co-authored-by: Noneflow-ci-bot <69100618+oneflow-ci-bot@users.noreply.github.com>
上级 c0ab5087
/*
Copyright 2020 The OneFlow Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#ifndef ONEFLOW_CORE_CONTROL_BOOTSTRAP_CLIENT_H_
#define ONEFLOW_CORE_CONTROL_BOOTSTRAP_CLIENT_H_
#include "oneflow/core/control/rpc_client.h"
#include "oneflow/core/job/env_desc.h"
namespace oneflow {
class BootstrapClient : public RpcClient {
public:
OF_DISALLOW_COPY_AND_MOVE(BootstrapClient);
virtual ~BootstrapClient() override = default;
protected:
friend class Global<BootstrapClient>;
BootstrapClient() = default;
};
} // namespace oneflow
#endif // ONEFLOW_CORE_CONTROL_BOOTSTRAP_CLIENT_H_
/*
Copyright 2020 The OneFlow Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#ifndef ONEFLOW_CORE_CONTROL_BOOTSTRAP_SERVER_H_
#define ONEFLOW_CORE_CONTROL_BOOTSTRAP_SERVER_H_
#include "oneflow/core/control/rpc_server.h"
#include "oneflow/core/job/env_desc.h"
namespace oneflow {
class BootstrapServer : public RpcServer {
public:
OF_DISALLOW_COPY_AND_MOVE(BootstrapServer);
BootstrapServer() = default;
virtual ~BootstrapServer() override = default;
};
} // namespace oneflow
#endif // ONEFLOW_CORE_CONTROL_BOOTSTRAP_SERVER_H_
/*
Copyright 2020 The OneFlow Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include <map>
#include "oneflow/core/control/ctrl_bootstrap.h"
#include "oneflow/core/control/host_list_bootstrap_server.h"
#include "oneflow/core/control/host_list_bootstrap_client.h"
namespace oneflow {
HostListCtrlBootstrap::~HostListCtrlBootstrap() {
bootstrap_client_.reset();
bootstrap_server_.reset();
}
HostListCtrlBootstrap::HostListCtrlBootstrap(const EnvDesc& env_desc)
: CtrlBootstrap(), env_desc_(env_desc.env_proto()) {
bootstrap_server_.reset(new HostListBootstrapServer(env_desc));
bootstrap_client_.reset(new HostListBootstrapClient(env_desc));
}
Maybe<void> HostListCtrlBootstrap::InitCtrlConf(CtrlConf* ret_ctrl_conf) {
bootstrap_client_->Barrier(__FILE__ ":" OF_PP_STRINGIZE(__LINE__));
int64_t this_machine_id = env_desc_.GetMachineId(bootstrap_server_->this_machine_addr());
std::vector<CtrlConf> rank2ctrl_conf;
if (this_machine_id == 0) {
CtrlConf ctrl_conf;
{
Address* addr = ctrl_conf.mutable_ctrl_addr()->Add();
addr->set_host(bootstrap_server_->this_machine_addr());
addr->set_port(env_desc_.ctrl_port());
ctrl_conf.set_rank(this_machine_id);
rank2ctrl_conf.push_back(ctrl_conf);
}
for (int64_t machine_id = 1; machine_id < env_desc_.TotalMachineNum(); ++machine_id) {
std::string key = std::string("GetCtrlConf") + std::to_string(machine_id);
CtrlConf cur_ctrl_conf;
bootstrap_client_->PullMasterKV(key, &cur_ctrl_conf);
CHECK_EQ_OR_RETURN(machine_id, rank2ctrl_conf.size());
CHECK_EQ_OR_RETURN(machine_id, cur_ctrl_conf.rank());
rank2ctrl_conf.push_back(cur_ctrl_conf);
}
} else {
std::string key = std::string("GetCtrlConf") + std::to_string(this_machine_id);
CtrlConf cur_ctrl_conf;
cur_ctrl_conf.set_rank(this_machine_id);
Address* addr = cur_ctrl_conf.mutable_ctrl_addr()->Add();
addr->set_host(bootstrap_server_->this_machine_addr());
addr->set_port(env_desc_.ctrl_port());
bootstrap_client_->PushMasterKV(key, cur_ctrl_conf);
}
bootstrap_client_->Barrier(__FILE__ ":" OF_PP_STRINGIZE(__LINE__));
if (this_machine_id == 0) {
ret_ctrl_conf->set_rank(this_machine_id);
for (const auto& ctrl_conf : rank2ctrl_conf) {
CHECK_EQ_OR_RETURN(ctrl_conf.ctrl_addr_size(), 1);
*ret_ctrl_conf->mutable_ctrl_addr()->Add() = ctrl_conf.ctrl_addr(0);
}
for (int64_t machine_id = 1; machine_id < env_desc_.TotalMachineNum(); ++machine_id) {
std::string key = std::string("BroadcastCtrlConf") + std::to_string(machine_id);
bootstrap_client_->PushMasterKV(key, *ret_ctrl_conf);
}
} else {
std::string key = std::string("BroadcastCtrlConf") + std::to_string(this_machine_id);
bootstrap_client_->PullMasterKV(key, ret_ctrl_conf);
ret_ctrl_conf->set_rank(this_machine_id);
}
bootstrap_client_->Barrier(__FILE__ ":" OF_PP_STRINGIZE(__LINE__));
LOG(INFO) << "\n" << ret_ctrl_conf->DebugString();
return Maybe<void>::Ok();
}
} // namespace oneflow
/*
Copyright 2020 The OneFlow Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#ifndef ONEFLOW_CORE_CONTROL_CTRL_BOOTSTRAP_H_
#define ONEFLOW_CORE_CONTROL_CTRL_BOOTSTRAP_H_
#include "oneflow/core/control/ctrl_bootstrap.pb.h"
#include "oneflow/core/job/env_desc.h"
#include "oneflow/core/common/maybe.h"
namespace oneflow {
class CtrlConf;
class CtrlBootstrap {
public:
virtual ~CtrlBootstrap() {}
virtual Maybe<void> InitCtrlConf(CtrlConf* ctrl_conf) = 0;
protected:
CtrlBootstrap() = default;
};
class HostListBootstrapServer;
class HostListBootstrapClient;
class HostListCtrlBootstrap final : public CtrlBootstrap {
public:
explicit HostListCtrlBootstrap(const EnvDesc& env_desc);
~HostListCtrlBootstrap() override;
Maybe<void> InitCtrlConf(CtrlConf* ctrl_conf) override;
private:
const EnvDesc env_desc_;
// Uses shared_ptr and forward declaration to avoid `#include ...`
std::shared_ptr<HostListBootstrapServer> bootstrap_server_;
std::shared_ptr<HostListBootstrapClient> bootstrap_client_;
};
} // namespace oneflow
#endif // ONEFLOW_CORE_CONTROL_CTRL_BOOTSTRAP_H_
syntax = "proto2";
package oneflow;
message Address {
required string host = 1;
required int64 port = 2;
}
message CtrlConf {
repeated Address ctrl_addr = 1;
required int64 rank = 2;
}
......@@ -21,6 +21,14 @@ namespace oneflow {
namespace {
#define GRPC_CHECK(x) CHECK_EQ(x.error_code(), grpc::StatusCode::OK)
} // namespace oneflow
CtrlClient::~CtrlClient() {
{
std::unique_lock<std::mutex> lck(need_heartbeat_thread_stop_mtx_);
need_heartbeat_thread_stop_ = true;
}
heartbeat_thread_.join();
}
CtrlClient::CtrlClient() {
......
......@@ -23,11 +23,15 @@ namespace oneflow {
class CtrlClient final : public RpcClient {
public:
OF_DISALLOW_COPY_AND_MOVE(CtrlClient);
~CtrlClient() override = default;
~CtrlClient();
private:
friend class Global<CtrlClient>;
CtrlClient();
bool need_heartbeat_thread_stop_;
std::mutex need_heartbeat_thread_stop_mtx_;
std::thread heartbeat_thread_;
};
#define FILE_LINE_STR __FILE__ ":" OF_PP_STRINGIZE(__LINE__)
......
/*
Copyright 2020 The OneFlow Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "oneflow/core/control/host_list_bootstrap_client.h"
#include "oneflow/core/job/env_desc.h"
namespace oneflow {
HostListBootstrapClient::HostListBootstrapClient(const EnvDesc& env_desc) {
stubs_.reserve(env_desc.TotalMachineNum());
int32_t port = -1;
std::string addr = "";
for (int64_t i = 0; i < env_desc.TotalMachineNum(); ++i) {
const Machine& mchn = env_desc.machine(i);
port = (mchn.ctrl_port_agent() != -1) ? (mchn.ctrl_port_agent()) : env_desc.ctrl_port();
addr = mchn.addr() + ":" + std::to_string(port);
stubs_.push_back(CtrlService::NewStub(addr));
LoadServer(mchn.addr(), stubs_[i].get());
}
}
} // namespace oneflow
/*
Copyright 2020 The OneFlow Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#ifndef ONEFLOW_CORE_CONTROL_HOST_LIST_BOOTSTRAP_CLIENT_H_
#define ONEFLOW_CORE_CONTROL_HOST_LIST_BOOTSTRAP_CLIENT_H_
#include "oneflow/core/control/bootstrap_client.h"
#include "oneflow/core/job/env_desc.h"
namespace oneflow {
class HostListBootstrapClient final : public BootstrapClient {
public:
OF_DISALLOW_COPY_AND_MOVE(HostListBootstrapClient);
~HostListBootstrapClient() override = default;
HostListBootstrapClient(const EnvDesc& env_desc);
};
} // namespace oneflow
#endif // ONEFLOW_CORE_CONTROL_HOST_LIST_BOOTSTRAP_CLIENT_H_
/*
Copyright 2020 The OneFlow Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "oneflow/core/control/host_list_bootstrap_server.h"
#include "oneflow/core/actor/act_event_logger.h"
#include "oneflow/core/job/profiler.h"
#include "grpc/grpc_posix.h"
namespace oneflow {
HostListBootstrapServer::HostListBootstrapServer(const EnvDesc& env_desc)
: BootstrapServer(), is_first_connect_(true), this_machine_addr_("") {
Init();
int port = env_desc.ctrl_port();
grpc::ServerBuilder server_builder;
server_builder.SetMaxMessageSize(INT_MAX);
int bound_port = 0;
server_builder.AddListeningPort("0.0.0.0:" + std::to_string(port),
grpc::InsecureServerCredentials(), &bound_port);
grpc_service_.reset(new CtrlService::AsyncService);
server_builder.RegisterService(grpc_service_.get());
cq_ = server_builder.AddCompletionQueue();
grpc_server_ = server_builder.BuildAndStart();
CHECK_EQ(port, bound_port) << "Port " << port << " is unavailable";
LOG(INFO) << "HostListBootstrapServer listening on "
<< "0.0.0.0:" + std::to_string(port);
loop_thread_ = std::thread(&HostListBootstrapServer::HandleRpcs, this);
}
void HostListBootstrapServer::OnLoadServer(CtrlCall<CtrlMethod::kLoadServer>* call) {
if (this->is_first_connect_) {
this->this_machine_addr_ = call->request().addr();
this->is_first_connect_ = false;
} else {
CHECK_EQ(call->request().addr(), this->this_machine_addr_);
}
call->SendResponse();
EnqueueRequest<CtrlMethod::kLoadServer>();
}
} // namespace oneflow
/*
Copyright 2020 The OneFlow Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#ifndef ONEFLOW_CORE_CONTROL_HOST_LIST_BOOTSTRAP_SERVER_H_
#define ONEFLOW_CORE_CONTROL_HOST_LIST_BOOTSTRAP_SERVER_H_
#include "oneflow/core/control/bootstrap_server.h"
#include "oneflow/core/job/env_desc.h"
namespace oneflow {
class HostListBootstrapServer final : public BootstrapServer {
public:
OF_DISALLOW_COPY_AND_MOVE(HostListBootstrapServer);
~HostListBootstrapServer() override = default;
HostListBootstrapServer(const EnvDesc& env_desc);
const std::string& this_machine_addr() { return this_machine_addr_; }
private:
void OnLoadServer(CtrlCall<CtrlMethod::kLoadServer>* call) override;
bool is_first_connect_;
std::string this_machine_addr_;
};
} // namespace oneflow
#endif // ONEFLOW_CORE_CONTROL_HOST_LIST_BOOTSTRAP_SERVER_H_
......@@ -47,14 +47,6 @@ class ClientCall final {
} // namespace
RpcClient::~RpcClient() {
{
std::unique_lock<std::mutex> lck(need_heartbeat_thread_stop_mtx_);
need_heartbeat_thread_stop_ = true;
}
heartbeat_thread_.join();
}
void RpcClient::Barrier(const std::string& barrier_name) {
// TODO(hanbinbin): depend world_size of Global<CtrlConf>
Barrier(barrier_name, Global<EnvDesc>::Get()->TotalMachineNum());
......
......@@ -26,7 +26,7 @@ namespace oneflow {
class RpcClient {
public:
OF_DISALLOW_COPY_AND_MOVE(RpcClient);
virtual ~RpcClient();
virtual ~RpcClient() = default;
void Barrier(const std::string& barrier_name);
void Barrier(const std::string& barrier_name, int32_t barrier_num);
......@@ -77,10 +77,6 @@ class RpcClient {
std::vector<std::unique_ptr<CtrlService::Stub>> stubs_;
std::mutex done_names_mtx_;
HashSet<std::string> done_names_;
bool need_heartbeat_thread_stop_;
std::mutex need_heartbeat_thread_stop_mtx_;
std::thread heartbeat_thread_;
};
} // namespace oneflow
......
......@@ -24,8 +24,10 @@ namespace oneflow {
class EnvDesc final {
public:
OF_DISALLOW_COPY_AND_MOVE(EnvDesc);
explicit EnvDesc(const EnvProto& env_proto) : env_proto_(env_proto) {}
~EnvDesc() = default;
const EnvProto& env_proto() const { return env_proto_; }
size_t TotalMachineNum() const { return env_proto_.machine().size(); }
const Machine& machine(int32_t idx) const { return env_proto_.machine(idx); }
int32_t ctrl_port() const { return env_proto_.ctrl_port(); }
......@@ -33,9 +35,6 @@ class EnvDesc final {
int64_t GetMachineId(const std::string& addr) const;
private:
friend class Global<EnvDesc>;
explicit EnvDesc(const EnvProto& env_proto) : env_proto_(env_proto) {}
EnvProto env_proto_;
};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册