提交 094d29aa 编写于 作者: Y Yu Yang

Draft for parameters

上级 844d0620
......@@ -23,29 +23,32 @@ def network_config():
outputs(cost)
def event_handler(event):
if isinstance(event, paddle.trainer.CompleteTrainOneBatch):
print "Pass %d, Batch %d, Cost %f" % (event.pass_id, event.batch_id,
event.cost)
else:
pass
def main():
paddle.init(use_gpu=False, trainer_count=1)
model_config = parse_network_config(network_config)
pool = paddle.parameters.create(model_config)
for param_name in pool.get_names():
array = pool.get_parameter(param_name)
parameters = paddle.parameters.create(model_config)
for param_name in parameters.keys():
array = parameters[param_name]
array[:] = numpy.random.uniform(low=-1.0, high=1.0, size=array.shape)
parameters[param_name] = array
adam_optimizer = paddle.optimizer.Adam(learning_rate=1e-3)
adam_optimizer = paddle.optimizer.Optimizer(
learning_rate=0.01, learning_method=AdamOptimizer())
def event_handler(event):
if isinstance(event, paddle.trainer.CompleteTrainOneBatch):
para = parameters['___fc_layer_2__.w0']
print "Pass %d, Batch %d, Cost %f, Weight Mean Of Fc 2 is %f" % (
event.pass_id, event.batch_id, event.cost, para.mean())
else:
pass
trainer = paddle.trainer.SGDTrainer(update_equation=adam_optimizer)
trainer.train(train_data_reader=train_reader,
topology=model_config,
parameters=pool,
parameters=parameters,
event_handler=event_handler,
batch_size=32, # batch size should be refactor in Data reader
data_types={ # data_types will be removed, It should be in
......
......@@ -2,110 +2,144 @@ import numpy as np
from paddle.proto.ModelConfig_pb2 import ModelConfig
from paddle.proto.ParameterConfig_pb2 import ParameterConfig
import py_paddle.swig_paddle as api
__all__ = ['IParameterPool', 'create', 'ParameterFlag']
__all__ = ['Parameters', 'create']
class ParameterFlag(object):
"""
The flag for IParameterPool.get_parameter. If writeable, operation on return
numpy array will also apply to Paddle parameter. But it will be slower in
GPU mode.
def create(*topologies):
"""
READ_ONLY = 0x01
WRITE_ONLY = 0x02
READ_WRITE = READ_ONLY | WRITE_ONLY
Create parameter pool by topologies.
class IParameterPool(object):
:param topologies:
:return:
"""
Interface of Parameter Pool. The parameter pool is a dictionary of
parameters. User can modify parameter or customize parameter value
by `get_parameter`.
.. code-block:: python
pool = Parameters()
for topo in topologies:
if not isinstance(topo, ModelConfig):
raise ValueError(
'create must pass a topologies which type is ModelConfig')
pool = paddle.parameters.create(topo1, topo2)
for param in topo.parameters:
pool.append_config(param)
return pool
embedding = pool.get_parameter("embedding")
assert isinstance(embedding, numpy.ndarray)
print embedding[1:]
"""
def get_parameter(self, name, flag=ParameterFlag.READ_WRITE):
"""
Get a parameter by name.
:param name: parameter name.
:type name: basestring
:param flag: the flag for return value. readable or writable.
:type flag: int
:return: The parameter value
:rtype: np.ndarray
class Parameters(object):
def __init__(self):
self.__param_conf__ = dict()
self.__gradient_machines__ = []
self.__tmp_params__ = []
def append_config(self, param_conf):
if not isinstance(param_conf, ParameterConfig):
raise ValueError("param_conf must be paddle.proto.ParameterConfig")
if param_conf.name in self.__param_conf__:
raise ValueError("duplicated parameter %s" % param_conf.name)
self.__param_conf__[param_conf.name] = param_conf
def keys(self):
return self.__param_conf__.keys()
def names(self):
return self.keys()
def has_key(self, key):
return key in self.__param_conf__.keys()
def __getitem__(self, key):
shape = self.get_shape(key)
if len(self.__gradient_machines__) == 0:
# create new parameter in python numpy.
return np.ndarray(shape=shape, dtype=np.float32)
else:
for each_gradient_machine in self.__gradient_machines__:
param = __get_parameter_in_gradient_machine__(
each_gradient_machine, key)
# for simplify implementation now, we always copy from C++
assert isinstance(param, api.Parameter)
val = param.getBuf(api.PARAMETER_VALUE)
assert isinstance(val, api.Vector)
return val.copyToNumpyArray().reshape(shape=shape)
# else continue
raise RuntimeError("Unexpected branch")
def get_shape(self, key):
if not isinstance(key, basestring):
raise ValueError("parameter name should be string")
if not self.has_key(key):
raise ValueError("No such parameter %s" % key)
conf = self.__param_conf__[key]
return map(int, conf.dims)
def __setitem__(self, key, value):
if not isinstance(value, np.ndarray):
raise ValueError("Must return ndarray")
value = value.astype(dtype=np.float32)
shape = self.get_shape(key)
if not reduce(lambda a, b: a and b,
map(lambda x: x[0] == x[1], zip(value.shape, shape))):
raise ValueError("Value shape mismatch, expect %s, should %s" %
(shape, value.shape))
if len(self.__gradient_machines__) == 0:
self.__tmp_params__.append((key, value))
else:
for each_gradient_machine in self.__gradient_machines__:
__copy_parameter_to_gradient_machine__(each_gradient_machine,
key, value)
def append_gradient_machine(self, gradient_machine):
if not isinstance(gradient_machine, api.GradientMachine):
raise ValueError("gradient_machine should be api.GradientMachine")
if len(self.__tmp_params__) != 0:
for name, val in self.__tmp_params__:
try:
__copy_parameter_to_gradient_machine__(gradient_machine,
name, val)
except ValueError:
# If no such parameter in gradient machine, then don't copy
pass
def __get_parameter_in_gradient_machine__(gradient_machine, name):
"""
raise NotImplementedError()
def get_names(self):
"""
Get all parameter names
:return: all parameter names
:rtype: list
:param gradient_machine:
:type gradient_machine: api.GradientMachine
:param name:
:return:
:rtype: api.Parameter
"""
raise NotImplementedError()
params = filter(lambda p: p.getName() == name,
gradient_machine.getParameters())
class NumpyParameterPool(IParameterPool):
def __init__(self):
self.__param_configs__ = dict()
self.__params__ = dict()
def append(self, conf):
if not isinstance(conf, ParameterConfig):
raise ValueError("conf must be ParameterConfig")
if not conf.IsInitialized():
raise ValueError("conf is not initialized")
self.__param_configs__[conf.name] = conf
self.__params__[conf.name] = None
def get_config(self, name):
if name not in self.__param_configs__:
raise ValueError("parameter %s is not appended" % name)
if len(params) == 0:
raise ValueError("No such parameter")
elif len(params) > 1:
raise ValueError("Unexpected branch")
else:
return params[0]
return self.__param_configs__[name]
def get_parameter(self, name, *args, **kwargs):
if name not in self.__params__:
raise ValueError("parameter %s is not appended" % name)
param = self.__params__[name]
if param is None:
shape = self.__param_configs__[name].dims
if len(shape) == 0:
raise ValueError("parameter %s is no shape" % name)
param = np.ndarray(
shape=[int(item) for item in shape], dtype='float32')
self.__params__[name] = param
return param
def get_names(self):
return self.__param_configs__.keys()
def create(*topologies):
def __copy_parameter_to_gradient_machine__(gradient_machine, name, arr):
"""
Create parameter pool by topologies.
Copy a python ndarray into the gradient machine.
:param topologies:
:param gradient_machine:
:type gradient_machine: api.GradientMachine
:param name:
:param arr:
:type arr: np.ndarray
:return:
:rtype: api.Parameter
"""
pool = NumpyParameterPool()
for topo in topologies:
if not isinstance(topo, ModelConfig):
raise ValueError(
'create must pass a topologies which type is ModelConfig')
for param in topo.parameters:
pool.append(param)
return pool
param = __get_parameter_in_gradient_machine__(gradient_machine, name)
vec = param.getBuf(api.PARAMETER_VALUE)
assert isinstance(vec, api.Vector)
vec.copyFromNumpyArray(arr.flatten())
import collections
from paddle.proto.ModelConfig_pb2 import ModelConfig
from paddle.proto.ParameterConfig_pb2 import ParameterConfig
from . import parameters as v2_parameters
from . import optimizer as v2_optimizer
import py_paddle.swig_paddle as api
from py_paddle import DataProviderConverter
from paddle.proto.ModelConfig_pb2 import ModelConfig
from . import optimizer as v2_optimizer
from . import parameters as v2_parameters
__all__ = ['ITrainer', 'SGDTrainer', 'CompleteTrainOneBatch', 'BaseEvent']
......@@ -21,11 +22,10 @@ class CompleteTrainOneBatch(BaseEvent):
Event On One Batch Training Complete.
"""
def __init__(self, pass_id, batch_id, cost, parameters):
def __init__(self, pass_id, batch_id, cost):
self.pass_id = pass_id
self.batch_id = batch_id
self.cost = cost
self.parameters = parameters
def default_event_handler(event):
......@@ -42,57 +42,6 @@ class ITrainer(object):
raise NotImplementedError()
class LazyParameterPool(v2_parameters.IParameterPool):
"""
Lazy Parameter Pool stores a reference to GradientMachine. User could invoke
`get_parameter` if needed, but the operation is lazy. It means the parameter
will only fetched from GPU or Parameter Server if `get_parameter` is
invoked. Also, set flag = writable will make a extra host2device copy after
reading/modifying parameter.
This class is not exposed to User. User should treat this class as a normal
IParameterPool.
See IParameterPool for usage documentation.
:type __gradient_machine__: api.GradientMachine
"""
def get_parameter(self, name, flag=v2_parameters.ParameterFlag.READ_WRITE):
param = filter(lambda x: x.getName() == name,
self.__gradient_machine__.getParameters())
if len(param) == 0:
raise ValueError("Cannot found parameter with name %s" % name)
elif len(param) > 1:
raise RuntimeError("Unexpected branch")
else:
conf = param[0].getConfig().toProto()
param = param[0].getBuf(api.PARAMETER_VALUE)
assert isinstance(param, api.Vector)
assert isinstance(conf, ParameterConfig)
shape = map(int, conf.dims)
if api.isUsingGpu():
arr = param.copyToNumpyArray().reshape(shape)
if flag & v2_parameters.ParameterFlag.WRITE_ONLY:
self.need_copy = True
self.arrays[name] = arr
else:
arr = param.toNumpyArrayInplace().reshape(shape)
return arr
def get_names(self):
return [
param.getName()
for param in self.__gradient_machine__.getParameters()
]
def __init__(self, gradient_machine):
self.__gradient_machine__ = gradient_machine
self.need_copy = False
self.arrays = dict()
class SGDTrainer(ITrainer):
def __init__(self, update_equation):
"""
......@@ -137,7 +86,7 @@ class SGDTrainer(ITrainer):
gm = api.GradientMachine.createFromConfigProto(
topology, api.CREATE_MODE_NORMAL, self.__optimizer__.enable_types())
assert isinstance(gm, api.GradientMachine)
__copy_parameter_from_pool__(gm, parameters)
parameters.append_gradient_machine(gm)
updater = self.__optimizer__.create_local_updater()
updater.init(gm)
......@@ -167,16 +116,9 @@ class SGDTrainer(ITrainer):
cost_vec = cost_vec.copyToNumpyMat()
cost = cost_vec.sum() / len(data_batch)
updater.finishBatch(cost)
pool = LazyParameterPool(gradient_machine=gm)
event_handler(
CompleteTrainOneBatch(
pass_id=pass_id,
batch_id=batch_id,
cost=cost,
parameters=pool))
if pool.need_copy:
__copy_parameter_from_lazy_pool__(gm, pool)
pass_id=pass_id, batch_id=batch_id, cost=cost))
updater.finishPass()
gm.finish()
......@@ -211,34 +153,6 @@ def __generator_to_batch__(generator, batch_size):
yield ret_val
def __copy_parameter_from_lazy_pool__(gm, pool):
assert isinstance(pool, LazyParameterPool)
for each_param_name in pool.arrays.keys():
param = filter(lambda x: x.getName() == each_param_name,
gm.getParameters())
assert len(param) == 1
param = param[0]
param.getBuf(api.PARAMETER_VALUE).copyFromNumpyArray(pool.arrays[
each_param_name].flatten().astype('float32'))
def __copy_parameter_from_pool__(gm, pool):
"""
:param gm:
:type gm: api.GradientMachine
:param pool:
:type pool: v2_parameters.IParameterPool
:return:
"""
assert isinstance(pool, v2_parameters.IParameterPool)
for each_param in gm.getParameters():
name = each_param.getName()
param = pool.get_parameter(name, v2_parameters.ParameterFlag.READ_ONLY)
each_param.getBuf(api.PARAMETER_VALUE).copyFromNumpyArray(param.flatten(
).astype('float32'))
def __check_train_args__(train_data_reader, topology, parameters,
test_data_reader, event_handler, **kwargs):
"""
......@@ -258,7 +172,7 @@ def __check_train_args__(train_data_reader, topology, parameters,
if not isinstance(topology, ModelConfig):
raise ValueError('topology should be a model config')
if not isinstance(parameters, v2_parameters.IParameterPool):
if not isinstance(parameters, v2_parameters.Parameters):
raise ValueError('parameters should be a parameter pool')
if not callable(event_handler):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册