未验证 提交 81848fff 编写于 作者: Y Yuang Liu 提交者: GitHub

[fleet executor] fleet executor for npu (#40607)

上级 06fee998
......@@ -4,7 +4,7 @@ if(WITH_PYTHON)
endif()
proto_library(interceptor_message_proto SRCS interceptor_message.proto)
if(WITH_DISTRIBUTE AND WITH_PSCORE AND NOT (WITH_ASCEND OR WITH_ASCEND_CL))
if(WITH_DISTRIBUTE AND WITH_PSCORE)
set(BRPC_DEPS brpc ssl crypto protobuf zlib leveldb snappy gflags glog)
else()
set(BRPC_DEPS "")
......
......@@ -67,8 +67,7 @@ bool MessageBus::IsInit() const { return is_init_; }
MessageBus::~MessageBus() {
VLOG(3) << "Message bus releases resource.";
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
!defined(PADDLE_WITH_ASCEND_CL)
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE)
server_.Stop(1000);
server_.Join();
#endif
......@@ -87,8 +86,7 @@ bool MessageBus::Send(int64_t dst_rank,
IsInit(), true,
platform::errors::PreconditionNotMet(
"Using message bus since it has not been initialized."));
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
!defined(PADDLE_WITH_ASCEND_CL)
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE)
int retry_time = 0; // message bus will retry sending for 10 times
while (retry_time < 10) {
++retry_time;
......@@ -173,8 +171,7 @@ void MessageBus::ListenPort() {
LOG(INFO) << "No need listen to port since training on single card.";
return;
}
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
!defined(PADDLE_WITH_ASCEND_CL)
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE)
// function keep listen the port and handle the message
PADDLE_ENFORCE_EQ(
server_.AddService(&message_service_, brpc::SERVER_DOESNT_OWN_SERVICE), 0,
......@@ -203,8 +200,7 @@ void MessageBus::ListenPort() {
#endif
}
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
!defined(PADDLE_WITH_ASCEND_CL)
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE)
bool MessageBus::SendInterRank(int64_t dst_rank,
const InterceptorMessage& interceptor_message) {
const auto& dst_addr = GetAddr(dst_rank);
......
......@@ -20,8 +20,7 @@
#include <thread>
#include <unordered_map>
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
!defined(PADDLE_WITH_ASCEND_CL)
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE)
#include "brpc/channel.h"
#include "brpc/server.h"
#include "paddle/fluid/distributed/fleet_executor/message_service.h"
......@@ -64,8 +63,7 @@ class MessageBus final {
const std::string& GetAddr(int64_t rank) const;
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
!defined(PADDLE_WITH_ASCEND_CL)
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE)
// send the message inter rank (dst is different rank with src)
bool SendInterRank(int64_t dst_rank,
const InterceptorMessage& interceptor_message);
......@@ -81,8 +79,7 @@ class MessageBus final {
// the ip needs to be listened
std::string addr_;
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
!defined(PADDLE_WITH_ASCEND_CL)
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE)
MessageServiceImpl message_service_;
// brpc server
brpc::Server server_;
......
......@@ -11,8 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
!defined(PADDLE_WITH_ASCEND_CL)
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE)
#include "paddle/fluid/distributed/fleet_executor/message_service.h"
#include "brpc/server.h"
#include "paddle/fluid/distributed/fleet_executor/global.h"
......
......@@ -11,8 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
!defined(PADDLE_WITH_ASCEND_CL)
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE)
#pragma once
#include "brpc/server.h"
......
......@@ -50,8 +50,7 @@
#include "paddle/phi/api/ext/op_meta_info.h"
#include "paddle/utils/string/split.h"
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
!defined(PADDLE_WITH_ASCEND_CL)
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE)
#include "paddle/fluid/distributed/fleet_executor/fleet_executor.h"
#include "paddle/fluid/distributed/fleet_executor/fleet_executor_desc.pb.h"
#include "paddle/fluid/distributed/fleet_executor/task_node.h"
......@@ -374,8 +373,7 @@ static void DisablePrepareDataOpt(
}
bool AnalysisPredictor::PrepareExecutor() {
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
!defined(PADDLE_WITH_ASCEND_CL)
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE)
if (config_.dist_config().use_dist_model()) {
VLOG(3) << "use_dist_model is enabled, will init FleetExecutor.";
return PrepareFleetExecutor();
......@@ -393,8 +391,7 @@ bool AnalysisPredictor::PrepareExecutor() {
return true;
}
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
!defined(PADDLE_WITH_ASCEND_CL)
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE)
bool AnalysisPredictor::PrepareFleetExecutor() {
VLOG(3) << "AnalysisPredictor::PrepareFleetExecutor()";
if (config_.dist_config().nranks() > 1 && !CommInit()) {
......@@ -1194,8 +1191,7 @@ std::vector<std::string> AnalysisPredictor::GetOutputNames() {
std::unique_ptr<ZeroCopyTensor> AnalysisPredictor::GetInputTensor(
const std::string &name) {
framework::Scope *scope;
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
!defined(PADDLE_WITH_ASCEND_CL)
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE)
if (config_.dist_config().use_dist_model()) {
scope = scope_.get();
} else {
......@@ -1244,8 +1240,7 @@ std::unique_ptr<ZeroCopyTensor> AnalysisPredictor::GetInputTensor(
std::unique_ptr<ZeroCopyTensor> AnalysisPredictor::GetOutputTensor(
const std::string &name) {
framework::Scope *scope;
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
!defined(PADDLE_WITH_ASCEND_CL)
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE)
if (config_.dist_config().use_dist_model()) {
scope = scope_.get();
} else {
......@@ -1292,8 +1287,7 @@ std::unique_ptr<ZeroCopyTensor> AnalysisPredictor::GetOutputTensor(
}
bool AnalysisPredictor::ZeroCopyRun() {
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
!defined(PADDLE_WITH_ASCEND_CL)
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE)
if (config_.dist_config().use_dist_model()) {
VLOG(3) << "ZeroCopyRun will use the fleet executor.";
inference::Timer timer;
......
......@@ -18,8 +18,7 @@
#include <memory>
#include <string>
#include <vector>
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
!defined(PADDLE_WITH_ASCEND_CL)
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE)
#include "paddle/fluid/distributed/fleet_executor/fleet_executor.h"
#endif
#include "paddle/fluid/framework/naive_executor.h"
......@@ -395,8 +394,7 @@ class AnalysisPredictor : public PaddlePredictor {
void StatisticShapeRangeInfo();
void CollectShapeRangeInfo();
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
!defined(PADDLE_WITH_ASCEND_CL)
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE)
// fleet exe related
///
......@@ -488,8 +486,7 @@ class AnalysisPredictor : public PaddlePredictor {
std::map<std::string, std::vector<std::vector<int32_t>>> shape_info_;
static int clone_num_;
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
!defined(PADDLE_WITH_ASCEND_CL)
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE)
// fleet executor related
distributed::FleetExecutorDesc executor_desc_;
std::shared_ptr<distributed::FleetExecutor> fleet_exe_;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册