#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import time

import unittest
import os
import sys
import signal
import subprocess

import six

import argparse

import paddle.fluid as fluid

# Number of training steps each (local or distributed) trainer runs; the
# per-step losses of the two runs are compared in check_with_place().
RUN_STEP = 10
# Batch size used when a caller does not override it on the command line.
DEFAULT_BATCH_SIZE = 2


class TestDistRunnerBase(object):
    """Base runner executed inside the child processes spawned by TestDistBase.

    Subclasses implement get_model(); runtime_main() then dispatches to
    run_pserver() or run_trainer() according to the --role flag.
    """

    def get_model(self, batch_size=DEFAULT_BATCH_SIZE):
        """Build the model; must be overridden by concrete test classes.

        Expected to return the tuple
        (test_program, avg_cost, train_reader, test_reader, batch_acc, predict)
        consumed by run_trainer().
        """
        raise NotImplementedError(
            "get_model should be implemented by child classes.")

    @staticmethod
    def get_transpiler(trainer_id, main_program, pserver_endpoints, trainers,
                       sync_mode):
        """Create and run a DistributeTranspiler over `main_program`.

        Returns the transpiler so callers can extract the pserver or trainer
        program from it.
        """
        # NOTE: import fluid until runtime, or else forking processes will cause error.
        config = fluid.DistributeTranspilerConfig()
        t = fluid.DistributeTranspiler(config=config)
        t.transpile(
            trainer_id=trainer_id,
            program=main_program,
            pservers=pserver_endpoints,
            trainers=trainers,
            sync_mode=sync_mode)
        return t

    def run_pserver(self, args):
        """Build the model, transpile it, and serve as a parameter server.

        Blocks in exe.run(pserver_prog) until the process is killed by the
        parent test (see TestDistBase._run_cluster).
        """
        self.get_model(batch_size=args.batch_size)
        # NOTE: pserver should not call memory optimize
        t = self.get_transpiler(args.trainer_id,
                                fluid.default_main_program(), args.endpoints,
                                args.trainers, args.sync_mode)
        pserver_prog = t.get_pserver_program(args.current_endpoint)
        startup_prog = t.get_startup_program(args.current_endpoint,
                                             pserver_prog)
        place = fluid.CPUPlace()
        exe = fluid.Executor(place)
        exe.run(startup_prog)
        exe.run(pserver_prog)

    def run_trainer(self, args):
        """Train for RUN_STEP steps and print one loss per step to stdout.

        The parent test parses this stdout to compare local vs. distributed
        losses, so nothing else may be printed here.
        """
        test_program, avg_cost, train_reader, test_reader, batch_acc, predict = \
            self.get_model(batch_size=args.batch_size)

        if args.mem_opt:
            fluid.memory_optimize(fluid.default_main_program(), skip_grads=True)
        if args.is_dist:
            t = self.get_transpiler(args.trainer_id,
                                    fluid.default_main_program(),
                                    args.endpoints, args.trainers,
                                    args.sync_mode)
            trainer_prog = t.get_trainer_program()
        else:
            trainer_prog = fluid.default_main_program()

        if args.use_cuda:
            place = fluid.CUDAPlace(0)
        else:
            place = fluid.CPUPlace()

        startup_exe = fluid.Executor(place)
        startup_exe.run(fluid.default_startup_program())

        # Single-threaded, no op delay: keeps execution deterministic so the
        # loss comparison against the local run is meaningful.
        strategy = fluid.ExecutionStrategy()
        strategy.num_threads = 1
        strategy.allow_op_delay = False

        build_stra = fluid.BuildStrategy()
        if args.batch_merge_repeat > 1:
            # Insert the batch-merge pass near the end of the pass pipeline
            # (before the last two passes) to simulate larger batches.
            pass_builder = build_stra._create_passes_from_strategy()
            mypass = pass_builder.insert_pass(
                len(pass_builder.all_passes()) - 2, "multi_batch_merge_pass")
            mypass.set_int("num_repeats", args.batch_merge_repeat)

        if args.use_reduce:
            build_stra.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce
        else:
            build_stra.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.AllReduce

        exe = fluid.ParallelExecutor(
            args.use_cuda,
            loss_name=avg_cost.name,
            exec_strategy=strategy,
            build_strategy=build_stra)

        feed_var_list = [
            var for var in trainer_prog.global_block().vars.values()
            if var.is_data
        ]

        feeder = fluid.DataFeeder(feed_var_list, place)
        reader_generator = train_reader()

        def get_data():
            # In distributed mode each trainer takes every other sample
            # (offset parity == trainer_id), splitting the batch between the
            # two trainers.
            origin_batch = next(reader_generator)
            if args.is_dist and args.use_reader_alloc:
                new_batch = []
                for offset, item in enumerate(origin_batch):
                    if offset % 2 == args.trainer_id:
                        new_batch.append(item)
                return new_batch
            else:
                return origin_batch

        for _ in six.moves.xrange(RUN_STEP):
            loss, = exe.run(fetch_list=[avg_cost.name],
                            feed=feeder.feed(get_data()))
            print(loss)
