未验证 提交 5e839e4d 编写于 作者: T tangwei12 提交者: GitHub

add sparse embedding & load vars for 2.0 & gloo bug fix (#30306)

* add sparse embedding & load vars for 2.0

Change-Id: I36b59ed5f015189dc9d9d2e34a9357722d369f1b

* fix hdfs gloo

Change-Id: Ia84d579053720ad804183e54c9a04b4f031c79c6

* fix gloo hdfs

Change-Id: I5ab982fd483cddc10adcdef0b8aa83aca976cb9e

* move loadvar/sparse embedding from incubate to static

Change-Id: I57081d3545ad2efab78c72420d2162c0eacaf3a0
上级 da3ab010
...@@ -229,18 +229,18 @@ void ParallelConnectContext::connectFullMesh( ...@@ -229,18 +229,18 @@ void ParallelConnectContext::connectFullMesh(
store.wait({key}, getTimeout()); store.wait({key}, getTimeout());
std::vector<char> allAddrs; std::vector<char> allAddrs;
auto max_retry_times = 5; auto max_retry_times = 10;
// Connect to other side of this pair // Connect to other side of this pair
while (max_retry_times > 0) { while (max_retry_times > 0) {
allAddrs = store.get(key); allAddrs = store.get(key);
VLOG(3) << "store get all address size: " << allAddrs.size() VLOG(3) << "store get all address size: " << allAddrs.size()
<< " except: " << total_add_size; << " except: " << total_add_size;
if (allAddrs.size() == static_cast<size_t>(total_add_size)) { if (allAddrs.size() == static_cast<size_t>(total_add_size)) {
break; break;
} }
sleep(5);
--max_retry_times; --max_retry_times;
} }
...@@ -272,11 +272,13 @@ void GlooWrapper::Init() { ...@@ -272,11 +272,13 @@ void GlooWrapper::Init() {
attr.iface = iface_; attr.iface = iface_;
std::shared_ptr<gloo::rendezvous::HdfsStore> file_store = nullptr; std::shared_ptr<gloo::rendezvous::HdfsStore> file_store = nullptr;
std::shared_ptr<gloo::rendezvous::HTTPStore> http_store = nullptr; std::shared_ptr<gloo::rendezvous::HTTPStore> http_store = nullptr;
auto context = std::make_shared<gloo::rendezvous::Context>(rank_, size_);
context->setTimeout(run_timeout_);
auto dev = gloo::transport::tcp::CreateDevice(attr); auto dev = gloo::transport::tcp::CreateDevice(attr);
switch (store_type_) { switch (store_type_) {
case GlooStoreType::HDFS: { case GlooStoreType::HDFS: {
auto context = std::make_shared<gloo::rendezvous::ParallelConnectContext>(
rank_, size_);
context->setTimeout(run_timeout_);
std::string cmd = std::string("${HADOOP_HOME}/bin/hadoop fs"); std::string cmd = std::string("${HADOOP_HOME}/bin/hadoop fs");
cmd += " -D fs.default.name=" + hdfs_name_; cmd += " -D fs.default.name=" + hdfs_name_;
cmd += " -D hadoop.job.ugi=" + hdfs_ugi_; cmd += " -D hadoop.job.ugi=" + hdfs_ugi_;
...@@ -286,22 +288,25 @@ void GlooWrapper::Init() { ...@@ -286,22 +288,25 @@ void GlooWrapper::Init() {
auto prefix_store = auto prefix_store =
std::make_shared<gloo::rendezvous::PrefixStore>(prefix_, *file_store); std::make_shared<gloo::rendezvous::PrefixStore>(prefix_, *file_store);
context->connectFullMesh(*prefix_store, dev); context->connectFullMesh(*prefix_store, dev);
context_ = std::move(context);
break; break;
} }
case GlooStoreType::HTTP: { case GlooStoreType::HTTP: {
auto context = std::make_shared<gloo::rendezvous::Context>(rank_, size_);
context->setTimeout(run_timeout_);
http_store = std::make_shared<gloo::rendezvous::HTTPStore>( http_store = std::make_shared<gloo::rendezvous::HTTPStore>(
http_ip_, http_port_, prefix_ + "_" + http_scope_, rank_); http_ip_, http_port_, prefix_ + "_" + http_scope_, rank_);
http_store->SetTimeoutSeconds(init_timeout_.count()); http_store->SetTimeoutSeconds(init_timeout_.count());
context->connectFullMesh(*http_store, dev); context->connectFullMesh(*http_store, dev);
http_store->Finalize(); http_store->Finalize();
VLOG(3) << "after calling http_store->Finalize."; VLOG(3) << "after calling http_store->Finalize.";
context_ = std::move(context);
break; break;
} }
default: default:
LOG(ERROR) << "unknown store type " << store_type_; LOG(ERROR) << "unknown store type " << store_type_;
exit(-1); exit(-1);
} }
context_ = std::move(context);
#endif #endif
is_initialized_ = true; is_initialized_ = true;
VLOG(3) << "gloo initialized done."; VLOG(3) << "gloo initialized done.";
......
...@@ -976,7 +976,7 @@ def sparse_embedding(input, ...@@ -976,7 +976,7 @@ def sparse_embedding(input,
'fluid.contrib.layers.sparse_embedding') 'fluid.contrib.layers.sparse_embedding')
check_dtype(dtype, 'dtype', ['float32'], check_dtype(dtype, 'dtype', ['float32'],
'fluid.contrib.layers.sparse_embedding') 'paddle.static.nn.sparse_embedding')
w = helper.create_parameter( w = helper.create_parameter(
attr=helper.param_attr, attr=helper.param_attr,
......
...@@ -14,13 +14,37 @@ ...@@ -14,13 +14,37 @@
# TODO: import framework api under this directory # TODO: import framework api under this directory
__all__ = [ __all__ = [
'append_backward', 'gradients', 'Executor', 'global_scope', 'scope_guard', 'append_backward',
'BuildStrategy', 'CompiledProgram', 'Print', 'py_func', 'ExecutionStrategy', 'gradients',
'name_scope', 'ParallelExecutor', 'program_guard', 'WeightNormParamAttr', 'Executor',
'default_main_program', 'default_startup_program', 'Program', 'data', 'global_scope',
'InputSpec', 'save', 'load', 'save_inference_model', 'load_inference_model', 'scope_guard',
'load_program_state', 'set_program_state', 'cpu_places', 'cuda_places', 'BuildStrategy',
'xpu_places', 'Variable' 'CompiledProgram',
'Print',
'py_func',
'ExecutionStrategy',
'name_scope',
'ParallelExecutor',
'program_guard',
'WeightNormParamAttr',
'default_main_program',
'default_startup_program',
'Program',
'data',
'InputSpec',
'save',
'load',
'save_inference_model',
'load_inference_model',
'load_program_state',
'set_program_state',
'cpu_places',
'cuda_places',
'xpu_places',
'Variable',
'load_vars',
'save_vars',
] ]
from . import nn from . import nn
...@@ -61,6 +85,10 @@ from ..fluid.io import save #DEFINE_ALIAS ...@@ -61,6 +85,10 @@ from ..fluid.io import save #DEFINE_ALIAS
from ..fluid.io import load #DEFINE_ALIAS from ..fluid.io import load #DEFINE_ALIAS
from ..fluid.io import load_program_state #DEFINE_ALIAS from ..fluid.io import load_program_state #DEFINE_ALIAS
from ..fluid.io import set_program_state #DEFINE_ALIAS from ..fluid.io import set_program_state #DEFINE_ALIAS
from ..fluid.io import load_vars #DEFINE_ALIAS
from ..fluid.io import save_vars #DEFINE_ALIAS
from ..fluid.layers import create_parameter #DEFINE_ALIAS from ..fluid.layers import create_parameter #DEFINE_ALIAS
from ..fluid.layers import create_global_var #DEFINE_ALIAS from ..fluid.layers import create_global_var #DEFINE_ALIAS
from ..fluid.layers.metric_op import auc #DEFINE_ALIAS from ..fluid.layers.metric_op import auc #DEFINE_ALIAS
...@@ -38,6 +38,7 @@ __all__ = [ ...@@ -38,6 +38,7 @@ __all__ = [
'spectral_norm', 'spectral_norm',
'switch_case', 'switch_case',
'while_loop', 'while_loop',
'sparse_embedding',
] ]
from .common import fc #DEFINE_ALIAS from .common import fc #DEFINE_ALIAS
...@@ -67,3 +68,4 @@ from ...fluid.layers import switch_case #DEFINE_ALIAS ...@@ -67,3 +68,4 @@ from ...fluid.layers import switch_case #DEFINE_ALIAS
from ...fluid.layers import while_loop #DEFINE_ALIAS from ...fluid.layers import while_loop #DEFINE_ALIAS
from ...fluid.input import embedding #DEFINE_ALIAS from ...fluid.input import embedding #DEFINE_ALIAS
from ...fluid.contrib.layers import sparse_embedding #DEFINE_ALIAS
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册