Unverified · Commit d0962abd authored by Chengmo, committed by GitHub

supplement bug fix of parameter server (#26217)

* fix fluid.embedding
Parent ad6e3dd6
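Note (illustrative sketch, not part of the diff): the fix targets fluid.embedding, which maps to the lookup_table_v2 op. Unlike the older fluid.layers.embedding (lookup_table), its output keeps the full shape of the ids plus the embedding dimension, and that is the contract the parameter-server lookup path below now has to reproduce. A minimal local sketch, assuming a made-up 1500-word vocabulary and 128-dim table:

    import paddle.fluid as fluid

    ids = fluid.layers.data(name="ids", shape=[1], dtype="int64")   # shape [-1, 1]
    emb = fluid.embedding(
        input=ids,
        size=[1500, 128],   # [dict_dim, emb_dim], values invented for the sketch
        is_sparse=True,
        param_attr=fluid.ParamAttr(name="__emb__"))
    # emb is 3-D, roughly [-1, 1, 128]; fluid.layers.embedding would give [-1, 128]
    print(emb.shape)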
@@ -25,25 +25,32 @@ class DistributedLookupTableOp : public framework::OperatorWithKernel {
   using framework::OperatorWithKernel::OperatorWithKernel;

   void InferShape(framework::InferShapeContext *ctx) const override {
-    PADDLE_ENFORCE(ctx->HasInputs("Ids"),
-                   "Input(Ids) of LookupTableOp should not be null.");
-    PADDLE_ENFORCE(ctx->HasInput("W"),
-                   "Input(W) of LookupTableOp should not be null.");
-    PADDLE_ENFORCE(ctx->HasOutputs("Outputs"),
-                   "Output(Outs) of LookupTableOp should not be null.");
+    PADDLE_ENFORCE_EQ(ctx->HasInputs("Ids"), true,
+                      platform::errors::InvalidArgument(
+                          "Input(Ids) of LookupTableOp should not be null."));
+    PADDLE_ENFORCE_EQ(ctx->HasInput("W"), true,
+                      platform::errors::InvalidArgument(
+                          "Input(W) of LookupTableOp should not be null."));
+    PADDLE_ENFORCE_EQ(ctx->HasOutputs("Outputs"), true,
+                      platform::errors::InvalidArgument(
+                          "Output(Outs) of LookupTableOp should not be null."));

     auto ids_dims = ctx->GetInputsDim("Ids");
     auto table_dims = ctx->GetInputDim("W");

-    PADDLE_ENFORCE_EQ(table_dims.size(), 2,
-                      "Only 2 dimensions of the 'Embedding' is supported.");
+    PADDLE_ENFORCE_EQ(
+        table_dims.size(), 2,
+        platform::errors::InvalidArgument(
+            "Only 2 dimensions of the 'Embedding' is supported."));

     for (auto &ids_dim : ids_dims) {
-      PADDLE_ENFORCE_EQ(ids_dim.size(), 2,
-                        "The dimension of the 'Ids' tensor must be 2.");
+      PADDLE_ENFORCE_EQ(ids_dim.size(), 2,
+                        platform::errors::InvalidArgument(
+                            "The dimension of the 'Ids' tensor must be 2."));
     }

     auto endpoints = ctx->Attrs().Get<std::vector<std::string>>("endpoints");
+    // for fluid.embedding
     auto lookup_table_version =
         ctx->Attrs().Get<std::string>("lookup_table_version");
......
@@ -35,9 +35,30 @@ class DistributedLookupTableKernel : public framework::OpKernel<T> {
     auto endpoints = context.Attr<std::vector<std::string>>("endpoints");
     auto is_distributed = context.Attr<bool>("is_distributed");

+    auto lookup_table_version =
+        context.Attr<std::string>("lookup_table_version");
+
     operators::distributed::prefetchs(id_names, out_names, embedding_name,
                                       is_distributed, lookup_tables, endpoints,
                                       context, context.scope());
+
+    if (lookup_table_version == "lookup_table_v2") {
+      auto &scope = context.scope();
+      auto emb_dim =
+          scope.FindVar(embedding_name)->Get<framework::LoDTensor>().dims()[1];
+
+      for (size_t i = 0; i < id_names.size(); ++i) {
+        auto *id_var = scope.FindVar(id_names[i]);
+        auto *out_var = scope.FindVar(out_names[i]);
+        auto *id_tensor = id_var->GetMutable<framework::LoDTensor>();
+        auto *out_tensor = out_var->GetMutable<framework::LoDTensor>();
+
+        auto id_dims = id_tensor->dims();
+        out_tensor->Resize(framework::make_ddim(
+            {static_cast<int64_t>(id_dims[0]), static_cast<int64_t>(id_dims[1]),
+             static_cast<int64_t>(emb_dim)}));
+      }
+    }
   }
 };
......
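Note (illustrative sketch, not part of the diff): the new lookup_table_v2 branch above only changes shape metadata. The Resize reinterprets the prefetched output buffer as [id_dims[0], id_dims[1], emb_dim], matching what fluid.embedding promises, instead of leaving it flat. Equivalent shape bookkeeping in NumPy, with hypothetical sizes:

    import numpy as np

    emb_dim = 64
    ids = np.zeros((32, 1), dtype="int64")      # Ids must be 2-D (enforced in InferShape above)
    flat_out = np.zeros((32 * 1, emb_dim))      # flat layout of the looked-up rows
    v2_out = flat_out.reshape(ids.shape[0], ids.shape[1], emb_dim)
    assert v2_out.shape == (32, 1, 64)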
@@ -154,15 +154,16 @@ class ParameterServerRuntime(RuntimeBase):
                 kwargs["sparse_attrs"] = get_sparse_attrs()
                 return kwargs

-        from paddle.fluid.incubate.fleet.parameter_server.ir.public import _get_lr_ops
+        from paddle.fluid.incubate.fleet.parameter_server.ir.public import _get_lr_ops, _has_global_step
         from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import \
             SyncStrategy, GeoStrategy

         trainer_config = self.async_strategy.get_trainer_runtime_config()
-        lrs = _get_lr_ops(self.origin_main_program)

-        if len(lrs) > 0:
+        lrs = _has_global_step(_get_lr_ops(self.origin_main_program))
+
+        if lrs:
             kwargs = {"need_global_step": "1"}
         else:
             kwargs = {"need_global_step": "0"}
......
@@ -42,6 +42,9 @@ op_role_attr_name = core.op_proto_and_checker_maker.kOpRoleAttrName()
 LR_SCHED_OP_ROLE_ATTR_VALUE = core.op_proto_and_checker_maker.OpRole.LRSched
 OPT_OP_ROLE_ATTR_VALUE = core.op_proto_and_checker_maker.OpRole.Optimize

+SPARSE_OP_LIST = ["lookup_table", "lookup_table_v2"]
+SPARSE_OP_TYPE_DICT = {"lookup_table": "W", "lookup_table_v2": "W"}
+

 def _get_lr_ops(program):
     lr_ops = []
@@ -66,7 +69,7 @@ def _has_global_step(lr_ops):

 def is_sparse_op(op):
-    if op.type == "lookup_table" and op.attr('is_sparse') is True and op.attr(
+    if op.type in SPARSE_OP_LIST and op.attr('is_sparse') is True and op.attr(
             'is_distributed') is False:
         return True
@@ -78,7 +81,7 @@ def is_sparse_op(op):

 def is_distributed_sparse_op(op):
-    if op.type == "lookup_table" and op.attr('is_distributed') is True:
+    if op.type in SPARSE_OP_LIST and op.attr('is_distributed') is True:
         return True

     if op.type == "distributed_lookup_table" and op.attr(
@@ -802,11 +805,10 @@ class CompileTimeStrategy(object):
         def _get_sparse_varnames():
             varnames = []
-            op_types = {"lookup_table": "W"}
             for op in origin_program.global_block().ops:
-                if op.type in op_types.keys() \
+                if op.type in SPARSE_OP_TYPE_DICT.keys() \
                         and op.attr('remote_prefetch') is True:
-                    param_name = op.input(op_types[op.type])[0]
+                    param_name = op.input(SPARSE_OP_TYPE_DICT[op.type])[0]
                     varnames.append(param_name)

             return list(set(varnames))
......
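Note (illustrative sketch, not part of the diff): the module-level SPARSE_OP_LIST / SPARSE_OP_TYPE_DICT refactor exists so that ops created by fluid.embedding (lookup_table_v2) are picked up by the same sparse-parameter passes that previously matched only lookup_table; both op types keep the embedding parameter in input slot "W". A standalone sketch of the collection logic, mirroring (not importing) the helper above:

    SPARSE_OP_TYPE_DICT = {"lookup_table": "W", "lookup_table_v2": "W"}

    def get_sparse_varnames(program):
        varnames = []
        for op in program.global_block().ops:
            if op.type in SPARSE_OP_TYPE_DICT and op.attr('remote_prefetch') is True:
                # both op types store the embedding parameter name under input "W"
                varnames.append(op.input(SPARSE_OP_TYPE_DICT[op.type])[0])
        return list(set(varnames))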
@@ -40,6 +40,8 @@ LR_SCHED_OP_ROLE_ATTR_VALUE = core.op_proto_and_checker_maker.OpRole.LRSched
 OPT_OP_ROLE_ATTR_VALUE = core.op_proto_and_checker_maker.OpRole.Optimize
 op_role_attr_name = core.op_proto_and_checker_maker.kOpRoleAttrName()

+SPARSE_OP_TYPE_DICT = {"lookup_table": "W", "lookup_table_v2": "W"}
+
 DEVICE_LIST = ["cpu", "gpu", "xpu"]
 COMMUNICATE_OPS_TYPE = ["send", "recv", "fetch_barrier", "send_barrier"]
 DEFAULT_DEVICE = 'cpu'
@@ -81,11 +83,10 @@ def distributed_ops_pass(program, config):

     def _get_pull_sparse_ops(_program):
         pull_sparse_ops = {}
-        op_types = {"lookup_table": "W"}
         for op in _program.global_block().ops:
-            if op.type in op_types.keys() \
+            if op.type in SPARSE_OP_TYPE_DICT.keys() \
                     and op.attr('remote_prefetch') is True:
-                param_name = op.input(op_types[op.type])[0]
+                param_name = op.input(SPARSE_OP_TYPE_DICT[op.type])[0]
                 ops = pull_sparse_ops.get(param_name, [])
                 ops.append(op)
                 pull_sparse_ops[param_name] = ops
@@ -101,6 +102,7 @@ def distributed_ops_pass(program, config):
         w = program.global_block().vars[ops[0].input("W")[0]]
         padding_idx = ops[0].attr("padding_idx")
         is_distributed = ops[0].attr("is_distributed")
+        op_type = ops[0].type

         outputs = [
             program.global_block().vars[op.output("Out")[0]] for op in ops
@@ -149,7 +151,8 @@ def distributed_ops_pass(program, config):
                     "is_distributed": is_distributed,
                     "pserver_num": len(pserver_endpoints),
                     "padding_idx": padding_idx,
-                    "trainer_id": trainer_id
+                    "trainer_id": trainer_id,
+                    "lookup_table_version": op_type
                 })
         else:
             raise ValueError(
......
@@ -432,8 +432,6 @@ if(WITH_DISTRIBUTE)
     list(REMOVE_ITEM DIST_TEST_OPS "test_dist_mnist_lars")
     list(REMOVE_ITEM DIST_TEST_OPS "test_dist_mnist_train")
     list(REMOVE_ITEM DIST_TEST_OPS "test_dist_save_load")
-    list(REMOVE_ITEM DIST_TEST_OPS "test_dist_simnet_bow")
-    list(REMOVE_ITEM DIST_TEST_OPS "test_dist_fleet_ctr")
     list(REMOVE_ITEM DIST_TEST_OPS "test_dist_text_classification")
     list(REMOVE_ITEM DIST_TEST_OPS "test_dist_train")
     list(REMOVE_ITEM DIST_TEST_OPS "test_dist_word2vec")
......
@@ -196,8 +196,7 @@ class TestDistCTR2x2(FleetDistRunnerBase):
         fleet.stop_worker()

     def do_dataset_training(self, fleet):
-        dnn_input_dim, lr_input_dim, train_file_path = ctr_dataset_reader.prepare_data(
-        )
+        train_file_list = ctr_dataset_reader.prepare_fake_data()

         exe = fluid.Executor(fluid.CPUPlace())
@@ -206,9 +205,7 @@ class TestDistCTR2x2(FleetDistRunnerBase):
         thread_num = 2
         batch_size = 128
-        filelist = []
-        for _ in range(thread_num):
-            filelist.append(train_file_path)
+        filelist = train_file_list

         # config dataset
         dataset = paddle.distributed.fleet.DatasetFactory().create_dataset()
......
@@ -19,6 +19,8 @@ import argparse
 import time
 import math
 import random
+import shutil
+import tempfile

 import paddle
 import paddle.fluid as fluid
@@ -29,7 +31,8 @@ from multiprocessing import Process
 import os
 import signal
 from functools import reduce
-from test_dist_base import TestDistRunnerBase, runtime_main
+from test_dist_fleet_base import runtime_main, FleetDistRunnerBase
+from paddle.distributed.fleet.base.util_factory import fleet_util

 DTYPE = "int64"
 DATA_URL = 'http://paddle-dist-ce-data.bj.bcebos.com/simnet.train.1000'
@@ -49,6 +52,18 @@ fluid.default_startup_program().random_seed = 1
 fluid.default_main_program().random_seed = 1


+def fake_simnet_reader():
+    def reader():
+        for _ in range(1000):
+            q = np.random.random_integers(0, 1500 - 1, size=1).tolist()
+            label = np.random.random_integers(0, 1, size=1).tolist()
+            pt = np.random.random_integers(0, 1500 - 1, size=1).tolist()
+            nt = np.random.random_integers(0, 1500 - 1, size=1).tolist()
+            yield [q, label, pt, nt]
+
+    return reader
+
+
 def get_acc(cos_q_nt, cos_q_pt, batch_size):
     cond = fluid.layers.less_than(cos_q_nt, cos_q_pt)
     cond = fluid.layers.cast(cond, dtype='float64')
@@ -75,34 +90,40 @@ def get_loss(cos_q_pt, cos_q_nt):
     return avg_cost


-def get_optimizer(op="sgd"):
-    if op.upper() == "sgd".upper():
-        optimizer = fluid.optimizer.SGD(learning_rate=base_lr)
-    elif op.upper() == "adam".upper():
-        optimizer = fluid.optimizer.Adam(learning_rate=base_lr)
-    else:
-        optimizer = fluid.optimizer.SGD(learning_rate=base_lr)
-    return optimizer
-
-
 def train_network(batch_size,
                   is_distributed=False,
                   is_sparse=False,
-                  is_self_contained_lr=False):
+                  is_self_contained_lr=False,
+                  is_pyreader=False):
     # query
     q = fluid.layers.data(
         name="query_ids", shape=[1], dtype="int64", lod_level=1)
+    # label data
+    label = fluid.layers.data(name="label", shape=[1], dtype="int64")
+    # pt
+    pt = fluid.layers.data(
+        name="pos_title_ids", shape=[1], dtype="int64", lod_level=1)
+    # nt
+    nt = fluid.layers.data(
+        name="neg_title_ids", shape=[1], dtype="int64", lod_level=1)
+    datas = [q, label, pt, nt]
+
+    reader = None
+    if is_pyreader:
+        reader = fluid.io.PyReader(
+            feed_list=datas,
+            capacity=64,
+            iterable=False,
+            use_double_buffer=False)
+
     # embedding
     q_emb = fluid.embedding(
         input=q,
         is_distributed=is_distributed,
         size=[dict_dim, emb_dim],
         param_attr=fluid.ParamAttr(
-            initializer=fluid.initializer.Constant(value=0.01),
-            name="__emb__",
-            learning_rate=emb_lr) if is_self_contained_lr else fluid.ParamAttr(
-                initializer=fluid.initializer.Constant(value=0.01),
-                name="__emb__"),
+            initializer=fluid.initializer.Constant(value=0.01), name="__emb__"),
         is_sparse=is_sparse)
     q_emb = fluid.layers.reshape(q_emb, [-1, emb_dim])
     # vsum
@@ -115,12 +136,8 @@ def train_network(batch_size,
         param_attr=fluid.ParamAttr(
             initializer=fluid.initializer.Constant(value=0.01),
             name="__q_fc__",
-            learning_rate=base_lr))
-    # label data
-    label = fluid.layers.data(name="label", shape=[1], dtype="int64")
-    # pt
-    pt = fluid.layers.data(
-        name="pos_title_ids", shape=[1], dtype="int64", lod_level=1)
+            learning_rate=base_lr), )
     # embedding
     pt_emb = fluid.embedding(
         input=pt,
@@ -129,9 +146,7 @@ def train_network(batch_size,
         param_attr=fluid.ParamAttr(
             initializer=fluid.initializer.Constant(value=0.01),
             name="__emb__",
-            learning_rate=emb_lr) if is_self_contained_lr else fluid.ParamAttr(
-                initializer=fluid.initializer.Constant(value=0.01),
-                name="__emb__"),
+            learning_rate=emb_lr),
         is_sparse=is_sparse)
     pt_emb = fluid.layers.reshape(pt_emb, [-1, emb_dim])
     # vsum
@@ -142,24 +157,16 @@ def train_network(batch_size,
         input=pt_ss,
         size=hid_dim,
         param_attr=fluid.ParamAttr(
-            initializer=fluid.initializer.Constant(value=0.01),
-            name="__fc__",
-            learning_rate=base_lr),
+            initializer=fluid.initializer.Constant(value=0.01), name="__fc__"),
         bias_attr=fluid.ParamAttr(name="__fc_b__"))
-    # nt
-    nt = fluid.layers.data(
-        name="neg_title_ids", shape=[1], dtype="int64", lod_level=1)
     # embedding
     nt_emb = fluid.embedding(
         input=nt,
         is_distributed=is_distributed,
         size=[dict_dim, emb_dim],
         param_attr=fluid.ParamAttr(
-            initializer=fluid.initializer.Constant(value=0.01),
-            name="__emb__",
-            learning_rate=emb_lr) if is_self_contained_lr else fluid.ParamAttr(
-                initializer=fluid.initializer.Constant(value=0.01),
-                name="__emb__"),
+            initializer=fluid.initializer.Constant(value=0.01), name="__emb__"),
         is_sparse=is_sparse)
     nt_emb = fluid.layers.reshape(nt_emb, [-1, emb_dim])
     # vsum
@@ -170,9 +177,7 @@ def train_network(batch_size,
         input=nt_ss,
         size=hid_dim,
         param_attr=fluid.ParamAttr(
-            initializer=fluid.initializer.Constant(value=0.01),
-            name="__fc__",
-            learning_rate=base_lr),
+            initializer=fluid.initializer.Constant(value=0.01), name="__fc__"),
         bias_attr=fluid.ParamAttr(name="__fc_b__"))
     cos_q_pt = fluid.layers.cos_sim(q_fc, pt_fc)
     cos_q_nt = fluid.layers.cos_sim(q_fc, nt_fc)
@@ -180,79 +185,67 @@ def train_network(batch_size,
     avg_cost = get_loss(cos_q_pt, cos_q_nt)
     # acc
     acc = get_acc(cos_q_nt, cos_q_pt, batch_size)
-    return [avg_cost, acc, cos_q_pt]
-
-
-def combination(x, y):
-    res = [[[xi, yi] for yi in y] for xi in x]
-    return res[0]
-
-
-def get_one_data(file_list):
-    for file in file_list:
-        contents = []
-        with open(file, "r") as fin:
-            for i in fin:
-                contents.append(i.strip())
-            for index, q in enumerate(contents):
-                try:
-                    one_data = [[int(j) for j in i.split(" ")]
-                                for i in q.split(";")[:-1]]
-                    if one_data[1][0] + one_data[1][1] != len(one_data) - 3:
-                        q = fin.readline()
-                        continue
-                    tmp = combination(one_data[3:3 + one_data[1][0]],
-                                      one_data[3 + one_data[1][0]:])
-                except Exception as e:
-                    continue
-
-                for each in tmp:
-                    yield [one_data[2], 0, each[0], each[1]]
-
-
-def get_batch_reader(file_list, batch_size):
-    def batch_reader():
-        res = []
-        for i in get_one_data(file_list):
-            if random.random() <= sample_rate:
-                res.append(i)
-            if len(res) >= batch_size:
-                yield res
-                res = []
-
-    return batch_reader
-
-
-def get_train_reader(batch_size):
-    # The training data set.
-    train_file = os.path.join(paddle.dataset.common.DATA_HOME, "simnet",
-                              "train")
-    train_reader = get_batch_reader([train_file], batch_size)
-    train_feed = ["query_ids", "pos_title_ids", "neg_title_ids", "label"]
-    return train_reader, train_feed
-
-
-class TestDistSimnetBow2x2(TestDistRunnerBase):
-    def get_model(self, batch_size=2):
-        # Train program
-        avg_cost, acc, predict = \
-            train_network(batch_size,
-                          bool(int(os.environ["IS_DISTRIBUTED"])),
-                          bool(int(os.environ["IS_SPARSE"])),
-                          bool(int(os.environ["IS_SELF_CONTAINED_LR"])))
-        inference_program = fluid.default_main_program().clone()
-        # Optimization
-        opt = os.getenv('OPTIMIZER', 'sgd')
-        opt = get_optimizer(opt)
-        opt.minimize(avg_cost)
-        # Reader
-        train_reader, _ = get_train_reader(batch_size)
-        return inference_program, avg_cost, train_reader, train_reader, acc, predict
+    return avg_cost, acc, cos_q_pt, reader
+
+
+class TestDistSimnetBow2x2(FleetDistRunnerBase):
+    """
+    For test SimnetBow model, use Fleet api
+    """
+
+    def net(self, args, batch_size=4, lr=0.01):
+        avg_cost, _, predict, self.reader = \
+            train_network(batch_size=batch_size, is_distributed=False,
+                          is_sparse=True, is_self_contained_lr=False, is_pyreader=(args.reader == "pyreader"))
+        self.avg_cost = avg_cost
+        self.predict = predict
+
+        return avg_cost
+
+    def check_model_right(self, dirname):
+        model_filename = os.path.join(dirname, "__model__")
+
+        with open(model_filename, "rb") as f:
+            program_desc_str = f.read()
+
+        program = fluid.Program.parse_from_string(program_desc_str)
+        with open(os.path.join(dirname, "__model__.proto"), "w") as wn:
+            wn.write(str(program))
+
+    def do_pyreader_training(self, fleet):
+        """
+        do training using dataset, using fetch handler to catch variable
+        Args:
+            fleet(Fleet api): the fleet object of Parameter Server, define distribute training role
+        """
+        exe = fluid.Executor(fluid.CPUPlace())
+        fleet.init_worker()
+        exe.run(fluid.default_startup_program())
+        batch_size = 4
+        # reader
+        train_reader = paddle.batch(fake_simnet_reader(), batch_size=batch_size)
+        self.reader.decorate_sample_list_generator(train_reader)
+
+        for epoch_id in range(1):
+            self.reader.start()
+            try:
+                pass_start = time.time()
+                while True:
+                    loss_val = exe.run(program=fluid.default_main_program(),
+                                       fetch_list=[self.avg_cost.name])
+                    loss_val = np.mean(loss_val)
+                    message = "TRAIN ---> pass: {} loss: {}\n".format(epoch_id,
+                                                                      loss_val)
+                    fleet_util.print_on_rank(message, 0)
+
+                pass_time = time.time() - pass_start
+            except fluid.core.EOFException:
+                self.reader.reset()
+
+        fleet.stop_worker()
+
+    def do_dataset_training(self, fleet):
+        pass


 if __name__ == "__main__":
-    paddle.dataset.common.download(DATA_URL, 'simnet', DATA_MD5, "train")
     runtime_main(TestDistSimnetBow2x2)
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import os
import logging
import tarfile
import random
import paddle
import paddle.fluid.incubate.data_generator as data_generator
logging.basicConfig()
logger = logging.getLogger("paddle")
logger.setLevel(logging.INFO)
class DatasetSimnetReader(data_generator.MultiSlotDataGenerator):
def generate_sample(self, line):
pass
@@ -156,40 +156,5 @@ class TestDistCtrHalfAsync2x2(TestFleetBase):
             "dist_fleet_ctr.py", delta=1e-5, check_error_log=True)


-class TestDistCtrPsGpuPyreaderAsync2x2(TestFleetBase):
-    def _setup_config(self):
-        self._mode = "async"
-        self._reader = "pyreader"
-
-    def check_with_place(self,
-                         model_file,
-                         delta=1e-3,
-                         check_error_log=False,
-                         need_envs={}):
-        required_envs = {
-            "PATH": os.getenv("PATH", ""),
-            "PYTHONPATH": os.getenv("PYTHONPATH", ""),
-            "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
-            "FLAGS_rpc_deadline": "30000",  # 5sec to fail fast
-            "http_proxy": "",
-            "FLAGS_communicator_send_queue_size": "2",
-            "FLAGS_communicator_max_merge_var_num": "2",
-            "CPU_NUM": "2",
-            "SAVE_MODEL": "1"
-        }
-        required_envs.update(need_envs)
-
-        if check_error_log:
-            required_envs["GLOG_v"] = "3"
-            required_envs["GLOG_logtostderr"] = "1"
-
-        tr0_losses, tr1_losses = self._run_cluster(model_file, required_envs)
-
-    def test_dist_train(self):
-        self.check_with_place(
-            "dist_fleet_ctr_ps_gpu.py", delta=1e-5, check_error_log=True)
-
-
 if __name__ == "__main__":
     unittest.main()
@@ -21,7 +21,7 @@ import paddle.fluid.incubate.fleet.base.role_maker as role_maker
 from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
 from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
 from test_dist_fleet_base import TestFleetBase
-from dist_simnet_bow import train_network
+from dist_fleet_simnet_bow import train_network


 class TestDistGeoCtr_2x2(TestFleetBase):
@@ -72,7 +72,7 @@ class TestGeoSgdTranspiler(unittest.TestCase):
         strategy = StrategyFactory.create_geo_strategy(5)

-        avg_cost, _, _ = train_network(batch_size, is_distribute, is_sparse)
+        avg_cost, _, _, _ = train_network(batch_size, is_distribute, is_sparse)

         optimizer = fluid.optimizer.SGD(0.1)
         optimizer = fleet.distributed_optimizer(optimizer, strategy)
......
@@ -21,7 +21,7 @@ import paddle.fluid.incubate.fleet.base.role_maker as role_maker
 from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
 from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig
 from test_dist_fleet_base import TestFleetBase
-from dist_simnet_bow import train_network
+from dist_fleet_simnet_bow import train_network


 @unittest.skip(reason="Skip unstable ut, add it after PR 22957 merged")
@@ -44,7 +44,7 @@ class TestDistGeoClipByGlobalNormTranspiler(unittest.TestCase):
         strategy.geo_sgd_mode = True
         strategy.geo_sgd_need_push_nums = 5

-        avg_cost, _, _ = train_network(batch_size, is_distribute, is_sparse)
+        avg_cost, _, _, _ = train_network(batch_size, is_distribute, is_sparse)

         fluid.clip.set_gradient_clip(
             clip=fluid.clip.GradientClipByGlobalNorm(2.0))
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import os
import unittest
import tempfile
from test_dist_fleet_base import TestFleetBase
class TestDistSimnetASync2x2(TestFleetBase):
def _setup_config(self):
self._mode = "async"
self._reader = "pyreader"
def check_with_place(self,
model_file,
delta=1e-3,
check_error_log=False,
need_envs={}):
required_envs = {
"PATH": os.getenv("PATH", ""),
"PYTHONPATH": os.getenv("PYTHONPATH", ""),
"LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
"FLAGS_rpc_deadline": "5000", # 5sec to fail fast
"http_proxy": "",
"CPU_NUM": "2"
}
required_envs.update(need_envs)
if check_error_log:
required_envs["GLOG_v"] = "3"
required_envs["GLOG_logtostderr"] = "1"
tr0_losses, tr1_losses = self._run_cluster(model_file, required_envs)
def test_dist_train(self):
self.check_with_place(
"dist_fleet_simnet_bow.py", delta=1e-5, check_error_log=True)
if __name__ == "__main__":
unittest.main()
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import os
import unittest
from test_dist_base import TestDistBase
import os
flag_name = os.path.splitext(__file__)[0]
class TestDistSimnetBowDense2x2(TestDistBase):
def _setup_config(self):
self._sync_mode = True
self._enforce_place = "CPU"
def test_simnet_bow(self):
need_envs = {
"IS_DISTRIBUTED": '0',
"IS_SPARSE": '0',
'IS_SELF_CONTAINED_LR': '1'
}
self.check_with_place(
"dist_simnet_bow.py",
delta=1e-5,
check_error_log=True,
need_envs=need_envs,
log_name=flag_name)
class TestDistSimnetBow2x2DenseAsync(TestDistBase):
def _setup_config(self):
self._sync_mode = False
self._enforce_place = "CPU"
# FIXME(typhoonzero): fix async tests later
def notest_simnet_bow(self):
need_envs = {
"IS_DISTRIBUTED": '0',
"IS_SPARSE": '0',
'IS_SELF_CONTAINED_LR': '1',
}
self.check_with_place(
"dist_simnet_bow.py",
delta=100,
check_error_log=True,
need_envs=need_envs,
log_name=flag_name)
class TestDistSimnetBowSparse2x2(TestDistBase):
def _setup_config(self):
self._sync_mode = True
self._enforce_place = "CPU"
def test_simnet_bow(self):
need_envs = {
"IS_DISTRIBUTED": '0',
"IS_SPARSE": '1',
'IS_SELF_CONTAINED_LR': '1'
}
self.check_with_place(
"dist_simnet_bow.py",
delta=1e-5,
check_error_log=True,
need_envs=need_envs,
log_name=flag_name)
class TestDistSimnetBow2x2SparseAsync(TestDistBase):
def _setup_config(self):
self._sync_mode = False
self._enforce_place = "CPU"
def test_simnet_bow(self):
need_envs = {
"IS_DISTRIBUTED": '0',
"IS_SPARSE": '1',
'IS_SELF_CONTAINED_LR': '1'
}
self.check_with_place(
"dist_simnet_bow.py",
delta=100,
check_error_log=True,
need_envs=need_envs,
log_name=flag_name)
# FIXME(tangwei): Learningrate variable is not created on pserver.
class TestDistSimnetBow2x2LookupTableSync(TestDistBase):
def _setup_config(self):
self._sync_mode = True
self._enforce_place = "CPU"
def test_simnet_bow(self):
need_envs = {
"IS_DISTRIBUTED": '0',
"IS_SPARSE": '1',
'IS_SELF_CONTAINED_LR': '1'
}
self.check_with_place(
"dist_simnet_bow.py",
delta=1e-5,
check_error_log=True,
need_envs=need_envs,
log_name=flag_name)
class TestDistSimnetBow2x2LookupTableAsync(TestDistBase):
def _setup_config(self):
self._sync_mode = False
self._enforce_place = "CPU"
def test_simnet_bow(self):
need_envs = {
"IS_DISTRIBUTED": '0',
"IS_SPARSE": '1',
'IS_SELF_CONTAINED_LR': '1'
}
self.check_with_place(
"dist_simnet_bow.py",
delta=100,
check_error_log=True,
need_envs=need_envs,
log_name=flag_name)
class TestDistSimnetBow2x2LookupTableNotContainLRSync(TestDistBase):
def _setup_config(self):
self._sync_mode = True
self._enforce_place = "CPU"
def test_simnet_bow(self):
need_envs = {
"IS_DISTRIBUTED": '0',
"IS_SPARSE": '1',
'IS_SELF_CONTAINED_LR': '0'
}
self.check_with_place(
"dist_simnet_bow.py",
delta=1e-5,
check_error_log=True,
need_envs=need_envs,
log_name=flag_name)
if __name__ == "__main__":
unittest.main()