test_dist_base.py 24.3 KB
Newer Older
X
Xin Pan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14 15

from __future__ import print_function
X
Xin Pan 已提交
16 17 18 19 20 21 22
import time

import unittest
import os
import sys
import signal
import subprocess
23
import six
W
Wu Yi 已提交
24
import argparse
W
Wu Yi 已提交
25 26
import pickle
import numpy as np
T
typhoonzero 已提交
27

28
import paddle.fluid as fluid
29
from paddle.fluid import compiler
30 31 32
import paddle.fluid.dygraph as dygraph
from paddle.fluid.dygraph.base import to_variable
from paddle.fluid.dygraph.parallel import DataParallel
33

Y
Yan Xu 已提交
34
RUN_STEP = 5
35
DEFAULT_BATCH_SIZE = 2
36

T
typhoonzero 已提交
37 38

class TestDistRunnerBase(object):
    """Base class for the static-graph model scripts launched by TestDistBase.

    Subclasses implement ``get_model``. ``runtime_main`` parses the command
    line and calls ``run_pserver`` or ``run_trainer`` on an instance.
    """

    def get_model(self,
                  batch_size=DEFAULT_BATCH_SIZE,
                  lr=0.1,
                  single_device=False,
                  use_dgc=False):
        """Build the network into the default programs; must be overridden.

        ``run_trainer`` unpacks the return value as ``(test_program,
        avg_cost, train_reader, test_reader, batch_acc, predict)``;
        ``run_pserver`` ignores it.
        """
        raise NotImplementedError(
            "get_model should be implemented by child classes.")

    @staticmethod
    def get_transpiler(trainer_id,
                       main_program,
                       pserver_endpoints,
                       trainers,
                       sync_mode,
                       dc_asgd=False,
                       current_endpoint=None):
        """Transpile ``main_program`` for parameter-server training.

        Returns:
            The ``fluid.DistributeTranspiler`` after ``transpile`` has run, so
            callers can ask it for the pserver/trainer/startup programs.
        """
        config = fluid.DistributeTranspilerConfig()
        config.enable_dc_asgd = dc_asgd
        config.sync_mode = sync_mode
        # config.runtime_split_send_recv = True
        t = fluid.DistributeTranspiler(config=config)
        t.transpile(
            trainer_id=trainer_id,
            program=main_program,
            pservers=pserver_endpoints,
            trainers=trainers,
            current_endpoint=current_endpoint)
        return t

    def run_pserver(self, args):
        """Build the model, transpile it and run the pserver program on CPU
        for ``args.current_endpoint``."""
        self.lr = args.lr
        self.get_model(batch_size=args.batch_size)
        # NOTE: pserver should not call memory optimize
        t = self.get_transpiler(args.trainer_id,
                                fluid.default_main_program(), args.endpoints,
                                args.trainers, args.sync_mode, args.dc_asgd)
        pserver_prog = t.get_pserver_program(args.current_endpoint)
        startup_prog = t.get_startup_program(args.current_endpoint,
                                             pserver_prog)

        place = fluid.CPUPlace()
        exe = fluid.Executor(place)
        exe.run(startup_prog)
        exe.run(pserver_prog)

    def run_trainer(self, args):
        """Train for ``RUN_STEP`` steps and write the pickled losses to stdout.

        The launching TestDistBase unpickles this process's stdout, so the
        only thing written to stdout here is the pickled list of losses.
        """
        self.lr = args.lr
        is_nccl2 = args.update_method in ("nccl2", "nccl2_reduce_layer")

        if args.nccl2_reduce_layer_local_run:
            model_outputs = self.get_model(
                batch_size=args.batch_size, single_device=True)
        elif args.use_dgc:
            model_outputs = self.get_model(
                batch_size=args.batch_size, use_dgc=args.use_dgc)
        else:
            model_outputs = self.get_model(batch_size=args.batch_size)
        (test_program, avg_cost, train_reader, test_reader, batch_acc,
         predict) = model_outputs

        if args.mem_opt:
            fluid.memory_optimize(fluid.default_main_program(), skip_grads=True)

        if args.update_method == "pserver":
            t = self.get_transpiler(args.trainer_id,
                                    fluid.default_main_program(),
                                    args.endpoints, args.trainers,
                                    args.sync_mode, args.dc_asgd)
            trainer_prog = t.get_trainer_program()
        elif is_nccl2:
            # transpile for nccl2
            config = fluid.DistributeTranspilerConfig()
            config.mode = "nccl2"
            nccl2_t = fluid.DistributeTranspiler(config=config)
            nccl2_t.transpile(
                args.trainer_id,
                program=fluid.default_main_program(),
                startup_program=fluid.default_startup_program(),
                trainers=args.endpoints,
                current_endpoint=args.current_endpoint)
            trainer_prog = fluid.default_main_program()
        else:
            trainer_prog = fluid.default_main_program()

        if args.use_cuda:
            device_id = int(os.getenv("FLAGS_selected_gpus", "0"))
            place = fluid.CUDAPlace(device_id)
        else:
            place = fluid.CPUPlace()

        exe = fluid.Executor(place)
        exe.run(fluid.default_startup_program())

        exec_strategy = fluid.ExecutionStrategy()
        exec_strategy.num_threads = 1
        exec_strategy.allow_op_delay = False

        build_stra = fluid.BuildStrategy()
        # FIXME force disable enable_inplace and memory_optimize
        build_stra.enable_inplace = False
        build_stra.memory_optimize = False

        if args.use_reduce:
            build_stra.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce
        else:
            build_stra.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.AllReduce

        if args.batch_merge_repeat > 1:
            pass_builder = build_stra._finalize_strategy_and_create_passes()
            mypass = pass_builder.insert_pass(0, "multi_batch_merge_pass")
            mypass.set("num_repeats", args.batch_merge_repeat)

        if is_nccl2:
            build_stra.num_trainers = len(args.endpoints.split(","))
            build_stra.trainer_id = args.trainer_id
        else:
            # pserver and local runs: the compiled program itself only sees a
            # single trainer.
            build_stra.num_trainers = 1
            build_stra.trainer_id = 0

        binary = compiler.CompiledProgram(trainer_prog).with_data_parallel(
            loss_name=avg_cost.name,
            build_strategy=build_stra,
            exec_strategy=exec_strategy)

        feed_var_list = [
            var for var in trainer_prog.global_block().vars.values()
            if var.is_data
        ]

        feeder = fluid.DataFeeder(feed_var_list, place)
        reader_generator = train_reader()

        def get_data():
            origin_batch = next(reader_generator)
            if args.update_method != "local" and args.use_reader_alloc:
                # Split each batch between the two trainers by sample index.
                return [
                    item for offset, item in enumerate(origin_batch)
                    if offset % 2 == args.trainer_id
                ]
            return origin_batch

        out_losses = []
        for _ in six.moves.xrange(RUN_STEP):
            loss, = exe.run(binary,
                            fetch_list=[avg_cost.name],
                            feed=feeder.feed(get_data()))
            out_losses.append(loss[0])
        if six.PY2:
            print(pickle.dumps(out_losses))
        else:
            sys.stdout.buffer.write(pickle.dumps(out_losses))
