#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function
import time

import unittest
import os
import sys
import signal
import subprocess
import six
import argparse
import pickle
import numpy as np
import socket
from contextlib import closing

import paddle.fluid as fluid
import paddle.compat as cpt
from paddle.fluid import compiler

RUN_STEP = 10
DEFAULT_BATCH_SIZE = 2


class TestDistRunnerBase(object):
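    """Model-runner base class executed inside the spawned subprocesses.

    Concrete test models subclass this and implement get_model(); the
    runtime_main() entry point below then drives run_pserver() or
    run_trainer() according to the --role argument.
    """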
    def get_model(self,
                  batch_size=DEFAULT_BATCH_SIZE,
                  lr=0.1,
                  single_device=False):
        raise NotImplementedError(
            "get_model should be implemented by child classes.")

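    # Build a DistributeTranspiler that rewrites the single-process program
    # into parameter-server and trainer programs.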
    @staticmethod
    def get_transpiler(trainer_id,
                       main_program,
                       pserver_endpoints,
                       trainers,
                       sync_mode,
                       dc_asgd=False,
                       current_endpoint=None):
        # NOTE: import fluid at runtime only, or else forking processes will cause errors.
        config = fluid.DistributeTranspilerConfig()
        config.enable_dc_asgd = dc_asgd
        t = fluid.DistributeTranspiler(config=config)
        t.transpile(
            trainer_id=trainer_id,
            program=main_program,
            pservers=pserver_endpoints,
            trainers=trainers,
            sync_mode=sync_mode,
            current_endpoint=current_endpoint)
        return t

    def run_pserver(self, args):
        self.lr = args.lr
        self.get_model(batch_size=args.batch_size)
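        # get_model() builds the network into fluid.default_main_program(),
        # which the transpiler below splits into the pserver program.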
        # NOTE: pserver should not call memory optimize
        t = self.get_transpiler(args.trainer_id,
                                fluid.default_main_program(), args.endpoints,
                                args.trainers, args.sync_mode, args.dc_asgd)
        pserver_prog = t.get_pserver_program(args.current_endpoint)
        startup_prog = t.get_startup_program(args.current_endpoint,
                                             pserver_prog)

        place = fluid.CPUPlace()
        exe = fluid.Executor(place)
        exe.run(startup_prog)
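        # this blocks, serving parameter updates until the process is killed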
        exe.run(pserver_prog)

    def run_trainer(self, args):
        self.lr = args.lr
        if args.nccl2_reduce_layer_local_run:
            test_program, avg_cost, train_reader, test_reader, batch_acc, predict = \
                self.get_model(batch_size=args.batch_size, single_device=True)
        else:
            test_program, avg_cost, train_reader, test_reader, batch_acc, predict = \
                self.get_model(batch_size=args.batch_size)

        if args.mem_opt:
            fluid.memory_optimize(fluid.default_main_program(), skip_grads=True)
        if args.update_method == "pserver":
            t = self.get_transpiler(args.trainer_id,
                                    fluid.default_main_program(),
                                    args.endpoints, args.trainers,
                                    args.sync_mode, args.dc_asgd)
            trainer_prog = t.get_trainer_program()
        elif args.update_method == "nccl2" or args.update_method == "nccl2_reduce_layer":
            # transpile for nccl2
            config = fluid.DistributeTranspilerConfig()
            config.mode = "nccl2"
            nccl2_t = fluid.DistributeTranspiler(config=config)
            nccl2_t.transpile(
                args.trainer_id,
                program=fluid.default_main_program(),
                startup_program=fluid.default_startup_program(),
                trainers=args.endpoints,
                current_endpoint=args.current_endpoint)
            trainer_prog = fluid.default_main_program()
        else:
            trainer_prog = fluid.default_main_program()

        if args.use_cuda:
            place = fluid.CUDAPlace(0)
        else:
            place = fluid.CPUPlace()

        exe = fluid.Executor(place)
        exe.run(fluid.default_startup_program())

        exec_strategy = fluid.ExecutionStrategy()
        exec_strategy.num_threads = 1
        exec_strategy.allow_op_delay = False

        build_stra = fluid.BuildStrategy()
        # FIXME: force-disable enable_inplace and memory_optimize
        build_stra.enable_inplace = False
        build_stra.memory_optimize = False

        if args.use_reduce:
            build_stra.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce
        else:
            build_stra.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.AllReduce

        pass_builder = None
        if args.batch_merge_repeat > 1:
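            # insert multi_batch_merge_pass so each executor step repeats
            # the batch args.batch_merge_repeat times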
            pass_builder = build_stra._finalize_strategy_and_create_passes()
            mypass = pass_builder.insert_pass(
                len(pass_builder.all_passes()) - 3, "multi_batch_merge_pass")
            mypass.set("num_repeats", args.batch_merge_repeat)

        if args.update_method == "nccl2" or args.update_method == "nccl2_reduce_layer":
            build_stra.num_trainers = len(args.endpoints.split(","))
            build_stra.trainer_id = args.trainer_id
        else:
            # local and pserver modes: the compiled program runs with a single trainer
            build_stra.num_trainers = 1
            build_stra.trainer_id = 0

        binary = compiler.CompiledProgram(trainer_prog).with_data_parallel(
            loss_name=avg_cost.name,
            build_strategy=build_stra,
            exec_strategy=exec_strategy)

        feed_var_list = [
            var for var in trainer_prog.global_block().vars.values()
            if var.is_data
        ]

        feeder = fluid.DataFeeder(feed_var_list, place)
        reader_generator = train_reader()

        def get_data():
            origin_batch = next(reader_generator)
            if args.update_method != "local" and args.use_reader_alloc:
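                # shard the batch between the two trainers: each keeps the
                # samples whose offset matches its trainer_id (mod 2)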
                new_batch = []
                for offset, item in enumerate(origin_batch):
                    if offset % 2 == args.trainer_id:
                        new_batch.append(item)
                return new_batch
            else:
                return origin_batch

        out_losses = []
        for _ in six.moves.xrange(RUN_STEP):
            loss, = exe.run(binary,
                            fetch_list=[avg_cost.name],
                            feed=feeder.feed(get_data()))
            out_losses.append(loss[0])
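        # write the losses to stdout as a pickle so the parent test
        # process can recover them with pickle.loads()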
        if six.PY2:
            print(pickle.dumps(out_losses))
        else:
            sys.stdout.buffer.write(pickle.dumps(out_losses))


def runtime_main(test_class):
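    """Subprocess entry point: parse command-line arguments and run the
    given model class as a pserver or a trainer.

    A model script is expected to subclass TestDistRunnerBase and call
    this when executed directly; a minimal sketch (names hypothetical):

        class TestDistMyModel(TestDistRunnerBase):
            def get_model(self, batch_size=DEFAULT_BATCH_SIZE, lr=0.1,
                          single_device=False):
                # build the network, then:
                return (test_program, avg_cost, train_reader,
                        test_reader, batch_acc, predict)

        if __name__ == "__main__":
            runtime_main(TestDistMyModel)
    """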
    parser = argparse.ArgumentParser(description='Run dist test.')
    parser.add_argument(
        '--role', type=str, required=True, choices=['pserver', 'trainer'])
    parser.add_argument('--endpoints', type=str, required=False, default="")
    parser.add_argument(
        '--update_method',
        type=str,
        default="local",
        choices=["pserver", "nccl2", "local", "nccl2_reduce_layer"])
    parser.add_argument('--trainer_id', type=int, required=False, default=0)
    parser.add_argument('--trainers', type=int, required=False, default=1)
    parser.add_argument(
        '--current_endpoint', type=str, required=False, default="")
    parser.add_argument('--sync_mode', action='store_true')
    parser.add_argument('--mem_opt', action='store_true')
    parser.add_argument('--use_cuda', action='store_true')
    parser.add_argument('--use_reduce', action='store_true')
    parser.add_argument('--dc_asgd', action='store_true')
    parser.add_argument(
        '--use_reader_alloc', action='store_true', required=False)
    parser.add_argument('--batch_size', required=False, type=int, default=2)
    parser.add_argument('--lr', required=False, type=float, default=0.001)
    parser.add_argument(
        '--batch_merge_repeat', required=False, type=int, default=1)
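    # NOTE: argparse's type=bool turns any non-empty string into True,
    # so callers pass "1" to enable the flag below.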
    parser.add_argument(
        '--nccl2_reduce_layer_local_run',
        required=False,
        type=bool,
        default=False)

    args = parser.parse_args()

    model = test_class()
    if args.role == "pserver" and args.update_method == "pserver":
        model.run_pserver(args)
    else:
        model.run_trainer(args)

class TestDistBase(unittest.TestCase):
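    """Test-driver base class: spawns pserver/trainer subprocesses and
    compares their losses against a local single-process run.
    """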
    def _setup_config(self):
        raise NotImplementedError("tests should have _setup_config implemented")

    def _after_setup_config(self):
        if self._enforce_place == "CPU":
            self.__use_cuda = False
        elif self._enforce_place == "GPU":
            self.__use_cuda = True
        else:
            if fluid.core.is_compiled_with_cuda():
                self.__use_cuda = True
            else:
                self.__use_cuda = False

    def setUp(self):
        self._trainers = 2
        self._pservers = 2
        self._port_set = set()
        self._ps_endpoints = "127.0.0.1:%s,127.0.0.1:%s" % (
            self._find_free_port(), self._find_free_port())
        self._python_interp = sys.executable
        self._sync_mode = True
        self._enforce_place = None
        self._mem_opt = False
        self._use_reduce = False
        self._dc_asgd = False  # must be used together with async mode
        self._use_reader_alloc = True
        self._nccl2_mode = False
        # FIXME(typhoonzero): I added this stupid argument to enable testing
        # allreduce layers, which let users call layers.allreduce to
        # accumulate tensors anywhere. Find a better way to do this test and
        # reduce the checks on this argument everywhere.
        self._nccl2_reduce_layer = False
        self._lr = 0.001
        self._setup_config()
        self._after_setup_config()

    def _find_free_port(self):
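        """Bind to port 0 so the OS picks a free port, retrying until the
        port has not been handed out before."""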
        def __free_port():
            with closing(socket.socket(socket.AF_INET,
                                       socket.SOCK_STREAM)) as s:
                s.bind(('', 0))
                return s.getsockname()[1]

        while True:
            port = __free_port()
            if port not in self._port_set:
                self._port_set.add(port)
                return port

    def start_pserver(self, model_file, check_error_log, required_envs):
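        # launch the two parameter-server subprocesses from a shared
        # command-line template; their stderr goes to /tmp log files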
        ps0_ep, ps1_ep = self._ps_endpoints.split(",")
        ps_cmd = "%s %s --role pserver --endpoints %s --trainer_id 0 --current_endpoint %s --trainers %d --update_method pserver"
        ps0_cmd = ps_cmd % \
                  (self._python_interp, model_file, self._ps_endpoints, ps0_ep,
                   self._trainers)
        ps1_cmd = ps_cmd % \
                  (self._python_interp, model_file, self._ps_endpoints, ps1_ep,
                   self._trainers)

        if self._sync_mode:
            ps0_cmd += " --sync_mode"
            ps1_cmd += " --sync_mode"
        if self._mem_opt:
            ps0_cmd += " --mem_opt"
            ps1_cmd += " --mem_opt"

        print(ps0_cmd)
        print(ps1_cmd)
        ps0_pipe = open("/tmp/ps0_err.log", "wb")
        ps1_pipe = open("/tmp/ps1_err.log", "wb")

        ps0_proc = subprocess.Popen(
            ps0_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=ps0_pipe,
            env=required_envs)
        ps1_proc = subprocess.Popen(
            ps1_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=ps1_pipe,
            env=required_envs)

        return ps0_proc, ps1_proc, ps0_pipe, ps1_pipe

    def _run_local(self,
                   model,
                   envs,
                   check_error_log=False,
                   batch_size=DEFAULT_BATCH_SIZE,
                   batch_merge_repeat=1):

        cmd = "%s %s --role trainer --lr %f" % (self._python_interp, model,
                                                self._lr)
        if batch_size != DEFAULT_BATCH_SIZE:
            cmd += " --batch_size %d" % batch_size
        if batch_merge_repeat > 1:
            cmd += " --batch_merge_repeat %d" % batch_merge_repeat
        if self._nccl2_reduce_layer:
            cmd += " --nccl2_reduce_layer_local_run 1"

        if self.__use_cuda:
            cmd += " --use_cuda"
            env_local = {
                "CUDA_VISIBLE_DEVICES": "0",
                "PADDLE_TRAINERS_NUM": "1",
                "PADDLE_TRAINER_ID": "0"
            }
        else:
            env_local = {'CPU_NUM': '1'}

        env_local.update(envs)
        print("local_cmd: {}, env: {}".format(cmd, env_local))

        if check_error_log:
            err_log = open("/tmp/trainer.err.log", "wb")
            local_proc = subprocess.Popen(
                cmd.split(" "),
                stdout=subprocess.PIPE,
                stderr=err_log,
                env=env_local)
        else:
            local_proc = subprocess.Popen(
                cmd.split(" "),
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                env=env_local)

        local_out, local_err = local_proc.communicate()

        if check_error_log:
            err_log.close()

        sys.stderr.write('local_stderr: %s\n' % local_err)
        sys.stderr.write('local_stdout: %s\n' % pickle.loads(local_out))

        return pickle.loads(local_out)

    def _run_cluster(self, model, envs, check_error_log):
        # Run dist train to compare with local results
        ps0, ps1, ps0_pipe, ps1_pipe = self.start_pserver(model,
                                                          check_error_log, envs)

        ps0_ep, ps1_ep = self._ps_endpoints.split(",")

        tr_cmd = "%s %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --trainers %d --update_method pserver --lr %f"
        tr0_cmd = tr_cmd % \
                  (self._python_interp, model, self._ps_endpoints,
                   0, ps0_ep, self._trainers, self._lr)
        tr1_cmd = tr_cmd % \
                  (self._python_interp, model, self._ps_endpoints,
                   1, ps1_ep, self._trainers, self._lr)

        if self._sync_mode:
            tr0_cmd += " --sync_mode"
            tr1_cmd += " --sync_mode"
        if self._mem_opt:
            tr0_cmd += " --mem_opt"
            tr1_cmd += " --mem_opt"
        if self._use_reduce:
            tr0_cmd += " --use_reduce"
            tr1_cmd += " --use_reduce"
        if self._use_reader_alloc:
            tr0_cmd += " --use_reader_alloc"
            tr1_cmd += " --use_reader_alloc"
        if self.__use_cuda:
            tr0_cmd += " --use_cuda"
            tr1_cmd += " --use_cuda"
            env0 = {"CUDA_VISIBLE_DEVICES": "0"}
            env1 = {"CUDA_VISIBLE_DEVICES": "1"}
        else:
            env0 = {'CPU_NUM': '1'}
            env1 = {'CPU_NUM': '1'}

        env0.update(envs)
        env1.update(envs)

        print("tr0_cmd: {}, env: {}".format(tr0_cmd, env0))
        print("tr1_cmd: {}, env: {}".format(tr1_cmd, env1))
        tr0_pipe = open("/tmp/tr0_err.log", "wb")
        tr1_pipe = open("/tmp/tr1_err.log", "wb")

        tr0_proc = subprocess.Popen(
            tr0_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=tr0_pipe,
            env=env0)
        tr1_proc = subprocess.Popen(
            tr1_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=tr1_pipe,
            env=env1)

        # Wait until both trainer processes terminate
        while True:
            stat0 = tr0_proc.poll()
            time.sleep(0.1)
            if stat0 is not None:
                break
        while True:
            stat1 = tr1_proc.poll()
            time.sleep(0.1)
            if stat1 is not None:
                break

        tr0_out, tr0_err = tr0_proc.communicate()
        tr1_out, tr1_err = tr1_proc.communicate()

        # close the trainer and pserver log files
        tr0_pipe.close()
        tr1_pipe.close()
        ps0_pipe.close()
        ps1_pipe.close()

        ps0.terminate()
        ps1.terminate()

        # print pserver logs
        with open("/tmp/ps0_err.log", "r") as fn:
            sys.stderr.write("ps0 stderr: %s\n" % fn.read())
        with open("/tmp/ps1_err.log", "r") as fn:
            sys.stderr.write("ps1 stderr: %s\n" % fn.read())

        # print trainer logs
        with open("/tmp/tr0_err.log", "r") as fn:
            sys.stderr.write('trainer 0 stderr: %s\n' % fn.read())
        with open("/tmp/tr1_err.log", "r") as fn:
            sys.stderr.write('trainer 1 stderr: %s\n' % fn.read())

        return pickle.loads(tr0_out), pickle.loads(tr1_out)

    def _run_cluster_nccl2(self, model, envs, nccl2_reduce_layer,
                           check_error_log):
        # NOTE: we reuse ps_endpoints as nccl2 worker endpoints
        worker_endpoints = self._ps_endpoints.split(",")
        w0_ep, w1_ep = worker_endpoints
        if nccl2_reduce_layer:
            update_method = "nccl2_reduce_layer"
        else:
            update_method = "nccl2"

        tr_cmd = "%s %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --update_method %s --lr %f"
        tr0_cmd = tr_cmd % \
                  (self._python_interp, model, self._ps_endpoints,
                   0, w0_ep, update_method, self._lr)
        tr1_cmd = tr_cmd % \
                  (self._python_interp, model, self._ps_endpoints,
                   1, w1_ep, update_method, self._lr)

        if self._mem_opt:
            tr0_cmd += " --mem_opt"
            tr1_cmd += " --mem_opt"
        if self._use_reduce:
            tr0_cmd += " --use_reduce"
            tr1_cmd += " --use_reduce"
        if self._use_reader_alloc:
            tr0_cmd += " --use_reader_alloc"
            tr1_cmd += " --use_reader_alloc"
        if self.__use_cuda:
            tr0_cmd += " --use_cuda"
            tr1_cmd += " --use_cuda"
            env0 = {
                "CUDA_VISIBLE_DEVICES": "0",
                # for testing nccl2 allreduce layers
                "PADDLE_TRAINERS_NUM": "2",
                "PADDLE_TRAINER_ID": "0"
            }
            env1 = {
                "CUDA_VISIBLE_DEVICES": "1",
                "PADDLE_TRAINERS_NUM": "2",
                "PADDLE_TRAINER_ID": "1"
            }
        else:
            env0 = {'CPU_NUM': '1'}
            env1 = {'CPU_NUM': '1'}

        env0.update(envs)
        env1.update(envs)

        print("tr0_cmd:{}, env: {}".format(tr0_cmd, env0))
        print("tr1_cmd:{}, env: {}".format(tr1_cmd, env1))
        tr0_pipe = open("/tmp/tr0_err.log", "wb")
        tr1_pipe = open("/tmp/tr1_err.log", "wb")

        tr0_proc = subprocess.Popen(
            tr0_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=tr0_pipe,
            env=env0)
        tr1_proc = subprocess.Popen(
            tr1_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=tr1_pipe,
            env=env1)

        tr0_out, tr0_err = tr0_proc.communicate()
        tr1_out, tr1_err = tr1_proc.communicate()

        # close trainer file
        tr0_pipe.close()
        tr1_pipe.close()

        # print log
        sys.stderr.write('trainer 0 stderr: %s\n' % tr0_err)
        sys.stderr.write('trainer 1 stderr: %s\n' % tr1_err)

        return pickle.loads(tr0_out), pickle.loads(tr1_out)

    def check_with_place(self,
                         model_file,
                         delta=1e-3,
                         check_error_log=False,
                         need_envs={}):
        # TODO(typhoonzero): should auto-adapt to the GPU count on the machine.
        required_envs = {
            "PATH": os.getenv("PATH", ""),
            "PYTHONPATH": os.getenv("PYTHONPATH", ""),
            "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
            "FLAGS_fraction_of_gpu_memory_to_use": "0.15",
            "FLAGS_rpc_deadline": "5000",  # 5sec to fail fast
            "FLAGS_cudnn_deterministic": "1",
            "http_proxy": "",
            "NCCL_P2P_DISABLE": "1"
        }

        required_envs.update(need_envs)

        if check_error_log:
            required_envs["GLOG_v"] = "3"
            required_envs["GLOG_logtostderr"] = "1"

        local_losses = self._run_local(model_file, required_envs,
                                       check_error_log)
        if self._nccl2_mode:
            tr0_losses, tr1_losses = self._run_cluster_nccl2(
                model_file, required_envs, self._nccl2_reduce_layer,
                check_error_log)
        else:
            tr0_losses, tr1_losses = self._run_cluster(
                model_file, required_envs, check_error_log)

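        # the two trainers' averaged loss should track the local loss
        # step by step within `delta`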
        for step_id in range(RUN_STEP):
            local_loss = local_losses[step_id]
            tr0_loss = tr0_losses[step_id]
            tr1_loss = tr1_losses[step_id]
            dist_loss = (np.array([tr0_loss]) + np.array([tr1_loss])) / 2
            print("=======", local_loss, ":", dist_loss[0], "=======")
            self.assertAlmostEqual(local_loss, dist_loss[0], delta=delta)