提交 bd37f473 编写于 作者: H Hongsheng Zeng 提交者: Bo Zhou

Provide synchronizable create_parameter in PARL (#38)

* Provide synchronizable create_parameter in PARL

* use AttrHold to make LayerFunc support more than two parameters

* refine code

* refine code

* fix #25 
上级 53c94787
......@@ -65,9 +65,9 @@ class Network(object):
self._cached_sync_params_program = fluid.Program()
with fluid.program_guard(self._cached_sync_params_program):
for (src_var_name, target_var_name, is_bias) in param_pairs:
src_var = fetch_framework_var(src_var_name, is_bias)
target_var = fetch_framework_var(target_var_name, is_bias)
for (src_var_name, target_var_name) in param_pairs:
src_var = fetch_framework_var(src_var_name)
target_var = fetch_framework_var(target_var_name)
fluid.layers.assign(
decay * target_var + (1 - decay) * src_var, target_var)
......
......@@ -48,6 +48,29 @@ class TestModel(Model):
return out
class TestModel2(Model):
def __init__(self):
self.created_param = layers.create_parameter(
shape=[100],
dtype='float32',
default_initializer=fluid.initializer.Uniform(low=-1.0, high=1.0))
def predict(self, obs):
out = obs + self.created_param()
return out
class TestModel3(Model):
def __init__(self):
self.fc1 = layers.fc(64, bias_attr=False)
self.batch_norm = layers.batch_norm()
def predict(self, obs):
hid1 = self.fc1(obs)
out = self.batch_norm(hid1)
return out
class ModelBaseTest(unittest.TestCase):
def setUp(self):
self.model = TestModel()
......@@ -412,6 +435,91 @@ class ModelBaseTest(unittest.TestCase):
self.assertLess(float(np.abs(real_target_outputs - out_np)), 1e-5)
def test_sync_params_with_create_parameter(self):
model = TestModel2()
target_model = deepcopy(model)
pred_program = fluid.Program()
with fluid.program_guard(pred_program):
obs = layers.data(name='obs', shape=[100], dtype='float32')
model_output = model.predict(obs)
target_model_output = target_model.predict(obs)
self.executor.run(fluid.default_startup_program())
N = 10
random_obs = np.random.random(size=(N, 100)).astype('float32')
for i in range(N):
x = np.expand_dims(random_obs[i], axis=0)
outputs = self.executor.run(
pred_program,
feed={'obs': x},
fetch_list=[model_output, target_model_output])
self.assertNotEqual(
np.sum(outputs[0].flatten()), np.sum(outputs[1].flatten()))
model.sync_params_to(target_model)
random_obs = np.random.random(size=(N, 100)).astype('float32')
for i in range(N):
x = np.expand_dims(random_obs[i], axis=0)
outputs = self.executor.run(
pred_program,
feed={'obs': x},
fetch_list=[model_output, target_model_output])
self.assertEqual(
np.sum(outputs[0].flatten()), np.sum(outputs[1].flatten()))
def test_sync_params_with_batch_norm(self):
model = TestModel3()
target_model = deepcopy(model)
program1 = fluid.Program()
program2 = fluid.Program()
with fluid.program_guard(program1):
obs = layers.data(
name='obs', shape=[32, 128, 128], dtype="float32")
model_output = model.predict(obs)
loss = layers.reduce_mean(model_output)
optimizer = fluid.optimizer.AdamOptimizer(1e-3)
optimizer.minimize(loss)
with fluid.program_guard(program2):
obs = layers.data(
name='obs', shape=[32, 128, 128], dtype="float32")
model_output = model.predict(obs)
target_model_output = target_model.predict(obs)
self.executor.run(fluid.default_startup_program())
N = 10
random_obs = np.random.random(size=(N, 32, 128, 128)).astype('float32')
for i in range(N):
x = np.expand_dims(random_obs[i], axis=0)
outputs = self.executor.run(
program2,
feed={'obs': x},
fetch_list=[model_output, target_model_output])
self.assertNotEqual(
np.sum(outputs[0].flatten()), np.sum(outputs[1].flatten()))
# run optimizing to make parameters of batch_norm between model and target_model are different
N = 100
random_obs = np.random.random(size=(N, 32, 128, 128)).astype('float32')
for i in range(N):
x = np.expand_dims(random_obs[i], axis=0)
self.executor.run(program1, feed={'obs': x})
model.sync_params_to(target_model)
random_obs = np.random.random(size=(N, 32, 128, 128)).astype('float32')
for i in range(N):
x = np.expand_dims(random_obs[i], axis=0)
outputs = self.executor.run(
program2,
feed={'obs': x},
fetch_list=[model_output, target_model_output])
self.assertEqual(
np.sum(outputs[0].flatten()), np.sum(outputs[1].flatten()))
if __name__ == '__main__':
unittest.main()
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import six
from copy import deepcopy
__all__ = ['AttrHolder']
class AttrHolder(object):
""" Mainly used for maintaining all ParamAttr in a parl.layers.LayerFunc
"""
def __init__(self, **kwargs):
"""
Args:
kwargs: {name:attr}
"""
self._attrs_dict = {}
for k, v in six.iteritems(kwargs):
self._add_attr(k, v)
def _add_attr(self, name, attr):
assert name not in self._attrs_dict
self._attrs_dict[name] = attr
def __setattr__(self, name, attr):
if not name.startswith('_'):
self._add_attr(name, attr)
else:
# self._attrs_dict
super(AttrHolder, self).__setattr__(name, attr)
def __getattr__(self, name):
if name in self._attrs_dict.keys():
return self._attrs_dict[name]
else:
return None
def __deepcopy__(self, memo):
cls = self.__class__
result = cls.__new__(cls)
memo[id(self)] = result
for k, v in six.iteritems(self.__dict__):
setattr(result, k, deepcopy(v, memo))
return result
def sorted(self):
"""
Returns:
list of all attrs, which is sorted by key
"""
return [self._attrs_dict[k] for k in sorted(self._attrs_dict.keys())]
def tolist(self):
"""
Returns:
list of all attrs
"""
return list(six.itervalues(self._attrs_dict))
......@@ -26,6 +26,7 @@ from paddle.fluid.framework import Variable
from paddle.fluid.layers import *
from paddle.fluid.param_attr import ParamAttr
from parl.framework.model_base import Network
from parl.layers.attr_holder import AttrHolder
def update_attr_name(name, default_name, attr, is_bias):
......@@ -59,9 +60,8 @@ def update_attr_name(name, default_name, attr, is_bias):
class LayerFunc(object):
def __init__(self, param_attr=False, bias_attr=False):
self.param_attr = param_attr
self.bias_attr = bias_attr
def __init__(self, attr_holder):
self.attr_holder = attr_holder
def __deepcopy__(self, memo):
cls = self.__class__
......@@ -74,15 +74,16 @@ class LayerFunc(object):
for k, v in six.iteritems(self.__dict__):
setattr(copied, k, deepcopy(v, memo))
## then we need to create new para names for self.param_attr and self.bias_attr
## then we need to create new para names for param_attr in self.attr_holder
def create_new_para_name(attr):
if attr:
assert attr.name, "attr should have a name already!"
name_key = 'PARL_target_' + attr.name
attr.name = unique_name.generate(name_key)
create_new_para_name(copied.param_attr)
create_new_para_name(copied.bias_attr)
for attr in copied.attr_holder.tolist():
create_new_para_name(attr)
## We require the user to sync the parameter values later, because
## this deepcopy is supposed to be called only before the startup
## program. This function will cause the computation graph change, so
......@@ -91,18 +92,26 @@ class LayerFunc(object):
@property
def param_name(self):
if self.param_attr:
return self.param_attr.name
if self.attr_holder.param_attr:
return self.attr_holder.param_attr.name
else:
return None
@property
def bias_name(self):
if self.bias_attr:
return self.bias_attr.name
if self.attr_holder.bias_attr:
return self.attr_holder.bias_attr.name
else:
return None
@property
def all_params_names(self):
params_names = []
for attr in self.attr_holder.tolist():
if attr:
params_names.append(attr.name)
return params_names
def check_caller_name():
stack = inspect.stack()
......@@ -136,15 +145,16 @@ def fc(size,
class FC_(LayerFunc):
def __init__(self):
super(FC_, self).__init__(param_attr, bias_attr)
super(FC_, self).__init__(
AttrHolder(param_attr=param_attr, bias_attr=bias_attr))
def __call__(self, input, is_test=False):
return layers.fc(
input=input,
size=size,
num_flatten_dims=num_flatten_dims,
param_attr=self.param_attr,
bias_attr=self.bias_attr,
param_attr=self.attr_holder.param_attr,
bias_attr=self.attr_holder.bias_attr,
act=act,
is_test=is_test)
......@@ -166,7 +176,7 @@ def embedding(size,
class Embedding_(LayerFunc):
def __init__(self):
super(Embedding_, self).__init__(param_attr)
super(Embedding_, self).__init__(AttrHolder(param_attr=param_attr))
def __call__(self, input):
return layers.embedding(
......@@ -175,7 +185,7 @@ def embedding(size,
is_sparse=is_sparse,
is_distributed=is_distributed,
padding_idx=padding_idx,
param_attr=self.param_attr,
param_attr=self.attr_holder.param_attr,
dtype=dtype)
return Embedding_()
......@@ -201,7 +211,8 @@ def dynamic_lstm(size,
class DynamicLstm_(LayerFunc):
def __init__(self):
super(DynamicLstm_, self).__init__(param_attr, bias_attr)
super(DynamicLstm_, self).__init__(
AttrHolder(param_attr=param_attr, bias_attr=bias_attr))
def __call__(self, input, h_0=None, c_0=None):
return layers.dynamic_lstm(
......@@ -209,8 +220,8 @@ def dynamic_lstm(size,
h_0=h_0,
c_0=c_0,
size=size,
param_attr=self.param_attr,
bias_attr=self.bias_attr,
param_attr=self.attr_holder.param_attr,
bias_attr=self.attr_holder.bias_attr,
use_peepholes=use_peepholes,
is_reverse=is_reverse,
gate_activation=gate_activation,
......@@ -243,15 +254,16 @@ def dynamic_lstmp(size,
class DynamicLstmp_(LayerFunc):
def __init__(self):
super(DynamicLstmp_, self).__init__(param_attr, bias_attr)
super(DynamicLstmp_, self).__init__(
AttrHolder(param_attr=param_attr, bias_attr=bias_attr))
def __call__(self, input):
return layers.dynamic_lstmp(
input=input,
size=size,
proj_size=proj_size,
param_attr=self.param_attr,
bias_attr=self.bias_attr,
param_attr=self.attr_holder.param_attr,
bias_attr=self.attr_holder.bias_attr,
use_peepholes=use_peepholes,
is_reverse=is_reverse,
gate_activation=gate_activation,
......@@ -280,14 +292,15 @@ def dynamic_gru(size,
class DynamicGru_(LayerFunc):
def __init__(self):
super(DynamicGru_, self).__init__(param_attr, bias_attr)
super(DynamicGru_, self).__init__(
AttrHolder(param_attr=param_attr, bias_attr=bias_attr))
def __call__(self, input, h_0=None):
return layers.dynamic_gru(
input=input,
size=size,
param_attr=self.param_attr,
bias_attr=self.bias_attr,
param_attr=self.attr_holder.param_attr,
bias_attr=self.attr_holder.bias_attr,
is_reverse=is_reverse,
gate_activation=gate_activation,
candidate_activation=candidate_activation,
......@@ -329,7 +342,8 @@ def sequence_conv(num_filters,
class SequenceConv_(LayerFunc):
def __init__(self):
super(SequenceConv_, self).__init__(param_attr, bias_attr)
super(SequenceConv_, self).__init__(
AttrHolder(param_attr=param_attr, bias_attr=bias_attr))
def __call__(self, input):
return layers.sequence_conv(
......@@ -338,8 +352,8 @@ def sequence_conv(num_filters,
filter_size=filter_size,
filter_stride=filter_stride,
padding=padding,
bias_attr=self.bias_attr,
param_attr=self.param_attr,
bias_attr=self.attr_holder.bias_attr,
param_attr=self.attr_holder.param_attr,
act=act)
return SequenceConv_()
......@@ -366,7 +380,8 @@ def conv2d(num_filters,
class Conv2D_(LayerFunc):
def __init__(self):
super(Conv2D_, self).__init__(param_attr, bias_attr)
super(Conv2D_, self).__init__(
AttrHolder(param_attr=param_attr, bias_attr=bias_attr))
def __call__(self, input):
return layers.conv2d(
......@@ -377,8 +392,8 @@ def conv2d(num_filters,
padding=padding,
dilation=dilation,
groups=groups,
param_attr=self.param_attr,
bias_attr=self.bias_attr,
param_attr=self.attr_holder.param_attr,
bias_attr=self.attr_holder.bias_attr,
use_cudnn=use_cudnn,
act=act)
......@@ -406,7 +421,8 @@ def conv2d_transpose(num_filters,
class Conv2DTranspose_(LayerFunc):
def __init__(self):
super(Conv2DTranspose_, self).__init__(param_attr, bias_attr)
super(Conv2DTranspose_, self).__init__(
AttrHolder(param_attr=param_attr, bias_attr=bias_attr))
def __call__(self, input):
return layers.conv2d_transpose(
......@@ -417,8 +433,8 @@ def conv2d_transpose(num_filters,
padding=padding,
stride=stride,
dilation=dilation,
param_attr=self.param_attr,
bias_attr=self.bias_attr,
param_attr=self.attr_holder.param_attr,
bias_attr=self.attr_holder.bias_attr,
use_cudnn=use_cudnn,
act=act)
......@@ -436,7 +452,8 @@ def lstm_unit(forget_bias=0.0, param_attr=None, bias_attr=None, name=None):
class LstmUnit_(LayerFunc):
def __init__(self):
super(LstmUnit_, self).__init__(param_attr, bias_attr)
super(LstmUnit_, self).__init__(
AttrHolder(param_attr=param_attr, bias_attr=bias_attr))
def __call__(self, x_t, hidden_t_prev, cell_t_prev):
return layers.lstm_unit(
......@@ -444,7 +461,7 @@ def lstm_unit(forget_bias=0.0, param_attr=None, bias_attr=None, name=None):
hidden_t_prev=hidden_t_prev,
cell_t_prev=cell_t_prev,
forget_bias=forget_bias,
param_attr=self.param_attr,
param_attr=self.attr_holder.param_attr,
bias_attr=self.bias_attr)
return LstmUnit_()
......@@ -463,13 +480,13 @@ def row_conv(future_context_size, param_attr=None, act=None, name=None):
class RowConv_(LayerFunc):
def __init__(self):
super(RowConv_, self).__init__(param_attr)
super(RowConv_, self).__init__(AttrHolder(param_attr=param_attr))
def __call__(self, input):
return layers.row_conv(
input=input,
future_context_size=future_context_size,
param_attr=self.param_attr,
param_attr=self.attr_holder.param_attr,
act=act)
return RowConv_()
......@@ -479,27 +496,85 @@ def layer_norm(**kwargs):
raise NotImplementedError()
def create_persistable_variable(shape,
def batch_norm(act=None,
momentum=0.9,
epsilon=1e-05,
param_attr=None,
bias_attr=None,
data_layout='NCHW',
in_place=False,
name=None,
moving_mean_name=None,
moving_variance_name=None,
do_model_average_for_mean_and_var=False,
fuse_with_relu=False):
"""
Return a function that creates a paddle.fluid.layers.batch_norm.
"""
default_name = "batch_norm"
param_attr = update_attr_name(name, default_name, param_attr, False)
bias_attr = update_attr_name(name, default_name, bias_attr, True)
moving_mean_attr = update_attr_name(name, default_name + "_moving_mean",
None, False)
moving_variance_attr = update_attr_name(
name, default_name + "_moving_variance", None, False)
check_caller_name()
class BatchNorm_(LayerFunc):
def __init__(self):
super(BatchNorm_, self).__init__(
AttrHolder(
param_attr=param_attr,
bias_attr=bias_attr,
moving_mean_attr=moving_mean_attr,
moving_variance_attr=moving_variance_attr))
def __call__(self, input, is_test=False):
return layers.batch_norm(
input=input,
act=act,
is_test=is_test,
momentum=momentum,
epsilon=epsilon,
param_attr=self.attr_holder.param_attr,
bias_attr=self.attr_holder.bias_attr,
data_layout=data_layout,
in_place=in_place,
name=name,
moving_mean_name=self.attr_holder.moving_mean_attr.name,
moving_variance_name=self.attr_holder.moving_variance_attr.
name,
do_model_average_for_mean_and_var=
do_model_average_for_mean_and_var,
fuse_with_relu=fuse_with_relu)
return BatchNorm_()
def create_parameter(shape,
dtype,
name=None,
attr=None,
is_bias=False,
default_initializer=None):
"""
Return a function that creates a parameter which cannot be synchronized like those of layers
Return a function that creates a paddle.fluid.layers.create_parameter.
This function can be called in Algorithm, so we don't check the caller nor require that
the variable can be copied.
"""
default_name = "per_var"
attr = update_attr_name(name, default_name, attr, is_bias)
param_attr = update_attr_name(name, "create_parameter", attr, False)
check_caller_name()
class CreateParameter_(LayerFunc):
def __init__(self):
super(CreateParameter_, self).__init__(
AttrHolder(param_attr=param_attr))
class CreateParameter_(object):
def __call__(self):
return layers.create_parameter(
shape=shape,
dtype=dtype,
attr=attr,
attr=self.attr_holder.param_attr,
is_bias=is_bias,
default_initializer=default_initializer)
......
......@@ -32,9 +32,11 @@ class MyNetWork(Network):
self.conv2d = layers.conv2d(
num_filters=64,
filter_size=3,
param_attr=self.embedding.param_attr,
param_attr=self.embedding.attr_holder.param_attr,
name="my_conv2d")
self.batch_norm = layers.batch_norm()
class TestParamName(unittest.TestCase):
def test_name_number(self):
......@@ -65,6 +67,13 @@ class TestParamName(unittest.TestCase):
self.assertEqual(net.conv2d.param_name, "embedding.w_0")
self.assertEqual(net.conv2d.bias_name, "my_conv2d.b_0")
self.assertSetEqual(
set(net.batch_norm.all_params_names),
set([
'batch_norm.w_0', 'batch_norm.b_0',
'batch_norm_moving_mean.w_0', 'batch_norm_moving_variance.w_0'
]))
if __name__ == '__main__':
unittest.main()
......@@ -25,8 +25,13 @@ class MyNetWork(Network):
self.fc2 = layers.fc(64, bias_attr=False)
self.fc3 = layers.fc(64, name="fc")
self.fc4 = layers.fc(64, name="fc")
self.embedding = layers.embedding((100, 64),
param_attr=self.fc1.param_attr)
self.embedding = layers.embedding(
(100, 64), param_attr=self.fc1.attr_holder.param_attr)
self.created_param = layers.create_parameter(
shape=[100],
dtype='float32',
default_initializer=fluid.initializer.Uniform(low=-1.0, high=1.0))
self.batch_norm = layers.batch_norm()
class TestParamSharing(unittest.TestCase):
......@@ -97,6 +102,62 @@ class TestParamSharing(unittest.TestCase):
self.assertEqual(
np.sum(outputs[2].flatten()), np.sum(old_y1.flatten()))
def test_param_sharing_with_create_parameter(self):
"""
Test case for parameter sharing of create_parameter op
"""
net = MyNetWork()
main_program1 = fluid.Program()
with fluid.program_guard(main_program1):
x = layers.data(name='x', shape=[100], dtype="float32")
out1 = x + net.created_param()
main_program2 = fluid.Program()
with fluid.program_guard(main_program2):
x = layers.data(name='x', shape=[100], dtype="float32")
out2 = x + net.created_param()
place = fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
input_np = np.random.uniform(0, 1, [1, 100]).astype("float32")
out1_np = exe.run(
main_program1, feed={"x": input_np}, fetch_list=[out1])[0]
out2_np = exe.run(
main_program2, feed={"x": input_np}, fetch_list=[out2])[0]
self.assertEqual(np.sum(out1_np.flatten()), np.sum(out2_np.flatten()))
def test_param_sharing_with_batch_norm(self):
"""
Test case for batch_norm layer
"""
net = MyNetWork()
main_program1 = fluid.Program()
with fluid.program_guard(main_program1):
x = layers.data(name='x', shape=[32, 128, 128], dtype="float32")
hid1 = net.fc1(x)
out1 = net.batch_norm(hid1)
main_program2 = fluid.Program()
with fluid.program_guard(main_program2):
x = layers.data(name='x', shape=[32, 128, 128], dtype="float32")
hid1 = net.fc1(x)
out2 = net.batch_norm(hid1)
place = fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
input_np = np.random.uniform(0, 1, [1, 32, 128, 128]).astype("float32")
out1_np = exe.run(
main_program1, feed={"x": input_np}, fetch_list=[out1])[0]
out2_np = exe.run(
main_program2, feed={"x": input_np}, fetch_list=[out2])[0]
self.assertEqual(np.sum(out1_np.flatten()), np.sum(out2_np.flatten()))
if __name__ == "__main__":
unittest.main()
......@@ -26,13 +26,12 @@ __all__ = [
]
def fetch_framework_var(attr_name, is_bias):
def fetch_framework_var(attr_name):
""" Fetch framework variable according given attr_name.
Return a new reusing variable through create_parameter way
Args:
attr_name: string, attr name of parameter
is_bias: bool, decide whether the parameter is bias
Returns:
framework_var: framework.Varialbe
......@@ -42,10 +41,7 @@ def fetch_framework_var(attr_name, is_bias):
core_var = scope.find_var(attr_name)
shape = core_var.get_tensor().shape()
framework_var = fluid.layers.create_parameter(
shape=shape,
dtype='float32',
attr=fluid.ParamAttr(name=attr_name),
is_bias=is_bias)
shape=shape, dtype='float32', attr=fluid.ParamAttr(name=attr_name))
return framework_var
......@@ -82,11 +78,14 @@ def get_parameter_pairs(src, target):
target_var = getattr(target, attr)
param_pairs.extend(get_parameter_pairs(src_var, target_var))
elif isinstance(src, LayerFunc):
param_pairs.append((src.param_attr.name, target.param_attr.name,
False))
if src.bias_attr:
param_pairs.append((src.bias_attr.name, target.bias_attr.name,
True))
src_attrs = src.attr_holder.sorted()
target_attrs = target.attr_holder.sorted()
assert len(src_attrs) == len(target_attrs), \
"number of ParamAttr between source layer and target layer should be same."
for (src_attr, target_attr) in zip(src_attrs, target_attrs):
if src_attr:
assert target_attr, "ParamAttr between source layer and target layer is inconsistent."
param_pairs.append((src_attr.name, target_attr.name))
elif isinstance(src, tuple) or isinstance(src, list) or isinstance(
src, set):
for src_var, target_var in zip(src, target):
......@@ -118,9 +117,9 @@ def get_parameter_names(obj):
if isinstance(val, Network):
parameter_names.extend(get_parameter_names(val))
elif isinstance(val, LayerFunc):
parameter_names.append(val.param_name)
if val.bias_name is not None:
parameter_names.append(val.bias_name)
for attr in val.attr_holder.tolist():
if attr:
parameter_names.append(attr.name)
elif isinstance(val, tuple) or isinstance(val, list) or isinstance(
val, set):
for x in val:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册