#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function
import time

import unittest
import os
import sys
import signal
import subprocess
import six
import argparse
import pickle
import numpy as np

import paddle.fluid as fluid
from paddle.fluid import compiler
import paddle.fluid.dygraph as dygraph
from paddle.fluid.dygraph.base import to_variable
from paddle.fluid.dygraph.parallel import DataParallel

RUN_STEP = 5
DEFAULT_BATCH_SIZE = 2


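# Write a log string to stderr as pickled bytes; works on both Python 2 and 3.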
def my_print(log_str):
    if six.PY2:
        sys.stderr.write(pickle.dumps(log_str))
    else:
        sys.stderr.buffer.write(pickle.dumps(log_str))


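# Base class for static-graph (fluid) distributed test runners. Concrete tests
# subclass it and implement get_model(); the subprocess entry point
# runtime_main() then drives run_pserver() or run_trainer().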
class TestDistRunnerBase(object):
    def get_model(self,
                  batch_size=DEFAULT_BATCH_SIZE,
                  lr=0.1,
                  single_device=False,
                  use_dgc=False):
        raise NotImplementedError(
            "get_model should be implemented by child classes.")

    @staticmethod
    def get_transpiler(trainer_id,
                       main_program,
                       pserver_endpoints,
                       trainers,
                       sync_mode,
                       dc_asgd=False,
                       current_endpoint=None,
                       nccl_comm_num=1):
        # NOTE: delay importing fluid until runtime, or else forking processes will cause errors.
        config = fluid.DistributeTranspilerConfig()
        config.enable_dc_asgd = dc_asgd
        config.sync_mode = sync_mode
        if nccl_comm_num > 1:
            config.nccl_comm_num = nccl_comm_num
        # config.runtime_split_send_recv = True
        t = fluid.DistributeTranspiler(config=config)
        t.transpile(
            trainer_id=trainer_id,
            program=main_program,
            pservers=pserver_endpoints,
            trainers=trainers,
            current_endpoint=current_endpoint)
        return t

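    # Start a parameter server: transpile for pserver mode, run the startup
    # program, then block in exe.run(pserver_prog) serving the trainers.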
    def run_pserver(self, args):
        self.lr = args.lr
        self.get_model(batch_size=args.batch_size)
        # NOTE: pserver should not call memory optimize
        t = self.get_transpiler(args.trainer_id,
                                fluid.default_main_program(), args.endpoints,
                                args.trainers, args.sync_mode, args.dc_asgd)
        pserver_prog = t.get_pserver_program(args.current_endpoint)
        startup_prog = t.get_startup_program(args.current_endpoint,
                                             pserver_prog)

        place = fluid.CPUPlace()
        exe = fluid.Executor(place)
        exe.run(startup_prog)
        my_print("run pserver startup program done.")
        exe.run(pserver_prog)
        my_print("run pserver main program done.")

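    # Train for RUN_STEP steps with the requested update method and emit the
    # per-step losses as a pickled list on stdout for the parent test process.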
    def run_trainer(self, args):
        self.lr = args.lr
        if args.nccl2_reduce_layer_local_run:
            test_program, avg_cost, train_reader, test_reader, batch_acc, predict = \
                self.get_model(batch_size=args.batch_size, single_device=True)
        elif args.use_dgc:
            test_program, avg_cost, train_reader, test_reader, batch_acc, predict = \
                self.get_model(batch_size=args.batch_size, use_dgc=args.use_dgc)
        else:
            test_program, avg_cost, train_reader, test_reader, batch_acc, predict = \
                self.get_model(batch_size=args.batch_size)

        if args.mem_opt:
            my_print("begin to run memory optimize")
            fluid.memory_optimize(fluid.default_main_program(), skip_grads=True)
            my_print("trainer run memory optimize done.")
        if args.update_method == "pserver":
            my_print("begin to run transpile on trainer with pserver mode")
            t = self.get_transpiler(args.trainer_id,
                                    fluid.default_main_program(),
                                    args.endpoints, args.trainers,
                                    args.sync_mode, args.dc_asgd)
            trainer_prog = t.get_trainer_program()
            my_print("get trainer program done with pserver mode.")
        elif args.update_method == "nccl2" or args.update_method == "nccl2_reduce_layer":
            # transpile for nccl2
            config = fluid.DistributeTranspilerConfig()
            config.mode = "nccl2"
            config.nccl_comm_num = args.nccl_comm_num
            my_print("begin to run transpile on trainer with nccl2 mode")
            nccl2_t = fluid.DistributeTranspiler(config=config)
            nccl2_t.transpile(
                args.trainer_id,
                program=fluid.default_main_program(),
                startup_program=fluid.default_startup_program(),
                trainers=args.endpoints,
                current_endpoint=args.current_endpoint)
            my_print("get trainer program done. with nccl2 mode")
            trainer_prog = fluid.default_main_program()
        else:
            trainer_prog = fluid.default_main_program()

        if args.use_cuda:
            device_id = int(os.getenv("FLAGS_selected_gpus", "0"))
            place = fluid.CUDAPlace(device_id)
        else:
            place = fluid.CPUPlace()

        exe = fluid.Executor(place)
        exe.run(fluid.default_startup_program())
        my_print("run worker startup program done.")

        exec_strategy = fluid.ExecutionStrategy()
        exec_strategy.num_threads = 1
        exec_strategy.allow_op_delay = False

        build_stra = fluid.BuildStrategy()
        # FIXME: force-disable enable_inplace and memory_optimize for now
        build_stra.enable_inplace = False
        build_stra.memory_optimize = False

        if args.enable_backward_deps:
            build_stra.enable_backward_optimizer_op_deps = True

        if args.use_reduce:
            build_stra.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce
        else:
            build_stra.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.AllReduce

        pass_builder = None
        if args.batch_merge_repeat > 1:
            pass_builder = build_stra._finalize_strategy_and_create_passes()
            mypass = pass_builder.insert_pass(0, "multi_batch_merge_pass")
            mypass.set("num_repeats", args.batch_merge_repeat)

        if args.update_method == "nccl2" or args.update_method == "nccl2_reduce_layer":
            build_stra.num_trainers = len(args.endpoints.split(","))
            build_stra.trainer_id = args.trainer_id
        else:
            # local runs (including nccl2_reduce_layer single-device mode) use one trainer
            build_stra.num_trainers = 1
            build_stra.trainer_id = 0

        my_print("begin to compile with data parallel")
        binary = compiler.CompiledProgram(trainer_prog).with_data_parallel(
            loss_name=avg_cost.name,
            build_strategy=build_stra,
            exec_strategy=exec_strategy)
        my_print("program compiled with data parallel")

        if args.use_cuda and args.update_method == "nccl2":
            # this is only to exercise the share_vars_from feature.
            test_exe = fluid.ParallelExecutor(
                use_cuda=True,
                loss_name=avg_cost.name,
                build_strategy=build_stra,
                main_program=test_program,
                share_vars_from=binary._executor)

        feed_var_list = [
            var for var in trainer_prog.global_block().vars.values()
            if var.is_data
        ]

        feeder = fluid.DataFeeder(feed_var_list, place)
        reader_generator = train_reader()

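        # When reader allocation is enabled, each trainer keeps every other
        # sample, so the two trainers consume disjoint halves of each batch.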
        def get_data():
            origin_batch = next(reader_generator)
            if args.update_method != "local" and args.use_reader_alloc:
                new_batch = []
                for offset, item in enumerate(origin_batch):
                    if offset % 2 == args.trainer_id:
                        new_batch.append(item)
                return new_batch
            else:
                return origin_batch

        my_print("begin to train on trainer")
        out_losses = []
        for _ in six.moves.xrange(RUN_STEP):
            loss, = exe.run(binary,
                            fetch_list=[avg_cost.name],
                            feed=feeder.feed(get_data()))
            out_losses.append(loss[0])
        if six.PY2:
            print(pickle.dumps(out_losses))
        else:
            sys.stdout.buffer.write(pickle.dumps(out_losses))


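# Base class for dygraph (imperative mode) data-parallel test runners; concrete
# tests implement get_model() and run_one_loop() for one forward/backward step.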
class TestParallelDyGraphRunnerBase(object):
    def get_model(self):
        raise NotImplementedError(
            "get_model should be implemented by child classes.")

    def run_one_loop(self, model, opt, data):
        raise NotImplementedError(
            "train_one_loop should be implemented by the child classes.")

    def run_trainer(self, args):

        seed = 90
        device_id = int(os.getenv("FLAGS_selected_gpus", "0"))
        place = fluid.CUDAPlace(device_id)

        def _get_data(batch):
            if args.update_method != "local":
                new_batch = []
                for offset, item in enumerate(batch):
                    if offset % 2 == args.trainer_id:
                        new_batch.append(item)
                return new_batch
            else:
                return batch

        with fluid.dygraph.guard(place):
            fluid.default_startup_program().random_seed = seed
            fluid.default_main_program().random_seed = seed
            np.random.seed(seed)
            import random
            random.seed(seed)
            model, train_reader, opt = self.get_model()
            nranks = len(args.endpoints.split(",")) if args.endpoints else 1

            if args.update_method == "nccl2":
                strategy = dygraph.parallel.ParallelStrategy()
                strategy.nranks = nranks
                strategy.local_rank = args.trainer_id
                strategy.trainer_endpoints = args.endpoints.split(",")
                strategy.current_endpoint = args.current_endpoint
                my_print("begin to prepare context in dygraph with nccl2")
                dygraph.parallel.prepare_context(strategy)
                model = dygraph.parallel.DataParallel(model, strategy)
                my_print("model built in dygraph")
            out_losses = []
            my_print("begin to run dygraph training")
            for step_id, data in enumerate(train_reader()):
                data = _get_data(data)
                if step_id == RUN_STEP:
                    break
                loss = self.run_one_loop(model, opt, data)
                if step_id % 10 == 0:
                    my_print("loss at step %d: %f" % (step_id, loss))
                out_losses.append(loss.numpy())

                # FIXME(Yancey1989): scale the loss inplace
                if args.update_method == "nccl2":
                    loss = model.scale_loss(loss)

                loss.backward()
                if args.update_method == "nccl2":
                    model.apply_collective_grads()

                opt.minimize(loss)
                model.clear_gradients()
            my_print(pickle.dumps(out_losses))


def runtime_main(test_class):
    parser = argparse.ArgumentParser(description='Run dist test.')
    parser.add_argument(
        '--role', type=str, required=True, choices=['pserver', 'trainer'])
    parser.add_argument('--endpoints', type=str, required=False, default="")
    parser.add_argument(
        '--update_method',
        type=str,
        default="local",
        choices=["pserver", "nccl2", "local", "nccl2_reduce_layer"])
    parser.add_argument('--trainer_id', type=int, required=False, default=0)
    parser.add_argument('--trainers', type=int, required=False, default=1)
    parser.add_argument('--nccl_comm_num', type=int, required=False, default=1)
    parser.add_argument(
        '--enable_backward_deps', type=bool, required=False, default=1)
    parser.add_argument(
        '--current_endpoint', type=str, required=False, default="")
    parser.add_argument('--sync_mode', action='store_true')
    parser.add_argument('--mem_opt', action='store_true')
    parser.add_argument('--use_cuda', action='store_true')
    parser.add_argument('--use_dgc', action='store_true')
    parser.add_argument('--use_reduce', action='store_true')
    parser.add_argument('--dc_asgd', action='store_true')
    parser.add_argument(
        '--use_reader_alloc', action='store_true', required=False)
    parser.add_argument('--batch_size', required=False, type=int, default=2)
    parser.add_argument('--lr', required=False, type=float, default=0.001)
    parser.add_argument(
        '--batch_merge_repeat', required=False, type=int, default=1)
    parser.add_argument(
        '--nccl2_reduce_layer_local_run',
        required=False,
        type=bool,
        default=False)

    args = parser.parse_args()

    model = test_class()
    if args.role == "pserver" and args.update_method == "pserver":
        model.run_pserver(args)
    else:
        model.run_trainer(args)


import paddle.compat as cpt
import socket
from contextlib import closing


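# unittest driver: launches local, pserver-mode and nccl2-mode training as
# subprocesses and checks that their per-step losses agree within a delta.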
class TestDistBase(unittest.TestCase):
    def _setup_config(self):
        raise NotImplementedError("tests should have _setup_config implemented")

    def _after_setup_config(self):
        if self._enforce_place == "CPU":
            self.__use_cuda = False
            self._use_dgc = False
        elif self._enforce_place == "GPU":
            self.__use_cuda = True
        else:
            if fluid.core.is_compiled_with_cuda():
                self.__use_cuda = True
            else:
                self.__use_cuda = False
                self._use_dgc = False

        if self._use_reduce:
            assert not self._use_dgc

    def setUp(self):
        self._trainers = 2
        self._pservers = 2
        self._port_set = set()
        self._ps_endpoints = "127.0.0.1:%s,127.0.0.1:%s" % (
            self._find_free_port(), self._find_free_port())
        self._python_interp = sys.executable
        self._sync_mode = True
        self._enforce_place = None
        self._mem_opt = False
        self._use_reduce = False
        self._dc_asgd = False  # must use with async mode
        self._use_reader_alloc = True
        self._nccl2_mode = False
        self._mp_mode = False
        # FIXME(typhoonzero): this argument enables testing allreduce
        # layers, which users can call as layers.allreduce to accumulate
        # tensors anywhere. Find a better way to do this test and avoid
        # checking this argument everywhere.
        self._nccl2_reduce_layer = False
        self._lr = 0.001
        self._use_dgc = False
        self._dygraph = False
        self._nccl_comm_num = 1
        self._setup_config()
        self._after_setup_config()
        self._enable_backward_deps = False

    def _find_free_port(self):
        def __free_port():
            with closing(socket.socket(socket.AF_INET,
                                       socket.SOCK_STREAM)) as s:
                s.bind(('', 0))
                my_print("socket name: %s" % s.getsockname()[1])
                return s.getsockname()[1]

        while True:
            port = __free_port()
            if port not in self._port_set:
                self._port_set.add(port)
                return port

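    # Launch the two parameter-server subprocesses; their stderr is redirected
    # to /tmp/ps0_err.log and /tmp/ps1_err.log.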
    def start_pserver(self, model_file, check_error_log, required_envs):
        ps0_ep, ps1_ep = self._ps_endpoints.split(",")
        ps_cmd = "%s %s --role pserver --endpoints %s --trainer_id 0 --current_endpoint %s --trainers %d --update_method pserver"
        ps0_cmd = ps_cmd % \
                  (self._python_interp, model_file, self._ps_endpoints, ps0_ep,
                   self._trainers)
        ps1_cmd = ps_cmd % \
                  (self._python_interp, model_file, self._ps_endpoints, ps1_ep,
                   self._trainers)

        if self._sync_mode:
            ps0_cmd += " --sync_mode"
            ps1_cmd += " --sync_mode"
        if self._mem_opt:
            ps0_cmd += " --mem_opt"
            ps1_cmd += " --mem_opt"

        print(ps0_cmd)
        print(ps1_cmd)
        ps0_pipe = open("/tmp/ps0_err.log", "wb")
        ps1_pipe = open("/tmp/ps1_err.log", "wb")

        my_print("going to start pserver process 0")
        ps0_proc = subprocess.Popen(
            ps0_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=ps0_pipe,
            env=required_envs)
        my_print("going to start pserver process 1")
        ps1_proc = subprocess.Popen(
            ps1_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=ps1_pipe,
            env=required_envs)

        return ps0_proc, ps1_proc, ps0_pipe, ps1_pipe

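    # Run the model in a single local process to obtain the baseline losses.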
    def _run_local(self,
                   model,
                   envs,
                   check_error_log=False,
                   batch_size=DEFAULT_BATCH_SIZE,
                   batch_merge_repeat=1):

        cmd = "%s %s --role trainer --lr %f" % (self._python_interp, model,
                                                self._lr)
        if batch_size != DEFAULT_BATCH_SIZE:
            cmd += " --batch_size %d" % batch_size
        if batch_merge_repeat > 1:
            cmd += " --batch_merge_repeat %d" % batch_merge_repeat
        if self._nccl2_reduce_layer:
            cmd += " --nccl2_reduce_layer_local_run 1"

        if self.__use_cuda:
            cmd += " --use_cuda"
            env_local = {
                "CUDA_VISIBLE_DEVICES": "0",
                "PADDLE_TRAINERS_NUM": "1",
                "PADDLE_TRAINER_ID": "0"
            }
        else:
            env_local = {'CPU_NUM': '1'}

        env_local.update(envs)
        print("local_cmd: {}, env: {}".format(cmd, env_local))

        if check_error_log:
            err_log = open("/tmp/trainer.err.log", "wb")
            local_proc = subprocess.Popen(
                cmd.split(" "),
                stdout=subprocess.PIPE,
                stderr=err_log,
                env=env_local)
        else:
            local_proc = subprocess.Popen(
                cmd.split(" "),
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                env=env_local)

        local_out, local_err = local_proc.communicate()

        if check_error_log:
            err_log.close()

        sys.stderr.write('local_stderr: %s\n' % local_err)
        sys.stderr.write('local_stdout: %s\n' % pickle.loads(local_out))

        return pickle.loads(local_out)

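    # Run a 2-pserver / 2-trainer cluster in pserver mode and return the
    # pickled per-step losses from both trainers.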
    def _run_cluster(self, model, envs, check_error_log):
        # Run dist train to compare with local results
        ps0, ps1, ps0_pipe, ps1_pipe = self.start_pserver(model,
                                                          check_error_log, envs)

        ps0_ep, ps1_ep = self._ps_endpoints.split(",")

        tr_cmd = "%s %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --trainers %d --update_method pserver --lr %f"
        tr0_cmd = tr_cmd % \
                  (self._python_interp, model, self._ps_endpoints,
                   0, ps0_ep, self._trainers, self._lr)
        tr1_cmd = tr_cmd % \
                  (self._python_interp, model, self._ps_endpoints,
                   1, ps1_ep, self._trainers, self._lr)

        if self._sync_mode:
            tr0_cmd += " --sync_mode"
            tr1_cmd += " --sync_mode"
        if self._mem_opt:
            tr0_cmd += " --mem_opt"
            tr1_cmd += " --mem_opt"
        if self._use_reduce:
            tr0_cmd += " --use_reduce"
            tr1_cmd += " --use_reduce"
        if self._use_reader_alloc:
            tr0_cmd += " --use_reader_alloc"
            tr1_cmd += " --use_reader_alloc"
        if self.__use_cuda:
            tr0_cmd += " --use_cuda"
            tr1_cmd += " --use_cuda"
            env0 = {"CUDA_VISIBLE_DEVICES": "0"}
            env1 = {"CUDA_VISIBLE_DEVICES": "1"}
        else:
            env0 = {'CPU_NUM': '1'}
            env1 = {'CPU_NUM': '1'}

        env0.update(envs)
        env1.update(envs)

        print("tr0_cmd: {}, env: {}".format(tr0_cmd, env0))
        print("tr1_cmd: {}, env: {}".format(tr1_cmd, env1))
        tr0_pipe = open("/tmp/tr0_err.log", "wb")
        tr1_pipe = open("/tmp/tr1_err.log", "wb")

        my_print("going to start trainer process 0")
        tr0_proc = subprocess.Popen(
            tr0_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=tr0_pipe,
            env=env0)
        my_print("going to start trainer process 1")
        tr1_proc = subprocess.Popen(
            tr1_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=tr1_pipe,
            env=env1)

        # Wait until trainer process terminate
        while True:
            stat0 = tr0_proc.poll()
            time.sleep(0.1)
            if stat0 is not None:
                break
        while True:
            stat1 = tr1_proc.poll()
            time.sleep(0.1)
            if stat1 is not None:
                break

        tr0_out, tr0_err = tr0_proc.communicate()
        tr1_out, tr1_err = tr1_proc.communicate()

        # close the trainer and pserver log files
        tr0_pipe.close()
        tr1_pipe.close()
        ps0_pipe.close()
        ps1_pipe.close()

        ps0.terminate()
        ps1.terminate()

        # print server log
        '''
        with open("/tmp/ps0_err.log", "rb") as fn:
            sys.stderr.write("ps0 stderr: %s\n" % fn.read())
        with open("/tmp/ps1_err.log", "rb") as fn:
            sys.stderr.write("ps1 stderr: %s\n" % fn.read())
        '''

        # print log
        '''
        with open("/tmp/tr0_err.log", "rb") as fn:
            sys.stderr.write('trainer 0 stderr: %s\n' % fn.read())
        with open("/tmp/tr1_err.log", "rb") as fn:
            sys.stderr.write('trainer 1 stderr: %s\n' % fn.read())
        '''

        return pickle.loads(tr0_out), pickle.loads(tr1_out)

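    # Run two trainers in NCCL2 (collective) mode, reusing the pserver
    # endpoints as worker endpoints, and return both trainers' losses.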
    def _run_cluster_nccl2(self, model, envs, nccl2_reduce_layer,
                           check_error_log):
        # NOTE: we reuse ps_endpoints as nccl2 worker endpoints
        worker_endpoints = self._ps_endpoints.split(",")
        w0_ep, w1_ep = worker_endpoints
        if nccl2_reduce_layer:
            update_method = "nccl2_reduce_layer"
        else:
            update_method = "nccl2"

        tr_cmd = "%s %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --update_method %s --lr %f"
        tr0_cmd = tr_cmd % \
                  (self._python_interp, model, self._ps_endpoints,
                   0, w0_ep, update_method, self._lr)
        tr1_cmd = tr_cmd % \
                  (self._python_interp, model, self._ps_endpoints,
                   1, w1_ep, update_method, self._lr)

        if self._mem_opt:
            tr0_cmd += " --mem_opt"
            tr1_cmd += " --mem_opt"
        if self._use_reduce:
            tr0_cmd += " --use_reduce"
            tr1_cmd += " --use_reduce"
        if self._use_reader_alloc:
            tr0_cmd += " --use_reader_alloc"
            tr1_cmd += " --use_reader_alloc"
        if self.__use_cuda:
            tr0_cmd += " --use_cuda"
            tr1_cmd += " --use_cuda"
            env0 = {
                "CUDA_VISIBLE_DEVICES": "0",
                # for testing the nccl2 layer
                "PADDLE_TRAINERS_NUM": "2",
                "PADDLE_TRAINER_ID": "0"
            }
            env1 = {
                "CUDA_VISIBLE_DEVICES": "1",
                "PADDLE_TRAINERS_NUM": "2",
                "PADDLE_TRAINER_ID": "1"
            }
        else:
            env0 = {'CPU_NUM': '1'}
            env1 = {'CPU_NUM': '1'}

        if self._use_dgc:
            tr0_cmd += " --use_dgc"
            tr1_cmd += " --use_dgc"

        if self._nccl_comm_num > 1:
            tr0_cmd += " --nccl_comm_num {}".format(self._nccl_comm_num)
            tr1_cmd += " --nccl_comm_num {}".format(self._nccl_comm_num)

        if self._mp_mode:
            env0 = {"FLAGS_selected_gpus": "0"}
            env1 = {"FLAGS_selected_gpus": "1"}

        if self._enable_backward_deps:
            tr0_cmd += " --enable_backward_deps 1"
            tr1_cmd += " --enable_backward_deps 1"

        env0.update(envs)
        env1.update(envs)

        print("tr0_cmd:{}, env: {}".format(tr0_cmd, env0))
        print("tr1_cmd:{}, env: {}".format(tr1_cmd, env1))
        tr0_pipe = open("/tmp/tr0_err.log", "wb")
        tr1_pipe = open("/tmp/tr1_err.log", "wb")

        my_print("going to start process 0 with nccl2")
        tr0_proc = subprocess.Popen(
            tr0_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=tr0_pipe,
            env=env0)
        my_print("going to start process 1 with nccl2")
        tr1_proc = subprocess.Popen(
            tr1_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=tr1_pipe,
            env=env1)

        tr0_out, tr0_err = tr0_proc.communicate()
        tr1_out, tr1_err = tr1_proc.communicate()

        # close trainer file
        tr0_pipe.close()
        tr1_pipe.close()

        # print log
        sys.stderr.write('trainer 0 stderr: %s\n' % tr0_err)
        sys.stderr.write('trainer 1 stderr: %s\n' % tr1_err)

        return pickle.loads(tr0_out), pickle.loads(tr1_out)

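    # Entry point used by concrete tests: compares each local loss with the
    # average of the two trainers' losses and asserts they match within delta.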
    def check_with_place(self,
                         model_file,
                         delta=1e-3,
                         check_error_log=False,
                         need_envs={}):
        # TODO(typhoonzero): should auto-adapt to the GPU count on the machine.
        required_envs = {
            "PATH": os.getenv("PATH", ""),
            "PYTHONPATH": os.getenv("PYTHONPATH", ""),
            "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
            "FLAGS_fraction_of_gpu_memory_to_use": "0.15",
            "FLAGS_rpc_deadline": "30000",  # 5sec to fail fast
            "FLAGS_cudnn_deterministic": "1",
            "http_proxy": "",
            "NCCL_P2P_DISABLE": "1"
        }

        required_envs.update(need_envs)

        if check_error_log:
            required_envs["GLOG_v"] = "3"
            required_envs["GLOG_logtostderr"] = "1"

        local_losses\
            = self._run_local(model_file, required_envs,
                                check_error_log)
        if self._nccl2_mode:
            if self._nccl2_reduce_layer:
                tr0_losses, tr1_losses = self._run_cluster_nccl2(
                    model_file, required_envs, True, check_error_log)
            else:
                tr0_losses, tr1_losses = self._run_cluster_nccl2(
                    model_file, required_envs, False, check_error_log)
        else:
            tr0_losses, tr1_losses = self._run_cluster(
                model_file, required_envs, check_error_log)

        for step_id in range(RUN_STEP):
            local_loss = local_losses[step_id]
            tr0_loss = tr0_losses[step_id]
            tr1_loss = tr1_losses[step_id]
            dist_loss = (np.array([tr0_loss]) + np.array([tr1_loss])) / 2
            print("=======", local_loss, ":", dist_loss[0], "=======")
            self.assertAlmostEqual(local_loss, dist_loss[0], delta=delta)