#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
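
"""Shared base classes and helpers for PaddlePaddle distributed-training
unit tests: they launch pserver/trainer (or NCCL2 worker) subprocesses, run
a few training steps, and compare the distributed losses with a local run."""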

from __future__ import print_function
import time

import unittest
import os
import sys
import signal
import subprocess
import six
import argparse
import pickle
import numpy as np
import paddle.fluid as fluid
from paddle.fluid import compiler
import paddle.fluid.dygraph as dygraph
from paddle.fluid.dygraph.base import to_variable
from paddle.fluid.dygraph.parallel import DataParallel

RUN_STEP = 5
DEFAULT_BATCH_SIZE = 2


def my_print(class_name, log_str):
    localtime = time.asctime(time.localtime(time.time()))
    print_str = localtime + "\t" + class_name + "\t" + log_str
    if six.PY2:
        sys.stderr.write(pickle.dumps(print_str))
    else:
        sys.stderr.buffer.write(pickle.dumps(print_str))


class TestDistRunnerBase(object):
    def get_model(self,
                  batch_size=DEFAULT_BATCH_SIZE,
                  lr=0.1,
                  single_device=False,
                  use_dgc=False):
        raise NotImplementedError(
            "get_model should be implemented by child classes.")

    @staticmethod
    def get_transpiler(trainer_id,
                       main_program,
                       pserver_endpoints,
                       trainers,
                       sync_mode,
                       dc_asgd=False,
                       current_endpoint=None,
                       nccl_comm_num=1):
        # NOTE: do not import fluid until runtime, or else forking processes will cause errors.
        config = fluid.DistributeTranspilerConfig()
        config.enable_dc_asgd = dc_asgd
        config.sync_mode = sync_mode
        if nccl_comm_num > 1:
            config.nccl_comm_num = nccl_comm_num
        # config.runtime_split_send_recv = True
        t = fluid.DistributeTranspiler(config=config)
        t.transpile(
            trainer_id=trainer_id,
            program=main_program,
            pservers=pserver_endpoints,
            trainers=trainers,
            current_endpoint=current_endpoint)
        return t

    def run_pserver(self, args):
        self.lr = args.lr
        self.get_model(batch_size=args.batch_size)
        # NOTE: pserver should not call memory optimize
        t = self.get_transpiler(args.trainer_id,
                                fluid.default_main_program(), args.endpoints,
                                args.trainers, args.sync_mode, args.dc_asgd)
        pserver_prog = t.get_pserver_program(args.current_endpoint)
        startup_prog = t.get_startup_program(args.current_endpoint,
                                             pserver_prog)

        place = fluid.CPUPlace()
        exe = fluid.Executor(place)
        exe.run(startup_prog)
        my_print(type(self).__name__, "run pserver startup program done.")
        exe.run(pserver_prog)
        my_print(type(self).__name__, "run pserver main program done.")

    def run_trainer(self, args):
        self.lr = args.lr
        if args.nccl2_reduce_layer_local_run:
            test_program, avg_cost, train_reader, test_reader, batch_acc, predict = \
                self.get_model(batch_size=args.batch_size, single_device=True)
        elif args.use_dgc:
            test_program, avg_cost, train_reader, test_reader, batch_acc, predict = \
                self.get_model(batch_size=args.batch_size, use_dgc=args.use_dgc)
        else:
            test_program, avg_cost, train_reader, test_reader, batch_acc, predict = \
                self.get_model(batch_size=args.batch_size)

        if args.mem_opt:
            my_print(type(self).__name__, "begin to run memory optimize")
            fluid.memory_optimize(fluid.default_main_program(), skip_grads=True)
            my_print(type(self).__name__, "trainer run memory optimize done.")
        if args.update_method == "pserver":
            my_print(
                type(self).__name__,
                "begin to run transpile on trainer with pserver mode")
            t = self.get_transpiler(args.trainer_id,
                                    fluid.default_main_program(),
                                    args.endpoints, args.trainers,
                                    args.sync_mode, args.dc_asgd)
            trainer_prog = t.get_trainer_program()
            my_print(
                type(self).__name__,
                "get trainer program done with pserver mode.")
        elif args.update_method == "nccl2" or args.update_method == "nccl2_reduce_layer":
            # transpile for nccl2
            config = fluid.DistributeTranspilerConfig()
            config.mode = "nccl2"
            config.nccl_comm_num = args.nccl_comm_num
            my_print(
                type(self).__name__,
                "begin to run transpile on trainer with nccl2 mode")
            nccl2_t = fluid.DistributeTranspiler(config=config)
            nccl2_t.transpile(
                args.trainer_id,
                program=fluid.default_main_program(),
                startup_program=fluid.default_startup_program(),
                trainers=args.endpoints,
                current_endpoint=args.current_endpoint)
            my_print(
                type(self).__name__,
                "get trainer program done. with nccl2 mode")
            trainer_prog = fluid.default_main_program()
        else:
            my_print(
                type(self).__name__,
                "do nothing about main program, just use it")
            trainer_prog = fluid.default_main_program()
            my_print(type(self).__name__, "use main program done.")

        if args.use_cuda:
            device_id = int(os.getenv("FLAGS_selected_gpus", "0"))
            place = fluid.CUDAPlace(device_id)
        else:
            place = fluid.CPUPlace()

        exe = fluid.Executor(place)
        exe.run(fluid.default_startup_program())
        my_print(type(self).__name__, "run worker startup program done.")

        exec_strategy = fluid.ExecutionStrategy()
        exec_strategy.num_threads = 1
        exec_strategy.allow_op_delay = False

        build_stra = fluid.BuildStrategy()
        # FIXME force disable enable_inplace and memory_optimize
        build_stra.enable_inplace = False
        build_stra.memory_optimize = False

        if args.enable_backward_deps:
            build_stra.enable_backward_optimizer_op_deps = True

        if args.use_reduce:
            build_stra.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce
        else:
            build_stra.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.AllReduce

        pass_builder = None
        if args.batch_merge_repeat > 1:
            pass_builder = build_stra._finalize_strategy_and_create_passes()
            mypass = pass_builder.insert_pass(0, "multi_batch_merge_pass")
            mypass.set("num_repeats", args.batch_merge_repeat)

        if args.update_method == "nccl2" or args.update_method == "nccl2_reduce_layer":
            build_stra.num_trainers = len(args.endpoints.split(","))
            build_stra.trainer_id = args.trainer_id
        else:
            # local or pserver mode: the compiled program sees a single trainer.
            build_stra.num_trainers = 1
            build_stra.trainer_id = 0

        my_print(type(self).__name__, "begin to compile with data parallel")
        binary = compiler.CompiledProgram(trainer_prog).with_data_parallel(
            loss_name=avg_cost.name,
            build_strategy=build_stra,
            exec_strategy=exec_strategy)
        my_print(type(self).__name__, "program compiled with data parallel")

        if args.use_cuda and args.update_method == "nccl2":
            # this is just to test the share_vars_from feature.
            test_exe = fluid.ParallelExecutor(
                use_cuda=True,
                loss_name=avg_cost.name,
                build_strategy=build_stra,
                main_program=test_program,
                share_vars_from=binary._executor)

        feed_var_list = [
            var for var in trainer_prog.global_block().vars.values()
            if var.is_data
        ]

        feeder = fluid.DataFeeder(feed_var_list, place)
        reader_generator = train_reader()

        def get_data():
            origin_batch = next(reader_generator)
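            # Shard the batch across trainers: each trainer keeps every other
            # sample (offset % 2 == trainer_id), so together the two trainers
            # consume the same data as the single local run.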
            if args.update_method != "local" and args.use_reader_alloc:
                new_batch = []
                for offset, item in enumerate(origin_batch):
                    if offset % 2 == args.trainer_id:
                        new_batch.append(item)
                return new_batch
            else:
                return origin_batch

        my_print(type(self).__name__, "begin to train on trainer")
        out_losses = []
        for i in six.moves.xrange(RUN_STEP):
            loss, = exe.run(binary,
                            fetch_list=[avg_cost.name],
                            feed=feeder.feed(get_data()))
            out_losses.append(loss[0])
            my_print(type(self).__name__, "run step %d finished" % i)
        my_print(type(self).__name__, "trainer run finished")

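        # Dump the per-step losses to stdout as a pickle so that the
        # launching test process (TestDistBase) can load and compare them.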
        if six.PY2:
            print(pickle.dumps(out_losses))
        else:
            sys.stdout.buffer.write(pickle.dumps(out_losses))


class TestParallelDyGraphRunnerBase(object):
    def get_model(self):
        raise NotImplementedError(
            "get_model should be implemented by child classes.")

    def run_one_loop(self, model, opt, data):
        raise NotImplementedError(
            "run_one_loop should be implemented by the child classes.")

    def run_trainer(self, args):

        seed = 90
        device_id = int(os.getenv("FLAGS_selected_gpus", "0"))
        place = fluid.CUDAPlace(device_id)

        def _get_data(batch):
            if args.update_method != "local":
                new_batch = []
                for offset, item in enumerate(batch):
                    if offset % 2 == args.trainer_id:
                        new_batch.append(item)
                return new_batch
            else:
                return batch

        with fluid.dygraph.guard(place):
            fluid.default_startup_program().random_seed = seed
            fluid.default_main_program().random_seed = seed
            np.random.seed(seed)
            import random
            random.seed(seed)
            model, train_reader, opt = self.get_model()
            nranks = len(args.endpoints.split(",")) if args.endpoints else 1

            if args.update_method == "nccl2":
                strategy = dygraph.parallel.ParallelStrategy()
                strategy.nranks = nranks
                strategy.local_rank = args.trainer_id
                strategy.trainer_endpoints = args.endpoints.split(",")
                strategy.current_endpoint = args.current_endpoint
                my_print(
                    type(self).__name__,
                    "begin to prepare context in dygraph with nccl2")
                dygraph.parallel.prepare_context(strategy)
                model = dygraph.parallel.DataParallel(model, strategy)
                my_print(type(self).__name__, "model built in dygraph")
            out_losses = []
            my_print(type(self).__name__, "begin to run dygraph training")
            for step_id, data in enumerate(train_reader()):
                data = _get_data(data)
                if step_id == RUN_STEP:
                    break
                loss = self.run_one_loop(model, opt, data)
                if step_id % 10 == 0:
                    my_print(
                        type(self).__name__,
                        "loss at step %d: %f" % (step_id, loss))
                out_losses.append(loss.numpy())

                # FIXME(Yancey1989): scale the loss inplace
                if args.update_method == "nccl2":
                    loss = model.scale_loss(loss)

                loss.backward()
                if args.update_method == "nccl2":
                    model.apply_collective_grads()

                opt.minimize(loss)
                model.clear_gradients()
            my_print(type(self).__name__, pickle.dumps(out_losses))


def runtime_main(test_class):
    parser = argparse.ArgumentParser(description='Run dist test.')
    parser.add_argument(
        '--role', type=str, required=True, choices=['pserver', 'trainer'])
    parser.add_argument('--endpoints', type=str, required=False, default="")
    parser.add_argument(
        '--update_method',
        type=str,
        default="local",
        choices=["pserver", "nccl2", "local", "nccl2_reduce_layer"])
    parser.add_argument('--trainer_id', type=int, required=False, default=0)
    parser.add_argument('--trainers', type=int, required=False, default=1)
    parser.add_argument('--nccl_comm_num', type=int, required=False, default=1)
    parser.add_argument(
        '--enable_backward_deps', type=bool, required=False, default=1)
    parser.add_argument(
        '--current_endpoint', type=str, required=False, default="")
    parser.add_argument('--sync_mode', action='store_true')
    parser.add_argument('--mem_opt', action='store_true')
    parser.add_argument('--use_cuda', action='store_true')
    parser.add_argument('--use_dgc', action='store_true')
    parser.add_argument('--use_reduce', action='store_true')
    parser.add_argument('--dc_asgd', action='store_true')
    parser.add_argument(
        '--use_reader_alloc', action='store_true', required=False)
    parser.add_argument('--batch_size', required=False, type=int, default=2)
    parser.add_argument('--lr', required=False, type=float, default=0.001)
    parser.add_argument(
        '--batch_merge_repeat', required=False, type=int, default=1)
    parser.add_argument(
        '--nccl2_reduce_layer_local_run',
        required=False,
        type=bool,
        default=False)

    args = parser.parse_args()

    model = test_class()
    if args.role == "pserver" and args.update_method == "pserver":
        model.run_pserver(args)
    else:
        model.run_trainer(args)
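
# A minimal sketch of how a test-model script is expected to use runtime_main;
# the class and script names below are illustrative, not part of this module
# (real models live in separate scripts that import this file):
#
#     class MyDistModel(TestDistRunnerBase):
#         def get_model(self, batch_size=DEFAULT_BATCH_SIZE, lr=0.1,
#                       single_device=False, use_dgc=False):
#             # build the network, then return: test_program, avg_cost,
#             # train_reader, test_reader, batch_acc, predict
#             ...
#
#     if __name__ == "__main__":
#         runtime_main(MyDistModel)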


import paddle.compat as cpt
import socket
from contextlib import closing


class TestDistBase(unittest.TestCase):
    def _setup_config(self):
        raise NotImplementedError("tests should have _setup_config implemented")

    def _after_setup_config(self):
        if self._enforce_place == "CPU":
            self.__use_cuda = False
            self._use_dgc = False
        elif self._enforce_place == "GPU":
            self.__use_cuda = True
        else:
            if fluid.core.is_compiled_with_cuda():
                self.__use_cuda = True
            else:
                self.__use_cuda = False
                self._use_dgc = False

        if self._use_reduce:
            assert not self._use_dgc

    def setUp(self):
        self._trainers = 2
        self._pservers = 2
        self._port_set = set()
        self._ps_endpoints = "127.0.0.1:%s,127.0.0.1:%s" % (
            self._find_free_port(), self._find_free_port())
        self._python_interp = sys.executable
        self._sync_mode = True
        self._enforce_place = None
        self._mem_opt = False
        self._use_reduce = False
        self._dc_asgd = False  # must use with async mode
        self._use_reader_alloc = True
        self._nccl2_mode = False
        self._mp_mode = False
        # FIXME(typhoonzero): this argument enables testing allreduce layers,
        # which users can call via layers.allreduce to accumulate tensors
        # anywhere. Find a better way to run this test and reduce the places
        # that check this argument.
        self._nccl2_reduce_layer = False
        self._lr = 0.001
        self._use_dgc = False
        self._dygraph = False
        self._nccl_comm_num = 1
        self._setup_config()
        self._after_setup_config()
        self._enable_backward_deps = False

    def _find_free_port(self):
        def __free_port():
            with closing(socket.socket(socket.AF_INET,
                                       socket.SOCK_STREAM)) as s:
                s.bind(('', 0))
                my_print(
                    type(self).__name__, "socket name: %s" % s.getsockname()[1])
                return s.getsockname()[1]

        while True:
            port = __free_port()
            if port not in self._port_set:
                self._port_set.add(port)
                return port

    def start_pserver(self, model_file, check_error_log, required_envs):
        ps0_ep, ps1_ep = self._ps_endpoints.split(",")
        ps_cmd = "%s %s --role pserver --endpoints %s --trainer_id 0 --current_endpoint %s --trainers %d --update_method pserver"
        ps0_cmd = ps_cmd % \
                  (self._python_interp, model_file, self._ps_endpoints, ps0_ep,
                   self._trainers)
        ps1_cmd = ps_cmd % \
                  (self._python_interp, model_file, self._ps_endpoints, ps1_ep,
                   self._trainers)

        if self._sync_mode:
            ps0_cmd += " --sync_mode"
            ps1_cmd += " --sync_mode"
        if self._mem_opt:
            ps0_cmd += " --mem_opt"
            ps1_cmd += " --mem_opt"

        print(ps0_cmd)
        print(ps1_cmd)
        ps0_pipe = open("/tmp/ps0_err.log", "wb")
        ps1_pipe = open("/tmp/ps1_err.log", "wb")

        my_print(type(self).__name__, "going to start pserver process 0")
        ps0_proc = subprocess.Popen(
            ps0_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=ps0_pipe,
            env=required_envs)
        my_print(type(self).__name__, "going to start pserver process 1")
        ps1_proc = subprocess.Popen(
            ps1_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=ps1_pipe,
            env=required_envs)

        return ps0_proc, ps1_proc, ps0_pipe, ps1_pipe

    def _run_local(self,
                   model,
                   envs,
                   check_error_log=False,
                   batch_size=DEFAULT_BATCH_SIZE,
                   batch_merge_repeat=1):

        cmd = "%s %s --role trainer --lr %f" % (self._python_interp, model,
                                                self._lr)
        if batch_size != DEFAULT_BATCH_SIZE:
            cmd += " --batch_size %d" % batch_size
        if batch_merge_repeat > 1:
            cmd += " --batch_merge_repeat %d" % batch_merge_repeat
        if self._nccl2_reduce_layer:
            cmd += " --nccl2_reduce_layer_local_run 1"

        if self.__use_cuda:
            cmd += " --use_cuda"
            env_local = {
                "CUDA_VISIBLE_DEVICES": "0",
                "PADDLE_TRAINERS_NUM": "1",
                "PADDLE_TRAINER_ID": "0"
            }
        else:
            env_local = {'CPU_NUM': '1'}

        env_local.update(envs)
        print("local_cmd: {}, env: {}".format(cmd, env_local))

        if check_error_log:
            err_log = open("/tmp/trainer.err.log", "wb")
            local_proc = subprocess.Popen(
                cmd.split(" "),
                stdout=subprocess.PIPE,
                stderr=err_log,
                env=env_local)
        else:
            local_proc = subprocess.Popen(
                cmd.split(" "),
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                env=env_local)

        local_out, local_err = local_proc.communicate()

        if check_error_log:
            err_log.close()

        sys.stderr.write('local_stderr: %s\n' % local_err)
        sys.stderr.write('local_stdout: %s\n' % pickle.loads(local_out))

        return pickle.loads(local_out)

    def _run_cluster(self, model, envs, check_error_log):
        # Run dist train to compare with local results
        ps0, ps1, ps0_pipe, ps1_pipe = self.start_pserver(model,
                                                          check_error_log, envs)

        ps0_ep, ps1_ep = self._ps_endpoints.split(",")

        tr_cmd = "%s %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --trainers %d --update_method pserver --lr %f"
        tr0_cmd = tr_cmd % \
                  (self._python_interp, model, self._ps_endpoints,
                   0, ps0_ep, self._trainers, self._lr)
        tr1_cmd = tr_cmd % \
                  (self._python_interp, model, self._ps_endpoints,
                   1, ps1_ep, self._trainers, self._lr)

        if self._sync_mode:
            tr0_cmd += " --sync_mode"
            tr1_cmd += " --sync_mode"
        if self._mem_opt:
            tr0_cmd += " --mem_opt"
            tr1_cmd += " --mem_opt"
        if self._use_reduce:
            tr0_cmd += " --use_reduce"
            tr1_cmd += " --use_reduce"
        if self._use_reader_alloc:
            tr0_cmd += " --use_reader_alloc"
            tr1_cmd += " --use_reader_alloc"
        if self.__use_cuda:
            tr0_cmd += " --use_cuda"
            tr1_cmd += " --use_cuda"
            env0 = {"CUDA_VISIBLE_DEVICES": "0"}
            env1 = {"CUDA_VISIBLE_DEVICES": "1"}
        else:
            env0 = {'CPU_NUM': '1'}
            env1 = {'CPU_NUM': '1'}

        env0.update(envs)
        env1.update(envs)

        print("tr0_cmd: {}, env: {}".format(tr0_cmd, env0))
        print("tr1_cmd: {}, env: {}".format(tr1_cmd, env1))
        tr0_pipe = open("/tmp/tr0_err.log", "wb")
        tr1_pipe = open("/tmp/tr1_err.log", "wb")

        my_print(type(self).__name__, "going to start trainer process 0")
        tr0_proc = subprocess.Popen(
            tr0_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=tr0_pipe,
            env=env0)
        my_print(type(self).__name__, "going to start trainer process 1")
        tr1_proc = subprocess.Popen(
            tr1_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=tr1_pipe,
            env=env1)

        # Wait until trainer process terminate
        while True:
            stat0 = tr0_proc.poll()
            time.sleep(0.1)
            if stat0 is not None:
                break
        while True:
            stat1 = tr1_proc.poll()
            time.sleep(0.1)
            if stat1 is not None:
                break

        tr0_out, tr0_err = tr0_proc.communicate()
        tr1_out, tr1_err = tr1_proc.communicate()

        # close the trainer and pserver error-log files
        tr0_pipe.close()
        tr1_pipe.close()
        ps0_pipe.close()
        ps1_pipe.close()

        ps0.terminate()
        ps1.terminate()

        # print server log
        '''
        with open("/tmp/ps0_err.log", "rb") as fn:
            sys.stderr.write("ps0 stderr: %s\n" % fn.read())
        with open("/tmp/ps1_err.log", "rb") as fn:
            sys.stderr.write("ps1 stderr: %s\n" % fn.read())
        '''

        # print log
        '''
        with open("/tmp/tr0_err.log", "rb") as fn:
            sys.stderr.write('trainer 0 stderr: %s\n' % fn.read())
        with open("/tmp/tr1_err.log", "rb") as fn:
            sys.stderr.write('trainer 1 stderr: %s\n' % fn.read())
        '''

        return pickle.loads(tr0_out), pickle.loads(tr1_out)

    def _run_cluster_nccl2(self, model, envs, nccl2_reduce_layer,
                           check_error_log):
        # NOTE: we reuse ps_endpoints as nccl2 worker endpoints
        worker_endpoints = self._ps_endpoints.split(",")
        w0_ep, w1_ep = worker_endpoints
        if nccl2_reduce_layer:
            update_method = "nccl2_reduce_layer"
        else:
            update_method = "nccl2"

        tr_cmd = "%s %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --update_method %s --lr %f"
        tr0_cmd = tr_cmd % \
                  (self._python_interp, model, self._ps_endpoints,
                   0, w0_ep, update_method, self._lr)
        tr1_cmd = tr_cmd % \
                  (self._python_interp, model, self._ps_endpoints,
                   1, w1_ep, update_method, self._lr)

        if self._mem_opt:
            tr0_cmd += " --mem_opt"
            tr1_cmd += " --mem_opt"
        if self._use_reduce:
            tr0_cmd += " --use_reduce"
            tr1_cmd += " --use_reduce"
        if self._use_reader_alloc:
            tr0_cmd += " --use_reader_alloc"
            tr1_cmd += " --use_reader_alloc"
        if self.__use_cuda:
            tr0_cmd += " --use_cuda"
            tr1_cmd += " --use_cuda"
            env0 = {
                "CUDA_VISIBLE_DEVICES": "0",
                # for testing the nccl2 reduce layer
                "PADDLE_TRAINERS_NUM": "2",
                "PADDLE_TRAINER_ID": "0"
            }
            env1 = {
                "CUDA_VISIBLE_DEVICES": "1",
                "PADDLE_TRAINERS_NUM": "2",
                "PADDLE_TRAINER_ID": "1"
            }
        else:
            env0 = {'CPU_NUM': '1'}
            env1 = {'CPU_NUM': '1'}

        if self._use_dgc:
            tr0_cmd += " --use_dgc"
            tr1_cmd += " --use_dgc"

        if self._nccl_comm_num > 1:
            tr0_cmd += " --nccl_comm_num {}".format(self._nccl_comm_num)
            tr1_cmd += " --nccl_comm_num {}".format(self._nccl_comm_num)

        if self._mp_mode:
            env0 = {"FLAGS_selected_gpus": "0"}
            env1 = {"FLAGS_selected_gpus": "1"}

        if self._enable_backward_deps:
            tr0_cmd += " --enable_backward_deps 1"
            tr1_cmd += " --enable_backward_deps 1"

        env0.update(envs)
        env1.update(envs)

        print("tr0_cmd:{}, env: {}".format(tr0_cmd, env0))
        print("tr1_cmd:{}, env: {}".format(tr1_cmd, env1))
        tr0_pipe = open("/tmp/tr0_err.log", "wb")
        tr1_pipe = open("/tmp/tr1_err.log", "wb")

        my_print(type(self).__name__, "going to start process 0 with nccl2")
        tr0_proc = subprocess.Popen(
            tr0_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=tr0_pipe,
            env=env0)
        my_print(type(self).__name__, "going to start process 1 with nccl2")
        tr1_proc = subprocess.Popen(
            tr1_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=tr1_pipe,
            env=env1)

        tr0_out, tr0_err = tr0_proc.communicate()
        tr1_out, tr1_err = tr1_proc.communicate()

        # close trainer file
        tr0_pipe.close()
        tr1_pipe.close()

        # print log
        sys.stderr.write('trainer 0 stderr: %s\n' % tr0_err)
        sys.stderr.write('trainer 1 stderr: %s\n' % tr1_err)

        return pickle.loads(tr0_out), pickle.loads(tr1_out)

    def check_with_place(self,
                         model_file,
                         delta=1e-3,
                         check_error_log=False,
                         need_envs={}):
        # TODO(typhoonzero): should auto adapt GPU count on the machine.
        required_envs = {
            "PATH": os.getenv("PATH", ""),
            "PYTHONPATH": os.getenv("PYTHONPATH", ""),
            "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
            "FLAGS_fraction_of_gpu_memory_to_use": "0.15",
            "FLAGS_rpc_deadline": "30000",  # 30000 ms, to fail fast on rpc errors
            "FLAGS_cudnn_deterministic": "1",
            "http_proxy": "",
            "NCCL_P2P_DISABLE": "1"
        }

        required_envs.update(need_envs)

        if check_error_log:
            required_envs["GLOG_v"] = "3"
            required_envs["GLOG_logtostderr"] = "1"

        local_losses\
            = self._run_local(model_file, required_envs,
                                check_error_log)
        if self._nccl2_mode:
            if self._nccl2_reduce_layer:
                tr0_losses, tr1_losses = self._run_cluster_nccl2(
                    model_file, required_envs, True, check_error_log)
            else:
                tr0_losses, tr1_losses = self._run_cluster_nccl2(
                    model_file, required_envs, False, check_error_log)
        else:
            tr0_losses, tr1_losses = self._run_cluster(
                model_file, required_envs, check_error_log)

        for step_id in range(RUN_STEP):
            local_loss = local_losses[step_id]
            tr0_loss = tr0_losses[step_id]
            tr1_loss = tr1_losses[step_id]
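            # The two distributed trainers' averaged loss should match the
            # single-process local loss to within `delta`.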
            dist_loss = (np.array([tr0_loss]) + np.array([tr1_loss])) / 2
            print("=======", local_loss, ":", dist_loss[0], "=======")
            self.assertAlmostEqual(local_loss, dist_loss[0], delta=delta)
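

# A minimal sketch of a concrete test case built on TestDistBase; the class
# name and model script below are illustrative, not part of this module:
#
#     class TestDistMnistSync(TestDistBase):
#         def _setup_config(self):
#             self._sync_mode = True
#             self._use_reduce = False
#
#         def test_dist_train(self):
#             self.check_with_place("dist_mnist.py", delta=1e-5)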