reductions.py 6.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import paddle

# TODO: check the hooks of tensor
# TODO: check serializing named tensor
# TODO: check influence on autograd
import sys
import warnings
import copy
import threading
from multiprocessing.util import register_after_fork
from multiprocessing.reduction import ForkingPickler

from collections import OrderedDict


def _supported_check():
    if sys.platform != "linux":
        # warnings.warn("`paddle.multiprocessing` only support linux for now, "
        #               " import this will not take any effect !")

        return False

    if not sys.version_info >= (3, 4):
        warnings.warn("Use `paddle.multiprocessing` to share paddle tensor "
                      "requires python version greater than 3.4 ."
                      " `paddle.multiprocessing` will not take any effect !!!")
        return False

    return True


class LRUSharedCache(OrderedDict):
47

48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187
    def __init__(self):
        self.limit = 128
        self._after_fork()
        register_after_fork(self, LRUSharedCache._after_fork)

    def _after_fork(self):
        self.lock = threading.Lock()

    def get(self, key):
        with self.lock:
            try:
                value = super().pop(key)
                super().__setitem__(key, value)
                return value
            except KeyError:
                return None

    def __setitem__(self, key, value):
        with self.lock:
            try:
                super().__delitem__(key)
            except KeyError:
                if len(self) >= self.limit:
                    super().popitem(last=False)
            super().__setitem__(key, value)


shared_cache = LRUSharedCache()


def cuda_from_cache(key):
    lodtensor = shared_cache.get(key)
    if lodtensor is None:
        return None
    return lodtensor


def rebuild_tensor(cls, lodtensor, metadata):
    if cls == paddle.fluid.framework.ParamBase:
        tensor = paddle.fluid.framework.ParamBase(lodtensor.shape(),
                                                  lodtensor._dtype(),
                                                  **metadata)
        tensor.value().get_tensor()._share_data_with(lodtensor)
    else:
        size, stop_gradient = metadata
        tensor = paddle.fluid.core.VarBase()
        if lodtensor._is_initialized():
            tensor.value().get_tensor()._share_data_with(lodtensor)
        else:
            tensor = paddle.to_tensor([], dtype=lodtensor._dtype())
        tensor.stop_gradient = stop_gradient
    return tensor


def reduce_tensor(tensor):
    lodtensor = tensor.value().get_tensor()

    if not tensor.stop_gradient and not tensor.is_leaf:
        raise RuntimeError(
            "Refusing to serialize non-leaf tensor which not stop_gradient, you can detach it!"
        )
    # TODO: add serializing name and  hooks check
    if tensor.place.is_cpu_place() or tensor.place.is_gpu_place(
    ) or tensor.place.is_cuda_pinned_place():
        if type(tensor) == paddle.fluid.framework.ParamBase:
            metadata = copy.deepcopy(tensor.__dict__)
        else:
            metadata = (tensor.size, tensor.stop_gradient)

        return (rebuild_tensor, (type(tensor), lodtensor, metadata))
    else:
        raise ValueError(
            "Only support tensors of CPU/CUDA/CUDAPinned Place, Not support %s for now!"
            % tensor.place)


def rebuild_lodtensor_filename(cls, ipc_name, size, type_idx, dims, lod):
    lodtensor = cls._new_shared_filename((ipc_name, size, type_idx, dims, lod))
    lodtensor._shared_decref()
    return lodtensor


def rebuild_cuda_tensor(cls, handle, offset_bytes, size, type_idx, dims, lod,
                        device_idx):
    cache_tensor = cuda_from_cache((handle, offset_bytes))
    if cache_tensor is None:
        lodtensor = cls._new_shared_cuda(
            (handle, offset_bytes, size, type_idx, dims, lod, device_idx))
        # We only cache cuda shared tensor here.
        # The opening cost of cudaIpcMemoryHandle is very high.
        # Since we cache the recived tensor directly,
        # The sender may reallocate the tensor space,
        # you should manualy maintian the lifecycle of ipc tensor
        shared_cache[(handle, offset_bytes)] = lodtensor
    else:
        lodtensor = paddle.fluid.core.LoDTensor()
        lodtensor._share_buffer_with(cache_tensor,
                                     (size, type_idx, dims, lod, device_idx))

    return lodtensor


def rebuild_lodtensor_empty(cls):
    #TODO: check if tensor initialized
    #TODO: handle the dtype of empty tensor
    return cls()


def reduce_lodtensor(lodtensor):
    if lodtensor._place().is_cpu_place() or lodtensor._place(
    ).is_cuda_pinned_place():
        for dim in lodtensor.shape():
            if dim == 0:
                # Empty tensors have nothing be mmapped.
                return (rebuild_lodtensor_empty, (type(lodtensor), ))

        # Default use share filename stratege
        metadata = lodtensor._share_filename(
        )  # ipc_name, size, type_idx, dims, lod
        rebuild = rebuild_lodtensor_filename
        lodtensor._shared_incref()
        # TODO, maintain reference for lodtensor
        # TODO: support file_discriptor stratege
    elif lodtensor._place().is_gpu_place():
        metadata = lodtensor._share_cuda()
        rebuild = rebuild_cuda_tensor
    else:
        raise RuntimeError("We only support pass cpu/gpu lodtensor for now!")

    return (rebuild, (type(lodtensor), ) + metadata)


def init_reductions():
    if not _supported_check():
        return

    ForkingPickler.register(paddle.Tensor, reduce_tensor)
    ForkingPickler.register(paddle.fluid.core.VarBase, reduce_tensor)
    ForkingPickler.register(paddle.fluid.framework.ParamBase, reduce_tensor)
    ForkingPickler.register(paddle.fluid.core.LoDTensor, reduce_lodtensor)