def runtime_main(test_class):
    """Entry point for the child processes spawned by TestDistBase.

    Parses the command line, instantiates `test_class` (a TestDistRunnerBase
    subclass) and runs it as either a parameter server or a trainer.
    """
    parser = argparse.ArgumentParser(description='Run dist test.')
    parser.add_argument(
        '--role', type=str, required=True, choices=['pserver', 'trainer'])
    parser.add_argument('--endpoints', type=str, required=False, default="")
    parser.add_argument('--is_dist', action='store_true')
    parser.add_argument('--trainer_id', type=int, required=False, default=0)
    parser.add_argument('--trainers', type=int, required=False, default=1)
    parser.add_argument(
        '--current_endpoint', type=str, required=False, default="")
    parser.add_argument('--sync_mode', action='store_true')
    parser.add_argument('--mem_opt', action='store_true')
    parser.add_argument('--use_cuda', action='store_true')
    parser.add_argument('--use_reduce', action='store_true')
    # NOTE(review): store_true combined with default=True means this flag is
    # always True and cannot be disabled from the command line — likely the
    # default was meant to be False. Kept as-is to preserve behavior.
    parser.add_argument(
        '--use_reader_alloc', action='store_true', required=False, default=True)
    parser.add_argument('--batch_size', required=False, type=int, default=2)
    parser.add_argument(
        '--batch_merge_repeat', required=False, type=int, default=1)

    args = parser.parse_args()

    model = test_class()
    # Only a distributed pserver role serves parameters; everything else
    # (trainer role, or local non-dist runs) trains.
    if args.role == "pserver" and args.is_dist:
        model.run_pserver(args)
    else:
        model.run_trainer(args)


# NOTE(review): these imports were historically added mid-file rather than at
# the top; kept in place here. They are used by TestDistBase below
# (cpt.to_text for subprocess output, socket/closing in _find_free_port).
import paddle.compat as cpt
import socket
from contextlib import closing


