test_dist_base.py 45.2 KB
Newer Older
X
Xin Pan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14 15

from __future__ import print_function
X
Xin Pan 已提交
16 17 18 19 20 21 22
import time

import unittest
import os
import sys
import signal
import subprocess
23
import six
W
Wu Yi 已提交
24
import argparse
W
Wu Yi 已提交
25
import pickle
26
import random
W
Wu Yi 已提交
27
import numpy as np
28
import time
29 30

import paddle
31
import paddle.fluid as fluid
32
from paddle.fluid import compiler
33 34 35
import paddle.fluid.dygraph as dygraph
from paddle.fluid.dygraph.base import to_variable
from paddle.fluid.dygraph.parallel import DataParallel
36

37 38 39
from paddle.fluid.incubate.fleet.collective import fleet, DistributedStrategy
import paddle.fluid.incubate.fleet.base.role_maker as role_maker

Y
Yan Xu 已提交
40
# Number of training iterations each trainer loop in this file executes.
RUN_STEP = 5
41
# Default value of the ``batch_size`` parameter of TestDistRunnerBase.get_model.
DEFAULT_BATCH_SIZE = 2
42
# NOTE(review): not referenced in the visible part of this file; presumably a
# base port number for the distributed unit tests -- confirm against its users.
DIST_UT_PORT = 0
43

T
typhoonzero 已提交
44

45 46 47 48 49 50 51 52
def print_to_out(out_losses):
    """Write the pickled ``out_losses`` to standard output.

    On Python 2 the pickle goes through ``print`` (so a trailing newline is
    appended); otherwise the raw bytes are written to ``sys.stdout.buffer``.
    """
    if not six.PY2:
        sys.stdout.buffer.write(pickle.dumps(out_losses))
        return
    print(pickle.dumps(out_losses))


def print_to_err(class_name, log_str):
    """Write a pickled, timestamped log record to standard error.

    The record is a single string made of the local time, ``class_name`` and
    ``log_str`` separated by tab characters. On Python 2 the pickle is written
    to ``sys.stderr``; otherwise to ``sys.stderr.buffer``.
    """
    timestamp = time.asctime(time.localtime(time.time()))
    record = timestamp + "\t" + class_name + "\t" + log_str
    stream = sys.stderr if six.PY2 else sys.stderr.buffer
    stream.write(pickle.dumps(record))
G
guru4elephant 已提交
59 60


61 62 63 64
def eprint(*args, **kwargs):
    """Behave like ``print`` but send the output to ``sys.stderr``."""
    error_stream = sys.stderr
    print(*args, file=error_stream, **kwargs)


T
typhoonzero 已提交
65
class TestDistRunnerBase(object):
W
Wu Yi 已提交
66 67 68
    def get_model(self,
                  batch_size=DEFAULT_BATCH_SIZE,
                  lr=0.1,
                  single_device=False,
                  use_dgc=False):
        """Build the model under test; child classes must override this.

        The trainers in this class unpack the result as
        ``(test_program, avg_cost, train_reader, test_reader, batch_acc,
        predict)``. This base implementation always raises.

        Raises:
            NotImplementedError: always.
        """
        message = "get_model should be implemented by child classes."
        raise NotImplementedError(message)

74
    @staticmethod
    def get_transpiler(trainer_id,
                       main_program,
                       pserver_endpoints,
                       trainers,
                       sync_mode,
                       dc_asgd=False,
                       current_endpoint=None,
                       nccl_comm_num=1,
                       hogwild_mode=False):
        """Create a ``fluid.DistributeTranspiler`` and transpile ``main_program``.

        Args:
            trainer_id: forwarded to ``transpile`` as ``trainer_id``.
            main_program: program to transpile.
            pserver_endpoints: forwarded to ``transpile`` as ``pservers``.
            trainers: forwarded to ``transpile`` as ``trainers``.
            sync_mode: set on the config and also forwarded to ``transpile``.
            dc_asgd: value for ``config.enable_dc_asgd``.
            current_endpoint: forwarded to ``transpile``.
            nccl_comm_num: copied to the config only when greater than 1.
            hogwild_mode: value for ``config.runtime_split_send_recv``.

        Returns:
            The transpiler after ``transpile`` has been called on it.
        """
        transpiler_config = fluid.DistributeTranspilerConfig()
        transpiler_config.enable_dc_asgd = dc_asgd
        transpiler_config.sync_mode = sync_mode
        transpiler_config.runtime_split_send_recv = hogwild_mode

        # Leave the config's own nccl_comm_num untouched unless more than one
        # communicator was requested.
        if nccl_comm_num > 1:
            transpiler_config.nccl_comm_num = nccl_comm_num

        transpiler = fluid.DistributeTranspiler(config=transpiler_config)
        transpile_kwargs = dict(
            trainer_id=trainer_id,
            program=main_program,
            pservers=pserver_endpoints,
            trainers=trainers,
            sync_mode=sync_mode,
            current_endpoint=current_endpoint)
        transpiler.transpile(**transpile_kwargs)
        return transpiler

W
Wu Yi 已提交
103
    def run_pserver(self, args):
        """Transpile the model for parameter-server mode and run the pserver.

        Builds the model, obtains the pserver and startup programs for
        ``args.current_endpoint`` from the transpiler, and runs both on a CPU
        executor, logging to stderr after each one returns.

        Args:
            args: parsed command-line namespace (see ``runtime_main``).
        """
        runner_name = type(self).__name__
        self.lr = args.lr
        self.get_model(batch_size=args.batch_size)
        # NOTE: pserver should not call memory optimize

        transpiler = self.get_transpiler(
            trainer_id=args.trainer_id,
            main_program=fluid.default_main_program(),
            pserver_endpoints=args.endpoints,
            trainers=args.trainers,
            sync_mode=args.sync_mode,
            dc_asgd=args.dc_asgd,
            hogwild_mode=args.hogwild)
        pserver_prog = transpiler.get_pserver_program(args.current_endpoint)
        startup_prog = transpiler.get_startup_program(args.current_endpoint,
                                                      pserver_prog)

        exe = fluid.Executor(fluid.CPUPlace())
        for program, done_message in (
                (startup_prog, "run pserver startup program done."),
                (pserver_prog, "run pserver main program done.")):
            exe.run(program)
            print_to_err(runner_name, done_message)
T
typhoonzero 已提交
126

