From 8a7e54d5891e7751909db425a8dde174c2d05c0a Mon Sep 17 00:00:00 2001 From: wanghuancoder Date: Mon, 7 Nov 2022 15:00:02 +0800 Subject: [PATCH] [Eager] eager tensor support pickler (#47025) * test_paddle_multiprocessing support eager tensor pickler --- .../unittests/test_paddle_multiprocessing.py | 24 ---------- .../incubate/multiprocessing/reductions.py | 48 ++++++++++--------- 2 files changed, 25 insertions(+), 47 deletions(-) diff --git a/python/paddle/fluid/tests/unittests/test_paddle_multiprocessing.py b/python/paddle/fluid/tests/unittests/test_paddle_multiprocessing.py index a6e3bf827d..4ddc3f3120 100644 --- a/python/paddle/fluid/tests/unittests/test_paddle_multiprocessing.py +++ b/python/paddle/fluid/tests/unittests/test_paddle_multiprocessing.py @@ -18,20 +18,12 @@ import unittest import time import paddle import paddle.incubate.multiprocessing as mp -from paddle.fluid.framework import ( - _enable_legacy_dygraph, - _test_eager_guard, - in_dygraph_mode, -) REPEAT = 20 HAS_SHM_FILES = os.path.isdir('/dev/shm') def fill_tensor(queue, event): - # make sure run in legacy dygraph - if in_dygraph_mode(): - _enable_legacy_dygraph() data = queue.get() with paddle.no_grad(): data[0][:] = 5 @@ -182,36 +174,24 @@ class TestMultiprocessingBase(unittest.TestCase): class TestMultiprocessingCpu(TestMultiprocessingBase): def func_test_pass_tensor(self): - if in_dygraph_mode(): - return paddle.set_device("cpu") self._test_sharing(repeat=REPEAT) def test_pass_tensor(self): - with _test_eager_guard(): - self.func_test_pass_tensor() self.func_test_pass_tensor() def func_test_pass_parambase(self): - if in_dygraph_mode(): - return paddle.set_device("cpu") self._test_sharing(repeat=1, param=True) def test_pass_parambase(self): - with _test_eager_guard(): - self.func_test_pass_parambase() self.func_test_pass_parambase() def func_test_pass_empty(self): - if in_dygraph_mode(): - return paddle.set_device("cpu") self._test_empty() def test_pass_empty(self): - with _test_eager_guard(): - self.func_test_pass_empty() self.func_test_pass_empty() @@ -221,14 +201,10 @@ class TestMultiprocessingGpu(TestMultiprocessingBase): "core is not compiled with CUDA", ) def func_test_pass_tensor(self): - if in_dygraph_mode(): - return paddle.set_device("gpu") self._test_sharing(mp.get_context("spawn"), "gpu") def test_pass_tensor(self): - with _test_eager_guard(): - self.func_test_pass_tensor() self.func_test_pass_tensor() diff --git a/python/paddle/incubate/multiprocessing/reductions.py b/python/paddle/incubate/multiprocessing/reductions.py index b16361971e..e60d90a9d6 100644 --- a/python/paddle/incubate/multiprocessing/reductions.py +++ b/python/paddle/incubate/multiprocessing/reductions.py @@ -45,11 +45,11 @@ def _supported_check(): return True -class LRUSharedCache(OrderedDict): +class _LRUSharedCache(OrderedDict): def __init__(self): self.limit = 128 self._after_fork() - register_after_fork(self, LRUSharedCache._after_fork) + register_after_fork(self, _LRUSharedCache._after_fork) def _after_fork(self): self.lock = threading.Lock() @@ -73,25 +73,25 @@ class LRUSharedCache(OrderedDict): super().__setitem__(key, value) -shared_cache = LRUSharedCache() +shared_cache = _LRUSharedCache() -def cuda_from_cache(key): +def _cuda_from_cache(key): lodtensor = shared_cache.get(key) if lodtensor is None: return None return lodtensor -def rebuild_tensor(cls, lodtensor, metadata): - if cls == paddle.fluid.framework.ParamBase: - tensor = paddle.fluid.framework.ParamBase( +def _rebuild_tensor(cls, lodtensor, metadata): + if cls == paddle.fluid.framework.EagerParamBase: + tensor = paddle.fluid.framework.EagerParamBase( lodtensor.shape(), lodtensor._dtype(), **metadata ) tensor.value().get_tensor()._share_data_with(lodtensor) else: size, stop_gradient = metadata - tensor = paddle.fluid.core.VarBase() + tensor = paddle.fluid.core.eager.Tensor() if lodtensor._is_initialized(): tensor.value().get_tensor()._share_data_with(lodtensor) else: @@ -100,7 +100,7 @@ def rebuild_tensor(cls, lodtensor, metadata): return tensor -def reduce_tensor(tensor): +def _reduce_tensor(tensor): lodtensor = tensor.value().get_tensor() if not tensor.stop_gradient and not tensor.is_leaf: @@ -113,12 +113,12 @@ def reduce_tensor(tensor): or tensor.place.is_gpu_place() or tensor.place.is_cuda_pinned_place() ): - if type(tensor) == paddle.fluid.framework.ParamBase: + if type(tensor) == paddle.fluid.framework.EagerParamBase: metadata = copy.deepcopy(tensor.__dict__) else: metadata = (tensor.size, tensor.stop_gradient) - return (rebuild_tensor, (type(tensor), lodtensor, metadata)) + return (_rebuild_tensor, (type(tensor), lodtensor, metadata)) else: raise ValueError( "Only support tensors of CPU/CUDA/CUDAPinned Place, Not support %s for now!" @@ -126,16 +126,16 @@ def reduce_tensor(tensor): ) -def rebuild_lodtensor_filename(cls, ipc_name, size, type_idx, dims, lod): +def _rebuild_lodtensor_filename(cls, ipc_name, size, type_idx, dims, lod): lodtensor = cls._new_shared_filename((ipc_name, size, type_idx, dims, lod)) lodtensor._shared_decref() return lodtensor -def rebuild_cuda_tensor( +def _rebuild_cuda_tensor( cls, handle, offset_bytes, size, type_idx, dims, lod, device_idx ): - cache_tensor = cuda_from_cache((handle, offset_bytes)) + cache_tensor = _cuda_from_cache((handle, offset_bytes)) if cache_tensor is None: lodtensor = cls._new_shared_cuda( (handle, offset_bytes, size, type_idx, dims, lod, device_idx) @@ -155,13 +155,13 @@ def rebuild_cuda_tensor( return lodtensor -def rebuild_lodtensor_empty(cls): +def _rebuild_lodtensor_empty(cls): # TODO: check if tensor initialized # TODO: handle the dtype of empty tensor return cls() -def reduce_lodtensor(lodtensor): +def _reduce_lodtensor(lodtensor): if ( lodtensor._place().is_cpu_place() or lodtensor._place().is_cuda_pinned_place() @@ -169,19 +169,19 @@ def reduce_lodtensor(lodtensor): for dim in lodtensor.shape(): if dim == 0: # Empty tensors have nothing be mmapped. - return (rebuild_lodtensor_empty, (type(lodtensor),)) + return (_rebuild_lodtensor_empty, (type(lodtensor),)) # Default use share filename stratege metadata = ( lodtensor._share_filename() ) # ipc_name, size, type_idx, dims, lod - rebuild = rebuild_lodtensor_filename + rebuild = _rebuild_lodtensor_filename lodtensor._shared_incref() # TODO, maintain reference for lodtensor # TODO: support file_discriptor stratege elif lodtensor._place().is_gpu_place(): metadata = lodtensor._share_cuda() - rebuild = rebuild_cuda_tensor + rebuild = _rebuild_cuda_tensor else: raise RuntimeError("We only support pass cpu/gpu lodtensor for now!") @@ -192,7 +192,9 @@ def init_reductions(): if not _supported_check(): return - ForkingPickler.register(paddle.Tensor, reduce_tensor) - ForkingPickler.register(paddle.fluid.core.VarBase, reduce_tensor) - ForkingPickler.register(paddle.fluid.framework.ParamBase, reduce_tensor) - ForkingPickler.register(paddle.fluid.core.LoDTensor, reduce_lodtensor) + ForkingPickler.register(paddle.Tensor, _reduce_tensor) + ForkingPickler.register(paddle.fluid.core.eager.Tensor, _reduce_tensor) + ForkingPickler.register( + paddle.fluid.framework.EagerParamBase, _reduce_tensor + ) + ForkingPickler.register(paddle.fluid.core.LoDTensor, _reduce_lodtensor) -- GitLab