Training fails when fluid.layers.embedding is set with is_distributed=True
Created by: starsblinking
Setting is_distributed=True on fluid.layers.embedding makes training fail, while is_distributed=False runs fine. The job already runs distributed across 3 machines, so why does it break as soon as is_distributed is turned on? The error is:

Traceback (most recent call last):
  File "train.py", line 166, in <module>
    train(params)
  File "train.py", line 116, in train
    optimizer.minimize(avg_cost)
  File "/XXXXXXX/distribute_ctr/paddle2/lib/python2.7/site-packages/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py", line 481, in minimize
    fleet._transpile(config=self._strategy)
  File "/XXXXXXX/distribute_ctr/paddle2/lib/python2.7/site-packages/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py", line 338, in _transpile
    current_endpoint=self.server_endpoints()[self.server_index()])
  File "/XXXXXXX/distribute_ctr/paddle2/lib/python2.7/site-packages/paddle/fluid/transpiler/distribute_transpiler.py", line 936, in transpile
    pserver_endpoints)
  File "/XXXXXXX/distribute_ctr/paddle2/lib/python2.7/site-packages/paddle/fluid/transpiler/distribute_transpiler.py", line 1801, in _replace_lookup_table_op_with_prefetch
    type=self.all_in_ids_vars[0].type,
IndexError: list index out of range
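Reading the traceback: the IndexError is raised inside the transpiler's _replace_lookup_table_op_with_prefetch step, which (judging by the behavior described above) is only reached when an embedding is marked is_distributed=True, and it fails because self.all_in_ids_vars comes back empty, i.e. the transpiler collected no lookup_table input ids to rewrite into prefetch ops. A minimal sketch of the call that leads into that code path, assuming paddlepaddle==1.8.3 on CPU (the sizes are placeholders):

import paddle.fluid as fluid

ids = fluid.layers.data(name="ids", shape=[1], lod_level=1, dtype="int64")
# is_distributed=True marks the lookup table as distributed, which is what
# makes the transpiler attempt the prefetch rewrite seen in the traceback
emb = fluid.layers.embedding(input=ids,
                             size=[100000, 10],  # placeholder vocab/emb dims
                             is_sparse=True,
                             is_distributed=True)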
The code is the distributed demo at https://github.com/PaddlePaddle/Fleet/tree/develop/examples/distribute_ctr, although https://github.com/PaddlePaddle/Fleet was deleted by you in the last couple of days. It appears to be the same as https://github.com/PaddlePaddle/models/tree/develop/PaddleRec/ctr/dnn. I'll paste the relevant code below.
【network.py】

import math

import paddle.fluid as fluid


class CTR(object):
    """
    DNN for Click-Through Rate prediction
    help: https://github.com/PaddlePaddle/models/tree/develop/PaddleRec/ctr
    """

    def input_data(self, params):
        dense_input = fluid.layers.data(name="dense_input",
                                        shape=[params.dense_feature_dim],
                                        dtype="float32")
        sparse_input_ids = [
            fluid.layers.data(name="C" + str(i),
                              shape=[1],
                              lod_level=1,
                              dtype="int64") for i in range(1, 27)
        ]
        label = fluid.layers.data(name="label", shape=[1], dtype="int64")
        inputs = [dense_input] + sparse_input_ids + [label]
        return inputs

    def net(self, inputs, params):
        def embedding_layer(input):
            return fluid.layers.embedding(
                input=input,
                is_sparse=params.is_sparse,
                #is_distributed=True,
                size=[params.sparse_feature_dim, params.embedding_size],
                param_attr=fluid.ParamAttr(
                    name="wtfSparseFeatFactors",
                    initializer=fluid.initializer.Uniform()),
            )

        sparse_embed_seq = list(map(embedding_layer, inputs[1:-1]))
        concated = fluid.layers.concat(sparse_embed_seq + inputs[0:1], axis=1)
        fc1 = fluid.layers.fc(
            input=concated,
            size=400,
            act="relu",
            param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
                scale=1 / math.sqrt(concated.shape[1]))),
        )
        fc2 = fluid.layers.fc(
            input=fc1,
            size=400,
            act="relu",
            param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
                scale=1 / math.sqrt(fc1.shape[1]))),
        )
        fc3 = fluid.layers.fc(
            input=fc2,
            size=400,
            act="relu",
            param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
                scale=1 / math.sqrt(fc2.shape[1]))),
        )
        predict = fluid.layers.fc(
            input=fc3,
            size=2,
            act="softmax",
            param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
                scale=1 / math.sqrt(fc3.shape[1]))),
        )
        cost = fluid.layers.cross_entropy(input=predict, label=inputs[-1])
        avg_cost = fluid.layers.reduce_sum(cost)
        auc_var, batch_auc_var, _ = fluid.layers.auc(input=predict,
                                                     label=inputs[-1],
                                                     num_thresholds=2**12,
                                                     slide_steps=20)
        return avg_cost, auc_var, batch_auc_var
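For reference, a minimal local sketch of how the two methods above fit together outside of fleet. The Params class is a hypothetical stand-in for the demo's params object, with placeholder values chosen only so the graph builds:

# Hypothetical stand-in for the demo's params object; the values below are
# placeholders, not the demo's actual configuration.
class Params(object):
    dense_feature_dim = 13
    sparse_feature_dim = 1000001
    embedding_size = 10
    is_sparse = True

ctr_model = CTR()
inputs = ctr_model.input_data(Params())
avg_cost, auc_var, batch_auc_var = ctr_model.net(inputs, Params())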
【train.py】

import logging
import time

import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil

from network import CTR  # assuming the network definition above lives in network.py

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def train(params):
    # Determine this machine's/process's role in the distributed job from
    # environment variables, then initialize the node with the fleet api's init()
    role = role_maker.PaddleCloudRoleMaker()
    fleet.init(role)
    fleet_util = FleetUtil(mode="transpiler")
    fleet_util.rank0_print("my log rank0_print")

    strategy = StrategyFactory.create_async_strategy()
    #strategy = StrategyFactory.create_geo_strategy()

    ctr_model = CTR()
    inputs = ctr_model.input_data(params)
    avg_cost, auc_var, batch_auc_var = ctr_model.net(inputs, params)

    #optimizer = fluid.optimizer.Adam(params.learning_rate)
    optimizer = fluid.optimizer.SGD(params.learning_rate)
    # Configure the distributed optimizer with the chosen strategy;
    # this is the step that builds the distributed programs
    optimizer = fleet.distributed_optimizer(optimizer, strategy)
    optimizer.minimize(avg_cost)

    # Run different logic depending on the node's role
    if fleet.is_server():
        # Initialize and run the parameter server node
        fleet.init_server()
        fleet.run_server()
    elif fleet.is_worker():
        # Initialize the worker node
        fleet.init_worker()
        exe = fluid.Executor(fluid.CPUPlace())
        # Run fleet.startup_program, which includes the distributed
        # initialization steps
        exe.run(fleet.startup_program)
        dataset = get_dataset(inputs, params)
        for epoch in range(params.epochs):
            start_time = time.time()
            # Workers run fleet.main_program, the distributed-pruned
            # version of the original program
            exe.train_from_dataset(program=fleet.main_program,
                                   dataset=dataset,
                                   fetch_list=[auc_var],
                                   fetch_info=["Epoch {} auc ".format(epoch)],
                                   print_period=100,
                                   debug=False)
            end_time = time.time()
            logger.info("epoch %d finished, use time=%d\n" %
                        ((epoch), end_time - start_time))
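get_dataset is not part of the paste above. For completeness, a minimal sketch of what it could look like with the fluid Dataset API; the pipe command, batch size, thread count, and file list are placeholder assumptions rather than the demo's real settings:

def get_dataset(inputs, params):
    # Minimal sketch, not the demo's actual implementation
    dataset = fluid.DatasetFactory().create_dataset()
    # Feed the same variables that input_data() declared
    dataset.set_use_var(inputs)
    # Placeholder preprocessing command; the demo pipes raw training lines
    # through its own reader script
    dataset.set_pipe_command("python dataset_generator.py")
    dataset.set_batch_size(1000)  # placeholder
    dataset.set_thread(10)  # placeholder
    dataset.set_filelist(["train_data/part-0"])  # placeholder
    return dataset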
Environment:
1) PaddlePaddle version: 1.8.3
2) CPU: running on CPU, with MKL
3) System environment: Python 2.7.18, CentOS 7
- Training info: 1) multi-node (3 machines)