提交 8893cf12 编写于 作者: Y yi.wu

Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into...

Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into fluid_benchmark_support_recordioreader
......@@ -24,7 +24,7 @@ COPY ./paddle/scripts/docker/root/ /root/
RUN apt-get update && \
apt-get install -y --allow-downgrades \
git python-pip python-dev openssh-server bison \
git python-pip python-dev python-opencv openssh-server bison \
libnccl2=2.1.2-1+cuda8.0 libnccl-dev=2.1.2-1+cuda8.0 \
wget unzip unrar tar xz-utils bzip2 gzip coreutils ntp \
curl sed grep graphviz libjpeg-dev zlib1g-dev \
......@@ -76,8 +76,7 @@ RUN easy_install -U pip && \
pip install sphinx-rtd-theme==0.1.9 recommonmark
RUN pip install pre-commit 'ipython==5.3.0' && \
pip install 'ipykernel==4.6.0' 'jupyter==1.0.0' && \
pip install opencv-python
pip install 'ipykernel==4.6.0' 'jupyter==1.0.0'
#For docstring checker
RUN pip install pylint pytest astroid isort
......
FROM nvidia/cuda:9.0-cudnn7-devel-ubuntu16.04
RUN apt-get update && apt-get install -y python python-pip iputils-ping libgtk2.0-dev wget vim net-tools iftop
RUN apt-get update && apt-get install -y python python-pip iputils-ping libgtk2.0-dev wget vim net-tools iftop python-opencv
RUN ln -s /usr/lib/x86_64-linux-gnu/libcudnn.so.7 /usr/lib/libcudnn.so && ln -s /usr/lib/x86_64-linux-gnu/libnccl.so.2 /usr/lib/libnccl.so
RUN pip install -U pip
RUN pip install -U kubernetes opencv-python paddlepaddle
RUN pip install -U kubernetes paddlepaddle
# IMPORTANT:
# Add "ENV http_proxy=http://ip:port" if your download is slow, and don't forget to unset it at runtime.
......
......@@ -75,6 +75,11 @@ def parse_args():
default=1,
help='If gpus > 1, will use ParallelExecutor to run, else use Executor.')
# this option is available only for vgg and resnet.
parser.add_argument(
'--cpus',
type=int,
default=1,
help='If cpus > 1, will use ParallelDo to run, else use Executor.')
parser.add_argument(
'--data_set',
type=str,
......@@ -91,8 +96,8 @@ def parse_args():
help='If set, use nvprof for CUDA.')
parser.add_argument(
'--no_test',
action='store_false',
help='If set, test the testset during training.')
action='store_true',
help='If set, do not test the testset during training.')
parser.add_argument(
'--memory_optimize',
action='store_true',
......@@ -266,9 +271,9 @@ def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc,
print("Pass: %d, Iter: %d, Loss: %f\n" %
(pass_id, iters, np.mean(train_losses)))
print_train_time(start_time, time.time(), num_samples)
print("Pass: %d, Loss: %f" % (pass_id, np.mean(train_losses)))
print("Pass: %d, Loss: %f" % (pass_id, np.mean(train_losses))),
# evaluation
if not args.no_test and batch_acc != None:
if not args.no_test and batch_acc:
pass_test_acc = test(exe, infer_prog, test_reader, feeder,
batch_acc)
print(", Test Accuracy: %f" % pass_test_acc)
......@@ -366,7 +371,7 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader,
if args.use_reader_op:
num_samples = num_samples * args.gpus
print_train_time(start_time, time.time(), num_samples)
if not args.no_test and batch_acc != None:
if not args.no_test and batch_acc:
test_acc = test(startup_exe, infer_prog, test_reader, feeder,
batch_acc)
print("Pass: %d, Test Accuracy: %f\n" % (pass_id, test_acc))
......
......@@ -84,15 +84,30 @@ def get_model(args):
images = fluid.layers.data(name='pixel', shape=[1, 28, 28], dtype=DTYPE)
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
# Train program
predict = cnn_model(images)
cost = fluid.layers.cross_entropy(input=predict, label=label)
avg_cost = fluid.layers.mean(x=cost)
# Evaluator
batch_size_tensor = fluid.layers.create_tensor(dtype='int64')
batch_acc = fluid.layers.accuracy(
input=predict, label=label, total=batch_size_tensor)
if args.device == 'CPU' and args.cpus > 1:
places = fluid.layers.get_places(args.cpus)
pd = fluid.layers.ParallelDo(places)
with pd.do():
predict = cnn_model(pd.read_input(images))
label = pd.read_input(label)
cost = fluid.layers.cross_entropy(input=predict, label=label)
avg_cost = fluid.layers.mean(x=cost)
batch_acc = fluid.layers.accuracy(input=predict, label=label)
pd.write_output(avg_cost)
pd.write_output(batch_acc)
avg_cost, batch_acc = pd()
avg_cost = fluid.layers.mean(avg_cost)
batch_acc = fluid.layers.mean(batch_acc)
else:
# Train program
predict = cnn_model(images)
cost = fluid.layers.cross_entropy(input=predict, label=label)
avg_cost = fluid.layers.mean(x=cost)
# Evaluator
batch_acc = fluid.layers.accuracy(input=predict, label=label)
# inference program
inference_program = fluid.default_main_program().clone()
......
......@@ -166,18 +166,32 @@ def get_model(args):
input = fluid.layers.data(name='data', shape=dshape, dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
predict = model(input, class_dim)
cost = fluid.layers.cross_entropy(input=predict, label=label)
avg_cost = fluid.layers.mean(x=cost)
batch_size_tensor = fluid.layers.create_tensor(dtype='int64')
batch_acc = fluid.layers.accuracy(
input=predict, label=label, total=batch_size_tensor)
if args.device == 'CPU' and args.cpus > 1:
places = fluid.layers.get_places(args.cpus)
pd = fluid.layers.ParallelDo(places)
with pd.do():
predict = model(pd.read_input(input), class_dim)
label = pd.read_input(label)
cost = fluid.layers.cross_entropy(input=predict, label=label)
avg_cost = fluid.layers.mean(x=cost)
batch_acc = fluid.layers.accuracy(input=predict, label=label)
pd.write_output(avg_cost)
pd.write_output(batch_acc)
avg_cost, batch_acc = pd()
avg_cost = fluid.layers.mean(avg_cost)
batch_acc = fluid.layers.mean(batch_acc)
else:
predict = model(input, class_dim)
cost = fluid.layers.cross_entropy(input=predict, label=label)
avg_cost = fluid.layers.mean(x=cost)
batch_acc = fluid.layers.accuracy(input=predict, label=label)
inference_program = fluid.default_main_program().clone()
with fluid.program_guard(inference_program):
inference_program = fluid.io.get_inference_program(
target_vars=[batch_acc, batch_size_tensor])
target_vars=[batch_acc])
optimizer = fluid.optimizer.Momentum(learning_rate=0.01, momentum=0.9)
......
......@@ -104,9 +104,8 @@ def get_model(args):
loss = fluid.layers.mean(x=loss)
# add acc
batch_size_tensor = fluid.layers.create_tensor(dtype='int64')
batch_acc = fluid.layers.accuracy(input=logit, label=fluid.layers.data(name='label', \
shape=[1], dtype='int64'), total=batch_size_tensor)
shape=[1], dtype='int64'))
inference_program = fluid.default_main_program().clone()
with fluid.program_guard(inference_program):
......
......@@ -45,6 +45,7 @@ ExternalProject_Add(
# checkout and clean other dirs under third_party
# 4. remove .git, and package the directory.
URL "http://paddlepaddledeps.bj.bcebos.com/grpc-v1.8.x.tar.gz"
URL_MD5 "c9c58ee7d0e8929a63155af6a2ecdbd0"
PREFIX ${GRPC_SOURCES_DIR}
UPDATE_COMMAND ""
CONFIGURE_COMMAND ""
......
## 堆内存分析和优化
计算机程序都可能有内存泄漏的风险。**内存泄漏**一般是由于程序在堆(heap)上分配了内存而没有释放,随着程序的运行占用的内存越来越大,一方面会影响程序的稳定性,可能让运行速度越来越慢,或者造成oom,甚至会影响运行程序的机器的稳定性,造成宕机。
目前有很多内存泄漏分析工具,比较经典的有[valgrind](http://valgrind.org/docs/manual/quick-start.html#quick-start.intro), [gperftools](https://gperftools.github.io/gperftools/)
因为Fluid是用Python驱动C++ core来运行,valgrind直接分析非常困难,需要自己编译debug版本的、带valgrind支持的专用Python版本,而且输出的信息中大部分是Python自己的符号和调用信息,分析起来很困难,另外使用valgrind会让程序运行速度变得非常慢,所以不建议使用。
本教程主要介绍[gperftools](https://gperftools.github.io/gperftools/)的使用。
gperftool主要支持以下四个功能:
- thread-caching malloc
- heap-checking using tcmalloc
- heap-profiling using tcmalloc
- CPU profiler
Paddle也提供了基于gperftool的[CPU性能分析教程](https://github.com/PaddlePaddle/Paddle/blob/develop/doc/fluid/howto/optimization/cpu_profiling_cn.md)
对于堆内存的分析,主要用到thread-caching malloc和heap-profiling using tcmalloc。
## 使用流程
#### 环境
本教程基于paddle提供的Docker开发环境paddlepaddle/paddle:latest-dev,基于Ubuntu 16.04.4 LTS环境。
#### 使用流程
- 安装google-perftools
```
apt-get install libunwind-dev
apt-get install google-perftools
```
- 安装pprof
```
go get -u github.com/google/pprof
```
- 设置运行环境
```
export PPROF_PATH=/root/gopath/bin/pprof
export PPROF_BINARY_PATH=/root/gopath/bin/pprof
export LD_PRELOAD=/usr/lib/libtcmalloc.so.4
```
- 使用heap profile来运行python程序。本质上是周期性的对堆的分配情况做一次快照。
```
# HEAPPROFILE 设置生成的堆分析文件的目录和文件前缀
# HEAP_PROFILE_ALLOCATION_INTERVAL 设置每分配多少存储dump一次dump,默认1GB
env HEAPPROFILE="./perf_log/test.log" HEAP_PROFILE_ALLOCATION_INTERVAL=209715200 python trainer.py
```
随着程序的运行,会在perf_log这个文件夹下生成很多文件,如下:
```
-rw-r--r-- 1 root root 1.0M Jun 1 15:00 test.log.0001.heap
-rw-r--r-- 1 root root 1.0M Jun 1 15:00 test.log.0002.heap
-rw-r--r-- 1 root root 1.0M Jun 1 15:00 test.log.0003.heap
-rw-r--r-- 1 root root 1.0M Jun 1 15:00 test.log.0004.heap
-rw-r--r-- 1 root root 1.0M Jun 1 15:00 test.log.0005.heap
-rw-r--r-- 1 root root 1.0M Jun 1 15:00 test.log.0006.heap
```
- 使用pprof对heap文件进行分析。分析有两种模式:
- 完整模式。会对当前heap做一个分析,显示目前分配内存一些调用路径。
```
pprof --pdf python test.log.0012.heap
```
上述命令会生成一个profile00x.pdf的文件,可以直接打开,例如:[memory_cpu_allocator](https://github.com/jacquesqiao/Paddle/blob/bd2ea0e1f84bb6522a66d44a072598153634cade/doc/fluid/howto/optimization/memory_cpu_allocator.pdf)。从下图可以看出,在CPU版本fluid的运行过程中,分配存储最多的模块式CPUAllocator. 而别的模块相对而言分配内存较少,所以被忽略了,这对于分配内存泄漏是很不方便的,因为泄漏是一个缓慢的过程,在这种图中是无法看到的。
![result](https://user-images.githubusercontent.com/3048612/40964027-a54033e4-68dc-11e8-836a-144910c4bb8c.png)
- Diff模式。可以对两个时刻的heap做diff,把一些内存分配没有发生变化的模块去掉,而把增量部分显示出来。
```
pprof --pdf --base test.log.0010.heap python test.log.1045.heap
```
生成的结果为:[`memory_leak_protobuf`](https://github.com/jacquesqiao/Paddle/blob/bd2ea0e1f84bb6522a66d44a072598153634cade/doc/fluid/howto/optimization/memory_leak_protobuf.pdf)
从图中可以看出:ProgramDesc这个结构,在两个版本之间增长了200MB+,所以这里有很大的内存泄漏的可能性,最终结果也确实证明是这里造成了泄漏。
![result](https://user-images.githubusercontent.com/3048612/40964057-b434d5e4-68dc-11e8-894b-8ab62bcf26c2.png)
![result](https://user-images.githubusercontent.com/3048612/40964063-b7dbee44-68dc-11e8-9719-da279f86477f.png)
......@@ -65,7 +65,10 @@ void Main(bool use_gpu) {
}
TEST(demo, word2vec_cpu) { Main(false /*use_gpu*/); }
#ifdef PADDLE_WITH_CUDA
TEST(demo, word2vec_gpu) { Main(true /*use_gpu*/); }
#endif
} // namespace demo
} // namespace paddle
......@@ -63,6 +63,7 @@ class PaddlePredictor {
struct Config;
PaddlePredictor() = default;
PaddlePredictor(const PaddlePredictor&) = delete;
PaddlePredictor& operator=(const PaddlePredictor&) = delete;
// Predict an record.
// The caller should be responsible for allocating and releasing the memory of
......@@ -76,7 +77,7 @@ class PaddlePredictor {
virtual std::unique_ptr<PaddlePredictor> Clone() = 0;
// Destroy the Predictor.
virtual ~PaddlePredictor() {}
virtual ~PaddlePredictor() = default;
// The common configs for all the predictors.
struct Config {
......
......@@ -54,7 +54,8 @@ std::string num2str(T a) {
}
} // namespace
bool NativePaddlePredictor::Init() {
bool NativePaddlePredictor::Init(
std::shared_ptr<framework::Scope> parent_scope) {
VLOG(3) << "Predictor::init()";
if (config_.use_gpu) {
......@@ -62,9 +63,15 @@ bool NativePaddlePredictor::Init() {
} else {
place_ = paddle::platform::CPUPlace();
}
paddle::framework::InitDevices(false);
if (parent_scope) {
scope_ = parent_scope;
sub_scope_ = &(parent_scope->NewScope());
} else {
paddle::framework::InitDevices(false);
scope_.reset(new paddle::framework::Scope());
}
executor_.reset(new paddle::framework::Executor(place_));
scope_.reset(new paddle::framework::Scope());
// Initialize the inference program
if (!config_.model_dir.empty()) {
......@@ -83,13 +90,8 @@ bool NativePaddlePredictor::Init() {
return false;
}
ctx_ = executor_->Prepare(*inference_program_, 0);
// Create temporary variables first, so that the first batch do not need to
// create variables in the runtime. This is the logics of the old inference
// API.
// TODO(Superjomn) this should be modified when `Clone` is valid for
// multi-thread application.
executor_->CreateVariables(*inference_program_, scope_.get(), 0);
executor_->CreateVariables(
*inference_program_, sub_scope_ ? sub_scope_ : scope_.get(), 0);
// Get the feed_target_names and fetch_target_names
feed_target_names_ = inference_program_->GetFeedTargetNames();
......@@ -97,6 +99,13 @@ bool NativePaddlePredictor::Init() {
return true;
}
NativePaddlePredictor::~NativePaddlePredictor() {
if (sub_scope_) {
PADDLE_ENFORCE_NOT_NULL(scope_, "Should have parent scope!");
scope_->DeleteScope(sub_scope_);
}
};
bool NativePaddlePredictor::Run(const std::vector<PaddleTensor> &inputs,
std::vector<PaddleTensor> *output_data) {
VLOG(3) << "Predictor::predict";
......@@ -121,11 +130,12 @@ bool NativePaddlePredictor::Run(const std::vector<PaddleTensor> &inputs,
}
// Run the inference program
// if share variables, we need not create variables
executor_->RunPreparedContext(ctx_.get(),
scope_.get(),
&feed_targets,
&fetch_targets,
false /* don't create variable eatch time */);
executor_->RunPreparedContext(
ctx_.get(),
sub_scope_ != nullptr ? sub_scope_ : scope_.get(),
&feed_targets,
&fetch_targets,
false /* don't create variable eatch time */);
if (!GetFetch(fetchs, output_data)) {
LOG(ERROR) << "fail to get fetchs";
return false;
......@@ -138,7 +148,7 @@ std::unique_ptr<PaddlePredictor> NativePaddlePredictor::Clone() {
VLOG(3) << "Predictor::clone";
std::unique_ptr<PaddlePredictor> cls(new NativePaddlePredictor(config_));
if (!dynamic_cast<NativePaddlePredictor *>(cls.get())->Init()) {
if (!dynamic_cast<NativePaddlePredictor *>(cls.get())->Init(scope_)) {
LOG(ERROR) << "fail to call Init";
return nullptr;
}
......@@ -266,7 +276,7 @@ CreatePaddlePredictor<NativeConfig, PaddleEngineKind::kNative>(
}
std::unique_ptr<PaddlePredictor> predictor(new NativePaddlePredictor(config));
if (!dynamic_cast<NativePaddlePredictor *>(predictor.get())->Init()) {
if (!dynamic_cast<NativePaddlePredictor *>(predictor.get())->Init(nullptr)) {
return nullptr;
}
return std::move(predictor);
......
......@@ -34,14 +34,15 @@ class NativePaddlePredictor : public PaddlePredictor {
explicit NativePaddlePredictor(const NativeConfig &config)
: config_(config) {}
bool Init();
// will only create sub scope if have global scope
bool Init(std::shared_ptr<framework::Scope> parent_scope);
bool Run(const std::vector<PaddleTensor> &inputs,
std::vector<PaddleTensor> *output_data) override;
std::unique_ptr<PaddlePredictor> Clone() override;
~NativePaddlePredictor() override{};
~NativePaddlePredictor() override;
private:
bool SetFeed(const std::vector<PaddleTensor> &input_datas,
......@@ -52,11 +53,13 @@ class NativePaddlePredictor : public PaddlePredictor {
NativeConfig config_;
platform::Place place_;
std::unique_ptr<framework::Executor> executor_;
std::unique_ptr<framework::Scope> scope_;
std::shared_ptr<framework::Scope> scope_;
std::unique_ptr<framework::ExecutorPrepareContext> ctx_;
std::unique_ptr<framework::ProgramDesc> inference_program_;
std::vector<std::string> feed_target_names_;
std::vector<std::string> fetch_target_names_;
// Do not use unique_ptr, use parent scope to delete
framework::Scope *sub_scope_{nullptr};
};
} // namespace paddle
......@@ -15,3 +15,9 @@ cc_test(test_subgraph_splitter
DEPS analysis paddle_fluid tensor
ARGS --inference_model_dir=${PYTHON_TESTS_DIR}/book/word2vec.inference.model)
set_tests_properties(test_subgraph_splitter PROPERTIES DEPENDS test_word2vec)
cc_test(test_dfg_graphviz_draw_pass
SRCS dfg_graphviz_draw_pass_tester.cc
DEPS analysis
ARGS --inference_model_dir=${PYTHON_TESTS_DIR}/book/word2vec.inference.model)
set_tests_properties(test_dfg_graphviz_draw_pass PROPERTIES DEPENDS test_word2vec)
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
/*
* This file create an DFG_GraphvizDrawPass which helps to draw a data flow
* graph's structure using graphviz.
*/
#pragma once
#include <fstream>
#include <string>
#include "paddle/fluid/inference/analysis/pass.h"
namespace paddle {
namespace inference {
namespace analysis {
/*
* Output a dot file and write to some place.
*/
class DFG_GraphvizDrawPass : public DataFlowGraphPass {
public:
DFG_GraphvizDrawPass(const std::string& dir, const std::string& id)
: dir_(dir), id_(id) {}
bool Initialize() override { return Pass::Initialize(); }
void Run(DataFlowGraph* graph) override {
auto content = Draw(graph);
std::ofstream file(GenDotPath());
file.write(content.c_str(), content.size());
file.close();
LOG(INFO) << "draw dot to " << GenDotPath();
}
bool Finalize() override { return Pass::Finalize(); }
Pass* CreatePrinterPass(std::ostream& os,
const std::string& banner) const override {
return nullptr;
}
private:
// Path of the dot file to output.
std::string GenDotPath() const {
return dir_ + "/" + "graph_" + id_ + ".dot";
}
std::string Draw(DataFlowGraph* graph) { return graph->DotString(); }
std::string dir_;
std::string id_;
};
} // namespace analysis
} // namespace inference
} // namespace paddle
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/inference/analysis/dfg_graphviz_draw_pass.h"
#include <gtest/gtest.h>
#include <fstream>
#include <string>
#include "paddle/fluid/inference/analysis/ut_helper.h"
namespace paddle {
namespace inference {
namespace analysis {
TEST_F(DFG_Tester, dfg_graphviz_draw_pass_tester) {
auto dfg = ProgramDescToDFG(desc);
DFG_GraphvizDrawPass pass("./", "test");
pass.Initialize();
pass.Run(&dfg);
// test content
std::ifstream file("./graph_test.dot");
ASSERT_TRUE(file.is_open());
std::string line;
int no{0};
while (std::getline(file, line)) {
no++;
}
ASSERT_EQ(no, 82);
}
} // namespace analysis
} // namespace inference
} // namespace paddle
......@@ -20,7 +20,7 @@ limitations under the License. */
#include "paddle/fluid/platform/cudnn_helper.h"
#include "paddle/fluid/platform/float16.h"
DEFINE_bool(cudnn_algo_use_autotune, true,
DEFINE_bool(cudnn_deterministic, true,
"Whether allow using an autotuning algorithm for convolution "
"operator. The autotuning algorithm may be non-deterministic. If "
"false, the algorithm is deterministic.");
......@@ -272,7 +272,7 @@ class CUDNNConvGradOpKernel : public framework::OpKernel<T> {
auto& dev_ctx = ctx.template device_context<platform::CUDADeviceContext>();
auto handle = dev_ctx.cudnn_handle();
if (input_grad) {
if (FLAGS_cudnn_algo_use_autotune) {
if (FLAGS_cudnn_deterministic) {
PADDLE_ENFORCE(
platform::dynload::cudnnGetConvolutionBackwardDataAlgorithm(
handle, cudnn_filter_desc,
......@@ -297,7 +297,7 @@ class CUDNNConvGradOpKernel : public framework::OpKernel<T> {
}
if (filter_grad) {
if (FLAGS_cudnn_algo_use_autotune) {
if (FLAGS_cudnn_deterministic) {
PADDLE_ENFORCE(
platform::dynload::cudnnGetConvolutionBackwardFilterAlgorithm(
handle, cudnn_input_desc, cudnn_output_grad_desc,
......
......@@ -38,6 +38,25 @@ void RPCClient::Init() {
if (rpc_client_.get() == nullptr) {
rpc_client_.reset(new RPCClient());
}
rpc_client_->InitEventLoop();
}
void RPCClient::InitEventLoop() {
// start the client process thread
// TODO(wuyi): can make this in a threadpool
client_thread_.reset(new std::thread(std::bind(&RPCClient::Proceed, this)));
}
RPCClient::~RPCClient() {
Wait();
cq_.Shutdown();
{
std::lock_guard<std::mutex> guard(chan_mutex_);
for (auto& it : channels_) {
it.second.reset();
}
}
client_thread_->join();
}
bool RPCClient::AsyncSendVariable(const std::string& ep,
......@@ -204,70 +223,37 @@ void RPCClient::AsyncSendFetchBarrier(const std::string& ep, int64_t time_out) {
req_count_++;
}
bool RPCClient::Wait() {
VLOG(3) << "RPCClient begin Wait()"
<< " req_count_:" << req_count_;
if (req_count_ <= 0) {
return true;
}
const size_t kReqCnt = req_count_;
bool a[kReqCnt];
std::vector<std::future<void>> waits(req_count_);
std::mutex mu;
for (int i = 0; i < req_count_; i++) {
waits[i] = framework::AsyncIO([i, &a, &mu, this] {
bool ret = Proceed();
std::lock_guard<std::mutex> l(mu);
a[i] = ret;
});
}
for (int i = 0; i < req_count_; i++) {
waits[i].wait();
}
int last_req_count = req_count_;
req_count_ = 0;
for (int i = 0; i < last_req_count; i++) {
if (!a[i]) {
return false;
}
}
return true;
void RPCClient::Wait() {
std::unique_lock<std::mutex> lk(sync_mutex_);
sync_cond_.wait(lk, [this] { return req_count_ == 0; });
}
bool RPCClient::Proceed() {
void* tag = NULL;
void RPCClient::Proceed() {
void* tag = nullptr;
bool ok = false;
// request counts.
if (!cq_.Next(&tag, &ok)) {
LOG(ERROR) << "Get meets CompletionQueue error";
return false;
}
GPR_ASSERT(ok);
PADDLE_ENFORCE(tag);
// TODO(gongwb): add more retries.
BaseProcessor* c = static_cast<BaseProcessor*>(tag);
if (!c->status_.ok()) {
LOG(ERROR) << "proc param error:" << c->var_h_.String()
<< " grpc error:" << c->status_.error_message();
while (cq_.Next(&tag, &ok)) {
BaseProcessor* c = static_cast<BaseProcessor*>(tag);
GPR_ASSERT(ok);
PADDLE_ENFORCE(c);
if (c->status_.ok()) {
c->Process();
} else {
LOG(ERROR) << "var: " << c->var_h_.String()
<< " grpc error:" << c->status_.error_message();
}
delete c;
return false;
{
std::lock_guard<std::mutex> lk(sync_mutex_);
req_count_--;
}
sync_cond_.notify_all();
}
c->Process();
delete c;
return true;
}
std::shared_ptr<grpc::Channel> RPCClient::GetChannel(const std::string& ep) {
// TODO(Yancey1989): make grpc client completely thread-safe
std::unique_lock<std::mutex> lock(mutex_);
std::lock_guard<std::mutex> guard(chan_mutex_);
auto it = channels_.find(ep);
if (it != channels_.end()) {
return it->second;
......
......@@ -16,15 +16,18 @@ limitations under the License. */
#include <time.h>
#include <chrono> // NOLINT
#include <chrono> // NOLINT
#include <condition_variable> // NOLINT
#include <ctime>
#include <functional>
#include <iostream>
#include <map>
#include <mutex> // NOLINT
#include <string>
#include <thread> // NOLINT
#include <vector>
#include "grpc++/channel.h"
#include "grpc++/generic/generic_stub.h"
#include "grpc++/grpc++.h"
#include "grpc++/support/byte_buffer.h"
......@@ -164,6 +167,7 @@ class FetchBarrierProcessor : public BaseProcessor {
class RPCClient {
public:
RPCClient() {}
~RPCClient();
static RPCClient* GetInstance();
......@@ -192,19 +196,28 @@ class RPCClient {
void AsyncSendFetchBarrier(const std::string& ep,
int64_t time_out = 600 * 1000);
bool Wait();
void Wait();
// InitEventLoop should only be called by Init()
void InitEventLoop();
private:
bool Proceed();
void Proceed();
std::shared_ptr<grpc::Channel> GetChannel(const std::string& ep);
// Init is called by GetInstance.
static void Init();
private:
grpc::CompletionQueue cq_;
std::map<std::string, std::shared_ptr<grpc::Channel>> channels_;
std::unordered_map<std::string, std::shared_ptr<grpc::Channel>> channels_;
std::unique_ptr<std::thread> client_thread_;
// mutex for Wait client sync
std::mutex sync_mutex_;
std::condition_variable sync_cond_;
std::atomic<int64_t> req_count_{0};
std::mutex mutex_;
// mutex for GetChannel thread safety
std::mutex chan_mutex_;
static std::unique_ptr<RPCClient> rpc_client_;
static std::once_flag init_flag_;
DISABLE_COPY_AND_ASSIGN(RPCClient);
......
......@@ -68,9 +68,7 @@ class RequestSend final : public RequestBase {
method_id, &ctx_, request_.get(), &responder_, cq_, cq_,
reinterpret_cast<void*>(static_cast<intptr_t>(req_id)));
}
virtual ~RequestSend() {}
std::string GetReqName() override { return request_->Varname(); }
void Process() override {
......@@ -82,7 +80,6 @@ class RequestSend final : public RequestBase {
framework::Variable* outvar = nullptr;
request_handler_->Handle(varname, scope, invar, &outvar);
status_ = FINISH;
responder_.Finish(reply_, ::grpc::Status::OK,
reinterpret_cast<void*>(static_cast<intptr_t>(req_id_)));
......@@ -125,7 +122,6 @@ class RequestGet final : public RequestBase {
SerializeToByteBuffer(varname, outvar, *request_handler_->dev_ctx(),
&reply_);
}
status_ = FINISH;
responder_.Finish(reply_, ::grpc::Status::OK,
reinterpret_cast<void*>(static_cast<intptr_t>(req_id_)));
......@@ -170,10 +166,9 @@ class RequestPrefetch final : public RequestBase {
SerializeToByteBuffer(varname, outvar, *request_handler_->dev_ctx(),
&reply_);
status_ = FINISH;
responder_.Finish(reply_, ::grpc::Status::OK,
reinterpret_cast<void*>(static_cast<intptr_t>(req_id_)));
status_ = FINISH;
}
protected:
......
......@@ -113,10 +113,6 @@ void StartServer() {
std::thread server_thread(
std::bind(&detail::AsyncGRPCServer::StartServer, g_rpc_service.get()));
// FIXME(gongwb): don't use hard time.
sleep(10);
LOG(INFO) << "got nccl id and stop server...";
g_rpc_service->ShutDown();
server_thread.join();
}
......@@ -127,7 +123,7 @@ TEST(PREFETCH, CPU) {
std::thread server_thread(StartServer);
g_rpc_service->WaitServerReady();
detail::RPCClient client;
detail::RPCClient* client = detail::RPCClient::GetInstance();
int port = g_rpc_service->GetSelectedPort();
std::string ep = paddle::string::Sprintf("127.0.0.1:%d", port);
......@@ -141,8 +137,8 @@ TEST(PREFETCH, CPU) {
std::string in_var_name("ids");
std::string out_var_name("out");
client.AsyncPrefetchVariable(ep, ctx, scope, in_var_name, out_var_name);
client.Wait();
client->AsyncPrefetchVariable(ep, ctx, scope, in_var_name, out_var_name);
client->Wait();
auto var = scope.Var(out_var_name);
auto value = var->GetMutable<framework::SelectedRows>()->value();
auto ptr = value.mutable_data<float>(place);
......@@ -152,6 +148,7 @@ TEST(PREFETCH, CPU) {
}
}
g_rpc_service->ShutDown();
server_thread.join();
LOG(INFO) << "begin reset";
g_rpc_service.reset(nullptr);
......
......@@ -22,21 +22,21 @@ class BoxCoderOp : public framework::OperatorWithKernel {
void InferShape(framework::InferShapeContext *ctx) const override {
PADDLE_ENFORCE(ctx->HasInput("PriorBox"),
"Input(PriorBox) of BoxCoderOp should not be null.");
PADDLE_ENFORCE(ctx->HasInput("PriorBoxVar"),
"Input(PriorBoxVar) of BoxCoderOp should not be null.");
PADDLE_ENFORCE(ctx->HasInput("TargetBox"),
"Input(TargetBox) of BoxCoderOp should not be null.");
PADDLE_ENFORCE(ctx->HasOutput("OutputBox"),
"Output(OutputBox) of BoxCoderOp should not be null.");
auto prior_box_dims = ctx->GetInputDim("PriorBox");
auto prior_box_var_dims = ctx->GetInputDim("PriorBoxVar");
auto target_box_dims = ctx->GetInputDim("TargetBox");
PADDLE_ENFORCE_EQ(prior_box_dims.size(), 2,
"The rank of Input of PriorBoxVar must be 2");
PADDLE_ENFORCE_EQ(prior_box_dims[1], 4, "The shape of PriorBox is [N, 4]");
PADDLE_ENFORCE_EQ(prior_box_dims, prior_box_var_dims);
if (ctx->HasInput("PriorBoxVar")) {
auto prior_box_var_dims = ctx->GetInputDim("PriorBoxVar");
PADDLE_ENFORCE_EQ(prior_box_dims, prior_box_var_dims);
}
auto code_type = GetBoxCodeType(ctx->Attrs().Get<std::string>("code_type"));
if (code_type == BoxCodeType::kEncodeCenterSize) {
......@@ -71,9 +71,11 @@ class BoxCoderOpMaker : public framework::OpProtoAndCheckerMaker {
"of the coordinate system. [xmax, ymax] is the right bottom "
"coordinate of the anchor box.");
AddInput("PriorBoxVar",
"(Tensor, default Tensor<float>) "
"(Tensor, default Tensor<float>, optional) "
"PriorBoxVar is a 2-D Tensor with shape [M, 4] holds M group "
"of variance.");
"of variance. PriorBoxVar will set all elements to 1 by "
"default.")
.AsDispensable();
AddInput(
"TargetBox",
"(LoDTensor or Tensor) This input can be a 2-D LoDTensor with shape "
......@@ -131,5 +133,6 @@ width and height.
namespace ops = paddle::operators;
REGISTER_OPERATOR(box_coder, ops::BoxCoderOp, ops::BoxCoderOpMaker,
paddle::framework::EmptyGradOpMaker);
REGISTER_OP_CPU_KERNEL(box_coder, ops::BoxCoderKernel<float>,
ops::BoxCoderKernel<double>);
REGISTER_OP_CPU_KERNEL(
box_coder, ops::BoxCoderKernel<paddle::platform::CPUDeviceContext, float>,
ops::BoxCoderKernel<paddle::platform::CPUDeviceContext, double>);
......@@ -48,15 +48,18 @@ __global__ void EncodeCenterSizeKernel(const T* prior_box_data,
target_box_data[row_idx * len + 1] +
(normalized == false);
output[idx * len] = (target_box_center_x - prior_box_center_x) /
prior_box_width / prior_box_var_data[col_idx * len];
output[idx * len + 1] = (target_box_center_y - prior_box_center_y) /
prior_box_height /
prior_box_var_data[col_idx * len + 1];
output[idx * len + 2] = log(fabs(target_box_width / prior_box_width)) /
prior_box_var_data[col_idx * len + 2];
output[idx * len + 3] = log(fabs(target_box_height / prior_box_height)) /
prior_box_var_data[col_idx * len + 3];
output[idx * len] =
(target_box_center_x - prior_box_center_x) / prior_box_width;
output[idx * len + 1] =
(target_box_center_y - prior_box_center_y) / prior_box_height;
output[idx * len + 2] = log(fabs(target_box_width / prior_box_width));
output[idx * len + 3] = log(fabs(target_box_height / prior_box_height));
if (prior_box_var_data) {
output[idx * len] /= prior_box_var_data[col_idx * len];
output[idx * len + 1] /= prior_box_var_data[col_idx * len + 1];
output[idx * len + 2] /= prior_box_var_data[col_idx * len + 2];
output[idx * len + 3] /= prior_box_var_data[col_idx * len + 3];
}
}
}
......@@ -79,20 +82,31 @@ __global__ void DecodeCenterSizeKernel(const T* prior_box_data,
T prior_box_center_y = (prior_box_data[col_idx * len + 3] +
prior_box_data[col_idx * len + 1]) /
2;
T target_box_width = exp(prior_box_var_data[col_idx * len + 2] *
T target_box_width, target_box_height;
T target_box_center_x, target_box_center_y;
if (prior_box_var_data) {
target_box_width = exp(prior_box_var_data[col_idx * len + 2] *
target_box_data[idx * len + 2]) *
prior_box_width;
T target_box_height = exp(prior_box_var_data[col_idx * len + 3] *
target_box_height = exp(prior_box_var_data[col_idx * len + 3] *
target_box_data[idx * len + 3]) *
prior_box_height;
T target_box_center_x = prior_box_var_data[col_idx * len] *
target_box_center_x = prior_box_var_data[col_idx * len] *
target_box_data[idx * len] * prior_box_width +
prior_box_center_x;
T target_box_center_y = prior_box_var_data[col_idx * len + 1] *
target_box_center_y = prior_box_var_data[col_idx * len + 1] *
target_box_data[idx * len + 1] *
prior_box_height +
prior_box_center_y;
} else {
target_box_width = exp(target_box_data[idx * len + 2]) * prior_box_width;
target_box_height =
exp(target_box_data[idx * len + 3]) * prior_box_height;
target_box_center_x =
target_box_data[idx * len] * prior_box_width + prior_box_center_x;
target_box_center_y = target_box_data[idx * len + 1] * prior_box_height +
prior_box_center_y;
}
output[idx * len] = target_box_center_x - target_box_width / 2;
output[idx * len + 1] = target_box_center_y - target_box_height / 2;
......@@ -103,7 +117,7 @@ __global__ void DecodeCenterSizeKernel(const T* prior_box_data,
}
}
template <typename T>
template <typename DeviceContext, typename T>
class BoxCoderCUDAKernel : public framework::OpKernel<T> {
public:
void Compute(const framework::ExecutionContext& context) const override {
......@@ -114,6 +128,11 @@ class BoxCoderCUDAKernel : public framework::OpKernel<T> {
auto* target_box = context.Input<framework::LoDTensor>("TargetBox");
auto* output_box = context.Output<framework::Tensor>("OutputBox");
const T* prior_box_data = prior_box->data<T>();
const T* target_box_data = target_box->data<T>();
const T* prior_box_var_data = nullptr;
if (prior_box_var) prior_box_var_data = prior_box_var->data<T>();
if (target_box->lod().size()) {
PADDLE_ENFORCE_EQ(target_box->lod().size(), 1,
"Only support 1 level of LoD.");
......@@ -125,10 +144,6 @@ class BoxCoderCUDAKernel : public framework::OpKernel<T> {
int grid = (row * col + block - 1) / block;
auto& device_ctx = context.cuda_device_context();
const T* prior_box_data = prior_box->data<T>();
const T* prior_box_var_data = prior_box_var->data<T>();
const T* target_box_data = target_box->data<T>();
output_box->mutable_data<T>({row, col, len}, context.GetPlace());
T* output = output_box->data<T>();
......@@ -150,5 +165,7 @@ class BoxCoderCUDAKernel : public framework::OpKernel<T> {
} // namespace paddle
namespace ops = paddle::operators;
REGISTER_OP_CUDA_KERNEL(box_coder, ops::BoxCoderCUDAKernel<float>,
ops::BoxCoderCUDAKernel<double>);
REGISTER_OP_CUDA_KERNEL(
box_coder,
ops::BoxCoderCUDAKernel<paddle::platform::CUDADeviceContext, float>,
ops::BoxCoderCUDAKernel<paddle::platform::CUDADeviceContext, double>);
......@@ -28,19 +28,20 @@ inline BoxCodeType GetBoxCodeType(const std::string& type) {
PADDLE_THROW("Not support type %s.", type);
}
template <typename T>
template <typename DeviceContext, typename T>
class BoxCoderKernel : public framework::OpKernel<T> {
public:
void EncodeCenterSize(const framework::Tensor& target_box,
const framework::Tensor& prior_box,
const framework::Tensor& prior_box_var,
void EncodeCenterSize(const framework::Tensor* target_box,
const framework::Tensor* prior_box,
const framework::Tensor* prior_box_var,
const bool normalized, T* output) const {
int64_t row = target_box.dims()[0];
int64_t col = prior_box.dims()[0];
int64_t len = prior_box.dims()[1];
auto* target_box_data = target_box.data<T>();
auto* prior_box_data = prior_box.data<T>();
auto* prior_box_var_data = prior_box_var.data<T>();
int64_t row = target_box->dims()[0];
int64_t col = prior_box->dims()[0];
int64_t len = prior_box->dims()[1];
auto* target_box_data = target_box->data<T>();
auto* prior_box_data = prior_box->data<T>();
const T* prior_box_var_data = nullptr;
if (prior_box_var) prior_box_var_data = prior_box_var->data<T>();
for (int64_t i = 0; i < row; ++i) {
for (int64_t j = 0; j < col; ++j) {
......@@ -65,30 +66,35 @@ class BoxCoderKernel : public framework::OpKernel<T> {
(normalized == false);
size_t offset = i * col * len + j * len;
output[offset] = (target_box_center_x - prior_box_center_x) /
prior_box_width / prior_box_var_data[j * len];
output[offset + 1] = (target_box_center_y - prior_box_center_y) /
prior_box_height / prior_box_var_data[j * len + 1];
output[offset] =
(target_box_center_x - prior_box_center_x) / prior_box_width;
output[offset + 1] =
(target_box_center_y - prior_box_center_y) / prior_box_height;
output[offset + 2] =
std::log(std::fabs(target_box_width / prior_box_width)) /
prior_box_var_data[j * len + 2];
std::log(std::fabs(target_box_width / prior_box_width));
output[offset + 3] =
std::log(std::fabs(target_box_height / prior_box_height)) /
prior_box_var_data[j * len + 3];
std::log(std::fabs(target_box_height / prior_box_height));
if (prior_box_var) {
output[offset] /= prior_box_var_data[j * len];
output[offset + 1] /= prior_box_var_data[j * len + 1];
output[offset + 2] /= prior_box_var_data[j * len + 2];
output[offset + 3] /= prior_box_var_data[j * len + 3];
}
}
}
}
void DecodeCenterSize(const framework::Tensor& target_box,
const framework::Tensor& prior_box,
const framework::Tensor& prior_box_var,
void DecodeCenterSize(const framework::Tensor* target_box,
const framework::Tensor* prior_box,
const framework::Tensor* prior_box_var,
const bool normalized, T* output) const {
int64_t row = target_box.dims()[0];
int64_t col = prior_box.dims()[0];
int64_t len = prior_box.dims()[1];
int64_t row = target_box->dims()[0];
int64_t col = prior_box->dims()[0];
int64_t len = prior_box->dims()[1];
auto* target_box_data = target_box.data<T>();
auto* prior_box_data = prior_box.data<T>();
auto* prior_box_var_data = prior_box_var.data<T>();
auto* target_box_data = target_box->data<T>();
auto* prior_box_data = prior_box->data<T>();
const T* prior_box_var_data = nullptr;
if (prior_box_var) prior_box_var_data = prior_box_var->data<T>();
for (int64_t i = 0; i < row; ++i) {
for (int64_t j = 0; j < col; ++j) {
......@@ -103,19 +109,32 @@ class BoxCoderKernel : public framework::OpKernel<T> {
T prior_box_center_y =
(prior_box_data[j * len + 3] + prior_box_data[j * len + 1]) / 2;
T target_box_center_x = prior_box_var_data[j * len] *
T target_box_center_x = 0, target_box_center_y = 0;
T target_box_width = 0, target_box_height = 0;
if (prior_box_var) {
target_box_center_x = prior_box_var_data[j * len] *
target_box_data[offset] * prior_box_width +
prior_box_center_x;
T target_box_center_y = prior_box_var_data[j * len + 1] *
target_box_center_y = prior_box_var_data[j * len + 1] *
target_box_data[offset + 1] *
prior_box_height +
prior_box_center_y;
T target_box_width = std::exp(prior_box_var_data[j * len + 2] *
target_box_width = std::exp(prior_box_var_data[j * len + 2] *
target_box_data[offset + 2]) *
prior_box_width;
T target_box_height = std::exp(prior_box_var_data[j * len + 3] *
target_box_height = std::exp(prior_box_var_data[j * len + 3] *
target_box_data[offset + 3]) *
prior_box_height;
} else {
target_box_center_x =
target_box_data[offset] * prior_box_width + prior_box_center_x;
target_box_center_y = target_box_data[offset + 1] * prior_box_height +
prior_box_center_y;
target_box_width =
std::exp(target_box_data[offset + 2]) * prior_box_width;
target_box_height =
std::exp(target_box_data[offset + 3]) * prior_box_height;
}
output[offset] = target_box_center_x - target_box_width / 2;
output[offset + 1] = target_box_center_y - target_box_height / 2;
......@@ -147,10 +166,10 @@ class BoxCoderKernel : public framework::OpKernel<T> {
bool normalized = context.Attr<bool>("box_normalized");
T* output = output_box->data<T>();
if (code_type == BoxCodeType::kEncodeCenterSize) {
EncodeCenterSize(*target_box, *prior_box, *prior_box_var, normalized,
EncodeCenterSize(target_box, prior_box, prior_box_var, normalized,
output);
} else if (code_type == BoxCodeType::kDecodeCenterSize) {
DecodeCenterSize(*target_box, *prior_box, *prior_box_var, normalized,
DecodeCenterSize(target_box, prior_box, prior_box_var, normalized,
output);
}
}
......
......@@ -45,13 +45,13 @@ class FetchBarrierOp : public framework::OperatorBase {
auto rpc_client = detail::RPCClient::GetInstance();
PADDLE_ENFORCE(rpc_client->Wait());
rpc_client->Wait();
for (auto& ep : eps) {
VLOG(3) << "fetch barrier, ep: " << ep;
rpc_client->AsyncSendFetchBarrier(ep);
}
PADDLE_ENFORCE(rpc_client->Wait());
rpc_client->Wait();
}
};
......
......@@ -135,7 +135,11 @@ class PoolCUDNNGradOpKernel : public framework::OpKernel<T> {
PoolingMode pooling_mode;
if (pooling_type == "max") {
pooling_mode = PoolingMode::kMaximum;
if (FLAGS_cudnn_deterministic) {
pooling_mode = PoolingMode::kMaximumDeterministic;
} else {
pooling_mode = PoolingMode::kMaximum;
}
} else {
pooling_mode = PoolingMode::kAverage;
}
......
......@@ -53,7 +53,7 @@ class PrefetchOp : public framework::OperatorBase {
VLOG(3) << "don't send no-initialied variable: " << ins[i];
}
}
PADDLE_ENFORCE(rpc_client->Wait());
rpc_client->Wait();
}
};
......
......@@ -51,7 +51,7 @@ class RecvOp : public framework::OperatorBase {
rpc_client->AsyncGetVariable(epmap[i], ctx, scope, outs[i]);
}
if (sync_mode) {
PADDLE_ENFORCE(rpc_client->Wait());
rpc_client->Wait();
}
}
};
......
......@@ -135,15 +135,16 @@ class ReduceKernel : public framework::OpKernel<T> {
} else {
int ndim = context.Input<Tensor>("X")->dims().size();
int rdim = context.Attr<std::vector<int>>("dim").size();
HANDLE_DIM(6, 5);
HANDLE_DIM(6, 4);
HANDLE_DIM(6, 3);
HANDLE_DIM(6, 2);
HANDLE_DIM(6, 1);
HANDLE_DIM(5, 4);
HANDLE_DIM(5, 3);
HANDLE_DIM(5, 2);
HANDLE_DIM(5, 1);
// comments for accelerating compiling temporarily.
// HANDLE_DIM(6, 5);
// HANDLE_DIM(6, 4);
// HANDLE_DIM(6, 3);
// HANDLE_DIM(6, 2);
// HANDLE_DIM(6, 1);
// HANDLE_DIM(5, 4);
// HANDLE_DIM(5, 3);
// HANDLE_DIM(5, 2);
// HANDLE_DIM(5, 1);
HANDLE_DIM(4, 3);
HANDLE_DIM(4, 2);
HANDLE_DIM(4, 1);
......
......@@ -49,13 +49,13 @@ class SendBarrierOp : public framework::OperatorBase {
VLOG(3) << "SendBarrierOp sync_mode:" << sync_mode;
// need to wait before sending send_barrier message
PADDLE_ENFORCE(rpc_client->Wait());
rpc_client->Wait();
if (sync_mode) {
for (auto& ep : eps) {
VLOG(3) << "send barrier, ep: " << ep;
rpc_client->AsyncSendBatchBarrier(ep);
}
PADDLE_ENFORCE(rpc_client->Wait());
rpc_client->Wait();
}
}
};
......
......@@ -59,14 +59,14 @@ class SendOp : public framework::OperatorBase {
VLOG(3) << "don't send no-initialied variable: " << ins[i];
}
}
PADDLE_ENFORCE(rpc_client->Wait());
rpc_client->Wait();
if (sync_mode) {
for (auto& ep : endpoints) {
VLOG(3) << "batch barrier, ep: " << ep;
rpc_client->AsyncSendBatchBarrier(ep);
}
PADDLE_ENFORCE(rpc_client->Wait());
rpc_client->Wait();
}
if (outs.size() > 0) {
......@@ -74,13 +74,13 @@ class SendOp : public framework::OperatorBase {
VLOG(2) << "getting " << outs[i] << " from " << epmap[i];
rpc_client->AsyncGetVariable(epmap[i], ctx, scope, outs[i]);
}
PADDLE_ENFORCE(rpc_client->Wait());
rpc_client->Wait();
// tell pservers that current trainer have called fetch
for (auto& ep : endpoints) {
VLOG(2) << "send fetch barrier, ep: " << ep;
rpc_client->AsyncSendFetchBarrier(ep);
}
PADDLE_ENFORCE(rpc_client->Wait());
rpc_client->Wait();
}
}
};
......
......@@ -61,7 +61,6 @@ void StartServer() {
std::bind(&detail::AsyncGRPCServer::StartServer, g_rpc_service.get()));
g_rpc_service->SetCond(detail::kRequestSend);
std::cout << "before WaitFanInOfSend" << std::endl;
g_rpc_service->WaitBarrier(detail::kRequestSend);
LOG(INFO) << "got nccl id and stop server...";
......@@ -88,12 +87,12 @@ TEST(SendNcclId, GrpcServer) {
int port = g_rpc_service->GetSelectedPort();
std::string ep = string::Sprintf("127.0.0.1:%d", port);
detail::RPCClient client;
LOG(INFO) << "connect to server" << ep;
client.AsyncSendVariable(ep, dev_ctx, scope, NCCL_ID_VARNAME);
client.Wait();
client.AsyncSendBatchBarrier(ep);
client.Wait();
detail::RPCClient* client = detail::RPCClient::GetInstance();
LOG(INFO) << "connect to server " << ep;
client->AsyncSendVariable(ep, dev_ctx, scope, NCCL_ID_VARNAME);
client->Wait();
client->AsyncSendBatchBarrier(ep);
client->Wait();
server_thread.join();
g_rpc_service.reset(nullptr);
......
......@@ -22,6 +22,8 @@ limitations under the License. */
#include "paddle/fluid/platform/float16.h"
#include "paddle/fluid/platform/macros.h"
DECLARE_bool(cudnn_deterministic);
namespace paddle {
namespace platform {
......@@ -76,8 +78,22 @@ enum class DataLayout { // Not use
enum class PoolingMode {
kMaximum,
kAverage,
kMaximumDeterministic,
};
inline cudnnPoolingMode_t GetPoolingMode(const PoolingMode& mode) {
switch (mode) {
case PoolingMode::kMaximumDeterministic:
return CUDNN_POOLING_MAX_DETERMINISTIC;
case PoolingMode::kAverage:
return CUDNN_POOLING_AVERAGE_COUNT_EXCLUDE_PADDING;
case PoolingMode::kMaximum:
return CUDNN_POOLING_MAX;
default:
PADDLE_THROW("Unexpected pooling mode.");
}
}
template <typename T>
class CudnnDataType;
......@@ -293,9 +309,7 @@ class ScopedPoolingDescriptor {
PADDLE_ENFORCE_EQ(kernel.size(), pads.size());
PADDLE_ENFORCE_EQ(kernel.size(), strides.size());
PADDLE_ENFORCE(dynload::cudnnSetPoolingNdDescriptor(
desc_, (mode == PoolingMode::kMaximum
? CUDNN_POOLING_MAX
: CUDNN_POOLING_AVERAGE_COUNT_EXCLUDE_PADDING),
desc_, (GetPoolingMode(mode)),
CUDNN_PROPAGATE_NAN, // Always propagate nans.
kernel.size(), kernel.data(), pads.data(), strides.data()));
return desc_;
......
......@@ -120,7 +120,7 @@ def __bootstrap__():
]
if core.is_compiled_with_cuda():
read_env_flags += [
'fraction_of_gpu_memory_to_use', 'cudnn_algo_use_autotune'
'fraction_of_gpu_memory_to_use', 'cudnn_deterministic'
]
core.init_gflags([sys.argv[0]] +
["--tryfromenv=" + ",".join(read_env_flags)])
......
......@@ -120,6 +120,32 @@ class TestBoxCoderOp(OpTest):
self.outputs = {'OutputBox': output_box}
class TestBoxCoderOpWithoutBoxVar(OpTest):
def test_check_output(self):
self.check_output()
def setUp(self):
self.op_type = "box_coder"
lod = [[0, 1, 2, 3, 4, 5]]
prior_box = np.random.random((10, 4)).astype('float32')
prior_box_var = np.ones((10, 4)).astype('float32')
target_box = np.random.random((5, 10, 4)).astype('float32')
code_type = "DecodeCenterSize"
box_normalized = False
output_box = batch_box_coder(prior_box, prior_box_var, target_box,
lod[0], code_type, box_normalized)
self.inputs = {
'PriorBox': prior_box,
'TargetBox': target_box,
}
self.attrs = {
'code_type': 'decode_center_size',
'box_normalized': False
}
self.outputs = {'OutputBox': output_box}
class TestBoxCoderOpWithLoD(OpTest):
def test_check_output(self):
self.check_output()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册