127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187
    def run_pipeline_trainer(self, args):
        """Run ``RUN_STEP`` pipeline training steps on one GPU and emit losses.

        The model is obtained from ``get_model`` (which here must return a
        seven-element tuple ending with a data loader). The losses are pickled
        to stdout. When ``args.save_model`` is set, persistables and inference
        models are additionally saved under ``/tmp`` through both the fluid
        and the fleet APIs.

        Args:
            args: parsed command-line namespace (see ``runtime_main``).
        """
        runner_name = type(self).__name__
        self.lr = args.lr

        dist_strategy = DistributedStrategy()
        test_program, avg_cost, train_reader, test_reader, batch_acc, predict, data_loader = \
            self.get_model(batch_size=args.batch_size, dist_strategy=dist_strategy)

        device_id = int(os.getenv("FLAGS_selected_gpus", "0"))
        eprint(runner_name, "device_id: %d." % device_id)
        place = fluid.CUDAPlace(device_id)

        exe = fluid.Executor(place)
        exe.run(fluid.default_startup_program())
        eprint(runner_name, "run worker startup program done.")

        data_loader.set_sample_list_generator(train_reader, place)
        data_loader.start()
        print_to_err(runner_name, "begin to train on trainer")
        out_losses = []
        for i in six.moves.xrange(RUN_STEP):
            loss = exe.run(fluid.default_main_program(), fetch_list=[avg_cost])
            # An empty fetch result is recorded as None for that step.
            loss = loss[0] if loss else None
            out_losses.append(loss)
            print_to_err(runner_name, "run step %d finished" % i)
        print_to_err(runner_name, "trainer run finished")

        print_to_out(out_losses)

        if args.save_model:
            # Worker 0 and the other workers write to different directories.
            if fleet.worker_index() == 0:
                dir_names = ("fluid_persistables", "fleet_persistables",
                             "fluid_infer", "fleet_infer")
            else:
                dir_names = ("fluid_persistables_2", "fleet_persistables_2",
                             "fluid_infer_2", "fleet_infer_2")
            model_save_dir = "/tmp"
            (model_save_dir_fluid, model_save_dir_fleet, infer_save_dir_fluid,
             infer_save_dir_fleet) = [
                 os.path.join(model_save_dir, name) for name in dir_names
             ]

            # The feed variables were previously read from an undefined
            # ``feed_var_list`` (NameError). Collect them from the data
            # variables of the program being saved, the same way
            # run_gpu_fleet_api_trainer does.
            feed_var_list = [
                var
                for var in fleet._origin_program.global_block().vars.values()
                if var.is_data
            ]
            feeded_var_names = [var.name for var in feed_var_list]

            fluid.io.save_persistables(exe, model_save_dir_fluid,
                                       fleet._origin_program)
            fleet.save_persistables(executor=exe, dirname=model_save_dir_fleet)
            fluid.io.save_inference_model(infer_save_dir_fluid,
                                          feeded_var_names, [avg_cost], exe,
                                          fleet._origin_program)
            fleet.save_inference_model(exe, infer_save_dir_fleet,
                                       feeded_var_names, [avg_cost])

188 189 190 191 192 193 194 195 196 197
    def run_gpu_fleet_api_trainer(self, args):
        """Train for ``RUN_STEP`` steps through the collective fleet API.

        Requires ``args.update_method == "nccl2"``. Initializes fleet with a
        collective ``PaddleCloudRoleMaker``, builds the model with a
        ``DistributedStrategy``, runs ``fleet.main_program`` on the GPU named
        by ``FLAGS_selected_gpus`` and pickles the losses to stdout. When
        ``args.save_model`` is set, persistables and inference models are also
        saved under ``/tmp`` through both the fluid and the fleet APIs.

        Args:
            args: parsed command-line namespace (see ``runtime_main``).
        """
        assert args.update_method == "nccl2"

        runner_name = type(self).__name__
        self.lr = args.lr

        exec_strategy = fluid.ExecutionStrategy()
        exec_strategy.num_threads = 1

        dist_strategy = DistributedStrategy()
        dist_strategy.exec_strategy = exec_strategy
        dist_strategy.fuse_memory_size = 1  # MB
        # NOTE(review): "fuse_laryer_size" looks like a misspelling of
        # "fuse_layer_size"; kept as-is -- confirm against DistributedStrategy.
        dist_strategy.fuse_laryer_size = 1
        if args.use_local_sgd:
            dist_strategy.use_local_sgd = True
        if args.ut4grad_allreduce:
            dist_strategy._ut4grad_allreduce = True
        if args.sync_batch_norm:
            dist_strategy.sync_batch_norm = True

        fleet.init(role_maker.PaddleCloudRoleMaker(is_collective=True))
        print_to_err("gpu_fleet", "fleet.node_num:")

        test_program, avg_cost, train_reader, test_reader, batch_acc, predict = \
            self.get_model(batch_size=args.batch_size, dist_strategy=dist_strategy)

        trainer_prog = fleet._origin_program
        dist_prog = fleet.main_program

        place = fluid.CUDAPlace(int(os.getenv("FLAGS_selected_gpus", "0")))

        exe = fluid.Executor(place)
        exe.run(fluid.default_startup_program())
        eprint(runner_name, "run worker startup program done.")

        feed_var_list = [
            candidate
            for candidate in trainer_prog.global_block().vars.values()
            if candidate.is_data
        ]
        eprint("feed_var_list:", feed_var_list)

        # tmp add this code to pass python35 gcc8 CI
        # Fixme(gongweibao, wangxi), need fix fleet api program order
        if feed_var_list[0].name == 'label':
            feed_var_list = feed_var_list[::-1]

        feeder = fluid.DataFeeder(feed_var_list, place)
        reader_generator = train_reader()

        def get_data():
            # Outside "local" mode with reader allocation enabled, keep only
            # the samples whose position parity matches this trainer's id.
            origin_batch = next(reader_generator)
            if args.update_method == "local" or not args.use_reader_alloc:
                return origin_batch
            return [
                sample for position, sample in enumerate(origin_batch)
                if position % 2 == args.trainer_id
            ]

        print_to_err(runner_name, "begin to train on trainer")
        out_losses = []
        for step in six.moves.xrange(RUN_STEP):
            (loss, ) = exe.run(dist_prog,
                               fetch_list=[avg_cost.name],
                               feed=feeder.feed(get_data()))
            out_losses.append(loss[0])
            print_to_err(runner_name, "run step %d finished" % step)
        print_to_err(runner_name, "trainer run finished")

        print_to_out(out_losses)

        if not args.save_model:
            return

        # Worker 0 and the other workers write to different directories.
        if fleet.worker_index() == 0:
            dir_names = ("fluid_persistables", "fleet_persistables",
                         "fluid_infer", "fleet_infer")
        else:
            dir_names = ("fluid_persistables_2", "fleet_persistables_2",
                         "fluid_infer_2", "fleet_infer_2")
        model_save_dir = "/tmp"
        (model_save_dir_fluid, model_save_dir_fleet, infer_save_dir_fluid,
         infer_save_dir_fleet) = [
             os.path.join(model_save_dir, name) for name in dir_names
         ]

        fluid.io.save_persistables(exe, model_save_dir_fluid,
                                   fleet._origin_program)
        fleet.save_persistables(executor=exe, dirname=model_save_dir_fleet)
        feeded_var_names = [var.name for var in feed_var_list]
        fluid.io.save_inference_model(infer_save_dir_fluid, feeded_var_names,
                                      [avg_cost], exe, fleet._origin_program)
        fleet.save_inference_model(exe, infer_save_dir_fleet,
                                   feeded_var_names, [avg_cost])

