#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function
import time

import unittest
import os
import sys
import signal
import subprocess
import six
import argparse
import pickle
import numpy as np
import socket
from contextlib import closing

import paddle.fluid as fluid
from paddle.fluid import compiler
import paddle.compat as cpt

RUN_STEP = 10
DEFAULT_BATCH_SIZE = 2


class TestDistRunnerBase(object):
    def get_model(self,
                  batch_size=DEFAULT_BATCH_SIZE,
                  lr=0.1,
                  single_device=False,
                  use_dgc=False):
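        # Implementations are expected to build the network here and return the
        # 6-tuple that run_trainer() below unpacks:
        # (test_program, avg_cost, train_reader, test_reader, batch_acc, predict).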
        raise NotImplementedError(
            "get_model should be implemented by child classes.")

    @staticmethod
    def get_transpiler(trainer_id,
                       main_program,
                       pserver_endpoints,
                       trainers,
                       sync_mode,
                       dc_asgd=False,
                       current_endpoint=None):
        # NOTE: import fluid only at runtime, or else forking processes will cause errors.
        config = fluid.DistributeTranspilerConfig()
        config.enable_dc_asgd = dc_asgd
        t = fluid.DistributeTranspiler(config=config)
        t.transpile(
            trainer_id=trainer_id,
            program=main_program,
            pservers=pserver_endpoints,
            trainers=trainers,
            sync_mode=sync_mode,
            current_endpoint=current_endpoint)
        return t

    def run_pserver(self, args):
        self.lr = args.lr
        self.get_model(batch_size=args.batch_size)
        # NOTE: pserver should not call memory optimize
        t = self.get_transpiler(args.trainer_id,
                                fluid.default_main_program(), args.endpoints,
                                args.trainers, args.sync_mode, args.dc_asgd)
        pserver_prog = t.get_pserver_program(args.current_endpoint)
        startup_prog = t.get_startup_program(args.current_endpoint,
                                             pserver_prog)

        place = fluid.CPUPlace()
        exe = fluid.Executor(place)
        exe.run(startup_prog)
        exe.run(pserver_prog)

    def run_trainer(self, args):
        self.lr = args.lr
        if args.nccl2_reduce_layer_local_run:
            test_program, avg_cost, train_reader, test_reader, batch_acc, predict = \
                self.get_model(batch_size=args.batch_size, single_device=True)
        elif args.use_dgc:
            test_program, avg_cost, train_reader, test_reader, batch_acc, predict = \
                self.get_model(batch_size=args.batch_size, use_dgc=args.use_dgc)
        else:
            test_program, avg_cost, train_reader, test_reader, batch_acc, predict = \
                self.get_model(batch_size=args.batch_size)

        if args.mem_opt:
            fluid.memory_optimize(fluid.default_main_program(), skip_grads=True)
        if args.update_method == "pserver":
            t = self.get_transpiler(args.trainer_id,
                                    fluid.default_main_program(),
                                    args.endpoints, args.trainers,
                                    args.sync_mode, args.dc_asgd)
            trainer_prog = t.get_trainer_program()
        elif args.update_method == "nccl2" or args.update_method == "nccl2_reduce_layer":
            # transpile for nccl2
            config = fluid.DistributeTranspilerConfig()
            config.mode = "nccl2"
            nccl2_t = fluid.DistributeTranspiler(config=config)
            nccl2_t.transpile(
                args.trainer_id,
                program=fluid.default_main_program(),
                startup_program=fluid.default_startup_program(),
                trainers=args.endpoints,
                current_endpoint=args.current_endpoint)
            trainer_prog = fluid.default_main_program()
        else:
            trainer_prog = fluid.default_main_program()

        if args.use_cuda:
            device_id = int(os.getenv("FLAGS_selected_gpus", "0"))
            place = fluid.CUDAPlace(device_id)
        else:
            place = fluid.CPUPlace()

        exe = fluid.Executor(place)
        exe.run(fluid.default_startup_program())

        exec_strategy = fluid.ExecutionStrategy()
        exec_strategy.num_threads = 1
        exec_strategy.allow_op_delay = False

        build_stra = fluid.BuildStrategy()
        # FIXME force disable enable_inplace and memory_optimize
        build_stra.enable_inplace = False
        build_stra.memory_optimize = False

        if args.use_reduce:
            build_stra.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce
        else:
            build_stra.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.AllReduce

        pass_builder = None
        if args.batch_merge_repeat > 1:
            pass_builder = build_stra._finalize_strategy_and_create_passes()
            mypass = pass_builder.insert_pass(0, "multi_batch_merge_pass")
            mypass.set("num_repeats", args.batch_merge_repeat)

        if args.update_method == "nccl2" or args.update_method == "nccl2_reduce_layer":
            build_stra.num_trainers = len(args.endpoints.split(","))
            build_stra.trainer_id = args.trainer_id
        else:
            # "local" and "pserver" runs (including the single-device
            # nccl2_reduce_layer run) use a single trainer.
            build_stra.num_trainers = 1
            build_stra.trainer_id = 0

        binary = compiler.CompiledProgram(trainer_prog).with_data_parallel(
            loss_name=avg_cost.name,
            build_strategy=build_stra,
            exec_strategy=exec_strategy)

        feed_var_list = [
            var for var in trainer_prog.global_block().vars.values()
            if var.is_data
        ]

        feeder = fluid.DataFeeder(feed_var_list, place)
        reader_generator = train_reader()

        def get_data():
            origin_batch = next(reader_generator)
            if args.update_method != "local" and args.use_reader_alloc:
                new_batch = []
                for offset, item in enumerate(origin_batch):
                    if offset % 2 == args.trainer_id:
                        new_batch.append(item)
                return new_batch
            else:
                return origin_batch

        out_losses = []
        for _ in six.moves.xrange(RUN_STEP):
            loss, = exe.run(binary,
                            fetch_list=[avg_cost.name],
                            feed=feeder.feed(get_data()))
            out_losses.append(loss[0])
        if six.PY2:
            print(pickle.dumps(out_losses))
        else:
            sys.stdout.buffer.write(pickle.dumps(out_losses))


def runtime_main(test_class):
    parser = argparse.ArgumentParser(description='Run dist test.')
    parser.add_argument(
        '--role', type=str, required=True, choices=['pserver', 'trainer'])
    parser.add_argument('--endpoints', type=str, required=False, default="")
    parser.add_argument(
        '--update_method',
        type=str,
        default="local",
        choices=["pserver", "nccl2", "local", "nccl2_reduce_layer"])
    parser.add_argument('--trainer_id', type=int, required=False, default=0)
    parser.add_argument('--trainers', type=int, required=False, default=1)
    parser.add_argument(
        '--current_endpoint', type=str, required=False, default="")
    parser.add_argument('--sync_mode', action='store_true')
    parser.add_argument('--mem_opt', action='store_true')
    parser.add_argument('--use_cuda', action='store_true')
    parser.add_argument('--use_dgc', action='store_true')
    parser.add_argument('--use_reduce', action='store_true')
    parser.add_argument('--dc_asgd', action='store_true')
    parser.add_argument(
        '--use_reader_alloc', action='store_true', required=False)
    parser.add_argument('--batch_size', required=False, type=int, default=2)
    parser.add_argument('--lr', required=False, type=float, default=0.001)
    parser.add_argument(
        '--batch_merge_repeat', required=False, type=int, default=1)
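    # NOTE: argparse's type=bool turns any non-empty string into True; the
    # launcher in TestDistBase only ever passes "1" for the flag below.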
    parser.add_argument(
        '--nccl2_reduce_layer_local_run',
        required=False,
        type=bool,
        default=False)

    args = parser.parse_args()

    model = test_class()
    if args.role == "pserver" and args.update_method == "pserver":
        model.run_pserver(args)
    else:
        model.run_trainer(args)
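

# A minimal usage sketch (hypothetical, not part of this module): a concrete
# test script would subclass TestDistRunnerBase, build its network in
# get_model(), and hand the class to runtime_main() so TestDistBase below can
# launch it as a pserver or trainer subprocess. The class name is made up for
# illustration:
#
#     class DistDemoModel(TestDistRunnerBase):
#         def get_model(self, batch_size=DEFAULT_BATCH_SIZE, lr=0.1,
#                       single_device=False, use_dgc=False):
#             # build the program, optimizer and readers here, then:
#             return (test_program, avg_cost, train_reader, test_reader,
#                     batch_acc, predict)
#
#     if __name__ == "__main__":
#         runtime_main(DistDemoModel)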


class TestDistBase(unittest.TestCase):
    def _setup_config(self):
        raise NotImplementedError("tests should have _setup_config implemented")

    def _after_setup_config(self):
        if self._enforce_place == "CPU":
            self.__use_cuda = False
            self._use_dgc = False
        elif self._enforce_place == "GPU":
            self.__use_cuda = True
        else:
            if fluid.core.is_compiled_with_cuda():
                self.__use_cuda = True
            else:
                self.__use_cuda = False
                self._use_dgc = False

        if self._use_reduce:
            assert not self._use_dgc

    def setUp(self):
        self._trainers = 2
        self._pservers = 2
        self._port_set = set()
        self._ps_endpoints = "127.0.0.1:%s,127.0.0.1:%s" % (
            self._find_free_port(), self._find_free_port())
        self._python_interp = sys.executable
        self._sync_mode = True
        self._enforce_place = None
        self._mem_opt = False
        self._use_reduce = False
        self._dc_asgd = False  # must use with async mode
        self._use_reader_alloc = True
        self._nccl2_mode = False
        self._mp_mode = False
        # FIXME(typhoonzero): I added this stupid argument to enable testing
        # allreduce layers, with which users can call layers.allreduce to
        # accumulate tensors anywhere. Find a better way to do this test and
        # reduce the need to check this argument everywhere.
        self._nccl2_reduce_layer = False
        self._lr = 0.001
        self._use_dgc = False
        self._setup_config()
        self._after_setup_config()

    def _find_free_port(self):
        def __free_port():
            with closing(socket.socket(socket.AF_INET,
                                       socket.SOCK_STREAM)) as s:
                s.bind(('', 0))
                return s.getsockname()[1]

        while True:
            port = __free_port()
            if port not in self._port_set:
                self._port_set.add(port)
                return port

    def start_pserver(self, model_file, check_error_log, required_envs):
        ps0_ep, ps1_ep = self._ps_endpoints.split(",")
        ps_cmd = "%s %s --role pserver --endpoints %s --trainer_id 0 --current_endpoint %s --trainers %d --update_method pserver"
        ps0_cmd = ps_cmd % \
                  (self._python_interp, model_file, self._ps_endpoints, ps0_ep,
                   self._trainers)
        ps1_cmd = ps_cmd % \
                  (self._python_interp, model_file, self._ps_endpoints, ps1_ep,
                   self._trainers)

        if self._sync_mode:
            ps0_cmd += " --sync_mode"
            ps1_cmd += " --sync_mode"
        if self._mem_opt:
            ps0_cmd += " --mem_opt"
            ps1_cmd += " --mem_opt"

        print(ps0_cmd)
        print(ps1_cmd)
        ps0_pipe = open("/tmp/ps0_err.log", "wb")
        ps1_pipe = open("/tmp/ps1_err.log", "wb")

        ps0_proc = subprocess.Popen(
            ps0_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=ps0_pipe,
            env=required_envs)
        ps1_proc = subprocess.Popen(
            ps1_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=ps1_pipe,
            env=required_envs)

        return ps0_proc, ps1_proc, ps0_pipe, ps1_pipe

    def _run_local(self,
                   model,
                   envs,
                   check_error_log=False,
                   batch_size=DEFAULT_BATCH_SIZE,
                   batch_merge_repeat=1):

        cmd = "%s %s --role trainer --lr %f" % (self._python_interp, model,
                                                self._lr)
        if batch_size != DEFAULT_BATCH_SIZE:
            cmd += " --batch_size %d" % batch_size
        if batch_merge_repeat > 1:
            cmd += " --batch_merge_repeat %d" % batch_merge_repeat
        if self._nccl2_reduce_layer:
            cmd += " --nccl2_reduce_layer_local_run 1"

        if self.__use_cuda:
            cmd += " --use_cuda"
            env_local = {
                "CUDA_VISIBLE_DEVICES": "0",
                "PADDLE_TRAINERS_NUM": "1",
                "PADDLE_TRAINER_ID": "0"
            }
        else:
            env_local = {'CPU_NUM': '1'}

        env_local.update(envs)
        print("local_cmd: {}, env: {}".format(cmd, env_local))

        if check_error_log:
            err_log = open("/tmp/trainer.err.log", "wb")
            local_proc = subprocess.Popen(
                cmd.split(" "),
                stdout=subprocess.PIPE,
                stderr=err_log,
                env=env_local)
        else:
            local_proc = subprocess.Popen(
                cmd.split(" "),
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                env=env_local)

        local_out, local_err = local_proc.communicate()

        if check_error_log:
            err_log.close()

        sys.stderr.write('local_stderr: %s\n' % local_err)
        sys.stderr.write('local_stdout: %s\n' % pickle.loads(local_out))

        return pickle.loads(local_out)

    def _run_cluster(self, model, envs, check_error_log):
        # Run dist train to compare with local results
        ps0, ps1, ps0_pipe, ps1_pipe = self.start_pserver(model,
                                                          check_error_log, envs)

        ps0_ep, ps1_ep = self._ps_endpoints.split(",")

        tr_cmd = "%s %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --trainers %d --update_method pserver --lr %f"
        tr0_cmd = tr_cmd % \
                  (self._python_interp, model, self._ps_endpoints,
                   0, ps0_ep, self._trainers, self._lr)
        tr1_cmd = tr_cmd % \
                  (self._python_interp, model, self._ps_endpoints,
                   1, ps1_ep, self._trainers, self._lr)

        if self._sync_mode:
            tr0_cmd += " --sync_mode"
            tr1_cmd += " --sync_mode"
        if self._mem_opt:
            tr0_cmd += " --mem_opt"
            tr1_cmd += " --mem_opt"
        if self._use_reduce:
            tr0_cmd += " --use_reduce"
            tr1_cmd += " --use_reduce"
        if self._use_reader_alloc:
            tr0_cmd += " --use_reader_alloc"
            tr1_cmd += " --use_reader_alloc"
        if self.__use_cuda:
            tr0_cmd += " --use_cuda"
            tr1_cmd += " --use_cuda"
            env0 = {"CUDA_VISIBLE_DEVICES": "0"}
            env1 = {"CUDA_VISIBLE_DEVICES": "1"}
        else:
            env0 = {'CPU_NUM': '1'}
            env1 = {'CPU_NUM': '1'}

        env0.update(envs)
        env1.update(envs)

        print("tr0_cmd: {}, env: {}".format(tr0_cmd, env0))
        print("tr1_cmd: {}, env: {}".format(tr1_cmd, env1))
        tr0_pipe = open("/tmp/tr0_err.log", "wb")
        tr1_pipe = open("/tmp/tr1_err.log", "wb")

        tr0_proc = subprocess.Popen(
            tr0_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=tr0_pipe,
            env=env0)
        tr1_proc = subprocess.Popen(
            tr1_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=tr1_pipe,
            env=env1)

        # Wait until the trainer processes terminate
        while True:
            stat0 = tr0_proc.poll()
            time.sleep(0.1)
            if stat0 is not None:
                break
        while True:
            stat1 = tr1_proc.poll()
            time.sleep(0.1)
            if stat1 is not None:
                break

        tr0_out, tr0_err = tr0_proc.communicate()
        tr1_out, tr1_err = tr1_proc.communicate()

        # close the trainer and pserver log pipes
        tr0_pipe.close()
        tr1_pipe.close()
        ps0_pipe.close()
        ps1_pipe.close()

        ps0.terminate()
        ps1.terminate()

        # print server log
        with open("/tmp/ps0_err.log", "r") as fn:
            sys.stderr.write("ps0 stderr: %s\n" % fn.read())
        with open("/tmp/ps1_err.log", "r") as fn:
            sys.stderr.write("ps1 stderr: %s\n" % fn.read())

        # print log
        with open("/tmp/tr0_err.log", "r") as fn:
            sys.stderr.write('trainer 0 stderr: %s\n' % fn.read())
        with open("/tmp/tr1_err.log", "r") as fn:
            sys.stderr.write('trainer 1 stderr: %s\n' % fn.read())

        return pickle.loads(tr0_out), pickle.loads(tr1_out)

    def _run_cluster_nccl2(self, model, envs, nccl2_reduce_layer,
                           check_error_log):
        # NOTE: we reuse ps_endpoints as nccl2 worker endpoints
        worker_endpoints = self._ps_endpoints.split(",")
        w0_ep, w1_ep = worker_endpoints
        if nccl2_reduce_layer:
            update_method = "nccl2_reduce_layer"
        else:
            update_method = "nccl2"

        tr_cmd = "%s %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --update_method %s --lr %f"
        tr0_cmd = tr_cmd % \
                  (self._python_interp, model, self._ps_endpoints,
                   0, w0_ep, update_method, self._lr)
        tr1_cmd = tr_cmd % \
                  (self._python_interp, model, self._ps_endpoints,
                   1, w1_ep, update_method, self._lr)

        if self._mem_opt:
            tr0_cmd += " --mem_opt"
            tr1_cmd += " --mem_opt"
        if self._use_reduce:
            tr0_cmd += " --use_reduce"
            tr1_cmd += " --use_reduce"
        if self._use_reader_alloc:
            tr0_cmd += " --use_reader_alloc"
            tr1_cmd += " --use_reader_alloc"
        if self.__use_cuda:
            tr0_cmd += " --use_cuda"
            tr1_cmd += " --use_cuda"
            env0 = {
                "CUDA_VISIBLE_DEVICES": "0",
                # set for the nccl2 reduce-layer test
                "PADDLE_TRAINERS_NUM": "2",
                "PADDLE_TRAINER_ID": "0"
            }
            env1 = {
                "CUDA_VISIBLE_DEVICES": "1",
                "PADDLE_TRAINERS_NUM": "2",
                "PADDLE_TRAINER_ID": "1"
            }
        else:
            env0 = {'CPU_NUM': '1'}
            env1 = {'CPU_NUM': '1'}

        if self._use_dgc:
            tr0_cmd += " --use_dgc"
            tr1_cmd += " --use_dgc"
        if self._mp_mode:
            env0 = {"FLAGS_selected_gpus": "0"}
            env1 = {"FLAGS_selected_gpus": "1"}

        env0.update(envs)
        env1.update(envs)

        print("tr0_cmd:{}, env: {}".format(tr0_cmd, env0))
        print("tr1_cmd:{}, env: {}".format(tr1_cmd, env1))
        tr0_pipe = open("/tmp/tr0_err.log", "wb")
        tr1_pipe = open("/tmp/tr1_err.log", "wb")

        tr0_proc = subprocess.Popen(
            tr0_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=tr0_pipe,
            env=env0)
        tr1_proc = subprocess.Popen(
            tr1_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=tr1_pipe,
            env=env1)

        tr0_out, tr0_err = tr0_proc.communicate()
        tr1_out, tr1_err = tr1_proc.communicate()

        # close the trainer log pipes
        tr0_pipe.close()
        tr1_pipe.close()

        # print log
        sys.stderr.write('trainer 0 stderr: %s\n' % tr0_err)
        sys.stderr.write('trainer 1 stderr: %s\n' % tr1_err)

        return pickle.loads(tr0_out), pickle.loads(tr1_out)

    def check_with_place(self,
                         model_file,
                         delta=1e-3,
                         check_error_log=False,
                         need_envs={}):
        # TODO(typhoonzero): should auto adapt GPU count on the machine.
        required_envs = {
            "PATH": os.getenv("PATH", ""),
            "PYTHONPATH": os.getenv("PYTHONPATH", ""),
            "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
            "FLAGS_fraction_of_gpu_memory_to_use": "0.15",
            "FLAGS_rpc_deadline": "5000",  # 5sec to fail fast
            "FLAGS_cudnn_deterministic": "1",
            "http_proxy": "",
            "NCCL_P2P_DISABLE": "1"
        }

        required_envs.update(need_envs)

        if check_error_log:
            required_envs["GLOG_v"] = "3"
            required_envs["GLOG_logtostderr"] = "1"

        local_losses\
            = self._run_local(model_file, required_envs,
                                check_error_log)
        if self._nccl2_mode:
            if self._nccl2_reduce_layer:
                tr0_losses, tr1_losses = self._run_cluster_nccl2(
                    model_file, required_envs, True, check_error_log)
            else:
                tr0_losses, tr1_losses = self._run_cluster_nccl2(
                    model_file, required_envs, False, check_error_log)
        else:
            tr0_losses, tr1_losses = self._run_cluster(
                model_file, required_envs, check_error_log)

        for step_id in range(RUN_STEP):
            local_loss = local_losses[step_id]
            tr0_loss = tr0_losses[step_id]
            tr1_loss = tr1_losses[step_id]
            dist_loss = (np.array([tr0_loss]) + np.array([tr1_loss])) / 2
            print("=======", local_loss, ":", dist_loss[0], "=======")
            self.assertAlmostEqual(local_loss, dist_loss[0], delta=delta)
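

# A minimal usage sketch (hypothetical): a concrete test case subclasses
# TestDistBase, tunes the knobs in _setup_config(), and compares local vs.
# distributed losses through check_with_place(). The model file name is made
# up for illustration:
#
#     class TestDistDemo2x2(TestDistBase):
#         def _setup_config(self):
#             self._sync_mode = True
#             self._use_reduce = False
#
#         def test_dist_train(self):
#             self.check_with_place("dist_demo_model.py", delta=1e-5)
#
#     if __name__ == "__main__":
#         unittest.main()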