未验证 提交 fe291daf 编写于 作者: J Jiabin Yang 提交者: GitHub

Support sharding (#40637)

* suppor sharding api

* support multi api for sharding in eager

* support multi api for sharding in eager

* fix test

* fix test coverage
上级 323d55a7
......@@ -948,7 +948,7 @@ static PyObject* tensor_register_reduce_hook(TensorObject* self, PyObject* args,
EAGER_CATCH_AND_THROW_RETURN_NULL
}
static PyObject* set_grad_type(TensorObject* self, PyObject* args,
static PyObject* tensor__set_grad_type(TensorObject* self, PyObject* args,
PyObject* kwargs) {
EAGER_TRY
auto var_type = pybind::CastPyArg2ProtoType(PyTuple_GET_ITEM(args, 0), 0);
......@@ -963,6 +963,42 @@ static PyObject* set_grad_type(TensorObject* self, PyObject* args,
EAGER_CATCH_AND_THROW_RETURN_NULL
}
static PyObject* tensor__clear(TensorObject* self, PyObject* args,
PyObject* kwargs) {
EAGER_TRY
self->tensor.reset();
return Py_None;
EAGER_CATCH_AND_THROW_RETURN_NULL
}
static PyObject* tensor__copy_gradient_from(TensorObject* self, PyObject* args,
PyObject* kwargs) {
EAGER_TRY
auto src = CastPyArg2Tensor(PyTuple_GET_ITEM(args, 0), 0);
if (self->tensor.is_initialized()) {
PADDLE_ENFORCE_EQ(self->tensor.dtype(), src.dtype(),
platform::errors::PreconditionNotMet(
"Tensor %s has different data type with Tensor %s",
self->tensor.name(), src.name()));
PADDLE_ENFORCE_EQ(self->tensor.impl()->type_info().id(),
src.impl()->type_info().id(),
platform::errors::PreconditionNotMet(
"Tensor %s has different type with Tensor %s, Tensor "
"ShareGradientDataWith cannot be performed!",
self->tensor.name(), src.name()));
}
VLOG(6) << "Tensor copy gradient from: " << src.name();
auto* p_grad = egr::EagerUtils::mutable_grad(self->tensor);
if (p_grad) {
PADDLE_ENFORCE_EQ(src.initialized(), true,
platform::errors::InvalidArgument(
"Tensor %s has not been initialized", src.name()));
p_grad->set_impl(src.impl());
}
Py_INCREF(Py_None);
return Py_None;
EAGER_CATCH_AND_THROW_RETURN_NULL
}
static PyObject* tensor_method_get_non_zero_indices(TensorObject* self,
PyObject* args,
PyObject* kwargs) {
......@@ -1117,7 +1153,12 @@ PyMethodDef variable_methods[] = {
{"_register_backward_hook",
(PyCFunction)(void (*)(void))tensor_register_reduce_hook,
METH_VARARGS | METH_KEYWORDS, NULL},
{"_set_grad_type", (PyCFunction)(void (*)(void))set_grad_type,
{"_set_grad_type", (PyCFunction)(void (*)(void))tensor__set_grad_type,
METH_VARARGS | METH_KEYWORDS, NULL},
{"_clear", (PyCFunction)(void (*)(void))tensor__clear,
METH_VARARGS | METH_KEYWORDS, NULL},
{"_copy_gradient_from",
(PyCFunction)(void (*)(void))tensor__copy_gradient_from,
METH_VARARGS | METH_KEYWORDS, NULL},
/***the method of sparse tensor****/
{"non_zero_indices",
......
......@@ -829,6 +829,8 @@ PYBIND11_MODULE(core_noavx, m) {
[](const framework::Tensor &self) {
return reinterpret_cast<uintptr_t>(self.data());
})
.def("_slice", &framework::Tensor::Slice)
.def("_numel", &framework::Tensor::numel)
.def("_is_initialized",
[](const framework::Tensor &self) { return self.IsInitialized(); })
.def("_get_dims",
......
......@@ -427,9 +427,7 @@ class PADDLE_API Tensor final {
* @param blocking, Should we copy this in sync way.
* @return void
*/
void copy_(const Tensor& src,
const phi::Place& target_place,
const bool blocking);
void copy_(const Tensor& src, const phi::Place& target_place, bool blocking);
/**
* @brief Cast datatype from one to another
*
......
......@@ -84,26 +84,26 @@ void Tensor::copy_(const Tensor &src,
if (is_initialized()) {
PADDLE_ENFORCE_EQ(dtype(),
src.dtype(),
platform::errors::PreconditionNotMet(
phi::errors::PreconditionNotMet(
"Tensor %s has different data type with Tensor %s, "
"Tensor Copy cannot be performed!",
name(),
src.name()));
PADDLE_ENFORCE_EQ(impl()->type_info().id(),
src.impl()->type_info().id(),
platform::errors::PreconditionNotMet(
phi::errors::PreconditionNotMet(
"Tensor %s has different type with Tensor %s, Tensor "
"Copy cannot be performed!",
name(),
src.name()));
PADDLE_ENFORCE_EQ(target_place,
inner_place(),
platform::errors::PreconditionNotMet(
phi::errors::PreconditionNotMet(
"Place is different of dst tensor and args %s, which "
"current tensor holds %s "
"Copy cannot be performed!",
target_place.DebugString(),
inner_place().DebugString()));
target_place,
inner_place()));
kernel_key_set.backend_set =
kernel_key_set.backend_set |
BackendSet(phi::TransToPhiBackend(inner_place()));
......@@ -177,7 +177,7 @@ void Tensor::copy_(const Tensor &src,
blocking,
static_cast<phi::SelectedRows *>(impl_.get()));
} else {
PADDLE_THROW(paddle::platform::errors::InvalidArgument(
PADDLE_THROW(phi::errors::InvalidArgument(
"We currently only support dense tensor copy for now and if u need to "
"copy selected rows please raise a issue."));
}
......
......@@ -797,6 +797,34 @@ def monkey_patch_varbase():
def value(self):
return self
@framework.dygraph_only
def _slice(self, begin_idx, end_idx):
return core.eager.Tensor(self.get_tensor()._slice(begin_idx, end_idx))
@framework.dygraph_only
def _numel(self):
return self.get_tensor()._numel()
@framework.dygraph_only
def cpu(self):
if self.place.is_cpu_place():
return self
else:
res = self._copy_to(core.CPUPlace(), True)
res.stop_gradient = self.stop_gradient
res.persistable = self.persistable
return res
@framework.dygraph_only
def cuda(self, device_id, blocking):
if self.place.is_gpu_place():
return self
else:
res = self._copy_to(core.CUDAPlace(device_id), True)
res.stop_gradient = self.stop_gradient
res.persistable = self.persistable
return res
if core._in_eager_mode() and not hasattr(core, "eager"):
return
......@@ -820,6 +848,10 @@ def monkey_patch_varbase():
setattr(core.eager.Tensor, "_set_grad_ivar", _set_grad_ivar)
setattr(core.eager.Tensor, "clone", clone)
setattr(core.eager.Tensor, "value", value)
setattr(core.eager.Tensor, "cpu", cpu)
setattr(core.eager.Tensor, "cuda", cuda)
setattr(core.eager.Tensor, "_slice", _slice)
setattr(core.eager.Tensor, "_numel", _numel)
else:
setattr(core.VarBase, "__name__", "Tensor")
setattr(core.VarBase, "grad", grad)
......
......@@ -634,20 +634,39 @@ class EagerVariablePropertiesAndMethodsTestCase(unittest.TestCase):
if core.is_compiled_with_cuda():
tensor3 = tensor2._copy_to(core.CUDAPlace(0), True)
self.assertTrue(np.array_equal(tensor3.numpy(), arr2))
self.assertTrue(tensor3.persistable, True)
self.assertTrue(tensor3.stop_gradient, True)
self.assertEqual(tensor3.persistable, True)
self.assertEqual(tensor3.stop_gradient, True)
self.assertTrue(tensor3.place.is_gpu_place())
tensor4 = paddle.to_tensor([1, 2, 3], place='gpu_pinned')
tensor5 = tensor4._copy_to(core.CUDAPlace(0), True)
tensor4 = tensor2.cuda(0, True)
self.assertTrue(np.array_equal(tensor4.numpy(), arr2))
self.assertEqual(tensor4.persistable, True)
self.assertEqual(tensor4.stop_gradient, False)
self.assertTrue(tensor4.place.is_gpu_place())
tensor5 = tensor4.cpu()
self.assertTrue(np.array_equal(tensor5.numpy(), arr2))
self.assertEqual(tensor5.persistable, True)
self.assertEqual(tensor5.stop_gradient, False)
self.assertTrue(tensor5.place.is_cpu_place())
tensor10 = paddle.to_tensor([1, 2, 3], place='gpu_pinned')
tensor11 = tensor10._copy_to(core.CUDAPlace(0), True)
self.assertTrue(
np.array_equal(tensor4.numpy(), tensor5.numpy()))
np.array_equal(tensor10.numpy(), tensor11.numpy()))
else:
tensor3 = tensor2._copy_to(core.CPUPlace(), True)
self.assertTrue(np.array_equal(tensor3.numpy(), arr2))
self.assertTrue(tensor3.persistable, True)
self.assertTrue(tensor3.stop_gradient, True)
self.assertEqual(tensor3.persistable, True)
self.assertEqual(tensor3.stop_gradient, True)
self.assertTrue(tensor3.place.is_cpu_place())
tensor4 = tensor2.cpu()
self.assertTrue(np.array_equal(tensor4.numpy(), arr2))
self.assertEqual(tensor4.persistable, True)
self.assertEqual(tensor4.stop_gradient, False)
self.assertTrue(tensor4.place.is_cpu_place())
def test_share_buffer_to(self):
with _test_eager_guard():
arr = np.ones([4, 16, 16, 32]).astype('float32')
......@@ -784,6 +803,34 @@ class EagerVariablePropertiesAndMethodsTestCase(unittest.TestCase):
self.assertEqual(egr_tensor.shape, [4, 16, 16, 32])
self.assertTrue(np.array_equal(egr_tensor.numpy(), new_arr))
def test_sharding_related_api(self):
with _test_eager_guard():
arr0 = np.random.rand(4, 16, 16, 32).astype('float32')
egr_tensor1 = core.eager.Tensor(arr0,
core.CPUPlace(), True, False,
"numpy_tensor1", False)
self.assertEqual(egr_tensor1._numel(), 32768)
self.assertEqual(egr_tensor1._slice(0, 2)._numel(), 16384)
def test_copy_gradient_from(self):
with _test_eager_guard():
np_x = np.random.random((2, 2))
np_y = np.random.random((2, 2))
x = paddle.to_tensor(np_x, dtype="float64", stop_gradient=False)
y = paddle.to_tensor(np_y, dtype="float64")
out = x + x
out.backward()
x._copy_gradient_from(y)
self.assertTrue(np.array_equal(x.grad.numpy(), np_y))
def test_clear(self):
with _test_eager_guard():
np_x = np.random.random((3, 8, 8))
x = paddle.to_tensor(np_x, dtype="float64")
self.assertTrue(x._is_initialized())
x._clear()
self.assertFalse(x._is_initialized())
class EagerParamBaseUsageTestCase(unittest.TestCase):
def test_print(self):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册