# test_nccl_ops.py — unit tests for the NCCL all-reduce operator.
import unittest, os
import numpy as np
import paddle.v2 as paddle
from paddle.v2.framework.op import Operator
import paddle.v2.framework.core as core
D
Dong Zhihong 已提交
6
from op_test import OpTest, create_op, set_input
D
Dong Zhihong 已提交
7

D
Dong Zhihong 已提交
8 9
# Device list under test; originally taken from the environment:
# gpu_list = os.environ["NV_LIST"]
gpu_list = "0,1,2,3"

# Without a GPU-enabled build or at least one device there is nothing to
# test — leave the module immediately.
if not core.is_compile_gpu() or not gpu_list:
    exit(0)


D
Dong Zhihong 已提交
15 16
def allreduce(tensors, gpus):
    num_device = len(gpus)
D
Dong Zhihong 已提交
17 18 19 20 21 22 23 24 25 26 27
    assert (len(tensors) == num_device), "not match of tensor and device"
    Out = tensors
    for i in range(1, len(tensors)):
        Out[0] += Out[i]

    for i in range(1, len(tensors)):
        Out[i] = Out[0]

    return Out


D
Dong Zhihong 已提交
28
class TestNCCLAllReduce(unittest.TestCase):
    """Runs ncclAllReduce once per listed GPU and checks each device's
    output tensor against a numpy reference reduction."""

    def setUp(self):
        self.op_type = "ncclAllReduce"

        self.gpus = [int(g) for g in gpu_list.split(",")]

        # Global scope/context: used only to run ncclInit once so all
        # per-GPU ops share one communicator.
        self.g_scope = core.Scope()
        self.g_ctx = core.DeviceContext.create(core.CPUPlace())
        self.scopes = []
        self.ops = []
        self.places = []

        self.input_data = []

        for i in range(len(self.gpus)):
            self.input_data.append(np.random.random((32, 32)))
        self.output_data = allreduce(self.input_data, self.gpus)

        nccl_init = Operator("ncclInit", Out="Communicator", gpus=self.gpus)
        # BUG FIX: the original called `op.run(...)` here, but `op` is not
        # defined until the loop below — run the init operator instead.
        nccl_init.run(self.g_scope, self.g_ctx)

        for i in range(len(self.gpus)):
            # insert kid scope
            scope = self.g_scope.new_scope()
            place = core.GPUPlace(self.gpus[i])

            inputs = {"X": self.input_data[i]}
            outputs = {"Out": self.output_data[i]}
            attrs = {"gpus": self.gpus}

            op = create_op(scope, self.op_type, inputs, outputs, attrs)
            set_input(scope, op, inputs, place)

            self.scopes.append(scope)
            self.ops.append(op)
            self.places.append(place)

    def test_output(self):
        # Launch the all-reduce on every device first ...
        for scope, place, op in zip(self.scopes, self.places, self.ops):
            ctx = core.DeviceContext.create(place)
            op.run(scope, ctx)

        # ... then compare every device's output to the reference result.
        # BUG FIX: the original referenced undefined `self.op`, inspected
        # only the scope left over from the last loop iteration, and used
        # `assertTrue(actual, expect)`, which truth-tests a multi-element
        # array (raising ValueError) and treats `expect` as the message.
        for idx, (scope, op) in enumerate(zip(self.scopes, self.ops)):
            for out_name, out_dup in Operator.get_op_outputs(op.type()):
                actual = np.array(scope.find_var(out_name).get_tensor())
                expect = self.output_data[idx]
                self.assertTrue(np.allclose(actual, expect), "has diff")


D
Dong Zhihong 已提交
80 81 82 83 84
# Usage: export NV_LIST=0,1,2,3 before running this script, e.g.
#     export NV_LIST=0,1,2,3 && python test_nccl_ops.py
D
Dong Zhihong 已提交
85

D
Dong Zhihong 已提交
86
# Standard script entry point: run the tests in this module.
if __name__ == "__main__":
    unittest.main()