T
typhoonzero 已提交
191 192


193 194 195 196 197 198 199 200 201 202
class TestParallelDyGraphRunnerBase(object):
    """Base class for dygraph model scripts launched by TestDistBase.

    Subclasses implement ``get_model`` and ``run_one_loop``. ``run_trainer``
    drives ``RUN_STEP`` training steps and writes the pickled losses to
    stdout for the launching test to unpickle.
    """

    def get_model(self):
        """Return ``(model, train_reader, opt)``; must be overridden."""
        raise NotImplementedError(
            "get_model should be implemented by child classes.")

    def run_one_loop(self, model, opt, data):
        """Compute and return the loss for one batch ``data``; must be
        overridden. ``run_trainer`` calls ``.numpy()`` and ``.backward()`` on
        the returned value."""
        raise NotImplementedError(
            "run_one_loop should be implemented by the child classes.")

    def run_trainer(self, args):
        """Train the dygraph model on the GPU in ``FLAGS_selected_gpus``."""
        import random

        seed = 90
        device_id = int(os.getenv("FLAGS_selected_gpus", "0"))
        place = fluid.CUDAPlace(device_id)

        def _get_data(batch):
            if args.update_method != "local":
                # Split each batch between the two trainers by sample index.
                new_batch = []
                for offset, item in enumerate(batch):
                    if offset % 2 == args.trainer_id:
                        new_batch.append(item)
                return new_batch
            else:
                return batch

        with fluid.dygraph.guard(place):
            fluid.default_startup_program().random_seed = seed
            fluid.default_main_program().random_seed = seed
            np.random.seed(seed)
            # Seed Python's own RNG as well. This must be a call: assigning
            # to ``random.seed`` would replace the function with an int.
            random.seed(seed)
            model, train_reader, opt = self.get_model()
            nranks = len(args.endpoints.split(",")) if args.endpoints else 1

            if args.update_method == "nccl2":
                strategy = dygraph.parallel.ParallelStrategy()
                strategy.nranks = nranks
                strategy.local_rank = args.trainer_id
                strategy.trainer_endpoints = args.endpoints.split(",")
                strategy.current_endpoint = args.current_endpoint
                dygraph.parallel.prepare_context(strategy)
                model = dygraph.parallel.DataParallel(model, strategy)
            out_losses = []
            for step_id, data in enumerate(train_reader()):
                if step_id == RUN_STEP:
                    break
                data = _get_data(data)
                loss = self.run_one_loop(model, opt, data)
                out_losses.append(loss.numpy())

                # FIXME(Yancey1989): scale the loss inplace
                if args.update_method == "nccl2":
                    loss = model.scale_loss(loss)

                loss.backward()
                if args.update_method == "nccl2":
                    model.apply_collective_grads()

                opt.minimize(loss)
                model.clear_gradients()
            if six.PY2:
                print(pickle.dumps(out_losses))
            else:
                sys.stdout.buffer.write(pickle.dumps(out_losses))