297
    def run_trainer(self, args):
        """Train for ``RUN_STEP`` steps in pserver, nccl2 or local mode.

        Depending on ``args.update_method`` the default main program is
        transpiled for parameter-server mode, transpiled for nccl2 mode, or
        used unchanged. The resulting program is compiled with data
        parallelism, run ``RUN_STEP`` times, and the per-step losses are
        pickled to stdout.

        Args:
            args: parsed command-line namespace (see ``runtime_main``).
        """
        runner_name = type(self).__name__
        self.lr = args.lr

        # Pick the get_model keyword arguments for the requested variant.
        model_kwargs = {"batch_size": args.batch_size}
        if args.nccl2_reduce_layer_local_run:
            model_kwargs["single_device"] = True
        elif args.use_dgc:
            model_kwargs["use_dgc"] = args.use_dgc
        test_program, avg_cost, train_reader, test_reader, batch_acc, predict = \
            self.get_model(**model_kwargs)

        uses_nccl2 = args.update_method in ("nccl2", "nccl2_reduce_layer")

        if args.update_method == "pserver":
            print_to_err(
                runner_name,
                "begin to run transpile on trainer with pserver mode")
            pserver_t = self.get_transpiler(
                trainer_id=args.trainer_id,
                main_program=fluid.default_main_program(),
                pserver_endpoints=args.endpoints,
                trainers=args.trainers,
                sync_mode=args.sync_mode,
                dc_asgd=args.dc_asgd,
                hogwild_mode=args.hogwild)

            trainer_prog = pserver_t.get_trainer_program()
            print_to_err(runner_name,
                         "get trainer program done with pserver mode.")
        elif uses_nccl2:
            # transpile for nccl2
            config = fluid.DistributeTranspilerConfig()
            config.mode = "nccl2"
            config.nccl_comm_num = args.nccl_comm_num
            if args.use_hallreduce:
                config.use_hierarchical_allreduce = True
                config.hierarchical_allreduce_inter_nranks = args.hallreduce_inter_nranks
            print_to_err(runner_name,
                         "begin to run transpile on trainer with nccl2 mode")
            nccl2_t = fluid.DistributeTranspiler(config=config)
            nccl2_t.transpile(
                args.trainer_id,
                program=fluid.default_main_program(),
                startup_program=fluid.default_startup_program(),
                trainers=args.endpoints,
                current_endpoint=args.current_endpoint)
            print_to_err(runner_name,
                         "get trainer program done. with nccl2 mode")
            trainer_prog = fluid.default_main_program()
        else:
            print_to_err(runner_name,
                         "do nothing about main program, just use it")
            trainer_prog = fluid.default_main_program()
            print_to_err(runner_name, "use main program done.")

        # FIXME(gongwb):wait pserver initialization.
        time.sleep(1)

        if args.use_cuda:
            place = fluid.CUDAPlace(int(os.getenv("FLAGS_selected_gpus", "0")))
        else:
            place = fluid.CPUPlace()

        exe = fluid.Executor(place)
        exe.run(fluid.default_startup_program())
        print_to_err(runner_name, "run worker startup program done.")

        exec_strategy = fluid.ExecutionStrategy()
        exec_strategy.num_threads = 1

        build_stra = fluid.BuildStrategy()
        # FIXME force disable enable_inplace and memory_optimize
        build_stra.enable_inplace = False
        build_stra.memory_optimize = False

        if args.hogwild:
            build_stra.async_mode = True

        if args.enable_backward_deps:
            build_stra.enable_backward_optimizer_op_deps = True

        reduce_choices = fluid.BuildStrategy.ReduceStrategy
        build_stra.reduce_strategy = (reduce_choices.Reduce if args.use_reduce
                                      else reduce_choices.AllReduce)

        # The pass builder is only materialized when batches are merged.
        pass_builder = None
        if args.batch_merge_repeat > 1:
            pass_builder = build_stra._finalize_strategy_and_create_passes()
            mypass = pass_builder.insert_pass(0, "multi_batch_merge_pass")
            mypass.set("num_repeats", args.batch_merge_repeat)

        if uses_nccl2:
            build_stra.num_trainers = len(args.endpoints.split(","))
            build_stra.trainer_id = args.trainer_id
        else:
            # Every other update method is compiled as a single trainer.
            build_stra.num_trainers = 1
            build_stra.trainer_id = 0

        print_to_err(runner_name, "begin to compile with data parallel")
        binary = compiler.CompiledProgram(trainer_prog).with_data_parallel(
            loss_name=avg_cost.name,
            build_strategy=build_stra,
            exec_strategy=exec_strategy)
        print_to_err(runner_name, "program compiled with data parallel")

        feed_var_list = [
            candidate
            for candidate in trainer_prog.global_block().vars.values()
            if candidate.is_data
        ]

        feeder = fluid.DataFeeder(feed_var_list, place)
        reader_generator = train_reader()

        def get_data():
            # Outside "local" mode with reader allocation enabled, keep only
            # the samples whose position parity matches this trainer's id.
            origin_batch = next(reader_generator)
            if args.update_method == "local" or not args.use_reader_alloc:
                return origin_batch
            return [
                sample for position, sample in enumerate(origin_batch)
                if position % 2 == args.trainer_id
            ]

        print_to_err(runner_name, "begin to train on trainer")
        out_losses = []
        for step in six.moves.xrange(RUN_STEP):
            (loss, ) = exe.run(binary,
                               fetch_list=[avg_cost.name],
                               feed=feeder.feed(get_data()))
            out_losses.append(loss[0])
            print_to_err(runner_name, "run step %d finished" % step)
        print_to_err(runner_name, "trainer run finished")

        print_to_out(out_losses)
T
typhoonzero 已提交
438 439


440 441 442 443 444 445 446 447 448
class TestParallelDyGraphRunnerBase(object):
    def get_model(self):
        raise NotImplementedError(
            "get_model should be implemented by child classes.")

    def run_one_loop(self, model, opt, data):
        raise NotImplementedError(
            "train_one_loop should be implemented by the child classes.")

449 450 451 452 453 454 455 456 457 458
    def _get_data(self, batch, args):
        if args.update_method != "local":
            new_batch = []
            for offset, item in enumerate(batch):
                if offset % 2 == args.trainer_id:
                    new_batch.append(item)
            return new_batch
        else:
            return batch

