diff --git a/doc/fluid/design/algorithm/parameter_average.md b/doc/fluid/design/algorithm/parameter_average.md
index 70c5cdecadbf57af8200d63893a86b41a4282a54..940d37fb31dcd0c50ea6c4c42b052d7cb23a9c47 100644
--- a/doc/fluid/design/algorithm/parameter_average.md
+++ b/doc/fluid/design/algorithm/parameter_average.md
@@ -5,10 +5,10 @@ In a large scale machine learning setup where the size of the training data is h
Polyak and Juditsky (1992) showed that the test performance of a simple average of parameters obtained by Stochastic Gradient Descent (SGD) is as good as that of parameter values obtained by training the model over and over again on the training dataset.
-Hence, to accelerate Stochastic Gradient Descent, Averaged Stochastic Gradient Descent (ASGD) was proposed in Polyak and Juditsky (1992). For ASGD, the running average of parameters obtained by SGD is used as the estimator for
. The averaging is done as follows:
+Hence, to accelerate Stochastic Gradient Descent, Averaged Stochastic Gradient Descent (ASGD) was proposed in Polyak and Juditsky (1992). For ASGD, the running average of parameters obtained by SGD is used as the estimator for
. The averaging is done as follows:
-
+
We propose averaging for any optimizer similar to how ASGD performs it, as mentioned above.
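The averaging formulas above were embedded as images and did not survive extraction. A standard reconstruction of the ASGD running average, in my own notation rather than the original figure's:

```latex
% Plain average of the SGD iterates \theta_1, ..., \theta_t:
\bar{\theta}_t = \frac{1}{t} \sum_{i=1}^{t} \theta_i
% maintained incrementally, one cheap update per SGD step:
\bar{\theta}_t = \bar{\theta}_{t-1} + \frac{1}{t}\,(\theta_t - \bar{\theta}_{t-1})
```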
diff --git a/doc/fluid/design/concurrent/channel.md b/doc/fluid/design/concurrent/channel.md
index a5cf17faa8bc42a49ba35c195b91da62ea24e6eb..df67438bcc741ac521b00ee962fc13c93db21182 100644
--- a/doc/fluid/design/concurrent/channel.md
+++ b/doc/fluid/design/concurrent/channel.md
@@ -114,13 +114,13 @@ current thread under two conditions:
#### Channel Send
-
+
#### Channel Receive
-
+
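The send/receive flow charts above were images that did not survive extraction. As a rough illustration of the blocking semantics they describe (not the Fluid C++ implementation), a Go-style buffered channel can be sketched in Python on top of a thread-safe queue:

```python
import queue  # Python 3 thread-safe blocking queue

class Channel(object):
    """Sketch of a Go-style channel: send blocks while the buffer is
    full, receive blocks while it is empty."""

    def __init__(self, capacity=1):
        self._q = queue.Queue(maxsize=capacity)

    def send(self, value):
        self._q.put(value)    # blocks the current thread when full

    def recv(self):
        return self._q.get()  # blocks the current thread when empty
```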
## Limitations and Considerations
diff --git a/doc/fluid/design/concurrent/concurrent_programming.md b/doc/fluid/design/concurrent/concurrent_programming.md
index 64602166065af28309d7a01fdeb7076a9b0a081a..1859f983e9133674e69ecd506d7683ea926b2b8f 100644
--- a/doc/fluid/design/concurrent/concurrent_programming.md
+++ b/doc/fluid/design/concurrent/concurrent_programming.md
@@ -23,21 +23,25 @@ The following table compares concepts in Fluid and Go
user-defined functions |
layers |
+ |
control-flow and built-in functions |
intrinsics/operators |
+ |
goroutines, channels |
class ThreadPool |
+ |
runtime |
class Executor |
+ |
diff --git a/doc/fluid/design/concurrent/select_op.md b/doc/fluid/design/concurrent/select_op.md
index 98dd94a2be0e6cf92d8832e380bce1e358a49ca3..4fcae57cc7932cdaebe549486e7f7cebf0bd038a 100644
--- a/doc/fluid/design/concurrent/select_op.md
+++ b/doc/fluid/design/concurrent/select_op.md
@@ -254,7 +254,7 @@ only one case will be executed.
### select_op flow
-
+
The select algorithm is inspired by golang's select routine. Please refer to
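As a rough sketch of the semantics that routine implements (illustrative Python, not the `select_op` code): poll every case, execute one ready case chosen at random, and block and retry when none is ready.

```python
import random
import time

def select(cases):
    """cases: list of (poll_fn, exec_fn) pairs.

    poll_fn() returns True when that case's channel operation could
    proceed without blocking; exec_fn() performs the send/receive and
    runs the case body. Among all ready cases, one is chosen uniformly
    at random, mirroring Go's fairness rule.
    """
    while True:
        ready = [exec_fn for poll_fn, exec_fn in cases if poll_fn()]
        if ready:
            return random.choice(ready)()
        # A real implementation sleeps on a condition variable instead
        # of busy-waiting like this sketch does.
        time.sleep(0.001)
```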
diff --git a/doc/fluid/design/dist_train/distributed_architecture.md b/doc/fluid/design/dist_train/distributed_architecture.md
index 3cd4750bcea7d8f0f759e49b9b2df4e98cd9b1f5..229cb47c17d633be6848bb35e58d33ec9b47ec3b 100644
--- a/doc/fluid/design/dist_train/distributed_architecture.md
+++ b/doc/fluid/design/dist_train/distributed_architecture.md
@@ -40,11 +40,11 @@ computation is only specified in Python code which sits outside of PaddlePaddle,
Similar to how a compiler uses an intermediate representation (IR) so that the programmer does not need to manually optimize their code in most cases, we can have an intermediate representation in PaddlePaddle as well. The compiler optimizes the IR as follows:
-
+
PaddlePaddle can support model parallelism by converting the IR so that the user no longer needs to manually perform the computation and operations in the Python component:
-
+
The IR for PaddlePaddle after refactoring is called a `Block`; it specifies the computation dependency graph and the variables used in the computation.
@@ -60,7 +60,7 @@ For a detailed explanation, refer to this document -
The revamped distributed training architecture can address the above discussed limitations. Below is the illustration of how it does so:
-
+
The major components are: *Python API*, *Distribute Transpiler* and *Remote Executor*.
@@ -152,7 +152,7 @@ for data in train_reader():
The `JobDesc` object describes the distributed job resource specification to run in a
cluster environment.
-
+
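A hypothetical usage sketch of `JobDesc` together with the `RemoteExecutor` described next (the constructor and field names below are assumptions for illustration, not the documented API):

```python
# Resource spec shipped with the job; the field names are assumed.
job_desc = JobDesc(num_trainers=4, num_ps=2, gpus_per_trainer=1)

# RemoteExecutor ships the ProgramDesc plus this resource spec to the
# cluster instead of executing locally.
exe = RemoteExecutor(place, job_desc=job_desc)
for data in train_reader():
    loss, = exe.run(trainer_prog,
                    feed=feeder.feed(data),
                    fetch_list=[avg_cost])
```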
`RemoteExecutor.run` sends the `ProgramDesc` and
[TrainingJob](https://github.com/PaddlePaddle/cloud/blob/unreleased-tpr/doc/autoscale/README.md#training-job-resource)
@@ -171,7 +171,7 @@ In the future, a more general placement algorithm should be implemented, which m
The local training architecture will be the same as the distributed training architecture; the difference is that everything runs locally and there is just one PaddlePaddle runtime:
-
+
### Training Data
diff --git a/doc/fluid/design/dist_train/multi_cpu.md b/doc/fluid/design/dist_train/multi_cpu.md
index 586612622a654403f07ce55bf7c7f35578af542f..38222d083084ebfca3099ce96b47868c42d55101 100644
--- a/doc/fluid/design/dist_train/multi_cpu.md
+++ b/doc/fluid/design/dist_train/multi_cpu.md
@@ -8,11 +8,11 @@ Op graph to a multi-CPU Op graph, and run `ParallelDo` Op to run the graph.
## Transpiler
-
+
After conversion:
-
+
## Implementation
diff --git a/doc/fluid/design/dist_train/parameter_server.md b/doc/fluid/design/dist_train/parameter_server.md
index 179b5f8c299189c69167012e5ec8a2df9ab2380e..73c85da5e89eee0ac7857a0b808bc64ae673fdad 100644
--- a/doc/fluid/design/dist_train/parameter_server.md
+++ b/doc/fluid/design/dist_train/parameter_server.md
@@ -41,11 +41,11 @@ We will need these OPs: *Send*, *Recv*, *Enqueue*, *Dequeue*.
Below is an example of converting the user-defined graph to the
subgraphs for the trainer and the parameter server:
-
+
After converting:
-
+
1. The parameter variable W and its optimizer program are placed on the parameter server.
1. Operators are added to the program.
@@ -69,8 +69,7 @@ In Fluid, we introduce [SelectedRows](../selected_rows.md) to represent a list o
non-zero gradient data. So when we do parameter optimization both locally and remotely,
we only need to send those non-zero rows to the optimizer operators:
-
-
+
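Conceptually, a SelectedRows value is just the indices of the non-zero rows plus a dense tensor holding those rows. A minimal Python sketch of the idea (illustrative only, not the C++ class):

```python
class SelectedRows(object):
    """Sparse gradient: only `rows` of the full parameter carry data."""

    def __init__(self, rows, values, height):
        self.rows = rows      # e.g. [0, 5, 7]: indices of non-zero rows
        self.values = values  # dense array of shape [len(rows), width]
        self.height = height  # row count of the full parameter

    def apply_to(self, param, lr):
        # What an SGD-style optimizer effectively does with a sparse
        # update: touch only the listed rows.
        for i, r in enumerate(self.rows):
            param[r] -= lr * self.values[i]
```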
### Benefits
- Model parallelism becomes easier to implement: it is an extension to
diff --git a/doc/fluid/design/dynamic_rnn/rnn.md b/doc/fluid/design/dynamic_rnn/rnn.md
index 9a61cd788afc3234c8957ad8c740e470382aa5dd..7b61b050f640814d6949cf6847b431da53d59581 100644
--- a/doc/fluid/design/dynamic_rnn/rnn.md
+++ b/doc/fluid/design/dynamic_rnn/rnn.md
@@ -5,7 +5,7 @@ This document describes the RNN (Recurrent Neural Network) operator and how it i
## RNN Algorithm Implementation
-
+
The above diagram shows an RNN unrolled into a full network.
diff --git a/doc/fluid/design/modules/batch_norm_op.md b/doc/fluid/design/modules/batch_norm_op.md
index 211e060cc15327428900cd8eaf2fb74021701505..e451ffcc73b5de2b911e1c6de54b42a5d1d54c37 100644
--- a/doc/fluid/design/modules/batch_norm_op.md
+++ b/doc/fluid/design/modules/batch_norm_op.md
@@ -66,7 +66,7 @@ As most C++ operators do, `batch_norm_op` is defined by inputs, outputs, attribu
The following graph shows the training computational process of `batch_norm_op`:
-
+
cuDNN provides APIs that cover this whole series of computations, and we can use them in our GPU kernel.
@@ -124,7 +124,7 @@ for pass_id in range(PASS_NUM):
`is_infer` is an attribute. Once an operator is created, its attributes cannot be changed. This suggests that we maintain two `batch_norm_op`s in the model: one whose `is_infer` is `True` (we call it `infer_batch_norm_op`) and one whose `is_infer` is `False` (we call it `train_batch_norm_op`). They share all parameters and variables but are placed in two different branches. That is to say, if a network contains a `batch_norm_op`, it will fork into two branches: one goes through `train_batch_norm_op` and the other goes through `infer_batch_norm_op`:
-
+
As shown in the above graph, the net forks before `batch_norm_op` and never merges again. All the operators after `batch_norm_op` will be duplicated.
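A schematic of that forked topology in Python (the layer callables are hypothetical placeholders supplied by the caller; the point is that both branches read the same batch-norm parameters and statistics):

```python
def build_forked_net(ops, image, label):
    """ops: dict of layer callables ('fc', 'fc2', 'batch_norm',
    'softmax', 'softmax_loss'); names are assumptions for this sketch."""
    hidden = ops['fc'](image)
    # One set of batch-norm parameters, two operators reading them.
    train_out = ops['batch_norm'](hidden, is_infer=False)
    infer_out = ops['batch_norm'](hidden, is_infer=True)
    # Everything downstream of batch_norm is duplicated per branch.
    train_loss = ops['softmax_loss'](ops['fc2'](train_out), label)
    infer_prob = ops['softmax'](ops['fc2'](infer_out))
    return train_loss, infer_prob
```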
diff --git a/doc/fluid/design/modules/regularization.md b/doc/fluid/design/modules/regularization.md
index ffc3199a84cb156173caca0d16643ea5194922f0..8cd5ff71d193f03e1ac923724b52f28c6057d25d 100644
--- a/doc/fluid/design/modules/regularization.md
+++ b/doc/fluid/design/modules/regularization.md
@@ -6,17 +6,17 @@ A central problem in machine learning is how to design an algorithm that will pe
### Parameter Norm Penalties
The most common regularization approaches in deep learning limit the capacity of the model by adding a parameter norm penalty to the objective function `J`. This is given as follows:
-
+
The parameter `alpha` is a hyperparameter that weights the contribution of the norm penalty term, `omega`, relative to the standard objective function `J`.
The most commonly used norm penalties are the L2 norm penalty and the L1 norm penalty. These are given as follows:
##### L2 Regularization:
-
+
##### L1 Regularization
-
+
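The three formulas above were images that did not survive extraction; standard reconstructions consistent with the prose (`J` the objective, `alpha` the penalty weight, `omega` the penalty) are:

```latex
% Penalized objective:
\tilde{J}(\theta; X, y) = J(\theta; X, y) + \alpha \, \Omega(\theta)
% L2 norm penalty (weight decay):
\Omega(\theta) = \tfrac{1}{2}\, \lVert w \rVert_2^2
% L1 norm penalty:
\Omega(\theta) = \lVert w \rVert_1 = \sum_i \lvert w_i \rvert
```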
A much more detailed mathematical background of regularization can be found [here](http://www.deeplearningbook.org/contents/regularization.html).
@@ -40,11 +40,11 @@ The idea of building ops for regularization is in sync with the refactored Paddl
Below is an example of a simple feed-forward neural network.
-
+
The Python API will modify this computation graph to add regularization operators. The modified computation graph will look as follows:
-
+
### Python API implementation for Regularization
diff --git a/doc/fluid/design/network/deep_speech_2.md b/doc/fluid/design/network/deep_speech_2.md
index d3906143d3eab2853507eaf8eae51e7777e7178b..f32a5b7e8a4d820319a666dab4c3129360e2c924 100644
--- a/doc/fluid/design/network/deep_speech_2.md
+++ b/doc/fluid/design/network/deep_speech_2.md
@@ -116,7 +116,7 @@ The classical DS2 network contains 15 layers (from bottom to top):
- **One** CTC-loss layer
-
+
Figure 1. Architecture of the Deep Speech 2 Network.
@@ -208,7 +208,7 @@ TODO by Assignees
### Beam Search with CTC and LM
-
+
Figure 2. Algorithm for CTC Beam Search Decoder.
diff --git a/doc/fluid/design/network/sequence_decoder.md b/doc/fluid/design/network/sequence_decoder.md
index a56c1b5bcacabe0d511e7e081baa9f0de4719ed5..f13d30ca9fe09c9525c711436f605bb280e11000 100644
--- a/doc/fluid/design/network/sequence_decoder.md
+++ b/doc/fluid/design/network/sequence_decoder.md
@@ -199,7 +199,7 @@ Packing the `selected_generation_scores` will get a `LoDTensor`, and each tail i
## LoD and shape changes during decoding
-
+
According to the image above, the only phase that changes the LoD is beam search.
diff --git a/doc/fluid/design/others/gan_api.md b/doc/fluid/design/others/gan_api.md
index 8cc79304700e61f2ce1b6ff6290d0133bb7fdbb3..7167470088766985fa5ad31657410309330fd725 100644
--- a/doc/fluid/design/others/gan_api.md
+++ b/doc/fluid/design/others/gan_api.md
@@ -7,14 +7,14 @@ It applies several important concepts in machine learning system design, includi
In our GAN design, we wrap it as a user-friendly, easily customized Python API to design different models. We take the conditional DC-GAN (Unsupervised Representation Learning with Deep Convolutional Generative Adversarial Networks [https://arxiv.org/abs/1511.06434]) as an example due to its good performance on image generation.
-
+
Figure 1. The overall running logic of GAN. The black solid arrows indicate the forward pass; the green dashed arrows indicate the backward pass of generator training; the red dashed arrows indicate the backward pass of the discriminator training. The BP pass of the green (red) arrow should only update the parameters in the green (red) boxes. The diamonds indicate the data providers. d\_loss and g\_loss marked in red and green are the two targets we would like to run.
The operators, layers and functions required/optional to build a GAN demo are summarized in https://github.com/PaddlePaddle/Paddle/issues/4563.
-
+
Figure 2. Photo borrowed from the original DC-GAN paper.
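The alternating update that Figure 1 describes, as a rough self-contained sketch (the callables passed in are placeholders; nothing here is the proposed Fluid API):

```python
from math import log

def train_gan(steps, sample_data, sample_noise, G, D, update):
    """G(z) generates a sample; D(x) returns P(x is real) in (0, 1);
    update(loss_fn, params) takes one optimizer step on `params` only."""
    for _ in range(steps):
        x, z = sample_data(), sample_noise()
        # Red pass: d_loss updates only the discriminator's parameters.
        update(lambda: -(log(D(x)) + log(1.0 - D(G(z)))), D.params)
        # Green pass: g_loss updates only the generator's parameters.
        update(lambda: -log(D(G(z))), G.params)
```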
diff --git a/doc/fluid/dev/releasing_process.md b/doc/fluid/dev/releasing_process.md
index d459b54e09a782416ce218eba42e69de8378ef2f..c5943ccd81c2ae2aaacd2676da12509db889f54a 100644
--- a/doc/fluid/dev/releasing_process.md
+++ b/doc/fluid/dev/releasing_process.md
@@ -37,7 +37,7 @@ Each new PaddlePaddle release follows this process:
The three generated binary files, corresponding to the CAPI, `cp27m` and `cp27mu` builds, can be found in the "Artifacts" drop-down on this page. Then upload them
with the `twine` tool as described above.
-
+
* Note: the CI environment uses the Docker images from https://github.com/PaddlePaddle/buildtools as the build environment to support more Linux
distributions. These images can also be used for manual builds, and can be downloaded from https://hub.docker.com/r/paddlepaddle/paddle_manylinux_devel/tags/.
diff --git a/doc/fluid/howto/performance/profiler.md b/doc/fluid/howto/performance/profiler.md
index fe05534be7c473fcdbb15f59b05102980b940896..ee96e7c74ce317caddb387cbb1d4998937bd5c81 100644
--- a/doc/fluid/howto/performance/profiler.md
+++ b/doc/fluid/howto/performance/profiler.md
@@ -23,7 +23,7 @@ But how to record the time for the mixed C++ and CUDA program? There many C++ A
The overall flow is shown in the following figure.
-
+
### Event
diff --git a/paddle/fluid/framework/block_desc.h b/paddle/fluid/framework/block_desc.h
index 468423e0e8e7b8c9ebc14b7568c9c3bd21645ea7..873969b2a884f6d9e133fe87bf72725c36ce8b98 100644
--- a/paddle/fluid/framework/block_desc.h
+++ b/paddle/fluid/framework/block_desc.h
@@ -17,6 +17,7 @@ limitations under the License. */
#include
#include
#include
+#include
#include
#include
@@ -96,6 +97,8 @@ class BlockDesc {
*/
void RemoveOp(size_t s, size_t e);
+ void RemoveVar(const std::string &name) { vars_.erase(name); }
+
std::vector<OpDesc *> AllOps() const;
size_t OpSize() const { return ops_.size(); }
diff --git a/paddle/fluid/framework/lod_tensor.h b/paddle/fluid/framework/lod_tensor.h
index dee505fee0dccd8d60bb290a8bec4df243e504a2..4f130d265900483ec7a7c541f2610d17a352913f 100644
--- a/paddle/fluid/framework/lod_tensor.h
+++ b/paddle/fluid/framework/lod_tensor.h
@@ -142,6 +142,7 @@ class LoDTensor : public Tensor {
return (lod_)[level].size() - 1;
}
+ // Split LoDTensor and copy to each place specified in places.
std::vector<LoDTensor> SplitLoDTensor(
const std::vector<platform::Place> places) const;
diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc
index 17885143247f0e0db8f12931e3c3412e7114ef3d..7be93fa6002ae93c3e1b75c8f7fe5ca5f40b271f 100644
--- a/paddle/fluid/framework/parallel_executor.cc
+++ b/paddle/fluid/framework/parallel_executor.cc
@@ -150,13 +150,30 @@ void ParallelExecutor::BCastParamsToGPUs(
#endif
}
-void ParallelExecutor::Run(const std::vector<std::string> &fetch_tensors,
- const std::string &fetched_var_name) {
+void ParallelExecutor::Run(
+ const std::vector<std::string> &fetch_tensors,
+ const std::string &fetched_var_name,
+ const std::unordered_map<std::string, LoDTensor> &feed_tensors) {
platform::RecordBlock b(0);
+ SplitTensorToPlaces(feed_tensors);
auto fetch_data = member_->executor_->Run(fetch_tensors);
*member_->global_scope_->Var(fetched_var_name)->GetMutable<FeedFetchList>() =
fetch_data;
}
+void ParallelExecutor::SplitTensorToPlaces(
+ const std::unordered_map<std::string, LoDTensor> &feed_tensors) {
+ for (auto it : feed_tensors) {
+ auto lod_tensors = it.second.SplitLoDTensor(member_->places_);
+ for (size_t j = 0; j < member_->places_.size(); ++j) {
+ // TODO(panxy0718): Do I need to delete this var?
+ member_->local_scopes_[j]
+ ->Var(it.first)
+ ->GetMutable<LoDTensor>()
+ ->ShareDataWith(lod_tensors[j]);
+ }
+ }
+}
+
} // namespace framework
} // namespace paddle
diff --git a/paddle/fluid/framework/parallel_executor.h b/paddle/fluid/framework/parallel_executor.h
index 964b476234e622cae934d41bc3793bc3114a5f1a..c7c58b2b808383621a6d492f9188b0d36bfa6858 100644
--- a/paddle/fluid/framework/parallel_executor.h
+++ b/paddle/fluid/framework/parallel_executor.h
@@ -42,9 +42,13 @@ class ParallelExecutor {
bool allow_op_delay);
void Run(const std::vector<std::string>& fetch_tensors,
- const std::string& fetched_var_name = "fetched_var");
+ const std::string& fetched_var_name,
+ const std::unordered_map<std::string, LoDTensor>& feed_tensors);
private:
+ void SplitTensorToPlaces(
+ const std::unordered_map<std::string, LoDTensor>& feed_tensors);
+
ParallelExecutorPrivate* member_;
void BCastParamsToGPUs(const ProgramDesc& startup_program) const;
diff --git a/paddle/fluid/operators/CMakeLists.txt b/paddle/fluid/operators/CMakeLists.txt
index 9ed79453b962b8702a88cea888a860cd5d8d64d1..952ac8b1dcf9221d0e5718cc073f1e390176e5a2 100644
--- a/paddle/fluid/operators/CMakeLists.txt
+++ b/paddle/fluid/operators/CMakeLists.txt
@@ -193,6 +193,7 @@ if(WITH_DISTRIBUTE)
set_source_files_properties(send_vars_op.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
op_library(send_barrier_op DEPS ${DISTRIBUTE_DEPS})
set_source_files_properties(send_barrier_op.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
+ set_source_files_properties(send_recv_op_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
cc_test(test_send_recv SRCS send_recv_op_test.cc DEPS prefetch_op send_op listen_and_serv_op sum_op executor)
else()
set(DEPS_OPS ${DEPS_OPS} send_op prefetch_op recv_op listen_and_serv_op send_vars_op send_barrier_op)
diff --git a/paddle/fluid/operators/detail/grpc_server.cc b/paddle/fluid/operators/detail/grpc_server.cc
index 19bba46e3bd49a689fbe1d0c93efe01806fb0228..2e7bf1921a26fc88d854e4db2c501548695a136a 100644
--- a/paddle/fluid/operators/detail/grpc_server.cc
+++ b/paddle/fluid/operators/detail/grpc_server.cc
@@ -186,7 +186,8 @@ void AsyncGRPCServer::WaitClientGet(int count) {
void AsyncGRPCServer::RunSyncUpdate() {
::grpc::ServerBuilder builder;
- builder.AddListeningPort(address_, ::grpc::InsecureServerCredentials());
+ builder.AddListeningPort(address_, ::grpc::InsecureServerCredentials(),
+ &selected_port_);
builder.SetMaxSendMessageSize(std::numeric_limits<int>::max());
builder.SetMaxReceiveMessageSize(std::numeric_limits<int>::max());
builder.RegisterService(&service_);
@@ -196,7 +197,8 @@ void AsyncGRPCServer::RunSyncUpdate() {
cq_prefetch_ = builder.AddCompletionQueue();
server_ = builder.BuildAndStart();
- LOG(INFO) << "Server listening on " << address_ << std::endl;
+ LOG(INFO) << "Server listening on " << address_
+ << " selected port: " << selected_port_;
std::function<void()> send_register =
std::bind(&AsyncGRPCServer::TryToRegisterNewSendOne, this);
diff --git a/paddle/fluid/operators/detail/grpc_server.h b/paddle/fluid/operators/detail/grpc_server.h
index 5b5033018c6aefdc165886fc4e9086ff0a7e9201..380447f47c142bdc16e60f78c4b2d94235ec5060 100644
--- a/paddle/fluid/operators/detail/grpc_server.h
+++ b/paddle/fluid/operators/detail/grpc_server.h
@@ -63,6 +63,8 @@ class AsyncGRPCServer final {
void SetExecutor(framework::Executor *executor) { executor_ = executor; }
+ int GetSelectedPort() { return selected_port_; }
+
const ReceivedMessage Get() { return this->var_recv_queue_.Pop(); }
void Push(const std::string &msg_name) {
@@ -111,6 +113,7 @@ class AsyncGRPCServer final {
int prefetch_blk_id_;
framework::ProgramDesc *program_;
framework::Executor *executor_;
+ int selected_port_;
};
}; // namespace detail
diff --git a/paddle/fluid/operators/fc_mkldnn_op.cc b/paddle/fluid/operators/fc_mkldnn_op.cc
index 9c704a2949f7100e0812eafe1e58ef04bf71f840..847b7b0c12e1679501dbe83d578b23ca2aef3e9e 100644
--- a/paddle/fluid/operators/fc_mkldnn_op.cc
+++ b/paddle/fluid/operators/fc_mkldnn_op.cc
@@ -27,8 +27,8 @@ template <typename T>
class MKLDNNMD {
public:
explicit MKLDNNMD(const T* in, const T* w, bool bias)
- : in{paddle::framework::vectorize2int(in->dims())},
- w{paddle::framework::vectorize2int(w->dims())} {
+ : in(paddle::framework::vectorize2int(in->dims())),
+ w(paddle::framework::vectorize2int(w->dims())) {
with_bias_ = bias;
}
@@ -78,7 +78,7 @@ class MKLDNNMD {
class MKLDNNMemory {
public:
MKLDNNMemory(MKLDNNMD<T>* t, const mkldnn::engine& e)
- : md_{t}, engine_{e} {}
+ : md_(t), engine_(e) {}
virtual ~MKLDNNMemory() = default;
template
diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc
index 91a1f226cd0c96f675bdd59dca809c43b0cedd4f..9188f2d989e601b7a97dedaf71f7080829cdb7c3 100644
--- a/paddle/fluid/operators/listen_and_serv_op.cc
+++ b/paddle/fluid/operators/listen_and_serv_op.cc
@@ -12,20 +12,14 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
-#include
#include
+#include
-#include "paddle/fluid/framework/executor.h"
-#include "paddle/fluid/framework/lod_tensor.h"
-#include "paddle/fluid/framework/op_registry.h"
-#include "paddle/fluid/framework/threadpool.h"
-#include "paddle/fluid/operators/detail/grpc_server.h"
+#include "paddle/fluid/operators/listen_and_serv_op.h"
namespace paddle {
namespace operators {
-constexpr char kOptimizeBlock[] = "OptimizeBlock";
-
void RunServer(std::shared_ptr<detail::AsyncGRPCServer> service) {
service->RunSyncUpdate();
VLOG(4) << "RunServer thread end";
@@ -66,143 +60,138 @@ static void ParallelExecuteBlocks(
for (size_t i = 0; i < fs.size(); ++i) fs[i].wait();
}
-class ListenAndServOp : public framework::OperatorBase {
- public:
- ListenAndServOp(const std::string &type,
- const framework::VariableNameMap &inputs,
- const framework::VariableNameMap &outputs,
- const framework::AttributeMap &attrs)
- : OperatorBase(type, inputs, outputs, attrs) {
- if (!rpc_service_) {
- std::string endpoint = Attr<std::string>("endpoint");
- rpc_service_.reset(new detail::AsyncGRPCServer(endpoint));
- server_thread_.reset(new std::thread(RunServer, rpc_service_));
- }
- }
+ListenAndServOp::ListenAndServOp(const std::string &type,
+ const framework::VariableNameMap &inputs,
+ const framework::VariableNameMap &outputs,
+ const framework::AttributeMap &attrs)
+ : OperatorBase(type, inputs, outputs, attrs) {}
- void Stop() override {
- rpc_service_->Push(LISTEN_TERMINATE_MESSAGE);
- server_thread_->join();
+int ListenAndServOp::GetSelectedPort() {
+ return rpc_service_->GetSelectedPort();
+}
+
+void ListenAndServOp::Stop() {
+ rpc_service_->Push(LISTEN_TERMINATE_MESSAGE);
+ server_thread_->join();
+}
+
+void ListenAndServOp::RunImpl(const framework::Scope &scope,
+ const platform::Place &dev_place) const {
+ platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
+ auto &dev_ctx = *pool.Get(dev_place);
+ framework::Scope &recv_scope = scope.NewScope();
+
+ if (!rpc_service_) {
+ std::string endpoint = Attr<std::string>("endpoint");
+ rpc_service_.reset(new detail::AsyncGRPCServer(endpoint));
}
- void RunImpl(const framework::Scope &scope,
- const platform::Place &dev_place) const override {
- platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
- auto &dev_ctx = *pool.Get(dev_place);
- framework::Scope &recv_scope = scope.NewScope();
-
- // FIXME(Yancey1989): initialize rpc server with lazy mode.
- rpc_service_->SetScope(&recv_scope);
- rpc_service_->SetDevCtx(&dev_ctx);
- auto ins = Inputs("X");
- auto fan_in = Attr<int>("Fanin");
-
- auto *block = Attr<framework::BlockDesc *>(kOptimizeBlock);
- auto *program = block->Program();
- size_t num_blocks = program->Size();
- PADDLE_ENFORCE_GE(num_blocks, 2,
- "server program should have at least 2 blocks");
-
- framework::Executor executor(dev_place);
- std::vector<int> block_list;
- for (size_t blkid = 1; blkid < num_blocks; ++blkid)
- block_list.push_back(blkid);
- auto prepared = executor.Prepare(*program, block_list);
- prepared.insert(
- prepared.begin(),
- std::shared_ptr<framework::ExecutorPrepareContext>(nullptr));
-
- // TODO(qiao) set proper fields for table lookup and update
- rpc_service_->SetExecutor(&executor);
- rpc_service_->SetPrefetchBlkdId(0);
- rpc_service_->SetProgram(program);
-
- // TODO(typhoonzero): change this to a while_op for every cluster-batch.
- bool exit_flag = false;
- // Record received sparse variables, so that
- // we could reset those after execute optimize program
- std::vector<framework::Variable *> sparse_vars;
- while (!exit_flag) {
- // Get from multiple trainers, we don't care about the order in which
- // the gradients arrives, just add suffix 0~n and merge the gradient.
- rpc_service_->SetCond(0);
- size_t recv_var_cnt = 0;
- int batch_barrier = 0;
- while (batch_barrier != fan_in) {
- const detail::ReceivedMessage v = rpc_service_->Get();
- auto recv_var_name = v.first;
- if (recv_var_name == LISTEN_TERMINATE_MESSAGE) {
- LOG(INFO) << "received terminate message and exit";
- exit_flag = true;
- break;
- } else if (recv_var_name == BATCH_BARRIER_MESSAGE) {
- VLOG(3) << "recv batch barrier message";
- batch_barrier++;
- continue;
- } else {
- VLOG(3) << "received grad: " << recv_var_name;
- recv_var_cnt++;
- auto var = v.second->GetVar();
- if (var == nullptr) {
- LOG(ERROR) << "Can not find server side var: " << recv_var_name;
- PADDLE_THROW("Can not find server side var");
- }
- if (var->IsType<framework::SelectedRows>()) {
- sparse_vars.push_back(var);
- }
- }
- }
- if (exit_flag) {
- rpc_service_->SetCond(1);
- rpc_service_->ShutDown();
+ auto ins = Inputs("X");
+ auto fan_in = Attr<int>("Fanin");
+ auto *block = Attr<framework::BlockDesc *>(kOptimizeBlock);
+ auto *program = block->Program();
+ size_t num_blocks = program->Size();
+ PADDLE_ENFORCE_GE(num_blocks, 2,
+ "server program should have at least 2 blocks");
+
+ framework::Executor executor(dev_place);
+ std::vector<int> block_list;
+ for (size_t blkid = 1; blkid < num_blocks; ++blkid) {
+ block_list.push_back(blkid);
+ }
+ auto prepared = executor.Prepare(*program, block_list);
+ // Insert placeholder for block0 which holds current op itself.
+ prepared.insert(prepared.begin(),
+ std::shared_ptr<framework::ExecutorPrepareContext>(nullptr));
+
+ rpc_service_->SetScope(&recv_scope);
+ rpc_service_->SetDevCtx(&dev_ctx);
+ // TODO(qiao) set proper fields for table lookup and update
+ rpc_service_->SetExecutor(&executor);
+ rpc_service_->SetPrefetchBlkdId(0);
+ rpc_service_->SetProgram(program);
+ // Start the server listening after all members are initialized.
+ server_thread_.reset(new std::thread(RunServer, rpc_service_));
+ // FIXME(typhoonzero): do we need to wait until the server port is ready?
+ sleep(5);
+
+ // TODO(typhoonzero): change this to a while_op for every cluster-batch.
+ bool exit_flag = false;
+ // Record received sparse variables, so that
+ // we could reset those after execute optimize program
+ std::vector<framework::Variable *> sparse_vars;
+ while (!exit_flag) {
+ // Get from multiple trainers; we don't care about the order in which
+ // the gradients arrive, just add suffix 0~n and merge the gradient.
+ rpc_service_->SetCond(0);
+ size_t recv_var_cnt = 0;
+ int batch_barrier = 0;
+ while (batch_barrier != fan_in) {
+ const detail::ReceivedMessage v = rpc_service_->Get();
+ auto recv_var_name = v.first;
+ if (recv_var_name == LISTEN_TERMINATE_MESSAGE) {
+ LOG(INFO) << "received terminate message and exit";
+ exit_flag = true;
break;
- }
-
- // NOTE: if is_gpu_place, CUDA kernels are laugched by multiple threads
- // and this will still work.
-
- // The optimize blocks which have the same parent ID would run parallel
- // TODO(Yancey1989): need to use ParallelExecutor for future
- int32_t last_parent_blkid = program->Block(1).Parent();
- std::vector<size_t> parallel_blkids;
- parallel_blkids.push_back(1);
- double ts = detail::GetTimestamp();
- for (size_t blkid = 2; blkid < num_blocks; ++blkid) {
- if (program->Block(blkid).Parent() != last_parent_blkid) {
- for (size_t idx : parallel_blkids) VLOG(3) << idx;
- ParallelExecuteBlocks(parallel_blkids, &executor, prepared, program,
- &recv_scope);
- parallel_blkids.clear();
- last_parent_blkid = program->Block(blkid).Parent();
+ } else if (recv_var_name == BATCH_BARRIER_MESSAGE) {
+ VLOG(3) << "recv batch barrier message";
+ batch_barrier++;
+ continue;
+ } else {
+ VLOG(3) << "received grad: " << recv_var_name;
+ recv_var_cnt++;
+ auto var = v.second->GetVar();
+ if (var == nullptr) {
+ LOG(ERROR) << "Can not find server side var: " << recv_var_name;
+ PADDLE_THROW("Can not find server side var");
+ }
+ if (var->IsType<framework::SelectedRows>()) {
+ sparse_vars.push_back(var);
}
- parallel_blkids.push_back(blkid);
- }
- ParallelExecuteBlocks(parallel_blkids, &executor, prepared, program,
- &recv_scope);
-
- VLOG(3) << "run all blocks spent " << detail::GetTimestamp() - ts
- << "(ms)";
-
- // Reset the received sparse variables, the sum operator would not
- // sum the input sparse variables which rows is empty at the next
- // mini-batch.
- // TODO(Yancey1989): move the reset action into an operator, we couldn't
- // have any hide logic in the operator.
- for (auto &var : sparse_vars) {
- var->GetMutable<framework::SelectedRows>()->mutable_rows()->clear();
}
+ }
+ if (exit_flag) {
rpc_service_->SetCond(1);
- // NOTE: does not consider barrier request retry in here, we may use
- // global barrier id to resolve this.
- rpc_service_->WaitClientGet(fan_in);
- sparse_vars.clear();
- } // while(true)
- }
+ rpc_service_->ShutDown();
+ break;
+ }
- protected:
- std::shared_ptr<detail::AsyncGRPCServer> rpc_service_;
- std::shared_ptr<std::thread> server_thread_;
-};
+ // NOTE: if is_gpu_place, CUDA kernels are launched by multiple threads
+ // and this will still work.
+
+ // Optimize blocks that share the same parent ID run in parallel.
+ // TODO(Yancey1989): switch to ParallelExecutor in the future
+ int32_t last_parent_blkid = program->Block(1).Parent();
+ std::vector<size_t> parallel_blkids;
+ parallel_blkids.push_back(1);
+ double ts = detail::GetTimestamp();
+ for (size_t blkid = 2; blkid < num_blocks; ++blkid) {
+ if (program->Block(blkid).Parent() != last_parent_blkid) {
+ ParallelExecuteBlocks(parallel_blkids, &executor, prepared, program,
+ &recv_scope);
+ parallel_blkids.clear();
+ last_parent_blkid = program->Block(blkid).Parent();
+ }
+ parallel_blkids.push_back(blkid);
+ }
+ ParallelExecuteBlocks(parallel_blkids, &executor, prepared, program,
+ &recv_scope);
+ VLOG(2) << "run all blocks spent " << detail::GetTimestamp() - ts << "(ms)";
+
+ // Reset the received sparse variables so that the sum operator will not
+ // sum input sparse variables whose rows are empty at the next
+ // mini-batch.
+ // TODO(Yancey1989): move the reset action into an operator; we shouldn't
+ // have any hidden logic in the operator.
+ for (auto &var : sparse_vars) {
+ var->GetMutable<framework::SelectedRows>()->mutable_rows()->clear();
+ }
+ rpc_service_->SetCond(1);
+ // FIXME(typhoonzero): use another condition to sync wait clients get.
+ rpc_service_->WaitClientGet(fan_in);
+ sparse_vars.clear();
+ } // while(true)
+}
class ListenAndServOpMaker : public framework::OpProtoAndCheckerMaker {
public:
diff --git a/paddle/fluid/operators/listen_and_serv_op.h b/paddle/fluid/operators/listen_and_serv_op.h
new file mode 100644
index 0000000000000000000000000000000000000000..0da87afc961e896f04b4f0028bf9b17d5e992548
--- /dev/null
+++ b/paddle/fluid/operators/listen_and_serv_op.h
@@ -0,0 +1,53 @@
+/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. */
+
+#pragma once
+
+#include
+#include
+
+#include "paddle/fluid/framework/executor.h"
+#include "paddle/fluid/framework/lod_tensor.h"
+#include "paddle/fluid/framework/op_registry.h"
+#include "paddle/fluid/framework/threadpool.h"
+#include "paddle/fluid/operators/detail/grpc_server.h"
+
+namespace paddle {
+namespace operators {
+
+constexpr char kOptimizeBlock[] = "OptimizeBlock";
+
+void RunServer(std::shared_ptr<detail::AsyncGRPCServer> service);
+
+class ListenAndServOp : public framework::OperatorBase {
+ public:
+ ListenAndServOp(const std::string &type,
+ const framework::VariableNameMap &inputs,
+ const framework::VariableNameMap &outputs,
+ const framework::AttributeMap &attrs);
+
+ int GetSelectedPort();
+
+ void Stop() override;
+
+ void RunImpl(const framework::Scope &scope,
+ const platform::Place &dev_place) const override;
+
+ protected:
+ mutable std::shared_ptr<detail::AsyncGRPCServer> rpc_service_;
+ mutable std::shared_ptr<std::thread> server_thread_;
+};
+
+} // namespace operators
+} // namespace paddle
diff --git a/paddle/fluid/operators/prior_box_op.cc b/paddle/fluid/operators/prior_box_op.cc
index c22a55bce263423d5c17fffdb06b7ece02ae26da..82e54139c8c1f42b1d8f74811a6793ec5c66473e 100644
--- a/paddle/fluid/operators/prior_box_op.cc
+++ b/paddle/fluid/operators/prior_box_op.cc
@@ -73,7 +73,7 @@ class PriorBoxOp : public framework::OperatorWithKernel {
const framework::ExecutionContext& ctx) const override {
return framework::OpKernelType(
framework::ToDataType(ctx.Input("Input")->type()),
- platform::CPUPlace());
+ ctx.device_context());
}
};
@@ -171,6 +171,5 @@ namespace ops = paddle::operators;
REGISTER_OPERATOR(prior_box, ops::PriorBoxOp, ops::PriorBoxOpMaker,
paddle::framework::EmptyGradOpMaker);
-REGISTER_OP_CPU_KERNEL(
- prior_box, ops::PriorBoxOpKernel<paddle::platform::CPUPlace, float>,
- ops::PriorBoxOpKernel<paddle::platform::CPUPlace, double>);
+REGISTER_OP_CPU_KERNEL(prior_box, ops::PriorBoxOpKernel<float>,
+ ops::PriorBoxOpKernel<double>);
diff --git a/paddle/fluid/operators/prior_box_op.cu b/paddle/fluid/operators/prior_box_op.cu
new file mode 100644
index 0000000000000000000000000000000000000000..76bf2b3b7de7a24c80e927c16199f89c5b7fb794
--- /dev/null
+++ b/paddle/fluid/operators/prior_box_op.cu
@@ -0,0 +1,167 @@
+/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. */
+
+#include "paddle/fluid/operators/prior_box_op.h"
+
+namespace paddle {
+namespace operators {
+
+template <typename T>
+__device__ inline T clip(T in) {
+ return min(max(in, 0.), 1.);
+}
+
+template <typename T>
+__global__ void GenPriorBox(T* out, const T* aspect_ratios, const int height,
+ const int width, const int im_height,
+ const int im_width, const int as_num,
+ const T offset, const T step_width,
+ const T step_height, const T* min_sizes,
+ const T* max_sizes, const int min_num,
+ bool is_clip) {
+ int num_priors = max_sizes ? as_num * min_num + min_num : as_num * min_num;
+ int box_num = height * width * num_priors;
+ for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < box_num;
+ i += blockDim.x * gridDim.x) {
+ int h = i / (num_priors * width);
+ int w = (i / num_priors) % width;
+ int p = i % num_priors;
+ int m = max_sizes ? p / (as_num + 1) : p / as_num;
+ T cx = (w + offset) * step_width;
+ T cy = (h + offset) * step_height;
+ T bw, bh;
+ T min_size = min_sizes[m];
+ if (max_sizes) {
+ int s = p % (as_num + 1);
+ if (s < as_num) {
+ T ar = aspect_ratios[s];
+ bw = min_size * sqrt(ar) / 2.;
+ bh = min_size / sqrt(ar) / 2.;
+ } else {
+ T max_size = max_sizes[m];
+ bw = sqrt(min_size * max_size) / 2.;
+ bh = bw;
+ }
+ } else {
+ int s = p % as_num;
+ T ar = aspect_ratios[s];
+ bw = min_size * sqrt(ar) / 2.;
+ bh = min_size / sqrt(ar) / 2.;
+ }
+ T xmin = (cx - bw) / im_width;
+ T ymin = (cy - bh) / im_height;
+ T xmax = (cx + bw) / im_width;
+ T ymax = (cy + bh) / im_height;
+ out[i * 4] = is_clip ? clip(xmin) : xmin;
+ out[i * 4 + 1] = is_clip ? clip(ymin) : ymin;
+ out[i * 4 + 2] = is_clip ? clip(xmax) : xmax;
+ out[i * 4 + 3] = is_clip ? clip(ymax) : ymax;
+ }
+}
+
+template <typename T>
+__global__ void SetVariance(T* out, const T* var, const int vnum,
+ const int num) {
+ for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < num;
+ i += blockDim.x * gridDim.x) {
+ out[i] = var[i % vnum];
+ }
+}
+
+template <typename T>
+class PriorBoxOpCUDAKernel : public framework::OpKernel<T> {
+ public:
+ void Compute(const framework::ExecutionContext& ctx) const override {
+ auto* input = ctx.Input<paddle::framework::Tensor>("Input");
+ auto* image = ctx.Input<paddle::framework::Tensor>("Image");
+ auto* boxes = ctx.Output<paddle::framework::Tensor>("Boxes");
+ auto* vars = ctx.Output<paddle::framework::Tensor>("Variances");
+
+ auto min_sizes = ctx.Attr<std::vector<float>>("min_sizes");
+ auto max_sizes = ctx.Attr<std::vector<float>>("max_sizes");
+ auto input_aspect_ratio = ctx.Attr<std::vector<float>>("aspect_ratios");
+ auto variances = ctx.Attr<std::vector<float>>("variances");
+ auto flip = ctx.Attr<bool>("flip");
+ auto clip = ctx.Attr<bool>("clip");
+
+ std::vector<float> aspect_ratios;
+ ExpandAspectRatios(input_aspect_ratio, flip, aspect_ratios);
+
+ T step_w = static_cast<T>(ctx.Attr<float>("step_w"));
+ T step_h = static_cast<T>(ctx.Attr<float>("step_h"));
+ T offset = static_cast<T>(ctx.Attr<float>("offset"));
+
+ auto im_width = image->dims()[3];
+ auto im_height = image->dims()[2];
+
+ auto width = input->dims()[3];
+ auto height = input->dims()[2];
+
+ T step_width, step_height;
+ if (step_w == 0 || step_h == 0) {
+ step_width = static_cast<T>(im_width) / width;
+ step_height = static_cast<T>(im_height) / height;
+ } else {
+ step_width = step_w;
+ step_height = step_h;
+ }
+
+ int num_priors = aspect_ratios.size() * min_sizes.size();
+ if (max_sizes.size() > 0) {
+ num_priors += max_sizes.size();
+ }
+ int min_num = static_cast<int>(min_sizes.size());
+ int box_num = width * height * num_priors;
+
+ int block = 512;
+ int grid = (box_num + block - 1) / block;
+
+ auto stream =
+ ctx.template device_context<platform::CUDADeviceContext>().stream();
+
+ boxes->mutable_data<T>(ctx.GetPlace());
+ vars->mutable_data<T>(ctx.GetPlace());
+
+ framework::Tensor r;
+ framework::TensorFromVector(aspect_ratios, ctx.device_context(), &r);
+
+ framework::Tensor min;
+ framework::TensorFromVector(min_sizes, ctx.device_context(), &min);
+
+ T* max_data = nullptr;
+ framework::Tensor max;
+ if (max_sizes.size() > 0) {
+ framework::TensorFromVector(max_sizes, ctx.device_context(), &max);
+ max_data = max.data<T>();
+ }
+
+ GenPriorBox<T><<<grid, block, 0, stream>>>(
+ boxes->data<T>(), r.data<T>(), height, width, im_height, im_width,
+ aspect_ratios.size(), offset, step_width, step_height, min.data<T>(),
+ max_data, min_num, clip);
+
+ framework::Tensor v;
+ framework::TensorFromVector(variances, ctx.device_context(), &v);
+ grid = (box_num * 4 + block - 1) / block;
+ SetVariance<T><<<grid, block, 0, stream>>>(vars->data<T>(), v.data<T>(),
+ variances.size(), box_num * 4);
+ }
+};
+
+} // namespace operators
+} // namespace paddle
+
+namespace ops = paddle::operators;
+REGISTER_OP_CUDA_KERNEL(prior_box, ops::PriorBoxOpCUDAKernel<float>,
+ ops::PriorBoxOpCUDAKernel<double>);
diff --git a/paddle/fluid/operators/prior_box_op.h b/paddle/fluid/operators/prior_box_op.h
index 18bb2deb6b5acf626dfb2883a5771d9d195d45c0..1e4a12aac1c5f1c3b7e2e1bc83170de9ad590fc3 100644
--- a/paddle/fluid/operators/prior_box_op.h
+++ b/paddle/fluid/operators/prior_box_op.h
@@ -51,7 +51,7 @@ struct ClipFunctor {
}
};
-template <typename Place, typename T>
+template <typename T>
class PriorBoxOpKernel : public framework::OpKernel<T> {
public:
void Compute(const framework::ExecutionContext& ctx) const override {
@@ -106,49 +106,24 @@ class PriorBoxOpKernel : public framework::OpKernel {
int idx = 0;
for (size_t s = 0; s < min_sizes.size(); ++s) {
auto min_size = min_sizes[s];
- // first prior: aspect_ratio = 1, size = min_size
- box_width = box_height = min_size / 2.;
- // xmin
- e_boxes(h, w, idx, 0) = (center_x - box_width) / img_width;
- // ymin
- e_boxes(h, w, idx, 1) = (center_y - box_height) / img_height;
- // xmax
- e_boxes(h, w, idx, 2) = (center_x + box_width) / img_width;
- // ymax
- e_boxes(h, w, idx, 3) = (center_y + box_height) / img_height;
-
- idx++;
- if (max_sizes.size() > 0) {
- auto max_size = max_sizes[s];
- // second prior: aspect_ratio = 1,
- // size = sqrt(min_size * max_size)
- box_width = box_height = sqrt(min_size * max_size) / 2.;
- // xmin
+ // priors with different aspect ratios
+ for (size_t r = 0; r < aspect_ratios.size(); ++r) {
+ float ar = aspect_ratios[r];
+ box_width = min_size * sqrt(ar) / 2.;
+ box_height = min_size / sqrt(ar) / 2.;
e_boxes(h, w, idx, 0) = (center_x - box_width) / img_width;
- // ymin
e_boxes(h, w, idx, 1) = (center_y - box_height) / img_height;
- // xmax
e_boxes(h, w, idx, 2) = (center_x + box_width) / img_width;
- // ymax
e_boxes(h, w, idx, 3) = (center_y + box_height) / img_height;
idx++;
}
-
- // rest of priors
- for (size_t r = 0; r < aspect_ratios.size(); ++r) {
- float ar = aspect_ratios[r];
- if (fabs(ar - 1.) < 1e-6) {
- continue;
- }
- box_width = min_size * sqrt(ar) / 2.;
- box_height = min_size / sqrt(ar) / 2.;
- // xmin
+ if (max_sizes.size() > 0) {
+ auto max_size = max_sizes[s];
+ // square prior with size sqrt(minSize * maxSize)
+ box_width = box_height = sqrt(min_size * max_size) / 2.;
e_boxes(h, w, idx, 0) = (center_x - box_width) / img_width;
- // ymin
e_boxes(h, w, idx, 1) = (center_y - box_height) / img_height;
- // xmax
e_boxes(h, w, idx, 2) = (center_x + box_width) / img_width;
- // ymax
e_boxes(h, w, idx, 3) = (center_y + box_height) / img_height;
idx++;
}
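For checking that this reordering changes only the order of the boxes, not their values, here is a pure-Python re-derivation of one feature-map cell's priors following the same math (a sketch, not the operator code):

```python
import math

def cell_priors(cx, cy, min_size, max_size, aspect_ratios, img_w, img_h):
    """Priors for one cell, in the new order used above: all
    aspect-ratio boxes for min_size first, then the square
    sqrt(min_size * max_size) box (pass max_size=None to skip it)."""
    boxes = []
    for ar in aspect_ratios:
        bw = min_size * math.sqrt(ar) / 2.0
        bh = min_size / math.sqrt(ar) / 2.0
        boxes.append(((cx - bw) / img_w, (cy - bh) / img_h,
                      (cx + bw) / img_w, (cy + bh) / img_h))
    if max_size is not None:
        b = math.sqrt(min_size * max_size) / 2.0
        boxes.append(((cx - b) / img_w, (cy - b) / img_h,
                      (cx + b) / img_w, (cy + b) / img_h))
    return boxes

# Example: cell_priors(0.625, 0.625, 2.0, 5.0, [1.0, 2.0, 0.5], 40, 40)
```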
diff --git a/paddle/fluid/operators/send_recv_op_test.cc b/paddle/fluid/operators/send_recv_op_test.cc
index 04392b3e05fa2d8b602946ba03672bf2491dcfbc..542bc3fde2a3616807eea560be85fb42026d5825 100644
--- a/paddle/fluid/operators/send_recv_op_test.cc
+++ b/paddle/fluid/operators/send_recv_op_test.cc
@@ -20,6 +20,7 @@ limitations under the License. */
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/operator.h"
#include "paddle/fluid/framework/program_desc.h"
+#include "paddle/fluid/operators/listen_and_serv_op.h"
#include "paddle/fluid/operators/math/math_function.h"
#include "paddle/fluid/operators/math/selected_rows_functor.h"
#include "paddle/fluid/string/printf.h"
@@ -34,6 +35,7 @@ namespace m = paddle::operators::math;
// global for simplicity.
std::unique_ptr<f::OperatorBase> listen_and_serv_op;
+int selected_port;
void InitTensorsInScope(f::Scope &scope, p::CPUPlace &place) {
p::CPUDeviceContext ctx(place);
@@ -128,14 +130,16 @@ void StartServerNet(bool is_sparse) {
AddOp("sum", {{"X", {"x0", "x1"}}}, {{"Out", {"Out"}}}, {}, optimize_block);
f::AttributeMap attrs;
- attrs.insert({"endpoint", std::string("127.0.0.1:6174")});
+ attrs.insert({"endpoint", std::string("127.0.0.1:0")});
attrs.insert({"Fanin", 1});
attrs.insert({"ParamList", std::vector<std::string>({"Out"})});
attrs.insert({"GradList", std::vector<std::string>({"x1"})});
attrs.insert({"OptimizeBlock", optimize_block});
listen_and_serv_op =
f::OpRegistry::CreateOp("listen_and_serv", {{"X", {"x1"}}}, {}, attrs);
+ LOG(INFO) << "selected port before run " << selected_port;
listen_and_serv_op->Run(scope, place);
+ LOG(INFO) << "server exit";
}
TEST(SendRecvOp, CPUDense) {
@@ -149,12 +153,19 @@ TEST(SendRecvOp, CPUDense) {
scope.Var("RPC_CLIENT_VAR");
f::AttributeMap attrs;
- attrs.insert({"endpoints", std::vector<std::string>({"127.0.0.1:6174"})});
- attrs.insert({"epmap", std::vector<std::string>({"127.0.0.1:6174"})});
+ selected_port = static_cast<paddle::operators::ListenAndServOp *>(
+ listen_and_serv_op.get())
+ ->GetSelectedPort();
+ LOG(INFO) << "selected port " << selected_port;
+ std::string endpoint = paddle::string::Sprintf("127.0.0.1:%d", selected_port);
+ attrs.insert({"endpoints", std::vector<std::string>({endpoint})});
+ attrs.insert({"epmap", std::vector<std::string>({endpoint})});
auto send_op = f::OpRegistry::CreateOp(
"send", {{"X", {"x1"}}},
{{"Out", {"Out"}}, {"RPCClient", {"RPC_CLIENT_VAR"}}}, attrs);
+ LOG(INFO) << "before run " << endpoint;
send_op->Run(scope, place);
+ LOG(INFO) << "end run";
auto in_var = scope.Var("x1");
auto tensor = in_var->GetMutable<f::LoDTensor>();
@@ -167,6 +178,7 @@ TEST(SendRecvOp, CPUDense) {
for (int64_t i = 0; i < target->numel(); ++i) {
EXPECT_EQ(expected[i] * 2, actual[i]);
}
+ LOG(INFO) << "before stop";
listen_and_serv_op->Stop();
server_thread.join();
listen_and_serv_op.reset(nullptr);
@@ -182,8 +194,13 @@ TEST(SendRecvOp, CPUSparse) {
InitSelectedRowsInScope(scope, place);
scope.Var("RPC_CLIENT_VAR");
f::AttributeMap attrs;
- attrs.insert({"endpoints", std::vector<std::string>({"127.0.0.1:6174"})});
- attrs.insert({"epmap", std::vector<std::string>({"127.0.0.1:6174"})});
+ selected_port = static_cast<paddle::operators::ListenAndServOp *>(
+ listen_and_serv_op.get())
+ ->GetSelectedPort();
+ LOG(INFO) << "selected port " << selected_port;
+ std::string endpoint = paddle::string::Sprintf("127.0.0.1:%d", selected_port);
+ attrs.insert({"endpoints", std::vector<std::string>({endpoint})});
+ attrs.insert({"epmap", std::vector<std::string>({endpoint})});
auto send_op = f::OpRegistry::CreateOp(
"send", {{"X", {"x1"}}},
{{"Out", {"Out"}}, {"RPCClient", {"RPC_CLIENT_VAR"}}}, attrs);
diff --git a/paddle/fluid/pybind/protobuf.cc b/paddle/fluid/pybind/protobuf.cc
index 45a64f43846e79c27295e52c59dca6bdfaa120a3..985984983a2239f6961bf519bae27fcbb9e7d6d3 100644
--- a/paddle/fluid/pybind/protobuf.cc
+++ b/paddle/fluid/pybind/protobuf.cc
@@ -15,6 +15,8 @@ limitations under the License. */
#include "paddle/fluid/pybind/protobuf.h"
#include
#include
+#include
+#include
#include "paddle/fluid/framework/backward.h"
#include "paddle/fluid/framework/block_desc.h"
#include "paddle/fluid/framework/op_desc.h"
@@ -98,7 +100,7 @@ namespace pybind {
using namespace paddle::framework; // NOLINT
template <typename T>
-static py::bytes SerializeMessage(T &self) {
+static py::bytes SerializeMessage(T &self) { // NOLINT
// Check IsInitialized in Python
std::string retv;
PADDLE_ENFORCE(self.Proto()->SerializePartialToString(&retv),
@@ -107,7 +109,7 @@ static py::bytes SerializeMessage(T &self) {
}
// Bind Methods
-void BindProgramDesc(py::module &m) {
+void BindProgramDesc(py::module &m) { // NOLINT
py::class_<ProgramDesc>(m, "ProgramDesc", "")
.def(py::init<>())
.def("__init__",
@@ -151,7 +153,7 @@ void BindProgramDesc(py::module &m) {
});
}
-void BindBlockDesc(py::module &m) {
+void BindBlockDesc(py::module &m) { // NOLINT
py::class_<BlockDesc>(m, "BlockDesc", "")
.def_property_readonly("id", &BlockDesc::ID)
.def_property_readonly("parent", &BlockDesc::Parent)
@@ -200,13 +202,19 @@ void BindBlockDesc(py::module &m) {
return self.FindVarRecursive(name);
},
py::return_value_policy::reference)
+ .def("remove_var",
+ [](BlockDesc &self, py::bytes byte_name) {
+ std::string name = byte_name;
+ return self.RemoveVar(name);
+ },
+ py::return_value_policy::reference)
.def("all_vars", &BlockDesc::AllVars, py::return_value_policy::reference)
.def("op_size", &BlockDesc::OpSize)
.def("op", &BlockDesc::Op, py::return_value_policy::reference)
.def("serialize_to_string", SerializeMessage);
}
-void BindVarDsec(py::module &m) {
+void BindVarDsec(py::module &m) { // NOLINT
py::class_<VarDesc> var_desc(m, "VarDesc", "");
var_desc
.def("name",
@@ -257,7 +265,7 @@ void BindVarDsec(py::module &m) {
.value("RAW", proto::VarType::RAW);
}
-void BindOpDesc(py::module &m) {
+void BindOpDesc(py::module &m) { // NOLINT
py::enum_<proto::AttrType>(m, "AttrType", "")
.value("INT", proto::AttrType::INT)
.value("INTS", proto::AttrType::INTS)
diff --git a/python/paddle/fluid/distribute_transpiler.py b/python/paddle/fluid/distribute_transpiler.py
index 9311fc9904eb730aa56e94a4e45a1479a67df641..31bedb592f1a801cbf5c78f5ba4f06ba569f9494 100644
--- a/python/paddle/fluid/distribute_transpiler.py
+++ b/python/paddle/fluid/distribute_transpiler.py
@@ -408,11 +408,7 @@ class DistributeTranspiler:
pserver_vars = pserver_program.global_block().vars
created_var_map = dict()
for _, var in pserver_vars.iteritems():
- tmpvar = s_prog.global_block().create_var(
- name=var.name,
- persistable=var.persistable,
- dtype=var.dtype,
- shape=var.shape)
+ tmpvar = s_prog.global_block().clone_variable(var)
created_var_map[var.name] = tmpvar
# 2. rename op outputs
@@ -708,11 +704,7 @@ class DistributeTranspiler:
varlist = [varlist]
for var in varlist:
- program.global_block().create_var(
- name=var.name,
- persistable=var.persistable,
- dtype=var.dtype,
- shape=var.shape)
+ program.global_block().clone_variable(var)
optimize_block.append_op(
type=opt_op.type,
diff --git a/python/paddle/fluid/framework.py b/python/paddle/fluid/framework.py
index e15456bfc0835066e3c899aea7e2cf642b4797d8..39d4017861f4d2ac2e8e85c3d70440a43e6cdc71 100644
--- a/python/paddle/fluid/framework.py
+++ b/python/paddle/fluid/framework.py
@@ -946,13 +946,20 @@ class Block(object):
The new variable cloned from 'var' in current block.
"""
assert isinstance(var, Variable)
- return self.create_var(
- name=var.name,
- shape=var.shape,
- dtype=var.dtype,
- type=var.type,
- lod_level=var.lod_level,
- persistable=True)
+ ret_var = None
+ # make sure a STEP_SCOPES var can be safely cloned.
+ if var.type == core.VarDesc.VarType.STEP_SCOPES:
+ ret_var = self.create_var(
+ name=var.name, persistable=var.persistable, type=var.type)
+ else:
+ ret_var = self.create_var(
+ name=var.name,
+ shape=var.shape,
+ dtype=var.dtype,
+ type=var.type,
+ lod_level=var.lod_level,
+ persistable=True)
+ return ret_var
class Program(object):
diff --git a/python/paddle/fluid/parallel_executor.py b/python/paddle/fluid/parallel_executor.py
index a2c830b3c943b114f3024f23f73f78bf87e1da34..1b3ba414ecb50cc4d75dcaecd1f31265334c9aec 100644
--- a/python/paddle/fluid/parallel_executor.py
+++ b/python/paddle/fluid/parallel_executor.py
@@ -26,25 +26,29 @@ class ParallelExecutor(object):
use_cuda,
num_threads=None,
allow_op_delay=False):
- places = []
+ self._places = []
+ self._act_places = []
if use_cuda:
for i in xrange(core.get_cuda_device_count()):
p = core.Place()
- p.set_place(core.CUDAPlace(i))
- places.append(p)
+ self._act_places.append(core.CUDAPlace(i))
+ p.set_place(self._act_places[-1])
+ self._places.append(p)
else:
for i in xrange(multiprocessing.cpu_count()):
p = core.Place()
- p.set_place(core.CPUPlace())
- places.append(p)
+ self._act_places.append(core.CPUPlace())
+ p.set_place(self._act_places[-1])
+ self._places.append(p)
+ assert self._places, "no place for execution"
if num_threads is None:
if use_cuda:
# Experiments on se-resnext show that too many threads hurt
# performance. Worth tuning for other models in the future.
- num_threads = len(places)
+ num_threads = len(self._places)
else:
- min(len(places) * 2, multiprocessing.cpu_count())
+ num_threads = min(len(self._places) * 2, multiprocessing.cpu_count())
startup = framework.default_startup_program()
main = framework.default_main_program()
@@ -53,7 +57,7 @@ class ParallelExecutor(object):
self.executor = core.ParallelExecutor(
num_threads,
True if use_cuda else False, # use_event
- places,
+ self._places,
set([
p.name for p in main.global_block().iter_parameters()
if not p.stop_gradient
@@ -65,8 +69,25 @@ class ParallelExecutor(object):
allow_op_delay)
self.scope = scope
- def run(self, fetch_list):
+ def run(self, fetch_list, feed_dict={}):
+ """
+ :param fetch_list: A list of variable names that will be fetched.
+ :param feed_dict: A dict mapping for feed variable name to LoDTensor
+ or numpy array.
+ :return: fetched value list.
+ """
+ if not isinstance(feed_dict, dict):
+ raise TypeError("feed_dict should be a dict")
+
+ feed_tensor_dict = {}
+ for i, feed_name in enumerate(feed_dict):
+ feed_tensor = feed_dict[feed_name]
+ if not isinstance(feed_tensor, core.LoDTensor):
+ feed_tensor = core.LoDTensor()
+ feed_tensor.set(feed_dict[feed_name], self._act_places[0])
+ feed_tensor_dict[feed_name] = feed_tensor
+
fetch_var_name = '@FETCHED_VAR_NAME@'
- self.executor.run(fetch_list, fetch_var_name)
+ self.executor.run(fetch_list, fetch_var_name, feed_tensor_dict)
arr = self.scope.find_var(fetch_var_name).get_lod_tensor_array()
return [arr[i] for i in range(len(arr))]
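A minimal usage sketch of the new `feed_dict` path, assuming a program whose data layers are named `image` and `label` and whose loss variable is `loss` (as in the updated unit test below):

```python
import numpy
import paddle.fluid as fluid

exe = fluid.ParallelExecutor(use_cuda=True, loss_name=loss.name)
img = numpy.zeros(shape=[32, 784], dtype='float32')
label = numpy.ones(shape=[32, 1], dtype='int64')
# Each fed array becomes one LoDTensor that the C++ side splits across
# all places (SplitTensorToPlaces), sharding the batch per device.
loss_val, = exe.run([loss.name], feed_dict={'image': img, 'label': label})
```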
diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor.py b/python/paddle/fluid/tests/unittests/test_parallel_executor.py
index a79e4b3e183eaef06be27a724893799923e84ac1..0cbef82e33c306e2b89ea4f3d66d48da3840ccbd 100644
--- a/python/paddle/fluid/tests/unittests/test_parallel_executor.py
+++ b/python/paddle/fluid/tests/unittests/test_parallel_executor.py
@@ -21,13 +21,17 @@ import paddle.dataset.mnist as mnist
import paddle.dataset.wmt16 as wmt16
-def simple_fc_net():
- reader = fluid.layers.open_recordio_file(
- filename='./mnist.recordio',
- shapes=[[-1, 784], [-1, 1]],
- lod_levels=[0, 0],
- dtypes=['float32', 'int64'])
- img, label = fluid.layers.read_file(reader)
+def simple_fc_net(use_feed):
+ if use_feed:
+ img = fluid.layers.data(name='image', shape=[784], dtype='float32')
+ label = fluid.layers.data(name='label', shape=[1], dtype='int64')
+ else:
+ reader = fluid.layers.open_recordio_file(
+ filename='./mnist.recordio',
+ shapes=[[-1, 784], [-1, 1]],
+ lod_levels=[0, 0],
+ dtypes=['float32', 'int64'])
+ img, label = fluid.layers.read_file(reader)
hidden = img
for _ in xrange(4):
hidden = fluid.layers.fc(
@@ -42,13 +46,18 @@ def simple_fc_net():
return loss
-def fc_with_batchnorm():
- reader = fluid.layers.open_recordio_file(
- filename='./mnist.recordio',
- shapes=[[-1, 784], [-1, 1]],
- lod_levels=[0, 0],
- dtypes=['float32', 'int64'])
- img, label = fluid.layers.read_file(reader)
+def fc_with_batchnorm(use_feed):
+ if use_feed:
+ img = fluid.layers.data(name='image', shape=[784], dtype='float32')
+ label = fluid.layers.data(name='label', shape=[1], dtype='int64')
+ else:
+ reader = fluid.layers.open_recordio_file(
+ filename='./mnist.recordio',
+ shapes=[[-1, 784], [-1, 1]],
+ lod_levels=[0, 0],
+ dtypes=['float32', 'int64'])
+ img, label = fluid.layers.read_file(reader)
+
hidden = img
for _ in xrange(1):
hidden = fluid.layers.fc(
@@ -135,7 +144,9 @@ def bottleneck_block(input, num_filters, stride, cardinality, reduction_ratio):
return fluid.layers.elementwise_add(x=short, y=scale, act='relu')
-def SE_ResNeXt152Small(batch_size=2):
+def SE_ResNeXt152Small(batch_size=2, use_feed=False):
+ assert not use_feed, "SE_ResNeXt doesn't support feed yet"
+
img = fluid.layers.fill_constant(
shape=[batch_size, 3, 224, 224], dtype='float32', value=0.0)
label = fluid.layers.fill_constant(
@@ -185,30 +196,28 @@ class TestParallelExecutorBase(unittest.TestCase):
memory_opt=True,
iter=10,
batch_size=None,
- allow_op_delay=False):
+ allow_op_delay=False,
+ feed_dict={}):
main = fluid.Program()
startup = fluid.Program()
with fluid.program_guard(main, startup):
- loss = method()
+ loss = method(use_feed=len(feed_dict) > 0)
adam = fluid.optimizer.Adam()
adam.minimize(loss)
if memory_opt:
fluid.memory_optimize(main)
- exe = fluid.ParallelExecutor(
- loss_name=loss.name,
- use_cuda=True,
- allow_op_delay=allow_op_delay)
+ exe = fluid.ParallelExecutor(loss_name=loss.name, use_cuda=True)
if batch_size is not None:
batch_size *= fluid.core.get_cuda_device_count()
begin = time.time()
- first_loss, = exe.run([loss.name])
+ first_loss, = exe.run([loss.name], feed_dict=feed_dict)
first_loss = numpy.array(first_loss)
for i in xrange(iter):
- exe.run([])
+ exe.run([], feed_dict=feed_dict)
- last_loss, = exe.run([loss.name])
+ last_loss, = exe.run([loss.name], feed_dict=feed_dict)
end = time.time()
if batch_size is not None:
@@ -242,9 +251,19 @@ class TestMNIST(TestParallelExecutorBase):
self.check_network_convergence(simple_fc_net)
self.check_network_convergence(simple_fc_net, allow_op_delay=True)
+ img = numpy.zeros(shape=[32, 784], dtype='float32')
+ label = numpy.ones(shape=[32, 1], dtype='int64')
+ self.check_network_convergence(
+ simple_fc_net, feed_dict={"image": img,
+ "label": label})
+
def test_batchnorm_fc(self):
self.check_network_convergence(fc_with_batchnorm)
- self.check_network_convergence(fc_with_batchnorm, allow_op_delay=True)
+ img = numpy.zeros(shape=[32, 784], dtype='float32')
+ label = numpy.ones(shape=[32, 1], dtype='int64')
+ self.check_network_convergence(
+ fc_with_batchnorm, feed_dict={"image": img,
+ "label": label})
class TestResnet(TestParallelExecutorBase):
@@ -400,7 +419,8 @@ def prepare_batch_input(insts, src_pad_idx, trg_pad_idx, n_head):
import transformer_model
-def transformer():
+def transformer(use_feed):
+ assert not use_feed, "transformer doesn't support feed yet"
return transformer_model.transformer(
ModelHyperParams.src_vocab_size + 1,
ModelHyperParams.trg_vocab_size + 1, ModelHyperParams.max_length + 1,
diff --git a/python/paddle/fluid/tests/unittests/test_prior_box_op.py b/python/paddle/fluid/tests/unittests/test_prior_box_op.py
index c21138c13e6753f9dfcbd7d439269f7cf9a04f23..bcbc02a2baa46b9ab583ecf3006bd3262e6038fd 100644
--- a/python/paddle/fluid/tests/unittests/test_prior_box_op.py
+++ b/python/paddle/fluid/tests/unittests/test_prior_box_op.py
@@ -28,7 +28,6 @@ class TestPriorBoxOp(OpTest):
self.attrs = {
'min_sizes': self.min_sizes,
- 'max_sizes': self.max_sizes,
'aspect_ratios': self.aspect_ratios,
'variances': self.variances,
'flip': self.flip,
@@ -37,25 +36,28 @@ class TestPriorBoxOp(OpTest):
'step_h': self.step_h,
'offset': self.offset
}
+ if len(self.max_sizes) > 0:
+ self.attrs['max_sizes'] = self.max_sizes
self.outputs = {'Boxes': self.out_boxes, 'Variances': self.out_var}
def test_check_output(self):
self.check_output()
- def test_check_grad(self):
- return
-
def setUp(self):
self.op_type = "prior_box"
self.set_data()
+ def set_max_sizes(self):
+ max_sizes = [5, 10]
+ self.max_sizes = np.array(max_sizes).astype('float32').tolist()
+
def init_test_params(self):
- self.layer_w = 4
- self.layer_h = 4
+ self.layer_w = 32
+ self.layer_h = 32
- self.image_w = 20
- self.image_h = 20
+ self.image_w = 40
+ self.image_h = 40
self.step_w = float(self.image_w) / float(self.layer_w)
self.step_h = float(self.image_h) / float(self.layer_h)
@@ -66,8 +68,7 @@ class TestPriorBoxOp(OpTest):
self.min_sizes = [2, 4]
self.min_sizes = np.array(self.min_sizes).astype('float32').tolist()
- self.max_sizes = [5, 10]
- self.max_sizes = np.array(self.max_sizes).astype('float32').tolist()
+ self.set_max_sizes()
self.aspect_ratios = [2.0, 3.0]
self.flip = True
self.real_aspect_ratios = [1, 2.0, 1.0 / 2.0, 3.0, 1.0 / 3.0]
@@ -79,7 +80,7 @@ class TestPriorBoxOp(OpTest):
self.clip = True
self.num_priors = len(self.real_aspect_ratios) * len(self.min_sizes)
- if len(self.max_sizes) > 1:
+ if len(self.max_sizes) > 0:
self.num_priors += len(self.max_sizes)
self.offset = 0.5
@@ -105,35 +106,27 @@ class TestPriorBoxOp(OpTest):
idx = 0
for s in range(len(self.min_sizes)):
min_size = self.min_sizes[s]
- c_w = c_h = min_size / 2.
- out_boxes[h, w, idx, :] = [
- (c_x - c_w) / self.image_w, (c_y - c_h) / self.image_h,
- (c_x + c_w) / self.image_w, (c_y + c_h) / self.image_h
- ]
- idx += 1
-
- if len(self.max_sizes) > 0:
- max_size = self.max_sizes[s]
- # second prior: aspect_ratio = 1,
- c_w = c_h = math.sqrt(min_size * max_size) / 2
+ # rest of priors
+ for r in range(len(self.real_aspect_ratios)):
+ ar = self.real_aspect_ratios[r]
+ c_w = min_size * math.sqrt(ar) / 2
+ c_h = (min_size / math.sqrt(ar)) / 2
out_boxes[h, w, idx, :] = [(c_x - c_w) / self.image_w,
(c_y - c_h) / self.image_h,
(c_x + c_w) / self.image_w,
(c_y + c_h) / self.image_h]
idx += 1
- # rest of priors
- for r in range(len(self.real_aspect_ratios)):
- ar = self.real_aspect_ratios[r]
- if math.fabs(ar - 1.) < 1e-6:
- continue
- c_w = min_size * math.sqrt(ar) / 2
- c_h = (min_size / math.sqrt(ar)) / 2
+ if len(self.max_sizes) > 0:
+ max_size = self.max_sizes[s]
+ # second prior: aspect_ratio = 1,
+ c_w = c_h = math.sqrt(min_size * max_size) / 2
out_boxes[h, w, idx, :] = [(c_x - c_w) / self.image_w,
(c_y - c_h) / self.image_h,
(c_x + c_w) / self.image_w,
(c_y + c_h) / self.image_h]
idx += 1
+
# clip the prior's coordinate such that it is within [0, 1]
if self.clip:
out_boxes = np.clip(out_boxes, 0.0, 1.0)
@@ -144,5 +137,10 @@ class TestPriorBoxOp(OpTest):
self.out_var = out_var.astype('float32')
+class TestPriorBoxOpWithoutMaxSize(TestPriorBoxOp):
+ def set_max_sizes(self):
+ self.max_sizes = []
+
+
if __name__ == '__main__':
unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/test_protobuf_descs.py b/python/paddle/fluid/tests/unittests/test_protobuf_descs.py
index e4cf4a8bce8a53c0348130716dc18c61ac9a5913..f98a8bbc68a4315df3ae761f2e52b8f11cb620c6 100644
--- a/python/paddle/fluid/tests/unittests/test_protobuf_descs.py
+++ b/python/paddle/fluid/tests/unittests/test_protobuf_descs.py
@@ -19,9 +19,9 @@ from paddle.fluid.framework import Program
class TestOpDesc(unittest.TestCase):
def test_op_desc(self):
- prog = core.ProgramDesc()
- self.assertIsNotNone(prog)
- block = prog.block(0)
+ program_desc = core.ProgramDesc()
+ self.assertIsNotNone(program_desc)
+ block = program_desc.block(0)
self.assertIsNotNone(block)
op = block.append_op()
self.assertIsNotNone(op)
@@ -67,7 +67,7 @@ class TestOpDesc(unittest.TestCase):
self.assertEqual(8, len(op.attr_names()))
- op.set_block_attr("block_attr", prog.block(0))
+ op.set_block_attr("block_attr", program_desc.block(0))
self.assertEqual(0, op.block_attr("block_attr"))
mul_op = block.append_op()
@@ -88,20 +88,20 @@ class TestProgramDesc(unittest.TestCase):
del program_desc
def test_append_block(self):
- prog_desc = core.ProgramDesc()
- self.assertIsNotNone(prog_desc)
- block_root = prog_desc.block(0)
+ program_desc = core.ProgramDesc()
+ self.assertIsNotNone(program_desc)
+ block_root = program_desc.block(0)
self.assertIsNotNone(block_root)
self.assertEqual(block_root.id, 0)
- block1 = prog_desc.append_block(block_root)
- block2 = prog_desc.append_block(block1)
+ block1 = program_desc.append_block(block_root)
+ block2 = program_desc.append_block(block1)
self.assertIsNotNone(block1)
self.assertEqual(block1.id, block2.parent)
self.assertEqual(block_root.id, block1.parent)
- block3 = prog_desc.append_block(block_root)
+ block3 = program_desc.append_block(block_root)
self.assertEqual(block3.parent, block_root.id)
- self.assertEqual(prog_desc.block(1).id, 1)
- self.assertEqual(4, prog_desc.num_blocks())
+ self.assertEqual(program_desc.block(1).id, 1)
+ self.assertEqual(4, program_desc.num_blocks())
class TestVarDesc(unittest.TestCase):
@@ -162,9 +162,9 @@ class TestVarDesc(unittest.TestCase):
class TestBlockDesc(unittest.TestCase):
def test_add_var(self):
- prog = core.ProgramDesc()
- self.assertIsNotNone(prog)
- block = prog.block(0)
+ program_desc = core.ProgramDesc()
+ self.assertIsNotNone(program_desc)
+ block = program_desc.block(0)
self.assertIsNotNone(block)
var1 = block.var("var1")
var2 = block.var("var2")
@@ -175,9 +175,9 @@ class TestBlockDesc(unittest.TestCase):
self.assertEqual(var2_re, var2)
def test_add_op(self):
- prog = core.ProgramDesc()
- self.assertIsNotNone(prog)
- block = prog.block(0)
+ program_desc = core.ProgramDesc()
+ self.assertIsNotNone(program_desc)
+ block = program_desc.block(0)
self.assertIsNotNone(block)
op1 = block.append_op()
op2 = block.append_op()
@@ -189,9 +189,9 @@ class TestBlockDesc(unittest.TestCase):
def test_remove_op(self):
program = Program()
- prog = program.desc
- self.assertIsNotNone(prog)
- block = prog.block(0)
+ program_desc = program.desc
+ self.assertIsNotNone(program_desc)
+ block = program_desc.block(0)
self.assertIsNotNone(block)
op0 = block.append_op()