Core dump during distributed training
Created by: Officium
Environment: Paddle Fluid 1.3 on an MPI cluster. Tail of pserver.log:
I0509 16:14:34.399401 17882 grpc_server.cc:547] HandleRequest RequestGet, req_id:0 get next
I0509 16:14:34.399401 17883 grpc_server.cc:547] HandleRequest RequestGet, req_id:1 get next
I0509 16:14:34.399453 17882 grpc_server.cc:558] RequestGet name:[vtag_v1_embedding], ep:[ipv4:10.76.85.30:30006] Process using req_id:0
I0509 16:14:34.399453 17883 grpc_server.cc:558] RequestGet name:[context&base&hour_embedding], ep:[ipv4:10.76.85.30:30006] Process using req_id:1
I0509 16:14:34.399468 17882 grpc_server.cc:142] RequestGet vtag_v1_embedding from vtag_v1_embedding
I0509 16:14:34.399468 17883 grpc_server.cc:142] RequestGet context&base&hour_embedding from context&base&hour_embedding
I0509 16:14:34.399485 17883 request_handler_impl.cc:90] RequestGetHandler:context&base&hour_embedding out_var_name: context&base&hour_embedding
I0509 16:14:34.399488 17882 request_handler_impl.cc:90] RequestGetHandler:vtag_v1_embedding out_var_name: vtag_v1_embedding
I0509 16:14:34.399499 17882 rpc_server.cc:129] RPCServer WaitCond in RequestGet
I0509 16:14:34.399502 17883 rpc_server.cc:129] RPCServer WaitCond in RequestGet
I0509 16:14:34.399510 17882 rpc_server.cc:139] RPCServer WaitCond out RequestGet
I0509 16:14:34.399519 17883 rpc_server.cc:139] RPCServer WaitCond out RequestGet
I0509 16:14:34.414811 17884 grpc_server.cc:547] HandleRequest RequestGet, req_id:2 get next
I0509 16:14:34.414814 17885 grpc_server.cc:547] HandleRequest RequestGet, req_id:3 get next
I0509 16:14:34.414842 17884 grpc_server.cc:558] RequestGet name:[vtag_v1_embedding], ep:[ipv4:10.76.86.19:40474] Process using req_id:2
I0509 16:14:34.414844 17885 grpc_server.cc:558] RequestGet name:[context&base&hour_embedding], ep:[ipv4:10.76.86.19:40474] Process using req_id:3
I0509 16:14:34.414853 17884 grpc_server.cc:142] RequestGet vtag_v1_embedding from vtag_v1_embedding
I0509 16:14:34.414855 17885 grpc_server.cc:142] RequestGet context&base&hour_embedding from context&base&hour_embedding
I0509 16:14:34.414860 17885 request_handler_impl.cc:90] RequestGetHandler:context&base&hour_embedding out_var_name: context&base&hour_embedding
I0509 16:14:34.414860 17884 request_handler_impl.cc:90] RequestGetHandler:vtag_v1_embedding out_var_name: vtag_v1_embedding
I0509 16:14:34.414867 17885 rpc_server.cc:129] RPCServer WaitCond in RequestGet
I0509 16:14:34.414877 17884 rpc_server.cc:129] RPCServer WaitCond in RequestGet
I0509 16:14:34.414878 17885 rpc_server.cc:139] RPCServer WaitCond out RequestGet
I0509 16:14:34.414893 17884 rpc_server.cc:139] RPCServer WaitCond out RequestGet
terminate called after throwing an instance of 'paddle::platform::EnforceNotMet'
terminate called after throwing an instance of 'paddle::platform::EnforceNotMet'
what(): holder_ should not be null
Tensor not initialized yet when Tensor::type() is called. at [/paddle/paddle/fluid/framework/tensor.h:145]
PaddlePaddle Call Stacks:
0 0x7f5fe873259dp void paddle::platform::EnforceNotMet::Init<std::string>(std::string, char const*, int) + 365
1 0x7f5fe87328e7p paddle::platform::EnforceNotMet::EnforceNotMet(std::string const&, char const*, int) + 87
2 0x7f5fe9321060p paddle::operators::distributed::GetTensorPayload(paddle::framework::Variable*, paddle::platform::DeviceContext const&, sendrecv::VariableMessage*) + 3472
3 0x7f5fe931d61bp paddle::operators::distributed::SerializeToByteBuffer(std::string const&, paddle::framework::Variable*, paddle::platform::DeviceContext const&, grpc::ByteBuffer*, std::string const&, int, std::string const&) + 555
4 0x7f5fe9318397p paddle::operators::distributed::RequestGet::Process() + 471
5 0x7f5fe9313472p paddle::operators::distributed::AsyncGRPCServer::HandleRequest(grpc::ServerCompletionQueue*, std::string const&, std::function<void (std::string const&, int)>) + 1138
6 0x7f5fe931727fp std::thread::_Impl<std::_Bind_simple<std::_Bind<std::_Mem_fn<void (paddle::operators::distributed::AsyncGRPCServer::*)(grpc::ServerCompletionQueue*, std::string const&, std::function<void (std::string const&, int)>)> (paddle::operators::distributed::AsyncGRPCServer*, grpc::ServerCompletionQueue*, std::string, std::function<void (std::string const&, int)>)> ()> >::_M_run() + 127
7 0x7f5ff46d78a0p
8 0x7f605004b1c3p
9 0x7f604f67312dp clone + 109
what(): holder_ should not be null
Tensor not initialized yet when Tensor::type() is called. at [/paddle/paddle/fluid/framework/tensor.h:145]
PaddlePaddle Call Stacks:
0 0x7f5fe873259dp void paddle::platform::EnforceNotMet::Init<std::string>(std::string, char const*, int) + 365
1 0x7f5fe87328e7p paddle::platform::EnforceNotMet::EnforceNotMet(std::string const&, char const*, int) + 87
2 0x7f5fe9321060p paddle::operators::distributed::GetTensorPayload(paddle::framework::Variable*, paddle::platform::DeviceContext const&, sendrecv::VariableMessage*) + 3472
3 0x7f5fe931d61bp paddle::operators::distributed::SerializeToByteBuffer(std::string const&, paddle::framework::Variable*, paddle::platform::DeviceContext const&, grpc::ByteBuffer*, std::string const&, int, std::string const&) + 555
4 0x7f5fe9318397p paddle::operators::distributed::RequestGet::Process() + 471
5 0x7f5fe9313472p paddle::operators::distributed::AsyncGRPCServer::HandleRequest(grpc::ServerCompletionQueue*, std::string const&, std::function<void (std::string const&, int)>) + 1138
6 0x7f5fe931727fp std::thread::_Impl<std::_Bind_simple<std::_Bind<std::_Mem_fn<void (paddle::operators::distributed::AsyncGRPCServer::*)(grpc::ServerCompletionQueue*, std::string const&, std::function<void (std::string const&, int)>)> (paddle::operators::distributed::AsyncGRPCServer*, grpc::ServerCompletionQueue*, std::string, std::function<void (std::string const&, int)>)> ()> >::_M_run() + 127
7 0x7f5ff46d78a0p
8 0x7f605004b1c3p
9 0x7f604f67312dp clone + 109
*** Aborted at 1557389674 (unix time) try "date -d @1557389674" if you are using GNU date ***
terminate called recursively
terminate called recursively
PC: @ 0x0 (unknown)
*** SIGABRT (@0x1f800004530) received by PID 17712 (TID 0x7f5f7abfd700) from PID 17712; stack trace: ***
@ 0x7f6050053160 (unknown)
@ 0x7f604f5c13f7 __GI_raise
@ 0x7f604f5c27d8 __GI_abort
@ 0x7f5ff4686c65 __gnu_cxx::__verbose_terminate_handler()
@ 0x7f5ff4684e06 __cxxabiv1::__terminate()
@ 0x7f5ff4684e33 std::terminate()
@ 0x7f5ff46d7935 execute_native_thread_routine
@ 0x7f605004b1c3 start_thread
@ 0x7f604f67312d __clone
@ 0x0 (unknown)
.//paddle/start_server.sh: line 74: 17712 Aborted (core dumped) /home/disk1/normandy/maybach/app-user-20190509160536-2878/workspace/python27-gcc482/bin/python train.py --is_mpi=1
Training code:
def init(in_dim, gain):
    """
    Create uniform-initializer parameter attributes for a layer's weight and bias.

    in_dim (int): input dim,
    gain (float): relu -> sqrt(2), lrelu -> sqrt(5), sigmoid / linear -> 1, tanh -> 5 / 3

    The weight is drawn from U(-b, b) with b = sqrt(3 / in_dim) * gain and the
    bias from U(-c, c) with c = 1 / sqrt(in_dim).

    Returns a (weight_attr, bias_attr) pair of fluid.ParamAttr objects.
    """
    weight_limit = math.sqrt(3.0 / in_dim) * gain
    bias_limit = 1.0 / math.sqrt(in_dim)

    weight_initializer = fluid.initializer.Uniform(-weight_limit, weight_limit)
    bias_initializer = fluid.initializer.Uniform(-bias_limit, bias_limit)

    return (fluid.ParamAttr(initializer=weight_initializer),
            fluid.ParamAttr(initializer=bias_initializer))
