#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
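
"""Base classes and helpers for PaddlePaddle's distributed-training unit tests.

``TestDistRunnerBase`` (static graph) and ``TestParallelDyGraphRunnerBase``
(dygraph) are executed inside subprocesses via ``runtime_main``, while
``TestDistBase`` launches local, parameter-server, and nccl2 runs of a given
model file and compares the per-step losses they report.
"""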

from __future__ import print_function
import time

import unittest
import os
import sys
import signal
import subprocess
import six
import argparse
import pickle
import numpy as np

import paddle.fluid as fluid
from paddle.fluid import compiler
import paddle.fluid.dygraph as dygraph
from paddle.fluid.dygraph.base import to_variable
from paddle.fluid.dygraph.parallel import DataParallel

RUN_STEP = 5
DEFAULT_BATCH_SIZE = 2


def my_print(class_name, log_str):
    localtime = time.asctime(time.localtime(time.time()))
    print_str = localtime + "\t" + class_name + "\t" + log_str
    if six.PY2:
        sys.stderr.write(pickle.dumps(print_str))
    else:
        sys.stderr.buffer.write(pickle.dumps(print_str))


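# Base class for the static-graph runner scripts: a concrete test model
# implements get_model(), and runtime_main() below decides from command-line
# flags whether this process acts as a parameter server (run_pserver) or as a
# trainer (run_trainer).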
class TestDistRunnerBase(object):
    def get_model(self,
                  batch_size=DEFAULT_BATCH_SIZE,
                  lr=0.1,
                  single_device=False,
                  use_dgc=False):
        raise NotImplementedError(
            "get_model should be implemented by child classes.")

    @staticmethod
    def get_transpiler(trainer_id,
                       main_program,
                       pserver_endpoints,
                       trainers,
                       sync_mode,
                       dc_asgd=False,
                       current_endpoint=None,
                       nccl_comm_num=1):
        # NOTE: import fluid only at runtime, or else forking processes will cause errors.
        config = fluid.DistributeTranspilerConfig()
        config.enable_dc_asgd = dc_asgd
        config.sync_mode = sync_mode
        if nccl_comm_num > 1:
            config.nccl_comm_num = nccl_comm_num
        # config.runtime_split_send_recv = True
        t = fluid.DistributeTranspiler(config=config)
        t.transpile(
            trainer_id=trainer_id,
            program=main_program,
            pservers=pserver_endpoints,
            trainers=trainers,
            current_endpoint=current_endpoint)
        return t

    def run_pserver(self, args):
        self.lr = args.lr
        self.get_model(batch_size=args.batch_size)
        # NOTE: pserver should not call memory optimize
        t = self.get_transpiler(args.trainer_id,
                                fluid.default_main_program(), args.endpoints,
                                args.trainers, args.sync_mode, args.dc_asgd)
        pserver_prog = t.get_pserver_program(args.current_endpoint)
        startup_prog = t.get_startup_program(args.current_endpoint,
                                             pserver_prog)

        place = fluid.CPUPlace()
        exe = fluid.Executor(place)
        exe.run(startup_prog)
        my_print(type(self).__name__, "run pserver startup program done.")
        exe.run(pserver_prog)
        my_print(type(self).__name__, "run pserver main program done.")

    def run_trainer(self, args):
        self.lr = args.lr
        if args.nccl2_reduce_layer_local_run:
            test_program, avg_cost, train_reader, test_reader, batch_acc, predict = \
                self.get_model(batch_size=args.batch_size, single_device=True)
        elif args.use_dgc:
            test_program, avg_cost, train_reader, test_reader, batch_acc, predict = \
                self.get_model(batch_size=args.batch_size, use_dgc=args.use_dgc)
        else:
            test_program, avg_cost, train_reader, test_reader, batch_acc, predict = \
                self.get_model(batch_size=args.batch_size)

        if args.mem_opt:
            my_print(type(self).__name__, "begin to run memory optimize")
            fluid.memory_optimize(fluid.default_main_program(), skip_grads=True)
            my_print(type(self).__name__, "trainer run memory optimize done.")
        if args.update_method == "pserver":
            my_print(
                type(self).__name__,
                "begin to run transpile on trainer with pserver mode")
            t = self.get_transpiler(args.trainer_id,
                                    fluid.default_main_program(),
                                    args.endpoints, args.trainers,
                                    args.sync_mode, args.dc_asgd)
            trainer_prog = t.get_trainer_program()
            my_print(
                type(self).__name__,
                "get trainer program done with pserver mode.")
        elif args.update_method == "nccl2" or args.update_method == "nccl2_reduce_layer":
            # transpile for nccl2
            config = fluid.DistributeTranspilerConfig()
            config.mode = "nccl2"
            config.nccl_comm_num = args.nccl_comm_num
            my_print(
                type(self).__name__,
                "begin to run transpile on trainer with nccl2 mode")
            nccl2_t = fluid.DistributeTranspiler(config=config)
            nccl2_t.transpile(
                args.trainer_id,
                program=fluid.default_main_program(),
                startup_program=fluid.default_startup_program(),
                trainers=args.endpoints,
                current_endpoint=args.current_endpoint)
            my_print(
                type(self).__name__,
                "get trainer program done with nccl2 mode.")
            trainer_prog = fluid.default_main_program()
        else:
            trainer_prog = fluid.default_main_program()

        if args.use_cuda:
            device_id = int(os.getenv("FLAGS_selected_gpus", "0"))
            place = fluid.CUDAPlace(device_id)
        else:
            place = fluid.CPUPlace()

        exe = fluid.Executor(place)
        exe.run(fluid.default_startup_program())
        my_print(type(self).__name__, "run worker startup program done.")

        exec_strategy = fluid.ExecutionStrategy()
        exec_strategy.num_threads = 1
        exec_strategy.allow_op_delay = False

        build_stra = fluid.BuildStrategy()
        # FIXME: force-disable enable_inplace and memory_optimize
        build_stra.enable_inplace = False
        build_stra.memory_optimize = False

        if args.enable_backward_deps:
            build_stra.enable_backward_optimizer_op_deps = True

        if args.use_reduce:
            build_stra.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce
        else:
            build_stra.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.AllReduce

        pass_builder = None
        if args.batch_merge_repeat > 1:
            pass_builder = build_stra._finalize_strategy_and_create_passes()
            mypass = pass_builder.insert_pass(0, "multi_batch_merge_pass")
            mypass.set("num_repeats", args.batch_merge_repeat)

        if args.update_method == "nccl2" or args.update_method == "nccl2_reduce_layer":
            build_stra.num_trainers = len(args.endpoints.split(","))
            build_stra.trainer_id = args.trainer_id
        else:
            # local or pserver mode
            build_stra.num_trainers = 1
            build_stra.trainer_id = 0

        my_print(type(self).__name__, "begin to compile with data parallel")
        binary = compiler.CompiledProgram(trainer_prog).with_data_parallel(
            loss_name=avg_cost.name,
            build_strategy=build_stra,
            exec_strategy=exec_strategy)
        my_print(type(self).__name__, "program compiled with data parallel")

        if args.use_cuda and args.update_method == "nccl2":
            # this is just to test the share_vars_from feature.
            test_exe = fluid.ParallelExecutor(
                use_cuda=True,
                loss_name=avg_cost.name,
                build_strategy=build_stra,
                main_program=test_program,
                share_vars_from=binary._executor)

        feed_var_list = [
            var for var in trainer_prog.global_block().vars.values()
            if var.is_data
        ]

        feeder = fluid.DataFeeder(feed_var_list, place)
        reader_generator = train_reader()

        def get_data():
            origin_batch = next(reader_generator)
            if args.update_method != "local" and args.use_reader_alloc:
                new_batch = []
                for offset, item in enumerate(origin_batch):
                    if offset % 2 == args.trainer_id:
                        new_batch.append(item)
                return new_batch
            else:
                return origin_batch

        my_print(type(self).__name__, "begin to train on trainer")
        out_losses = []
        for _ in six.moves.xrange(RUN_STEP):
            loss, = exe.run(binary,
                            fetch_list=[avg_cost.name],
                            feed=feeder.feed(get_data()))
            out_losses.append(loss[0])
        if six.PY2:
            print(pickle.dumps(out_losses))
        else:
            sys.stdout.buffer.write(pickle.dumps(out_losses))


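# Dygraph counterpart of TestDistRunnerBase: subclasses implement get_model()
# and run_one_loop(); run_trainer() wraps the model with
# dygraph.parallel.DataParallel when --update_method nccl2 is used.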
class TestParallelDyGraphRunnerBase(object):
    def get_model(self):
        raise NotImplementedError(
            "get_model should be implemented by child classes.")

    def run_one_loop(self, model, opt, data):
        raise NotImplementedError(
            "run_one_loop should be implemented by child classes.")

    def run_trainer(self, args):

        seed = 90
        device_id = int(os.getenv("FLAGS_selected_gpus", "0"))
        place = fluid.CUDAPlace(device_id)

        def _get_data(batch):
            if args.update_method != "local":
                new_batch = []
                for offset, item in enumerate(batch):
                    if offset % 2 == args.trainer_id:
                        new_batch.append(item)
                return new_batch
            else:
                return batch

        with fluid.dygraph.guard(place):
            fluid.default_startup_program().random_seed = seed
            fluid.default_main_program().random_seed = seed
            np.random.seed(seed)
            import random
            random.seed(seed)
            model, train_reader, opt = self.get_model()
            nranks = len(args.endpoints.split(",")) if args.endpoints else 1

            if args.update_method == "nccl2":
                strategy = dygraph.parallel.ParallelStrategy()
                strategy.nranks = nranks
                strategy.local_rank = args.trainer_id
                strategy.trainer_endpoints = args.endpoints.split(",")
                strategy.current_endpoint = args.current_endpoint
                my_print(
                    type(self).__name__,
                    "begin to prepare context in dygraph with nccl2")
                dygraph.parallel.prepare_context(strategy)
                model = dygraph.parallel.DataParallel(model, strategy)
                my_print(type(self).__name__, "model built in dygraph")
            out_losses = []
            my_print(type(self).__name__, "begin to run dygraph training")
            for step_id, data in enumerate(train_reader()):
                data = _get_data(data)
                if step_id == RUN_STEP:
                    break
                loss = self.run_one_loop(model, opt, data)
                if step_id % 10 == 0:
                    my_print(
                        type(self).__name__,
                        "loss at step %d: %f" % (step_id, loss))
                out_losses.append(loss.numpy())

                # FIXME(Yancey1989): scale the loss inplace
                if args.update_method == "nccl2":
                    loss = model.scale_loss(loss)

                loss.backward()
                if args.update_method == "nccl2":
                    model.apply_collective_grads()

                opt.minimize(loss)
                model.clear_gradients()
            my_print(type(self).__name__, pickle.dumps(out_losses))


def runtime_main(test_class):
    parser = argparse.ArgumentParser(description='Run dist test.')
    parser.add_argument(
        '--role', type=str, required=True, choices=['pserver', 'trainer'])
    parser.add_argument('--endpoints', type=str, required=False, default="")
    parser.add_argument(
        '--update_method',
        type=str,
        default="local",
        choices=["pserver", "nccl2", "local", "nccl2_reduce_layer"])
    parser.add_argument('--trainer_id', type=int, required=False, default=0)
    parser.add_argument('--trainers', type=int, required=False, default=1)
    parser.add_argument('--nccl_comm_num', type=int, required=False, default=1)
    parser.add_argument(
        '--enable_backward_deps', type=bool, required=False, default=1)
    parser.add_argument(
        '--current_endpoint', type=str, required=False, default="")
    parser.add_argument('--sync_mode', action='store_true')
    parser.add_argument('--mem_opt', action='store_true')
    parser.add_argument('--use_cuda', action='store_true')
    parser.add_argument('--use_dgc', action='store_true')
    parser.add_argument('--use_reduce', action='store_true')
    parser.add_argument('--dc_asgd', action='store_true')
    parser.add_argument(
        '--use_reader_alloc', action='store_true', required=False)
    parser.add_argument('--batch_size', required=False, type=int, default=2)
    parser.add_argument('--lr', required=False, type=float, default=0.001)
    parser.add_argument(
        '--batch_merge_repeat', required=False, type=int, default=1)
    parser.add_argument(
        '--nccl2_reduce_layer_local_run',
        required=False,
        type=bool,
        default=False)

    args = parser.parse_args()

    model = test_class()
    if args.role == "pserver" and args.update_method == "pserver":
        model.run_pserver(args)
    else:
        model.run_trainer(args)

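# Illustrative usage (hypothetical module name, not part of this file): a
# concrete runner script, say dist_example.py, subclasses one of the runner
# bases, implements get_model(), and hands control to runtime_main():
#
#     class DistExampleModel(TestDistRunnerBase):
#         def get_model(self, batch_size=DEFAULT_BATCH_SIZE, lr=0.1,
#                       single_device=False, use_dgc=False):
#             # build the network and return: test_program, avg_cost,
#             # train_reader, test_reader, batch_acc, predict
#             ...
#
#     if __name__ == "__main__":
#         runtime_main(DistExampleModel)
#
# TestDistBase below then launches such a script as a subprocess, e.g.:
#     python dist_example.py --role trainer --update_method local --lr 0.001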

import paddle.compat as cpt
import socket
from contextlib import closing


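# Unittest-side driver: subclasses override _setup_config() to pick a mode
# (sync/async, reduce, dgc, nccl2, ...) and call check_with_place(model_file),
# which runs the model file locally and as a two-trainer cluster and asserts
# that the per-step losses match within `delta`.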
class TestDistBase(unittest.TestCase):
    def _setup_config(self):
        raise NotImplementedError("tests should have _setup_config implemented")

    def _after_setup_config(self):
        if self._enforce_place == "CPU":
            self.__use_cuda = False
            self._use_dgc = False
        elif self._enforce_place == "GPU":
            self.__use_cuda = True
        else:
            if fluid.core.is_compiled_with_cuda():
                self.__use_cuda = True
            else:
                self.__use_cuda = False
                self._use_dgc = False

        if self._use_reduce:
            assert not self._use_dgc

    def setUp(self):
        self._trainers = 2
        self._pservers = 2
        self._port_set = set()
        self._ps_endpoints = "127.0.0.1:%s,127.0.0.1:%s" % (
            self._find_free_port(), self._find_free_port())
        self._python_interp = sys.executable
        self._sync_mode = True
        self._enforce_place = None
        self._mem_opt = False
        self._use_reduce = False
        self._dc_asgd = False  # must use with async mode
        self._use_reader_alloc = True
        self._nccl2_mode = False
        self._mp_mode = False
        # FIXME(typhoonzero): I added this stupid argument to enable
        # testing allreduce layers, which users can call layers.allreduce
        # to accumulate tensors anywhere. Find a better way to do this
        # test and reduce the checks on this argument everywhere.
        self._nccl2_reduce_layer = False
        self._lr = 0.001
        self._use_dgc = False
        self._dygraph = False
        self._nccl_comm_num = 1
        self._setup_config()
        self._after_setup_config()
        self._enable_backward_deps = False

    def _find_free_port(self):
        def __free_port():
            with closing(socket.socket(socket.AF_INET,
                                       socket.SOCK_STREAM)) as s:
                s.bind(('', 0))
                my_print(
                    type(self).__name__, "socket name: %s" % s.getsockname()[1])
                return s.getsockname()[1]

        while True:
            port = __free_port()
            if port not in self._port_set:
                self._port_set.add(port)
                return port

    def start_pserver(self, model_file, check_error_log, required_envs):
        ps0_ep, ps1_ep = self._ps_endpoints.split(",")
        ps_cmd = "%s %s --role pserver --endpoints %s --trainer_id 0 --current_endpoint %s --trainers %d --update_method pserver"
        ps0_cmd = ps_cmd % \
                  (self._python_interp, model_file, self._ps_endpoints, ps0_ep,
                   self._trainers)
        ps1_cmd = ps_cmd % \
                  (self._python_interp, model_file, self._ps_endpoints, ps1_ep,
                   self._trainers)

        if self._sync_mode:
            ps0_cmd += " --sync_mode"
            ps1_cmd += " --sync_mode"
        if self._mem_opt:
            ps0_cmd += " --mem_opt"
            ps1_cmd += " --mem_opt"

        print(ps0_cmd)
        print(ps1_cmd)
        ps0_pipe = open("/tmp/ps0_err.log", "wb")
        ps1_pipe = open("/tmp/ps1_err.log", "wb")

        my_print(type(self).__name__, "going to start pserver process 0")
        ps0_proc = subprocess.Popen(
            ps0_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=ps0_pipe,
            env=required_envs)
        my_print(type(self).__name__, "going to start pserver process 1")
        ps1_proc = subprocess.Popen(
            ps1_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=ps1_pipe,
            env=required_envs)

        return ps0_proc, ps1_proc, ps0_pipe, ps1_pipe

    def _run_local(self,
                   model,
                   envs,
                   check_error_log=False,
                   batch_size=DEFAULT_BATCH_SIZE,
                   batch_merge_repeat=1):

        cmd = "%s %s --role trainer --lr %f" % (self._python_interp, model,
                                                self._lr)
        if batch_size != DEFAULT_BATCH_SIZE:
            cmd += " --batch_size %d" % batch_size
        if batch_merge_repeat > 1:
            cmd += " --batch_merge_repeat %d" % batch_merge_repeat
        if self._nccl2_reduce_layer:
            cmd += " --nccl2_reduce_layer_local_run 1"

        if self.__use_cuda:
            cmd += " --use_cuda"
            env_local = {
                "CUDA_VISIBLE_DEVICES": "0",
                "PADDLE_TRAINERS_NUM": "1",
                "PADDLE_TRAINER_ID": "0"
            }
        else:
            env_local = {'CPU_NUM': '1'}

        env_local.update(envs)
        print("local_cmd: {}, env: {}".format(cmd, env_local))

        if check_error_log:
            err_log = open("/tmp/trainer.err.log", "wb")
            local_proc = subprocess.Popen(
                cmd.split(" "),
                stdout=subprocess.PIPE,
                stderr=err_log,
                env=env_local)
        else:
            local_proc = subprocess.Popen(
                cmd.split(" "),
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                env=env_local)

        local_out, local_err = local_proc.communicate()

        if check_error_log:
            err_log.close()

        sys.stderr.write('local_stderr: %s\n' % local_err)
        sys.stderr.write('local_stdout: %s\n' % pickle.loads(local_out))

        return pickle.loads(local_out)

    def _run_cluster(self, model, envs, check_error_log):
        # Run dist train to compare with local results
        ps0, ps1, ps0_pipe, ps1_pipe = self.start_pserver(model,
                                                          check_error_log, envs)

        ps0_ep, ps1_ep = self._ps_endpoints.split(",")

        tr_cmd = "%s %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --trainers %d --update_method pserver --lr %f"
        tr0_cmd = tr_cmd % \
                  (self._python_interp, model, self._ps_endpoints,
                   0, ps0_ep, self._trainers, self._lr)
        tr1_cmd = tr_cmd % \
                  (self._python_interp, model, self._ps_endpoints,
                   1, ps1_ep, self._trainers, self._lr)

        if self._sync_mode:
            tr0_cmd += " --sync_mode"
            tr1_cmd += " --sync_mode"
        if self._mem_opt:
            tr0_cmd += " --mem_opt"
            tr1_cmd += " --mem_opt"
        if self._use_reduce:
            tr0_cmd += " --use_reduce"
            tr1_cmd += " --use_reduce"
        if self._use_reader_alloc:
            tr0_cmd += " --use_reader_alloc"
            tr1_cmd += " --use_reader_alloc"
        if self.__use_cuda:
            tr0_cmd += " --use_cuda"
            tr1_cmd += " --use_cuda"
            env0 = {"CUDA_VISIBLE_DEVICES": "0"}
            env1 = {"CUDA_VISIBLE_DEVICES": "1"}
        else:
            env0 = {'CPU_NUM': '1'}
            env1 = {'CPU_NUM': '1'}

        env0.update(envs)
        env1.update(envs)

        print("tr0_cmd: {}, env: {}".format(tr0_cmd, env0))
        print("tr1_cmd: {}, env: {}".format(tr1_cmd, env1))
        tr0_pipe = open("/tmp/tr0_err.log", "wb")
        tr1_pipe = open("/tmp/tr1_err.log", "wb")

        my_print(type(self).__name__, "going to start trainer process 0")
        tr0_proc = subprocess.Popen(
            tr0_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=tr0_pipe,
            env=env0)
        my_print(type(self).__name__, "going to start trainer process 1")
        tr1_proc = subprocess.Popen(
            tr1_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=tr1_pipe,
            env=env1)

        # Wait until the trainer processes terminate.
        while True:
            stat0 = tr0_proc.poll()
            time.sleep(0.1)
            if stat0 is not None:
                break
        while True:
            stat1 = tr1_proc.poll()
            time.sleep(0.1)
            if stat1 is not None:
                break

        tr0_out, tr0_err = tr0_proc.communicate()
        tr1_out, tr1_err = tr1_proc.communicate()

        # close trainer log files
        tr0_pipe.close()
        tr1_pipe.close()
        ps0_pipe.close()
        ps1_pipe.close()

        ps0.terminate()
        ps1.terminate()

        # print server log
        '''
        with open("/tmp/ps0_err.log", "rb") as fn:
            sys.stderr.write("ps0 stderr: %s\n" % fn.read())
        with open("/tmp/ps1_err.log", "rb") as fn:
            sys.stderr.write("ps1 stderr: %s\n" % fn.read())
        '''

        # print log
        '''
        with open("/tmp/tr0_err.log", "rb") as fn:
            sys.stderr.write('trainer 0 stderr: %s\n' % fn.read())
        with open("/tmp/tr1_err.log", "rb") as fn:
            sys.stderr.write('trainer 1 stderr: %s\n' % fn.read())
        '''

        return pickle.loads(tr0_out), pickle.loads(tr1_out)

    def _run_cluster_nccl2(self, model, envs, nccl2_reduce_layer,
                           check_error_log):
        # NOTE: we reuse ps_endpoints as nccl2 worker endpoints
        worker_endpoints = self._ps_endpoints.split(",")
        w0_ep, w1_ep = worker_endpoints
        if nccl2_reduce_layer:
            update_method = "nccl2_reduce_layer"
        else:
            update_method = "nccl2"

        tr_cmd = "%s %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --update_method %s --lr %f"
        tr0_cmd = tr_cmd % \
                  (self._python_interp, model, self._ps_endpoints,
                   0, w0_ep, update_method, self._lr)
        tr1_cmd = tr_cmd % \
                  (self._python_interp, model, self._ps_endpoints,
                   1, w1_ep, update_method, self._lr)

        if self._mem_opt:
            tr0_cmd += " --mem_opt"
            tr1_cmd += " --mem_opt"
        if self._use_reduce:
            tr0_cmd += " --use_reduce"
            tr1_cmd += " --use_reduce"
        if self._use_reader_alloc:
            tr0_cmd += " --use_reader_alloc"
            tr1_cmd += " --use_reader_alloc"
        if self.__use_cuda:
            tr0_cmd += " --use_cuda"
            tr1_cmd += " --use_cuda"
            env0 = {
                "CUDA_VISIBLE_DEVICES": "0",
                # for test nccl2 layer
                "PADDLE_TRAINERS_NUM": "2",
                "PADDLE_TRAINER_ID": "0"
            }
            env1 = {
                "CUDA_VISIBLE_DEVICES": "1",
                "PADDLE_TRAINERS_NUM": "2",
                "PADDLE_TRAINER_ID": "1"
            }
        else:
            env0 = {'CPU_NUM': '1'}
            env1 = {'CPU_NUM': '1'}

        if self._use_dgc:
            tr0_cmd += " --use_dgc"
            tr1_cmd += " --use_dgc"

        if self._nccl_comm_num > 1:
            tr0_cmd += " --nccl_comm_num {}".format(self._nccl_comm_num)
            tr1_cmd += " --nccl_comm_num {}".format(self._nccl_comm_num)

        if self._mp_mode:
            env0 = {"FLAGS_selected_gpus": "0"}
            env1 = {"FLAGS_selected_gpus": "1"}

        if self._enable_backward_deps:
            tr0_cmd += " --enable_backward_deps 1"
            tr1_cmd += " --enable_backward_deps 1"

        env0.update(envs)
        env1.update(envs)

        print("tr0_cmd:{}, env: {}".format(tr0_cmd, env0))
        print("tr1_cmd:{}, env: {}".format(tr1_cmd, env1))
        tr0_pipe = open("/tmp/tr0_err.log", "wb")
        tr1_pipe = open("/tmp/tr1_err.log", "wb")

        my_print(type(self).__name__, "going to start process 0 with nccl2")
        tr0_proc = subprocess.Popen(
            tr0_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=tr0_pipe,
            env=env0)
        my_print(type(self).__name__, "going to start process 1 with nccl2")
        tr1_proc = subprocess.Popen(
            tr1_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=tr1_pipe,
            env=env1)

        tr0_out, tr0_err = tr0_proc.communicate()
        tr1_out, tr1_err = tr1_proc.communicate()

        # close trainer log files
        tr0_pipe.close()
        tr1_pipe.close()

        # print log
        sys.stderr.write('trainer 0 stderr: %s\n' % tr0_err)
        sys.stderr.write('trainer 1 stderr: %s\n' % tr1_err)

        return pickle.loads(tr0_out), pickle.loads(tr1_out)

    def check_with_place(self,
                         model_file,
                         delta=1e-3,
                         check_error_log=False,
                         need_envs={}):
        # TODO(typhoonzero): should auto adapt GPU count on the machine.
        required_envs = {
            "PATH": os.getenv("PATH", ""),
            "PYTHONPATH": os.getenv("PYTHONPATH", ""),
            "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
            "FLAGS_fraction_of_gpu_memory_to_use": "0.15",
            "FLAGS_rpc_deadline": "30000",  # shorter rpc deadline to fail fast
            "FLAGS_cudnn_deterministic": "1",
            "http_proxy": "",
            "NCCL_P2P_DISABLE": "1"
        }

        required_envs.update(need_envs)

        if check_error_log:
            required_envs["GLOG_v"] = "3"
            required_envs["GLOG_logtostderr"] = "1"

        local_losses\
            = self._run_local(model_file, required_envs,
                                check_error_log)
        if self._nccl2_mode:
            if self._nccl2_reduce_layer:
                tr0_losses, tr1_losses = self._run_cluster_nccl2(
                    model_file, required_envs, True, check_error_log)
            else:
                tr0_losses, tr1_losses = self._run_cluster_nccl2(
                    model_file, required_envs, False, check_error_log)
        else:
            tr0_losses, tr1_losses = self._run_cluster(
                model_file, required_envs, check_error_log)

        for step_id in range(RUN_STEP):
            local_loss = local_losses[step_id]
            tr0_loss = tr0_losses[step_id]
            tr1_loss = tr1_losses[step_id]
            dist_loss = (np.array([tr0_loss]) + np.array([tr1_loss])) / 2
            print("=======", local_loss, ":", dist_loss[0], "=======")
            self.assertAlmostEqual(local_loss, dist_loss[0], delta=delta)
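
# Illustrative test case (hypothetical names): a concrete test module would
# pair a runner script with a TestDistBase subclass along these lines:
#
#     class TestDistExample2x2(TestDistBase):
#         def _setup_config(self):
#             self._sync_mode = True
#             self._use_reduce = False
#
#         def test_dist_train(self):
#             self.check_with_place("dist_example.py", delta=1e-5)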