#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
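"""Base classes and helpers for PaddlePaddle's distributed-training unit tests.

Worker processes are spawned by TestDistBase with the command lines assembled
below; each worker runs a model defined by a TestDistRunnerBase or
TestParallelDyGraphRunnerBase subclass and reports its losses back to the
parent process as pickled stdout."""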

import argparse
import ast
import os
import pickle
import random
import subprocess
import sys
import tempfile
import time
import unittest

import numpy as np

import paddle
import paddle.fluid as fluid
import paddle.fluid.dygraph as dygraph
import paddle.incubate.distributed.fleet.role_maker as role_maker
from paddle.fluid import compiler
from paddle.incubate.distributed.fleet.collective import (
    DistributedStrategy,
    fleet,
)

RUN_STEP = 5
DEFAULT_BATCH_SIZE = 2
DIST_UT_PORT = 0


def print_to_out(out_losses):
    sys.stdout.buffer.write(pickle.dumps(out_losses))


def print_to_err(class_name, log_str):
    localtime = time.asctime(time.localtime(time.time()))
    print_str = localtime + "\t" + class_name + "\t" + log_str
    sys.stderr.buffer.write(pickle.dumps(print_str))


def eprint(*args, **kwargs):
    print(*args, file=sys.stderr, **kwargs)


class TestDistRunnerBase:
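    """Base runner for the static-graph (fluid) tests.

    Subclasses implement get_model(); runtime_main() dispatches to one of the
    run_* methods below according to the command-line flags.
    """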
    def get_model(
        self,
        batch_size=DEFAULT_BATCH_SIZE,
        lr=0.1,
        single_device=False,
        use_dgc=False,
        dist_strategy=None,
    ):
        raise NotImplementedError(
            "get_model should be implemented by child classes."
        )

    @staticmethod
    def get_transpiler(
        trainer_id,
        main_program,
        pserver_endpoints,
        trainers,
        sync_mode,
        dc_asgd=False,
        current_endpoint=None,
        nccl_comm_num=1,
        hogwild_mode=False,
    ):
        # NOTE: import fluid until runtime, or else forking processes will cause error.
        config = paddle.distributed.transpiler.DistributeTranspilerConfig()
        config.enable_dc_asgd = dc_asgd
        config.sync_mode = sync_mode
        config.runtime_split_send_recv = hogwild_mode

        if nccl_comm_num > 1:
            config.nccl_comm_num = nccl_comm_num
        # config.runtime_split_send_recv = True
        t = paddle.distributed.transpiler.DistributeTranspiler(config=config)
        t.transpile(
            trainer_id=trainer_id,
            program=main_program,
            pservers=pserver_endpoints,
            trainers=trainers,
            sync_mode=sync_mode,
            current_endpoint=current_endpoint,
        )
        return t

    @staticmethod
    def get_lr_scheduler(program):
        lr_sheduler = None
        if hasattr(program, 'lr_sheduler'):
            from paddle.optimizer.lr import LRScheduler

            lr_sheduler = program.lr_sheduler
            assert isinstance(lr_sheduler, LRScheduler), "must be LRScheduler"
        return lr_sheduler

    def run_pserver(self, args):
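        """Run the parameter-server role: transpile the default program for
        the pserver endpoint and execute it."""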
        self.lr = args.lr
        self.get_model(batch_size=args.batch_size)
        # NOTE: pserver should not call memory optimize

        t = self.get_transpiler(
            trainer_id=args.trainer_id,
            main_program=fluid.default_main_program(),
            pserver_endpoints=args.endpoints,
            trainers=args.trainers,
            sync_mode=args.sync_mode,
            dc_asgd=args.dc_asgd,
            hogwild_mode=args.hogwild,
        )
        pserver_prog = t.get_pserver_program(args.current_endpoint)
        startup_prog = t.get_startup_program(
            args.current_endpoint, pserver_prog
        )
        place = fluid.CPUPlace()
        exe = fluid.Executor(place)
        exe.run(startup_prog)
        print_to_err(type(self).__name__, "run pserver startup program done.")
        exe.run(pserver_prog)
        print_to_err(type(self).__name__, "run pserver main program done.")

    def run_pipeline_trainer(self, args):
        self.lr = args.lr

        dist_strategy = DistributedStrategy()
        (
            test_program,
            avg_cost,
            train_reader,
            test_reader,
            batch_acc,
            predict,
            data_loader,
        ) = self.get_model(
            batch_size=args.batch_size, dist_strategy=dist_strategy
        )

        device_id = int(os.getenv("FLAGS_selected_gpus", "0"))
        eprint(type(self).__name__, "device_id: %d." % device_id)
        place = fluid.CUDAPlace(device_id)

        exe = fluid.Executor(place)
        exe.run(fluid.default_startup_program())
        eprint(type(self).__name__, "run worker startup program done.")

        data_loader.set_sample_list_generator(train_reader, place)
        data_loader.start()
        print_to_err(type(self).__name__, "begin to train on trainer")
        out_losses = []

        main_program = fluid.default_main_program()
        lr_sheduler = self.get_lr_scheduler(main_program)
        for i in range(RUN_STEP):
            loss = exe.run(main_program, fetch_list=[avg_cost])
            loss = loss[0] if loss else None
            out_losses.append(loss)
            print_to_err(type(self).__name__, "run step %d finished" % i)
            if lr_sheduler is not None:
                lr_sheduler.step()

        data_loader.reset()
        print_to_err(type(self).__name__, "trainer run finished")

        sys.stdout.buffer.write(pickle.dumps(out_losses))

    def run_use_fleet_api_20_trainer(self, args):
        """
        1. remove the DistributedStrategy code and leave that part to get_model()
        2. to run with the fleet 2.0 api, set the flags _use_fleet_api and _use_fleet_api_20 to True
        3. model saving is not supported for now
        """
        assert args.update_method in ["nccl2", "bkcl"]

        self.lr = args.lr
        print_to_err("use_fleet 2.0", "fleet.node_num:")

        (
            test_program,
            avg_cost,
            train_reader,
            test_reader,
            batch_acc,
            predict,
        ) = self.get_model(batch_size=args.batch_size)

        if fluid.core.is_compiled_with_cuda():
            device_id = int(os.getenv("FLAGS_selected_gpus", "0"))
            place = fluid.CUDAPlace(device_id)
        elif fluid.core.is_compiled_with_xpu():
            device_id = int(os.getenv("FLAGS_selected_xpus", "0"))
            place = fluid.XPUPlace(device_id)
        else:
            raise ValueError(
                "fleet dygraph api must in paddlepaddle-xpu or paddlepaddle-gpu."
            )

        exe = fluid.Executor(place)
        exe.run(fluid.default_startup_program())
        eprint(type(self).__name__, "run worker startup program done.")

        feed_var_list = [
            var
            for var in fluid.default_main_program().global_block().vars.values()
            if var.is_data
        ]

        eprint("feed_var_list:", feed_var_list)

        if feed_var_list[0].name == 'label':
            feed_var_list = feed_var_list[::-1]

        feeder = fluid.DataFeeder(feed_var_list, place)
        reader_generator = train_reader()

        def get_data():
            origin_batch = next(reader_generator)
            if (
                paddle.distributed.get_world_size() == 1
                and args.update_method == 'gloo'
            ):  # Gloo single mode
                return origin_batch

            elif args.update_method != "local" and args.use_reader_alloc:
                new_batch = []
                for offset, item in enumerate(origin_batch):
                    if offset % 2 == args.trainer_id:
                        new_batch.append(item)
                return new_batch
            else:
                return origin_batch

        print_to_err(type(self).__name__, "begin to train on trainer")
        out_losses = []
        for i in range(RUN_STEP):
            (loss,) = exe.run(
                fluid.default_main_program(),
                fetch_list=[avg_cost.name],
                feed=feeder.feed(get_data()),
            )
            out_losses.append(loss[0])
            print_to_err(type(self).__name__, "run step %d finished" % i)
        print_to_err(type(self).__name__, "trainer run finished")
        print_to_err(type(self).__name__, "dist losses: {}".format(out_losses))

        sys.stdout.buffer.write(pickle.dumps(out_losses))

    def run_use_fleet_api_trainer(self, args):
        assert args.update_method in ["nccl2", "bkcl"]

        self.lr = args.lr

        exec_strategy = fluid.ExecutionStrategy()
        exec_strategy.num_threads = 1

        dist_strategy = DistributedStrategy()
        dist_strategy.exec_strategy = exec_strategy
        dist_strategy.fuse_memory_size = 1  # MB
        dist_strategy.fuse_laryer_size = 1
        if args.use_local_sgd:
            dist_strategy.use_local_sgd = True
        if args.ut4grad_allreduce:
            dist_strategy._ut4grad_allreduce = True
        if args.sync_batch_norm:
            dist_strategy.sync_batch_norm = True

        role = role_maker.PaddleCloudRoleMaker(is_collective=True)
        fleet.init(role)
        print_to_err("use_fleet", "fleet.node_num:")
        # "fleet.node_id:", fleet.node_id(),
        # "fleet.trainer_num:", fleet.worker_num())

        (
            test_program,
            avg_cost,
            train_reader,
            test_reader,
            batch_acc,
            predict,
        ) = self.get_model(
            batch_size=args.batch_size, dist_strategy=dist_strategy
        )

        trainer_prog = fleet._origin_program
        dist_prog = fleet.main_program

        if fluid.core.is_compiled_with_cuda():
            device_id = int(os.getenv("FLAGS_selected_gpus", "0"))
            place = fluid.CUDAPlace(device_id)
        elif fluid.core.is_compiled_with_xpu():
            device_id = int(os.getenv("FLAGS_selected_xpus", "0"))
            place = fluid.XPUPlace(device_id)
        else:
            raise ValueError(
                "fleet dygraph api must in paddlepaddle-xpu or paddlepaddle-gpu."
            )

        exe = fluid.Executor(place)
        exe.run(fluid.default_startup_program())
        eprint(type(self).__name__, "run worker startup program done.")

        feed_var_list = [
            var
            for var in trainer_prog.global_block().vars.values()
            if var.is_data
        ]

        eprint("feed_var_list:", feed_var_list)

        # temporarily add this code to pass the python35 gcc8 CI
        # FIXME(gongweibao, wangxi): need to fix the fleet api program order
        if feed_var_list[0].name == 'label':
            feed_var_list = feed_var_list[::-1]

        feeder = fluid.DataFeeder(feed_var_list, place)
        reader_generator = train_reader()

        def get_data():
            origin_batch = next(reader_generator)
            if args.update_method != "local" and args.use_reader_alloc:
                new_batch = []
                for offset, item in enumerate(origin_batch):
                    if offset % 2 == args.trainer_id:
                        new_batch.append(item)
                return new_batch
            else:
                return origin_batch

        print_to_err(type(self).__name__, "begin to train on trainer")
        out_losses = []
        for i in range(RUN_STEP):
            (loss,) = exe.run(
                dist_prog,
                fetch_list=[avg_cost.name],
                feed=feeder.feed(get_data()),
            )
            out_losses.append(loss[0])
            print_to_err(type(self).__name__, "run step %d finished" % i)
        print_to_err(type(self).__name__, "trainer run finished")

        sys.stdout.buffer.write(pickle.dumps(out_losses))

        if args.save_model:
            model_save_dir = "/tmp"
            if fleet.worker_index() == 0:
                model_save_dir_fluid = os.path.join(
                    model_save_dir, "fluid_persistables"
                )
                model_save_dir_fleet = os.path.join(
                    model_save_dir, "fleet_persistables"
                )
                infer_save_dir_fluid = os.path.join(
                    model_save_dir, "fluid_infer"
                )
                infer_save_dir_fleet = os.path.join(
                    model_save_dir, "fleet_infer"
                )
            else:
                model_save_dir_fluid = os.path.join(
                    model_save_dir, "fluid_persistables_2"
                )
                model_save_dir_fleet = os.path.join(
                    model_save_dir, "fleet_persistables_2"
                )
                infer_save_dir_fluid = os.path.join(
                    model_save_dir, "fluid_infer_2"
                )
                infer_save_dir_fleet = os.path.join(
                    model_save_dir, "fleet_infer_2"
                )
            paddle.distributed.io.save_persistables(
                exe, model_save_dir_fluid, fleet._origin_program
            )
            fleet.save_persistables(executor=exe, dirname=model_save_dir_fleet)
            feeded_var_names = [var.name for var in feed_var_list]
            fluid.io.save_inference_model(
                infer_save_dir_fluid,
                feeded_var_names,
                [avg_cost],
                exe,
                fleet._origin_program,
            )
            fleet.save_inference_model(
                exe, infer_save_dir_fleet, feeded_var_names, [avg_cost]
            )

    def run_trainer(self, args):
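        """Run a single trainer: build the model, transpile it for the
        selected update method, compile with data parallelism, and dump
        RUN_STEP losses to stdout."""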
        self.lr = args.lr
        if args.nccl2_reduce_layer_local_run:
            (
                test_program,
                avg_cost,
                train_reader,
                test_reader,
                batch_acc,
                predict,
            ) = self.get_model(batch_size=args.batch_size, single_device=True)
        elif args.use_dgc:
            (
                test_program,
                avg_cost,
                train_reader,
                test_reader,
                batch_acc,
                predict,
            ) = self.get_model(batch_size=args.batch_size, use_dgc=args.use_dgc)
        else:
            (
                test_program,
                avg_cost,
                train_reader,
                test_reader,
                batch_acc,
                predict,
            ) = self.get_model(batch_size=args.batch_size)

        if args.update_method == "pserver":
            print_to_err(
                type(self).__name__,
                "begin to run transpile on trainer with pserver mode",
            )
            t = self.get_transpiler(
                trainer_id=args.trainer_id,
                main_program=fluid.default_main_program(),
                pserver_endpoints=args.endpoints,
                trainers=args.trainers,
                sync_mode=args.sync_mode,
                dc_asgd=args.dc_asgd,
                hogwild_mode=args.hogwild,
            )

            trainer_prog = t.get_trainer_program()
            print_to_err(
                type(self).__name__,
                "get trainer program done with pserver mode.",
            )
        elif (
            args.update_method == "nccl2"
            or args.update_method == "nccl2_reduce_layer"
        ):
            # transpile for nccl2
            config = paddle.distributed.transpiler.DistributeTranspilerConfig()
            config.mode = "nccl2"
            config.nccl_comm_num = args.nccl_comm_num
            if args.use_hallreduce:
                config.use_hierarchical_allreduce = True
                config.hierarchical_allreduce_inter_nranks = (
                    args.hallreduce_inter_nranks
                )
            print_to_err(
                type(self).__name__,
                "begin to run transpile on trainer with nccl2 mode",
            )
            nccl2_t = paddle.distributed.transpiler.DistributeTranspiler(
                config=config
            )
            nccl2_t.transpile(
                args.trainer_id,
                program=fluid.default_main_program(),
                startup_program=fluid.default_startup_program(),
                trainers=args.endpoints,
                current_endpoint=args.current_endpoint,
            )
            print_to_err(
                type(self).__name__, "get trainer program done. with nccl2 mode"
            )
            trainer_prog = fluid.default_main_program()
        else:
            print_to_err(
                type(self).__name__,
                "do nothing about main program, just use it",
            )
            trainer_prog = fluid.default_main_program()
            print_to_err(type(self).__name__, "use main program done.")

        # FIXME(gongwb): wait for pserver initialization.
        time.sleep(1)

        if args.use_cuda:
            device_id = int(os.getenv("FLAGS_selected_gpus", "0"))
            place = fluid.CUDAPlace(device_id)
        else:
            place = fluid.CPUPlace()

        exe = fluid.Executor(place)
        exe.run(fluid.default_startup_program())
        print_to_err(type(self).__name__, "run worker startup program done.")

        exec_strategy = fluid.ExecutionStrategy()
        exec_strategy.num_threads = 1

        build_stra = fluid.BuildStrategy()
        # FIXME: force-disable enable_inplace and memory_optimize
        build_stra.enable_inplace = False
        build_stra.memory_optimize = False

        if args.fuse_all_reduce is not None:
            sys.stderr.write('fuse_all_reduce={}'.format(args.fuse_all_reduce))
            build_stra.fuse_all_reduce_ops = args.fuse_all_reduce

        if args.hogwild:
            build_stra.async_mode = True

        if args.enable_backward_deps:
            build_stra.enable_backward_optimizer_op_deps = True

        if args.use_reduce:
            build_stra.reduce_strategy = (
                fluid.BuildStrategy.ReduceStrategy.Reduce
            )
        else:
            build_stra.reduce_strategy = (
                fluid.BuildStrategy.ReduceStrategy.AllReduce
            )

        pass_builder = None
        if args.batch_merge_repeat > 1:
            pass_builder = build_stra._finalize_strategy_and_create_passes()
            mypass = pass_builder.insert_pass(0, "multi_batch_merge_pass")
            mypass.set("num_repeats", args.batch_merge_repeat)

        if (
            args.update_method == "nccl2"
            or args.update_method == "nccl2_reduce_layer"
        ):
            build_stra.num_trainers = len(args.endpoints.split(","))
            build_stra.trainer_id = args.trainer_id
        else:
            # case args.update_method == "nccl2_reduce_layer":
            build_stra.num_trainers = 1
            build_stra.trainer_id = 0

        print_to_err(type(self).__name__, "begin to compile with data parallel")
        binary = compiler.CompiledProgram(trainer_prog).with_data_parallel(
            loss_name=avg_cost.name,
            build_strategy=build_stra,
            exec_strategy=exec_strategy,
        )
        print_to_err(type(self).__name__, "program compiled with data parallel")

        feed_var_list = [
            var
            for var in trainer_prog.global_block().vars.values()
            if var.is_data
        ]

        feeder = fluid.DataFeeder(feed_var_list, place)
        reader_generator = train_reader()

        def get_data():
            origin_batch = next(reader_generator)
            if args.update_method != "local" and args.use_reader_alloc:
                new_batch = []
                for offset, item in enumerate(origin_batch):
                    if offset % 2 == args.trainer_id:
                        new_batch.append(item)
                return new_batch
            else:
                return origin_batch

        lr_scheduler = self.get_lr_scheduler(trainer_prog)
        print_to_err(type(self).__name__, "begin to train on trainer")
        out_losses = []
        for i in range(RUN_STEP):
            (loss,) = exe.run(
                binary, fetch_list=[avg_cost.name], feed=feeder.feed(get_data())
            )
            out_losses.append(loss[0])
            print_to_err(type(self).__name__, "run step %d finished" % i)
            if lr_scheduler is not None:
                lr_scheduler.step()

        print_to_err(type(self).__name__, "trainer run finished")

        print_to_out(out_losses)


class TestParallelDyGraphRunnerBase:
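    """Base runner for the dygraph (imperative) tests.

    Subclasses implement get_model() and run_one_loop(); the run_trainer*
    methods below drive RUN_STEP iterations under the selected parallel
    backend.
    """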
    def get_model(self):
        raise NotImplementedError(
            "get_model should be implemented by child classes."
        )

    def run_one_loop(self, model, opt, data):
        raise NotImplementedError(
            "train_one_loop should be implemented by the child classes."
        )

    def _get_data(self, batch, args):
        if (
            paddle.distributed.get_world_size() == 1
            and args.update_method == 'gloo'
        ):  # Gloo single mode
            return batch
        elif args.update_method != "local":
            new_batch = []

            # NOTE(@xiongkun03) args.diff_batch means batch length is different:
            # such as : batch = [2,3,4,5], then the first rank will get [2]  and
            # the second rank will get [3,4,5].
            # this function is for test sparse_embedding_differ_length
            if hasattr(args, "diff_batch") and args.diff_batch:
                assert (
                    len(batch) > 2
                ), "in differ_batch mode, len(batch) must > 2."
                if paddle.distributed.get_rank() == 0:
                    new_batch.append(batch[0])
                elif paddle.distributed.get_rank() == 1:
                    new_batch.extend([_ for _ in batch[1:]])
                else:
                    raise NotImplementedError(
                        "Current TestParallelDyGraphRunnerBase don't support world_size > 2"
                    )
                return new_batch
            else:
                for offset, item in enumerate(batch):
                    if offset % 2 == args.trainer_id:
                        new_batch.append(item)
                return new_batch
        else:
            return batch

    def run_trainer(self, args):
        seed = 90
        if args.update_method == 'gloo':
            place = fluid.CPUPlace()
        elif fluid.core.is_compiled_with_cuda():
            device_id = int(os.getenv("FLAGS_selected_gpus", "0"))
            place = fluid.CUDAPlace(device_id)
        elif fluid.core.is_compiled_with_xpu():
            device_id = int(os.getenv("FLAGS_selected_xpus", "0"))
            place = fluid.XPUPlace(device_id)
        elif fluid.core.is_compiled_with_npu():
            device_id = int(os.getenv("FLAGS_selected_npus", "0"))
            place = fluid.NPUPlace(device_id)
        elif fluid.core.is_compiled_with_mlu():
            device_id = int(os.getenv("FLAGS_selected_mlus", "0"))
            place = fluid.MLUPlace(device_id)
        else:
            raise RuntimeError(
                "Only support CUDAPlace or XPUPlace or CPU(Gloo) for now."
            )

        with fluid.dygraph.guard(place):
            fluid.default_startup_program().random_seed = seed
            fluid.default_main_program().random_seed = seed
            np.random.seed(seed)
            import random

            random.seed(seed)
            model, train_reader, opt = self.get_model()
            nranks = len(args.endpoints.split(",")) if args.endpoints else 1

            # if args.update_method == "nccl2":
            if (
                args.update_method == "nccl2"
                or args.update_method == "bkcl"
                or args.update_method == "hccl"
                or args.update_method == "cncl"
            ):
                strategy = dygraph.parallel.ParallelStrategy()
                strategy.nranks = nranks
                strategy.local_rank = args.trainer_id
                strategy.trainer_endpoints = args.endpoints.split(",")
                strategy.current_endpoint = args.current_endpoint
                paddle.distributed.init_parallel_env()
                print_to_err(
                    type(self).__name__,
                    "begin to prepare context in dygraph with nccl2",
                )
                if not args.find_unused_parameters:
                    model = dygraph.parallel.DataParallel(
                        model, strategy, find_unused_parameters=False
                    )
                else:
                    model = dygraph.parallel.DataParallel(
                        model, strategy, find_unused_parameters=True
                    )
                print_to_err(type(self).__name__, "model built in dygraph")

            elif args.update_method == "gloo":
                paddle.distributed.init_parallel_env()
                if not args.find_unused_parameters:
                    model = dygraph.parallel.DataParallel(
                        model, find_unused_parameters=False
                    )
                else:
                    model = dygraph.parallel.DataParallel(
                        model, find_unused_parameters=True
                    )

            out_losses = []
            print_to_err(type(self).__name__, "begin to run dygraph training")
            for step_id, data in enumerate(train_reader()):
                data = self._get_data(data, args)
                if step_id == RUN_STEP:
                    break
                loss = self.run_one_loop(model, opt, data)
                if step_id % 10 == 0:
                    print_to_err(
                        type(self).__name__,
                        "loss at step %d: %f" % (step_id, loss.numpy()),
                    )
                out_losses.append(loss.numpy())

                loss.backward()

                opt.minimize(loss)
                if not args.accumulate_gradient:
                    model.clear_gradients()
        print_to_out(out_losses)

    def run_trainer_with_spawn(self, args):
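        """Variant used by the paddle.distributed.spawn tests: it initializes
        the parallel env inside the spawned process and returns the losses to
        the caller instead of pickling them to stdout."""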
        # 1. enable dygraph
        paddle.disable_static()

        # 2. init seed
        seed = 90
        paddle.static.default_startup_program().random_seed = seed
        paddle.static.default_main_program().random_seed = seed
        np.random.seed(seed)
        random.seed(seed)
        # get trainer id
        paddle.distributed.parallel._get_global_parallel_env()
        args.trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))

        # 3. init parallel env
        if args.update_method in ["nccl2", "gloo"]:
            paddle.distributed.init_parallel_env()

        # 4. train model
        model, train_reader, opt = self.get_model()
        if args.update_method in ["nccl2", "gloo"]:
            model = paddle.DataParallel(
                model, find_unused_parameters=args.find_unused_parameters
            )

        out_losses = []
        for step_id, data in enumerate(train_reader()):
            data = self._get_data(data, args)
            if step_id == RUN_STEP:
                break
            loss = self.run_one_loop(model, opt, data)
            out_losses.append(loss.numpy())

            loss.backward()

            opt.minimize(loss)
            model.clear_gradients()
        return out_losses

    def run_use_fleet_api_trainer(self, args):
        import paddle.distributed.fleet as fleet

        # 1. enable dygraph
        paddle.disable_static()

        # 2. init seed
        seed = 90
        paddle.static.default_startup_program().random_seed = seed
        paddle.static.default_main_program().random_seed = seed
        np.random.seed(seed)
        random.seed(seed)
        # get trainer id
        paddle.distributed.parallel._get_global_parallel_env()
        args.trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))

        # set strategy
        strategy = fleet.DistributedStrategy()
        if args.find_unused_parameters:
            strategy.find_unused_parameters = True

        # 3. init parallel env
        if args.update_method in ["nccl2", "bkcl", "hccl"]:
            fleet.init(is_collective=True, strategy=strategy)

        # 4. train model
        model, train_reader, opt = self.get_model()
        if args.update_method in ["nccl2", "bkcl", "hccl"]:
            opt = fleet.distributed_optimizer(opt)
            model = fleet.distributed_model(model)

        out_losses = []
        for step_id, data in enumerate(train_reader()):
            data = self._get_data(data, args)
            if step_id == RUN_STEP:
                break
            loss = self.run_one_loop(model, opt, data)
            out_losses.append(loss.numpy())

            loss.backward()

            opt.step()
            if not args.accumulate_gradient:
                opt.clear_grad()
        print_to_out(out_losses)


def runtime_main(test_class):
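    """Command-line entry point for the spawned worker processes: parse the
    flags below and dispatch to the matching run_* method of test_class."""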
    parser = argparse.ArgumentParser(description='Run dist test.')
    parser.add_argument(
        '--role', type=str, required=True, choices=['pserver', 'trainer']
    )
    parser.add_argument('--endpoints', type=str, required=False, default="")
    parser.add_argument(
        '--update_method',
        type=str,
        default="local",
        choices=[
            "pserver",
            "nccl2",
            "bkcl",
            "local",
            "nccl2_reduce_layer",
            "gloo",
            "hccl",
            "cncl",
        ],
    )
    parser.add_argument('--trainer_id', type=int, required=False, default=0)
    parser.add_argument('--trainers', type=int, required=False, default=1)
    parser.add_argument('--nccl_comm_num', type=int, required=False, default=1)
    parser.add_argument('--enable_backward_deps', action='store_true')
    parser.add_argument('--use_hallreduce', action='store_true')
    parser.add_argument('--use_pipeline', action='store_true')
    parser.add_argument('--use_fleet_api', action='store_true')
    parser.add_argument('--use_fleet_api_20', action='store_true')
    parser.add_argument('--use_local_sgd', action='store_true')
    parser.add_argument('--diff_batch', action='store_true')
    parser.add_argument('--ut4grad_allreduce', action='store_true')
    parser.add_argument(
        '--hallreduce_inter_nranks', type=int, required=False, default=2
    )
    parser.add_argument(
        '--current_endpoint', type=str, required=False, default=""
    )
    parser.add_argument('--sync_mode', action='store_true')
    parser.add_argument('--use_cuda', action='store_true')
    parser.add_argument('--use_cpu', action='store_true')
    parser.add_argument('--use_xpu', action='store_true')
    parser.add_argument('--use_dgc', action='store_true')
    parser.add_argument('--use_npu', action='store_true')
    parser.add_argument('--use_mlu', action='store_true')
    parser.add_argument('--accumulate_gradient', action='store_true')
    parser.add_argument('--find_unused_parameters', action='store_true')
    parser.add_argument('--use_reduce', action='store_true')
    parser.add_argument('--dc_asgd', action='store_true')
    parser.add_argument('--hogwild', action='store_true')
    parser.add_argument('--save_model', action='store_true')
    parser.add_argument(
        '--use_reader_alloc', action='store_true', required=False
    )
    parser.add_argument('--batch_size', required=False, type=int, default=2)
    parser.add_argument('--lr', required=False, type=float, default=0.001)
    parser.add_argument(
        '--batch_merge_repeat', required=False, type=int, default=1
    )
    parser.add_argument(
        '--nccl2_reduce_layer_local_run',
        required=False,
        type=bool,
        default=False,
    )
    parser.add_argument('--sync_batch_norm', action='store_true')
    parser.add_argument(
        '--fuse_all_reduce', required=False, type=ast.literal_eval, default=None
    )

    args = parser.parse_args()

    if args.update_method == 'gloo':
        paddle.set_device("cpu")

    model = test_class()
    if args.role == "pserver" and args.update_method == "pserver":
        model.run_pserver(args)
    elif args.use_fleet_api:
        model.run_use_fleet_api_trainer(args)
    elif args.use_fleet_api_20:
        model.run_use_fleet_api_20_trainer(args)
    elif args.use_pipeline:
        model.run_pipeline_trainer(args)
    else:
        model.run_trainer(args)

import socket
from contextlib import closing

class TestDistBase(unittest.TestCase):
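    """Driver used by the concrete unit tests: it assembles the worker command
    lines, spawns the local / pserver / collective subprocesses and collects
    the losses they pickle to stdout."""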
    def _setup_config(self):
        raise NotImplementedError("tests should have _setup_config implemented")

    def _after_setup_config(self):
        if self._enforce_place == "CPU":
            self.__use_cuda = False
            self.__use_xpu = False
            self._use_dgc = False
            self.__use_npu = False
            self._use_mlu = False
        elif self._enforce_place == "GPU":
            self.__use_cuda = True
            self.__use_xpu = False
            self.__use_npu = False
            self._use_mlu = False
        elif self._enforce_place == "XPU":
            self.__use_cuda = False
            self.__use_xpu = True
            self._use_dgc = False
            self.__use_npu = False
            self._use_mlu = False
        elif self._enforce_place == "NPU":
            self.__use_cuda = False
            self.__use_xpu = False
            self._use_dgc = False
            self.__use_npu = True
            self._use_mlu = False
        elif self._enforce_place == "MLU":
            self.__use_cuda = False
            self.__use_xpu = False
            self._use_dgc = False
            self.__use_npu = False
            self._use_mlu = True
        else:
            if fluid.core.is_compiled_with_cuda():
                self.__use_cuda = True
            else:
                self.__use_cuda = False
                self._use_dgc = False

        if self._use_reduce:
            assert not self._use_dgc
    def setUp(self):
        self._trainers = 2
        self._pservers = 2
        self._port_set = set()
        self._python_interp = sys.executable
        self._sync_mode = True
        self._hogwild_mode = False
        self._enforce_place = None
        self._use_reduce = False
        self._dc_asgd = False  # must use with async mode
        self._use_reader_alloc = True
        self._nccl2_mode = False
        self._bkcl_mode = False
        self._gloo_mode = False  # the gloo backend is supported now
        self._hccl_mode = False
        self._cncl_mode = False
        self._pipeline_mode = False
        self._mp_mode = False
        self._diff_batch = False
        # FIXME(typhoonzero): I added this stupid argument to enable testing
        # allreduce layers, which users can call layers.allreduce to accumulate
        # tensors anywhere. Find a better way to do this test, and reduce the
        # checks of this argument everywhere.
        self._nccl2_reduce_layer = False
        self._lr = 0.001
        self._use_dgc = False
        self._dygraph = False
        self._nccl_comm_num = 1
        self._enable_backward_deps = False
        self._use_fleet_api = False
        self._use_fleet_api_20 = False
        self._use_local_sgd = False
        self._ut4grad_allreduce = False
        self._use_hallreduce = False
        self._save_model = False
        self._fuse_all_reduce = None
        self._accumulate_gradient = False
        self._find_unused_parameters = False
        self._setup_config()

        global DIST_UT_PORT
        if DIST_UT_PORT == 0 and os.getenv("PADDLE_DIST_UT_PORT"):
            DIST_UT_PORT = int(os.getenv("PADDLE_DIST_UT_PORT"))

        if DIST_UT_PORT == 0:
            self._ps_endpoints = "127.0.0.1:%s,127.0.0.1:%s" % (
                self._find_free_port(),
                self._find_free_port(),
            )
        else:
            self._ps_endpoints = "127.0.0.1:%s,127.0.0.1:%s" % (
                DIST_UT_PORT,
                DIST_UT_PORT + 1,
            )
            DIST_UT_PORT += 2
            self._dist_port = DIST_UT_PORT

        self._after_setup_config()

        self.temp_dir = tempfile.TemporaryDirectory()

    def tearDown(self):
        self.temp_dir.cleanup()

    def _find_free_port(self):
        def __free_port():
            with closing(
                socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            ) as s:
                s.bind(('', 0))
                print_to_err(
                    type(self).__name__, "socket name: %s" % s.getsockname()[1]
                )
                return s.getsockname()[1]

        while True:
            port = __free_port()
            if port not in self._port_set:
                self._port_set.add(port)
                return port
    def start_pserver(
        self, model_file, check_error_log, required_envs, log_name=""
    ):
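        """Launch the two parameter-server subprocesses and return their
        handles together with the files capturing their stderr."""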
        ps0_ep, ps1_ep = self._ps_endpoints.split(",")
        ps_cmd = "%s"

        if os.getenv('WITH_COVERAGE', 'OFF') == 'ON':
            required_envs['COVERAGE_FILE'] = os.getenv('COVERAGE_FILE', '')
            ps_cmd += " -m coverage run --branch -p"

        ps_cmd += " %s --role pserver --endpoints %s --trainer_id 0 --current_endpoint %s --trainers %d --update_method pserver"

        ps0_cmd = ps_cmd % (
            self._python_interp,
            model_file,
            self._ps_endpoints,
            ps0_ep,
            self._trainers,
        )
        ps1_cmd = ps_cmd % (
            self._python_interp,
            model_file,
            self._ps_endpoints,
            ps1_ep,
            self._trainers,
        )

        if self._sync_mode:
            ps0_cmd += " --sync_mode"
            ps1_cmd += " --sync_mode"
        print(ps0_cmd)
        print(ps1_cmd)
        path0 = os.path.join(self.temp_dir.name, log_name + "_ps0_err.log")
        path1 = os.path.join(self.temp_dir.name, log_name + "_ps1_err.log")
        ps0_pipe = open(path0, "wb")
        ps1_pipe = open(path1, "wb")

        print_to_err(type(self).__name__, "going to start pserver process 0")
        ps0_proc = subprocess.Popen(
            ps0_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=ps0_pipe,
            env=required_envs,
        )
        print_to_err(type(self).__name__, "going to start pserver process 1")
        ps1_proc = subprocess.Popen(
            ps1_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=ps1_pipe,
            env=required_envs,
        )

        return ps0_proc, ps1_proc, ps0_pipe, ps1_pipe

    def _run_local(
        self,
        model,
        envs,
        check_error_log=False,
        batch_size=DEFAULT_BATCH_SIZE,
        batch_merge_repeat=1,
        log_name="",
        devices="1",
    ):
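        """Run the model in a single local process (update_method=local) and
        return its pickled losses as the reference result."""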
        cmd = self._python_interp

        if os.getenv('WITH_COVERAGE', 'OFF') == 'ON':
            envs['COVERAGE_FILE'] = os.getenv('COVERAGE_FILE', '')
            cmd += " -m coverage run --branch -p"

        cmd += " %s --role trainer --update_method local --lr %f" % (
            model,
            self._lr,
        )

        if batch_size != DEFAULT_BATCH_SIZE:
            cmd += " --batch_size %d" % batch_size
        if batch_merge_repeat > 1:
            cmd += " --batch_merge_repeat %d" % batch_merge_repeat
        if self._nccl2_reduce_layer:
            cmd += " --nccl2_reduce_layer_local_run 1"

        if self.__use_cuda:
            cmd += " --use_cuda"
            env_local = {
                "CUDA_VISIBLE_DEVICES": devices,
                "PADDLE_TRAINERS_NUM": "1",
                "PADDLE_TRAINER_ID": "0",
            }
        elif self.__use_xpu:
            cmd += " --use_xpu"
            env_local = {
                "FLAGS_selected_xpus": devices,
                "PADDLE_TRAINERS_NUM": "1",
                "PADDLE_TRAINER_ID": "0",
            }
        elif self.__use_npu:
            cmd += " --use_npu"
            env_local = {
                "FLAGS_selected_npus": devices,
                "PADDLE_TRAINERS_NUM": "1",
                "PADDLE_TRAINER_ID": "0",
            }
        else:
            env_local = {'CPU_NUM': '1'}

        # do not use dgc on a single card
        if len(devices) > 1 and self._use_dgc:
            cmd += " --use_dgc"

        if self._accumulate_gradient:
            cmd += " --accumulate_gradient"

        if self._find_unused_parameters:
            cmd += " --find_unused_parameters"

        env_local.update(envs)
        print("local_cmd: {}, env: {}".format(cmd, env_local))
        if check_error_log:
            path = os.path.join(self.temp_dir.name, log_name + "_local.log")
            err_log = open(path, "wb")
            local_proc = subprocess.Popen(
                cmd.split(" "),
                stdout=subprocess.PIPE,
                stderr=err_log,
                env=env_local,
            )
        else:
            local_proc = subprocess.Popen(
                cmd.split(" "),
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                env=env_local,
            )
        local_out, local_err = local_proc.communicate()

        if check_error_log:
            err_log.close()

        sys.stderr.write('local_stderr: %s\n' % local_err)
        sys.stderr.write('local_stdout: %s\n' % pickle.loads(local_out))

        return pickle.loads(local_out)

    def _run_local_gloo(
        self,
        model,
        envs,
        check_error_log=False,
        batch_size=DEFAULT_BATCH_SIZE,
        batch_merge_repeat=1,
        log_name="",
        devices="0",
    ):
        saved_endpoints = self._ps_endpoints
        self._ps_endpoints = self._ps_endpoints.split(',')[0]
        result = self._run_cluster_gloo(
            model, envs, 'gloo', check_error_log, log_name
        )
        self._ps_endpoints = saved_endpoints
        return result

    def _run_cluster(self, model, envs, check_error_log, log_name):
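        """Run the pserver-mode cluster (two pservers plus two trainers) and
        return the losses reported by both trainers."""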
        # Run dist train to compare with local results
        ps0, ps1, ps0_pipe, ps1_pipe = self.start_pserver(
            model, check_error_log, envs, log_name=log_name
        )

        ps0_ep, ps1_ep = self._ps_endpoints.split(",")

        tr_cmd = "%s"

        if os.getenv('WITH_COVERAGE', 'OFF') == 'ON':
            envs['COVERAGE_FILE'] = os.getenv('COVERAGE_FILE', '')
            tr_cmd += " -m coverage run --branch -p"

        tr_cmd += " %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --trainers %d --update_method pserver --lr %f"

1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228
        tr0_cmd = tr_cmd % (
            self._python_interp,
            model,
            self._ps_endpoints,
            0,
            ps0_ep,
            self._trainers,
            self._lr,
        )
        tr1_cmd = tr_cmd % (
            self._python_interp,
            model,
            self._ps_endpoints,
            1,
            ps1_ep,
            self._trainers,
            self._lr,
        )

        if self._sync_mode:
            tr0_cmd += " --sync_mode"
            tr1_cmd += " --sync_mode"
        if self._hogwild_mode:
            tr0_cmd += " --hogwild"
            tr1_cmd += " --hogwild"
        if self._use_reduce:
            tr0_cmd += " --use_reduce"
            tr1_cmd += " --use_reduce"
        if self._use_reader_alloc:
            tr0_cmd += " --use_reader_alloc"
            tr1_cmd += " --use_reader_alloc"
        if self.__use_cuda:
            tr0_cmd += " --use_cuda"
            tr1_cmd += " --use_cuda"
            env0 = {"CUDA_VISIBLE_DEVICES": "0"}
            env1 = {"CUDA_VISIBLE_DEVICES": "1"}
        else:
            env0 = {'CPU_NUM': '1'}
            env1 = {'CPU_NUM': '1'}

        env0.update(envs)
        env1.update(envs)
        print("tr0_cmd: {}, env: {}".format(tr0_cmd, env0))
        print("tr1_cmd: {}, env: {}".format(tr1_cmd, env1))

        path0 = os.path.join(self.temp_dir.name, log_name + "_tr0_err.log")
        path1 = os.path.join(self.temp_dir.name, log_name + "_tr1_err.log")
        tr0_pipe = open(path0, "wb")
        tr1_pipe = open(path1, "wb")
        print_to_err(type(self).__name__, "going to start trainer process 0")
        tr0_proc = subprocess.Popen(
            tr0_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=tr0_pipe,
            env=env0,
        )
        print_to_err(type(self).__name__, "going to start trainer process 1")
        tr1_proc = subprocess.Popen(
            tr1_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=tr1_pipe,
            env=env1,
        )
        # Wait until trainer process terminate
        while True:
            stat0 = tr0_proc.poll()
            time.sleep(0.1)
            if stat0 is not None:
                break
        while True:
            stat1 = tr1_proc.poll()
            time.sleep(0.1)
            if stat1 is not None:
                break

        tr0_out, tr0_err = tr0_proc.communicate()
        tr1_out, tr1_err = tr1_proc.communicate()

        # close trainer file
        tr0_pipe.close()
        tr1_pipe.close()
        ps0_pipe.close()
        ps1_pipe.close()
        ps0.terminate()
        ps1.terminate()
        return pickle.loads(tr0_out), pickle.loads(tr1_out)

    def _get_gloo_trainer_cmd(
        self, model, ep, update_method, trainer_id, trainer_num
    ):
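        """Build the command line and environment for one gloo (CPU) trainer
        process."""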
        env = {}
        tr_cmd = "%s -u"

        if os.getenv('WITH_COVERAGE', 'OFF') == 'ON':
            tr_cmd += " -m coverage run --branch -p"

        tr_cmd += " %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --update_method %s --lr %f"

        tr_cmd = tr_cmd % (
            self._python_interp,
            model,
            self._ps_endpoints,
            trainer_id,
            ep,
            update_method,
            self._lr,
        )

        if self._use_reduce:
            tr_cmd += " --use_reduce"
        if self._use_reader_alloc:
            tr_cmd += " --use_reader_alloc"
        # assert self._use_reduce == False, "gloo not support _use_reduce"
        # assert self._use_reader_alloc == False, "gloo not support _use_reduce"
        if self._save_model:
            tr_cmd += " --save_model"
        if self._diff_batch:
            tr_cmd += " --diff_batch"
        self.__use_cuda = False
        self.__use_xpu = False
        assert not self.__use_cuda, "gloo not support use cuda"
        assert not self.__use_xpu, "gloo not support use xpu"
        tr_cmd += " --use_cpu"
        env.update(
            {
                "PADDLE_TRAINERS_NUM": "{}".format(trainer_num),
                "PADDLE_TRAINER_ID": "{}".format(trainer_id),
                "PADDLE_TRAINER_ENDPOINTS": self._ps_endpoints,
                "PADDLE_CURRENT_ENDPOINT": ep,
                "PADDLE_CURRENT_ENDPOINT": ep,
                "PADDLE_DISTRI_BACKEND": "gloo",
                "GLOG_v": "2",
            }
        )

        assert not self._use_dgc, "gloo not support use dgc"

        if self._accumulate_gradient:
            tr_cmd += " --accumulate_gradient"

        if self._find_unused_parameters:
            tr_cmd += " --find_unused_parameters"

        assert not self._pipeline_mode, "gloo not support use pipeline"

        if self._enable_backward_deps:  # build strategy, save it
            tr_cmd += " --enable_backward_deps"

        if self._fuse_all_reduce is not None:
            tr_cmd += " --fuse_all_reduce {}".format(self._fuse_all_reduce)

        assert not self._use_fleet_api, "gloo not support use fleet api"
        assert not self._use_fleet_api_20, "gloo not support use fleet api"
        return tr_cmd, env

    def _get_nccl2_trainer_cmd(
        self, model, ep, update_method, trainer_id, trainer_num
    ):
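        """Build the command line and environment for one collective trainer
        process; device-specific env vars (GPU/XPU/NPU/MLU) are set below."""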
        env = {}
        tr_cmd = "%s -u"

        if os.getenv('WITH_COVERAGE', 'OFF') == 'ON':
            tr_cmd += " -m coverage run --branch -p"

        tr_cmd += " %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --update_method %s --lr %f"

        tr_cmd = tr_cmd % (
            self._python_interp,
            model,
            self._ps_endpoints,
            trainer_id,
            ep,
            update_method,
            self._lr,
        )

        if self._use_reduce:
            tr_cmd += " --use_reduce"
        if self._use_reader_alloc:
            tr_cmd += " --use_reader_alloc"
        if self._save_model:
            tr_cmd += " --save_model"
        if self.__use_cuda:
            tr_cmd += " --use_cuda"
            env.update(
                {
                    "FLAGS_selected_gpus": "{}".format(0),
                    "CUDA_VISIBLE_DEVICES": "{}".format(trainer_id),
                    "PADDLE_TRAINERS_NUM": "{}".format(trainer_num),
                    "PADDLE_TRAINER_ID": "{}".format(trainer_id),
                    "PADDLE_TRAINER_ENDPOINTS": self._ps_endpoints,
                    "PADDLE_CURRENT_ENDPOINT": ep,
                }
            )
1410 1411 1412 1413
        # TODO(liuyuhui):XPU_VISIBLE_DEVICES is not working right now,
        # will update it after Badiu Kunlun partners' support.
        elif self.__use_xpu:
            tr_cmd += " --use_xpu"
1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424
            env.update(
                {
                    "FLAGS_selected_xpus": "{}".format(trainer_id),
                    # "XPU_VISIBLE_DEVICES": "{}".format(trainer_id + 1),
                    "PADDLE_TRAINERS_NUM": "{}".format(trainer_num),
                    "PADDLE_TRAINER_ID": "{}".format(trainer_id),
                    "PADDLE_TRAINER_ENDPOINTS": self._ps_endpoints,
                    "PADDLE_CURRENT_ENDPOINT": ep,
                    "GLOG_v": "2",
                }
            )
1425 1426
        elif self.__use_npu:
            tr_cmd += " --use_npu"
1427 1428 1429 1430 1431 1432 1433 1434 1435 1436
            env.update(
                {
                    "FLAGS_selected_npus": "{}".format(trainer_id),
                    "PADDLE_TRAINERS_NUM": "{}".format(trainer_num),
                    "PADDLE_TRAINER_ID": "{}".format(trainer_id),
                    "PADDLE_TRAINER_ENDPOINTS": self._ps_endpoints,
                    "PADDLE_CURRENT_ENDPOINT": ep,
                    "GLOG_v": "2",
                }
            )
1437 1438
        elif self._use_mlu:
            tr_cmd += " --use_mlu"
1439 1440 1441 1442 1443 1444 1445 1446 1447 1448
            env.update(
                {
                    "FLAGS_selected_mlus": "{}".format(trainer_id),
                    "PADDLE_TRAINERS_NUM": "{}".format(trainer_num),
                    "PADDLE_TRAINER_ID": "{}".format(trainer_id),
                    "PADDLE_TRAINER_ENDPOINTS": self._ps_endpoints,
                    "PADDLE_CURRENT_ENDPOINT": ep,
                    "GLOG_v": "4",
                }
            )
W
Wu Yi 已提交
1449
        else:
1450
            env.update({'CPU_NUM': '1'})
W
Wu Yi 已提交
1451

1452
        if self._use_dgc:
1453 1454
            tr_cmd += " --use_dgc"

1455 1456 1457
        if self._accumulate_gradient:
            tr_cmd += " --accumulate_gradient"

1458 1459 1460
        if self._find_unused_parameters:
            tr_cmd += " --find_unused_parameters"

1461 1462
        if self._pipeline_mode:
            tr_cmd += " --use_pipeline"
1463
        if self._mp_mode:
W
WangXi 已提交
1464
            env = {"FLAGS_selected_gpus": "{}".format(trainer_id)}
1465 1466

        if self._nccl_comm_num > 1:
1467
            tr_cmd += " --nccl_comm_num {}".format(self._nccl_comm_num)
1468

1469 1470
        if self._use_hallreduce:
            tr_cmd += " --use_hallreduce --hallreduce_inter_nranks 2"
1471

1472
        if self._enable_backward_deps:
1473
            tr_cmd += " --enable_backward_deps"
1474

1475 1476 1477
        if self._fuse_all_reduce is not None:
            tr_cmd += " --fuse_all_reduce {}".format(self._fuse_all_reduce)

1478
        if self._use_fleet_api:
1479 1480 1481 1482 1483
            tr_cmd += (
                " --use_fleet_api_20"
                if self._use_fleet_api_20
                else " --use_fleet_api"
            )
1484 1485 1486 1487
            if self._use_local_sgd:
                tr_cmd += " --use_local_sgd"
            if self._ut4grad_allreduce:
                tr_cmd += " --ut4grad_allreduce"
1488 1489
            if hasattr(self, '_sync_batch_norm') and self._sync_batch_norm:
                tr_cmd += " --sync_batch_norm"
1490

1491 1492 1493
        if os.getenv('WITH_COVERAGE', 'OFF') == 'ON':
            env['COVERAGE_FILE'] = os.getenv('COVERAGE_FILE', '')

1494
        return tr_cmd, env
W
Wu Yi 已提交
1495

1496 1497 1498 1499 1500 1501 1502 1503 1504 1505
    def _run_cluster_gloo(
        self, model, envs, update_method, check_error_log, log_name
    ):
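        # Launch one gloo (CPU) trainer subprocess per endpoint, wait for all
        # of them, and return the unpickled losses reported on stdout.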
        assert update_method == "gloo", (
            "_run_cluster_gloo must have update_method: gloo, but get %s"
            % update_method
        )
        assert (
            not self._use_hallreduce
        ), "_run_cluster_gloo must have _use_hallreduce = false"

        worker_endpoints = self._ps_endpoints.split(",")

        trainer_num = len(worker_endpoints)

        procs = []
        pipes = []
        for i in range(0, trainer_num):
            tr_cmd, tr_env = self._get_gloo_trainer_cmd(
                model, worker_endpoints[i], update_method, i, trainer_num
            )
            tr_env.update(envs)
            tr_env["GLOG_vmodule"] = 'gloo_context=4'
            tr_env["GLOG_v"] = '3'
            print(
                "use_hallreduce:{} tr_cmd:{}, env: {}".format(
                    self._use_hallreduce, tr_cmd, tr_env
                )
            )

            path = os.path.join(
                self.temp_dir.name, log_name + "_tr{}_err.log".format(i)
            )
            tr_pipe = open(path, "wb")

            print_to_err(
                type(self).__name__,
                "going to start process {} with gloo".format(i),
            )
            tr_proc = subprocess.Popen(
                tr_cmd.strip().split(" "),
                stdout=subprocess.PIPE,
                stderr=tr_pipe,
                env=tr_env,
            )

            procs.append(tr_proc)
            pipes.append(tr_pipe)

        outs = []
        for i in range(0, trainer_num):
            tr_out, tr_err = procs[i].communicate()
            outs.append(tr_out)
            pipes[i].close()
            sys.stderr.write('trainer {} stderr: {}\n'.format(i, tr_err))

        if trainer_num == 1:
            if check_error_log:
                print("outs[0]:", outs[0])
            return pickle.loads(outs[0])

        else:
            if check_error_log:
                print("outs[0]:", outs[0])
                print("outs[1]:", outs[1])
            return pickle.loads(outs[0]), pickle.loads(outs[1])

    def _run_cluster_nccl2(
        self, model, envs, update_method, check_error_log, log_name
    ):
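        # Launch trainer_num collective trainer subprocesses (nccl2, bkcl,
        # hccl or cncl, selected via update_method), capture their stdout,
        # and return the unpickled losses of trainer 0 and trainer 1.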
        if self._use_hallreduce:
            self._ps_endpoints = ""

            global DIST_UT_PORT
            if DIST_UT_PORT == 0:
                # NOTE(wangxi): the hallreduce test must use 4 cards since nccl >= 2.7
                for i in range(0, 4):
                    self._ps_endpoints += "127.0.0.1:%s," % (
                        self._find_free_port()
                    )
            else:
                for i in range(0, 4):
                    self._ps_endpoints += "127.0.0.1:%s," % (DIST_UT_PORT + i)
                DIST_UT_PORT += 4
            self._ps_endpoints = self._ps_endpoints[:-1]

        # NOTE: we reuse ps_endpoints as nccl2 worker endpoints
        worker_endpoints = self._ps_endpoints.split(",")

        trainer_num = len(worker_endpoints)

        procs = []
        pipes = []
        for i in range(0, trainer_num):
            tr_cmd, tr_env = self._get_nccl2_trainer_cmd(
                model, worker_endpoints[i], update_method, i, trainer_num
            )
            tr_env.update(envs)
            print(
                "use_hallreduce:{} tr_cmd:{}, env: {}".format(
                    self._use_hallreduce, tr_cmd, tr_env
                )
            )

            path = os.path.join(
                self.temp_dir.name, log_name + "_tr{}_err.log".format(i)
            )
            tr_pipe = open(path, "wb")

            print_to_err(
                type(self).__name__,
                "going to start process {} with nccl2".format(i),
            )
            tr_proc = subprocess.Popen(
                tr_cmd.strip().split(" "),
                stdout=subprocess.PIPE,
                stderr=tr_pipe,
                env=tr_env,
            )

            procs.append(tr_proc)
            pipes.append(tr_pipe)

        outs = []
        for i in range(0, trainer_num):
            tr_out, tr_err = procs[i].communicate()
            outs.append(tr_out)
            pipes[i].close()
            sys.stderr.write('trainer {} stderr: {}\n'.format(i, tr_err))

        if check_error_log:
            print("outs[0]:", outs[0])
            print("outs[1]:", outs[1])

        return pickle.loads(outs[0]), pickle.loads(outs[1])

    def _run_pipeline(self, model, envs, check_error_log, log_name):
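        # Run the pipeline-parallel case: each trainer sees GPUs 0 and 1
        # (CUDA_VISIBLE_DEVICES) and selects its own device via
        # FLAGS_selected_gpus before being launched with the nccl2 command.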
        # NOTE: we reuse ps_endpoints as nccl2 worker endpoints
        worker_endpoints = self._ps_endpoints.split(",")
        update_method = "nccl2"

        trainer_num = len(worker_endpoints)

        procs = []
        pipes = []
        for i in range(0, trainer_num):
            tr_cmd, tr_env = self._get_nccl2_trainer_cmd(
                model, worker_endpoints[i], update_method, i, trainer_num
            )
            tr_env.update(envs)
            tr_env['CUDA_VISIBLE_DEVICES'] = "0,1"
            tr_env['NCCL_SHM_DISABLE'] = '1'
            tr_env['FLAGS_selected_gpus'] = str(i)
            tr_env['FLAGS_cudnn_deterministic'] = '0'
            print("tr_cmd:{}, env: {}".format(tr_cmd, tr_env))

            path = os.path.join(self.temp_dir.name, "tr{}_err.log".format(i))
            tr_pipe = open(path, "wb")

            print_to_err(
                type(self).__name__,
                "going to start process {} with nccl2".format(i),
            )
            tr_proc = subprocess.Popen(
                tr_cmd.strip().split(" "),
                stdout=subprocess.PIPE,
                stderr=tr_pipe,
                env=tr_env,
            )

            procs.append(tr_proc)
            pipes.append(tr_pipe)

        outs = []
        for i in range(0, trainer_num):
            tr_out, tr_err = procs[i].communicate()
            outs.append(tr_out)
            pipes[i].close()
            sys.stderr.write('trainer {} stderr: {}\n'.format(i, tr_err))

        if check_error_log:
            print("outs[0]:", outs[0])
            print("outs[1]:", outs[1])
        return pickle.loads(outs[0]), pickle.loads(outs[1])

    def _get_required_envs(self, check_error_log=False, need_envs={}):
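        # Assemble the environment shared by all launched processes:
        # deterministic cudnn, disabled NCCL P2P/SHM and fast-failing RPC,
        # plus verbose glog settings when error logs are requested.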
        # TODO(typhoonzero): should auto adapt GPU count on the machine.
        required_envs = {
            "PATH": os.getenv("PATH", ""),
            "PYTHONPATH": os.getenv("PYTHONPATH", ""),
            "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
            "FLAGS_fraction_of_gpu_memory_to_use": "0.15",
            "FLAGS_rpc_deadline": "30000",  # 30s, to fail fast
            "FLAGS_rpc_retry_bind_port": "50",
            "FLAGS_cudnn_deterministic": "1",
            "FLAGS_rpc_disable_reuse_port": "1",
            "http_proxy": "",
            "NCCL_P2P_DISABLE": "1",
            "NCCL_SHM_DISABLE": "1",
        }

        if check_error_log:
            required_envs["GLOG_vmodule"] = (
                "fused_all_reduce_op_handle=10,all_reduce_op_handle=10,alloc_continuous_space_op=10,fuse_all_reduce_op_pass=10,"
                "alloc_continuous_space_for_grad_pass=10,fast_threaded_ssa_graph_executor=10,executor=10,operator=10,"
                "sparse_all_reduce_op_handle=10,gen_nccl_id_op=10,gen_nccl_id_op_help=10,nccl_helper=10,grpc_client=10,"
                "grpc_server=10,request_handler_impl=10,section_worker=10"
            )
            required_envs["GLOG_logtostderr"] = "1"

        if os.getenv('NVIDIA_TF32_OVERRIDE') is not None:
            required_envs['NVIDIA_TF32_OVERRIDE'] = os.getenv(
                'NVIDIA_TF32_OVERRIDE', ''
            )

        required_envs.update(need_envs)
        return required_envs

    def check_with_place(
        self,
        model_file,
        delta=1e-3,
        check_error_log=False,
        need_envs={},
        log_name="",
    ):
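        # Entry point used by the concrete test cases; delegates to
        # check_with_place_func for both dynamic-graph and static-graph modes.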
        self.check_with_place_func(
            model_file=model_file,
            delta=delta,
            check_error_log=check_error_log,
            need_envs=need_envs,
            log_name=log_name,
        )

    def check_with_place_func(
        self,
        model_file,
        delta=1e-3,
        check_error_log=False,
        need_envs={},
        log_name="",
    ):
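        # Run single-process training, then the requested distributed mode,
        # and compare the per-step losses of both runs.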
        required_envs = self._get_required_envs(check_error_log, need_envs)

        if self._gloo_mode:
            local_losses = self._run_local_gloo(
                model_file, required_envs, check_error_log, log_name=log_name
            )
        else:
            local_losses = self._run_local(
                model_file, required_envs, check_error_log, log_name=log_name
            )

        if self._nccl2_mode:
            if self._nccl2_reduce_layer:
                tr0_losses, tr1_losses = self._run_cluster_nccl2(
                    model_file,
                    required_envs,
                    update_method="nccl2_reduce_layer",
                    check_error_log=check_error_log,
                    log_name=log_name,
                )
            else:
                tr0_losses, tr1_losses = self._run_cluster_nccl2(
                    model_file,
                    required_envs,
                    update_method='nccl2',
                    check_error_log=check_error_log,
                    log_name=log_name,
                )
        elif self._bkcl_mode:
            tr0_losses, tr1_losses = self._run_cluster_nccl2(
                model_file,
                required_envs,
                update_method='bkcl',
                check_error_log=check_error_log,
                log_name=log_name,
            )
        elif self._gloo_mode:
            # gloo mode, CPU-only parallel train @xiongkun03
            tr0_losses, tr1_losses = self._run_cluster_gloo(
                model_file,
                required_envs,
                update_method='gloo',
                check_error_log=check_error_log,
                log_name=log_name,
            )
        elif self._hccl_mode:
            tr0_losses, tr1_losses = self._run_cluster_nccl2(
                model_file,
                required_envs,
                update_method='hccl',
                check_error_log=check_error_log,
                log_name=log_name,
            )
        elif self._cncl_mode:
            tr0_losses, tr1_losses = self._run_cluster_nccl2(
                model_file,
                required_envs,
                update_method='cncl',
                check_error_log=check_error_log,
                log_name=log_name,
            )
        elif self._pipeline_mode:
            tr0_losses, tr1_losses = self._run_pipeline(
                model_file, required_envs, check_error_log, log_name=log_name
            )
        else:
            tr0_losses, tr1_losses = self._run_cluster(
                model_file, required_envs, check_error_log, log_name=log_name
            )

        for step_id in range(RUN_STEP):
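            # The distributed loss is the average over the two trainers
            # (pipeline mode reports only trainer 1) and must match the
            # single-process loss within `delta`.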
            local_loss = local_losses[step_id]
            tr0_loss = tr0_losses[step_id]
            tr1_loss = tr1_losses[step_id]
            if self._pipeline_mode:
                dist_loss = np.array([tr1_loss])
            else:
                dist_loss = (np.array([tr0_loss]) + np.array([tr1_loss])) / 2
            print("=======", local_loss, ":", dist_loss[0], "=======")
            self.assertAlmostEqual(local_loss, dist_loss[0], delta=delta)

    def check_with_place_multi_cards(
        self,
        model_file,
        delta=1e-3,
        check_error_log=False,
        need_envs={},
        log_name="",
    ):
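        # Compare DGC training against plain training, both run locally on
        # two GPUs, step by step.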

        # NCCL P2P or SHM must be enabled, otherwise multi-card mode hangs.
        need_envs.update({"NCCL_P2P_DISABLE": "0", "NCCL_SHM_DISABLE": "0"})

        required_envs = self._get_required_envs(check_error_log, need_envs)

        if self._use_dgc:
            multi_cards_losses = self._run_local(
                model_file,
                required_envs,
                check_error_log,
                log_name=log_name + "_dgc_2cards",
                devices="0,1",
            )

            self._use_dgc = False
            base_losses = self._run_local(
                model_file,
                required_envs,
                check_error_log,
                log_name=log_name + "_base_2cards",
                devices="0,1",
            )

            self._use_dgc = True

            for step_id in range(RUN_STEP):
                base_loss = base_losses[step_id]
                multi_cards_loss = multi_cards_losses[step_id]
                print("=======", base_loss, ":", multi_cards_loss, "=======")
                self.assertAlmostEqual(base_loss, multi_cards_loss, delta=delta)