"""
Model Net: analyse the layer config and parse it into a Paddle Program
"""
import abc
import copy
import yaml
import kagle.kagle_layer as kagle_layer
import kagle.kagle_table as kagle_table
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet

def create(config):
    """
    Create a model instance by config
    Args:
        config(dict): describes the model type and net
    Return:
        Model Instance
    """
    model = None
    if config['mode'] == 'fluid':
        model = FluidModel(config)
        model.build_model()
    return model
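
# A minimal usage sketch (the model name and file path below are only illustrations;
# 'mode', 'name' and 'layer_file' are the config keys actually read by this module):
#   model = create({'mode': 'fluid', 'name': 'ctr_model', 'layer_file': './model/layers.yaml'})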
   

class Model(object):
    """
    Base class of a model net: parses the layer config and holds the build result
    """
    __metaclass__ = abc.ABCMeta

    def __init__(self, config):
        """
        Parse the layer config from config['layer_file'] and initialize the build metadata
        """
        self._config = config
        self._name = config['name']
        with open(config['layer_file'], 'r') as f:
            self._build_nodes = yaml.safe_load(f.read())
        self._build_phase = ['input', 'param', 'summary', 'layer']
        self._build_param = {'layer': {}, 'inner_layer':{}, 'layer_extend': {}, 'model': {}}
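        # _build_param slots: 'inner_layer' keeps the raw layer config nodes, 'layer'
        # the output of each built layer, 'layer_extend' each layer's extra outputs
        # (loss, data vars, metrics, inference params), and 'model' the built programs.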
        self._inference_meta = {'dependency':{}, 'params': {}}
        self._cost = None
        self._metrics = {}
        self._data_var = []
        pass
    
    def get_cost_op(self):
        """
        Return the model loss Variable (sum of all layer losses)
        """
        return self._cost

    def get_metrics(self):
        """
        Return the metrics dict, keyed by metric label
        """
        return self._metrics

    @abc.abstractmethod
    def shrink(self, params):
        """
        Shrink model params with a decay factor; implemented by subclasses
        """
        pass    

    @abc.abstractmethod
    def build_model(self): 
        """
        Build the network from the layer config; implemented by subclasses
        """
        pass

    @abc.abstractmethod
    def dump_model_program(self, path):
        """
        Dump the train and startup programs to path; implemented by subclasses
        """
        pass

    @abc.abstractmethod
    def dump_inference_param(self, params):
        """
        Dump the params needed for inference; implemented by subclasses
        """
        pass
    @abc.abstractmethod
    def dump_inference_program(self, inference_layer, path):
        """
        Dump an inference program for inference_layer; implemented by subclasses
        """
        pass
    
    def inference_params(self, inference_layer):
        """
        Get the param names needed by inference_layer
        Args:
            inference_layer(str): layer for inference
        Return:
            params(list): list of param names for the inference layer
        """
        layer = inference_layer
        if layer in self._inference_meta['params']:
            return self._inference_meta['params'][layer]
            
        self._inference_meta['params'][layer] = []
        self._inference_meta['dependency'][layer] = self.get_dependency(self._build_param['inner_layer'], layer)
        for node in self._build_nodes['layer']:
            if node['name'] not in self._inference_meta['dependency'][layer]:
                continue
            if 'inference_param' in self._build_param['layer_extend'][node['name']]:
                self._inference_meta['params'][layer] += self._build_param['layer_extend'][node['name']]['inference_param']['params'] 
        return self._inference_meta['params'][layer]

    def get_dependency(self, layer_graph, dest_layer):
        """
        Get the layers that dest_layer depends on
        Args:
            layer_graph(dict): all layers in the graph
            dest_layer(str): the target layer
        Return:
            depend_layers(list): sub-graph layers needed to calculate dest_layer
        """
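        # A sketch with hypothetical layer names: given
        #   layer_graph = {'ctr_output': {'input': ['fc_2']},
        #                  'fc_2': {'input': ['fc_1']},
        #                  'fc_1': {'input': []}}
        # get_dependency(layer_graph, 'ctr_output') returns ['fc_1', 'fc_2']
        # (order is not guaranteed because of the final set()).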
        dependency_list = []
        if dest_layer in layer_graph:
            dependencies = copy.deepcopy(layer_graph[dest_layer]['input'])
            dependency_list = copy.deepcopy(dependencies)
            for dependency in dependencies:
                dependency_list = dependency_list + self.get_dependency(layer_graph, dependency)
        return list(set(dependency_list))

    
class FluidModel(Model):
    """
    A model net built as paddle.fluid programs from the layer config
    """
    def __init__(self, config):
        """
        See Model.__init__
        """
        Model.__init__(self, config)
        pass
    
    def build_model(self): 
        """
        Build a fluid model from the layer config
        Build result (kept in self._build_param):
            model.train_program
            model.startup_program
            layer/layer_extend: per-layer outputs, including inference param names
            table: table meta sent to the ps-server
        """
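        # The layer_file yaml is expected to hold one node list per build phase
        # ('input', 'param', 'summary', 'layer'); each node carries at least a 'name'
        # and the kagle_layer class to instantiate, plus an 'input' list for layer
        # nodes. A sketch with hypothetical class and layer names:
        #   layer:
        #     - {name: fc_1, class: NeuralLayer, input: [cvm_input]}
        #     - {name: ctr_output, class: SigmoidLossLayer, input: [fc_1, label]}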
        for layer in self._build_nodes['layer']:
            self._build_param['inner_layer'][layer['name']] = layer
        
    
        self._build_param['table'] = {}
        self._build_param['model']['train_program'] = fluid.Program()
        self._build_param['model']['startup_program'] = fluid.Program()
        with fluid.program_guard(self._build_param['model']['train_program'], \
            self._build_param['model']['startup_program']):
            with fluid.unique_name.guard():
                for phase in self._build_phase:
                    if self._build_nodes[phase] is None:
                        continue
                    for node in self._build_nodes[phase]:
                        layer = getattr(kagle_layer, node['class'])(node)
                        layer_output, extend_output = layer.generate(self._config['mode'], self._build_param)
                        self._build_param['layer'][node['name']] = layer_output
                        self._build_param['layer_extend'][node['name']] = extend_output
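                        # extend_output may carry the layer's loss, input data vars,
                        # metric variables and inference params; fold each into the model.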
                        if extend_output is None:
                            continue
                        if 'loss' in extend_output:
                            if self._cost is None:
                                self._cost = extend_output['loss']
                            else:
                                self._cost += extend_output['loss']
                        if 'data_var' in extend_output:
                            self._data_var += extend_output['data_var']
                        if 'metric_label' in extend_output and extend_output['metric_label'] is not None:
                            self._metrics[extend_output['metric_label']] = extend_output['metric_dict']

                        if 'inference_param' in extend_output:
                            inference_param = extend_output['inference_param']
                            param_name = inference_param['name']
                            if param_name not in self._build_param['table']:
                                self._build_param['table'][param_name] = {'params':[]}
                                table_meta = kagle_table.TableMeta.alloc_new_table(inference_param['table_id'])
                                self._build_param['table'][param_name]['_meta'] = table_meta
                            self._build_param['table'][param_name]['params'] += inference_param['params']
        pass
    
    @classmethod
    def build_optimizer(cls, params):
        """
        Create a fleet distributed optimizer from params['optimizer_conf']
        """
        optimizer_conf = params['optimizer_conf']
        strategy = None
        if 'strategy' in optimizer_conf:
            strategy = optimizer_conf['strategy']
            stat_var_names = []
            metrics = params['metrics']
            for name in metrics:
                model_metrics = metrics[name]
                stat_var_names += [ model_metrics[metric]['var'].name for metric in model_metrics]
            strategy['stat_var_names'] = list(set(stat_var_names))
        optimizer = getattr(fluid.optimizer, optimizer_conf['class'])(
            learning_rate=optimizer_conf['learning_rate'])
        optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
        return optimizer
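
    # Example call (a sketch; the optimizer class, learning rate and strategy keys
    # below are only illustrations of what optimizer_conf may carry):
    #   optimizer = FluidModel.build_optimizer({
    #       'metrics': model.get_metrics(),
    #       'optimizer_conf': {'class': 'Adam', 'learning_rate': 0.001,
    #                          'strategy': {'use_cvm': True}}})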

    def dump_model_program(self, path):
        """
        Write the train and startup programs as pbtxt files under path
        """
        with open(path + '/' + self._name + '_main_program.pbtxt', "w") as fout:
            fout.write(str(self._build_param['model']['train_program']))
        with open(path + '/' + self._name + '_startup_program.pbtxt', "w") as fout:
            fout.write(str(self._build_param['model']['startup_program']))
        pass

    def shrink(self, params):
        """
        Apply a decay-based shrink to every dense table of this model
        """
        scope = params['scope']
        decay = params['decay']
        for param_table in self._build_param['table']:
            table_id = self._build_param['table'][param_table]['_meta']._table_id
            fleet.shrink_dense_table(decay, scope=scope, table_id=table_id)

    def dump_inference_program(self, inference_layer, path):
        """
        Dump an inference program for inference_layer (not implemented yet)
        """
        pass

    def dump_inference_param(self, params):
        """
        Pull dense params from the ps-server and save inference params to file
        """
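        # Each entry of params['inference_list'] names the output layer whose params
        # should be saved and the target file, e.g. (a sketch):
        #   {'layer_name': 'ctr_output', 'save_file_name': 'ctr.model'}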
        scope = params['scope']
        executor = params['executor']
        program = self._build_param['model']['train_program']
        for table_name, table in self._build_param['table'].items():
            fleet._fleet_ptr.pull_dense(scope, table['_meta']._table_id, table['params'])
        for inference_item in params['inference_list']:
            params_name_list = self.inference_params(inference_item['layer_name'])
            params_var_list = [program.global_block().var(i) for i in params_name_list]
            params_file_name = inference_item['save_file_name']
            with fluid.scope_guard(scope):
                if params['save_combine']:
                    fluid.io.save_vars(executor, "./", program,
                                       vars=params_var_list, filename=params_file_name)
                else:
                    fluid.io.save_vars(executor, params_file_name, program, vars=params_var_list)
        pass