diff --git a/doc/design/dist_refactor/distributed_architecture.md b/doc/design/dist_refactor/distributed_architecture.md
index 3a741f95866fb6c301ca9097af7916281f2278cf..9368c5780dc922953f38bf0f86d9f797a4a8a6fe 100644
--- a/doc/design/dist_refactor/distributed_architecture.md
+++ b/doc/design/dist_refactor/distributed_architecture.md
@@ -152,12 +152,12 @@ for data in train_reader():
The `JobDesc` object describes the distributed job resource specification to run in a
cluster environment.
-
+
`RemoteExecutor.run` sends the `ProgramDesc` and
[TrainingJob](https://github.com/PaddlePaddle/cloud/blob/develop/doc/autoscale/README.md#training-job-resource)
to a server in the cluster which executes `RemoteExecutor.listen`. This server is responsible
-to start the final Kubernetes Jobs to run the different role of `ProgramDesc`.
+to start the final Kubernetes Jobs that run the different roles of `ProgramDesc` from a `ConfigMap`.
### Placement Algorithm
diff --git a/doc/design/dist_refactor/src/remote_executor.graffle b/doc/design/dist_refactor/src/remote_executor.graffle
index ce2c18fee5687732053c48af9c8c290a994a8090..41b2067311694b56d211a4f32d1b76884eeffd2d 100644
Binary files a/doc/design/dist_refactor/src/remote_executor.graffle and b/doc/design/dist_refactor/src/remote_executor.graffle differ
diff --git a/doc/design/dist_refactor/src/remote_executor.png b/doc/design/dist_refactor/src/remote_executor.png
index 6be4b1841b99efdb59557975485d0387f422308c..744e2fb2e0f1bbe058e991ba7b2a09000965ee79 100644
Binary files a/doc/design/dist_refactor/src/remote_executor.png and b/doc/design/dist_refactor/src/remote_executor.png differ
diff --git a/doc/howto/usage/cluster/fluid_cluster_train_en.md b/doc/howto/usage/cluster/fluid_cluster_train_en.md
index 11904a6f71bb6ce37417aeffb8e408ec65961b12..ae825d9a517c7e9005d4e32f8f34b3f6a79be0c9 100644
--- a/doc/howto/usage/cluster/fluid_cluster_train_en.md
+++ b/doc/howto/usage/cluster/fluid_cluster_train_en.md
@@ -16,6 +16,12 @@ PaddlePaddle must be installed on all nodes. If you have GPU cards on your nodes
PaddlePaddle build and installation guide can be found [here](http://www.paddlepaddle.org/docs/develop/documentation/en/getstarted/build_and_install/index_en.html).
+In addition to the above, the `cmake` command should be run with the `WITH_DISTRIBUTE` option set to `ON`. A bare-minimum example `cmake` command looks as follows:
+
+``` bash
+cmake .. -DWITH_DOC=OFF -DWITH_GPU=OFF -DWITH_DISTRIBUTE=ON -DWITH_SWIG_PY=ON -DWITH_PYTHON=ON
+```
+
### Update the training script
#### Non-cluster training script
@@ -119,7 +125,14 @@ for pass_id in range(100):
### E2E demo
-Please find the complete demo from [here](https://github.com/PaddlePaddle/Paddle/blob/develop/python/paddle/v2/fluid/tests/book_distribute/notest_dist_fit_a_line.py). In parameter server node run the following in the command line:
+The complete demo can be found [here](https://github.com/PaddlePaddle/Paddle/blob/develop/python/paddle/v2/fluid/tests/book_distribute/notest_dist_fit_a_line.py).
+First, `cd` into the folder that contains the Python demo files. In this case:
+
+```bash
+cd /paddle/python/paddle/v2/fluid/tests/book_distribute
+```
+
+On the parameter server node, run the following on the command line:
``` bash
PSERVERS=192.168.1.2:6174 SERVER_ENDPOINT=192.168.1.2:6174 TRAINING_ROLE=PSERVER python notest_dist_fit_a_line.py
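For orientation, the `book_distribute` demo scripts resolve their role and endpoints entirely from the environment variables shown above. Below is a minimal Python sketch of that convention (names mirror the demo scripts; this is an illustration, not part of the patch):

```python
# Sketch of how the book_distribute demo scripts pick their role from the
# environment variables set on the command line (see the commands above).
import os

pserver_endpoints = os.getenv("PSERVERS")          # all parameter server endpoints
current_endpoint = os.getenv("SERVER_ENDPOINT")    # this node's endpoint (pserver only)
training_role = os.getenv("TRAINING_ROLE", "TRAINER")

if training_role == "PSERVER":
    print("starting parameter server at %s" % current_endpoint)
elif training_role == "TRAINER":
    print("starting trainer against pservers %s" % pserver_endpoints)
else:
    raise ValueError("TRAINING_ROLE should be TRAINER or PSERVER")
```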
diff --git a/paddle/inference/inference.cc b/paddle/inference/inference.cc
index 49001778808173b82865a4b6632a6b175ef96242..09268ffb3a1410b22f1b7d997a5cc0e4176b6d55 100644
--- a/paddle/inference/inference.cc
+++ b/paddle/inference/inference.cc
@@ -19,14 +19,10 @@ limitations under the License. */
#include "paddle/framework/init.h"
#include "paddle/framework/scope.h"
-#ifdef PADDLE_USE_PTOOLS
-#include "chooseser.h"
-#endif
-
namespace paddle {
void InferenceEngine::LoadInferenceModel(const std::string& dirname) {
- std::string model_filename = dirname + "/__model__.dat";
+ std::string model_filename = dirname + "/__model__";
LOG(INFO) << "loading model from " << model_filename;
std::ifstream inputfs(model_filename, std::ios::in | std::ios::binary);
std::string program_desc_str;
@@ -52,39 +48,15 @@ void InferenceEngine::LoadInferenceModel(const std::string& dirname) {
}
}
-void InferenceEngine::LoadInferenceModel(
- const std::string& dirname,
-    const std::vector<std::string>& feed_var_names,
-    const std::vector<std::string>& fetch_var_names) {
- std::string model_filename = dirname + "/__model__.dat";
- LOG(INFO) << "loading model from " << model_filename;
- std::ifstream inputfs(model_filename, std::ios::in | std::ios::binary);
- std::string program_desc_str;
- inputfs.seekg(0, std::ios::end);
- program_desc_str.resize(inputfs.tellg());
- inputfs.seekg(0, std::ios::beg);
- LOG(INFO) << "program_desc_str's size: " << program_desc_str.size();
- inputfs.read(&program_desc_str[0], program_desc_str.size());
- inputfs.close();
-
- program_ = new framework::ProgramDesc(program_desc_str);
- GenerateLoadProgram(dirname);
-
- if (feed_var_names.empty() || fetch_var_names.empty()) {
- LOG(FATAL) << "Please specify the feed_var_names and fetch_var_names.";
- }
- feed_var_names_ = feed_var_names;
- fetch_var_names_ = fetch_var_names;
- PrependFeedOp();
- AppendFetchOp();
-}
-
bool InferenceEngine::IsParameter(const framework::VarDesc* var) {
- if (var->Persistable() && var->Name() != "feed" && var->Name() != "fetch") {
+ if (var->Persistable()) {
// There are many unreachable variables in the program
for (size_t i = 0; i < program_->Size(); ++i) {
const framework::BlockDesc& block = program_->Block(i);
for (auto* op : block.AllOps()) {
+ if (op->Type() == "feed") {
+ continue;
+ }
for (auto input_argument_name : op->InputArgumentNames()) {
if (input_argument_name == var->Name()) {
return true;
diff --git a/paddle/inference/inference.h b/paddle/inference/inference.h
index 7fc09cb9e539a65a8cd3cceb1543bc7d111c22b3..26f259824b945e260b370ced9d065842264075d5 100644
--- a/paddle/inference/inference.h
+++ b/paddle/inference/inference.h
@@ -29,9 +29,6 @@ public:
}
void LoadInferenceModel(const std::string& dirname);
- void LoadInferenceModel(const std::string& dirname,
-                          const std::vector<std::string>& feed_var_names,
-                          const std::vector<std::string>& fetch_var_names);
   void Execute(const std::vector<framework::LoDTensor>& feeds,
                std::vector<framework::LoDTensor>& fetchs);
diff --git a/paddle/operators/detail/grpc_client.cc b/paddle/operators/detail/grpc_client.cc
index 1e41587c418fb0ce4e452d5c6735c54e2d42f798..d699dabf2fb982f267c4869180efaf0e600eb46c 100644
--- a/paddle/operators/detail/grpc_client.cc
+++ b/paddle/operators/detail/grpc_client.cc
@@ -13,6 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License. */
#include "grpc_client.h"
+#include "paddle/framework/threadpool.h"
namespace paddle {
namespace operators {
namespace detail {
@@ -22,25 +23,32 @@ bool RPCClient::AsyncSendVariable(const std::string& ep,
const framework::Scope& scope,
const std::string& var_name,
int64_t time_out) {
- sendrecv::VariableMessage req;
- auto* var = scope.FindVar(var_name);
- SerializeToMessage(var_name, var, ctx, &req);
-
- // varhandle
- VarHandle var_h;
- var_h.ep = ep;
- var_h.scope = &scope;
- var_h.name = var_name;
- var_h.ctx = &ctx;
-
- // stub context
- auto ch = GetChannel(ep);
- SendProcessor* s = new SendProcessor(ch);
- s->Prepare(var_h, time_out);
- s->response_call_back_ = NULL;
-
- auto rpc = s->stub_->AsyncSendVariable(s->context_.get(), req, &cq_);
- rpc->Finish(&s->reply_, &s->status_, (void*)s);
+ const platform::DeviceContext* p_ctx = &ctx;
+ const std::string ep_val = ep;
+ const std::string var_name_val = var_name;
+ const framework::Scope* p_scope = &scope;
+ const auto ch = GetChannel(ep_val);
+
+ framework::Async([var_name_val, p_ctx, ep_val, p_scope, time_out, ch, this] {
+ auto* var = p_scope->FindVar(var_name_val);
+ sendrecv::VariableMessage req;
+ SerializeToMessage(var_name_val, var, *p_ctx, &req);
+
+ // varhandle
+ VarHandle var_h;
+ var_h.ep = ep_val;
+ var_h.scope = p_scope;
+ var_h.name = var_name_val;
+ var_h.ctx = p_ctx;
+
+ // stub context
+ SendProcessor* s = new SendProcessor(ch);
+ s->Prepare(var_h, time_out);
+ s->response_call_back_ = NULL;
+
+ auto rpc = s->stub_->AsyncSendVariable(s->context_.get(), req, &cq_);
+ rpc->Finish(&s->reply_, &s->status_, (void*)s);
+ });
req_count_++;
@@ -50,8 +58,6 @@ bool RPCClient::AsyncSendVariable(const std::string& ep,
void ProcGetResponse(const VarHandle& var_h,
const sendrecv::VariableMessage& ret_msg) {
auto* outvar = var_h.scope->FindVar(var_h.name);
-
- std::istringstream iss(ret_msg.serialized());
DeserializeFromMessage(ret_msg, *var_h.ctx, outvar);
}
@@ -60,24 +66,31 @@ bool RPCClient::AsyncGetVariable(const std::string& ep,
const framework::Scope& scope,
const std::string& var_name,
int64_t time_out) {
- sendrecv::VariableMessage req;
- req.set_varname(var_name);
-
- // varhandle
- VarHandle var_h;
- var_h.ep = ep;
- var_h.scope = &scope;
- var_h.name = var_name;
- var_h.ctx = &ctx;
-
- // stub context
- auto ch = GetChannel(ep);
- GetProcessor* s = new GetProcessor(ch);
- s->Prepare(var_h, time_out);
- s->response_call_back_ = ProcGetResponse;
-
- auto rpc = s->stub_->AsyncGetVariable(s->context_.get(), req, &cq_);
- rpc->Finish(&s->reply_, &s->status_, (void*)s);
+ const platform::DeviceContext* p_ctx = &ctx;
+ const std::string ep_val = ep;
+ const std::string var_name_val = var_name;
+ const framework::Scope* p_scope = &scope;
+ const auto ch = GetChannel(ep_val);
+
+ framework::Async([var_name_val, ep_val, p_scope, p_ctx, time_out, ch, this] {
+ sendrecv::VariableMessage req;
+ req.set_varname(var_name_val);
+
+ // varhandle
+ VarHandle var_h;
+ var_h.ep = ep_val;
+ var_h.scope = p_scope;
+ var_h.name = var_name_val;
+ var_h.ctx = p_ctx;
+
+ // stub context
+ GetProcessor* s = new GetProcessor(ch);
+ s->Prepare(var_h, time_out);
+ s->response_call_back_ = ProcGetResponse;
+
+ auto rpc = s->stub_->AsyncGetVariable(s->context_.get(), req, &cq_);
+ rpc->Finish(&s->reply_, &s->status_, (void*)s);
+ });
req_count_++;
@@ -85,19 +98,31 @@ bool RPCClient::AsyncGetVariable(const std::string& ep,
}
bool RPCClient::Wait() {
- bool ok = true;
+ if (req_count_ <= 0) {
+ return true;
+ }
- while (true) {
- if (req_count_ <= 0) {
- break;
- }
+  std::vector<bool> a(req_count_);
+  std::vector<std::future<void>> waits(req_count_);
- if (!Proceed()) {
+ for (int i = 0; i < req_count_; i++) {
+ waits[i] = framework::Async([i, &a, this] { a[i] = Proceed(); });
+ }
+
+ for (int i = 0; i < req_count_; i++) {
+ waits[i].wait();
+ }
+
+ int last_req_count = req_count_;
+ req_count_ = 0;
+
+ for (int i = 0; i < last_req_count; i++) {
+ if (!a[i]) {
return false;
}
}
- return ok;
+ return true;
}
bool RPCClient::Proceed() {
@@ -124,7 +149,6 @@ bool RPCClient::Proceed() {
c->Process();
delete c;
- req_count_--;
return true;
}
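The rewritten `RPCClient::Wait` above dispatches every pending `Proceed` call through `framework::Async`, joins all of the returned futures, and only then checks the per-request results. A rough Python analogue of that dispatch-and-join pattern, for illustration only (the `proceed` callable is hypothetical):

```python
# Illustration of the dispatch-and-join pattern used by RPCClient::Wait:
# launch one task per pending request, wait for all of them, then succeed
# only if every request succeeded.
from concurrent.futures import ThreadPoolExecutor


def wait_all(proceed, req_count):
    if req_count <= 0:
        return True
    with ThreadPoolExecutor(max_workers=req_count) as pool:
        futures = [pool.submit(proceed) for _ in range(req_count)]
        results = [f.result() for f in futures]  # analogous to waits[i].wait()
    return all(results)
```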
diff --git a/paddle/operators/math/CMakeLists.txt b/paddle/operators/math/CMakeLists.txt
index c607704efac86982c8c22e462381aaab488a9b69..28c5aec1996ad04a6cb551ac68c14b613d16858e 100644
--- a/paddle/operators/math/CMakeLists.txt
+++ b/paddle/operators/math/CMakeLists.txt
@@ -11,7 +11,7 @@ if(WITH_GPU)
nv_library(sequence_pooling SRCS sequence_pooling.cc sequence_pooling.cu DEPS device_context math_function)
nv_library(vol2col SRCS vol2col.cc vol2col.cu DEPS device_context tensor)
nv_library(context_project SRCS context_project.cc context_project.cu DEPS device_context math_function)
- nv_library(sequence2batch SRCS sequence2batch.cc sequence2batch.cu DEPS device_context tensor)
+ nv_library(sequence2batch SRCS sequence2batch.cc sequence2batch.cu DEPS device_context tensor math_function)
nv_library(sequence_padding SRCS sequence_padding.cc sequence_padding.cu DEPS lod_tensor device_context)
nv_library(sequence_scale SRCS sequence_scale.cc sequence_scale.cu DEPS lod_tensor device_context)
nv_library(lstm_compute SRCS lstm_compute.cc lstm_compute.cu DEPS device_context activation_functions)
@@ -28,7 +28,7 @@ else()
cc_library(sequence_pooling SRCS sequence_pooling.cc DEPS device_context math_function)
cc_library(vol2col SRCS vol2col.cc DEPS device_context tensor)
cc_library(context_project SRCS context_project.cc DEPS device_context math_function)
- cc_library(sequence2batch SRCS sequence2batch.cc DEPS device_context tensor)
+ cc_library(sequence2batch SRCS sequence2batch.cc DEPS device_context tensor math_function)
cc_library(sequence_padding SRCS sequence_padding.cc DEPS lod_tensor device_context)
cc_library(sequence_scale SRCS sequence_scale.cc DEPS lod_tensor device_context)
cc_library(lstm_compute SRCS lstm_compute.cc DEPS device_context activation_functions)
diff --git a/python/paddle/v2/fluid/backward.py b/python/paddle/v2/fluid/backward.py
index ae81d68bafd22db5d9f7ab0f9cc0dcdb204493e1..29243c90e872ca4a7d1ce6f84f6297b865655da1 100644
--- a/python/paddle/v2/fluid/backward.py
+++ b/python/paddle/v2/fluid/backward.py
@@ -178,7 +178,7 @@ def _remove_no_grad_branch_(op_descs, no_grad_set):
if _all_in_set_(
filter(lambda name: name.find(core.grad_var_suffix()) != -1,
op_desc.input_arg_names()), no_grad_set):
- no_grad_set.union(out_arg_names)
+ no_grad_set.update(out_arg_names)
return True
return False
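The `union` → `update` change above is the whole fix: `set.union` returns a new set and leaves the receiver untouched, while `set.update` mutates it in place, which is what `_remove_no_grad_branch_` needs when it marks the outputs of a pruned op. A standalone illustration:

```python
no_grad_set = set(["x@GRAD"])
out_arg_names = ["y@GRAD", "z@GRAD"]

no_grad_set.union(out_arg_names)   # returns a new set; no_grad_set is unchanged
assert no_grad_set == set(["x@GRAD"])

no_grad_set.update(out_arg_names)  # mutates in place, which the fixed code relies on
assert no_grad_set == set(["x@GRAD", "y@GRAD", "z@GRAD"])
```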
diff --git a/python/paddle/v2/fluid/executor.py b/python/paddle/v2/fluid/executor.py
index 9d5ed9571a2fa0a871a25e43b23b1a3c3a6102db..9f48815b8b84426c7d539af4e7d45ea47e69d4d9 100644
--- a/python/paddle/v2/fluid/executor.py
+++ b/python/paddle/v2/fluid/executor.py
@@ -68,6 +68,84 @@ def as_numpy(tensor):
return ans
+def has_feed_operators(block, feed_targets, feed_holder_name):
+ """ Check whether the block already has feed operators.
+
+ Return false if the block does not have any feed operators.
+ If some feed operators have been prepended to the block, check that
+ the info contained in these feed operators matches the feed_targets
+ and feed_holder_name. Raise exception when any mismatch is found.
+ Return true when the block has feed operators with matching info.
+
+ Args:
+ block: a block instance (typically global block of a program)
+ feed_targets: a dictionary of {feed_target_name: feed_target_data}
+ feed_holder_name: the name of the variable that holds the data of
+ all feed targets. The type of this feed_holder variable is
+            FEED_MINIBATCH, which is essentially vector<LoDTensor>.
+
+ Returns:
+ A boolean value that indicates whether a block has feed operators
+ that match the info contained in feed_targets and feed_holder_name.
+ """
+
+ feed_count = 0
+ for op in block.ops:
+ if op.desc.type() == 'feed':
+ feed_count += 1
+ assert op.desc.input('X')[0] == feed_holder_name
+ feed_target_name = op.desc.output('Out')[0]
+ if feed_target_name not in feed_targets:
+ raise Exception("'feed_targets' does not have {} variable".
+ format(feed_target_name))
+ else:
+ break
+ if feed_count > 0 and feed_count != len(feed_targets):
+ raise Exception(
+ "Feed operators in program desc do not match 'feed_targets'")
+ return feed_count > 0
+
+
+def has_fetch_operators(block, fetch_targets, fetch_holder_name):
+ """ Check whether the block already has fetch operators.
+
+ Return false if the block does not have any fetch operators.
+ If some fetch operators have been appended to the block, check that
+ the info contained in these fetch operators matches the fetch_targets
+ and fetch_holder_name. Raise exception when any mismatch is found.
+ Return true when the block has fetch operators with matching info.
+
+ Args:
+ block: a block instance (typically global block of a program)
+        fetch_targets: a list of fetch target variables
+ fetch_holder_name: the name of the variable that holds the data of
+ all fetch targets. The type of this fetch_holder variable is
+            FETCH_LIST, which is essentially vector<LoDTensor>.
+
+    Returns:
+ A boolean value that indicates whether a block has fetch operators
+ that match the info contained in fetch_targets and fetch_holder_name.
+ """
+
+ fetch_count = 0
+ for op in block.ops:
+ if op.desc.type() == 'fetch':
+ fetch_count += 1
+ assert op.desc.output('Out')[0] == fetch_holder_name
+ fetch_target_name = op.desc.input('X')[0]
+ if fetch_target_name not in [
+ var.desc.name() for var in fetch_targets
+ ]:
+ raise Exception("'fetch_targets' does not have {} variable".
+ format(fetch_target_name))
+ idx = op.desc.attr('col')
+ assert fetch_target_name == fetch_targets[idx].desc.name()
+ if fetch_count > 0 and fetch_count != len(fetch_targets):
+ raise Exception(
+ "Fetch operators in program desc do not match 'fetch_targets'")
+ return fetch_count > 0
+
+
class Executor(object):
def __init__(self, places):
if not isinstance(places, list) and not isinstance(places, tuple):
@@ -147,33 +225,50 @@ class Executor(object):
program = program.clone()
global_block = program.global_block()
- feed_var = global_block.create_var(
- name=feed_var_name,
- type=core.VarDesc.VarType.FEED_MINIBATCH,
- persistable=True)
-
- for i, name in enumerate(feed):
- out = global_block.var(name)
- global_block.prepend_op(
- 'feed',
- inputs={'X': [feed_var]},
- outputs={'Out': [out]},
- attrs={'col': i})
- cur_feed = feed[name]
- if not isinstance(cur_feed, core.LoDTensor):
- cur_feed = self.aslodtensor(cur_feed)
- core.set_feed_variable(scope, cur_feed, feed_var.name, i)
-
- fetch_var = global_block.create_var(
- name=fetch_var_name,
- type=core.VarDesc.VarType.FETCH_LIST,
- persistable=True)
- for i, var in enumerate(fetch_list):
- global_block.append_op(
- type='fetch',
- inputs={'X': [var]},
- outputs={'Out': [fetch_var]},
- attrs={'col': i})
+
+ if feed_var_name in global_block.vars:
+ feed_var = global_block.var(feed_var_name)
+ else:
+ feed_var = global_block.create_var(
+ name=feed_var_name,
+ type=core.VarDesc.VarType.FEED_MINIBATCH,
+ persistable=True)
+
+ if fetch_var_name in global_block.vars:
+ fetch_var = global_block.var(fetch_var_name)
+ else:
+ fetch_var = global_block.create_var(
+ name=fetch_var_name,
+ type=core.VarDesc.VarType.FETCH_LIST,
+ persistable=True)
+
+ if not has_feed_operators(global_block, feed, feed_var_name):
+ for i, name in enumerate(feed):
+ out = global_block.var(name)
+ global_block.prepend_op(
+ type='feed',
+ inputs={'X': [feed_var]},
+ outputs={'Out': [out]},
+ attrs={'col': i})
+
+ for op in global_block.ops:
+ if op.desc.type() == 'feed':
+ feed_target_name = op.desc.output('Out')[0]
+ cur_feed = feed[feed_target_name]
+ if not isinstance(cur_feed, core.LoDTensor):
+ cur_feed = self.aslodtensor(cur_feed)
+ idx = op.desc.attr('col')
+ core.set_feed_variable(scope, cur_feed, feed_var_name, idx)
+ else:
+ break
+
+ if not has_fetch_operators(global_block, fetch_list, fetch_var_name):
+ for i, var in enumerate(fetch_list):
+ global_block.append_op(
+ type='fetch',
+ inputs={'X': [var]},
+ outputs={'Out': [fetch_var]},
+ attrs={'col': i})
self.executor.run(program.desc, scope, 0, True, True)
outs = [
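With `has_feed_operators`/`has_fetch_operators`, `Executor.run` now reuses feed/fetch operators that a program already carries (e.g. one returned by `load_inference_model`) instead of inserting duplicates; for a plain training program the behaviour is unchanged. A small self-contained usage sketch of the public API (the network here is illustrative, not from the patch):

```python
import numpy as np
import paddle.v2.fluid as fluid

# A tiny regression network so the example stands on its own.
x = fluid.layers.data(name='x', shape=[13], dtype='float32')
y = fluid.layers.data(name='y', shape=[1], dtype='float32')
y_predict = fluid.layers.fc(input=x, size=1)
cost = fluid.layers.mean(x=fluid.layers.square_error_cost(input=y_predict, label=y))

place = fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())

x_data = np.random.rand(8, 13).astype('float32')
y_data = np.random.rand(8, 1).astype('float32')

# Feed/fetch operators are prepended/appended here only when the program does
# not already contain matching ones (see has_feed_operators/has_fetch_operators).
outs = exe.run(fluid.default_main_program(),
               feed={'x': x_data, 'y': y_data},
               fetch_list=[cost])
print(np.array(outs[0]))
```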
diff --git a/python/paddle/v2/fluid/io.py b/python/paddle/v2/fluid/io.py
index 5b02d2495d1ebe9e82e7f847e5bd07548901c7fc..d56ec45c538b580f5520bc060b4b339bb1be0539 100644
--- a/python/paddle/v2/fluid/io.py
+++ b/python/paddle/v2/fluid/io.py
@@ -13,7 +13,6 @@
# limitations under the License.
import os
-import cPickle as pickle
from paddle.v2.fluid.evaluator import Evaluator
from paddle.v2.fluid.framework import Program, Parameter, default_main_program, Variable
@@ -191,8 +190,8 @@ def get_inference_program(target_vars, main_program=None):
vars = []
for var in target_vars:
if isinstance(var, Evaluator):
- vars.append(var.states)
- vars.append(var.metrics)
+ vars.extend(var.states)
+ vars.extend(var.metrics)
else:
vars.append(var)
pruned_program = main_program.prune(targets=vars)
@@ -200,12 +199,16 @@ def get_inference_program(target_vars, main_program=None):
return inference_program
-def prepend_feed_ops(inference_program, feeded_var_names):
+def prepend_feed_ops(inference_program,
+ feed_target_names,
+ feed_holder_name='feed'):
global_block = inference_program.global_block()
feed_var = global_block.create_var(
- name='feed', type=core.VarDesc.VarType.FEED_MINIBATCH, persistable=True)
+ name=feed_holder_name,
+ type=core.VarDesc.VarType.FEED_MINIBATCH,
+ persistable=True)
- for i, name in enumerate(feeded_var_names):
+ for i, name in enumerate(feed_target_names):
out = global_block.var(name)
global_block.prepend_op(
type='feed',
@@ -214,12 +217,16 @@ def prepend_feed_ops(inference_program, feeded_var_names):
attrs={'col': i})
-def append_fetch_ops(inference_program, fetch_var_names):
+def append_fetch_ops(inference_program,
+ fetch_target_names,
+ fetch_holder_name='fetch'):
global_block = inference_program.global_block()
fetch_var = global_block.create_var(
- name='fetch', type=core.VarDesc.VarType.FETCH_LIST, persistable=True)
+ name=fetch_holder_name,
+ type=core.VarDesc.VarType.FETCH_LIST,
+ persistable=True)
- for i, name in enumerate(fetch_var_names):
+ for i, name in enumerate(fetch_target_names):
global_block.append_op(
type='fetch',
inputs={'X': [name]},
@@ -269,21 +276,12 @@ def save_inference_model(dirname,
inference_program = pruned_program.inference_optimize()
fetch_var_names = [v.name for v in target_vars]
- model_file_name = dirname + "/__model__"
- with open(model_file_name, "w") as f:
- pickle.dump({
- "program_desc_str": inference_program.desc.serialize_to_string(),
- "feed_var_names": feeded_var_names,
- "fetch_var_names": fetch_var_names
- }, f, -1)
-
prepend_feed_ops(inference_program, feeded_var_names)
append_fetch_ops(inference_program, fetch_var_names)
- # Save only programDesc of inference_program in binary format
- # in another file: __model__.dat
- with open(model_file_name + ".dat", "wb") as fp:
- fp.write(inference_program.desc.serialize_to_string())
+ model_file_name = dirname + "/__model__"
+ with open(model_file_name, "wb") as f:
+ f.write(inference_program.desc.serialize_to_string())
save_params(executor, dirname, main_program)
@@ -306,6 +304,24 @@ def load_persistables_if_exist(executor, dirname, main_program=None):
predicate=_is_presistable_and_exist_)
+def get_feed_targets_names(program):
+ feed_targets_names = []
+ global_block = program.global_block()
+ for op in global_block.ops:
+ if op.desc.type() == 'feed':
+ feed_targets_names.insert(0, op.desc.output('Out')[0])
+ return feed_targets_names
+
+
+def get_fetch_targets_names(program):
+ fetch_targets_names = []
+ global_block = program.global_block()
+ for op in global_block.ops:
+ if op.desc.type() == 'fetch':
+ fetch_targets_names.append(op.desc.input('X')[0])
+ return fetch_targets_names
+
+
def load_inference_model(dirname, executor):
"""
Load inference model from a directory
@@ -313,24 +329,28 @@ def load_inference_model(dirname, executor):
:param dirname: directory path
:param executor: executor that load inference model
- :return: [program, feed_var_names, fetch_var_names]
+ :return: [program, feed_target_names, fetch_targets]
program: program especially for inference.
- feeded_var_names: Names of variables that need to feed data
- fetch_vars: Variables from which we can get inference results.
+ feed_target_names: Names of variables that need to feed data
+ fetch_targets: Variables from which we can get inference results.
"""
if not os.path.isdir(dirname):
raise ValueError("There is no directory named '%s'", dirname)
model_file_name = dirname + "/__model__"
- model = pickle.load(open(model_file_name, "r"))
- program_desc_str = model["program_desc_str"]
- feed_var_names = model["feed_var_names"]
- fetch_var_names = model["fetch_var_names"]
+ with open(model_file_name, "rb") as f:
+ program_desc_str = f.read()
+
program = Program.parse_from_string(program_desc_str)
load_persistables_if_exist(executor, dirname, program)
- fetch_vars = [program.global_block().var(name) for name in fetch_var_names]
- return [program, feed_var_names, fetch_vars]
+ feed_target_names = get_feed_targets_names(program)
+ fetch_target_names = get_fetch_targets_names(program)
+ fetch_targets = [
+ program.global_block().var(name) for name in fetch_target_names
+ ]
+
+ return [program, feed_target_names, fetch_targets]
def get_parameter_value(para, executor):
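Since the model directory now holds a single `__model__` file containing the serialized `ProgramDesc` (feed and fetch targets are recovered from the prepended/appended operators rather than from a pickled dict), a save/load round trip looks roughly like this; the network and directory name are illustrative:

```python
import numpy as np
import paddle.v2.fluid as fluid

x = fluid.layers.data(name='x', shape=[13], dtype='float32')
y_predict = fluid.layers.fc(input=x, size=1)

exe = fluid.Executor(fluid.CPUPlace())
exe.run(fluid.default_startup_program())

# Writes `__model__` (the ProgramDesc with feed/fetch ops already inserted)
# plus the persistable parameters into the directory.
fluid.io.save_inference_model("./fit_a_line.model", ["x"], [y_predict], exe)

# Feed target names and fetch target variables come back out of the feed/fetch
# operators embedded in the saved program.
[inference_program, feed_target_names,
 fetch_targets] = fluid.io.load_inference_model("./fit_a_line.model", exe)

results = exe.run(inference_program,
                  feed={feed_target_names[0]:
                        np.random.rand(1, 13).astype('float32')},
                  fetch_list=fetch_targets)
```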
diff --git a/python/paddle/v2/fluid/tests/book_distribute/notest_recommender_system_dist.py b/python/paddle/v2/fluid/tests/book_distribute/notest_recommender_system_dist.py
new file mode 100644
index 0000000000000000000000000000000000000000..2d8885e377b0a10d8b5bad4e8fcecb9cc6fc8b64
--- /dev/null
+++ b/python/paddle/v2/fluid/tests/book_distribute/notest_recommender_system_dist.py
@@ -0,0 +1,216 @@
+# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import numpy as np
+import os
+import paddle.v2 as paddle
+import paddle.v2.fluid as fluid
+import paddle.v2.fluid.core as core
+import paddle.v2.fluid.layers as layers
+import paddle.v2.fluid.nets as nets
+from paddle.v2.fluid.optimizer import SGDOptimizer
+
+IS_SPARSE = True
+BATCH_SIZE = 256
+PASS_NUM = 100
+
+
+def get_usr_combined_features():
+ USR_DICT_SIZE = paddle.dataset.movielens.max_user_id() + 1
+ uid = layers.data(name='user_id', shape=[1], dtype='int64')
+ usr_emb = layers.embedding(
+ input=uid,
+ dtype='float32',
+ size=[USR_DICT_SIZE, 32],
+ param_attr='user_table',
+ is_sparse=IS_SPARSE)
+ usr_fc = layers.fc(input=usr_emb, size=32)
+ USR_GENDER_DICT_SIZE = 2
+
+ usr_gender_id = layers.data(name='gender_id', shape=[1], dtype='int64')
+ usr_gender_emb = layers.embedding(
+ input=usr_gender_id,
+ size=[USR_GENDER_DICT_SIZE, 16],
+ param_attr='gender_table',
+ is_sparse=IS_SPARSE)
+ usr_gender_fc = layers.fc(input=usr_gender_emb, size=16)
+
+ USR_AGE_DICT_SIZE = len(paddle.dataset.movielens.age_table)
+ usr_age_id = layers.data(name='age_id', shape=[1], dtype="int64")
+ usr_age_emb = layers.embedding(
+ input=usr_age_id,
+ size=[USR_AGE_DICT_SIZE, 16],
+ is_sparse=IS_SPARSE,
+ param_attr='age_table')
+ usr_age_fc = layers.fc(input=usr_age_emb, size=16)
+
+ USR_JOB_DICT_SIZE = paddle.dataset.movielens.max_job_id() + 1
+ usr_job_id = layers.data(name='job_id', shape=[1], dtype="int64")
+ usr_job_emb = layers.embedding(
+ input=usr_job_id,
+ size=[USR_JOB_DICT_SIZE, 16],
+ param_attr='job_table',
+ is_sparse=IS_SPARSE)
+ usr_job_fc = layers.fc(input=usr_job_emb, size=16)
+
+ concat_embed = layers.concat(
+ input=[usr_fc, usr_gender_fc, usr_age_fc, usr_job_fc], axis=1)
+
+ usr_combined_features = layers.fc(input=concat_embed, size=200, act="tanh")
+ return usr_combined_features
+
+
+def get_mov_combined_features():
+ MOV_DICT_SIZE = paddle.dataset.movielens.max_movie_id() + 1
+ mov_id = layers.data(name='movie_id', shape=[1], dtype='int64')
+ mov_emb = layers.embedding(
+ input=mov_id,
+ dtype='float32',
+ size=[MOV_DICT_SIZE, 32],
+ param_attr='movie_table',
+ is_sparse=IS_SPARSE)
+ mov_fc = layers.fc(input=mov_emb, size=32)
+
+ CATEGORY_DICT_SIZE = len(paddle.dataset.movielens.movie_categories())
+ category_id = layers.data(name='category_id', shape=[1], dtype='int64')
+ mov_categories_emb = layers.embedding(
+ input=category_id, size=[CATEGORY_DICT_SIZE, 32], is_sparse=IS_SPARSE)
+ mov_categories_hidden = layers.sequence_pool(
+ input=mov_categories_emb, pool_type="sum")
+
+ MOV_TITLE_DICT_SIZE = len(paddle.dataset.movielens.get_movie_title_dict())
+ mov_title_id = layers.data(name='movie_title', shape=[1], dtype='int64')
+ mov_title_emb = layers.embedding(
+ input=mov_title_id, size=[MOV_TITLE_DICT_SIZE, 32], is_sparse=IS_SPARSE)
+ mov_title_conv = nets.sequence_conv_pool(
+ input=mov_title_emb,
+ num_filters=32,
+ filter_size=3,
+ act="tanh",
+ pool_type="sum")
+
+ concat_embed = layers.concat(
+ input=[mov_fc, mov_categories_hidden, mov_title_conv], axis=1)
+
+ mov_combined_features = layers.fc(input=concat_embed, size=200, act="tanh")
+ return mov_combined_features
+
+
+def model():
+ usr_combined_features = get_usr_combined_features()
+ mov_combined_features = get_mov_combined_features()
+
+ # need cos sim
+ inference = layers.cos_sim(X=usr_combined_features, Y=mov_combined_features)
+ scale_infer = layers.scale(x=inference, scale=5.0)
+
+ label = layers.data(name='score', shape=[1], dtype='float32')
+ square_cost = layers.square_error_cost(input=scale_infer, label=label)
+ avg_cost = layers.mean(x=square_cost)
+
+ return avg_cost
+
+
+def func_feed(feeding, data, place):
+ feed_tensors = {}
+ for (key, idx) in feeding.iteritems():
+ tensor = core.LoDTensor()
+ if key != "category_id" and key != "movie_title":
+ if key == "score":
+ numpy_data = np.array(map(lambda x: x[idx], data)).astype(
+ "float32")
+ else:
+ numpy_data = np.array(map(lambda x: x[idx], data)).astype(
+ "int64")
+ else:
+ numpy_data = map(lambda x: np.array(x[idx]).astype("int64"), data)
+ lod_info = [len(item) for item in numpy_data]
+ offset = 0
+ lod = [offset]
+ for item in lod_info:
+ offset += item
+ lod.append(offset)
+ numpy_data = np.concatenate(numpy_data, axis=0)
+ tensor.set_lod([lod])
+
+ numpy_data = numpy_data.reshape([numpy_data.shape[0], 1])
+ tensor.set(numpy_data, place)
+ feed_tensors[key] = tensor
+ return feed_tensors
+
+
+def main():
+ cost = model()
+ optimizer = SGDOptimizer(learning_rate=0.2)
+ optimize_ops, params_grads = optimizer.minimize(cost)
+
+ train_reader = paddle.batch(
+ paddle.reader.shuffle(
+ paddle.dataset.movielens.train(), buf_size=8192),
+ batch_size=BATCH_SIZE)
+
+ place = fluid.CPUPlace()
+ exe = fluid.Executor(place)
+
+ t = fluid.DistributeTranspiler()
+
+    # list of all parameter server endpoints, used for splitting parameters
+ pserver_endpoints = os.getenv("PSERVERS")
+ # server endpoint for current node
+ current_endpoint = os.getenv("SERVER_ENDPOINT")
+ # run as trainer or parameter server
+ training_role = os.getenv("TRAINING_ROLE", "TRAINER")
+ t.transpile(
+ optimize_ops, params_grads, pservers=pserver_endpoints, trainers=2)
+
+ if training_role == "PSERVER":
+ if not current_endpoint:
+ print("need env SERVER_ENDPOINT")
+ exit(1)
+ pserver_prog = t.get_pserver_program(current_endpoint)
+ pserver_startup = t.get_startup_program(current_endpoint, pserver_prog)
+ exe.run(pserver_startup)
+ exe.run(pserver_prog)
+ elif training_role == "TRAINER":
+ exe.run(fluid.default_startup_program())
+ trainer_prog = t.get_trainer_program()
+
+ feeding = {
+ 'user_id': 0,
+ 'gender_id': 1,
+ 'age_id': 2,
+ 'job_id': 3,
+ 'movie_id': 4,
+ 'category_id': 5,
+ 'movie_title': 6,
+ 'score': 7
+ }
+
+ for pass_id in range(PASS_NUM):
+ for data in train_reader():
+ outs = exe.run(trainer_prog,
+ feed=func_feed(feeding, data, place),
+ fetch_list=[cost])
+ out = np.array(outs[0])
+ print("cost=" + str(out[0]))
+ if out[0] < 6.0:
+ print("Training complete. Average cost is less than 6.0.")
+                    # if avg cost is less than 6.0, we think our code is good.
+ exit(0)
+ else:
+        print("environment var TRAINING_ROLE should be TRAINER or PSERVER")
+
+
+if __name__ == '__main__':
+ main()
diff --git a/python/paddle/v2/fluid/tests/test_multihead_attention.py b/python/paddle/v2/fluid/tests/test_multihead_attention.py
index 54ec3e3d6e53f35d6a518ef659853e1a13c1711f..a2b300a645fe21931cc12a4e7bb8ebe9b85707c9 100644
--- a/python/paddle/v2/fluid/tests/test_multihead_attention.py
+++ b/python/paddle/v2/fluid/tests/test_multihead_attention.py
@@ -58,7 +58,7 @@ class TestMultiheadAttention(unittest.TestCase):
"""Run the test program.
"""
places = [core.CPUPlace()]
- if core.is_compile_gpu():
+ if core.is_compiled_with_cuda():
places.append(core.CUDAPlace(0))
for place in places: