# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import pickle
import socket
import subprocess
import sys
import tempfile
import time
import unittest
from contextlib import closing

import numpy as np

import paddle.fluid.unique_name as nameGen
from paddle import fluid
from paddle.fluid import core


class TestCollectiveRunnerBase:
    def get_model(self, train_prog, startup_prog):
        raise NotImplementedError(
            "get_model should be implemented by a child class."
        )

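    # Poll every endpoint in `endpoints` with a TCP connect until all of
    # them accept connections; endpoints that refuse are listed on stderr
    # and retried after a 3-second sleep.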
    def wait_server_ready(self, endpoints):
        while True:
            all_ok = True
            not_ready_endpoints = []
            for ep in endpoints:
                ip_port = ep.split(":")
                with closing(
                    socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                ) as sock:
                    sock.settimeout(2)
                    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                    if hasattr(socket, 'SO_REUSEPORT'):
                        sock.setsockopt(
                            socket.SOL_SOCKET, socket.SO_REUSEPORT, 1
                        )

                    result = sock.connect_ex((ip_port[0], int(ip_port[1])))
                    if result != 0:
                        all_ok = False
                        not_ready_endpoints.append(ep)
            if not all_ok:
                sys.stderr.write("server not ready, wait 3 sec to retry...\n")
                sys.stderr.write(
                    "not ready endpoints:" + str(not_ready_endpoints) + "\n"
                )
                sys.stderr.flush()
                time.sleep(3)
            else:
                break

    # endpoints should be ["ip1:port1","ip2:port2"]

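    # Append the two-op NCCL bootstrap to `program`: c_gen_nccl_id creates
    # a unique NCCL id on this rank and exchanges it with the other
    # endpoints, then c_comm_init builds the communicator on ring
    # `self.global_ring_id` (expected to be set by the concrete subclass).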
    def initCommunicator(
        self, program, rank, nranks, wait_port, current_endpoint, endpoints
    ):
        other_endpoints = endpoints[:]
        other_endpoints.remove(current_endpoint)
        if rank == 0 and wait_port:
            self.wait_server_ready(other_endpoints)
        block = program.global_block()
        nccl_id_var = block.create_var(
            name=nameGen.generate('nccl_id'),
            persistable=True,
            type=core.VarDesc.VarType.RAW,
        )

        block.append_op(
            type='c_gen_nccl_id',
            inputs={},
            outputs={'Out': nccl_id_var},
            attrs={
                'rank': rank,
                'endpoint': current_endpoint,
                'other_endpoints': other_endpoints,
            },
        )

        block.append_op(
            type='c_comm_init',
            inputs={'X': nccl_id_var},
            outputs={},
            attrs={
                'nranks': nranks,
                'rank': rank,
                'ring_id': self.global_ring_id,
            },
        )

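    # Child-process entry point: build the programs, initialize NCCL on the
    # startup program, run one step on random input seeded with this
    # process's pid, and pickle the fetched outputs to stdout.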
    def run_trainer(self, args):
        train_prog = fluid.Program()
        startup_prog = fluid.Program()
        endpoints = args["endpoints"].split(",")
        rank = args["trainerid"]
        current_endpoint = args["currentendpoint"]
        nranks = 2
        self.initCommunicator(
            startup_prog, rank, nranks, True, current_endpoint, endpoints
        )
        self.rank = rank
        result = self.get_model(train_prog, startup_prog)
        device_id = int(os.getenv("FLAGS_selected_gpus", "0"))
        place = fluid.CUDAPlace(device_id)  # if args.use_gpu else fluid.CPUPlace()
        exe = fluid.Executor(place)
        exe.run(startup_prog)
        np.random.seed(os.getpid())
        indata = np.random.random((10, 1000))
        out = exe.run(
            train_prog, feed={'tindata': indata}, fetch_list=[result.name]
        )
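        # Emit the fetched results as raw pickle bytes on stdout; the parent
        # captures this pipe and unpickles it in TestDistBase._run_cluster.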
        sys.stdout.buffer.write(pickle.dumps(out))


def runtime_main(test_class, col_type, sub_type):
    args = {}
    model = test_class()
    args["deviceid"] = os.getenv("FLAGS_selected_gpus")
    args["trainerid"] = int(os.getenv("PADDLE_TRAINER_ID"))
    args["trainernum"] = int(os.getenv("PADDLE_TRAINERS_NUM"))
    args["endpoints"] = os.getenv('PADDLE_TRAINER_ENDPOINTS')
    args["currentendpoint"] = os.getenv("PADDLE_CURRENT_ENDPOINT")
    args["col_type"] = col_type
    model.run_trainer(args)
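
# A minimal sketch (not part of this module) of how a concrete test file is
# expected to drive the harness, modeled on the companion collective tests;
# the class name, op type, and attrs below are illustrative assumptions:
#
#     class TestCollectiveAllreduce(TestCollectiveRunnerBase):
#         def __init__(self):
#             self.global_ring_id = 0
#
#         def get_model(self, main_prog, startup_prog):
#             with fluid.program_guard(main_prog, startup_prog):
#                 tindata = fluid.layers.data(
#                     name="tindata", shape=[10, 1000],
#                     dtype='float64', append_batch_size=False,
#                 )
#                 main_prog.global_block().append_op(
#                     type="c_allreduce_sum",
#                     inputs={'X': tindata},
#                     outputs={'Out': tindata},
#                     # run on the compute stream so the fetch in
#                     # run_trainer sees the reduced value (assumption)
#                     attrs={
#                         'ring_id': self.global_ring_id,
#                         'use_calc_stream': True,
#                     },
#                 )
#                 return tindata
#
#     if __name__ == "__main__":
#         runtime_main(TestCollectiveAllreduce, "allreduce", 0)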


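# Host-side driver: launches two single-GPU trainer processes on localhost,
# collects each trainer's pickled stdout, and compares it against expected
# results computed with numpy in check_with_place.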
class TestDistBase(unittest.TestCase):
    def setUp(self):
        self._port_set = set()
        self._trainers = 2
        self._ps_endpoints = "127.0.0.1:{},127.0.0.1:{}".format(
            self._find_free_port(),
            self._find_free_port(),
        )
        self._python_interp = sys.executable

        self.temp_dir = tempfile.TemporaryDirectory()

    def tearDown(self):
        self.temp_dir.cleanup()

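    # Ask the kernel for a free port by binding to port 0 and reading the
    # assigned port back.  The port is only likely to remain free after the
    # socket closes; _port_set merely avoids duplicates within this test.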
    def _find_free_port(self):
        def __free_port():
            with closing(
                socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            ) as s:
                s.bind(('', 0))
                return s.getsockname()[1]

        while True:
            port = __free_port()
            if port not in self._port_set:
                self._port_set.add(port)
                return port

    def _run_cluster(self, model_file, envs):
        worker_endpoints = self._ps_endpoints.split(",")
        w0_ep, w1_ep = worker_endpoints
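        # Reproduce the environment the PaddlePaddle distributed launcher
        # would set: each trainer gets its own GPU, rank id, and endpoint.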
        # print("w0_ep:",w0_ep," w1_ep:",w1_ep)
        env0 = {
            "FLAGS_selected_gpus": "0",
            "PADDLE_TRAINER_ID": "0",
            "PADDLE_TRAINERS_NUM": "2",
            "PADDLE_TRAINER_ENDPOINTS": self._ps_endpoints,
            "PADDLE_CURRENT_ENDPOINT": w0_ep,
        }

        env1 = {
            "FLAGS_selected_gpus": "1",
            "PADDLE_TRAINER_ID": "1",
            "PADDLE_TRAINERS_NUM": "2",
            "PADDLE_TRAINER_ENDPOINTS": self._ps_endpoints,
            "PADDLE_CURRENT_ENDPOINT": w1_ep,
        }
        # update environment
        env0.update(envs)
        env1.update(envs)
        tr_cmd = "%s %s"
        tr0_cmd = tr_cmd % (self._python_interp, model_file)
        tr1_cmd = tr_cmd % (self._python_interp, model_file)
        # Note: os.path.join discards self.temp_dir.name when the second
        # argument is absolute, so the log names must be relative paths.
        path0 = os.path.join(self.temp_dir.name, "tr0_err.log")
        path1 = os.path.join(self.temp_dir.name, "tr1_err.log")
        tr0_pipe = open(path0, "wb")
        tr1_pipe = open(path1, "wb")
        # print(tr0_cmd)
        tr0_proc = subprocess.Popen(
            tr0_cmd.strip().split(),
            stdout=subprocess.PIPE,
            stderr=tr0_pipe,
            env=env0,
        )

        tr1_proc = subprocess.Popen(
            tr1_cmd.strip().split(),
            stdout=subprocess.PIPE,
            stderr=tr1_pipe,
            env=env1,
        )

        tr0_out, tr0_err = tr0_proc.communicate()
        tr1_out, tr1_err = tr1_proc.communicate()
        # stderr was redirected to the log files above, so tr*_err is None;
        # point at the on-disk logs instead.
        sys.stderr.write('trainer 0 stderr file: %s\n' % path0)
        sys.stderr.write('trainer 1 stderr file: %s\n' % path1)
        # close trainer file
        tr0_pipe.close()
        tr1_pipe.close()
        return (
            pickle.loads(tr0_out),
            pickle.loads(tr1_out),
            tr0_proc.pid,
            tr1_proc.pid,
        )

    def check_with_place(
        self, model_file, col_type, check_error_log=False, need_envs={}
    ):
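        # Baseline environment for both trainers; need_envs entries override
        # these defaults, and check_error_log sends glog output to stderr.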
        required_envs = {
            "FLAGS_fraction_of_gpu_memory_to_use": "0.15",
            "FLAGS_eager_delete_tensor_gb": "0.0",
            "PATH": os.getenv("PATH"),
            "PYTHONPATH": os.getenv("PYTHONPATH", ""),
            "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
            "LD_PRELOAD": os.getenv("LD_PRELOAD", ""),
            "GLOG_v": "3",
            "NCCL_P2P_DISABLE": "1",
        }
        required_envs.update(need_envs)
        if check_error_log:
            required_envs["GLOG_v"] = "3"
            required_envs["GLOG_logtostderr"] = "1"
        tr0_out, tr1_out, pid0, pid1 = self._run_cluster(
            model_file, required_envs
        )
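        # run_trainer seeded numpy with the child's pid, so the recorded
        # pids let us regenerate each trainer's exact input here and compute
        # the expected collective results on the host.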
        np.random.seed(pid0)
        input1 = np.random.random((10, 1000))
        np.random.seed(pid1)
        input2 = np.random.random((10, 1000))
        if col_type == "allgather":
            need_result = np.vstack((input1, input2))
            np.testing.assert_allclose(tr0_out[0], need_result, rtol=1e-05)
            np.testing.assert_allclose(tr1_out[0], need_result, rtol=1e-05)
        elif col_type == "broadcast":
            need_result = input2
            np.testing.assert_allclose(tr0_out[0], need_result, rtol=1e-05)
            np.testing.assert_allclose(tr1_out[0], need_result, rtol=1e-05)
        elif col_type == "reduce":
            need_result = input1 + input2
            np.testing.assert_allclose(tr1_out[0], need_result, rtol=1e-05)
        elif col_type == "scatter":
            need_result = input2
            need_result1 = need_result[0 : need_result.shape[0] // 2]
            need_result2 = need_result[need_result.shape[0] // 2 :]
            np.testing.assert_allclose(tr0_out[0], need_result1, rtol=1e-05)
            np.testing.assert_allclose(tr1_out[0], need_result2, rtol=1e-05)
        elif col_type == "allreduce":
            need_result = input1 + input2
            np.testing.assert_allclose(
                tr0_out[0], need_result, rtol=1e-05, atol=1e-05
            )
            np.testing.assert_allclose(
                tr1_out[0], need_result, rtol=1e-05, atol=1e-05
            )
        elif col_type == "reduce_scatter":
            tmp = input1 + input2
            need_result1 = tmp[0 : tmp.shape[0] // 2]
            need_result2 = tmp[tmp.shape[0] // 2 :]
            np.testing.assert_allclose(
                tr0_out[0], need_result1, rtol=1e-05, atol=1e-05
            )
            np.testing.assert_allclose(
                tr1_out[0], need_result2, rtol=1e-05, atol=1e-05
            )
        elif col_type == "sendrecv":
            need_result = input1
            np.testing.assert_allclose(
                tr1_out[0], need_result, rtol=1e-05, atol=1e-05
            )
        elif col_type == "identity":
            need_result1 = input1
            need_result2 = input2
            np.testing.assert_allclose(tr0_out[0], need_result1, rtol=0, atol=0)
            np.testing.assert_allclose(tr1_out[0], need_result2, rtol=0, atol=0)
        elif col_type == "reduce_slicegather":
            slicesize = input1.shape[0] // 2
            tmp10 = input1[0:slicesize]
            tmp11 = input2[0:slicesize]
            need_result1 = np.concatenate((tmp10, tmp11), axis=1)
            tmp20 = input1[slicesize:]
            tmp21 = input2[slicesize:]
            need_result2 = np.concatenate((tmp20, tmp21), axis=1)
            np.testing.assert_allclose(tr0_out, need_result1, rtol=1e-05)
            np.testing.assert_allclose(tr1_out, need_result2, rtol=1e-05)
        elif col_type == "concat":
            need_result = np.concatenate((input1, input2), axis=1)
            np.testing.assert_allclose(
                tr0_out[0], need_result, rtol=1e-05, atol=1e-05
            )
            np.testing.assert_allclose(
                tr1_out[0], need_result, rtol=1e-05, atol=1e-05
            )
        elif col_type == "split":
            need_result1 = np.split(input1, 2, axis=1)[0]
            need_result2 = np.split(input2, 2, axis=1)[1]
            np.testing.assert_allclose(
                tr0_out[0], need_result1, rtol=1e-05, atol=1e-05
            )
            np.testing.assert_allclose(
                tr1_out[0], need_result2, rtol=1e-05, atol=1e-05
            )
        elif col_type == "sendrecv_array":
            need_result1 = np.array([[0, 1, 2]])
            need_result2 = np.array([[3, 4, 5]])
            np.testing.assert_allclose(
                tr1_out[0][0], need_result1, rtol=1e-05, atol=1e-05
            )
            np.testing.assert_allclose(
                tr1_out[0][1], need_result2, rtol=1e-05, atol=1e-05
            )
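        # Unrecognized col_type: no host-side expectation defined, so skip.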
        else:
            pass