test_collective_base.py
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import pickle
import socket
import subprocess
import sys
import tempfile
import time
import unittest
from contextlib import closing

import numpy as np

import paddle.fluid.unique_name as nameGen
from paddle import fluid
from paddle.fluid import core


class TestCollectiveRunnerBase:
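    # Base class for single-op collective tests; subclasses implement
    # get_model() to build the program that runs the collective under test.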
    def get_model(self, train_prog, startup_prog):
        raise NotImplementedError(
            "get_model should be implemented by child class."
        )

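    # Block until every "ip:port" endpoint in `endpoints` accepts a TCP
    # connection, retrying every 3 seconds.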
    def wait_server_ready(self, endpoints):
        while True:
            all_ok = True
            not_ready_endpoints = []
            for ep in endpoints:
                ip_port = ep.split(":")
                with closing(
                    socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                ) as sock:
                    sock.settimeout(2)
                    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                    if hasattr(socket, 'SO_REUSEPORT'):
                        sock.setsockopt(
                            socket.SOL_SOCKET, socket.SO_REUSEPORT, 1
                        )

                    result = sock.connect_ex((ip_port[0], int(ip_port[1])))
                    if result != 0:
                        all_ok = False
                        not_ready_endpoints.append(ep)
            if not all_ok:
                sys.stderr.write("server not ready, wait 3 sec to retry...\n")
                sys.stderr.write(
                    "not ready endpoints:" + str(not_ready_endpoints) + "\n"
                )
                sys.stderr.flush()
                time.sleep(3)
            else:
                break

    # endpoints should be ["ip1:port1","ip2:port2"]

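    # Append the c_gen_nccl_id and c_comm_init ops to `program` so that the
    # NCCL communicator for `self.global_ring_id` is created at startup; rank 0
    # optionally waits until the other endpoints are reachable first.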
    def initCommunicator(
        self, program, rank, nranks, wait_port, current_endpoint, endpoints
    ):
        other_endpoints = endpoints[:]
        other_endpoints.remove(current_endpoint)
        if rank == 0 and wait_port:
            self.wait_server_ready(other_endpoints)
        block = program.global_block()
        nccl_id_var = block.create_var(
            name=nameGen.generate('nccl_id'),
            persistable=True,
            type=core.VarDesc.VarType.RAW,
        )

        block.append_op(
            type='c_gen_nccl_id',
            inputs={},
            outputs={'Out': nccl_id_var},
            attrs={
                'rank': rank,
                'endpoint': current_endpoint,
                'other_endpoints': other_endpoints,
            },
        )

        block.append_op(
            type='c_comm_init',
            inputs={'X': nccl_id_var},
            outputs={},
            attrs={
                'nranks': nranks,
                'rank': rank,
                'ring_id': self.global_ring_id,
            },
        )

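    # Build the startup/train programs, initialize the communicator, run one
    # step on random input seeded by this process's pid, and pickle the fetched
    # result to the file named by the DUMP_FILE environment variable.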
    def run_trainer(self, args):
        train_prog = fluid.Program()
        startup_prog = fluid.Program()
        endpoints = args["endpoints"].split(",")
        rank = args["trainerid"]
        current_endpoint = args["currentendpoint"]
        nranks = 2
        self.initCommunicator(
            startup_prog, rank, nranks, True, current_endpoint, endpoints
        )
        self.rank = rank
        result = self.get_model(train_prog, startup_prog)
        device_id = int(os.getenv("FLAGS_selected_gpus", "0"))
        place = fluid.CUDAPlace(
            device_id
        )  # if args.use_gpu else fluid.CPUPlace()
        exe = fluid.Executor(place)
        exe.run(startup_prog)
        np.random.seed(os.getpid())
        indata = np.random.random((10, 1000))
        out = exe.run(
            train_prog, feed={'tindata': indata}, fetch_list=[result.name]
        )
        dump_file = os.environ['DUMP_FILE']
        with open(dump_file, 'wb') as f:
            pickle.dump(out, f)


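# Entry point for the spawned trainer process: collect the launcher-provided
# environment variables into `args` and run the model's trainer step.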
def runtime_main(test_class, col_type, sub_type):
    args = {}
    model = test_class()
    args["deviceid"] = os.getenv("FLAGS_selected_gpus")
    args["trainerid"] = int(os.getenv("PADDLE_TRAINER_ID"))
    args["trainernum"] = int(os.getenv("PADDLE_TRAINERS_NUM"))
    args["endpoints"] = os.getenv('PADDLE_TRAINER_ENDPOINTS')
    args["currentendpoint"] = os.getenv("PADDLE_CURRENT_ENDPOINT")
    args["col_type"] = col_type
    model.run_trainer(args)


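# Base TestCase that launches two single-GPU trainer processes on free local
# ports and checks their dumped outputs against the expected collective result.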
class TestDistBase(unittest.TestCase):
    def setUp(self):
        self._port_set = set()
        self._trainers = 2
        self._ps_endpoints = "127.0.0.1:{},127.0.0.1:{}".format(
            self._find_free_port(),
            self._find_free_port(),
        )
        self._python_interp = sys.executable

        self.temp_dir = tempfile.TemporaryDirectory()

    def tearDown(self):
        self.temp_dir.cleanup()

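    # Pick an unused local TCP port, avoiding ports already handed out during
    # this test run.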
    def _find_free_port(self):
        def __free_port():
            with closing(
                socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            ) as s:
                s.bind(('', 0))
                return s.getsockname()[1]

        while True:
            port = __free_port()
            if port not in self._port_set:
                self._port_set.add(port)
                return port

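    # Spawn the two trainer processes for `model_file`, wait for both to
    # finish, and return each trainer's unpickled output plus its pid.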
    def _run_cluster(self, model_file, envs):
        worker_endpoints = self._ps_endpoints.split(",")
        w0_ep, w1_ep = worker_endpoints
        # print("w0_ep:",w0_ep," w1_ep:",w1_ep)
        env0 = {
            "FLAGS_selected_gpus": "0",
            "PADDLE_TRAINER_ID": "0",
            "PADDLE_TRAINERS_NUM": "2",
            "PADDLE_TRAINER_ENDPOINTS": self._ps_endpoints,
            "PADDLE_CURRENT_ENDPOINT": w0_ep,
        }

        env1 = {
            "FLAGS_selected_gpus": "1",
            "PADDLE_TRAINER_ID": "1",
            "PADDLE_TRAINERS_NUM": "2",
            "PADDLE_TRAINER_ENDPOINTS": self._ps_endpoints,
            "PADDLE_CURRENT_ENDPOINT": w1_ep,
        }

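        # The trainers write their pickled outputs to files named with this
        # test process's pid; the files are read back and removed afterwards.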
        cur_pid = os.getpid()
        dump_file_0 = f'./out_data_0_{cur_pid}.pickled'
        dump_file_1 = f'./out_data_1_{cur_pid}.pickled'

        # update environment
        env0.update(envs)
        env1.update(envs)
        env0['DUMP_FILE'] = dump_file_0
        env1['DUMP_FILE'] = dump_file_1

        tr_cmd = "%s %s"
        tr0_cmd = tr_cmd % (self._python_interp, model_file)
        tr1_cmd = tr_cmd % (self._python_interp, model_file)
        path0 = os.path.join(self.temp_dir.name, "tr0_err.log")
        path1 = os.path.join(self.temp_dir.name, "tr1_err.log")
        tr0_pipe = open(path0, "wb")
        tr1_pipe = open(path1, "wb")
        # print(tr0_cmd)
        tr0_proc = subprocess.Popen(
            tr0_cmd.strip().split(),
            stdout=subprocess.PIPE,
            stderr=tr0_pipe,
            env=env0,
        )

        tr1_proc = subprocess.Popen(
            tr1_cmd.strip().split(),
            stdout=subprocess.PIPE,
            stderr=tr1_pipe,
            env=env1,
        )

        tr0_out, tr0_err = tr0_proc.communicate()
        tr1_out, tr1_err = tr1_proc.communicate()
        sys.stderr.write('trainer 0 stderr: %s\n' % tr0_err)
        sys.stderr.write('trainer 1 stderr: %s\n' % tr1_err)
        # close trainer file
        tr0_pipe.close()
        tr1_pipe.close()

        def load_and_remove(path):
            with open(path, 'rb') as f:
                out = pickle.load(f)
            os.remove(path)
            return out

        return (
            load_and_remove(dump_file_0),
            load_and_remove(dump_file_1),
            tr0_proc.pid,
            tr1_proc.pid,
        )

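    # Run `model_file` on both trainers and verify their outputs match the
    # result expected for the given collective `col_type`.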
    def check_with_place(
        self, model_file, col_type, check_error_log=False, need_envs={}
    ):
        required_envs = {
            "FLAGS_fraction_of_gpu_memory_to_use": "0.15",
            "FLAGS_eager_delete_tensor_gb": "0.0",
            "PATH": os.getenv("PATH"),
            "PYTHONPATH": os.getenv("PYTHONPATH", ""),
            "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
            "LD_PRELOAD": os.getenv("LD_PRELOAD", ""),
            "GLOG_v": "3",
            "NCCL_P2P_DISABLE": "1",
        }
        required_envs.update(need_envs)
        if check_error_log:
            required_envs["GLOG_v"] = "3"
            required_envs["GLOG_logtostderr"] = "1"
        tr0_out, tr1_out, pid0, pid1 = self._run_cluster(
            model_file, required_envs
        )
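        # Each trainer seeded numpy with its own pid before generating its
        # (10, 1000) input, so the same seeds reproduce the inputs here.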
        np.random.seed(pid0)
        input1 = np.random.random((10, 1000))
        np.random.seed(pid1)
        input2 = np.random.random((10, 1000))
        if col_type == "allgather":
            need_result = np.vstack((input1, input2))
            np.testing.assert_allclose(tr0_out[0], need_result, rtol=1e-05)
            np.testing.assert_allclose(tr1_out[0], need_result, rtol=1e-05)
        elif col_type == "broadcast":
            need_result = input2
            np.testing.assert_allclose(tr0_out[0], need_result, rtol=1e-05)
            np.testing.assert_allclose(tr1_out[0], need_result, rtol=1e-05)
        elif col_type == "reduce":
            need_result = input1 + input2
            np.testing.assert_allclose(tr1_out[0], need_result, rtol=1e-05)
        elif col_type == "scatter":
            need_result = input2
            need_result1 = need_result[0 : need_result.shape[0] // 2]
            need_result2 = need_result[need_result.shape[0] // 2 :]
            np.testing.assert_allclose(tr0_out[0], need_result1, rtol=1e-05)
            np.testing.assert_allclose(tr1_out[0], need_result2, rtol=1e-05)
        elif col_type == "allreduce":
            need_result = input1 + input2
            np.testing.assert_allclose(
                tr0_out[0], need_result, rtol=1e-05, atol=1e-05
            )
            np.testing.assert_allclose(
                tr1_out[0], need_result, rtol=1e-05, atol=1e-05
            )
        elif col_type == "reduce_scatter":
            tmp = input1 + input2
            need_result1 = tmp[0 : tmp.shape[0] // 2]
            need_result2 = tmp[tmp.shape[0] // 2 :]
            np.testing.assert_allclose(
                tr0_out[0], need_result1, rtol=1e-05, atol=1e-05
            )
            np.testing.assert_allclose(
                tr1_out[0], need_result2, rtol=1e-05, atol=1e-05
            )
        elif col_type == "sendrecv":
            need_result = input1
            np.testing.assert_allclose(
                tr1_out[0], need_result, rtol=1e-05, atol=1e-05
            )
        elif col_type == "identity":
            need_result1 = input1
            need_result2 = input2
            np.testing.assert_allclose(tr0_out[0], need_result1, rtol=0, atol=0)
            np.testing.assert_allclose(tr1_out[0], need_result2, rtol=0, atol=0)
        elif col_type == "reduce_slicegather":
            slicesize = input1.shape[0] // 2
            tmp10 = input1[0:slicesize]
            tmp11 = input2[0:slicesize]
            need_result1 = np.concatenate((tmp10, tmp11), axis=1)
            tmp20 = input1[slicesize:]
            tmp21 = input2[slicesize:]
            need_result2 = np.concatenate((tmp20, tmp21), axis=1)
            np.testing.assert_allclose(tr0_out, need_result1, rtol=1e-05)
            np.testing.assert_allclose(tr1_out, need_result2, rtol=1e-05)
        elif col_type == "concat":
            need_result = np.concatenate((input1, input2), axis=1)
            np.testing.assert_allclose(
                tr0_out[0], need_result, rtol=1e-05, atol=1e-05
            )
            np.testing.assert_allclose(
                tr1_out[0], need_result, rtol=1e-05, atol=1e-05
            )
        elif col_type == "split":
            need_result1 = np.split(input1, 2, axis=1)[0]
            need_result2 = np.split(input2, 2, axis=1)[1]
            np.testing.assert_allclose(
                tr0_out[0], need_result1, rtol=1e-05, atol=1e-05
            )
            np.testing.assert_allclose(
                tr1_out[0], need_result2, rtol=1e-05, atol=1e-05
            )
        elif col_type == "sendrecv_array":
            need_result1 = np.array([[0, 1, 2]])
            need_result2 = np.array([[3, 4, 5]])
            np.testing.assert_allclose(
                tr1_out[0][0], need_result1, rtol=1e-05, atol=1e-05
            )
            np.testing.assert_allclose(
                tr1_out[0][1], need_result2, rtol=1e-05, atol=1e-05
            )
        else:
            pass