# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import

import inspect
import os
import pickle
import numpy as np

from collections import OrderedDict
from paddle import fluid
from paddle.fluid.framework import in_dygraph_mode, Variable
from paddle.fluid.executor import global_scope
from paddle.fluid.io import is_belong_to_optimizer
from paddle.fluid.dygraph.base import to_variable
from paddle.fluid.incubate.fleet.collective import fleet, DistributedStrategy
import paddle.fluid.incubate.fleet.base.role_maker as role_maker

import distributed

__all__ = ['shape_hints', 'Model', 'Loss', 'CrossEntropy']


def to_list(value):
    if isinstance(value, (list, tuple)):
        return value
    return [value]


def to_numpy(var):
    assert isinstance(var, (Variable, fluid.core.VarBase)), "not a variable"
    if isinstance(var, fluid.core.VarBase):
        return var.numpy()
    t = global_scope().find_var(var.name).get_tensor()
    return np.array(t)


def extract_args(func):
    if hasattr(inspect, 'getfullargspec'):
        return inspect.getfullargspec(func)[0]
    else:
        return inspect.getargspec(func)[0]


def shape_hints(**hints):
    assert hints, "hints can not be empty"
    assert all(isinstance(h, (list, tuple)) for h in hints.values()), \
        "shape hint must be a list or tuple"

    def wrapper(func):
        args = extract_args(func)
        invalid = set(hints.keys()) - set(args)
        assert not invalid, \
            "shape hint for arguments that are not present in forward method" \
            + ": ({})".format(", ".join(invalid))
        func.shape_hints = hints
        return func
    return wrapper
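
# Illustrative usage sketch (an assumption, not part of the original module):
# `shape_hints` records static-graph input shapes keyed by the argument names
# of a `Model.forward`; unhinted arguments fall back to
# `(None,) + data.shape[1:]` when the input variables are inferred.
#
#     class MyModel(Model):          # hypothetical subclass
#         @shape_hints(image=[None, 1, 28, 28])
#         def forward(self, image):
#             ...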


class Loss(object):
    def __init__(self, average=True):
        super(Loss, self).__init__()
        self.average = average

    def infer_shape(self, outputs):
        return [o.shape for o in outputs]

    def infer_dtype(self, outputs):
        return [o.dtype for o in outputs]

    def forward(self, outputs, labels):
        raise NotImplementedError()

    def __call__(self, outputs, labels):
        labels = to_list(labels)
        if in_dygraph_mode():
            labels = [to_variable(l) for l in labels]
        losses = to_list(self.forward(to_list(outputs), labels))
        if not self.average:
            return losses
        return [fluid.layers.reduce_mean(l) for l in losses]


class CrossEntropy(Loss):
    def __init__(self):
        super(CrossEntropy, self).__init__()

    def infer_shape(self, outputs):
        return [o.shape[:-1] + (1, ) for o in outputs]

    def infer_dtype(self, outputs):
        return ['int64' for _ in outputs]

    def forward(self, outputs, labels):
        return [fluid.layers.cross_entropy(o, l) for o, l in zip(
            outputs, labels)]
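
# A minimal sketch of a custom loss under the `Loss` interface above
# (`MSELoss` is hypothetical, not part of this module): only `forward` is
# required and must return a list of per-sample losses; the base-class
# `infer_shape`/`infer_dtype` defaults already describe labels matching the
# outputs' shape and dtype, which is what a regression loss needs.
#
#     class MSELoss(Loss):
#         def forward(self, outputs, labels):
#             return [fluid.layers.square_error_cost(o, l)
#                     for o, l in zip(outputs, labels)]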


class StaticGraphAdapter(object):
    def __init__(self, model):
        super(StaticGraphAdapter, self).__init__()
        self.model = model
        # with `_build_once` gone, parameters are now created in `__init__`
        # so we need to keep track of the parameters already created
        self._startup_prog = fluid.default_startup_program()
        self._orig_prog = fluid.default_main_program()

        self._label_vars = {}  # label variables
        self._endpoints = {}
        self._loss_endpoint = None
        self._executor = None
        self._progs = {}
        self._compiled_progs = {}

        self._lazy_load_optimizer = None

        self._nranks = distributed.Env().nranks
        self._local_rank = distributed.Env().local_rank

        # parse shape hints
        self._input_desc = OrderedDict([
            (n, None) for n in extract_args(self.model.forward) if n != 'self'
        ])
        if hasattr(self.model.forward, 'shape_hints'):
            self._input_desc.update(self.model.forward.shape_hints)

    @property
    def mode(self):
        return self.model.mode

    @mode.setter
    def mode(self, value):
        self.model.mode = value

    def train(self, inputs, labels, device='CPU', device_ids=None):
        assert self.model._optimizer and self.model._loss_function, \
            "model not ready, please call `model.prepare()` first"
        self.mode = 'train'
        return self._run(inputs, labels, device, device_ids)

    def eval(self, inputs, labels, device='CPU', device_ids=None):
        assert self.model._loss_function, \
            "model not ready, please call `model.prepare()` first"
        self.mode = 'eval'
        return self._run(inputs, labels, device, device_ids)

    def test(self, inputs, device='CPU', device_ids=None):
        self.mode = 'test'
        return self._run(inputs, None, device, device_ids)

    def parameters(self, *args, **kwargs):
        return None

    def save(self, path):
        def _save(state, path):
            if not state:
                return
            state = {k: to_numpy(v) if isinstance(v, Variable) else v
                     for k, v in state.items()}
            with open(path, 'wb') as f:
                pickle.dump(state, f)

        base = os.path.basename(path)
        assert base != "", "path should be of 'dirname/filename' format"
        param_path = path + ".pdparams"
        _save(self.model.state_dict(), param_path)
        prog = self._progs.get('train', None)
        if prog is None or self.model._optimizer is None:
            return
        # XXX `optimizer.state_dict()` only works in dygraph mode
        optim_path = path + ".pdopt"
        optim = {p.name: p for p in filter(
            is_belong_to_optimizer, prog.list_vars())}
        if not optim:
            return
        # HACK this is contrived, optimizer state is not the same for
        # static/dynamic graph mode
        optim['__static_graph_only__'] = True
        _save(optim, optim_path)

    def load(self, path):
        def _load(path):
            if not os.path.exists(path):
                return
            with open(path, 'rb') as f:
                return pickle.load(f)

        param_path = path + ".pdparams"
        param_state = _load(param_path)
        assert param_state, "failed to load parameters, please check path"

        if self._executor is None:
            executor = fluid.Executor(fluid.CPUPlace())._default_executor
        else:
            executor = self._executor._default_executor

        fluid.core._create_loaded_parameter(
            list(self.model.state_dict().values()), global_scope(), executor)

        for key, var in self.model.state_dict().items():
            assert key in param_state, \
                "parameter [{}] is not found in model file [{}]".format(
                    key, param_path)
            self._set_var(var, param_state[key])

        # FIXME what if a different optimizer is used?
        if not self.model._optimizer:
            return
        optim_path = path + ".pdopt"
        optim_state = _load(optim_path)
        if optim_state is None:
            return
        assert '__static_graph_only__' in optim_state, \
            "optimizer saved in dygraph mode is not usable in static graph"

        if self._executor is not None:
            self._load_optimizer(optim_state)
        else:
            self._lazy_load_optimizer = optim_state

    def _load_optimizer(self, state):
        prog = self._progs.get('train', None)
        optim = list(filter(is_belong_to_optimizer, prog.list_vars()))
        if not optim:
            return

        fluid.core._create_loaded_parameter(
            optim, global_scope(), self._executor._default_executor)

        for var in optim:
            assert var.name in state, \
                "variable [{}] is not in optimizer state file".format(var.name)
            self._set_var(var, state[var.name])

    def _set_var(self, var, ndarray):
        t = global_scope().find_var(var.name).get_tensor()
        p = t._place()
        if p.is_cpu_place():
            place = fluid.CPUPlace()
        elif p.is_cuda_pinned_place():
            place = fluid.CUDAPinnedPlace()
        else:
            p = fluid.core.Place()
            p.set_place(t._place())
            place = fluid.CUDAPlace(p.gpu_device_id())

        t.set(ndarray, place)

    def _run(self, inputs, labels=None, device='CPU', device_ids=None):
        inputs = to_list(inputs)
        if labels is not None:
            labels = to_list(labels)
        assert len(inputs) == len(self._input_desc), "number of inputs" \
            + " does not match number of arguments of `forward` method"

        if self._progs.get(self.mode, None) is None:
            self._make_program(self._infer_input_vars(inputs))

        compiled_prog = self._compile_and_initialize(
            self._progs[self.mode], device, device_ids)

        feed = {}
        input_names = [name for name in self._input_desc.keys()]
        for idx, n in enumerate(input_names):
            # train and test may take different arguments
            if inputs[idx] is not None:
                feed[n] = inputs[idx]
        if labels is not None:
            for idx, v in enumerate(self._label_vars[self.mode]):
                feed[v.name] = labels[idx]

        endpoints = self._endpoints[self.mode]
        fetch_list = endpoints['output'] + endpoints['loss']
        num_output = len(endpoints['output'])
        if self.mode != 'test':
            fetch_list += endpoints['label']
        out = self._executor.run(
            compiled_prog, feed=feed,
            fetch_list=fetch_list)
        if self.mode == 'test':
            return out[:num_output]
        else:
            return out[:num_output], out[num_output:-1], out[-1:]

    def _make_program(self, inputs):
        prog = self._orig_prog.clone()
        if self.mode == 'train' and self.model._optimizer._learning_rate_map:
            # HACK workaround learning rate map issue
            lr_var = self.model._optimizer._learning_rate_map[self._orig_prog]
            self.model._optimizer._learning_rate_map[prog] = lr_var
                
        losses = []
        with fluid.program_guard(prog, self._startup_prog):
            outputs = to_list(self.model.forward(*inputs))
            if self.mode != 'test':
                label_vars = self._infer_label_vars(outputs)
                self._label_vars[self.mode] = label_vars
                losses = self.model._loss_function(outputs, label_vars)
                if self.mode == 'train':
                    self._loss_endpoint = fluid.layers.sum(losses)
                    if self._nranks > 1:
                        role = role_maker.PaddleCloudRoleMaker(is_collective=True)
                        fleet.init(role)
                        dist_strategy = DistributedStrategy()
                        dist_strategy.mode = "collective"
                        dist_strategy.collective_mode = "grad_allreduce"
                        self.model._optimizer = fleet.distributed_optimizer(
                            self.model._optimizer, strategy=dist_strategy)
                        
                    self.model._optimizer.minimize(self._loss_endpoint)
            if self.mode != 'train':
                outputs = [distributed._all_gather(o, self._nranks)
                           for o in outputs]
                if self.mode != 'test':
                    label_vars = [distributed._all_gather(l, self._nranks)
                                  for l in label_vars]
                     
        if self.mode != 'train':  # clone again to put it in test mode
            prog = prog.clone(for_test=True)
        self._progs[self.mode] = prog
        self._endpoints[self.mode] = {
            "output": outputs,
            "loss": losses,
            "label": label_vars
        }

    def _infer_input_vars(self, inputs):
        input_vars = []
        for idx, i in enumerate(inputs):
            if i is None:  # train and test may take different arguments
                input_vars.append(None)
                continue
            ndarray = np.array(i)
            name = list(self._input_desc.keys())[idx]
            shape = list(self._input_desc.values())[idx]
            if shape is None:
                shape = (None, ) + ndarray.shape[1:]
            input_vars.append(fluid.data(name, shape, ndarray.dtype))
        return input_vars

    def _infer_label_vars(self, outputs):
        shapes = self.model._loss_function.infer_shape(outputs)
        dtypes = self.model._loss_function.infer_dtype(outputs)
        label_vars = []
        for idx, (shape, dtype) in enumerate(zip(shapes, dtypes)):
            name = '__label{}'.format(idx)
            label_vars.append(fluid.data(name, shape, dtype))
        return label_vars

    def _compile_and_initialize(self, prog, device='CPU', device_ids=None):
        compiled_prog = self._compiled_progs.get(self.mode, None)
        if compiled_prog is not None:
            return compiled_prog

        places = [device.lower() == 'gpu' and fluid.CUDAPlace(i)
                  or fluid.CPUPlace() for i in device_ids]

        # XXX *ALL WEIGHTS* should be initialized upon model construction,
        # even though `forward()` may run different code paths for different
        # modes, so the startup program only needs to be run once
        if self._executor is None:
            if self._nranks > 1 and device.lower() == 'gpu':
                gpu_id = int(os.environ.get('FLAGS_selected_gpus', 0))
                place = fluid.CUDAPlace(gpu_id)
            else:
                place = places[0]
            self._executor = fluid.Executor(place)
            # XXX incremental initialization
            uninitialized = []
            for var_py in self._startup_prog.list_vars():
                var = fluid.global_scope().find_var(var_py.name)
                if var and var.get_tensor()._is_initialized():
                    continue
                uninitialized.append(var_py)
            if uninitialized:
                startup_prog = self._startup_prog._prune(uninitialized)
                self._executor.run(startup_prog)

            if self.mode == 'train' and self._lazy_load_optimizer:
                self._load_optimizer(self._lazy_load_optimizer)
                self._lazy_load_optimizer = None

        if self._nranks < 2:
            compiled_prog = fluid.CompiledProgram(prog)
        else:
            compiled_prog = prog  # fleet.main_program
        if len(device_ids) > 1:
            loss_name = None
            if self.mode == 'train' and self._loss_endpoint is not None:
                loss_name = self._loss_endpoint.name

            share_vars_from = None
            if self.mode == 'eval' and 'train' in self._compiled_progs:
                share_vars_from = self._compiled_progs['train']
            # HACK invalidate the eval program if it was compiled before the
            # train program; quite hackish, but it is generally uncommon that
            # the eval program will be run before the train program
            if self.mode == 'train' and 'eval' in self._compiled_progs:
                del self._compiled_progs['eval']

            compiled_prog = compiled_prog.with_data_parallel(
                loss_name=loss_name, places=places,
                share_vars_from=share_vars_from)

        self._compiled_progs[self.mode] = compiled_prog
        return compiled_prog


class DynamicGraphAdapter(object):
    def __init__(self, model):
        super(DynamicGraphAdapter, self).__init__()
        self.model = model
        self._nranks = distributed.Env().nranks
        self._local_rank = distributed.Env().local_rank

        if self._nranks > 1:
            self.ddp_model = distributed.DistributedDataParallel(self.model)

    @property
    def mode(self):
        return self.model.mode

    @mode.setter
    def mode(self, value):
        self.model.mode = value

    # TODO multi-device in dygraph mode is not implemented yet
    def train(self, inputs, labels, device='CPU', device_ids=None):
        assert self.model._optimizer and self.model._loss_function, \
            "model not ready, please call `model.prepare()` first"
        super(Model, self.model).train()
        self.mode = 'train'
        inputs = to_list(inputs)
        labels = to_list(labels)
        if self._nranks > 1:
            outputs = self.ddp_model.forward(*[to_variable(x) for x in inputs])
            losses = self.model._loss_function(outputs, labels)
            final_loss = fluid.layers.sum(losses)
            final_loss = self.ddp_model.scale_loss(final_loss)
            final_loss.backward()
            self.ddp_model.apply_collective_grads()
        else:
            outputs = self.model.forward(*[to_variable(x) for x in inputs])
            losses = self.model._loss_function(outputs, labels)
            final_loss = fluid.layers.sum(losses)
            final_loss.backward()
        self.model._optimizer.minimize(final_loss)
        self.model.clear_gradients()
        return [to_numpy(o) for o in to_list(outputs)], \
            [to_numpy(l) for l in losses], [l for l in labels]

    def eval(self, inputs, labels, device='CPU', device_ids=None):
        assert self.model._loss_function, \
            "model not ready, please call `model.prepare()` first"
        super(Model, self.model).eval()
        self.mode = 'eval'
        inputs = to_list(inputs)
        labels = to_list(labels)
        labels = [to_variable(l) for l in labels]
        outputs = self.model.forward(*[to_variable(x) for x in inputs])
        losses = self.model._loss_function(outputs, labels)
        if self._nranks > 1:
            outputs = [distributed._all_gather(o, self._nranks)
                       for o in to_list(outputs)]
            labels = [distributed._all_gather(l, self._nranks)
                      for l in labels]
        return [to_numpy(o) for o in to_list(outputs)], \
            [to_numpy(l) for l in losses], [to_numpy(l) for l in labels]

    def test(self, inputs, device='CPU', device_ids=None):
        super(Model, self.model).eval()
        self.mode = 'test'
        inputs = [to_variable(x) for x in to_list(inputs)]
        outputs = self.model.forward(*inputs)
        if self._nranks > 1:
            outputs = [distributed._all_gather(o, self._nranks)
                       for o in to_list(outputs)]
        return [to_numpy(o) for o in to_list(outputs)]

    def parameters(self, *args, **kwargs):
        return super(Model, self.model).parameters(*args, **kwargs)

    def save(self, path):
        params = self.model.state_dict()
        fluid.save_dygraph(params, path)
        if self.model._optimizer is None:
            return
        if self.model._optimizer.state_dict():
            optim = self.model._optimizer.state_dict()
            fluid.save_dygraph(optim, path)

    def load(self, path):
        params, optim = fluid.load_dygraph(path)
        self.model.set_dict(params)
        if self.model._optimizer is None or optim is None:
            return
        self.model._optimizer.set_dict(optim)


class Model(fluid.dygraph.Layer):
    def __init__(self):
        super(Model, self).__init__(self.__class__.__name__)
        self.mode = 'train'
        self._loss_function = None
        self._loss_weights = None
        self._optimizer = None
        if in_dygraph_mode():
            self._adapter = DynamicGraphAdapter(self)
        else:
            self._adapter = StaticGraphAdapter(self)

    def train(self, *args, **kwargs):
        return self._adapter.train(*args, **kwargs)

    def eval(self, *args, **kwargs):
        return self._adapter.eval(*args, **kwargs)

    def test(self, *args, **kwargs):
        return self._adapter.test(*args, **kwargs)

    def save(self, *args, **kwargs):
        if distributed.get_local_rank() == 0:
            return self._adapter.save(*args, **kwargs)

    def load(self, *args, **kwargs):
        return self._adapter.load(*args, **kwargs)

    def prepare(self, optimizer, loss_function):
        self._optimizer = optimizer
        assert isinstance(loss_function, Loss), \
            "'loss_function' must be sub classes of 'Loss'"
        self._loss_function = loss_function

    def parameters(self, *args, **kwargs):
        return self._adapter.parameters(*args, **kwargs)
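

# End-to-end usage sketch (illustrative assumptions: the `MnistModel` class,
# layer sizes, and random data below are made up; `fluid.dygraph.Linear` and
# the optimizer's `parameter_list` argument are assumed to be available in the
# targeted Paddle version). The same script runs on the static graph adapter
# by default, or on the dynamic one when wrapped in `fluid.dygraph.guard()`.
#
#     class MnistModel(Model):
#         def __init__(self):
#             super(MnistModel, self).__init__()
#             self._fc = fluid.dygraph.Linear(784, 10, act='softmax')
#
#         @shape_hints(image=[None, 784])
#         def forward(self, image):
#             return self._fc(image)
#
#     model = MnistModel()
#     optim = fluid.optimizer.SGD(learning_rate=0.01,
#                                 parameter_list=model.parameters())
#     model.prepare(optim, CrossEntropy())
#
#     image = np.random.random((32, 784)).astype('float32')
#     label = np.random.randint(0, 10, (32, 1)).astype('int64')
#     outputs, losses, labels = model.train(image, label,
#                                           device='CPU', device_ids=[0])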