def train_program():
    """
    Build the deep & cross network in the default fluid program.

    Walks the module-level ``features`` list: 'id' features get an embedding
    lookup, 'sparse_weight' features get a bias-free fc projection, and 'dense'
    features are only counted (group 2 goes to the 'prior' input, every other
    group to the 'dense' input). Any other feature type raises
    NotImplementedError.

    Returns:
        tuple: (datas, predict, avg_cost, auc_batch) where ``datas`` is the
        list of input variables (per-feature inputs, then 'dense', 'prior' and
        finally 'label'), ``predict`` is the 2-way softmax output, ``avg_cost``
        is the mean label-smoothed cross entropy and ``auc_batch`` is the
        second output of fluid.layers.auc.
    """
    embedded_outputs = []
    embedded_width = 0
    n_dense = 0
    n_prior = 0
    feed_vars = []

    for fea in features:
        # Dense features contribute no per-feature input; they are only tallied.
        if fea.type == 'dense':
            if fea.group == 2:
                n_prior += 1
            else:
                n_dense += 1
            continue

        if fea.type == 'id':
            data = fluid.layers.data(name=fea.name, shape=[1], dtype='int64')
            output = fluid.layers.embedding(
                input=data,
                size=(fea.size, fea.emb_size),
                param_attr=fluid.ParamAttr(
                    name='{}_embedding'.format(fea.name),
                    initializer=fluid.initializer.TruncatedNormal(scale=1e-2)),
            )
            embedded_width += fea.emb_size
        elif fea.type == 'sparse_weight':
            data = fluid.layers.data(name=fea.name, shape=[fea.size])
            output = fluid.layers.fc(
                name='fc_' + fea.name,
                input=data,
                size=fea.fc_size,
                param_attr=fluid.ParamAttr(
                    name='{}_embedding'.format(fea.name),
                    initializer=fluid.initializer.TruncatedNormal(scale=1e-2)),
                bias_attr=False,
            )
            embedded_width += fea.fc_size
        else:
            raise NotImplementedError

        embedded_outputs.append(output)
        feed_vars.append(data)

    dense = fluid.layers.data(name='dense', shape=[n_dense])
    prior = fluid.layers.data(name='prior', shape=[n_prior])
    feed_vars += [dense, prior]

    # x0 in deep & cross
    x0 = fluid.layers.concat(
        name='x0', input=[dense, prior] + embedded_outputs, axis=1)
    x0_len = n_dense + n_prior + embedded_width

    # Deep1
    deep1_w_attr, deep1_b_attr = init(x0_len, math.sqrt(2))
    h1 = fluid.layers.fc(
        name="deep1", input=x0, size=256, act="relu",
        param_attr=deep1_w_attr, bias_attr=deep1_b_attr)

    # Deep2
    deep2_w_attr, deep2_b_attr = init(256, math.sqrt(2))
    h2 = fluid.layers.fc(
        name="deep2", input=h1, size=128, act="relu",
        param_attr=deep2_w_attr, bias_attr=deep2_b_attr)

    # Both cross layers share the same pair of parameter attributes.
    cross_w_attr, cross_b_attr = init(x0_len, 0.1)

    def cross(x_prev):
        # One cross layer: (x_prev + b) + matmul(matmul(x0, x_prev, True), w).
        # NOTE(review): presumably x0 is (batch, x0_len); if so, the transposed
        # matmul contracts over the batch axis instead of working per sample,
        # which differs from the usual deep & cross formulation -- confirm.
        w = fluid.layers.create_parameter(
            shape=[x0_len], dtype='float32', attr=cross_w_attr)
        b = fluid.layers.create_parameter(
            shape=[x0_len], dtype='float32', attr=cross_b_attr)
        shifted = fluid.layers.elementwise_add(x_prev, b)
        outer = fluid.layers.matmul(x0, x_prev, True)
        projected = fluid.layers.matmul(outer, w)
        return fluid.layers.elementwise_add(shifted, projected)

    # Cross1
    x1 = cross(x0)
    # Cross2
    x2 = cross(x1)

    # stack
    x_stack = fluid.layers.concat(name='x_stack', input=[x2, h2], axis=1)

    # output
    out_w_attr, out_b_attr = init(x0_len + 128, 1)
    predict = fluid.layers.fc(
        name="logits", input=x_stack, size=2, act="softmax",
        param_attr=out_w_attr, bias_attr=out_b_attr)
    fluid.clip.set_gradient_clip(
        clip=fluid.clip.GradientClipByGlobalNorm(clip_norm=100.0))

    # calculate loss
    label = fluid.layers.data(name='label', shape=[1], dtype='int64')
    feed_vars.append(label)
    onehot_label = fluid.layers.one_hot(input=label, depth=2)
    smoothed_label = fluid.layers.label_smooth(onehot_label)
    smoothed_cost = fluid.layers.cross_entropy(
        predict, smoothed_label, soft_label=True)
    avg_cost = fluid.layers.mean(smoothed_cost)
    _, auc_batch, _ = fluid.layers.auc(input=predict, label=label)

    return feed_vars, predict, avg_cost, auc_batch
