Unverified · Commit 8a7e54d5 authored by wanghuancoder, committed by GitHub

[Eager] eager tensor support pickler (#47025)

* test_paddle_multiprocessing support eager tensor pickler

Parent: eb102189
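This commit makes eager-mode tensors picklable by `ForkingPickler`, so they can be sent between processes through `paddle.incubate.multiprocessing`; the legacy-dygraph fallback in the test below is no longer needed. A minimal usage sketch of what the change enables, not part of the commit itself — it assumes `paddle.incubate.multiprocessing` re-exports the standard `multiprocessing` API, as the test's call to `mp.get_context` suggests:

```python
# Hypothetical end-to-end sketch: an eager paddle.Tensor is pickled
# through the registered reducer and rebuilt from shared memory in
# the child process.
import paddle
import paddle.incubate.multiprocessing as mp


def reader(queue):
    t = queue.get()  # rebuilt via the registered _rebuild_tensor
    assert (t.numpy() == 5).all()


if __name__ == "__main__":
    paddle.set_device("cpu")
    ctx = mp.get_context("spawn")
    queue = ctx.Queue()
    tensor = paddle.full([2, 3], 5.0)
    queue.put(tensor)  # serialized via the registered _reduce_tensor
    p = ctx.Process(target=reader, args=(queue,))
    p.start()
    p.join()
```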
@@ -18,20 +18,12 @@ import unittest
 import time
 import paddle
 import paddle.incubate.multiprocessing as mp
-from paddle.fluid.framework import (
-    _enable_legacy_dygraph,
-    _test_eager_guard,
-    in_dygraph_mode,
-)
 
 REPEAT = 20
 HAS_SHM_FILES = os.path.isdir('/dev/shm')
 
 
 def fill_tensor(queue, event):
-    # make sure run in legacy dygraph
-    if in_dygraph_mode():
-        _enable_legacy_dygraph()
     data = queue.get()
     with paddle.no_grad():
         data[0][:] = 5
@@ -182,36 +174,24 @@ class TestMultiprocessingBase(unittest.TestCase):
 class TestMultiprocessingCpu(TestMultiprocessingBase):
     def func_test_pass_tensor(self):
-        if in_dygraph_mode():
-            return
         paddle.set_device("cpu")
         self._test_sharing(repeat=REPEAT)
 
     def test_pass_tensor(self):
-        with _test_eager_guard():
-            self.func_test_pass_tensor()
         self.func_test_pass_tensor()
 
     def func_test_pass_parambase(self):
-        if in_dygraph_mode():
-            return
         paddle.set_device("cpu")
         self._test_sharing(repeat=1, param=True)
 
     def test_pass_parambase(self):
-        with _test_eager_guard():
-            self.func_test_pass_parambase()
         self.func_test_pass_parambase()
 
     def func_test_pass_empty(self):
-        if in_dygraph_mode():
-            return
         paddle.set_device("cpu")
         self._test_empty()
 
     def test_pass_empty(self):
-        with _test_eager_guard():
-            self.func_test_pass_empty()
         self.func_test_pass_empty()
@@ -221,14 +201,10 @@ class TestMultiprocessingGpu(TestMultiprocessingBase):
         "core is not compiled with CUDA",
     )
     def func_test_pass_tensor(self):
-        if in_dygraph_mode():
-            return
         paddle.set_device("gpu")
         self._test_sharing(mp.get_context("spawn"), "gpu")
 
     def test_pass_tensor(self):
-        with _test_eager_guard():
-            self.func_test_pass_tensor()
         self.func_test_pass_tensor()
...
@@ -45,11 +45,11 @@ def _supported_check():
     return True
 
 
-class LRUSharedCache(OrderedDict):
+class _LRUSharedCache(OrderedDict):
    def __init__(self):
         self.limit = 128
         self._after_fork()
-        register_after_fork(self, LRUSharedCache._after_fork)
+        register_after_fork(self, _LRUSharedCache._after_fork)
 
     def _after_fork(self):
         self.lock = threading.Lock()
@@ -73,25 +73,25 @@ class LRUSharedCache(OrderedDict):
             super().__setitem__(key, value)
 
 
-shared_cache = LRUSharedCache()
+shared_cache = _LRUSharedCache()
 
 
-def cuda_from_cache(key):
+def _cuda_from_cache(key):
     lodtensor = shared_cache.get(key)
     if lodtensor is None:
         return None
     return lodtensor
 
 
-def rebuild_tensor(cls, lodtensor, metadata):
-    if cls == paddle.fluid.framework.ParamBase:
-        tensor = paddle.fluid.framework.ParamBase(
+def _rebuild_tensor(cls, lodtensor, metadata):
+    if cls == paddle.fluid.framework.EagerParamBase:
+        tensor = paddle.fluid.framework.EagerParamBase(
             lodtensor.shape(), lodtensor._dtype(), **metadata
         )
         tensor.value().get_tensor()._share_data_with(lodtensor)
     else:
         size, stop_gradient = metadata
-        tensor = paddle.fluid.core.VarBase()
+        tensor = paddle.fluid.core.eager.Tensor()
         if lodtensor._is_initialized():
             tensor.value().get_tensor()._share_data_with(lodtensor)
         else:
@@ -100,7 +100,7 @@ def rebuild_tensor(cls, lodtensor, metadata):
     return tensor
 
 
-def reduce_tensor(tensor):
+def _reduce_tensor(tensor):
     lodtensor = tensor.value().get_tensor()
 
     if not tensor.stop_gradient and not tensor.is_leaf:
@@ -113,12 +113,12 @@ def reduce_tensor(tensor):
         or tensor.place.is_gpu_place()
         or tensor.place.is_cuda_pinned_place()
     ):
-        if type(tensor) == paddle.fluid.framework.ParamBase:
+        if type(tensor) == paddle.fluid.framework.EagerParamBase:
             metadata = copy.deepcopy(tensor.__dict__)
         else:
             metadata = (tensor.size, tensor.stop_gradient)
-        return (rebuild_tensor, (type(tensor), lodtensor, metadata))
+        return (_rebuild_tensor, (type(tensor), lodtensor, metadata))
     else:
         raise ValueError(
             "Only support tensors of CPU/CUDA/CUDAPinned Place, Not support %s for now!"
@@ -126,16 +126,16 @@ def reduce_tensor(tensor):
     )
 
 
-def rebuild_lodtensor_filename(cls, ipc_name, size, type_idx, dims, lod):
+def _rebuild_lodtensor_filename(cls, ipc_name, size, type_idx, dims, lod):
     lodtensor = cls._new_shared_filename((ipc_name, size, type_idx, dims, lod))
     lodtensor._shared_decref()
     return lodtensor
 
 
-def rebuild_cuda_tensor(
+def _rebuild_cuda_tensor(
     cls, handle, offset_bytes, size, type_idx, dims, lod, device_idx
 ):
-    cache_tensor = cuda_from_cache((handle, offset_bytes))
+    cache_tensor = _cuda_from_cache((handle, offset_bytes))
     if cache_tensor is None:
         lodtensor = cls._new_shared_cuda(
             (handle, offset_bytes, size, type_idx, dims, lod, device_idx)
@@ -155,13 +155,13 @@ def rebuild_cuda_tensor(
     return lodtensor
 
 
-def rebuild_lodtensor_empty(cls):
+def _rebuild_lodtensor_empty(cls):
     # TODO: check if tensor initialized
     # TODO: handle the dtype of empty tensor
     return cls()
 
 
-def reduce_lodtensor(lodtensor):
+def _reduce_lodtensor(lodtensor):
     if (
         lodtensor._place().is_cpu_place()
         or lodtensor._place().is_cuda_pinned_place()
@@ -169,19 +169,19 @@ def reduce_lodtensor(lodtensor):
         for dim in lodtensor.shape():
             if dim == 0:
                 # Empty tensors have nothing to be mmapped.
-                return (rebuild_lodtensor_empty, (type(lodtensor),))
+                return (_rebuild_lodtensor_empty, (type(lodtensor),))
 
         # Default: use the share-filename strategy
         metadata = (
             lodtensor._share_filename()
         )  # ipc_name, size, type_idx, dims, lod
-        rebuild = rebuild_lodtensor_filename
+        rebuild = _rebuild_lodtensor_filename
         lodtensor._shared_incref()
         # TODO: maintain reference for lodtensor
        # TODO: support file_descriptor strategy
     elif lodtensor._place().is_gpu_place():
         metadata = lodtensor._share_cuda()
-        rebuild = rebuild_cuda_tensor
+        rebuild = _rebuild_cuda_tensor
     else:
         raise RuntimeError("We only support pass cpu/gpu lodtensor for now!")
@@ -192,7 +192,9 @@ def init_reductions():
     if not _supported_check():
         return
 
-    ForkingPickler.register(paddle.Tensor, reduce_tensor)
-    ForkingPickler.register(paddle.fluid.core.VarBase, reduce_tensor)
-    ForkingPickler.register(paddle.fluid.framework.ParamBase, reduce_tensor)
-    ForkingPickler.register(paddle.fluid.core.LoDTensor, reduce_lodtensor)
+    ForkingPickler.register(paddle.Tensor, _reduce_tensor)
+    ForkingPickler.register(paddle.fluid.core.eager.Tensor, _reduce_tensor)
+    ForkingPickler.register(
+        paddle.fluid.framework.EagerParamBase, _reduce_tensor
+    )
+    ForkingPickler.register(paddle.fluid.core.LoDTensor, _reduce_lodtensor)
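The registrations above rely on the standard `multiprocessing` reduction protocol: `ForkingPickler.register(cls, reduce_fn)` installs `reduce_fn` in the pickler's dispatch table, and `reduce_fn` must return a `(rebuild_fn, args)` pair that the receiving process calls to reconstruct the object — exactly the shape of the tuples `_reduce_tensor` and `_reduce_lodtensor` return. A minimal self-contained sketch of that protocol with a toy class (`Handle`, `_reduce_handle`, and `_rebuild_handle` are illustrative names, not Paddle APIs):

```python
# Toy demonstration of the ForkingPickler reduce/rebuild protocol
# used by init_reductions() above.
import io
import pickle
from multiprocessing.reduction import ForkingPickler


class Handle:
    def __init__(self, value):
        self.value = value


def _rebuild_handle(value):
    # Runs in the receiving process to reconstruct the object.
    return Handle(value)


def _reduce_handle(obj):
    # Returns (callable, args): this pair is what actually gets pickled.
    return (_rebuild_handle, (obj.value,))


ForkingPickler.register(Handle, _reduce_handle)

buf = io.BytesIO()
ForkingPickler(buf).dump(Handle(42))   # serialized via _reduce_handle
restored = pickle.loads(buf.getvalue())  # reconstructed via _rebuild_handle
assert restored.value == 42
```

Because only the rebuild callable and its arguments pass through the pickle stream, the tensor payload itself never does: on CPU it stays in a shared-memory file (`_share_filename`), on GPU it travels as a CUDA IPC handle (`_share_cuda`), and `shared_cache` deduplicates repeated rebuilds of the same `(handle, offset_bytes)` pair on the receiving side.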