#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function
from paddle.distributed.fleet.utils.ps_util import Distributed
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
import paddle.distributed.fleet as fleet
import paddle.distributed.fleet.base.role_maker as role_maker
import paddle.fluid as fluid
"""
    high level unit test for distribute fleet.
"""

import os
import sys
import subprocess

import six
import shutil
import numpy as np
import argparse
from contextlib import closing
import socket
import time
import tempfile
import unittest

import paddle
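# These helpers exercise fleet's parameter-server mode, which runs on the
# static graph, so switch off the default dynamic mode up front.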
paddle.enable_static()

__all__ = ['FleetDistRunnerBase', 'TestFleetBase', 'runtime_main']

RUN_STEP = 5
LEARNING_RATE = 0.01
DIST_UT_PORT = 0


class FleetDistRunnerBase(object):
    """
        run_pserver,run_trainer : after init role, using transpiler split program
        net : implment by child class, the network of model
        do training : exe run program
    """

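    # A minimal sketch (hypothetical names) of what a concrete runner looks
    # like; child classes implement net() and the do_*_training hooks:
    #
    #     class ExampleRunner(FleetDistRunnerBase):
    #         def net(self, args, batch_size=4, lr=0.01):
    #             x = fluid.layers.data(name='x', shape=[13], dtype='float32')
    #             y = fluid.layers.data(name='y', shape=[1], dtype='float32')
    #             y_predict = fluid.layers.fc(input=x, size=1)
    #             cost = fluid.layers.square_error_cost(
    #                 input=y_predict, label=y)
    #             return paddle.mean(cost)
    #
    #         def do_pyreader_training(self, fleet):
    #             exe = fluid.Executor(fluid.CPUPlace())
    #             exe.run(fluid.default_startup_program())
    #             fleet.init_worker()
    #             # ... feed RUN_STEP batches and run the main program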
    def build_role(self, args):

        if args.role.upper() == "PSERVER":
            role = role_maker.UserDefinedRoleMaker(
                is_collective=False,
                init_gloo=False,
                path=args.gloo_path,
                current_id=args.current_id,
                role=role_maker.Role.SERVER,
                worker_endpoints=args.trainer_endpoints.split(","),
                server_endpoints=args.endpoints.split(","))
        else:
            role = role_maker.UserDefinedRoleMaker(
                is_collective=False,
                init_gloo=False,
                path=args.gloo_path,
                current_id=args.current_id,
                role=role_maker.Role.WORKER,
                worker_endpoints=args.trainer_endpoints.split(","),
                server_endpoints=args.endpoints.split(","))
        self.role = role
        return role

    def build_strategy(self, args):
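        # Map the --mode flag to a fleet DistributedStrategy:
        #   sync  -> a_sync = False (synchronous parameter server)
        #   async -> a_sync = True  (fully asynchronous updates)
        #   geo   -> a_sync = True plus a_sync_configs["k_steps"] (Geo-SGD,
        #            push updates every geo_sgd_need_push_nums steps)
        #   auto  -> strategy.auto = True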
        if args.mode == "sync":
            self.strategy = paddle.distributed.fleet.DistributedStrategy()
            self.strategy.a_sync = False
        elif args.mode == "async":
            self.strategy = paddle.distributed.fleet.DistributedStrategy()
            self.strategy.a_sync = True
        elif args.mode == "geo":
            self.strategy = paddle.distributed.fleet.DistributedStrategy()
            self.strategy.a_sync = True
            self.strategy.a_sync_configs = {
                "k_steps": args.geo_sgd_need_push_nums
            }
        elif args.mode == "auto":
            self.strategy = paddle.distributed.fleet.DistributedStrategy()
            self.strategy.auto = True

        self.dump_param = os.getenv("dump_param", "").split(",")
        self.dump_fields = os.getenv("dump_fields", "").split(",")
        self.dump_fields_path = os.getenv("dump_fields_path", "")
        debug = int(os.getenv("Debug", "0"))
        # TODO(update strategy to support dump params)
        if False:  # debug:
            self.strategy.set_debug_opt({
                "dump_param": self.dump_param,
                "dump_fields": self.dump_fields,
                "dump_fields_path": self.dump_fields_path
            })

        return self.strategy

    def build_optimizer(self, avg_cost, strategy):
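        # Gradient clipping and learning-rate decay are toggled through the
        # GRAD_CLIP and USE_DECAY environment variables set by the launching
        # test (see TestFleetBase._run_cluster).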
        use_grad_clip = int(os.getenv('GRAD_CLIP', 0))
        if use_grad_clip:
            # 1: clip_by_value; 2: clip_by_norm; 3:clip_by_global_norm
            if use_grad_clip == 1:
                fluid.clip.set_gradient_clip(
                    clip=fluid.clip.GradientClipByValue(2.0))
            elif use_grad_clip == 2:
                fluid.clip.set_gradient_clip(
                    clip=fluid.clip.GradientClipByNorm(2.0))
            elif use_grad_clip == 3:
                fluid.clip.set_gradient_clip(
                    clip=fluid.clip.GradientClipByGlobalNorm(2.0))

        use_decay = int(os.getenv("USE_DECAY", "0"))
        if use_decay:
            scheduler = paddle.optimizer.lr.ExponentialDecay(
                learning_rate=LEARNING_RATE, gamma=0.999, verbose=True)
            optimizer = fluid.optimizer.SGD(scheduler)
            """
            # learning rate decay method before 2.0
            optimizer = fluid.optimizer.SGD(
                learning_rate=fluid.layers.exponential_decay(
                    learning_rate=LEARNING_RATE,
                    decay_steps=500,
                    decay_rate=0.969,
                    staircase=True))
            """
        else:
            optimizer = fluid.optimizer.SGD(LEARNING_RATE)
        optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
        optimizer.minimize(avg_cost)

    def run_pserver(self, args):
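        # init_server() initializes the parameters held on this pserver and
        # run_server() then blocks serving trainer requests; the parent test
        # stops the process via terminate() once the trainers exit.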
        fleet.init_server()
        fleet.run_server()

    def run_dataset_trainer(self, args):
        out = self.do_dataset_training(fleet)

    def run_pyreader_trainer(self, args):
        out = self.do_pyreader_training(fleet)

    def net(self, args, batch_size=4, lr=0.01):
        raise NotImplementedError(
            "net should be implemented by child classes.")

    def do_dataset_training(self, fleet):
        raise NotImplementedError(
            "do_dataset_training should be implemented by child classes.")

    def do_pyreader_training(self, fleet):
        raise NotImplementedError(
            "do_pyreader_training should be implemented by child classes.")

    def do_distributed_testing(self, args, test_main_program,
                               test_startup_program):
        # signature matches the call site in runtime_main()
        raise NotImplementedError(
            "do_distributed_testing should be implemented by child classes.")


class TestFleetBase(unittest.TestCase):
    """
        start_pserver,start_trainer : add start cmd to test
        run_cluster : using multi process to test distribute program
    """

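    # A minimal sketch (hypothetical names) of a concrete test case:
    #
    #     class TestDistExampleAsync2x2(TestFleetBase):
    #         def _setup_config(self):
    #             self._mode = "async"
    #             self._reader = "pyreader"
    #
    #         def test_dist_train(self):
    #             self.check_with_place(
    #                 "dist_fleet_example.py", check_error_log=True)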
    def _setup_config(self):
        raise NotImplementedError("tests should have _setup_config implemented")

    def tearDown(self):
        t = time.time() - self.startTime
        print('%s: %.3f' % (self.__class__.__name__, t))

    def setUp(self):
        self.startTime = time.time()

        self._mode = "sync"
        self._reader = "pyreader"
        self._trainers = 2
        self._pservers = 2
        self._need_test = 0
        self._port_set = set()

        global DIST_UT_PORT
        if DIST_UT_PORT == 0 and os.getenv("PADDLE_DIST_UT_PORT"):
            DIST_UT_PORT = int(os.getenv("PADDLE_DIST_UT_PORT"))

        if DIST_UT_PORT:
            print("set begin_port:", DIST_UT_PORT)
            self._ps_endpoints = "127.0.0.1:%s,127.0.0.1:%s" % (
                DIST_UT_PORT, DIST_UT_PORT + 1)
            self._tr_endpoints = "127.0.0.1:%s,127.0.0.1:%s" % (
                DIST_UT_PORT + 2, DIST_UT_PORT + 3)
            DIST_UT_PORT += 4
        else:
            self._ps_endpoints = "127.0.0.1:%s,127.0.0.1:%s" % (
                self._find_free_port(), self._find_free_port())
            self._tr_endpoints = "127.0.0.1:%s,127.0.0.1:%s" % (
                self._find_free_port(), self._find_free_port())

        self._python_interp = sys.executable
        self._geo_sgd_need_push_nums = 5
        self._grad_clip_mode = 0
        self._setup_config()

    def _find_free_port(self):
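        # Bind to port 0 so the OS picks an unused port; remember every port
        # handed out in self._port_set so the four endpoints never collide.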
        def __free_port():
            with closing(socket.socket(socket.AF_INET,
                                       socket.SOCK_STREAM)) as s:
                s.bind(('', 0))
                return s.getsockname()[1]

        while True:
            port = __free_port()
            if port not in self._port_set:
                self._port_set.add(port)
                return port

    def _start_pserver(self, cmd, required_envs):
        ps0_cmd, ps1_cmd = cmd.format(0), cmd.format(1)

        ps0_pipe = open(tempfile.gettempdir() + "/ps0_err.log", "wb+")
        ps1_pipe = open(tempfile.gettempdir() + "/ps1_err.log", "wb+")

        ps0_proc = subprocess.Popen(
            ps0_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=ps0_pipe,
            env=required_envs)
        ps1_proc = subprocess.Popen(
            ps1_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=ps1_pipe,
            env=required_envs)
        return ps0_proc, ps1_proc, ps0_pipe, ps1_pipe

    def _start_trainer(self, cmd, required_envs):
        tr0_cmd, tr1_cmd = cmd.format(0), cmd.format(1)

        tr0_pipe = open(tempfile.gettempdir() + "/tr0_err.log", "wb+")
        tr1_pipe = open(tempfile.gettempdir() + "/tr1_err.log", "wb+")

        tr0_out = open(tempfile.gettempdir() + "/tr0_stdout.log", "wb+")
        tr1_out = open(tempfile.gettempdir() + "/tr1_stdout.log", "wb+")

        tr0_proc = subprocess.Popen(
            tr0_cmd.strip().split(" "),
            stdout=tr0_out,
            stderr=tr0_pipe,
            env=required_envs)
        tr1_proc = subprocess.Popen(
            tr1_cmd.strip().split(" "),
            stdout=tr1_out,
            stderr=tr1_pipe,
            env=required_envs)

        return tr0_proc, tr1_proc, tr0_pipe, tr1_pipe

    def _run_cluster(self, model, envs):
        env = {'GRAD_CLIP': str(self._grad_clip_mode)}
        python_path = self._python_interp
        gloo_path = tempfile.mkdtemp()

        if os.getenv('WITH_COVERAGE', 'OFF') == 'ON':
            envs['COVERAGE_FILE'] = os.getenv('COVERAGE_FILE', '')
            python_path += " -m coverage run --branch -p"
        env.update(envs)

        tr_cmd = "{0} {1} --role trainer --endpoints {2} --trainer_endpoints {3} --current_id {{}} --trainers {4} --mode {5} --geo_sgd_need_push_nums {6} --reader {7} --gloo_path {8} --test {9}".format(
            python_path, model, self._ps_endpoints, self._tr_endpoints,
            self._trainers, self._mode, self._geo_sgd_need_push_nums,
            self._reader, gloo_path, self._need_test)

        ps_cmd = "{0} {1} --role pserver --endpoints {2} --trainer_endpoints {3} --current_id {{}} --trainers {4} --mode {5} --geo_sgd_need_push_nums {6} --reader {7} --gloo_path {8} --test {9}".format(
            python_path, model, self._ps_endpoints, self._tr_endpoints,
            self._trainers, self._mode, self._geo_sgd_need_push_nums,
            self._reader, gloo_path, self._need_test)

        # Run dist train to compare with local results
        ps0, ps1, ps0_pipe, ps1_pipe = self._start_pserver(ps_cmd, env)
        tr0, tr1, tr0_pipe, tr1_pipe = self._start_trainer(tr_cmd, env)

        # Wait until trainer process terminate
        while True:
            stat0 = tr0.poll()
            time.sleep(0.1)
            if stat0 is not None:
                break

        while True:
            stat1 = tr1.poll()
            time.sleep(0.1)
            if stat1 is not None:
                break

        tr0_out, tr0_err = tr0.communicate()
        tr1_out, tr1_err = tr1.communicate()

        tr0_ret = tr0.returncode
        tr1_ret = tr1.returncode
        if tr0_ret != 0:
            print(
                "========================Error tr0_err begin==========================="
            )
            os.system("cat {}".format(tempfile.gettempdir() + "/tr0_err.log"))
            print(
                "========================Error tr0_err end==========================="
            )

        if tr1_ret != 0:
            print(
                "========================Error tr1_err begin==========================="
            )
            os.system("cat {}".format(tempfile.gettempdir() + "/tr1_err.log"))
            print(
                "========================Error tr1_err end==========================="
            )

        # close the redirected trainer and pserver log files
        tr0_pipe.close()
        tr1_pipe.close()
        ps0_pipe.close()
        ps1_pipe.close()

        ps0.terminate()
        ps1.terminate()

        shutil.rmtree(gloo_path)
        self.assertEqual(tr0_ret, 0, "something wrong in tr0, please check")
        self.assertEqual(tr1_ret, 0, "something wrong in tr1, please check")
        return 0, 0

    def check_with_place(self,
                         model_file,
                         delta=1e-3,
                         check_error_log=False,
                         need_envs={}):
        required_envs = {
            "PATH": os.getenv("PATH", ""),
            "PYTHONPATH": os.getenv("PYTHONPATH", ""),
            "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
            "FLAGS_rpc_deadline": "5000",  # 5sec to fail fast
            "http_proxy": ""
        }

        required_envs.update(need_envs)

        if check_error_log:
            required_envs["GLOG_v"] = "3"
            required_envs["GLOG_logtostderr"] = "1"

        tr0_losses, tr1_losses = self._run_cluster(model_file, required_envs)
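        # NOTE: _run_cluster currently returns (0, 0); pass/fail is decided by
        # the exit-code asserts inside _run_cluster rather than by comparing
        # losses against delta here.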


def runtime_main(test_class):
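    # Entry point of each spawned subprocess: parse the role/endpoint flags,
    # build the role, strategy, network and optimizer, then run either the
    # pserver loop or one of the trainer paths (dataset or pyreader).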
    parser = argparse.ArgumentParser(description='Run Fleet test.')
    parser.add_argument(
        '--role', type=str, required=True, choices=['pserver', 'trainer'])
    parser.add_argument('--endpoints', type=str, required=False, default="")
    parser.add_argument(
        '--trainer_endpoints', type=str, required=False, default="")
    parser.add_argument('--gloo_path', type=str, required=False, default="")
    parser.add_argument('--current_id', type=int, required=False, default=0)
    parser.add_argument('--trainers', type=int, required=False, default=1)
    parser.add_argument('--mode', type=str, required=False, default='geo')
    parser.add_argument(
        '--geo_sgd_need_push_nums', type=int, required=False, default=2)
    parser.add_argument('--reader', type=str, required=False, default='dataset')
    parser.add_argument('--test', type=int, required=False, default=0)
    args = parser.parse_args()

    model = test_class()
    role = model.build_role(args)
    fleet.init(role)
    strategy = model.build_strategy(args)
    avg_cost = model.net(args)
    model.build_optimizer(avg_cost, strategy)
    if args.role == "pserver":
        model.run_pserver(args)
    else:
        if args.reader == "dataset":
            model.run_dataset_trainer(args)
        else:
            model.run_pyreader_trainer(args)

        if args.test:
            test_origin_program = fluid.Program()
            test_startup_program = fluid.Program()
            with fluid.program_guard(
                    main_program=test_origin_program,
                    startup_program=test_startup_program):
                with fluid.unique_name.guard():
                    avg_cost = model.net(args, is_train=False)
            send_ctx = fleet.fleet._runtime_handle._communicator.send_ctx_
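            # Map every sparse parameter to the id of the pserver table that
            # holds it; ps_util.Distributed uses this mapping to rewrite the
            # test program so lookups read from the distributed tables.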
            varname2tables = {}
            for gradname, ctx in send_ctx.items():
                if ctx.is_sparse:
                    # slice off the exact "@GRAD" suffix (str.strip would
                    # drop any of those characters from both ends)
                    param = gradname[:-len("@GRAD")]
                    varname2tables[param] = ctx.table_id()
                else:
                    continue
            ps_util = Distributed()
            test_main_program = ps_util.estimate(test_origin_program,
                                                 varname2tables)
            print(str(test_main_program))
            print(str(test_startup_program))
            model.do_distributed_testing(args, test_main_program,
                                         test_startup_program)
        fleet.stop_worker()