T
typhoonzero 已提交
259
def _str_to_bool(value):
    """Parse a command-line boolean such as ``1``/``0`` or ``true``/``false``.

    argparse's ``type=bool`` calls ``bool(str)``, which is True for every
    non-empty string (including "0" and "False"), so it can never yield
    False from the command line.
    """
    if isinstance(value, bool):
        return value
    lowered = value.strip().lower()
    if lowered in ("1", "true", "t", "yes", "y", "on"):
        return True
    if lowered in ("0", "false", "f", "no", "n", "off", ""):
        return False
    raise argparse.ArgumentTypeError(
        "expected a boolean value, got %r" % value)


def runtime_main(test_class):
    """Parse the model script's command line and run one role.

    Instantiates ``test_class`` and calls ``run_pserver(args)`` when both
    ``--role pserver`` and ``--update_method pserver`` are given; every other
    combination calls ``run_trainer(args)``.
    """
    parser = argparse.ArgumentParser(description='Run dist test.')
    parser.add_argument(
        '--role', type=str, required=True, choices=['pserver', 'trainer'])
    parser.add_argument('--endpoints', type=str, required=False, default="")
    parser.add_argument(
        '--update_method',
        type=str,
        default="local",
        choices=["pserver", "nccl2", "local", "nccl2_reduce_layer"])
    parser.add_argument('--trainer_id', type=int, required=False, default=0)
    parser.add_argument('--trainers', type=int, required=False, default=1)
    parser.add_argument(
        '--current_endpoint', type=str, required=False, default="")
    parser.add_argument('--sync_mode', action='store_true')
    parser.add_argument('--mem_opt', action='store_true')
    parser.add_argument('--use_cuda', action='store_true')
    parser.add_argument('--use_dgc', action='store_true')
    parser.add_argument('--use_reduce', action='store_true')
    parser.add_argument('--dc_asgd', action='store_true')
    parser.add_argument(
        '--use_reader_alloc', action='store_true', required=False)
    parser.add_argument('--batch_size', required=False, type=int, default=2)
    parser.add_argument('--lr', required=False, type=float, default=0.001)
    parser.add_argument(
        '--batch_merge_repeat', required=False, type=int, default=1)
    parser.add_argument(
        '--nccl2_reduce_layer_local_run',
        required=False,
        type=_str_to_bool,
        default=False)

    args = parser.parse_args()

    model = test_class()
    if args.role == "pserver" and args.update_method == "pserver":
        model.run_pserver(args)
    else:
        model.run_trainer(args)