459
    def run_trainer(self, args):
        """Train the dygraph model for up to ``RUN_STEP`` steps on one GPU.

        Seeds the fluid default programs, numpy and ``random`` with 90, wraps
        the model in ``DataParallel`` when ``args.update_method`` is
        ``"nccl2"``, and pickles the collected per-step losses to stdout.

        Args:
            args: parsed command-line namespace (see ``runtime_main``).
        """
        runner_name = type(self).__name__
        seed = 90
        place = fluid.CUDAPlace(int(os.getenv("FLAGS_selected_gpus", "0")))

        with fluid.dygraph.guard(place):
            fluid.default_startup_program().random_seed = seed
            fluid.default_main_program().random_seed = seed
            np.random.seed(seed)
            random.seed(seed)
            model, train_reader, opt = self.get_model()
            nranks = len(args.endpoints.split(",")) if args.endpoints else 1

            if args.update_method == "nccl2":
                strategy = dygraph.parallel.ParallelStrategy()
                strategy.nranks = nranks
                strategy.local_rank = args.trainer_id
                strategy.trainer_endpoints = args.endpoints.split(",")
                strategy.current_endpoint = args.current_endpoint
                print_to_err(runner_name,
                             "begin to prepare context in dygraph with nccl2")
                dygraph.parallel.prepare_context(strategy)
                model = dygraph.parallel.DataParallel(model, strategy)
                print_to_err(runner_name, "model built in dygraph")

            out_losses = []
            print_to_err(runner_name, "begin to run dygraph training")
            for step_id, raw_batch in enumerate(train_reader()):
                batch = self._get_data(raw_batch, args)
                if step_id == RUN_STEP:
                    break
                loss = self.run_one_loop(model, opt, batch)
                if step_id % 10 == 0:
                    print_to_err(
                        runner_name,
                        "loss at step %d: %f" % (step_id, loss.numpy()))
                out_losses.append(loss.numpy())

                loss.backward()

                opt.minimize(loss)
                model.clear_gradients()
        print_to_out(out_losses)
504

505 506 507 508 509 510 511 512 513
    def run_trainer_with_spawn(self, args):
        """Train in dygraph mode for up to ``RUN_STEP`` steps; return losses.

        Unlike the other trainers in this class, the losses are returned
        instead of being written to stdout. ``args.trainer_id`` is overwritten
        with ``paddle.distributed.get_rank()``.

        Args:
            args: parsed command-line namespace (see ``runtime_main``).

        Returns:
            list: ``loss.numpy()`` of every executed step.
        """
        # 1. enable dygraph
        paddle.disable_static()

        # 2. init seed
        seed = 90
        paddle.static.default_startup_program().random_seed = seed
        paddle.static.default_main_program().random_seed = seed
        np.random.seed(seed)
        random.seed(seed)
        # get trainer id
        args.trainer_id = paddle.distributed.get_rank()

        # 3. init parallel env
        if args.update_method == "nccl2":
            paddle.distributed.init_parallel_env()

        # 4. train model
        model, train_reader, opt = self.get_model()
        if args.update_method == "nccl2":
            model = paddle.DataParallel(model)

        out_losses = []
        for step_id, raw_batch in enumerate(train_reader()):
            batch = self._get_data(raw_batch, args)
            if step_id == RUN_STEP:
                break
            step_loss = self.run_one_loop(model, opt, batch)
            out_losses.append(step_loss.numpy())

            step_loss.backward()

            opt.minimize(step_loss)
            model.clear_gradients()
        return out_losses

541 542 543 544 545 546 547 548 549 550 551
    def run_gpu_fleet_api_trainer(self, args):
        """Train in dygraph mode through ``paddle.distributed.fleet``.

        In ``"nccl2"`` mode fleet is initialized as collective and both the
        optimizer and the model are wrapped by fleet. Up to ``RUN_STEP`` steps
        are run and the losses are pickled to stdout. ``args.trainer_id`` is
        overwritten with ``paddle.distributed.get_rank()``.

        Args:
            args: parsed command-line namespace (see ``runtime_main``).
        """
        # Local imports shadow the module-level incubate ``fleet`` and
        # ``role_maker`` names inside this method.
        import paddle.distributed.fleet as fleet
        import paddle.distributed.fleet.base.role_maker as role_maker
        # 1. enable dygraph
        paddle.disable_static()

        # 2. init seed
        seed = 90
        paddle.static.default_startup_program().random_seed = seed
        paddle.static.default_main_program().random_seed = seed
        np.random.seed(seed)
        random.seed(seed)
        # get trainer id
        args.trainer_id = paddle.distributed.get_rank()

        # 3. init parallel env
        if args.update_method == "nccl2":
            fleet.init(is_collective=True)

        # 4. train model
        model, train_reader, opt = self.get_model()
        if args.update_method == "nccl2":
            opt = fleet.distributed_optimizer(opt)
            model = fleet.distributed_model(model)

        out_losses = []
        for step_id, raw_batch in enumerate(train_reader()):
            batch = self._get_data(raw_batch, args)
            if step_id == RUN_STEP:
                break
            step_loss = self.run_one_loop(model, opt, batch)
            out_losses.append(step_loss.numpy())

            step_loss.backward()

            opt.step()
            opt.clear_grad()
        print_to_out(out_losses)

580

T
typhoonzero 已提交
581
def runtime_main(test_class):
    """Parse ``sys.argv`` and dispatch to the matching runner of ``test_class``.

    ``test_class`` is instantiated without arguments. Dispatch order:

    1. ``--role pserver`` together with ``--update_method pserver``
       -> ``run_pserver``
    2. ``--gpu_fleet_api`` -> ``run_gpu_fleet_api_trainer``
    3. ``--use_pipeline`` -> ``run_pipeline_trainer``
    4. otherwise -> ``run_trainer``

    Args:
        test_class: zero-argument callable returning an object that provides
            the runner methods listed above.
    """

    def _str_to_bool(value):
        # ``type=bool`` would turn every non-empty string -- including
        # "False" and "0" -- into True. Treat the usual false spellings as
        # False and, as before, everything else as True.
        return value.strip().lower() not in ("", "0", "false", "f", "no", "n",
                                             "off")

    parser = argparse.ArgumentParser(description='Run dist test.')
    parser.add_argument(
        '--role', type=str, required=True, choices=['pserver', 'trainer'])
    parser.add_argument('--endpoints', type=str, required=False, default="")
    parser.add_argument(
        '--update_method',
        type=str,
        default="local",
        choices=["pserver", "nccl2", "local", "nccl2_reduce_layer"])
    parser.add_argument('--trainer_id', type=int, required=False, default=0)
    parser.add_argument('--trainers', type=int, required=False, default=1)
    parser.add_argument('--nccl_comm_num', type=int, required=False, default=1)
    parser.add_argument('--enable_backward_deps', action='store_true')
    parser.add_argument('--use_hallreduce', action='store_true')
    parser.add_argument('--use_pipeline', action='store_true')
    parser.add_argument('--gpu_fleet_api', action='store_true')
    parser.add_argument('--use_local_sgd', action='store_true')
    parser.add_argument('--ut4grad_allreduce', action='store_true')
    parser.add_argument(
        '--hallreduce_inter_nranks', type=int, required=False, default=2)
    parser.add_argument(
        '--current_endpoint', type=str, required=False, default="")
    parser.add_argument('--sync_mode', action='store_true')
    parser.add_argument('--use_cuda', action='store_true')
    parser.add_argument('--use_dgc', action='store_true')
    parser.add_argument('--use_reduce', action='store_true')
    parser.add_argument('--dc_asgd', action='store_true')
    parser.add_argument('--hogwild', action='store_true')
    parser.add_argument('--save_model', action='store_true')
    parser.add_argument(
        '--use_reader_alloc', action='store_true', required=False)
    parser.add_argument('--batch_size', required=False, type=int, default=2)
    parser.add_argument('--lr', required=False, type=float, default=0.001)
    parser.add_argument(
        '--batch_merge_repeat', required=False, type=int, default=1)
    parser.add_argument(
        '--nccl2_reduce_layer_local_run',
        required=False,
        type=_str_to_bool,
        default=False)
    parser.add_argument('--sync_batch_norm', action='store_true')

    args = parser.parse_args()

    model = test_class()
    if args.role == "pserver" and args.update_method == "pserver":
        model.run_pserver(args)
    elif args.gpu_fleet_api:
        model.run_gpu_fleet_api_trainer(args)
    elif args.use_pipeline:
        model.run_pipeline_trainer(args)
    else:
        model.run_trainer(args)
