diff --git a/paddle/fluid/distributed/CMakeLists.txt b/paddle/fluid/distributed/CMakeLists.txt
index b18ed421fcd787669c89962e0102dd3640d8efb0..0201d1131eb4a2c666b31c444afec8e973d5cd4a 100755
--- a/paddle/fluid/distributed/CMakeLists.txt
+++ b/paddle/fluid/distributed/CMakeLists.txt
@@ -1,3 +1,4 @@
+add_subdirectory(auto_parallel)
 add_subdirectory(collective)
 add_subdirectory(store)
 if(WITH_PYTHON)
@@ -47,4 +48,3 @@ add_subdirectory(ps)
 add_subdirectory(test)
 add_subdirectory(index_dataset)
 add_subdirectory(fleet_executor)
-add_subdirectory(auto_parallel)
diff --git a/paddle/fluid/distributed/auto_parallel/CMakeLists.txt b/paddle/fluid/distributed/auto_parallel/CMakeLists.txt
index 976e76f8931bab236a7855a54d65c105fd9dd709..22da1d7d57305d7b0343f4532c56267f8cbcbc66 100644
--- a/paddle/fluid/distributed/auto_parallel/CMakeLists.txt
+++ b/paddle/fluid/distributed/auto_parallel/CMakeLists.txt
@@ -1,3 +1,5 @@
+proto_library(auto_parallel_proto SRCS auto_parallel.proto)
+
 cc_library(
   device_mesh
   SRCS device_mesh.cc
@@ -34,4 +36,4 @@ cc_test(
   SRCS dist_mapper_test.cc
   DEPS dist_mapper)
 
-proto_library(auto_parallel_proto SRCS auto_parallel.proto)
+cc_library(auto_parallel DEPS device_mesh process_mesh dist_attr dist_mapper)
diff --git a/paddle/fluid/distributed/auto_parallel/auto_parallel.proto b/paddle/fluid/distributed/auto_parallel/auto_parallel.proto
index 1413e80a8acb1fcfeb44c3731b09108215271f7a..262a16c312eb61ba623243a60881e0dc25d24c76 100644
--- a/paddle/fluid/distributed/auto_parallel/auto_parallel.proto
+++ b/paddle/fluid/distributed/auto_parallel/auto_parallel.proto
@@ -30,6 +30,59 @@ message ProcessMeshProto {
 
 }
 
+// This distributed attribute describes how to distribute the corresponding tensor,
+// and stores any other information needed by auto parallel.
+message TensorDistAttrProto {
+  // The process mesh where a tensor is distributed.
+  optional ProcessMeshProto process_mesh = 1;
+
+  // The length of dims_mapping is the same as the length of the tensor shape.
+  // The i-th dimension of the tensor will be sharded by the dims_mapping[i]-th dimension
+  // of the above process mesh. If dims_mapping[i] is -1, the i-th dimension of the tensor
+  // will not be sharded. For example, given a tensor shape [2, 6, 12], a process mesh
+  // shape [2, 3] and a dims_mapping [-1, 1, 0], each sharded tensor will have a shape [2, 2, 6].
+  repeated int64 dims_mapping = 2;
+
+  // The batch dimension of the corresponding tensor.
+  optional int64 batch_dim = 3;
+
+  // If dynamic_dims[i] is True, the i-th dimension of the corresponding tensor
+  // changes dynamically. Otherwise, the i-th dimension of the tensor is statically determined.
+  repeated bool dynamic_dims = 4;
+}
+
+// This distributed attribute describes how to distribute the corresponding operator,
+// and stores any other information needed by auto parallel.
+message OperatorDistAttrProto {
+  message TensorDistAttrMappingEntryProto {
+    optional string name = 1;
+    optional TensorDistAttrProto tensor_dist_attr = 2;
+  }
+  // The key of this map is the input tensor name and the value is the distributed attribute
+  // of the input tensor required by the corresponding operator.
+  // The distributed attribute of the actual tensor may not be the same as the one within
+  // the distributed attribute of the operator.
+  repeated TensorDistAttrMappingEntryProto input_dist_attrs = 1;
+
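+  // For illustration only (a hypothetical sketch, not a constraint imposed by this proto):
+  // a matmul-like operator could carry input entries such as
+  //   { name: "X", tensor_dist_attr: { dims_mapping: [0, -1] } }
+  //   { name: "Y", tensor_dist_attr: { dims_mapping: [-1, 1] } }
+  // i.e. X sharded along its first dimension and Y sharded along its second dimension.
+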
+  // The key of this map is the output tensor name and the value is the distributed attribute
+  // of the output tensor required by the corresponding operator.
+  // The distributed attribute of the actual tensor may not be the same as the one within
+  // the distributed attribute of the operator.
+  repeated TensorDistAttrMappingEntryProto output_dist_attrs = 2;
+
+  // The process mesh where an op is distributed.
+  optional ProcessMeshProto process_mesh = 3;
+
+  // An operator ideally has a distributed operator, which may have multiple distributed implementations.
+  // This field is usually the same as the operator type. However, some operators, such as the element-wise
+  // operators, may share the same distributed operator; the field is used for this scenario.
+  optional string impl_type = 4;
+
+  // This field tells which distributed implementation of the corresponding operator
+  // will be selected for the actual computation.
+  optional int64 impl_idx = 5;
+}
+
 // This proto describes the capability of one device such as the computation and memory.
 message DeviceCapabilityProto {
   optional double single_precision_flops = 1;
diff --git a/paddle/fluid/distributed/auto_parallel/device_mesh.h b/paddle/fluid/distributed/auto_parallel/device_mesh.h
index 15ec50f546d3020a62986123bf7176bc48b85bdf..ba87dbb9e86087d45f66095f811cafa71470163d 100644
--- a/paddle/fluid/distributed/auto_parallel/device_mesh.h
+++ b/paddle/fluid/distributed/auto_parallel/device_mesh.h
@@ -161,6 +161,23 @@ class Machine {
 
   void set_id(int64_t id) { id_ = id; }
 
+  const std::unordered_map<int64_t, const Device*>& devices() const {
+    return devices_;
+  }
+
+  const std::unordered_map<int64_t, std::unordered_map<int64_t, const Link*>>&
+  links() const {
+    return links_;
+  }
+
+  const Device& device(int64_t global_id) const {
+    return *devices_.at(global_id);
+  }
+
+  const Link& link(int64_t source_id, int64_t target_id) const {
+    return *links_.at(source_id).at(target_id);
+  }
+
   bool contains(int64_t device_id) const;
 
   void add_device(const Device& device);
@@ -196,8 +213,11 @@ class DeviceMesh {
   const std::vector<std::string>& dim_names() const { return dim_names_; }
 
   std::string device_type() const {
-    if (empty()) return std::string();
-    return std::begin(devices_)->second.type();
+    if (empty()) return "UNKNOWN";
+    if (devices_.empty())
+      return "UNKNOWN";
+    else
+      return std::begin(devices_)->second.type();
   }
 
   const std::unordered_map<int64_t, Device>& devices() const {
@@ -209,6 +229,10 @@
     return links_;
   }
 
+  const std::unordered_map<int64_t, Machine>& machines() const {
+    return machines_;
+  }
+
   const Device& device(int64_t global_id) const {
     return devices_.at(global_id);
   }
@@ -217,6 +241,10 @@
     return links_.at(source_id).at(target_id);
   }
 
+  const Machine& machine(int64_t machine_id) const {
+    return machines_.at(machine_id);
+  }
+
   int64_t size() const;
   int64_t ndim() const { return shape_.size(); }
diff --git a/paddle/fluid/distributed/auto_parallel/dist_attr.cc b/paddle/fluid/distributed/auto_parallel/dist_attr.cc
index 9f9609962fc4dcfdaf997ec05b5967de650227b9..53e28b7a904bf2e70612e77c5979bbfea007f23d 100644
--- a/paddle/fluid/distributed/auto_parallel/dist_attr.cc
+++ b/paddle/fluid/distributed/auto_parallel/dist_attr.cc
@@ -240,6 +240,34 @@ std::string TensorDistAttr::to_string() const {
   return dist_str;
 }
 
+TensorDistAttr TensorDistAttr::from_proto(const TensorDistAttrProto& proto) {
+  TensorDistAttr dist_attr;
+  dist_attr.process_mesh_ = ProcessMesh::from_proto(proto.process_mesh());
+  dist_attr.dims_mapping_.resize(proto.dims_mapping_size());
+  for (int64_t i = 0; i < proto.dims_mapping_size(); ++i) {
+    dist_attr.dims_mapping_[i] = proto.dims_mapping(i);
+  }
+  dist_attr.batch_dim_ = proto.batch_dim();
+  dist_attr.dynamic_dims_.resize(proto.dynamic_dims_size());
+  for (int64_t i = 0; i < proto.dynamic_dims_size(); ++i) {
+    dist_attr.dynamic_dims_[i] = proto.dynamic_dims(i);
+  }
+  return dist_attr;
+}
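+
+// A sketch of the intended round trip (illustrative only; the equivalence of the
+// original and restored attributes is exercised by the unit test in dist_attr_test.cc):
+//   TensorDistAttrProto proto = dist_attr.to_proto();
+//   TensorDistAttr restored = TensorDistAttr::from_proto(proto);
+//   EXPECT_EQ(restored, dist_attr);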
+
+TensorDistAttrProto TensorDistAttr::to_proto() const {
+  TensorDistAttrProto proto;
+  proto.mutable_process_mesh()->CopyFrom(process_mesh_.to_proto());
+  for (const auto& i : dims_mapping_) {
+    proto.add_dims_mapping(i);
+  }
+  proto.set_batch_dim(batch_dim_);
+  for (const auto& i : dynamic_dims_) {
+    proto.add_dynamic_dims(i);
+  }
+  return proto;
+}
+
 bool operator==(const TensorDistAttr& lhs, const TensorDistAttr& rhs) {
   if (lhs.process_mesh() != rhs.process_mesh()) {
     return false;
@@ -497,6 +525,43 @@ std::string OperatorDistAttr::to_string() const {
   return str;
 }
 
+OperatorDistAttr OperatorDistAttr::from_proto(
+    const OperatorDistAttrProto& proto) {
+  OperatorDistAttr dist_attr;
+  for (int64_t i = 0; i < proto.input_dist_attrs_size(); ++i) {
+    dist_attr.input_dist_attrs_[proto.input_dist_attrs(i).name()] =
+        TensorDistAttr::from_proto(
+            proto.input_dist_attrs(i).tensor_dist_attr());
+  }
+  for (int64_t i = 0; i < proto.output_dist_attrs_size(); ++i) {
+    dist_attr.output_dist_attrs_[proto.output_dist_attrs(i).name()] =
+        TensorDistAttr::from_proto(
+            proto.output_dist_attrs(i).tensor_dist_attr());
+  }
+  dist_attr.process_mesh_ = ProcessMesh::from_proto(proto.process_mesh());
+  dist_attr.impl_type_ = proto.impl_type();
+  dist_attr.impl_idx_ = proto.impl_idx();
+  return dist_attr;
+}
+
+OperatorDistAttrProto OperatorDistAttr::to_proto() const {
+  OperatorDistAttrProto proto;
+  for (const auto& item : input_dist_attrs_) {
+    auto proto_item = proto.mutable_input_dist_attrs()->Add();
+    proto_item->set_name(item.first);
+    proto_item->mutable_tensor_dist_attr()->CopyFrom(item.second.to_proto());
+  }
+  for (const auto& item : output_dist_attrs_) {
+    auto proto_item = proto.mutable_output_dist_attrs()->Add();
+    proto_item->set_name(item.first);
+    proto_item->mutable_tensor_dist_attr()->CopyFrom(item.second.to_proto());
+  }
+  proto.mutable_process_mesh()->CopyFrom(process_mesh_.to_proto());
+  proto.set_impl_type(impl_type_);
+  proto.set_impl_idx(impl_idx_);
+  return proto;
+}
+
 bool operator==(const OperatorDistAttr& lhs, const OperatorDistAttr& rhs) {
   if (lhs.process_mesh() != rhs.process_mesh()) {
     return false;
diff --git a/paddle/fluid/distributed/auto_parallel/dist_attr.h b/paddle/fluid/distributed/auto_parallel/dist_attr.h
index ae089ef94b9d6c6b316be740ad1f04ab4aa60325..44262f5bace9291762b440a3afb5676fddaa22e0 100644
--- a/paddle/fluid/distributed/auto_parallel/dist_attr.h
+++ b/paddle/fluid/distributed/auto_parallel/dist_attr.h
@@ -101,6 +101,10 @@ class TensorDistAttr {
   // TensorDistAttr from_string(const std::string& dist_str);
   std::string to_string() const;
 
+  static TensorDistAttr from_proto(const TensorDistAttrProto& proto);
+
+  TensorDistAttrProto to_proto() const;
+
  private:
   static std::vector<std::string> fields_;
   const VarDesc* tensor_{nullptr};
@@ -209,6 +213,10 @@ class OperatorDistAttr {
   // OperatorDistAttr from_string(const std::string& dist_str);
   std::string to_string() const;
 
+  static OperatorDistAttr from_proto(const OperatorDistAttrProto& proto);
+
+  OperatorDistAttrProto to_proto() const;
+
  private:
   static std::vector<std::string> fields_;
   const OpDesc* op_{nullptr};
diff --git a/paddle/fluid/distributed/auto_parallel/dist_attr_test.cc b/paddle/fluid/distributed/auto_parallel/dist_attr_test.cc
index 1b9ac4271b4fbd8dbeac2d7376c08960345adad2..e2f035584c1c3e941d1aac254c1c0e913c4960f1 100644
--- a/paddle/fluid/distributed/auto_parallel/dist_attr_test.cc
+++ b/paddle/fluid/distributed/auto_parallel/dist_attr_test.cc
@@ -80,7 +80,11 @@ TEST(DistAttr, ctor) {
   std::stringstream x_sstream;
   x_sstream << x_dist_attr;
   EXPECT_EQ(x_sstream.str(), x_dist_attr.to_string());
-  EXPECT_EQ(x_dist_attr, x_dist_attr);
+  auto x_proto = x_dist_attr.to_proto();
+  TensorDistAttr new_x_dist_attr = TensorDistAttr::from_proto(x_proto);
+  EXPECT_EQ(x_dist_attr, new_x_dist_attr);
+  // new_x_dist_attr is not valid since it does not bind to a var_desc
+  EXPECT_EQ(new_x_dist_attr.verify(), false);
 
   y_dist_attr.set_process_mesh(process_mesh);
   y_dist_attr.set_dims_mapping(std::vector<int64_t>({-1, 0}));
@@ -134,7 +138,11 @@ TEST(DistAttr, ctor) {
   std::stringstream mul_sstream;
   mul_sstream << mul_dist_attr;
   EXPECT_EQ(mul_sstream.str(), mul_dist_attr.to_string());
-  EXPECT_EQ(mul_dist_attr, mul_dist_attr);
+  auto mul_proto = mul_dist_attr.to_proto();
+  OperatorDistAttr new_mul_dist_attr = OperatorDistAttr::from_proto(mul_proto);
+  EXPECT_EQ(mul_dist_attr, new_mul_dist_attr);
+  // new_mul_dist_attr is not valid since it does not bind to an op_desc
+  EXPECT_EQ(new_mul_dist_attr.verify(), false);
 }
 
 }  // namespace auto_parallel
diff --git a/paddle/fluid/distributed/auto_parallel/process_mesh.h b/paddle/fluid/distributed/auto_parallel/process_mesh.h
index 2652a8f606216619392389683a5b1b1320b9e994..c0a3b07d0a9dc61d4a40e90039c7352baaf50cfc 100644
--- a/paddle/fluid/distributed/auto_parallel/process_mesh.h
+++ b/paddle/fluid/distributed/auto_parallel/process_mesh.h
@@ -14,7 +14,6 @@ limitations under the License. */
 
 #pragma once
 
-#include
 #include
 #include
 #include
diff --git a/paddle/fluid/pybind/CMakeLists.txt b/paddle/fluid/pybind/CMakeLists.txt
index e6634ab37498c344b56e88abf9af5525ad0d2d3a..4ba49673a941f3e20549f9b33936ca18b45cc987 100755
--- a/paddle/fluid/pybind/CMakeLists.txt
+++ b/paddle/fluid/pybind/CMakeLists.txt
@@ -39,6 +39,7 @@ set(PYBIND_DEPS
     phi_utils
     tcp_store
     new_profiler
+    auto_parallel
     jit_layer
     jit_property)
 
@@ -124,6 +125,7 @@ set(PYBIND_SRCS
     communication.cc
     cuda_streams_py.cc
     jit.cc
+    auto_parallel_py.cc
    op_function1.cc
    op_function2.cc
    op_function3.cc
diff --git a/paddle/fluid/pybind/auto_parallel_py.cc b/paddle/fluid/pybind/auto_parallel_py.cc
new file mode 100644
index 0000000000000000000000000000000000000000..46ade96f4d105832409d1cd607d5a738da01e589
--- /dev/null
+++ b/paddle/fluid/pybind/auto_parallel_py.cc
@@ -0,0 +1,168 @@
+// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <pybind11/operators.h>
+#include <pybind11/stl.h>
+
+#include "paddle/fluid/distributed/auto_parallel/device_mesh.h"
+#include "paddle/fluid/distributed/auto_parallel/dist_attr.h"
+#include "paddle/fluid/distributed/auto_parallel/dist_mapper.h"
+#include "paddle/fluid/distributed/auto_parallel/process_mesh.h"
+#include "paddle/fluid/framework/op_desc.h"
+#include "paddle/fluid/framework/var_desc.h"
+#include "paddle/fluid/pybind/auto_parallel_py.h"
+
+namespace py = pybind11;
+
+namespace paddle {
+namespace pybind {
+
+using paddle::distributed::auto_parallel::Device;
+using paddle::distributed::auto_parallel::DeviceCapability;
+using paddle::distributed::auto_parallel::DeviceMesh;
+using paddle::distributed::auto_parallel::DistributedMapper;
+using paddle::distributed::auto_parallel::Link;
+using paddle::distributed::auto_parallel::LinkCapability;
+using paddle::distributed::auto_parallel::Machine;
+using paddle::distributed::auto_parallel::ProcessMesh;
+
+void BindAutoParallel(py::module *m) {
+  py::class_<ProcessMesh>(*m, "ProcessMesh")
+      .def(py::init<const std::vector<int64_t> &,
+                    const std::vector<int64_t> &,
+                    const std::vector<std::string> &>(),
+           py::arg("shape"),
+           py::arg("process_ids"),
+           py::arg("dim_names"))
+      .def_property_readonly(
+          "shape", &ProcessMesh::shape, py::return_value_policy::reference)
+      .def_property_readonly("process_ids",
+                             &ProcessMesh::process_ids,
+                             py::return_value_policy::reference)
+      .def_property_readonly("dim_names",
+                             &ProcessMesh::dim_names,
+                             py::return_value_policy::reference)
+      .def_property_readonly("size", &ProcessMesh::size)
+      .def_property_readonly("ndim", &ProcessMesh::ndim)
+      .def("dim_size",
+           static_cast<int64_t (ProcessMesh::*)(int64_t) const>(
+               &ProcessMesh::dim_size))
+      .def("dim_size",
+           static_cast<int64_t (ProcessMesh::*)(const std::string &) const>(
+               &ProcessMesh::dim_size))
+      .def("empty", &ProcessMesh::empty)
+      .def("contains", &ProcessMesh::contains)
+      .def(py::self == py::self)
+      .def(py::self != py::self)
+      .def("__str__", &ProcessMesh::to_string);
+
+  py::class_<DeviceCapability>(*m, "DeviceCapability")
+      .def(py::init<>())
+      .def_readwrite("sflops", &DeviceCapability::single_precision_flops)
+      .def_readwrite("dflops", &DeviceCapability::double_precision_flops)
+      .def_readwrite("memory", &DeviceCapability::memory_size_in_bytes)
+      .def_readwrite("rate", &DeviceCapability::clock_rate_in_ghz)
+      .def("__str__", &DeviceCapability::to_string);
+
+  py::class_<Device>(*m, "Device")
+      .def(py::init<int64_t, int64_t, int64_t, const std::string &>(),
+           py::arg("global_id"),
+           py::arg("local_id"),
+           py::arg("machine_id"),
+           py::arg("type"))
+      .def_property_readonly("global_id", &Device::global_id)
+      .def_property_readonly("local_id", &Device::local_id)
+      .def_property_readonly("machine_id", &Device::machine_id)
+      .def_property_readonly("type", &Device::type)
+      .def_property("capability", &Device::capability, &Device::set_capability)
+      .def(py::self == py::self)
+      .def(py::self != py::self)
+      .def("__str__", &Device::to_string);
+
+  py::class_<LinkCapability>(*m, "LinkCapability")
+      .def(py::init<>())
+      .def_readwrite("bandwidth", &LinkCapability::bandwidth)
+      .def_readwrite("latency", &LinkCapability::latency)
+      .def("__str__", &LinkCapability::to_string);
+
+  py::class_<Link>(*m, "Link")
+      .def(py::init<int64_t, int64_t, const std::string &>(),
+           py::arg("source_id"),
+           py::arg("target_id"),
+           py::arg("type"))
+      .def_property_readonly("source_id", &Link::source_id)
+      .def_property_readonly("target_id", &Link::target_id)
+      .def_property_readonly("type", &Link::type)
+      .def_property("capability", &Link::capability, &Link::set_capability)
+      .def(py::self == py::self)
+      .def(py::self != py::self)
+      .def("__str__", &Link::to_string);
+
+  py::class_<Machine>(*m, "Machine")
+      .def_property_readonly("id", &Machine::id)
+      .def_property_readonly(
+          "devices", &Machine::devices, py::return_value_policy::reference)
+      .def_property_readonly(
+          "links", &Machine::links, py::return_value_policy::reference)
+      .def("device", &Machine::device)
+      .def("link", &Machine::link)
+      .def("contains", &Machine::contains)
+      .def("__str__", &Machine::to_string);
+
+  py::class_<DeviceMesh>(*m, "DeviceMesh")
+      .def(py::init<const std::string &,
+                    const std::vector<int64_t> &,
+                    const std::vector<int64_t> &,
+                    const std::vector<std::string> &>(),
+           py::arg("name"),
+           py::arg("shape"),
+           py::arg("process_ids"),
+           py::arg("dim_names"))
+      .def_property_readonly("name", &DeviceMesh::name)
+      .def_property_readonly("shape", &DeviceMesh::shape)
+      .def_property_readonly("device_ids",
+                             &DeviceMesh::device_ids,
+                             py::return_value_policy::reference)
+      .def_property_readonly("dim_names",
+                             &DeviceMesh::dim_names,
+                             py::return_value_policy::reference)
+      .def_property_readonly("device_type", &DeviceMesh::device_type)
+      .def_property_readonly("size", &DeviceMesh::size)
+      .def_property_readonly("ndim", &DeviceMesh::ndim)
+      .def_property_readonly(
+          "devices", &DeviceMesh::devices, py::return_value_policy::reference)
+      .def_property_readonly(
+          "links", &DeviceMesh::links, py::return_value_policy::reference)
+      .def_property_readonly(
+          "machines", &DeviceMesh::machines, py::return_value_policy::reference)
+      .def("device", &DeviceMesh::device)
+      .def("link", &DeviceMesh::link)
+      .def("machine", &DeviceMesh::machine)
+      .def("empty", &DeviceMesh::empty)
+      .def("contains", &DeviceMesh::contains)
+      .def("add_device", &DeviceMesh::add_device)
+      .def("add_link", &DeviceMesh::add_link)
+      .def("dim_size",
+           static_cast<int64_t (DeviceMesh::*)(int64_t) const>(
+               &DeviceMesh::dim_size))
+      .def("dim_size",
+           static_cast<int64_t (DeviceMesh::*)(const std::string &) const>(
+               &DeviceMesh::dim_size))
+      .def(py::self == py::self)
+      .def(py::self != py::self)
+      .def("__str__", &DeviceMesh::to_string);
+}
+
+}  // namespace pybind
+}  // namespace paddle
diff --git a/paddle/fluid/pybind/auto_parallel_py.h b/paddle/fluid/pybind/auto_parallel_py.h
new file mode 100644
index 0000000000000000000000000000000000000000..b38163755040f956d05852027c8527ac075ae9bb
--- /dev/null
+++ b/paddle/fluid/pybind/auto_parallel_py.h
@@ -0,0 +1,25 @@
+// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <pybind11/pybind11.h>
+
+namespace paddle {
+namespace pybind {
+
+void BindAutoParallel(pybind11::module *m);
+
+}  // namespace pybind
+}  // namespace paddle
diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc
index 1fe424eccd9a57518fe2e8f9b3d73e3aaf5db506..bd62d649cc7c6553f647137022a2357246c53b60 100644
--- a/paddle/fluid/pybind/pybind.cc
+++ b/paddle/fluid/pybind/pybind.cc
@@ -97,6 +97,7 @@ limitations under the License. */
 #ifdef PADDLE_WITH_ASCEND
 #include "paddle/fluid/pybind/ascend_wrapper_py.h"
 #endif
+#include "paddle/fluid/pybind/auto_parallel_py.h"
 #include "paddle/fluid/pybind/bind_cost_model.h"
 #include "paddle/fluid/pybind/bind_fleet_executor.h"
 #include "paddle/fluid/pybind/box_helper_py.h"
@@ -1701,6 +1702,7 @@ All parameter, weight, gradient are variables in Paddle.
   BindProcessMeshDesc(&m);
   BindFleetExecutor(&m);
   BindTCPStore(&m);
+  BindAutoParallel(&m);
   BindJitProperty(&m);
 
   py::class_<LoDRankTable>(m, "LodRankTable")
diff --git a/python/paddle/distributed/auto_parallel/cluster_v2.py b/python/paddle/distributed/auto_parallel/cluster_v2.py
new file mode 100644
index 0000000000000000000000000000000000000000..bdaf18ee650a9d509af102aeb5dd8789db585a26
--- /dev/null
+++ b/python/paddle/distributed/auto_parallel/cluster_v2.py
@@ -0,0 +1,128 @@
+# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import copy
+import numpy as np
+from enum import IntEnum
+from enum import unique
+
+import paddle
+from paddle.fluid import core
+from paddle.fluid.core import Device
+from paddle.fluid.core import Link
+
+
+@unique
+class DeviceType(IntEnum):
+    UNKNOWN = 0
+    CPU = 1
+    GPU = 2
+    XPU = 3
+    NPU = 4
+    DCU = 5
+    NIC = 6
+
+
+@unique
+class LinkType(IntEnum):
+    UNKNOWN = 0
+    LOC = 1
+    SYS = 2
+    PHB = 3
+    PIX = 4
+    PIB = 5
+    NVL = 6
+    NVB = 7
+    NET = 8
+
+
+class DeviceMesh(core.DeviceMesh):
+    r"""
+    The class `DeviceMesh` describes the topology of physical devices.
+
+    Args:
+        name (str): the name of the device mesh.
+        mesh (list|numpy.array): an N-dimensional array describing the topology
+            of the physical devices.
+        dim_names (list, optional): the i-th element of this list gives the name of the
+            i-th dimension.
+
+    Returns:
+        None
+
+    Examples:
+        .. code-block:: python
+
+            import paddle
+            import paddle.distributed as dist
+
+            paddle.enable_static()
+
+            mesh = dist.DeviceMesh("mesh", [[2, 4, 5], [0, 1, 3]])
+            assert mesh.shape == [2, 3]
+            assert mesh.device_ids == [2, 4, 5, 0, 1, 3]
+
+    """
+
+    def __init__(self, name, mesh, dim_names=None):
+        self._name = name
+
+        if not isinstance(mesh, list) and \
+           not isinstance(mesh, np.ndarray):
+            raise ValueError(
+                'The mesh must be an instance of list or np.ndarray.')
+        if isinstance(mesh, list):
+            mesh = np.array(mesh)
+
+        self._mesh = mesh
+
+        self._shape = list(self._mesh.shape)
+
+        self._device_ids = self._mesh.flatten().tolist()
+        assert all(isinstance(p, int) for p in self._device_ids), \
+            ("All elements of the mesh must be integers.")
+        assert min(
+            self._device_ids) >= 0, ('All elements of the mesh must be >= 0.')
+        unique_device_ids = set(self._device_ids)
+        assert len(unique_device_ids) == len(
+            self._device_ids), ('All elements of the mesh must be unique.')
+
+        if dim_names is not None:
+            assert len(dim_names) == len(self._shape), \
+                ("The length of dim_names must match the number of mesh dimensions.")
+            self._dim_names = dim_names
+        else:
+            self._dim_names = ["d" + str(i) for i in range(len(self._shape))]
+
+        # Follow the requirement for using pybind11
+        core.DeviceMesh.__init__(self, self._name, self._shape,
+                                 self._device_ids, self._dim_names)
+
+    @property
+    def mesh(self):
+        return self._mesh
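+
+    # NOTE: a minimal usage sketch (illustrative only; it mirrors the checks in
+    # test_cluster_v2.py):
+    #
+    #   mesh = DeviceMesh("mesh", [[0, 1], [2, 3]], dim_names=["x", "y"])
+    #   mesh.add_device(Device(global_id=0, local_id=0, machine_id=0, type="GPU"))
+    #   mesh.add_link(Link(source_id=0, target_id=1, type="NVL"))
+    #   assert mesh.device(0).machine_id == 0 and mesh.dim_size("x") == 2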
+# """ + +# def __init__(self): +# self._device_meshes = {} + +# def device_mesh(self, device_mesh_name): +# return self._device_meshes[device_mesh_name] + +# def add_device_mesh(self, device_mesh): +# self._device_meshes[device_mesh.name] = device_mesh diff --git a/python/paddle/distributed/auto_parallel/process_mesh_v2.py b/python/paddle/distributed/auto_parallel/process_mesh_v2.py new file mode 100644 index 0000000000000000000000000000000000000000..08a391e51eb9ea13a76a0d873382fa2e51fd1bf6 --- /dev/null +++ b/python/paddle/distributed/auto_parallel/process_mesh_v2.py @@ -0,0 +1,134 @@ +# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy +import numpy as np +from paddle.fluid import core + + +class ProcessMesh(core.ProcessMesh): + r""" + The class `Processmesh` describes the topology of logical processes. + + Args: + mesh (list|numpy.array): an N-dimensional array describes the toplogy + of logical processes. + dim_names (list, optional): the i-th element of this list gives the name of the + i-th dimension. + + Returns: + None + + Examples: + .. code-block:: python + + import paddle + import paddle.distributed as dist + + paddle.enable_static() + + mesh = dist.ProcessMesh([[2, 4, 5], [0, 1, 3]]) + assert mesh.shape == [2, 3] + assert mesh.processe_ids == [2, 4, 5, 0, 1, 3] + + """ + + def __init__(self, mesh, dim_names=None): + if not isinstance(mesh, list) and \ + not isinstance(mesh, np.ndarray): + raise ValueError( + 'The mesh must be an instance of list or np.ndarray.') + if isinstance(mesh, list): + mesh = np.array(mesh) + + self._mesh = mesh + + self._shape = list(self._mesh.shape) + + self._process_ids = self._mesh.flatten().tolist() + assert all(isinstance(p, int) for p in self._process_ids), \ + ("All elements of the mesh must be integer") + assert min( + self._process_ids) >= 0, ('All elements of the mesh must be >= 0.') + unique_process_ids = set(self._process_ids) + assert len(unique_process_ids) == len( + self._process_ids), ('All elements of the mesh must be unique.') + + if dim_names is not None: + assert len(dim_names) == len(self._shape), \ + ("The length of dims_names must be same as the shape of the mesh.") + self._dim_names = dim_names + else: + self._dim_names = ["d" + str(i) for i in range(len(self._shape))] + + # Follow the requirement for using pybind11 + core.ProcessMesh.__init__(self, self._shape, self._process_ids, + self._dim_names) + + @property + def mesh(self): + return self._mesh + + +# def compute_compatible_process_meshes(process_meshes): +# """Compute the compatible process mesh given a list of process meshes.""" +# if not process_meshes: +# return None + +# def _compute_compatible_two_process_meshes(pm1, pm2): +# if pm1 is None: +# return True, pm2 +# if pm2 is None: +# return True, pm1 +# if pm1 == pm2: +# return True, pm1 +# if pm1.device_mesh != pm2.device_mesh: +# return False, None +# if pm1.process_ids == pm2.process_ids: +# if len(pm1.shape) >= len(pm2.shape): +# return True, 
+#             else:
+#                 return True, pm2
+#         process_set1 = set(pm1.process_ids)
+#         process_set2 = set(pm2.process_ids)
+#         if process_set1.issubset(process_set2):
+#             return True, pm2
+#         if process_set2.issubset(process_set1):
+#             return True, pm1
+#         return False, None
+
+#     compatible_result = None
+#     for process_mesh in process_meshes:
+#         compatible, compatible_result = _compute_compatible_two_process_meshes(
+#             compatible_result, process_mesh)
+#         if not compatible:
+#             return None
+#     return ProcessMesh(compatible_result.mesh, compatible_result.dim_names)
+
+# def merge_process_meshes(process_meshes):
+#     """Merge a list of process meshes."""
+#     merged_process_mesh = None
+#     merged_process_ids = set()
+#     device_type = ""
+#     for process_mesh in process_meshes:
+#         if process_mesh is not None:
+#             process_ids = set(process_mesh.process_ids)
+#             if not device_type:
+#                 device_type = process_mesh.device_type
+#             assert device_type == process_mesh.device_type, \
+#                 "All process meshes must have the same device_type."
+#             merged_process_ids.update(process_ids)
+#     if len(merged_process_ids) != 0:
+#         merged_process_mesh = ProcessMesh(list(merged_process_ids))
+#     return merged_process_mesh
diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/CMakeLists.txt b/python/paddle/fluid/tests/unittests/auto_parallel/CMakeLists.txt
index 500cb91094f0140689aabc0fe388ca23efd7a370..eaae2d42655db4b6f4c7d3f4f0d0a043c958ca71 100644
--- a/python/paddle/fluid/tests/unittests/auto_parallel/CMakeLists.txt
+++ b/python/paddle/fluid/tests/unittests/auto_parallel/CMakeLists.txt
@@ -61,4 +61,6 @@ if(WITH_DISTRIBUTE AND WITH_GPU)
   py_test_modules(test_prim_dist_op MODULES test_prim_dist_op ENVS ${dist_ENVS})
   py_test_modules(test_to_static MODULES test_to_static ENVS ${dist_ENVS})
   py_test_modules(test_dist_op_cost MODULES test_dist_op_cost ENVS ${dist_ENVS})
+  py_test_modules(test_cluster_v2 MODULES test_cluster_v2)
+  py_test_modules(test_process_mesh_v2 MODULES test_process_mesh_v2)
 endif()
diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/test_cluster_v2.py b/python/paddle/fluid/tests/unittests/auto_parallel/test_cluster_v2.py
new file mode 100644
index 0000000000000000000000000000000000000000..acadc4842a2136176ccc2eee4a3dbb05ac753598
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/auto_parallel/test_cluster_v2.py
@@ -0,0 +1,125 @@
+# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License
+
+import unittest
+from paddle.distributed.auto_parallel.cluster_v2 import Device
+from paddle.distributed.auto_parallel.cluster_v2 import Link
+from paddle.distributed.auto_parallel.cluster_v2 import DeviceMesh
+
+
+class TestDeviceMesh(unittest.TestCase):
+
+    def test_device_mesh(self):
+        name = "my_device_mesh"
+        mesh = [[0, 1, 2], [3, 4, 5]]
+        device_mesh = DeviceMesh(name, mesh, dim_names=["x", "y"])
+        device_mesh1 = DeviceMesh("another_mesh", [0, 1, 2, 3])
+        self.assertEqual(device_mesh.name, "my_device_mesh")
+        self.assertEqual(device_mesh.shape, [2, 3])
+        self.assertEqual(device_mesh.device_ids, [0, 1, 2, 3, 4, 5])
+        self.assertEqual(device_mesh.dim_names, ["x", "y"])
+        self.assertEqual(device_mesh.device_type, "UNKNOWN")
+        self.assertEqual(device_mesh.size, 6)
+        self.assertEqual(device_mesh.ndim, 2)
+        self.assertEqual(device_mesh.dim_size(0), 2)
+        self.assertEqual(device_mesh.dim_size(-1), 3)
+        self.assertEqual(device_mesh.dim_size("x"), 2)
+        self.assertEqual(device_mesh.dim_size("y"), 3)
+        self.assertEqual(device_mesh.empty(), False)
+        self.assertEqual(device_mesh.contains(0), True)
+        self.assertEqual(device_mesh.contains(6), False)
+
+        dev0 = Device(global_id=0, local_id=0, machine_id=0, type="GPU")
+        dev1 = Device(global_id=1, local_id=1, machine_id=0, type="GPU")
+        dev2 = Device(global_id=2, local_id=2, machine_id=0, type="GPU")
+        dev3 = Device(global_id=3, local_id=0, machine_id=1, type="GPU")
+        dev4 = Device(global_id=4, local_id=1, machine_id=1, type="GPU")
+        dev5 = Device(global_id=5, local_id=2, machine_id=1, type="GPU")
+        device_mesh.add_device(dev0)
+        device_mesh.add_device(dev1)
+        device_mesh.add_device(dev2)
+        device_mesh.add_device(dev3)
+        device_mesh.add_device(dev4)
+        device_mesh.add_device(dev5)
+        self.assertEqual(device_mesh.device(0), dev0)
+        self.assertEqual(device_mesh.device(1), dev1)
+        self.assertEqual(device_mesh.device(2), dev2)
+        self.assertEqual(device_mesh.device(3), dev3)
+        self.assertEqual(device_mesh.device(4), dev4)
+        self.assertEqual(device_mesh.device(5), dev5)
+
+        link0 = Link(source_id=0, target_id=1, type="NVL")
+        link1 = Link(source_id=1, target_id=0, type="NVL")
+        link2 = Link(source_id=3, target_id=4, type="NVL")
+        link3 = Link(source_id=4, target_id=5, type="NVL")
+        device_mesh.add_link(link0)
+        device_mesh.add_link(link1)
+        device_mesh.add_link(link2)
+        device_mesh.add_link(link3)
+        self.assertEqual(device_mesh.link(0, 1), link0)
+        self.assertEqual(device_mesh.link(1, 0), link1)
+        self.assertEqual(device_mesh.link(3, 4), link2)
+        self.assertEqual(device_mesh.link(4, 5), link3)
+
+        self.assertEqual(device_mesh.machine(0).id, 0)
+        self.assertEqual(device_mesh.machine(0).contains(3), False)
+        self.assertEqual(device_mesh.machine(0).device(2), dev2)
+        self.assertEqual(device_mesh.machine(1).link(3, 4), link2)
+        self.assertEqual(
+            device_mesh.machine(0).devices,
+            device_mesh.machine(0).devices)
+        self.assertEqual(
+            device_mesh.machine(0).links,
+            device_mesh.machine(0).links)
+
+        self.assertEqual(device_mesh.device_type, "GPU")
+        self.assertEqual(device_mesh.devices, device_mesh.devices)
+        self.assertEqual(device_mesh.links, device_mesh.links)
+        self.assertEqual(device_mesh.machines, device_mesh.machines)
+        self.assertEqual(device_mesh, device_mesh)
+        self.assertNotEqual(device_mesh, device_mesh1)
+        self.assertEqual(str(device_mesh), str(device_mesh))
+
+    def test_device(self):
type="GPU") + device.capability.sflops = 100 + device.capability.dflops = 200 + device.capability.memory = 32 + device.capability.rate = 2 + self.assertEqual(device.global_id, 0) + self.assertEqual(device.local_id, 1) + self.assertEqual(device.machine_id, 2) + self.assertEqual(device.type, "GPU") + self.assertAlmostEqual(device.capability.sflops, 100) + self.assertAlmostEqual(device.capability.dflops, 200) + self.assertAlmostEqual(device.capability.memory, 32) + self.assertAlmostEqual(device.capability.rate, 2) + self.assertEqual(device, device) + self.assertEqual(str(device), str(device)) + + def test_link(self): + link = Link(source_id=0, target_id=1, type="NVL") + link.capability.bandwidth = 100 + link.capability.latency = 1 + self.assertEqual(link.source_id, 0) + self.assertEqual(link.target_id, 1) + self.assertEqual(link.type, "NVL") + self.assertAlmostEqual(link.capability.bandwidth, 100) + self.assertAlmostEqual(link.capability.latency, 1) + self.assertEqual(link, link) + self.assertEqual(str(link), str(link)) + + +if __name__ == "__main__": + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/test_process_mesh_v2.py b/python/paddle/fluid/tests/unittests/auto_parallel/test_process_mesh_v2.py new file mode 100644 index 0000000000000000000000000000000000000000..fcfafcb3e6d6d1a8607753dc74cf476bbe845140 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/auto_parallel/test_process_mesh_v2.py @@ -0,0 +1,44 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License + +import unittest +from paddle.distributed.auto_parallel.process_mesh_v2 import ProcessMesh + + +class TestProcessMesh(unittest.TestCase): + + def test_process_mesh(self): + mesh = [[0, 1, 2], [3, 4, 5]] + mesh2 = [[0, 1], [2, 3]] + process_mesh = ProcessMesh(mesh, dim_names=["x", "y"]) + process_mesh2 = ProcessMesh(mesh2) + self.assertEqual(process_mesh.shape, [2, 3]) + self.assertEqual(process_mesh.process_ids, [0, 1, 2, 3, 4, 5]) + self.assertEqual(process_mesh.dim_names, ["x", "y"]) + self.assertEqual(process_mesh.size, 6) + self.assertEqual(process_mesh.ndim, 2) + self.assertEqual(process_mesh.dim_size(0), 2) + self.assertEqual(process_mesh.dim_size(-1), 3) + self.assertEqual(process_mesh.dim_size("x"), 2) + self.assertEqual(process_mesh.dim_size("y"), 3) + self.assertEqual(process_mesh.empty(), False) + self.assertEqual(process_mesh.contains(0), True) + self.assertEqual(process_mesh.contains(6), False) + self.assertEqual(process_mesh, process_mesh) + self.assertNotEqual(process_mesh, process_mesh2) + self.assertEqual(str(process_mesh), str(process_mesh)) + + +if __name__ == "__main__": + unittest.main()