From a52357fe2837307b9cf15e2ca45d501e987341d0 Mon Sep 17 00:00:00 2001
From: Yulong Ao
Date: Mon, 15 Aug 2022 11:34:30 +0800
Subject: [PATCH] [Auto Parallel] Move the distributed info from python to c++
 (#44510)

* [Auto Parallel] Move the distributed info from python to c++
* [Auto Parallel] Add dist_attrs for VarDesc and OpDesc
* [Auto Parallel] Add the lost file
* [Auto Parallel] Make the dist attr be unique_ptr
* [Auto Parallel] Add the proto conversion
* [Auto Parallel] Improve the proto support
* [Auto Parallel] Fix the bugs for adding a device or a link
* [Auto Parallel] Add the C++ ProcessMesh and DistributedMapper
* [Auto Parallel] Improve the impl of these dist attrs
* [Auto Parallel] Pybind11 ProcessMesh and DeviceMesh
* [Auto Parallel] Fix the unittest problem
* [Auto Parallel] Explicitly add the src file for auto_parallel target
* [Auto Parallel] Add the proto dependency explicitly
* [Auto Parallel] Fix the cmake bug on windows and mac
* [Auto Parallel] Remove the pybind11 header file in process_mesh.h
* [Auto Parallel] Remove unused code
* [Auto Parallel] Check whether the dist attr is null
* [Auto Parallel] Implement the assignment operator for OpDesc explicitly
---
 paddle/fluid/framework/CMakeLists.txt       |  10 +-
 paddle/fluid/framework/framework.proto      |   7 -
 paddle/fluid/framework/op_desc.cc           |  33 ++-
 paddle/fluid/framework/op_desc.h            |  35 ++--
 paddle/fluid/framework/process_mesh_desc.cc |  61 ------
 paddle/fluid/framework/process_mesh_desc.h  |  67 ------
 paddle/fluid/framework/var_desc.cc          |  25 +++
 paddle/fluid/framework/var_desc.h           |  42 ++--
 paddle/fluid/pybind/auto_parallel_py.cc     |  65 ++++++
 paddle/fluid/pybind/protobuf.cc             |  25 ++-
 paddle/fluid/pybind/protobuf.h              |   1 -
 paddle/fluid/pybind/pybind.cc               |   1 -
 python/paddle/fluid/framework.py            |  52 ++---
 .../unittests/auto_parallel/CMakeLists.txt  |   1 +
 .../auto_parallel/test_dist_attr_v2.py      | 190 ++++++++++++++++++
 15 files changed, 388 insertions(+), 227 deletions(-)
 delete mode 100644 paddle/fluid/framework/process_mesh_desc.cc
 delete mode 100644 paddle/fluid/framework/process_mesh_desc.h
 create mode 100644 python/paddle/fluid/tests/unittests/auto_parallel/test_dist_attr_v2.py

diff --git a/paddle/fluid/framework/CMakeLists.txt b/paddle/fluid/framework/CMakeLists.txt
index ebee8603f4d..49b98cf6b51 100755
--- a/paddle/fluid/framework/CMakeLists.txt
+++ b/paddle/fluid/framework/CMakeLists.txt
@@ -502,8 +502,14 @@ cc_test(
 cc_library(
   proto_desc
-  SRCS var_desc.cc op_desc.cc block_desc.cc program_desc.cc process_mesh_desc.cc
-  DEPS attribute shape_inference op_info operator glog version)
+  SRCS var_desc.cc op_desc.cc block_desc.cc program_desc.cc
+  DEPS attribute
+       shape_inference
+       op_info
+       operator
+       glog
+       version
+       dist_attr)
 
 if(WITH_CRYPTO)
   add_dependencies(proto_desc cryptopp)
diff --git a/paddle/fluid/framework/framework.proto b/paddle/fluid/framework/framework.proto
index 391197d967a..b58b643cdff 100644
--- a/paddle/fluid/framework/framework.proto
+++ b/paddle/fluid/framework/framework.proto
@@ -40,13 +40,6 @@ enum AttrType {
   VARS = 14;
 }
 
-message ProcessMeshDesc {
-  required int32 id = 1;
-  required int32 parent_id = 2;
-  repeated int32 topology = 3;
-  repeated int32 process_group = 4;
-};
-
 // OpDesc describes an instance of a C++ framework::OperatorBase
 // derived class type.
 message OpDesc {
diff --git a/paddle/fluid/framework/op_desc.cc b/paddle/fluid/framework/op_desc.cc
index 4ae4f88118f..e5d8f6f9f0e 100644
--- a/paddle/fluid/framework/op_desc.cc
+++ b/paddle/fluid/framework/op_desc.cc
@@ -421,6 +421,12 @@ OpDesc::OpDesc(const std::string &type,
   block_ = nullptr;
 }
 
+OpDesc::OpDesc(const OpDesc &other) {
+  CopyFrom(other);
+  block_ = other.block_;
+  need_update_ = true;
+}
+
 OpDesc::OpDesc(const OpDesc &other, BlockDesc *block) {
   CopyFrom(other);
   block_ = block;
@@ -435,8 +441,10 @@ void OpDesc::CopyFrom(const OpDesc &op_desc) {
   inputs_ = op_desc.inputs_;
   outputs_ = op_desc.outputs_;
   attrs_ = op_desc.attrs_;
-  // The record of original_id_ is only for auto parallel.
   original_id_ = op_desc.original_id_;
+  if (op_desc.dist_attr_) {
+    dist_attr_.reset(new OperatorDistAttr(*op_desc.dist_attr_));
+  }
   need_update_ = true;
 }
 
@@ -481,6 +489,15 @@ OpDesc::OpDesc(const proto::OpDesc &desc, BlockDesc *block)
   this->block_ = block;
 }
 
+// Explicitly implement the assignment operator, since the added
+// unique_ptr data member suppresses the implicitly-generated one.
+OpDesc &OpDesc::operator=(const OpDesc &other) {
+  CopyFrom(other);
+  block_ = other.block_;
+  need_update_ = true;
+  return *this;
+}
+
 proto::OpDesc *OpDesc::Proto() {
   Flush();
   return &desc_;
@@ -985,6 +1002,20 @@ void OpDesc::InferVarType(BlockDesc *block) const {
   }
 }
 
+OperatorDistAttr *OpDesc::MutableDistAttr() {
+  if (dist_attr_) {
+    return dist_attr_.get();
+  } else {
+    dist_attr_.reset(new OperatorDistAttr(*this));
+    return dist_attr_.get();
+  }
+}
+
+void OpDesc::SetDistAttr(const OperatorDistAttr &dist_attr) {
+  MutableDistAttr();
+  *dist_attr_ = dist_attr;
+}
+
 void OpDesc::UpdateVarAttr(const std::string &name, const Attribute &attr) {
   auto attr_type = static_cast<proto::AttrType>(attr.index() - 1);
   auto type = GetAttrType(name, true);
diff --git a/paddle/fluid/framework/op_desc.h b/paddle/fluid/framework/op_desc.h
index a1f264a849d..7b0d7c587e7 100644
--- a/paddle/fluid/framework/op_desc.h
+++ b/paddle/fluid/framework/op_desc.h
@@ -15,11 +15,13 @@ limitations under the License. */
 #pragma once
 
 #include <atomic>
+#include <memory>
 #include <string>
 #include <unordered_map>
 #include <utility>
 #include <vector>
 
+#include "paddle/fluid/distributed/auto_parallel/dist_attr.h"
 #include "paddle/fluid/framework/attribute.h"
 #include "paddle/fluid/framework/type_defs.h"
 #include "paddle/fluid/framework/var_desc.h"
@@ -31,6 +33,8 @@ class VarDesc;
 class BlockDesc;
 class ProgramDesc;
 
+using paddle::distributed::auto_parallel::OperatorDistAttr;
+
 class OpDesc {
  public:
   OpDesc() {}
@@ -40,12 +44,16 @@ class OpDesc {
          const VariableNameMap &outputs,
          const AttributeMap &attrs);
 
+  OpDesc(const OpDesc &desc);
+
   OpDesc(const proto::OpDesc &desc, BlockDesc *block);
 
   explicit OpDesc(BlockDesc *block) : block_(block) {}
 
   OpDesc(const OpDesc &other, BlockDesc *block);
 
+  OpDesc &operator=(const OpDesc &other);
+
   void CopyFrom(const OpDesc &op_desc);
 
   proto::OpDesc *Proto();
@@ -169,12 +177,14 @@ class OpDesc {
 
   void UpdateVarAttr(const std::string &name, const Attribute &attr);
 
-  // The Id() and OrignalId() are only used for auto parallel.
+  bool NeedUpdate() const { return need_update_; }
+
+  // The following methods are only used for auto parallel.
   uint64_t Id() const { return id_; }
   uint64_t OriginalId() const { return original_id_; }
   void SetOriginalId(uint64_t original_id) { original_id_ = original_id; }
-
-  bool NeedUpdate() const { return need_update_; }
+  OperatorDistAttr *MutableDistAttr();
+  void SetDistAttr(const OperatorDistAttr &dist_attr);
 
  private:
   friend class ProgramDesc;
@@ -193,13 +203,6 @@ class OpDesc {
     return ret_val;
   }
 
-  // This thread-safe implementation seems to be redudent since the neural
-  // networks are usually constructed in a single thread
-  static uint64_t GenerateId() {
-    static std::atomic<uint64_t> uid{0};
-    // Must start from one
-    return ++uid;
-  }
   // it it really needed? or just mantain a ptr from block?
   proto::OpDesc desc_;
   BlockDesc *block_{nullptr};  // not_own
@@ -214,13 +217,15 @@ class OpDesc {
   // local changes should be synchronized, need_update_ should be set to true.
   bool need_update_{false};
 
-  // Note: the id_ is unique (only for auto parallel).
+  // Note: the following members are only used for auto_parallel for now.
+  static uint64_t GenerateId() {
+    static std::atomic<uint64_t> uid{0};
+    // Must start from one
+    return ++uid;
+  }
   uint64_t id_ = GenerateId();
-  // Note: the orignal_id_ is used for referring to the original OpDesc
-  // that the current OpDesc is built from (only for auto parallel).
-  // The default original_id_ is same as the id_, which means the
-  // current OpDesc is not built from the other one.
   uint64_t original_id_ = id_;
+  std::unique_ptr<OperatorDistAttr> dist_attr_;
 };
 
 std::vector<std::string> AttrVarNames(const Attribute &attr);
diff --git a/paddle/fluid/framework/process_mesh_desc.cc b/paddle/fluid/framework/process_mesh_desc.cc
deleted file mode 100644
index 207e10fc194..00000000000
--- a/paddle/fluid/framework/process_mesh_desc.cc
+++ /dev/null
@@ -1,61 +0,0 @@
-/* Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License. */
-
-#include "paddle/fluid/framework/process_mesh_desc.h"
-
-namespace paddle {
-namespace framework {
-
-int32_t ProcessMeshDesc::next_id = -1;
-
-ProcessMeshDesc::ProcessMeshDesc(const std::vector<int32_t> &topo,
-                                 const std::vector<int32_t> &process_group,
-                                 int32_t parent_id) {
-  int32_t cur_id = ++next_id;
-  desc_.set_id(cur_id);
-  desc_.set_parent_id(parent_id);
-  for (size_t i = 0; i != topo.size(); ++i) {
-    desc_.add_topology(topo[i]);
-  }
-  for (size_t i = 0; i != process_group.size(); ++i) {
-    desc_.add_process_group(process_group[i]);
-  }
-  ProcessMeshDescMap::GetInstance().Insert(cur_id, this);
-}
-
-std::vector<int32_t> ProcessMeshDesc::Topology() const {
-  size_t size = desc_.topology_size();
-  std::vector<int32_t> ret(size);
-  for (auto i = 0; i != desc_.topology_size(); ++i) {
-    ret[i] = desc_.topology(i);
-  }
-  return ret;
-}
-
-std::vector<int32_t> ProcessMeshDesc::ProcessGroup() const {
-  size_t size = desc_.process_group_size();
-  std::vector<int32_t> ret(size);
-  for (auto i = 0; i != desc_.process_group_size(); ++i) {
-    ret[i] = desc_.process_group(i);
-  }
-  return ret;
-}
-
-ProcessMeshDescMap &ProcessMeshDescMap::GetInstance() {
-  static ProcessMeshDescMap g_process_mesh_desc_map;
-  return g_process_mesh_desc_map;
-}
-
-}  // namespace framework
-}  // namespace paddle
diff --git a/paddle/fluid/framework/process_mesh_desc.h b/paddle/fluid/framework/process_mesh_desc.h
deleted file mode 100644
index ee0646c921e..00000000000
--- a/paddle/fluid/framework/process_mesh_desc.h
+++ /dev/null
@@ -1,67 +0,0 @@
-/* Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License. */
-
-#pragma once
-
-#include <unordered_map>
-#include <vector>
-
-#include "paddle/fluid/framework/framework.pb.h"
-#include "paddle/fluid/framework/proto_desc.h"
-#include "paddle/fluid/platform/enforce.h"
-#include "paddle/fluid/platform/macros.h"
-
-namespace paddle {
-namespace framework {
-
-class ProcessMeshDesc {
- public:
-  ProcessMeshDesc(const std::vector<int32_t>& topo,
-                  const std::vector<int32_t>& process_group,
-                  int32_t parent_id);
-
-  int32_t ID() const { return desc_.id(); }
-  int32_t Parent() const { return desc_.parent_id(); }
-
-  std::vector<int32_t> Topology() const;
-  std::vector<int32_t> ProcessGroup() const;
-
-  static int32_t next_id;
-
- private:
-  proto::ProcessMeshDesc desc_;  // not_own
-};
-
-class ProcessMeshDescMap {
- public:
-  static ProcessMeshDescMap& GetInstance();
-
-  bool Has(int32_t index) const { return map_.find(index) != map_.end(); }
-
-  void Insert(int32_t index, ProcessMeshDesc* mesh) {
-    PADDLE_ENFORCE_NE(
-        Has(index),
-        true,
-        platform::errors::AlreadyExists("Index (%d) has been used.", index));
-    map_.insert(std::make_pair(index, mesh));
-  }
-
- private:
-  ProcessMeshDescMap() = default;
-  // Use raw pointer to avoid double free
-  std::unordered_map<int32_t, ProcessMeshDesc*> map_;
-  DISABLE_COPY_AND_ASSIGN(ProcessMeshDescMap);
-};
-}  // namespace framework
-}  // namespace paddle
diff --git a/paddle/fluid/framework/var_desc.cc b/paddle/fluid/framework/var_desc.cc
index b24be4e8194..d015d91b45c 100644
--- a/paddle/fluid/framework/var_desc.cc
+++ b/paddle/fluid/framework/var_desc.cc
@@ -21,6 +21,15 @@ limitations under the License. */
 namespace paddle {
 namespace framework {
 
+VarDesc::VarDesc(const VarDesc &other)
+    : desc_(other.desc_),
+      attrs_(other.attrs_),
+      original_id_(other.original_id_) {
+  if (other.dist_attr_) {
+    dist_attr_.reset(new TensorDistAttr(*other.dist_attr_));
+  }
+}
+
 proto::VarType::Type VarDesc::GetType() const { return desc_.type().type(); }
 
 void VarDesc::SetType(proto::VarType::Type type) {
@@ -354,6 +363,22 @@ Attribute VarDesc::GetAttr(const std::string &name) const {
   return it->second;
 }
 
+TensorDistAttr *VarDesc::MutableDistAttr() {
+  // If dist_attr_ is nullptr, construct a new one and return.
+  if (dist_attr_) {
+    return dist_attr_.get();
+  } else {
+    dist_attr_.reset(new TensorDistAttr(*this));
+    return dist_attr_.get();
+  }
+}
+
+void VarDesc::SetDistAttr(const TensorDistAttr &dist_attr) {
+  // Make sure the dist attr has been created before assigning to it.
+  MutableDistAttr();
+  *dist_attr_ = dist_attr;
+}
+
 bool operator==(const VarDesc &left, const VarDesc &right) {
   return left.Proto()->SerializeAsString() ==
          right.Proto()->SerializeAsString();
diff --git a/paddle/fluid/framework/var_desc.h b/paddle/fluid/framework/var_desc.h
index 1072657b4af..63cbcb93420 100644
--- a/paddle/fluid/framework/var_desc.h
+++ b/paddle/fluid/framework/var_desc.h
@@ -20,6 +20,7 @@ limitations under the License. */
 #include
 
 #include "glog/logging.h"
+#include "paddle/fluid/distributed/auto_parallel/dist_attr.h"
 #include "paddle/fluid/framework/attribute.h"
 #include "paddle/fluid/framework/framework.pb.h"
 #include "paddle/fluid/framework/type_defs.h"
@@ -27,6 +28,8 @@ limitations under the License. */
 namespace paddle {
 namespace framework {
 
+using paddle::distributed::auto_parallel::TensorDistAttr;
+
 // convert between std::vector and protobuf repeated.
 template <typename T>
 inline std::vector<T> RepeatedToVector(
@@ -73,21 +76,22 @@ class VarDesc {
   }
 
   // Explicitly implement the copy constructor for auto parallel
-  VarDesc(const VarDesc &other)
-      : desc_(other.desc_),
-        attrs_(other.attrs_),
-        original_id_(other.original_id_) {}
+  VarDesc(const VarDesc &other);
+
   VarDesc &operator=(const VarDesc &other) {
     desc_ = other.desc_;
     attrs_ = other.attrs_;
     original_id_ = other.original_id_;
+    if (other.dist_attr_) {
+      dist_attr_.reset(new TensorDistAttr(*other.dist_attr_));
+    }
     need_updated_ = true;
     return *this;
   }
 
   proto::VarDesc *Proto() {
-    return &desc_;
     need_updated_ = true;
+    return &desc_;
   }
 
   const proto::VarDesc *Proto() const { return &desc_; }
@@ -187,16 +191,18 @@ class VarDesc {
 
   Attribute GetAttr(const std::string &name) const;
 
-  // The Id() and OriginalId() are only used for auto parallel.
+  bool NeedUpdate() const { return need_updated_; }
+  void SetNeedUpdate(bool need) { need_updated_ = need; }
+
+  // The following methods are only used for auto parallel.
   uint64_t Id() const { return id_; }
   uint64_t OriginalId() const { return original_id_; }
   void SetOriginalId(uint64_t original_id) {
     original_id_ = original_id;
    need_updated_ = true;
   }
-
-  bool NeedUpdate() const { return need_updated_; }
-  void SetNeedUpdate(bool need) { need_updated_ = need; }
+  TensorDistAttr *MutableDistAttr();
+  void SetDistAttr(const TensorDistAttr &dist_attr);
 
  private:
   const proto::VarType::TensorDesc &tensor_desc() const;
@@ -204,26 +210,20 @@ class VarDesc {
   proto::VarType::TensorDesc *mutable_tensor_desc();
   std::vector<proto::VarType::TensorDesc *> mutable_tensor_descs();
 
-  // This thread-safe implementation seems to be redudent since the neural
-  // networks are usually constructed in a single thread.
-  static uint64_t GenerateId() {
-    static std::atomic<uint64_t> uid{0};
-    return ++uid;
-  }
-
   // it it really needed? or just mantain a ptr from block?
   proto::VarDesc desc_;
   AttributeMap attrs_;
 
   bool need_updated_{false};
 
-  // Note: the id_ is unique for all VarDesc (only for auto parallel).
+  // Note: the following members are only used for auto parallel for now
+  static uint64_t GenerateId() {
+    static std::atomic<uint64_t> uid{0};
+    return ++uid;
+  }
   uint64_t id_ = GenerateId();
-  // Note: the orignal_id_ is used for referring to the original VarDesc
-  // that the current VarDesc is built from (only for auto parallel).
-  // The default original_id_ is same as the id_, which means the
-  // current VarDesc is not built from the other one.
   uint64_t original_id_ = id_;
+  std::unique_ptr<TensorDistAttr> dist_attr_;
 };
 
 bool operator==(const VarDesc &left, const VarDesc &right);
diff --git a/paddle/fluid/pybind/auto_parallel_py.cc b/paddle/fluid/pybind/auto_parallel_py.cc
index 46ade96f4d1..12cd79b264d 100644
--- a/paddle/fluid/pybind/auto_parallel_py.cc
+++ b/paddle/fluid/pybind/auto_parallel_py.cc
@@ -35,7 +35,11 @@ using paddle::distributed::auto_parallel::DistributedMapper;
 using paddle::distributed::auto_parallel::Link;
 using paddle::distributed::auto_parallel::LinkCapability;
 using paddle::distributed::auto_parallel::Machine;
+using paddle::distributed::auto_parallel::OperatorDistAttr;
 using paddle::distributed::auto_parallel::ProcessMesh;
+using paddle::distributed::auto_parallel::TensorDistAttr;
+using paddle::framework::OpDesc;
+using paddle::framework::VarDesc;
 
 void BindAutoParallel(py::module *m) {
   py::class_<ProcessMesh>(*m, "ProcessMesh")
@@ -162,6 +166,67 @@ void BindAutoParallel(py::module *m) {
       .def(py::self == py::self)
       .def(py::self != py::self)
       .def("__str__", &DeviceMesh::to_string);
+
+  py::class_<TensorDistAttr>(*m, "TensorDistAttr")
+      .def(py::init<const VarDesc &>())
+      .def_property_readonly("tensor", &TensorDistAttr::tensor)
+      .def_property("process_mesh",
+                    &TensorDistAttr::process_mesh,
+                    &TensorDistAttr::set_process_mesh)
+      .def_property("dims_mapping",
+                    &TensorDistAttr::dims_mapping,
+                    &TensorDistAttr::set_dims_mapping)
+      .def_property("batch_dim",
+                    &TensorDistAttr::batch_dim,
+                    &TensorDistAttr::set_batch_dim)
+      .def_property("dynamic_dims",
+                    &TensorDistAttr::dynamic_dims,
+                    &TensorDistAttr::set_dynamic_dims)
+      .def("is_annotated", &TensorDistAttr::is_annotated)
+      .def("annotate", &TensorDistAttr::annotate)
+      .def("verify", &TensorDistAttr::verify)
+      .def(py::self == py::self)
+      .def(py::self != py::self)
+      .def("__str__", &TensorDistAttr::to_string);
+
+  py::class_<OperatorDistAttr>(*m, "OperatorDistAttr")
+      .def(py::init<const OpDesc &>())
+      .def_property_readonly("op", &OperatorDistAttr::op)
+      .def_property("process_mesh",
+                    &OperatorDistAttr::process_mesh,
+                    &OperatorDistAttr::set_process_mesh)
+      .def_property("impl_type",
+                    &OperatorDistAttr::impl_type,
+                    &OperatorDistAttr::set_impl_type)
+      .def_property("impl_idx",
+                    &OperatorDistAttr::impl_idx,
+                    &OperatorDistAttr::set_impl_idx)
+      .def("input", &OperatorDistAttr::input)
+      .def("output", &OperatorDistAttr::output)
+      .def("input_dist_attrs",
+           &OperatorDistAttr::input_dist_attrs,
+           py::return_value_policy::reference)
+      .def("output_dist_attrs",
+           &OperatorDistAttr::output_dist_attrs,
+           py::return_value_policy::reference)
+      .def("input_dist_attr",
+           static_cast<TensorDistAttr &(OperatorDistAttr::*)(
+               const std::string &)>(&OperatorDistAttr::input_dist_attr),
+           py::return_value_policy::reference)
+      .def("output_dist_attr",
+           static_cast<TensorDistAttr &(OperatorDistAttr::*)(
+               const std::string &)>(&OperatorDistAttr::output_dist_attr),
+           py::return_value_policy::reference)
+      .def("set_input_dist_attr", &OperatorDistAttr::set_input_dist_attr)
+      .def("set_output_dist_attr", &OperatorDistAttr::set_output_dist_attr)
+      .def("is_annotated", &OperatorDistAttr::is_annotated)
+      .def("annotate", &OperatorDistAttr::annotate)
+      .def("verify", &OperatorDistAttr::verify)
+      .def(py::self == py::self)
+      .def(py::self != py::self)
+      .def("__str__", &OperatorDistAttr::to_string);
 }
 
 }  // namespace pybind
diff --git a/paddle/fluid/pybind/protobuf.cc b/paddle/fluid/pybind/protobuf.cc
index 74debcf8882..cb7b44afcbc 100644
--- a/paddle/fluid/pybind/protobuf.cc
+++ b/paddle/fluid/pybind/protobuf.cc
@@ -18,10 +18,10 @@ limitations under the License. */
 #include
 #include
 
+#include "paddle/fluid/distributed/auto_parallel/dist_attr.h"
 #include "paddle/fluid/framework/block_desc.h"
 #include "paddle/fluid/framework/ir/graph_helper.h"
 #include "paddle/fluid/framework/op_desc.h"
-#include "paddle/fluid/framework/process_mesh_desc.h"
 #include "paddle/fluid/framework/program_desc.h"
 #include "paddle/fluid/framework/var_desc.h"
 #include "paddle/fluid/framework/version.h"
@@ -39,6 +39,9 @@ PyTypeObject *g_blockdesc_pytype = nullptr;
 namespace pd = paddle::framework;
 namespace jit = paddle::jit;
 
+using paddle::distributed::auto_parallel::OperatorDistAttr;
+using paddle::distributed::auto_parallel::TensorDistAttr;
+
 template <typename T>
 static pybind11::bytes SerializeMessage(
     T &self) {  // NOLINT due to pybind11 convention.
@@ -113,18 +116,6 @@ void BindProgramDesc(pybind11::module *m) {
       });
 }
 
-void BindProcessMeshDesc(pybind11::module *m) {
-  pybind11::class_<pd::ProcessMeshDesc>(*m, "ProcessMeshDesc", "")
-      .def(pybind11::init<const std::vector<int32_t> &,
-                          const std::vector<int32_t> &,
-                          int32_t>())
-      .def_property_readonly("id", &pd::ProcessMeshDesc::ID)
-      .def_property_readonly("parent", &pd::ProcessMeshDesc::Parent)
-      .def_property_readonly("topology", &pd::ProcessMeshDesc::Topology)
-      .def_property_readonly("process_group",
-                             &pd::ProcessMeshDesc::ProcessGroup);
-}
-
 void BindBlockDesc(pybind11::module *m) {
   pybind11::class_<pd::BlockDesc> blockdesc(*m, "BlockDesc", "");
   g_blockdesc_pytype = (PyTypeObject *)blockdesc.ptr();  // NOLINT
@@ -252,6 +243,10 @@ void BindVarDsec(pybind11::module *m) {
       .def("id", &pd::VarDesc::Id)
       .def("original_id", &pd::VarDesc::OriginalId)
       .def("set_original_id", &pd::VarDesc::SetOriginalId)
+      .def_property("dist_attr",
+                    &pd::VarDesc::MutableDistAttr,
+                    &pd::VarDesc::SetDistAttr,
+                    pybind11::return_value_policy::reference)
       .def("attr", &pd::VarDesc::GetAttr);
 
   pybind11::enum_<pd::proto::VarType::Type> vartype(var_desc, "VarType", "");
@@ -398,6 +393,10 @@ void BindOpDesc(pybind11::module *m) {
       .def("id", &pd::OpDesc::Id)
       .def("original_id", &pd::OpDesc::OriginalId)
      .def("set_original_id", &pd::OpDesc::SetOriginalId)
+      .def_property("dist_attr",
+                    &pd::OpDesc::MutableDistAttr,
+                    &pd::OpDesc::SetDistAttr,
+                    pybind11::return_value_policy::reference)
       .def("inputs", [](pd::OpDesc &self) { return self.Inputs(); })
       .def("outputs", &pd::OpDesc::Outputs);
 }
diff --git a/paddle/fluid/pybind/protobuf.h b/paddle/fluid/pybind/protobuf.h
index 9980d0187b5..93064de7d92 100644
--- a/paddle/fluid/pybind/protobuf.h
+++ b/paddle/fluid/pybind/protobuf.h
@@ -34,7 +34,6 @@ void BindProgramDesc(pybind11::module* m);
 void BindBlockDesc(pybind11::module* m);
 void BindVarDsec(pybind11::module* m);
 void BindOpDesc(pybind11::module* m);
-void BindProcessMeshDesc(pybind11::module* m);
 void BindJitProperty(pybind11::module* m);
 
 }  // namespace pybind
diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc
index bd62d649cc7..367ae4c3a88 100644
--- a/paddle/fluid/pybind/pybind.cc
+++ b/paddle/fluid/pybind/pybind.cc
@@ -1699,7 +1699,6 @@ All parameter, weight, gradient are variables in Paddle.
   BindCostModel(&m);
   BindConstValue(&m);
   BindGlobalValueGetterSetter(&m);
-  BindProcessMeshDesc(&m);
   BindFleetExecutor(&m);
   BindTCPStore(&m);
   BindAutoParallel(&m);
diff --git a/python/paddle/fluid/framework.py b/python/paddle/fluid/framework.py
index 8db5a4353ae..edf68762328 100644
--- a/python/paddle/fluid/framework.py
+++ b/python/paddle/fluid/framework.py
@@ -2531,31 +2531,18 @@ class Variable(object):
         return self.desc.attr(name)
 
     @property
-    def process_mesh(self):
+    def dist_attr(self):
         """
-        Get the process mesh belonging to this Variable.
+        Get the distributed attribute of this Variable.
         """
-        from paddle.distributed.auto_parallel.interface import _g_process_mesh_map
-        from paddle.distributed.auto_parallel.interface import ProcessMesh
-        mesh_attr_name = 'mesh_id' + core.kAutoParallelSuffix()
-        mesh_id = self.desc.attr(mesh_attr_name)
-        return _g_process_mesh_map[mesh_id]
+        return self.desc.dist_attr
 
-    @property
-    def shard_mask(self):
-        """
-        Get shard_mask belonging to this Variable.
-        """
-        mask_attr_name = 'mask' + core.kAutoParallelSuffix()
-        return self.desc.attr(mask_attr_name)
-
-    @property
-    def offload_device(self):
+    @dist_attr.setter
+    def dist_attr(self, dist_attr):
         """
-        Get the offload device of this Variable.
+        Set the distributed attribute of this Variable.
         """
-        offload_attr_name = 'offload_device' + core.kAutoParallelSuffix()
-        return self.desc.attr(offload_attr_name)
+        self.desc.dist_attr = dist_attr
 
 
 def get_all_op_protos():
@@ -3298,29 +3285,18 @@ class Operator(object):
         return False
 
     @property
-    def process_mesh(self):
+    def dist_attr(self):
         """
-        Get the process mesh belonging to this Operator.
+        Get the distributed attribute of this Operator.
         """
-        from paddle.distributed.auto_parallel.interface import _g_process_mesh_map
-        mesh_attr_name = 'mesh_id' + core.kAutoParallelSuffix()
-        mesh_id = self.attr(mesh_attr_name)
-        return _g_process_mesh_map[mesh_id]
+        return self.desc.dist_attr
 
-    def dims_mapping(self, name):
-        """
-        Get the dims_mapping for the op's var named `name`.
-        """
-        dims_mapping_attr_name = name + core.kAutoParallelSuffix()
-        return self.attr(dims_mapping_attr_name)
-
-    @property
-    def pipeline_stage(self):
+    @dist_attr.setter
+    def dist_attr(self, dist_attr):
         """
-        Get pipeline stage of the Operator.
+        Set the distributed attribute of this Operator.
         """
-        pipeline_stage_attr_name = 'pipeline_stage' + core.kAutoParallelSuffix()
-        return self.desc.attr(pipeline_stage_attr_name)
+        self.desc.dist_attr = dist_attr
 
 
 class Block(object):
diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/CMakeLists.txt b/python/paddle/fluid/tests/unittests/auto_parallel/CMakeLists.txt
index eaae2d42655..b4998ffebe7 100644
--- a/python/paddle/fluid/tests/unittests/auto_parallel/CMakeLists.txt
+++ b/python/paddle/fluid/tests/unittests/auto_parallel/CMakeLists.txt
@@ -63,4 +63,5 @@ if(WITH_DISTRIBUTE AND WITH_GPU)
   py_test_modules(test_dist_op_cost MODULES test_dist_op_cost ENVS ${dist_ENVS})
   py_test_modules(test_cluster_v2 MODULES test_cluster_v2)
   py_test_modules(test_process_mesh_v2 MODULES test_process_mesh_v2)
+  py_test_modules(test_dist_attr_v2 MODULES test_dist_attr_v2)
 endif()
diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/test_dist_attr_v2.py b/python/paddle/fluid/tests/unittests/auto_parallel/test_dist_attr_v2.py
new file mode 100644
index 00000000000..75b73d06122
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/auto_parallel/test_dist_attr_v2.py
@@ -0,0 +1,190 @@
+# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License
+
+import unittest
+import paddle
+import numpy as np
+import paddle.nn as nn
+import paddle.static as static
+from paddle.fluid.core import TensorDistAttr
+from paddle.fluid.core import OperatorDistAttr
+
+from paddle.distributed.auto_parallel.process_mesh_v2 import ProcessMesh
+
+paddle.enable_static()
+
+
+class TestDistAttr(unittest.TestCase):
+
+    def test_tensor_dist_attr_ctor(self):
+        train_program = static.Program()
+        start_program = static.Program()
+        with static.program_guard(train_program, start_program):
+            input = static.data(name="input", shape=[2, 3], dtype='float32')
+            dist_attr = TensorDistAttr(input.desc)
+            self.assertEqual(dist_attr.process_mesh.empty(), True)
+            self.assertEqual(dist_attr.dims_mapping, [-1, -1])
+            self.assertEqual(dist_attr.batch_dim, 0)
+            self.assertEqual(dist_attr.dynamic_dims, [0, 0])
+
+            dist_attr.process_mesh = ProcessMesh([[0, 1, 2], [3, 4, 5]])
+            dist_attr.dims_mapping = [0, -1]
+            dist_attr.batch_dim = 1
+            dist_attr.dynamic_dims = [1, 1]
+            self.assertEqual(dist_attr.process_mesh,
+                             ProcessMesh([[0, 1, 2], [3, 4, 5]]))
+            self.assertEqual(dist_attr.dims_mapping, [0, -1])
+            self.assertEqual(dist_attr.batch_dim, 1)
+            self.assertEqual(dist_attr.dynamic_dims, [1, 1])
+            self.assertTrue(dist_attr.verify())
+            self.assertIsNotNone(str(dist_attr))
+
+    def test_tensor_dist_attr(self):
+        train_program = static.Program()
+        start_program = static.Program()
+        with static.program_guard(train_program, start_program):
+            input = static.data(name="input", shape=[2, 3], dtype='float32')
+            input1 = static.data(name="input1", shape=[2, 3], dtype='float32')
+            dist_attr = input.dist_attr
+            dist_attr.process_mesh = ProcessMesh([[0, 1, 2], [3, 4, 5]])
+            dist_attr.dims_mapping = [0, -1]
+            dist_attr.batch_dim = 1
+            dist_attr.dynamic_dims = [1, 1]
+            self.assertEqual(input.dist_attr.process_mesh,
+                             ProcessMesh([[0, 1, 2], [3, 4, 5]]))
+            self.assertEqual(input.dist_attr.dims_mapping, [0, -1])
+            self.assertEqual(input.dist_attr.batch_dim, 1)
+            self.assertEqual(input.dist_attr.dynamic_dims, [1, 1])
+            self.assertTrue(input.dist_attr.verify())
+
+            input1.dist_attr = dist_attr
+            self.assertEqual(input1.dist_attr.process_mesh,
+                             ProcessMesh([[0, 1, 2], [3, 4, 5]]))
+            self.assertEqual(input1.dist_attr.dims_mapping, [0, -1])
+            self.assertEqual(input1.dist_attr.batch_dim, 1)
+            self.assertEqual(input1.dist_attr.dynamic_dims, [1, 1])
+            self.assertTrue(input1.dist_attr.verify())
+
+    def test_operator_dist_attr_ctor(self):
+        train_program = static.Program()
+        start_program = static.Program()
+        with static.program_guard(train_program, start_program):
+            input = static.data(name="input", shape=[2, 3], dtype='float32')
+            input1 = static.data(name="input1", shape=[3, 4], dtype='float32')
+            output = paddle.matmul(input, input1)
+            op = train_program.current_block().ops[0]
+            process_mesh = ProcessMesh([[0, 1, 2], [3, 4, 5]])
+            op_dist_attr = OperatorDistAttr(op.desc)
+
+            op_dist_attr.process_mesh = process_mesh
+            # Set the distributed attribute of input
+            input_dist_attr = TensorDistAttr(input.desc)
+            input_dist_attr.dims_mapping = [0, -1]
+            op_dist_attr.set_input_dist_attr(input.name, input_dist_attr)
+            # Set the distributed attribute of input1
+            input1_dist_attr = TensorDistAttr(input1.desc)
+            input1_dist_attr.dims_mapping = [-1, 1]
+            op_dist_attr.set_input_dist_attr(input1.name, input1_dist_attr)
+            # Set the distributed attribute of output
+            output_dist_attr = TensorDistAttr(output.desc)
+            output_dist_attr.dims_mapping = [0, 1]
+            op_dist_attr.set_output_dist_attr(output.name, output_dist_attr)
+            self.assertEqual(op_dist_attr.process_mesh, process_mesh)
+            self.assertEqual(
+                op_dist_attr.input_dist_attr(input.name).process_mesh,
+                process_mesh)
+            self.assertEqual(
+                op_dist_attr.input_dist_attr(input1.name).process_mesh,
+                process_mesh)
+            self.assertEqual(
+                op_dist_attr.output_dist_attr(output.name).process_mesh,
+                process_mesh)
+            self.assertEqual(
+                op_dist_attr.input_dist_attr(input.name).dims_mapping, [0, -1])
+            self.assertEqual(
+                op_dist_attr.input_dist_attr(input1.name).dims_mapping,
+                [-1, 1])
+            self.assertEqual(
+                op_dist_attr.output_dist_attr(output.name).dims_mapping,
+                [0, 1])
+            self.assertTrue(op_dist_attr.verify())
+            self.assertIsNotNone(str(op_dist_attr))
+
+            op_dist_attr = OperatorDistAttr(op.desc)
+            op_dist_attr.process_mesh = process_mesh
+            # Set the distributed attribute of input directly
+            input_dist_attr = op_dist_attr.input_dist_attr(input.name)
+            input_dist_attr.dims_mapping = [-1, 0]
+            # Set the distributed attribute of input1 directly
+            input1_dist_attr = op_dist_attr.input_dist_attr(input1.name)
+            input1_dist_attr.dims_mapping = [0, -1]
+            # Set the distributed attribute of output directly
+            output_dist_attr = op_dist_attr.output_dist_attr(output.name)
+            output_dist_attr.dims_mapping = [-1, -1]
+            self.assertEqual(op_dist_attr.process_mesh, process_mesh)
+            self.assertEqual(input_dist_attr.process_mesh, process_mesh)
+            self.assertEqual(input1_dist_attr.process_mesh, process_mesh)
+            self.assertEqual(output_dist_attr.process_mesh, process_mesh)
+            self.assertEqual(input_dist_attr.dims_mapping, [-1, 0])
+            self.assertEqual(input1_dist_attr.dims_mapping, [0, -1])
+            self.assertEqual(output_dist_attr.dims_mapping, [-1, -1])
+            self.assertTrue(op_dist_attr.verify())
+            self.assertIsNotNone(str(op_dist_attr))
+
+    def test_operator_dist_attr(self):
+        train_program = static.Program()
+        start_program = static.Program()
+        with static.program_guard(train_program, start_program):
+            input = static.data(name="input", shape=[2, 3], dtype='float32')
+            input1 = static.data(name="input1", shape=[3, 4], dtype='float32')
+            output = paddle.matmul(input, input1)
+            op = train_program.current_block().ops[0]
+            process_mesh = ProcessMesh([[0, 1, 2], [3, 4, 5]])
+            op_dist_attr = op.dist_attr
+
+            op_dist_attr.process_mesh = process_mesh
+            # Set the distributed attribute of input
+            input_dist_attr = TensorDistAttr(input.desc)
+            input_dist_attr.dims_mapping = [0, -1]
+            op_dist_attr.set_input_dist_attr(input.name, input_dist_attr)
+            # Set the distributed attribute of input1
+            input1_dist_attr = TensorDistAttr(input1.desc)
+            input1_dist_attr.dims_mapping = [-1, 1]
+            op_dist_attr.set_input_dist_attr(input1.name, input1_dist_attr)
+            # Set the distributed attribute of output
+            output_dist_attr = TensorDistAttr(output.desc)
+            output_dist_attr.dims_mapping = [0, 1]
+            op_dist_attr.set_output_dist_attr(output.name, output_dist_attr)
+
+            self.assertEqual(op.desc.dist_attr.process_mesh, process_mesh)
+            self.assertEqual(
+                op.dist_attr.input_dist_attr(input.name).process_mesh,
+                process_mesh)
+            self.assertEqual(
+                op.dist_attr.input_dist_attr(input1.name).process_mesh,
+                process_mesh)
+            self.assertEqual(
+                op.dist_attr.input_dist_attr(input.name).dims_mapping, [0, -1])
+            self.assertEqual(
+                op.dist_attr.input_dist_attr(input.name).dims_mapping, [0, -1])
+            self.assertEqual(
+                op.desc.dist_attr.input_dist_attr(input1.name).dims_mapping,
+                [-1, 1])
+            self.assertEqual(
+                op.dist_attr.output_dist_attr(output.name).dims_mapping,
+                [0, 1])
+            self.assertTrue(op.desc.dist_attr.verify())
+            self.assertIsNotNone(str(op_dist_attr))
+
+            op.dist_attr = OperatorDistAttr(op.desc)
+            self.assertEqual(op.desc.dist_attr, OperatorDistAttr(op.desc))
+
+
+if __name__ == "__main__":
+    unittest.main()
--
GitLab
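
For reference, a minimal usage sketch of the Python-facing API this patch exposes. It is not part of the patch itself; it assumes a Paddle build that contains this change, and the names simply mirror test_dist_attr_v2.py above.

    # Minimal sketch: mutate the C++-backed TensorDistAttr through the new
    # Variable.dist_attr property added by this patch.
    import paddle
    import paddle.static as static
    from paddle.distributed.auto_parallel.process_mesh_v2 import ProcessMesh

    paddle.enable_static()

    main_program = static.Program()
    startup_program = static.Program()
    with static.program_guard(main_program, startup_program):
        x = static.data(name="x", shape=[2, 3], dtype='float32')
        # dist_attr proxies the TensorDistAttr stored on the C++ VarDesc.
        dist_attr = x.dist_attr
        dist_attr.process_mesh = ProcessMesh([[0, 1, 2], [3, 4, 5]])
        dist_attr.dims_mapping = [0, -1]  # shard dim 0 across mesh dim 0
        assert dist_attr.verify()

Because the pybind11 bindings use return_value_policy::reference, the object returned by dist_attr refers to the C++-owned attribute, so mutations through it are visible on the underlying VarDesc without an explicit set call.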