X
Xin Pan 已提交
635

M
minqiyang 已提交
636

M
minqiyang 已提交
637
import paddle.compat as cpt
Y
Yancey1989 已提交
638 639
import socket
from contextlib import closing
M
minqiyang 已提交
640

X
Xin Pan 已提交
641 642

class TestDistBase(unittest.TestCase):
W
Wu Yi 已提交
643 644 645
    def _setup_config(self):
        raise NotImplementedError("tests should have _setup_config implemented")

646 647 648
    def _after_setup_config(self):
        if self._enforce_place == "CPU":
            self.__use_cuda = False
649
            self._use_dgc = False
650 651 652 653 654 655 656
        elif self._enforce_place == "GPU":
            self.__use_cuda = True
        else:
            if fluid.core.is_compiled_with_cuda():
                self.__use_cuda = True
            else:
                self.__use_cuda = False
657 658 659 660
                self._use_dgc = False

        if self._use_reduce:
            assert not self._use_dgc
661

X
Xin Pan 已提交
662 663 664
    def setUp(self):
        """Install default settings, apply the subclass config, pick endpoints.

        The defaults are assigned first so that ``_setup_config`` can override
        any of them. Two parameter-server endpoints on 127.0.0.1 are then
        chosen, and finally ``_after_setup_config`` resolves device placement.
        """
        global DIST_UT_PORT

        # cluster layout and the interpreter used to spawn worker processes
        self._trainers = 2
        self._pservers = 2
        self._port_set = set()
        self._python_interp = sys.executable

        # training mode switches
        self._sync_mode = True
        self._hogwild_mode = False
        self._enforce_place = None
        self._use_reduce = False
        self._dc_asgd = False  # must use with async mode
        self._use_reader_alloc = True
        self._nccl2_mode = False
        self._pipeline_mode = False
        self._mp_mode = False
        # FIXME(typhoonzero): I added this stupid argument to enable
        # testing allreduce layers, which users can call layers.allreduce
        # to accumulate tensors at anywhere. Find a better way to do this
        # test, reduce check this argument everywhere.
        self._nccl2_reduce_layer = False

        # optimizer / communication options
        self._lr = 0.001
        self._use_dgc = False
        self._dygraph = False
        self._nccl_comm_num = 1
        self._enable_backward_deps = False
        self._gpu_fleet_api = False
        self._use_local_sgd = False
        self._ut4grad_allreduce = False
        self._use_hallreduce = False
        self._save_model = False

        # let the concrete test override any of the defaults above
        self._setup_config()

        # Seed the module-wide port counter from the environment once.
        env_port = os.getenv("PADDLE_DIST_UT_PORT")
        if DIST_UT_PORT == 0 and env_port:
            DIST_UT_PORT = int(env_port)

        if DIST_UT_PORT == 0:
            # no fixed range configured: ask the OS for two free ports
            first_port = self._find_free_port()
            second_port = self._find_free_port()
        else:
            # take two consecutive ports and advance the shared counter
            print("set begin_port:", DIST_UT_PORT)
            first_port = DIST_UT_PORT
            second_port = DIST_UT_PORT + 1
            DIST_UT_PORT += 2
        self._ps_endpoints = "127.0.0.1:%s,127.0.0.1:%s" % (first_port,
                                                            second_port)

        self._after_setup_config()
X
Xin Pan 已提交
707

Y
Yancey1989 已提交
708
    def _find_free_port(self):
        """Return a free TCP port that this test case has not handed out yet.

        Every port returned is recorded in ``self._port_set`` so that the
        same number is never returned twice by this instance.
        """

        def _probe_port():
            # binding to port 0 lets the OS choose the port number
            with closing(socket.socket(socket.AF_INET,
                                       socket.SOCK_STREAM)) as sock:
                sock.bind(('', 0))
                chosen = sock.getsockname()[1]
                print_to_err(type(self).__name__, "socket name: %s" % chosen)
                return chosen

        candidate = _probe_port()
        while candidate in self._port_set:
            candidate = _probe_port()
        self._port_set.add(candidate)
        return candidate
Y
Yancey1989 已提交
722

723 724 725 726 727
    def start_pserver(self,
                      model_file,
                      check_error_log,
                      required_envs,
                      log_name=""):
        """Spawn the two parameter-server processes for a cluster run.

        Args:
            model_file: script handed to the interpreter as the program to run.
            check_error_log: accepted but not consulted by this method.
            required_envs: environment dict given to both processes;
                ``COVERAGE_FILE`` is added to it in place when the
                ``WITH_COVERAGE`` environment variable is ``ON``.
            log_name: prefix of the two stderr log files.

        Returns:
            tuple: ``(ps0_proc, ps1_proc, ps0_pipe, ps1_pipe)``. The pipes are
            the stderr log files, still open; closing them is left to the
            caller.
        """
        ps0_ep, ps1_ep = self._ps_endpoints.split(",")

        cmd_template = "%s"
        if os.getenv('WITH_COVERAGE', 'OFF') == 'ON':
            required_envs['COVERAGE_FILE'] = os.getenv('COVERAGE_FILE', '')
            cmd_template += " -m coverage run --branch -p"
        cmd_template += " %s --role pserver --endpoints %s --trainer_id 0 --current_endpoint %s --trainers %d --update_method pserver"

        def _command_for(endpoint):
            # full command line for the pserver listening on `endpoint`
            command = cmd_template % (self._python_interp, model_file,
                                      self._ps_endpoints, endpoint,
                                      self._trainers)
            if self._sync_mode:
                command += " --sync_mode"
            return command

        def _launch(command, err_file):
            # stdout is captured through a pipe, stderr goes to the log file
            return subprocess.Popen(
                command.strip().split(" "),
                stdout=subprocess.PIPE,
                stderr=err_file,
                env=required_envs)

        ps0_cmd = _command_for(ps0_ep)
        ps1_cmd = _command_for(ps1_ep)
        print(ps0_cmd)
        print(ps1_cmd)

        ps0_pipe = open(log_name + "_ps0_err.log", "wb")
        ps1_pipe = open(log_name + "_ps1_err.log", "wb")

        print_to_err(type(self).__name__, "going to start pserver process 0")
        ps0_proc = _launch(ps0_cmd, ps0_pipe)
        print_to_err(type(self).__name__, "going to start pserver process 1")
        ps1_proc = _launch(ps1_cmd, ps1_pipe)

        return ps0_proc, ps1_proc, ps0_pipe, ps1_pipe
