未验证 提交 a4cfa5ae 编写于 作者: W wangguanqun 提交者: GitHub

add dymf to gpups in python (#43497)

* gpups default config and dataset

* codestyle

* add unittest

* code style

* add dymf to gpups

* codestyle

* add static.nn.cvm import

* PSERVER_DEBUG

* add fs config to worker desc

* update unittest

* unittest

* remove gpups unittest

* remove gpups unittest

* static check
上级 87beee54
......@@ -288,20 +288,26 @@ std::string CtrDymfAccessor::ParseToString(const float* v, int param) {
os << v[0] << " " << v[1] << " " << v[2] << " " << v[3] << " " << v[4];
// << v[5] << " " << v[6];
for (int i = common_feature_value.EmbedG2SumIndex();
i < common_feature_value.EmbedxWIndex(); i++) {
i < common_feature_value.EmbedxG2SumIndex(); i++) {
os << " " << v[i];
}
os << " " << common_feature_value.Slot(const_cast<float*>(v)) << " "
<< common_feature_value.MfDim(const_cast<float*>(v));
// os << " " << common_feature_value.Slot(const_cast<float*>(v)) << " "
// << common_feature_value.MfDim(const_cast<float*>(v));
auto show = common_feature_value.Show(const_cast<float*>(v));
auto click = common_feature_value.Click(const_cast<float*>(v));
auto score = ShowClickScore(show, click);
if (score >= _config.embedx_threshold() &&
param > common_feature_value.EmbedxG2SumIndex()) {
VLOG(0) << "common_feature_value.EmbedxG2SumIndex():"
<< common_feature_value.EmbedxG2SumIndex();
// VLOG(1) << "common_feature_value.EmbedxG2SumIndex():"
// << common_feature_value.EmbedxG2SumIndex();
// VLOG(1) << "common_feature_value.EmbedxWIndex():"
// << common_feature_value.EmbedxWIndex();
// VLOG(1) << "common_feature_value.MfDim():"
// << common_feature_value.MfDim(const_cast<float*>(v));
for (auto i = common_feature_value.EmbedxG2SumIndex();
i < common_feature_value.Dim(); ++i) {
i < common_feature_value.EmbedxWIndex() +
common_feature_value.MfDim(const_cast<float*>(v));
++i) {
os << " " << v[i];
}
}
......
......@@ -313,6 +313,14 @@ class DistributedOpsPass(PassBase):
for i in range(len(global_block.ops)):
assert global_block.desc.op(i) == global_block.ops[i].desc
if attrs['use_ps_gpu']:
gpups_inputs_idxs = list()
gpups_outputs_idxs = list()
gpups_inputs = list()
gpups_outputs = list()
gpups_w_size = list()
gpups_min_distributed_idx = len(_program.global_block().ops) + 1
for param, ops in pull_sparse_ops.items():
all_ops = _program.global_block().ops
op_device = ""
......@@ -368,42 +376,37 @@ class DistributedOpsPass(PassBase):
outputs_idxs[out_id] = min(idx,
outputs_idxs[out_id])
if attrs['use_ps_gpu']:
gpups_inputs_idxs.extend(inputs_idxs)
gpups_outputs_idxs.extend(outputs_idxs)
gpups_inputs.extend(inputs)
gpups_outputs.extend(outputs)
gpups_w_size.extend([w.shape[1]] * len(inputs))
gpups_min_distributed_idx = min(min(op_idxs),
gpups_min_distributed_idx)
continue
if min(outputs_idxs) - max(inputs_idxs) >= 1:
if max(inputs_idxs) == -1:
distributed_idx = min(op_idxs)
else:
distributed_idx = max(inputs_idxs) + 1
if attrs['use_ps_gpu']:
_program.global_block()._insert_op(
index=distributed_idx,
type="pull_gpups_sparse",
inputs={
"Ids": inputs,
'W': w
},
outputs={"Out": outputs},
attrs={
"size": [w.shape[1] for i in inputs],
"is_distributed": True,
"is_sparse": True
})
else:
_program.global_block()._insert_op(
index=distributed_idx,
type="distributed_lookup_table",
inputs={
"Ids": inputs,
'W': w
},
outputs={"Outputs": outputs},
attrs={
"is_distributed": is_distributed,
"padding_idx": padding_idx,
"table_id": table_id,
"lookup_table_version": op_type,
"op_device": op_device
})
_program.global_block()._insert_op(
index=distributed_idx,
type="distributed_lookup_table",
inputs={
"Ids": inputs,
'W': w
},
outputs={"Outputs": outputs},
attrs={
"is_distributed": is_distributed,
"padding_idx": padding_idx,
"table_id": table_id,
"lookup_table_version": op_type,
"op_device": op_device
})
else:
for i in range(len(inputs_idxs)):
distributed_idx = op_idxs[i]
......@@ -424,6 +427,32 @@ class DistributedOpsPass(PassBase):
"op_device": op_device
})
if attrs['use_ps_gpu'] and len(gpups_inputs) > 0:
if max(gpups_inputs_idxs) > 0:
raise ValueError("There can't be ops before embedding in gpups")
_program.global_block()._insert_op(index=gpups_min_distributed_idx,
type="pull_gpups_sparse",
inputs={
"Ids": gpups_inputs,
},
outputs={"Out": gpups_outputs},
attrs={
"size": gpups_w_size,
"is_distributed": True,
"is_sparse": True
})
PSGPU = paddle.fluid.core.PSGPU()
try:
gpu_slot = [int(var.name) for var in gpups_inputs]
except (ValueError):
raise ValueError(
"The slot name in gpups Should be able to convert to integer."
)
PSGPU.set_slot_vector(gpu_slot)
gpu_mf_sizes = [x - 3 for x in gpups_w_size]
PSGPU.set_slot_dim_vector(gpu_mf_sizes)
def _get_pull_sparse_ops(self, _program, attrs):
pull_sparse_ops = {}
pull_sparse_ids = {}
......
......@@ -596,8 +596,11 @@ class SparseTable(Table):
if proto.table_name == self.common.table_name:
usr_table_proto = proto
break
table_proto.table_class = 'MemorySparseTable'
warnings.warn("The PS mode must use MemorySparseTable.")
if usr_table_proto.HasField("table_class"):
table_proto.table_class = usr_table_proto.table_class
else:
table_proto.table_class = 'MemorySparseTable'
warnings.warn("The PS mode must use MemorySparseTable.")
if usr_table_proto.HasField("shard_num"):
table_proto.shard_num = usr_table_proto.shard_num
else:
......@@ -821,6 +824,7 @@ class PsDescBuilder(object):
self.barrier_table_id = table.idx
self.service._set(
self.ps_desc.server_param.downpour_server_param.service_param)
self.fs_client._set(self.ps_desc.fs_client_param)
return text_format.MessageToString(self.ps_desc)
def build_server_desc(self):
......@@ -937,9 +941,10 @@ class TheOnePSRuntime(RuntimeBase):
main_program._fleet_opt = {}
main_program._fleet_opt["use_ps_gpu"] = True
gpus_env = os.getenv("FLAGS_selected_gpus")
main_program._fleet_opt["worker_places"] = [
int(s) for s in gpus_env.split(",")
]
gpus_env = [int(s) for s in gpus_env.split(",")]
main_program._fleet_opt["worker_places"] = gpus_env
PSGPU = fluid.core.PSGPU()
PSGPU.init_gpu_ps(gpus_env)
def sync_strategy_envs():
kwargs = {}
......@@ -1084,9 +1089,9 @@ class TheOnePSRuntime(RuntimeBase):
if self.is_heter_ps_mode:
trainers += len(self.role_maker._get_heter_worker_endpoints())
# debug = bool(int(os.getenv("PSERVER_DEBUG", "0")))
# if debug:
# print("server: \n{}".format(server_desc))
debug = bool(int(os.getenv("PSERVER_DEBUG", "0")))
if debug:
print("server: \n{}".format(server_desc))
self._server = fluid.core.DistFleetWrapper()
self._server.init_server(server_desc, self.string_hosts, role_id,
......
......@@ -781,11 +781,17 @@ if(WITH_DISTRIBUTE)
list(REMOVE_ITEM DIST_TEST_OPS "test_dist_fleet_gloo")
if(NOT WITH_HETERPS)
list(REMOVE_ITEM DIST_TEST_OPS "test_communicator_ps_gpu")
list(REMOVE_ITEM DIST_TEST_OPS "test_dist_fleet_ps11")
list(REMOVE_ITEM DIST_TEST_OPS "test_dist_fleet_ps12")
endif()
py_test_modules(test_recv_save_op MODULES test_recv_save_op ENVS ${dist_ENVS})
py_test_modules(test_communicator_async MODULES test_communicator_async ENVS
${dist_ENVS})
py_test_modules(test_communicator_ps_gpu MODULES test_communicator_ps_gpu
ENVS ${dist_ENVS})
# py_test_modules(test_communicator_ps_gpu MODULES test_communicator_ps_gpu
# ENVS ${dist_ENVS})
py_test_modules(test_communicator_geo MODULES test_communicator_geo ENVS
${dist_ENVS})
py_test_modules(
......
......@@ -154,14 +154,14 @@ class TestPsTrainerPass(PsPassTestBase):
self.config['debug_new_minimize'] = '1'
self.config['log_dir'] = ps_log_root_dir + "gpubox_log_new_minimize"
remove_path_if_exists(self.config['log_dir'])
self.ps_launch("gpu-ps")
file1 = './ps_log/gpubox_run_minimize_debug:_0_worker_main.prototxt'
file2 = './ps_log/gpubox_run_minimize_debug:_1_worker_main.prototxt'
if self.check(file1, file2):
logger.info('test_ps_optimizer_minimize_gpu passed!')
else:
logger.error('test_ps_optimizer_minimize_gpu failed!')
# self.ps_launch("gpu-ps")
# file1 = './ps_log/gpubox_run_minimize_debug:_0_worker_main.prototxt'
# file2 = './ps_log/gpubox_run_minimize_debug:_1_worker_main.prototxt'
# if self.check(file1, file2):
# logger.info('test_ps_optimizer_minimize_gpu passed!')
# else:
# logger.error('test_ps_optimizer_minimize_gpu failed!')
def test_append_send_ops_pass(self):
self.init()
......
......@@ -273,9 +273,7 @@ class StaticModel():
dtype="float32")
sparse_input_ids = [
paddle.static.data(name="C" + str(i),
shape=[None, 1],
dtype="int64")
paddle.static.data(name=str(i), shape=[None, 1], dtype="int64")
for i in range(1, self.sparse_inputs_slots)
]
......
......@@ -74,10 +74,7 @@ class TestPSPassWithBow(unittest.TestCase):
is_sparse = True
# query
q = fluid.layers.data(name="query_ids",
shape=[1],
dtype="int64",
lod_level=1)
q = fluid.layers.data(name="1", shape=[1], dtype="int64", lod_level=1)
# embedding
q_emb = fluid.contrib.layers.sparse_embedding(
input=q,
......@@ -101,10 +98,7 @@ class TestPSPassWithBow(unittest.TestCase):
# label data
label = fluid.layers.data(name="label", shape=[1], dtype="int64")
# pt
pt = fluid.layers.data(name="pos_title_ids",
shape=[1],
dtype="int64",
lod_level=1)
pt = fluid.layers.data(name="2", shape=[1], dtype="int64", lod_level=1)
# embedding
pt_emb = fluid.contrib.layers.sparse_embedding(
input=pt,
......@@ -127,10 +121,7 @@ class TestPSPassWithBow(unittest.TestCase):
learning_rate=base_lr),
bias_attr=fluid.ParamAttr(name="__fc_b__"))
# nt
nt = fluid.layers.data(name="neg_title_ids",
shape=[1],
dtype="int64",
lod_level=1)
nt = fluid.layers.data(name="3", shape=[1], dtype="int64", lod_level=1)
# embedding
nt_emb = fluid.contrib.layers.sparse_embedding(
input=nt,
......
......@@ -77,10 +77,7 @@ class TestPSPassWithBow(unittest.TestCase):
is_sparse = True
# query
q = fluid.layers.data(name="query_ids",
shape=[1],
dtype="int64",
lod_level=1)
q = fluid.layers.data(name="1", shape=[1], dtype="int64", lod_level=1)
# embedding
q_emb = fluid.contrib.layers.sparse_embedding(
input=q,
......@@ -104,10 +101,7 @@ class TestPSPassWithBow(unittest.TestCase):
# label data
label = fluid.layers.data(name="label", shape=[1], dtype="int64")
# pt
pt = fluid.layers.data(name="pos_title_ids",
shape=[1],
dtype="int64",
lod_level=1)
pt = fluid.layers.data(name="2", shape=[1], dtype="int64", lod_level=1)
# embedding
pt_emb = fluid.contrib.layers.sparse_embedding(
input=pt,
......@@ -130,10 +124,7 @@ class TestPSPassWithBow(unittest.TestCase):
learning_rate=base_lr),
bias_attr=fluid.ParamAttr(name="__fc_b__"))
# nt
nt = fluid.layers.data(name="neg_title_ids",
shape=[1],
dtype="int64",
lod_level=1)
nt = fluid.layers.data(name="3", shape=[1], dtype="int64", lod_level=1)
# embedding
nt_emb = fluid.contrib.layers.sparse_embedding(
input=nt,
......
......@@ -40,6 +40,7 @@ from ...fluid.layers import while_loop # noqa: F401
from ...fluid.input import embedding # noqa: F401
from ...fluid.contrib.layers import sparse_embedding # noqa: F401
from ...fluid.layers import continuous_value_model # noqa: F401
from ...fluid.layers.sequence_lod import sequence_conv # noqa: F401
from ...fluid.layers.sequence_lod import sequence_softmax # noqa: F401
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册