message_service.cc 2.0 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE)
15
#include "paddle/fluid/distributed/fleet_executor/message_service.h"
16

17
#include "brpc/server.h"
18 19
#include "paddle/fluid/distributed/fleet_executor/global.h"
#include "paddle/fluid/distributed/fleet_executor/message_bus.h"
20 21 22 23

namespace paddle {
namespace distributed {

24
void MessageServiceImpl::ReceiveInterceptorMessage(
25
    google::protobuf::RpcController* control_base,
26 27
    const InterceptorMessage* request,
    InterceptorResponse* response,
28
    google::protobuf::Closure* done) {
29
  brpc::ClosureGuard done_guard(done);
30
  VLOG(3) << "Message Service receives a message from interceptor "
31
          << request->src_id() << " to interceptor " << request->dst_id()
32
          << ", with the message: " << request->message_type();
33
  bool flag = GlobalVal<MessageBus>::Get()->DispatchMsgToCarrier(*request);
34
  response->set_rst(flag);
35 36
}

37 38
void MessageServiceImpl::IncreaseBarrierCount(
    google::protobuf::RpcController* control_base,
39 40
    const InterceptorMessage* request,
    InterceptorResponse* response,
41 42 43 44 45 46 47 48
    google::protobuf::Closure* done) {
  brpc::ClosureGuard done_guard(done);
  VLOG(3) << "Barrier Service receives a message from rank "
          << request->src_id() << " to rank " << request->dst_id();
  GlobalVal<MessageBus>::Get()->IncreaseBarrierCount();
  response->set_rst(true);
}

49 50 51
}  // namespace distributed
}  // namespace paddle
#endif