X
Xin Pan 已提交
298

M
minqiyang 已提交
299

M
minqiyang 已提交
300
import paddle.compat as cpt
Y
Yancey1989 已提交
301 302
import socket
from contextlib import closing
M
minqiyang 已提交
303

X
Xin Pan 已提交
304 305

class TestDistBase(unittest.TestCase):
    """Run a model script locally and as a two-trainer cluster, compare losses.

    Concrete tests override ``_setup_config`` to adjust the ``self._*`` flags
    initialised in ``setUp`` and then call ``check_with_place`` with the path
    of a model script whose entry point calls ``runtime_main``. Each child
    process writes its pickled per-step losses to stdout.
    """

    def _setup_config(self):
        raise NotImplementedError("tests should have _setup_config implemented")

    def _after_setup_config(self):
        """Decide whether the child processes run on GPU.

        ``_enforce_place`` of "CPU" or "GPU" forces the choice; otherwise CUDA
        is used when Paddle is compiled with it. DGC is turned off whenever
        the run ends up on CPU, and ``_use_reduce`` must not be combined with
        DGC.
        """
        if self._enforce_place == "CPU":
            self.__use_cuda = False
            self._use_dgc = False
        elif self._enforce_place == "GPU":
            self.__use_cuda = True
        else:
            if fluid.core.is_compiled_with_cuda():
                self.__use_cuda = True
            else:
                self.__use_cuda = False
                self._use_dgc = False

        if self._use_reduce:
            assert not self._use_dgc

    def setUp(self):
        self._trainers = 2
        self._pservers = 2
        self._port_set = set()
        self._ps_endpoints = "127.0.0.1:%s,127.0.0.1:%s" % (
            self._find_free_port(), self._find_free_port())
        self._python_interp = sys.executable
        self._sync_mode = True
        self._enforce_place = None
        self._mem_opt = False
        self._use_reduce = False
        self._dc_asgd = False  # must use with async mode
        self._use_reader_alloc = True
        self._nccl2_mode = False
        self._mp_mode = False
        # FIXME(typhoonzero): I added this stupid argument to enable
        # testing allreduce layers, which users can call layers.allreduce
        # to accumulate tensors at anywhere. Find a better way to do this
        # test, reduce check this argument everywhere.
        self._nccl2_reduce_layer = False
        self._lr = 0.001
        self._use_dgc = False
        self._dygraph = False
        self._setup_config()
        self._after_setup_config()

    def _find_free_port(self):
        """Return a local TCP port that the OS reported free and that this
        test has not already handed out."""

        def _bind_ephemeral():
            with closing(socket.socket(socket.AF_INET,
                                       socket.SOCK_STREAM)) as s:
                s.bind(('', 0))
                return s.getsockname()[1]

        while True:
            port = _bind_ephemeral()
            if port not in self._port_set:
                self._port_set.add(port)
                return port

    def start_pserver(self, model_file, check_error_log, required_envs):
        """Launch the two pserver processes for ``model_file``.

        Each pserver's stderr goes to /tmp/ps0_err.log or /tmp/ps1_err.log.

        Returns:
            ``(ps0_proc, ps1_proc, ps0_pipe, ps1_pipe)``; the caller closes
            the two log files and terminates the processes.
        """
        ps0_ep, ps1_ep = self._ps_endpoints.split(",")
        ps_cmd = "%s %s --role pserver --endpoints %s --trainer_id 0 --current_endpoint %s --trainers %d --update_method pserver"
        ps0_cmd = ps_cmd % \
                  (self._python_interp, model_file, self._ps_endpoints, ps0_ep,
                   self._trainers)
        ps1_cmd = ps_cmd % \
                  (self._python_interp, model_file, self._ps_endpoints, ps1_ep,
                   self._trainers)

        if self._sync_mode:
            ps0_cmd += " --sync_mode"
            ps1_cmd += " --sync_mode"
        if self._mem_opt:
            ps0_cmd += " --mem_opt"
            ps1_cmd += " --mem_opt"

        print(ps0_cmd)
        print(ps1_cmd)
        ps0_pipe = open("/tmp/ps0_err.log", "wb")
        ps1_pipe = open("/tmp/ps1_err.log", "wb")

        ps0_proc = subprocess.Popen(
            ps0_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=ps0_pipe,
            env=required_envs)
        ps1_proc = subprocess.Popen(
            ps1_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=ps1_pipe,
            env=required_envs)

        return ps0_proc, ps1_proc, ps0_pipe, ps1_pipe

    def _run_local(self,
                   model,
                   envs,
                   check_error_log=False,
                   batch_size=DEFAULT_BATCH_SIZE,
                   batch_merge_repeat=1):
        """Run ``model`` as a single local trainer and return its losses.

        When ``check_error_log`` is set, the child's stderr goes to
        /tmp/trainer.err.log instead of being captured.
        """
        cmd = "%s %s --role trainer --lr %f" % (self._python_interp, model,
                                                self._lr)
        if batch_size != DEFAULT_BATCH_SIZE:
            cmd += " --batch_size %d" % batch_size
        if batch_merge_repeat > 1:
            cmd += " --batch_merge_repeat %d" % batch_merge_repeat
        if self._nccl2_reduce_layer:
            cmd += " --nccl2_reduce_layer_local_run 1"

        if self.__use_cuda:
            cmd += " --use_cuda"
            env_local = {
                "CUDA_VISIBLE_DEVICES": "0",
                "PADDLE_TRAINERS_NUM": "1",
                "PADDLE_TRAINER_ID": "0"
            }
        else:
            env_local = {'CPU_NUM': '1'}

        env_local.update(envs)
        print("local_cmd: {}, env: {}".format(cmd, env_local))

        if check_error_log:
            stderr_target = open("/tmp/trainer.err.log", "wb")
        else:
            stderr_target = subprocess.PIPE
        try:
            local_proc = subprocess.Popen(
                cmd.split(" "),
                stdout=subprocess.PIPE,
                stderr=stderr_target,
                env=env_local)
            local_out, local_err = local_proc.communicate()
        finally:
            # Close the log file even if launching or waiting fails.
            if check_error_log:
                stderr_target.close()

        sys.stderr.write('local_stderr: %s\n' % local_err)
        local_losses = pickle.loads(local_out)
        sys.stderr.write('local_stdout: %s\n' % local_losses)

        return local_losses

    def _run_cluster(self, model, envs, check_error_log):
        """Run ``model`` with two pservers and two trainers.

        Returns:
            ``(tr0_losses, tr1_losses)`` unpickled from the trainers' stdout.
        """
        # Run dist train to compare with local results
        ps0, ps1, ps0_pipe, ps1_pipe = self.start_pserver(model,
                                                          check_error_log, envs)

        ps0_ep, ps1_ep = self._ps_endpoints.split(",")

        tr_cmd = "%s %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --trainers %d --update_method pserver --lr %f"
        tr0_cmd = tr_cmd % \
                  (self._python_interp, model, self._ps_endpoints,
                   0, ps0_ep, self._trainers, self._lr)
        tr1_cmd = tr_cmd % \
                  (self._python_interp, model, self._ps_endpoints,
                   1, ps1_ep, self._trainers, self._lr)

        if self._sync_mode:
            tr0_cmd += " --sync_mode"
            tr1_cmd += " --sync_mode"
        if self._mem_opt:
            tr0_cmd += " --mem_opt"
            tr1_cmd += " --mem_opt"
        if self._use_reduce:
            tr0_cmd += " --use_reduce"
            tr1_cmd += " --use_reduce"
        if self._use_reader_alloc:
            tr0_cmd += " --use_reader_alloc"
            tr1_cmd += " --use_reader_alloc"
        if self.__use_cuda:
            tr0_cmd += " --use_cuda"
            tr1_cmd += " --use_cuda"
            env0 = {"CUDA_VISIBLE_DEVICES": "0"}
            env1 = {"CUDA_VISIBLE_DEVICES": "1"}
        else:
            env0 = {'CPU_NUM': '1'}
            env1 = {'CPU_NUM': '1'}

        env0.update(envs)
        env1.update(envs)

        print("tr0_cmd: {}, env: {}".format(tr0_cmd, env0))
        print("tr1_cmd: {}, env: {}".format(tr1_cmd, env1))
        tr0_pipe = open("/tmp/tr0_err.log", "wb")
        tr1_pipe = open("/tmp/tr1_err.log", "wb")

        try:
            tr0_proc = subprocess.Popen(
                tr0_cmd.strip().split(" "),
                stdout=subprocess.PIPE,
                stderr=tr0_pipe,
                env=env0)
            tr1_proc = subprocess.Popen(
                tr1_cmd.strip().split(" "),
                stdout=subprocess.PIPE,
                stderr=tr1_pipe,
                env=env1)

            # communicate() drains stdout while waiting for exit. Polling for
            # exit with an unread stdout PIPE can deadlock if the child fills
            # the pipe buffer.
            tr0_out, tr0_err = tr0_proc.communicate()
            tr1_out, tr1_err = tr1_proc.communicate()
        finally:
            # close trainer and pserver log files, then stop the pservers
            tr0_pipe.close()
            tr1_pipe.close()
            ps0_pipe.close()
            ps1_pipe.close()

            ps0.terminate()
            ps1.terminate()

        # print server log
        with open("/tmp/ps0_err.log", "r") as fn:
            sys.stderr.write("ps0 stderr: %s\n" % fn.read())
        with open("/tmp/ps1_err.log", "r") as fn:
            sys.stderr.write("ps1 stderr: %s\n" % fn.read())

        # print log
        with open("/tmp/tr0_err.log", "r") as fn:
            sys.stderr.write('trainer 0 stderr: %s\n' % fn.read())
        with open("/tmp/tr1_err.log", "r") as fn:
            sys.stderr.write('trainer 1 stderr: %s\n' % fn.read())

        return pickle.loads(tr0_out), pickle.loads(tr1_out)

    def _run_cluster_nccl2(self, model, envs, nccl2_reduce_layer,
                           check_error_log):
        """Run ``model`` with two nccl2 trainers (no pservers).

        Returns:
            ``(tr0_losses, tr1_losses)`` unpickled from the trainers' stdout.
        """
        # NOTE: we reuse ps_endpoints as nccl2 worker endpoints
        worker_endpoints = self._ps_endpoints.split(",")
        w0_ep, w1_ep = worker_endpoints
        if nccl2_reduce_layer:
            update_method = "nccl2_reduce_layer"
        else:
            update_method = "nccl2"

        tr_cmd = "%s %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --update_method %s --lr %f"
        tr0_cmd = tr_cmd % \
                  (self._python_interp, model, self._ps_endpoints,
                   0, w0_ep, update_method, self._lr)
        tr1_cmd = tr_cmd % \
                  (self._python_interp, model, self._ps_endpoints,
                   1, w1_ep, update_method, self._lr)

        if self._mem_opt:
            tr0_cmd += " --mem_opt"
            tr1_cmd += " --mem_opt"
        if self._use_reduce:
            tr0_cmd += " --use_reduce"
            tr1_cmd += " --use_reduce"
        if self._use_reader_alloc:
            tr0_cmd += " --use_reader_alloc"
            tr1_cmd += " --use_reader_alloc"
        if self.__use_cuda:
            tr0_cmd += " --use_cuda"
            tr1_cmd += " --use_cuda"
            env0 = {
                "CUDA_VISIBLE_DEVICES": "0",
                # for test nccl2 layer
                "PADDLE_TRAINERS_NUM": "2",
                "PADDLE_TRAINER_ID": "0"
            }
            env1 = {
                "CUDA_VISIBLE_DEVICES": "1",
                "PADDLE_TRAINERS_NUM": "2",
                "PADDLE_TRAINER_ID": "1"
            }
        else:
            env0 = {'CPU_NUM': '1'}
            env1 = {'CPU_NUM': '1'}

        if self._use_dgc:
            tr0_cmd += " --use_dgc"
            tr1_cmd += " --use_dgc"
        if self._mp_mode:
            # NOTE(review): this replaces (not extends) the env dicts built
            # above; confirm that dropping CUDA_VISIBLE_DEVICES is intended.
            env0 = {"FLAGS_selected_gpus": "0"}
            env1 = {"FLAGS_selected_gpus": "1"}

        env0.update(envs)
        env1.update(envs)

        print("tr0_cmd:{}, env: {}".format(tr0_cmd, env0))
        print("tr1_cmd:{}, env: {}".format(tr1_cmd, env1))
        tr0_pipe = open("/tmp/tr0_err.log", "wb")
        tr1_pipe = open("/tmp/tr1_err.log", "wb")

        try:
            tr0_proc = subprocess.Popen(
                tr0_cmd.strip().split(" "),
                stdout=subprocess.PIPE,
                stderr=tr0_pipe,
                env=env0)
            tr1_proc = subprocess.Popen(
                tr1_cmd.strip().split(" "),
                stdout=subprocess.PIPE,
                stderr=tr1_pipe,
                env=env1)

            tr0_out, _ = tr0_proc.communicate()
            tr1_out, _ = tr1_proc.communicate()
        finally:
            # close trainer file
            tr0_pipe.close()
            tr1_pipe.close()

        # stderr was redirected to the log files, so communicate() returned
        # None for it; print the files' contents instead.
        with open("/tmp/tr0_err.log", "r") as fn:
            sys.stderr.write('trainer 0 stderr: %s\n' % fn.read())
        with open("/tmp/tr1_err.log", "r") as fn:
            sys.stderr.write('trainer 1 stderr: %s\n' % fn.read())

        return pickle.loads(tr0_out), pickle.loads(tr1_out)

    def check_with_place(self,
                         model_file,
                         delta=1e-3,
                         check_error_log=False,
                         need_envs=None):
        """Assert that local and distributed per-step losses agree.

        Args:
            model_file: path of the model script to launch.
            delta: absolute tolerance passed to ``assertAlmostEqual``.
            check_error_log: when True, set ``GLOG_v=3`` and
                ``GLOG_logtostderr=1`` for the children and, in the local run,
                send stderr to /tmp/trainer.err.log.
            need_envs: optional dict of extra environment variables merged
                over the defaults below.
        """
        # TODO(typhoonzero): should auto adapt GPU count on the machine.
        required_envs = {
            "PATH": os.getenv("PATH", ""),
            "PYTHONPATH": os.getenv("PYTHONPATH", ""),
            "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
            "FLAGS_fraction_of_gpu_memory_to_use": "0.15",
            "FLAGS_rpc_deadline": "5000",  # 5sec to fail fast
            "FLAGS_cudnn_deterministic": "1",
            "http_proxy": "",
            "NCCL_P2P_DISABLE": "1"
        }

        if need_envs:
            required_envs.update(need_envs)

        if check_error_log:
            required_envs["GLOG_v"] = "3"
            required_envs["GLOG_logtostderr"] = "1"

        local_losses = self._run_local(model_file, required_envs,
                                       check_error_log)
        if self._nccl2_mode:
            tr0_losses, tr1_losses = self._run_cluster_nccl2(
                model_file, required_envs, self._nccl2_reduce_layer,
                check_error_log)
        else:
            tr0_losses, tr1_losses = self._run_cluster(
                model_file, required_envs, check_error_log)

        for step_id in range(RUN_STEP):
            local_loss = local_losses[step_id]
            tr0_loss = tr0_losses[step_id]
            tr1_loss = tr1_losses[step_id]
            # The distributed loss is the mean of the two trainers' losses.
            dist_loss = (np.array([tr0_loss]) + np.array([tr1_loss])) / 2
            print("=======", local_loss, ":", dist_loss[0], "=======")
            self.assertAlmostEqual(local_loss, dist_loss[0], delta=delta)