diff --git a/CMakeLists.txt b/CMakeLists.txt index de47086dbd6a440cd413c7843c83b1c69d9841b2..23bbe829ac16180088bfa37df66e23f19b021ea3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -39,7 +39,6 @@ option(WITH_GPU "Compile PaddlePaddle with NVIDIA GPU" ${CUDA_F option(WITH_AMD_GPU "Compile PaddlePaddle with AMD GPU" OFF) option(WITH_AVX "Compile PaddlePaddle with AVX intrinsics" ${AVX_FOUND}) option(WITH_MKL "Compile PaddlePaddle with MKL support." ${AVX_FOUND}) -option(WITH_TENSORRT "Compile PaddlePaddle with TensorRT support." OFF) option(WITH_DSO "Compile PaddlePaddle with dynamic linked CUDA" ON) option(WITH_TESTING "Compile PaddlePaddle with unit testing" OFF) option(WITH_SWIG_PY "Compile PaddlePaddle with inference api" ON) @@ -180,13 +179,9 @@ set(EXTERNAL_LIBS if(WITH_GPU) include(cuda) + include(tensorrt) endif(WITH_GPU) -# TensorRT depends on GPU. -if (NOT WITH_GPU) - set(WITH_TENSORRT OFF) -endif() - if(WITH_AMD_GPU) find_package(HIP) include(hip) diff --git a/Dockerfile b/Dockerfile index 9097bb657d2366997112ec7662762a93358aa647..870304a6acc99e715dffbfabd8058be000b6872c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -46,7 +46,7 @@ ENV PATH=${PATH}:${GOROOT}/bin:${GOPATH}/bin RUN curl -s -q https://glide.sh/get | sh # Install TensorRT -# The unnecessary files has been removed to make the library small. +# The unnecessary files have been removed to make the library small. It only contains include and lib now. RUN wget -qO- http://paddlepaddledeps.bj.bcebos.com/TensorRT-4.0.0.3.Ubuntu-16.04.4.x86_64-gnu.cuda-8.0.cudnn7.0.tar.gz | \ tar -xz -C /usr/local && \ cp -rf /usr/local/TensorRT/include /usr && \ diff --git a/Dockerfile.android b/Dockerfile.android index cc022d596b4b74dd1e4f4d0901dd81c91a7decd1..848a7eba6f1421432addae8acff407b611adb4ae 100644 --- a/Dockerfile.android +++ b/Dockerfile.android @@ -27,7 +27,7 @@ RUN git config --global credential.helper store # Fix locales to en_US.UTF-8 RUN localedef -i en_US -f UTF-8 en_US.UTF-8 -RUN pip install --upgrade pip && \ +RUN pip install --upgrade pip==9.0.3 && \ pip install -U 'protobuf==3.1.0' && \ pip install -U wheel sphinx && \ pip install pre-commit diff --git a/paddle/scripts/check_env.sh b/benchmark/paddle/image/check_env.sh similarity index 100% rename from paddle/scripts/check_env.sh rename to benchmark/paddle/image/check_env.sh diff --git a/cmake/configure.cmake b/cmake/configure.cmake index f726405c4773994f6ca6509e5218750805b03995..e490397cc0624c310949a4b571bd00cac6e8953b 100644 --- a/cmake/configure.cmake +++ b/cmake/configure.cmake @@ -80,6 +80,16 @@ if(WITH_GPU) # Include cuda and cudnn include_directories(${CUDNN_INCLUDE_DIR}) include_directories(${CUDA_TOOLKIT_INCLUDE}) + + if(TENSORRT_FOUND) + if(${CUDA_VERSION_MAJOR} VERSION_LESS 8) + message(FATAL_ERROR "TensorRT needs CUDA >= 8.0 to compile") + endif() + if(${CUDNN_MAJOR_VERSION} VERSION_LESS 7) + message(FATAL_ERROR "TensorRT needs CUDNN >= 7.0 to compile") + endif() + include_directories(${TENSORRT_INCLUDE_DIR}) + endif() elseif(WITH_AMD_GPU) add_definitions(-DPADDLE_WITH_HIP) set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -D__HIP_PLATFORM_HCC__") diff --git a/cmake/tensorrt.cmake b/cmake/tensorrt.cmake new file mode 100644 index 0000000000000000000000000000000000000000..0c07d36bed65400164853b99f18ec0335341cd94 --- /dev/null +++ b/cmake/tensorrt.cmake @@ -0,0 +1,33 @@ +if(NOT WITH_GPU) + return() +endif() + +set(TENSORRT_ROOT "/usr" CACHE PATH "TENSORRT ROOT") +find_path(TENSORRT_INCLUDE_DIR NvInfer.h + PATHS ${TENSORRT_ROOT} ${TENSORRT_ROOT}/include + $ENV{TENSORRT_ROOT} 
$ENV{TENSORRT_ROOT}/include + NO_DEFAULT_PATH +) + +find_library(TENSORRT_LIBRARY NAMES libnvinfer.so libnvinfer.a + PATHS ${TENSORRT_ROOT} ${TENSORRT_ROOT}/lib + $ENV{TENSORRT_ROOT} $ENV{TENSORRT_ROOT}/lib + NO_DEFAULT_PATH + DOC "Path to TensorRT library.") + +if(TENSORRT_INCLUDE_DIR AND TENSORRT_LIBRARY) + set(TENSORRT_FOUND ON) +else() + set(TENSORRT_FOUND OFF) +endif() + +if(TENSORRT_FOUND) + file(READ ${TENSORRT_INCLUDE_DIR}/NvInfer.h TENSORRT_VERSION_FILE_CONTENTS) + string(REGEX MATCH "define NV_TENSORRT_MAJOR +([0-9]+)" TENSORRT_MAJOR_VERSION + "${TENSORRT_VERSION_FILE_CONTENTS}") + string(REGEX REPLACE "define NV_TENSORRT_MAJOR +([0-9]+)" "\\1" + TENSORRT_MAJOR_VERSION "${TENSORRT_MAJOR_VERSION}") + + message(STATUS "Current TensorRT header is ${TENSORRT_INCLUDE_DIR}/NvInfer.h. " + "Current TensorRT version is v${TENSORRT_MAJOR_VERSION}. ") +endif() diff --git a/doc/CMakeLists.txt b/doc/CMakeLists.txt index 7066637a7cb27b83724cb4030c29a1019981f52b..0f9521616952a2857222feab8c38fb480761ee2d 100644 --- a/doc/CMakeLists.txt +++ b/doc/CMakeLists.txt @@ -3,7 +3,9 @@ add_custom_target(paddle_apis ALL add_custom_target(paddle_docs ALL DEPENDS paddle_v2_docs paddle_v2_docs_cn - paddle_fluid_docs paddle_fluid_docs_cn) + paddle_fluid_docs paddle_fluid_docs_cn + paddle_mobile_docs paddle_mobile_docs_cn) add_subdirectory(v2) add_subdirectory(fluid) +add_subdirectory(mobile) diff --git a/doc/fluid/api/initializer.rst b/doc/fluid/api/initializer.rst index ee69925fda6b3fc850cfb632e8edd359e7fcff9c..f186c9c85a640da49d95a1a62c721b09b3007d83 100644 --- a/doc/fluid/api/initializer.rst +++ b/doc/fluid/api/initializer.rst @@ -33,3 +33,43 @@ Xavier :members: :noindex: +MSRA +------ + +.. autoclass:: paddle.fluid.initializer.MSRA + :members: + :noindex: +ConstantInitializer +------------------- + +.. autoclass:: paddle.fluid.initializer.ConstantInitializer + :members: + :noindex: +UniformInitializer +------------------ + +.. autoclass:: paddle.fluid.initializer.UniformInitializer + :members: + :noindex: +NormalInitializer +----------------- + +.. autoclass:: paddle.fluid.initializer.NormalInitializer + :members: + :noindex: +XavierInitializer +----------------- + +.. autoclass:: paddle.fluid.initializer.XavierInitializer + :members: + :noindex: + +MSRAInitializer +----------------- +.. autoclass:: paddle.fluid.initializer.MSRAInitializer + :members: + :noindex: diff --git a/doc/fluid/design/concepts/parallel_executor.md b/doc/fluid/design/concepts/parallel_executor.md index 9aed3b059a1595ba3971d7d5acfc0d16a731584b..4f88e27bed722e9f2f535e368926fe49b4e72e56 100644 --- a/doc/fluid/design/concepts/parallel_executor.md +++ b/doc/fluid/design/concepts/parallel_executor.md @@ -84,7 +84,7 @@ Running an operator can be asynchronized. There is a thread pool to execute an ` ## Synchronize GPU Kernels -The GPU is a non-blocking device. The different streams need be synchronized when switing streams. In current implementation, the synchronization based on the following algorithm: +The GPU is a non-blocking device. Different streams need to be synchronized when switching streams. In the current implementation, the synchronization is based on the following algorithm: 1. `OpHandle` will record the `DeviceContext` that it uses. 2. In `OpHandle::Run`, if the `DeviceContext` of the current operator is different from the `DeviceContext` of any input variable, just wait for the generating operator of this input variable. 
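To make the waiting rule above concrete, here is a minimal C++/CUDA sketch of the idea (an illustration only, not code from this patch; `VarRecord` and `RunOnStream` are hypothetical names, while the event-based wait is a standard CUDA pattern):

```cpp
#include <vector>
#include <cuda_runtime.h>

// Each variable remembers the stream of its generating op and an event
// recorded after that op's kernels were enqueued.
struct VarRecord {
  cudaStream_t produced_on;  // stream of the generating operator
  cudaEvent_t ready;         // fires once the producer's kernels finish
};

void RunOnStream(cudaStream_t my_stream,
                 const std::vector<VarRecord*>& inputs) {
  for (const VarRecord* in : inputs) {
    if (in->produced_on != my_stream) {
      // Cross-stream input: make my_stream wait on the producer's event,
      // i.e. "wait for the generating operator of this input variable".
      cudaStreamWaitEvent(my_stream, in->ready, 0);
    }
  }
  // ... enqueue this operator's kernels on my_stream ...
}
```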
diff --git a/doc/fluid/design/dist_train/README.md b/doc/fluid/design/dist_train/README.md new file mode 100644 index 0000000000000000000000000000000000000000..2dd652d8bdcb8f3b6e759347bd55b217be909386 --- /dev/null +++ b/doc/fluid/design/dist_train/README.md @@ -0,0 +1,57 @@ +## Distributed training overview doc + +Currently Paddle Fluid uses a parameter server architecture to support distributed training. + +For synchronous and asynchronous training, the differences are mostly in the logic of the parameter server. We already support synchronous training. + +### Synchronous training + +The training process of synchronous training is: + +![synchronous distributed training](./src/sync_distributed_training.png) + +1. Pserver + 1. sets `barrier_condition_` to 0 and waits for trainers to send gradients. +1. Trainer + 1. The trainer reads a minibatch of data, runs forward-backward with its local parameter copy and gets the gradients for the parameters. + 1. The trainer uses the split op to split all the gradients into blocks. The split method is determined at compile time. + 1. The trainer uses send_op to send all the split gradients to the corresponding parameter servers. + 1. After the trainer sends all the gradients, it will send a `BATCH_BARRIER_MESSAGE` to all pservers. + 1. The trainer calls GetVariable on the pserver and waits for `barrier_condition_` on the pserver to become 1. +1. Pserver + 1. The pserver counts the number of `BATCH_BARRIER_MESSAGE`s. + 1. When the count of `BATCH_BARRIER_MESSAGE`s equals the number of trainers, the pserver knows it has received all gradients from all trainers. + 1. The pserver runs the optimization block to optimize the parameters. + 1. After optimization, the pserver sets `barrier_condition_` to 1. + 1. The pserver waits for `FETCH_BARRIER_MESSAGE`. +1. Trainer. + 1. The trainer uses GetVariable to get all the parameters from the pserver. + 1. The trainer sends a `FETCH_BARRIER_MESSAGE` to each pserver. +1. Pserver. + 1. When the number of `FETCH_BARRIER_MESSAGE`s reaches the number of trainers, the pserver knows all the parameters have been fetched, and it goes back to step 1 to set `barrier_condition_` to 0. + +### Asynchronous training +In the above process, there are two barriers at which all trainers synchronize with each other. In asynchronous training, these two barriers are not needed. The trainer can just send gradients to the pserver and then get parameters back. + +The training process of asynchronous training can be: + +![asynchronous distributed training](./src/async_distributed_training.png) + +1. Pserver: + 1. Each parameter has a queue to receive its gradient from trainers. + 1. Each parameter has a thread to read data from the queue and run the optimize block, using the gradient to optimize the parameter. + 1. An independent thread handles the RPC call `GetVariable` for trainers to get parameters back. (Maybe here we should use a thread pool to speed up fetching the parameters.) + +1. Trainer: + 1. The trainer reads a batch of data, runs forward and backward with its local parameter copy and gets the gradients for the parameters. + 1. The trainer splits all gradients into blocks and then sends these gradient blocks to the pservers (the pserver will put them into the queue). + 2. The trainer gets all parameters back from the pservers. + +### Note: +There are also some conditions that we need to consider. For example: + +1. Whether the trainer needs to wait for the pserver to apply its gradients before getting the parameters back. +1. Whether we need a lock between parameter update and parameter fetch. +1. 
Whether one parameter must reside on one server, or whether it can be split and sent to multiple parameter servers. + +The above architecture for asynchronous training can support different modes; we can run detailed tests on these problems in the future. diff --git a/doc/fluid/design/dist_train/async_update.md b/doc/fluid/design/dist_train/async_update.md new file mode 100644 index 0000000000000000000000000000000000000000..6a0835b761b69030ba30697e6e8863928efbf57f --- /dev/null +++ b/doc/fluid/design/dist_train/async_update.md @@ -0,0 +1,58 @@ +# Design Doc: Asynchronous Update With Distributed Training + +## Background + +For the typical synchronous distributed training, some significant steps are as follows: + +1. A Trainer will compute the gradients and SEND them to the Parameter Server (PServer) nodes. +1. After the PServer node has received the gradients from all the Trainers, it will aggregate the +gradient variables for the same parameter into one gradient variable and then apply the aggregated +gradient to the respective parameter, finally using an optimization algorithm (SGD, Momentum...) +to update the parameters. +1. The Trainers wait for the PServers to finish the optimize stage, then GET the parameters from the PServers, +so all the Trainers end up with the same parameters. + +In synchronous distributed training, there should be a `Barrier` to synchronize the +parameters after the optimizing stage. The performance of a distributed training job +depends on the slowest node; if there were hundreds or thousands of training nodes in a +job, the performance of synchronous distributed training might be very poor because of +the slow nodes. So this design doc introduces an approach to implement +*asynchronous* distributed training in PaddlePaddle Fluid. + +## Design + + + +As the figure above shows, we describe a global view of the asynchronous update process and use +the parameter `w1` as an example to introduce the steps: +1. Each gradient variable may be distributed on a different GPU card; aggregate +them once they are all calculated. +1. Split the gradient variable into multiple blocks according to the number of PServer +instances and then send them. +1. The PServer runs an `Optimize Block` using a specified optimization algorithm to update +the specified parameter. +1. The trainer fetches the latest parameter from the PServer before running the forward op which depends +on the specified parameter. +1. Broadcast the received variable to multiple GPU cards and continue to run the next +mini-batch. + +### Trainer + +- For multi-device distributed training, we first need to aggregate the gradient +variables placed on different devices and then schedule a `SendVars` operator to +send the gradient variables to the multiple PServer instances. +- Schedule a `FetchVars` operator to fetch the latest parameters from the PServers before running +the forward ops. +- There could be a large number of gradient variables to be sent, so we need to use another +thread pool (IO Threadpool) whose number of schedulable threads is larger than that of the +computing thread pool, to avoid competing for thread resources with computation. + +### Parameter Server + + + +- Multiple trainer instances may want to optimize the same parameter at +the same time; to avoid racing, we need one `BlockingQueue` for each gradient +variable, to process them one by one. +- We need a `Map` structure to map a gradient variable name to the `OptimizeBlock` which +can optimize the respective parameter. 
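The per-parameter `BlockingQueue` mentioned above can be pictured with a short C++ sketch (an assumption-level illustration, not the Fluid implementation; `Gradient` and `OptimizeLoop` are hypothetical names):

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <utility>

// A minimal blocking queue: trainers push gradient blocks, one optimizer
// thread per parameter pops and applies them, so concurrent updates to the
// same parameter are processed one by one.
template <typename T>
class BlockingQueue {
 public:
  void Push(T v) {
    {
      std::lock_guard<std::mutex> guard(mu_);
      queue_.push_back(std::move(v));
    }
    cv_.notify_one();
  }

  T Pop() {
    std::unique_lock<std::mutex> lock(mu_);
    cv_.wait(lock, [this] { return !queue_.empty(); });
    T v = std::move(queue_.front());
    queue_.pop_front();
    return v;
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<T> queue_;
};

struct Gradient { /* gradient block data */ };

void OptimizeLoop(BlockingQueue<Gradient>* grads /*, parameter state */) {
  for (;;) {
    Gradient g = grads->Pop();  // serialized: one gradient at a time
    // ... run the Optimize Block: e.g. param -= learning_rate * g ...
  }
}
```

Since each gradient variable gets its own queue and optimizer thread, updates to one parameter are serialized while different parameters can still be optimized concurrently.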
diff --git a/doc/fluid/design/dist_train/mpi_enabled_design.md b/doc/fluid/design/dist_train/mpi_enabled_design.md new file mode 100644 index 0000000000000000000000000000000000000000..4ad3afc7b7522c60460c6f1f387f9415d3738778 --- /dev/null +++ b/doc/fluid/design/dist_train/mpi_enabled_design.md @@ -0,0 +1,46 @@ +# MPI-enabled PaddlePaddle Design doc + +# Background +When we do distributed multi-GPU training, the communication overhead between servers becomes the major bottleneck, for the following reasons: +1. We must copy at least once from GPU to CPU memory so that the data is ready to transfer. And on the pserver side, copying data from CPU to GPU introduces more overhead. +2. GPU->CPU data transfer is 10 times slower than data transfer between GPUs or between PCIe devices. +3. TCP connections can not make full use of RDMA 100Gb devices. + +We will add the OpenMPI API to PaddlePaddle, which can bring two benefits to PaddlePaddle: +1. Enable RDMA with PaddlePaddle, which brings high-performance, low-latency networking. +2. Enable GPUDirect with PaddlePaddle, which brings the highest-throughput, lowest-latency GPU reads and writes. + +# Change list +* Compile args: We need to add compile args to enable MPI support. +* Execute args: We need to add execute args to assign when and how to use MPI operations. +* New ops: We need new ops ```mpi_send_op``` and ```mpi_listenandserve_op``` to support MPI send and receive. +* Transpiler optimization: This can add ```mpi_send_op``` and ```mpi_listenandserve_op``` to the running graph. +* MPI utils package: We need an MPI utils package to provide the low-level API support. + +## Compile args +Because MPI and CUDA need hardware support, we will add compile args to enable MPI support and control compiling. Add a ```WITH_MPI``` compile arg to control whether MPI is used. If ```WITH_MPI``` is ```ON```, the compile system will find the openMPI codes in configuration. We should prepare the openMPI environment before compiling. + +## Execute args +Launch the script using the ```mpirun``` launcher, for example: ```mpirun -np 3 -hosts node1,node2,node3 python train.py```. By doing this, we can number the actors (trainer/pserver/master) with 0 .. (n-1). A node's number is the Rank of the calling process in a communicator group (an integer), and the MPI processes identify each other using a Rank ID. We have to create a mapping between PaddlePaddle's nodes and their Rank IDs so that we can communicate with the correct destinations when using MPI operations. + +## New ops +We won't replace all the gRPC requests with MPI requests; the standard gRPC library is used for all administrative operations, and the MPI API will be used to transfer tensors or selectRows to the Pservers. Based on this idea, we create two new operators to handle requests and receives: ```mpi_send_op``` and ```mpi_listenandserve_op```. They are a little similar to [send_op](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/fluid/operators/send_op.cc) and [listen_and_serv_op](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/fluid/operators/listen_and_serv_op.cc); also, we will build a new module to package the MPI send and receive process. + +### mpi_send_op +Very similar to ```send_op```; we will replace the gRPC code used to send gradients with ```mpi_module```, and at the same time we will wrap it with ```framework::Async```. 
+ +### mpi_listenandserve_op +Very similar to ```listen_and_serv_op```; we will replace the gRPC code used to receive gradients with ```mpi_module```, and at the same time we will wrap it with ```framework::Async```. + +## Transpiler optimization +**We can read the env vars ```OMPI_COMM_WORLD_SIZE``` and ```OMPI_COMM_WORLD_RANK``` to determine whether MPI is used or not; if we launch with openMPI, these variables must exist in the environment.** If MPI use is confirmed, we will modify ```send_op``` to ```mpi_send_op``` in distribute_transpiler, and also modify ```listenandserve_op``` to ```mpi_listenandserve_op```. + +## MPI utils package +In this package, we will wrap the low-level openMPI API for MPI use. +The APIs included in this package are: +* MPI send and receive module. We will build a new module to package the MPI send and receive process. MPI send and receive are different from gRPC: the MPI [receive](https://www.open-mpi.org/doc/v1.8/man3/MPI_Irecv.3.php) must know the receive buffer size and the receive buffer element type. For this reason, we have to communicate twice: the first communication sends metadata about the gradient through gRPC, and the second is the real communication through MPI, which sends the gradient data to mpi_listenandserve_op. +The detailed flow is below: +![](https://github.com/seiriosPlus/Paddle/blob/mpi_enabled/doc/fluid/design/dist_train/src/mpi_module.png) +* MPI global configurations, which store the Rank ID and the mapping in global variables, for example: +gRPC client : MPI nodes :``` 127.0.0.1:32004 : 3 ``` diff --git a/doc/fluid/design/dist_train/src/async_distributed_training.png b/doc/fluid/design/dist_train/src/async_distributed_training.png new file mode 100644 index 0000000000000000000000000000000000000000..3b53ab59c0cd7b44b2956f16f1adc47fe85909d3 Binary files /dev/null and b/doc/fluid/design/dist_train/src/async_distributed_training.png differ diff --git a/doc/fluid/design/dist_train/src/async_pserver.graffle b/doc/fluid/design/dist_train/src/async_pserver.graffle new file mode 100644 index 0000000000000000000000000000000000000000..d2301611774fcb3866473e3e6470568d1e1312cf Binary files /dev/null and b/doc/fluid/design/dist_train/src/async_pserver.graffle differ diff --git a/doc/fluid/design/dist_train/src/async_pserver.png b/doc/fluid/design/dist_train/src/async_pserver.png new file mode 100644 index 0000000000000000000000000000000000000000..7d900b0c0eb291c67537b9cf93227c671bafdc73 Binary files /dev/null and b/doc/fluid/design/dist_train/src/async_pserver.png differ diff --git a/doc/fluid/design/dist_train/src/async_update.graffle b/doc/fluid/design/dist_train/src/async_update.graffle new file mode 100644 index 0000000000000000000000000000000000000000..3a631888688a0d564a873fcb16d943958c91223e Binary files /dev/null and b/doc/fluid/design/dist_train/src/async_update.graffle differ diff --git a/doc/fluid/design/dist_train/src/async_update.png b/doc/fluid/design/dist_train/src/async_update.png new file mode 100644 index 0000000000000000000000000000000000000000..3e8db973f45d6d9ac8dcce1dc7878067e79e6dcc Binary files /dev/null and b/doc/fluid/design/dist_train/src/async_update.png differ diff --git a/doc/fluid/design/dist_train/src/distributed_training.graffle b/doc/fluid/design/dist_train/src/distributed_training.graffle new file mode 100644 index 0000000000000000000000000000000000000000..1168801bc1fadfce310a74cb3110695bd1629f6b Binary files /dev/null and b/doc/fluid/design/dist_train/src/distributed_training.graffle differ diff --git a/doc/fluid/design/dist_train/src/mpi_module.png 
b/doc/fluid/design/dist_train/src/mpi_module.png new file mode 100644 index 0000000000000000000000000000000000000000..e6b6a3e5d6f68baeeb67d7f71154bd8d85f32b6f Binary files /dev/null and b/doc/fluid/design/dist_train/src/mpi_module.png differ diff --git a/doc/fluid/design/dist_train/src/sync_distributed_training.png b/doc/fluid/design/dist_train/src/sync_distributed_training.png new file mode 100644 index 0000000000000000000000000000000000000000..e4f9a221fea4b7238e8a1d84e609c0371f6ef7a2 Binary files /dev/null and b/doc/fluid/design/dist_train/src/sync_distributed_training.png differ diff --git a/doc/mobile/CMakeLists.txt b/doc/mobile/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..b104a6318d474d6531670b8ac3569448774850c7 --- /dev/null +++ b/doc/mobile/CMakeLists.txt @@ -0,0 +1,53 @@ +if(NOT DEFINED SPHINX_THEME) + set(SPHINX_THEME default) +endif() + +if(NOT DEFINED SPHINX_THEME_DIR) + set(SPHINX_THEME_DIR) +endif() + +# configured documentation tools and intermediate build results +set(BINARY_BUILD_DIR_EN "${CMAKE_CURRENT_BINARY_DIR}/en/_build") + +# Sphinx cache with pickled ReST documents +set(SPHINX_CACHE_DIR_EN "${CMAKE_CURRENT_BINARY_DIR}/en/_doctrees") + +# HTML output directory +set(SPHINX_HTML_DIR_EN "${CMAKE_CURRENT_BINARY_DIR}/en/html") + +configure_file( + "${CMAKE_CURRENT_SOURCE_DIR}/../templates/conf.py.en.in" + "${BINARY_BUILD_DIR_EN}/conf.py" + @ONLY) + +sphinx_add_target(paddle_mobile_docs + html + ${BINARY_BUILD_DIR_EN} + ${SPHINX_CACHE_DIR_EN} + ${CMAKE_CURRENT_SOURCE_DIR} + ${SPHINX_HTML_DIR_EN}) + +add_dependencies(paddle_mobile_docs gen_proto_py paddle_python) + +# configured documentation tools and intermediate build results +set(BINARY_BUILD_DIR_CN "${CMAKE_CURRENT_BINARY_DIR}/cn/_build") + +# Sphinx cache with pickled ReST documents +set(SPHINX_CACHE_DIR_CN "${CMAKE_CURRENT_BINARY_DIR}/cn/_doctrees") + +# HTML output directory +set(SPHINX_HTML_DIR_CN "${CMAKE_CURRENT_BINARY_DIR}/cn/html") + +configure_file( + "${CMAKE_CURRENT_SOURCE_DIR}/../templates/conf.py.cn.in" + "${BINARY_BUILD_DIR_CN}/conf.py" + @ONLY) + +sphinx_add_target(paddle_mobile_docs_cn + html + ${BINARY_BUILD_DIR_CN} + ${SPHINX_CACHE_DIR_CN} + ${CMAKE_CURRENT_SOURCE_DIR} + ${SPHINX_HTML_DIR_CN}) + +add_dependencies(paddle_mobile_docs_cn gen_proto_py paddle_python) diff --git a/doc/mobile/index_cn.rst b/doc/mobile/index_cn.rst new file mode 100644 index 0000000000000000000000000000000000000000..8297316e8fbb2b8f41954030293feadbcd81295e --- /dev/null +++ b/doc/mobile/index_cn.rst @@ -0,0 +1,9 @@ +移动端 +===== + +.. toctree:: + :maxdepth: 1 + + cross_compiling_for_android_cn.md + cross_compiling_for_ios_cn.md + cross_compiling_for_raspberry_cn.md \ No newline at end of file diff --git a/doc/mobile/index_en.rst b/doc/mobile/index_en.rst new file mode 100644 index 0000000000000000000000000000000000000000..e0acdff0284e3bc84b2cc4a34a142ee01754f940 --- /dev/null +++ b/doc/mobile/index_en.rst @@ -0,0 +1,9 @@ +Mobile +====== + +.. 
toctree:: + :maxdepth: 1 + + cross_compiling_for_android_en.md + cross_compiling_for_ios_en.md + cross_compiling_for_raspberry_en.md diff --git a/paddle/fluid/framework/details/broadcast_op_handle_test.cc b/paddle/fluid/framework/details/broadcast_op_handle_test.cc index dfc52b012f8b6bf5cf1a3feab90dc1ec7842ad6c..bcd61335be0f7fe64563ee65daaf9de0760c9b1a 100644 --- a/paddle/fluid/framework/details/broadcast_op_handle_test.cc +++ b/paddle/fluid/framework/details/broadcast_op_handle_test.cc @@ -77,14 +77,9 @@ struct TestBroadcastOpHandle { local_scopes_[input_scope_idx]->Var("input"); op_handle_.reset(new BroadcastOpHandle(local_scopes_, gpu_list_)); - - vars_.emplace_back(new VarHandle()); - VarHandle* in_var_handle = static_cast(vars_.back().get()); - in_var_handle->place_ = gpu_list_[input_scope_idx]; - in_var_handle->name_ = "input"; - in_var_handle->version_ = 1; - in_var_handle->scope_idx_ = input_scope_idx; - in_var_handle->generated_op_ = nullptr; + auto* in_var_handle = + new VarHandle(1, input_scope_idx, "input", gpu_list_[input_scope_idx]); + vars_.emplace_back(in_var_handle); op_handle_->AddInput(in_var_handle); // add dummy var @@ -96,12 +91,8 @@ struct TestBroadcastOpHandle { for (size_t j = 0; j < gpu_list_.size(); ++j) { op_handle_->dev_ctxes_[gpu_list_[j]] = ctxs_[j].get(); - vars_.emplace_back(new VarHandle()); - VarHandle* out_var_handle = static_cast(vars_.back().get()); - out_var_handle->place_ = gpu_list_[j]; - out_var_handle->name_ = "out"; - out_var_handle->version_ = 2; - out_var_handle->scope_idx_ = j; + VarHandle* out_var_handle = new VarHandle(2, j, "out", gpu_list_[j]); + vars_.emplace_back(out_var_handle); op_handle_->AddOutput(out_var_handle); } diff --git a/paddle/fluid/framework/details/gather_op_handle_test.cc b/paddle/fluid/framework/details/gather_op_handle_test.cc index 10839f239d59e97946575297a6d125968a1458f4..2da8c89d2df73215b748f102d9bbfc5b742cf97f 100644 --- a/paddle/fluid/framework/details/gather_op_handle_test.cc +++ b/paddle/fluid/framework/details/gather_op_handle_test.cc @@ -79,13 +79,8 @@ struct TestGatherOpHandle { // add input for (size_t j = 0; j < gpu_list_.size(); ++j) { op_handle_->dev_ctxes_[gpu_list_[j]] = ctxs_[j].get(); - vars_.emplace_back(new VarHandle()); - VarHandle* in_var_handle = static_cast(vars_.back().get()); - in_var_handle->place_ = gpu_list_[j]; - in_var_handle->name_ = "input"; - in_var_handle->version_ = 1; - in_var_handle->scope_idx_ = j; - in_var_handle->generated_op_ = nullptr; + auto* in_var_handle = new VarHandle(1, j, "input", gpu_list_[j]); + vars_.emplace_back(in_var_handle); op_handle_->AddInput(in_var_handle); } @@ -97,12 +92,9 @@ struct TestGatherOpHandle { op_handle_->AddInput(in_dummy_var_handle); // add output - vars_.emplace_back(new VarHandle()); - VarHandle* out_var_handle = static_cast(vars_.back().get()); - out_var_handle->place_ = gpu_list_[input_scope_idx]; - out_var_handle->name_ = "out"; - out_var_handle->version_ = 2; - out_var_handle->scope_idx_ = input_scope_idx; + auto* out_var_handle = + new VarHandle(2, input_scope_idx, "out", gpu_list_[input_scope_idx]); + vars_.emplace_back(out_var_handle); op_handle_->AddOutput(out_var_handle); // add dummy var diff --git a/paddle/fluid/framework/details/multi_devices_graph_builder.cc b/paddle/fluid/framework/details/multi_devices_graph_builder.cc index 5a95cbc53625888bac539f91af391ff0babec17b..4d76dbf7f6ffcf6c82ebf7defd9334bbe64a451c 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_builder.cc +++ 
b/paddle/fluid/framework/details/multi_devices_graph_builder.cc @@ -177,13 +177,9 @@ std::unique_ptr MultiDevSSAGraphBuilder::Build( auto &prev_grad = vars[vars.size() - 1]; op_handle->AddInput(prev_grad.get()); - vars.emplace_back(new VarHandle); - auto &var = vars.back(); - var->place_ = p; - var->name_ = og; - var->version_ = vars.size() - 1; - - op_handle->AddOutput(var.get()); + auto var = new VarHandle(vars.size() - 1, i, og, p); + vars.emplace_back(var); + op_handle->AddOutput(var); } #else PADDLE_ENFORCE("Not implemented"); diff --git a/paddle/fluid/framework/details/reduce_op_handle_test.cc b/paddle/fluid/framework/details/reduce_op_handle_test.cc index b0b8eb2cc77bc8d56f89c8adce96e342774c3efa..ed6a1355a3a4c4e5c0b70ef6cb705be0a768280f 100644 --- a/paddle/fluid/framework/details/reduce_op_handle_test.cc +++ b/paddle/fluid/framework/details/reduce_op_handle_test.cc @@ -111,13 +111,9 @@ struct TestReduceOpHandle { if (!use_gpu_) { op_handle_->dev_ctxes_[gpu_list_[j]] = ctxs_[j].get(); } - vars_.emplace_back(new VarHandle()); - VarHandle *in_var_handle = static_cast(vars_.back().get()); - in_var_handle->place_ = gpu_list_[j]; - in_var_handle->name_ = "input"; - in_var_handle->version_ = 1; - in_var_handle->scope_idx_ = j; + auto *in_var_handle = new VarHandle(1, j, "input", gpu_list_[j]); in_var_handle->generated_op_ = nullptr; + vars_.emplace_back(in_var_handle); op_handle_->AddInput(in_var_handle); } @@ -129,12 +125,9 @@ struct TestReduceOpHandle { op_handle_->AddInput(in_dummy_var_handle); // add output - vars_.emplace_back(new VarHandle()); - VarHandle *out_var_handle = static_cast(vars_.back().get()); - out_var_handle->place_ = gpu_list_[input_scope_idx]; - out_var_handle->name_ = "out"; - out_var_handle->version_ = 2; - out_var_handle->scope_idx_ = input_scope_idx; + auto *out_var_handle = + new VarHandle(2, input_scope_idx, "out", gpu_list_[input_scope_idx]); + vars_.emplace_back(out_var_handle); op_handle_->AddOutput(out_var_handle); // add dummy var diff --git a/paddle/fluid/framework/details/ssa_graph_builder.cc b/paddle/fluid/framework/details/ssa_graph_builder.cc index be5fb7577581fd99b1b7b80ccdd2acb8d3a91f01..25e8c77bb489546092b2a93e052da7dd0dd5edf4 100644 --- a/paddle/fluid/framework/details/ssa_graph_builder.cc +++ b/paddle/fluid/framework/details/ssa_graph_builder.cc @@ -54,13 +54,8 @@ VarHandle *SSAGraphBuilder::CreateOrGetLatestVarHandle( auto &var_holder = var_holders[each_var_name]; VarHandle *var = nullptr; if (var_holder.empty()) { - var_holder.emplace_back(new VarHandle); - auto &init_var = var_holder[0]; - init_var->place_ = place; - init_var->name_ = each_var_name; - init_var->generated_op_ = nullptr; - init_var->version_ = 0; - var = init_var.get(); + var = new VarHandle(0, place_offset, each_var_name, place); + var_holder.emplace_back(var); } else { var = var_holder.rbegin()->get(); } @@ -73,12 +68,9 @@ void SSAGraphBuilder::CreateOpOutput(SSAGraph *graph, OpHandleBase *op_handle, size_t place_offset) { auto &vars = graph->vars_[place_offset][each_var_name]; size_t version = vars.size(); - vars.emplace_back(new VarHandle()); - auto &var = vars.back(); - var->version_ = version; - var->name_ = each_var_name; - var->place_ = place; - op_handle->AddOutput(var.get()); + auto var = new VarHandle(version, place_offset, each_var_name, place); + vars.emplace_back(var); + op_handle->AddOutput(var); } template diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc index 
a371ee10fe03cda86c316f3503f9cadb8c716ae5..3d2bd633afff1d453d00faeca3b3dcf77f8dd5d7 100644 --- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc @@ -33,13 +33,6 @@ ThreadedSSAGraphExecutor::ThreadedSSAGraphExecutor( running_ops_(0), allow_op_delay_(allow_op_delay) {} -void ThreadedSSAGraphExecutor::RunDelayedOps( - const std::unordered_set &delayed_ops) { - for (auto op : delayed_ops) { - op->Run(use_event_); - } -} - FeedFetchList ThreadedSSAGraphExecutor::Run( const std::vector &fetch_tensors) { std::unordered_map pending_ops; @@ -51,8 +44,6 @@ FeedFetchList ThreadedSSAGraphExecutor::Run( // together since we currently cannot overlap computation and memcpy streams. // Should revisit it if overlapping is available. std::unordered_set delayed_ops; - std::unordered_set blocked_by_delayed_ops; - std::unordered_set delayed_vars; auto InsertPendingVar = [&pending_vars, &ready_vars](VarHandleBase &var) { pending_vars.insert(&var); @@ -122,24 +113,26 @@ FeedFetchList ThreadedSSAGraphExecutor::Run( InsertPendingOp(*op); } - auto run_all_ready_ops = [&] { - for (auto *op : ready_ops) { - if (op->IsMultiDeviceTransfer() && allow_op_delay_) { - delayed_ops.insert(op); - delayed_vars.insert(op->outputs_.begin(), op->outputs_.end()); - ready_vars.Extend(op->outputs_); - continue; - } + auto run_all_ops = [&](std::unordered_set &set) { + for (auto *op : set) { running_ops_++; RunOp(&ready_vars, op); } - ready_ops.clear(); + set.clear(); }; // Step 3. Execution - while (!pending_vars.empty() || !ready_ops.empty() || !delayed_ops.empty()) { + while (!pending_vars.empty()) { // 1. Run All Ready ops - run_all_ready_ops(); + // Keep loop until all vars are ready. + // + // NOTE: DelayedOps have a lower priority. It will be scheduled after all + // ready_ops have been performed. + if (ready_ops.empty() && allow_op_delay_) { + run_all_ops(delayed_ops); + } else { + run_all_ops(ready_ops); + } // 2. Find ready variable bool timeout; @@ -160,29 +153,16 @@ FeedFetchList ThreadedSSAGraphExecutor::Run( auto &deps = pending_ops[op]; --deps; if (deps == 0) { - if (delayed_vars.find(ready_var) != delayed_vars.end()) { - blocked_by_delayed_ops.insert(op); + if (op->IsMultiDeviceTransfer() && allow_op_delay_) { + delayed_ops.insert(op); } else { ready_ops.insert(op); } } } } - // When there are no other ops to schedule, schedule buffered delayed - // ops and unblock other ops. - if (ready_ops.empty() && !delayed_ops.empty() && running_ops_ == 0) { - RunDelayedOps(delayed_ops); - delayed_ops.clear(); - for (auto *op : blocked_by_delayed_ops) { - ready_ops.insert(op); - } - blocked_by_delayed_ops.clear(); - } - // Keep loop until all vars are ready. } PADDLE_ENFORCE(ready_ops.empty()); - PADDLE_ENFORCE(delayed_ops.empty()); - PADDLE_ENFORCE(blocked_by_delayed_ops.empty()); // Wait FetchOps. 
if (!fetch_ops.empty()) { diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.h b/paddle/fluid/framework/details/threaded_ssa_graph_executor.h index bb5e837b135c35b5aea403496b45aab1ccc288ff..d70bbd4ef0eb02d1b473bf88e526996819aec5f9 100644 --- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.h +++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.h @@ -88,8 +88,6 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor { void RunOp(BlockingQueue *ready_var_q, details::OpHandleBase *op); - void RunDelayedOps(const std::unordered_set &delayed_ops); - private: std::unique_ptr<::ThreadPool> pool_; std::vector local_scopes_; diff --git a/paddle/fluid/framework/details/var_handle.h b/paddle/fluid/framework/details/var_handle.h index 871e41343f53b801a22d3a450f0906f37fb372d1..2b887c67e6fc6ea78e42fbb9fd170f740db27d97 100644 --- a/paddle/fluid/framework/details/var_handle.h +++ b/paddle/fluid/framework/details/var_handle.h @@ -16,6 +16,7 @@ #include #include #include +#include #include "paddle/fluid/platform/place.h" @@ -33,10 +34,10 @@ struct VarHandleBase { // The operator who generate this variable. nullptr if the variable // is a root node. - OpHandleBase *generated_op_; + OpHandleBase* generated_op_{nullptr}; // Operators which depend on this variable ready. - std::unordered_set pending_ops_; + std::unordered_set pending_ops_; }; // VarHandle is actually a single version of Runtime Variable. @@ -47,6 +48,13 @@ struct VarHandleBase { struct VarHandle : public VarHandleBase { std::string DebugString() const override; + VarHandle(size_t version, size_t scope_index, std::string name, + platform::Place place) + : version_(version), + scope_idx_(scope_index), + name_(std::move(name)), + place_(std::move(place)) {} + // version field currently is not used, however, just store the version to // debug easily. size_t version_; diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index c1486b527d2e06d2b3f7e0f89458bf9a22564586..0962f40c4a64f18f7105626c54a83f1c5b299c50 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -155,13 +155,9 @@ void ParallelExecutor::BCastParamsToGPUs( #endif } -void ParallelExecutor::Run( - const std::vector &fetch_tensors, - const std::string &fetched_var_name, - const std::unordered_map &feed_tensors) { +void ParallelExecutor::Run(const std::vector &fetch_tensors, + const std::string &fetched_var_name) { platform::RecordBlock b(0); - SplitTensorToPlaces(feed_tensors); - // Create local scopes. 
for (auto &scope : member_->local_scopes_) { Scope &local_scope = scope->NewScope(); @@ -195,14 +191,28 @@ void ParallelExecutor::Run( auto &local_scope = *scope->Var(details::kLocalExecScopeName)->GetMutable(); scope->DeleteScope(local_scope); - local_scope = nullptr; } } -void ParallelExecutor::SplitTensorToPlaces( - const std::unordered_map &feed_tensors) { - for (auto it : feed_tensors) { - auto lod_tensors = it.second.SplitLoDTensor(member_->places_); +void ParallelExecutor::FeedTensorsIntoLocalScopes( + const std::vector> &tensors) { + PADDLE_ENFORCE_EQ(member_->local_scopes_.size(), tensors.size()); + + for (size_t i = 0; i < tensors.size(); ++i) { + auto &map = tensors[i]; + auto *scope = member_->local_scopes_[i]; + for (auto &pair : map) { + auto *trg = scope->Var(pair.first)->GetMutable(); + trg->ShareDataWith(pair.second); + trg->set_lod(pair.second.lod()); + } + } +} + +void ParallelExecutor::FeedAndSplitTensorIntoLocalScopes( + const std::unordered_map &tensors) { + for (auto pair : tensors) { + auto lod_tensors = pair.second.SplitLoDTensor(member_->places_); PADDLE_ENFORCE_EQ( member_->places_.size(), lod_tensors.size(), "The number of samples of current batch is less than the count of " @@ -211,7 +221,7 @@ void ParallelExecutor::SplitTensorToPlaces( for (size_t j = 0; j < member_->places_.size(); ++j) { // TODO(panxy0718): Do I need to delete this var? auto t = - member_->local_scopes_[j]->Var(it.first)->GetMutable(); + member_->local_scopes_[j]->Var(pair.first)->GetMutable(); t->ShareDataWith(lod_tensors[j]); t->set_lod(lod_tensors[j].lod()); } diff --git a/paddle/fluid/framework/parallel_executor.h b/paddle/fluid/framework/parallel_executor.h index b4f16dba858fb279ec23a8a04257dda6651148cc..303ac3bc55cfed57a03765b27d8aba581eabd1c8 100644 --- a/paddle/fluid/framework/parallel_executor.h +++ b/paddle/fluid/framework/parallel_executor.h @@ -44,16 +44,22 @@ class ParallelExecutor { std::vector& GetLocalScopes(); + /** + * Feed tensors to local scopes. The size of tensors should be equal to the + * size of local scopes. + */ + void FeedTensorsIntoLocalScopes( + const std::vector>& tensors); + + void FeedAndSplitTensorIntoLocalScopes( + const std::unordered_map& tensors); + void Run(const std::vector& fetch_tensors, - const std::string& fetched_var_name, - const std::unordered_map& feed_tensors); + const std::string& fetched_var_name); void BCastParamsToGPUs(const std::unordered_set& vars) const; private: - void SplitTensorToPlaces( - const std::unordered_map& feed_tensors); - ParallelExecutorPrivate* member_; }; diff --git a/paddle/fluid/inference/CMakeLists.txt b/paddle/fluid/inference/CMakeLists.txt index 8494edee6c2c714c285c45bbb4fe1d8cb1a524aa..cc45bfe9b17d767be039cc0d8d83234b6994d6c1 100644 --- a/paddle/fluid/inference/CMakeLists.txt +++ b/paddle/fluid/inference/CMakeLists.txt @@ -21,7 +21,7 @@ endif() if(WITH_TESTING) add_subdirectory(tests/book) - if (WITH_TENSORRT) + if (TENSORRT_FOUND) add_subdirectory(tensorrt) endif() endif() diff --git a/paddle/fluid/operators/beam_search_decode_op.cc b/paddle/fluid/operators/beam_search_decode_op.cc index 718f469d38c3c6b7272c1531fae0a1e9ad2e8e3e..4a8dfd4b54227070c2143b180f8ab92753885550 100644 --- a/paddle/fluid/operators/beam_search_decode_op.cc +++ b/paddle/fluid/operators/beam_search_decode_op.cc @@ -13,6 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ #include "paddle/fluid/operators/beam_search_decode_op.h" +#include #include "paddle/fluid/platform/device_context.h" namespace paddle { diff --git a/paddle/fluid/operators/beam_search_decode_op.h b/paddle/fluid/operators/beam_search_decode_op.h index 3cc6ed310575473fae8e91a8507fb9146107e841..4cb0457d9285e20d4b6a2f9987b7fdb1c6ac157f 100644 --- a/paddle/fluid/operators/beam_search_decode_op.h +++ b/paddle/fluid/operators/beam_search_decode_op.h @@ -14,6 +14,7 @@ limitations under the License. */ #pragma once +#include #include "paddle/fluid/framework/lod_tensor_array.h" #include "paddle/fluid/framework/op_registry.h" @@ -87,7 +88,7 @@ struct BeamSearchDecoder { */ std::vector> PackTwoSteps( const LoDTensor& cur_ids, const LoDTensor& cur_scores, - std::vector>& prefixes_list, + std::vector>* prefixes_list, std::vector>* sentence_vector_list) const; /** @@ -140,7 +141,7 @@ Sentence BeamSearchDecoder::MakeSentence(const BeamNode* node) const { template std::vector> BeamSearchDecoder::PackTwoSteps( const LoDTensor& cur_ids, const LoDTensor& cur_scores, - std::vector>& prefixes_list, + std::vector>* prefixes_list, std::vector>* sentence_vector_list) const { std::vector> result; @@ -153,7 +154,7 @@ std::vector> BeamSearchDecoder::PackTwoSteps( // if prefixes size is 0, it means this is the first step. In this step, // all candidate id is the start of candidate sentences. - if (prefixes_list.empty()) { + if (prefixes_list->empty()) { PADDLE_ENFORCE_EQ(cur_ids.lod().at(kSourceLevel).back(), cur_ids.lod().at(kSentenceLevel).back(), "in the first step"); @@ -162,7 +163,7 @@ std::vector> BeamSearchDecoder::PackTwoSteps( cur_ids.data()[id_idx], cur_scores.data()[id_idx]))); } } else { - BeamNodeVector& prefixes = prefixes_list[src_idx]; + BeamNodeVector& prefixes = prefixes_list->at(src_idx); SentenceVector& sentence_vector = (*sentence_vector_list)[src_idx]; PADDLE_ENFORCE_EQ(src_end - src_start, prefixes.size(), @@ -262,7 +263,7 @@ void BeamSearchDecoder::PackAllSteps(const LoDTensorArray& step_ids, for (size_t step_id = 0; step_id < step_num; ++step_id) { beamnode_vector_list = PackTwoSteps(step_ids.at(step_id), step_scores.at(step_id), - beamnode_vector_list, &sentence_vector_list); + &beamnode_vector_list, &sentence_vector_list); } // append last beam_node to result for (size_t src_idx = 0; src_idx < src_num; ++src_idx) { diff --git a/paddle/fluid/operators/beam_search_decode_op_test.cc b/paddle/fluid/operators/beam_search_decode_op_test.cc index c3faf46e09bb40d01049fd9cfd79836c1d2bd5bb..36f9594969c416c694928811012baf94332bbd91 100644 --- a/paddle/fluid/operators/beam_search_decode_op_test.cc +++ b/paddle/fluid/operators/beam_search_decode_op_test.cc @@ -125,7 +125,7 @@ TEST(BeamSearchDecodeOp, PackTwoStepsFistStep) { BeamSearchDecoder helper; beamnode_vector_list = helper.PackTwoSteps( - ids[0], scores[0], beamnode_vector_list, &sentence_vector_list); + ids[0], scores[0], &beamnode_vector_list, &sentence_vector_list); ASSERT_EQ(beamnode_vector_list.size(), 2UL); ASSERT_EQ(beamnode_vector_list[0].size(), 2UL); ASSERT_EQ(beamnode_vector_list[1].size(), 4UL); @@ -167,7 +167,7 @@ TEST(BeamSearchDecodeOp, PackTwoSteps) { BeamSearchDecoder helper1; beamnode_vector_list = helper1.PackTwoSteps( - ids[0], scores[0], beamnode_vector_list, &sentence_vector_list); + ids[0], scores[0], &beamnode_vector_list, &sentence_vector_list); ASSERT_EQ(sentence_vector_list[0].size(), 1UL); ASSERT_EQ(sentence_vector_list[1].size(), 0UL); diff --git a/paddle/fluid/operators/beam_search_op.cc 
b/paddle/fluid/operators/beam_search_op.cc index e848b1f12cb9f1ce1d37e0e0233bfc361dc35a33..fdab4e92f47c7c8f241d93268a73dcb8c2eb2dc6 100644 --- a/paddle/fluid/operators/beam_search_op.cc +++ b/paddle/fluid/operators/beam_search_op.cc @@ -14,7 +14,10 @@ limitations under the License. */ #include "paddle/fluid/operators/beam_search_op.h" +#include #include +#include +#include #include "paddle/fluid/framework/lod_tensor.h" #include "paddle/fluid/framework/op_registry.h" diff --git a/paddle/fluid/operators/beam_search_op.h b/paddle/fluid/operators/beam_search_op.h index b333ef4e6c73be15dfea2cadb153d2484b3daaf7..0a481a85ce6fbb582b8c0e12710455aaaac72aa1 100644 --- a/paddle/fluid/operators/beam_search_op.h +++ b/paddle/fluid/operators/beam_search_op.h @@ -18,6 +18,8 @@ limitations under the License. */ #include "gtest/gtest.h" #endif +#include +#include #include "paddle/fluid/framework/lod_tensor.h" #include "paddle/fluid/framework/operator.h" diff --git a/paddle/fluid/operators/chunk_eval_op.cc b/paddle/fluid/operators/chunk_eval_op.cc index 77d3cffe7c19affe66223363eba26e2d77cdcd43..95440ff89e883e754795c67cd58a08f1131df368 100644 --- a/paddle/fluid/operators/chunk_eval_op.cc +++ b/paddle/fluid/operators/chunk_eval_op.cc @@ -13,6 +13,8 @@ See the License for the specific language governing permissions and limitations under the License. */ #include "paddle/fluid/operators/chunk_eval_op.h" +#include +#include namespace paddle { namespace operators { diff --git a/paddle/fluid/operators/chunk_eval_op.h b/paddle/fluid/operators/chunk_eval_op.h index 9e97f7c7762ed6bded94be35ae8a094466e0aec0..8631415062db839476e2536a9836e4b9f069a3e2 100644 --- a/paddle/fluid/operators/chunk_eval_op.h +++ b/paddle/fluid/operators/chunk_eval_op.h @@ -14,6 +14,9 @@ limitations under the License. 
*/ #pragma once #include +#include +#include + #include "paddle/fluid/framework/eigen.h" #include "paddle/fluid/framework/op_registry.h" @@ -36,11 +39,11 @@ class ChunkEvalKernel : public framework::OpKernel { }; void GetSegments(const int64_t* label, int length, - std::vector& segments, int num_chunk_types, + std::vector* segments, int num_chunk_types, int num_tag_types, int other_chunk_type, int tag_begin, int tag_inside, int tag_end, int tag_single) const { - segments.clear(); - segments.reserve(length); + segments->clear(); + segments->reserve(length); int chunk_start = 0; bool in_chunk = false; int tag = -1; @@ -58,7 +61,7 @@ class ChunkEvalKernel : public framework::OpKernel { i - 1, // end prev_type, }; - segments.push_back(segment); + segments->push_back(segment); in_chunk = false; } if (ChunkBegin(prev_tag, prev_type, tag, type, other_chunk_type, @@ -73,7 +76,7 @@ class ChunkEvalKernel : public framework::OpKernel { length - 1, // end type, }; - segments.push_back(segment); + segments->push_back(segment); } } @@ -177,8 +180,8 @@ class ChunkEvalKernel : public framework::OpKernel { for (int i = 0; i < num_sequences; ++i) { int seq_length = lod[0][i + 1] - lod[0][i]; EvalOneSeq(inference_data + lod[0][i], label_data + lod[0][i], seq_length, - output_segments, label_segments, *num_infer_chunks_data, - *num_label_chunks_data, *num_correct_chunks_data, + &output_segments, &label_segments, num_infer_chunks_data, + num_label_chunks_data, num_correct_chunks_data, num_chunk_types, num_tag_types, other_chunk_type, tag_begin, tag_inside, tag_end, tag_single, excluded_chunk_types); } @@ -197,10 +200,10 @@ class ChunkEvalKernel : public framework::OpKernel { } void EvalOneSeq(const int64_t* output, const int64_t* label, int length, - std::vector& output_segments, - std::vector& label_segments, - int64_t& num_output_segments, int64_t& num_label_segments, - int64_t& num_correct, int num_chunk_types, int num_tag_types, + std::vector* output_segments, + std::vector* label_segments, + int64_t* num_output_segments, int64_t* num_label_segments, + int64_t* num_correct, int num_chunk_types, int num_tag_types, int other_chunk_type, int tag_begin, int tag_inside, int tag_end, int tag_single, const std::set& excluded_chunk_types) const { @@ -209,25 +212,29 @@ class ChunkEvalKernel : public framework::OpKernel { GetSegments(label, length, label_segments, num_chunk_types, num_tag_types, other_chunk_type, tag_begin, tag_inside, tag_end, tag_single); size_t i = 0, j = 0; - while (i < output_segments.size() && j < label_segments.size()) { - if (output_segments[i] == label_segments[j] && - excluded_chunk_types.count(output_segments[i].type) != 1) { - ++num_correct; + while (i < output_segments->size() && j < label_segments->size()) { + if (output_segments->at(i) == label_segments->at(j) && + excluded_chunk_types.count(output_segments->at(i).type) != 1) { + ++(*num_correct); } - if (output_segments[i].end < label_segments[j].end) { + if (output_segments->at(i).end < label_segments->at(j).end) { ++i; - } else if (output_segments[i].end > label_segments[j].end) { + } else if (output_segments->at(i).end > label_segments->at(j).end) { ++j; } else { ++i; ++j; } } - for (auto& segment : label_segments) { - if (excluded_chunk_types.count(segment.type) != 1) ++num_label_segments; + for (auto& segment : (*label_segments)) { + if (excluded_chunk_types.count(segment.type) != 1) { + ++(*num_label_segments); + } } - for (auto& segment : output_segments) { - if (excluded_chunk_types.count(segment.type) != 1) 
++num_output_segments; + for (auto& segment : (*output_segments)) { + if (excluded_chunk_types.count(segment.type) != 1) { + ++(*num_output_segments); + } } } }; diff --git a/paddle/fluid/operators/conv_mkldnn_op.cc b/paddle/fluid/operators/conv_mkldnn_op.cc index 0a8a5d4c71c4510f04eea2f7ef12f836d1fd9c9b..d7a8f918ed2b377be867f9b568434f9a96f7deec 100644 --- a/paddle/fluid/operators/conv_mkldnn_op.cc +++ b/paddle/fluid/operators/conv_mkldnn_op.cc @@ -72,10 +72,10 @@ class ConvMKLDNNOpKernel : public paddle::framework::OpKernel { auto dst_md = platform::MKLDNNMemDesc( dst_tz, mkldnn::memory::data_type::f32, mkldnn::memory::format::nchw); - auto src_memory = - mkldnn::memory({src_md, mkldnn_engine}, (void*)input_data); - auto weights_memory = - mkldnn::memory({weights_md, mkldnn_engine}, (void*)filter_data); + auto src_memory = mkldnn::memory({src_md, mkldnn_engine}, + reinterpret_cast(input_data)); + auto weights_memory = mkldnn::memory({weights_md, mkldnn_engine}, + reinterpret_cast(filter_data)); auto dst_memory = mkldnn::memory({dst_md, mkldnn_engine}, output_data); std::shared_ptr conv_pd = @@ -180,8 +180,9 @@ class ConvMKLDNNGradOpKernel : public paddle::framework::OpKernel { dst_tz, mkldnn::memory::data_type::f32, mkldnn::memory::format::nchw); // create memory - auto diff_dst_memory = mkldnn::memory({diff_weights_md, mkldnn_engine}, - (void*)output_grad_data); + auto diff_dst_memory = + mkldnn::memory({diff_weights_md, mkldnn_engine}, + reinterpret_cast(output_grad_data)); // Retrieve conv_pd from device context auto conv_pd = std::static_pointer_cast( @@ -198,10 +199,11 @@ class ConvMKLDNNGradOpKernel : public paddle::framework::OpKernel { mkldnn_engine); // create memory - auto diff_weights_memory = mkldnn::memory( - {diff_weights_md, mkldnn_engine}, (void*)filter_grad_data); - auto src_memory = - mkldnn::memory({src_md, mkldnn_engine}, (void*)input_data); + auto diff_weights_memory = + mkldnn::memory({diff_weights_md, mkldnn_engine}, + reinterpret_cast(filter_grad_data)); + auto src_memory = mkldnn::memory({src_md, mkldnn_engine}, + reinterpret_cast(input_data)); // create backward conv primitive for weights auto conv_bwd_weights_prim = mkldnn::convolution_backward_weights( @@ -221,9 +223,10 @@ class ConvMKLDNNGradOpKernel : public paddle::framework::OpKernel { // create memory auto diff_src_memory = - mkldnn::memory({diff_src_md, mkldnn_engine}, (void*)input_grad_data); - auto weights_memory = - mkldnn::memory({weights_md, mkldnn_engine}, (void*)filter_data); + mkldnn::memory({diff_src_md, mkldnn_engine}, + reinterpret_cast(input_grad_data)); + auto weights_memory = mkldnn::memory( + {weights_md, mkldnn_engine}, reinterpret_cast(filter_data)); // create backward conv primitive for data auto conv_bwd_data_prim = mkldnn::convolution_backward_data( diff --git a/paddle/fluid/operators/conv_op.h b/paddle/fluid/operators/conv_op.h index 12b45f1d65019f623268cb9da9004bac5e1f72a3..d6f86a5c88e37970379da0afe2a1d46e18b653f4 100644 --- a/paddle/fluid/operators/conv_op.h +++ b/paddle/fluid/operators/conv_op.h @@ -14,6 +14,7 @@ limitations under the License. 
*/ #pragma once +#include #include "paddle/fluid/framework/eigen.h" #include "paddle/fluid/framework/op_registry.h" #include "paddle/fluid/operators/math/depthwise_conv.h" @@ -41,9 +42,10 @@ inline int ConvOutputSize(int input_size, int filter_size, int dilation, return output_size; } -inline bool IsExpand(std::vector& filter_dim, - std::vector& strides, std::vector& paddings, - std::vector& dilations) { +inline bool IsExpand(const std::vector& filter_dim, + const std::vector& strides, + const std::vector& paddings, + const std::vector& dilations) { bool filter_1 = true, strides_1 = true, padding_0 = true, dilation_1 = true; for (size_t j = 0; j < strides.size(); ++j) { filter_1 = filter_1 && (static_cast(filter_dim[j + 2]) == 1); diff --git a/paddle/fluid/operators/detection_map_op.cc b/paddle/fluid/operators/detection_map_op.cc index 93ef15b9332168a9c62abfd4d0827207173ece45..38f43b6d031372948bd82c686a2d9ce5f8ecd07c 100644 --- a/paddle/fluid/operators/detection_map_op.cc +++ b/paddle/fluid/operators/detection_map_op.cc @@ -13,6 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. */ #include "paddle/fluid/operators/detection_map_op.h" +#include namespace paddle { namespace operators { diff --git a/paddle/fluid/operators/detection_map_op.h b/paddle/fluid/operators/detection_map_op.h index 8c15bfa36bfe72586cfcbdbd8efc4542253adaca..431812e2bfcf926cadf8d7be6a7d1a79e78c7762 100644 --- a/paddle/fluid/operators/detection_map_op.h +++ b/paddle/fluid/operators/detection_map_op.h @@ -13,6 +13,11 @@ See the License for the specific language governing permissions and limitations under the License. */ #pragma once +#include +#include +#include +#include +#include #include "paddle/fluid/framework/eigen.h" #include "paddle/fluid/framework/op_registry.h" @@ -82,7 +87,7 @@ class DetectionMAPOpKernel : public framework::OpKernel { std::vector>> gt_boxes; std::vector>>> detect_boxes; - GetBoxes(*in_label, *in_detect, gt_boxes, detect_boxes); + GetBoxes(*in_label, *in_detect, >_boxes, detect_boxes); std::map label_pos_count; std::map>> true_pos; @@ -95,20 +100,20 @@ class DetectionMAPOpKernel : public framework::OpKernel { } if (in_pos_count != nullptr && state) { - GetInputPos(*in_pos_count, *in_true_pos, *in_false_pos, label_pos_count, - true_pos, false_pos, class_num); + GetInputPos(*in_pos_count, *in_true_pos, *in_false_pos, &label_pos_count, + &true_pos, &false_pos, class_num); } CalcTrueAndFalsePositive(gt_boxes, detect_boxes, evaluate_difficult, - overlap_threshold, label_pos_count, true_pos, - false_pos); + overlap_threshold, &label_pos_count, &true_pos, + &false_pos); int background_label = ctx.Attr("background_label"); T map = CalcMAP(ap_type, label_pos_count, true_pos, false_pos, background_label); - GetOutputPos(ctx, label_pos_count, true_pos, false_pos, *out_pos_count, - *out_true_pos, *out_false_pos, class_num); + GetOutputPos(ctx, label_pos_count, true_pos, false_pos, out_pos_count, + out_true_pos, out_false_pos, class_num); T* map_data = out_map->mutable_data(ctx.GetPlace()); map_data[0] = map; @@ -155,7 +160,7 @@ class DetectionMAPOpKernel : public framework::OpKernel { void GetBoxes(const framework::LoDTensor& input_label, const framework::LoDTensor& input_detect, - std::vector>>& gt_boxes, + std::vector>>* gt_boxes, std::vector>>>& detect_boxes) const { auto labels = framework::EigenTensor::From(input_label); @@ -179,7 +184,7 @@ class DetectionMAPOpKernel : public framework::OpKernel { box.is_difficult = true; 
boxes[label].push_back(box); } - gt_boxes.push_back(boxes); + gt_boxes->push_back(boxes); } auto detect_index = detect_lod[0]; @@ -200,9 +205,9 @@ class DetectionMAPOpKernel : public framework::OpKernel { const std::map& label_pos_count, const std::map>>& true_pos, const std::map>>& false_pos, - framework::Tensor& output_pos_count, - framework::LoDTensor& output_true_pos, - framework::LoDTensor& output_false_pos, const int class_num) const { + framework::Tensor* output_pos_count, + framework::LoDTensor* output_true_pos, + framework::LoDTensor* output_false_pos, const int class_num) const { int true_pos_count = 0; int false_pos_count = 0; for (auto it = true_pos.begin(); it != true_pos.end(); ++it) { @@ -214,12 +219,12 @@ class DetectionMAPOpKernel : public framework::OpKernel { false_pos_count += fp.size(); } - int* pos_count_data = output_pos_count.mutable_data( + int* pos_count_data = output_pos_count->mutable_data( framework::make_ddim({class_num, 1}), ctx.GetPlace()); - T* true_pos_data = output_true_pos.mutable_data( + T* true_pos_data = output_true_pos->mutable_data( framework::make_ddim({true_pos_count, 2}), ctx.GetPlace()); - T* false_pos_data = output_false_pos.mutable_data( + T* false_pos_data = output_false_pos->mutable_data( framework::make_ddim({false_pos_count, 2}), ctx.GetPlace()); true_pos_count = 0; false_pos_count = 0; @@ -261,21 +266,21 @@ class DetectionMAPOpKernel : public framework::OpKernel { framework::LoD false_pos_lod; false_pos_lod.emplace_back(false_pos_starts); - output_true_pos.set_lod(true_pos_lod); - output_false_pos.set_lod(false_pos_lod); + output_true_pos->set_lod(true_pos_lod); + output_false_pos->set_lod(false_pos_lod); return; } void GetInputPos(const framework::Tensor& input_pos_count, const framework::LoDTensor& input_true_pos, const framework::LoDTensor& input_false_pos, - std::map& label_pos_count, - std::map>>& true_pos, - std::map>>& false_pos, + std::map* label_pos_count, + std::map>>* true_pos, + std::map>>* false_pos, const int class_num) const { const int* pos_count_data = input_pos_count.data(); for (int i = 0; i < class_num; ++i) { - label_pos_count[i] = pos_count_data[i]; + (*label_pos_count)[i] = pos_count_data[i]; } auto SetData = [](const framework::LoDTensor& pos_tensor, @@ -291,8 +296,8 @@ class DetectionMAPOpKernel : public framework::OpKernel { } }; - SetData(input_true_pos, true_pos); - SetData(input_false_pos, false_pos); + SetData(input_true_pos, *true_pos); + SetData(input_false_pos, *false_pos); return; } @@ -301,9 +306,9 @@ class DetectionMAPOpKernel : public framework::OpKernel { const std::vector>>>& detect_boxes, bool evaluate_difficult, float overlap_threshold, - std::map& label_pos_count, - std::map>>& true_pos, - std::map>>& false_pos) const { + std::map* label_pos_count, + std::map>>* true_pos, + std::map>>* false_pos) const { int batch_size = gt_boxes.size(); for (int n = 0; n < batch_size; ++n) { auto image_gt_boxes = gt_boxes[n]; @@ -320,10 +325,10 @@ class DetectionMAPOpKernel : public framework::OpKernel { continue; } int label = it->first; - if (label_pos_count.find(label) == label_pos_count.end()) { - label_pos_count[label] = count; + if (label_pos_count->find(label) == label_pos_count->end()) { + (*label_pos_count)[label] = count; } else { - label_pos_count[label] += count; + (*label_pos_count)[label] += count; } } } @@ -338,8 +343,8 @@ class DetectionMAPOpKernel : public framework::OpKernel { int label = it->first; for (size_t i = 0; i < pred_boxes.size(); ++i) { auto score = pred_boxes[i].first; - 
true_pos[label].push_back(std::make_pair(score, 0)); - false_pos[label].push_back(std::make_pair(score, 1)); + (*true_pos)[label].push_back(std::make_pair(score, 0)); + (*false_pos)[label].push_back(std::make_pair(score, 1)); } } continue; @@ -351,8 +356,8 @@ class DetectionMAPOpKernel : public framework::OpKernel { if (image_gt_boxes.find(label) == image_gt_boxes.end()) { for (size_t i = 0; i < pred_boxes.size(); ++i) { auto score = pred_boxes[i].first; - true_pos[label].push_back(std::make_pair(score, 0)); - false_pos[label].push_back(std::make_pair(score, 1)); + (*true_pos)[label].push_back(std::make_pair(score, 0)); + (*false_pos)[label].push_back(std::make_pair(score, 1)); } continue; } @@ -381,17 +386,17 @@ class DetectionMAPOpKernel : public framework::OpKernel { (!evaluate_difficult && !matched_bboxes[max_idx].is_difficult); if (match_evaluate_difficult) { if (!visited[max_idx]) { - true_pos[label].push_back(std::make_pair(score, 1)); - false_pos[label].push_back(std::make_pair(score, 0)); + (*true_pos)[label].push_back(std::make_pair(score, 1)); + (*false_pos)[label].push_back(std::make_pair(score, 0)); visited[max_idx] = true; } else { - true_pos[label].push_back(std::make_pair(score, 0)); - false_pos[label].push_back(std::make_pair(score, 1)); + (*true_pos)[label].push_back(std::make_pair(score, 0)); + (*false_pos)[label].push_back(std::make_pair(score, 1)); } } } else { - true_pos[label].push_back(std::make_pair(score, 0)); - false_pos[label].push_back(std::make_pair(score, 1)); + (*true_pos)[label].push_back(std::make_pair(score, 0)); + (*false_pos)[label].push_back(std::make_pair(score, 1)); } } } diff --git a/paddle/fluid/operators/edit_distance_op.cu b/paddle/fluid/operators/edit_distance_op.cu index 3b89ad5d49c339cf05abc0f8577e895f30dddfd4..913a9145420dae7c4f6a4df10c0330636b5796b0 100644 --- a/paddle/fluid/operators/edit_distance_op.cu +++ b/paddle/fluid/operators/edit_distance_op.cu @@ -14,6 +14,7 @@ limitations under the License. */ #include #include "paddle/fluid/framework/op_registry.h" +#include "paddle/fluid/operators/edit_distance_op.h" #include "paddle/fluid/operators/math/math_function.h" #include "paddle/fluid/platform/cuda_helper.h" #include "paddle/fluid/platform/gpu_info.h" diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc index 5d293665f0bcc098126ad3ec6c9bf34ff54c3b6f..a4c925b538ef916e88ec06cea6de57f31eaf069b 100644 --- a/paddle/fluid/operators/listen_and_serv_op.cc +++ b/paddle/fluid/operators/listen_and_serv_op.cc @@ -12,6 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. 
*/ +#include <fstream> #include // NOLINT #include @@ -67,7 +68,7 @@ ListenAndServOp::ListenAndServOp(const std::string &type, const framework::AttributeMap &attrs) : OperatorBase(type, inputs, outputs, attrs) {} -int ListenAndServOp::GetSelectedPort() { +int ListenAndServOp::GetSelectedPort() const { return rpc_service_->GetSelectedPort(); } @@ -99,7 +100,7 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope, framework::Executor executor(dev_place); std::vector block_list; for (size_t blkid = 1; blkid < num_blocks; ++blkid) { - if (blkid != prefetch_block->ID()) { + if (blkid != static_cast<size_t>(prefetch_block->ID())) { block_list.push_back(blkid); } } @@ -121,10 +122,14 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope, rpc_service_->SetProgram(program); // start the server listening after all member initialized. server_thread_.reset(new std::thread(RunServer, rpc_service_)); - // FIXME(typhoonzero): do we need to wait until the server port is ready? + VLOG(3) << "wait for server thread to become ready..."; sleep(5); + // Write the selected port to a file so that Python can pick it up. + std::ofstream port_file; + port_file.open("/tmp/paddle.selected_port"); + port_file << rpc_service_->GetSelectedPort(); + port_file.close(); - // TODO(typhoonzero): change this to a while_op for every cluster-batch. bool exit_flag = false; // Record received sparse variables, so that // we could reset those after execute optimize program @@ -175,7 +180,7 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope, parallel_blkids.push_back(1); double ts = detail::GetTimestamp(); for (size_t blkid = 2; blkid < num_blocks; ++blkid) { - if (blkid != prefetch_block->ID()) { + if (blkid != static_cast<size_t>(prefetch_block->ID())) { if (program->Block(blkid).Parent() != last_parent_blkid) { ParallelExecuteBlocks(parallel_blkids, &executor, optimize_prepared, program, &recv_scope); diff --git a/paddle/fluid/operators/listen_and_serv_op.h b/paddle/fluid/operators/listen_and_serv_op.h index 759b2a462ba5b938991aa86be9b9dc3e59fe3f7e..9744921cef7c0f13c94b7fe729561de8e181650c 100644 --- a/paddle/fluid/operators/listen_and_serv_op.h +++ b/paddle/fluid/operators/listen_and_serv_op.h @@ -39,7 +39,7 @@ class ListenAndServOp : public framework::OperatorBase { const framework::VariableNameMap &outputs, const framework::AttributeMap &attrs); - int GetSelectedPort(); + int GetSelectedPort() const; void Stop() override; diff --git a/paddle/fluid/operators/send_recv_op_test.cc b/paddle/fluid/operators/send_recv_op_test.cc index 3bf5d57809019d3ae469471c2ee2e7aac70b9faf..a342874f97460cf624ff0047915d33ba4161f19b 100644 --- a/paddle/fluid/operators/send_recv_op_test.cc +++ b/paddle/fluid/operators/send_recv_op_test.cc @@ -139,7 +139,6 @@ void StartServerNet(bool is_sparse) { attrs.insert({"PrefetchBlock", prefetch_block}); listen_and_serv_op = f::OpRegistry::CreateOp("listen_and_serv", {{"X", {"x1"}}}, {}, attrs); - LOG(INFO) << "selected port before run " << selected_port; listen_and_serv_op->Run(scope, place); LOG(INFO) << "server exit"; } @@ -158,16 +157,13 @@ TEST(SendRecvOp, CPUDense) { selected_port = static_cast( listen_and_serv_op.get()) ->GetSelectedPort(); - LOG(INFO) << "selected port " << selected_port; std::string endpoint = paddle::string::Sprintf("127.0.0.1:%d", selected_port); attrs.insert({"endpoints", std::vector({endpoint})}); attrs.insert({"epmap", std::vector({endpoint})}); auto send_op = f::OpRegistry::CreateOp( "send", {{"X", {"x1"}}}, {{"Out", {"Out"}}, {"RPCClient", {"RPC_CLIENT_VAR"}}}, attrs); - 
LOG(INFO) << "before run " << endpoint; send_op->Run(scope, place); - LOG(INFO) << "end run"; auto in_var = scope.Var("x1"); auto tensor = in_var->GetMutable(); @@ -180,7 +176,6 @@ TEST(SendRecvOp, CPUDense) { for (int64_t i = 0; i < target->numel(); ++i) { EXPECT_EQ(expected[i] * 2, actual[i]); } - LOG(INFO) << "before stop"; listen_and_serv_op->Stop(); server_thread.join(); listen_and_serv_op.reset(nullptr); @@ -199,7 +194,6 @@ TEST(SendRecvOp, CPUSparse) { selected_port = static_cast( listen_and_serv_op.get()) ->GetSelectedPort(); - LOG(INFO) << "selected port " << selected_port; std::string endpoint = paddle::string::Sprintf("127.0.0.1:%d", selected_port); attrs.insert({"endpoints", std::vector({endpoint})}); attrs.insert({"epmap", std::vector({endpoint})}); diff --git a/paddle/fluid/platform/dynload/CMakeLists.txt b/paddle/fluid/platform/dynload/CMakeLists.txt index b93b925a72a55442c105e4280a3580f4ea5b93a1..364c4901b297dbd647faae85b01f682a1daace9c 100644 --- a/paddle/fluid/platform/dynload/CMakeLists.txt +++ b/paddle/fluid/platform/dynload/CMakeLists.txt @@ -1,7 +1,7 @@ cc_library(dynamic_loader SRCS dynamic_loader.cc DEPS glog gflags enforce) list(APPEND CUDA_SRCS cublas.cc cudnn.cc curand.cc nccl.cc) -if (WITH_TENSORRT) +if (TENSORRT_FOUND) list(APPEND CUDA_SRCS tensorrt.cc) endif() diff --git a/paddle/fluid/platform/float16.h b/paddle/fluid/platform/float16.h index 673e1bcae4af6d039bc969f1de6e4bcab3748cb5..ffd183af68514dbb1a8b3de39000c9ca3f56ddc3 100644 --- a/paddle/fluid/platform/float16.h +++ b/paddle/fluid/platform/float16.h @@ -873,6 +873,11 @@ HOSTDEVICE inline bool(isfinite)(const float16& a) { return !((isnan)(a)) && !((isinf)(a)); } +inline std::ostream& operator<<(std::ostream& os, const float16& a) { + os << static_cast<float>(a); + return os; +} + } // namespace platform } // namespace paddle diff --git a/paddle/fluid/platform/float16_test.cc b/paddle/fluid/platform/float16_test.cc index d60aecf96c8828a5656f81fd3602cfb2e66990cf..a589e32b61a9b6a44bdc4529eee715d987d6922c 100644 --- a/paddle/fluid/platform/float16_test.cc +++ b/paddle/fluid/platform/float16_test.cc @@ -141,5 +141,10 @@ TEST(float16, lod_tensor_cpu) { } } +TEST(float16, print) { + float16 a = float16(1.0f); + std::cout << a << std::endl; +} + } // namespace platform } // namespace paddle diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index a1e8ff6399f0812773a7bb753c90e4400b1763d9..19bd30d9665dc1e8f9d475868cabbf14c8847352 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -505,11 +505,19 @@ All parameter, weight, gradient are variables in Paddle. scope, local_scopes, allow_op_delay); }) .def("bcast_params", &ParallelExecutor::BCastParamsToGPUs) + // NOTE: even though we return a vec<Scope*>* to Python with reference policy, + // we still cannot get a local_scope from this vector, since the elements + // of the vector will be freed by the Python GC. We can only return Scope* + // one by one and mark them as reference.
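      + // Hypothetical Python-side usage illustrating the point above:
      + //   scopes = pe.local_scopes()
      + // The list itself is an ordinary Python object managed by the Python GC,
      + // so a Scope fetched from it must not be cached beyond the executor's lifetime.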
.def("local_scopes", [](ParallelExecutor &self) -> std::vector * { return &self.GetLocalScopes(); }, py::return_value_policy::reference) + .def("feed_tensors_into_local_scopes", + &ParallelExecutor::FeedTensorsIntoLocalScopes) + .def("feed_and_split_tensor_into_local_scopes", + &ParallelExecutor::FeedAndSplitTensorIntoLocalScopes) .def("run", &ParallelExecutor::Run); BindRecordIOWriter(&m); diff --git a/paddle/fluid/pybind/tensor_py.h b/paddle/fluid/pybind/tensor_py.h index 4a9dbd324c90380e784cc9457845fabd858585be..159d1d5f4e70033fabf93514bd63b38f83675bff 100644 --- a/paddle/fluid/pybind/tensor_py.h +++ b/paddle/fluid/pybind/tensor_py.h @@ -190,6 +190,11 @@ void PyCUDATensorSetFromArray( static_cast(pool.Get(place)); paddle::platform::GpuMemcpyAsync(dst, array.data(), sizeof(T) * array.size(), cudaMemcpyHostToDevice, dev_ctx->stream()); + // NOTE: For safety, wait here until the copy completes. + // This is because the CPU-side array.data() could be destroyed after this + // method returns. If this method were asynchronous, the copy could read from + // a memory buffer that had already been freed. + dev_ctx->Wait(); } template <> @@ -216,6 +221,11 @@ void PyCUDATensorSetFromArray( paddle::platform::GpuMemcpyAsync(dst, array.data(), sizeof(uint16_t) * array.size(), cudaMemcpyHostToDevice, dev_ctx->stream()); + // NOTE: For safety, wait here until the copy completes. + // This is because the CPU-side array.data() could be destroyed after this + // method returns. If this method were asynchronous, the copy could read from + // a memory buffer that had already been freed. + dev_ctx->Wait(); } template diff --git a/paddle/scripts/docker/build.sh b/paddle/scripts/docker/build.sh index be1565ab533037d4bc72b6d2834c48b04638c297..2b2a904974f3756576fb47851400e344c9357c57 100755 --- a/paddle/scripts/docker/build.sh +++ b/paddle/scripts/docker/build.sh @@ -198,7 +198,7 @@ EOF # run paddle version to install python packages first RUN apt-get update &&\ ${NCCL_DEPS}\ - apt-get install -y wget python-pip dmidecode python-tk && pip install -U pip && \ + apt-get install -y wget python-pip dmidecode python-tk && pip install -U pip==9.0.3 && \ pip install /*.whl; apt-get install -f -y && \ apt-get clean -y && \ rm -f /*.whl && \ diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py index e7d6c4e2521bee133c4794ed1db669b02fc2152b..ead57ac370d1bec13c1b21e83dd4be1a7331f87e 100644 --- a/python/paddle/fluid/layers/io.py +++ b/python/paddle/fluid/layers/io.py @@ -13,7 +13,7 @@ # limitations under the License. from .. import core -from ..framework import convert_np_dtype_to_dtype_, default_main_program, default_startup_program +from ..framework import convert_np_dtype_to_dtype_, default_main_program, default_startup_program, Program from ..unique_name import generate as unique_name from control_flow import BlockGuard from ..layer_helper import LayerHelper @@ -158,6 +158,7 @@ class ListenAndServ(object): main_program = self.helper.main_program current_block = main_program.current_block() parent_block = self.parent_block() + empty_block = Program().global_block() parent_block.append_op( type='listen_and_serv', @@ -166,11 +167,12 @@ attrs={ 'endpoint': self.endpoint, 'Fanin': self.fan_in, - 'OptimizeBlock': current_block + 'OptimizeBlock': current_block, + 'PrefetchBlock': empty_block }) -def Send(endpoints, send_vars, get_vars): +def Send(endpoints, send_vars, get_vars=None): """ Send layer @@ -184,7 +186,6 @@ def Send(endpoints, send_vars, get_vars): side when server have finished running server side program.
""" assert (type(send_vars) == list) - assert (type(get_vars) == list) epmap = endpoints.split(",") endpoints = list(set(epmap)) @@ -192,6 +193,11 @@ def Send(endpoints, send_vars, get_vars): helper = LayerHelper("Send", **locals()) rpc_client_var = default_main_program().global_block().create_var( name="RPC_CLIENT_VAR", persistable=True, type=core.VarDesc.VarType.RAW) + if not get_vars: + get_vars = [] + for s in send_vars: + v = helper.create_tmp_variable(dtype=s.dtype, stop_gradient=True) + get_vars.append(v) helper.append_op( type="send", @@ -200,6 +206,7 @@ def Send(endpoints, send_vars, get_vars): "RPCClient": rpc_client_var}, attrs={"endpoints": endpoints, "epmap": epmap}) + return get_vars def Recv(endpoints, get_vars): diff --git a/python/paddle/fluid/parallel_executor.py b/python/paddle/fluid/parallel_executor.py index 5ce2aa1fc4d0b275b502af0f97e4a0f83e85de5b..07cc1e29341bd497e88097a9ee5653631b79d734 100644 --- a/python/paddle/fluid/parallel_executor.py +++ b/python/paddle/fluid/parallel_executor.py @@ -16,6 +16,7 @@ import core import multiprocessing import framework import executor +import sys __all__ = ['ParallelExecutor'] @@ -123,28 +124,93 @@ class ParallelExecutor(object): allow_op_delay) self.scope = scope - def run(self, fetch_list, feed_dict={}): + def run(self, fetch_list, feed=None, feed_dict=None): """ - :param fetch_list: A list of variable names that will be fetched. - :param feed_dict: A dict mapping for feed variable name to LoDTensor - or numpy array. - :return: fetched value list. - """ - if not isinstance(feed_dict, dict): - raise TypeError("feed_dict should be a dict") + Run a parallel executor with fetch_list. + + The feed parameter can be a dict or a list. If feed is a dict, the + feed data will be split across the devices. If feed is a list, we + assume the data has already been split into multiple devices, and + each element in the list will be copied to one device directly. + + For example, if the feed is a dict: + >>> exe = ParallelExecutor() + >>> # the image will be split across devices. If there are two devices + >>> # each device will process an image with shape (24, 1, 28, 28) + >>> exe.run(feed={'image': numpy.random.random(size=(48, 1, 28, 28))}) + + For example, if the feed is a list: + >>> exe = ParallelExecutor() + >>> # each device will process each element in the list. + >>> # the 1st device will process an image with shape (48, 1, 28, 28) + >>> # the 2nd device will process an image with shape (32, 1, 28, 28) + >>> # + >>> # you can use exe.device_count to get the device number. + >>> exe.run(feed=[{"image": numpy.random.random(size=(48, 1, 28, 28))}, + >>> {"image": numpy.random.random(size=(32, 1, 28, 28))}, + >>> ]) + + + Args: + fetch_list(list): The fetched variable names + feed(list|dict|None): The feed variables. If the feed is a dict, + tensors in that dict will be split across the devices. If + the feed is a list, each element of the list will be copied + to one device. + feed_dict: Alias for the feed parameter, kept for backward + compatibility. This parameter is deprecated. - feed_tensor_dict = {} - for i, feed_name in enumerate(feed_dict): - feed_tensor = feed_dict[feed_name] - if not isinstance(feed_tensor, core.LoDTensor): - feed_tensor = core.LoDTensor() - feed_tensor.set(feed_dict[feed_name], self._act_places[0]) - feed_tensor_dict[feed_name] = feed_tensor + Returns: fetched result list. + + """ + if feed is None: + feed = feed_dict + print >> sys.stderr, "`feed_dict` is deprecated. Please use `feed=`"
+ + if isinstance(feed, dict): + feed_tensor_dict = dict() + for feed_name in feed: + feed_tensor = feed[feed_name] + if not isinstance(feed_tensor, core.LoDTensor): + feed_tensor = core.LoDTensor() + # always set to CPU place, since the tensor needs to be split + # and splitting is fast on CPU + feed_tensor.set(feed[feed_name], core.CPUPlace()) + feed_tensor_dict[feed_name] = feed_tensor + + self.executor.feed_and_split_tensor_into_local_scopes( + feed_tensor_dict) + elif isinstance(feed, list) or isinstance(feed, tuple): + if len(feed) != len(self._act_places): + raise ValueError( + "When feeding a list of tensors, the list should be the same size as the number of places" + ) + + res = list() + + for i, each in enumerate(feed): + if not isinstance(each, dict): + raise TypeError( + "Each element of feed list should be a dict") + res_dict = dict() + for feed_name in each: + tensor = each[feed_name] + if not isinstance(tensor, core.LoDTensor): + tmp = core.LoDTensor() + tmp.set(tensor, self._act_places[i]) + tensor = tmp + res_dict[feed_name] = tensor + res.append(res_dict) + self.executor.feed_tensors_into_local_scopes(res) fetch_var_name = '@FETCHED_VAR_NAME@' - self.executor.run(fetch_list, fetch_var_name, feed_tensor_dict) + self.executor.run(fetch_list, fetch_var_name) arr = self.scope.find_var(fetch_var_name).get_lod_tensor_array() return [arr[i] for i in range(len(arr))] def bcast_params(self): self.executor.bcast_params(set(self.persistable_vars)) + + @property + def device_count(self): + return len(self._act_places) diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index 356c3e64b3d03b520a1bec5b5e0174e1d8ee23e8..d9190408e151283ece8460286dd67818dd39da3e 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -1,10 +1,13 @@ file(GLOB TEST_OPS RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "test_*.py") string(REPLACE ".py" "" TEST_OPS "${TEST_OPS}") -# The fully connected test is removed whe the WITH_MKLDNN flag is OFF -# Because the fully connected layer has only one kernel (MKLDNN) +# The MKLDNN tests are skipped when the MKLDNN flag is OFF if(NOT WITH_MKLDNN) - list(REMOVE_ITEM TEST_OPS test_fc_op) + foreach(src ${TEST_OPS}) + if(${src} MATCHES ".*_mkldnn_op$") + list(REMOVE_ITEM TEST_OPS ${src}) + endif() + endforeach() endif(NOT WITH_MKLDNN) if(NOT WITH_DISTRIBUTE) @@ -62,6 +65,7 @@ list(REMOVE_ITEM TEST_OPS test_registry) list(REMOVE_ITEM TEST_OPS test_fetch_var) list(REMOVE_ITEM TEST_OPS test_parallel_op) list(REMOVE_ITEM TEST_OPS test_dynrnn_static_input) +list(REMOVE_ITEM TEST_OPS test_dist_train) # tests that can be bundled together in one python process for speed. if(WITH_FAST_BUNDLE_TEST) @@ -100,3 +104,4 @@ py_test_modules(test_registry MODULES test_registry) py_test_modules(test_fetch_var MODULES test_fetch_var) py_test_modules(test_dynrnn_static_input MODULES test_dynrnn_static_input) py_test_modules(test_parallel_op MODULES test_parallel_op) +py_test_modules(test_dist_train MODULES test_dist_train) diff --git a/python/paddle/fluid/tests/unittests/test_activation_mkldnn_op.py b/python/paddle/fluid/tests/unittests/test_activation_mkldnn_op.py new file mode 100644 index 0000000000000000000000000000000000000000..7d554c2276c9acd710d14c8f8b32c802e3e17515 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_activation_mkldnn_op.py @@ -0,0 +1,99 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import numpy as np +import paddle.fluid.core as core +from op_test import OpTest +from scipy.special import expit +from test_activation_op import TestRelu, TestTanh, TestSqrt, TestAbs + + +class TestMKLDNNReluDim2(TestRelu): + def setUp(self): + super(TestMKLDNNReluDim2, self).setUp() + + self.attrs = {"use_mkldnn": True} + + +class TestMKLDNNTanhDim2(TestTanh): + def setUp(self): + super(TestMKLDNNTanhDim2, self).setUp() + + self.attrs = {"use_mkldnn": True} + + +class TestMKLDNNSqrtDim2(TestSqrt): + def setUp(self): + super(TestMKLDNNSqrtDim2, self).setUp() + + self.attrs = {"use_mkldnn": True} + + +class TestMKLDNNAbsDim2(TestAbs): + def setUp(self): + super(TestMKLDNNAbsDim2, self).setUp() + self.attrs = {"use_mkldnn": True} + + +class TestMKLDNNReluDim4(TestRelu): + def setUp(self): + super(TestMKLDNNReluDim4, self).setUp() + + x = np.random.uniform(-1, 1, [2, 4, 3, 5]).astype("float32") + # The same reason with TestAbs + x[np.abs(x) < 0.005] = 0.02 + out = np.maximum(x, 0) + + self.inputs = {'X': OpTest.np_dtype_to_fluid_dtype(x)} + self.outputs = {'Out': out} + self.attrs = {"use_mkldnn": True} + + +class TestMKLDNNTanhDim4(TestTanh): + def setUp(self): + super(TestMKLDNNTanhDim4, self).setUp() + + self.inputs = { + 'X': np.random.uniform(0.1, 1, [2, 4, 3, 5]).astype("float32") + } + self.outputs = {'Out': np.tanh(self.inputs['X'])} + self.attrs = {"use_mkldnn": True} + + +class TestMKLDNNSqrtDim4(TestSqrt): + def setUp(self): + super(TestMKLDNNSqrtDim4, self).setUp() + + self.inputs = { + 'X': np.random.uniform(0.1, 1, [2, 4, 3, 5]).astype("float32") + } + self.outputs = {'Out': np.sqrt(self.inputs['X'])} + self.attrs = {"use_mkldnn": True} + + +class TestMKLDNNAbsDim4(TestAbs): + def setUp(self): + super(TestMKLDNNAbsDim4, self).setUp() + + x = np.random.uniform(-1, 1, [2, 4, 3, 5]).astype("float32") + # The same reason with TestAbs + x[np.abs(x) < 0.005] = 0.02 + self.inputs = {'X': x} + self.outputs = {'Out': np.abs(self.inputs['X'])} + self.attrs = {"use_mkldnn": True} + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_activation_op.py b/python/paddle/fluid/tests/unittests/test_activation_op.py index 57d4a50e913c0d2994c62600f4e479056ed4c306..c9069777faf9d141db93184e8b1e6dc2a7034980 100644 --- a/python/paddle/fluid/tests/unittests/test_activation_op.py +++ b/python/paddle/fluid/tests/unittests/test_activation_op.py @@ -1098,82 +1098,5 @@ class TestFP16Swish(TestSwish): self.check_output_with_place(place, atol=1e-3) -#--------------------test MKLDNN-------------------- -class TestMKLDNNReluDim2(TestRelu): - def setUp(self): - super(TestMKLDNNReluDim2, self).setUp() - - self.attrs = {"use_mkldnn": True} - - -class TestMKLDNNTanhDim2(TestTanh): - def setUp(self): - super(TestMKLDNNTanhDim2, self).setUp() - - self.attrs = {"use_mkldnn": True} - - -class TestMKLDNNSqrtDim2(TestSqrt): - def setUp(self): - super(TestMKLDNNSqrtDim2, self).setUp() - - self.attrs = 
{"use_mkldnn": True} - - -class TestMKLDNNAbsDim2(TestAbs): - def setUp(self): - super(TestMKLDNNAbsDim2, self).setUp() - - self.attrs = {"use_mkldnn": True} - - -class TestMKLDNNReluDim4(TestRelu): - def setUp(self): - super(TestMKLDNNReluDim4, self).setUp() - - x = np.random.uniform(-1, 1, [2, 4, 3, 5]).astype("float32") - # The same reason with TestAbs - x[np.abs(x) < 0.005] = 0.02 - out = np.maximum(x, 0) - - self.inputs = {'X': OpTest.np_dtype_to_fluid_dtype(x)} - self.outputs = {'Out': out} - self.attrs = {"use_mkldnn": True} - - -class TestMKLDNNTanhDim4(TestTanh): - def setUp(self): - super(TestMKLDNNTanhDim4, self).setUp() - - self.inputs = { - 'X': np.random.uniform(0.1, 1, [2, 4, 3, 5]).astype("float32") - } - self.outputs = {'Out': np.tanh(self.inputs['X'])} - self.attrs = {"use_mkldnn": True} - - -class TestMKLDNNSqrtDim4(TestSqrt): - def setUp(self): - super(TestMKLDNNSqrtDim4, self).setUp() - - self.inputs = { - 'X': np.random.uniform(0.1, 1, [2, 4, 3, 5]).astype("float32") - } - self.outputs = {'Out': np.sqrt(self.inputs['X'])} - self.attrs = {"use_mkldnn": True} - - -class TestMKLDNNAbsDim4(TestAbs): - def setUp(self): - super(TestMKLDNNAbsDim4, self).setUp() - - x = np.random.uniform(-1, 1, [2, 4, 3, 5]).astype("float32") - # The same reason with TestAbs - x[np.abs(x) < 0.005] = 0.02 - self.inputs = {'X': x} - self.outputs = {'Out': np.abs(self.inputs['X'])} - self.attrs = {"use_mkldnn": True} - - if __name__ == "__main__": unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_conv2d_mkldnn_op.py b/python/paddle/fluid/tests/unittests/test_conv2d_mkldnn_op.py new file mode 100644 index 0000000000000000000000000000000000000000..db6be21baaa54d33af9f5c44d1815e4b389eb884 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_conv2d_mkldnn_op.py @@ -0,0 +1,36 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import unittest + +from test_conv2d_op import TestConv2dOp, TestWithPad, TestWithStride + + +class TestMKLDNN(TestConv2dOp): + def init_kernel_type(self): + self.use_mkldnn = True + + +class TestMKLDNNWithPad(TestWithPad): + def init_kernel_type(self): + self.use_mkldnn = True + + +class TestMKLDNNWithStride(TestWithStride): + def init_kernel_type(self): + self.use_mkldnn = True + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_conv2d_op.py b/python/paddle/fluid/tests/unittests/test_conv2d_op.py index 65606a0b4373b28036096cf046da5143a3b8bcd0..a478649541ba9828e55c4239090d5aee554223ac 100644 --- a/python/paddle/fluid/tests/unittests/test_conv2d_op.py +++ b/python/paddle/fluid/tests/unittests/test_conv2d_op.py @@ -373,22 +373,5 @@ class TestDepthwiseConv2(TestConv2dOp): # def init_op_type(self): # self.op_type = "conv_cudnn" - -#----------------Conv2dMKLDNN---------------- -class TestMKLDNN(TestConv2dOp): - def init_kernel_type(self): - self.use_mkldnn = True - - -class TestMKLDNNWithPad(TestWithPad): - def init_kernel_type(self): - self.use_mkldnn = True - - -class TestMKLDNNWithStride(TestWithStride): - def init_kernel_type(self): - self.use_mkldnn = True - - if __name__ == '__main__': unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_recv_op.py b/python/paddle/fluid/tests/unittests/test_dist_train.py similarity index 57% rename from python/paddle/fluid/tests/unittests/test_recv_op.py rename to python/paddle/fluid/tests/unittests/test_dist_train.py index 2ebceca7e4b7b824194d94180462870e6cfe6d21..c7fdd06f105e3b5fd906d3524d41df8f84160e63 100644 --- a/python/paddle/fluid/tests/unittests/test_recv_op.py +++ b/python/paddle/fluid/tests/unittests/test_dist_train.py @@ -15,31 +15,42 @@ import unittest import paddle.fluid as fluid +import paddle.fluid.core as core import paddle.fluid.layers as layers import numpy from multiprocessing import Process +from threading import Thread import os, sys import time -class TestRecvOp(unittest.TestCase): - def no_test_send(self): +class TestSendOp(unittest.TestCase): + def test_send(self): # Run init_serv in a thread place = fluid.CPUPlace() + # NOTE: python thread will not work here due to GIL. p = Process(target=self.init_serv, args=(place, )) p.daemon = True p.start() - time.sleep(1) - self.init_client(place) + + time.sleep(10) + with open("/tmp/paddle.selected_port", "r") as fn: + selected_port = int(fn.readlines()[0]) + self.init_client(place, selected_port) + + self.run_local(place) + self.assertTrue(numpy.allclose(self.local_out, self.dist_out)) + # FIXME(typhoonzero): find a way to gracefully shutdown the server. 
os.system("kill -9 %d" % p.pid) p.join() def init_serv(self, place): main = fluid.Program() + with fluid.program_guard(main): serv = layers.ListenAndServ( - "127.0.0.1:6174", ["X"], optimizer_mode=False) + "127.0.0.1:0", ["X"], optimizer_mode=False) with serv.do(): x = layers.data( shape=[32, 32], @@ -50,10 +61,29 @@ class TestRecvOp(unittest.TestCase): o = layers.scale(x=x, scale=10.0) main.global_block().create_var( name=o.name, psersistable=False, dtype=o.dtype, shape=o.shape) + + self.server_exe = fluid.Executor(place) + self.server_exe.run(main) + + def init_client(self, place, port): + main = fluid.Program() + with fluid.program_guard(main): + x = layers.data( + shape=[32, 32], + dtype='float32', + name='X', + append_batch_size=False) + fluid.initializer.Constant(value=2.3)(x, main.global_block()) + get_var = main.global_block().create_var( + name="scale_0.tmp_0", # server side var + dtype="float32", + persistable=False, + shape=[32, 32]) + o = layers.Send("127.0.0.1:%d" % port, [x], [get_var]) exe = fluid.Executor(place) - exe.run(main) + self.dist_out = exe.run(main, fetch_list=o) # o is a list - def init_client(self, place): + def run_local(self, place): main = fluid.Program() with fluid.program_guard(main): x = layers.data( @@ -61,10 +91,10 @@ class TestRecvOp(unittest.TestCase): dtype='float32', name='X', append_batch_size=False) - fluid.initializer.Constant(value=1.0)(x, main.global_block()) - layers.Send("127.0.0.1:6174", [x], [x]) + fluid.initializer.Constant(value=2.3)(x, main.global_block()) + o = layers.scale(x=x, scale=10.0) exe = fluid.Executor(place) - exe.run(main) + self.local_out = exe.run(main, fetch_list=[o]) if __name__ == "__main__": diff --git a/python/paddle/fluid/tests/unittests/test_fc_op.py b/python/paddle/fluid/tests/unittests/test_fc_mkldnn_op.py similarity index 100% rename from python/paddle/fluid/tests/unittests/test_fc_op.py rename to python/paddle/fluid/tests/unittests/test_fc_mkldnn_op.py diff --git a/python/paddle/fluid/tests/unittests/test_lrn_mkldnn_op.py b/python/paddle/fluid/tests/unittests/test_lrn_mkldnn_op.py new file mode 100644 index 0000000000000000000000000000000000000000..966a16dc870c041b9deb140bed57d907cf305fd8 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_lrn_mkldnn_op.py @@ -0,0 +1,49 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import unittest +from test_lrn_op import TestLRNOp + + +class TestLRNMKLDNNOp(TestLRNOp): + def get_attrs(self): + attrs = TestLRNOp.get_attrs(self) + attrs['use_mkldnn'] = True + return attrs + + def test_check_output(self): + self.check_output(atol=0.002) + + +class TestLRNMKLDNNOpWithIsTest(TestLRNMKLDNNOp): + def get_attrs(self): + attrs = TestLRNMKLDNNOp.get_attrs(self) + attrs['is_test'] = True + return attrs + + def test_check_grad_normal(self): + def check_raise_is_test(): + try: + self.check_grad(['X'], 'Out', max_relative_error=0.01) + except Exception as e: + t = \ + "is_test attribute should be set to False in training phase." + if t in str(e): + raise AttributeError + + self.assertRaises(AttributeError, check_raise_is_test) + + +if __name__ == "__main__": + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_lrn_op.py b/python/paddle/fluid/tests/unittests/test_lrn_op.py index 8fa480b9bce84d2936f23cce9e41e8e54014b074..eaff45cbb2a58798e9d55149510bec72eea370cd 100644 --- a/python/paddle/fluid/tests/unittests/test_lrn_op.py +++ b/python/paddle/fluid/tests/unittests/test_lrn_op.py @@ -87,34 +87,5 @@ class TestLRNOp(OpTest): self.check_grad(['X'], 'Out', max_relative_error=0.01) -class TestLRNMKLDNNOp(TestLRNOp): - def get_attrs(self): - attrs = TestLRNOp.get_attrs(self) - attrs['use_mkldnn'] = True - return attrs - - def test_check_output(self): - self.check_output(atol=0.002) - - -class TestLRNMKLDNNOpWithIsTest(TestLRNMKLDNNOp): - def get_attrs(self): - attrs = TestLRNMKLDNNOp.get_attrs(self) - attrs['is_test'] = True - return attrs - - def test_check_grad_normal(self): - def check_raise_is_test(): - try: - self.check_grad(['X'], 'Out', max_relative_error=0.01) - except Exception as e: - t = \ - "is_test attribute should be set to False in training phase." 
- if t in str(e): - raise AttributeError - - self.assertRaises(AttributeError, check_raise_is_test) - - if __name__ == "__main__": unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor.py b/python/paddle/fluid/tests/unittests/test_parallel_executor.py index 83d22fd799eea55eedb58f93421b275985edb50b..3ddafbbc57b29d506158bcb57188ab96f814e0d3 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_executor.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor.py @@ -203,31 +203,32 @@ class TestParallelExecutorBase(unittest.TestCase): iter=10, batch_size=None, allow_op_delay=False, - feed_dict={}): + feed_dict=None): main = fluid.Program() startup = fluid.Program() + startup.random_seed = 1 # Fix random seed with fluid.program_guard(main, startup): - loss = method(use_feed=len(feed_dict) > 0) + loss = method(use_feed=feed_dict is not None) adam = fluid.optimizer.Adam() adam.minimize(loss) if memory_opt: fluid.memory_optimize(main) - place = fluid.CUDAPlace(0) startup_exe = fluid.Executor(place) startup_exe.run(startup) - exe = fluid.ParallelExecutor(True, loss_name=loss.name) + exe = fluid.ParallelExecutor( + True, loss_name=loss.name, allow_op_delay=allow_op_delay) if batch_size is not None: batch_size *= fluid.core.get_cuda_device_count() begin = time.time() - first_loss, = exe.run([loss.name], feed_dict=feed_dict) + first_loss, = exe.run([loss.name], feed=feed_dict) first_loss = numpy.array(first_loss) for i in xrange(iter): - exe.run([], feed_dict=feed_dict) + exe.run([], feed=feed_dict) - last_loss, = exe.run([loss.name], feed_dict=feed_dict) + last_loss, = exe.run([loss.name], feed=feed_dict) end = time.time() if batch_size is not None: @@ -648,5 +649,5 @@ class TestCRFModel(unittest.TestCase): for i in xrange(10): cur_batch = next(data) print map(numpy.array, - pe.run(feed_dict=feeder.feed(cur_batch), + pe.run(feed=feeder.feed(cur_batch), fetch_list=[avg_cost.name]))[0] diff --git a/python/paddle/fluid/tests/unittests/test_pool2d_mkldnn_op.py b/python/paddle/fluid/tests/unittests/test_pool2d_mkldnn_op.py new file mode 100644 index 0000000000000000000000000000000000000000..003ebba18b26198427d9f313596ae85656ac24fa --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_pool2d_mkldnn_op.py @@ -0,0 +1,50 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import unittest +from test_pool2d_op import TestPool2d_Op, TestCase1, TestCase2, TestCase3, TestCase4, TestCase5 + + +class TestMKLDNNCase1(TestPool2d_Op): + def init_kernel_type(self): + self.use_mkldnn = True + + +class TestMKLDNNCase2(TestCase1): + def init_kernel_type(self): + self.use_mkldnn = True + + +class TestMKLDNNCase3(TestCase2): + def init_kernel_type(self): + self.use_mkldnn = True + + +class TestMKLDNNCase4(TestCase3): + def init_kernel_type(self): + self.use_mkldnn = True + + +class TestMKLDNNCase5(TestCase4): + def init_kernel_type(self): + self.use_mkldnn = True + + +class TestMKLDNNCase6(TestCase5): + def init_kernel_type(self): + self.use_mkldnn = True + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_pool2d_op.py b/python/paddle/fluid/tests/unittests/test_pool2d_op.py index 764fa575fba1615de3171e848890b3836e640849..328a9ffd25b9fce3fd45bbe847e365f090acd17c 100644 --- a/python/paddle/fluid/tests/unittests/test_pool2d_op.py +++ b/python/paddle/fluid/tests/unittests/test_pool2d_op.py @@ -317,36 +317,5 @@ class TestCeilModeCase4(TestCase2): self.ceil_mode = True -#--------------------test pool2d MKLDNN-------------------- -class TestMKLDNNCase1(TestPool2d_Op): - def init_kernel_type(self): - self.use_mkldnn = True - - -class TestMKLDNNCase2(TestCase1): - def init_kernel_type(self): - self.use_mkldnn = True - - -class TestMKLDNNCase3(TestCase2): - def init_kernel_type(self): - self.use_mkldnn = True - - -class TestMKLDNNCase4(TestCase3): - def init_kernel_type(self): - self.use_mkldnn = True - - -class TestMKLDNNCase5(TestCase4): - def init_kernel_type(self): - self.use_mkldnn = True - - -class TestMKLDNNCase6(TestCase5): - def init_kernel_type(self): - self.use_mkldnn = True - - if __name__ == '__main__': unittest.main() diff --git a/python/paddle/v2/reader/__init__.py b/python/paddle/v2/reader/__init__.py index 3b059735a924d58714cd88a761eb83143f1192d6..12efdc4a0fec83fed57bdcbf687aaec69d13ba91 100644 --- a/python/paddle/v2/reader/__init__.py +++ b/python/paddle/v2/reader/__init__.py @@ -50,7 +50,7 @@ An example implementation for single item data reader creator: def reader(): while True: yield numpy.random.uniform(-1, 1, size=width*height) - return reader + return reader An example implementation for multiple item data reader creator: @@ -60,7 +60,7 @@ An example implementation for multiple item data reader creator: def reader(): while True: yield numpy.random.uniform(-1, 1, size=width*height), label - return reader + return reader TODO(yuyang18): Should we add whole design doc here? diff --git a/tools/aws_benchmarking/README.md b/tools/aws_benchmarking/README.md new file mode 100644 index 0000000000000000000000000000000000000000..837fcbb8512bce027ecd09a7f39b806151e9154b --- /dev/null +++ b/tools/aws_benchmarking/README.md @@ -0,0 +1,160 @@ +# AWS benchmark testing tool +This is an automation tool for deploying PaddlePaddle benchmark tests to AWS. + +## Features + + - subnet creation to fit just the number of EC2 instances required.
+ - pserver and trainer EC2 instance allocation, with instance state verification + - nvidia-docker ready for GPU training + - Garbage collection of instances and network elements when a task completes or an error occurs + - Test logs are collected in real time + - Web service for checking logs or tearing down the test setup + - No testing code changes needed + - Lots of optional configuration options + + ## Usage + + ### Prerequisites + + - You have a working AWS account + - You have [AWS Command Line Interface](https://aws.amazon.com/cli/) installed + - Your AWS CLI is bound to an account that has the `AmazonEC2FullAccess` permission, and that account is set as the default credential. + - You have a key pair created and the pem file downloaded. + - You have a default VPC in the region you want to run the test. + - You have a Security Group created for the VPC mentioned above, which allows inbound traffic on port 22 and on the port you want to expose for the control web service (5436 by default) + - If your test is supposed to run on a GPU machine, especially a multi-card GPU machine (p2, p3 series), you might need to contact Amazon to raise the default limit, which allows no more than 1 GPU instance at a time. + + ### Start a benchmark test + +#### Create training image + +*What to expect in this step:* + +*You will have your training logic packed together with the Paddle runtime in a docker image that can be picked up by the AWS instances for training.* + +The training Python script and the PaddlePaddle runtime are supposed to be packed into one docker image. Use a PaddlePaddle production image as the base image and create the training image with a Dockerfile like the following: + +```Dockerfile +FROM paddlepaddle/paddle:latest-gpu + +ENV HOME /root +COPY ./ /root/ +WORKDIR /root +RUN pip install -r /root/requirements.txt +ENTRYPOINT ["python", "my_training.py"] +``` + +***Please Note*** +Training nodes will run your `ENTRYPOINT` script with the following environment variables: + + - `TASK_NAME`: unique name to identify this training process. + - `TRAINING_ROLE`: current node's role in this training process, either "PSERVER" or "TRAINER" + - `PSERVER_HOSTS`: comma-separated list of pserver endpoints, e.g. "192.168.1.2:5436,192.168.1.3:5436" + - `PSERVERS`: same as above + - `TRAINERS`: trainer count + - `SERVER_ENDPOINT`: the current server endpoint if the node's role is pserver + - `TRAINER_INDEX`: an integer identifying the index of the current trainer if the node's role is trainer. + - `PADDLE_INIT_TRAINER_ID`: same as above
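+To make this concrete, here is a minimal sketch of what an entrypoint such as `my_training.py` could do with these variables (illustrative only; the actual training logic is up to you):
+
+```python
+import os
+
+# Read the environment variables injected by the benchmarking tool.
+role = os.getenv("TRAINING_ROLE", "TRAINER")
+pserver_hosts = os.getenv("PSERVER_HOSTS", "").split(",")
+trainer_count = int(os.getenv("TRAINERS", "1"))
+
+if role == "PSERVER":
+    # A pserver only needs its own endpoint to start serving.
+    endpoint = os.getenv("SERVER_ENDPOINT")
+    print("starting pserver on %s" % endpoint)
+else:
+    trainer_index = int(os.getenv("TRAINER_INDEX", "0"))
+    print("starting trainer %d of %d against %s" %
+          (trainer_index, trainer_count, ",".join(pserver_hosts)))
+```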
+ + Now we have a working distributed training script that takes advantage of the node environment variables, and a Dockerfile to generate the training image. Run the following command: + + ```bash + docker build -t myreponame/paddle_benchmark . + ``` + + Now that you have the image built and tagged with `myreponame/paddle_benchmark`, let's push it to Docker Hub so that it can be picked up by our AWS instances. + + ```bash + docker push myreponame/paddle_benchmark + ``` + +#### Create instances and start training + +*What to expect in this step* + +*You will be asked to provide some basic settings to configure your training, and this tool will start your training and keep it monitored.* + +Now let's start the training process: + +```bash +docker run -i -v $HOME/.aws:/root/.aws -v <full path to your pem file>:/root/<key name>.pem \ +putcn/paddle_aws_client \ +--action create \ +--key_name <your key pair name> \ +--security_group_id <your security group id> \ +--docker_image myreponame/paddle_benchmark \ +--pserver_count 2 \ +--trainer_count 2 +``` + +Now just wait until you see this: +``` +master server finished init process, visit http://XXX:XXX/status to check master log +``` +That means you can turn off your laptop: your cluster is creating instances, starting the training process, collecting logs, and will eventually shut all pservers and trainers down when training is finished. + +#### Post creation operations + +To access the master log: + +```bash +docker run -i -v $HOME/.aws:/root/.aws \ +putcn/paddle_aws_client \ +--action status \ +--master_server_public_ip <master server public ip> \ +--master_server_port <master server port> +``` + +To tear down the training setup: + +```bash +docker run -i -v $HOME/.aws:/root/.aws \ +putcn/paddle_aws_client \ +--action cleanup \ +--master_server_public_ip <master server public ip> \ +--master_server_port <master server port> +``` + +To retrieve training logs: +TBD + +### Tech details + +*What to expect in this step* + +*You will understand what is happening behind the scenes: how to check the training log, how to tear down the training on the fly, etc.* + +Let's understand what happens under the hood when you run the above command on your laptop: + +![alt](diagram.png) + +There are 4 roles in the figure above: + - client: your laptop + - master: the node that talks to the AWS API server to create/tear down instances and monitor the training process + - AWS API server: the one that actually creates and manages instances + - pservers and trainers: training instances + +When you run the `docker run` command above, what it actually does is ask the AWS API service to create a subnet (step 1) and a master instance (step 2), and pass along all the parameters the client collected or generated (step 3). The master is kept at a minimal hardware configuration to keep the running cost low. + +Then, when the master is up and running, it asks the AWS API server to create the heavy-lifting training instances, which are expensive to run (step 4). The master starts the training process as soon as they are done initializing (step 5). + +Meanwhile, the master exposes a web service for the client to check the training log or even tear the training setup down with a web service call. + +If you are creating the training with the client docker container and also monitoring your AWS dashboard, you will initially see an instance tagged with `ROLE=MASTER` and `TASK_NAME=<task name>_master` start, then several instances tagged with `ROLE=PSERVER` and `ROLE=TRAINER` start. +When the training is finished, pservers and trainers will be terminated. All their logs are kept in the master node's docker env. + + Master exposes 4 major services: + + - GET `/status`: return the master log + - GET `/logs`: return the list of log file names + - GET `/log/<log file name>`: return a particular log by log file name + - POST `/cleanup`: tear down the whole setup
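+For instance, assuming the default port (substitute your master's public IP; this is illustrative only), the services can be exercised with plain `curl`:
+
+```bash
+curl http://<master public ip>:5436/status            # master log
+curl http://<master public ip>:5436/logs              # list of log file names
+curl http://<master public ip>:5436/log/master.log    # one particular log
+curl -X POST http://<master public ip>:5436/cleanup   # tear everything down
+```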
+ +### Parameters + +TBD, please refer to client/cluster_launcher.py for now + +### Troubleshooting + +TBD diff --git a/tools/aws_benchmarking/client/Dockerfile b/tools/aws_benchmarking/client/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..812c5d4bce0adff404577ce6b5fd3f0f4a91118c --- /dev/null +++ b/tools/aws_benchmarking/client/Dockerfile @@ -0,0 +1,7 @@ +FROM python:2.7.14-stretch + +ENV HOME /root +COPY ./ /root/ +WORKDIR /root +RUN pip install -r /root/requirements.txt +ENTRYPOINT ["python", "cluster_launcher.py"] \ No newline at end of file diff --git a/tools/aws_benchmarking/client/cluster_launcher.py b/tools/aws_benchmarking/client/cluster_launcher.py new file mode 100644 index 0000000000000000000000000000000000000000..594378ff8fc0744a4b11b1c11e2e3b270be7aed0 --- /dev/null +++ b/tools/aws_benchmarking/client/cluster_launcher.py @@ -0,0 +1,407 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import os +import time +import math +import logging +import copy + +import netaddr +import boto3 +import namesgenerator +import paramiko +from scp import SCPClient +import requests + + +def str2bool(v): + if v.lower() in ('yes', 'true', 't', 'y', '1'): + return True + elif v.lower() in ('no', 'false', 'f', 'n', '0'): + return False + else: + raise argparse.ArgumentTypeError('Boolean value expected.') + + +parser = argparse.ArgumentParser(description=__doc__) +parser.add_argument( + '--key_name', type=str, default="", help="required, key pair name") +parser.add_argument( + '--security_group_id', + type=str, + default="", + help="required, the security group id associated with your VPC") + +parser.add_argument( + '--vpc_id', + type=str, + default="", + help="The VPC in which you wish to run test") +parser.add_argument( + '--subnet_id', + type=str, + default="", + help="The Subnet_id in which you wish to run test") + +parser.add_argument( + '--pserver_instance_type', + type=str, + default="c5.2xlarge", + help="your pserver instance type, c5.2xlarge by default") +parser.add_argument( + '--trainer_instance_type', + type=str, + default="p2.8xlarge", + help="your trainer instance type, p2.8xlarge by default") + +parser.add_argument( + '--task_name', + type=str, + default="", + help="the name you want to identify your job") +parser.add_argument( + '--pserver_image_id', + type=str, + default="ami-da2c1cbf", + help="ami id for system image, default one has nvidia-docker ready, \ + use ami-1ae93962 for us-east-2") + +parser.add_argument( + '--pserver_command', type=str, default="", help="pserver start command") + +parser.add_argument( + '--trainer_image_id', + type=str, + default="ami-da2c1cbf", + help="ami id for system image, default one has nvidia-docker ready, \ 
use ami-1ae93962 for us-west-2") + +parser.add_argument( + '--trainer_command', type=str, default="", help="trainer start command") + +parser.add_argument( + '--availability_zone', + type=str, + default="us-east-2a", + help="aws zone id to place ec2 instances") + +parser.add_argument( + '--trainer_count', type=int, default=1, help="Trainer count") + +parser.add_argument( + '--pserver_count', type=int, default=1, help="Pserver count") + +parser.add_argument( + '--action', type=str, default="create", help="create|cleanup|status") + +parser.add_argument('--pem_path', type=str, help="private key file") + +parser.add_argument( + '--pserver_port', type=str, default="5436", help="pserver port") + +parser.add_argument( + '--docker_image', type=str, default="busybox", help="training docker image") + +parser.add_argument( + '--master_server_port', type=int, default=5436, help="master server port") + +parser.add_argument( + '--master_server_public_ip', type=str, help="master server public ip") + +parser.add_argument( + '--master_docker_image', + type=str, + default="putcn/paddle_aws_master:latest", + help="master docker image id") + +parser.add_argument( + '--no_clean_up', + type=str2bool, + default=False, + help="whether to clean up after training") + +args = parser.parse_args() + +logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s') + +ec2client = boto3.client('ec2') + + +def print_arguments(): + print('----------- Configuration Arguments -----------') + for arg, value in sorted(vars(args).iteritems()): + print('%s: %s' % (arg, value)) + print('------------------------------------------------') + + +def create_subnet(): + # if no vpc id provided, list vpcs + logging.info("start creating subnet") + if not args.vpc_id: + logging.info("no vpc provided, trying to find the default one") + vpcs_desc = ec2client.describe_vpcs( + Filters=[{ + "Name": "isDefault", + "Values": ["true", ] + }], ) + if len(vpcs_desc["Vpcs"]) == 0: + raise ValueError('No default VPC') + args.vpc_id = vpcs_desc["Vpcs"][0]["VpcId"] + vpc_cidrBlock = vpcs_desc["Vpcs"][0]["CidrBlock"] + + logging.info("default vpc found with id %s and CidrBlock %s" % + (args.vpc_id, vpc_cidrBlock)) + + if not vpc_cidrBlock: + logging.info("trying to find cidrblock for vpc") + vpcs_desc = ec2client.describe_vpcs( + Filters=[{ + "Name": "vpc-id", + "Values": [args.vpc_id, ], + }], ) + if len(vpcs_desc["Vpcs"]) == 0: + raise ValueError('No VPC found') + vpc_cidrBlock = vpcs_desc["Vpcs"][0]["CidrBlock"] + logging.info("cidrblock for vpc is %s" % vpc_cidrBlock) + + # list subnets in vpc in order to create a new one + + logging.info("trying to find ip blocks for new subnet") + subnets_desc = ec2client.describe_subnets( + Filters=[{ + "Name": "vpc-id", + "Values": [args.vpc_id, ], + }], ) + + ips_taken = [] + for subnet_dec in subnets_desc["Subnets"]: + ips_taken.append(subnet_dec["CidrBlock"]) + + ip_blocks_available = netaddr.IPSet( + [vpc_cidrBlock]) ^ netaddr.IPSet(ips_taken) + # adding 10 addresses as buffer + cidr_prefix = 32 - math.ceil( + math.log(args.pserver_count + args.trainer_count + 10, 2)) + if cidr_prefix <= 16: + raise ValueError('Too many nodes to fit in current VPC')
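+    # Worked example of the cidr_prefix computation above: with 2 pservers,
+    # 2 trainers and the 10-address buffer, math.log(14, 2) is about 3.81,
+    # ceil gives 4, so cidr_prefix = 32 - 4 = 28, i.e. a /28 subnet with
+    # 16 addresses.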
+ + for ipnetwork in ip_blocks_available.iter_cidrs(): + try: + subnet_cidr = ipnetwork.subnet(int(cidr_prefix)).next() + logging.info("subnet ip block found %s" % (subnet_cidr)) + break + except Exception: + pass + + if not subnet_cidr: + raise ValueError( + 'No available subnet to fit required nodes in current VPC') + + logging.info("trying to create subnet") + subnet_desc = ec2client.create_subnet( + CidrBlock=str(subnet_cidr), + VpcId=args.vpc_id, + AvailabilityZone=args.availability_zone) + + subnet_id = subnet_desc["Subnet"]["SubnetId"] + + subnet_waiter = ec2client.get_waiter('subnet_available') + # sleep for 1s before checking its state + time.sleep(1) + subnet_waiter.wait(SubnetIds=[subnet_id, ]) + + logging.info("subnet created") + + logging.info("adding tags to newly created subnet") + ec2client.create_tags( + Resources=[subnet_id, ], + Tags=[{ + "Key": "Task_name", + 'Value': args.task_name + }]) + return subnet_id + + +def run_instances(image_id, instance_type, count=1, role="MASTER", cmd=""): + response = ec2client.run_instances( + ImageId=image_id, + InstanceType=instance_type, + MaxCount=count, + MinCount=count, + UserData=cmd, + DryRun=False, + InstanceInitiatedShutdownBehavior="stop", + KeyName=args.key_name, + Placement={'AvailabilityZone': args.availability_zone}, + NetworkInterfaces=[{ + 'DeviceIndex': 0, + 'SubnetId': args.subnet_id, + "AssociatePublicIpAddress": True, + 'Groups': args.security_group_ids + }], + TagSpecifications=[{ + 'ResourceType': "instance", + 'Tags': [{ + "Key": 'Task_name', + "Value": args.task_name + "_master" + }, { + "Key": 'Role', + "Value": role + }] + }]) + + instance_ids = [] + for instance in response["Instances"]: + instance_ids.append(instance["InstanceId"]) + + if len(instance_ids) > 0: + logging.info(str(len(instance_ids)) + " instance(s) created") + else: + logging.info("no instance created") + # create a waiter to make sure it's running + + logging.info("waiting for instance to become accessible") + waiter = ec2client.get_waiter('instance_status_ok') + waiter.wait( + Filters=[{ + "Name": "instance-status.status", + "Values": ["ok"] + }, { + "Name": "instance-status.reachability", + "Values": ["passed"] + }, { + "Name": "instance-state-name", + "Values": ["running"] + }], + InstanceIds=instance_ids) + + instances_response = ec2client.describe_instances(InstanceIds=instance_ids) + + return instances_response["Reservations"][0]["Instances"] + + +def generate_task_name(): + return namesgenerator.get_random_name() + + +def init_args(): + + if not args.task_name: + args.task_name = generate_task_name() + logging.info("task name generated %s" % (args.task_name)) + + if not args.pem_path: + args.pem_path = os.path.expanduser("~") + "/" + args.key_name + ".pem" + if args.security_group_id: + args.security_group_ids = (args.security_group_id, ) + + +def create(): + + init_args() + + # create subnet + if not args.subnet_id: + args.subnet_id = create_subnet() + + # create master node + + master_instance_response = run_instances( + image_id="ami-7a05351f", instance_type="t2.nano") + + logging.info("master server started") + + args.master_server_public_ip = master_instance_response[0][ + "PublicIpAddress"] + args.master_server_ip = master_instance_response[0]["PrivateIpAddress"] + + logging.info("master server started, master_ip=%s, task_name=%s" % + (args.master_server_public_ip, args.task_name)) + + # cp config file and pems to master node + + ssh_key = paramiko.RSAKey.from_private_key_file(args.pem_path) + ssh_client = paramiko.SSHClient() + ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + ssh_client.connect( + hostname=args.master_server_public_ip, username="ubuntu", pkey=ssh_key) + + with SCPClient(ssh_client.get_transport()) as scp: + scp.put(os.path.expanduser("~") + "/" + ".aws", + recursive=True, + remote_path='/home/ubuntu/') + scp.put(args.pem_path,
remote_path='/home/ubuntu/' + args.key_name + ".pem") + + logging.info("credentials and pem copied to master") + + # set arguments and start docker + kick_off_cmd = "docker run -d -v /home/ubuntu/.aws:/root/.aws/" + kick_off_cmd += " -v /home/ubuntu/" + args.key_name + ".pem:/root/" + args.key_name + ".pem" + kick_off_cmd += " -v /home/ubuntu/logs/:/root/logs/" + kick_off_cmd += " -p " + str(args.master_server_port) + ":" + str( + args.master_server_port) + kick_off_cmd += " " + args.master_docker_image + + args_to_pass = copy.copy(args) + args_to_pass.action = "serve" + del args_to_pass.pem_path + del args_to_pass.security_group_ids + del args_to_pass.master_docker_image + del args_to_pass.master_server_public_ip + for arg, value in sorted(vars(args_to_pass).iteritems()): + if value: + kick_off_cmd += ' --%s %s' % (arg, value) + + logging.info(kick_off_cmd) + stdin, stdout, stderr = ssh_client.exec_command(command=kick_off_cmd) + return_code = stdout.channel.recv_exit_status() + logging.info(return_code) + if return_code != 0: + raise Exception("Error while kicking off master") + + logging.info( + "master server finished init process, visit %s to check master log" % + (get_master_web_url("/status"))) + + +def cleanup(): + print requests.post(get_master_web_url("/cleanup")).text + + +def status(): + print requests.post(get_master_web_url("/status")).text + + +def get_master_web_url(path): + return "http://" + args.master_server_public_ip + ":" + str( + args.master_server_port) + path + + +if __name__ == "__main__": + print_arguments() + if args.action == "create": + if not args.key_name or not args.security_group_id: + raise ValueError("key_name and security_group_id are required") + create() + elif args.action == "cleanup": + if not args.master_server_public_ip: + raise ValueError("master_server_public_ip is required") + cleanup() + elif args.action == "status": + if not args.master_server_public_ip: + raise ValueError("master_server_public_ip is required") + status() diff --git a/tools/aws_benchmarking/client/requirements.txt b/tools/aws_benchmarking/client/requirements.txt new file mode 100644 index 0000000000000000000000000000000000000000..9454801f2025671cfd1a2c3b71cf4c2ac07cb8fb --- /dev/null +++ b/tools/aws_benchmarking/client/requirements.txt @@ -0,0 +1,6 @@ +netaddr==0.7.19 +boto3==1.6.21 +namesgenerator==0.3 +paramiko==2.4.1 +scp +requests diff --git a/tools/aws_benchmarking/diagram.png b/tools/aws_benchmarking/diagram.png new file mode 100644 index 0000000000000000000000000000000000000000..b97909c5fe78b59d0e636ff73c2ed3e63a0be722 Binary files /dev/null and b/tools/aws_benchmarking/diagram.png differ diff --git a/tools/aws_benchmarking/server/Dockerfile b/tools/aws_benchmarking/server/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..333523abcdb6fbe7dc01bbaf7d32ce1d8e866028 --- /dev/null +++ b/tools/aws_benchmarking/server/Dockerfile @@ -0,0 +1,7 @@ +FROM python:2.7.14-stretch + +ENV HOME /root +COPY ./ /root/ +WORKDIR /root +RUN pip install -r /root/requirements.txt +ENTRYPOINT ["python", "cluster_master.py"] \ No newline at end of file diff --git a/tools/aws_benchmarking/server/cluster_master.py b/tools/aws_benchmarking/server/cluster_master.py new file mode 100644 index 0000000000000000000000000000000000000000..21f85a5fc43e951897eb6b785367630abda722c0 --- /dev/null +++ b/tools/aws_benchmarking/server/cluster_master.py @@ -0,0 +1,673 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. 
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+import os
+import json
+import math
+import time
+import threading
+import logging
+
+import netaddr
+import boto3
+import namesgenerator
+import paramiko
+
+from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
+
+
+# You must have aws_access_key_id, aws_secret_access_key and region set in
+# ~/.aws/credentials and ~/.aws/config, for example:
+#   ~/.aws/credentials:
+#     [default]
+#     aws_access_key_id = <your access key id>
+#     aws_secret_access_key = <your secret access key>
+#   ~/.aws/config:
+#     [default]
+#     region = us-east-2
+def str2bool(v):
+    if v.lower() in ('yes', 'true', 't', 'y', '1'):
+        return True
+    elif v.lower() in ('no', 'false', 'f', 'n', '0'):
+        return False
+    else:
+        raise argparse.ArgumentTypeError('Boolean value expected.')
+
+
+parser = argparse.ArgumentParser(description=__doc__)
+parser.add_argument(
+    '--key_name', type=str, default="", help="required, key pair name")
+parser.add_argument(
+    '--security_group_id',
+    type=str,
+    default="",
+    help="required, the security group id associated with your VPC")
+
+parser.add_argument(
+    '--vpc_id',
+    type=str,
+    default="",
+    help="the VPC id in which you wish to run the test")
+parser.add_argument(
+    '--subnet_id',
+    type=str,
+    default="",
+    help="the subnet id in which you wish to run the test")
+
+parser.add_argument(
+    '--pserver_instance_type',
+    type=str,
+    default="c5.2xlarge",
+    help="your pserver instance type, c5.2xlarge by default")
+parser.add_argument(
+    '--trainer_instance_type',
+    type=str,
+    default="p2.8xlarge",
+    help="your trainer instance type, p2.8xlarge by default")
+
+parser.add_argument(
+    '--task_name',
+    type=str,
+    default="",
+    help="the name used to identify your job")
+parser.add_argument(
+    '--pserver_image_id',
+    type=str,
+    default="ami-da2c1cbf",
+    help="ami id for system image, default one has nvidia-docker ready, use ami-1ae93962 for us-west-2"
+)
+parser.add_argument(
+    '--trainer_image_id',
+    type=str,
+    default="ami-da2c1cbf",
+    help="ami id for system image, default one has nvidia-docker ready, use ami-1ae93962 for us-west-2"
+)
+
+parser.add_argument(
+    '--availability_zone',
+    type=str,
+    default="us-east-2a",
+    help="aws zone id to place ec2 instances")
+
+parser.add_argument(
+    '--trainer_count', type=int, default=1, help="Trainer count")
+
+parser.add_argument(
+    '--pserver_count', type=int, default=1, help="Pserver count")
+
+parser.add_argument(
+    '--pserver_bash_file',
+    type=str,
+    default=os.path.join(os.path.dirname(__file__), "pserver.sh.template"),
+    help="pserver bash file path")
+
+parser.add_argument(
+    '--pserver_command', type=str, default="", help="pserver start command")
+
+parser.add_argument(
+    '--trainer_bash_file',
+    type=str,
+    default=os.path.join(os.path.dirname(__file__), "trainer.sh.template"),
+    help="trainer bash file path")
+
+parser.add_argument(
+    '--trainer_command', type=str, default="", help="trainer start command")
+
+parser.add_argument(
+    '--action', type=str, default="serve", help="create|cleanup|serve")
+
+parser.add_argument('--pem_path', type=str, help="private key file path")
+
+parser.add_argument(
+    '--pserver_port', type=str, default="5436", help="pserver port")
+
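+# A hypothetical end-to-end invocation (all ids below are made up; the key
+# pair and security group must already exist in your AWS account):
+#
+#   python cluster_master.py --action create \
+#       --key_name my_key --security_group_id sg-0123456789 \
+#       --pserver_count 2 --trainer_count 2 \
+#       --docker_image paddlepaddle/paddle
+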
+parser.add_argument(
+    '--docker_image', type=str, default="busybox", help="training docker image")
+
+parser.add_argument(
+    '--master_server_port', type=int, default=5436, help="master server port")
+
+parser.add_argument(
+    '--master_server_ip', type=str, default="", help="master server private ip")
+
+parser.add_argument(
+    '--no_clean_up',
+    type=str2bool,
+    default=False,
+    help="do not clean up the cluster after training")
+
+args = parser.parse_args()
+
+ec2client = boto3.client('ec2')
+
+args.log_path = os.path.join(os.path.dirname(__file__), "logs/")
+
+logging.basicConfig(
+    filename=args.log_path + 'master.log',
+    level=logging.INFO,
+    format='%(asctime)s %(message)s')
+
+log_files = ["master.log"]
+
+
+def create_subnet():
+    # if no vpc id is provided, fall back to the account's default vpc
+    logging.info("start creating subnet")
+    if not args.vpc_id:
+        logging.info("no vpc provided, trying to find the default one")
+        vpcs_desc = ec2client.describe_vpcs(
+            Filters=[{
+                "Name": "isDefault",
+                "Values": ["true", ]
+            }], )
+        if len(vpcs_desc["Vpcs"]) == 0:
+            raise ValueError('No default VPC')
+        args.vpc_id = vpcs_desc["Vpcs"][0]["VpcId"]
+        vpc_cidrBlock = vpcs_desc["Vpcs"][0]["CidrBlock"]
+
+        logging.info("default vpc found with id %s and CidrBlock %s" %
+                     (args.vpc_id, vpc_cidrBlock))
+
+    if not vpc_cidrBlock:
+        logging.info("trying to find cidrblock for vpc")
+        vpcs_desc = ec2client.describe_vpcs(
+            Filters=[{
+                "Name": "vpc-id",
+                "Values": [args.vpc_id, ],
+            }], )
+        if len(vpcs_desc["Vpcs"]) == 0:
+            raise ValueError('No VPC found')
+        vpc_cidrBlock = vpcs_desc["Vpcs"][0]["CidrBlock"]
+        logging.info("cidrblock for vpc is %s" % vpc_cidrBlock)
+
+    # list the subnets in the vpc in order to create a new one
+
+    logging.info("trying to find ip blocks for new subnet")
+    subnets_desc = ec2client.describe_subnets(
+        Filters=[{
+            "Name": "vpc-id",
+            "Values": [args.vpc_id, ],
+        }], )
+
+    ips_taken = []
+    for subnet in subnets_desc["Subnets"]:
+        ips_taken.append(subnet["CidrBlock"])
+
+    ip_blocks_available = netaddr.IPSet(
+        [vpc_cidrBlock]) ^ netaddr.IPSet(ips_taken)
+    # add 10 addresses as buffer, then pick the smallest prefix that fits,
+    # e.g. 2 pservers + 2 trainers + 10 = 14 hosts
+    # -> ceil(log2(14)) = 4 -> a /28 subnet
+    cidr_prefix = 32 - math.ceil(
+        math.log(args.pserver_count + args.trainer_count + 10, 2))
+    if cidr_prefix <= 16:
+        raise ValueError('Too many nodes to fit in current VPC')
+
+    subnet_cidr = None
+    for ipnetwork in ip_blocks_available.iter_cidrs():
+        try:
+            subnet_cidr = ipnetwork.subnet(int(cidr_prefix)).next()
+            logging.info("subnet ip block found %s" % (subnet_cidr))
+            break
+        except Exception:
+            pass
+
+    if not subnet_cidr:
+        raise ValueError(
+            'No available subnet to fit required nodes in current VPC')
+
+    logging.info("trying to create subnet")
+    subnet_desc = ec2client.create_subnet(
+        CidrBlock=str(subnet_cidr),
+        VpcId=args.vpc_id,
+        AvailabilityZone=args.availability_zone)
+
+    subnet_id = subnet_desc["Subnet"]["SubnetId"]
+
+    subnet_waiter = ec2client.get_waiter('subnet_available')
+    # sleep for 1s before checking its state
+    time.sleep(1)
+    subnet_waiter.wait(SubnetIds=[subnet_id, ])
+
+    logging.info("subnet created")
+
+    logging.info("adding tags to newly created subnet")
+    ec2client.create_tags(
+        Resources=[subnet_id, ],
+        Tags=[{
+            "Key": "Task_name",
+            'Value': args.task_name
+        }])
+    return subnet_id
+
+
+def generate_task_name():
+    return namesgenerator.get_random_name()
+
+
+def script_to_str(file_path):
+    if not file_path:
+        return "echo $PSERVER_HOSTS"
+    with open(file_path, 'r') as script_file:
+        return script_file.read().strip()
+
+
+def run_instances(image_id, instance_type, count, role, cmd=""):
+    response =
ec2client.run_instances(
+        ImageId=image_id,
+        InstanceType=instance_type,
+        MaxCount=count,
+        MinCount=count,
+        UserData=cmd,
+        DryRun=False,
+        InstanceInitiatedShutdownBehavior="stop",
+        KeyName=args.key_name,
+        Placement={'AvailabilityZone': args.availability_zone},
+        NetworkInterfaces=[{
+            'DeviceIndex': 0,
+            'SubnetId': args.subnet_id,
+            "AssociatePublicIpAddress": True,
+            'Groups': args.security_group_ids
+        }],
+        TagSpecifications=[{
+            'ResourceType': "instance",
+            'Tags': [{
+                "Key": 'Task_name',
+                "Value": args.task_name
+            }, {
+                "Key": 'Role',
+                "Value": role
+            }]
+        }])
+
+    instance_ids = []
+    for instance in response["Instances"]:
+        instance_ids.append(instance["InstanceId"])
+
+    if len(instance_ids) > 0:
+        logging.info(str(len(instance_ids)) + " instance(s) created")
+    else:
+        logging.info("no instance created")
+
+    # use a waiter to make sure the instances are running and reachable
+    logging.info("waiting for instances to become accessible")
+    waiter = ec2client.get_waiter('instance_status_ok')
+    waiter.wait(
+        Filters=[{
+            "Name": "instance-status.status",
+            "Values": ["ok"]
+        }, {
+            "Name": "instance-status.reachability",
+            "Values": ["passed"]
+        }, {
+            "Name": "instance-state-name",
+            "Values": ["running"]
+        }],
+        InstanceIds=instance_ids)
+
+    instances_response = ec2client.describe_instances(InstanceIds=instance_ids)
+
+    return instances_response["Reservations"][0]["Instances"]
+
+
+def create_pservers():
+    try:
+        return run_instances(
+            image_id=args.pserver_image_id,
+            instance_type=args.pserver_instance_type,
+            count=args.pserver_count,
+            role="PSERVER", )
+    except Exception:
+        logging.exception("error while trying to create pservers")
+        cleanup(args.task_name)
+
+
+def log_to_file(source, filename):
+    if filename not in log_files:
+        log_files.append(filename)
+    with open(args.log_path + filename, "a") as log_file:
+        for line in iter(source.readline, ""):
+            log_file.write(line)
+
+
+def create_trainers(kickoff_cmd, pserver_endpoints_str):
+    def create_and_start_trainer(trainer_index):
+        logging.info("trainer " + str(trainer_index) + " is starting")
+
+        instance_response = run_instances(
+            image_id=args.trainer_image_id,
+            instance_type=args.trainer_instance_type,
+            count=1,
+            role="TRAINER", )[0]
+        trainer_ip = instance_response["PrivateIpAddress"]
+
+        logging.info("trainer " + str(trainer_index) + " started")
+
+        ssh_key = paramiko.RSAKey.from_private_key_file(args.pem_path)
+        ssh_client = paramiko.SSHClient()
+        ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+        ssh_client.connect(hostname=trainer_ip, username="ubuntu", pkey=ssh_key)
+
+        logging.info("trainer " + str(trainer_index) +
+                     " terminal connected via ssh")
+
+        cmd = kickoff_cmd.format(
+            PSERVER_HOSTS=pserver_endpoints_str,
+            DOCKER_IMAGE=args.docker_image,
+            TRAINER_INDEX=str(trainer_index),
+            TASK_NAME=args.task_name,
+            TRAINER_COUNT=args.trainer_count,
+            COMMAND=args.trainer_command,
+            MASTER_ENDPOINT=args.master_server_ip + ":" +
+            str(args.master_server_port))
+        logging.info(cmd)
+
+        stdin, stdout, stderr = ssh_client.exec_command(command=cmd)
+
+        # read and save the output log
+
+        logging.info("trainer " + str(trainer_index) +
+                     " command executed, keep fetching log")
+
+        stdout_thread = threading.Thread(
+            target=log_to_file,
+            args=(
+                stdout,
+                "trainer_" + str(trainer_index) + ".log", ))
+        stderr_thread = threading.Thread(
+            target=log_to_file,
+            args=(
+                stderr,
+                "trainer_" + str(trainer_index) + "_err.log", ))
+        stdout_thread.start()
+        stderr_thread.start()
+
+        stdout_thread.join()
+        stderr_thread.join()
+
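+        # joining the log threads first guarantees stdout/stderr are fully
+        # drained to the log files before the exit status below is collected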
+        return_code = stdout.channel.recv_exit_status()
+        if return_code != 0:
+            trainer_create_results[trainer_index] = {'has_error': True}
+            raise ValueError("trainer didn't finish with exit code 0")
+
+        ssh_client.close()
+
+    # start trainer instances and run the kickoff command, one thread each
+
+    trainer_threads = []
+    trainer_create_results = {}
+    try:
+        for i in xrange(args.trainer_count):
+            logging.info("starting thread for trainer " + str(i))
+            trainer_thread = threading.Thread(
+                target=create_and_start_trainer, args=(i, ))
+            trainer_thread.start()
+            trainer_threads.append(trainer_thread)
+
+        for trainer_thread in trainer_threads:
+            trainer_thread.join()
+
+        for result in trainer_create_results.itervalues():
+            if result["has_error"]:
+                logging.error(
+                    "error during trainer starting or training, destroying the whole cluster"
+                )
+                cleanup(args.task_name)
+                break
+
+        logging.info("all trainers stopped")
+    except Exception:
+        logging.exception(
+            "Training exception, cleaning up resources, please check log for more info"
+        )
+    finally:
+        cleanup(args.task_name)
+
+
+def cleanup(task_name):
+    if args.no_clean_up:
+        logging.info("no clean up option set, going to leave the setup running")
+        return
+    # shut down all ec2 instances created for this task, found by tag
+    print("going to clean up " + task_name + " instances")
+    instances_response = ec2client.describe_instances(Filters=[{
+        "Name": "tag:Task_name",
+        "Values": [task_name]
+    }])
+
+    instance_ids = []
+    if len(instances_response["Reservations"]) > 0:
+        for reservation in instances_response["Reservations"]:
+            for instance in reservation["Instances"]:
+                instance_ids.append(instance["InstanceId"])
+
+        ec2client.terminate_instances(InstanceIds=instance_ids)
+
+        instance_termination_waiter = ec2client.get_waiter(
+            'instance_terminated')
+        instance_termination_waiter.wait(InstanceIds=instance_ids)
+
+    # delete the subnet created for this task
+
+    subnet = ec2client.describe_subnets(Filters=[{
+        "Name": "tag:Task_name",
+        "Values": [task_name]
+    }])
+
+    if len(subnet["Subnets"]) > 0:
+        ec2client.delete_subnet(SubnetId=subnet["Subnets"][0]["SubnetId"])
+        # no subnet delete waiter, just leave it.
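+    # note: deleting the subnet only succeeds because the instance
+    # termination waiter above has run; a subnet with attached network
+    # interfaces cannot be deleted (AWS rejects it with DependencyViolation)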
+    logging.info("Cleanup done")
+    return
+
+
+def kickoff_pserver(host, pserver_endpoints_str):
+    ssh_client = None
+    try:
+        ssh_key = paramiko.RSAKey.from_private_key_file(args.pem_path)
+        ssh_client = paramiko.SSHClient()
+        ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+        ssh_client.connect(hostname=host, username="ubuntu", pkey=ssh_key)
+        cmd = (script_to_str(args.pserver_bash_file)).format(
+            PSERVER_HOSTS=pserver_endpoints_str,
+            DOCKER_IMAGE=args.docker_image,
+            PSERVER_PORT=args.pserver_port,
+            TASK_NAME=args.task_name,
+            COMMAND=args.pserver_command,
+            TRAINER_COUNT=args.trainer_count,
+            TRAINER_INDEX=0,
+            # the pserver cannot bind to 0.0.0.0:port from inside the
+            # container, so run docker with --network="host" and pass the
+            # host ip as the server endpoint instead
+            SERVER_ENDPOINT=host + ":" + str(args.pserver_port),
+            MASTER_ENDPOINT=args.master_server_ip + ":" +
+            str(args.master_server_port))
+        logging.info(cmd)
+        stdin, stdout, stderr = ssh_client.exec_command(command=cmd)
+
+        stdout_thread = threading.Thread(
+            target=log_to_file, args=(
+                stdout,
+                "pserver_" + host + ".log", ))
+        stderr_thread = threading.Thread(
+            target=log_to_file, args=(
+                stderr,
+                "pserver_" + host + "_err.log", ))
+        stdout_thread.start()
+        stderr_thread.start()
+
+        stdout_thread.join()
+        stderr_thread.join()
+
+        return_code = stdout.channel.recv_exit_status()
+        logging.info("pserver kickoff returned %d" % return_code)
+        if return_code != 0:
+            raise Exception("Error while kicking off pserver training process")
+    except Exception:
+        logging.exception("Error while kicking off pserver training process")
+        cleanup(args.task_name)
+    finally:
+        # guard against the case where the ssh connection was never made
+        if ssh_client:
+            ssh_client.close()
+
+
+def init_args():
+
+    if not args.task_name:
+        args.task_name = generate_task_name()
+        logging.info("task name generated %s" % (args.task_name))
+
+    if not args.pem_path:
+        args.pem_path = os.path.expanduser("~") + "/" + args.key_name + ".pem"
+    if args.security_group_id:
+        args.security_group_ids = (args.security_group_id, )
+
+    args.trainers_job_done_count = 0
+
+
+def create_cluster():
+
+    if not args.subnet_id:
+        logging.info("creating subnet for this task")
+        args.subnet_id = create_subnet()
+        logging.info("subnet %s created" % (args.subnet_id))
+
+    logging.info("creating pservers")
+    pserver_create_response = create_pservers()
+    logging.info("pservers created, collecting pserver ips")
+
+    pserver_endpoints = []
+    for pserver in pserver_create_response:
+        pserver_endpoints.append(pserver["NetworkInterfaces"][0][
+            "PrivateIpAddress"] + ":" + args.pserver_port)
+
+    pserver_endpoints_str = ",".join(pserver_endpoints)
+
+    logging.info("kicking off pserver training processes")
+    pserver_threads = []
+    for pserver in pserver_create_response:
+        pserver_thread = threading.Thread(
+            target=kickoff_pserver,
+            args=(pserver["PrivateIpAddress"], pserver_endpoints_str))
+        pserver_thread.start()
+        pserver_threads.append(pserver_thread)
+
+    logging.info("all pserver training processes started")
+
+    logging.info("creating trainers and kicking off trainer training process")
+    create_trainers(
+        kickoff_cmd=script_to_str(args.trainer_bash_file),
+        pserver_endpoints_str=pserver_endpoints_str)
+
+    for pserver_thread in pserver_threads:
+        pserver_thread.join()
+
+    logging.info("all processes ended")
+
+
+def start_server(args):
+    class S(BaseHTTPRequestHandler):
+        def _set_headers(self):
+            self.send_response(200)
+            self.send_header('Content-type', 'text/plain')
+            self.end_headers()
+
+        def do_HEAD(self):
+            self._set_headers()
+
+        def do_404(self):
+            self.send_response(404)
+            self.send_header('Content-type', 'text/plain')
+            self.end_headers()
+            logging.info("Received invalid GET request " + self.path)
+            self.wfile.write("NO ACTION FOUND")
+
+        def do_GET(self):
+
+            request_path = self.path
+            if request_path == "/status" or request_path == "/master_logs":
+                self._set_headers()
+                logging.info("Received request to return status")
+                with open(args.log_path + "master.log", "r") as logfile:
+                    self.wfile.write(logfile.read().strip())
+            elif request_path == "/list_logs" or request_path == "/logs":
+                self._set_headers()
+                self.wfile.write("\n".join(log_files))
+            elif "/log/" in request_path:
+                self._set_headers()
+                log_file_path = request_path.replace("/log/", "")
+                logging.info("requested log file path is " + args.log_path +
+                             log_file_path)
+                with open(args.log_path + log_file_path, "r") as logfile:
+                    self.wfile.write(logfile.read().strip())
+            else:
+                self.do_404()
+
+        def do_POST(self):
+
+            request_path = self.path
+
+            if request_path == "/save_data":
+                self._set_headers()
+                logging.info("Received request to save data")
+                # read the request body before replying so the client is not
+                # left with an unread payload
+                content_length = int(self.headers['Content-Length'])
+                post_data = self.rfile.read(content_length)
+                if args.task_name:
+                    with open(args.task_name + ".txt", "a") as text_file:
+                        text_file.write(post_data + "\n")
+                self.wfile.write("DATA SAVED!")
+
+            elif request_path == "/cleanup":
+                self._set_headers()
+                logging.info("Received request to cleanup cluster")
+                cleanup(args.task_name)
+                self.wfile.write("cleanup in progress")
+
+            else:
+                self.do_404()
+
+    server_address = ('', args.master_server_port)
+    httpd = HTTPServer(server_address, S)
+    logging.info("HTTP server is starting")
+    httpd.serve_forever()
+
+
+def print_arguments():
+    logging.info('----------- Configuration Arguments -----------')
+    for arg, value in sorted(vars(args).iteritems()):
+        logging.info('%s: %s' % (arg, value))
+    logging.info('------------------------------------------------')
+
+
+if __name__ == "__main__":
+    print_arguments()
+    if args.action == "create":
+        logging.info("going to create cluster")
+        if not args.key_name or not args.security_group_id:
+            raise ValueError("key_name and security_group_id are required")
+        init_args()
+        create_cluster()
+    elif args.action == "cleanup":
+        logging.info("going to cleanup cluster")
+        if not args.task_name:
+            raise ValueError("task_name is required")
+        cleanup(args.task_name)
+    elif args.action == "serve":
+        # serve mode
+        if not args.master_server_ip:
+            raise ValueError(
+                "No master server ip set, please run with --action create")
+
+        logging.info("going to start server and create cluster")
+
+        init_args()
+
+        logging.info("starting server in another thread")
+        server_thread = threading.Thread(target=start_server, args=(args, ))
+        server_thread.start()
+
+        create_cluster()
+        server_thread.join()
+    elif args.action == "test":
+        start_server(args)
diff --git a/tools/aws_benchmarking/server/logs/master.log b/tools/aws_benchmarking/server/logs/master.log
new file mode 100644
index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391
diff --git a/tools/aws_benchmarking/server/pserver.sh.template b/tools/aws_benchmarking/server/pserver.sh.template
new file mode 100644
index 0000000000000000000000000000000000000000..2612856d1e6273fe2642f82e8c616eb9ff24f8a4
--- /dev/null
+++ b/tools/aws_benchmarking/server/pserver.sh.template
@@ -0,0 +1,2 @@
+#!/bin/bash
+docker run --network="host" -i -e "SERVER_ENDPOINT={SERVER_ENDPOINT}" -e "MASTER_ENDPOINT={MASTER_ENDPOINT}" -e "TASK_NAME={TASK_NAME}" -e "TRAINER_INDEX={TRAINER_INDEX}" -e "TRAINING_ROLE=PSERVER" -e
"TRAINER_COUNT={TRAINER_COUNT}" -e "TRAINERS={TRAINER_COUNT}" -e "PSERVER_HOSTS={PSERVER_HOSTS}" -e "PSERVERS={PSERVER_HOSTS}" {DOCKER_IMAGE} {COMMAND} --device CPU \ No newline at end of file diff --git a/tools/aws_benchmarking/server/requirements.txt b/tools/aws_benchmarking/server/requirements.txt new file mode 100644 index 0000000000000000000000000000000000000000..5c523854f28b0a6f024fba2b2f344b53ba967a2f --- /dev/null +++ b/tools/aws_benchmarking/server/requirements.txt @@ -0,0 +1,4 @@ +netaddr==0.7.19 +boto3==1.6.21 +namesgenerator==0.3 +paramiko==2.4.1 diff --git a/tools/aws_benchmarking/server/trainer.sh.template b/tools/aws_benchmarking/server/trainer.sh.template new file mode 100644 index 0000000000000000000000000000000000000000..a4b2876b08cdf05e90e50589f897d74ca5f90443 --- /dev/null +++ b/tools/aws_benchmarking/server/trainer.sh.template @@ -0,0 +1,2 @@ +#!/bin/bash +nvidia-docker run --network="host" -i -e "MASTER_ENDPOINT={MASTER_ENDPOINT}" -e "TASK_NAME={TASK_NAME}" -e "TRAINER_COUNT={TRAINER_COUNT}" -e "TRAINERS={TRAINER_COUNT}" -e "TRAINER_INDEX={TRAINER_INDEX}" -e "PADDLE_INIT_TRAINER_ID={TRAINER_INDEX}" -e "TRAINING_ROLE=TRAINER" -e "PSERVER_HOSTS={PSERVER_HOSTS}" -e "PSERVERS={PSERVER_HOSTS}" {DOCKER_IMAGE} {COMMAND} --device GPU \ No newline at end of file diff --git a/tools/manylinux1/Dockerfile.android b/tools/manylinux1/Dockerfile.android index b6cae228a0c45ab70ba8ecc80ae4df7e0fa5bdbc..7eb040902b0f8f3cc9f7a31ec9f96467de654c3e 100644 --- a/tools/manylinux1/Dockerfile.android +++ b/tools/manylinux1/Dockerfile.android @@ -37,7 +37,7 @@ RUN git config --global credential.helper store # Fix locales to en_US.UTF-8 RUN localedef -i en_US -f UTF-8 en_US.UTF-8 -RUN pip install --upgrade pip && \ +RUN pip install --upgrade pip==9.0.3 && \ pip install -U 'protobuf==3.1.0' && \ pip install -U wheel sphinx && \ pip install pre-commit