test_sparse_matmul_op.py 4.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import paddle
from paddle.fluid.framework import _test_eager_guard

import numpy as np
import scipy
import scipy.sparse as sp
import unittest
import os
import re

np.random.seed(2022)


def get_cuda_version():
    result = os.popen("nvcc --version").read()
    regex = r'release (\S+),'
    match = re.search(regex, result)
    if match:
        num = str(match.group(1))
        integer, decimal = num.split('.')
        return int(integer) * 1000 + int(float(decimal) * 10)
    else:
        return -1


@unittest.skipIf(
    not paddle.is_compiled_with_cuda() or get_cuda_version() < 11000,
    "paddle is not compiled with CUDA and cuda version need to >= 11.0")
class TestCsrDenseMatmul2D(unittest.TestCase):
    # x: csr, y: dense, out: dense
    def test_matmul(self):
        with _test_eager_guard():
            mask = np.random.rand(10, 12) < 0.2
            np_x = np.random.rand(10, 12) * mask

            np_csr = sp.csr_matrix(np_x)
            np_dense = np.random.rand(12, 6)
            np_out = np_csr @ np_dense

            np_out_grad = np.ones([10, 6])

            # dx(csr) = dout(dense) * y'(dense) * mask
            np_csr_grad = sp.csr_matrix(
                np.matmul(np_out_grad, np_dense.transpose(1, 0)) * mask)
            # dy(dense) = x'(csr) * dout(dense)
            np_dense_grad = np_csr.transpose() @ np_out_grad

            csr = paddle.to_tensor(np_x, stop_gradient=False).to_sparse_csr()
            dense = paddle.to_tensor(np_dense, stop_gradient=False)
            out = paddle.incubate.sparse.matmul(csr, dense)

            self.assertTrue(np.allclose(np_out, out.numpy()))

            if get_cuda_version() >= 11030:
                out.backward()
                self.assertTrue(
                    np.allclose(np_csr_grad.indptr,
                                csr.grad.crows().numpy()))
                self.assertTrue(
                    np.allclose(np_csr_grad.indices,
                                csr.grad.cols().numpy()))
                self.assertTrue(
                    np.allclose(np_csr_grad.data,
                                csr.grad.values().numpy()))

                self.assertTrue(np.allclose(np_dense_grad, dense.grad.numpy()))


@unittest.skipIf(
    not paddle.is_compiled_with_cuda() or get_cuda_version() < 11030,
    "paddle is not compiled with CUDA and cuda version need to >= 11.3")
class TestCsrMaskedMatmul2D(unittest.TestCase):
    # x: dense, y: dense, out: csr
    def test_matmul(self):
        with _test_eager_guard():
            np_mask = np.random.rand(10, 6) < 0.2

            np_x = np.random.rand(10, 12)
            np_y = np.random.rand(12, 6)
            np_out = sp.csr_matrix(np.matmul(np_x, np_y) * np_mask)

            np_out_grad = sp.csr_matrix(np.ones([10, 6]) * np_mask)
            # dx(dense) = dout(csr) * y'(dense)
            np_x_grad = np_out_grad @ np_y.transpose(1, 0)
            # dy(dense) = x'(dense) * dout(csr) -> dy'(dense) = dout'(csr) * x(dense)
            np_y_grad = (np_out_grad.transpose() @ np_x).transpose(1, 0)

            x = paddle.to_tensor(np_x, stop_gradient=False)
            y = paddle.to_tensor(np_y, stop_gradient=False)
            mask = paddle.to_tensor(np.ones([10, 6]) * np_mask).to_sparse_csr()
            out = paddle.incubate.sparse.masked_matmul(x, y, mask)

            self.assertTrue(np.allclose(np_out.indptr, out.crows().numpy()))
            self.assertTrue(np.allclose(np_out.indices, out.cols().numpy()))
            self.assertTrue(np.allclose(np_out.data, out.values().numpy()))

            out.backward()
            self.assertTrue(np.allclose(out.is_sparse_csr(), True))
            self.assertTrue(np.allclose(np_x_grad, x.grad.numpy()))
            self.assertTrue(np.allclose(np_y_grad, y.grad.numpy()))


#TODO(zhouwei25): support unit test of batch 'paddle.sparse.mm/masked_mm'

if __name__ == "__main__":
    unittest.main()