Unverified commit a52357fe, authored by Yulong Ao and committed by GitHub

[Auto Parallel] Move the distributed info from python to c++ (#44510)

* [Auto Parallel] Move the distributed info from python to c++

* [Auto Parallel] Add dist_attrs for VarDesc and OpDesc

* [Auto Parallel] Add the lost file

* [Auto Parallel] Make the dist attr be unique_ptr

* [Auto Parallel] Add the proto conversion

* [Auto Parallel] Improve the proto support

* [Auto Parallel] Fix the bugs for adding a device or a link

* [Auto Parallel] Add the C++ ProcessMesh and DistributedMapper

* [Auto Parallel] Improve the impl of these dist attrs

* [Auto Parallel] Pybind11 ProcessMesh and DeviceMesh

* [Auto Parallel] Fix the unittest problem

* [Auto Parallel] Explicitly add the src file for auto_parallel target

* [Auto Parallel] Add the proto dependency explicitly

* [Auto Parallel] Fix the cmake bug on windows and mac

* [Auto Parallel] Remove the pybind11 header file in process_mesh.h

* [Auto Parallel] Remove unused codes

* [Auto Parallel] Check whether the dist attr is null

* [Auto Parallel] Implement the assign operator for OpDesc explicitly
Parent 7e2a20d5
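As a quick orientation before the diff, the sketch below exercises the new C++-side accessors this commit adds to VarDesc and OpDesc. It is an illustrative sketch only, not code from the commit; `AnnotateDescs`, `var`, and `op` are hypothetical, and the setter parameter types are assumed from the pybind properties bound later in this diff.

#include "paddle/fluid/framework/op_desc.h"
#include "paddle/fluid/framework/var_desc.h"

using paddle::distributed::auto_parallel::OperatorDistAttr;
using paddle::distributed::auto_parallel::TensorDistAttr;

// Hypothetical usage sketch: `var` and `op` are assumed to be existing descs
// owned by a ProgramDesc.
void AnnotateDescs(paddle::framework::VarDesc *var,
                   paddle::framework::OpDesc *op) {
  // MutableDistAttr() lazily creates the dist attr on first access and
  // returns a pointer owned by the desc.
  TensorDistAttr *tensor_attr = var->MutableDistAttr();
  tensor_attr->set_dims_mapping({0, -1});  // setter also exposed via pybind

  OperatorDistAttr *op_attr = op->MutableDistAttr();
  op_attr->set_impl_idx(0);

  // SetDistAttr() copies the given attribute into the desc.
  var->SetDistAttr(*tensor_attr);
  op->SetDistAttr(*op_attr);
}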
@@ -502,8 +502,14 @@ cc_test(
cc_library(
proto_desc
SRCS var_desc.cc op_desc.cc block_desc.cc program_desc.cc process_mesh_desc.cc
DEPS attribute shape_inference op_info operator glog version)
SRCS var_desc.cc op_desc.cc block_desc.cc program_desc.cc
DEPS attribute
shape_inference
op_info
operator
glog
version
dist_attr)
if(WITH_CRYPTO)
add_dependencies(proto_desc cryptopp)
@@ -40,13 +40,6 @@ enum AttrType {
VARS = 14;
}
message ProcessMeshDesc {
required int32 id = 1;
required int32 parent_id = 2;
repeated int32 topology = 3;
repeated int32 process_group = 4;
};
// OpDesc describes an instance of a C++ framework::OperatorBase
// derived class type.
message OpDesc {
@@ -421,6 +421,12 @@ OpDesc::OpDesc(const std::string &type,
block_ = nullptr;
}
OpDesc::OpDesc(const OpDesc &other) {
CopyFrom(other);
block_ = other.block_;
need_update_ = true;
}
OpDesc::OpDesc(const OpDesc &other, BlockDesc *block) {
CopyFrom(other);
block_ = block;
@@ -435,8 +441,10 @@ void OpDesc::CopyFrom(const OpDesc &op_desc) {
inputs_ = op_desc.inputs_;
outputs_ = op_desc.outputs_;
attrs_ = op_desc.attrs_;
// The record of original_id_ is only for auto parallel.
original_id_ = op_desc.original_id_;
if (op_desc.dist_attr_) {
dist_attr_.reset(new OperatorDistAttr(*op_desc.dist_attr_));
}
need_update_ = true;
}
@@ -481,6 +489,15 @@ OpDesc::OpDesc(const proto::OpDesc &desc, BlockDesc *block)
this->block_ = block;
}
// Explicitly implement the assignment operator, since the added
// unique_ptr data member deletes the implicit copy assignment operator.
OpDesc &OpDesc::operator=(const OpDesc &other) {
CopyFrom(other);
block_ = other.block_;
need_update_ = true;
return *this;
}
proto::OpDesc *OpDesc::Proto() {
Flush();
return &desc_;
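The comment on the explicit operator= above points at a general C++ rule: a class with a std::unique_ptr member gets its implicitly-declared copy assignment operator deleted, so a deep-copying operator= has to be written by hand, exactly as done for OpDesc here. A minimal standalone illustration (hypothetical Desc/Attr types, not Paddle code):

#include <memory>

struct Attr {
  int x = 0;
};

struct Desc {
  std::unique_ptr<Attr> attr_;

  // Without this user-provided operator=, `Desc a, b; a = b;` would not
  // compile: std::unique_ptr is move-only, which makes the implicitly
  // declared copy assignment operator of Desc deleted.
  Desc &operator=(const Desc &other) {
    // Deep-copy the pointee, mirroring how CopyFrom() clones dist_attr_.
    attr_ = other.attr_ ? std::make_unique<Attr>(*other.attr_) : nullptr;
    return *this;
  }
};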
@@ -985,6 +1002,20 @@ void OpDesc::InferVarType(BlockDesc *block) const {
}
}
OperatorDistAttr *OpDesc::MutableDistAttr() {
if (dist_attr_) {
return dist_attr_.get();
} else {
dist_attr_.reset(new OperatorDistAttr(*this));
return dist_attr_.get();
}
}
void OpDesc::SetDistAttr(const OperatorDistAttr &dist_attr) {
MutableDistAttr();
*dist_attr_ = dist_attr;
}
void OpDesc::UpdateVarAttr(const std::string &name, const Attribute &attr) {
auto attr_type = static_cast<proto::AttrType>(attr.index() - 1);
auto type = GetAttrType(name, true);
@@ -15,11 +15,13 @@ limitations under the License. */
#pragma once
#include <atomic>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include "paddle/fluid/distributed/auto_parallel/dist_attr.h"
#include "paddle/fluid/framework/attribute.h"
#include "paddle/fluid/framework/type_defs.h"
#include "paddle/fluid/framework/var_desc.h"
@@ -31,6 +33,8 @@ class VarDesc;
class BlockDesc;
class ProgramDesc;
using paddle::distributed::auto_parallel::OperatorDistAttr;
class OpDesc {
public:
OpDesc() {}
@@ -40,12 +44,16 @@ class OpDesc {
const VariableNameMap &outputs,
const AttributeMap &attrs);
OpDesc(const OpDesc &desc);
OpDesc(const proto::OpDesc &desc, BlockDesc *block);
explicit OpDesc(BlockDesc *block) : block_(block) {}
OpDesc(const OpDesc &other, BlockDesc *block);
OpDesc &operator=(const OpDesc &other);
void CopyFrom(const OpDesc &op_desc);
proto::OpDesc *Proto();
@@ -169,12 +177,14 @@ class OpDesc {
void UpdateVarAttr(const std::string &name, const Attribute &attr);
// The Id() and OriginalId() are only used for auto parallel.
bool NeedUpdate() const { return need_update_; }
// The following methods are only used for auto parallel.
uint64_t Id() const { return id_; }
uint64_t OriginalId() const { return original_id_; }
void SetOriginalId(uint64_t original_id) { original_id_ = original_id; }
bool NeedUpdate() const { return need_update_; }
OperatorDistAttr *MutableDistAttr();
void SetDistAttr(const OperatorDistAttr &dist_attr);
private:
friend class ProgramDesc;
@@ -193,13 +203,6 @@ class OpDesc {
return ret_val;
}
// This thread-safe implementation seems to be redundant since the neural
// networks are usually constructed in a single thread.
static uint64_t GenerateId() {
static std::atomic<std::uint64_t> uid{0};
// Must start from one
return ++uid;
}
// Is it really needed? Or just maintain a ptr from block?
proto::OpDesc desc_;
BlockDesc *block_{nullptr}; // not_own
@@ -214,13 +217,15 @@ class OpDesc {
// local changes should be synchronized, need_update_ should be set to true.
bool need_update_{false};
// Note: the id_ is unique (only for auto parallel).
// Note: the following members are only used for auto_parallel for now.
static uint64_t GenerateId() {
static std::atomic<std::uint64_t> uid{0};
// Must start from one
return ++uid;
}
uint64_t id_ = GenerateId();
// Note: the original_id_ is used for referring to the original OpDesc
// that the current OpDesc is built from (only for auto parallel).
// The default original_id_ is same as the id_, which means the
// current OpDesc is not built from the other one.
uint64_t original_id_ = id_;
std::unique_ptr<OperatorDistAttr> dist_attr_;
};
std::vector<std::string> AttrVarNames(const Attribute &attr);
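For readers of the comments above, here is a short sketch of how id_ and original_id_ interact, inferred from the in-class initializers and CopyFrom() shown in this diff (not code from the commit; the expected values are an interpretation, not an asserted test):

#include "paddle/fluid/framework/op_desc.h"

// `a` is a freshly constructed OpDesc: id_ comes from GenerateId() and
// original_id_ defaults to that same value.
paddle::framework::OpDesc a;

// The copy constructor calls CopyFrom(), which copies original_id_ but not
// id_, so the copy gets a new unique id while still pointing back at `a`.
paddle::framework::OpDesc b(a);

// Expected, per the member initializers above:
//   b.Id() != a.Id()
//   b.OriginalId() == a.OriginalId() == a.Id()
// Auto parallel passes may also re-point the link explicitly:
//   b.SetOriginalId(a.Id());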
/* Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/framework/process_mesh_desc.h"
namespace paddle {
namespace framework {
int32_t ProcessMeshDesc::next_id = -1;
ProcessMeshDesc::ProcessMeshDesc(const std::vector<int32_t> &topo,
const std::vector<int32_t> &process_group,
int32_t parent_id) {
int32_t cur_id = ++next_id;
desc_.set_id(cur_id);
desc_.set_parent_id(parent_id);
for (size_t i = 0; i != topo.size(); ++i) {
desc_.add_topology(topo[i]);
}
for (size_t i = 0; i != process_group.size(); ++i) {
desc_.add_process_group(process_group[i]);
}
ProcessMeshDescMap::GetInstance().Insert(cur_id, this);
}
std::vector<int32_t> ProcessMeshDesc::Topology() const {
size_t size = desc_.topology_size();
std::vector<int32_t> ret(size);
for (auto i = 0; i != desc_.topology_size(); ++i) {
ret[i] = desc_.topology(i);
}
return ret;
}
std::vector<int32_t> ProcessMeshDesc::ProcessGroup() const {
size_t size = desc_.process_group_size();
std::vector<int32_t> ret(size);
for (auto i = 0; i != desc_.process_group_size(); ++i) {
ret[i] = desc_.process_group(i);
}
return ret;
}
ProcessMeshDescMap &ProcessMeshDescMap::GetInstance() {
static ProcessMeshDescMap g_process_mesh_desc_map;
return g_process_mesh_desc_map;
}
} // namespace framework
} // namespace paddle
/* Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <unordered_map>
#include <vector>
#include "paddle/fluid/framework/framework.pb.h"
#include "paddle/fluid/framework/proto_desc.h"
#include "paddle/fluid/platform/enforce.h"
#include "paddle/fluid/platform/macros.h"
namespace paddle {
namespace framework {
class ProcessMeshDesc {
public:
ProcessMeshDesc(const std::vector<int32_t>& topo,
const std::vector<int32_t>& process_group,
int32_t parent_id);
int32_t ID() const { return desc_.id(); }
int32_t Parent() const { return desc_.parent_id(); }
std::vector<int32_t> Topology() const;
std::vector<int32_t> ProcessGroup() const;
static int32_t next_id;
private:
proto::ProcessMeshDesc desc_; // not_own
};
class ProcessMeshDescMap {
public:
static ProcessMeshDescMap& GetInstance();
bool Has(int32_t index) const { return map_.find(index) != map_.end(); }
void Insert(int32_t index, ProcessMeshDesc* mesh) {
PADDLE_ENFORCE_NE(
Has(index),
true,
platform::errors::AlreadyExists("Index (%d) has been used.", index));
map_.insert(std::make_pair(index, mesh));
}
private:
ProcessMeshDescMap() = default;
// Use raw pointer to avoid double free
std::unordered_map<int32_t, ProcessMeshDesc*> map_;
DISABLE_COPY_AND_ASSIGN(ProcessMeshDescMap);
};
} // namespace framework
} // namespace paddle
@@ -21,6 +21,15 @@ limitations under the License. */
namespace paddle {
namespace framework {
VarDesc::VarDesc(const VarDesc &other)
: desc_(other.desc_),
attrs_(other.attrs_),
original_id_(other.original_id_) {
if (other.dist_attr_) {
dist_attr_.reset(new TensorDistAttr(*other.dist_attr_));
}
}
proto::VarType::Type VarDesc::GetType() const { return desc_.type().type(); }
void VarDesc::SetType(proto::VarType::Type type) {
@@ -354,6 +363,22 @@ Attribute VarDesc::GetAttr(const std::string &name) const {
return it->second;
}
TensorDistAttr *VarDesc::MutableDistAttr() {
// If dist_attr_ is nullptr, construct a new one and return.
if (dist_attr_) {
return dist_attr_.get();
} else {
dist_attr_.reset(new TensorDistAttr(*this));
return dist_attr_.get();
}
}
void VarDesc::SetDistAttr(const TensorDistAttr &dist_attr) {
// Make sure the dist attr has been created
MutableDistAttr();
*dist_attr_ = dist_attr;
}
bool operator==(const VarDesc &left, const VarDesc &right) {
return left.Proto()->SerializeAsString() ==
right.Proto()->SerializeAsString();
@@ -20,6 +20,7 @@ limitations under the License. */
#include <vector>
#include "glog/logging.h"
#include "paddle/fluid/distributed/auto_parallel/dist_attr.h"
#include "paddle/fluid/framework/attribute.h"
#include "paddle/fluid/framework/framework.pb.h"
#include "paddle/fluid/framework/type_defs.h"
@@ -27,6 +28,8 @@ limitations under the License. */
namespace paddle {
namespace framework {
using paddle::distributed::auto_parallel::TensorDistAttr;
// convert between std::vector and protobuf repeated.
template <typename T>
inline std::vector<T> RepeatedToVector(
@@ -73,21 +76,22 @@ class VarDesc {
}
// Explicitly implement the copy constructor for auto parallel
VarDesc(const VarDesc &other)
: desc_(other.desc_),
attrs_(other.attrs_),
original_id_(other.original_id_) {}
VarDesc(const VarDesc &other);
VarDesc &operator=(const VarDesc &other) {
desc_ = other.desc_;
attrs_ = other.attrs_;
original_id_ = other.original_id_;
if (other.dist_attr_) {
dist_attr_.reset(new TensorDistAttr(*other.dist_attr_));
}
need_updated_ = true;
return *this;
}
proto::VarDesc *Proto() {
return &desc_;
need_updated_ = true;
return &desc_;
}
const proto::VarDesc *Proto() const { return &desc_; }
@@ -187,16 +191,18 @@ class VarDesc {
Attribute GetAttr(const std::string &name) const;
// The Id() and OriginalId() are only used for auto parallel.
bool NeedUpdate() const { return need_updated_; }
void SetNeedUpdate(bool need) { need_updated_ = need; }
// The following methods are only used for auto parallel.
uint64_t Id() const { return id_; }
uint64_t OriginalId() const { return original_id_; }
void SetOriginalId(uint64_t original_id) {
original_id_ = original_id;
need_updated_ = true;
}
bool NeedUpdate() const { return need_updated_; }
void SetNeedUpdate(bool need) { need_updated_ = need; }
TensorDistAttr *MutableDistAttr();
void SetDistAttr(const TensorDistAttr &dist_attr);
private:
const proto::VarType::TensorDesc &tensor_desc() const;
@@ -204,26 +210,20 @@ class VarDesc {
proto::VarType::TensorDesc *mutable_tensor_desc();
std::vector<proto::VarType::TensorDesc *> mutable_tensor_descs();
// This thread-safe implementation seems to be redundant since the neural
// networks are usually constructed in a single thread.
static uint64_t GenerateId() {
static std::atomic<std::uint64_t> uid{0};
return ++uid;
}
// Is it really needed? Or just maintain a ptr from block?
proto::VarDesc desc_;
AttributeMap attrs_;
bool need_updated_{false};
// Note: the id_ is unique for all VarDesc (only for auto parallel).
// Note: the following members are only used for auto parallel for now
static uint64_t GenerateId() {
static std::atomic<std::uint64_t> uid{0};
return ++uid;
}
uint64_t id_ = GenerateId();
// Note: the original_id_ is used for referring to the original VarDesc
// that the current VarDesc is built from (only for auto parallel).
// The default original_id_ is same as the id_, which means the
// current VarDesc is not built from the other one.
uint64_t original_id_ = id_;
std::unique_ptr<TensorDistAttr> dist_attr_;
};
bool operator==(const VarDesc &left, const VarDesc &right);
@@ -35,7 +35,11 @@ using paddle::distributed::auto_parallel::DistributedMapper;
using paddle::distributed::auto_parallel::Link;
using paddle::distributed::auto_parallel::LinkCapability;
using paddle::distributed::auto_parallel::Machine;
using paddle::distributed::auto_parallel::OperatorDistAttr;
using paddle::distributed::auto_parallel::ProcessMesh;
using paddle::distributed::auto_parallel::TensorDistAttr;
using paddle::framework::OpDesc;
using paddle::framework::VarDesc;
void BindAutoParallel(py::module *m) {
py::class_<ProcessMesh>(*m, "ProcessMesh")
@@ -162,6 +166,67 @@ void BindAutoParallel(py::module *m) {
.def(py::self == py::self)
.def(py::self != py::self)
.def("__str__", &DeviceMesh::to_string);
py::class_<TensorDistAttr>(*m, "TensorDistAttr")
.def(py::init<const VarDesc &>())
.def_property_readonly("tensor", &TensorDistAttr::tensor)
.def_property("process_mesh",
&TensorDistAttr::process_mesh,
&TensorDistAttr::set_process_mesh)
.def_property("dims_mapping",
&TensorDistAttr::dims_mapping,
&TensorDistAttr::set_dims_mapping)
.def_property("batch_dim",
&TensorDistAttr::batch_dim,
&TensorDistAttr::set_batch_dim)
.def_property("dynamic_dims",
&TensorDistAttr::dynamic_dims,
&TensorDistAttr::set_dynamic_dims)
.def("is_annotated", &TensorDistAttr::is_annotated)
.def("annotate", &TensorDistAttr::annotate)
.def("verify", &TensorDistAttr::verify)
.def(py::self == py::self)
.def(py::self != py::self)
.def("__str__", &TensorDistAttr::to_string);
py::class_<OperatorDistAttr>(*m, "OperatorDistAttr")
.def(py::init<const OpDesc &>())
.def_property_readonly("op", &OperatorDistAttr::op)
.def_property("process_mesh",
&OperatorDistAttr::process_mesh,
&OperatorDistAttr::set_process_mesh)
.def_property("impl_type",
&OperatorDistAttr::impl_type,
&OperatorDistAttr::set_impl_type)
.def_property("impl_idx",
&OperatorDistAttr::impl_idx,
&OperatorDistAttr::set_impl_idx)
.def("input", &OperatorDistAttr::input)
.def("output", &OperatorDistAttr::output)
.def("input_dist_attrs",
&OperatorDistAttr::input_dist_attrs,
py::return_value_policy::reference)
.def("output_dist_attrs",
&OperatorDistAttr::output_dist_attrs,
py::return_value_policy::reference)
.def("input_dist_attr",
static_cast<TensorDistAttr &(
OperatorDistAttr::*)(const std::string &)>(
&OperatorDistAttr::input_dist_attr),
py::return_value_policy::reference)
.def("output_dist_attr",
static_cast<TensorDistAttr &(
OperatorDistAttr::*)(const std::string &)>(
&OperatorDistAttr::output_dist_attr),
py::return_value_policy::reference)
.def("set_input_dist_attr", &OperatorDistAttr::set_input_dist_attr)
.def("set_output_dist_attr", &OperatorDistAttr::set_output_dist_attr)
.def("is_annotated", &OperatorDistAttr::is_annotated)
.def("annotate", &OperatorDistAttr::annotate)
.def("verify", &OperatorDistAttr::verify)
.def(py::self == py::self)
.def(py::self != py::self)
.def("__str__", &OperatorDistAttr::to_string);
}
} // namespace pybind
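A side note on the static_cast used for input_dist_attr/output_dist_attr in the bindings above: when a member function is overloaded (here presumably a const and a non-const overload), `&OperatorDistAttr::input_dist_attr` alone does not name a unique function, and the cast selects one exact signature for pybind11. A minimal standalone illustration with a hypothetical type (not Paddle code):

struct Holder {
  int value = 0;
  int &get() { return value; }              // non-const overload
  const int &get() const { return value; }  // const overload
};

// `&Holder::get` by itself is ambiguous; casting it to a specific
// pointer-to-member type picks the non-const overload, mirroring the cast
// applied to OperatorDistAttr::input_dist_attr above.
auto get_ptr = static_cast<int &(Holder::*)()>(&Holder::get);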
@@ -18,10 +18,10 @@ limitations under the License. */
#include <string>
#include <tuple>
#include "paddle/fluid/distributed/auto_parallel/dist_attr.h"
#include "paddle/fluid/framework/block_desc.h"
#include "paddle/fluid/framework/ir/graph_helper.h"
#include "paddle/fluid/framework/op_desc.h"
#include "paddle/fluid/framework/process_mesh_desc.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/var_desc.h"
#include "paddle/fluid/framework/version.h"
@@ -39,6 +39,9 @@ PyTypeObject *g_blockdesc_pytype = nullptr;
namespace pd = paddle::framework;
namespace jit = paddle::jit;
using paddle::distributed::auto_parallel::OperatorDistAttr;
using paddle::distributed::auto_parallel::TensorDistAttr;
template <typename T>
static pybind11::bytes SerializeMessage(
T &self) { // NOLINT due to pybind11 convention.
@@ -113,18 +116,6 @@ void BindProgramDesc(pybind11::module *m) {
});
}
void BindProcessMeshDesc(pybind11::module *m) {
pybind11::class_<pd::ProcessMeshDesc>(*m, "ProcessMeshDesc", "")
.def(pybind11::init<const std::vector<int32_t> &,
const std::vector<int32_t> &,
int32_t>())
.def_property_readonly("id", &pd::ProcessMeshDesc::ID)
.def_property_readonly("parent", &pd::ProcessMeshDesc::Parent)
.def_property_readonly("topology", &pd::ProcessMeshDesc::Topology)
.def_property_readonly("process_group",
&pd::ProcessMeshDesc::ProcessGroup);
}
void BindBlockDesc(pybind11::module *m) {
pybind11::class_<pd::BlockDesc> blockdesc(*m, "BlockDesc", "");
g_blockdesc_pytype = (PyTypeObject *)blockdesc.ptr(); // NOLINT
@@ -252,6 +243,10 @@ void BindVarDsec(pybind11::module *m) {
.def("id", &pd::VarDesc::Id)
.def("original_id", &pd::VarDesc::OriginalId)
.def("set_original_id", &pd::VarDesc::SetOriginalId)
.def_property("dist_attr",
&pd::VarDesc::MutableDistAttr,
&pd::VarDesc::SetDistAttr,
pybind11::return_value_policy::reference)
.def("attr", &pd::VarDesc::GetAttr);
pybind11::enum_<pd::proto::VarType::Type> vartype(var_desc, "VarType", "");
@@ -398,6 +393,10 @@ void BindOpDesc(pybind11::module *m) {
.def("id", &pd::OpDesc::Id)
.def("original_id", &pd::OpDesc::OriginalId)
.def("set_original_id", &pd::OpDesc::SetOriginalId)
.def_property("dist_attr",
&pd::OpDesc::MutableDistAttr,
&pd::OpDesc::SetDistAttr,
pybind11::return_value_policy::reference)
.def("inputs", [](pd::OpDesc &self) { return self.Inputs(); })
.def("outputs", &pd::OpDesc::Outputs);
}
@@ -34,7 +34,6 @@ void BindProgramDesc(pybind11::module* m);
void BindBlockDesc(pybind11::module* m);
void BindVarDsec(pybind11::module* m);
void BindOpDesc(pybind11::module* m);
void BindProcessMeshDesc(pybind11::module* m);
void BindJitProperty(pybind11::module* m);
} // namespace pybind
@@ -1699,7 +1699,6 @@ All parameter, weight, gradient are variables in Paddle.
BindCostModel(&m);
BindConstValue(&m);
BindGlobalValueGetterSetter(&m);
BindProcessMeshDesc(&m);
BindFleetExecutor(&m);
BindTCPStore(&m);
BindAutoParallel(&m);
@@ -2531,31 +2531,18 @@ class Variable(object):
return self.desc.attr(name)
@property
def process_mesh(self):
def dist_attr(self):
"""
Get the process mesh belonging to this Variable.
Get distributed attribute of this Variable.
"""
from paddle.distributed.auto_parallel.interface import _g_process_mesh_map
from paddle.distributed.auto_parallel.interface import ProcessMesh
mesh_attr_name = 'mesh_id' + core.kAutoParallelSuffix()
mesh_id = self.desc.attr(mesh_attr_name)
return _g_process_mesh_map[mesh_id]
return self.desc.dist_attr
@property
def shard_mask(self):
"""
Get shard_mask belonging to this Variable.
"""
mask_attr_name = 'mask' + core.kAutoParallelSuffix()
return self.desc.attr(mask_attr_name)
@property
def offload_device(self):
@dist_attr.setter
def dist_attr(self, dist_attr):
"""
Get the offload device of this Variable.
Set distributed attribute of this Variable.
"""
offload_attr_name = 'offload_device' + core.kAutoParallelSuffix()
return self.desc.attr(offload_attr_name)
self.desc.dist_attr = dist_attr
def get_all_op_protos():
@@ -3298,29 +3285,18 @@ class Operator(object):
return False
@property
def process_mesh(self):
def dist_attr(self):
"""
Get the process mesh belonging to this Operator.
Get distributed attribute of this Operator.
"""
from paddle.distributed.auto_parallel.interface import _g_process_mesh_map
mesh_attr_name = 'mesh_id' + core.kAutoParallelSuffix()
mesh_id = self.attr(mesh_attr_name)
return _g_process_mesh_map[mesh_id]
return self.desc.dist_attr
def dims_mapping(self, name):
"""
Get the dims_mapping for the op's var named `name`.
"""
dims_mapping_attr_name = name + core.kAutoParallelSuffix()
return self.attr(dims_mapping_attr_name)
@property
def pipeline_stage(self):
@dist_attr.setter
def dist_attr(self, dist_attr):
"""
Get pipeline stage of the Operator.
Set distributed attribute of this Operator.
"""
pipeline_stage_attr_name = 'pipeline_stage' + core.kAutoParallelSuffix()
return self.desc.attr(pipeline_stage_attr_name)
self.desc.dist_attr = dist_attr
class Block(object):
@@ -63,4 +63,5 @@ if(WITH_DISTRIBUTE AND WITH_GPU)
py_test_modules(test_dist_op_cost MODULES test_dist_op_cost ENVS ${dist_ENVS})
py_test_modules(test_cluster_v2 MODULES test_cluster_v2)
py_test_modules(test_process_mesh_v2 MODULES test_process_mesh_v2)
py_test_modules(test_dist_attr_v2 MODULES test_dist_attr_v2)
endif()
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
import unittest
import paddle
import numpy as np
import paddle.nn as nn
import paddle.static as static
from paddle.fluid.core import TensorDistAttr
from paddle.fluid.core import OperatorDistAttr
from paddle.distributed.auto_parallel.process_mesh_v2 import ProcessMesh
paddle.enable_static()
class TestDistAttr(unittest.TestCase):
def test_tensor_dist_attr_ctor(self):
train_program = static.Program()
start_program = static.Program()
with static.program_guard(train_program, start_program):
input = static.data(name="input", shape=[2, 3], dtype='float32')
dist_attr = TensorDistAttr(input.desc)
self.assertEqual(dist_attr.process_mesh.empty(), True)
self.assertEqual(dist_attr.dims_mapping, [-1, -1])
self.assertEqual(dist_attr.batch_dim, 0)
self.assertEqual(dist_attr.dynamic_dims, [0, 0])
dist_attr.process_mesh = ProcessMesh([[0, 1, 2], [3, 4, 5]])
dist_attr.dims_mapping = [0, -1]
dist_attr.batch_dim = 1
dist_attr.dynamic_dims = [1, 1]
self.assertEqual(dist_attr.process_mesh,
ProcessMesh([[0, 1, 2], [3, 4, 5]]))
self.assertEqual(dist_attr.dims_mapping, [0, -1])
self.assertEqual(dist_attr.batch_dim, 1)
self.assertEqual(dist_attr.dynamic_dims, [1, 1])
self.assertTrue(dist_attr.verify())
self.assertTrue(str(dist_attr), str(dist_attr))
def test_tensor_dist_attr(self):
train_program = static.Program()
start_program = static.Program()
with static.program_guard(train_program, start_program):
input = static.data(name="input", shape=[2, 3], dtype='float32')
input1 = static.data(name="input1", shape=[2, 3], dtype='float32')
dist_attr = input.dist_attr
dist_attr.process_mesh = ProcessMesh([[0, 1, 2], [3, 4, 5]])
dist_attr.dims_mapping = [0, -1]
dist_attr.batch_dim = 1
dist_attr.dynamic_dims = [1, 1]
self.assertEqual(input.dist_attr.process_mesh,
ProcessMesh([[0, 1, 2], [3, 4, 5]]))
self.assertEqual(input.dist_attr.dims_mapping, [0, -1])
self.assertEqual(input.dist_attr.batch_dim, 1)
self.assertEqual(input.dist_attr.dynamic_dims, [1, 1])
self.assertTrue(input.dist_attr.verify())
input1.dist_attr = dist_attr
self.assertEqual(input1.dist_attr.process_mesh,
ProcessMesh([[0, 1, 2], [3, 4, 5]]))
self.assertEqual(input1.dist_attr.dims_mapping, [0, -1])
self.assertEqual(input1.dist_attr.batch_dim, 1)
self.assertEqual(input1.dist_attr.dynamic_dims, [1, 1])
self.assertTrue(input1.dist_attr.verify())
def test_operator_dist_attr_ctor(self):
train_program = static.Program()
start_program = static.Program()
with static.program_guard(train_program, start_program):
input = static.data(name="input", shape=[2, 3], dtype='float32')
input1 = static.data(name="input1", shape=[3, 4], dtype='float32')
output = paddle.matmul(input, input1)
op = train_program.current_block().ops[0]
process_mesh = ProcessMesh([[0, 1, 2], [3, 4, 5]])
op_dist_attr = OperatorDistAttr(op.desc)
op_dist_attr.process_mesh = process_mesh
# Set the distributed attribute of input
input_dist_attr = TensorDistAttr(input.desc)
input_dist_attr.dims_mapping = [0, -1]
op_dist_attr.set_input_dist_attr(input.name, input_dist_attr)
# Set the distributed attribute of input1
input1_dist_attr = TensorDistAttr(input1.desc)
input1_dist_attr.dims_mapping = [-1, 1]
op_dist_attr.set_input_dist_attr(input1.name, input1_dist_attr)
# Set the distributed attribute of output
output_dist_attr = TensorDistAttr(output.desc)
output_dist_attr.dims_mapping = [0, 1]
op_dist_attr.set_output_dist_attr(output.name, output_dist_attr)
self.assertEqual(op_dist_attr.process_mesh, process_mesh)
self.assertEqual(
op_dist_attr.input_dist_attr(input.name).process_mesh, process_mesh)
self.assertEqual(
op_dist_attr.input_dist_attr(input1.name).process_mesh,
process_mesh)
self.assertEqual(
op_dist_attr.output_dist_attr(output.name).process_mesh,
process_mesh)
self.assertEqual(
op_dist_attr.input_dist_attr(input.name).dims_mapping, [0, -1])
self.assertEqual(
op_dist_attr.input_dist_attr(input1.name).dims_mapping, [-1, 1])
self.assertEqual(
op_dist_attr.output_dist_attr(output.name).dims_mapping, [0, 1])
self.assertTrue(op_dist_attr.verify())
self.assertTrue(str(op_dist_attr), str(op_dist_attr))
op_dist_attr = OperatorDistAttr(op.desc)
op_dist_attr.process_mesh = process_mesh
# Set the distributed attribute of input directly
input_dist_attr = op_dist_attr.input_dist_attr(input.name)
input_dist_attr.dims_mapping = [-1, 0]
# Set the distributed attribute of input1 directly
input1_dist_attr = op_dist_attr.input_dist_attr(input1.name)
input1_dist_attr.dims_mapping = [0, -1]
# Set the distributed attribute of output directly
output_dist_attr = op_dist_attr.output_dist_attr(output.name)
output_dist_attr.dims_mapping = [-1, -1]
self.assertEqual(op_dist_attr.process_mesh, process_mesh)
self.assertEqual(input_dist_attr.process_mesh, process_mesh)
self.assertEqual(input1_dist_attr.process_mesh, process_mesh)
self.assertEqual(output_dist_attr.process_mesh, process_mesh)
self.assertEqual(input_dist_attr.dims_mapping, [-1, 0])
self.assertEqual(input1_dist_attr.dims_mapping, [0, -1])
self.assertEqual(output_dist_attr.dims_mapping, [-1, -1])
self.assertTrue(op_dist_attr.verify())
self.assertTrue(str(op_dist_attr), str(op_dist_attr))
def test_operator_dist_attr(self):
train_program = static.Program()
start_program = static.Program()
with static.program_guard(train_program, start_program):
input = static.data(name="input", shape=[2, 3], dtype='float32')
input1 = static.data(name="input1", shape=[3, 4], dtype='float32')
output = paddle.matmul(input, input1)
op = train_program.current_block().ops[0]
process_mesh = ProcessMesh([[0, 1, 2], [3, 4, 5]])
op_dist_attr = op.dist_attr
op_dist_attr.process_mesh = process_mesh
# Set the distributed attribute of input
input_dist_attr = TensorDistAttr(input.desc)
input_dist_attr.dims_mapping = [0, -1]
op_dist_attr.set_input_dist_attr(input.name, input_dist_attr)
# Set the distributed attribute of input1
input1_dist_attr = TensorDistAttr(input1.desc)
input1_dist_attr.dims_mapping = [-1, 1]
op_dist_attr.set_input_dist_attr(input1.name, input1_dist_attr)
# Set the distributed attribute of output
output_dist_attr = TensorDistAttr(output.desc)
output_dist_attr.dims_mapping = [0, 1]
op_dist_attr.set_output_dist_attr(output.name, output_dist_attr)
self.assertEqual(op.desc.dist_attr.process_mesh, process_mesh)
self.assertEqual(
op.dist_attr.input_dist_attr(input.name).process_mesh, process_mesh)
self.assertEqual(
op.dist_attr.input_dist_attr(input1.name).process_mesh,
process_mesh)
self.assertEqual(
op.dist_attr.input_dist_attr(input.name).dims_mapping, [0, -1])
self.assertEqual(
op.dist_attr.input_dist_attr(input.name).dims_mapping, [0, -1])
self.assertEqual(
op.desc.dist_attr.input_dist_attr(input1.name).dims_mapping,
[-1, 1])
self.assertEqual(
op.dist_attr.output_dist_attr(output.name).dims_mapping, [0, 1])
self.assertTrue(op.desc.dist_attr.verify())
self.assertTrue(str(op_dist_attr), str(op_dist_attr))
op.dist_attr = OperatorDistAttr(op.desc)
self.assertEqual(op.desc.dist_attr, OperatorDistAttr(op.desc))
if __name__ == "__main__":
unittest.main()