X
Xin Pan 已提交
767

768 769 770 771 772
    def _run_local(self,
                   model,
                   envs,
                   check_error_log=False,
                   batch_size=DEFAULT_BATCH_SIZE,
                   batch_merge_repeat=1,
                   log_name="",
                   gpus="0"):
        """Run the model script as a single local trainer and return its output.

        Args:
            model: script handed to the interpreter as the program to run.
            envs: extra environment variables for the child process;
                ``COVERAGE_FILE`` is added to it in place when the
                ``WITH_COVERAGE`` environment variable is ``ON``.
            check_error_log: when true, the child's stderr is written to
                ``log_name + "_local.log"`` instead of being captured.
            batch_size: forwarded as ``--batch_size`` when it differs from
                ``DEFAULT_BATCH_SIZE``.
            batch_merge_repeat: forwarded as ``--batch_merge_repeat`` when > 1.
            log_name: prefix of the stderr log file.
            gpus: comma separated device ids exported as
                ``CUDA_VISIBLE_DEVICES`` when CUDA is in use.

        Returns:
            The object unpickled from the child's stdout.
        """
        cmd = self._python_interp

        if os.getenv('WITH_COVERAGE', 'OFF') == 'ON':
            envs['COVERAGE_FILE'] = os.getenv('COVERAGE_FILE', '')
            cmd += " -m coverage run --branch -p"

        cmd += " %s --role trainer --update_method local --lr %f" % (model,
                                                                     self._lr)

        if batch_size != DEFAULT_BATCH_SIZE:
            cmd += " --batch_size %d" % batch_size
        if batch_merge_repeat > 1:
            cmd += " --batch_merge_repeat %d" % batch_merge_repeat
        if self._nccl2_reduce_layer:
            cmd += " --nccl2_reduce_layer_local_run 1"

        if self.__use_cuda:
            cmd += " --use_cuda"
            env_local = {
                "CUDA_VISIBLE_DEVICES": gpus,
                "PADDLE_TRAINERS_NUM": "1",
                "PADDLE_TRAINER_ID": "0"
            }
        else:
            env_local = {'CPU_NUM': '1'}

        # not use dgc in single card. Count the listed device ids rather than
        # the characters of the string, so that a single multi-digit id such
        # as "10" is not mistaken for several cards.
        gpu_ids = [gpu for gpu in gpus.split(",") if gpu.strip()]
        if len(gpu_ids) > 1 and self._use_dgc:
            cmd += " --use_dgc"

        env_local.update(envs)
        print("local_cmd: {}, env: {}".format(cmd, env_local))

        err_log = open(log_name + "_local.log", "wb") if check_error_log else None
        try:
            local_proc = subprocess.Popen(
                cmd.split(" "),
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE if err_log is None else err_log,
                env=env_local)
            local_out, local_err = local_proc.communicate()
        finally:
            # close the log even when spawning or waiting fails
            if err_log is not None:
                err_log.close()

        # stderr is reported before unpickling so that it is still visible
        # when the child produced no valid result
        sys.stderr.write('local_stderr: %s\n' % local_err)
        local_result = pickle.loads(local_out)
        # wrap in a 1-tuple so a tuple result is not spread over the format
        sys.stderr.write('local_stdout: %s\n' % (local_result, ))

        return local_result
833

834
    def _run_cluster(self, model, envs, check_error_log, log_name):
        """Run two pservers plus two trainers and return both trainers' output.

        Args:
            model: script handed to the interpreter as the program to run.
            envs: environment variables shared by all spawned processes;
                ``COVERAGE_FILE`` is added to it in place when the
                ``WITH_COVERAGE`` environment variable is ``ON``.
            check_error_log: forwarded to ``start_pserver``.
            log_name: prefix of the stderr log files.

        Returns:
            tuple: the objects unpickled from trainer 0's and trainer 1's
            stdout.
        """
        # Run dist train to compare with local results
        ps0, ps1, ps0_pipe, ps1_pipe = self.start_pserver(
            model, check_error_log, envs, log_name=log_name)

        tr0_pipe = None
        tr1_pipe = None
        trainer_procs = []
        try:
            ps0_ep, ps1_ep = self._ps_endpoints.split(",")

            tr_cmd = "%s"

            if os.getenv('WITH_COVERAGE', 'OFF') == 'ON':
                envs['COVERAGE_FILE'] = os.getenv('COVERAGE_FILE', '')
                tr_cmd += " -m coverage run --branch -p"

            tr_cmd += " %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --trainers %d --update_method pserver --lr %f"

            tr0_cmd = tr_cmd % \
                      (self._python_interp, model, self._ps_endpoints,
                       0, ps0_ep, self._trainers, self._lr)
            tr1_cmd = tr_cmd % \
                      (self._python_interp, model, self._ps_endpoints,
                       1, ps1_ep, self._trainers, self._lr)

            if self._sync_mode:
                tr0_cmd += " --sync_mode"
                tr1_cmd += " --sync_mode"
            if self._hogwild_mode:
                tr0_cmd += " --hogwild"
                tr1_cmd += " --hogwild"
            if self._use_reduce:
                tr0_cmd += " --use_reduce"
                tr1_cmd += " --use_reduce"
            if self._use_reader_alloc:
                tr0_cmd += " --use_reader_alloc"
                tr1_cmd += " --use_reader_alloc"
            if self.__use_cuda:
                tr0_cmd += " --use_cuda"
                tr1_cmd += " --use_cuda"
                env0 = {"CUDA_VISIBLE_DEVICES": "0"}
                env1 = {"CUDA_VISIBLE_DEVICES": "1"}
            else:
                env0 = {'CPU_NUM': '1'}
                env1 = {'CPU_NUM': '1'}

            env0.update(envs)
            env1.update(envs)

            print("tr0_cmd: {}, env: {}".format(tr0_cmd, env0))
            print("tr1_cmd: {}, env: {}".format(tr1_cmd, env1))
            tr0_pipe = open(log_name + "_tr0_err.log", "wb")
            tr1_pipe = open(log_name + "_tr1_err.log", "wb")

            print_to_err(
                type(self).__name__, "going to start trainer process 0")
            tr0_proc = subprocess.Popen(
                tr0_cmd.strip().split(" "),
                stdout=subprocess.PIPE,
                stderr=tr0_pipe,
                env=env0)
            trainer_procs.append(tr0_proc)
            print_to_err(
                type(self).__name__, "going to start trainer process 1")
            tr1_proc = subprocess.Popen(
                tr1_cmd.strip().split(" "),
                stdout=subprocess.PIPE,
                stderr=tr1_pipe,
                env=env1)
            trainer_procs.append(tr1_proc)

            # Wait until trainer process terminate
            # NOTE(review): stdout is a pipe that is only drained by
            # communicate() below, so this wait presumably relies on the
            # trainers writing less than one pipe buffer - confirm.
            while tr0_proc.poll() is None:
                time.sleep(0.1)
            while tr1_proc.poll() is None:
                time.sleep(0.1)

            tr0_out, tr0_err = tr0_proc.communicate()
            tr1_out, tr1_err = tr1_proc.communicate()
        finally:
            # Always release the log files and stop the servers, so a failure
            # above cannot leave pserver processes behind holding their ports.
            for pipe in (tr0_pipe, tr1_pipe, ps0_pipe, ps1_pipe):
                if pipe is not None:
                    pipe.close()
            # only reached with a live trainer when something above failed
            for proc in trainer_procs:
                if proc.poll() is None:
                    proc.terminate()
            ps0.terminate()
            ps1.terminate()

        return pickle.loads(tr0_out), pickle.loads(tr1_out)

