提交 c6d7c2bd 编写于 作者: T typhoonzero

Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into split_byref_op

......@@ -179,6 +179,7 @@ set(EXTERNAL_LIBS
if(WITH_GPU)
include(cuda)
include(tensorrt)
endif(WITH_GPU)
if(WITH_AMD_GPU)
......
......@@ -45,6 +45,13 @@ ENV PATH=${PATH}:${GOROOT}/bin:${GOPATH}/bin
# install glide
RUN curl -s -q https://glide.sh/get | sh
# Install TensorRT
# The unnecessary files has been removed to make the library small. It only contains include and lib now.
RUN wget -qO- http://paddlepaddledeps.bj.bcebos.com/TensorRT-4.0.0.3.Ubuntu-16.04.4.x86_64-gnu.cuda-8.0.cudnn7.0.tar.gz | \
tar -xz -C /usr/local && \
cp -rf /usr/local/TensorRT/include /usr && \
cp -rf /usr/local/TensorRT/lib /usr
# git credential to skip password typing
RUN git config --global credential.helper store
......@@ -57,7 +64,7 @@ RUN localedef -i en_US -f UTF-8 en_US.UTF-8
# specify sphinx version as 1.5.6 and remove -U option for [pip install -U
# sphinx-rtd-theme] since -U option will cause sphinx being updated to newest
# version(1.7.1 for now), which causes building documentation failed.
RUN pip install --upgrade pip && \
RUN pip install --upgrade pip==9.0.3 && \
pip install -U wheel && \
pip install -U docopt PyYAML sphinx==1.5.6 && \
pip install sphinx-rtd-theme==0.1.9 recommonmark
......
......@@ -27,7 +27,7 @@ RUN git config --global credential.helper store
# Fix locales to en_US.UTF-8
RUN localedef -i en_US -f UTF-8 en_US.UTF-8
RUN pip install --upgrade pip && \
RUN pip install --upgrade pip==9.0.3 && \
pip install -U 'protobuf==3.1.0' && \
pip install -U wheel sphinx && \
pip install pre-commit
......
......@@ -80,6 +80,16 @@ if(WITH_GPU)
# Include cuda and cudnn
include_directories(${CUDNN_INCLUDE_DIR})
include_directories(${CUDA_TOOLKIT_INCLUDE})
if(TENSORRT_FOUND)
if(${CUDA_VERSION_MAJOR} VERSION_LESS 8)
message(FATAL_ERROR "TensorRT needs CUDA >= 8.0 to compile")
endif()
if(${CUDNN_MAJOR_VERSION} VERSION_LESS 7)
message(FATAL_ERROR "TensorRT needs CUDNN >= 7.0 to compile")
endif()
include_directories(${TENSORRT_INCLUDE_DIR})
endif()
elseif(WITH_AMD_GPU)
add_definitions(-DPADDLE_WITH_HIP)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -D__HIP_PLATFORM_HCC__")
......
......@@ -33,7 +33,7 @@ ExternalProject_Add(
extern_grpc
DEPENDS protobuf zlib
GIT_REPOSITORY "https://github.com/grpc/grpc.git"
GIT_TAG "v1.11.x"
GIT_TAG "v1.10.x"
PREFIX ${GRPC_SOURCES_DIR}
UPDATE_COMMAND ""
CONFIGURE_COMMAND ""
......
if(NOT WITH_GPU)
return()
endif()
set(TENSORRT_ROOT "/usr" CACHE PATH "TENSORRT ROOT")
find_path(TENSORRT_INCLUDE_DIR NvInfer.h
PATHS ${TENSORRT_ROOT} ${TENSORRT_ROOT}/include
$ENV{TENSORRT_ROOT} $ENV{TENSORRT_ROOT}/include
NO_DEFAULT_PATH
)
find_library(TENSORRT_LIBRARY NAMES libnvinfer.so libnvinfer.a
PATHS ${TENSORRT_ROOT} ${TENSORRT_ROOT}/lib
$ENV{TENSORRT_ROOT} $ENV{TENSORRT_ROOT}/lib
NO_DEFAULT_PATH
DOC "Path to TensorRT library.")
if(TENSORRT_INCLUDE_DIR AND TENSORRT_LIBRARY)
set(TENSORRT_FOUND ON)
else()
set(TENSORRT_FOUND OFF)
endif()
if(TENSORRT_FOUND)
file(READ ${TENSORRT_INCLUDE_DIR}/NvInfer.h TENSORRT_VERSION_FILE_CONTENTS)
string(REGEX MATCH "define NV_TENSORRT_MAJOR +([0-9]+)" TENSORRT_MAJOR_VERSION
"${TENSORRT_VERSION_FILE_CONTENTS}")
string(REGEX REPLACE "define NV_TENSORRT_MAJOR +([0-9]+)" "\\1"
TENSORRT_MAJOR_VERSION "${TENSORRT_MAJOR_VERSION}")
message(STATUS "Current TensorRT header is ${TENSORRT_INCLUDE_DIR}/NvInfer.h. "
"Current TensorRT version is v${TENSORRT_MAJOR_VERSION}. ")
endif()
......@@ -3,7 +3,9 @@ add_custom_target(paddle_apis ALL
add_custom_target(paddle_docs ALL
DEPENDS paddle_v2_docs paddle_v2_docs_cn
paddle_fluid_docs paddle_fluid_docs_cn)
paddle_fluid_docs paddle_fluid_docs_cn
paddle_mobile_docs paddle_mobile_docs_cn)
add_subdirectory(v2)
add_subdirectory(fluid)
add_subdirectory(mobile)
......@@ -473,6 +473,12 @@ multiplex
.. autofunction:: paddle.fluid.layers.multiplex
:noindex:
label_smooth
------------
.. autofunction:: paddle.fluid.layers.label_smooth
:noindex:
ops
===
......
......@@ -84,7 +84,7 @@ Running an operator can be asynchronized. There is a thread pool to execute an `
## Synchronize GPU Kernels
The GPU is a non-blocking device. The different streams need be synchronized when switing streams. In current implementation, the synchronization based on the following algorithm:
The GPU is a non-blocking device. The different streams need be synchronized when switching streams. In current implementation, the synchronization based on the following algorithm:
1. `OpHandle` will record `DeviceContext` that it is used.
2. In `OpHandle::Run`, if the `DeviceContext` of current operator is different from `DeviceContext` of any input variable, just wait the generate operator of this input variable.
......
## Distributed training overview doc
Currently Paddle Fluid use parameter server architecture to support distributed training.
For synchronous and asynchronous training, the differences are mostly in the logic of parameter server. Now we have already support synchronous training.
### Synchronous training
The training process of synchronous training is:
![synchronous distributed training](./src/sync_distributed_training.png)
1. Pserver
1. set `barrier_condition_` to 0 and waits for trainers to send gradient.
1. Trainer
1. Trainer read minibatch of data, run forward-backward with local parameter copy and get the gradients for parameters.
1. Trainer use split op to split all the gradient into blocks. The split method is determined at compile time.
1. Trainer use send_op to send all the split gradients to corresponding parameter server.
1. After trainer send all the gradients, it will send a `BATCH_BARRIER_MESSAGE` to all pservers.
1. Trainer call GetVariable to pserver and wait for `barrier_condition_` on pserver to be 1.
1. Pserver
1. Pserver will count the number of `BATCH_BARRIER_MESSAGE`.
1. When the count of `BATCH_BARRIER_MESSAGE` is equal to the number of Trainer. Pserver thinks it received all gradient from all trainers.
1. Pserver will run the optimization block to optimize the parameters.
1. After optimization, pserver set `barrier_condition_` to 1.
1. Pserver wait for `FETCH_BARRIER_MESSAGE`.
1. Trainer.
1. The trainer uses GetVariable to get all the parameters from pserver.
1. Trainer sends a `FETCH_BARRIER_MESSAGE` to each pserver.
1. Pserver.
1. when the number of `FETCH_BARRIER_MESSAGE` reach the number of all trainers. Pserver think all the parameters have been got. it will go back to 1. to set `barrier_condition_` to 0.
### Asynchronous training
In the above process. There are two barriers for all trainers to synchronize with each other. In asynchronous training, these two barriers are not needed. The trainer can just send gradients to pserver and then get parameters back.
The training process of asynchronous training can be:
![asynchronous distributed training](./src/async_distributed_training.png)
1. Pserver:
1. Each parameter has a queue to receive its gradient from trainers.
1. Each parameter has a thread to read data from the queue and run optimize block, using the gradient to optimize the parameter.
1. Using an independent thread to handle RPC call `GetVariable` for trainers to get parameters back.(Maybe here we should use a thread pool to speed up fetching the parameters.)
1. Trainer:
1. Trainer read a batch of data. Run forward and backward with local parameter copy and get the gradients for parameters.
1. Trainer split all gradients to blocks and then send these gradient blocks to pservers(pserver will put them into the queue).
2. Trainer gets all parameters back from pserver.
### Note:
There are also some conditions that need to consider. For exmaple:
1. If trainer needs to wait for the pserver to apply it's gradient and then get back the parameters back.
1. If we need a lock between parameter update and parameter fetch.
1. If one parameter must be on one server, or it can also be split and send to multiple parameter servers.
The above architecture of asynchronous training can support different mode, we can have a detailed test in the future for these problems.
# Design Doc: Asynchronous Update With Distributed Training
## Background
For the typical synchronous distributed training, some significant steps are as follows:
1. A Trainer will compute the gradients and SEND them to the Parameter Server(PServer) nodes.
1. After the PServer node received gradients came from all the Trainers, It will aggregate the
gradient variables for the same parameter into one gradient variable and then apply the aggregated
gradient to the respective parameter, finally using an optimize algorithms(SGD, Monument...)
to update the parameters.
1. The Trainer would wait for the PServers finished the optimize stage, and GET the parameters from PServer,
so all the Trainers would get the same parameters.
In the synchronously distributed training, there should be a `Barrier` to synchronise the
parameters after the optimizing stage. The performance of a distributed training job would
depend on the slowest node if there were hundreds or thousands of training nodes in a
Job, the performance of synchronously distributed training might be very poor because of
the slow node. So this design doc would introduce an approach to implement
*asynchronously* distributed training in PaddlePaddle Fluid.
## Design
<img src="./src/async_update.png" width="600"/>
As the figure above, we describe a global view of asynchronously update process and use
the parameter `w1` as an example to introduce the steps:
1. For each gradient variables, they may distribute on different GPU card and aggregate
them while they are all calculated.
1. Split the gradient variable into multiple blocks according to the number of PServer
instances and then send them.
1. PServer would run an `Optimize Block` using a specified optimize algorithm to update
the specified parameter.
1. The trainer will fetch latest parameter from PServer before running forward Op which depends
on the specified parameter.
1. Broadcast the received variable into multiple GPU cards and continue to run the next
mini-batch.
### Trainer
- For the multiple devices distributed training, we need to aggregate the gradient
variables which placed on different devices firstly and then schedule a `SendVars` Operator to
send the gradient variables to the multiple PServer instances.
- Schedule `FetchVars` operator to fetch the latest parameter from PServer before running
the forward ops.
- There could be a large number of gradient variables to be sent, so we need to use another
thread pool(IO Threadpool) whose a number of the schedulable threads is larger than the
computing thread pool to avoid competitive the thread resources with computing.
### Parameter Server
<img src="./src/async_pserver.png" width="750"/>
- There should be multiple trainer instances want to optimize the same parameter at
the same time, to avoid the racing, we need one `BlockingQueue` for each gradient
variable to process them one by one.
- We need a `Map` structure to map a gradient variable name to the `OptimizeBlock` which
can optimize the respective parameter.
......@@ -4,6 +4,7 @@
.. toctree::
:maxdepth: 1
api_doc_std_cn.md
new_op_cn.md
new_op_kernel.md
use_eigen_cn.md
......
......@@ -4,6 +4,7 @@ Development
.. toctree::
:maxdepth: 1
api_doc_std_en.md
new_op_en.md
new_op_kernel.md
use_eigen_en.md
......
if(NOT DEFINED SPHINX_THEME)
set(SPHINX_THEME default)
endif()
if(NOT DEFINED SPHINX_THEME_DIR)
set(SPHINX_THEME_DIR)
endif()
# configured documentation tools and intermediate build results
set(BINARY_BUILD_DIR_EN "${CMAKE_CURRENT_BINARY_DIR}/en/_build")
# Sphinx cache with pickled ReST documents
set(SPHINX_CACHE_DIR_EN "${CMAKE_CURRENT_BINARY_DIR}/en/_doctrees")
# HTML output director
set(SPHINX_HTML_DIR_EN "${CMAKE_CURRENT_BINARY_DIR}/en/html")
configure_file(
"${CMAKE_CURRENT_SOURCE_DIR}/../templates/conf.py.en.in"
"${BINARY_BUILD_DIR_EN}/conf.py"
@ONLY)
sphinx_add_target(paddle_mobile_docs
html
${BINARY_BUILD_DIR_EN}
${SPHINX_CACHE_DIR_EN}
${CMAKE_CURRENT_SOURCE_DIR}
${SPHINX_HTML_DIR_EN})
add_dependencies(paddle_mobile_docs gen_proto_py paddle_python)
# configured documentation tools and intermediate build results
set(BINARY_BUILD_DIR_CN "${CMAKE_CURRENT_BINARY_DIR}/cn/_build")
# Sphinx cache with pickled ReST documents
set(SPHINX_CACHE_DIR_CN "${CMAKE_CURRENT_BINARY_DIR}/cn/_doctrees")
# HTML output director
set(SPHINX_HTML_DIR_CN "${CMAKE_CURRENT_BINARY_DIR}/cn/html")
configure_file(
"${CMAKE_CURRENT_SOURCE_DIR}/../templates/conf.py.cn.in"
"${BINARY_BUILD_DIR_CN}/conf.py"
@ONLY)
sphinx_add_target(paddle_mobile_docs_cn
html
${BINARY_BUILD_DIR_CN}
${SPHINX_CACHE_DIR_CN}
${CMAKE_CURRENT_SOURCE_DIR}
${SPHINX_HTML_DIR_CN})
add_dependencies(paddle_mobile_docs_cn gen_proto_py paddle_python)
移动端
=====
.. toctree::
:maxdepth: 1
cross_compiling_for_android_cn.md
cross_compiling_for_ios_cn.md
cross_compiling_for_raspberry_cn.md
\ No newline at end of file
Mobile
======
.. toctree::
:maxdepth: 1
cross_compiling_for_android_en.md
cross_compiling_for_ios_en.md
cross_compiling_for_raspberry_en.md
......@@ -55,21 +55,21 @@ MultiDevSSAGraphBuilder::MultiDevSSAGraphBuilder(
}
}
void MultiDevSSAGraphBuilder::CreateOpHandleIOs(SSAGraph *result, OpDesc *op,
void MultiDevSSAGraphBuilder::CreateOpHandleIOs(SSAGraph *result,
const OpDesc &op,
const platform::Place &p,
const size_t &i) const {
auto *op_handle = result->ops_.back().get();
op_handle->dev_ctxes_[p] = const_cast<platform::DeviceContext *>(
platform::DeviceContextPool::Instance().Get(p));
op_handle->dev_ctxes_[p] = platform::DeviceContextPool::Instance().Get(p);
auto var_names = op->InputArgumentNames();
auto var_names = op.InputArgumentNames();
for (auto &each_var_name : var_names) {
VarHandle *var = CreateOrGetLatestVarHandle(result, each_var_name, p, i);
op_handle->AddInput(var);
}
var_names = op->OutputArgumentNames();
var_names = op.OutputArgumentNames();
for (auto &each_var_name : var_names) {
CreateOpOutput(result, op_handle, each_var_name, p, i);
......@@ -107,7 +107,7 @@ std::unique_ptr<SSAGraph> MultiDevSSAGraphBuilder::Build(
result.ops_.emplace_back(new SendOpHandle(*op, s, p));
// Create inputs for output on original place and no ssa output
// is created for send op.
CreateOpHandleIOs(&result, op, p, 0);
CreateOpHandleIOs(&result, *op, p, 0);
continue;
}
......@@ -117,7 +117,7 @@ std::unique_ptr<SSAGraph> MultiDevSSAGraphBuilder::Build(
result.ops_.emplace_back(new ComputationOpHandle(*op, s, p));
auto *op_handle = result.ops_.back().get();
CreateOpHandleIOs(&result, op, p, i);
CreateOpHandleIOs(&result, *op, p, i);
auto var_names = op->OutputArgumentNames();
......
......@@ -45,8 +45,8 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder {
std::unique_ptr<SSAGraph> Build(const ProgramDesc &program) const override;
private:
void CreateOpHandleIOs(SSAGraph *result, OpDesc *op, const platform::Place &p,
const size_t &i) const;
void CreateOpHandleIOs(SSAGraph *result, const OpDesc &op,
const platform::Place &p, const size_t &i) const;
private:
std::string loss_var_name_;
......
......@@ -33,13 +33,6 @@ ThreadedSSAGraphExecutor::ThreadedSSAGraphExecutor(
running_ops_(0),
allow_op_delay_(allow_op_delay) {}
void ThreadedSSAGraphExecutor::RunDelayedOps(
const std::unordered_set<OpHandleBase *> &delayed_ops) {
for (auto op : delayed_ops) {
op->Run(use_event_);
}
}
FeedFetchList ThreadedSSAGraphExecutor::Run(
const std::vector<std::string> &fetch_tensors) {
std::unordered_map<OpHandleBase *, size_t> pending_ops;
......@@ -51,8 +44,6 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
// together since we currently cannot overlap computation and memcpy streams.
// Should revisit it if overlapping is available.
std::unordered_set<OpHandleBase *> delayed_ops;
std::unordered_set<OpHandleBase *> blocked_by_delayed_ops;
std::unordered_set<VarHandleBase *> delayed_vars;
auto InsertPendingVar = [&pending_vars, &ready_vars](VarHandleBase &var) {
pending_vars.insert(&var);
......@@ -122,24 +113,26 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
InsertPendingOp(*op);
}
auto run_all_ready_ops = [&] {
for (auto *op : ready_ops) {
if (op->IsMultiDeviceTransfer() && allow_op_delay_) {
delayed_ops.insert(op);
delayed_vars.insert(op->outputs_.begin(), op->outputs_.end());
ready_vars.Extend(op->outputs_);
continue;
}
auto run_all_ops = [&](std::unordered_set<OpHandleBase *> &set) {
for (auto *op : set) {
running_ops_++;
RunOp(&ready_vars, op);
}
ready_ops.clear();
set.clear();
};
// Step 3. Execution
while (!pending_vars.empty() || !ready_ops.empty() || !delayed_ops.empty()) {
while (!pending_vars.empty()) {
// 1. Run All Ready ops
run_all_ready_ops();
// Keep loop until all vars are ready.
//
// NOTE: DelayedOps have a lower priority. It will be scheduled after all
// ready_ops have been performed.
if (ready_ops.empty() && allow_op_delay_) {
run_all_ops(delayed_ops);
} else {
run_all_ops(ready_ops);
}
// 2. Find ready variable
bool timeout;
......@@ -160,29 +153,16 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
auto &deps = pending_ops[op];
--deps;
if (deps == 0) {
if (delayed_vars.find(ready_var) != delayed_vars.end()) {
blocked_by_delayed_ops.insert(op);
if (op->IsMultiDeviceTransfer() && allow_op_delay_) {
delayed_ops.insert(op);
} else {
ready_ops.insert(op);
}
}
}
}
// When there are no other ops to schedule, schedule buffered delayed
// ops and unblock other ops.
if (ready_ops.empty() && !delayed_ops.empty() && running_ops_ == 0) {
RunDelayedOps(delayed_ops);
delayed_ops.clear();
for (auto *op : blocked_by_delayed_ops) {
ready_ops.insert(op);
}
blocked_by_delayed_ops.clear();
}
// Keep loop until all vars are ready.
}
PADDLE_ENFORCE(ready_ops.empty());
PADDLE_ENFORCE(delayed_ops.empty());
PADDLE_ENFORCE(blocked_by_delayed_ops.empty());
// Wait FetchOps.
if (!fetch_ops.empty()) {
......
......@@ -88,8 +88,6 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor {
void RunOp(BlockingQueue<VarHandleBase *> *ready_var_q,
details::OpHandleBase *op);
void RunDelayedOps(const std::unordered_set<OpHandleBase *> &delayed_ops);
private:
std::unique_ptr<::ThreadPool> pool_;
std::vector<Scope *> local_scopes_;
......
......@@ -155,13 +155,9 @@ void ParallelExecutor::BCastParamsToGPUs(
#endif
}
void ParallelExecutor::Run(
const std::vector<std::string> &fetch_tensors,
const std::string &fetched_var_name,
const std::unordered_map<std::string, LoDTensor> &feed_tensors) {
void ParallelExecutor::Run(const std::vector<std::string> &fetch_tensors,
const std::string &fetched_var_name) {
platform::RecordBlock b(0);
SplitTensorToPlaces(feed_tensors);
// Create local scopes.
for (auto &scope : member_->local_scopes_) {
Scope &local_scope = scope->NewScope();
......@@ -195,14 +191,28 @@ void ParallelExecutor::Run(
auto &local_scope =
*scope->Var(details::kLocalExecScopeName)->GetMutable<Scope *>();
scope->DeleteScope(local_scope);
local_scope = nullptr;
}
}
void ParallelExecutor::SplitTensorToPlaces(
const std::unordered_map<std::string, LoDTensor> &feed_tensors) {
for (auto it : feed_tensors) {
auto lod_tensors = it.second.SplitLoDTensor(member_->places_);
void ParallelExecutor::FeedTensorsIntoLocalScopes(
const std::vector<std::unordered_map<std::string, LoDTensor>> &tensors) {
PADDLE_ENFORCE_EQ(member_->local_scopes_.size(), tensors.size());
for (size_t i = 0; i < tensors.size(); ++i) {
auto &map = tensors[i];
auto *scope = member_->local_scopes_[i];
for (auto &pair : map) {
auto *trg = scope->Var(pair.first)->GetMutable<LoDTensor>();
trg->ShareDataWith(pair.second);
trg->set_lod(pair.second.lod());
}
}
}
void ParallelExecutor::FeedAndSplitTensorIntoLocalScopes(
const std::unordered_map<std::string, LoDTensor> &tensors) {
for (auto pair : tensors) {
auto lod_tensors = pair.second.SplitLoDTensor(member_->places_);
PADDLE_ENFORCE_EQ(
member_->places_.size(), lod_tensors.size(),
"The number of samples of current batch is less than the count of "
......@@ -211,7 +221,7 @@ void ParallelExecutor::SplitTensorToPlaces(
for (size_t j = 0; j < member_->places_.size(); ++j) {
// TODO(panxy0718): Do I need to delete this var?
auto t =
member_->local_scopes_[j]->Var(it.first)->GetMutable<LoDTensor>();
member_->local_scopes_[j]->Var(pair.first)->GetMutable<LoDTensor>();
t->ShareDataWith(lod_tensors[j]);
t->set_lod(lod_tensors[j].lod());
}
......
......@@ -44,16 +44,22 @@ class ParallelExecutor {
std::vector<Scope*>& GetLocalScopes();
/**
* Feed tensors to local scopes. The size of tensors should be equal to the
* size of local scopes.
*/
void FeedTensorsIntoLocalScopes(
const std::vector<std::unordered_map<std::string, LoDTensor>>& tensors);
void FeedAndSplitTensorIntoLocalScopes(
const std::unordered_map<std::string, LoDTensor>& tensors);
void Run(const std::vector<std::string>& fetch_tensors,
const std::string& fetched_var_name,
const std::unordered_map<std::string, LoDTensor>& feed_tensors);
const std::string& fetched_var_name);
void BCastParamsToGPUs(const std::unordered_set<std::string>& vars) const;
private:
void SplitTensorToPlaces(
const std::unordered_map<std::string, LoDTensor>& feed_tensors);
ParallelExecutorPrivate* member_;
};
......
......@@ -66,7 +66,7 @@ TEST(ProgramDesc, copy_ctor) {
for (size_t i = 0; i < global_block->OpSize(); ++i) {
auto op_origin = global_block->Op(i);
auto op_copy = global_block->Op(i);
auto op_copy = global_block_copy->Op(i);
ASSERT_EQ(op_origin->Type(), op_copy->Type());
ASSERT_EQ(op_origin->Inputs(), op_copy->Inputs());
......@@ -131,7 +131,7 @@ TEST(ProgramDescBind, serialize_and_deserialize) {
for (size_t i = 0; i < global_block->OpSize(); ++i) {
auto op_origin = global_block->Op(i);
auto op_restored = global_block->Op(i);
auto op_restored = global_block_restored->Op(i);
ASSERT_EQ(op_origin->Type(), op_restored->Type());
ASSERT_EQ(op_origin->Inputs(), op_restored->Inputs());
......
......@@ -21,4 +21,7 @@ endif()
if(WITH_TESTING)
add_subdirectory(tests/book)
if (TENSORRT_FOUND)
add_subdirectory(tensorrt)
endif()
endif()
nv_test(test_tensorrt SRCS test_tensorrt.cc DEPS dynload_cuda device_context dynamic_loader)
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "NvInfer.h"
#include "cuda.h"
#include "cuda_runtime_api.h"
#include "paddle/fluid/platform/dynload/tensorrt.h"
namespace dy = paddle::platform::dynload;
class Logger : public nvinfer1::ILogger {
public:
void log(nvinfer1::ILogger::Severity severity, const char* msg) override {
switch (severity) {
case Severity::kINFO:
LOG(INFO) << msg;
break;
case Severity::kWARNING:
LOG(WARNING) << msg;
break;
case Severity::kINTERNAL_ERROR:
case Severity::kERROR:
LOG(ERROR) << msg;
break;
default:
break;
}
}
};
class ScopedWeights {
public:
ScopedWeights(float value) : value_(value) {
w.type = nvinfer1::DataType::kFLOAT;
w.values = &value_;
w.count = 1;
}
const nvinfer1::Weights& get() { return w; }
private:
float value_;
nvinfer1::Weights w;
};
// The following two API are implemented in TensorRT's header file, cannot load
// from the dynamic library. So create our own implementation and directly
// trigger the method from the dynamic library.
nvinfer1::IBuilder* createInferBuilder(nvinfer1::ILogger& logger) {
return static_cast<nvinfer1::IBuilder*>(
dy::createInferBuilder_INTERNAL(&logger, NV_TENSORRT_VERSION));
}
nvinfer1::IRuntime* createInferRuntime(nvinfer1::ILogger& logger) {
return static_cast<nvinfer1::IRuntime*>(
dy::createInferRuntime_INTERNAL(&logger, NV_TENSORRT_VERSION));
}
const char* kInputTensor = "input";
const char* kOutputTensor = "output";
// Creates a network to compute y = 2x + 3
nvinfer1::IHostMemory* CreateNetwork() {
Logger logger;
// Create the engine.
nvinfer1::IBuilder* builder = createInferBuilder(logger);
ScopedWeights weights(2.);
ScopedWeights bias(3.);
nvinfer1::INetworkDefinition* network = builder->createNetwork();
// Add the input
auto input = network->addInput(kInputTensor, nvinfer1::DataType::kFLOAT,
nvinfer1::DimsCHW{1, 1, 1});
EXPECT_NE(input, nullptr);
// Add the hidden layer.
auto layer = network->addFullyConnected(*input, 1, weights.get(), bias.get());
EXPECT_NE(layer, nullptr);
// Mark the output.
auto output = layer->getOutput(0);
output->setName(kOutputTensor);
network->markOutput(*output);
// Build the engine.
builder->setMaxBatchSize(1);
builder->setMaxWorkspaceSize(1 << 10);
auto engine = builder->buildCudaEngine(*network);
EXPECT_NE(engine, nullptr);
// Serialize the engine to create a model, then close.
nvinfer1::IHostMemory* model = engine->serialize();
network->destroy();
engine->destroy();
builder->destroy();
return model;
}
void Execute(nvinfer1::IExecutionContext& context, const float* input,
float* output) {
const nvinfer1::ICudaEngine& engine = context.getEngine();
// Two binds, input and output
ASSERT_EQ(engine.getNbBindings(), 2);
const int input_index = engine.getBindingIndex(kInputTensor);
const int output_index = engine.getBindingIndex(kOutputTensor);
// Create GPU buffers and a stream
void* buffers[2];
ASSERT_EQ(0, cudaMalloc(&buffers[input_index], sizeof(float)));
ASSERT_EQ(0, cudaMalloc(&buffers[output_index], sizeof(float)));
cudaStream_t stream;
ASSERT_EQ(0, cudaStreamCreate(&stream));
// Copy the input to the GPU, execute the network, and copy the output back.
ASSERT_EQ(0, cudaMemcpyAsync(buffers[input_index], input, sizeof(float),
cudaMemcpyHostToDevice, stream));
context.enqueue(1, buffers, stream, nullptr);
ASSERT_EQ(0, cudaMemcpyAsync(output, buffers[output_index], sizeof(float),
cudaMemcpyDeviceToHost, stream));
cudaStreamSynchronize(stream);
// Release the stream and the buffers
cudaStreamDestroy(stream);
ASSERT_EQ(0, cudaFree(buffers[input_index]));
ASSERT_EQ(0, cudaFree(buffers[output_index]));
}
TEST(TensorrtTest, BasicFunction) {
// Create the network serialized model.
nvinfer1::IHostMemory* model = CreateNetwork();
// Use the model to create an engine and an execution context.
Logger logger;
nvinfer1::IRuntime* runtime = createInferRuntime(logger);
nvinfer1::ICudaEngine* engine =
runtime->deserializeCudaEngine(model->data(), model->size(), nullptr);
model->destroy();
nvinfer1::IExecutionContext* context = engine->createExecutionContext();
// Execute the network.
float input = 1234;
float output;
Execute(*context, &input, &output);
EXPECT_EQ(output, input * 2 + 3);
// Destroy the engine.
context->destroy();
engine->destroy();
runtime->destroy();
}
cc_library(dynamic_loader SRCS dynamic_loader.cc DEPS glog gflags enforce)
list(APPEND CUDA_SRCS cublas.cc cudnn.cc curand.cc nccl.cc)
if (TENSORRT_FOUND)
list(APPEND CUDA_SRCS tensorrt.cc)
endif()
configure_file(cupti_lib_path.h.in ${CMAKE_CURRENT_BINARY_DIR}/cupti_lib_path.h)
if (CUPTI_FOUND)
list(APPEND CUDA_SRCS cupti.cc)
......
......@@ -45,6 +45,10 @@ DEFINE_string(nccl_dir, "",
DEFINE_string(cupti_dir, "", "Specify path for loading cupti.so.");
DEFINE_string(
tensorrt_dir, "",
"Specify path for loading tensorrt library, such as libnvinfer.so.");
namespace paddle {
namespace platform {
namespace dynload {
......@@ -194,6 +198,14 @@ void* GetNCCLDsoHandle() {
#endif
}
void* GetTensorRtDsoHandle() {
#if defined(__APPLE__) || defined(__OSX__)
return GetDsoHandleFromSearchPath(FLAGS_tensorrt_dir, "libnvinfer.dylib");
#else
return GetDsoHandleFromSearchPath(FLAGS_tensorrt_dir, "libnvinfer.so");
#endif
}
} // namespace dynload
} // namespace platform
} // namespace paddle
......@@ -25,6 +25,7 @@ void* GetCurandDsoHandle();
void* GetWarpCTCDsoHandle();
void* GetLapackDsoHandle();
void* GetNCCLDsoHandle();
void* GetTensorRtDsoHandle();
} // namespace dynload
} // namespace platform
......
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/platform/dynload/tensorrt.h"
namespace paddle {
namespace platform {
namespace dynload {
std::once_flag tensorrt_dso_flag;
void *tensorrt_dso_handle;
#define DEFINE_WRAP(__name) DynLoad__##__name __name
TENSORRT_RAND_ROUTINE_EACH(DEFINE_WRAP);
} // namespace dynload
} // namespace platform
} // namespace paddle
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <NvInfer.h>
#include <dlfcn.h>
#include <mutex> // NOLINT
#include "paddle/fluid/platform/dynload/dynamic_loader.h"
#include "paddle/fluid/platform/enforce.h"
namespace paddle {
namespace platform {
namespace dynload {
extern std::once_flag tensorrt_dso_flag;
extern void* tensorrt_dso_handle;
#ifdef PADDLE_USE_DSO
#define DECLARE_DYNAMIC_LOAD_TENSORRT_WRAP(__name) \
struct DynLoad__##__name { \
template <typename... Args> \
auto operator()(Args... args) -> decltype(__name(args...)) { \
using tensorrt_func = decltype(__name(args...)) (*)(Args...); \
std::call_once(tensorrt_dso_flag, []() { \
tensorrt_dso_handle = \
paddle::platform::dynload::GetTensorRtDsoHandle(); \
PADDLE_ENFORCE(tensorrt_dso_handle, "load tensorrt so failed"); \
}); \
void* p_##__name = dlsym(tensorrt_dso_handle, #__name); \
PADDLE_ENFORCE(p_##__name, "load %s failed", #__name); \
return reinterpret_cast<tensorrt_func>(p_##__name)(args...); \
} \
}; \
extern DynLoad__##__name __name
#else
#define DECLARE_DYNAMIC_LOAD_TENSORRT_WRAP(__name) \
struct DynLoad__##__name { \
template <typename... Args> \
tensorrtResult_t operator()(Args... args) { \
return __name(args...); \
} \
}; \
extern DynLoad__##__name __name
#endif
#define TENSORRT_RAND_ROUTINE_EACH(__macro) \
__macro(createInferBuilder_INTERNAL); \
__macro(createInferRuntime_INTERNAL);
TENSORRT_RAND_ROUTINE_EACH(DECLARE_DYNAMIC_LOAD_TENSORRT_WRAP)
} // namespace dynload
} // namespace platform
} // namespace paddle
......@@ -873,6 +873,11 @@ HOSTDEVICE inline bool(isfinite)(const float16& a) {
return !((isnan)(a)) && !((isinf)(a));
}
inline std::ostream& operator<<(std::ostream& os, const float16& a) {
os << static_cast<float>(a);
return os;
}
} // namespace platform
} // namespace paddle
......
......@@ -141,5 +141,10 @@ TEST(float16, lod_tensor_cpu) {
}
}
TEST(float16, print) {
float16 a = float16(1.0f);
std::cout << a << std::endl;
}
} // namespace platform
} // namespace paddle
......@@ -505,11 +505,19 @@ All parameter, weight, gradient are variables in Paddle.
scope, local_scopes, allow_op_delay);
})
.def("bcast_params", &ParallelExecutor::BCastParamsToGPUs)
// NOTE: even we return a vec<Scope*>* to Python use reference policy.
// We still cannot get local_scope from this vector, since the element
// of vec<Scope*> will be freed by Python GC. We can only return Scope*
// one by one and mark them as reference.
.def("local_scopes",
[](ParallelExecutor &self) -> std::vector<Scope *> * {
return &self.GetLocalScopes();
},
py::return_value_policy::reference)
.def("feed_tensors_into_local_scopes",
&ParallelExecutor::FeedTensorsIntoLocalScopes)
.def("feed_and_split_tensor_into_local_scopes",
&ParallelExecutor::FeedAndSplitTensorIntoLocalScopes)
.def("run", &ParallelExecutor::Run);
BindRecordIOWriter(&m);
......
......@@ -190,6 +190,11 @@ void PyCUDATensorSetFromArray(
static_cast<const platform::CUDADeviceContext *>(pool.Get(place));
paddle::platform::GpuMemcpyAsync(dst, array.data(), sizeof(T) * array.size(),
cudaMemcpyHostToDevice, dev_ctx->stream());
// NOTE: For safety, here wait the copy complete.
// It because the CPU array.data() could be destroyed after this method.
// If we make this method async, it could be copied data from a memory buffer
// that has been freed.
dev_ctx->Wait();
}
template <>
......@@ -216,6 +221,11 @@ void PyCUDATensorSetFromArray(
paddle::platform::GpuMemcpyAsync(dst, array.data(),
sizeof(uint16_t) * array.size(),
cudaMemcpyHostToDevice, dev_ctx->stream());
// NOTE: For safety, here wait the copy complete.
// It because the CPU array.data() could be destroyed after this method.
// If we make this method async, it could be copied data from a memory buffer
// that has been freed.
dev_ctx->Wait();
}
template <typename T>
......
......@@ -198,7 +198,7 @@ EOF
# run paddle version to install python packages first
RUN apt-get update &&\
${NCCL_DEPS}\
apt-get install -y wget python-pip dmidecode python-tk && pip install -U pip && \
apt-get install -y wget python-pip dmidecode python-tk && pip install -U pip==9.0.3 && \
pip install /*.whl; apt-get install -f -y && \
apt-get clean -y && \
rm -f /*.whl && \
......
......@@ -32,6 +32,8 @@ DEFINE_string(warpctc_dir, "", "Specify path for loading libwarpctc.so.");
DEFINE_string(lapack_dir, "", "Specify path for loading liblapack.so.");
DEFINE_string(tensorrt_dir, "", "Specify path for loading libnvinfer.so.");
static inline std::string join(const std::string& part1,
const std::string& part2) {
// directory separator
......@@ -157,3 +159,12 @@ void GetLapackDsoHandle(void** dso_handle) {
GetDsoHandleFromSearchPath(FLAGS_lapack_dir, "liblapacke.so", dso_handle);
#endif
}
void GetTensorRtDsoHandle(void** dso_handle) {
#if defined(__APPLE__) || defined(__OSX__)
GetDsoHandleFromSearchPath(
FLAGS_tensorrt_dir, "libnvinfer.dylib", dso_handle);
#else
GetDsoHandleFromSearchPath(FLAGS_tensorrt_dir, "libnvinfer.so", dso_handle);
#endif
}
......@@ -58,3 +58,11 @@ void GetWarpCTCDsoHandle(void** dso_handle);
*
*/
void GetLapackDsoHandle(void** dso_handle);
/**
* @brief load the DSO of tensorrt
*
* @param **dso_handle dso handler
*
*/
void GetTensorRtDsoHandle(void** dso_handle);
......@@ -77,6 +77,7 @@ __all__ = [
'lod_reset',
'lrn',
'pad',
'label_smooth',
]
......@@ -3678,3 +3679,68 @@ def pad(x, paddings, pad_value=0., name=None):
attrs={'paddings': paddings,
'pad_value': float(pad_value)})
return out
def label_smooth(label,
prior_dist=None,
epsilon=0.1,
dtype="float32",
name=None):
"""
Label smoothing is a mechanism to regularize the classifier layer and is
called label-smoothing regularization (LSR).
Label smoothing is proposed to encourage the model to be less confident,
since optimizing the log-likelihood of the correct label directly may
cause overfitting and reduce the ability of the model to adapt. Label
smoothing replaces the ground-truth label :math:`y` with the weighted sum
of itself and some fixed distribution :math:`\mu`. For class :math:`k`,
i.e.
.. math::
\\tilde{y_k} = (1 - \epsilon) * y_k + \epsilon * \mu_k,
where :math:`1 - \epsilon` and :math:`\epsilon` are the weights
respectively, and :math:`\\tilde{y}_k` is the smoothed label. Usually
uniform distribution is used for :math:`\mu`.
See more details about label smoothing in https://arxiv.org/abs/1512.00567.
Args:
label(Variable): The input variable containing the label data. The
label data should use one-hot representation.
prior_dist(Variable): The prior distribution to be used to smooth
labels. If not provided, an uniform distribution
is used. The shape of :attr:`prior_dist` should
be :math:`(1, class\_num)`.
epsilon(float): The weight used to mix up the original ground-truth
distribution and the fixed distribution.
dtype(np.dtype|core.VarDesc.VarType|str): The type of data : float32,
float_64, int etc.
name(str|None): A name for this layer(optional). If set None, the layer
will be named automatically.
Returns:
Variable: The tensor variable containing the smoothed labels.
Examples:
.. code-block:: python
label = layers.data(name="label", shape=[1], dtype="float32")
one_hot_label = layers.one_hot(input=label, depth=10)
smooth_label = layers.label_smooth(
label=one_hot_label, epsilon=0.1, dtype="float32")
"""
if epsilon > 1. or epsilon < 0.:
raise ValueError("The value of epsilon must be between 0 and 1.")
helper = LayerHelper("label_smooth", **locals())
label.stop_gradient = True
smooth_label = helper.create_tmp_variable(dtype)
helper.append_op(
type="label_smooth",
inputs={"X": label,
"PriorDist": prior_dist} if prior_dist else {"X": label},
outputs={"Out": smooth_label},
attrs={"epsilon": float(epsilon)})
return smooth_label
......@@ -169,7 +169,7 @@ class Accuracy(MetricBase):
return self.value / self.weight
class ChunkEvalutor(MetricBase):
class ChunkEvaluator(MetricBase):
"""
Accumulate counter numbers output by chunk_eval from mini-batches and
compute the precision recall and F1-score using the accumulated counter
......@@ -177,7 +177,7 @@ class ChunkEvalutor(MetricBase):
"""
def __init__(self, name=None):
super(ChunkEvalutor, self).__init__(name)
super(ChunkEvaluator, self).__init__(name)
self.num_infer_chunks = 0
self.num_label_chunks = 0
self.num_correct_chunks = 0
......
......@@ -16,6 +16,7 @@ import core
import multiprocessing
import framework
import executor
import sys
__all__ = ['ParallelExecutor']
......@@ -123,28 +124,93 @@ class ParallelExecutor(object):
allow_op_delay)
self.scope = scope
def run(self, fetch_list, feed_dict={}):
def run(self, fetch_list, feed=None, feed_dict=None):
"""
:param fetch_list: A list of variable names that will be fetched.
:param feed_dict: A dict mapping for feed variable name to LoDTensor
or numpy array.
:return: fetched value list.
"""
if not isinstance(feed_dict, dict):
raise TypeError("feed_dict should be a dict")
Run a parallel executor with fetch_list.
The feed parameter can be a dict or a list. If feed is a dict, the
feed data will be split into multiple devices. If feed is a list, we
assume the data has been splitted into multiple devices, the each
element in the list will be copied to each device directly.
For example, if the feed is a dict:
>>> exe = ParallelExecutor()
>>> # the image will be splitted into devices. If there is two devices
>>> # each device will process an image with shape (24, 1, 28, 28)
>>> exe.run(feed={'image': numpy.random.random(size=(48, 1, 28, 28))})
For example, if the feed is a list:
>>> exe = ParallelExecutor()
>>> # each device will process each element in the list.
>>> # the 1st device will process an image with shape (48, 1, 28, 28)
>>> # the 2nd device will process an image with shape (32, 1, 28, 28)
>>> #
>>> # you can use exe.device_count to get the device number.
>>> exe.run(feed=[{"image": numpy.random.random(size=(48, 1, 28, 28))},
>>> {"image": numpy.random.random(size=(32, 1, 28, 28))},
>>> ])
Args:
fetch_list(list): The fetched variable names
feed(list|dict|None): The feed variables. If the feed is a dict,
tensors in that dict will be splitted into each devices. If
the feed is a list, each element of the list will be copied
to each device.
feed_dict: Alias for feed parameter, for backward compatibility.
This parameter is deprecated.
feed_tensor_dict = {}
for i, feed_name in enumerate(feed_dict):
feed_tensor = feed_dict[feed_name]
if not isinstance(feed_tensor, core.LoDTensor):
feed_tensor = core.LoDTensor()
feed_tensor.set(feed_dict[feed_name], self._act_places[0])
feed_tensor_dict[feed_name] = feed_tensor
Returns: fetched result list.
"""
if feed is None:
feed = feed_dict
print >> sys.stderr, "`feed_dict` is deprecated. Please use `feed=`"
if isinstance(feed, dict):
feed_tensor_dict = dict()
for feed_name in feed:
feed_tensor = feed[feed_name]
if not isinstance(feed_tensor, core.LoDTensor):
feed_tensor = core.LoDTensor()
# always set to CPU place, since the tensor need to be splitted
# it is fast in CPU
feed_tensor.set(feed[feed_name], core.CPUPlace())
feed_tensor_dict[feed_name] = feed_tensor
self.executor.feed_and_split_tensor_into_local_scopes(
feed_tensor_dict)
elif isinstance(feed, list) or isinstance(feed, tuple):
if len(feed) != len(self._act_places):
raise ValueError(
"Feed a list of tensor, the list should be the same size as places"
)
res = list()
for i, each in enumerate(feed):
if not isinstance(each, dict):
raise TypeError(
"Each element of feed list should be a dict")
res_dict = dict()
for feed_name in each:
tensor = each[feed_name]
if not isinstance(tensor, core.LoDTensor):
tmp = core.LoDTensor()
tmp.set(tensor, self._act_places[i])
tensor = tmp
res_dict[feed_name] = tensor
res.append(res_dict)
self.executor.feed_tensors_into_local_scopes(res)
fetch_var_name = '@FETCHED_VAR_NAME@'
self.executor.run(fetch_list, fetch_var_name, feed_tensor_dict)
self.executor.run(fetch_list, fetch_var_name)
arr = self.scope.find_var(fetch_var_name).get_lod_tensor_array()
return [arr[i] for i in range(len(arr))]
def bcast_params(self):
self.executor.bcast_params(set(self.persistable_vars))
@property
def device_count(self):
return len(self._act_places)
......@@ -340,6 +340,16 @@ class TestBook(unittest.TestCase):
print(layers.lod_reset(x=x, y=y))
print(str(program))
def test_label_smooth(self):
program = Program()
with program_guard(program):
label = layers.data(name="label", shape=[1], dtype="float32")
one_hot_label = layers.one_hot(input=label, depth=10)
smooth_label = layers.label_smooth(
label=one_hot_label, epsilon=0.1, dtype="float32")
self.assertIsNotNone(smooth_label)
print(str(program))
if __name__ == '__main__':
unittest.main()
......@@ -203,31 +203,32 @@ class TestParallelExecutorBase(unittest.TestCase):
iter=10,
batch_size=None,
allow_op_delay=False,
feed_dict={}):
feed_dict=None):
main = fluid.Program()
startup = fluid.Program()
startup.random_seed = 1 # Fix random seed
with fluid.program_guard(main, startup):
loss = method(use_feed=len(feed_dict) > 0)
loss = method(use_feed=feed_dict is not None)
adam = fluid.optimizer.Adam()
adam.minimize(loss)
if memory_opt:
fluid.memory_optimize(main)
place = fluid.CUDAPlace(0)
startup_exe = fluid.Executor(place)
startup_exe.run(startup)
exe = fluid.ParallelExecutor(True, loss_name=loss.name)
exe = fluid.ParallelExecutor(
True, loss_name=loss.name, allow_op_delay=allow_op_delay)
if batch_size is not None:
batch_size *= fluid.core.get_cuda_device_count()
begin = time.time()
first_loss, = exe.run([loss.name], feed_dict=feed_dict)
first_loss, = exe.run([loss.name], feed=feed_dict)
first_loss = numpy.array(first_loss)
for i in xrange(iter):
exe.run([], feed_dict=feed_dict)
exe.run([], feed=feed_dict)
last_loss, = exe.run([loss.name], feed_dict=feed_dict)
last_loss, = exe.run([loss.name], feed=feed_dict)
end = time.time()
if batch_size is not None:
......@@ -648,5 +649,5 @@ class TestCRFModel(unittest.TestCase):
for i in xrange(10):
cur_batch = next(data)
print map(numpy.array,
pe.run(feed_dict=feeder.feed(cur_batch),
pe.run(feed=feeder.feed(cur_batch),
fetch_list=[avg_cost.name]))[0]
......@@ -50,7 +50,7 @@ An example implementation for single item data reader creator:
def reader():
while True:
yield numpy.random.uniform(-1, 1, size=width*height)
return reader
return reader
An example implementation for multiple item data reader creator:
......@@ -60,7 +60,7 @@ An example implementation for multiple item data reader creator:
def reader():
while True:
yield numpy.random.uniform(-1, 1, size=width*height), label
return reader
return reader
TODO(yuyang18): Should we add whole design doc here?
......
# AWS benchmark testing tool
This is an automation tool for deploying paddlepaddle benchmark testing to AWS.
## Features
- subnet creation to fit just the amount of ec2 instances required.
- pserver and trainer ec2 instances allocation, and instance state verification
- nvidia-docker ready for GPU training
- Instances and network element garbage collection when a task is accomplished or an error occurred
- Test log is collected in realtime
- Web service for checking log or tearing down the testing setup
- No testing code change needed
- Lots of optional configuration options
## Usages
### Prerequisites
- You have a working AWS account
- You have [AWS Command Line Interface](https://aws.amazon.com/cli/) installed
- Your AWS cli is bind with a account which has `AmazonEC2FullAccess` permission, and it's set as default credential.
- You have key pair created and pem file downloaded.
- You have a default VPC in the region you want to run the test.
- You have a Security Group created for the VPC mentioned above, which allows port 22 and the port you want to expose your control web service (5436 by default)
- If your test is supposed to run in a GPU machine, especially a multi card GPU machine (p2, p3 series), you might need to contact amazon to raise the limit which allows no more than 1 GPU instance at a time.
### Start a benchmark test
#### Create training image
*What to expect in this step:*
*You will have your training logic packed with paddle runtime in a docker image, and be able to be picked up by AWS instance for training.*
Training python script and PaddlePaddle runtime are supposed to be packed into one docker image. Use PaddlePaddle production images as base image and create the training images with the docker file as follows:
```Dockerfile
FROM paddlepaddle/paddle:latest-gpu
ENV HOME /root
COPY ./ /root/
WORKDIR /root
RUN pip install -r /root/requirements.txt
ENTRYPOINT ["python", "my_training.py"]
```
***Please Note***
Training nodes will run your `ENTRYPOINT` script with the following environment variables:
- `TASK_NAME`: unique name to identify this training process.
- `TRAINING_ROLE`: current node's role in this training process, either "PSERVER" or "TRAINER"
- `PSERVER_HOSTS`: comma separated value of pserver end points, I.E. "192.168.1.2:5436,192.168.1.3:5436"
- `PSERVERS`: same as above
- `TRAINERS`: trainer count
- `SERVER_ENDPOINT`: current server end point if the node role is a pserver
- `TRAINER_INDEX`: an integer to identify the index of current trainer if the node role is a trainer.
- `PADDLE_INIT_TRAINER_ID`: same as above
Now we have a working distributed training script which takes advantage of node environment variables and docker file to generate the training image. Run the following command:
```bash
docker build -t myreponname/paddle_benchmark .
```
Now you have the image built and tagged with `myreponame/paddle_benchmark`, let's push it to dockerhub so that it can be picked up by out AWS instance.
```bash
docker push myreponame/paddle_benchmark
```
#### Create instances and start training
*What to expect in this step*
*you will be asked to provide some basic settings to config your training, and this tool will have your training started and monitored*
Now let's start the training process:
```bash
docker run -i -v $HOME/.aws:/root/.aws -v <full path to your pem file>:/root/<key pare name>.pem \
putcn/paddle_aws_client \
--action create \
--key_name <your key pare name> \
--security_group_id <your security group id> \
--docker_image myreponame/paddle_benchmark \
--pserver_count 2 \
--trainer_count 2
```
Now just wait until you see this:
```
master server finished init process, visit http://XXX:XXX/status to check master log
```
That means you can turn off your laptop and your cluster is creating instances, starting training process, collecting logs and eventually shut all pservers and trainers down when training is finished.
#### Post creation operations
To access the master log:
```bash
docker run -i -v $HOME/.aws:/root/.aws \
putcn/paddle_aws_client \
--action status \
--master_server_public_ip <master ip> \
--master_server_port <master port>
```
To tear down the training setup:
```bash
docker run -i -v $HOME/.aws:/root/.aws \
putcn/paddle_aws_client \
--action cleanup \
--master_server_public_ip <master ip> \
--master_server_port <master port>
```
To retrieve training logs
TBD
### Tech details
*What to expect in this step*
*You will understand what is happening behind the scene, and how to check the training log, how to tear down the training on the fly, etc.*
Let's understand what is happening under the hood when you run above command in your laptop
![alt](diagram.png)
There are 4 roles in the figure above:
- client: your laptop
- master: who tasks to aws api server to create/tear down instances, and monitor training process
- AWS api server: the one who actually creates and manages instances
- pservers and trainers: training instances
When you run the `docker run` command above, what it actually does is to ask aws api service to create a subnet (step 1) and a master instance (step 2), and pass all the parameters the client collected or generated (step 3). The master is kept as minimum hardware config to keep the running cost low.
Then when the master is up and running, it will ask the aws api server to create the heavy lifting training instances who are expensive to run (step 4). And the master will start training process as soon as they are done initializing (step 5).
Meanwhile, the master will expose a web service for client to check training log or even tear the training setup down by a web service call.
if you are creating the training with client docker container, and also monitoring your aws dashboard, you will initially see a instance tagged with `ROLE=MASTER` and `TASK_NAME=<yourtask name>_master` starts, then you will see several instances tagged with `ROLE=PSERVER` and `ROLE=TRAINER` starts.
When the training is finished, pservers and trainers will be terminated. All their logs are kept in master node's docker env.
Master exposes 4 major services:
- GET `/status`: return master log
- GET `/logs`: return list of log file names
- GET `/log/<logfile name>`: return a particular log by log file name
- POST `/cleanup`: teardown the whole setup
### Parameters
TBD, please refer to client/cluster_launcher.py for now
### Trouble shooting
TBD
FROM python:2.7.14-stretch
ENV HOME /root
COPY ./ /root/
WORKDIR /root
RUN pip install -r /root/requirements.txt
ENTRYPOINT ["python", "cluster_launcher.py"]
\ No newline at end of file
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import os
import time
import math
import logging
import copy
import netaddr
import boto3
import namesgenerator
import paramiko
from scp import SCPClient
import requests
def str2bool(v):
if v.lower() in ('yes', 'true', 't', 'y', '1'):
return True
elif v.lower() in ('no', 'false', 'f', 'n', '0'):
return False
else:
raise argparse.ArgumentTypeError('Boolean value expected.')
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
'--key_name', type=str, default="", help="required, key pair name")
parser.add_argument(
'--security_group_id',
type=str,
default="",
help="required, the security group id associated with your VPC")
parser.add_argument(
'--vpc_id',
type=str,
default="",
help="The VPC in which you wish to run test")
parser.add_argument(
'--subnet_id',
type=str,
default="",
help="The Subnet_id in which you wish to run test")
parser.add_argument(
'--pserver_instance_type',
type=str,
default="c5.2xlarge",
help="your pserver instance type, c5.2xlarge by default")
parser.add_argument(
'--trainer_instance_type',
type=str,
default="p2.8xlarge",
help="your trainer instance type, p2.8xlarge by default")
parser.add_argument(
'--task_name',
type=str,
default="",
help="the name you want to identify your job")
parser.add_argument(
'--pserver_image_id',
type=str,
default="ami-da2c1cbf",
help="ami id for system image, default one has nvidia-docker ready, \
use ami-1ae93962 for us-east-2")
parser.add_argument(
'--pserver_command', type=str, default="", help="pserver start command")
parser.add_argument(
'--trainer_image_id',
type=str,
default="ami-da2c1cbf",
help="ami id for system image, default one has nvidia-docker ready, \
use ami-1ae93962 for us-west-2")
parser.add_argument(
'--trainer_command', type=str, default="", help="trainer start command")
parser.add_argument(
'--availability_zone',
type=str,
default="us-east-2a",
help="aws zone id to place ec2 instances")
parser.add_argument(
'--trainer_count', type=int, default=1, help="Trainer count")
parser.add_argument(
'--pserver_count', type=int, default=1, help="Pserver count")
parser.add_argument(
'--action', type=str, default="create", help="create|cleanup|status")
parser.add_argument('--pem_path', type=str, help="private key file")
parser.add_argument(
'--pserver_port', type=str, default="5436", help="pserver port")
parser.add_argument(
'--docker_image', type=str, default="busybox", help="training docker image")
parser.add_argument(
'--master_server_port', type=int, default=5436, help="master server port")
parser.add_argument(
'--master_server_public_ip', type=str, help="master server public ip")
parser.add_argument(
'--master_docker_image',
type=str,
default="putcn/paddle_aws_master:latest",
help="master docker image id")
parser.add_argument(
'--no_clean_up',
type=str2bool,
default=False,
help="whether to clean up after training")
args = parser.parse_args()
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
ec2client = boto3.client('ec2')
def print_arguments():
print('----------- Configuration Arguments -----------')
for arg, value in sorted(vars(args).iteritems()):
print('%s: %s' % (arg, value))
print('------------------------------------------------')
def create_subnet():
# if no vpc id provided, list vpcs
logging.info("start creating subnet")
if not args.vpc_id:
logging.info("no vpc provided, trying to find the default one")
vpcs_desc = ec2client.describe_vpcs(
Filters=[{
"Name": "isDefault",
"Values": ["true", ]
}], )
if len(vpcs_desc["Vpcs"]) == 0:
raise ValueError('No default VPC')
args.vpc_id = vpcs_desc["Vpcs"][0]["VpcId"]
vpc_cidrBlock = vpcs_desc["Vpcs"][0]["CidrBlock"]
logging.info("default vpc fount with id %s and CidrBlock %s" %
(args.vpc_id, vpc_cidrBlock))
if not vpc_cidrBlock:
logging.info("trying to find cidrblock for vpc")
vpcs_desc = ec2client.describe_vpcs(
Filters=[{
"Name": "vpc-id",
"Values": [args.vpc_id, ],
}], )
if len(vpcs_desc["Vpcs"]) == 0:
raise ValueError('No VPC found')
vpc_cidrBlock = vpcs_desc["Vpcs"][0]["CidrBlock"]
logging.info("cidrblock for vpc is %s" % vpc_cidrBlock)
# list subnets in vpc in order to create a new one
logging.info("trying to find ip blocks for new subnet")
subnets_desc = ec2client.describe_subnets(
Filters=[{
"Name": "vpc-id",
"Values": [args.vpc_id, ],
}], )
ips_taken = []
for subnet_dec in subnets_desc["Subnets"]:
ips_taken.append(subnet_dec["CidrBlock"])
ip_blocks_avaliable = netaddr.IPSet(
[vpc_cidrBlock]) ^ netaddr.IPSet(ips_taken)
# adding 10 addresses as buffer
cidr_prefix = 32 - math.ceil(
math.log(args.pserver_count + args.trainer_count + 10, 2))
if cidr_prefix <= 16:
raise ValueError('Too many nodes to fit in current VPC')
for ipnetwork in ip_blocks_avaliable.iter_cidrs():
try:
subnet_cidr = ipnetwork.subnet(int(cidr_prefix)).next()
logging.info("subnet ip block found %s" % (subnet_cidr))
break
except Exception:
pass
if not subnet_cidr:
raise ValueError(
'No avaliable subnet to fit required nodes in current VPC')
logging.info("trying to create subnet")
subnet_desc = ec2client.create_subnet(
CidrBlock=str(subnet_cidr),
VpcId=args.vpc_id,
AvailabilityZone=args.availability_zone)
subnet_id = subnet_desc["Subnet"]["SubnetId"]
subnet_waiter = ec2client.get_waiter('subnet_available')
# sleep for 1s before checking its state
time.sleep(1)
subnet_waiter.wait(SubnetIds=[subnet_id, ])
logging.info("subnet created")
logging.info("adding tags to newly created subnet")
ec2client.create_tags(
Resources=[subnet_id, ],
Tags=[{
"Key": "Task_name",
'Value': args.task_name
}])
return subnet_id
def run_instances(image_id, instance_type, count=1, role="MASTER", cmd=""):
response = ec2client.run_instances(
ImageId=image_id,
InstanceType=instance_type,
MaxCount=count,
MinCount=count,
UserData=cmd,
DryRun=False,
InstanceInitiatedShutdownBehavior="stop",
KeyName=args.key_name,
Placement={'AvailabilityZone': args.availability_zone},
NetworkInterfaces=[{
'DeviceIndex': 0,
'SubnetId': args.subnet_id,
"AssociatePublicIpAddress": True,
'Groups': args.security_group_ids
}],
TagSpecifications=[{
'ResourceType': "instance",
'Tags': [{
"Key": 'Task_name',
"Value": args.task_name + "_master"
}, {
"Key": 'Role',
"Value": role
}]
}])
instance_ids = []
for instance in response["Instances"]:
instance_ids.append(instance["InstanceId"])
if len(instance_ids) > 0:
logging.info(str(len(instance_ids)) + " instance(s) created")
else:
logging.info("no instance created")
#create waiter to make sure it's running
logging.info("waiting for instance to become accessible")
waiter = ec2client.get_waiter('instance_status_ok')
waiter.wait(
Filters=[{
"Name": "instance-status.status",
"Values": ["ok"]
}, {
"Name": "instance-status.reachability",
"Values": ["passed"]
}, {
"Name": "instance-state-name",
"Values": ["running"]
}],
InstanceIds=instance_ids)
instances_response = ec2client.describe_instances(InstanceIds=instance_ids)
return instances_response["Reservations"][0]["Instances"]
def generate_task_name():
return namesgenerator.get_random_name()
def init_args():
if not args.task_name:
args.task_name = generate_task_name()
logging.info("task name generated %s" % (args.task_name))
if not args.pem_path:
args.pem_path = os.path.expanduser("~") + "/" + args.key_name + ".pem"
if args.security_group_id:
args.security_group_ids = (args.security_group_id, )
def create():
init_args()
# create subnet
if not args.subnet_id:
args.subnet_id = create_subnet()
# create master node
master_instance_response = run_instances(
image_id="ami-7a05351f", instance_type="t2.nano")
logging.info("master server started")
args.master_server_public_ip = master_instance_response[0][
"PublicIpAddress"]
args.master_server_ip = master_instance_response[0]["PrivateIpAddress"]
logging.info("master server started, master_ip=%s, task_name=%s" %
(args.master_server_public_ip, args.task_name))
# cp config file and pems to master node
ssh_key = paramiko.RSAKey.from_private_key_file(args.pem_path)
ssh_client = paramiko.SSHClient()
ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh_client.connect(
hostname=args.master_server_public_ip, username="ubuntu", pkey=ssh_key)
with SCPClient(ssh_client.get_transport()) as scp:
scp.put(os.path.expanduser("~") + "/" + ".aws",
recursive=True,
remote_path='/home/ubuntu/')
scp.put(args.pem_path,
remote_path='/home/ubuntu/' + args.key_name + ".pem")
logging.info("credentials and pem copied to master")
# set arguments and start docker
kick_off_cmd = "docker run -d -v /home/ubuntu/.aws:/root/.aws/"
kick_off_cmd += " -v /home/ubuntu/" + args.key_name + ".pem:/root/" + args.key_name + ".pem"
kick_off_cmd += " -v /home/ubuntu/logs/:/root/logs/"
kick_off_cmd += " -p " + str(args.master_server_port) + ":" + str(
args.master_server_port)
kick_off_cmd += " " + args.master_docker_image
args_to_pass = copy.copy(args)
args_to_pass.action = "serve"
del args_to_pass.pem_path
del args_to_pass.security_group_ids
del args_to_pass.master_docker_image
del args_to_pass.master_server_public_ip
for arg, value in sorted(vars(args_to_pass).iteritems()):
if value:
kick_off_cmd += ' --%s %s' % (arg, value)
logging.info(kick_off_cmd)
stdin, stdout, stderr = ssh_client.exec_command(command=kick_off_cmd)
return_code = stdout.channel.recv_exit_status()
logging.info(return_code)
if return_code != 0:
raise Exception("Error while kicking off master")
logging.info(
"master server finished init process, visit %s to check master log" %
(get_master_web_url("/status")))
def cleanup():
print requests.post(get_master_web_url("/cleanup")).text
def status():
print requests.post(get_master_web_url("/status")).text
def get_master_web_url(path):
return "http://" + args.master_server_public_ip + ":" + str(
args.master_server_port) + path
if __name__ == "__main__":
print_arguments()
if args.action == "create":
if not args.key_name or not args.security_group_id:
raise ValueError("key_name and security_group_id are required")
create()
elif args.action == "cleanup":
if not args.master_server_public_ip:
raise ValueError("master_server_public_ip is required")
cleanup()
elif args.action == "status":
if not args.master_server_public_ip:
raise ValueError("master_server_public_ip is required")
status()
netaddr==0.7.19
boto3==1.6.21
namesgenerator==0.3
paramiko==2.4.1
scp
requests
FROM python:2.7.14-stretch
ENV HOME /root
COPY ./ /root/
WORKDIR /root
RUN pip install -r /root/requirements.txt
ENTRYPOINT ["python", "cluster_master.py"]
\ No newline at end of file
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import os
import json
import math
import time
import threading
import logging
import netaddr
import boto3
import namesgenerator
import paramiko
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
# You must have aws_access_key_id, aws_secret_access_key, region set in
# ~/.aws/credentials and ~/.aws/config
def str2bool(v):
if v.lower() in ('yes', 'true', 't', 'y', '1'):
return True
elif v.lower() in ('no', 'false', 'f', 'n', '0'):
return False
else:
raise argparse.ArgumentTypeError('Boolean value expected.')
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
'--key_name', type=str, default="", help="required, key pair name")
parser.add_argument(
'--security_group_id',
type=str,
default="",
help="required, the security group id associated with your VPC")
parser.add_argument(
'--vpc_id',
type=str,
default="",
help="The VPC in which you wish to run test")
parser.add_argument(
'--subnet_id',
type=str,
default="",
help="The Subnet_id in which you wish to run test")
parser.add_argument(
'--pserver_instance_type',
type=str,
default="c5.2xlarge",
help="your pserver instance type, c5.2xlarge by default")
parser.add_argument(
'--trainer_instance_type',
type=str,
default="p2.8xlarge",
help="your trainer instance type, p2.8xlarge by default")
parser.add_argument(
'--task_name',
type=str,
default="",
help="the name you want to identify your job")
parser.add_argument(
'--pserver_image_id',
type=str,
default="ami-da2c1cbf",
help="ami id for system image, default one has nvidia-docker ready, use ami-1ae93962 for us-east-2"
)
parser.add_argument(
'--trainer_image_id',
type=str,
default="ami-da2c1cbf",
help="ami id for system image, default one has nvidia-docker ready, use ami-1ae93962 for us-west-2"
)
parser.add_argument(
'--availability_zone',
type=str,
default="us-east-2a",
help="aws zone id to place ec2 instances")
parser.add_argument(
'--trainer_count', type=int, default=1, help="Trainer count")
parser.add_argument(
'--pserver_count', type=int, default=1, help="Pserver count")
parser.add_argument(
'--pserver_bash_file',
type=str,
default=os.path.join(os.path.dirname(__file__), "pserver.sh.template"),
help="pserver bash file path")
parser.add_argument(
'--pserver_command', type=str, default="", help="pserver start command")
parser.add_argument(
'--trainer_bash_file',
type=str,
default=os.path.join(os.path.dirname(__file__), "trainer.sh.template"),
help="trainer bash file path")
parser.add_argument(
'--trainer_command', type=str, default="", help="trainer start command")
parser.add_argument(
'--action', type=str, default="serve", help="create|cleanup|serve")
parser.add_argument('--pem_path', type=str, help="private key file")
parser.add_argument(
'--pserver_port', type=str, default="5436", help="pserver port")
parser.add_argument(
'--docker_image', type=str, default="busybox", help="training docker image")
parser.add_argument(
'--master_server_port', type=int, default=5436, help="master server port")
parser.add_argument(
'--master_server_ip', type=str, default="", help="master server private ip")
parser.add_argument(
'--no_clean_up',
type=str2bool,
default=False,
help="whether to clean up after training")
args = parser.parse_args()
ec2client = boto3.client('ec2')
args.log_path = os.path.join(os.path.dirname(__file__), "logs/")
logging.basicConfig(
filename=args.log_path + 'master.log',
level=logging.INFO,
format='%(asctime)s %(message)s')
log_files = ["master.log"]
def create_subnet():
# if no vpc id provided, list vpcs
logging.info("start creating subnet")
if not args.vpc_id:
logging.info("no vpc provided, trying to find the default one")
vpcs_desc = ec2client.describe_vpcs(
Filters=[{
"Name": "isDefault",
"Values": ["true", ]
}], )
if len(vpcs_desc["Vpcs"]) == 0:
raise ValueError('No default VPC')
args.vpc_id = vpcs_desc["Vpcs"][0]["VpcId"]
vpc_cidrBlock = vpcs_desc["Vpcs"][0]["CidrBlock"]
logging.info("default vpc fount with id %s and CidrBlock %s" %
(args.vpc_id, vpc_cidrBlock))
if not vpc_cidrBlock:
logging.info("trying to find cidrblock for vpc")
vpcs_desc = ec2client.describe_vpcs(
Filters=[{
"Name": "vpc-id",
"Values": [args.vpc_id, ],
}], )
if len(vpcs_desc["Vpcs"]) == 0:
raise ValueError('No VPC found')
vpc_cidrBlock = vpcs_desc["Vpcs"][0]["CidrBlock"]
logging.info("cidrblock for vpc is %s" % vpc_cidrBlock)
# list subnets in vpc in order to create a new one
logging.info("trying to find ip blocks for new subnet")
subnets_desc = ec2client.describe_subnets(
Filters=[{
"Name": "vpc-id",
"Values": [args.vpc_id, ],
}], )
ips_taken = []
for subnet_dec in subnets_desc["Subnets"]:
ips_taken.append(subnet_dec["CidrBlock"])
ip_blocks_avaliable = netaddr.IPSet(
[vpc_cidrBlock]) ^ netaddr.IPSet(ips_taken)
# adding 10 addresses as buffer
cidr_prefix = 32 - math.ceil(
math.log(args.pserver_count + args.trainer_count + 10, 2))
if cidr_prefix <= 16:
raise ValueError('Too many nodes to fit in current VPC')
for ipnetwork in ip_blocks_avaliable.iter_cidrs():
try:
subnet_cidr = ipnetwork.subnet(int(cidr_prefix)).next()
logging.info("subnet ip block found %s" % (subnet_cidr))
break
except Exception:
pass
if not subnet_cidr:
raise ValueError(
'No avaliable subnet to fit required nodes in current VPC')
logging.info("trying to create subnet")
subnet_desc = ec2client.create_subnet(
CidrBlock=str(subnet_cidr),
VpcId=args.vpc_id,
AvailabilityZone=args.availability_zone)
subnet_id = subnet_desc["Subnet"]["SubnetId"]
subnet_waiter = ec2client.get_waiter('subnet_available')
# sleep for 1s before checking its state
time.sleep(1)
subnet_waiter.wait(SubnetIds=[subnet_id, ])
logging.info("subnet created")
logging.info("adding tags to newly created subnet")
ec2client.create_tags(
Resources=[subnet_id, ],
Tags=[{
"Key": "Task_name",
'Value': args.task_name
}])
return subnet_id
def generate_task_name():
return namesgenerator.get_random_name()
def script_to_str(file_path):
if not file_path:
return "echo $PSERVER_HOSTS"
file = open(file_path, 'r')
text = file.read().strip()
file.close()
return text
def run_instances(image_id, instance_type, count, role, cmd=""):
response = ec2client.run_instances(
ImageId=image_id,
InstanceType=instance_type,
MaxCount=count,
MinCount=count,
UserData=cmd,
DryRun=False,
InstanceInitiatedShutdownBehavior="stop",
KeyName=args.key_name,
Placement={'AvailabilityZone': args.availability_zone},
NetworkInterfaces=[{
'DeviceIndex': 0,
'SubnetId': args.subnet_id,
"AssociatePublicIpAddress": True,
'Groups': args.security_group_ids
}],
TagSpecifications=[{
'ResourceType': "instance",
'Tags': [{
"Key": 'Task_name',
"Value": args.task_name
}, {
"Key": 'Role',
"Value": role
}]
}])
instance_ids = []
for instance in response["Instances"]:
instance_ids.append(instance["InstanceId"])
if len(instance_ids) > 0:
logging.info(str(len(instance_ids)) + " instance(s) created")
else:
logging.info("no instance created")
#create waiter to make sure it's running
logging.info("waiting for instance to become accessible")
waiter = ec2client.get_waiter('instance_status_ok')
waiter.wait(
Filters=[{
"Name": "instance-status.status",
"Values": ["ok"]
}, {
"Name": "instance-status.reachability",
"Values": ["passed"]
}, {
"Name": "instance-state-name",
"Values": ["running"]
}],
InstanceIds=instance_ids)
instances_response = ec2client.describe_instances(InstanceIds=instance_ids)
return instances_response["Reservations"][0]["Instances"]
def create_pservers():
try:
return run_instances(
image_id=args.pserver_image_id,
instance_type=args.pserver_instance_type,
count=args.pserver_count,
role="PSERVER", )
except Exception:
logging.exception("error while trying to create pservers")
cleanup(args.task_name)
def log_to_file(source, filename):
if not filename in log_files:
log_files.append(filename)
with open(args.log_path + filename, "a") as log_file:
for line in iter(source.readline, ""):
log_file.write(line)
def create_trainers(kickoff_cmd, pserver_endpoints_str):
def create_and_start_trainer(trainer_index):
logging.info("trainer " + str(trainer_index) + " is starting")
instance_response = run_instances(
image_id=args.trainer_image_id,
instance_type=args.trainer_instance_type,
count=1,
role="TRAINER", )[0]
trainer_ip = instance_response["PrivateIpAddress"]
logging.info("trainer " + str(trainer_index) + " started")
ssh_key = paramiko.RSAKey.from_private_key_file(args.pem_path)
ssh_client = paramiko.SSHClient()
ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh_client.connect(hostname=trainer_ip, username="ubuntu", pkey=ssh_key)
logging.info("trainer " + str(trainer_index) +
" terminal connected via ssh")
cmd = kickoff_cmd.format(
PSERVER_HOSTS=pserver_endpoints_str,
DOCKER_IMAGE=args.docker_image,
TRAINER_INDEX=str(trainer_index),
TASK_NAME=args.task_name,
TRAINER_COUNT=args.trainer_count,
COMMAND=args.trainer_command,
MASTER_ENDPOINT=args.master_server_ip + ":" +
str(args.master_server_port))
logging.info(cmd)
stdin, stdout, stderr = ssh_client.exec_command(command=cmd)
# read and save output log
logging.info("trainer " + str(trainer_index) +
" command executed, keep fetching log")
stdout_thread = threading.Thread(
target=log_to_file,
args=(
stdout,
"trainer_" + str(trainer_index) + ".log", ))
stderr_thread = threading.Thread(
target=log_to_file,
args=(
stderr,
"trainer_" + str(trainer_index) + "_err.log", ))
stdout_thread.start()
stderr_thread.start()
stdout_thread.join()
stderr_thread.join()
return_code = stdout.channel.recv_exit_status()
if return_code != 0:
trainer_create_results[trainer_index] = {'has_error': True}
raise ValueError("trainer didn't finish with exit code 0")
ssh_client.close()
# multi thread starting trainer instance and run kickoff command
trainer_threads = []
trainer_create_results = {}
try:
for i in xrange(args.trainer_count):
logging.info("starting tread for trainer " + str(i))
trainer_thread = threading.Thread(
target=create_and_start_trainer, args=(i, ))
trainer_thread.start()
trainer_threads.append(trainer_thread)
for trainer_thread in trainer_threads:
trainer_thread.join()
for result in trainer_create_results:
if result["has_error"]:
logging.error(
"error during trainer starting or training, destorying the while cluster "
)
cleanup(args.task_name)
break
logging.info("all trainers stopped")
except Exception, e:
logging.info(
"Training exception, clean up resources, please check log for more info"
)
finally:
cleanup(args.task_name)
def cleanup(task_name):
if args.no_clean_up:
logging.info("no clean up option set, going to leave the setup running")
return
#shutdown all ec2 instances
print("going to clean up " + task_name + " instances")
instances_response = ec2client.describe_instances(Filters=[{
"Name": "tag:Task_name",
"Values": [task_name]
}])
instance_ids = []
if len(instances_response["Reservations"]) > 0:
for reservation in instances_response["Reservations"]:
for instance in reservation["Instances"]:
instance_ids.append(instance["InstanceId"])
ec2client.terminate_instances(InstanceIds=instance_ids)
instance_termination_waiter = ec2client.get_waiter(
'instance_terminated')
instance_termination_waiter.wait(InstanceIds=instance_ids)
#delete the subnet created
subnet = ec2client.describe_subnets(Filters=[{
"Name": "tag:Task_name",
"Values": [task_name]
}])
if len(subnet["Subnets"]) > 0:
ec2client.delete_subnet(SubnetId=subnet["Subnets"][0]["SubnetId"])
# no subnet delete waiter, just leave it.
logging.info("Clearnup done")
return
def kickoff_pserver(host, pserver_endpoints_str):
try:
ssh_key = paramiko.RSAKey.from_private_key_file(args.pem_path)
ssh_client = paramiko.SSHClient()
ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh_client.connect(hostname=host, username="ubuntu", pkey=ssh_key)
cmd = (script_to_str(args.pserver_bash_file)).format(
PSERVER_HOSTS=pserver_endpoints_str,
DOCKER_IMAGE=args.docker_image,
PSERVER_PORT=args.pserver_port,
TASK_NAME=args.task_name,
COMMAND=args.pserver_command,
TRAINER_COUNT=args.trainer_count,
TRAINER_INDEX=0,
# there is no way to use 0.0.0.0:port to start pserver
# has to docker --network="host" with host ip to make this work
SERVER_ENDPOINT=host + ":" + str(args.pserver_port),
MASTER_ENDPOINT=args.master_server_ip + ":" +
str(args.master_server_port))
logging.info(cmd)
stdin, stdout, stderr = ssh_client.exec_command(command=cmd)
stdout_thread = threading.Thread(
target=log_to_file, args=(
stdout,
"pserver_" + host + ".log", ))
stderr_thread = threading.Thread(
target=log_to_file, args=(
stderr,
"pserver_" + host + "_err.log", ))
stdout_thread.start()
stderr_thread.start()
stdout_thread.join()
stderr_thread.join()
return_code = stdout.channel.recv_exit_status()
logging.info(return_code)
if return_code != 0:
raise Exception("Error while kicking off pserver training process")
except Exception:
logging.exception("Error while kicking off pserver training process")
cleanup(args.task_name)
finally:
ssh_client.close()
def init_args():
if not args.task_name:
args.task_name = generate_task_name()
logging.info("task name generated %s" % (args.task_name))
if not args.pem_path:
args.pem_path = os.path.expanduser("~") + "/" + args.key_name + ".pem"
if args.security_group_id:
args.security_group_ids = (args.security_group_id, )
args.trainers_job_done_count = 0
def create_cluster():
if not args.subnet_id:
logging.info("creating subnet for this task")
args.subnet_id = create_subnet()
logging.info("subnet %s created" % (args.subnet_id))
logging.info("creating pservers")
pserver_create_response = create_pservers()
logging.info("pserver created, collecting pserver ips")
pserver_endpoints = []
for pserver in pserver_create_response:
pserver_endpoints.append(pserver["NetworkInterfaces"][0][
"PrivateIpAddress"] + ":" + args.pserver_port)
pserver_endpoints_str = ",".join(pserver_endpoints)
logging.info("kicking off pserver training process")
pserver_threads = []
for pserver in pserver_create_response:
pserver_thread = threading.Thread(
target=kickoff_pserver,
args=(pserver["PrivateIpAddress"], pserver_endpoints_str))
pserver_thread.start()
pserver_threads.append(pserver_thread)
logging.info("all pserver training process started")
logging.info("creating trainers and kicking off trainer training process")
create_trainers(
kickoff_cmd=script_to_str(args.trainer_bash_file),
pserver_endpoints_str=pserver_endpoints_str)
for pserver_thread in pserver_threads:
pserver_thread.join()
logging.info("all process ended")
def start_server(args):
class S(BaseHTTPRequestHandler):
def _set_headers(self):
self.send_response(200)
self.send_header('Content-type', 'text/text')
self.end_headers()
def do_HEAD(self):
self._set_headers()
def do_404(self):
self.send_response(404)
self.send_header('Content-type', 'text/text')
self.end_headers()
logging.info("Received invalid GET request" + self.path)
self.wfile.write("NO ACTION FOUND")
def do_GET(self):
request_path = self.path
if request_path == "/status" or request_path == "/master_logs":
self._set_headers()
logging.info("Received request to return status")
with open(args.log_path + "master.log", "r") as logfile:
self.wfile.write(logfile.read().strip())
elif request_path == "/list_logs" or request_path == "/logs":
self._set_headers()
self.wfile.write("\n".join(log_files))
elif "/log/" in request_path:
self._set_headers()
log_file_path = request_path.replace("/log/", "")
logging.info("requesting log file path is" + args.log_path +
log_file_path)
with open(args.log_path + log_file_path, "r") as logfile:
self.wfile.write(logfile.read().strip())
else:
self.do_404()
def do_POST(self):
request_path = self.path
if request_path == "/save_data":
self._set_headers()
logging.info("Received request to save data")
self.wfile.write("DATA SAVED!")
content_length = int(self.headers['Content-Length'])
post_data = self.rfile.read(content_length)
if args.task_name:
with open(args.task_name + ".txt", "a") as text_file:
text_file.write(post_data + "\n")
elif request_path == "/cleanup":
self._set_headers()
logging.info("Received request to cleanup cluster")
cleanup(args.task_name)
self.wfile.write("cleanup in progress")
else:
self.do_404()
server_address = ('', args.master_server_port)
httpd = HTTPServer(server_address, S)
logging.info("HTTP server is starting")
httpd.serve_forever()
def print_arguments():
logging.info('----------- Configuration Arguments -----------')
for arg, value in sorted(vars(args).iteritems()):
logging.info('%s: %s' % (arg, value))
logging.info('------------------------------------------------')
if __name__ == "__main__":
print_arguments()
if args.action == "create":
logging.info("going to create cluster")
if not args.key_name or not args.security_group_id:
raise ValueError("key_name and security_group_id are required")
init_args()
create_cluster()
elif args.action == "cleanup":
logging.info("going to cleanup cluster")
if not args.task_name:
raise ValueError("task_name is required")
cleanup(args.task_name)
elif args.action == "serve":
# serve mode
if not args.master_server_ip:
raise ValueError(
"No master server ip set, please run with --action create")
logging.info("going to start serve and create cluster")
init_args()
logging.info("starting server in another thread")
server_thread = threading.Thread(target=start_server, args=(args, ))
server_thread.start()
create_cluster()
server_thread.join()
elif args.action == "test":
start_server(args)
#!/bin/bash
docker run --network="host" -i -e "SERVER_ENDPOINT={SERVER_ENDPOINT}" -e "MASTER_ENDPOINT={MASTER_ENDPOINT}" -e "TASK_NAME={TASK_NAME}" -e "TRAINER_INDEX={TRAINER_INDEX}" -e "TRAINING_ROLE=PSERVER" -e "TRAINER_COUNT={TRAINER_COUNT}" -e "TRAINERS={TRAINER_COUNT}" -e "PSERVER_HOSTS={PSERVER_HOSTS}" -e "PSERVERS={PSERVER_HOSTS}" {DOCKER_IMAGE} {COMMAND} --device CPU
\ No newline at end of file
netaddr==0.7.19
boto3==1.6.21
namesgenerator==0.3
paramiko==2.4.1
#!/bin/bash
nvidia-docker run --network="host" -i -e "MASTER_ENDPOINT={MASTER_ENDPOINT}" -e "TASK_NAME={TASK_NAME}" -e "TRAINER_COUNT={TRAINER_COUNT}" -e "TRAINERS={TRAINER_COUNT}" -e "TRAINER_INDEX={TRAINER_INDEX}" -e "PADDLE_INIT_TRAINER_ID={TRAINER_INDEX}" -e "TRAINING_ROLE=TRAINER" -e "PSERVER_HOSTS={PSERVER_HOSTS}" -e "PSERVERS={PSERVER_HOSTS}" {DOCKER_IMAGE} {COMMAND} --device GPU
\ No newline at end of file
......@@ -37,7 +37,7 @@ RUN git config --global credential.helper store
# Fix locales to en_US.UTF-8
RUN localedef -i en_US -f UTF-8 en_US.UTF-8
RUN pip install --upgrade pip && \
RUN pip install --upgrade pip==9.0.3 && \
pip install -U 'protobuf==3.1.0' && \
pip install -U wheel sphinx && \
pip install pre-commit
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册