// reducer.h
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <cstddef>
#include <cstdint>
#include <limits>
#include <map>
#include <memory>
#include <ostream>
#include <vector>

#include "paddle/fluid/distributed/collective/ProcessGroup.h"
#include "paddle/fluid/eager/accumulation/accumulation_node.h"
#include "paddle/fluid/eager/api/utils/hook_utils.h"
#include "paddle/fluid/eager/api/utils/tensor_utils.h"
#include "paddle/fluid/eager/autograd_meta.h"
#include "paddle/fluid/eager/utils.h"
#include "paddle/fluid/operators/math/concat_and_split.h"
#include "paddle/fluid/platform/device/gpu/gpu_info.h"
#include "paddle/phi/api/include/api.h"
#include "paddle/phi/api/include/tensor.h"
#include "paddle/phi/api/lib/ext_compat_utils.h"
#include "paddle/phi/common/data_type.h"

namespace paddle {
namespace distributed {
// Short aliases used throughout this header: the experimental Tensor type and
// the Scalar / ScalarArray templates instantiated on it.
using Tensor = paddle::experimental::Tensor;
using Scalar = paddle::experimental::ScalarBase<paddle::experimental::Tensor>;
using ScalarArray =
    paddle::experimental::ScalarArrayBase<paddle::experimental::Tensor>;

std::vector<std::vector<size_t>> Eager_AssignGroupBySize(
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
    const std::vector<Tensor>, const std::vector<bool> &is_sparse_gradient,
    const std::vector<size_t> &group_size_limits,
    const std::vector<int64_t> &tensor_indices = {});

// Per-group state used by EagerReducer (which keeps a vector of these in
// `groups_`). All data members are public.
class EagerGroup {
 public:
  // NOTE(review): presumably the fused buffer that the group's dense tensors
  // are concatenated into -- confirm against ConcatTensors().
  Tensor dense_contents_;

  // for concat kernel
  std::vector<phi::DenseTensor> dense_tensors_;
  std::vector<int64_t> length_;
  int64_t all_length_{0};
  std::vector<ScalarArray> origin_shapes_;

  // Global indices of participating tensors in the group
  std::vector<size_t> tensor_indices_;

  // Number of params that are not ready yet; 0 means the group is ready.
  // Starts at the largest size_t value. This is the same value the previous
  // `= -1` initializer wrapped around to, now spelled without an implicit
  // signed-to-unsigned conversion.
  size_t pending_{std::numeric_limits<size_t>::max()};

  // external message of group
  phi::DataType dtype_;

  // Takes the place to operate on.
  // NOTE(review): the original comment says a context selects the stream for
  // concat; only a Place is passed here -- confirm how the stream is chosen.
  void ConcatTensors(const platform::Place &);

  // Takes the place to operate on (same caveat as ConcatTensors, for split).
  void SplitTensors(const platform::Place &);

  // Stream-insertion operator, declared as a friend; defined elsewhere.
  friend std::ostream &operator<<(std::ostream &, const EagerGroup &);
};

// Locates one tensor by group and by position within that group.
// Both fields default to 0 so a default-constructed locator never holds
// indeterminate values; aggregate initialization ({group, inside}) still works.
struct TensorLocator {
  // record the index in groups_
  size_t group_index{0};
  // index of the tensor inside that group
  size_t inside_group_index{0};
};

// Reducer for eager mode. Holds the tensors, their grouping (`groups_`), and a
// shared ProcessGroup handle. Member functions are only declared here; their
// definitions are not in this header, so the notes below are limited to what
// the declarations themselves show.
class EagerReducer {
 public:
  // NOTE(review): `tensors` is taken by value, so each construction copies the
  // vector; changing it to a const reference would also require updating the
  // out-of-line definition.
  explicit EagerReducer(
      const std::vector<Tensor> tensors,
      const std::vector<std::vector<size_t>> &group_indices,
      const std::vector<bool> &is_sparse_gradient,
      std::shared_ptr<distributed::ProcessGroup> process_group,
      const std::vector<size_t> &group_size_limits,
      bool find_unused_parameters);

  // Virtual so the class can be destroyed through a base pointer.
  virtual ~EagerReducer() = default;

  // Returns a shared pointer to the grad node associated with `tensor`.
  std::shared_ptr<egr::GradNodeBase> GetGradNodeFromTensor(Tensor *tensor);

  // Group setup. `p_group` is an output/in-out pointer to the group being
  // filled (presumably -- confirm against the definition).
  void InitializeGroups(const std::vector<std::vector<size_t>> &group_indices);
  void InitializeDenseGroups(const std::vector<size_t> &tensor_indices_,
                             EagerGroup *p_group);

  // Backward-pass bookkeeping; indices refer to variables / groups by position.
  void PrepareForBackward(const std::vector<Tensor> &outputs);
  void AddDistHook(size_t var_index);
  void MarkVarReady(const size_t var_index, const bool is_used_var);
  void MarkGroupReady(const size_t group_index);
  void FusedAllReduceSchedule(EagerGroup *group, const int curr_group_index);

 private:
  // Members with the same names as the constructor arguments.
  std::vector<Tensor> tensors_;
  std::vector<std::vector<size_t>> group_indices_;
  std::vector<bool> is_sparse_gradient_;
  std::shared_ptr<distributed::ProcessGroup> process_group_;
  std::vector<size_t> group_size_limits_;
  // Defaulted to false so the flag is never read uninitialized; a constructor
  // member-initializer still overrides this value.
  bool find_unused_vars_each_step_{false};

  std::vector<EagerGroup> groups_;
  std::vector<TensorLocator> variable_locators_;
  PlaceType place_;
  platform::Place inner_place_;
  // Starts at 0.
  size_t next_group_ = 0;
  // Starts at -1 until a real value is assigned.
  int64_t nranks_ = -1;
  std::vector<std::shared_ptr<paddle::distributed::ProcessGroup::Task>> tasks_;

  bool grad_need_hooks_{false};

  std::vector<bool> vars_marked_ready_;
  std::vector<int> local_used_vars_;
};

}  //  namespace distributed
}  //  namespace paddle