提交 21b4d90a 编写于 作者: L Luo Tao

Merge branch 'develop' into anakin_test

......@@ -65,6 +65,7 @@ option(REPLACE_ENFORCE_GLOG "Replace PADDLE_ENFORCE with glog/CHECK for better d
option(WITH_ANAKIN "Compile with Anakin library" OFF)
option(WITH_GRPC "Use grpc as the default rpc framework" ${WITH_DISTRIBUTE})
option(WITH_BRPC_RDMA "Use brpc rdma as the rpc protocal" OFF)
option(WITH_INFERENCE "Compile fluid inference library" ON)
option(WITH_SYSTEM_BLAS "Use system blas library" OFF)
option(PY_VERSION "Compile PaddlePaddle with python3 support" ${PY_VERSION})
......
......@@ -264,6 +264,8 @@ function(cc_test TARGET_NAME)
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
if (${cc_test_SERIAL})
set_property(TEST ${TARGET_NAME} PROPERTY RUN_SERIAL 1)
set_property(TEST ${TARGET_NAME} PROPERTY ENVIRONMENT FLAGS_cpu_deterministic=true)
set_property(TEST ${TARGET_NAME} PROPERTY ENVIRONMENT FLAGS_init_allocated_mem=true)
set_property(TEST ${TARGET_NAME} PROPERTY ENVIRONMENT FLAGS_cudnn_deterministic=true)
endif()
......@@ -330,6 +332,8 @@ function(nv_test TARGET_NAME)
add_test(${TARGET_NAME} ${TARGET_NAME})
if (nv_test_SERIAL)
set_property(TEST ${TARGET_NAME} PROPERTY RUN_SERIAL 1)
set_property(TEST ${TARGET_NAME} PROPERTY ENVIRONMENT FLAGS_cpu_deterministic=true)
set_property(TEST ${TARGET_NAME} PROPERTY ENVIRONMENT FLAGS_init_allocated_mem=true)
set_property(TEST ${TARGET_NAME} PROPERTY ENVIRONMENT FLAGS_cudnn_deterministic=true)
endif()
......@@ -580,6 +584,7 @@ function(py_test TARGET_NAME)
cmake_parse_arguments(py_test "${options}" "${oneValueArgs}" "${multiValueArgs}" ${ARGN})
add_test(NAME ${TARGET_NAME}
COMMAND env FLAGS_init_allocated_mem=true FLAGS_cudnn_deterministic=true
FLAGS_cpu_deterministic=true
PYTHONPATH=${PADDLE_BINARY_DIR}/python ${py_test_ENVS}
${PYTHON_EXECUTABLE} -u ${py_test_SRCS} ${py_test_ARGS}
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
......
......@@ -5,5 +5,7 @@ add_subdirectory(operators)
add_subdirectory(pybind)
add_subdirectory(string)
add_subdirectory(recordio)
# NOTE: please add subdirectory inference at last.
add_subdirectory(inference)
if(WITH_INFERENCE)
# NOTE: please add subdirectory inference at last.
add_subdirectory(inference)
endif()
......@@ -21,6 +21,26 @@ namespace framework {
namespace details {
struct BuildStrategy {
// ParallelExecutor supports two modes of ReduceStrategy, kAllReduce and
// kReduce, for CPU and GPU. If you use kAllReduce, different threads
// optimize their parameters separately. If you use kReduce, the optimizations
// of parameters are distributed to different threads.
// For example, a model has 100 parameters and is running with four threads,
// if you choose kAllReduce, every thread is to optimize 100 parameters
// separately, if you choose kReduce, every thread is to optimize 25
// parameters.
// Of particular note is, if you use kReduce when using CPU training,
// all the parameters are shared between different threads. This feature will
// save memory.
// FIXME(zcd): The result of the two modes(kAllReduce and kReduce) maybe not
// equal for GPU. Because, the result of the different order of summing maybe
// different, for example, the result of `a+b+c+d` may be different with the
// result of `c+a+b+d`.
// For GPU, the implementation of kAllReduce and kReduce is adopted NCCL,
// so the result of kAllReduce and kReduce maybe not equal.
// For CPU, if you want to fix the order of summing to make the result
// of kAllReduce and kReduce no diff, you can add
// `FLAGS_cpu_deterministic=true` to env.
enum class ReduceStrategy { kAllReduce = 0, kReduce = 1 };
enum class GradientScaleStrategy {
......
......@@ -18,6 +18,10 @@
#include "paddle/fluid/framework/details/variable_visitor.h"
#include "paddle/fluid/platform/profiler.h"
DEFINE_bool(
cpu_deterministic, false,
"Whether to make the result of computation deterministic in CPU side.");
namespace paddle {
namespace framework {
namespace details {
......@@ -91,11 +95,33 @@ void ReduceOpHandle::RunImpl() {
} else {
std::vector<const LoDTensor *> lod_tensors =
GetInputValues<LoDTensor>(in_var_handles, var_scopes);
if (paddle::platform::is_cpu_place(lod_tensors[0]->place())) {
this->RunAndRecordEvent([&] {
// FIXME(zcd): The order of summing is important,
// especially when the type of data is float or double.
// For example, the result of `a+b+c+d` may be different
// with the result of `c+a+b+d`, so the summing order should be fixed.
if (!FLAGS_cpu_deterministic) {
ReduceLoDTensor func(lod_tensors,
out_var->GetMutable<framework::LoDTensor>());
VisitDataType(ToDataType(lod_tensors[0]->type()), func);
} else {
// We sum lod_tensors to reduce_sum_trg which is in local_scopes_0
// here, but it doesn't mean reduce_sum_trg must be in local_scopes_0.
auto &reduce_sum_trg = *this->local_scopes_[0]
->FindVar(kLocalExecScopeName)
->Get<Scope *>()
->FindVar(out_var_handle->name_)
->GetMutable<framework::LoDTensor>();
ReduceLoDTensor func(lod_tensors, &reduce_sum_trg);
VisitDataType(ToDataType(lod_tensors[0]->type()), func);
auto trg = out_var->GetMutable<framework::LoDTensor>();
if (reduce_sum_trg.data<void>() != trg->data<void>()) {
TensorCopy(reduce_sum_trg, platform::CPUPlace(), trg);
}
}
});
} else if (paddle::platform::is_gpu_place(lod_tensors[0]->place())) {
#ifdef PADDLE_WITH_CUDA
......
......@@ -15,6 +15,7 @@
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/reader.h"
#include "paddle/fluid/operators/detail/safe_ref.h"
#include "paddle/fluid/platform/profiler.h"
namespace paddle {
namespace operators {
......@@ -65,6 +66,12 @@ class ReadOp : public framework::OperatorBase {
.GetMutable<framework::ReaderHolder>();
std::vector<std::string> out_arg_names = Outputs("Out");
std::vector<framework::LoDTensor> ins;
// For profiling
platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance();
auto& ctx = *pool.Get(dev_place);
platform::RecordEvent record_event(Type(), &ctx);
reader->ReadNext(&ins);
if (ins.empty()) {
if (Attr<bool>("throw_eof_exp")) {
......
......@@ -123,7 +123,8 @@ def __bootstrap__():
read_env_flags = [
'use_pinned_memory', 'check_nan_inf', 'benchmark', 'warpctc_dir',
'eager_delete_scope', 'use_mkldnn', 'initial_cpu_memory_in_mb',
'init_allocated_mem', 'free_idle_memory', 'paddle_num_threads'
'init_allocated_mem', 'free_idle_memory', 'paddle_num_threads',
'cpu_deterministic'
]
if core.is_compiled_with_dist():
read_env_flags.append('rpc_deadline')
......
......@@ -174,6 +174,9 @@ class SE_ResNeXt():
padding=(filter_size - 1) / 2,
groups=groups,
act=None,
# avoid pserver CPU init differs from GPU
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.Constant()),
bias_attr=False)
return fluid.layers.batch_norm(input=conv, act=act)
......@@ -194,10 +197,8 @@ class SE_ResNeXt():
def get_model(batch_size):
# Input data
image = fluid.layers.fill_constant(
shape=[batch_size, 3, 224, 224], dtype='float32', value=0.0)
label = fluid.layers.fill_constant(
shape=[batch_size, 1], dtype='int64', value=0.0)
image = fluid.layers.data(name="data", shape=[3, 224, 224], dtype='float32')
label = fluid.layers.data(name="int64", shape=[1], dtype='int64')
# Train program
model = SE_ResNeXt(layers=50)
......@@ -222,8 +223,10 @@ def get_model(batch_size):
lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)]
optimizer = fluid.optimizer.Momentum(
learning_rate=fluid.layers.piecewise_decay(
boundaries=bd, values=lr),
# FIXME(typhoonzero): add back LR decay once ParallelExecutor fixed.
#learning_rate=fluid.layers.piecewise_decay(
# boundaries=bd, values=lr),
learning_rate=base_lr,
momentum=0.9,
regularization=fluid.regularizer.L2Decay(1e-4))
optimizer.minimize(avg_cost)
......@@ -232,7 +235,7 @@ def get_model(batch_size):
train_reader = paddle.batch(
paddle.dataset.flowers.train(), batch_size=batch_size)
test_reader = paddle.batch(
paddle.dataset.flowers.test(), batch_size=batch_size)
paddle.dataset.flowers.test(use_xmap=False), batch_size=batch_size)
return test_program, avg_cost, train_reader, test_reader, acc_top1, out
......@@ -256,7 +259,6 @@ class DistSeResneXt2x2:
trainers)
pserver_prog = t.get_pserver_program(current_endpoint)
startup_prog = t.get_startup_program(current_endpoint, pserver_prog)
place = fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(startup_prog)
......@@ -302,12 +304,19 @@ class DistSeResneXt2x2:
]
feeder = fluid.DataFeeder(feed_var_list, place)
reader_generator = train_reader()
first_loss, = exe.run(fetch_list=[avg_cost.name])
reader_generator = test_reader()
data = next(reader_generator)
first_loss, = exe.run(fetch_list=[avg_cost.name],
feed=feeder.feed(data))
print(first_loss)
for i in xrange(5):
loss, = exe.run(fetch_list=[avg_cost.name])
last_loss, = exe.run(fetch_list=[avg_cost.name])
data = next(reader_generator)
loss, = exe.run(fetch_list=[avg_cost.name], feed=feeder.feed(data))
data = next(reader_generator)
last_loss, = exe.run(fetch_list=[avg_cost.name], feed=feeder.feed(data))
print(last_loss)
......
......@@ -63,7 +63,8 @@ class TestDistBase(unittest.TestCase):
"PATH": os.getenv("PATH"),
"PYTHONPATH": os.getenv("PYTHONPATH"),
"LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH"),
"FLAGS_fraction_of_gpu_memory_to_use": "0.15"
"FLAGS_fraction_of_gpu_memory_to_use": "0.15",
"FLAGS_cudnn_deterministic": "1"
}
# Run local to get a base line
env_local = {"CUDA_VISIBLE_DEVICES": "0"}
......
......@@ -17,8 +17,7 @@ from test_dist_base import TestDistBase
class TestDistSeResneXt2x2(TestDistBase):
def test_se_resnext(self):
# TODO(paddle-dev): Is the delta too large?
self.check_with_place("dist_se_resnext.py", delta=0.2)
self.check_with_place("dist_se_resnext.py")
if __name__ == "__main__":
......
......@@ -359,5 +359,110 @@ class TestL2DecayWithPiecewise(TranspilerTest):
["sum", "scale", "scale", "elementwise_add", "momentum"])
class TestDistLookupTableBase(TranspilerTest):
def network_with_table(self, is_sparse, is_distributed):
def emb_pool(ids):
table_size = 1000
emb_size = 64
emb = fluid.layers.embedding(
input=ids,
size=[table_size, emb_size],
dtype='float32',
param_attr='shared_w', # share parameter
is_sparse=is_sparse,
is_distributed=is_distributed)
pool = fluid.layers.sequence_pool(input=emb, pool_type='average')
return pool
title_ids = fluid.layers.data(
name='title_ids', shape=[1], dtype='int64', lod_level=1)
brand_ids = fluid.layers.data(
name='brand_ids', shape=[1], dtype='int64', lod_level=1)
title_emb = emb_pool(title_ids)
brand_emb = emb_pool(brand_ids)
fc0 = fluid.layers.concat(input=[title_emb, brand_emb], axis=1)
predict = fluid.layers.fc(input=fc0,
size=2,
act=None,
param_attr=fluid.ParamAttr(name='fc_w'),
bias_attr=fluid.ParamAttr(name='fc_b'))
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
cost = fluid.layers.cross_entropy(input=predict, label=label)
avg_cost = fluid.layers.mean(cost)
optimizer = fluid.optimizer.Adam(learning_rate=0.003)
optimizer.minimize(avg_cost)
class TestLocalLookupTable(TestDistLookupTableBase):
def net_conf(self):
self.network_with_table(is_sparse=True, is_distributed=False)
def transpiler_test_impl(self):
pserver1, startup1 = self.get_pserver(self.pserver1_ep)
self.assertEqual(len(pserver1.blocks), 3)
# 0 listen_and_serv
# 1 optimize for fc_w or fc_b adam
self.assertEqual([op.type for op in pserver1.blocks[1].ops],
["sum", "scale", "adam", "scale", "scale"])
# 2 optimize for table adam
# NOTE: if param is not selected rows, the grad will scaled to grad / trainer_num
self.assertEqual([op.type for op in pserver1.blocks[2].ops],
["sum", "adam", "scale", "scale"])
trainer = self.get_trainer()
self.assertEqual(len(trainer.blocks), 1)
ops = [
'lookup_table', 'sequence_pool', 'lookup_table', 'sequence_pool',
'concat', 'mul', 'elementwise_add', 'cross_entropy', 'mean',
'fill_constant', 'mean_grad', 'cross_entropy_grad',
'elementwise_add_grad', 'send', 'mul_grad', 'send', 'concat_grad',
'sequence_pool_grad', 'lookup_table_grad', 'sequence_pool_grad',
'lookup_table_grad', 'sum', 'split_selected_rows', 'send',
'send_barrier', 'recv', 'recv', 'recv', 'fetch_barrier', 'concat'
]
self.assertEqual([op.type for op in trainer.blocks[0].ops], ops)
class TestDistLookupTable(TestDistLookupTableBase):
def net_conf(self):
self.network_with_table(is_sparse=True, is_distributed=True)
def transpiler_test_impl(self):
pserver1, startup1 = self.get_pserver(self.pserver1_ep)
self.assertEqual(len(pserver1.blocks), 6)
# 0 listen_and_serv
# 1 optimize for fc_w or fc_b adam
self.assertEqual([op.type for op in pserver1.blocks[1].ops],
["sum", "scale", "adam", "scale", "scale"])
# 2 optimize for table sgd
self.assertEqual([op.type for op in pserver1.blocks[2].ops],
["sum", "sgd"])
# 3 prefetch -> lookup_sparse_table for data0
self.assertEqual([op.type for op in pserver1.blocks[3].ops],
["lookup_sparse_table"])
# 4 prefetch -> lookup_sparse_table for data1
self.assertEqual([op.type for op in pserver1.blocks[4].ops],
["lookup_sparse_table"])
# 5 save table
self.assertEqual([op.type for op in pserver1.blocks[5].ops], ["save"])
trainer = self.get_trainer()
self.assertEqual(len(trainer.blocks), 1)
ops = [
'split_ids', 'prefetch', 'merge_ids', 'sequence_pool', 'split_ids',
'prefetch', 'merge_ids', 'sequence_pool', 'concat', 'mul',
'elementwise_add', 'cross_entropy', 'mean', 'fill_constant',
'mean_grad', 'cross_entropy_grad', 'elementwise_add_grad', 'send',
'mul_grad', 'send', 'concat_grad', 'sequence_pool_grad',
'lookup_table_grad', 'sequence_pool_grad', 'lookup_table_grad',
'sum', 'split_ids', 'send', 'send_barrier', 'recv', 'recv',
'fetch_barrier'
]
self.assertEqual([op.type for op in trainer.blocks[0].ops], ops)
if __name__ == "__main__":
unittest.main()
......@@ -198,7 +198,7 @@ class TestResnet(TestParallelExecutorBase):
model,
use_cuda,
iter=20,
delta2=1e-4):
delta2=1e-6):
if use_cuda and not core.is_compiled_with_cuda():
return
......@@ -276,10 +276,10 @@ class TestResnet(TestParallelExecutorBase):
model=SE_ResNeXt50Small, use_cuda=False, iter=2, delta2=1e-3)
def test_seresnext_with_new_strategy(self):
# self._compare_reduce_and_allreduce(
# model=SE_ResNeXt50Small, use_cuda=True)
self._compare_reduce_and_allreduce(
model=SE_ResNeXt50Small, use_cuda=False, iter=5, delta2=1e-2)
model=SE_ResNeXt50Small, use_cuda=True, delta2=1e-2)
self._compare_reduce_and_allreduce(
model=SE_ResNeXt50Small, use_cuda=False, iter=5)
if __name__ == '__main__':
......
......@@ -896,8 +896,6 @@ class DistributeTranspiler(object):
self.table_name
][0]
table_opt_block = pserver_program.create_block(pre_block_idx)
# only support sgd now
assert table_opt_op.type == "sgd"
if self.sync_mode:
# create grad vars in pserver program
......@@ -937,11 +935,12 @@ class DistributeTranspiler(object):
"LearningRate": [lr_var]
}
outputs = {"ParamOut": [param_var]}
table_opt_block.append_op(
type=table_opt_op.type,
inputs=inputs,
outputs=outputs,
attrs=table_opt_op.attrs)
# only support sgd now
import logging
logging.warn(
"distribute lookup table only support sgd optimizer, change it's optimizer to sgd instead of "
+ table_opt_op.type)
table_opt_block.append_op(type="sgd", inputs=inputs, outputs=outputs)
# add table parameter gradient and it's block id to grad_to_block_id
grad_to_block_id.append(grad_var.name + ":" + str(table_opt_block.idx))
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册