#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import ast
import os
import pickle
import random
import subprocess
import sys
import tempfile
import time
import unittest

import numpy as np

import paddle
from paddle import fluid
from paddle.distributed.fleet.meta_optimizers import (
    RawProgramOptimizer as RawProgram,
)
from paddle.fluid import compiler
from paddle.incubate.distributed.fleet import role_maker
from paddle.incubate.distributed.fleet.collective import (
    DistributedStrategy,
    fleet,
)

RUN_STEP = 5
DEFAULT_BATCH_SIZE = 2
DIST_UT_PORT = 0


def print_to_out(out_losses):
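    # Serialize the collected losses with pickle and write them to stdout, so
    # the parent test process can read them back from the subprocess output.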
    sys.stdout.buffer.write(pickle.dumps(out_losses))


def print_to_err(class_name, log_str):
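    # Timestamped progress messages are pickled to stderr; stdout is reserved
    # for the pickled losses that the parent test process reads back.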
    localtime = time.asctime(time.localtime(time.time()))
    print_str = localtime + "\t" + class_name + "\t" + log_str
    sys.stderr.buffer.write(pickle.dumps(print_str))


def eprint(*args, **kwargs):
    print(*args, file=sys.stderr, **kwargs)


def _insert_comm_op(opt, loss, build_strategy=None):
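    # Wrap the optimizer with RawProgramOptimizer so the collective
    # communication ops get inserted into the main program, while the
    # communication group initialization is intentionally skipped here.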
    opt = RawProgram(opt)
    role = paddle.distributed.fleet.base.role_maker.PaddleCloudRoleMaker(
        is_collective=True
    )
    strategy = paddle.distributed.fleet.DistributedStrategy()
    if build_strategy is not None:
        strategy.build_strategy = build_strategy
    opt._set_basic_info(loss, role, opt, strategy)

    # following code is a copy of RawProgramOptimizer.minimize except init_comm_group
    opt.endpoints = opt.role_maker._get_trainer_endpoints()
    opt.current_endpoint = opt.endpoints[opt.role_maker._worker_index()]
    opt.rank = opt.role_maker._worker_index()
    opt.nranks = opt.role_maker._worker_num()
    startup_program = paddle.static.default_startup_program()
    opt.startup_program = startup_program

    block = loss.block
    program = block.program
    opt.main_program = program

    optimize_ops, params_grads = opt.inner_opt.minimize(loss, startup_program)

    opt.main_program = program
    if opt.nranks > 1:
        opt._transpile_main_program(loss)


class TestDistRunnerBase:
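    # Base class for static-graph distributed test models. Subclasses implement
    # get_model(); the run_* methods below are executed inside the trainer or
    # pserver subprocesses spawned by the tests (see runtime_main).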
    def get_model(
        self,
        batch_size=DEFAULT_BATCH_SIZE,
        lr=0.1,
        single_device=False,
        use_dgc=False,
        dist_strategy=None,
    ):
        raise NotImplementedError(
            "get_model should be implemented by child classes."
        )

    @staticmethod
    def get_transpiler(
        trainer_id,
        main_program,
        pserver_endpoints,
        trainers,
        sync_mode,
        dc_asgd=False,
        current_endpoint=None,
        nccl_comm_num=1,
        hogwild_mode=False,
    ):
        # NOTE: fluid should not be imported until runtime, or else forking processes will cause errors.
        config = paddle.distributed.transpiler.DistributeTranspilerConfig()
        config.enable_dc_asgd = dc_asgd
        config.sync_mode = sync_mode
        config.runtime_split_send_recv = hogwild_mode

        if nccl_comm_num > 1:
            config.nccl_comm_num = nccl_comm_num
        # config.runtime_split_send_recv = True
        t = paddle.distributed.transpiler.DistributeTranspiler(config=config)
        t.transpile(
            trainer_id=trainer_id,
            program=main_program,
            pservers=pserver_endpoints,
            trainers=trainers,
            sync_mode=sync_mode,
            current_endpoint=current_endpoint,
        )
        return t

    @staticmethod
    def get_lr_scheduler(program):
        lr_scheduler = None
        if hasattr(program, 'lr_scheduler'):
            from paddle.optimizer.lr import LRScheduler

            lr_scheduler = program.lr_scheduler
            assert isinstance(lr_scheduler, LRScheduler), "must be LRScheduler"
        return lr_scheduler

    def run_pserver(self, args):
        self.lr = args.lr
        self.get_model(batch_size=args.batch_size)
        # NOTE: pserver should not call memory optimize

        t = self.get_transpiler(
            trainer_id=args.trainer_id,
            main_program=fluid.default_main_program(),
            pserver_endpoints=args.endpoints,
            trainers=args.trainers,
            sync_mode=args.sync_mode,
            dc_asgd=args.dc_asgd,
            hogwild_mode=args.hogwild,
        )
        pserver_prog = t.get_pserver_program(args.current_endpoint)
        startup_prog = t.get_startup_program(
            args.current_endpoint, pserver_prog
        )

        place = fluid.CPUPlace()
        exe = fluid.Executor(place)
        exe.run(startup_prog)
        print_to_err(type(self).__name__, "run pserver startup program done.")
        exe.run(pserver_prog)
        print_to_err(type(self).__name__, "run pserver main program done.")

    def run_pipeline_trainer(self, args):
        self.lr = args.lr

        dist_strategy = DistributedStrategy()
        (
            test_program,
            avg_cost,
            train_reader,
            test_reader,
            batch_acc,
            predict,
            data_loader,
        ) = self.get_model(
            batch_size=args.batch_size, dist_strategy=dist_strategy
        )

        device_id = int(os.getenv("FLAGS_selected_gpus", "0"))
        eprint(type(self).__name__, "device_id: %d." % device_id)
        place = fluid.CUDAPlace(device_id)

        exe = fluid.Executor(place)
        exe.run(fluid.default_startup_program())
        eprint(type(self).__name__, "run worker startup program done.")

        data_loader.set_sample_list_generator(train_reader, place)
        data_loader.start()
        print_to_err(type(self).__name__, "begin to train on trainer")
        out_losses = []

        main_program = fluid.default_main_program()
        lr_scheduler = self.get_lr_scheduler(main_program)
        for i in range(RUN_STEP):
            loss = exe.run(main_program, fetch_list=[avg_cost])
            loss = loss[0] if loss else None
            out_losses.append(loss)
            print_to_err(type(self).__name__, "run step %d finished" % i)
            if lr_scheduler is not None:
                lr_scheduler.step()

        data_loader.reset()
        print_to_err(type(self).__name__, "trainer run finished")

        sys.stdout.buffer.write(pickle.dumps(out_losses))

    def run_use_fleet_api_20_trainer(self, args):
        """
        1. remove codes for DistributedStrategy and leave the DistributedStrategy part to get_model()
        2. to run with fleet 2.0 api, set flags _use_fleet_api and _use_fleet_api_20 to True
        3. for now, not support test for model save
        """
        assert args.update_method == "nccl2" or "bkcl"

        self.lr = args.lr
        print_to_err("use_fleet 2.0", "fleet.node_num:")

        (
            test_program,
            avg_cost,
            train_reader,
            test_reader,
            batch_acc,
            predict,
        ) = self.get_model(batch_size=args.batch_size)

        if fluid.core.is_compiled_with_cuda():
            device_id = int(os.getenv("FLAGS_selected_gpus", "0"))
            place = fluid.CUDAPlace(device_id)
        elif fluid.core.is_compiled_with_xpu():
            device_id = int(os.getenv("FLAGS_selected_xpus", "0"))
            place = fluid.XPUPlace(device_id)
        else:
            raise ValueError(
                "fleet dygraph api must in paddlepaddle-xpu or paddlepaddle-gpu."
            )

        exe = fluid.Executor(place)
        exe.run(fluid.default_startup_program())
        eprint(type(self).__name__, "run worker startup program done.")

        feed_var_list = [
            var
            for var in fluid.default_main_program().global_block().vars.values()
            if var.is_data
        ]

        eprint("feed_var_list:", feed_var_list)

        if feed_var_list[0].name == 'label':
            feed_var_list = feed_var_list[::-1]

        feeder = fluid.DataFeeder(feed_var_list, place)
        reader_generator = train_reader()

        def get_data():
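            # Split each batch across trainers by sample-offset parity so that
            # trainer 0 and trainer 1 consume disjoint halves of the data.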
            origin_batch = next(reader_generator)
            if (
                paddle.distributed.get_world_size() == 1
                and args.update_method == 'gloo'
            ):  # Gloo single mode
                return origin_batch

            elif args.update_method != "local" and args.use_reader_alloc:
                new_batch = []
                for offset, item in enumerate(origin_batch):
                    if offset % 2 == args.trainer_id:
                        new_batch.append(item)
                return new_batch
            else:
                return origin_batch

        print_to_err(type(self).__name__, "begin to train on trainer")
        out_losses = []
        for i in range(RUN_STEP):
            (loss,) = exe.run(
                fluid.default_main_program(),
                fetch_list=[avg_cost.name],
                feed=feeder.feed(get_data()),
            )
            out_losses.append(loss[0])
            print_to_err(type(self).__name__, "run step %d finished" % i)
        print_to_err(type(self).__name__, "trainer run finished")
        print_to_err(type(self).__name__, f"dist losses: {out_losses}")

        sys.stdout.buffer.write(pickle.dumps(out_losses))

    def run_use_fleet_api_trainer(self, args):
        assert args.update_method in ["nccl2", "bkcl"]

        self.lr = args.lr

        exec_strategy = fluid.ExecutionStrategy()
        exec_strategy.num_threads = 1

        dist_strategy = DistributedStrategy()
        dist_strategy.exec_strategy = exec_strategy
        dist_strategy.fuse_memory_size = 1  # MB
        dist_strategy.fuse_laryer_size = 1
        if args.use_local_sgd:
            dist_strategy.use_local_sgd = True
        if args.ut4grad_allreduce:
            dist_strategy._ut4grad_allreduce = True
        if args.sync_batch_norm:
            dist_strategy.sync_batch_norm = True

        role = role_maker.PaddleCloudRoleMaker(is_collective=True)
        fleet.init(role)
        print_to_err("use_fleet", "fleet.node_num:")
        # "fleet.node_id:", fleet.node_id(),
        # "fleet.trainer_num:", fleet.worker_num())

        (
            test_program,
            avg_cost,
            train_reader,
            test_reader,
            batch_acc,
            predict,
        ) = self.get_model(
            batch_size=args.batch_size, dist_strategy=dist_strategy
        )

        trainer_prog = fleet._origin_program
        dist_prog = fleet.main_program

        if fluid.core.is_compiled_with_cuda():
            device_id = int(os.getenv("FLAGS_selected_gpus", "0"))
            place = fluid.CUDAPlace(device_id)
        elif fluid.core.is_compiled_with_xpu():
            device_id = int(os.getenv("FLAGS_selected_xpus", "0"))
            place = fluid.XPUPlace(device_id)
        else:
            raise ValueError(
                "fleet dygraph api must in paddlepaddle-xpu or paddlepaddle-gpu."
            )

        exe = fluid.Executor(place)
        exe.run(fluid.default_startup_program())
        eprint(type(self).__name__, "run worker startup program done.")

        feed_var_list = [
            var
            for var in trainer_prog.global_block().vars.values()
            if var.is_data
        ]

        eprint("feed_var_list:", feed_var_list)

        # Temporarily added to pass the python35 gcc8 CI.
        # FIXME(gongweibao, wangxi): need to fix the fleet api program order.
        if feed_var_list[0].name == 'label':
            feed_var_list = feed_var_list[::-1]

        feeder = fluid.DataFeeder(feed_var_list, place)
        reader_generator = train_reader()

        def get_data():
            origin_batch = next(reader_generator)
            if args.update_method != "local" and args.use_reader_alloc:
                new_batch = []
                for offset, item in enumerate(origin_batch):
                    if offset % 2 == args.trainer_id:
                        new_batch.append(item)
                return new_batch
            else:
                return origin_batch

        print_to_err(type(self).__name__, "begin to train on trainer")
        out_losses = []
        for i in range(RUN_STEP):
            (loss,) = exe.run(
                dist_prog,
                fetch_list=[avg_cost.name],
                feed=feeder.feed(get_data()),
            )
            out_losses.append(loss[0])
            print_to_err(type(self).__name__, "run step %d finished" % i)
        print_to_err(type(self).__name__, "trainer run finished")

        sys.stdout.buffer.write(pickle.dumps(out_losses))

        if args.save_model:
            model_save_dir = "/tmp"
            if fleet.worker_index() == 0:
                model_save_dir_fluid = os.path.join(
                    model_save_dir, "fluid_persistables"
                )
                model_save_dir_fleet = os.path.join(
                    model_save_dir, "fleet_persistables"
                )
                infer_save_dir_fluid = os.path.join(
                    model_save_dir, "fluid_infer"
                )
                infer_save_dir_fleet = os.path.join(
                    model_save_dir, "fleet_infer"
                )
            else:
                model_save_dir_fluid = os.path.join(
                    model_save_dir, "fluid_persistables_2"
                )
                model_save_dir_fleet = os.path.join(
                    model_save_dir, "fleet_persistables_2"
                )
                infer_save_dir_fluid = os.path.join(
                    model_save_dir, "fluid_infer_2"
                )
                infer_save_dir_fleet = os.path.join(
                    model_save_dir, "fleet_infer_2"
                )
            paddle.distributed.io.save_persistables(
                exe, model_save_dir_fluid, fleet._origin_program
            )
            fleet.save_persistables(executor=exe, dirname=model_save_dir_fleet)
            feeded_var_names = [var.name for var in feed_var_list]
            fluid.io.save_inference_model(
                infer_save_dir_fluid,
                feeded_var_names,
                [avg_cost],
                exe,
                fleet._origin_program,
            )
            fleet.save_inference_model(
                exe, infer_save_dir_fleet, feeded_var_names, [avg_cost]
            )

    def run_trainer(self, args):
        from io import StringIO

        old_stdout = sys.stdout
        sys.stdout = StringIO()

        build_stra = fluid.BuildStrategy()
        # FIXME force disable enable_inplace and memory_optimize
        build_stra.enable_inplace = False
        build_stra.memory_optimize = False

        if args.fuse_all_reduce is not None:
            sys.stderr.write(f'fuse_all_reduce={args.fuse_all_reduce}')
            build_stra.fuse_all_reduce_ops = args.fuse_all_reduce

        if args.hogwild:
            build_stra.async_mode = True

        if args.enable_backward_deps:
            build_stra.enable_backward_optimizer_op_deps = True

        if args.use_reduce:
            build_stra.reduce_strategy = (
                fluid.BuildStrategy.ReduceStrategy.Reduce
            )
        else:
            build_stra.reduce_strategy = (
                fluid.BuildStrategy.ReduceStrategy.AllReduce
            )
        pass_builder = None
        if args.batch_merge_repeat > 1:
            pass_builder = build_stra._finalize_strategy_and_create_passes()
            mypass = pass_builder.insert_pass(0, "multi_batch_merge_pass")
            mypass.set("num_repeats", args.batch_merge_repeat)

        if (
            args.update_method == "nccl2"
            or args.update_method == "nccl2_reduce_layer"
        ):
            build_stra.num_trainers = len(args.endpoints.split(","))
            build_stra.trainer_id = args.trainer_id
        else:
            # local or pserver mode: the build strategy sees a single trainer
            build_stra.num_trainers = 1
            build_stra.trainer_id = 0

        self.lr = args.lr
        if args.nccl2_reduce_layer_local_run:
            (
                test_program,
                avg_cost,
                train_reader,
                test_reader,
                batch_acc,
                predict,
            ) = self.get_model(batch_size=args.batch_size, single_device=True)
        elif args.use_dgc:
            (
                test_program,
                avg_cost,
                train_reader,
                test_reader,
                batch_acc,
                predict,
            ) = self.get_model(
                batch_size=args.batch_size,
                use_dgc=args.use_dgc,
                build_strategy=build_stra,
            )
        else:
            (
                test_program,
                avg_cost,
                train_reader,
                test_reader,
                batch_acc,
                predict,
            ) = self.get_model(batch_size=args.batch_size)

        if args.update_method == "pserver":
            print_to_err(
                type(self).__name__,
                "begin to run transpile on trainer with pserver mode",
            )
            t = self.get_transpiler(
                trainer_id=args.trainer_id,
                main_program=fluid.default_main_program(),
                pserver_endpoints=args.endpoints,
                trainers=args.trainers,
                sync_mode=args.sync_mode,
                dc_asgd=args.dc_asgd,
                hogwild_mode=args.hogwild,
            )

            trainer_prog = t.get_trainer_program()
            print_to_err(
                type(self).__name__,
                "get trainer program done with pserver mode.",
            )
        elif (
            args.update_method == "nccl2"
            or args.update_method == "nccl2_reduce_layer"
        ):
            # transpile for nccl2
            config = paddle.distributed.transpiler.DistributeTranspilerConfig()
            config.mode = "nccl2"
            config.nccl_comm_num = args.nccl_comm_num
            if args.use_hallreduce:
                config.use_hierarchical_allreduce = True
                config.hierarchical_allreduce_inter_nranks = (
                    args.hallreduce_inter_nranks
                )
            print_to_err(
                type(self).__name__,
                "begin to run transpile on trainer with nccl2 mode",
            )
            nccl2_t = paddle.distributed.transpiler.DistributeTranspiler(
                config=config
            )
            nccl2_t.transpile(
                args.trainer_id,
                program=fluid.default_main_program(),
                startup_program=fluid.default_startup_program(),
                trainers=args.endpoints,
                current_endpoint=args.current_endpoint,
            )
            print_to_err(
                type(self).__name__, "get trainer program done. with nccl2 mode"
            )
            trainer_prog = fluid.default_main_program()
        else:
            print_to_err(
                type(self).__name__,
                "do nothing about main program, just use it",
            )
            trainer_prog = fluid.default_main_program()
            print_to_err(type(self).__name__, "use main program done.")

        # FIXME(gongwb):wait pserver initialization.
        time.sleep(1)

        if args.use_cuda:
            device_id = int(os.getenv("FLAGS_selected_gpus", "0"))
            place = fluid.CUDAPlace(device_id)
        else:
            place = fluid.CPUPlace()

        exe = fluid.Executor(place)
        exe.run(fluid.default_startup_program())
        print_to_err(type(self).__name__, "run worker startup program done.")

        exec_strategy = fluid.ExecutionStrategy()
        exec_strategy.num_threads = 1

        print_to_err(type(self).__name__, "begin to compile with data parallel")
        binary = compiler.CompiledProgram(
            trainer_prog, build_strategy=build_stra
        )
        print_to_err(type(self).__name__, "program compiled with data parallel")

        feed_var_list = [
            var
            for var in trainer_prog.global_block().vars.values()
            if var.is_data
        ]

        feeder = fluid.DataFeeder(feed_var_list, place)
        reader_generator = train_reader()

        def get_data():
            origin_batch = next(reader_generator)
            if args.update_method != "local" and args.use_reader_alloc:
                new_batch = []
                for offset, item in enumerate(origin_batch):
                    if offset % 2 == args.trainer_id:
                        new_batch.append(item)
                return new_batch
            else:
                return origin_batch

        lr_scheduler = self.get_lr_scheduler(trainer_prog)
        print_to_err(type(self).__name__, "begin to train on trainer")
        out_losses = []
        for i in range(RUN_STEP):
            (loss,) = exe.run(
                binary, fetch_list=[avg_cost.name], feed=feeder.feed(get_data())
            )
            out_losses.append(loss[0])
            print_to_err(type(self).__name__, "run step %d finished" % i)
            if lr_scheduler is not None:
                lr_scheduler.step()

        print_to_err(type(self).__name__, "trainer run finished\n")
        # print_to_err(type(self).__name__, "out_losses")

        sys.stdout = old_stdout
        print_to_out(out_losses)


class TestParallelDyGraphRunnerBase:
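    # Base class for dygraph (imperative) distributed test models. Subclasses
    # implement get_model() and run_one_loop(); trainers report their per-step
    # losses back to the parent process through print_to_out.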
    def get_model(self):
        raise NotImplementedError(
            "get_model should be implemented by child classes."
        )

    def run_one_loop(self, model, opt, data):
        raise NotImplementedError(
            "train_one_loop should be implemented by the child classes."
        )

    def _get_data(self, batch, args):
        if (
            paddle.distributed.get_world_size() == 1
            and args.update_method == 'gloo'
        ):  # Gloo single mode
            return batch
        elif args.update_method != "local":
            new_batch = []

            # NOTE(@xiongkun03): args.diff_batch means the batch length differs per rank.
            # For example, with batch = [2,3,4,5] the first rank gets [2] and
            # the second rank gets [3,4,5].
            # this function is for test sparse_embedding_differ_length
            if hasattr(args, "diff_batch") and args.diff_batch:
                assert (
                    len(batch) > 2
                ), "in differ_batch mode, len(batch) must > 2."
                if paddle.distributed.get_rank() == 0:
                    new_batch.append(batch[0])
                elif paddle.distributed.get_rank() == 1:
                    new_batch.extend(list(batch[1:]))
                else:
                    raise NotImplementedError(
                        "Current TestParallelDyGraphRunnerBase don't support world_size > 2"
                    )
                return new_batch
            else:
                for offset, item in enumerate(batch):
                    if offset % 2 == args.trainer_id:
                        new_batch.append(item)
                return new_batch
        else:
            return batch

    def run_trainer(self, args):
        seed = 90
        if args.update_method == 'gloo':
            place = fluid.CPUPlace()
        elif fluid.core.is_compiled_with_cuda():
            device_id = int(os.getenv("FLAGS_selected_gpus", "0"))
            place = fluid.CUDAPlace(device_id)
        elif fluid.core.is_compiled_with_xpu():
            device_id = int(os.getenv("FLAGS_selected_xpus", "0"))
            place = fluid.XPUPlace(device_id)
        elif fluid.core.is_compiled_with_npu():
            device_id = int(os.getenv("FLAGS_selected_npus", "0"))
            place = fluid.NPUPlace(device_id)
        else:
            assert "Only support CUDAPlace or XPUPlace or CPU(Gloo) for now."

        with fluid.dygraph.guard(place):
            fluid.default_startup_program().random_seed = seed
            fluid.default_main_program().random_seed = seed
            np.random.seed(seed)
            import random

            random.seed(seed)
            model, train_reader, opt = self.get_model()
            nranks = len(args.endpoints.split(",")) if args.endpoints else 1

            # if args.update_method == "nccl2":
            if (
                args.update_method == "nccl2"
                or args.update_method == "bkcl"
                or args.update_method == "hccl"
                or args.update_method == "cncl"
            ):
                strategy = paddle.distributed.parallel.ParallelStrategy()
                strategy.nranks = nranks
                strategy.local_rank = args.trainer_id
                strategy.trainer_endpoints = args.endpoints.split(",")
                strategy.current_endpoint = args.current_endpoint
                paddle.distributed.init_parallel_env()
                print_to_err(
                    type(self).__name__,
                    "begin to prepare context in dygraph with nccl2",
                )
                if not args.find_unused_parameters:
                    model = paddle.DataParallel(
                        model, strategy, find_unused_parameters=False
                    )
                else:
                    model = paddle.DataParallel(
                        model, strategy, find_unused_parameters=True
                    )
                print_to_err(type(self).__name__, "model built in dygraph")

            elif args.update_method == "gloo":
                paddle.distributed.init_parallel_env()
                if not args.find_unused_parameters:
                    model = paddle.DataParallel(
                        model, find_unused_parameters=False
                    )
                else:
                    model = paddle.DataParallel(
                        model, find_unused_parameters=True
                    )

            out_losses = []
            print_to_err(type(self).__name__, "begin to run dygraph training")
            for step_id, data in enumerate(train_reader()):
                data = self._get_data(data, args)
                if step_id == RUN_STEP:
                    break
                loss = self.run_one_loop(model, opt, data)
                if step_id % 10 == 0:
                    print_to_err(
                        type(self).__name__,
                        "loss at step %d: %f" % (step_id, loss.numpy()),
                    )
                out_losses.append(loss.numpy())

                loss.backward()

                opt.minimize(loss)
                if not args.accumulate_gradient:
                    model.clear_gradients()
        print_to_out(out_losses)

    def run_trainer_with_spawn(self, args):
        # 1. enable dygraph
        paddle.disable_static()

        # 2. init seed
        seed = 90
        paddle.static.default_startup_program().random_seed = seed
        paddle.static.default_main_program().random_seed = seed
        np.random.seed(seed)
        random.seed(seed)
        # get trainer id
        paddle.distributed.parallel._get_global_parallel_env()
        args.trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))

        # 3. init parallel env
        if args.update_method in ["nccl2", "gloo"]:
            paddle.distributed.init_parallel_env()

        # 4. train model
        model, train_reader, opt = self.get_model()
        if args.update_method in ["nccl2", "gloo"]:
            model = paddle.DataParallel(
                model, find_unused_parameters=args.find_unused_parameters
            )

        out_losses = []
        for step_id, data in enumerate(train_reader()):
            data = self._get_data(data, args)
            if step_id == RUN_STEP:
                break
            loss = self.run_one_loop(model, opt, data)
            out_losses.append(loss.numpy())

            loss.backward()

            opt.minimize(loss)
            model.clear_gradients()
        return out_losses

    def run_use_fleet_api_trainer(self, args):
        from paddle.distributed import fleet

        # 1. enable dygraph
        paddle.disable_static()

        # 2. init seed
        seed = 90
        paddle.static.default_startup_program().random_seed = seed
        paddle.static.default_main_program().random_seed = seed
        np.random.seed(seed)
        random.seed(seed)
        # get trainer id
        paddle.distributed.parallel._get_global_parallel_env()
        args.trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))

        # set strategy
        strategy = fleet.DistributedStrategy()
        if args.find_unused_parameters:
            strategy.find_unused_parameters = True

        # 3. init parallel env
        if args.update_method in ["nccl2", "bkcl", "hccl"]:
            fleet.init(is_collective=True, strategy=strategy)

        # 4. train model
        model, train_reader, opt = self.get_model()
        if args.update_method in ["nccl2", "bkcl", "hccl"]:
            opt = fleet.distributed_optimizer(opt)
            model = fleet.distributed_model(model)

        out_losses = []
        for step_id, data in enumerate(train_reader()):
            data = self._get_data(data, args)
            if step_id == RUN_STEP:
                break
            loss = self.run_one_loop(model, opt, data)
            out_losses.append(loss.numpy())

            loss.backward()

            opt.step()
            if not args.accumulate_gradient:
                opt.clear_grad()
        print_to_out(out_losses)


def runtime_main(test_class):
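    # Entry point executed inside each spawned subprocess: parse the command
    # line built by TestDistBase and dispatch to the matching run_* method of
    # the given test model class.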
    parser = argparse.ArgumentParser(description='Run dist test.')
    parser.add_argument(
        '--role', type=str, required=True, choices=['pserver', 'trainer']
    )
    parser.add_argument('--endpoints', type=str, required=False, default="")
    parser.add_argument(
        '--update_method',
        type=str,
        default="local",
        choices=[
            "pserver",
            "nccl2",
            "bkcl",
            "local",
            "nccl2_reduce_layer",
            "gloo",
            "hccl",
            "cncl",
        ],
    )
    parser.add_argument('--trainer_id', type=int, required=False, default=0)
    parser.add_argument('--trainers', type=int, required=False, default=1)
    parser.add_argument('--nccl_comm_num', type=int, required=False, default=1)
    parser.add_argument('--enable_backward_deps', action='store_true')
    parser.add_argument('--use_hallreduce', action='store_true')
    parser.add_argument('--use_pipeline', action='store_true')
    parser.add_argument('--use_fleet_api', action='store_true')
    parser.add_argument('--use_fleet_api_20', action='store_true')
    parser.add_argument('--use_local_sgd', action='store_true')
    parser.add_argument('--diff_batch', action='store_true')
    parser.add_argument('--ut4grad_allreduce', action='store_true')
    parser.add_argument(
        '--hallreduce_inter_nranks', type=int, required=False, default=2
    )
    parser.add_argument(
        '--current_endpoint', type=str, required=False, default=""
    )
    parser.add_argument('--sync_mode', action='store_true')
    parser.add_argument('--use_cuda', action='store_true')
    parser.add_argument('--use_cpu', action='store_true')
    parser.add_argument('--use_xpu', action='store_true')
    parser.add_argument('--use_dgc', action='store_true')
    parser.add_argument('--use_npu', action='store_true')
    parser.add_argument('--accumulate_gradient', action='store_true')
    parser.add_argument('--find_unused_parameters', action='store_true')
    parser.add_argument('--use_reduce', action='store_true')
    parser.add_argument('--dc_asgd', action='store_true')
    parser.add_argument('--hogwild', action='store_true')
    parser.add_argument('--save_model', action='store_true')
    parser.add_argument(
        '--use_reader_alloc', action='store_true', required=False
    )
    parser.add_argument('--batch_size', required=False, type=int, default=2)
    parser.add_argument('--lr', required=False, type=float, default=0.001)
    parser.add_argument(
        '--batch_merge_repeat', required=False, type=int, default=1
    )
    parser.add_argument(
        '--nccl2_reduce_layer_local_run',
        required=False,
        type=bool,
        default=False,
    )
    parser.add_argument('--sync_batch_norm', action='store_true')
    parser.add_argument(
        '--fuse_all_reduce', required=False, type=ast.literal_eval, default=None
    )

    args = parser.parse_args()

    if args.update_method == 'gloo':
        paddle.set_device("cpu")

    model = test_class()
    if args.role == "pserver" and args.update_method == "pserver":
        model.run_pserver(args)
    elif args.use_fleet_api:
        model.run_use_fleet_api_trainer(args)
    elif args.use_fleet_api_20:
        model.run_use_fleet_api_20_trainer(args)
    elif args.use_pipeline:
        model.run_pipeline_trainer(args)
    else:
        model.run_trainer(args)


import socket
from contextlib import closing


class TestDistBase(unittest.TestCase):
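    # Driver TestCase: _setup_config()/setUp() select the communication mode,
    # ports and flags, and the helpers below launch pserver/trainer
    # subprocesses and collect their pickled losses.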
    def _setup_config(self):
        raise NotImplementedError("tests should have _setup_config implemented")

    def _after_setup_config(self):
        if self._enforce_place == "CPU":
            self.__use_cuda = False
            self.__use_xpu = False
            self._use_dgc = False
            self.__use_npu = False
        elif self._enforce_place == "GPU":
            self.__use_cuda = True
            self.__use_xpu = False
            self.__use_npu = False
        elif self._enforce_place == "XPU":
            self.__use_cuda = False
            self.__use_xpu = True
            self._use_dgc = False
            self.__use_npu = False
        elif self._enforce_place == "NPU":
            self.__use_cuda = False
            self.__use_xpu = False
            self._use_dgc = False
            self.__use_npu = True
        else:
            if fluid.core.is_compiled_with_cuda():
                self.__use_cuda = True
            else:
                self.__use_cuda = False
                self._use_dgc = False

        if self._use_reduce:
            assert not self._use_dgc

    def setUp(self):
        self._trainers = 2
        self._pservers = 2
        self._port_set = set()
        self._python_interp = sys.executable
        self._sync_mode = True
        self._hogwild_mode = False
        self._enforce_place = None
        self._use_reduce = False
        self._dc_asgd = False  # must use with async mode
        self._use_reader_alloc = True
        self._nccl2_mode = False
        self._bkcl_mode = False
        self._gloo_mode = False  # now, support gloo backend
        self._hccl_mode = False
        self._cncl_mode = False
        self._pipeline_mode = False
        self._mp_mode = False
        self._diff_batch = False
        # FIXME(typhoonzero): this argument was added to enable testing
        # allreduce layers, which users can call via layers.allreduce to
        # accumulate tensors anywhere. Find a better way to do this test and
        # reduce the need to check this argument everywhere.
        self._nccl2_reduce_layer = False
        self._lr = 0.001
        self._use_dgc = False
        self._dygraph = False
        self._nccl_comm_num = 1
        self._enable_backward_deps = False
        self._use_fleet_api = False
        self._use_fleet_api_20 = False
        self._use_local_sgd = False
        self._ut4grad_allreduce = False
        self._use_hallreduce = False
        self._save_model = False
        self._fuse_all_reduce = None
        self._accumulate_gradient = False
        self._find_unused_parameters = False
        self._setup_config()

        global DIST_UT_PORT
        if DIST_UT_PORT == 0 and os.getenv("PADDLE_DIST_UT_PORT"):
            DIST_UT_PORT = int(os.getenv("PADDLE_DIST_UT_PORT"))

        if DIST_UT_PORT == 0:
            self._ps_endpoints = "127.0.0.1:{},127.0.0.1:{}".format(
                self._find_free_port(),
                self._find_free_port(),
            )
        else:
            self._ps_endpoints = "127.0.0.1:{},127.0.0.1:{}".format(
                DIST_UT_PORT,
                DIST_UT_PORT + 1,
            )
            DIST_UT_PORT += 2
            self._dist_port = DIST_UT_PORT

        self._after_setup_config()

        self.temp_dir = tempfile.TemporaryDirectory()

    def tearDown(self):
        self.temp_dir.cleanup()

    def _find_free_port(self):
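        # Bind to port 0 so the OS picks an unused port, retrying until a port
        # is found that has not already been handed out for this test.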
        def __free_port():
            with closing(
                socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            ) as s:
                s.bind(('', 0))
                print_to_err(
                    type(self).__name__, "socket name: %s" % s.getsockname()[1]
                )
                return s.getsockname()[1]

        while True:
            port = __free_port()
            if port not in self._port_set:
                self._port_set.add(port)
                return port

    def start_pserver(
        self, model_file, check_error_log, required_envs, log_name=""
    ):
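        # Launch the two parameter-server processes used by pserver-mode tests
        # and return their Popen handles plus the opened stderr log files.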
        ps0_ep, ps1_ep = self._ps_endpoints.split(",")
        ps_cmd = "%s"

        if os.getenv('WITH_COVERAGE', 'OFF') == 'ON':
            required_envs['COVERAGE_FILE'] = os.getenv('COVERAGE_FILE', '')
            ps_cmd += " -m coverage run --branch -p"

        ps_cmd += " %s --role pserver --endpoints %s --trainer_id 0 --current_endpoint %s --trainers %d --update_method pserver"

        ps0_cmd = ps_cmd % (
            self._python_interp,
            model_file,
            self._ps_endpoints,
            ps0_ep,
            self._trainers,
        )
        ps1_cmd = ps_cmd % (
            self._python_interp,
            model_file,
            self._ps_endpoints,
            ps1_ep,
            self._trainers,
        )

        if self._sync_mode:
            ps0_cmd += " --sync_mode"
            ps1_cmd += " --sync_mode"

        print(ps0_cmd)
        print(ps1_cmd)
        path0 = os.path.join(self.temp_dir.name, log_name + "_ps0_err.log")
        path1 = os.path.join(self.temp_dir.name, log_name + "_ps1_err.log")
        ps0_pipe = open(path0, "wb")
        ps1_pipe = open(path1, "wb")

        print_to_err(type(self).__name__, "going to start pserver process 0")
        ps0_proc = subprocess.Popen(
            ps0_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=ps0_pipe,
            env=required_envs,
        )
        print_to_err(type(self).__name__, "going to start pserver process 1")
        ps1_proc = subprocess.Popen(
            ps1_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=ps1_pipe,
            env=required_envs,
        )

        return ps0_proc, ps1_proc, ps0_pipe, ps1_pipe

    def _run_local(
        self,
        model,
        envs,
        check_error_log=False,
        batch_size=DEFAULT_BATCH_SIZE,
        batch_merge_repeat=1,
        log_name="",
        devices="1",
    ):
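        # Run the model in a single local process (no distributed transpile)
        # and return the per-step losses unpickled from its stdout.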

        cmd = self._python_interp

        if os.getenv('WITH_COVERAGE', 'OFF') == 'ON':
            envs['COVERAGE_FILE'] = os.getenv('COVERAGE_FILE', '')
            cmd += " -m coverage run --branch -p"

        cmd += " {} --role trainer --update_method local --lr {:f}".format(
            model,
            self._lr,
        )

        if batch_size != DEFAULT_BATCH_SIZE:
            cmd += " --batch_size %d" % batch_size
        if batch_merge_repeat > 1:
            cmd += " --batch_merge_repeat %d" % batch_merge_repeat
        if self._nccl2_reduce_layer:
            cmd += " --nccl2_reduce_layer_local_run 1"

        if self.__use_cuda:
            cmd += " --use_cuda"
            env_local = {
                "CUDA_VISIBLE_DEVICES": devices,
                "PADDLE_TRAINERS_NUM": "1",
                "PADDLE_TRAINER_ID": "0",
            }
        elif self.__use_xpu:
            cmd += " --use_xpu"
            env_local = {
                "FLAGS_selected_xpus": devices,
                "PADDLE_TRAINERS_NUM": "1",
                "PADDLE_TRAINER_ID": "0",
            }
        elif self.__use_npu:
            cmd += " --use_npu"
            env_local = {
                "FLAGS_selected_npus": devices,
                "PADDLE_TRAINERS_NUM": "1",
                "PADDLE_TRAINER_ID": "0",
            }
        else:
            env_local = {'CPU_NUM': '1'}

        # not use dgc in single card
        if len(devices) > 1 and self._use_dgc:
            cmd += " --use_dgc"

        if self._accumulate_gradient:
            cmd += " --accumulate_gradient"

        if self._find_unused_parameters:
            cmd += " --find_unused_parameters"

        env_local.update(envs)
        print(f"local_cmd: {cmd}, env: {env_local}")

        if check_error_log:
            path = os.path.join(self.temp_dir.name, log_name + "_local.log")
            err_log = open(path, "wb")
            local_proc = subprocess.Popen(
                cmd.split(" "),
                stdout=subprocess.PIPE,
                stderr=err_log,
                env=env_local,
            )
        else:
            local_proc = subprocess.Popen(
                cmd.split(" "),
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                env=env_local,
            )

        local_out, local_err = local_proc.communicate()

        if check_error_log:
            err_log.close()

        sys.stderr.write('local_stderr: %s\n' % local_err)
        sys.stderr.write('local_stdout: %s\n' % pickle.loads(local_out))

        return pickle.loads(local_out)

    def _run_local_gloo(
        self,
        model,
        envs,
        check_error_log=False,
        batch_size=DEFAULT_BATCH_SIZE,
        batch_merge_repeat=1,
        log_name="",
        devices="0",
    ):
        saved_endpoints = self._ps_endpoints
        self._ps_endpoints = self._ps_endpoints.split(',')[0]
        result = self._run_cluster_gloo(
            model, envs, 'gloo', check_error_log, log_name
        )
        self._ps_endpoints = saved_endpoints
        return result

    def _run_cluster(self, model, envs, check_error_log, log_name):
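        # Pserver-mode cluster run: start two pservers and two trainers, wait
        # for both trainers to finish, and return their unpickled losses.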
        # Run dist train to compare with local results
        ps0, ps1, ps0_pipe, ps1_pipe = self.start_pserver(
            model, check_error_log, envs, log_name=log_name
        )

        ps0_ep, ps1_ep = self._ps_endpoints.split(",")

        tr_cmd = "%s"

        if os.getenv('WITH_COVERAGE', 'OFF') == 'ON':
            envs['COVERAGE_FILE'] = os.getenv('COVERAGE_FILE', '')
            tr_cmd += " -m coverage run --branch -p"

        tr_cmd += " %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --trainers %d --update_method pserver --lr %f"

        tr0_cmd = tr_cmd % (
            self._python_interp,
            model,
            self._ps_endpoints,
            0,
            ps0_ep,
            self._trainers,
            self._lr,
        )
        tr1_cmd = tr_cmd % (
            self._python_interp,
            model,
            self._ps_endpoints,
            1,
            ps1_ep,
            self._trainers,
            self._lr,
        )

        if self._sync_mode:
            tr0_cmd += " --sync_mode"
            tr1_cmd += " --sync_mode"
        if self._hogwild_mode:
            tr0_cmd += " --hogwild"
            tr1_cmd += " --hogwild"
        if self._use_reduce:
            tr0_cmd += " --use_reduce"
            tr1_cmd += " --use_reduce"
        if self._use_reader_alloc:
            tr0_cmd += " --use_reader_alloc"
            tr1_cmd += " --use_reader_alloc"
        if self.__use_cuda:
            tr0_cmd += " --use_cuda"
            tr1_cmd += " --use_cuda"
            env0 = {"CUDA_VISIBLE_DEVICES": "0"}
            env1 = {"CUDA_VISIBLE_DEVICES": "1"}
        else:
            env0 = {'CPU_NUM': '1'}
            env1 = {'CPU_NUM': '1'}

        env0.update(envs)
        env1.update(envs)

        print(f"tr0_cmd: {tr0_cmd}, env: {env0}")
        print(f"tr1_cmd: {tr1_cmd}, env: {env1}")

        path0 = os.path.join(self.temp_dir.name, log_name + "_tr0_err.log")
        path1 = os.path.join(self.temp_dir.name, log_name + "_tr1_err.log")
        tr0_pipe = open(path0, "wb")
        tr1_pipe = open(path1, "wb")

        print_to_err(type(self).__name__, "going to start trainer process 0")
        tr0_proc = subprocess.Popen(
            tr0_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=tr0_pipe,
            env=env0,
        )
        print_to_err(type(self).__name__, "going to start trainer process 1")
        tr1_proc = subprocess.Popen(
            tr1_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=tr1_pipe,
            env=env1,
        )

        # Wait until trainer process terminate
        while True:
            stat0 = tr0_proc.poll()
            time.sleep(0.1)
            if stat0 is not None:
                break
        while True:
            stat1 = tr1_proc.poll()
            time.sleep(0.1)
            if stat1 is not None:
                break

        tr0_out, tr0_err = tr0_proc.communicate()
        tr1_out, tr1_err = tr1_proc.communicate()

        # close trainer file
        tr0_pipe.close()
        tr1_pipe.close()
        ps0_pipe.close()
        ps1_pipe.close()

        ps0.terminate()
        ps1.terminate()

        return pickle.loads(tr0_out), pickle.loads(tr1_out)

    def _get_gloo_trainer_cmd(
        self, model, ep, update_method, trainer_id, trainer_num
    ):
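        # Build the command line and environment for a single gloo (CPU)
        # trainer subprocess; the asserts below document what gloo runs do not
        # support (CUDA, XPU, DGC, pipeline and the fleet APIs).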
        env = {}
        tr_cmd = "%s -u"

        if os.getenv('WITH_COVERAGE', 'OFF') == 'ON':
            tr_cmd += " -m coverage run --branch -p"

        tr_cmd += " %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --update_method %s --lr %f"

        tr_cmd = tr_cmd % (
            self._python_interp,
            model,
            self._ps_endpoints,
            trainer_id,
            ep,
            update_method,
            self._lr,
        )

        if self._use_reduce:
            tr_cmd += " --use_reduce"
        if self._use_reader_alloc:
            tr_cmd += " --use_reader_alloc"
        # assert self._use_reduce == False, "gloo not support _use_reduce"
        # assert self._use_reader_alloc == False, "gloo not support _use_reduce"
        if self._save_model:
            tr_cmd += " --save_model"
        if self._diff_batch:
            tr_cmd += " --diff_batch"
        self.__use_cuda = False
        self.__use_xpu = False
        assert not self.__use_cuda, "gloo not support use cuda"
        assert not self.__use_xpu, "gloo not support use xpu"
        tr_cmd += " --use_cpu"
        env.update(
            {
                "PADDLE_TRAINERS_NUM": f"{trainer_num}",
                "PADDLE_TRAINER_ID": f"{trainer_id}",
                "PADDLE_TRAINER_ENDPOINTS": self._ps_endpoints,
                "PADDLE_CURRENT_ENDPOINT": ep,
                "PADDLE_CURRENT_ENDPOINT": ep,
                "PADDLE_DISTRI_BACKEND": "gloo",
                "GLOG_v": "2",
            }
        )

        assert not self._use_dgc, "gloo not support use dgc"

        if self._accumulate_gradient:
            tr_cmd += " --accumulate_gradient"

        if self._find_unused_parameters:
            tr_cmd += " --find_unused_parameters"

        assert not self._pipeline_mode, "gloo not support use pipeline"

        if self._enable_backward_deps:  # build strategy, save it
            tr_cmd += " --enable_backward_deps"

        if self._fuse_all_reduce is not None:
            tr_cmd += f" --fuse_all_reduce {self._fuse_all_reduce}"

        assert not self._use_fleet_api, "gloo not support use fleet api"
        assert not self._use_fleet_api_20, "gloo not support use fleet api"
        return tr_cmd, env

    def _get_nccl2_trainer_cmd(
        self, model, ep, update_method, trainer_id, trainer_num
    ):
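        # Build the command line and environment for a single collective
        # trainer subprocess (nccl2/bkcl/hccl), selecting the device through
        # the FLAGS_selected_* environment variables.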
        env = {}
        tr_cmd = "%s -u"

        if os.getenv('WITH_COVERAGE', 'OFF') == 'ON':
            tr_cmd += " -m coverage run --branch -p"

        tr_cmd += " %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --update_method %s --lr %f"

1407 1408 1409 1410 1411 1412 1413 1414 1415
        tr_cmd = tr_cmd % (
            self._python_interp,
            model,
            self._ps_endpoints,
            trainer_id,
            ep,
            update_method,
            self._lr,
        )
W
Wu Yi 已提交
1416 1417

        if self._use_reduce:
1418
            tr_cmd += " --use_reduce"
W
Wu Yi 已提交
1419
        if self._use_reader_alloc:
1420
            tr_cmd += " --use_reader_alloc"
1421 1422
        if self._save_model:
            tr_cmd += " --save_model"
W
Wu Yi 已提交
1423
        if self.__use_cuda:
1424
            tr_cmd += " --use_cuda"
1425 1426
            env.update(
                {
1427 1428 1429 1430
                    "FLAGS_selected_gpus": f"{0}",
                    "CUDA_VISIBLE_DEVICES": f"{trainer_id}",
                    "PADDLE_TRAINERS_NUM": f"{trainer_num}",
                    "PADDLE_TRAINER_ID": f"{trainer_id}",
1431 1432 1433 1434
                    "PADDLE_TRAINER_ENDPOINTS": self._ps_endpoints,
                    "PADDLE_CURRENT_ENDPOINT": ep,
                }
            )
1435 1436 1437 1438
        # TODO(liuyuhui):XPU_VISIBLE_DEVICES is not working right now,
        # will update it after Badiu Kunlun partners' support.
        elif self.__use_xpu:
            tr_cmd += " --use_xpu"
1439 1440
            env.update(
                {
1441
                    "FLAGS_selected_xpus": f"{trainer_id}",
1442
                    # "XPU_VISIBLE_DEVICES": "{}".format(trainer_id + 1),
1443 1444
                    "PADDLE_TRAINERS_NUM": f"{trainer_num}",
                    "PADDLE_TRAINER_ID": f"{trainer_id}",
1445 1446 1447 1448 1449
                    "PADDLE_TRAINER_ENDPOINTS": self._ps_endpoints,
                    "PADDLE_CURRENT_ENDPOINT": ep,
                    "GLOG_v": "2",
                }
            )
1450 1451
        elif self.__use_npu:
            tr_cmd += " --use_npu"
1452 1453
            env.update(
                {
1454 1455 1456
                    "FLAGS_selected_npus": f"{trainer_id}",
                    "PADDLE_TRAINERS_NUM": f"{trainer_num}",
                    "PADDLE_TRAINER_ID": f"{trainer_id}",
1457 1458 1459 1460 1461
                    "PADDLE_TRAINER_ENDPOINTS": self._ps_endpoints,
                    "PADDLE_CURRENT_ENDPOINT": ep,
                    "GLOG_v": "2",
                }
            )
W
Wu Yi 已提交
1462
        else:
1463
            env.update({'CPU_NUM': '1'})
W
Wu Yi 已提交
1464

1465
        if self._use_dgc:
1466 1467
            tr_cmd += " --use_dgc"

1468 1469 1470
        if self._accumulate_gradient:
            tr_cmd += " --accumulate_gradient"

1471 1472 1473
        if self._find_unused_parameters:
            tr_cmd += " --find_unused_parameters"

1474 1475
        if self._pipeline_mode:
            tr_cmd += " --use_pipeline"
1476
        if self._mp_mode:
1477
            env = {"FLAGS_selected_gpus": f"{trainer_id}"}
1478 1479

        if self._nccl_comm_num > 1:
1480
            tr_cmd += f" --nccl_comm_num {self._nccl_comm_num}"
1481

1482 1483
        if self._use_hallreduce:
            tr_cmd += " --use_hallreduce --hallreduce_inter_nranks 2"
1484

1485
        if self._enable_backward_deps:
1486
            tr_cmd += " --enable_backward_deps"
1487

1488
        if self._fuse_all_reduce is not None:
1489
            tr_cmd += f" --fuse_all_reduce {self._fuse_all_reduce}"
1490

1491
        if self._use_fleet_api:
1492 1493 1494 1495 1496
            tr_cmd += (
                " --use_fleet_api_20"
                if self._use_fleet_api_20
                else " --use_fleet_api"
            )
1497 1498 1499 1500
            if self._use_local_sgd:
                tr_cmd += " --use_local_sgd"
            if self._ut4grad_allreduce:
                tr_cmd += " --ut4grad_allreduce"
1501 1502
            if hasattr(self, '_sync_batch_norm') and self._sync_batch_norm:
                tr_cmd += " --sync_batch_norm"
1503

1504 1505 1506
        if os.getenv('WITH_COVERAGE', 'OFF') == 'ON':
            env['COVERAGE_FILE'] = os.getenv('COVERAGE_FILE', '')

1507
        return tr_cmd, env
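
    # Illustrative sketch only (model script and ports are hypothetical): for
    # a two-GPU nccl2 run, trainer 1 would typically get an environment like
    #   FLAGS_selected_gpus=0  CUDA_VISIBLE_DEVICES=1
    #   PADDLE_TRAINERS_NUM=2  PADDLE_TRAINER_ID=1
    #   PADDLE_TRAINER_ENDPOINTS=127.0.0.1:6170,127.0.0.1:6171
    #   PADDLE_CURRENT_ENDPOINT=127.0.0.1:6171
    # alongside a command such as
    #   python -u dist_mnist.py --role trainer \
    #       --endpoints 127.0.0.1:6170,127.0.0.1:6171 --trainer_id 1 \
    #       --current_endpoint 127.0.0.1:6171 --update_method nccl2 \
    #       --lr 0.001 --use_cuda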

    def _run_cluster_gloo(
        self, model, envs, update_method, check_error_log, log_name
    ):
        assert update_method == "gloo", (
            "_run_cluster_gloo must have update_method: gloo, but got %s"
            % update_method
        )
        assert (
            not self._use_hallreduce
        ), "_run_cluster_gloo must have _use_hallreduce = false"

        worker_endpoints = self._ps_endpoints.split(",")

        trainer_num = len(worker_endpoints)

        procs = []
        pipes = []
        for i in range(0, trainer_num):
            tr_cmd, tr_env = self._get_gloo_trainer_cmd(
                model, worker_endpoints[i], update_method, i, trainer_num
            )
            tr_env.update(envs)
            tr_env["GLOG_vmodule"] = 'gloo_context=4'
            tr_env["GLOG_v"] = '3'
            print(
                "use_hallreduce:{} tr_cmd:{}, env: {}".format(
                    self._use_hallreduce, tr_cmd, tr_env
                )
            )

            path = os.path.join(
                self.temp_dir.name, log_name + f"_tr{i}_err.log"
            )
            tr_pipe = open(path, "wb")

            print_to_err(
                type(self).__name__,
                f"going to start process {i} with gloo",
            )
            tr_proc = subprocess.Popen(
                tr_cmd.strip().split(" "),
                stdout=subprocess.PIPE,
                stderr=tr_pipe,
                env=tr_env,
            )

            procs.append(tr_proc)
            pipes.append(tr_pipe)

        outs = []
        for i in range(0, trainer_num):
            tr_out, tr_err = procs[i].communicate()
            outs.append(tr_out)
            pipes[i].close()
            sys.stderr.write(f'trainer {i} stderr: {tr_err}\n')

        if trainer_num == 1:
            if check_error_log:
                print("outs[0]:", outs[0])
            return pickle.loads(outs[0])

        else:
            if check_error_log:
                print("outs[0]:", outs[0])
                print("outs[1]:", outs[1])
            return pickle.loads(outs[0]), pickle.loads(outs[1])

    def _run_cluster_nccl2(
        self, model, envs, update_method, check_error_log, log_name
    ):
        if self._use_hallreduce:
            self._ps_endpoints = ""

            global DIST_UT_PORT
            if DIST_UT_PORT == 0:
                # NOTE(wangxi): hallreduce test must use 4 cards after nccl >= 2.7
                for i in range(0, 4):
                    self._ps_endpoints += "127.0.0.1:%s," % (
                        self._find_free_port()
                    )
            else:
                for i in range(0, 4):
                    self._ps_endpoints += "127.0.0.1:%s," % (DIST_UT_PORT + i)
                DIST_UT_PORT += 4
            self._ps_endpoints = self._ps_endpoints[:-1]
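
            # Illustrative only (ports are hypothetical): after this block the
            # endpoint list for a hallreduce run looks like
            #   "127.0.0.1:6170,127.0.0.1:6171,127.0.0.1:6172,127.0.0.1:6173"
            # i.e. four local workers, one per card.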

        # NOTE: we reuse ps_endpoints as nccl2 worker endpoints
        worker_endpoints = self._ps_endpoints.split(",")

        trainer_num = len(worker_endpoints)

        procs = []
        pipes = []
        for i in range(0, trainer_num):
            tr_cmd, tr_env = self._get_nccl2_trainer_cmd(
                model, worker_endpoints[i], update_method, i, trainer_num
            )
            tr_env.update(envs)
            print(
                "use_hallreduce:{} tr_cmd:{}, env: {}".format(
                    self._use_hallreduce, tr_cmd, tr_env
                )
            )

            path = os.path.join(
                self.temp_dir.name, log_name + f"_tr{i}_err.log"
            )
            tr_pipe = open(path, "wb")

            print_to_err(
                type(self).__name__,
                f"going to start process {i} with nccl2",
            )
            tr_proc = subprocess.Popen(
                tr_cmd.strip().split(" "),
                stdout=subprocess.PIPE,
                stderr=tr_pipe,
                env=tr_env,
            )

            procs.append(tr_proc)
            pipes.append(tr_pipe)

        outs = []
        for i in range(0, trainer_num):
            tr_out, tr_err = procs[i].communicate()
            outs.append(tr_out)
            pipes[i].close()
            sys.stderr.write(f'trainer {i} stderr: {tr_err}\n')

        if check_error_log:
            print("outs[0]:", outs[0])
            print("outs[1]:", outs[1])

        return pickle.loads(outs[0]), pickle.loads(outs[1])
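
    # Illustrative only (numbers are made up): each trainer process writes
    # pickle.dumps(out_losses) to its stdout, so the pair returned here is
    # typically two lists of RUN_STEP float losses, e.g.
    #   ([0.693, 0.651, 0.640, 0.612, 0.598], [0.694, 0.650, 0.641, 0.613, 0.597])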

    def _run_pipeline(self, model, envs, check_error_log, log_name):
        # NOTE: we reuse ps_endpoints as nccl2 worker endpoints
        worker_endpoints = self._ps_endpoints.split(",")
        update_method = "nccl2"

        trainer_num = len(worker_endpoints)

        procs = []
        pipes = []
        for i in range(0, trainer_num):
            tr_cmd, tr_env = self._get_nccl2_trainer_cmd(
                model, worker_endpoints[i], update_method, i, trainer_num
            )
            tr_env.update(envs)
            tr_env['CUDA_VISIBLE_DEVICES'] = "0,1"
            tr_env['NCCL_SHM_DISABLE'] = '1'
            tr_env['FLAGS_selected_gpus'] = str(i)
            tr_env['FLAGS_cudnn_deterministic'] = '0'
            print(f"tr_cmd:{tr_cmd}, env: {tr_env}")

            path = os.path.join(self.temp_dir.name, f"tr{i}_err.log")
            tr_pipe = open(path, "wb")

            print_to_err(
                type(self).__name__,
                f"going to start process {i} with nccl2",
            )
            tr_proc = subprocess.Popen(
                tr_cmd.strip().split(" "),
                stdout=subprocess.PIPE,
                stderr=tr_pipe,
                env=tr_env,
            )

            procs.append(tr_proc)
            pipes.append(tr_pipe)

        outs = []
        for i in range(0, trainer_num):
            tr_out, tr_err = procs[i].communicate()
            outs.append(tr_out)
            pipes[i].close()
            sys.stderr.write(f'trainer {i} stderr: {tr_err}\n')

        if check_error_log:
            print("outs[0]:", outs[0])
            print("outs[1]:", outs[1])
        return pickle.loads(outs[0]), pickle.loads(outs[1])

    def _get_required_envs(self, check_error_log=False, need_envs={}):
        # TODO(typhoonzero): should auto adapt GPU count on the machine.
        required_envs = {
            "PATH": os.getenv("PATH", ""),
            "PYTHONPATH": os.getenv("PYTHONPATH", ""),
            "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
            "FLAGS_fraction_of_gpu_memory_to_use": "0.15",
            "FLAGS_rpc_deadline": "30000",  # 30s, to fail fast
            "FLAGS_rpc_retry_bind_port": "50",
            "FLAGS_cudnn_deterministic": "1",
            "FLAGS_rpc_disable_reuse_port": "1",
            "http_proxy": "",
            "NCCL_P2P_DISABLE": "1",
            "NCCL_SHM_DISABLE": "1",
        }

        if check_error_log:
            required_envs["GLOG_vmodule"] = (
                "fused_all_reduce_op_handle=10,all_reduce_op_handle=10,alloc_continuous_space_op=10,fuse_all_reduce_op_pass=10,"
                "alloc_continuous_space_for_grad_pass=10,fast_threaded_ssa_graph_executor=10,executor=10,operator=10,"
                "sparse_all_reduce_op_handle=10,gen_nccl_id_op=10,gen_nccl_id_op_help=10,nccl_helper=10,grpc_client=10,"
                "grpc_server=10,request_handler_impl=10,section_worker=10"
            )
            required_envs["GLOG_logtostderr"] = "1"

        if os.getenv('NVIDIA_TF32_OVERRIDE') is not None:
            required_envs['NVIDIA_TF32_OVERRIDE'] = os.getenv(
                'NVIDIA_TF32_OVERRIDE', ''
            )

        required_envs.update(need_envs)
        return required_envs

    def check_with_place(
        self,
        model_file,
        delta=1e-3,
        check_error_log=False,
        need_envs={},
        log_name="",
    ):
        # both the dygraph (gloo/nccl2) and the static graph paths go through
        # the same checking routine below
        self.check_with_place_func(
            model_file=model_file,
            delta=delta,
            check_error_log=check_error_log,
            need_envs=need_envs,
            log_name=log_name,
        )

    def check_with_place_func(
        self,
        model_file,
        delta=1e-3,
        check_error_log=False,
        need_envs={},
        log_name="",
    ):
        required_envs = self._get_required_envs(check_error_log, need_envs)

        if self._gloo_mode:
            local_losses = self._run_local_gloo(
                model_file, required_envs, check_error_log, log_name=log_name
            )
        else:
            local_losses = self._run_local(
                model_file, required_envs, check_error_log, log_name=log_name
            )

        if self._nccl2_mode:
            if self._nccl2_reduce_layer:
                tr0_losses, tr1_losses = self._run_cluster_nccl2(
                    model_file,
                    required_envs,
                    update_method="nccl2_reduce_layer",
                    check_error_log=check_error_log,
                    log_name=log_name,
                )
            else:
                tr0_losses, tr1_losses = self._run_cluster_nccl2(
                    model_file,
                    required_envs,
                    update_method='nccl2',
                    check_error_log=check_error_log,
                    log_name=log_name,
                )
        elif self._bkcl_mode:
            tr0_losses, tr1_losses = self._run_cluster_nccl2(
                model_file,
                required_envs,
                update_method='bkcl',
                check_error_log=check_error_log,
                log_name=log_name,
            )
        elif self._gloo_mode:
            # gloo mode, cpu only parallel train @xiongkun03
            tr0_losses, tr1_losses = self._run_cluster_gloo(
                model_file,
                required_envs,
                update_method='gloo',
                check_error_log=check_error_log,
                log_name=log_name,
            )
        elif self._hccl_mode:
            tr0_losses, tr1_losses = self._run_cluster_nccl2(
                model_file,
                required_envs,
                update_method='hccl',
                check_error_log=check_error_log,
                log_name=log_name,
            )
        elif self._cncl_mode:
            tr0_losses, tr1_losses = self._run_cluster_nccl2(
                model_file,
                required_envs,
                update_method='cncl',
                check_error_log=check_error_log,
                log_name=log_name,
            )
        elif self._pipeline_mode:
            tr0_losses, tr1_losses = self._run_pipeline(
                model_file, required_envs, check_error_log, log_name=log_name
            )
        else:
            tr0_losses, tr1_losses = self._run_cluster(
                model_file, required_envs, check_error_log, log_name=log_name
            )

        for step_id in range(RUN_STEP):
            local_loss = local_losses[step_id]
            tr0_loss = tr0_losses[step_id]
            tr1_loss = tr1_losses[step_id]
            if self._pipeline_mode:
                dist_loss = np.array([tr1_loss])
            else:
                dist_loss = (np.array([tr0_loss]) + np.array([tr1_loss])) / 2
            print("=======", local_loss, ":", dist_loss[0], "=======")
            self.assertAlmostEqual(local_loss, dist_loss[0], delta=delta)
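
        # Worked example (illustrative numbers only): with tr0_loss = 0.52 and
        # tr1_loss = 0.48, the non-pipeline path compares the local loss
        # against (0.52 + 0.48) / 2 = 0.50 and requires agreement within
        # `delta`; in pipeline mode only the last-stage trainer's loss
        # (tr1_loss) is compared.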

    def check_with_place_multi_cards(
        self,
        model_file,
        delta=1e-3,
        check_error_log=False,
        need_envs={},
        log_name="",
    ):
        # need to enable P2P or SHM, otherwise multi-cards mode will hang
        need_envs.update({"NCCL_P2P_DISABLE": "0", "NCCL_SHM_DISABLE": "0"})

        required_envs = self._get_required_envs(check_error_log, need_envs)

        if self._use_dgc:
            multi_cards_losses = self._run_local(
                model_file,
                required_envs,
                check_error_log,
                log_name=log_name + "_dgc_2cards",
                devices="0,1",
            )

            self._use_dgc = False
            base_losses = self._run_local(
                model_file,
                required_envs,
                check_error_log,
                log_name=log_name + "_base_2cards",
                devices="0,1",
            )

            self._use_dgc = True

            for step_id in range(RUN_STEP):
                base_loss = base_losses[step_id]
                multi_cards_loss = multi_cards_losses[step_id]
                print("=======", base_loss, ":", multi_cards_loss, "=======")
                self.assertAlmostEqual(base_loss, multi_cards_loss, delta=delta)
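
    # Hypothetical usage sketch (class and script names are illustrative, not
    # part of this module): a concrete test typically subclasses the base test
    # class, flips the relevant mode flags in _setup_config, and then calls
    # check_with_place, e.g.
    #
    #   class TestDistMnistNCCL2(TestDistBase):
    #       def _setup_config(self):
    #           self._sync_mode = True
    #           self._nccl2_mode = True
    #
    #       def test_dist_train(self):
    #           if fluid.core.is_compiled_with_cuda():
    #               self.check_with_place(
    #                   "dist_mnist.py", delta=1e-5, check_error_log=True
    #               )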