reductions.py
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import paddle

# TODO: check the hooks of tensor
# TODO: check serializing named tensor
# TODO: check influence on autograd
import sys
import warnings
import copy
import threading
from multiprocessing.util import register_after_fork
from multiprocessing.reduction import ForkingPickler

from collections import OrderedDict


def _supported_check():
    if sys.platform != "linux":
        # warnings.warn("`paddle.multiprocessing` only supports Linux for now, "
        #               "importing it will not take any effect!")

        return False

    if not sys.version_info >= (3, 4):
        warnings.warn(
            "Using `paddle.multiprocessing` to share paddle tensors "
            "requires Python 3.4 or higher. "
            "`paddle.multiprocessing` will not take any effect!"
        )
        return False

    return True


class _LRUSharedCache(OrderedDict):
    def __init__(self):
        self.limit = 128
        self._after_fork()
        register_after_fork(self, _LRUSharedCache._after_fork)

    def _after_fork(self):
        self.lock = threading.Lock()

    def get(self, key):
        with self.lock:
            try:
                value = super().pop(key)
                super().__setitem__(key, value)
                return value
            except KeyError:
                return None

    def __setitem__(self, key, value):
        with self.lock:
            try:
                super().__delitem__(key)
            except KeyError:
                if len(self) >= self.limit:
                    super().popitem(last=False)
            super().__setitem__(key, value)


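# Process-local LRU cache of received CUDA IPC tensors; opening a
# cudaIpcMemoryHandle is expensive, so rebuilt buffers are reused.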
shared_cache = _LRUSharedCache()


def _cuda_from_cache(key):
    lodtensor = shared_cache.get(key)
    if lodtensor is None:
        return None
    return lodtensor


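# Rebuild an eager Tensor / EagerParamBase in the receiving process from the
# shared LoDTensor and the metadata produced by _reduce_tensor.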
def _rebuild_tensor(cls, lodtensor, metadata):
    if cls == paddle.fluid.framework.EagerParamBase:
        tensor = paddle.fluid.framework.EagerParamBase(
            lodtensor.shape(), lodtensor._dtype(), **metadata
        )
        tensor.value().get_tensor()._share_data_with(lodtensor)
    else:
        size, stop_gradient = metadata
        tensor = paddle.fluid.core.eager.Tensor()
        if lodtensor._is_initialized():
            tensor.value().get_tensor()._share_data_with(lodtensor)
        else:
            tensor = paddle.to_tensor([], dtype=lodtensor._dtype())
        tensor.stop_gradient = stop_gradient
    return tensor


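# ForkingPickler reducer for eager Tensor / EagerParamBase: share the
# underlying LoDTensor and rebuild it with _rebuild_tensor on the receiver.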
def _reduce_tensor(tensor):
    lodtensor = tensor.value().get_tensor()

    if not tensor.stop_gradient and not tensor.is_leaf:
        raise RuntimeError(
            "Refusing to serialize a non-leaf tensor whose stop_gradient is "
            "False; you can detach it first!"
        )
    # TODO: add serializing name and hooks check
    if (
        tensor.place.is_cpu_place()
        or tensor.place.is_gpu_place()
        or tensor.place.is_cuda_pinned_place()
    ):
        if type(tensor) == paddle.fluid.framework.EagerParamBase:
            metadata = copy.deepcopy(tensor.__dict__)
        else:
            metadata = (tensor.size, tensor.stop_gradient)

        return (_rebuild_tensor, (type(tensor), lodtensor, metadata))
    else:
        raise ValueError(
            "Only tensors on CPU/CUDA/CUDAPinned places are supported, "
            "not %s for now!" % tensor.place
        )


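# Rebuild a CPU LoDTensor from the shared-memory file named by ipc_name.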
def _rebuild_lodtensor_filename(cls, ipc_name, size, type_idx, dims, lod):
    lodtensor = cls._new_shared_filename((ipc_name, size, type_idx, dims, lod))
    lodtensor._shared_decref()
    return lodtensor


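# Rebuild a GPU LoDTensor from a CUDA IPC handle, reusing the process-local
# shared_cache to avoid reopening the same cudaIpcMemoryHandle.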
def _rebuild_cuda_tensor(
    cls, handle, offset_bytes, size, type_idx, dims, lod, device_idx
):
    cache_tensor = _cuda_from_cache((handle, offset_bytes))
    if cache_tensor is None:
        lodtensor = cls._new_shared_cuda(
            (handle, offset_bytes, size, type_idx, dims, lod, device_idx)
        )
        # We only cache the CUDA shared tensor here.
        # The opening cost of a cudaIpcMemoryHandle is very high.
        # Since we cache the received tensor directly, the sender may
        # reallocate the tensor's memory, so you should manually maintain
        # the lifecycle of the IPC tensor.
        shared_cache[(handle, offset_bytes)] = lodtensor
    else:
        lodtensor = paddle.fluid.core.LoDTensor()
        lodtensor._share_buffer_with(
            cache_tensor, (size, type_idx, dims, lod, device_idx)
        )

    return lodtensor


def _rebuild_lodtensor_empty(cls):
    # TODO: check if tensor initialized
    # TODO: handle the dtype of empty tensor
    return cls()


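# ForkingPickler reducer for LoDTensor: CPU/CUDAPinned tensors are shared
# through a memory-mapped file, GPU tensors through a CUDA IPC handle.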
def _reduce_lodtensor(lodtensor):
    if (
        lodtensor._place().is_cpu_place()
        or lodtensor._place().is_cuda_pinned_place()
    ):
        for dim in lodtensor.shape():
            if dim == 0:
                # Empty tensors have nothing to be mmapped.
                return (_rebuild_lodtensor_empty, (type(lodtensor),))

        # Default to the share-filename strategy
        metadata = (
            lodtensor._share_filename()
        )  # ipc_name, size, type_idx, dims, lod
        rebuild = _rebuild_lodtensor_filename
        lodtensor._shared_incref()
        # TODO: maintain reference for lodtensor
        # TODO: support file_descriptor strategy
    elif lodtensor._place().is_gpu_place():
        metadata = lodtensor._share_cuda()
        rebuild = _rebuild_cuda_tensor
    else:
        raise RuntimeError(
            "Only passing CPU/GPU LoDTensor is supported for now!"
        )

    return (rebuild, (type(lodtensor),) + metadata)


def init_reductions():
    if not _supported_check():
        return

    ForkingPickler.register(paddle.Tensor, _reduce_tensor)
    ForkingPickler.register(paddle.fluid.core.eager.Tensor, _reduce_tensor)
    ForkingPickler.register(
        paddle.fluid.framework.EagerParamBase, _reduce_tensor
    )
    ForkingPickler.register(paddle.fluid.core.LoDTensor, _reduce_lodtensor)
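

# A minimal usage sketch, run only when this file is executed directly. This
# block is illustrative (an assumption, not part of the library's public
# surface): it assumes paddle is installed, a Linux host, and the default
# fork start method. Once init_reductions() registers the reducers with
# ForkingPickler, a paddle tensor put on a multiprocessing.Queue is shared
# with the child process rather than copied.
if __name__ == "__main__":
    import multiprocessing as mp

    def _example_worker(queue):
        # The child rebuilds the tensor via the _rebuild_* helpers above.
        shared = queue.get()
        print(shared.numpy())

    init_reductions()
    queue = mp.Queue()
    p = mp.Process(target=_example_worker, args=(queue,))
    p.start()
    queue.put(paddle.to_tensor([1.0, 2.0, 3.0]))
    p.join()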