Unverified commit 5bf3dec9, authored by Yulong Ao and committed by GitHub

[Auto Parallel] Pybind ProcessMesh and DeviceMesh (#45013)

* [Auto Parallel] Pybind11 ProcessMesh and DeviceMesh

* [Auto Parallel] Fix the unittest problem

* [Auto Parallel] Explicitly add the src file for auto_parallel target

* [Auto Parallel] Add the proto dependency explicitly

* [Auto Parallel] Fix the cmake bug on windows and mac

* [Auto Parallel] Remove the pybind11 header file in process_mesh.h
Parent 1773fbba
add_subdirectory(auto_parallel)
add_subdirectory(collective)
add_subdirectory(store)
if(WITH_PYTHON)
...@@ -47,4 +48,3 @@ add_subdirectory(ps)
add_subdirectory(test)
add_subdirectory(index_dataset)
add_subdirectory(fleet_executor)
add_subdirectory(auto_parallel)
proto_library(auto_parallel_proto SRCS auto_parallel.proto)
cc_library(
device_mesh
SRCS device_mesh.cc
...@@ -34,4 +36,4 @@ cc_test(
SRCS dist_mapper_test.cc
DEPS dist_mapper)
cc_library(auto_parallel DEPS device_mesh process_mesh dist_attr dist_mapper)
...@@ -30,6 +30,59 @@ message ProcessMeshProto {
}
// This distributed attribute describes how to distribute the corresponding tensor
// and stores any other information needed by auto parallel.
message TensorDistAttrProto {
// The process mesh where a tensor is distributed.
optional ProcessMeshProto process_mesh = 1;
// The length of dims_mapping is the same as the length of the tensor shape.
// The i-th dimension of the tensor will be sharded by the dims_mapping[i]-th dimension
// of the above process mesh. If dims_mapping[i] is -1, the i-th dimension of the tensor
// will not be sharded. For example, given a tensor shape [2, 6, 12], a process mesh
// shape [2, 3] and a dims_mapping [-1, 1, 0], each local shard of the tensor will have the shape [2, 2, 6].
repeated int64 dims_mapping = 2;
// The batch dimension of the corresponding tensor.
optional int64 batch_dim = 3;
// If dynamic_dims[i] is True, the i-th dimension of the corresponding tensor
// changes dynamically. Otherwise, the i-th dimension of the tensor is statically determined.
repeated bool dynamic_dims = 4;
}
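For illustration, the sharding example in the comment above can be reproduced with a small Python sketch (not part of this change) that computes the local shard shape implied by a dims_mapping:

def local_shape(tensor_shape, mesh_shape, dims_mapping):
    # Hypothetical helper, for illustration only.
    shard = []
    for size, mapping in zip(tensor_shape, dims_mapping):
        if mapping == -1:
            shard.append(size)  # this dimension is not sharded
        else:
            shard.append(size // mesh_shape[mapping])  # split over that mesh dimension
    return shard

assert local_shape([2, 6, 12], [2, 3], [-1, 1, 0]) == [2, 2, 6]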
// This distributed attribute describes how to distribute the corresponding operator
// and stores any other information needed by auto parallel.
message OperatorDistAttrProto {
message TensorDistAttrMappingEntryProto {
optional string name = 1;
optional TensorDistAttrProto tensor_dist_attr = 2;
}
// The key of this map is the input tensor name and the value is the distributed attribute
// of the input tensor required by this corresponding operator.
// The distributed attribute of the actual tensor may not be the same as that within
// the distributed attribute of the operator.
repeated TensorDistAttrMappingEntryProto input_dist_attrs = 1;
// The key of this map is the output tensor name and the value is the distributed attribute
// of the output tensor required by this corresponding operator.
// The distributed attribute of the actual tensor may not be the same as that within
// the distributed attribute of the operator.
repeated TensorDistAttrMappingEntryProto output_dist_attrs = 2;
// The process mesh where an operator is distributed.
optional ProcessMeshProto process_mesh = 3;
// An operator ideally has a corresponding distributed operator, which may have multiple distributed implementations.
// This field is usually the same as the operator type. However, some operators, such as the element-wise operators,
// may share the same distributed operator; this field is used for that scenario.
optional string impl_type = 4;
// This field tells which distributed implementation of the corresponding operator
// will be selected for the actual computation.
optional int64 impl_idx = 5;
}
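As a concrete illustration (plain Python data, not a Paddle API), the information carried by an OperatorDistAttrProto for a hypothetical matmul operator might look like this:

op_dist_attr = {
    "process_mesh": {"shape": [2, 3], "process_ids": [0, 1, 2, 3, 4, 5]},
    # Tensor names map to the dist attrs the operator expects; these may differ
    # from the dist attrs carried by the actual tensors.
    "input_dist_attrs": {
        "X": {"dims_mapping": [0, -1]},   # rows of X sharded over mesh dim 0
        "W": {"dims_mapping": [-1, 1]},   # columns of W sharded over mesh dim 1
    },
    "output_dist_attrs": {
        "Y": {"dims_mapping": [0, 1]},    # output sharded over both mesh dims
    },
    "impl_type": "matmul",  # usually the operator type
    "impl_idx": 0,          # which distributed implementation is selected
}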
// This proto describes the capability of one device such as the computation and memory.
message DeviceCapabilityProto {
optional double single_precision_flops = 1;
......
...@@ -161,6 +161,23 @@ class Machine {
void set_id(int64_t id) { id_ = id; }
const std::unordered_map<int64_t, const Device*>& devices() const {
return devices_;
}
const std::unordered_map<int64_t, std::unordered_map<int64_t, const Link*>>&
links() const {
return links_;
}
const Device& device(int64_t global_id) const {
return *devices_.at(global_id);
}
const Link& link(int64_t source_id, int64_t target_id) const {
return *links_.at(source_id).at(target_id);
}
bool contains(int64_t device_id) const;
void add_device(const Device& device);
...@@ -196,8 +213,11 @@ class DeviceMesh {
const std::vector<std::string>& dim_names() const { return dim_names_; }
std::string device_type() const {
  if (empty()) return "UNKNOWN";
  if (devices_.empty())
    return "UNKNOWN";
  else
    return std::begin(devices_)->second.type();
}
const std::unordered_map<int64_t, Device>& devices() const {
...@@ -209,6 +229,10 @@ class DeviceMesh {
return links_;
}
const std::unordered_map<int64_t, Machine>& machines() const {
return machines_;
}
const Device& device(int64_t global_id) const {
return devices_.at(global_id);
}
...@@ -217,6 +241,10 @@ class DeviceMesh {
return links_.at(source_id).at(target_id);
}
const Machine& machine(int64_t machine_id) const {
return machines_.at(machine_id);
}
int64_t size() const;
int64_t ndim() const { return shape_.size(); }
......
...@@ -240,6 +240,34 @@ std::string TensorDistAttr::to_string() const {
return dist_str;
}
TensorDistAttr TensorDistAttr::from_proto(const TensorDistAttrProto& proto) {
TensorDistAttr dist_attr;
dist_attr.process_mesh_ = ProcessMesh::from_proto(proto.process_mesh());
dist_attr.dims_mapping_.resize(proto.dims_mapping_size());
for (int64_t i = 0; i < proto.dims_mapping_size(); ++i) {
dist_attr.dims_mapping_[i] = proto.dims_mapping(i);
}
dist_attr.batch_dim_ = proto.batch_dim();
dist_attr.dynamic_dims_.resize(proto.dynamic_dims_size());
for (int64_t i = 0; i < proto.dynamic_dims_size(); ++i) {
dist_attr.dynamic_dims_[i] = proto.dynamic_dims(i);
}
return dist_attr;
}
TensorDistAttrProto TensorDistAttr::to_proto() const {
TensorDistAttrProto proto;
proto.mutable_process_mesh()->CopyFrom(process_mesh_.to_proto());
for (const auto& i : dims_mapping_) {
proto.add_dims_mapping(i);
}
proto.set_batch_dim(batch_dim_);
for (const auto& i : dynamic_dims_) {
proto.add_dynamic_dims(i);
}
return proto;
}
bool operator==(const TensorDistAttr& lhs, const TensorDistAttr& rhs) {
if (lhs.process_mesh() != rhs.process_mesh()) {
return false;
...@@ -497,6 +525,43 @@ std::string OperatorDistAttr::to_string() const {
return str;
}
OperatorDistAttr OperatorDistAttr::from_proto(
const OperatorDistAttrProto& proto) {
OperatorDistAttr dist_attr;
for (int64_t i = 0; i < proto.input_dist_attrs_size(); ++i) {
dist_attr.input_dist_attrs_[proto.input_dist_attrs(i).name()] =
TensorDistAttr::from_proto(
proto.input_dist_attrs(i).tensor_dist_attr());
}
for (int64_t i = 0; i < proto.output_dist_attrs_size(); ++i) {
dist_attr.output_dist_attrs_[proto.output_dist_attrs(i).name()] =
TensorDistAttr::from_proto(
proto.output_dist_attrs(i).tensor_dist_attr());
}
dist_attr.process_mesh_ = ProcessMesh::from_proto(proto.process_mesh());
dist_attr.impl_type_ = proto.impl_type();
dist_attr.impl_idx_ = proto.impl_idx();
return dist_attr;
}
OperatorDistAttrProto OperatorDistAttr::to_proto() const {
OperatorDistAttrProto proto;
for (const auto& item : input_dist_attrs_) {
auto proto_item = proto.mutable_input_dist_attrs()->Add();
proto_item->set_name(item.first);
proto_item->mutable_tensor_dist_attr()->CopyFrom(item.second.to_proto());
}
for (const auto& item : output_dist_attrs_) {
auto proto_item = proto.mutable_output_dist_attrs()->Add();
proto_item->set_name(item.first);
proto_item->mutable_tensor_dist_attr()->CopyFrom(item.second.to_proto());
}
proto.mutable_process_mesh()->CopyFrom(process_mesh_.to_proto());
proto.set_impl_type(impl_type_);
proto.set_impl_idx(impl_idx_);
return proto;
}
bool operator==(const OperatorDistAttr& lhs, const OperatorDistAttr& rhs) {
if (lhs.process_mesh() != rhs.process_mesh()) {
return false;
......
...@@ -101,6 +101,10 @@ class TensorDistAttr {
// TensorDistAttr from_string(const std::string& dist_str);
std::string to_string() const;
static TensorDistAttr from_proto(const TensorDistAttrProto& proto);
TensorDistAttrProto to_proto() const;
private:
static std::vector<std::string> fields_;
const VarDesc* tensor_{nullptr};
...@@ -209,6 +213,10 @@ class OperatorDistAttr {
// OperatorDistAttr from_string(const std::string& dist_str);
std::string to_string() const;
static OperatorDistAttr from_proto(const OperatorDistAttrProto& proto);
OperatorDistAttrProto to_proto() const;
private:
static std::vector<std::string> fields_;
const OpDesc* op_{nullptr};
......
...@@ -80,7 +80,11 @@ TEST(DistAttr, ctor) {
std::stringstream x_sstream;
x_sstream << x_dist_attr;
EXPECT_EQ(x_sstream.str(), x_dist_attr.to_string());
auto x_proto = x_dist_attr.to_proto();
TensorDistAttr new_x_dist_attr = TensorDistAttr::from_proto(x_proto);
EXPECT_EQ(x_dist_attr, new_x_dist_attr);
// new_x_dist_attr is not valid since it does not bind to a var_desc
EXPECT_EQ(new_x_dist_attr.verify(), false);
y_dist_attr.set_process_mesh(process_mesh);
y_dist_attr.set_dims_mapping(std::vector<int64_t>({-1, 0}));
...@@ -134,7 +138,11 @@ TEST(DistAttr, ctor) {
std::stringstream mul_sstream;
mul_sstream << mul_dist_attr;
EXPECT_EQ(mul_sstream.str(), mul_dist_attr.to_string());
auto mul_proto = mul_dist_attr.to_proto();
OperatorDistAttr new_mul_dist_attr = OperatorDistAttr::from_proto(mul_proto);
EXPECT_EQ(mul_dist_attr, new_mul_dist_attr);
// new_mul_dist_attr is not valid since it does not bind to an op_desc
EXPECT_EQ(new_mul_dist_attr.verify(), false);
}
} // namespace auto_parallel
......
...@@ -14,7 +14,6 @@ limitations under the License. */
#pragma once
#include <pybind11/pybind11.h>
#include <atomic>
#include <cstddef>
#include <cstdint>
......
...@@ -39,6 +39,7 @@ set(PYBIND_DEPS
phi_utils
tcp_store
new_profiler
auto_parallel
jit_layer
jit_property)
...@@ -124,6 +125,7 @@ set(PYBIND_SRCS
communication.cc
cuda_streams_py.cc
jit.cc
auto_parallel_py.cc
op_function1.cc
op_function2.cc
op_function3.cc
......
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <pybind11/operators.h>
#include <pybind11/stl.h>
#include "paddle/fluid/distributed/auto_parallel/device_mesh.h"
#include "paddle/fluid/distributed/auto_parallel/dist_attr.h"
#include "paddle/fluid/distributed/auto_parallel/dist_mapper.h"
#include "paddle/fluid/distributed/auto_parallel/process_mesh.h"
#include "paddle/fluid/framework/op_desc.h"
#include "paddle/fluid/framework/var_desc.h"
#include "paddle/fluid/pybind/auto_parallel_py.h"
namespace py = pybind11;
namespace paddle {
namespace pybind {
using paddle::distributed::auto_parallel::Device;
using paddle::distributed::auto_parallel::DeviceCapability;
using paddle::distributed::auto_parallel::DeviceMesh;
using paddle::distributed::auto_parallel::DistributedMapper;
using paddle::distributed::auto_parallel::Link;
using paddle::distributed::auto_parallel::LinkCapability;
using paddle::distributed::auto_parallel::Machine;
using paddle::distributed::auto_parallel::ProcessMesh;
void BindAutoParallel(py::module *m) {
py::class_<ProcessMesh>(*m, "ProcessMesh")
.def(py::init<const std::vector<int64_t> &,
const std::vector<int64_t> &,
const std::vector<std::string> &>(),
py::arg("shape"),
py::arg("process_ids"),
py::arg("dim_names"))
.def_property_readonly(
"shape", &ProcessMesh::shape, py::return_value_policy::reference)
.def_property_readonly("process_ids",
&ProcessMesh::process_ids,
py::return_value_policy::reference)
.def_property_readonly("dim_names",
&ProcessMesh::dim_names,
py::return_value_policy::reference)
.def_property_readonly("size", &ProcessMesh::size)
.def_property_readonly("ndim", &ProcessMesh::ndim)
.def("dim_size",
static_cast<int64_t (ProcessMesh::*)(int64_t) const>(
&ProcessMesh::dim_size))
.def("dim_size",
static_cast<int64_t (ProcessMesh::*)(const std::string &) const>(
&ProcessMesh::dim_size))
.def("empty", &ProcessMesh::empty)
.def("contains", &ProcessMesh::contains)
.def(py::self == py::self)
.def(py::self != py::self)
.def("__str__", &ProcessMesh::to_string);
py::class_<DeviceCapability>(*m, "DeviceCapability")
.def(py::init<>())
.def_readwrite("sflops", &DeviceCapability::single_precision_flops)
.def_readwrite("dflops", &DeviceCapability::double_precision_flops)
.def_readwrite("memory", &DeviceCapability::memory_size_in_bytes)
.def_readwrite("rate", &DeviceCapability::clock_rate_in_ghz)
.def("__str__", &DeviceCapability::to_string);
py::class_<Device>(*m, "Device")
.def(py::init<int64_t, int64_t, int64_t, const std::string &>(),
py::arg("global_id"),
py::arg("local_id"),
py::arg("machine_id"),
py::arg("type"))
.def_property_readonly("global_id", &Device::global_id)
.def_property_readonly("local_id", &Device::local_id)
.def_property_readonly("machine_id", &Device::machine_id)
.def_property_readonly("type", &Device::type)
.def_property("capability", &Device::capability, &Device::set_capability)
.def(py::self == py::self)
.def(py::self != py::self)
.def("__str__", &Device::to_string);
py::class_<LinkCapability>(*m, "LinkCapability")
.def(py::init<>())
.def_readwrite("bandwidth", &LinkCapability::bandwidth)
.def_readwrite("latency", &LinkCapability::latency)
.def("__str__", &LinkCapability::to_string);
py::class_<Link>(*m, "Link")
.def(py::init<int64_t, int64_t, const std::string &>(),
py::arg("source_id"),
py::arg("target_id"),
py::arg("type"))
.def_property_readonly("source_id", &Link::source_id)
.def_property_readonly("target_id", &Link::target_id)
.def_property_readonly("type", &Link::type)
.def_property("capability", &Link::capability, &Link::set_capability)
.def(py::self == py::self)
.def(py::self != py::self)
.def("__str__", &Link::to_string);
py::class_<Machine>(*m, "Machine")
.def_property_readonly("id", &Machine::id)
.def_property_readonly(
"devices", &Machine::devices, py::return_value_policy::reference)
.def_property_readonly(
"links", &Machine::links, py::return_value_policy::reference)
.def("device", &Machine::device)
.def("link", &Machine::link)
.def("contains", &Machine::contains)
.def("__str__", &Machine::to_string);
py::class_<DeviceMesh>(*m, "DeviceMesh")
.def(py::init<const std::string &,
const std::vector<int64_t> &,
const std::vector<int64_t> &,
const std::vector<std::string> &>(),
py::arg("name"),
py::arg("shape"),
py::arg("process_ids"),
py::arg("dim_names"))
.def_property_readonly("name", &DeviceMesh::name)
.def_property_readonly("shape", &DeviceMesh::shape)
.def_property_readonly("device_ids",
&DeviceMesh::device_ids,
py::return_value_policy::reference)
.def_property_readonly("dim_names",
&DeviceMesh::dim_names,
py::return_value_policy::reference)
.def_property_readonly("device_type", &DeviceMesh::device_type)
.def_property_readonly("size", &DeviceMesh::size)
.def_property_readonly("ndim", &DeviceMesh::ndim)
.def_property_readonly(
"devices", &DeviceMesh::devices, py::return_value_policy::reference)
.def_property_readonly(
"links", &DeviceMesh::links, py::return_value_policy::reference)
.def_property_readonly(
"machines", &DeviceMesh::machines, py::return_value_policy::reference)
.def("device", &DeviceMesh::device)
.def("link", &DeviceMesh::link)
.def("machine", &DeviceMesh::machine)
.def("empty", &DeviceMesh::empty)
.def("contains", &DeviceMesh::contains)
.def("add_device", &DeviceMesh::add_device)
.def("add_link", &DeviceMesh::add_link)
.def("dim_size",
static_cast<int64_t (DeviceMesh::*)(int64_t) const>(
&DeviceMesh::dim_size))
.def("dim_size",
static_cast<int64_t (DeviceMesh::*)(const std::string &) const>(
&DeviceMesh::dim_size))
.def(py::self == py::self)
.def(py::self != py::self)
.def("__str__", &DeviceMesh::to_string);
}
} // namespace pybind
} // namespace paddle
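A minimal usage sketch of the new bindings from Python, assuming a Paddle build that includes this change so that the classes above are exposed on paddle.fluid.core:

from paddle.fluid import core

process_mesh = core.ProcessMesh([2, 3], [0, 1, 2, 3, 4, 5], ["x", "y"])
assert process_mesh.ndim == 2 and process_mesh.dim_size("y") == 3

device_mesh = core.DeviceMesh("mesh", [2, 3], [0, 1, 2, 3, 4, 5], ["x", "y"])
device_mesh.add_device(core.Device(global_id=0, local_id=0, machine_id=0, type="GPU"))
assert device_mesh.device_type == "GPU"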
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <pybind11/pybind11.h>
namespace paddle {
namespace pybind {
void BindAutoParallel(pybind11::module *m);
} // namespace pybind
} // namespace paddle
...@@ -97,6 +97,7 @@ limitations under the License. */
#ifdef PADDLE_WITH_ASCEND
#include "paddle/fluid/pybind/ascend_wrapper_py.h"
#endif
#include "paddle/fluid/pybind/auto_parallel_py.h"
#include "paddle/fluid/pybind/bind_cost_model.h" #include "paddle/fluid/pybind/bind_cost_model.h"
#include "paddle/fluid/pybind/bind_fleet_executor.h" #include "paddle/fluid/pybind/bind_fleet_executor.h"
#include "paddle/fluid/pybind/box_helper_py.h" #include "paddle/fluid/pybind/box_helper_py.h"
...@@ -1701,6 +1702,7 @@ All parameter, weight, gradient are variables in Paddle.
BindProcessMeshDesc(&m);
BindFleetExecutor(&m);
BindTCPStore(&m);
BindAutoParallel(&m);
BindJitProperty(&m);
py::class_<framework::LoDRankTable>(m, "LodRankTable")
......
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import numpy as np
from enum import IntEnum
from enum import unique
import paddle
from paddle.fluid import core
from paddle.fluid.core import Device
from paddle.fluid.core import Link
@unique
class DeviceType(IntEnum):
UNKNOWN = 0
CPU = 1
GPU = 2
XPU = 3
NPU = 4
DCU = 5
NIC = 6
@unique
class LinkType(IntEnum):
UNKNOWN = 0
LOC = 1
SYS = 2
PHB = 3
PIX = 4
PIB = 5
NVL = 6
NVB = 7
NET = 8
class DeviceMesh(core.DeviceMesh):
r"""
The class `DeviceMesh` describes the topology of physical devices.
Args:
name (str): the name of the device mesh.
mesh (list|numpy.array): an N-dimensional array that describes the topology
of the devices.
dim_names (list, optional): the i-th element of this list gives the name of the
i-th dimension.
Returns:
None
Examples:
.. code-block:: python
import paddle
from paddle.distributed.auto_parallel.cluster_v2 import DeviceMesh
paddle.enable_static()
mesh = DeviceMesh("my_mesh", [[2, 4, 5], [0, 1, 3]])
assert mesh.shape == [2, 3]
assert mesh.device_ids == [2, 4, 5, 0, 1, 3]
"""
def __init__(self, name, mesh, dim_names=None):
self._name = name
if not isinstance(mesh, list) and \
not isinstance(mesh, np.ndarray):
raise ValueError(
'The mesh must be an instance of list or np.ndarray.')
if isinstance(mesh, list):
mesh = np.array(mesh)
self._mesh = mesh
self._shape = list(self._mesh.shape)
self._device_ids = self._mesh.flatten().tolist()
assert all(isinstance(p, int) for p in self._device_ids), \
("All elements of the mesh be integer")
assert min(
self._device_ids) >= 0, ('All elements of the mesh must be >= 0.')
unique_device_ids = set(self._device_ids)
assert len(unique_device_ids) == len(
self._device_ids), ('All elements of the mesh must be unique.')
if dim_names is not None:
assert len(dim_names) == len(self._shape), \
("The length of dims_names must be same as the shape of the mesh.")
self._dim_names = dim_names
else:
self._dim_names = ["d" + str(i) for i in range(len(self._shape))]
# pybind11 requires the base-class constructor to be called explicitly
core.DeviceMesh.__init__(self, self._name, self._shape,
self._device_ids, self._dim_names)
@property
def mesh(self):
return self._mesh
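Because the wrapper above inherits from core.DeviceMesh, the methods bound in auto_parallel_py.cc (add_device, add_link, device_type, dim_size, and so on) are available on it directly. A short usage sketch, mirroring the unit test added below:

from paddle.distributed.auto_parallel.cluster_v2 import Device, Link, DeviceMesh

mesh = DeviceMesh("my_mesh", [[0, 1, 2], [3, 4, 5]], dim_names=["x", "y"])
mesh.add_device(Device(global_id=0, local_id=0, machine_id=0, type="GPU"))
mesh.add_device(Device(global_id=1, local_id=1, machine_id=0, type="GPU"))
mesh.add_link(Link(source_id=0, target_id=1, type="NVL"))
assert mesh.device_type == "GPU"
assert mesh.dim_size("y") == 3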
# class Cluster(object):
# """
# The cluster represents the hardware resource.
# """
# def __init__(self):
# self._device_meshes = {}
# def device_mesh(self, device_mesh_name):
# return self._device_meshes[device_mesh_name]
# def add_device_mesh(self, device_mesh):
# self._device_meshes[device_mesh.name] = device_mesh
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import numpy as np
from paddle.fluid import core
class ProcessMesh(core.ProcessMesh):
r"""
The class `ProcessMesh` describes the topology of logical processes.
Args:
mesh (list|numpy.array): an N-dimensional array that describes the topology
of logical processes.
dim_names (list, optional): the i-th element of this list gives the name of the
i-th dimension.
Returns:
None
Examples:
.. code-block:: python
import paddle
from paddle.distributed.auto_parallel.process_mesh_v2 import ProcessMesh
paddle.enable_static()
mesh = ProcessMesh([[2, 4, 5], [0, 1, 3]])
assert mesh.shape == [2, 3]
assert mesh.process_ids == [2, 4, 5, 0, 1, 3]
"""
def __init__(self, mesh, dim_names=None):
if not isinstance(mesh, list) and \
not isinstance(mesh, np.ndarray):
raise ValueError(
'The mesh must be an instance of list or np.ndarray.')
if isinstance(mesh, list):
mesh = np.array(mesh)
self._mesh = mesh
self._shape = list(self._mesh.shape)
self._process_ids = self._mesh.flatten().tolist()
assert all(isinstance(p, int) for p in self._process_ids), \
("All elements of the mesh must be integer")
assert min(
self._process_ids) >= 0, ('All elements of the mesh must be >= 0.')
unique_process_ids = set(self._process_ids)
assert len(unique_process_ids) == len(
self._process_ids), ('All elements of the mesh must be unique.')
if dim_names is not None:
assert len(dim_names) == len(self._shape), \
("The length of dims_names must be same as the shape of the mesh.")
self._dim_names = dim_names
else:
self._dim_names = ["d" + str(i) for i in range(len(self._shape))]
# pybind11 requires the base-class constructor to be called explicitly
core.ProcessMesh.__init__(self, self._shape, self._process_ids,
self._dim_names)
@property
def mesh(self):
return self._mesh
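Likewise, the wrapper above inherits the query methods bound on core.ProcessMesh. A short usage sketch, mirroring the unit test added below:

from paddle.distributed.auto_parallel.process_mesh_v2 import ProcessMesh

mesh = ProcessMesh([[0, 1, 2], [3, 4, 5]], dim_names=["x", "y"])
assert mesh.shape == [2, 3] and mesh.process_ids == [0, 1, 2, 3, 4, 5]
assert mesh.dim_size("x") == 2 and mesh.contains(5) and not mesh.empty()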
# def compute_compatible_process_meshes(process_meshes):
# """Compute the compatible process mesh given a list of process meshes."""
# if not process_meshes:
# return None
# def _compute_compatible_two_process_meshes(pm1, pm2):
# if pm1 is None:
# return True, pm2
# if pm2 is None:
# return True, pm1
# if pm1 == pm2:
# return True, pm1
# if pm1.device_mesh != pm2.device_mesh:
# return False, None
# if pm1.process_ids == pm2.process_ids:
# if len(pm1.shape) >= len(pm2.shape):
# return True, pm1
# else:
# return True, pm2
# process_set1 = set(pm1.process_ids)
# process_set2 = set(pm2.process_ids)
# if process_set1.issubset(process_set2):
# return True, pm2
# if process_set2.issubset(process_set1):
# return True, pm1
# return False, None
# compatible_result = None
# for process_mesh in process_meshes:
# compatible, compatible_result = _compute_compatible_two_process_meshes(
# compatible_result, process_mesh)
# if not compatible:
# return None
# return ProcessMesh(compatible_result.mesh, compatible_result.dim_names)
# def merge_process_meshes(process_meshes):
# """Merge a list of process meshes."""
# merged_process_mesh = None
# merged_process_ids = set()
# device_type = ""
# for process_mesh in process_meshes:
# if process_mesh is not None:
# process_ids = set(process_mesh.process_ids)
# if not device_type:
# device_type = process_mesh.device_type
# assert device_type == process_mesh.device_type, \
# "All process meshes must have the same device_type."
# merged_process_ids |= process_ids
# if len(merged_process_ids) != 0:
# merged_process_mesh = ProcessMesh(list(merged_process_ids))
# return merged_process_mesh
...@@ -61,4 +61,6 @@ if(WITH_DISTRIBUTE AND WITH_GPU)
py_test_modules(test_prim_dist_op MODULES test_prim_dist_op ENVS ${dist_ENVS})
py_test_modules(test_to_static MODULES test_to_static ENVS ${dist_ENVS})
py_test_modules(test_dist_op_cost MODULES test_dist_op_cost ENVS ${dist_ENVS})
py_test_modules(test_cluster_v2 MODULES test_cluster_v2)
py_test_modules(test_process_mesh_v2 MODULES test_process_mesh_v2)
endif()
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
import unittest
from paddle.distributed.auto_parallel.cluster_v2 import Device
from paddle.distributed.auto_parallel.cluster_v2 import Link
from paddle.distributed.auto_parallel.cluster_v2 import DeviceMesh
class TestDeviceMesh(unittest.TestCase):
def test_device_mesh(self):
name = "my_device_mesh"
mesh = [[0, 1, 2], [3, 4, 5]]
device_mesh = DeviceMesh(name, mesh, dim_names=["x", "y"])
device_mesh1 = DeviceMesh("another_mesh", [0, 1, 2, 3])
self.assertEqual(device_mesh.name, "my_device_mesh")
self.assertEqual(device_mesh.shape, [2, 3])
self.assertEqual(device_mesh.device_ids, [0, 1, 2, 3, 4, 5])
self.assertEqual(device_mesh.dim_names, ["x", "y"])
self.assertEqual(device_mesh.device_type, "UNKNOWN")
self.assertEqual(device_mesh.size, 6)
self.assertEqual(device_mesh.ndim, 2)
self.assertEqual(device_mesh.dim_size(0), 2)
self.assertEqual(device_mesh.dim_size(-1), 3)
self.assertEqual(device_mesh.dim_size("x"), 2)
self.assertEqual(device_mesh.dim_size("y"), 3)
self.assertEqual(device_mesh.empty(), False)
self.assertEqual(device_mesh.contains(0), True)
self.assertEqual(device_mesh.contains(6), False)
dev0 = Device(global_id=0, local_id=0, machine_id=0, type="GPU")
dev1 = Device(global_id=1, local_id=1, machine_id=0, type="GPU")
dev2 = Device(global_id=2, local_id=2, machine_id=0, type="GPU")
dev3 = Device(global_id=3, local_id=0, machine_id=1, type="GPU")
dev4 = Device(global_id=4, local_id=1, machine_id=1, type="GPU")
dev5 = Device(global_id=5, local_id=2, machine_id=1, type="GPU")
device_mesh.add_device(dev0)
device_mesh.add_device(dev1)
device_mesh.add_device(dev2)
device_mesh.add_device(dev3)
device_mesh.add_device(dev4)
device_mesh.add_device(dev5)
self.assertEqual(device_mesh.device(0), dev0)
self.assertEqual(device_mesh.device(1), dev1)
self.assertEqual(device_mesh.device(2), dev2)
self.assertEqual(device_mesh.device(3), dev3)
self.assertEqual(device_mesh.device(4), dev4)
self.assertEqual(device_mesh.device(5), dev5)
link0 = Link(source_id=0, target_id=1, type="NVL")
link1 = Link(source_id=1, target_id=0, type="NVL")
link2 = Link(source_id=3, target_id=4, type="NVL")
link3 = Link(source_id=4, target_id=5, type="NVL")
device_mesh.add_link(link0)
device_mesh.add_link(link1)
device_mesh.add_link(link2)
device_mesh.add_link(link3)
self.assertEqual(device_mesh.link(0, 1), link0)
self.assertEqual(device_mesh.link(1, 0), link1)
self.assertEqual(device_mesh.link(3, 4), link2)
self.assertEqual(device_mesh.link(4, 5), link3)
self.assertEqual(device_mesh.machine(0).id, 0)
self.assertEqual(device_mesh.machine(0).contains(3), False)
self.assertEqual(device_mesh.machine(0).device(2), dev2)
self.assertEqual(device_mesh.machine(1).link(3, 4), link2)
self.assertEqual(
device_mesh.machine(0).devices,
device_mesh.machine(0).devices)
self.assertEqual(
device_mesh.machine(0).links,
device_mesh.machine(0).links)
self.assertEqual(device_mesh.device_type, "GPU")
self.assertEqual(device_mesh.devices, device_mesh.devices)
self.assertEqual(device_mesh.links, device_mesh.links)
self.assertEqual(device_mesh.machines, device_mesh.machines)
self.assertEqual(device_mesh, device_mesh)
self.assertNotEqual(device_mesh, device_mesh1)
self.assertEqual(str(device_mesh), str(device_mesh))
def test_device(self):
device = Device(global_id=0, local_id=1, machine_id=2, type="GPU")
device.capability.sflops = 100
device.capability.dflops = 200
device.capability.memory = 32
device.capability.rate = 2
self.assertEqual(device.global_id, 0)
self.assertEqual(device.local_id, 1)
self.assertEqual(device.machine_id, 2)
self.assertEqual(device.type, "GPU")
self.assertAlmostEqual(device.capability.sflops, 100)
self.assertAlmostEqual(device.capability.dflops, 200)
self.assertAlmostEqual(device.capability.memory, 32)
self.assertAlmostEqual(device.capability.rate, 2)
self.assertEqual(device, device)
self.assertEqual(str(device), str(device))
def test_link(self):
link = Link(source_id=0, target_id=1, type="NVL")
link.capability.bandwidth = 100
link.capability.latency = 1
self.assertEqual(link.source_id, 0)
self.assertEqual(link.target_id, 1)
self.assertEqual(link.type, "NVL")
self.assertAlmostEqual(link.capability.bandwidth, 100)
self.assertAlmostEqual(link.capability.latency, 1)
self.assertEqual(link, link)
self.assertEqual(str(link), str(link))
if __name__ == "__main__":
unittest.main()
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
import unittest
from paddle.distributed.auto_parallel.process_mesh_v2 import ProcessMesh
class TestProcessMesh(unittest.TestCase):
def test_process_mesh(self):
mesh = [[0, 1, 2], [3, 4, 5]]
mesh2 = [[0, 1], [2, 3]]
process_mesh = ProcessMesh(mesh, dim_names=["x", "y"])
process_mesh2 = ProcessMesh(mesh2)
self.assertEqual(process_mesh.shape, [2, 3])
self.assertEqual(process_mesh.process_ids, [0, 1, 2, 3, 4, 5])
self.assertEqual(process_mesh.dim_names, ["x", "y"])
self.assertEqual(process_mesh.size, 6)
self.assertEqual(process_mesh.ndim, 2)
self.assertEqual(process_mesh.dim_size(0), 2)
self.assertEqual(process_mesh.dim_size(-1), 3)
self.assertEqual(process_mesh.dim_size("x"), 2)
self.assertEqual(process_mesh.dim_size("y"), 3)
self.assertEqual(process_mesh.empty(), False)
self.assertEqual(process_mesh.contains(0), True)
self.assertEqual(process_mesh.contains(6), False)
self.assertEqual(process_mesh, process_mesh)
self.assertNotEqual(process_mesh, process_mesh2)
self.assertEqual(str(process_mesh), str(process_mesh))
if __name__ == "__main__":
unittest.main()