#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function
import time

import unittest
import os
import sys
import signal
import subprocess
import six
import argparse
import pickle
import numpy as np

import paddle.fluid as fluid
from paddle.fluid import compiler
import paddle.fluid.dygraph as dygraph
from paddle.fluid.dygraph.base import to_variable
from paddle.fluid.dygraph.parallel import DataParallel

RUN_STEP = 10
DEFAULT_BATCH_SIZE = 2


class TestDistRunnerBase(object):
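    """Base class executed in a launched subprocess: subclasses implement
    get_model(); run_pserver()/run_trainer() transpile the program, run the
    training loop, and print pickled per-step losses to stdout."""
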
    def get_model(self,
                  batch_size=DEFAULT_BATCH_SIZE,
                  lr=0.1,
                  single_device=False,
                  use_dgc=False):
        raise NotImplementedError(
            "get_model should be implemented by child classes.")

    @staticmethod
    def get_transpiler(trainer_id,
                       main_program,
                       pserver_endpoints,
                       trainers,
                       sync_mode,
                       dc_asgd=False,
                       current_endpoint=None):
        # NOTE: import fluid at runtime only, or else forking processes will fail.
        config = fluid.DistributeTranspilerConfig()
        config.enable_dc_asgd = dc_asgd
        config.sync_mode = sync_mode
        # config.runtime_split_send_recv = True
        t = fluid.DistributeTranspiler(config=config)
        t.transpile(
            trainer_id=trainer_id,
            program=main_program,
            pservers=pserver_endpoints,
            trainers=trainers,
            current_endpoint=current_endpoint)
        return t

    def run_pserver(self, args):
        self.lr = args.lr
        self.get_model(batch_size=args.batch_size)
        # NOTE: pserver should not call memory optimize
        t = self.get_transpiler(args.trainer_id,
                                fluid.default_main_program(), args.endpoints,
                                args.trainers, args.sync_mode, args.dc_asgd)
        pserver_prog = t.get_pserver_program(args.current_endpoint)
        startup_prog = t.get_startup_program(args.current_endpoint,
                                             pserver_prog)

        place = fluid.CPUPlace()
        exe = fluid.Executor(place)
        exe.run(startup_prog)
        exe.run(pserver_prog)

    def run_trainer(self, args):
        self.lr = args.lr
        if args.nccl2_reduce_layer_local_run:
            test_program, avg_cost, train_reader, test_reader, batch_acc, predict = \
                self.get_model(batch_size=args.batch_size, single_device=True)
        elif args.use_dgc:
            test_program, avg_cost, train_reader, test_reader, batch_acc, predict = \
                self.get_model(batch_size=args.batch_size, use_dgc=args.use_dgc)
        else:
            test_program, avg_cost, train_reader, test_reader, batch_acc, predict = \
                self.get_model(batch_size=args.batch_size)

        if args.mem_opt:
            fluid.memory_optimize(fluid.default_main_program(), skip_grads=True)
        if args.update_method == "pserver":
            t = self.get_transpiler(args.trainer_id,
                                    fluid.default_main_program(),
                                    args.endpoints, args.trainers,
                                    args.sync_mode, args.dc_asgd)
            trainer_prog = t.get_trainer_program()
        elif args.update_method == "nccl2" or args.update_method == "nccl2_reduce_layer":
            # transpile for nccl2
            config = fluid.DistributeTranspilerConfig()
            config.mode = "nccl2"
            nccl2_t = fluid.DistributeTranspiler(config=config)
            nccl2_t.transpile(
                args.trainer_id,
                program=fluid.default_main_program(),
                startup_program=fluid.default_startup_program(),
                trainers=args.endpoints,
                current_endpoint=args.current_endpoint)
            trainer_prog = fluid.default_main_program()
        else:
            trainer_prog = fluid.default_main_program()

        if args.use_cuda:
            device_id = int(os.getenv("FLAGS_selected_gpus", "0"))
            place = fluid.CUDAPlace(device_id)
        else:
            place = fluid.CPUPlace()

        exe = fluid.Executor(place)
        exe.run(fluid.default_startup_program())

        exec_strategy = fluid.ExecutionStrategy()
        exec_strategy.num_threads = 1
        exec_strategy.allow_op_delay = False

        build_stra = fluid.BuildStrategy()
        # FIXME: force-disable enable_inplace and memory_optimize for now
        build_stra.enable_inplace = False
        build_stra.memory_optimize = False

        if args.use_reduce:
            build_stra.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce
        else:
            build_stra.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.AllReduce

        pass_builder = None
        if args.batch_merge_repeat > 1:
            pass_builder = build_stra._finalize_strategy_and_create_passes()
            mypass = pass_builder.insert_pass(0, "multi_batch_merge_pass")
            mypass.set("num_repeats", args.batch_merge_repeat)

        if args.update_method == "nccl2" or args.update_method == "nccl2_reduce_layer":
            build_stra.num_trainers = len(args.endpoints.split(","))
            build_stra.trainer_id = args.trainer_id
        else:
            # local or pserver mode: run the compiled program as a single trainer
            build_stra.num_trainers = 1
            build_stra.trainer_id = 0

        binary = compiler.CompiledProgram(trainer_prog).with_data_parallel(
            loss_name=avg_cost.name,
            build_strategy=build_stra,
            exec_strategy=exec_strategy)

        feed_var_list = [
            var for var in trainer_prog.global_block().vars.values()
            if var.is_data
        ]

        feeder = fluid.DataFeeder(feed_var_list, place)
        reader_generator = train_reader()
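
        # Shard each batch between the two trainers: trainer i keeps the
        # samples at offsets congruent to i modulo 2, so together the two
        # trainers consume exactly one local batch per step.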
        def get_data():
            origin_batch = next(reader_generator)
            if args.update_method != "local" and args.use_reader_alloc:
                new_batch = []
                for offset, item in enumerate(origin_batch):
                    if offset % 2 == args.trainer_id:
                        new_batch.append(item)
                return new_batch
            else:
                return origin_batch
        out_losses = []
        for _ in six.moves.xrange(RUN_STEP):
            loss, = exe.run(binary,
                            fetch_list=[avg_cost.name],
                            feed=feeder.feed(get_data()))
            out_losses.append(loss[0])
        if six.PY2:
            print(pickle.dumps(out_losses))
        else:
            sys.stdout.buffer.write(pickle.dumps(out_losses))
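

# A hedged sketch (not part of the original harness) of how a concrete test
# subclasses TestDistRunnerBase. The one-layer network and the synthetic
# random reader below are illustrative assumptions; real tests such as
# test_dist_mnist.py supply their own get_model(). single_device and use_dgc
# are accepted but ignored in this sketch.
class _ExampleDistRunner(TestDistRunnerBase):
    def get_model(self,
                  batch_size=DEFAULT_BATCH_SIZE,
                  lr=0.1,
                  single_device=False,
                  use_dgc=False):
        img = fluid.layers.data(name='img', shape=[8], dtype='float32')
        label = fluid.layers.data(name='label', shape=[1], dtype='int64')
        predict = fluid.layers.fc(input=img, size=2, act='softmax')
        avg_cost = fluid.layers.mean(
            fluid.layers.cross_entropy(input=predict, label=label))
        batch_acc = fluid.layers.accuracy(input=predict, label=label)
        # Clone the test program before the optimizer mutates the main one.
        test_program = fluid.default_main_program().clone(for_test=True)
        fluid.optimizer.SGD(learning_rate=lr).minimize(avg_cost)

        def _sample_reader():
            np.random.seed(1)
            for _ in range(RUN_STEP * batch_size * 4):
                yield np.random.random(8).astype('float32'), \
                    np.random.randint(0, 2)

        def _batched(reader, size):
            # run_trainer() expects train_reader() to yield whole batches.
            def _impl():
                batch = []
                for item in reader():
                    batch.append(item)
                    if len(batch) == size:
                        yield batch
                        batch = []
            return _impl

        train_reader = _batched(_sample_reader, batch_size)
        test_reader = _batched(_sample_reader, batch_size)
        return test_program, avg_cost, train_reader, test_reader, \
            batch_acc, predict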


class TestParallelDyGraphRunnerBase(object):
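    """Dygraph counterpart of TestDistRunnerBase: subclasses implement
    get_model() and run_one_loop(); run_trainer() drives the loop and
    prints pickled per-step losses to stdout."""
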
    def get_model(self):
        raise NotImplementedError(
            "get_model should be implemented by child classes.")

    def run_one_loop(self, model, opt, data):
        raise NotImplementedError(
            "run_one_loop should be implemented by child classes.")

    def run_trainer(self, args):
        seed = 90
        device_id = int(os.getenv("FLAGS_selected_gpus", "0"))
        place = fluid.CUDAPlace(device_id)

        def _get_data(batch):
            if args.update_method != "local":
                new_batch = []
                for offset, item in enumerate(batch):
                    if offset % 2 == args.trainer_id:
                        new_batch.append(item)
                return new_batch
            else:
                return batch

        with fluid.dygraph.guard(place):
            fluid.default_startup_program().random_seed = seed
            fluid.default_main_program().random_seed = seed
            model, train_reader, opt = self.get_model()

            nranks = len(args.endpoints.split(",")) if args.endpoints else 1
            if args.update_method == "nccl2":
                model = dygraph.parallel.DataParallel(model)
                strategy = dygraph.parallel.ParallelStrategy()
                strategy.nranks = nranks
                strategy.local_rank = args.trainer_id
                strategy.trainer_endpoints = args.endpoints.split(",")
                strategy.current_endpoint = args.current_endpoint
                dygraph.parallel.prepare_context(strategy)
            out_losses = []
            for step_id, data in enumerate(train_reader()):
                data = _get_data(data)
                if step_id == RUN_STEP:
                    break
                loss = self.run_one_loop(model, opt, data)

                # FIXME(Yancey1989): scale the loss in place
                loss.stop_gradient = True
                loss_scale = to_variable(np.array([nranks]).astype("float32"))
                loss = loss / loss_scale

                out_losses.append(loss.numpy())
                loss.backward()

                opt.minimize(loss)
                model.clear_gradients()
            if six.PY2:
                print(pickle.dumps(out_losses))
            else:
                sys.stdout.buffer.write(pickle.dumps(out_losses))
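

# A hedged sketch (not part of the original harness) of the run_one_loop()
# contract for dygraph runners. The (feature, label) sample layout and the
# simple forward/loss below are illustrative assumptions; real tests such as
# test_parallel_dygraph_mnist.py define get_model() and run_one_loop()
# concretely.
class _ExampleDyGraphRunner(TestParallelDyGraphRunnerBase):
    def run_one_loop(self, model, opt, data):
        # `data` is the (possibly sharded) list of samples for this step.
        x = to_variable(
            np.array([d[0] for d in data]).astype('float32'))
        y = to_variable(
            np.array([d[1] for d in data]).astype('int64').reshape(-1, 1))
        y.stop_gradient = True
        prediction = model(x)
        # Return the scalar loss; run_trainer() scales it by nranks and
        # calls backward()/minimize().
        return fluid.layers.mean(
            fluid.layers.cross_entropy(input=prediction, label=y))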


def runtime_main(test_class):
    parser = argparse.ArgumentParser(description='Run dist test.')
    parser.add_argument(
        '--role', type=str, required=True, choices=['pserver', 'trainer'])
    parser.add_argument('--endpoints', type=str, required=False, default="")
    parser.add_argument(
        '--update_method',
        type=str,
        default="local",
        choices=["pserver", "nccl2", "local", "nccl2_reduce_layer"])
    parser.add_argument('--trainer_id', type=int, required=False, default=0)
    parser.add_argument('--trainers', type=int, required=False, default=1)
    parser.add_argument(
        '--current_endpoint', type=str, required=False, default="")
    parser.add_argument('--sync_mode', action='store_true')
    parser.add_argument('--mem_opt', action='store_true')
    parser.add_argument('--use_cuda', action='store_true')
    parser.add_argument('--use_dgc', action='store_true')
    parser.add_argument('--use_reduce', action='store_true')
    parser.add_argument('--dc_asgd', action='store_true')
    parser.add_argument(
        '--use_reader_alloc', action='store_true', required=False)
    parser.add_argument('--batch_size', required=False, type=int, default=2)
    parser.add_argument('--lr', required=False, type=float, default=0.001)
    parser.add_argument(
        '--batch_merge_repeat', required=False, type=int, default=1)
    parser.add_argument(
        '--nccl2_reduce_layer_local_run',
        required=False,
        type=bool,
        default=False)
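    # NOTE: argparse's type=bool treats any non-empty string as True; the
    # test driver passes "1" (see _run_local) to enable this flag.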

    args = parser.parse_args()

    model = test_class()
    if args.role == "pserver" and args.update_method == "pserver":
        model.run_pserver(args)
    else:
        model.run_trainer(args)
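

# An illustrative trainer invocation of the kind TestDistBase constructs in
# _run_cluster (the script name and ports below are hypothetical):
#   python test_dist_mnist.py --role trainer \
#       --endpoints 127.0.0.1:6170,127.0.0.1:6171 --trainer_id 0 \
#       --current_endpoint 127.0.0.1:6170 --trainers 2 \
#       --update_method pserver --sync_mode --lr 0.001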


import paddle.compat as cpt
import socket
from contextlib import closing

class TestDistBase(unittest.TestCase):
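    """Test driver: launches local and distributed runs of a model file in
    subprocesses and asserts that their per-step losses agree within a
    delta (see check_with_place)."""
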
    def _setup_config(self):
        raise NotImplementedError("tests should have _setup_config implemented")

    def _after_setup_config(self):
        if self._enforce_place == "CPU":
            self.__use_cuda = False
            self._use_dgc = False
        elif self._enforce_place == "GPU":
            self.__use_cuda = True
        else:
            if fluid.core.is_compiled_with_cuda():
                self.__use_cuda = True
            else:
                self.__use_cuda = False
                self._use_dgc = False

        if self._use_reduce:
            assert not self._use_dgc

    def setUp(self):
        self._trainers = 2
        self._pservers = 2
        self._port_set = set()
        self._ps_endpoints = "127.0.0.1:%s,127.0.0.1:%s" % (
            self._find_free_port(), self._find_free_port())
        self._python_interp = sys.executable
        self._sync_mode = True
        self._enforce_place = None
        self._mem_opt = False
        self._use_reduce = False
        self._dc_asgd = False  # must use with async mode
        self._use_reader_alloc = True
        self._nccl2_mode = False
        self._mp_mode = False
        # FIXME(typhoonzero): I added this argument to enable testing
        # allreduce layers, which users can call (layers.allreduce) to
        # accumulate tensors anywhere. Find a better way to do this test
        # and reduce the need to check this argument everywhere.
        self._nccl2_reduce_layer = False
        self._lr = 0.001
        self._use_dgc = False
        self._dygraph = False
        self._setup_config()
        self._after_setup_config()
    def _find_free_port(self):
        def __free_port():
            with closing(socket.socket(socket.AF_INET,
                                       socket.SOCK_STREAM)) as s:
                s.bind(('', 0))
                return s.getsockname()[1]

        while True:
            port = __free_port()
            if port not in self._port_set:
                self._port_set.add(port)
                return port
    def start_pserver(self, model_file, check_error_log, required_envs):
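        """Start the two pserver subprocesses; return their Popen handles
        together with the opened stderr log files (/tmp/ps0_err.log and
        /tmp/ps1_err.log)."""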
        ps0_ep, ps1_ep = self._ps_endpoints.split(",")
        ps_cmd = "%s %s --role pserver --endpoints %s --trainer_id 0 --current_endpoint %s --trainers %d --update_method pserver"
        ps0_cmd = ps_cmd % \
                  (self._python_interp, model_file, self._ps_endpoints, ps0_ep,
                   self._trainers)
        ps1_cmd = ps_cmd % \
                  (self._python_interp, model_file, self._ps_endpoints, ps1_ep,
                   self._trainers)

        if self._sync_mode:
            ps0_cmd += " --sync_mode"
            ps1_cmd += " --sync_mode"
        if self._mem_opt:
            ps0_cmd += " --mem_opt"
            ps1_cmd += " --mem_opt"
        print(ps0_cmd)
        print(ps1_cmd)
        ps0_pipe = open("/tmp/ps0_err.log", "wb")
        ps1_pipe = open("/tmp/ps1_err.log", "wb")

        ps0_proc = subprocess.Popen(
            ps0_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=ps0_pipe,
            env=required_envs)
        ps1_proc = subprocess.Popen(
            ps1_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=ps1_pipe,
            env=required_envs)
        return ps0_proc, ps1_proc, ps0_pipe, ps1_pipe
    def _run_local(self,
                   model,
                   envs,
                   check_error_log=False,
                   batch_size=DEFAULT_BATCH_SIZE,
                   batch_merge_repeat=1):
        cmd = "%s %s --role trainer --lr %f" % (self._python_interp, model,
                                                self._lr)
        if batch_size != DEFAULT_BATCH_SIZE:
            cmd += " --batch_size %d" % batch_size
        if batch_merge_repeat > 1:
            cmd += " --batch_merge_repeat %d" % batch_merge_repeat
        if self._nccl2_reduce_layer:
            cmd += " --nccl2_reduce_layer_local_run 1"
        if self.__use_cuda:
            cmd += " --use_cuda"
            env_local = {
                "CUDA_VISIBLE_DEVICES": "0",
                "PADDLE_TRAINERS_NUM": "1",
                "PADDLE_TRAINER_ID": "0"
            }
        else:
            env_local = {'CPU_NUM': '1'}

        env_local.update(envs)
        print("local_cmd: {}, env: {}".format(cmd, env_local))
        if check_error_log:
            err_log = open("/tmp/trainer.err.log", "wb")
            local_proc = subprocess.Popen(
                cmd.split(" "),
                stdout=subprocess.PIPE,
                stderr=err_log,
                env=env_local)
        else:
            local_proc = subprocess.Popen(
                cmd.split(" "),
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                env=env_local)
        local_out, local_err = local_proc.communicate()

        if check_error_log:
            err_log.close()

        sys.stderr.write('local_stderr: %s\n' % local_err)
        sys.stderr.write('local_stdout: %s\n' % pickle.loads(local_out))

        return pickle.loads(local_out)

    def _run_cluster(self, model, envs, check_error_log):
        # Run dist train to compare with local results
        ps0, ps1, ps0_pipe, ps1_pipe = self.start_pserver(model,
                                                          check_error_log, envs)

        ps0_ep, ps1_ep = self._ps_endpoints.split(",")

        tr_cmd = "%s %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --trainers %d --update_method pserver --lr %f"
        tr0_cmd = tr_cmd % \
                  (self._python_interp, model, self._ps_endpoints,
                   0, ps0_ep, self._trainers, self._lr)
        tr1_cmd = tr_cmd % \
                  (self._python_interp, model, self._ps_endpoints,
                   1, ps1_ep, self._trainers, self._lr)

        if self._sync_mode:
            tr0_cmd += " --sync_mode"
            tr1_cmd += " --sync_mode"
        if self._mem_opt:
            tr0_cmd += " --mem_opt"
            tr1_cmd += " --mem_opt"
        if self._use_reduce:
            tr0_cmd += " --use_reduce"
            tr1_cmd += " --use_reduce"
        if self._use_reader_alloc:
            tr0_cmd += " --use_reader_alloc"
            tr1_cmd += " --use_reader_alloc"
        if self.__use_cuda:
            tr0_cmd += " --use_cuda"
            tr1_cmd += " --use_cuda"
            env0 = {"CUDA_VISIBLE_DEVICES": "0"}
            env1 = {"CUDA_VISIBLE_DEVICES": "1"}
        else:
            env0 = {'CPU_NUM': '1'}
            env1 = {'CPU_NUM': '1'}

        env0.update(envs)
        env1.update(envs)
        print("tr0_cmd: {}, env: {}".format(tr0_cmd, env0))
        print("tr1_cmd: {}, env: {}".format(tr1_cmd, env1))
        tr0_pipe = open("/tmp/tr0_err.log", "wb")
        tr1_pipe = open("/tmp/tr1_err.log", "wb")
        tr0_proc = subprocess.Popen(
            tr0_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=tr0_pipe,
            env=env0)
        tr1_proc = subprocess.Popen(
            tr1_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=tr1_pipe,
            env=env1)

        # Wait until the trainer processes terminate.
        while True:
            stat0 = tr0_proc.poll()
            time.sleep(0.1)
            if stat0 is not None:
                break
        while True:
            stat1 = tr1_proc.poll()
            time.sleep(0.1)
            if stat1 is not None:
                break

        tr0_out, tr0_err = tr0_proc.communicate()
        tr1_out, tr1_err = tr1_proc.communicate()

        # close the trainer and pserver log files
        tr0_pipe.close()
        tr1_pipe.close()
        ps0_pipe.close()
        ps1_pipe.close()

        ps0.terminate()
        ps1.terminate()
        # print server log
        with open("/tmp/ps0_err.log", "r") as fn:
            sys.stderr.write("ps0 stderr: %s\n" % fn.read())
        with open("/tmp/ps1_err.log", "r") as fn:
            sys.stderr.write("ps1 stderr: %s\n" % fn.read())

        # print trainer logs
        with open("/tmp/tr0_err.log", "r") as fn:
            sys.stderr.write('trainer 0 stderr: %s\n' % fn.read())
        with open("/tmp/tr1_err.log", "r") as fn:
            sys.stderr.write('trainer 1 stderr: %s\n' % fn.read())
        return pickle.loads(tr0_out), pickle.loads(tr1_out)

    def _run_cluster_nccl2(self, model, envs, nccl2_reduce_layer,
                           check_error_log):
        # NOTE: we reuse ps_endpoints as nccl2 worker endpoints
        worker_endpoints = self._ps_endpoints.split(",")
        w0_ep, w1_ep = worker_endpoints
        if nccl2_reduce_layer:
            update_method = "nccl2_reduce_layer"
        else:
            update_method = "nccl2"
        tr_cmd = "%s %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --update_method %s --lr %f"
        tr0_cmd = tr_cmd % \
                  (self._python_interp, model, self._ps_endpoints,
                   0, w0_ep, update_method, self._lr)
        tr1_cmd = tr_cmd % \
                  (self._python_interp, model, self._ps_endpoints,
                   1, w1_ep, update_method, self._lr)

        if self._mem_opt:
            tr0_cmd += " --mem_opt"
            tr1_cmd += " --mem_opt"
        if self._use_reduce:
            tr0_cmd += " --use_reduce"
            tr1_cmd += " --use_reduce"
        if self._use_reader_alloc:
            tr0_cmd += " --use_reader_alloc"
            tr1_cmd += " --use_reader_alloc"
        if self.__use_cuda:
            tr0_cmd += " --use_cuda"
            tr1_cmd += " --use_cuda"
            env0 = {
                "CUDA_VISIBLE_DEVICES": "0",
                # for test nccl2 layer
                "PADDLE_TRAINERS_NUM": "2",
                "PADDLE_TRAINER_ID": "0"
            }
            env1 = {
                "CUDA_VISIBLE_DEVICES": "1",
                "PADDLE_TRAINERS_NUM": "2",
                "PADDLE_TRAINER_ID": "1"
            }
        else:
            env0 = {'CPU_NUM': '1'}
            env1 = {'CPU_NUM': '1'}

        if self._use_dgc:
            tr0_cmd += " --use_dgc"
            tr1_cmd += " --use_dgc"
        if self._mp_mode:
            env0 = {"FLAGS_selected_gpus": "0"}
            env1 = {"FLAGS_selected_gpus": "1"}

        env0.update(envs)
        env1.update(envs)

        print("tr0_cmd:{}, env: {}".format(tr0_cmd, env0))
        print("tr1_cmd:{}, env: {}".format(tr1_cmd, env1))
        tr0_pipe = open("/tmp/tr0_err.log", "wb")
        tr1_pipe = open("/tmp/tr1_err.log", "wb")

        tr0_proc = subprocess.Popen(
            tr0_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=tr0_pipe,
            env=env0)
        tr1_proc = subprocess.Popen(
            tr1_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=tr1_pipe,
            env=env1)

        tr0_out, tr0_err = tr0_proc.communicate()
        tr1_out, tr1_err = tr1_proc.communicate()

        # close trainer file
        tr0_pipe.close()
        tr1_pipe.close()

        # print log
        sys.stderr.write('trainer 0 stderr: %s\n' % tr0_err)
        sys.stderr.write('trainer 1 stderr: %s\n' % tr1_err)

        return pickle.loads(tr0_out), pickle.loads(tr1_out)

    def check_with_place(self,
                         model_file,
                         delta=1e-3,
                         check_error_log=False,
                         need_envs={}):
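        # Run the model locally, then distributed (pserver or nccl2 mode as
        # chosen by _setup_config), and compare per-step losses within
        # `delta`.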
        # TODO(typhoonzero): should auto adapt GPU count on the machine.
        required_envs = {
            "PATH": os.getenv("PATH", ""),
            "PYTHONPATH": os.getenv("PYTHONPATH", ""),
            "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
            "FLAGS_fraction_of_gpu_memory_to_use": "0.15",
            "FLAGS_rpc_deadline": "5000",  # 5sec to fail fast
            "FLAGS_cudnn_deterministic": "1",
            "http_proxy": "",
            "NCCL_P2P_DISABLE": "1"
        }

        required_envs.update(need_envs)

        if check_error_log:
            required_envs["GLOG_v"] = "3"
            required_envs["GLOG_logtostderr"] = "1"

        local_losses = self._run_local(model_file, required_envs,
                                       check_error_log)
        if self._nccl2_mode:
            if self._nccl2_reduce_layer:
                tr0_losses, tr1_losses = self._run_cluster_nccl2(
                    model_file, required_envs, True, check_error_log)
            else:
                tr0_losses, tr1_losses = self._run_cluster_nccl2(
                    model_file, required_envs, False, check_error_log)
        else:
            tr0_losses, tr1_losses = self._run_cluster(
                model_file, required_envs, check_error_log)

        for step_id in range(RUN_STEP):
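            # Each trainer saw half of every batch, so the averaged
            # distributed loss should match the local loss within `delta`.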
            local_loss = local_losses[step_id]
            tr0_loss = tr0_losses[step_id]
            tr1_loss = tr1_losses[step_id]
            dist_loss = (np.array([tr0_loss]) + np.array([tr1_loss]))
            if not self._dygraph:
                # Parallel DyGraph already scaled the loss in training
                dist_loss = dist_loss / 2
            print("=======", local_loss, ":", dist_loss[0], "=======")
            self.assertAlmostEqual(local_loss, dist_loss[0], delta=delta)