#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function
"""
    High-level unit tests for distributed fleet training.
"""

import os
import sys
import subprocess

import six
import shutil
import numpy as np
import argparse
from contextlib import closing
import socket
import time
import tempfile
import unittest

import paddle
paddle.enable_static()

import paddle.fluid as fluid
import paddle.distributed.fleet.base.role_maker as role_maker
import paddle.distributed.fleet as fleet
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
from paddle.distributed.fleet.utils.ps_util import Distributed

__all__ = ['FleetDistRunnerBase', 'TestFleetBase', 'runtime_main']

RUN_STEP = 5
LEARNING_RATE = 0.01
DIST_UT_PORT = 0


class FleetDistRunnerBase(object):
    """
        run_pserver, run_dataset_trainer, run_pyreader_trainer: after the role
            is initialized, the transpiler splits the program between servers
            and workers.
        net: implemented by child classes; defines the model network and
            returns the average cost.
        Training itself is driven by the executor running the split program.
    """

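    # A minimal child class looks like the sketch below (illustrative only;
    # the class name and network are hypothetical):
    #
    #     class ExampleRunner(FleetDistRunnerBase):
    #         def net(self, args, batch_size=4, lr=0.01):
    #             x = fluid.layers.data(name='x', shape=[13], dtype='float32')
    #             y = fluid.layers.data(name='y', shape=[1], dtype='float32')
    #             predict = fluid.layers.fc(input=x, size=1, act=None)
    #             return fluid.layers.mean(
    #                 fluid.layers.square_error_cost(input=predict, label=y))
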
    def build_role(self, args):
        if args.role.upper() == "PSERVER":
            role = role_maker.UserDefinedRoleMaker(
                is_collective=False,
                init_gloo=False,
                path=args.gloo_path,
                current_id=args.current_id,
                role=role_maker.Role.SERVER,
                worker_endpoints=args.trainer_endpoints.split(","),
                server_endpoints=args.endpoints.split(","))
        else:
            role = role_maker.UserDefinedRoleMaker(
                is_collective=False,
                init_gloo=False,
                path=args.gloo_path,
                current_id=args.current_id,
                role=role_maker.Role.WORKER,
                worker_endpoints=args.trainer_endpoints.split(","),
                server_endpoints=args.endpoints.split(","))
        self.role = role
        return role

    def build_strategy(self, args):
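        # Translate the test mode into a fleet DistributedStrategy:
        #   sync  -> synchronous training (a_sync off)
        #   async -> fully asynchronous training
        #   geo   -> GEO-SGD, pushing updates every k_steps local steps
        #   auto  -> let fleet pick the configuration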
        if args.mode == "sync":
            self.strategy = paddle.distributed.fleet.DistributedStrategy()
            self.strategy.a_sync = False
        elif args.mode == "async":
            self.strategy = paddle.distributed.fleet.DistributedStrategy()
            self.strategy.a_sync = True
        elif args.mode == "geo":
            self.strategy = paddle.distributed.fleet.DistributedStrategy()
            self.strategy.a_sync = True
            self.strategy.a_sync_configs = {
                "k_steps": args.geo_sgd_need_push_nums
            }
        elif args.mode == "auto":
            self.strategy = paddle.distributed.fleet.DistributedStrategy()
            self.strategy.auto = True

        self.dump_param = os.getenv("dump_param", "").split(",")
        self.dump_fields = os.getenv("dump_fields", "").split(",")
        self.dump_fields_path = os.getenv("dump_fields_path", "")
        debug = int(os.getenv("Debug", "0"))
        # TODO(update strategy to support dump params)
        if False:  # debug:
            self.strategy.set_debug_opt({
                "dump_param": self.dump_param,
                "dump_fields": self.dump_fields,
                "dump_fields_path": self.dump_fields_path
            })

        return self.strategy

    def build_optimizer(self, avg_cost, strategy):
        use_grad_clip = int(os.getenv('GRAD_CLIP', 0))
        if use_grad_clip:
            # 1: clip_by_value; 2: clip_by_norm; 3: clip_by_global_norm
            if use_grad_clip == 1:
                fluid.clip.set_gradient_clip(
                    clip=fluid.clip.GradientClipByValue(2.0))
            elif use_grad_clip == 2:
                fluid.clip.set_gradient_clip(
                    clip=fluid.clip.GradientClipByNorm(2.0))
            elif use_grad_clip == 3:
                fluid.clip.set_gradient_clip(
                    clip=fluid.clip.GradientClipByGlobalNorm(2.0))

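        # DECAY=1 swaps plain SGD for SGD with a staircase exponential
        # learning-rate decay schedule.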
        use_decay = int(os.getenv("DECAY", "0"))
        if use_decay:
            optimizer = fluid.optimizer.SGD(
                learning_rate=fluid.layers.exponential_decay(
                    learning_rate=LEARNING_RATE,
                    decay_steps=500,
                    decay_rate=0.969,
                    staircase=True))
        else:
            optimizer = fluid.optimizer.SGD(LEARNING_RATE)
        optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
        optimizer.minimize(avg_cost)

    def run_pserver(self, args):
        fleet.init_server()
        fleet.run_server()

    def run_dataset_trainer(self, args):
        out = self.do_dataset_training(fleet)

    def run_pyreader_trainer(self, args):
        out = self.do_pyreader_training(fleet)

    def net(self, args, batch_size=4, lr=0.01):
        raise NotImplementedError(
            "net should be implemented by child classes.")

    def do_dataset_training(self, fleet):
        raise NotImplementedError(
            "do_dataset_training should be implemented by child classes.")

    def do_pyreader_training(self, fleet):
        raise NotImplementedError(
            "do_pyreader_training should be implemented by child classes.")

    def do_distributed_testing(self, args, test_main_program,
                               test_startup_program):
        raise NotImplementedError(
            "do_distributed_testing should be implemented by child classes.")


class TestFleetBase(unittest.TestCase):
    """
        _start_pserver, _start_trainer: build and launch the commands under test.
        _run_cluster: exercise the distributed program with multiple processes.
    """

    def _setup_config(self):
        raise NotImplementedError("tests should have _setup_config implemented")

    def tearDown(self):
        t = time.time() - self.startTime
        print('%s: %.3f' % (self.__class__.__name__, t))

    def setUp(self):
        self.startTime = time.time()

        self._mode = "sync"
        self._reader = "pyreader"
        self._trainers = 2
        self._pservers = 2
        self._need_test = 0
        self._port_set = set()

        global DIST_UT_PORT
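        # If PADDLE_DIST_UT_PORT is set, hand out consecutive ports starting
        # from that base; otherwise probe the OS for free ports.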
        if DIST_UT_PORT == 0 and os.getenv("PADDLE_DIST_UT_PORT"):
            DIST_UT_PORT = int(os.getenv("PADDLE_DIST_UT_PORT"))

        if DIST_UT_PORT:
            print("set begin_port:", DIST_UT_PORT)
            self._ps_endpoints = "127.0.0.1:%s,127.0.0.1:%s" % (
                DIST_UT_PORT, DIST_UT_PORT + 1)
            self._tr_endpoints = "127.0.0.1:%s,127.0.0.1:%s" % (
                DIST_UT_PORT + 2, DIST_UT_PORT + 3)
            DIST_UT_PORT += 4
        else:
            self._ps_endpoints = "127.0.0.1:%s,127.0.0.1:%s" % (
                self._find_free_port(), self._find_free_port())
            self._tr_endpoints = "127.0.0.1:%s,127.0.0.1:%s" % (
                self._find_free_port(), self._find_free_port())

        self._python_interp = sys.executable
        self._geo_sgd_need_push_nums = 5
        self._grad_clip_mode = 0
        self._setup_config()

    def _find_free_port(self):
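        # Bind to port 0 so the OS assigns an unused port, and remember every
        # port handed out so two endpoints never collide within one test.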
        def __free_port():
            with closing(socket.socket(socket.AF_INET,
                                       socket.SOCK_STREAM)) as s:
                s.bind(('', 0))
                return s.getsockname()[1]

        while True:
            port = __free_port()
            if port not in self._port_set:
                self._port_set.add(port)
                return port

    def _start_pserver(self, cmd, required_envs):
        ps0_cmd, ps1_cmd = cmd.format(0), cmd.format(1)

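        # Redirect each server's stderr to a log file under the temp dir so
        # failures can be inspected after the run.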
        ps0_pipe = open(tempfile.gettempdir() + "/ps0_err.log", "wb+")
        ps1_pipe = open(tempfile.gettempdir() + "/ps1_err.log", "wb+")

        ps0_proc = subprocess.Popen(
            ps0_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=ps0_pipe,
            env=required_envs)
        ps1_proc = subprocess.Popen(
            ps1_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=ps1_pipe,
            env=required_envs)
        return ps0_proc, ps1_proc, ps0_pipe, ps1_pipe

    def _start_trainer(self, cmd, required_envs):
        tr0_cmd, tr1_cmd = cmd.format(0), cmd.format(1)

        tr0_pipe = open(tempfile.gettempdir() + "/tr0_err.log", "wb+")
        tr1_pipe = open(tempfile.gettempdir() + "/tr1_err.log", "wb+")

        tr0_out = open(tempfile.gettempdir() + "/tr0_stdout.log", "wb+")
        tr1_out = open(tempfile.gettempdir() + "/tr1_stdout.log", "wb+")

        tr0_proc = subprocess.Popen(
            tr0_cmd.strip().split(" "),
            stdout=tr0_out,
            stderr=tr0_pipe,
            env=required_envs)
        tr1_proc = subprocess.Popen(
            tr1_cmd.strip().split(" "),
            stdout=tr1_out,
            stderr=tr1_pipe,
            env=required_envs)

        return tr0_proc, tr1_proc, tr0_pipe, tr1_pipe

    def _run_cluster(self, model, envs):
        env = {'GRAD_CLIP': str(self._grad_clip_mode)}
        python_path = self._python_interp
        gloo_path = tempfile.mkdtemp()

        if os.getenv('WITH_COVERAGE', 'OFF') == 'ON':
            envs['COVERAGE_FILE'] = os.getenv('COVERAGE_FILE', '')
            python_path += " -m coverage run --branch -p"
        env.update(envs)

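        # Build one launch command per role; the "{{}}" placeholder survives
        # this format() call and is later filled with the process index by
        # _start_pserver/_start_trainer.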
        tr_cmd = "{0} {1} --role trainer --endpoints {2} --trainer_endpoints {3} --current_id {{}} --trainers {4} --mode {5} --geo_sgd_need_push_nums {6} --reader {7} --gloo_path {8} --test {9}".format(
            python_path, model, self._ps_endpoints, self._tr_endpoints,
            self._trainers, self._mode, self._geo_sgd_need_push_nums,
            self._reader, gloo_path, self._need_test)

        ps_cmd = "{0} {1} --role pserver --endpoints {2} --trainer_endpoints {3} --current_id {{}} --trainers {4} --mode {5} --geo_sgd_need_push_nums {6} --reader {7} --gloo_path {8} --test {9}".format(
            python_path, model, self._ps_endpoints, self._tr_endpoints,
            self._trainers, self._mode, self._geo_sgd_need_push_nums,
            self._reader, gloo_path, self._need_test)

        # Run dist train to compare with local results
        ps0, ps1, ps0_pipe, ps1_pipe = self._start_pserver(ps_cmd, env)
        tr0, tr1, tr0_pipe, tr1_pipe = self._start_trainer(tr_cmd, env)

        # Wait until both trainer processes terminate
        while True:
            stat0 = tr0.poll()
            time.sleep(0.1)
            if stat0 is not None:
                break

        while True:
            stat1 = tr1.poll()
            time.sleep(0.1)
            if stat1 is not None:
                break

        tr0_out, tr0_err = tr0.communicate()
        tr1_out, tr1_err = tr1.communicate()

        tr0_ret = tr0.returncode
        tr1_ret = tr1.returncode
        if tr0_ret != 0:
            print(
                "========================Error tr0_err begin==========================="
            )
            os.system("cat {}".format(tempfile.gettempdir() + "/tr0_err.log"))
            print(
                "========================Error tr0_err end==========================="
            )

        if tr1_ret != 0:
            print(
                "========================Error tr1_err begin==========================="
            )
            os.system("cat {}".format(tempfile.gettempdir() + "/tr1_err.log"))
            print(
                "========================Error tr1_err end==========================="
            )

        # close trainer file
        tr0_pipe.close()
        tr1_pipe.close()
        ps0_pipe.close()
        ps1_pipe.close()

        ps0.terminate()
        ps1.terminate()

        shutil.rmtree(gloo_path)
        self.assertEqual(tr0_ret, 0, "something wrong in tr0, please check")
        self.assertEqual(tr1_ret, 0, "something wrong in tr1, please check")
        return 0, 0

    def check_with_place(self,
                         model_file,
                         delta=1e-3,
                         check_error_log=False,
                         need_envs={}):
        required_envs = {
            "PATH": os.getenv("PATH", ""),
            "PYTHONPATH": os.getenv("PYTHONPATH", ""),
            "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
            "FLAGS_rpc_deadline": "5000",  # 5sec to fail fast
            "http_proxy": ""
        }

        required_envs.update(need_envs)

        if check_error_log:
            required_envs["GLOG_v"] = "3"
            required_envs["GLOG_logtostderr"] = "1"

        tr0_losses, tr1_losses = self._run_cluster(model_file, required_envs)


def runtime_main(test_class):
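    # Entry point for the spawned processes: parse the flags emitted by
    # TestFleetBase._run_cluster, build the role/strategy/model, then run
    # the assigned role.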
    parser = argparse.ArgumentParser(description='Run Fleet test.')
    parser.add_argument(
        '--role', type=str, required=True, choices=['pserver', 'trainer'])
    parser.add_argument('--endpoints', type=str, required=False, default="")
    parser.add_argument(
        '--trainer_endpoints', type=str, required=False, default="")
    parser.add_argument('--gloo_path', type=str, required=False, default="")
    parser.add_argument('--current_id', type=int, required=False, default=0)
    parser.add_argument('--trainers', type=int, required=False, default=1)
    parser.add_argument('--mode', type=str, required=False, default='geo')
    parser.add_argument(
        '--geo_sgd_need_push_nums', type=int, required=False, default=2)
    parser.add_argument('--reader', type=str, required=False, default='dataset')
    parser.add_argument('--test', type=int, required=False, default=0)
    args = parser.parse_args()

    model = test_class()
    role = model.build_role(args)
    fleet.init(role)
    strategy = model.build_strategy(args)
    avg_cost = model.net(args)
    model.build_optimizer(avg_cost, strategy)
    if args.role == "pserver":
        model.run_pserver(args)
    else:
        if args.reader == "dataset":
            model.run_dataset_trainer(args)
        else:
            model.run_pyreader_trainer(args)

        if args.test:
            test_origin_program = fluid.Program()
            test_startup_program = fluid.Program()
            with fluid.program_guard(
                    main_program=test_origin_program,
                    startup_program=test_startup_program):
                with fluid.unique_name.guard():
                    avg_cost = model.net(args, is_train=False)
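            # Map each sparse parameter to its table id on the parameter
            # servers so the test program can be rewritten to pull from them.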
            send_ctx = fleet.fleet._runtime_handle._communicator.send_ctx_
            varname2tables = {}
            for gradname, ctx in send_ctx.items():
                if ctx.is_sparse:
                    # str.strip removes a set of characters, not a suffix;
                    # slice off "@GRAD" to recover the parameter name.
                    param = gradname[:-len("@GRAD")]
                    varname2tables[param] = ctx.table_id()
                else:
                    continue
            ps_util = Distributed()
            test_main_program = ps_util.estimate(test_origin_program,
                                                 varname2tables)
            print(str(test_main_program))
            print(str(test_startup_program))
            model.do_distributed_testing(args, test_main_program,
                                         test_startup_program)
        fleet.stop_worker()
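
# How the two halves fit together (illustrative sketch; the module and class
# names below are hypothetical):
#
#     # test_dist_fleet_example.py -- runs under unittest in the main process
#     class TestDistExample(TestFleetBase):
#         def _setup_config(self):
#             self._mode = "async"
#             self._reader = "pyreader"
#
#         def test_dist_train(self):
#             self.check_with_place(
#                 "dist_fleet_example_model.py", check_error_log=True)
#
#     # dist_fleet_example_model.py -- executed in the spawned processes;
#     # it defines an ExampleRunner(FleetDistRunnerBase) and ends with:
#     #     if __name__ == "__main__":
#     #         runtime_main(ExampleRunner)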