test_async_read_write.py 4.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
# 
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# 
#     http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest
import numpy as np

import paddle
from paddle.fluid import core
from paddle.device import cuda


class TestAsyncRead(unittest.TestCase):
    def setUp(self):
        self.empty = paddle.to_tensor(
            np.array(
                [], dtype="int64"), place=paddle.CPUPlace())
        data = np.random.randn(100, 50, 50).astype("float32")
        self.src = paddle.to_tensor(data, place=paddle.CUDAPinnedPlace())
        self.dst = paddle.empty(shape=[100, 50, 50], dtype="float32")
        self.index = paddle.to_tensor(
            np.array(
                [1, 3, 5, 7, 9], dtype="int64")).cpu()
        self.buffer = paddle.empty(
            shape=[50, 50, 50], dtype="float32").pin_memory()
        self.stream = cuda.Stream()

    def test_async_read_empty_offset_and_count(self):
        with cuda.stream_guard(self.stream):
            core.async_read(self.src, self.dst, self.index, self.buffer,
                            self.empty, self.empty)
        array1 = paddle.gather(self.src, self.index)
        array2 = self.dst[:len(self.index)]

        self.assertTrue(np.allclose(array1.numpy(), array2.numpy()))

    def test_async_read_success(self):
        offset = paddle.to_tensor(
            np.array(
                [10, 20], dtype="int64"), place=paddle.CPUPlace())
        count = paddle.to_tensor(
            np.array(
                [5, 10], dtype="int64"), place=paddle.CPUPlace())
        with cuda.stream_guard(self.stream):
            core.async_read(self.src, self.dst, self.index, self.buffer, offset,
                            count)

        # index data
        index_array1 = paddle.gather(self.src, self.index)
        count_numel = paddle.sum(count).numpy()[0]
        index_array2 = self.dst[count_numel:count_numel + len(self.index)]
        self.assertTrue(np.allclose(index_array1.numpy(), index_array2.numpy()))

        # offset, count
        offset_a = paddle.gather(self.src, paddle.to_tensor(np.arange(10, 15)))
        offset_b = paddle.gather(self.src, paddle.to_tensor(np.arange(20, 30)))
        offset_array1 = paddle.concat([offset_a, offset_b], axis=0)
        offset_array2 = self.dst[:count_numel]
        self.assertTrue(
            np.allclose(offset_array1.numpy(), offset_array2.numpy()))

    def test_async_read_only_1dim(self):
        src = paddle.rand([40], dtype="float32").pin_memory()
        dst = paddle.empty([40], dtype="float32")
        buffer_ = paddle.empty([20]).pin_memory()
        with cuda.stream_guard(self.stream):
            core.async_read(src, dst, self.index, buffer_, self.empty,
                            self.empty)
        array1 = paddle.gather(src, self.index)
        array2 = dst[:len(self.index)]
        self.assertTrue(np.allclose(array1.numpy(), array2.numpy()))


class TestAsyncWrite(unittest.TestCase):
    def setUp(self):
        self.src = paddle.rand(shape=[100, 50, 50, 5], dtype="float32")
        self.dst = paddle.empty(
            shape=[200, 50, 50, 5], dtype="float32").pin_memory()
        self.stream = cuda.Stream()

    def test_async_write_success(self):
        offset = paddle.to_tensor(
            np.array(
                [0, 60], dtype="int64"), place=paddle.CPUPlace())
        count = paddle.to_tensor(
            np.array(
                [40, 60], dtype="int64"), place=paddle.CPUPlace())
        with cuda.stream_guard(self.stream):
            core.async_write(self.src, self.dst, offset, count)

        offset_a = paddle.gather(self.dst, paddle.to_tensor(np.arange(0, 40)))
        offset_b = paddle.gather(self.dst, paddle.to_tensor(np.arange(60, 120)))
        offset_array = paddle.concat([offset_a, offset_b], axis=0)
        self.assertTrue(np.allclose(self.src.numpy(), offset_array.numpy()))


if __name__ == "__main__":
    if core.is_compiled_with_cuda():
        unittest.main()