From 72c56faf1997657c23d7c6b6b6ec20e86d3a839b Mon Sep 17 00:00:00 2001 From: xiexionghang Date: Thu, 5 Mar 2020 23:06:02 +0800 Subject: [PATCH] depend on paddle with bcloud --- kagle/kagle_layer.py | 144 ++++++++++++++++++++++++++++++++----------- kagle/kagle_model.py | 97 ++++++++++++++++++++++++----- 2 files changed, 190 insertions(+), 51 deletions(-) diff --git a/kagle/kagle_layer.py b/kagle/kagle_layer.py index c78adb83..b9180bbc 100755 --- a/kagle/kagle_layer.py +++ b/kagle/kagle_layer.py @@ -1,40 +1,58 @@ +""" +DnnLayer: analyse layer config, and parse to Paddle Operator, build net +""" +import abc import paddle.fluid as fluid -from abc import ABCMeta, abstractmethod class Layer(object): - __metaclass__=ABCMeta + """R + """ + __metaclass__ = abc.ABCMeta def __init__(self, config): + """R + """ pass def generate(self, mode, param): + """R + """ if mode == 'fluid': return self.generate_fluid(param) elif mode == 'tensorflow': return self.generate_tensorflow(param) print ('unsupport this mode: ' + mode) - return None,None + return None, None - @abstractmethod + @abc.abstractmethod def generate_fluid(self, param): + """R + """ pass - # maybe - #@abstractmethod def generate_tensorflow(self, param): + """ Not implement currently + """ pass + class EmbeddingInputLayer(Layer): + """R + """ def __init__(self, config): + """R + """ self._cvm = config['cvm'] self._name = config['name'] - self._slots = [ str(slot) for slot in config['slots'] ] + self._slots = [str(slot) for slot in config['slots']] self._mf_dim = config['mf_dim'] self._backward = config['backward'] self._emb_dim = self._mf_dim + 3 #append show ctr lr self._emb_layers = [] def generate_fluid(self, param): + """R + """ show_clk = fluid.layers.concat( [param['layer']['show'], param['layer']['click']], axis=1) show_clk.stop_gradient = True @@ -42,39 +60,61 @@ class EmbeddingInputLayer(Layer): for slot in self._slots: l = fluid.layers.data(name=slot, shape=[1], dtype="int64", lod_level=1) data_var.append(l) - emb = fluid.layers.embedding(input=l, size=[10, self._emb_dim], is_sparse = True, is_distributed=True, param_attr=fluid.ParamAttr(name="embedding")) + emb = fluid.layers.embedding(input=l, size=[10, self._emb_dim], \ + is_sparse=True, is_distributed=True, param_attr=fluid.ParamAttr(name="embedding")) emb = fluid.layers.sequence_pool(input=emb, pool_type='sum') emb = fluid.layers.continuous_value_model(emb, show_clk, self._cvm) self._emb_layers.append(emb) output = fluid.layers.concat(input=self._emb_layers, axis=1, name=self._name) return output, {'data_var' : data_var} + class LabelInputLayer(Layer): + """R + """ def __init__(self, config): + """R + """ self._name = config['name'] self._dim = config.get('dim', 1) self._data_type = config.get('data_type', "int64") self._label_idx = config['label_idx'] def generate_fluid(self, param): - label = fluid.layers.data(name=self._name, shape=[-1, self._dim], dtype=self._data_type, lod_level=0, append_batch_size=False) + """R + """ + label = fluid.layers.data(name=self._name, shape=[-1, self._dim], \ + dtype=self._data_type, lod_level=0, append_batch_size=False) cast_label = fluid.layers.cast(label, dtype='float32') cast_label.stop_gradient = True - return cast_label, {'data_var' : [label]} + return cast_label, {'data_var': [label]} + class TagInputLayer(Layer): + """R + """ def __init__(self, config): + """R + """ self._name = config['name'] self._tag = config['tag'] self._dim = config.get('dim', 1) self._data_type = config['data_type'] def generate_fluid(self, param): - output = fluid.layers.data(name=self._name, shape=[-1, self._dim], dtype=self._data_type, lod_level=0, append_batch_size=False, stop_gradient=True) - return output, {'data_var' : [output]} + """R + """ + output = fluid.layers.data(name=self._name, shape=[-1, self._dim], \ + dtype=self._data_type, lod_level=0, append_batch_size=False, stop_gradient=True) + return output, {'data_var': [output]} + class ParamLayer(Layer): + """R + """ def __init__(self, config): + """R + """ self._name = config['name'] self._coln = config['coln'] self._table_id = config.get('table_id', -1) @@ -83,40 +123,59 @@ class ParamLayer(Layer): self._config = config def generate_fluid(self, param): + """R + """ return self._config, {'inference_param': {'name':'param', 'params': [], 'table_id': self._table_id}} + class SummaryLayer(Layer): + """R + """ def __init__(self, config): + """R + """ self._name = config['name'] self._table_id = config.get('table_id', -1) self._data_type = config.get('data_type', 'float32') self._config = config def generate_fluid(self, param): - return self._config, {'inference_param': {'name':'summary', 'params': [], 'table_id': self._table_id}} + """R + """ + return self._config, {'inference_param': {'name': 'summary', 'params': [], 'table_id': self._table_id}} + class NormalizetionLayer(Layer): + """R + """ def __init__(self, config): + """R + """ self._name = config['name'] self._input = config['input'] self._summary = config['summary'] self._table_id = config.get('table_id', -1) def generate_fluid(self, param): + """R + """ input_layer = param['layer'][self._input[0]] summary_layer = param['layer'][self._summary] if len(self._input) > 0: - input_list=[ param['layer'][i] for i in self._input ] + input_list=[param['layer'][i] for i in self._input] input_layer = fluid.layers.concat(input=input_list, axis=1) bn = fluid.layers.data_norm(input=input_layer, name=self._name, epsilon=1e-4, param_attr={ - "batch_size":1e4, - "batch_sum_default":0.0, - "batch_square":1e4}) - inference_param = [ self._name + '.batch_size', self._name + '.batch_sum', self._name + '.batch_square_sum' ] - return bn, {'inference_param' : { 'name':'summary', 'params': inference_param, 'table_id': summary_layer.get('table_id', -1)}} + "batch_size": 1e4, "batch_sum_default": 0.0, "batch_square": 1e4}) + inference_param = [self._name + '.batch_size', self._name + '.batch_sum', self._name + '.batch_square_sum'] + return bn, {'inference_param' : {'name':'summary', 'params': inference_param, 'table_id': summary_layer.get('table_id', -1)}} + class NeuralLayer(Layer): + """R + """ def __init__(self, config): + """R + """ self._name = config['name'] self._param = config['param'] self._input = config['input'] @@ -124,16 +183,19 @@ class NeuralLayer(Layer): self._act_func = config.get('act_func', None) def generate_fluid(self, param): + """R + """ param_layer = param['layer'][self._param] input_layer = param['layer'][self._input[0]] if len(self._input) > 0: - input_list=[ param['layer'][i] for i in self._input ] + input_list=[param['layer'][i] for i in self._input] input_layer = fluid.layers.concat(input=input_list, axis=1) input_coln = input_layer.shape[1] scale = param_layer['init_range'] / (input_coln ** 0.5) bias = None if self._bias: - bias = fluid.ParamAttr(learning_rate=1.0, initializer=fluid.initializer.NormalInitializer(loc=0.0, scale=scale)) + bias = fluid.ParamAttr(learning_rate=1.0, + initializer=fluid.initializer.NormalInitializer(loc=0.0, scale=scale)) fc = fluid.layers.fc( name = self._name, input = input_layer, @@ -146,8 +208,13 @@ class NeuralLayer(Layer): inference_param = [self._name + '.w_0', self._name + '.b_0'] return fc, {'inference_param' : {'name':'param', 'params': inference_param, 'table_id': param_layer.get('table_id', -1)}} + class SigmoidLossLayer(Layer): + """R + """ def __init__(self, config): + """R + """ self._name = config['name'] self._label = config['label'] self._input = config['input'] @@ -155,28 +222,30 @@ class SigmoidLossLayer(Layer): self._metric_label = config.get('metric_label', None) self._bound = config.get('bound', [-15.0, 15.0]) self._extend_output = { - 'metric_label' : self._metric_label, - 'metric_dict' : { - 'auc' : { 'var' : None}, - 'batch_auc' : {'var' : None}, - 'stat_pos' : {'var' : None, 'data_type' : 'int64'}, - 'stat_neg' : {'var' : None, 'data_type' : 'int64'}, - 'batch_stat_pos' : {'var' : None, 'data_type' : 'int64'}, - 'batch_stat_neg' : {'var' : None, 'data_type' : 'int64'}, - 'pos_ins_num' : {'var' : None}, - 'abserr': {'var' : None}, - 'sqrerr': {'var' : None}, - 'prob': {'var' : None}, - 'total_ins_num': {'var' : None}, - 'q': {'var' : None} + 'metric_label': self._metric_label, + 'metric_dict': { + 'auc': {'var': None}, + 'batch_auc': {'var': None}, + 'stat_pos': {'var': None, 'data_type': 'int64'}, + 'stat_neg': {'var': None, 'data_type': 'int64'}, + 'batch_stat_pos': {'var': None, 'data_type': 'int64'}, + 'batch_stat_neg': {'var': None, 'data_type': 'int64'}, + 'pos_ins_num': {'var': None}, + 'abserr': {'var': None}, + 'sqrerr': {'var': None}, + 'prob': {'var': None}, + 'total_ins_num': {'var': None}, + 'q': {'var': None} } } def generate_fluid(self, param): + """R + """ input_layer = param['layer'][self._input[0]] label_layer = param['layer'][self._label] - output = fluid.layers.clip(input_layer, self._bound[0], self._bound[1], name = self._name) + output = fluid.layers.clip(input_layer, self._bound[0], self._bound[1], name=self._name) norm = fluid.layers.sigmoid(output, name=self._name) output = fluid.layers.log_loss(norm, fluid.layers.cast(x=label_layer, dtype='float32')) if self._weight: @@ -191,7 +260,8 @@ class SigmoidLossLayer(Layer): input=[fluid.layers.elementwise_sub(fluid.layers.ceil(norm), norm), norm], axis=1) metric['auc']['var'], metric['batch_auc']['var'], [metric['batch_stat_pos']['var'], \ metric['batch_stat_neg']['var'], metric['stat_pos']['var'], metric['stat_neg']['var']] = \ - fluid.layers.auc(input=binary_predict, label=fluid.layers.cast(x=label_layer, dtype='int64'), curve='ROC', num_thresholds=32) + fluid.layers.auc(input=binary_predict, label=fluid.layers.cast(x=label_layer, dtype='int64'), \ + curve='ROC', num_thresholds=32) metric['sqrerr']['var'], metric['abserr']['var'], metric['prob']['var'], metric['q']['var'], \ metric['pos_ins_num']['var'], metric['total_ins_num']['var'] = \ diff --git a/kagle/kagle_model.py b/kagle/kagle_model.py index ad6b234e..233317c0 100755 --- a/kagle/kagle_model.py +++ b/kagle/kagle_model.py @@ -1,22 +1,37 @@ +""" +Model Net: analyse layer config, and parse to Paddle Pragram +""" +import abc import copy import yaml -import kagle_layer -import kagle_table +import kagle.kagle_layer +import kagle.kagle_table import paddle.fluid as fluid -from abc import ABCMeta, abstractmethod from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet def create(config): + """ + Create a model instance by config + Args: + config(dict) : desc model type and net + Return: + Model Instance + """ model = None if config['mode'] == 'fluid': model = FluidModel(config) model.build_model() return model + class Model(object): - __metaclass__=ABCMeta + """R + """ + __metaclass__ = abc.ABCMeta def __init__(self, config): + """R + """ self._config = config self._name = config['name'] f = open(config['layer_file'], 'r') @@ -30,31 +45,52 @@ class Model(object): pass def get_cost_op(self): + """R + """ return self._cost def get_metrics(self): + """R + """ return self._metrics - @abstractmethod + @abc.abstractmethod def shrink(self, params): + """R + """ pass - @abstractmethod + @abc.abstractmethod def build_model(self): + """R + """ pass - @abstractmethod + @abc.abstractmethod def dump_model_program(self, path): + """R + """ pass - @abstractmethod + @abc.abstractmethod def dump_inference_param(self, params): + """R + """ pass - @abstractmethod + @abc.abstractmethod def dump_inference_program(self, inference_layer, path): + """R + """ pass def inference_params(self, inference_layer): + """ + get params name for inference_layer + Args: + inference_layer(str): layer for inference + Return: + params(list): params name list that for inference layer + """ layer = inference_layer if layer in self._inference_meta['params']: return self._inference_meta['params'][layer] @@ -69,6 +105,13 @@ class Model(object): return self._inference_meta['params'][layer] def get_dependency(self, layer_graph, dest_layer): + """ + get layers of dest_layer depends on + Args: + layer_graph(dict) : all layers in graph + Return: + depend_layers(list) : sub-graph layers for calculate dest_layer + """ dependency_list = [] if dest_layer in layer_graph: dependencys = copy.deepcopy(layer_graph[dest_layer]['input']) @@ -79,11 +122,24 @@ class Model(object): class FluidModel(Model): + """R + """ def __init__(self, config): + """R + """ Model.__init__(self, config) pass def build_model(self): + """R + build a fluid model with config + Return: + modle_instance(dict) + train_program + startup_program + inference_param : all params name list + table: table-meta to ps-server + """ for layer in self._build_nodes['layer']: self._build_param['inner_layer'][layer['name']] = layer @@ -91,7 +147,8 @@ class FluidModel(Model): self._build_param['table'] = {} self._build_param['model']['train_program'] = fluid.Program() self._build_param['model']['startup_program'] = fluid.Program() - with fluid.program_guard(self._build_param['model']['train_program'], self._build_param['model']['startup_program']): + with fluid.program_guard(self._build_param['model']['train_program'], \ + self._build_param['model']['startup_program']): with fluid.unique_name.guard(): for phase in self._build_phase: if self._build_nodes[phase] is None: @@ -114,16 +171,19 @@ class FluidModel(Model): self._metrics[extend_output['metric_label']] = extend_output['metric_dict'] if 'inference_param' in extend_output: - param_name = extend_output['inference_param']['name'] + inference_param = extend_output['inference_param'] + param_name = inference_param['name'] if param_name not in self._build_param['table']: self._build_param['table'][param_name] = {'params':[]} - table_meta = kagle_table.TableMeta.alloc_new_table(extend_output['inference_param']['table_id']) + table_meta = kagle_table.TableMeta.alloc_new_table(inference_param['table_id']) self._build_param['table'][param_name]['_meta'] = table_meta - self._build_param['table'][param_name]['params'] += extend_output['inference_param']['params'] + self._build_param['table'][param_name]['params'] += inference_param['params'] pass @classmethod def build_optimizer(self, params): + """R + """ optimizer_conf = params['optimizer_conf'] strategy = None if 'strategy' in optimizer_conf: @@ -134,12 +194,15 @@ class FluidModel(Model): model_metrics = metrics[name] stat_var_names += [ model_metrics[metric]['var'].name for metric in model_metrics] strategy['stat_var_names'] = list(set(stat_var_names)) - optimizer_generator = 'optimizer = fluid.optimizer.' + optimizer_conf['class'] + '(learning_rate=' + str(optimizer_conf['learning_rate']) + ')' + optimizer_generator = 'optimizer = fluid.optimizer.' + optimizer_conf['class'] + \ + '(learning_rate=' + str(optimizer_conf['learning_rate']) + ')' exec(optimizer_generator) optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy) return optimizer def dump_model_program(self, path): + """R + """ with open(path + '/' + self._name + '_main_program.pbtxt', "w") as fout: print >> fout, self._build_param['model']['train_program'] with open(path + '/' + self._name + '_startup_program.pbtxt', "w") as fout: @@ -147,6 +210,8 @@ class FluidModel(Model): pass def shrink(self, params): + """R + """ scope = params['scope'] decay = params['decay'] for param_table in self._build_param['table']: @@ -154,9 +219,13 @@ class FluidModel(Model): fleet.shrink_dense_table(decay, scope=scope, table_id=table_id) def dump_inference_program(self, inference_layer, path): + """R + """ pass def dump_inference_param(self, params): + """R + """ scope = params['scope'] executor = params['executor'] program = self._build_param['model']['train_program'] -- GitLab