提交 f7f122c3 编写于 作者: S sandyhouse

add ut for sendrec, test=develop

上级 f4456ffe
...@@ -57,6 +57,7 @@ if(NOT WITH_GPU OR WIN32) ...@@ -57,6 +57,7 @@ if(NOT WITH_GPU OR WIN32)
LIST(REMOVE_ITEM TEST_OPS test_allreduce) LIST(REMOVE_ITEM TEST_OPS test_allreduce)
LIST(REMOVE_ITEM TEST_OPS test_broadcast) LIST(REMOVE_ITEM TEST_OPS test_broadcast)
LIST(REMOVE_ITEM TEST_OPS test_collective_reduce) LIST(REMOVE_ITEM TEST_OPS test_collective_reduce)
LIST(REMOVE_ITEM TEST_OPS test_collective_sendrecv)
LIST(REMOVE_ITEM TEST_OPS test_collective_scatter) LIST(REMOVE_ITEM TEST_OPS test_collective_scatter)
LIST(REMOVE_ITEM TEST_OPS test_collective_reduce_api) LIST(REMOVE_ITEM TEST_OPS test_collective_reduce_api)
LIST(REMOVE_ITEM TEST_OPS test_collective_scatter_api) LIST(REMOVE_ITEM TEST_OPS test_collective_scatter_api)
......
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import numpy as np
import argparse
import os
import sys
import signal
import time
import socket
from contextlib import closing
from six import string_types
import math
import paddle
import paddle.fluid as fluid
import paddle.fluid.profiler as profiler
import paddle.fluid.unique_name as nameGen
from paddle.fluid import core
import unittest
from multiprocessing import Process
import paddle.fluid.layers as layers
from functools import reduce
from test_collective_base import TestCollectiveRunnerBase, runtime_main
class TestCollectiveScatter(TestCollectiveRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program, rank=None):
ring_id = 0
with fluid.program_guard(main_prog, startup_program):
tindata = layers.data(
name="tindata", shape=[10, 1000], dtype='float32')
if rank == 0:
main_prog.global_block().append_op(
type="c_recv",
inputs={'Out': tindata},
attrs={'ring_id': ring_id,
'peer': 1})
else:
main_prog.global_block().append_op(
type="c_send",
inputs={'X': tindata},
attrs={'ring_id': ring_id,
'peer': 0})
main_prog.global_block().append_op(
type="c_sync_comm_stream",
inputs={'X': toutdata},
outputs={'Out': toutdata},
attrs={'ring_id': ring_id})
return tindata
if __name__ == "__main__":
runtime_main(TestCollectiveScatter, "scatter", 0)
...@@ -103,7 +103,7 @@ class TestCollectiveRunnerBase(object): ...@@ -103,7 +103,7 @@ class TestCollectiveRunnerBase(object):
nranks = 2 nranks = 2
self.initCommunicator(startup_prog, rank, nranks, True, self.initCommunicator(startup_prog, rank, nranks, True,
current_endpoint, endpoints) current_endpoint, endpoints)
result = self.get_model(train_prog, startup_prog) result = self.get_model(train_prog, startup_prog, rank)
device_id = int(os.getenv("FLAGS_selected_gpus", "0")) device_id = int(os.getenv("FLAGS_selected_gpus", "0"))
place = fluid.CUDAPlace( place = fluid.CUDAPlace(
device_id) #if args.use_gpu else fluid.CPUPlace() device_id) #if args.use_gpu else fluid.CPUPlace()
...@@ -258,6 +258,9 @@ class TestDistBase(unittest.TestCase): ...@@ -258,6 +258,9 @@ class TestDistBase(unittest.TestCase):
self.assertTrue( self.assertTrue(
np.allclose( np.allclose(
tr1_out, need_result, rtol=1e-05, atol=1e-05)) tr1_out, need_result, rtol=1e-05, atol=1e-05))
elif col_type == "sendrecv":
need_result = input2
self.assertTrue(np.allclose(tr0_out, need_result))
elif col_type == "reduce_scatter": elif col_type == "reduce_scatter":
tmp = input1 + input2 tmp = input1 + input2
need_result1 = tmp[0:tmp.shape[0] // 2] need_result1 = tmp[0:tmp.shape[0] // 2]
......
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import unittest
import numpy as np
from test_collective_base import TestDistBase
class TestCScatterOp(TestDistBase):
def _setup_config(self):
pass
def test_sendrecv(self):
self.check_with_place("collective_sendrecv_op.py", "sendrecv")
if __name__ == '__main__':
unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册