model_base.py 6.8 KB
Newer Older
B
Bo Zhou 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Base class to define an Algorithm.
"""

18 19
import hashlib
import paddle.fluid as fluid
B
Bo Zhou 已提交
20 21 22 23 24 25 26 27 28 29
from abc import ABCMeta, abstractmethod

__all__ = ['Network', 'Model']


class Network(object):
    """
    A Network is an unordered set of LayerFuncs or Networks.
    """

30 31 32 33 34
    def sync_params_to(self,
                       target_net,
                       gpu_id=0,
                       decay=0.0,
                       share_vars_parallel_executor=None):
35 36 37 38 39 40
        """
        Args:
            target_net: Network object deepcopy from source network
            gpu_id: gpu id of target_net 
            decay: Float. The decay to use. 
                   target_net_weights = decay * target_net_weights + (1 - decay) * source_net_weights
41 42
            share_vars_parallel_executor: if not None, will use fluid.ParallelExecutor 
                                          to run program instead of fluid.Executor
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
        """
        args_hash_id = hashlib.md5('{}_{}_{}'.format(
            id(target_net), gpu_id, decay).encode('utf-8')).hexdigest()
        has_cached = False
        try:
            if self._cached_id == args_hash_id:
                has_cached = True
        except AttributeError:
            has_cached = False

        if not has_cached:
            # Can not run _cached program, need create a new program
            self._cached_id = args_hash_id

            assert not target_net is self, "cannot copy between identical networks"
            assert isinstance(target_net, Network)
            assert self.__class__.__name__ == target_net.__class__.__name__, \
                "must be the same class for para syncing!"
            assert (decay >= 0 and decay <= 1)

            # Resolve Circular Imports
            from parl.plutils import get_parameter_pairs, fetch_framework_var

            param_pairs = get_parameter_pairs(self, target_net)

            self._cached_sync_params_program = fluid.Program()

            with fluid.program_guard(self._cached_sync_params_program):
71 72 73
                for (src_var_name, target_var_name) in param_pairs:
                    src_var = fetch_framework_var(src_var_name)
                    target_var = fetch_framework_var(target_var_name)
74 75 76
                    fluid.layers.assign(
                        decay * target_var + (1 - decay) * src_var, target_var)

77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
            if share_vars_parallel_executor is None:
                # use fluid.Executor
                place = fluid.CPUPlace() if gpu_id < 0 \
                        else fluid.CUDAPlace(gpu_id)
                self._cached_fluid_executor = fluid.Executor(place)
            else:
                # use fluid.ParallelExecutor
                use_cuda = True if gpu_id >= 0 else False

                # specify strategy to make ParallelExecutor run faster
                exec_strategy = fluid.ExecutionStrategy()
                exec_strategy.use_experimental_executor = True
                exec_strategy.num_threads = 4
                build_strategy = fluid.BuildStrategy()
                build_strategy.remove_unnecessary_lock = True

                with fluid.scope_guard(fluid.global_scope().new_scope()):
                    self._cached_fluid_executor = fluid.ParallelExecutor(
                        use_cuda=use_cuda,
                        main_program=self._cached_sync_params_program,
                        share_vars_from=share_vars_parallel_executor,
                        exec_strategy=exec_strategy,
                        build_strategy=build_strategy,
                    )
        if share_vars_parallel_executor is None:
            self._cached_fluid_executor.run(self._cached_sync_params_program)
        else:
            self._cached_fluid_executor.run(fetch_list=[])
105 106 107 108 109 110 111 112 113 114 115 116 117

    @property
    def parameter_names(self):
        """ param_attr names of all parameters in Network,
            only parameter created by parl.layers included

        Returns:
            list of string, param_attr names of all parameters
        """

        # Resolve Circular Imports
        from parl.plutils import get_parameter_names
        return get_parameter_names(self)
B
Bo Zhou 已提交
118 119 120 121 122 123


class Model(Network):
    """
    A Model is owned by an Algorithm. 
    It implements the entire network model(forward part) to solve a specific problem.
B
Bo Zhou 已提交
124
    In general, Model is responsible for forward and 
B
Bo Zhou 已提交
125 126
    Algorithm is responsible for backward.

H
Hongsheng Zeng 已提交
127 128 129 130
    Model can also use deepcopy way to construct target model, which has the same structure as initial model. 
    Note that only the model definition is copied here. To copy the parameters from the current model 
    to the target model, you must explicitly use sync_params_to function after the program is initialized.

B
Bo Zhou 已提交
131 132
    Here is an example:
        ```python
H
Hongsheng Zeng 已提交
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158
        import parl.layers as layers
        import parl.Model as Model

        class MLPModel(Model):
            def __init__(self):
                self.fc = layers.fc(size=64)

            def policy(self, obs):
                out = self.fc(obs)
                return out
                
        model = MLPModel() 
        target_model = deepcopy(model) # automatically create new unique parameters names for target_model.fc

        # build program
        x = layers.data(name='x', shape=[100], dtype="float32")
        y1 = model.policy(x) 
        y2 = target_model.policy(x)  

        ...
        # Need initialize program before calling sync_params_to
        fluid_executor.run(fluid.default_startup_program()) 
        ...

        # synchronize parameters
        model.sync_params_to(target_model, gpu_id=gpu_id)
B
Bo Zhou 已提交
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182
        ```
    """
    __metaclass__ = ABCMeta

    def __init__(self):
        super(Model, self).__init__()

    def policy(self, *args):
        """
        Implement your policy here. 
        The function was later used by algorithm 
        Return: action_dists: a dict of action distribution objects
                states
        Optional: a model might not always have to implement policy()
        """
        raise NotImplementedError()

    def value(self, *args):
        """
        Return: values: a dict of estimated values for the current observations and states
                        For example, "q_value" and "v_value"
        Optional: a model might not always have to implement value()
        """
        raise NotImplementedError()