class TestDistBase(unittest.TestCase):
    """Base test case comparing local training with 2-pserver/2-trainer
    distributed training of the same model script.

    Subclasses set the knobs (sync mode, reduce strategy, ...) in
    _setup_config() and call check_with_place() with their model file.
    """

    def _setup_config(self):
        """Hook for subclasses to set self._sync_mode etc. before a run."""
        raise NotImplementedError("tests should have _setup_config implemented")

    def _after_setup_config(self):
        """Decide CUDA usage: honor _enforce_place, else follow the build."""
        if self._enforce_place == "CPU":
            self.__use_cuda = False
        elif self._enforce_place == "GPU":
            self.__use_cuda = True
        else:
            if fluid.core.is_compiled_with_cuda():
                self.__use_cuda = True
            else:
                self.__use_cuda = False

    def setUp(self):
        self._trainers = 2
        self._pservers = 2
        # Two free ports so parallel test runs don't collide on endpoints.
        self._ps_endpoints = "127.0.0.1:%s,127.0.0.1:%s" % (
            self._find_free_port(), self._find_free_port())
        self._python_interp = sys.executable
        self._sync_mode = True
        self._enforce_place = None
        self._mem_opt = False
        self._use_reduce = False
        self._use_reader_alloc = True
        self._setup_config()
        self._after_setup_config()

    def _find_free_port(self):
        """Return a TCP port number that was free at bind time."""
        with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
            s.bind(('', 0))
            return s.getsockname()[1]

    def start_pserver(self, model_file, check_error_log, required_envs):
        """Launch the two parameter-server subprocesses.

        Returns (ps0_proc, ps1_proc, ps0_pipe, ps1_pipe); the pipes are the
        open stderr log files the caller must close.
        """
        ps0_ep, ps1_ep = self._ps_endpoints.split(",")
        ps_cmd = "%s %s --role pserver --endpoints %s --trainer_id 0 --current_endpoint %s --trainers %d --is_dist"
        ps0_cmd = ps_cmd % \
                  (self._python_interp, model_file, self._ps_endpoints, ps0_ep,
                   self._trainers)
        ps1_cmd = ps_cmd % \
                  (self._python_interp, model_file, self._ps_endpoints, ps1_ep,
                   self._trainers)

        if self._sync_mode:
            ps0_cmd += " --sync_mode"
            ps1_cmd += " --sync_mode"
        if self._mem_opt:
            ps0_cmd += " --mem_opt"
            ps1_cmd += " --mem_opt"

        print(ps0_cmd)
        print(ps1_cmd)
        ps0_pipe = open("/tmp/ps0_err.log", "wb")
        ps1_pipe = open("/tmp/ps1_err.log", "wb")

        ps0_proc = subprocess.Popen(
            ps0_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=ps0_pipe,
            env=required_envs)
        ps1_proc = subprocess.Popen(
            ps1_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=ps1_pipe,
            env=required_envs)

        return ps0_proc, ps1_proc, ps0_pipe, ps1_pipe

    def _wait_ps_ready(self, pid):
        """Poll until the pserver with `pid` is ready, up to ~150s."""
        retry_times = 50
        while True:
            assert retry_times >= 0, "wait ps ready failed"
            time.sleep(3)
            try:
                # the listen_and_serv_op would touch a file which contains the listen port
                # on the /tmp directory until it was ready to process all the RPC call.
                os.stat("/tmp/paddle.%d.port" % pid)
                return
            except os.error as e:
                sys.stderr.write('waiting for pserver: %s, left retry %d\n' %
                                 (e, retry_times))
                retry_times -= 1

    def _run_local(self,
                   model,
                   envs,
                   check_error_log=False,
                   batch_size=DEFAULT_BATCH_SIZE,
                   batch_merge_repeat=1):
        """Run `model` as a single local trainer and return its stdout lines
        (one printed loss per step)."""
        cmd = "%s %s --role trainer" % (self._python_interp, model)
        if batch_size != DEFAULT_BATCH_SIZE:
            cmd += " --batch_size %d" % batch_size
        if batch_merge_repeat > 1:
            cmd += " --batch_merge_repeat %d" % batch_merge_repeat

        if self.__use_cuda:
            cmd += " --use_cuda"
            env_local = {"CUDA_VISIBLE_DEVICES": "0"}
        else:
            env_local = {'CPU_NUM': '1'}

        # NOTE(review): mutates the caller's dict in place; check_with_place
        # passes the same dict on to _run_cluster afterwards.
        envs.update(env_local)

        if check_error_log:
            err_log = open("/tmp/trainer.err.log", "wb")
            local_proc = subprocess.Popen(
                cmd.split(" "),
                stdout=subprocess.PIPE,
                stderr=err_log,
                env=envs)
        else:
            local_proc = subprocess.Popen(
                cmd.split(" "),
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                env=envs)

        local_out, local_err = local_proc.communicate()
        local_ret = cpt.to_text(local_out)

        if check_error_log:
            err_log.close()

        sys.stderr.write('local_stdout: %s\n' % local_ret)
        sys.stderr.write('local_stderr: %s\n' % local_err)

        local_losses = local_ret.split("\n")
        return local_losses

    def _run_cluster(self, model, envs, check_error_log):
        """Run `model` on 2 pservers + 2 trainers; return each trainer's
        stdout lines (one printed loss per step)."""
        # Run dist train to compare with local results
        ps0, ps1, ps0_pipe, ps1_pipe = self.start_pserver(model,
                                                          check_error_log, envs)
        self._wait_ps_ready(ps0.pid)
        self._wait_ps_ready(ps1.pid)
        ps0_ep, ps1_ep = self._ps_endpoints.split(",")

        tr_cmd = "%s %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --trainers %d --is_dist"
        tr0_cmd = tr_cmd % \
                  (self._python_interp, model, self._ps_endpoints,
                   0, ps0_ep, self._trainers)
        tr1_cmd = tr_cmd % \
                  (self._python_interp, model, self._ps_endpoints,
                   1, ps1_ep, self._trainers)

        if self._sync_mode:
            tr0_cmd += " --sync_mode"
            tr1_cmd += " --sync_mode"
        if self._mem_opt:
            tr0_cmd += " --mem_opt"
            tr1_cmd += " --mem_opt"
        if self._use_reduce:
            tr0_cmd += " --use_reduce"
            tr1_cmd += " --use_reduce"
        if self._use_reader_alloc:
            tr0_cmd += " --use_reader_alloc"
            tr1_cmd += " --use_reader_alloc"
        if self.__use_cuda:
            tr0_cmd += " --use_cuda"
            tr1_cmd += " --use_cuda"
            env0 = {"CUDA_VISIBLE_DEVICES": "0"}
            env1 = {"CUDA_VISIBLE_DEVICES": "1"}
        else:
            env0 = {'CPU_NUM': '1'}
            env1 = {'CPU_NUM': '1'}

        env0.update(envs)
        env1.update(envs)

        print("tr0_cmd:{}, env0: {}".format(tr0_cmd, env0))
        print("tr1_cmd:{}, env1: {}".format(tr1_cmd, env1))
        tr0_pipe = open("/tmp/tr0_err.log", "wb")
        tr1_pipe = open("/tmp/tr1_err.log", "wb")

        tr0_proc = subprocess.Popen(
            tr0_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=tr0_pipe,
            env=env0)
        tr1_proc = subprocess.Popen(
            tr1_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=tr1_pipe,
            env=env1)

        tr0_out, tr0_err = tr0_proc.communicate()
        tr0_loss_text = cpt.to_text(tr0_out)
        tr1_out, tr1_err = tr1_proc.communicate()
        tr1_loss_text = cpt.to_text(tr1_out)

        # close trainer file
        tr0_pipe.close()
        tr1_pipe.close()

        ps0_pipe.close()
        ps1_pipe.close()
        # FIXME: use terminate() instead of sigkill.
        os.kill(ps0.pid, signal.SIGKILL)
        os.kill(ps1.pid, signal.SIGKILL)
        ps0.terminate()
        ps1.terminate()

        # print log
        sys.stderr.write('trainer 0 stdout:\n %s\n' % tr0_loss_text)
        sys.stderr.write('trainer 0 stderr:\n %s\n' % tr0_err)
        sys.stderr.write('trainer 1 stdout: %s\n' % tr1_loss_text)
        sys.stderr.write('trainer 1 stderr: %s\n' % tr1_err)

        tr0_losses = tr0_loss_text.split("\n")
        tr1_losses = tr1_loss_text.split("\n")

        return tr0_losses, tr1_losses

    def check_with_place(self,
                         model_file,
                         delta=1e-3,
                         check_error_log=False,
                         need_envs=None):
        """Assert that per-step local losses match the averaged distributed
        losses of `model_file` within `delta`.

        `need_envs` (optional dict) overrides/extends the child environment.
        """
        # Use a None sentinel instead of a mutable default argument.
        need_envs = {} if need_envs is None else need_envs
        # TODO(typhoonzero): should auto adapt GPU count on the machine.
        required_envs = {
            "PATH": os.getenv("PATH", ""),
            "PYTHONPATH": os.getenv("PYTHONPATH", ""),
            "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
            "FLAGS_fraction_of_gpu_memory_to_use": "0.15",
            "FLAGS_cudnn_deterministic": "1",
            "http_proxy": ""
        }

        required_envs.update(need_envs)

        if check_error_log:
            required_envs["GLOG_v"] = "7"
            required_envs["GLOG_logtostderr"] = "1"

        local_losses\
            = self._run_local(model_file, required_envs,
                                       check_error_log)
        tr0_losses, tr1_losses = self._run_cluster(model_file, required_envs,
                                                   check_error_log)

        for step_id in range(RUN_STEP):
            # NOTE(review): eval() parses the printed loss repr from our own
            # child processes (trusted input); a wrong/extra stdout line from
            # the model script would break this parsing.
            local_loss = eval(local_losses[step_id])[0]
            tr0_loss = eval(tr0_losses[step_id])[0]
            tr1_loss = eval(tr1_losses[step_id])[0]
            dist_loss = (tr0_loss + tr1_loss) / 2
            print(str(local_loss) + ":" + str(dist_loss))
            self.assertAlmostEqual(local_loss, dist_loss, delta=delta)