Commit 764723d4 authored by wanghaoshuang

Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into fix_avg

@@ -2,4 +2,15 @@
 Cluster Training and Prediction
 ###############################
 
-TBD
+..  contents::
+
+1. Network connection errors in the log during multi-node cluster training
+----------------------------------------------------------------------------
+
+There may be errors in the log that stem from network connection problems during multi-node cluster training, for example, :code:`Connection reset by peer`.
+This kind of error is usually caused by the abnormal exit of a training process on some node, after which the other nodes can no longer connect to it. Steps to troubleshoot the problem are as follows:
+
+* Find the first error in :code:`train.log` and :code:`server.log`, and check whether some other fault caused the problem, such as a floating point exception (FPE), or running out of memory or disk space.
+* If the first error in server.log says "Address already used", it may be caused by a port conflict of non-exclusive execution. Contact the sys-admin to check whether the current MPI cluster supports jobs submitted with the parameter :code:`resource=full`. If the cluster does not support this parameter, change the server port and try again.
+* If the current MPI cluster does not support an exclusive pattern, which allows a process to occupy the whole node, ask the administrator to replace or upgrade the cluster.
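As a quick aid for the first troubleshooting step above, here is a minimal sketch (not part of PaddlePaddle; the log file names and error patterns are assumptions) that prints the first suspicious line of each worker/pserver log:

import re

LOG_FILES = ["train.log", "server.log"]          # assumed log names
PATTERN = re.compile(r"Connection reset by peer|"
                     r"Address already used|"
                     r"Floating point exception|"
                     r"out of memory", re.IGNORECASE)

for path in LOG_FILES:
    try:
        with open(path) as f:
            for lineno, line in enumerate(f, 1):
                if PATTERN.search(line):
                    # report only the first match; later errors are usually
                    # secondary effects of this one
                    print("%s:%d: %s" % (path, lineno, line.strip()))
                    break
    except IOError:
        print("%s not found" % path)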
@@ -147,15 +147,52 @@ void BlockDesc::RemoveOp(size_t s, size_t e) {
   if (ops_.begin() + s == ops_.end() || ops_.begin() + e == ops_.end()) {
     return;
   }
+  auto get_vars = [](std::deque<std::unique_ptr<OpDesc>>::iterator &op,
+                     std::vector<std::string> &v) {
+    auto in_names = (*op)->InputArgumentNames();
+    v.insert(v.end(), in_names.begin(), in_names.end());
+    auto out_names = (*op)->OutputArgumentNames();
+    v.insert(v.end(), out_names.begin(), out_names.end());
+    std::sort(v.begin(), v.end());
+    auto last = std::unique(v.begin(), v.end());
+    v.erase(last, v.end());
+  };
   need_update_ = true;
-  for (auto it = ops_.begin() + s; it != ops_.begin() + e; it++) {
-    auto names = (*it)->InputArgumentNames();
-    for (auto n : names) {
-      // TODO(typhoonzero): delete vars if no other op use it.
-      VLOG(3) << "deleting var " << n;
+
+  for (size_t i = s; i < e; i++) {
+    // ops are removed one by one, so each iteration removes the first op in [s, e).
+    auto op = ops_.begin() + s;
+
+    // collect input and output variables of the op being deleted
+    std::vector<std::string> cur_vars;
+    get_vars(op, cur_vars);
+
+    // remove the current op
+    ops_.erase(ops_.begin() + s);
+
+    // collect input and output variables of the remaining ops
+    std::vector<std::string> other_vars;
+    for (auto it = ops_.begin(); it != ops_.end(); it++) {
+      get_vars(it, other_vars);
+    }
+
+    // variables that should be deleted:
+    // delete_vars = cur_vars - (cur_vars ^ other_vars)
+    std::vector<std::string> delete_vars;
+    std::set_difference(cur_vars.begin(), cur_vars.end(), other_vars.begin(),
+                        other_vars.end(),
+                        std::inserter(delete_vars, delete_vars.end()));
+    // remove those variables from the block
+    for (size_t i = 0; i < delete_vars.size(); i++) {
+      auto name = delete_vars[i];
+      auto it = vars_.find(name);
+      PADDLE_ENFORCE(it != vars_.end(),
+                     "%s is not in variable list, it should not be deleted",
+                     name);
+      vars_.erase(it);
+      VLOG(3) << "deleting variable " << name;
     }
   }
-  ops_.erase(ops_.begin() + s, ops_.begin() + e);
 }
 
 std::vector<OpDesc *> BlockDesc::AllOps() const {
......
@@ -89,6 +89,11 @@ class BlockDesc {
   OpDesc *InsertOp(size_t index);
 
+  /*
+   * Remove Op and its input/output variables.
+   * Note that if an input or output variable is also an input or output
+   * variable of another op, it is kept.
+   */
   void RemoveOp(size_t s, size_t e);
 
   std::vector<OpDesc *> AllOps() const;
......
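For reference, a minimal Python-side sketch of this behaviour, using the ProgramDesc/BlockDesc bindings exercised by the new unit test further below (the op inputs/outputs and variable names here are made up for illustration):

# Sketch only: assumes the paddle.fluid.core bindings used in
# test_protobuf_descs.py are importable.
import paddle.fluid.core as core

prog = core.ProgramDesc()
block = prog.block(0)

op1 = block.append_op()
op1.set_input("X", ["a", "b"])
op1.set_output("Y", ["c"])

op2 = block.append_op()
op2.set_input("X", ["a"])
op2.set_output("Y", ["d"])

var_a = block.var("a")
var_b = block.var("b")
var_c = block.var("c")
var_d = block.var("d")

# Removing op1 (ops in [0, 1)) deletes "b" and "c", but keeps "a" because
# op2 still reads it; "d" belongs to op2 and is untouched.
block.remove_op(0, 1)
assert block.op_size() == 1
assert set(block.all_vars()) == {var_a, var_d}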
@@ -48,6 +48,8 @@ bool ReadRaw(::google::protobuf::io::CodedInputStream* input,
              void* dest, int size) {
   const void* data = NULL;
   int size_to_write = 0;
+  int length = size;
+  int total_written = 0;
 
   if (platform::is_gpu_place(place)) {
 #ifdef PADDLE_WITH_CUDA
@@ -56,16 +58,21 @@ bool ReadRaw(::google::protobuf::io::CodedInputStream* input,
     platform::CPUPlace cpu;
 
     char* p = reinterpret_cast<char*>(dest);
-    while (size > 0) {
+    while (total_written < length) {
       if (!input->GetDirectBufferPointer(&data, &size_to_write)) {
         return false;
       }
+      // NOTE: if the raw buffer is large and the next field is also a raw
+      // buffer, GetDirectBufferPointer may return both of them, so use
+      // length to truncate the chunk.
+      if (total_written + size_to_write > length) {
+        size_to_write = length - total_written;
+      }
       memory::Copy(boost::get<platform::CUDAPlace>(place),
                    reinterpret_cast<void*>(p), cpu, data, size_to_write,
                    gpu_dev_ctx.stream());
       p += size_to_write;
-      size -= size_to_write;
+      total_written += size_to_write;
 
       input->Skip(size_to_write);
     }
@@ -77,16 +84,21 @@ bool ReadRaw(::google::protobuf::io::CodedInputStream* input,
   }
 
   char* p = reinterpret_cast<char*>(dest);
-  while (size > 0) {
+  while (total_written < length) {
     if (!input->GetDirectBufferPointer(&data, &size_to_write)) {
       return false;
     }
+    // NOTE: if the raw buffer is large and the next field is also a raw
+    // buffer, GetDirectBufferPointer may return both of them, so use length
+    // to truncate the chunk.
+    if (total_written + size_to_write > length) {
+      size_to_write = length - total_written;
+    }
     // TODO(gongwb): can we avoid copy?
     platform::CPUPlace cpu;
     memory::Copy(cpu, reinterpret_cast<void*>(p), cpu, data, size_to_write);
 
     p += size_to_write;
-    size -= size_to_write;
+    total_written += size_to_write;
 
     input->Skip(size_to_write);
   }
@@ -153,6 +165,7 @@ bool VariableResponse::CopySelectRowsData(
     const platform::DeviceContext& ctx, int length) {
   auto var = scope_->FindVar(meta_.varname());
   auto* slr = var->GetMutable<framework::SelectedRows>();
+  slr->mutable_rows()->resize(length / 8);  // int64
   int64_t* rows_data = slr->mutable_rows()->data();
 
   // copy rows CPU data, GPU data will be copied lazily.
@@ -233,7 +246,6 @@ int VariableResponse::Parse(Source* source) {
     if (tag != 0) {
       return -1;
     }
-
     return 0;
   }
......
@@ -144,7 +144,12 @@ class ParallelDoOp : public framework::OperatorBase {
       PADDLE_ENFORCE(scope.FindVar(param)->IsType<LoDTensor>(),
                      "Only support parameter type as LoDTensor");
       auto &src = scope.FindVar(param)->Get<LoDTensor>();
-      for (size_t i = 0; i < sub_scopes.size(); ++i) {
+
+      auto *sub_scope0 = sub_scopes[0];
+      auto *dst0 = sub_scope0->Var(param)->GetMutable<LoDTensor>();
+      dst0->ShareDataWith(src);
+
+      for (size_t i = 1; i < sub_scopes.size(); ++i) {
         auto &place = places[i];
         auto *sub_scope = sub_scopes[i];
         auto *dst = sub_scope->Var(param)->GetMutable<LoDTensor>();
......
@@ -153,9 +153,15 @@ if [ $? -ne 0 ]; then
     exit 1
 fi
 
-INSTALLED_VERSION=`pip freeze 2>/dev/null | grep '^paddle' | sed 's/.*==//g'`
+if [ "@WITH_GPU@" == "ON" ]; then
+    PADDLE_NAME="paddlepaddle-gpu"
+else
+    PADDLE_NAME="paddlepaddle"
+fi
+
+INSTALLED_VERSION=`pip freeze 2>/dev/null | grep "^${PADDLE_NAME}==" | sed 's/.*==//g'`
 
-if [ -z ${INSTALLED_VERSION} ]; then
+if [ -z "${INSTALLED_VERSION}" ]; then
     INSTALLED_VERSION="0.0.0" # not installed
 fi
 
 cat <<EOF | python -
......
@@ -1483,6 +1483,7 @@ def batch_norm(input,
                param_attr=None,
                bias_attr=None,
                data_layout='NCHW',
+               in_place=False,
                name=None,
                moving_mean_name=None,
                moving_variance_name=None):
@@ -1538,7 +1539,7 @@ def batch_norm(input,
     saved_mean = helper.create_tmp_variable(dtype=dtype, stop_gradient=True)
     saved_variance = helper.create_tmp_variable(dtype=dtype, stop_gradient=True)
 
-    batch_norm_out = helper.create_tmp_variable(dtype)
+    batch_norm_out = input if in_place else helper.create_tmp_variable(dtype)
 
     helper.append_op(
         type="batch_norm",
......
@@ -98,7 +98,7 @@ def img_conv_group(input,
             use_mkldnn=use_mkldnn)
 
         if conv_with_batchnorm[i]:
-            tmp = layers.batch_norm(input=tmp, act=conv_act)
+            tmp = layers.batch_norm(input=tmp, act=conv_act, in_place=True)
             drop_rate = conv_batchnorm_drop_rate[i]
             if abs(drop_rate) > 1e-5:
                 tmp = layers.dropout(x=tmp, dropout_prob=drop_rate)
......
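For reference, a hedged usage sketch of the new in_place flag from user code; the surrounding layers and the paddle.fluid import path are assumptions for illustration, and only the batch_norm call mirrors the change above:

# Sketch: with in_place=True, batch_norm writes its result into the input
# variable instead of allocating a new temporary, as img_conv_group now does.
import paddle.fluid as fluid

img = fluid.layers.data(name='img', shape=[3, 32, 32], dtype='float32')
conv = fluid.layers.conv2d(input=img, num_filters=16, filter_size=3)
bn = fluid.layers.batch_norm(input=conv, act='relu', in_place=True)

This is only safe when the un-normalized conv output is not needed by any other op, which is why img_conv_group enables it right after the convolution.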
@@ -186,6 +186,34 @@ class TestBlockDesc(unittest.TestCase):
             all_ops.append(block.op(idx))
         self.assertEqual(all_ops, [op0, op1, op2])
 
+    def test_remove_op(self):
+        prog = core.ProgramDesc()
+        self.assertIsNotNone(prog)
+        block = prog.block(0)
+        self.assertIsNotNone(block)
+
+        op1 = block.append_op()
+        op2 = block.append_op()
+        var1 = block.var("var1")
+        var2 = block.var("var2")
+        var3 = block.var("var3")
+        var4 = block.var("var4")
+        var5 = block.var("var5")
+        op1.set_input("X", ["var1", "var2"])
+        op1.set_output("Y", ["var3", "var4"])
+        op2.set_input("X", ["var1"])
+        op2.set_output("Y", ["var4", "var5"])
+
+        # Removing op1 also removes its input var2 and output var3, but keeps
+        # var1 and var4 because they are still used by op2.
+        block.remove_op(0, 1)
+        all_ops = []
+        for idx in xrange(0, block.op_size()):
+            all_ops.append(block.op(idx))
+        self.assertEqual(all_ops, [op2])
+
+        all_vars = block.all_vars()
+        self.assertEqual(set(all_vars), {var1, var4, var5})
+
 
 if __name__ == '__main__':
     unittest.main()