#   Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import sys
import os
from ..base.role_maker import MPISymetricRoleMaker
from .optimizer_factory import *
from google.protobuf import text_format
import paddle.fluid.optimizer as local_optimizer
import paddle.fluid as fluid


class Fleet(object):
    """
    Fleet in Python. Fleet is used in distributed training. It is designed as a singleton instance
    in c++. A Fleet() object will be initialized automatically when a user imports this package as
    fleet. The general interfaces Fleet supports are:
    init(): which should be called only once in user's python scripts. init() will initialize
            FleetWrapper in CPP, it will also initialize a RoleMaker which is used for identifying
            current node's role, e.g. worker, server, etc.
    stop(): will be called after a user finishes his/her training task. Fleet instance will be
            destroyed when stop() is called.
    init_pserver(): will be called by user. When a user knows current process is_server(), he/she
                    should call init_pserver() to initialize global information about the parameter
                    server and start serving.
    init_worker(): will be called by user. When a user knows current process is_worker(), he/she
                    should call init_worker() to initialize global information about the worker and
                    connect the worker with pservers.
    get_worker_num(): return the number of workers in the current task
    get_server_num(): return the number of pservers in the current task
    is_worker(): return whether current process is a worker
    is_server(): return whether current process is a server
    init_pserver_model(): initialize model parameters on pservers, called from a worker node
    save_pserver_model(): save model parameters stored on pservers, called from a worker node

    Example:

        .. code-block:: python
           import paddle.fluid.incubate.fleet.parameter_server as fleet
           from my_model import bow_net
           model = bow_net()
           fleet.init()
           sgd_optimizer = paddle.fluid.optimizer.SGD(learning_rate=0.0001)
           sgd_optimizer = fleet.DistributedOptimizer(sgd_optimizer)
           sgd_optimizer.minimize(model.loss)
           exe = paddle.fluid.Executor(paddle.fluid.CPUPlace())
           if fleet.is_worker():
              exe.run(paddle.fluid.default_startup_program())
              fleet.init_worker() # init worker should be called before training
              # do other things like training
           elif fleet.is_server():
              fleet.init_pserver()
           fleet.stop()
    """

    def __init__(self):
        self._opt_info = None  # for fleet only
        self.role_maker_ = None
        self.local_ip_ = 0
        self.is_initialized_ = False

    def init(self):
        # TODO(guru4elephant)
        # this is a temporary solution
        # we will support more configurable RoleMaker for users in the future
        """
        init(): which should be called only once in user's python scripts. init() will initialize
            FleetWrapper in CPP, it will also initialize a RoleMaker which is used for identifying
            current node's role, e.g. worker, server, etc.
        """
        if not self.is_initialized_:
            self.role_maker_ = MPISymetricRoleMaker()
            self.role_maker_._generate_role()
            self._fleet_ptr = fluid.core.Fleet()
            self.is_initialized_ = True

    def stop(self):
        """
        stop(): will be called after a user finishes his/her training task. Fleet instance will be
            destroyed when stop() is called.
        """
        self.role_maker_._barrier_worker()
        if self.role_maker_._is_first_worker():
            self._fleet_ptr.stop_server()
        self.role_maker_._barrier_worker()
        self.role_maker_._barrier_all()
        self.role_maker_._finalize()

    def init_pserver(self):
        """
        init_pserver(): will be called by user. When a user knows current process is_server(), he/she
            should call init_pserver() to initialize global information about the parameter server
        """
        if self._opt_info:
            if "fleet_desc" in self._opt_info:
                self._dist_desc_str = text_format.MessageToString(
                    self._opt_info["fleet_desc"])
                self._dist_desc = self._opt_info["fleet_desc"]
            else:
                print("You should run DistributedOptimizer.minimize() first")
                sys.exit(-1)
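            # bring up the server: init_server() consumes the serialized
            # fleet_desc and run_server() starts serving, returning this
            # node's local address so it can be gathered across all nodes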
            self._fleet_ptr.init_server(self._dist_desc_str,
                                        self.role_maker_._get_rank())
            self.local_ip_ = self._fleet_ptr.run_server()
            # barrier_all for init_server
            self.role_maker_._barrier_all()
            self.all_ips_ = self.role_maker_._all_gather(self.local_ip_)

            self._fleet_ptr.gather_servers(self.all_ips_,
                                           self.role_maker_._get_size())
            # barrier_all for init_worker, wait all workers start
            self.role_maker_._barrier_all()
        else:
            print("You should run DistributedOptimizer.minimize() first")
            sys.exit(-1)

    def init_worker(self, programs):
        """
        init_worker(): will be called by user. When a user knows current process is_worker(), he/she
                    should call init_worker() to initialize global information about the worker and
                    connect the worker with pservers.

        Args:
            programs(Program|list): a Program or a list of Programs
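
        Example:
            .. code-block:: python

               # a minimal sketch: assumes fleet.init() has been called and
               # DistributedOptimizer.minimize() has already run on this program
               train_program = paddle.fluid.default_main_program()
               fleet.init_worker([train_program])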

        """
        if not isinstance(programs, list):
            programs = [programs]
        if self._opt_info:
            if "fleet_desc" in self._opt_info:
                self._dist_desc_str = text_format.MessageToString(
                    self._opt_info["fleet_desc"])
                self._dist_desc = self._opt_info["fleet_desc"]
            else:
                print("You should run DistributedOptimizer.minimize() first")
                sys.exit(-1)
            # barrier_all for init_server, wait for server starts
            self.role_maker_._barrier_all()
            self.all_ips_ = self.role_maker_._all_gather(self.local_ip_)
            self._fleet_ptr.init_worker(self._dist_desc_str, self.all_ips_,
                                        self.role_maker_._get_size(),
                                        self.role_maker_._get_rank())
            # barrier_all for init_worker
            self.role_maker_._barrier_all()
            # prepare for client to client communication
            info = self._fleet_ptr.get_clients_info()
            all_info = self.role_maker_._worker_gather(info[0])
            self._fleet_ptr.gather_clients(all_info)
            self._fleet_ptr.create_client2client_connection()
            # barrier for init model
            self.role_maker_._barrier_worker()
            if self.role_maker_._is_first_worker():
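                # the first worker pushes the initial values of each program's
                # dense parameters to the matching pserver dense tables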
                tables = self._dist_desc.trainer_param.dense_table
                for prog in programs:
                    prog_id = str(id(prog))
                    prog_conf = self._opt_info['program_configs'][prog_id]
                    prog_tables = {}
                    for key in prog_conf:
                        if "dense" not in key:
                            continue
                        for table_id in prog_conf[key]:
                            prog_tables[int(table_id)] = 0
                    for table in tables:
                        if int(table.table_id) not in prog_tables:
                            continue
                        var_name_list = []
                        for i in range(0, len(table.dense_variable_name)):
                            var_name_list.append(table.dense_variable_name[i])
                        self._fleet_ptr.init_model(prog.desc,
                                                   int(table.table_id),
                                                   var_name_list)
            # barrier for init model done
            self.role_maker_._barrier_worker()
        else:
            print("You should run DistributedOptimizer.minimize() first")
            sys.exit(-1)

    def get_worker_num(self):
        """
        return the number of workers in the current job
        """
        return self.role_maker_._worker_num()

    def get_server_num(self):
        """
        return the number of pservers in the current job
        """
        return self.role_maker_._server_num()

    def get_worker_index(self):
        """
        return the mpi rank of current worker
        """
        return self.role_maker_._worker_index()

    def is_worker(self):
        """
        return whether current node is a worker
        """
        return self.role_maker_._is_worker()

    def is_server(self):
        """
        return whether current node is a pserver
        """
        return self.role_maker_._is_server()

    def init_pserver_model(self):
        """
        initialize the model on pservers, called from a worker node
        """
        if self.role_maker_._is_first_worker():
            self._fleet_ptr.init_model()
        self.role_maker_._barrier_worker()

    def save_pserver_model(self, save_path):
        """
        save the model stored on pservers, called from a worker node
        """
        self._fleet_ptr.save_model(save_path)

    def _set_opt_info(self, opt_info):
        """
        this function saves the result from DistributedOptimizer.minimize()
        """
        self._opt_info = opt_info


class DistributedOptimizer(object):
    """
    DistributedOptimizer is a wrapper for paddle.fluid.optimizer.
    A user should pass a paddle.fluid.optimizer instance to DistributedOptimizer;
    only the minimize() interface is implemented.
    DistributedOptimizer is the starting point for a user who wants to
    run distributed training. The optimization information will be stored in
    the Fleet() instance, which holds the global information about the current
    distributed training job.
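
    Example:
        .. code-block:: python

           # a minimal sketch: assumes the package is imported as fleet and
           # `model.loss` comes from a user-defined network
           adam = paddle.fluid.optimizer.Adam(learning_rate=0.0001)
           adam = fleet.DistributedOptimizer(adam)
           adam.minimize(model.loss)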
    """

    def __init__(self, optimizer, dist_config={}):
        super(DistributedOptimizer, self).__init__()
        self._optimizer = optimizer
        self._optimizer_name = "Distributed%s" % optimizer.type.capitalize()
        if optimizer.type != "adam":
            print("Currently, distributed optimizer only supports Adam"
                  "Will config built-in adam for you."
                  "We will support more functions in DistributedOptimizer",
                  sys.stderr)
            self._optimizer_name = "DistributedAdam"

        self._distributed_optimizer = globals()[self._optimizer_name](optimizer)

    def backward(self,
                 loss,
                 startup_program=None,
                 parameter_list=None,
                 no_grad_set=None,
                 callbacks=None):
        """
        Currently, backward function can not be called through DistributedOptimizer
        """
        raise NotImplementedError()

    def apply_gradients(self, params_grads):
        """
        Currently, apply_gradients function can not be called through DistributedOptimizer
        """
        raise NotImplementedError()

    def minimize(self,
                 loss,
                 startup_program=None,
                 parameter_list=None,
                 no_grad_set=None):
        """
        minimize a program through loss; loss can be a list in DistributedOptimizer
        Args:
            loss (Variable|Variable List): loss variable or loss variable list to run optimization.
            startup_program (Program): startup_program for initializing parameters
                in `parameter_list`.
            parameter_list (list): list of Variables to update.
            no_grad_set (set|None): set of Variables that should be ignored.
        Returns:
            tuple: (optimize_ops, params_grads), a list of operators appended
            and a list of (param, grad) Variable pairs for optimization.
        Note that in parameter server mode, a worker will not get optimize_ops,
        because the optimizer algorithms run on the pserver side. We will make this usable in
        the pserver process, but currently the optimization part is written into Fleet(). A user
        does not need to care about how to start up a pserver node.
        """
        optimize_ops, param_grads, opt_info = \
                      self._distributed_optimizer._minimize(
                          loss,
                          startup_program,
                          parameter_list,
                          no_grad_set)

        fleet_instance._set_opt_info(opt_info)
        return [optimize_ops, param_grads]


# this is a temporary solution
# TODO(guru4elephant)
# will make this more flexible for more Parameter Server Archs
fleet_instance = Fleet()

init = fleet_instance.init
stop = fleet_instance.stop
init_pserver = fleet_instance.init_pserver
init_worker = fleet_instance.init_worker
is_worker = fleet_instance.is_worker
is_server = fleet_instance.is_server
init_pserver_model = fleet_instance.init_pserver_model
save_pserver_model = fleet_instance.save_pserver_model
worker_num = fleet_instance.get_worker_num
server_num = fleet_instance.get_server_num
worker_index = fleet_instance.get_worker_index
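
# A minimal sketch of the module-level API; mirrors the example in the Fleet
# docstring, with `worker_index`/`worker_num` from the aliases above:
#
#   import paddle.fluid.incubate.fleet.parameter_server as fleet
#   fleet.init()
#   if fleet.is_worker():
#       print("worker %d of %d" % (fleet.worker_index(), fleet.worker_num()))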