924 925 926
    def _get_nccl2_trainer_cmd(self, model, ep, update_method, trainer_id,
                               trainer_num):
        env = {}
927 928 929 930 931 932 933
        tr_cmd = "%s -u"

        if os.getenv('WITH_COVERAGE', 'OFF') == 'ON':
            tr_cmd += " -m coverage run --branch -p"

        tr_cmd += " %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --update_method %s --lr %f"

934
        tr_cmd = tr_cmd % \
T
tangwei12 已提交
935 936
                 (self._python_interp, model, self._ps_endpoints,
                  trainer_id, ep, update_method, self._lr)
W
Wu Yi 已提交
937 938

        if self._use_reduce:
939
            tr_cmd += " --use_reduce"
W
Wu Yi 已提交
940
        if self._use_reader_alloc:
941
            tr_cmd += " --use_reader_alloc"
942 943
        if self._save_model:
            tr_cmd += " --save_model"
W
Wu Yi 已提交
944
        if self.__use_cuda:
945 946
            tr_cmd += " --use_cuda"
            env.update({
947
                "FLAGS_selected_gpus": "{}".format(0),
948
                "CUDA_VISIBLE_DEVICES": "{}".format(trainer_id),
949
                "PADDLE_TRAINERS_NUM": "{}".format(trainer_num),
950 951 952
                "PADDLE_TRAINER_ID": "{}".format(trainer_id),
                "PADDLE_TRAINER_ENDPOINTS": self._ps_endpoints,
                "PADDLE_CURRENT_ENDPOINT": ep,
953
            })
W
Wu Yi 已提交
954
        else:
955
            env.update({'CPU_NUM': '1'})
W
Wu Yi 已提交
956

957
        if self._use_dgc:
958 959
            tr_cmd += " --use_dgc"

960 961
        if self._pipeline_mode:
            tr_cmd += " --use_pipeline"
962
        if self._mp_mode:
963
            env = {"FLAGS_selected_gpus": "{}".format(trainer_id)}
964 965

        if self._nccl_comm_num > 1:
966
            tr_cmd += " --nccl_comm_num {}".format(self._nccl_comm_num)
967

968 969
        if self._use_hallreduce:
            tr_cmd += " --use_hallreduce --hallreduce_inter_nranks 2"
970

971
        if self._enable_backward_deps:
972
            tr_cmd += " --enable_backward_deps"
973

974 975
        if self._gpu_fleet_api:
            tr_cmd += " --gpu_fleet_api"
976 977 978 979
            if self._use_local_sgd:
                tr_cmd += " --use_local_sgd"
            if self._ut4grad_allreduce:
                tr_cmd += " --ut4grad_allreduce"
980 981
            if hasattr(self, '_sync_batch_norm') and self._sync_batch_norm:
                tr_cmd += " --sync_batch_norm"
982

983 984 985
        if os.getenv('WITH_COVERAGE', 'OFF') == 'ON':
            env['COVERAGE_FILE'] = os.getenv('COVERAGE_FILE', '')

986
        return tr_cmd, env
W
Wu Yi 已提交
987

988
    def _run_cluster_nccl2(self, model, envs, nccl2_reduce_layer,
                           check_error_log, log_name):
        """Run one nccl2 trainer per worker endpoint and return two results.

        Args:
            model: script handed to the interpreter as the program to run.
            envs: environment variables merged into every trainer's
                environment.
            nccl2_reduce_layer: selects update method ``nccl2_reduce_layer``
                when true, ``nccl2`` otherwise.
            check_error_log: when true, the raw stdout of the first two
                trainers is printed.
            log_name: prefix of the per-trainer stderr log files.

        Returns:
            tuple: the objects unpickled from the stdout of trainers 0 and 1.
        """
        global DIST_UT_PORT

        if self._use_hallreduce:
            if DIST_UT_PORT == 0:
                # NOTE(wangxi). hallreduce test must use 4cards after nccl>=2.7
                ports = [self._find_free_port() for _ in range(0, 4)]
            else:
                ports = [DIST_UT_PORT + offset for offset in range(0, 4)]
                DIST_UT_PORT += 4
            self._ps_endpoints = ",".join("127.0.0.1:%s" % port
                                          for port in ports)

        # NOTE: we reuse ps_endpoints as nccl2 worker endpoints
        worker_endpoints = self._ps_endpoints.split(",")
        update_method = "nccl2_reduce_layer" if nccl2_reduce_layer else "nccl2"
        trainer_num = len(worker_endpoints)

        launched = []
        for rank, endpoint in enumerate(worker_endpoints):
            tr_cmd, tr_env = self._get_nccl2_trainer_cmd(
                model, endpoint, update_method, rank, trainer_num)
            tr_env.update(envs)
            print("use_hallreduce:{} tr_cmd:{}, env: {}".format(
                self._use_hallreduce, tr_cmd, tr_env))

            tr_pipe = open(log_name + "_tr{}_err.log".format(rank), "wb")

            print_to_err(
                type(self).__name__,
                "going to start process {} with nccl2".format(rank))
            tr_proc = subprocess.Popen(
                tr_cmd.strip().split(" "),
                stdout=subprocess.PIPE,
                stderr=tr_pipe,
                env=tr_env)
            launched.append((tr_proc, tr_pipe))

        # collect stdout in launch order, closing each log once its process
        # has finished
        outs = []
        for rank, (tr_proc, tr_pipe) in enumerate(launched):
            tr_out, tr_err = tr_proc.communicate()
            outs.append(tr_out)
            tr_pipe.close()
            sys.stderr.write('trainer {} stderr: {}\n'.format(rank, tr_err))

        if check_error_log:
            print("outs[0]:", outs[0])
            print("outs[1]:", outs[1])
        return pickle.loads(outs[0]), pickle.loads(outs[1])
1048

