#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
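
# This module is the shared harness for Paddle's distributed-training unit
# tests: TestDistRunnerBase and TestParallelDyGraphRunnerBase are subclassed
# by the per-model test scripts and executed in child processes through
# runtime_main(), while TestDistBase (below) launches those processes
# (pserver/trainer or nccl2 workers), reads the pickled per-step losses from
# their stdout, and compares them against a single-process run.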

from __future__ import print_function
import time

import unittest
import os
import sys
import signal
import subprocess
import six
import argparse
import pickle
import numpy as np

import paddle.fluid as fluid
from paddle.fluid import compiler
import paddle.fluid.dygraph as dygraph
from paddle.fluid.dygraph.base import to_variable
from paddle.fluid.dygraph.parallel import DataParallel

RUN_STEP = 5
DEFAULT_BATCH_SIZE = 2


class TestDistRunnerBase(object):
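    """Base class for the static-graph (fluid) distributed test runners.

    A concrete test script subclasses this and implements get_model(); it is
    executed in a child process, where run_pserver() serves the transpiled
    parameter-server program and run_trainer() runs RUN_STEP steps and writes
    the pickled losses to stdout for the parent test to compare.
    """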
    def get_model(self,
                  batch_size=DEFAULT_BATCH_SIZE,
                  lr=0.1,
                  single_device=False,
                  use_dgc=False):
        raise NotImplementedError(
            "get_model should be implemented by child classes.")

    @staticmethod
    def get_transpiler(trainer_id,
                       main_program,
                       pserver_endpoints,
                       trainers,
                       sync_mode,
                       dc_asgd=False,
                       current_endpoint=None,
                       nccl_comm_num=1):
        # NOTE: do not import fluid until runtime, or else forking processes
        # will cause errors.
        config = fluid.DistributeTranspilerConfig()
        config.enable_dc_asgd = dc_asgd
        config.sync_mode = sync_mode
        if nccl_comm_num > 1:
            config.nccl_comm_num = nccl_comm_num
        # config.runtime_split_send_recv = True
        t = fluid.DistributeTranspiler(config=config)
        t.transpile(
            trainer_id=trainer_id,
            program=main_program,
            pservers=pserver_endpoints,
            trainers=trainers,
            current_endpoint=current_endpoint)
        return t

    def run_pserver(self, args):
        self.lr = args.lr
        self.get_model(batch_size=args.batch_size)
        # NOTE: pserver should not call memory optimize
        t = self.get_transpiler(args.trainer_id,
                                fluid.default_main_program(), args.endpoints,
                                args.trainers, args.sync_mode, args.dc_asgd)
        pserver_prog = t.get_pserver_program(args.current_endpoint)
        startup_prog = t.get_startup_program(args.current_endpoint,
                                             pserver_prog)

        place = fluid.CPUPlace()
        exe = fluid.Executor(place)
        exe.run(startup_prog)
        exe.run(pserver_prog)

    def run_trainer(self, args):
        self.lr = args.lr
        if args.nccl2_reduce_layer_local_run:
            test_program, avg_cost, train_reader, test_reader, batch_acc, predict = \
                self.get_model(batch_size=args.batch_size, single_device=True)
        elif args.use_dgc:
            test_program, avg_cost, train_reader, test_reader, batch_acc, predict = \
                self.get_model(batch_size=args.batch_size, use_dgc=args.use_dgc)
        else:
            test_program, avg_cost, train_reader, test_reader, batch_acc, predict = \
                self.get_model(batch_size=args.batch_size)

        if args.mem_opt:
            fluid.memory_optimize(fluid.default_main_program(), skip_grads=True)
        if args.update_method == "pserver":
            t = self.get_transpiler(args.trainer_id,
                                    fluid.default_main_program(),
                                    args.endpoints, args.trainers,
                                    args.sync_mode, args.dc_asgd)
            trainer_prog = t.get_trainer_program()
        elif args.update_method in ("nccl2", "nccl2_reduce_layer"):
            # transpile for nccl2
            config = fluid.DistributeTranspilerConfig()
            config.mode = "nccl2"
            config.nccl_comm_num = args.nccl_comm_num
            nccl2_t = fluid.DistributeTranspiler(config=config)
            nccl2_t.transpile(
                args.trainer_id,
                program=fluid.default_main_program(),
                startup_program=fluid.default_startup_program(),
                trainers=args.endpoints,
                current_endpoint=args.current_endpoint)
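            # The nccl2 transpile above only sets up NCCL id exchange between
            # the trainer endpoints; the training graph itself is not
            # rewritten, so the default main program is used as the trainer
            # program below.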

            trainer_prog = fluid.default_main_program()
        else:
            trainer_prog = fluid.default_main_program()

        if args.use_cuda:
            device_id = int(os.getenv("FLAGS_selected_gpus", "0"))
            place = fluid.CUDAPlace(device_id)
        else:
            place = fluid.CPUPlace()

        exe = fluid.Executor(place)
        exe.run(fluid.default_startup_program())

        exec_strategy = fluid.ExecutionStrategy()
        exec_strategy.num_threads = 1
        exec_strategy.allow_op_delay = False

        build_stra = fluid.BuildStrategy()
        # FIXME force disable enable_inplace and memory_optimize
        build_stra.enable_inplace = False
        build_stra.memory_optimize = False

        if args.enable_backward_deps:
            build_stra.enable_backward_optimizer_op_deps = True

        if args.use_reduce:
            build_stra.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce
        else:
            build_stra.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.AllReduce

        pass_builder = None
        if args.batch_merge_repeat > 1:
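            # multi_batch_merge_pass repeats the forward/backward section of
            # the graph so that a single executor step accumulates gradients
            # over `batch_merge_repeat` mini-batches (an assumption based on
            # the pass name and its num_repeats attribute).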
            pass_builder = build_stra._finalize_strategy_and_create_passes()
            mypass = pass_builder.insert_pass(0, "multi_batch_merge_pass")
            mypass.set("num_repeats", args.batch_merge_repeat)

        if args.update_method in ("nccl2", "nccl2_reduce_layer"):
            build_stra.num_trainers = len(args.endpoints.split(","))
            build_stra.trainer_id = args.trainer_id
        else:
            # local or pserver mode (this also covers the local run of the
            # nccl2_reduce_layer test): compile as a single trainer
            build_stra.num_trainers = 1
            build_stra.trainer_id = 0

        binary = compiler.CompiledProgram(trainer_prog).with_data_parallel(
            loss_name=avg_cost.name,
            build_strategy=build_stra,
            exec_strategy=exec_strategy)

        if args.use_cuda and args.update_method == "nccl2":
            # only used to exercise the share_vars_from feature.
            test_exe = fluid.ParallelExecutor(
                use_cuda=True,
                loss_name=avg_cost.name,
                build_strategy=build_stra,
                main_program=test_program,
                share_vars_from=binary._executor)

        feed_var_list = [
            var for var in trainer_prog.global_block().vars.values()
            if var.is_data
        ]

        feeder = fluid.DataFeeder(feed_var_list, place)
        reader_generator = train_reader()

        def get_data():
            origin_batch = next(reader_generator)
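            # In a distributed run each trainer keeps every other sample, so
            # the trainers together consume the same data as the local
            # baseline run.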
            if args.update_method != "local" and args.use_reader_alloc:
                new_batch = []
                for offset, item in enumerate(origin_batch):
                    if offset % 2 == args.trainer_id:
                        new_batch.append(item)
                return new_batch
            else:
                return origin_batch

        out_losses = []
        for _ in six.moves.xrange(RUN_STEP):
            loss, = exe.run(binary,
                            fetch_list=[avg_cost.name],
                            feed=feeder.feed(get_data()))
            out_losses.append(loss[0])
        if six.PY2:
            print(pickle.dumps(out_losses))
        else:
            sys.stdout.buffer.write(pickle.dumps(out_losses))


class TestParallelDyGraphRunnerBase(object):
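    """Base class for the dygraph (imperative) data-parallel test runners.

    Subclasses implement get_model() and run_one_loop(); run_trainer() wraps
    the model with dygraph.parallel.DataParallel when --update_method nccl2
    is used and writes the pickled per-step losses to stdout.
    """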
    def get_model(self):
        raise NotImplementedError(
            "get_model should be implemented by child classes.")

    def run_one_loop(self, model, opt, data):
        raise NotImplementedError(
            "train_one_loop should be implemented by the child classes.")

    def run_trainer(self, args):

        seed = 90
        device_id = int(os.getenv("FLAGS_selected_gpus", "0"))
        place = fluid.CUDAPlace(device_id)

        def _get_data(batch):
            if args.update_method != "local":
                new_batch = []
                for offset, item in enumerate(batch):
                    if offset % 2 == args.trainer_id:
                        new_batch.append(item)
                return new_batch
            else:
                return batch

        with fluid.dygraph.guard(place):
            fluid.default_startup_program().random_seed = seed
            fluid.default_main_program().random_seed = seed
            np.random.seed(seed)
            import random
            random.seed(seed)
            model, train_reader, opt = self.get_model()
            nranks = len(args.endpoints.split(",")) if args.endpoints else 1

            if args.update_method == "nccl2":
                strategy = dygraph.parallel.ParallelStrategy()
                strategy.nranks = nranks
                strategy.local_rank = args.trainer_id
                strategy.trainer_endpoints = args.endpoints.split(",")
                strategy.current_endpoint = args.current_endpoint
                dygraph.parallel.prepare_context(strategy)
                model = dygraph.parallel.DataParallel(model, strategy)
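                # Under DataParallel the per-trainer loss is scaled and the
                # gradients are allreduced across trainers on every step; see
                # scale_loss() / apply_collective_grads() in the loop below.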
            out_losses = []
            for step_id, data in enumerate(train_reader()):
                data = _get_data(data)
                if step_id == RUN_STEP:
                    break
                loss = self.run_one_loop(model, opt, data)
                out_losses.append(loss.numpy())

                # FIXME(Yancey1989): scale the loss inplace
                if args.update_method == "nccl2":
                    loss = model.scale_loss(loss)

                loss.backward()
                if args.update_method == "nccl2":
                    model.apply_collective_grads()

                opt.minimize(loss)
                model.clear_gradients()
            if six.PY2:
                print(pickle.dumps(out_losses))
            else:
                sys.stdout.buffer.write(pickle.dumps(out_losses))


def runtime_main(test_class):
    parser = argparse.ArgumentParser(description='Run dist test.')
    parser.add_argument(
        '--role', type=str, required=True, choices=['pserver', 'trainer'])
    parser.add_argument('--endpoints', type=str, required=False, default="")
    parser.add_argument(
        '--update_method',
        type=str,
        default="local",
        choices=["pserver", "nccl2", "local", "nccl2_reduce_layer"])
    parser.add_argument('--trainer_id', type=int, required=False, default=0)
    parser.add_argument('--trainers', type=int, required=False, default=1)
    parser.add_argument('--nccl_comm_num', type=int, required=False, default=1)
    parser.add_argument(
        '--enable_backward_deps', type=bool, required=False, default=1)
    parser.add_argument(
        '--current_endpoint', type=str, required=False, default="")
    parser.add_argument('--sync_mode', action='store_true')
    parser.add_argument('--mem_opt', action='store_true')
    parser.add_argument('--use_cuda', action='store_true')
    parser.add_argument('--use_dgc', action='store_true')
    parser.add_argument('--use_reduce', action='store_true')
    parser.add_argument('--dc_asgd', action='store_true')
    parser.add_argument(
        '--use_reader_alloc', action='store_true', required=False)
    parser.add_argument('--batch_size', required=False, type=int, default=2)
    parser.add_argument('--lr', required=False, type=float, default=0.001)
    parser.add_argument(
        '--batch_merge_repeat', required=False, type=int, default=1)
    parser.add_argument(
        '--nccl2_reduce_layer_local_run',
        required=False,
        type=bool,
        default=False)

    args = parser.parse_args()

    model = test_class()
    if args.role == "pserver" and args.update_method == "pserver":
        model.run_pserver(args)
    else:
        model.run_trainer(args)
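
# A concrete test script is expected to subclass one of the runner bases and
# hand it to runtime_main(). Hypothetical sketch (illustrative names only):
#
#     class TestDistMnist2x2(TestDistRunnerBase):
#         def get_model(self, batch_size=DEFAULT_BATCH_SIZE, lr=0.1,
#                       single_device=False, use_dgc=False):
#             ...  # build the network and return (test_program, avg_cost,
#                  #  train_reader, test_reader, batch_acc, predict)
#
#     if __name__ == "__main__":
#         runtime_main(TestDistMnist2x2)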


import paddle.compat as cpt
import socket
from contextlib import closing


class TestDistBase(unittest.TestCase):
    def _setup_config(self):
        raise NotImplementedError("tests should have _setup_config implemented")

    def _after_setup_config(self):
        if self._enforce_place == "CPU":
            self.__use_cuda = False
            self._use_dgc = False
        elif self._enforce_place == "GPU":
            self.__use_cuda = True
        else:
            if fluid.core.is_compiled_with_cuda():
                self.__use_cuda = True
            else:
                self.__use_cuda = False
                self._use_dgc = False

        if self._use_reduce:
            assert not self._use_dgc

    def setUp(self):
        self._trainers = 2
        self._pservers = 2
        self._port_set = set()
        self._ps_endpoints = "127.0.0.1:%s,127.0.0.1:%s" % (
            self._find_free_port(), self._find_free_port())
        self._python_interp = sys.executable
        self._sync_mode = True
        self._enforce_place = None
        self._mem_opt = False
        self._use_reduce = False
        self._dc_asgd = False  # must be used together with async mode
        self._use_reader_alloc = True
        self._nccl2_mode = False
        self._mp_mode = False
        # FIXME(typhoonzero): this flag only exists to test allreduce layers
        # (users can call layers.allreduce to accumulate tensors anywhere).
        # Find a better way to run that test and reduce the number of places
        # that have to check this flag.
        self._nccl2_reduce_layer = False
        self._lr = 0.001
        self._use_dgc = False
        self._dygraph = False
        self._nccl_comm_num = 1
        self._setup_config()
        self._after_setup_config()
        self._enable_backward_deps = False

    def _find_free_port(self):
        def __free_port():
            with closing(socket.socket(socket.AF_INET,
                                       socket.SOCK_STREAM)) as s:
                s.bind(('', 0))
                return s.getsockname()[1]

        while True:
            port = __free_port()
            if port not in self._port_set:
                self._port_set.add(port)
                return port

    def start_pserver(self, model_file, check_error_log, required_envs):
        ps0_ep, ps1_ep = self._ps_endpoints.split(",")
        ps_cmd = "%s %s --role pserver --endpoints %s --trainer_id 0 --current_endpoint %s --trainers %d --update_method pserver"
        ps0_cmd = ps_cmd % \
                  (self._python_interp, model_file, self._ps_endpoints, ps0_ep,
                   self._trainers)
        ps1_cmd = ps_cmd % \
                  (self._python_interp, model_file, self._ps_endpoints, ps1_ep,
                   self._trainers)

        if self._sync_mode:
            ps0_cmd += " --sync_mode"
            ps1_cmd += " --sync_mode"
        if self._mem_opt:
            ps0_cmd += " --mem_opt"
            ps1_cmd += " --mem_opt"

        print(ps0_cmd)
        print(ps1_cmd)
        ps0_pipe = open("/tmp/ps0_err.log", "wb")
        ps1_pipe = open("/tmp/ps1_err.log", "wb")

        ps0_proc = subprocess.Popen(
            ps0_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=ps0_pipe,
            env=required_envs)
        ps1_proc = subprocess.Popen(
            ps1_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=ps1_pipe,
            env=required_envs)

        return ps0_proc, ps1_proc, ps0_pipe, ps1_pipe

    def _run_local(self,
                   model,
                   envs,
                   check_error_log=False,
                   batch_size=DEFAULT_BATCH_SIZE,
                   batch_merge_repeat=1):

        cmd = "%s %s --role trainer --lr %f" % (self._python_interp, model,
                                                self._lr)
        if batch_size != DEFAULT_BATCH_SIZE:
            cmd += " --batch_size %d" % batch_size
        if batch_merge_repeat > 1:
            cmd += " --batch_merge_repeat %d" % batch_merge_repeat
        if self._nccl2_reduce_layer:
            cmd += " --nccl2_reduce_layer_local_run 1"

        if self.__use_cuda:
            cmd += " --use_cuda"
            env_local = {
                "CUDA_VISIBLE_DEVICES": "0",
                "PADDLE_TRAINERS_NUM": "1",
                "PADDLE_TRAINER_ID": "0"
            }
        else:
            env_local = {'CPU_NUM': '1'}

        env_local.update(envs)
        print("local_cmd: {}, env: {}".format(cmd, env_local))

        if check_error_log:
            err_log = open("/tmp/trainer.err.log", "wb")
            local_proc = subprocess.Popen(
                cmd.split(" "),
                stdout=subprocess.PIPE,
                stderr=err_log,
                env=env_local)
        else:
            local_proc = subprocess.Popen(
                cmd.split(" "),
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                env=env_local)

        local_out, local_err = local_proc.communicate()

        if check_error_log:
            err_log.close()

        sys.stderr.write('local_stderr: %s\n' % local_err)
        sys.stderr.write('local_stdout: %s\n' % pickle.loads(local_out))

        return pickle.loads(local_out)

    def _run_cluster(self, model, envs, check_error_log):
        # Run dist train to compare with local results
        ps0, ps1, ps0_pipe, ps1_pipe = self.start_pserver(model,
                                                          check_error_log, envs)

        ps0_ep, ps1_ep = self._ps_endpoints.split(",")

        tr_cmd = "%s %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --trainers %d --update_method pserver --lr %f"
        tr0_cmd = tr_cmd % \
                  (self._python_interp, model, self._ps_endpoints,
                   0, ps0_ep, self._trainers, self._lr)
        tr1_cmd = tr_cmd % \
                  (self._python_interp, model, self._ps_endpoints,
                   1, ps1_ep, self._trainers, self._lr)

        if self._sync_mode:
            tr0_cmd += " --sync_mode"
            tr1_cmd += " --sync_mode"
        if self._mem_opt:
            tr0_cmd += " --mem_opt"
            tr1_cmd += " --mem_opt"
        if self._use_reduce:
            tr0_cmd += " --use_reduce"
            tr1_cmd += " --use_reduce"
        if self._use_reader_alloc:
            tr0_cmd += " --use_reader_alloc"
            tr1_cmd += " --use_reader_alloc"
        if self.__use_cuda:
            tr0_cmd += " --use_cuda"
            tr1_cmd += " --use_cuda"
            env0 = {"CUDA_VISIBLE_DEVICES": "0"}
            env1 = {"CUDA_VISIBLE_DEVICES": "1"}
        else:
            env0 = {'CPU_NUM': '1'}
            env1 = {'CPU_NUM': '1'}

        env0.update(envs)
        env1.update(envs)

        print("tr0_cmd: {}, env: {}".format(tr0_cmd, env0))
        print("tr1_cmd: {}, env: {}".format(tr1_cmd, env1))
        tr0_pipe = open("/tmp/tr0_err.log", "wb")
        tr1_pipe = open("/tmp/tr1_err.log", "wb")

        tr0_proc = subprocess.Popen(
            tr0_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=tr0_pipe,
            env=env0)
        tr1_proc = subprocess.Popen(
            tr1_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=tr1_pipe,
            env=env1)

        # Wait until the trainer processes terminate.
        while True:
            stat0 = tr0_proc.poll()
            time.sleep(0.1)
            if stat0 is not None:
                break
        while True:
            stat1 = tr1_proc.poll()
            time.sleep(0.1)
            if stat1 is not None:
                break

        tr0_out, tr0_err = tr0_proc.communicate()
        tr1_out, tr1_err = tr1_proc.communicate()

        # close the redirected stderr log files
        tr0_pipe.close()
        tr1_pipe.close()
        ps0_pipe.close()
        ps1_pipe.close()

        ps0.terminate()
        ps1.terminate()

        # print server log
        with open("/tmp/ps0_err.log", "r") as fn:
            sys.stderr.write("ps0 stderr: %s\n" % fn.read())
        with open("/tmp/ps1_err.log", "r") as fn:
            sys.stderr.write("ps1 stderr: %s\n" % fn.read())

        # print trainer logs
        with open("/tmp/tr0_err.log", "r") as fn:
            sys.stderr.write('trainer 0 stderr: %s\n' % fn.read())
        with open("/tmp/tr1_err.log", "r") as fn:
            sys.stderr.write('trainer 1 stderr: %s\n' % fn.read())

        return pickle.loads(tr0_out), pickle.loads(tr1_out)

    def _run_cluster_nccl2(self, model, envs, nccl2_reduce_layer,
                           check_error_log):
        # NOTE: we reuse ps_endpoints as nccl2 worker endpoints
        worker_endpoints = self._ps_endpoints.split(",")
        w0_ep, w1_ep = worker_endpoints
        if nccl2_reduce_layer:
            update_method = "nccl2_reduce_layer"
        else:
            update_method = "nccl2"

        tr_cmd = "%s %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --update_method %s --lr %f"
        tr0_cmd = tr_cmd % \
                  (self._python_interp, model, self._ps_endpoints,
                   0, w0_ep, update_method, self._lr)
        tr1_cmd = tr_cmd % \
                  (self._python_interp, model, self._ps_endpoints,
                   1, w1_ep, update_method, self._lr)

        if self._mem_opt:
            tr0_cmd += " --mem_opt"
            tr1_cmd += " --mem_opt"
        if self._use_reduce:
            tr0_cmd += " --use_reduce"
            tr1_cmd += " --use_reduce"
        if self._use_reader_alloc:
            tr0_cmd += " --use_reader_alloc"
            tr1_cmd += " --use_reader_alloc"
        if self.__use_cuda:
            tr0_cmd += " --use_cuda"
            tr1_cmd += " --use_cuda"
            env0 = {
                "CUDA_VISIBLE_DEVICES": "0",
                # for test nccl2 layer
                "PADDLE_TRAINERS_NUM": "2",
                "PADDLE_TRAINER_ID": "0"
            }
            env1 = {
                "CUDA_VISIBLE_DEVICES": "1",
                "PADDLE_TRAINERS_NUM": "2",
                "PADDLE_TRAINER_ID": "1"
            }
        else:
            env0 = {'CPU_NUM': '1'}
            env1 = {'CPU_NUM': '1'}

        if self._use_dgc:
            tr0_cmd += " --use_dgc"
            tr1_cmd += " --use_dgc"

        if self._nccl_comm_num > 1:
            tr0_cmd += " --nccl_comm_num {}".format(self._nccl_comm_num)
            tr1_cmd += " --nccl_comm_num {}".format(self._nccl_comm_num)

        if self._mp_mode:
            env0 = {"FLAGS_selected_gpus": "0"}
            env1 = {"FLAGS_selected_gpus": "1"}

        if self._enable_backward_deps:
            tr0_cmd += " --enable_backward_deps 1"
            tr1_cmd += " --enable_backward_deps 1"

        env0.update(envs)
        env1.update(envs)

        print("tr0_cmd:{}, env: {}".format(tr0_cmd, env0))
        print("tr1_cmd:{}, env: {}".format(tr1_cmd, env1))
        tr0_pipe = open("/tmp/tr0_err.log", "wb")
        tr1_pipe = open("/tmp/tr1_err.log", "wb")

        tr0_proc = subprocess.Popen(
            tr0_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=tr0_pipe,
            env=env0)
        tr1_proc = subprocess.Popen(
            tr1_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=tr1_pipe,
            env=env1)

        tr0_out, tr0_err = tr0_proc.communicate()
        tr1_out, tr1_err = tr1_proc.communicate()

        # close trainer file
        tr0_pipe.close()
        tr1_pipe.close()

        # print log
        sys.stderr.write('trainer 0 stderr: %s\n' % tr0_err)
        sys.stderr.write('trainer 1 stderr: %s\n' % tr1_err)

        return pickle.loads(tr0_out), pickle.loads(tr1_out)

    def check_with_place(self,
                         model_file,
                         delta=1e-3,
                         check_error_log=False,
                         need_envs={}):
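        # Runs the model once locally and once on a 2-trainer cluster
        # (pserver or nccl2 mode) and checks that the per-step losses agree
        # within `delta`.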
        # TODO(typhoonzero): should automatically adapt to the GPU count on
        # the machine.
        required_envs = {
            "PATH": os.getenv("PATH", ""),
            "PYTHONPATH": os.getenv("PYTHONPATH", ""),
            "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
            "FLAGS_fraction_of_gpu_memory_to_use": "0.15",
            "FLAGS_rpc_deadline": "5000",  # 5sec to fail fast
            "FLAGS_cudnn_deterministic": "1",
            "http_proxy": "",
            "NCCL_P2P_DISABLE": "1"
        }

        required_envs.update(need_envs)

        if check_error_log:
            required_envs["GLOG_v"] = "3"
            required_envs["GLOG_logtostderr"] = "1"

        local_losses\
            = self._run_local(model_file, required_envs,
                                check_error_log)
        if self._nccl2_mode:
            if self._nccl2_reduce_layer:
                tr0_losses, tr1_losses = self._run_cluster_nccl2(
                    model_file, required_envs, True, check_error_log)
            else:
                tr0_losses, tr1_losses = self._run_cluster_nccl2(
                    model_file, required_envs, False, check_error_log)
        else:
            tr0_losses, tr1_losses = self._run_cluster(
                model_file, required_envs, check_error_log)

        for step_id in range(RUN_STEP):
            local_loss = local_losses[step_id]
            tr0_loss = tr0_losses[step_id]
            tr1_loss = tr1_losses[step_id]
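            # The distributed result is the average of the two trainers'
            # losses; it must match the single-process baseline within
            # `delta`.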
            dist_loss = (np.array([tr0_loss]) + np.array([tr1_loss])) / 2
            print("=======", local_loss, ":", dist_loss[0], "=======")
            self.assertAlmostEqual(local_loss, dist_loss[0], delta=delta)
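

# Hypothetical usage from a concrete test module (illustrative names only):
# a test case overrides _setup_config() and points check_with_place() at the
# runner script that calls runtime_main().
#
#     class TestDistMnistNCCL2(TestDistBase):
#         def _setup_config(self):
#             self._sync_mode = True
#             self._nccl2_mode = True
#
#         def test_dist_train(self):
#             if fluid.core.is_compiled_with_cuda():
#                 self.check_with_place("dist_mnist.py", delta=1e-5)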