Commit 72c56faf authored by X xiexionghang

depend on paddle with bcloud

Parent 3e834fec
"""
DnnLayer: analyze the layer config, parse it into Paddle operators, and build the net
"""
import abc
import paddle.fluid as fluid
class Layer(object):
"""R
"""
__metaclass__ = abc.ABCMeta
def __init__(self, config):
"""R
"""
pass
def generate(self, mode, param):
"""R
"""
if mode == 'fluid':
return self.generate_fluid(param)
elif mode == 'tensorflow':
return self.generate_tensorflow(param)
print('unsupported mode: ' + mode)
return None, None
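# Note: only the 'fluid' backend is implemented in this commit;
# generate_tensorflow below is currently a stub.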
@abc.abstractmethod
def generate_fluid(self, param):
"""R
"""
pass
# maybe
#@abstractmethod
def generate_tensorflow(self, param):
""" Not implement currently
"""
pass
class EmbeddingInputLayer(Layer):
"""R
"""
def __init__(self, config):
"""R
"""
self._cvm = config['cvm']
self._name = config['name']
self._slots = [str(slot) for slot in config['slots']]
self._mf_dim = config['mf_dim']
self._backward = config['backward']
self._emb_dim = self._mf_dim + 3 # append show, ctr, lr
self._emb_layers = []
def generate_fluid(self, param):
"""R
"""
show_clk = fluid.layers.concat(
[param['layer']['show'], param['layer']['click']], axis=1)
show_clk.stop_gradient = True
@@ -42,39 +60,61 @@ class EmbeddingInputLayer(Layer):
for slot in self._slots:
l = fluid.layers.data(name=slot, shape=[1], dtype="int64", lod_level=1)
data_var.append(l)
emb = fluid.layers.embedding(input=l, size=[10, self._emb_dim], \
is_sparse=True, is_distributed=True, param_attr=fluid.ParamAttr(name="embedding"))
emb = fluid.layers.sequence_pool(input=emb, pool_type='sum')
emb = fluid.layers.continuous_value_model(emb, show_clk, self._cvm)
self._emb_layers.append(emb)
output = fluid.layers.concat(input=self._emb_layers, axis=1, name=self._name)
return output, {'data_var' : data_var}
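# A hypothetical config for this layer (field names match __init__ above,
# values are illustrative only):
#   {'name': 'emb_input', 'cvm': True, 'slots': [6001, 6002], 'mf_dim': 8, 'backward': True}
# Each slot is embedded into mf_dim + 3 = 11 floats (show/ctr/lr appended),
# sum-pooled over the sequence, passed through CVM, then concatenated.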
class LabelInputLayer(Layer):
"""R
"""
def __init__(self, config):
"""R
"""
self._name = config['name']
self._dim = config.get('dim', 1)
self._data_type = config.get('data_type', "int64")
self._label_idx = config['label_idx']
def generate_fluid(self, param):
"""R
"""
label = fluid.layers.data(name=self._name, shape=[-1, self._dim], \
dtype=self._data_type, lod_level=0, append_batch_size=False)
cast_label = fluid.layers.cast(label, dtype='float32')
cast_label.stop_gradient = True
return cast_label, {'data_var': [label]}
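# Hypothetical example: {'name': 'label_ctr', 'label_idx': 0} relies on the
# defaults dim=1 and data_type="int64"; the label is cast to float32 so it
# can feed a loss op directly.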
class TagInputLayer(Layer):
"""R
"""
def __init__(self, config):
"""R
"""
self._name = config['name']
self._tag = config['tag']
self._dim = config.get('dim', 1)
self._data_type = config['data_type']
def generate_fluid(self, param):
"""R
"""
output = fluid.layers.data(name=self._name, shape=[-1, self._dim], \
dtype=self._data_type, lod_level=0, append_batch_size=False, stop_gradient=True)
return output, {'data_var': [output]}
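# Hypothetical example: {'name': 'ins_weight', 'tag': 0, 'data_type': 'float32'}
# declares a side input that is fed per instance but, with stop_gradient=True,
# never back-propagated through.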
class ParamLayer(Layer):
"""R
"""
def __init__(self, config):
"""R
"""
self._name = config['name']
self._coln = config['coln']
self._table_id = config.get('table_id', -1)
@@ -83,40 +123,59 @@ class ParamLayer(Layer):
self._config = config
def generate_fluid(self, param):
"""R
"""
return self._config, {'inference_param': {'name':'param', 'params': [], 'table_id': self._table_id}}
class SummaryLayer(Layer):
"""R
"""
def __init__(self, config):
"""R
"""
self._name = config['name']
self._table_id = config.get('table_id', -1)
self._data_type = config.get('data_type', 'float32')
self._config = config
def generate_fluid(self, param):
"""R
"""
return self._config, {'inference_param': {'name': 'summary', 'params': [], 'table_id': self._table_id}}
class NormalizetionLayer(Layer):
"""R
"""
def __init__(self, config):
"""R
"""
self._name = config['name']
self._input = config['input']
self._summary = config['summary']
self._table_id = config.get('table_id', -1)
def generate_fluid(self, param):
"""R
"""
input_layer = param['layer'][self._input[0]]
summary_layer = param['layer'][self._summary]
if len(self._input) > 0:
input_list=[param['layer'][i] for i in self._input]
input_layer = fluid.layers.concat(input=input_list, axis=1)
bn = fluid.layers.data_norm(input=input_layer, name=self._name, epsilon=1e-4, param_attr={
"batch_size":1e4,
"batch_sum_default":0.0,
"batch_square":1e4})
inference_param = [ self._name + '.batch_size', self._name + '.batch_sum', self._name + '.batch_square_sum' ]
return bn, {'inference_param' : { 'name':'summary', 'params': inference_param, 'table_id': summary_layer.get('table_id', -1)}}
"batch_size": 1e4, "batch_sum_default": 0.0, "batch_square": 1e4})
inference_param = [self._name + '.batch_size', self._name + '.batch_sum', self._name + '.batch_square_sum']
return bn, {'inference_param' : {'name':'summary', 'params': inference_param, 'table_id': summary_layer.get('table_id', -1)}}
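# The three data_norm statistics registered above (<name>.batch_size,
# <name>.batch_sum, <name>.batch_square_sum) are exported as 'summary'
# params so the ps-server table given by table_id can aggregate them.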
class NeuralLayer(Layer):
"""R
"""
def __init__(self, config):
"""R
"""
self._name = config['name']
self._param = config['param']
self._input = config['input']
@@ -124,16 +183,19 @@ class NeuralLayer(Layer):
self._act_func = config.get('act_func', None)
def generate_fluid(self, param):
"""R
"""
param_layer = param['layer'][self._param]
input_layer = param['layer'][self._input[0]]
if len(self._input) > 0:
input_list=[param['layer'][i] for i in self._input]
input_layer = fluid.layers.concat(input=input_list, axis=1)
input_coln = input_layer.shape[1]
scale = param_layer['init_range'] / (input_coln ** 0.5)
bias = None
if self._bias:
bias = fluid.ParamAttr(learning_rate=1.0,
initializer=fluid.initializer.NormalInitializer(loc=0.0, scale=scale))
fc = fluid.layers.fc(
name = self._name,
input = input_layer,
@@ -146,8 +208,13 @@ class SigmoidLossLayer(Layer):
inference_param = [self._name + '.w_0', self._name + '.b_0']
return fc, {'inference_param' : {'name':'param', 'params': inference_param, 'table_id': param_layer.get('table_id', -1)}}
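# The init scale shrinks with fan-in: scale = init_range / sqrt(input_coln),
# e.g. (hypothetical numbers) init_range=0.2 over 400 input columns gives
# scale = 0.2 / 20 = 0.01 for the NormalInitializer above.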
class SigmoidLossLayer(Layer):
"""R
"""
def __init__(self, config):
"""R
"""
self._name = config['name']
self._label = config['label']
self._input = config['input']
@@ -155,28 +222,30 @@ class SigmoidLossLayer(Layer):
self._metric_label = config.get('metric_label', None)
self._bound = config.get('bound', [-15.0, 15.0])
self._extend_output = {
'metric_label': self._metric_label,
'metric_dict': {
'auc': {'var': None},
'batch_auc': {'var': None},
'stat_pos': {'var': None, 'data_type': 'int64'},
'stat_neg': {'var': None, 'data_type': 'int64'},
'batch_stat_pos': {'var': None, 'data_type': 'int64'},
'batch_stat_neg': {'var': None, 'data_type': 'int64'},
'pos_ins_num': {'var': None},
'abserr': {'var': None},
'sqrerr': {'var': None},
'prob': {'var': None},
'total_ins_num': {'var': None},
'q': {'var': None}
}
}
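# Every metric slot starts with var=None; generate_fluid below fills the
# vars in once the corresponding ops (auc, log_loss, ...) are created.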
def generate_fluid(self, param):
"""R
"""
input_layer = param['layer'][self._input[0]]
label_layer = param['layer'][self._label]
output = fluid.layers.clip(input_layer, self._bound[0], self._bound[1], name=self._name)
norm = fluid.layers.sigmoid(output, name=self._name)
output = fluid.layers.log_loss(norm, fluid.layers.cast(x=label_layer, dtype='float32'))
if self._weight:
@@ -191,7 +260,8 @@ class SigmoidLossLayer(Layer):
input=[fluid.layers.elementwise_sub(fluid.layers.ceil(norm), norm), norm], axis=1)
metric['auc']['var'], metric['batch_auc']['var'], [metric['batch_stat_pos']['var'], \
metric['batch_stat_neg']['var'], metric['stat_pos']['var'], metric['stat_neg']['var']] = \
fluid.layers.auc(input=binary_predict, label=fluid.layers.cast(x=label_layer, dtype='int64'), \
curve='ROC', num_thresholds=32)
metric['sqrerr']['var'], metric['abserr']['var'], metric['prob']['var'], metric['q']['var'], \
metric['pos_ins_num']['var'], metric['total_ins_num']['var'] = \
......
"""
Model Net: analyze the layer config and parse it into a Paddle Program
"""
import abc
import copy
import yaml
import kagle.kagle_layer
import kagle.kagle_table
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
def create(config):
"""
Create a model instance by config
Args:
config(dict): describes the model type and net
Return:
Model Instance
"""
model = None
if config['mode'] == 'fluid':
model = FluidModel(config)
model.build_model()
return model
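# Minimal usage sketch (hypothetical config values):
#   model = create({'mode': 'fluid', 'name': 'ctr_dnn', 'layer_file': 'net.yaml'})
#   cost, metrics = model.get_cost_op(), model.get_metrics()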
class Model(object):
"""R
"""
__metaclass__ = abc.ABCMeta
def __init__(self, config):
"""R
"""
self._config = config
self._name = config['name']
f = open(config['layer_file'], 'r')
@@ -30,31 +45,52 @@ class Model(object):
pass
def get_cost_op(self):
"""R
"""
return self._cost
def get_metrics(self):
"""R
"""
return self._metrics
@abc.abstractmethod
def shrink(self, params):
"""R
"""
pass
@abc.abstractmethod
def build_model(self):
"""R
"""
pass
@abc.abstractmethod
def dump_model_program(self, path):
"""R
"""
pass
@abc.abstractmethod
def dump_inference_param(self, params):
"""R
"""
pass
@abc.abstractmethod
def dump_inference_program(self, inference_layer, path):
"""R
"""
pass
def inference_params(self, inference_layer):
"""
get param names for inference_layer
Args:
inference_layer(str): layer for inference
Return:
params(list): list of param names for the inference layer
"""
layer = inference_layer
if layer in self._inference_meta['params']:
return self._inference_meta['params'][layer]
@@ -69,6 +105,13 @@ class Model(object):
return self._inference_meta['params'][layer]
def get_dependency(self, layer_graph, dest_layer):
"""
get the layers that dest_layer depends on
Args:
layer_graph(dict): all layers in graph
dest_layer(str): name of the target layer
Return:
depend_layers(list): sub-graph layers needed to calculate dest_layer
"""
dependency_list = []
if dest_layer in layer_graph:
dependencys = copy.deepcopy(layer_graph[dest_layer]['input'])
@@ -79,11 +122,24 @@ class Model(object):
class FluidModel(Model):
"""R
"""
def __init__(self, config):
"""R
"""
Model.__init__(self, config)
pass
def build_model(self):
"""R
build a fluid model with config
Return:
modle_instance(dict)
train_program
startup_program
inference_param : all params name list
table: table-meta to ps-server
"""
for layer in self._build_nodes['layer']:
self._build_param['inner_layer'][layer['name']] = layer
@@ -91,7 +147,8 @@ class FluidModel(Model):
self._build_param['table'] = {}
self._build_param['model']['train_program'] = fluid.Program()
self._build_param['model']['startup_program'] = fluid.Program()
with fluid.program_guard(self._build_param['model']['train_program'], \
self._build_param['model']['startup_program']):
with fluid.unique_name.guard():
for phase in self._build_phase:
if self._build_nodes[phase] is None:
@@ -114,16 +171,19 @@ class FluidModel(Model):
self._metrics[extend_output['metric_label']] = extend_output['metric_dict']
if 'inference_param' in extend_output:
inference_param = extend_output['inference_param']
param_name = inference_param['name']
if param_name not in self._build_param['table']:
self._build_param['table'][param_name] = {'params':[]}
table_meta = kagle_table.TableMeta.alloc_new_table(inference_param['table_id'])
self._build_param['table'][param_name]['_meta'] = table_meta
self._build_param['table'][param_name]['params'] += inference_param['params']
pass
@classmethod
def build_optimizer(self, params):
"""R
"""
optimizer_conf = params['optimizer_conf']
strategy = None
if 'strategy' in optimizer_conf:
@@ -134,12 +194,15 @@ class FluidModel(Model):
model_metrics = metrics[name]
stat_var_names += [ model_metrics[metric]['var'].name for metric in model_metrics]
strategy['stat_var_names'] = list(set(stat_var_names))
optimizer_generator = 'optimizer = fluid.optimizer.' + optimizer_conf['class'] + \
'(learning_rate=' + str(optimizer_conf['learning_rate']) + ')'
exec(optimizer_generator)
optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
return optimizer
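# The exec() above builds the optimizer from config text; e.g. a hypothetical
#   optimizer_conf = {'class': 'Adam', 'learning_rate': 0.01}
# expands to "optimizer = fluid.optimizer.Adam(learning_rate=0.01)", which is
# then wrapped by fleet.distributed_optimizer for distributed training.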
def dump_model_program(self, path):
"""R
"""
with open(path + '/' + self._name + '_main_program.pbtxt', "w") as fout:
print >> fout, self._build_param['model']['train_program']
with open(path + '/' + self._name + '_startup_program.pbtxt', "w") as fout:
@@ -147,6 +210,8 @@ class FluidModel(Model):
pass
def shrink(self, params):
"""R
"""
scope = params['scope']
decay = params['decay']
for param_table in self._build_param['table']:
@@ -154,9 +219,13 @@ class FluidModel(Model):
fleet.shrink_dense_table(decay, scope=scope, table_id=table_id)
def dump_inference_program(self, inference_layer, path):
"""R
"""
pass
def dump_inference_param(self, params):
"""R
"""
scope = params['scope']
executor = params['executor']
program = self._build_param['model']['train_program']
......