1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093
    def _run_pipeline(self, model, envs, check_error_log, log_name):
        """Run one pipeline trainer per worker endpoint and return two results.

        Args:
            model: script handed to the interpreter as the program to run.
            envs: environment variables merged into every trainer's
                environment before the pipeline-specific overrides.
            check_error_log: when true, the raw stdout of the first two
                trainers is printed.
            log_name: prefix of the per-trainer stderr log files
                (``log_name + "_tr<i>_err.log"``).

        Returns:
            tuple: the objects unpickled from the stdout of trainers 0 and 1.
        """
        # NOTE: we reuse ps_endpoints as nccl2 worker endpoints
        worker_endpoints = self._ps_endpoints.split(",")
        update_method = "nccl2"

        trainer_num = len(worker_endpoints)

        procs = []
        pipes = []
        outs = []
        try:
            for i in range(0, trainer_num):
                tr_cmd, tr_env = self._get_nccl2_trainer_cmd(
                    model, worker_endpoints[i], update_method, i, trainer_num)
                tr_env.update(envs)
                tr_env['CUDA_VISIBLE_DEVICES'] = "0,1"
                tr_env['NCCL_SHM_DISABLE'] = '1'
                tr_env['FLAGS_selected_gpus'] = str(i)
                tr_env['FLAGS_cudnn_deterministic'] = '0'
                print("tr_cmd:{}, env: {}".format(tr_cmd, tr_env))

                # Honor log_name, as _run_cluster_nccl2 does, instead of a
                # fixed /tmp path that different tests would overwrite.
                tr_pipe = open(log_name + "_tr{}_err.log".format(i), "wb")
                pipes.append(tr_pipe)

                print_to_err(
                    type(self).__name__,
                    "going to start process {} with nccl2".format(i))
                tr_proc = subprocess.Popen(
                    tr_cmd.strip().split(" "),
                    stdout=subprocess.PIPE,
                    stderr=tr_pipe,
                    env=tr_env)
                procs.append(tr_proc)

            for i in range(0, trainer_num):
                tr_out, tr_err = procs[i].communicate()
                outs.append(tr_out)
                sys.stderr.write('trainer {} stderr: {}\n'.format(i, tr_err))
        finally:
            # release the log files even when launching or waiting fails
            for tr_pipe in pipes:
                tr_pipe.close()

        if check_error_log:
            print("outs[0]:", outs[0])
            print("outs[1]:", outs[1])
        return pickle.loads(outs[0]), pickle.loads(outs[1])

1094
    def _get_required_envs(self, check_error_log=False, need_envs={}):
1095 1096 1097 1098 1099 1100
        # TODO(typhoonzero): should auto adapt GPU count on the machine.
        required_envs = {
            "PATH": os.getenv("PATH", ""),
            "PYTHONPATH": os.getenv("PYTHONPATH", ""),
            "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
            "FLAGS_fraction_of_gpu_memory_to_use": "0.15",
G
guru4elephant 已提交
1101
            "FLAGS_rpc_deadline": "30000",  # 5sec to fail fast
1102
            "FLAGS_rpc_retry_bind_port": "50",
1103
            "FLAGS_cudnn_deterministic": "1",
1104
            "FLAGS_rpc_disable_reuse_port": "1",
W
Wu Yi 已提交
1105
            "http_proxy": "",
1106 1107
            "NCCL_P2P_DISABLE": "1",
            "NCCL_SHM_DISABLE": "1"
1108 1109 1110
        }

        if check_error_log:
1111
            required_envs["GLOG_vmodule"] = \
1112 1113
                "fused_all_reduce_op_handle=10,all_reduce_op_handle=10,alloc_continuous_space_op=10,fuse_all_reduce_op_pass=10," \
                "alloc_continuous_space_for_grad_pass=10,fast_threaded_ssa_graph_executor=10,executor=10,operator=10," \
1114 1115
                "sparse_all_reduce_op_handle=10,gen_nccl_id_op=10,gen_nccl_id_op_help=10,nccl_helper=10,grpc_client=10," \
                "grpc_server=10,request_handler_impl=10"
1116 1117
            required_envs["GLOG_logtostderr"] = "1"

1118 1119 1120 1121 1122 1123 1124 1125 1126
        required_envs.update(need_envs)
        return required_envs

    def check_with_place(self,
                         model_file,
                         delta=1e-3,
                         check_error_log=False,
                         need_envs={},
                         log_name=""):
        """Compare the losses of a local run with those of a distributed run.

        The model is first run locally, then in the distributed mode
        selected by ``_nccl2_mode`` / ``_pipeline_mode`` (parameter-server
        cluster otherwise). For each of the first ``RUN_STEP`` steps the
        local loss must match the distributed loss within ``delta``.

        Args:
            model_file: script handed to the interpreter as the program to run.
            delta: tolerance passed to ``assertAlmostEqual``.
            check_error_log: forwarded to the env builder and the runners.
            need_envs: extra environment variables for the spawned processes.
            log_name: prefix of the log files written by the runners.
        """
        required_envs = self._get_required_envs(check_error_log, need_envs)

        local_losses = self._run_local(
            model_file, required_envs, check_error_log, log_name=log_name)

        if self._nccl2_mode:
            reduce_layer = True if self._nccl2_reduce_layer else False
            tr0_losses, tr1_losses = self._run_cluster_nccl2(
                model_file,
                required_envs,
                reduce_layer,
                check_error_log,
                log_name=log_name)
        elif self._pipeline_mode:
            tr0_losses, tr1_losses = self._run_pipeline(
                model_file, required_envs, check_error_log, log_name=log_name)
        else:
            tr0_losses, tr1_losses = self._run_cluster(
                model_file, required_envs, check_error_log, log_name=log_name)

        for step_id in range(RUN_STEP):
            local_loss = local_losses[step_id]
            tr0_loss = tr0_losses[step_id]
            tr1_loss = tr1_losses[step_id]
            if self._pipeline_mode:
                # in pipeline mode only trainer 1's loss is compared
                dist_loss = np.array([tr1_loss])
            else:
                # otherwise compare against the mean of both trainers
                dist_loss = (np.array([tr0_loss]) + np.array([tr1_loss])) / 2
            print("=======", local_loss, ":", dist_loss[0], "=======")
            self.assertAlmostEqual(local_loss, dist_loss[0], delta=delta)
1166 1167 1168 1169 1170 1171 1172

    def check_with_place_multi_cards(self,
                                     model_file,
                                     delta=1e-3,
                                     check_error_log=False,
                                     need_envs={},
                                     log_name=""):
1173

1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201
        # need open p2p or shm otherwise multi cards mode will hang
        need_envs.update({"NCCL_P2P_DISABLE": "0", "NCCL_SHM_DISABLE": "0"})

        required_envs = self._get_required_envs(check_error_log, need_envs)

        if self._use_dgc:
            multi_cards_losses = self._run_local(
                model_file,
                required_envs,
                check_error_log,
                log_name=log_name + "_dgc_2cards",
                gpus="0,1")

            self._use_dgc = False
            base_losses = self._run_local(
                model_file,
                required_envs,
                check_error_log,
                log_name=log_name + "_base_2cards",
                gpus="0,1")

            self._use_dgc = True

            for step_id in range(RUN_STEP):
                base_loss = base_losses[step_id]
                multi_cards_loss = multi_cards_losses[step_id]
                print("=======", base_loss, ":", multi_cards_loss, "=======")
                self.assertAlmostEqual(base_loss, multi_cards_loss, delta=delta)