#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function
import time

import unittest
import os
import sys
import signal
import subprocess
import six
import argparse
import pickle
import numpy as np
import paddle.fluid as fluid
from paddle.fluid import compiler
import paddle.fluid.dygraph as dygraph
from paddle.fluid.dygraph.base import to_variable
from paddle.fluid.dygraph.parallel import DataParallel

RUN_STEP = 5
DEFAULT_BATCH_SIZE = 2


def my_print(class_name, log_str):
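    """Timestamped logging helper.

    The message is pickled and written to stderr, keeping stdout free for the
    pickled per-step losses that the parent test process reads back.
    """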
    localtime = time.asctime(time.localtime(time.time()))
    print_str = localtime + "\t" + class_name + "\t" + log_str
    if six.PY2:
        sys.stderr.write(pickle.dumps(print_str))
    else:
        sys.stderr.buffer.write(pickle.dumps(print_str))


class TestDistRunnerBase(object):
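    """Base runner for the static-graph distributed tests.

    Concrete tests subclass this, implement get_model(), and call
    runtime_main() so the same script can be launched as a pserver or a
    trainer subprocess by TestDistBase.
    """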
    def get_model(self,
                  batch_size=DEFAULT_BATCH_SIZE,
                  lr=0.1,
                  single_device=False,
                  use_dgc=False):
        raise NotImplementedError(
            "get_model should be implemented by child classes.")

    @staticmethod
    def get_transpiler(trainer_id,
                       main_program,
                       pserver_endpoints,
                       trainers,
                       sync_mode,
                       dc_asgd=False,
                       current_endpoint=None,
                       nccl_comm_num=1):
        # NOTE: only import fluid at runtime, otherwise forking processes will raise an error.
        config = fluid.DistributeTranspilerConfig()
        config.enable_dc_asgd = dc_asgd
        config.sync_mode = sync_mode
        if nccl_comm_num > 1:
            config.nccl_comm_num = nccl_comm_num
        # config.runtime_split_send_recv = True
        t = fluid.DistributeTranspiler(config=config)
        t.transpile(
            trainer_id=trainer_id,
            program=main_program,
            pservers=pserver_endpoints,
            trainers=trainers,
            current_endpoint=current_endpoint)
        return t

    def run_pserver(self, args):
        self.lr = args.lr
        self.get_model(batch_size=args.batch_size)
        # NOTE: pserver should not call memory optimize
        t = self.get_transpiler(args.trainer_id,
                                fluid.default_main_program(), args.endpoints,
                                args.trainers, args.sync_mode, args.dc_asgd)
        pserver_prog = t.get_pserver_program(args.current_endpoint)
        startup_prog = t.get_startup_program(args.current_endpoint,
                                             pserver_prog)

        place = fluid.CPUPlace()
        exe = fluid.Executor(place)
        exe.run(startup_prog)
        my_print(type(self).__name__, "run pserver startup program done.")
        exe.run(pserver_prog)
        my_print(type(self).__name__, "run pserver main program done.")

    def run_trainer(self, args):
        self.lr = args.lr
        if args.nccl2_reduce_layer_local_run:
            test_program, avg_cost, train_reader, test_reader, batch_acc, predict = \
                self.get_model(batch_size=args.batch_size, single_device=True)
        elif args.use_dgc:
            test_program, avg_cost, train_reader, test_reader, batch_acc, predict = \
                self.get_model(batch_size=args.batch_size, use_dgc=args.use_dgc)
        else:
            test_program, avg_cost, train_reader, test_reader, batch_acc, predict = \
                self.get_model(batch_size=args.batch_size)

        if args.mem_opt:
            my_print(type(self).__name__, "begin to run memory optimize")
            fluid.memory_optimize(fluid.default_main_program(), skip_grads=True)
            my_print(type(self).__name__, "trainer run memory optimize done.")
        if args.update_method == "pserver":
            my_print(
                type(self).__name__,
                "begin to run transpile on trainer with pserver mode")
            t = self.get_transpiler(args.trainer_id,
                                    fluid.default_main_program(),
                                    args.endpoints, args.trainers,
                                    args.sync_mode, args.dc_asgd)
            trainer_prog = t.get_trainer_program()
            my_print(
                type(self).__name__,
                "get trainer program done with pserver mode.")
        elif args.update_method == "nccl2" or args.update_method == "nccl2_reduce_layer":
            # transpile for nccl2
            config = fluid.DistributeTranspilerConfig()
            config.mode = "nccl2"
            config.nccl_comm_num = args.nccl_comm_num
            if args.use_hallreduce:
                config.use_hierarchical_allreduce = True
                config.hierarchical_allreduce_inter_nranks = args.hallreduce_inter_nranks
            my_print(
                type(self).__name__,
                "begin to run transpile on trainer with nccl2 mode")
            nccl2_t = fluid.DistributeTranspiler(config=config)
            nccl2_t.transpile(
                args.trainer_id,
                program=fluid.default_main_program(),
                startup_program=fluid.default_startup_program(),
                trainers=args.endpoints,
                current_endpoint=args.current_endpoint)
            my_print(
                type(self).__name__,
                "get trainer program done with nccl2 mode.")
            trainer_prog = fluid.default_main_program()
        else:
            my_print(
                type(self).__name__,
                "do nothing about main program, just use it")
            trainer_prog = fluid.default_main_program()
            my_print(type(self).__name__, "use main program done.")

        if args.use_cuda:
            device_id = int(os.getenv("FLAGS_selected_gpus", "0"))
            place = fluid.CUDAPlace(device_id)
        else:
            place = fluid.CPUPlace()

        exe = fluid.Executor(place)
        exe.run(fluid.default_startup_program())
        my_print(type(self).__name__, "run worker startup program done.")

        exec_strategy = fluid.ExecutionStrategy()
        exec_strategy.num_threads = 1
        exec_strategy.allow_op_delay = False

        build_stra = fluid.BuildStrategy()
        # FIXME force disable enable_inplace and memory_optimize
        build_stra.enable_inplace = False
        build_stra.memory_optimize = False

        if args.enable_backward_deps:
            build_stra.enable_backward_optimizer_op_deps = True

        if args.use_reduce:
            build_stra.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce
        else:
            build_stra.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.AllReduce

        pass_builder = None
        if args.batch_merge_repeat > 1:
            pass_builder = build_stra._finalize_strategy_and_create_passes()
            mypass = pass_builder.insert_pass(0, "multi_batch_merge_pass")
            mypass.set("num_repeats", args.batch_merge_repeat)

        if args.update_method == "nccl2" or args.update_method == "nccl2_reduce_layer":
            build_stra.num_trainers = len(args.endpoints.split(","))
            build_stra.trainer_id = args.trainer_id
        else:
            # local or pserver mode: build with a single trainer
            build_stra.num_trainers = 1
            build_stra.trainer_id = 0

        my_print(type(self).__name__, "begin to compile with data parallel")
        binary = compiler.CompiledProgram(trainer_prog).with_data_parallel(
            loss_name=avg_cost.name,
            build_strategy=build_stra,
            exec_strategy=exec_strategy)
        my_print(type(self).__name__, "program compiled with data parallel")

        feed_var_list = [
            var for var in trainer_prog.global_block().vars.values()
            if var.is_data
        ]

        feeder = fluid.DataFeeder(feed_var_list, place)
        reader_generator = train_reader()

        def get_data():
            origin_batch = next(reader_generator)
            if args.update_method != "local" and args.use_reader_alloc:
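                # split the batch between the two trainers: each worker keeps
                # every other sample, so they train on disjoint halves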
                new_batch = []
                for offset, item in enumerate(origin_batch):
                    if offset % 2 == args.trainer_id:
                        new_batch.append(item)
                return new_batch
            else:
                return origin_batch

        my_print(type(self).__name__, "begin to train on trainer")
        out_losses = []
        for i in six.moves.xrange(RUN_STEP):
            loss, = exe.run(binary,
                            fetch_list=[avg_cost.name],
                            feed=feeder.feed(get_data()))
            out_losses.append(loss[0])
            my_print(type(self).__name__, "run step %d finished" % i)
        my_print(type(self).__name__, "trainer run finished")

        if six.PY2:
            print(pickle.dumps(out_losses))
        else:
            sys.stdout.buffer.write(pickle.dumps(out_losses))


class TestParallelDyGraphRunnerBase(object):
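    """Base runner for the dygraph (imperative) distributed tests.

    Subclasses implement get_model() and run_one_loop(); run_trainer() wraps
    the model with dygraph.parallel.DataParallel when launched in nccl2 mode.
    """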
    def get_model(self):
        raise NotImplementedError(
            "get_model should be implemented by child classes.")

    def run_one_loop(self, model, opt, data):
        raise NotImplementedError(
            "train_one_loop should be implemented by the child classes.")

    def run_trainer(self, args):

        seed = 90
        device_id = int(os.getenv("FLAGS_selected_gpus", "0"))
        place = fluid.CUDAPlace(device_id)

        def _get_data(batch):
            if args.update_method != "local":
                new_batch = []
                for offset, item in enumerate(batch):
                    if offset % 2 == args.trainer_id:
                        new_batch.append(item)
                return new_batch
            else:
                return batch

        with fluid.dygraph.guard(place):
            fluid.default_startup_program().random_seed = seed
            fluid.default_main_program().random_seed = seed
            np.random.seed(seed)
            import random
            random.seed(seed)
            model, train_reader, opt = self.get_model()
            nranks = len(args.endpoints.split(",")) if args.endpoints else 1

            if args.update_method == "nccl2":
                strategy = dygraph.parallel.ParallelStrategy()
                strategy.nranks = nranks
                strategy.local_rank = args.trainer_id
                strategy.trainer_endpoints = args.endpoints.split(",")
                strategy.current_endpoint = args.current_endpoint
                my_print(
                    type(self).__name__,
                    "begin to prepare context in dygraph with nccl2")
                dygraph.parallel.prepare_context(strategy)
                model = dygraph.parallel.DataParallel(model, strategy)
                my_print(type(self).__name__, "model built in dygraph")
            out_losses = []
            my_print(type(self).__name__, "begin to run dygraph training")
            for step_id, data in enumerate(train_reader()):
                data = _get_data(data)
                if step_id == RUN_STEP:
                    break
                loss = self.run_one_loop(model, opt, data)
                if step_id % 10 == 0:
                    my_print(
                        type(self).__name__,
                        "loss at step %d: %f" % (step_id, loss))
                out_losses.append(loss.numpy())

                # FIXME(Yancey1989): scale the loss inplace
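                # NOTE: DataParallel.scale_loss() rescales the loss for
                # multi-trainer gradient averaging, and apply_collective_grads()
                # below all-reduces the accumulated gradients across trainers.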
                if args.update_method == "nccl2":
                    loss = model.scale_loss(loss)

                loss.backward()
                if args.update_method == "nccl2":
                    model.apply_collective_grads()

                opt.minimize(loss)
                model.clear_gradients()
            my_print(type(self).__name__, pickle.dumps(out_losses))


def runtime_main(test_class):
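    """Command-line entry point used by the generated test scripts.

    A concrete test module would typically end with something like the
    following (the subclass name here is only illustrative):

        if __name__ == "__main__":
            runtime_main(TestDistFooModel)

    so that TestDistBase can launch this file as a pserver or a trainer
    subprocess with the appropriate flags.
    """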
    parser = argparse.ArgumentParser(description='Run dist test.')
    parser.add_argument(
        '--role', type=str, required=True, choices=['pserver', 'trainer'])
    parser.add_argument('--endpoints', type=str, required=False, default="")
    parser.add_argument(
        '--update_method',
        type=str,
        default="local",
        choices=["pserver", "nccl2", "local", "nccl2_reduce_layer"])
    parser.add_argument('--trainer_id', type=int, required=False, default=0)
    parser.add_argument('--trainers', type=int, required=False, default=1)
    parser.add_argument('--nccl_comm_num', type=int, required=False, default=1)
    parser.add_argument('--enable_backward_deps', action='store_true')
    parser.add_argument('--use_hallreduce', action='store_true')
    parser.add_argument(
        '--hallreduce_inter_nranks', type=int, required=False, default=2)
    parser.add_argument(
        '--current_endpoint', type=str, required=False, default="")
    parser.add_argument('--sync_mode', action='store_true')
    parser.add_argument('--mem_opt', action='store_true')
    parser.add_argument('--use_cuda', action='store_true')
    parser.add_argument('--use_dgc', action='store_true')
    parser.add_argument('--use_reduce', action='store_true')
    parser.add_argument('--dc_asgd', action='store_true')
    parser.add_argument(
        '--use_reader_alloc', action='store_true', required=False)
    parser.add_argument('--batch_size', required=False, type=int, default=2)
    parser.add_argument('--lr', required=False, type=float, default=0.001)
    parser.add_argument(
        '--batch_merge_repeat', required=False, type=int, default=1)
    parser.add_argument(
        '--nccl2_reduce_layer_local_run',
        required=False,
        type=bool,
        default=False)
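    # NOTE: argparse's type=bool treats any non-empty string as True, so callers
    # enable this flag by passing "1" (see _run_local below).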

    args = parser.parse_args()

    model = test_class()
    if args.role == "pserver" and args.update_method == "pserver":
        model.run_pserver(args)
    else:
        model.run_trainer(args)


import paddle.compat as cpt
import socket
from contextlib import closing


class TestDistBase(unittest.TestCase):
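    """Launches the model under test as local and distributed subprocesses and
    checks that their per-step losses agree.

    A typical subclass (names here are only illustrative) looks like:

        class TestDistFooModel(TestDistBase):
            def _setup_config(self):
                self._sync_mode = True
                self._use_reduce = False

            def test_dist_train(self):
                self.check_with_place("dist_foo_model.py", delta=1e-5)
    """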
    def _setup_config(self):
        raise NotImplementedError("tests should have _setup_config implemented")

    def _after_setup_config(self):
        if self._enforce_place == "CPU":
            self.__use_cuda = False
            self._use_dgc = False
        elif self._enforce_place == "GPU":
            self.__use_cuda = True
        else:
            if fluid.core.is_compiled_with_cuda():
                self.__use_cuda = True
            else:
                self.__use_cuda = False
                self._use_dgc = False

        if self._use_reduce:
            assert not self._use_dgc

    def setUp(self):
        self._trainers = 2
        self._pservers = 2
        self._port_set = set()
        self._ps_endpoints = "127.0.0.1:%s,127.0.0.1:%s" % (
            self._find_free_port(), self._find_free_port())
        self._python_interp = sys.executable
        self._sync_mode = True
        self._enforce_place = None
        self._mem_opt = False
        self._use_reduce = False
        self._dc_asgd = False  # must use with async mode
        self._use_reader_alloc = True
        self._nccl2_mode = False
        self._mp_mode = False
        # FIXME(typhoonzero): this argument enables testing allreduce layers,
        # which let users call layers.allreduce to accumulate tensors anywhere.
        # Find a better way to do this test and reduce the places that check
        # this argument.
        self._nccl2_reduce_layer = False
        self._lr = 0.001
        self._use_dgc = False
        self._dygraph = False
        self._nccl_comm_num = 1
        self._enable_backward_deps = False
        self._use_hallreduce = False
        self._setup_config()
        self._after_setup_config()

    def _find_free_port(self):
        def __free_port():
            with closing(socket.socket(socket.AF_INET,
                                       socket.SOCK_STREAM)) as s:
                s.bind(('', 0))
                my_print(
                    type(self).__name__, "socket name: %s" % s.getsockname()[1])
                return s.getsockname()[1]

        while True:
            port = __free_port()
            if port not in self._port_set:
                self._port_set.add(port)
                return port

    def start_pserver(self, model_file, check_error_log, required_envs):
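        """Start the two parameter-server processes and return their handles and stderr log pipes."""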
        ps0_ep, ps1_ep = self._ps_endpoints.split(",")
        ps_cmd = "%s %s --role pserver --endpoints %s --trainer_id 0 --current_endpoint %s --trainers %d --update_method pserver"
        ps0_cmd = ps_cmd % \
                  (self._python_interp, model_file, self._ps_endpoints, ps0_ep,
                   self._trainers)
        ps1_cmd = ps_cmd % \
                  (self._python_interp, model_file, self._ps_endpoints, ps1_ep,
                   self._trainers)

        if self._sync_mode:
            ps0_cmd += " --sync_mode"
            ps1_cmd += " --sync_mode"
        if self._mem_opt:
            ps0_cmd += " --mem_opt"
            ps1_cmd += " --mem_opt"

        print(ps0_cmd)
        print(ps1_cmd)
        ps0_pipe = open("/tmp/ps0_err.log", "wb")
        ps1_pipe = open("/tmp/ps1_err.log", "wb")

        my_print(type(self).__name__, "going to start pserver process 0")
        ps0_proc = subprocess.Popen(
            ps0_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=ps0_pipe,
            env=required_envs)
        my_print(type(self).__name__, "going to start pserver process 1")
        ps1_proc = subprocess.Popen(
            ps1_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=ps1_pipe,
            env=required_envs)

        return ps0_proc, ps1_proc, ps0_pipe, ps1_pipe

    def _run_local(self,
                   model,
                   envs,
                   check_error_log=False,
                   batch_size=DEFAULT_BATCH_SIZE,
                   batch_merge_repeat=1):

        cmd = "%s %s --role trainer --lr %f" % (self._python_interp, model,
                                                self._lr)
        if batch_size != DEFAULT_BATCH_SIZE:
            cmd += " --batch_size %d" % batch_size
        if batch_merge_repeat > 1:
            cmd += " --batch_merge_repeat %d" % batch_merge_repeat
        if self._nccl2_reduce_layer:
            cmd += " --nccl2_reduce_layer_local_run 1"

        if self.__use_cuda:
            cmd += " --use_cuda"
            env_local = {
                "CUDA_VISIBLE_DEVICES": "0",
                "PADDLE_TRAINERS_NUM": "1",
                "PADDLE_TRAINER_ID": "0"
            }
        else:
            env_local = {'CPU_NUM': '1'}

        env_local.update(envs)
        print("local_cmd: {}, env: {}".format(cmd, env_local))

        if check_error_log:
            err_log = open("/tmp/trainer.err.log", "wb")
            local_proc = subprocess.Popen(
                cmd.split(" "),
                stdout=subprocess.PIPE,
                stderr=err_log,
                env=env_local)
        else:
            local_proc = subprocess.Popen(
                cmd.split(" "),
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                env=env_local)

        local_out, local_err = local_proc.communicate()

        if check_error_log:
            err_log.close()

        sys.stderr.write('local_stderr: %s\n' % local_err)
        sys.stderr.write('local_stdout: %s\n' % pickle.loads(local_out))

        return pickle.loads(local_out)

    def _run_cluster(self, model, envs, check_error_log):
        # Run dist train to compare with local results
        ps0, ps1, ps0_pipe, ps1_pipe = self.start_pserver(model,
                                                          check_error_log, envs)

        ps0_ep, ps1_ep = self._ps_endpoints.split(",")

        tr_cmd = "%s %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --trainers %d --update_method pserver --lr %f"
        tr0_cmd = tr_cmd % \
                  (self._python_interp, model, self._ps_endpoints,
                   0, ps0_ep, self._trainers, self._lr)
        tr1_cmd = tr_cmd % \
                  (self._python_interp, model, self._ps_endpoints,
                   1, ps1_ep, self._trainers, self._lr)

        if self._sync_mode:
            tr0_cmd += " --sync_mode"
            tr1_cmd += " --sync_mode"
        if self._mem_opt:
            tr0_cmd += " --mem_opt"
            tr1_cmd += " --mem_opt"
        if self._use_reduce:
            tr0_cmd += " --use_reduce"
            tr1_cmd += " --use_reduce"
        if self._use_reader_alloc:
            tr0_cmd += " --use_reader_alloc"
            tr1_cmd += " --use_reader_alloc"
        if self.__use_cuda:
            tr0_cmd += " --use_cuda"
            tr1_cmd += " --use_cuda"
            env0 = {"CUDA_VISIBLE_DEVICES": "0"}
            env1 = {"CUDA_VISIBLE_DEVICES": "1"}
        else:
            env0 = {'CPU_NUM': '1'}
            env1 = {'CPU_NUM': '1'}

        env0.update(envs)
        env1.update(envs)

        print("tr0_cmd: {}, env: {}".format(tr0_cmd, env0))
        print("tr1_cmd: {}, env: {}".format(tr1_cmd, env1))
        tr0_pipe = open("/tmp/tr0_err.log", "wb")
        tr1_pipe = open("/tmp/tr1_err.log", "wb")

        my_print(type(self).__name__, "going to start trainer process 0")
        tr0_proc = subprocess.Popen(
            tr0_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=tr0_pipe,
            env=env0)
        my_print(type(self).__name__, "going to start trainer process 1")
        tr1_proc = subprocess.Popen(
            tr1_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=tr1_pipe,
            env=env1)

        # Wait until the trainer processes terminate
        while True:
            stat0 = tr0_proc.poll()
            time.sleep(0.1)
            if stat0 is not None:
                break
        while True:
            stat1 = tr1_proc.poll()
            time.sleep(0.1)
            if stat1 is not None:
                break

        tr0_out, tr0_err = tr0_proc.communicate()
        tr1_out, tr1_err = tr1_proc.communicate()

        # close trainer file
        tr0_pipe.close()
        tr1_pipe.close()
        ps0_pipe.close()
        ps1_pipe.close()

        ps0.terminate()
        ps1.terminate()

        return pickle.loads(tr0_out), pickle.loads(tr1_out)

    def _get_nccl2_trainer_cmd(self, model, ep, update_method, trainer_id,
                               trainer_num):
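        """Build the command line and environment for one nccl2 trainer process."""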
        env = {}
        tr_cmd = "%s -u %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --update_method %s --lr %f"
        tr_cmd = tr_cmd % \
                  (self._python_interp, model, self._ps_endpoints,
                   trainer_id, ep, update_method, self._lr)

        if self._mem_opt:
            tr_cmd += " --mem_opt"
        if self._use_reduce:
            tr_cmd += " --use_reduce"
        if self._use_reader_alloc:
            tr_cmd += " --use_reader_alloc"
        if self.__use_cuda:
            tr_cmd += " --use_cuda"
            env.update({
                "CUDA_VISIBLE_DEVICES": "{}".format(trainer_id),
                "PADDLE_TRAINERS_NUM": "{}".format(trainer_num),
                "PADDLE_TRAINER_ID": "{}".format(trainer_id)
            })
        else:
            env.update({'CPU_NUM': '1'})

        if self._use_dgc:
            tr_cmd += " --use_dgc"

        if self._mp_mode:
            env = {"FLAGS_selected_gpus": "{}".format(trainer_id)}

        if self._nccl_comm_num > 1:
            tr_cmd += " --nccl_comm_num {}".format(self._nccl_comm_num)

        if self._use_hallreduce:
            tr_cmd += " --use_hallreduce --hallreduce_inter_nranks 2"

        if self._enable_backward_deps:
            tr_cmd += " --enable_backward_deps"

        return tr_cmd, env

    def _run_cluster_nccl2(self, model, envs, nccl2_reduce_layer,
                           check_error_log):
        if self._use_hallreduce:
            self._ps_endpoints = ""
            for i in range(0, 4):
                self._ps_endpoints += "127.0.0.1:%s," % (self._find_free_port())
            self._ps_endpoints = self._ps_endpoints[:-1]

        # NOTE: we reuse ps_endpoints as nccl2 worker endpoints
        worker_endpoints = self._ps_endpoints.split(",")
        if nccl2_reduce_layer:
            update_method = "nccl2_reduce_layer"
        else:
            update_method = "nccl2"

        trainer_num = len(worker_endpoints)

        procs = []
        pipes = []
        for i in range(0, trainer_num):
            tr_cmd, tr_env = self._get_nccl2_trainer_cmd(
                model, worker_endpoints[i], update_method, i, trainer_num)
            tr_env.update(envs)
            print("use_hallreduce:{} tr_cmd:{}, env: {}".format(
                self._use_hallreduce, tr_cmd, tr_env))

            tr_pipe = open("/tmp/tr{}_err.log".format(i), "wb")

            my_print(
                type(self).__name__,
                "going to start process {} with nccl2".format(i))
            tr_proc = subprocess.Popen(
                tr_cmd.strip().split(" "),
                stdout=subprocess.PIPE,
                stderr=tr_pipe,
                env=tr_env)

            procs.append(tr_proc)
            pipes.append(tr_pipe)

        outs = []
        for i in range(0, trainer_num):
            tr_out, tr_err = procs[i].communicate()
            outs.append(tr_out)
            pipes[i].close()
            sys.stderr.write('trainer {} stderr: {}\n'.format(i, tr_err))

        return pickle.loads(outs[0]), pickle.loads(outs[1])

    def check_with_place(self,
                         model_file,
                         delta=1e-3,
                         check_error_log=False,
                         need_envs={}):
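        """Run `model_file` locally and in distributed mode, then assert that
        the per-step losses agree within `delta`."""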
        # TODO(typhoonzero): should auto adapt GPU count on the machine.
        required_envs = {
            "PATH": os.getenv("PATH", ""),
            "PYTHONPATH": os.getenv("PYTHONPATH", ""),
            "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
            "FLAGS_fraction_of_gpu_memory_to_use": "0.15",
            "FLAGS_rpc_deadline": "30000",  # to fail fast
            "FLAGS_cudnn_deterministic": "1",
            "http_proxy": "",
            "NCCL_P2P_DISABLE": "1",
            "NCCL_SHM_DISABLE": "1"
        }

        required_envs.update(need_envs)

        if check_error_log:
            required_envs["GLOG_v"] = "10"
            required_envs["GLOG_logtostderr"] = "1"

        local_losses\
            = self._run_local(model_file, required_envs,
                                check_error_log)
        if self._nccl2_mode:
            if self._nccl2_reduce_layer:
                tr0_losses, tr1_losses = self._run_cluster_nccl2(
                    model_file, required_envs, True, check_error_log)
            else:
                tr0_losses, tr1_losses = self._run_cluster_nccl2(
                    model_file, required_envs, False, check_error_log)
        else:
            tr0_losses, tr1_losses = self._run_cluster(
                model_file, required_envs, check_error_log)

        for step_id in range(RUN_STEP):
            local_loss = local_losses[step_id]
            tr0_loss = tr0_losses[step_id]
            tr1_loss = tr1_losses[step_id]
            dist_loss = (np.array([tr0_loss]) + np.array([tr1_loss])) / 2
            print("=======", local_loss, ":", dist_loss[0], "=======")
            self.assertAlmostEqual(local_loss, dist_loss[0], delta=delta)