提交 a764e900 编写于 作者: T tensor-tang

Merge remote-tracking branch 'ups/develop' into fea/jit/gen

test=develop
......@@ -2,8 +2,8 @@
[![Build Status](https://travis-ci.org/PaddlePaddle/Paddle.svg?branch=develop)](https://travis-ci.org/PaddlePaddle/Paddle)
[![Documentation Status](https://img.shields.io/badge/docs-latest-brightgreen.svg?style=flat)](http://paddlepaddle.org/documentation/docs/en/1.0/getstarted/index_en.html)
[![Documentation Status](https://img.shields.io/badge/中文文档-最新-brightgreen.svg)](http://paddlepaddle.org/documentation/docs/zh/1.0/beginners_guide/index.html)
[![Documentation Status](https://img.shields.io/badge/docs-latest-brightgreen.svg?style=flat)](http://paddlepaddle.org/documentation/docs/en/1.1/getstarted/index_en.html)
[![Documentation Status](https://img.shields.io/badge/中文文档-最新-brightgreen.svg)](http://paddlepaddle.org/documentation/docs/zh/1.1/beginners_guide/index.html)
[![Release](https://img.shields.io/github/release/PaddlePaddle/Paddle.svg)](https://github.com/PaddlePaddle/Paddle/releases)
[![License](https://img.shields.io/badge/license-Apache%202-blue.svg)](LICENSE)
......@@ -19,7 +19,7 @@ Our vision is to enable deep learning for everyone via PaddlePaddle.
Please refer to our [release announcement](https://github.com/PaddlePaddle/Paddle/releases) to track the latest feature of PaddlePaddle.
### Latest PaddlePaddle Release: [Fluid 1.0.1](https://github.com/PaddlePaddle/Paddle/tree/release/1.0.0)
### Latest PaddlePaddle Release: [Fluid 1.1.0](https://github.com/PaddlePaddle/Paddle/tree/release/1.1)
### Install Latest Stable Release:
```
# Linux CPU
......@@ -27,9 +27,9 @@ pip install paddlepaddle
# Linux GPU cuda9cudnn7
pip install paddlepaddle-gpu
# Linux GPU cuda8cudnn7
pip install paddlepaddle-gpu==1.0.1.post87
pip install paddlepaddle-gpu==1.1.0.post87
# Linux GPU cuda8cudnn5
pip install paddlepaddle-gpu==1.0.1.post85
pip install paddlepaddle-gpu==1.1.0.post85
# For installation on other platform, refer to http://paddlepaddle.org/
```
......@@ -76,26 +76,26 @@ pip install paddlepaddle-gpu==1.0.1.post85
## Installation
It is recommended to read [this doc](http://paddlepaddle.org/documentation/docs/zh/1.0/beginners_guide/index.html) on our website.
It is recommended to read [this doc](http://paddlepaddle.org/documentation/docs/zh/1.1/beginners_guide/index.html) on our website.
## Documentation
We provide [English](http://paddlepaddle.org/documentation/docs/en/1.0.0/getstarted/index_en.html) and
[Chinese](http://paddlepaddle.org/documentation/docs/zh/1.0/beginners_guide/index.html) documentation.
We provide [English](http://paddlepaddle.org/documentation/docs/en/1.1/getstarted/index_en.html) and
[Chinese](http://paddlepaddle.org/documentation/docs/zh/1.1/beginners_guide/index.html) documentation.
- [Deep Learning 101](https://github.com/PaddlePaddle/book)
You might want to start from this online interactive book that can run in a Jupyter Notebook.
- [Distributed Training](http://paddlepaddle.org/documentation/docs/zh/1.0/user_guides/howto/training/cluster_howto.html)
- [Distributed Training](http://paddlepaddle.org/documentation/docs/zh/1.1/user_guides/howto/training/cluster_howto.html)
You can run distributed training jobs on MPI clusters.
- [Python API](http://paddlepaddle.org/documentation/api/zh/1.0/fluid.html)
- [Python API](http://paddlepaddle.org/documentation/api/zh/1.1/fluid.html)
Our new API enables much shorter programs.
- [How to Contribute](http://paddlepaddle.org/documentation/docs/zh/1.0/advanced_usage/development/contribute_to_paddle.html)
- [How to Contribute](http://paddlepaddle.org/documentation/docs/zh/1.1/advanced_usage/development/contribute_to_paddle.html)
We appreciate your contributions!
......
......@@ -56,6 +56,7 @@ cc_library(scope_buffered_ssa_graph_executor SRCS scope_buffered_ssa_graph_execu
# device_context reduce_op_handle )
cc_library(fast_threaded_ssa_graph_executor SRCS fast_threaded_ssa_graph_executor.cc
DEPS fetch_op_handle ssa_graph_executor scope simple_threadpool device_context)
cc_test(fused_broadcast_op_test SRCS fused_broadcast_op_handle_test.cc DEPS fused_broadcast_op_handle)
cc_library(build_strategy SRCS build_strategy.cc DEPS
graph_viz_pass multi_devices_graph_pass
......
......@@ -12,232 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/framework/details/broadcast_op_handle.h"
#include "gtest/gtest.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/framework/details/broadcast_op_handle_test.h"
namespace paddle {
namespace framework {
namespace details {
namespace f = paddle::framework;
namespace p = paddle::platform;
// test data amount
const f::DDim kDims = {20, 20};
struct TestBroadcastOpHandle {
std::vector<std::unique_ptr<p::DeviceContext>> ctxs_;
std::vector<Scope*> local_scopes_;
std::vector<Scope*> param_scopes_;
Scope g_scope_;
std::unique_ptr<OpHandleBase> op_handle_;
std::vector<std::unique_ptr<VarHandleBase>> vars_;
std::vector<p::Place> gpu_list_;
bool use_gpu_;
#ifdef PADDLE_WITH_CUDA
std::unique_ptr<platform::NCCLContextMap> nccl_ctxs_;
#endif
void WaitAll() {
for (size_t j = 0; j < ctxs_.size(); ++j) {
ctxs_[j]->Wait();
}
#ifdef PADDLE_WITH_CUDA
if (nccl_ctxs_) {
nccl_ctxs_->WaitAll();
}
#endif
}
void InitCtxOnGpu(bool use_gpu) {
use_gpu_ = use_gpu;
if (use_gpu_) {
#ifdef PADDLE_WITH_CUDA
int count = p::GetCUDADeviceCount();
if (count <= 1) {
LOG(WARNING) << "Cannot test multi-gpu Broadcast, because the CUDA "
"device count is "
<< count;
exit(0);
}
for (int i = 0; i < count; ++i) {
auto p = p::CUDAPlace(i);
gpu_list_.push_back(p);
ctxs_.emplace_back(new p::CUDADeviceContext(p));
}
nccl_ctxs_.reset(new platform::NCCLContextMap(gpu_list_));
#else
PADDLE_THROW("CUDA is not support.");
#endif
} else {
int count = 8;
for (int i = 0; i < count; ++i) {
auto p = p::CPUPlace();
gpu_list_.push_back(p);
ctxs_.emplace_back(new p::CPUDeviceContext(p));
}
#ifdef PADDLE_WITH_CUDA
nccl_ctxs_.reset(nullptr);
#endif
}
}
void InitBroadcastOp(size_t input_scope_idx) {
for (size_t j = 0; j < gpu_list_.size(); ++j) {
local_scopes_.push_back(&(g_scope_.NewScope()));
Scope& local_scope = local_scopes_.back()->NewScope();
*local_scopes_.back()
->Var(details::kLocalExecScopeName)
->GetMutable<Scope*>() = &local_scope;
local_scope.Var("out");
param_scopes_.emplace_back(&local_scope);
}
param_scopes_[input_scope_idx]->Var("input");
std::unique_ptr<ir::Node> n =
ir::CreateNodeForTest("node0", ir::Node::Type::kOperation);
if (use_gpu_) {
#ifdef PADDLE_WITH_CUDA
op_handle_.reset(new BroadcastOpHandle(n.get(), local_scopes_, gpu_list_,
nccl_ctxs_.get()));
#else
PADDLE_THROW("CUDA is not support.");
#endif
} else {
#ifdef PADDLE_WITH_CUDA
op_handle_.reset(new BroadcastOpHandle(n.get(), local_scopes_, gpu_list_,
nccl_ctxs_.get()));
#else
op_handle_.reset(
new BroadcastOpHandle(n.get(), local_scopes_, gpu_list_));
#endif
}
std::unique_ptr<ir::Node> v =
ir::CreateNodeForTest("node1", ir::Node::Type::kVariable);
auto* in_var_handle = new VarHandle(v.get(), 1, input_scope_idx, "input",
gpu_list_[input_scope_idx]);
vars_.emplace_back(in_var_handle);
op_handle_->AddInput(in_var_handle);
// add dummy var
std::unique_ptr<ir::Node> v2 =
ir::CreateNodeForTest("node2", ir::Node::Type::kVariable);
vars_.emplace_back(new DummyVarHandle(v2.get()));
DummyVarHandle* dummy_var_handle =
static_cast<DummyVarHandle*>(vars_.back().get());
dummy_var_handle->ClearGeneratedOp();
op_handle_->AddInput(dummy_var_handle);
for (size_t j = 0; j < gpu_list_.size(); ++j) {
if (!use_gpu_) {
op_handle_->SetDeviceContext(gpu_list_[j], ctxs_[j].get());
}
std::unique_ptr<ir::Node> v3 =
ir::CreateNodeForTest("node3", ir::Node::Type::kVariable);
VarHandle* out_var_handle =
new VarHandle(v3.get(), 2, j, "out", gpu_list_[j]);
vars_.emplace_back(out_var_handle);
op_handle_->AddOutput(out_var_handle);
}
// add dummy var
std::unique_ptr<ir::Node> v4 =
ir::CreateNodeForTest("node4", ir::Node::Type::kVariable);
vars_.emplace_back(new DummyVarHandle(v4.get()));
DummyVarHandle* out_dummy_var_handle =
static_cast<DummyVarHandle*>(vars_.back().get());
out_dummy_var_handle->ClearGeneratedOp();
op_handle_->AddOutput(out_dummy_var_handle);
}
void TestBroadcastLodTensor(size_t input_scope_idx) {
auto in_var = param_scopes_[input_scope_idx]->FindVar("input");
PADDLE_ENFORCE_NOT_NULL(in_var);
auto in_lod_tensor = in_var->GetMutable<f::LoDTensor>();
in_lod_tensor->mutable_data<float>(kDims, gpu_list_[input_scope_idx]);
std::vector<float> send_vector(static_cast<size_t>(f::product(kDims)));
for (size_t k = 0; k < send_vector.size(); ++k) {
send_vector[k] = k;
}
f::LoD lod{{0, 10, 20}};
paddle::framework::TensorFromVector<float>(
send_vector, *(ctxs_[input_scope_idx]), in_lod_tensor);
in_lod_tensor->set_lod(lod);
in_lod_tensor->Resize(kDims);
op_handle_->Run(false);
WaitAll();
p::CPUPlace cpu_place;
for (size_t j = 0; j < gpu_list_.size(); ++j) {
auto out_var = param_scopes_[j]->FindVar("out");
PADDLE_ENFORCE_NOT_NULL(out_var);
auto out_tensor = out_var->Get<f::LoDTensor>();
PADDLE_ENFORCE_EQ(out_tensor.lod(), lod, "lod is not equal.");
f::Tensor result_tensor;
f::TensorCopySync(out_tensor, cpu_place, &result_tensor);
float* ct = result_tensor.mutable_data<float>(cpu_place);
for (int64_t i = 0; i < f::product(kDims); ++i) {
ASSERT_NEAR(ct[i], send_vector[i], 1e-5);
}
}
}
void TestBroadcastSelectedRows(size_t input_scope_idx) {
auto in_var = param_scopes_[input_scope_idx]->FindVar("input");
PADDLE_ENFORCE_NOT_NULL(in_var);
auto in_selected_rows = in_var->GetMutable<f::SelectedRows>();
auto value = in_selected_rows->mutable_value();
value->mutable_data<float>(kDims, gpu_list_[input_scope_idx]);
int height = static_cast<int>(kDims[0]) * 2;
std::vector<int64_t> rows{0, 1, 2, 3, 3, 0, 14, 7, 3, 1,
2, 4, 6, 3, 1, 1, 1, 1, 3, 7};
in_selected_rows->set_height(height);
in_selected_rows->set_rows(rows);
std::vector<float> send_vector(static_cast<size_t>(f::product(kDims)));
for (size_t k = 0; k < send_vector.size(); ++k) {
send_vector[k] = k;
}
paddle::framework::TensorFromVector<float>(
send_vector, *(ctxs_[input_scope_idx]), value);
op_handle_->Run(false);
WaitAll();
p::CPUPlace cpu_place;
for (size_t j = 0; j < gpu_list_.size(); ++j) {
auto out_var = param_scopes_[j]->FindVar("out");
PADDLE_ENFORCE_NOT_NULL(out_var);
auto& out_select_rows = out_var->Get<f::SelectedRows>();
auto rt = out_select_rows.value();
PADDLE_ENFORCE_EQ(out_select_rows.height(), height,
"height is not equal.");
for (size_t k = 0; k < out_select_rows.rows().size(); ++k) {
PADDLE_ENFORCE_EQ(out_select_rows.rows()[k], rows[k]);
}
f::Tensor result_tensor;
f::TensorCopySync(rt, cpu_place, &result_tensor);
float* ct = result_tensor.data<float>();
for (int64_t i = 0; i < f::product(kDims); ++i) {
ASSERT_NEAR(ct[i], send_vector[i], 1e-5);
}
}
}
};
TEST(BroadcastTester, TestCPUBroadcastTestLodTensor) {
TestBroadcastOpHandle test_op;
size_t input_scope_idx = 0;
......
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <string>
#include <vector>
#include "gtest/gtest.h"
#include "paddle/fluid/framework/details/broadcast_op_handle.h"
#include "paddle/fluid/platform/device_context.h"
namespace paddle {
namespace framework {
namespace details {
namespace f = paddle::framework;
namespace p = paddle::platform;
// test data amount
const f::DDim kDims = {20, 20};
struct TestBroadcastOpHandle {
std::vector<std::unique_ptr<p::DeviceContext>> ctxs_;
std::vector<Scope*> local_scopes_;
std::vector<Scope*> param_scopes_;
Scope g_scope_;
std::unique_ptr<OpHandleBase> op_handle_;
std::vector<std::unique_ptr<VarHandleBase>> vars_;
std::vector<p::Place> place_list_;
bool use_gpu_;
#ifdef PADDLE_WITH_CUDA
std::unique_ptr<platform::NCCLContextMap> nccl_ctxs_;
#endif
void WaitAll() {
for (size_t j = 0; j < ctxs_.size(); ++j) {
ctxs_[j]->Wait();
}
#ifdef PADDLE_WITH_CUDA
if (nccl_ctxs_) {
nccl_ctxs_->WaitAll();
}
#endif
}
void InitCtxOnGpu(bool use_gpu) {
use_gpu_ = use_gpu;
if (use_gpu_) {
#ifdef PADDLE_WITH_CUDA
int count = p::GetCUDADeviceCount();
if (count <= 1) {
LOG(WARNING) << "Cannot test multi-gpu Broadcast, because the CUDA "
"device count is "
<< count;
exit(0);
}
for (int i = 0; i < count; ++i) {
auto p = p::CUDAPlace(i);
place_list_.push_back(p);
ctxs_.emplace_back(new p::CUDADeviceContext(p));
}
nccl_ctxs_.reset(new platform::NCCLContextMap(place_list_));
#else
PADDLE_THROW("CUDA is not support.");
#endif
} else {
int count = 8;
for (int i = 0; i < count; ++i) {
auto p = p::CPUPlace();
place_list_.push_back(p);
ctxs_.emplace_back(new p::CPUDeviceContext(p));
}
#ifdef PADDLE_WITH_CUDA
nccl_ctxs_.reset(nullptr);
#endif
}
}
void InitBroadcastOp(size_t input_scope_idx) {
for (size_t j = 0; j < place_list_.size(); ++j) {
local_scopes_.push_back(&(g_scope_.NewScope()));
Scope& local_scope = local_scopes_.back()->NewScope();
*local_scopes_.back()
->Var(details::kLocalExecScopeName)
->GetMutable<Scope*>() = &local_scope;
local_scope.Var("out");
param_scopes_.emplace_back(&local_scope);
}
param_scopes_[input_scope_idx]->Var("input");
std::unique_ptr<ir::Node> n =
ir::CreateNodeForTest("node0", ir::Node::Type::kOperation);
if (use_gpu_) {
#ifdef PADDLE_WITH_CUDA
op_handle_.reset(new BroadcastOpHandle(n.get(), local_scopes_,
place_list_, nccl_ctxs_.get()));
#else
PADDLE_THROW("CUDA is not support.");
#endif
} else {
#ifdef PADDLE_WITH_CUDA
op_handle_.reset(new BroadcastOpHandle(n.get(), local_scopes_,
place_list_, nccl_ctxs_.get()));
#else
op_handle_.reset(
new BroadcastOpHandle(n.get(), local_scopes_, place_list_));
#endif
}
std::unique_ptr<ir::Node> v =
ir::CreateNodeForTest("node1", ir::Node::Type::kVariable);
auto* in_var_handle = new VarHandle(v.get(), 1, input_scope_idx, "input",
place_list_[input_scope_idx]);
vars_.emplace_back(in_var_handle);
op_handle_->AddInput(in_var_handle);
// add dummy var
std::unique_ptr<ir::Node> v2 =
ir::CreateNodeForTest("node2", ir::Node::Type::kVariable);
vars_.emplace_back(new DummyVarHandle(v2.get()));
DummyVarHandle* dummy_var_handle =
static_cast<DummyVarHandle*>(vars_.back().get());
dummy_var_handle->ClearGeneratedOp();
op_handle_->AddInput(dummy_var_handle);
for (size_t j = 0; j < place_list_.size(); ++j) {
if (!use_gpu_) {
op_handle_->SetDeviceContext(place_list_[j], ctxs_[j].get());
}
std::unique_ptr<ir::Node> v3 =
ir::CreateNodeForTest("node3", ir::Node::Type::kVariable);
VarHandle* out_var_handle =
new VarHandle(v3.get(), 2, j, "out", place_list_[j]);
vars_.emplace_back(out_var_handle);
op_handle_->AddOutput(out_var_handle);
}
// add dummy var
std::unique_ptr<ir::Node> v4 =
ir::CreateNodeForTest("node4", ir::Node::Type::kVariable);
vars_.emplace_back(new DummyVarHandle(v4.get()));
DummyVarHandle* out_dummy_var_handle =
static_cast<DummyVarHandle*>(vars_.back().get());
out_dummy_var_handle->ClearGeneratedOp();
op_handle_->AddOutput(out_dummy_var_handle);
}
std::vector<float> InitLoDTensor(const std::string& varname,
size_t input_scope_idx, const f::LoD& lod,
float val_scalar = 0.0) {
auto var = param_scopes_[input_scope_idx]->FindVar(varname);
PADDLE_ENFORCE_NOT_NULL(var);
auto lod_tensor = var->GetMutable<f::LoDTensor>();
std::vector<float> send_vector(static_cast<size_t>(f::product(kDims)));
for (size_t k = 0; k < send_vector.size(); ++k) {
send_vector[k] = k + val_scalar;
}
paddle::framework::TensorFromVector<float>(
send_vector, *(ctxs_[input_scope_idx]), lod_tensor);
lod_tensor->set_lod(lod);
lod_tensor->Resize(kDims);
return send_vector;
}
std::vector<float> InitSelectedRows(const std::string& varname,
size_t input_scope_idx,
const std::vector<int64_t>& rows,
int height, float value_scalar = 0.0) {
std::vector<float> send_vector(static_cast<size_t>(f::product(kDims)));
for (size_t k = 0; k < send_vector.size(); ++k) {
send_vector[k] = k + value_scalar;
}
auto var = param_scopes_[input_scope_idx]->FindVar(varname);
PADDLE_ENFORCE_NOT_NULL(var);
auto selected_rows = var->GetMutable<f::SelectedRows>();
auto value = selected_rows->mutable_value();
value->mutable_data<float>(kDims, place_list_[input_scope_idx]);
selected_rows->set_height(height);
selected_rows->set_rows(rows);
paddle::framework::TensorFromVector<float>(
send_vector, *(ctxs_[input_scope_idx]), value);
return send_vector;
}
void SelectedRowsEqual(const std::string& varname, int input_scope_idx,
const std::vector<float>& send_vector,
const std::vector<int64_t>& rows, int height) {
auto var = param_scopes_[input_scope_idx]->FindVar(varname);
PADDLE_ENFORCE_NOT_NULL(var);
auto& selected_rows = var->Get<f::SelectedRows>();
auto rt = selected_rows.value();
PADDLE_ENFORCE_EQ(selected_rows.height(), height, "height is not equal.");
for (size_t k = 0; k < selected_rows.rows().size(); ++k) {
PADDLE_ENFORCE_EQ(selected_rows.rows()[k], rows[k]);
}
p::CPUPlace cpu_place;
f::Tensor result_tensor;
f::TensorCopySync(rt, cpu_place, &result_tensor);
float* ct = result_tensor.data<float>();
for (int64_t i = 0; i < f::product(kDims); ++i) {
ASSERT_NEAR(ct[i], send_vector[i], 1e-5);
}
}
void LoDTensorEqual(const std::string& varname,
const std::vector<float>& send_vec, const f::LoD& lod,
framework::Scope* scope) {
p::CPUPlace cpu_place;
auto var = scope->FindVar(varname);
PADDLE_ENFORCE_NOT_NULL(var);
auto tensor = var->Get<f::LoDTensor>();
PADDLE_ENFORCE_EQ(tensor.lod(), lod, "lod is not equal.");
f::Tensor result_tensor;
f::TensorCopySync(tensor, cpu_place, &result_tensor);
float* ct = result_tensor.mutable_data<float>(cpu_place);
for (int64_t k = 0; k < f::product(kDims); ++k) {
ASSERT_NEAR(ct[k], send_vec[k], 1e-5);
}
}
void TestBroadcastLodTensor(size_t input_scope_idx) {
f::LoD lod{{0, 10, 20}};
auto send_vector = InitLoDTensor("input", input_scope_idx, lod);
op_handle_->Run(false);
WaitAll();
for (size_t j = 0; j < place_list_.size(); ++j) {
LoDTensorEqual("out", send_vector, lod, param_scopes_[j]);
}
}
void TestBroadcastSelectedRows(size_t input_scope_idx) {
std::vector<int64_t> rows{0, 1, 2, 3, 3, 0, 14, 7, 3, 1,
2, 4, 6, 3, 1, 1, 1, 1, 3, 7};
int height = static_cast<int>(kDims[0] * 2);
auto send_vector = InitSelectedRows("input", input_scope_idx, rows, height);
op_handle_->Run(false);
WaitAll();
for (size_t j = 0; j < place_list_.size(); ++j) {
SelectedRowsEqual("out", input_scope_idx, send_vector, rows, height);
}
}
};
} // namespace details
} // namespace framework
} // namespace paddle
......@@ -92,13 +92,13 @@ FeedFetchList FastThreadedSSAGraphExecutor::Run(
size_t num_complete = 0;
remaining_ = 0;
BlockingQueue<size_t> complete_q;
auto complete_q = std::make_shared<BlockingQueue<size_t>>();
for (auto op : bootstrap_ops_) {
RunOpAsync(op_deps.get(), op, &complete_q);
RunOpAsync(op_deps.get(), op, complete_q);
}
while (num_complete != op_deps->size()) {
size_t num_comp = complete_q.Pop();
size_t num_comp = complete_q->Pop();
if (num_comp == -1UL) {
int remaining = 0;
while (true) {
......@@ -107,7 +107,7 @@ FeedFetchList FastThreadedSSAGraphExecutor::Run(
break;
}
for (int i = 0; i < remaining; ++i) {
complete_q.Pop();
complete_q->Pop();
}
}
exception_.ReThrow();
......@@ -120,7 +120,8 @@ FeedFetchList FastThreadedSSAGraphExecutor::Run(
}
void FastThreadedSSAGraphExecutor::RunOpAsync(
std::unordered_map<OpHandleBase *, std::atomic<int>> *op_deps,
OpHandleBase *op, BlockingQueue<size_t> *complete_q) {
OpHandleBase *op,
const std::shared_ptr<BlockingQueue<size_t>> &complete_q) {
++remaining_;
this->pool_.enqueue([=] {
OpHandleBase *op_to_run = op;
......@@ -144,7 +145,7 @@ void FastThreadedSSAGraphExecutor::RunOpAsync(
if (op_to_run == nullptr) {
op_to_run = pending_op;
} else {
this->RunOpAsync(op_deps, pending_op, complete_q);
RunOpAsync(op_deps, pending_op, complete_q);
}
}
}
......@@ -156,8 +157,7 @@ void FastThreadedSSAGraphExecutor::RunOpAsync(
}
void FastThreadedSSAGraphExecutor::PrepareAtomicOpDeps() {
atomic_op_deps_ = pool_.enqueue([&] {
std::unordered_map<OpHandleBase *, std::atomic<int>> *op_deps =
new std::unordered_map<OpHandleBase *, std::atomic<int>>;
auto *op_deps = new std::unordered_map<OpHandleBase *, std::atomic<int>>;
for (auto &pair : op_deps_) {
(*op_deps)[pair.first] = pair.second;
}
......
......@@ -50,7 +50,8 @@ class FastThreadedSSAGraphExecutor : public SSAGraphExecutor {
std::atomic<int> remaining_;
void RunOpAsync(std::unordered_map<OpHandleBase *, std::atomic<int>> *op_deps,
OpHandleBase *op, BlockingQueue<size_t> *complete_q);
OpHandleBase *op,
const std::shared_ptr<BlockingQueue<size_t>> &complete_q);
void PrepareAtomicOpDeps();
......
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/framework/details/fused_broadcast_op_handle.h"
#include "gtest/gtest.h"
#include "paddle/fluid/framework/details/broadcast_op_handle_test.h"
namespace paddle {
namespace framework {
namespace details {
struct TestFusedBroadcastOpHandle : TestBroadcastOpHandle {
std::vector<std::string> out_varnames_;
void InitFusedBroadcastOp(std::vector<size_t> input_scope_idxes) {
// initialize scope and var
for (size_t i = 0; i < place_list_.size(); ++i) {
local_scopes_.push_back(&(g_scope_.NewScope()));
Scope& local_scope = local_scopes_.back()->NewScope();
*local_scopes_.back()
->Var(details::kLocalExecScopeName)
->GetMutable<Scope*>() = &local_scope;
for (size_t j = 0; j < input_scope_idxes.size(); ++j) {
local_scope.Var("out_var" + j);
if (i == j) local_scope.Var("in_var" + j);
}
param_scopes_.emplace_back(&local_scope);
}
// create op handle node
std::unique_ptr<ir::Node> n =
ir::CreateNodeForTest("fused_broadcast", ir::Node::Type::kOperation);
if (use_gpu_) {
#ifdef PADDLE_WITH_CUDA
op_handle_.reset(new FusedBroadcastOpHandle(
n.get(), local_scopes_, place_list_, nccl_ctxs_.get()));
#else
PADDLE_THROW("CUDA is not supported.");
#endif
} else {
#ifdef PADDLE_WITH_CUDA
op_handle_.reset(new FusedBroadcastOpHandle(
n.get(), local_scopes_, place_list_, nccl_ctxs_.get()));
#else
op_handle_.reset(
new FusedBroadcastOpHandle(n.get(), local_scopes_, place_list_));
#endif
}
for (size_t i = 0; i < input_scope_idxes.size(); ++i) {
// add input var handle
std::unique_ptr<ir::Node> in_node =
ir::CreateNodeForTest("in_node" + i, ir::Node::Type::kVariable);
VarHandle* in_var_handle =
new VarHandle(in_node.get(), 1, input_scope_idxes[i], "in_var" + i,
place_list_[input_scope_idxes[i]]);
vars_.emplace_back(in_var_handle);
op_handle_->AddInput(in_var_handle);
// add output var handle
for (size_t j = 0; j < place_list_.size(); ++j) {
std::unique_ptr<ir::Node> out_node =
ir::CreateNodeForTest("out_node" + i, ir::Node::Type::kVariable);
VarHandle* out_var_handle =
new VarHandle(out_node.get(), 2, j, "out_var" + i, place_list_[j]);
vars_.emplace_back(out_var_handle);
op_handle_->AddOutput(out_var_handle);
}
}
}
void TestFusedBroadcastLoDTensor(std::vector<size_t> input_scope_idxes) {
std::vector<std::vector<float>> send_vec;
f::LoD lod{{0, 10, 20}};
for (size_t i = 0; i < input_scope_idxes.size(); ++i) {
const std::string varname("in_var" + i);
float val_scalar = static_cast<float>(i);
send_vec.push_back(
InitLoDTensor(varname, input_scope_idxes[i], lod, val_scalar));
}
op_handle_->Run(false);
WaitAll();
for (size_t i = 0; i < input_scope_idxes.size(); ++i) {
const std::string& varname("out_var" + i);
for (size_t j = 0; j < place_list_.size(); ++j) {
LoDTensorEqual(varname, send_vec[i], lod, param_scopes_[j]);
}
}
}
void TestFusedBroadcastSelectedRows(std::vector<size_t> input_scope_idxes) {
std::vector<std::vector<float>> send_vector;
std::vector<int64_t> rows{0, 1, 2, 3, 3, 0, 14, 7, 3, 1,
2, 4, 6, 3, 1, 1, 1, 1, 3, 7};
int height = static_cast<int>(kDims[0] * 2);
for (size_t i = 0; i < input_scope_idxes.size(); ++i) {
const std::string varname("in_var" + i);
float val_scalar = static_cast<float>(i);
send_vector.push_back(InitSelectedRows(varname, input_scope_idxes[i],
rows, height, val_scalar));
}
op_handle_->Run(false);
WaitAll();
for (size_t i = 0; i < input_scope_idxes.size(); ++i) {
const std::string& varname("out_var" + i);
for (size_t j = 0; j < place_list_.size(); ++j) {
SelectedRowsEqual(varname, input_scope_idxes[i], send_vector[i], rows,
height);
}
}
}
};
TEST(FusedBroadcastTester, CPULodTensor) {
TestFusedBroadcastOpHandle test_op;
std::vector<size_t> input_scope_idxes = {0, 1};
test_op.InitCtxOnGpu(false);
test_op.InitFusedBroadcastOp(input_scope_idxes);
test_op.TestFusedBroadcastLoDTensor(input_scope_idxes);
}
TEST(FusedBroadcastTester, CPUSelectedRows) {
TestFusedBroadcastOpHandle test_op;
std::vector<size_t> input_scope_idxes = {0, 1};
test_op.InitCtxOnGpu(false);
test_op.InitFusedBroadcastOp(input_scope_idxes);
test_op.TestFusedBroadcastSelectedRows(input_scope_idxes);
}
#ifdef PADDLE_WITH_CUDA
TEST(FusedBroadcastTester, GPULodTensor) {
TestFusedBroadcastOpHandle test_op;
std::vector<size_t> input_scope_idxes = {0, 1};
test_op.InitCtxOnGpu(true);
test_op.InitFusedBroadcastOp(input_scope_idxes);
test_op.TestFusedBroadcastLoDTensor(input_scope_idxes);
}
TEST(FusedBroadcastTester, GPUSelectedRows) {
TestFusedBroadcastOpHandle test_op;
std::vector<size_t> input_scope_idxes = {0, 1};
test_op.InitCtxOnGpu(true);
test_op.InitFusedBroadcastOp(input_scope_idxes);
test_op.TestFusedBroadcastSelectedRows(input_scope_idxes);
}
#endif
} // namespace details
} // namespace framework
} // namespace paddle
......@@ -39,7 +39,7 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
new platform::RecordEvent("ThreadedSSAGraphExecutorPrepare", nullptr));
std::unordered_map<OpHandleBase *, size_t> pending_ops;
std::unordered_set<VarHandleBase *> pending_vars;
BlockingQueue<VarHandleBase *> ready_vars;
auto ready_vars = std::make_shared<BlockingQueue<VarHandleBase *>>();
std::unordered_set<OpHandleBase *> ready_ops;
// For ops (e.g. nccl_all_reduce) that need to coordinate multiple
// streams from multiple GPUs, it's faster to buffer them and schedule
......@@ -51,12 +51,12 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
for (auto &var_map : graph_->Get<details::GraphVars>(details::kGraphVars)) {
for (auto &name_pair : var_map) {
for (auto &version_pair : name_pair.second) {
InsertPendingVar(&pending_vars, &ready_vars, version_pair.get());
InsertPendingVar(&pending_vars, ready_vars.get(), version_pair.get());
}
}
}
for (auto &var : graph_->Get<details::GraphDepVars>(details::kGraphDepVars)) {
InsertPendingVar(&pending_vars, &ready_vars, var.get());
InsertPendingVar(&pending_vars, ready_vars.get(), var.get());
}
for (auto &op : graph_->Get<details::GraphOps>(details::kGraphOps)) {
......@@ -73,12 +73,12 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
FeedFetchList fetch_data(fetch_tensors.size());
InsertFetchOps(fetch_tensors, &fetch_ops, &fetch_dependencies, &pending_ops,
&pending_vars, &ready_vars, &fetch_data);
&pending_vars, ready_vars.get(), &fetch_data);
auto run_all_ops = [&](std::unordered_set<OpHandleBase *> &set) {
for (auto *op : set) {
running_ops_++;
RunOp(&ready_vars, op);
RunOp(ready_vars, op);
}
set.clear();
};
......@@ -87,7 +87,6 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
run_op_futures_.clear();
exception_holder_.Clear();
event.reset(nullptr);
// Step 3. Execution
while (!pending_vars.empty()) {
// 1. Run All Ready ops
......@@ -103,7 +102,7 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
// 2. Find ready variable
bool timeout;
auto cur_ready_vars = ready_vars.PopAll(1, &timeout);
auto cur_ready_vars = ready_vars->PopAll(1, &timeout);
if (timeout) {
if (exception_holder_.IsCaught()) {
......@@ -133,7 +132,6 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
}
}
PADDLE_ENFORCE(ready_ops.empty());
// Wait FetchOps.
ClearFetchOp(graph_.get(), &fetch_ops);
......@@ -206,7 +204,8 @@ void ThreadedSSAGraphExecutor::InsertPendingVar(
}
void ThreadedSSAGraphExecutor::RunOp(
BlockingQueue<VarHandleBase *> *ready_var_q, details::OpHandleBase *op) {
const std::shared_ptr<BlockingQueue<VarHandleBase *>> &ready_var_q,
details::OpHandleBase *op) {
auto op_run = [ready_var_q, op, this] {
try {
if (VLOG_IS_ON(10)) {
......
......@@ -51,7 +51,7 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor {
~ThreadedSSAGraphExecutor() {}
private:
void RunOp(BlockingQueue<VarHandleBase *> *ready_var_q,
void RunOp(const std::shared_ptr<BlockingQueue<VarHandleBase *>> &ready_var_q,
details::OpHandleBase *op);
private:
......
......@@ -354,18 +354,18 @@ void OperatorBase::GenerateTemporaryNames() {
}
}
static bool VarIsTensor(const Variable* var) {
return var->IsType<LoDTensor>() || var->IsType<SelectedRows>();
static bool VarIsTensor(const Variable& var) {
return var.IsType<LoDTensor>() || var.IsType<SelectedRows>();
}
const Tensor* GetTensorFromVar(Variable* var) {
if (var->IsType<LoDTensor>()) {
return var->GetMutable<LoDTensor>();
} else if (var->IsType<SelectedRows>()) {
return var->GetMutable<SelectedRows>()->mutable_value();
const Tensor* GetTensorFromVar(const Variable& var) {
if (var.IsType<LoDTensor>()) {
return static_cast<const Tensor*>(&(var.Get<LoDTensor>()));
} else if (var.IsType<SelectedRows>()) {
return &(var.Get<SelectedRows>().value());
} else {
PADDLE_THROW("Variable type_id %s, expect LoDTensor/SelectedRows.",
var->Type().name());
var.Type().name());
}
}
......@@ -415,8 +415,7 @@ bool ExecutionContext::HasOutput(const std::string& name) const {
template <>
const Tensor* ExecutionContext::Input<Tensor>(const std::string& name) const {
auto* var = InputVar(name);
return var == nullptr ? nullptr
: GetTensorFromVar(const_cast<Variable*>(var));
return var == nullptr ? nullptr : GetTensorFromVar(*var);
}
template <>
......@@ -428,7 +427,7 @@ const std::vector<const Tensor*> ExecutionContext::MultiInput<Tensor>(
std::transform(names.begin(), names.end(), std::back_inserter(res),
[&](const std::string& sub_name) {
auto var = scope_.FindVar(sub_name);
return var == nullptr ? nullptr : GetTensorFromVar(var);
return var == nullptr ? nullptr : GetTensorFromVar(*var);
});
return res;
}
......@@ -770,8 +769,10 @@ void OperatorWithKernel::TransferInplaceVarsBack(
for (auto& var_name : inplace_vars) {
VLOG(3) << "share inplace var " + var_name + " back to it's original scope";
auto* original_tensor = GetMutableTensorFromVar(scope.FindVar(var_name));
auto* transformed_tensor =
GetTensorFromVar(transfer_scope.FindVar(var_name));
auto* var = transfer_scope.FindVar(var_name);
PADDLE_ENFORCE(var != nullptr, "The var[%s] should not be nullptr",
var_name);
auto* transformed_tensor = GetTensorFromVar(*var);
original_tensor->ShareDataWith(*transformed_tensor);
}
}
......@@ -784,11 +785,11 @@ Scope* OperatorWithKernel::TryTransferData(
for (auto& var_name : var_name_item.second) {
auto* var = scope.FindVar(var_name);
// Only tensor can be tranfer to another device.
if (var == nullptr || !VarIsTensor(var)) {
if (var == nullptr || !VarIsTensor(*var)) {
continue;
}
auto* tensor_in = GetTensorFromVar(var);
auto* tensor_in = GetTensorFromVar(*var);
if (!tensor_in->IsInitialized()) {
continue;
}
......
......@@ -63,7 +63,7 @@ inline std::string GradVarName(const std::string& var_name) {
}
proto::VarType::Type GetDataTypeOfVar(const Variable* var);
const Tensor* GetTensorFromVar(Variable* var);
const Tensor* GetTensorFromVar(const Variable& var);
class OperatorBase;
class ExecutionContext;
......
......@@ -75,6 +75,19 @@ TEST(Tensor, MutableData) {
platform::CPUPlace());
EXPECT_EQ(p1, p2);
}
// Not sure if it's desired, but currently, Tensor type can be changed.
{
framework::Tensor src_tensor;
int8_t* p1 = src_tensor.mutable_data<int8_t>(framework::make_ddim({1}),
platform::CPUPlace());
EXPECT_NE(p1, nullptr);
*p1 = 1;
uint8_t* p2 = src_tensor.mutable_data<uint8_t>(framework::make_ddim({1}),
platform::CPUPlace());
EXPECT_NE(p2, nullptr);
EXPECT_EQ(static_cast<int>(p2[0]), 1);
}
#ifdef PADDLE_WITH_CUDA
{
......
......@@ -67,6 +67,7 @@ class SumOp : public framework::OperatorWithKernel {
framework::OpKernelType GetExpectedKernelType(
const framework::ExecutionContext& ctx) const override {
auto x_vars = ctx.MultiInputVar("X");
auto x_vars_name = ctx.Inputs("X");
framework::LibraryType library{framework::LibraryType::kPlain};
framework::DataLayout layout{framework::DataLayout::kAnyLayout};
......@@ -81,10 +82,11 @@ class SumOp : public framework::OperatorWithKernel {
if (x_vars[0]->IsType<framework::LoDTensor>()) {
int dtype = -1;
for (auto& x_var : x_vars) {
for (size_t idx = 0; idx < x_vars.size(); ++idx) {
PADDLE_ENFORCE(x_vars[idx] != nullptr,
"Input var[%s] should not be nullptr", x_vars_name[idx]);
// FIXME(zcd): The input x_var may be SelectedRows or LoDTensor.
auto tensor = framework::GetTensorFromVar(
const_cast<framework::Variable*>(x_var));
auto tensor = framework::GetTensorFromVar(*x_vars[idx]);
if (tensor->numel() == 0) {
continue;
}
......
......@@ -3020,7 +3020,8 @@ def sequence_pad(x, pad_value, maxlen=None, name=None):
x = fluid.layers.data(name='y', shape=[10, 5],
dtype='float32', lod_level=1)
pad_value = fluid.layers.assign(input=numpy.array([0]))
pad_value = fluid.layers.assign(
input=numpy.array([0], dtype=numpy.float32))
out = fluid.layers.sequence_pad(x=x, pad_value=pad_value)
"""
......
......@@ -283,6 +283,25 @@ class TestDecayedAdagrad(TranspilerTest):
trainer, _ = self.get_trainer()
class TestFtrl(TranspilerTest):
def net_conf(self):
x = fluid.layers.data(name='x', shape=[1000], dtype='float32')
y_predict = fluid.layers.fc(input=x,
size=1000,
act=None,
param_attr=fluid.ParamAttr(name='fc_w'),
bias_attr=fluid.ParamAttr(name='fc_b'))
y = fluid.layers.data(name='y', shape=[1], dtype='float32')
cost = fluid.layers.square_error_cost(input=y_predict, label=y)
avg_cost = fluid.layers.mean(cost)
opt = fluid.optimizer.Ftrl(learning_rate=0.1)
opt.minimize(avg_cost)
def transpiler_test_impl(self):
pserver, startup = self.get_pserver(self.pserver1_ep)
trainer, _ = self.get_trainer()
class TestLRDecayConditional(TranspilerTest):
def net_conf(self):
x = fluid.layers.data(name='x', shape=[1000], dtype='float32')
......@@ -405,18 +424,43 @@ class TestL2DecayWithPiecewise(TranspilerTest):
["sum", "scale", "scale", "elementwise_add", "momentum"])
class TestEmptyPserverOptimizeBlocks(TranspilerTest):
def net_conf(self):
x = fluid.layers.data(name='x', shape=[1000], dtype='float32')
# only one parameter
y_predict = fluid.layers.fc(input=x,
size=1000,
act=None,
param_attr=fluid.ParamAttr(name='fc_w'),
bias_attr=False)
y = fluid.layers.data(name='y', shape=[1], dtype='float32')
cost = fluid.layers.square_error_cost(input=y_predict, label=y)
avg_cost = fluid.layers.mean(cost)
sgd_optimizer = fluid.optimizer.SGD(learning_rate=1.0)
sgd_optimizer.minimize(avg_cost)
def transpiler_test_impl(self):
config = fluid.DistributeTranspilerConfig()
config.slice_var_up = False
pserver, startup = self.get_pserver(ep=self.pserver2_ep, config=config)
self.assertEqual(len(pserver.blocks), 2)
self.assertEqual(len(pserver.blocks[1].ops), 0)
class TestDistLookupTableBase(TranspilerTest):
def network_with_table(self, is_sparse, is_distributed):
self.table_size = 1000
self.emb_size = 64
self.lookup_table_name = 'shared_w'
def emb_pool(ids):
def emb_pool(ids, table_name, is_distributed):
emb = fluid.layers.embedding(
input=ids,
size=[self.table_size, self.emb_size],
dtype='float32',
param_attr=self.lookup_table_name, # share parameter
param_attr=table_name,
is_sparse=is_sparse,
is_distributed=is_distributed)
pool = fluid.layers.sequence_pool(input=emb, pool_type='average')
......@@ -426,9 +470,13 @@ class TestDistLookupTableBase(TranspilerTest):
name='title_ids', shape=[1], dtype='int64', lod_level=1)
brand_ids = fluid.layers.data(
name='brand_ids', shape=[1], dtype='int64', lod_level=1)
title_emb = emb_pool(title_ids)
brand_emb = emb_pool(brand_ids)
fc0 = fluid.layers.concat(input=[title_emb, brand_emb], axis=1)
profile_ids = fluid.layers.data(
name='brand_ids', shape=[1], dtype='int64', lod_level=1)
title_emb = emb_pool(title_ids, self.lookup_table_name, is_distributed)
brand_emb = emb_pool(brand_ids, self.lookup_table_name, is_distributed)
profile_emb = emb_pool(profile_ids, "profile_emb", False)
fc0 = fluid.layers.concat(
input=[title_emb, brand_emb, profile_emb], axis=1)
predict = fluid.layers.fc(input=fc0,
size=2,
act=None,
......@@ -449,7 +497,7 @@ class TestLocalLookupTable(TestDistLookupTableBase):
def transpiler_test_impl(self):
pserver1, startup1 = self.get_pserver(self.pserver1_ep)
self.assertEqual(len(pserver1.blocks), 3)
self.assertEqual(len(pserver1.blocks), 4)
# 0 listen_and_serv
# 1 optimize for fc_w or fc_b adam
self.assertEqual([op.type for op in pserver1.blocks[1].ops],
......@@ -459,16 +507,23 @@ class TestLocalLookupTable(TestDistLookupTableBase):
self.assertEqual([op.type for op in pserver1.blocks[2].ops],
["sum", "scale", "adam", "scale", "scale"])
# 3 optimize for table 2 adam
# NOTE: if param is not selected rows, the grad will scaled to grad / trainer_num
self.assertEqual([op.type for op in pserver1.blocks[3].ops],
["sum", "scale", "adam", "scale", "scale"])
trainer, _ = self.get_trainer()
self.assertEqual(len(trainer.blocks), 1)
ops = [
'lookup_table', 'sequence_pool', 'lookup_table', 'sequence_pool',
'concat', 'mul', 'elementwise_add', 'cross_entropy', 'mean',
'fill_constant', 'mean_grad', 'cross_entropy_grad',
'elementwise_add_grad', 'send', 'mul_grad', 'send', 'concat_grad',
'sequence_pool_grad', 'lookup_table_grad', 'sequence_pool_grad',
'lookup_table_grad', 'sum', 'split_selected_rows', 'send',
'send_barrier', 'recv', 'recv', 'recv', 'fetch_barrier', 'concat'
'lookup_table', 'sequence_pool', 'concat', 'mul', 'elementwise_add',
'cross_entropy', 'mean', 'fill_constant', 'mean_grad',
'cross_entropy_grad', 'elementwise_add_grad', 'send', 'mul_grad',
'send', 'concat_grad', 'sequence_pool_grad', 'lookup_table_grad',
'split_selected_rows', 'send', 'sequence_pool_grad',
'lookup_table_grad', 'sequence_pool_grad', 'lookup_table_grad',
'sum', 'split_selected_rows', 'send', 'send_barrier', 'recv',
'recv', 'recv', 'recv', 'fetch_barrier', 'concat', 'concat'
]
self.assertEqual([op.type for op in trainer.blocks[0].ops], ops)
......@@ -480,39 +535,45 @@ class TestDistLookupTable(TestDistLookupTableBase):
def transpiler_test_impl(self):
pserver1, startup1 = self.get_pserver(self.pserver1_ep)
self.assertEqual(len(pserver1.blocks), 5)
self.assertEqual(len(pserver1.blocks), 6)
# 0 listen_and_serv
# 1 optimize for fc_w or fc_b adam
self.assertEqual([op.type for op in pserver1.blocks[1].ops],
["sum", "scale", "adam", "scale", "scale"])
# 2 optimize for table sgd
# 4 prefetch -> lookup_sparse_table for data0
self.assertEqual([op.type for op in pserver1.blocks[2].ops],
["sum", "scale", "adam", "scale", "scale"])
# 2 optimize for table sgd
self.assertEqual([op.type for op in pserver1.blocks[3].ops],
["sum", "sgd"])
# 3 prefetch -> lookup_sparse_table for data0
self.assertEqual([op.type for op in pserver1.blocks[3].ops],
self.assertEqual([op.type for op in pserver1.blocks[4].ops],
["lookup_sparse_table"])
# 4 save table
self.assertEqual([op.type for op in pserver1.blocks[4].ops], ["save"])
# 5 save table
self.assertEqual([op.type for op in pserver1.blocks[5].ops], ["save"])
trainer, trainer_startup = self.get_trainer()
self.assertEqual(len(trainer.blocks), 1)
ops = [
'split_ids', 'prefetch', 'merge_ids', 'sequence_pool',
'sequence_pool', 'concat', 'mul', 'elementwise_add',
'cross_entropy', 'mean', 'fill_constant', 'mean_grad',
'cross_entropy_grad', 'elementwise_add_grad', 'send', 'mul_grad',
'send', 'concat_grad', 'sequence_pool_grad', 'lookup_table_grad',
'sequence_pool_grad', 'lookup_table_grad', 'sum', 'split_ids',
'send', 'send_barrier', 'recv', 'recv', 'fetch_barrier'
'sequence_pool', 'lookup_table', 'sequence_pool', 'concat', 'mul',
'elementwise_add', 'cross_entropy', 'mean', 'fill_constant',
'mean_grad', 'cross_entropy_grad', 'elementwise_add_grad', 'send',
'mul_grad', 'send', 'concat_grad', 'sequence_pool_grad',
'lookup_table_grad', 'split_selected_rows', 'send',
'sequence_pool_grad', 'lookup_table_grad', 'sequence_pool_grad',
'lookup_table_grad', 'sum', 'split_ids', 'send', 'send_barrier',
'recv', 'recv', 'recv', 'fetch_barrier', 'concat'
]
self.assertEqual([op.type for op in trainer.blocks[0].ops], ops)
startup_ops = [
'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant',
'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant',
'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant',
'fill_constant', 'fill_constant', 'uniform_random', 'recv', 'recv',
'fetch_barrier', 'fake_init'
'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant',
'fill_constant', 'fill_constant', 'uniform_random',
'uniform_random', 'recv', 'recv', 'recv', 'fetch_barrier', 'concat',
'fake_init'
]
self.assertEqual([op.type for op in trainer_startup.blocks[0].ops],
startup_ops)
......@@ -526,7 +587,7 @@ class TestAsyncLocalLookupTable(TestDistLookupTableBase):
config = fluid.DistributeTranspilerConfig()
pserver1, startup1 = self.get_pserver(self.pserver1_ep, config, False)
self.assertEqual(len(pserver1.blocks), 3)
self.assertEqual(len(pserver1.blocks), 4)
# 0 listen_and_serv
# 1 optimize for fc_w or fc_b adam
self.assertEqual([op.type for op in pserver1.blocks[1].ops],
......@@ -535,17 +596,23 @@ class TestAsyncLocalLookupTable(TestDistLookupTableBase):
# NOTE: if param is not selected rows, the grad will scaled to grad / trainer_num
self.assertEqual([op.type for op in pserver1.blocks[2].ops],
["adam", "scale", "scale"])
# 3 optimize for table adam
# NOTE: if param is not selected rows, the grad will scaled to grad / trainer_num
self.assertEqual([op.type for op in pserver1.blocks[3].ops],
["adam", "scale", "scale"])
trainer, _ = self.get_trainer(config)
self.assertEqual(len(trainer.blocks), 1)
ops = [
'lookup_table', 'sequence_pool', 'lookup_table', 'sequence_pool',
'concat', 'mul', 'elementwise_add', 'cross_entropy', 'mean',
'fill_constant', 'mean_grad', 'cross_entropy_grad',
'elementwise_add_grad', 'send', 'mul_grad', 'send', 'concat_grad',
'sequence_pool_grad', 'lookup_table_grad', 'sequence_pool_grad',
'lookup_table_grad', 'sum', 'split_selected_rows', 'send', 'recv',
'recv', 'recv', 'concat'
'lookup_table', 'sequence_pool', 'concat', 'mul', 'elementwise_add',
'cross_entropy', 'mean', 'fill_constant', 'mean_grad',
'cross_entropy_grad', 'elementwise_add_grad', 'send', 'mul_grad',
'send', 'concat_grad', 'sequence_pool_grad', 'lookup_table_grad',
'split_selected_rows', 'send', 'sequence_pool_grad',
'lookup_table_grad', 'sequence_pool_grad', 'lookup_table_grad',
'sum', 'split_selected_rows', 'send', 'recv', 'recv', 'recv',
'recv', 'concat', 'concat'
]
self.assertEqual([op.type for op in trainer.blocks[0].ops], ops)
......@@ -559,29 +626,34 @@ class TestAsyncDistLookupTable(TestDistLookupTableBase):
pserver1, startup1 = self.get_pserver(self.pserver1_ep, config, False)
self.assertEqual(len(pserver1.blocks), 5)
self.assertEqual(len(pserver1.blocks), 6)
# 0 listen_and_serv
# 1 optimize for fc_w or fc_b adam
self.assertEqual([op.type for op in pserver1.blocks[1].ops],
["adam", "scale", "scale"])
# 2 optimize for table sgd
self.assertEqual([op.type for op in pserver1.blocks[2].ops], ["sgd"])
# 3 prefetch -> lookup_sparse_table for data0
self.assertEqual([op.type for op in pserver1.blocks[3].ops],
# 2 optimize for table adam
self.assertEqual([op.type for op in pserver1.blocks[2].ops],
["adam", "scale", "scale"])
# 3 optimize for table sgd
self.assertEqual([op.type for op in pserver1.blocks[3].ops], ["sgd"])
# 4 prefetch -> lookup_sparse_table for data0
self.assertEqual([op.type for op in pserver1.blocks[4].ops],
["lookup_sparse_table"])
# 4 save table
self.assertEqual([op.type for op in pserver1.blocks[4].ops], ["save"])
# 5 save table
self.assertEqual([op.type for op in pserver1.blocks[5].ops], ["save"])
trainer, _ = self.get_trainer(config)
self.assertEqual(len(trainer.blocks), 1)
ops = [
'split_ids', 'prefetch', 'merge_ids', 'sequence_pool',
'sequence_pool', 'concat', 'mul', 'elementwise_add',
'cross_entropy', 'mean', 'fill_constant', 'mean_grad',
'cross_entropy_grad', 'elementwise_add_grad', 'send', 'mul_grad',
'send', 'concat_grad', 'sequence_pool_grad', 'lookup_table_grad',
'sequence_pool_grad', 'lookup_table_grad', 'sum', 'split_ids',
'send', 'recv', 'recv'
'sequence_pool', 'lookup_table', 'sequence_pool', 'concat', 'mul',
'elementwise_add', 'cross_entropy', 'mean', 'fill_constant',
'mean_grad', 'cross_entropy_grad', 'elementwise_add_grad', 'send',
'mul_grad', 'send', 'concat_grad', 'sequence_pool_grad',
'lookup_table_grad', 'split_selected_rows', 'send',
'sequence_pool_grad', 'lookup_table_grad', 'sequence_pool_grad',
'lookup_table_grad', 'sum', 'split_ids', 'send', 'recv', 'recv',
'recv', 'concat'
]
self.assertEqual([op.type for op in trainer.blocks[0].ops], ops)
......
......@@ -55,6 +55,46 @@ def run_pserver(use_cuda, sync_mode, ip, port, trainers, trainer_id):
exe.run(pserver_prog)
def run_pserver_with_empty_block(use_cuda, sync_mode, ip, port, trainers,
trainer_id):
x = fluid.layers.data(name='x', shape=[1], dtype='float32')
y_predict = fluid.layers.fc(input=x, size=1, act=None, bias_attr=False)
y = fluid.layers.data(name='y', shape=[1], dtype='float32')
# loss function
cost = fluid.layers.square_error_cost(input=y_predict, label=y)
avg_cost = fluid.layers.mean(cost)
# optimizer
sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001)
sgd_optimizer.minimize(avg_cost)
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
exe = fluid.Executor(place)
ps1 = ip + ":" + str(int(port) + 1)
ps2 = ip + ":" + port
pserver_endpoints = ps1 + "," + ps2
config = fluid.DistributeTranspilerConfig()
config.slice_var_up = False
t = fluid.DistributeTranspiler(config=config)
t.transpile(
trainer_id,
pservers=pserver_endpoints,
trainers=trainers,
sync_mode=sync_mode)
pserver_prog = t.get_pserver_program(ps2)
# pserver2 have no parameter
assert (len(pserver_prog.blocks) == 2)
assert (len(pserver_prog.blocks[1].ops) == 0)
pserver_startup = t.get_startup_program(ps2, pserver_prog)
exe.run(pserver_startup)
exe.run(pserver_prog)
class TestListenAndServOp(OpTest):
def setUp(self):
self.ps_timeout = 5
......@@ -63,9 +103,9 @@ class TestListenAndServOp(OpTest):
self.trainers = 1
self.trainer_id = 0
def _start_pserver(self, use_cuda, sync_mode):
def _start_pserver(self, use_cuda, sync_mode, pserver_func):
p = Process(
target=run_pserver,
target=pserver_func,
args=(use_cuda, sync_mode, self.ip, self.port, self.trainers,
self.trainer_id))
p.daemon = True
......@@ -92,7 +132,24 @@ class TestListenAndServOp(OpTest):
def test_handle_signal_in_serv_op(self):
# run pserver on CPU in sync mode
p1 = self._start_pserver(False, True)
p1 = self._start_pserver(False, True, run_pserver)
self._wait_ps_ready(p1.pid)
# raise SIGTERM to pserver
os.kill(p1.pid, signal.SIGINT)
p1.join()
# run pserver on CPU in async mode
p2 = self._start_pserver(False, False, run_pserver)
self._wait_ps_ready(p2.pid)
# raise SIGTERM to pserver
os.kill(p2.pid, signal.SIGTERM)
p2.join()
def test_list_and_serv_run_empty_optimize_block(self):
# run pserver on CPU in sync mode
p1 = self._start_pserver(False, True, run_pserver_with_empty_block)
self._wait_ps_ready(p1.pid)
# raise SIGTERM to pserver
......@@ -100,7 +157,7 @@ class TestListenAndServOp(OpTest):
p1.join()
# run pserver on CPU in async mode
p2 = self._start_pserver(False, False)
p2 = self._start_pserver(False, False, run_pserver_with_empty_block)
self._wait_ps_ready(p2.pid)
# raise SIGTERM to pserver
......
......@@ -35,6 +35,7 @@ import sys
import numpy as np
import collections
import six
import logging
from .ps_dispatcher import RoundRobin, HashName, PSDispatcher
from .. import core, framework
......@@ -767,6 +768,15 @@ in a single call.")
prefetch_var_name_to_block_id.extend(
lookup_table_var_name_to_block_id)
if len(optimize_blocks) == 0:
logging.warn("pserver [" + str(endpoint) +
"] has no optimize block!!")
pre_block_idx = pserver_program.num_blocks - 1
empty_block = pserver_program._create_block(pre_block_idx)
optimize_blocks.append(empty_block)
# In some case, some parameter server will have no parameter to optimize
# So we give an empty optimize block to parameter server.
attrs = {
"optimize_blocks": optimize_blocks,
"endpoint": endpoint,
......@@ -1065,7 +1075,12 @@ to transpile() call.")
continue_search_lookup_table_op = False
all_ops = program.global_block().ops
for op in all_ops:
if op.type == LOOKUP_TABLE_TYPE:
if op.type == LOOKUP_TABLE_TYPE and self.table_name == op.input(
"W")[0]:
if not op.attr('is_distributed'):
raise RuntimeError(
"lookup_table_op that lookup an distributed embedding table"
"should set is_distributed to true")
continue_search_lookup_table_op = True
lookup_table_op_index = lookup_table_op_index if lookup_table_op_index != -1 else list(
......@@ -1275,7 +1290,6 @@ to transpile() call.")
}
outputs = {"ParamOut": [param_var]}
# only support sgd now
import logging
logging.warn(
"distribute lookup table only support sgd optimizer, change it's optimizer to sgd instead of "
+ table_opt_op.type)
......@@ -1442,6 +1456,9 @@ to transpile() call.")
elif op_type == "decayed_adagrad":
if varkey == "Moment":
return param_shape
elif op_type == "ftrl":
if varkey in ["SquaredAccumulator", "LinearAccumulator"]:
return param_shape
elif op_type == "sgd":
pass
else:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册