def optimizer_program():
    """Return the optimizer used for training: Adam with learning rate 5e-4."""
    adam = fluid.optimizer.AdamOptimizer(learning_rate=5e-4)
    return adam
def train(is_mpi, train_reader, test_reader, EPOCH_NUM, output_path, use_cuda):
    """
    Build the network and train it, either locally or as one node of a
    parameter-server cluster.

    Args:
        is_mpi: when falsy, train the default main program in this process.
            When truthy, transpile the program with fluid.DistributeTranspiler
            and act as a pserver or a trainer according to the TRAINING_ROLE
            environment variable (default "TRAINER"). Endpoints are built from
            PADDLE_PSERVERS (comma-separated IPs), PADDLE_PORT (default
            "6174") and POD_IP; PADDLE_TRAINERS_NUM and PADDLE_TRAINER_ID
            supply the trainer count and id.
        train_reader: callable returning an iterable of training batches
            accepted by DataFeeder.feed.
        test_reader: callable returning an iterable of test batches.
        EPOCH_NUM (int): number of passes over train_reader.
        output_path: directory handed to fluid.io.save_inference_model, or
            None to skip saving.
        use_cuda: run on fluid.CUDAPlace(0) when truthy, otherwise on
            fluid.CPUPlace().
    """
    place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()

    # define graph and executor
    inputs, predict, avg_cost, auc = train_program()
    # The test program is cloned before minimize() is called on the main one.
    test_program = fluid.default_main_program().clone(for_test=True)
    optimizer = optimizer_program()
    optimizer.minimize(avg_cost)
    exe = fluid.Executor(place)
    feeder = fluid.DataFeeder(feed_list=inputs, place=place)

    # validate
    def validate(program):
        """Run `program` over the whole test set; return [mean cost, mean AUC]."""
        costs, aucs = [], []
        for test_data in test_reader():
            fetched = exe.run(program=program,
                              feed=feeder.feed(test_data),
                              fetch_list=[avg_cost, auc])
            costs.append(fetched[0])
            aucs.append(fetched[1])
        return [sum(costs) / len(costs), sum(aucs) / len(aucs)]

    # main train loop
    def train_loop(program):
        """Initialise parameters, then train `program` for EPOCH_NUM passes."""
        exe.run(fluid.default_startup_program())
        data_test_iter = test_reader()
        for pass_id in range(EPOCH_NUM):
            for step_id, data_train in enumerate(train_reader()):
                train_cost, train_acc = exe.run(program,
                                                feed=feeder.feed(data_train),
                                                fetch_list=[avg_cost, auc])
                # Evaluate one test batch per training step, restarting the
                # test reader whenever it is exhausted.
                try:
                    data_test = next(data_test_iter)
                except StopIteration:
                    data_test_iter = test_reader()
                    data_test = next(data_test_iter)
                valid_cost, valid_auc = exe.run(program=test_program,
                                                feed=feeder.feed(data_test),
                                                fetch_list=[avg_cost, auc])
                print("{} Pass {}, Batch {}, AUC {:.4f}/{:.4f} Cost {:.6f}/{:.6f}"
                      .format(datetime.now(), pass_id, step_id,
                              train_acc[0], valid_auc[0], train_cost[0], valid_cost[0]))

            # validate() returns [cost, auc]; unpack in that order so the
            # summary line below labels the two values correctly.
            valid_cost, valid_auc = validate(test_program)
            print("Pass %d, Cost %f AUC %f" % (pass_id, valid_cost, valid_auc))

            # save parameters
            # NOTE(review): the pasted source lost its indentation; saving once
            # per pass (rather than once after the last pass) is assumed.
            if output_path is not None:
                # The last input is the label, which inference does not feed.
                names = [x.name for x in inputs[:-1]]
                fluid.io.save_inference_model(output_path, names, [predict], exe)
        exe.close()

    if not is_mpi:
        train_loop(fluid.default_main_program())
    else:
        training_role = os.getenv("TRAINING_ROLE", "TRAINER")
        port = os.getenv("PADDLE_PORT", "6174")
        pserver_ips = os.getenv("PADDLE_PSERVERS")  # ip,ip...
        eplist = []
        for ip in pserver_ips.split(","):
            eplist.append(':'.join([ip, port]))
        pserver_endpoints = ",".join(eplist)  # ip:port,ip:port...
        trainers = int(os.getenv("PADDLE_TRAINERS_NUM", "0"))
        current_endpoint = os.getenv("POD_IP") + ":" + port
        trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))

        t = fluid.DistributeTranspiler()
        t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)

        if training_role == "PSERVER":
            # get_pserver_programs() returns (main_program, startup_program).
            # Unpacking it the other way round runs the serving program before
            # the parameters are initialised, so trainers' RequestGet calls hit
            # "Tensor not initialized yet" and the pserver aborts.
            pserver_prog, pserver_startup = t.get_pserver_programs(current_endpoint)
            exe.run(pserver_startup)
            exe.run(pserver_prog)
        elif training_role == "TRAINER":
            train_loop(t.get_trainer_program())