提交 2a672d24 编写于 作者: L Luo Tao

Merge branch 'develop' into demo

......@@ -66,6 +66,12 @@ option(WITH_ANAKIN "Compile with Anakin library" OFF)
option(WITH_GRPC "Use grpc as the default rpc framework" ${WITH_DISTRIBUTE})
option(WITH_BRPC_RDMA "Use brpc rdma as the rpc protocal" OFF)
option(WITH_SYSTEM_BLAS "Use system blas library" OFF)
option(PY_VERSION "Compile PaddlePaddle with python3 support" ${PY_VERSION})
# PY_VERSION
if(NOT PY_VERSION)
set(PY_VERSION 2.7)
endif()
# CMAKE_BUILD_TYPE
if(NOT CMAKE_BUILD_TYPE)
......@@ -146,6 +152,7 @@ endif()
########################################################################################
include(external/mklml) # download mklml package
include(external/libxsmm) # download, build, install libxsmm
include(external/zlib) # download, build, install zlib
include(external/gflags) # download, build, install gflags
include(external/glog) # download, build, install glog
......@@ -232,6 +239,10 @@ if(WITH_MKLML)
list(APPEND EXTERNAL_LIBS ${MKLML_IOMP_LIB})
endif()
if(WITH_LIBXSMM)
list(APPEND EXTERNAL_LIBS ${LIBXSMM_LIBS})
endif()
if(WITH_MKLDNN)
list(APPEND EXTERNAL_LIBS ${MKLDNN_LIB})
endif()
......
......@@ -80,7 +80,7 @@ RUN pip install pre-commit 'ipython==5.3.0' && \
pip install opencv-python
#For docstring checker
RUN pip install pylint pytest astroid isort
RUN pip install pylint pytest astroid isort LinkChecker
COPY ./python/requirements.txt /root/
RUN pip install -r /root/requirements.txt
......
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
OPTION(WITH_LIBXSMM "Compile with libxsmm" OFF)
IF(NOT WITH_LIBXSMM)
return()
ENDIF()
IF(WIN32 OR APPLE OR ANDROID OR IOS)
MESSAGE(WARNING "Windows, Mac or Mobile are not supported with libxsmm in Paddle yet.")
SET(WITH_LIBXSMM OFF CACHE STRING "Disable LIBXSMM" FORCE)
return()
ENDIF()
INCLUDE (ExternalProject)
SET(LIBXSMM_SOURCES_DIR ${THIRD_PARTY_PATH}/libxsmm)
SET(LIBXSMM_INSTALL_DIR ${THIRD_PARTY_PATH}/install/libxsmm)
SET(LIBXSMM_INCLUDE_DIR "${LIBXSMM_INSTALL_DIR}/include" CACHE PATH "LIBXSMM include directory." FORCE)
SET(LIBXSMM_LIBRARY_DIR "${LIBXSMM_INSTALL_DIR}/lib" CACHE PATH "LIBXSMM library directory." FORCE)
SET(LIBXSMM_LIBS "${LIBXSMM_LIBRARY_DIR}/libxsmm.a"
"${LIBXSMM_LIBRARY_DIR}/libxsmmnoblas.a")
ExternalProject_Add(
extern_libxsmm
GIT_REPOSITORY "https://github.com/hfp/libxsmm.git"
GIT_TAG "7cc03b5b342fdbc6b6d990b190671c5dbb8489a2"
PREFIX ${LIBXSMM_SOURCES_DIR}
UPDATE_COMMAND ""
CONFIGURE_COMMAND ""
BUILD_IN_SOURCE 1
BUILD_COMMAND $(MAKE) --silent PREFIX=${LIBXSMM_INSTALL_DIR} CXX=g++ CC=gcc WARP=0 install
INSTALL_COMMAND ""
)
ADD_LIBRARY(libxsmm STATIC IMPORTED GLOBAL)
SET_PROPERTY(TARGET libxsmm PROPERTY IMPORTED_LOCATION "${LIBXSMM_LIBRARY_DIR}/libxsmm.a")
SET_PROPERTY(TARGET libxsmm PROPERTY IMPORTED_LOCATION "${LIBXSMM_LIBRARY_DIR}/libxsmmnoblas.a")
MESSAGE(STATUS "Libxsmm library: ${LIBXSMM_LIBS}")
include_directories(${LIBXSMM_INCLUDE_DIR})
ADD_DEFINITIONS(-DPADDLE_WITH_LIBXSMM)
ADD_DEPENDENCIES(libxsmm extern_libxsmm)
LIST(APPEND external_project_dependencies libxsmm)
......@@ -121,6 +121,11 @@ ELSE()
TARGET_LINK_LIBRARIES(cblas ${CBLAS_LIBRARIES})
ENDIF("${CBLAS_PROVIDER}" STREQUAL "MKLML")
IF(WITH_LIBXSMM)
TARGET_LINK_LIBRARIES(cblas ${LIBXSMM_LIBS})
ADD_DEPENDENCIES(cblas extern_libxsmm)
ENDIF()
IF(NOT ${CBLAS_FOUND})
ADD_DEPENDENCIES(cblas extern_openblas)
LIST(APPEND external_project_dependencies cblas)
......
......@@ -18,8 +18,9 @@ ENDIF()
INCLUDE(python_module)
FIND_PACKAGE(PythonInterp 2.7)
FIND_PACKAGE(PythonLibs 2.7)
FIND_PACKAGE(PythonInterp ${PY_VERSION})
FIND_PACKAGE(PythonLibs ${PY_VERSION})
# Fixme: Maybe find a static library. Get SHARED/STATIC by FIND_PACKAGE.
ADD_LIBRARY(python SHARED IMPORTED GLOBAL)
SET_PROPERTY(TARGET python PROPERTY IMPORTED_LOCATION ${PYTHON_LIBRARIES})
......
......@@ -276,13 +276,22 @@ std::unique_ptr<SSAGraph> MultiDevSSAGraphBuilder::Build(
}
}
// Insert BCast Ops
for (size_t dev_id = 0; dev_id < bcast_var_name_set.size(); ++dev_id) {
auto &to_bcast_set = bcast_var_name_set[dev_id];
for (auto &bcast_name : to_bcast_set) {
CreateBroadcastOp(&result, bcast_name, dev_id);
bool use_gpu = false;
#ifdef PADDLE_WITH_CUDA
use_gpu = nccl_ctxs_ != nullptr;
#endif
if (use_gpu ||
strategy_.reduce_ == BuildStrategy::ReduceStrategy::kAllReduce) {
// Insert BCast Ops
for (size_t dev_id = 0; dev_id < bcast_var_name_set.size(); ++dev_id) {
auto &to_bcast_set = bcast_var_name_set[dev_id];
for (auto &bcast_name : to_bcast_set) {
CreateBroadcastOp(&result, bcast_name, dev_id);
}
}
}
/*
Dependency graph has been constructed. However, there are still data
hazards need to be handled.
......@@ -412,14 +421,19 @@ int MultiDevSSAGraphBuilder::GetOpDeviceID(const OpDesc &op) const {
if (strategy_.reduce_ != BuildStrategy::ReduceStrategy::kReduce) {
return -1;
}
for (auto &varname : op.InputArgumentNames()) {
int dev_id = GetVarDeviceID(varname);
if (dev_id != -1) {
return dev_id;
}
int op_role = boost::get<int>(
op.GetAttr(framework::OpProtoAndCheckerMaker::OpRoleAttrName()));
if (op_role != static_cast<int>(framework::OpRole::kOptimize)) {
return -1;
}
return -1;
auto param_grad = boost::get<std::vector<std::string>>(
op.GetAttr(OpProtoAndCheckerMaker::OpRoleVarAttrName()));
PADDLE_ENFORCE_EQ(param_grad.size(), 2U);
int dev_id = GetVarDeviceID(param_grad[1]);
PADDLE_ENFORCE_NE(dev_id, -1, "dev_id should not be -1.[%s, %s]", op.Type(),
param_grad[0]);
return dev_id;
}
int MultiDevSSAGraphBuilder::GetVarDeviceID(const std::string &varname) const {
......
......@@ -45,6 +45,7 @@ class ParallelExecutorPrivate {
#endif
bool own_local_scope_;
bool use_cuda_;
bool use_all_reduce_;
};
std::vector<Scope *> &ParallelExecutor::GetLocalScopes() {
......@@ -62,6 +63,14 @@ ParallelExecutor::ParallelExecutor(
: member_(new ParallelExecutorPrivate(places)) {
member_->global_scope_ = scope;
member_->use_cuda_ = exec_strategy.use_cuda_;
member_->use_all_reduce_ =
build_strategy.reduce_ == BuildStrategy::ReduceStrategy::kAllReduce;
if (!member_->use_all_reduce_) {
PADDLE_ENFORCE(places.size() > 1,
"If you set build_strategy.reduce with 'Reduce',"
"the number of places must be greater than 1.");
}
// Step 1. Bcast the params to devs.
// Create local scopes
......@@ -95,7 +104,7 @@ ParallelExecutor::ParallelExecutor(
}
if (member_->local_scopes_.size() != 1 && local_scopes.empty()) {
BCastParamsToDevs(bcast_vars);
BCastParamsToDevices(bcast_vars);
}
// Startup Program has been run. All local scopes has correct parameters.
......@@ -117,7 +126,7 @@ ParallelExecutor::ParallelExecutor(
#ifdef PADDLE_WITH_CUDA
builder_factory.SetNCCLContextMap(member_->nccl_ctxs_.get());
#else
PADDLE_THROW("Not compiled with CUDA");
PADDLE_THROW("Not compiled with CUDA.");
#endif
}
......@@ -131,9 +140,9 @@ ParallelExecutor::ParallelExecutor(
member_->places_, std::move(member_->executor_)));
}
void ParallelExecutor::BCastParamsToDevs(
void ParallelExecutor::BCastParamsToDevices(
const std::unordered_set<std::string> &vars) const {
// the the initializing bcast, all vars would be bcast from device(0),
// the initializing bcast, all vars would be bcast from device(0),
// otherwise
// bcast from the specified device.
bool initializing = builder_.get() == nullptr ? true : false;
......@@ -209,9 +218,16 @@ void ParallelExecutor::BCastParamsToDevs(
auto local_scope = member_->local_scopes_[i];
auto *t = local_scope->Var(var)->GetMutable<LoDTensor>();
t->Resize(dims);
t->mutable_data(cpu, main_tensor.type());
paddle::framework::TensorCopy(main_tensor, cpu, t);
// FIXME(zcd): LR_DECAY_COUNTER should not be shared. This is a hot fix.
if (member_->use_all_reduce_ || member_->use_cuda_ ||
var == "@LR_DECAY_COUNTER@") {
t->Resize(dims);
t->mutable_data(cpu, main_tensor.type());
paddle::framework::TensorCopy(main_tensor, cpu, t);
} else {
t->ShareDataWith(main_tensor);
}
}
}
}
......
......@@ -66,7 +66,7 @@ class ParallelExecutor {
void Run(const std::vector<std::string> &fetch_tensors,
const std::string &fetched_var_name);
void BCastParamsToDevs(const std::unordered_set<std::string> &vars) const;
void BCastParamsToDevices(const std::unordered_set<std::string> &vars) const;
private:
ParallelExecutorPrivate *member_;
......
......@@ -48,7 +48,7 @@ class CheckpointNotifyOp : public framework::OperatorBase {
VLOG(3) << "checkpoint notify sending lookup table: " << lookup_table_name
<< " and dir:" << dir << " to " << epmap[i];
}
rpc_client->Wait();
PADDLE_ENFORCE(rpc_client->Wait(), "internal error in RPCClient");
}
};
......
......@@ -281,9 +281,10 @@ void GRPCClient::AsyncCheckpointNotify(const std::string& ep,
req_count_++;
}
void GRPCClient::Wait() {
bool GRPCClient::Wait() {
std::unique_lock<std::mutex> lk(sync_mutex_);
sync_cond_.wait(lk, [this] { return req_count_ == 0; });
sync_cond_.wait(lk, [this] { return (req_count_ == 0 || ok_ == false); });
return ok_;
}
void GRPCClient::Proceed() {
......@@ -297,6 +298,14 @@ void GRPCClient::Proceed() {
if (c->status_.ok()) {
VLOG(3) << c->var_h_.String() << " process";
c->Process();
} else if (c->status_.error_code() == grpc::StatusCode::DEADLINE_EXCEEDED) {
LOG(ERROR) << c->var_h_.String()
<< " meets grpc error:" << c->status_.error_message();
{
std::lock_guard<std::mutex> lk(sync_mutex_);
ok_ = false;
}
sync_cond_.notify_all();
} else {
LOG(FATAL) << c->var_h_.String()
<< " meets grpc error:" << c->status_.error_message();
......
......@@ -188,7 +188,7 @@ class CheckpointNotifyProcessor : public BaseProcessor {
class GRPCClient : public RPCClient {
public:
GRPCClient() {}
GRPCClient() : ok_(true) {}
virtual ~GRPCClient();
bool AsyncSendVar(const std::string& ep, const platform::DeviceContext& ctx,
......@@ -221,7 +221,7 @@ class GRPCClient : public RPCClient {
void AsyncSendEndPass(const std::string& ep,
int64_t time_out = FLAGS_rpc_deadline) override;
void Wait() override;
bool Wait() override;
void SendBeginPass() override;
......@@ -247,6 +247,7 @@ class GRPCClient : public RPCClient {
std::mutex sync_mutex_;
std::condition_variable sync_cond_;
std::atomic<int64_t> req_count_{0};
bool ok_;
// mutex for GetChannel thread safety
std::mutex chan_mutex_;
......
......@@ -72,7 +72,7 @@ class RPCClient {
virtual void SendBeginPass() = 0;
virtual void SendEndPass() = 0;
virtual void Wait() = 0;
virtual bool Wait() = 0;
template <typename T>
static RPCClient* GetInstance() {
......
......@@ -45,13 +45,13 @@ class FetchBarrierOp : public framework::OperatorBase {
distributed::RPCClient* rpc_client =
distributed::RPCClient::GetInstance<RPCCLIENT_T>();
rpc_client->Wait();
PADDLE_ENFORCE(rpc_client->Wait(), "internal error in RPCClient");
for (auto& ep : eps) {
VLOG(3) << "fetch barrier, ep: " << ep;
rpc_client->AsyncSendFetchBarrier(ep);
}
rpc_client->Wait();
PADDLE_ENFORCE(rpc_client->Wait(), "internal error in RPCClient");
}
};
......
......@@ -21,6 +21,10 @@
#include "paddle/fluid/platform/dynload/mklml.h"
#endif
#ifdef PADDLE_WITH_LIBXSMM
#include <libxsmm.h>
#endif
#ifdef PADDLE_USE_OPENBLAS
#include <cblas.h>
#endif
......
......@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <limits>
#include <vector>
#include "paddle/fluid/operators/math/math_function.h"
......@@ -30,6 +31,12 @@ struct CBlas<float> {
platform::dynload::cblas_sgemm(args...);
}
#ifdef PADDLE_WITH_LIBXSMM
template <typename... ARGS>
static void SMM_GEMM(ARGS... args) {
libxsmm_sgemm(args...);
}
#endif
template <typename... ARGS>
static void AXPY(ARGS... args) {
platform::dynload::cblas_saxpy(args...);
......@@ -63,6 +70,12 @@ struct CBlas<double> {
platform::dynload::cblas_dgemm(args...);
}
#ifdef PADDLE_WITH_LIBXSMM
template <typename... ARGS>
static void SMM_GEMM(ARGS... args) {
libxsmm_dgemm(args...);
}
#endif
template <typename... ARGS>
static void AXPY(ARGS... args) {
platform::dynload::cblas_daxpy(args...);
......@@ -140,6 +153,9 @@ struct CBlas<double> {
template <>
struct CBlas<platform::float16> {
static void GEMM(...) { PADDLE_THROW("float16 GEMM not supported on CPU"); }
static void SMM_GEMM(...) {
PADDLE_THROW("float16 SMM_GEMM not supported on CPU");
}
#ifdef PADDLE_WITH_MKLML
static void GEMM_BATCH(...) {
PADDLE_THROW("float16 GEMM_BATCH not supported on CPU");
......@@ -147,6 +163,33 @@ struct CBlas<platform::float16> {
#endif
};
template <typename T>
inline bool UseXSMM(const int &m, const int &n, const int &k, bool transa,
bool transb, const T &alpha, const T &beta) {
#ifdef PADDLE_WITH_LIBXSMM
// Refer to https://github.com/hfp/libxsmm/blob/master/README.md
// But the threshold is custom
constexpr int LIBXSMM_THRESHOLD = 20 * 20 * 20;
if (m * n * k > LIBXSMM_THRESHOLD || transa || transb ||
std::abs<T>(alpha - static_cast<T>(1) >
std::numeric_limits<T>::epsilon()) ||
std::abs<T>(beta) > std::numeric_limits<T>::epsilon()) {
return false;
} else {
return true;
}
#endif
return false;
}
template <>
inline bool UseXSMM<platform::float16>(const int &m, const int &n, const int &k,
bool transa, bool transb,
const platform::float16 &alpha,
const platform::float16 &beta) {
return false;
}
template <>
template <typename T>
void Blas<platform::CPUDeviceContext>::GEMM(CBLAS_TRANSPOSE transA,
......@@ -156,8 +199,21 @@ void Blas<platform::CPUDeviceContext>::GEMM(CBLAS_TRANSPOSE transA,
int lda = (transA == CblasNoTrans) ? K : M;
int ldb = (transB == CblasNoTrans) ? N : K;
int ldc = N;
CBlas<T>::GEMM(CblasRowMajor, transA, transB, M, N, K, alpha, A, lda, B, ldb,
beta, C, ldc);
#ifdef PADDLE_WITH_LIBXSMM
if (UseXSMM(M, N, K, transA != CblasNoTrans, transB != CblasNoTrans, alpha,
beta)) {
// Note: SMM use ColMajor
const char transa = 'N';
const char transb = 'N';
CBlas<T>::SMM_GEMM(&transa, &transb, &N, &M, &K, &alpha, B, &ldb, A, &lda,
&beta, C, &ldc);
} else {
#endif
CBlas<T>::GEMM(CblasRowMajor, transA, transB, M, N, K, alpha, A, lda, B,
ldb, beta, C, ldc);
#ifdef PADDLE_WITH_LIBXSMM
}
#endif
}
template <>
......
......@@ -54,8 +54,64 @@ TEST(math_function, gemm_notrans_cblas) {
EXPECT_EQ(input3_ptr[6], 86);
EXPECT_EQ(input3_ptr[7], 99);
}
#ifdef PADDLE_WITH_LIBXSMM
template <typename T>
void MklSmmCompare(int m, int n, int k) {
paddle::framework::Tensor mat_a;
paddle::framework::Tensor mat_b;
paddle::framework::Tensor mat_c_smm;
paddle::framework::Tensor mat_c_mkl;
auto* cpu_place = new paddle::platform::CPUPlace();
T* A = mat_a.mutable_data<T>({m, k}, *cpu_place);
T* B = mat_b.mutable_data<T>({k, n}, *cpu_place);
T* CSMM = mat_c_smm.mutable_data<T>({m, n}, *cpu_place);
T* CMKL = mat_c_mkl.mutable_data<T>({m, n}, *cpu_place);
T alpha = static_cast<T>(1);
T beta = static_cast<T>(0);
for (int i = 0; i < mat_a.numel(); ++i) {
A[i] = static_cast<T>(i);
}
for (int i = 0; i < mat_b.numel(); ++i) {
B[i] = static_cast<T>(i);
}
// lda,ldb,ldc follow RowMajor
int lda = k;
int ldb = n;
int ldc = n;
auto smm = [&, m, n, k, lda, ldb, ldc, alpha, beta]() {
const char transa = 'N';
const char transb = 'N';
paddle::operators::math::CBlas<T>::SMM_GEMM(&transa, &transb, &n, &m, &k,
&alpha, B, &ldb, A, &lda, &beta,
CSMM, &ldc);
};
auto mkl = [&, m, n, k, lda, ldb, ldc, alpha, beta]() {
paddle::operators::math::CBlas<T>::GEMM(CblasRowMajor, CblasNoTrans,
CblasNoTrans, m, n, k, alpha, A,
lda, B, ldb, beta, CMKL, ldc);
};
smm();
mkl();
ASSERT_EQ(mat_c_mkl.numel(), mat_c_smm.numel());
for (int i = 0; i < mat_c_mkl.numel(); ++i) {
EXPECT_FLOAT_EQ(CSMM[i], CMKL[i]);
}
}
TEST(math_function, gemm_mkl_vs_smm) {
MklSmmCompare<float>(1, 2, 3);
MklSmmCompare<double>(1, 2, 3);
MklSmmCompare<float>(3, 2, 1);
MklSmmCompare<double>(3, 2, 1);
MklSmmCompare<float>(3, 8, 5);
MklSmmCompare<double>(3, 8, 5);
}
#endif
TEST(math_function, gemm_trans_clbas) {
TEST(math_function, gemm_trans_cblas) {
paddle::framework::Tensor input1;
paddle::framework::Tensor input2;
paddle::framework::Tensor input3;
......
......@@ -53,7 +53,7 @@ class PrefetchOp : public framework::OperatorBase {
VLOG(3) << "don't send no-initialied variable: " << ins[i];
}
}
rpc_client->Wait();
PADDLE_ENFORCE(rpc_client->Wait(), "internal error in RPCClient");
}
};
......
......@@ -51,7 +51,7 @@ class RecvOp : public framework::OperatorBase {
rpc_client->AsyncGetVar(epmap[i], ctx, scope, outs[i]);
}
if (sync_mode) {
rpc_client->Wait();
PADDLE_ENFORCE(rpc_client->Wait(), "internal error in RPCClient");
}
}
};
......
......@@ -50,13 +50,13 @@ class SendBarrierOp : public framework::OperatorBase {
VLOG(3) << "SendBarrierOp sync_mode:" << sync_mode;
// need to wait before sending send_barrier message
rpc_client->Wait();
PADDLE_ENFORCE(rpc_client->Wait(), "internal error in RPCClient");
if (sync_mode) {
for (auto& ep : eps) {
VLOG(3) << "send barrier, ep: " << ep;
rpc_client->AsyncSendBatchBarrier(ep);
}
rpc_client->Wait();
PADDLE_ENFORCE(rpc_client->Wait(), "internal error in RPCClient");
}
}
};
......
......@@ -59,7 +59,7 @@ class SendOp : public framework::OperatorBase {
}
}
if (sync_send) {
rpc_client->Wait();
PADDLE_ENFORCE(rpc_client->Wait(), "internal error in RPCClient");
}
}
};
......
......@@ -60,6 +60,7 @@ class TopkKernel : public framework::OpKernel<T> {
#endif
for (size_t i = 0; i < row; i++) {
std::vector<std::pair<T, size_t>> vec;
vec.reserve(col);
for (size_t j = 0; j < col; j++) {
vec.push_back(std::pair<T, size_t>(eg_input(i, j), j));
}
......
......@@ -68,7 +68,7 @@ bool IsCompiledWithCUDA() {
}
bool IsCompiledWithDIST() {
#ifdef PADDLE_WITH_DIST
#ifdef PADDLE_WITH_DISTRIBUTE
return true;
#else
return false;
......@@ -669,7 +669,7 @@ All parameter, weight, gradient are variables in Paddle.
const std::string &, Scope *, std::vector<Scope *> &,
const ExecutionStrategy &, const BuildStrategy &, size_t,
size_t>())
.def("bcast_params", &ParallelExecutor::BCastParamsToDevs)
.def("bcast_params", &ParallelExecutor::BCastParamsToDevices)
// NOTE: even we return a vec<Scope*>* to Python use reference policy.
// We still cannot get local_scope from this vector, since the element
// of vec<Scope*> will be freed by Python GC. We can only return Scope*
......
......@@ -136,7 +136,13 @@ std::string callPythonFunc(const std::string& moduleName,
const std::string& funcName,
const std::vector<std::string>& args) {
PyObjectPtr obj = callPythonFuncRetPyObj(moduleName, funcName, args);
#if PY_MAJOR_VERSION >= 3
Py_ssize_t str_size = 0u;
const char* str = PyUnicode_AsUTF8AndSize(obj.get(), &str_size);
return std::string(str, (size_t)str_size);
#else
return std::string(PyString_AsString(obj.get()), PyString_Size(obj.get()));
#endif // PY_MAJOR_VERSION >= 3
}
PyObjectPtr createPythonClass(
......
......@@ -88,6 +88,33 @@ PyObjectPtr createPythonClass(const std::string& moduleName,
namespace py {
PyObjectPtr import(const std::string& moduleName);
#if PY_MAJOR_VERSION >= 3
/**
* Cast a PyLong to int type T.
* @tparam T return type.
* @param [in] obj PyLong object.
* @param [out] ok status for casting. False if error occured. nullptr if user
* don't care is ok or not.
* @return The value of python object, or 0 if not ok.
*/
template <typename T>
T castInt(PyObject* obj, bool* ok = nullptr) {
// Refer to https://www.python.org/dev/peps/pep-0237/, the int and long object
// were unified to long since python3
if (PyLong_Check(obj)) {
if (ok) *ok = true;
return (T)PyLong_AsUnsignedLong(obj);
} else {
if (ok) *ok = false;
return (T)0;
}
}
// Convert PyAPI from 2.x to 3.x
#define PyString_FromString PyUnicode_FromString
#define PyString_AsString PyUnicode_AsUTF8
#else
/**
* Cast a PyLong or PyInt to int type T.
* @tparam T return type.
......@@ -109,6 +136,7 @@ T castInt(PyObject* obj, bool* ok = nullptr) {
return (T)0;
}
}
#endif // PY_MAJOR_VERSION >= 3
/**
* Invoke repr of python object.
......
......@@ -78,6 +78,12 @@ function cmake_gen() {
PYTHON_FLAGS="-DPYTHON_EXECUTABLE:FILEPATH=/opt/python/cp27-cp27mu/bin/python
-DPYTHON_INCLUDE_DIR:PATH=/opt/python/cp27-cp27mu/include/python2.7
-DPYTHON_LIBRARIES:FILEPATH=/opt/_internal/cpython-2.7.11-ucs4/lib/libpython2.7.so"
elif [ "$1" == "cp35-cp35m" ]; then
export LD_LIBRARY_PATH=/opt/_internal/cpython-3.5.1/lib/:${LD_LIBRARY_PATH}
export PATH=/opt/_internal/cpython-3.5.1/bin/:${PATH}
export PYTHON_FLAGS="-DPYTHON_EXECUTABLE:FILEPATH=/opt/_internal/cpython-3.5.1/bin/python3
-DPYTHON_INCLUDE_DIR:PATH=/opt/_internal/cpython-3.5.1/include/python3.5m
-DPYTHON_LIBRARIES:FILEPATH=/opt/_internal/cpython-3.5.1/lib/libpython3.so"
fi
fi
......@@ -108,6 +114,7 @@ function cmake_gen() {
-DWITH_CONTRIB=${WITH_CONTRIB:-ON}
-DWITH_ANAKIN=${WITH_ANAKIN:-OFF}
-DWITH_INFERENCE_DEMO=${WITH_INFERENCE_DEMO:-ON}
-DPY_VERSION=${PY_VERSION:-2.7}
========================================
EOF
# Disable UNITTEST_USE_VIRTUALENV in docker because
......@@ -136,7 +143,8 @@ EOF
-DCMAKE_EXPORT_COMPILE_COMMANDS=ON \
-DWITH_CONTRIB=${WITH_CONTRIB:-ON} \
-DWITH_ANAKIN=${WITH_ANAKIN:-OFF} \
-DWITH_INFERENCE_DEMO=${WITH_INFERENCE_DEMO:-ON}
-DWITH_INFERENCE_DEMO=${WITH_INFERENCE_DEMO:-ON} \
-DPY_VERSION=${PY_VERSION:-2.7}
}
function abort(){
......
......@@ -12,16 +12,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
try:
from version import full_version as __version__
from version import commit as __git_commit__
from paddle.version import full_version as __version__
from paddle.version import commit as __git_commit__
except ImportError:
import sys
sys.stderr.write('''Warning with import paddle: you should not
sys.stderr.write('''Warning with import paddle: you should not
import paddle from the source directory; please install paddlepaddle*.whl firstly.'''
)
import reader
import dataset
import batch
import paddle.reader
import paddle.dataset
import paddle.batch
batch = batch.batch
......@@ -15,20 +15,20 @@
Dataset package.
"""
import mnist
import imikolov
import imdb
import cifar
import movielens
import conll05
import uci_housing
import sentiment
import wmt14
import wmt16
import mq2007
import flowers
import voc2012
import image
import paddle.dataset.mnist
import paddle.dataset.imikolov
import paddle.dataset.imdb
import paddle.dataset.cifar
import paddle.dataset.movielens
import paddle.dataset.conll05
import paddle.dataset.uci_housing
import paddle.dataset.sentiment
import paddle.dataset.wmt14
import paddle.dataset.wmt16
import paddle.dataset.mq2007
import paddle.dataset.flowers
import paddle.dataset.voc2012
import paddle.dataset.image
__all__ = [
'mnist',
......
......@@ -324,10 +324,12 @@ def set_gradient_clip(clip, param_list=None, program=None):
param.gradient_clip_attr = copy.deepcopy(clip)
def append_gradient_clip_ops(param_grad):
def append_gradient_clip_ops(param_grads):
context = dict()
for p, g in param_grad:
with p.block.program.optimized_guard(p):
for p, g in param_grads:
if g is None:
continue
with p.block.program.optimized_guard([p, g]):
clip_attr = getattr(p, 'gradient_clip_attr', NullGradientClipAttr())
if clip_attr is None:
clip_attr = NullGradientClipAttr()
......@@ -339,8 +341,10 @@ def append_gradient_clip_ops(param_grad):
clip_attr._process_context(context=context, param=p, grad=g)
res = []
for p, g in param_grad:
with p.block.program.optimized_guard(p):
for p, g in param_grads:
if g is None:
continue
with p.block.program.optimized_guard([p, g]):
res.append(clip_attr._create_operators(param=p, grad=g))
return res
......
......@@ -1319,7 +1319,7 @@ class Program(object):
self._op_role_var = [var_name]
@contextlib.contextmanager
def optimized_guard(self, var):
def optimized_guard(self, param_and_grads):
"""
A with guard to set :code:`Optimization` :code:`OpRole` and
:code:`OpRoleVar` automatically.
......@@ -1327,17 +1327,20 @@ class Program(object):
Notes: This is a very low level API. Users should not use it directly.
Args:
var(Variable|str): The variable (name) to be optimized.
param_and_grads(list): The variables (names) to be optimized.
Examples:
>>> p, g = backward(...)
>>> with program.optimized_guard(p):
>>> with program.optimized_guard([p,g]):
>>> p = p - 0.001 * g
"""
OpRole = core.op_proto_and_checker_maker.OpRole
self._current_role = OpRole.Optimize
self._op_role_var = [var.name if isinstance(var, Variable) else var]
self._op_role_var = [
var.name if isinstance(var, Variable) else var
for var in param_and_grads
]
yield
self._op_role_var = []
self._current_role = OpRole.Forward
......
......@@ -123,7 +123,7 @@ class Optimizer(object):
"""
pass
def _finish_update(self, block, parameters):
def _finish_update(self, block, parameters_and_grads):
"""Finish any custom updates needed
before completing an optimization step
......@@ -226,18 +226,18 @@ class Optimizer(object):
optimize_ops = []
for param_and_grad in parameters_and_grads:
if param_and_grad[1] is None:
continue
with param_and_grad[0].block.program.optimized_guard(
param_and_grad[0]):
if param_and_grad[0].trainable is True and param_and_grad[
1] is not None:
param_and_grad):
if param_and_grad[0].trainable is True:
optimize_op = self._append_optimize_op(loss.block,
param_and_grad)
optimize_ops.append(optimize_op)
# Get custom finish ops for subclasses
# FIXME: Need to fix this once we figure out how to handle dependencies
self._finish_update(loss.block,
[p[0] for p in parameters_and_grads])
self._finish_update(loss.block, parameters_and_grads)
end = len(global_block.ops)
return global_block.slice_ops(start, end)
......@@ -564,13 +564,15 @@ class AdamOptimizer(Optimizer):
return adam_op
def _finish_update(self, block, parameters):
def _finish_update(self, block, param_and_grads):
"""Update Beta1 and Beta2 Power accumulators
"""
assert isinstance(block, framework.Block)
main_block = block.program.global_block()
for param in parameters:
with param.block.program.optimized_guard(param):
for param, grad in param_and_grads:
if grad is None:
continue
with param.block.program.optimized_guard([param, grad]):
beta1_pow_acc = self._get_accumulator(self._beta1_pow_acc_str,
param)
beta2_pow_acc = self._get_accumulator(self._beta2_pow_acc_str,
......@@ -691,13 +693,15 @@ class AdamaxOptimizer(Optimizer):
return adamax_op
def _finish_update(self, block, parameters):
def _finish_update(self, block, parameters_and_grads):
"""Update Beta1 Power accumulator
"""
assert isinstance(block, framework.Block)
main_block = block.program.global_block()
for param in parameters:
with param.block.program.optimized_guard(param):
for param, grad in parameters_and_grads:
if grad is None:
continue
with param.block.program.optimized_guard([param, grad]):
beta1_pow_acc = self._get_accumulator(self._beta1_pow_acc_str,
param)
main_block.append_op(
......@@ -1158,7 +1162,9 @@ class ModelAverage(Optimizer):
self.params_grads.append((param, grad))
for param, grad in self.params_grads:
with param.block.program.optimized_guard(param):
if grad is None:
continue
with param.block.program.optimized_guard([param, grad]):
self._append_average_accumulate_op(param)
self.apply_program = Program()
......
......@@ -41,12 +41,11 @@ def append_regularization_ops(parameters_and_grads, regularization=None):
"""
params_and_grads = []
for param, grad in parameters_and_grads:
with param.block.program.optimized_guard(param):
# If no gradient then we don't need to do anything
if grad is None:
params_and_grads.append((param, grad))
continue
# If no gradient then we don't need to do anything
if grad is None:
params_and_grads.append((param, grad))
continue
with param.block.program.optimized_guard([param, grad]):
regularization_term = None
if param.regularizer is not None:
# Add variable for regularization term in grad block
......
......@@ -35,7 +35,8 @@ class TestParallelExecutorBase(unittest.TestCase):
feed_dict=None,
seed=None,
use_parallel_executor=True,
balance_parameter_opt_between_cards=False):
use_reduce=False,
optimizer=fluid.optimizer.Adam):
def run_executor(exe, feed, fetch_list, program=None):
if isinstance(exe, fluid.ParallelExecutor):
res = exe.run(fetch_list=fetch_list, feed=feed)
......@@ -50,14 +51,19 @@ class TestParallelExecutorBase(unittest.TestCase):
main = fluid.Program()
startup = fluid.Program()
startup.random_seed = 1 # Fix random seed
main.random_seed = 1
with fluid.program_guard(main, startup):
if seed is not None:
startup.random_seed = seed
main.random_seed = seed
loss = method(use_feed=feed_dict is not None)
adam = fluid.optimizer.Adam()
adam.minimize(loss)
optimizer().minimize(loss)
if memory_opt:
fluid.memory_optimize(main)
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
startup_exe = fluid.Executor(place)
startup_exe.run(startup)
......@@ -65,7 +71,8 @@ class TestParallelExecutorBase(unittest.TestCase):
exec_strategy.allow_op_delay = allow_op_delay
build_strategy = fluid.BuildStrategy()
build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce if balance_parameter_opt_between_cards else fluid.BuildStrategy.ReduceStrategy.AllReduce
build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce \
if use_reduce else fluid.BuildStrategy.ReduceStrategy.AllReduce
if use_parallel_executor:
exe = fluid.ParallelExecutor(
......
......@@ -101,9 +101,7 @@ class TestMNIST(TestParallelExecutorBase):
fluid.recordio_writer.convert_reader_to_recordio_file(
MNIST_RECORDIO_FILE, reader, feeder)
def check_simple_fc_convergence(self,
balance_parameter_opt_between_cards,
use_cuda=True):
def check_simple_fc_convergence(self, use_cuda, use_reduce=False):
self.check_network_convergence(simple_fc_net, use_cuda=use_cuda)
self.check_network_convergence(
simple_fc_net, use_cuda=use_cuda, allow_op_delay=True)
......@@ -115,20 +113,19 @@ class TestMNIST(TestParallelExecutorBase):
feed_dict={"image": img,
"label": label},
use_cuda=use_cuda,
balance_parameter_opt_between_cards=balance_parameter_opt_between_cards
)
use_reduce=use_reduce)
def test_simple_fc(self):
self.check_simple_fc_convergence(False, use_cuda=True)
self.check_simple_fc_convergence(False, use_cuda=False)
# use_cuda
self.check_simple_fc_convergence(True)
self.check_simple_fc_convergence(False)
def test_simple_fc_with_new_strategy(self):
self.check_simple_fc_convergence(True, use_cuda=True)
self.check_simple_fc_convergence(True, use_cuda=False)
# use_cuda, use_reduce
self.check_simple_fc_convergence(True, True)
self.check_simple_fc_convergence(False, True)
def check_simple_fc_parallel_accuracy(self,
balance_parameter_opt_between_cards,
use_cuda=True):
def check_simple_fc_parallel_accuracy(self, use_cuda, use_reduce=False):
img = np.zeros(shape=[32, 784], dtype='float32')
label = np.ones(shape=[32, 1], dtype='int64')
single_first_loss, single_last_loss = self.check_network_convergence(
......@@ -145,8 +142,7 @@ class TestMNIST(TestParallelExecutorBase):
"label": label},
use_cuda=use_cuda,
use_parallel_executor=True,
balance_parameter_opt_between_cards=balance_parameter_opt_between_cards
)
use_reduce=use_reduce)
for p_f in parallel_first_loss:
self.assertAlmostEquals(p_f, single_first_loss[0], delta=1e-6)
......@@ -154,15 +150,15 @@ class TestMNIST(TestParallelExecutorBase):
self.assertAlmostEquals(p_l, single_last_loss[0], delta=1e-6)
def test_simple_fc_parallel_accuracy(self):
self.check_simple_fc_parallel_accuracy(False, use_cuda=True)
self.check_simple_fc_parallel_accuracy(False, use_cuda=False)
self.check_simple_fc_parallel_accuracy(True)
self.check_simple_fc_parallel_accuracy(False)
def test_simple_fc_parallel_accuracy_with_new_strategy(self):
self.check_simple_fc_parallel_accuracy(True, use_cuda=True)
self.check_simple_fc_parallel_accuracy(True, use_cuda=False)
# use_cuda, use_reduce
self.check_simple_fc_parallel_accuracy(True, True)
self.check_simple_fc_parallel_accuracy(False, True)
def check_batchnorm_fc_convergence(
self, balance_parameter_opt_between_cards, use_cuda):
def check_batchnorm_fc_convergence(self, use_cuda, use_reduce=False):
self.check_network_convergence(fc_with_batchnorm, use_cuda=use_cuda)
img = np.zeros(shape=[32, 784], dtype='float32')
label = np.ones(shape=[32, 1], dtype='int64')
......@@ -171,16 +167,16 @@ class TestMNIST(TestParallelExecutorBase):
feed_dict={"image": img,
"label": label},
use_cuda=use_cuda,
balance_parameter_opt_between_cards=balance_parameter_opt_between_cards
)
use_reduce=use_reduce)
def test_batchnorm_fc(self):
self.check_batchnorm_fc_convergence(False, use_cuda=True)
self.check_batchnorm_fc_convergence(False, use_cuda=False)
self.check_batchnorm_fc_convergence(True)
self.check_batchnorm_fc_convergence(False)
def test_batchnorm_fc_with_new_strategy(self):
self.check_batchnorm_fc_convergence(True, use_cuda=True)
self.check_batchnorm_fc_convergence(True, use_cuda=False)
# use_cuda, use_reduce
self.check_batchnorm_fc_convergence(True, True)
self.check_batchnorm_fc_convergence(False, True)
if __name__ == '__main__':
......
......@@ -13,8 +13,12 @@
# limitations under the License.
import paddle.fluid as fluid
import paddle.fluid.layers.ops as ops
from paddle.fluid.initializer import init_on_cpu
from paddle.fluid.layers.learning_rate_scheduler import _decay_step_counter
from parallel_executor_test_base import TestParallelExecutorBase
import unittest
import math
import os
......@@ -131,30 +135,71 @@ def SE_ResNeXt50Small(batch_size=2, use_feed=False):
class TestResnet(TestParallelExecutorBase):
def check_resnet_convergence(self,
balance_parameter_opt_between_cards,
use_cuda=True,
iter=20):
def check_resnet_convergence_with_learning_rate_decay(self,
use_cuda=True,
use_reduce=False,
iter=20):
os.environ['CPU_NUM'] = str(4)
def _cosine_decay(learning_rate, step_each_epoch, epochs=120):
"""
Applies cosine decay to the learning rate.
lr = 0.05 * (math.cos(epoch * (math.pi / 120)) + 1)
"""
global_step = _decay_step_counter()
with init_on_cpu():
epoch = ops.floor(global_step / step_each_epoch)
decayed_lr = learning_rate * \
(ops.cos(epoch * (math.pi / epochs)) + 1)/2
return decayed_lr
def _optimizer(learning_rate=0.01):
optimizer = fluid.optimizer.Momentum(
learning_rate=_cosine_decay(
learning_rate=learning_rate, step_each_epoch=2, epochs=1),
momentum=0.9,
regularization=fluid.regularizer.L2Decay(1e-4))
return optimizer
import functools
batch_size = 2
self.check_network_convergence(
single_first_loss, single_last_loss = self.check_network_convergence(
functools.partial(
SE_ResNeXt50Small, batch_size=batch_size),
iter=iter,
batch_size=batch_size,
use_cuda=use_cuda,
balance_parameter_opt_between_cards=balance_parameter_opt_between_cards
)
def test_resnet(self):
self.check_resnet_convergence(False, use_cuda=True)
self.check_resnet_convergence(False, use_cuda=False, iter=5)
use_reduce=use_reduce,
optimizer=_optimizer,
use_parallel_executor=False)
def test_resnet_with_new_strategy(self):
self.check_resnet_convergence(True, use_cuda=True)
self.check_resnet_convergence(True, use_cuda=False, iter=5)
parallel_first_loss, parallel_last_loss = self.check_network_convergence(
functools.partial(
SE_ResNeXt50Small, batch_size=batch_size),
iter=iter,
batch_size=batch_size,
use_cuda=use_cuda,
use_reduce=use_reduce,
optimizer=_optimizer)
for p_f in parallel_first_loss:
self.assertAlmostEquals(p_f, single_first_loss[0], delta=1e-6)
for p_l in parallel_last_loss:
self.assertAlmostEquals(p_l, single_last_loss[0], delta=1e-6)
def test_seresnext_with_learning_rate_decay(self):
self.check_resnet_convergence_with_learning_rate_decay(True, False)
self.check_resnet_convergence_with_learning_rate_decay(
False, False, iter=5)
def test_seresnext_with_new_strategy_with_learning_rate_decay(self):
self.check_resnet_convergence_with_learning_rate_decay(True, True)
self.check_resnet_convergence_with_learning_rate_decay(
False, True, iter=5)
if __name__ == '__main__':
......
......@@ -31,6 +31,7 @@ Steps to transpile pserver:
from __future__ import print_function
import math
import random
import numpy as np
from ps_dispatcher import RoundRobin, HashName, PSDispatcher
......@@ -197,7 +198,8 @@ class DistributeTranspiler(object):
# shuffle the map will avoid the uneven distribution above
grad_var_mapping_items = self.grad_var_mapping.items()
if not slice_var_up:
np.random.shuffle(grad_var_mapping_items)
random.seed(self.trainer_num)
random.shuffle(grad_var_mapping_items)
for orig_varname, splited_vars in grad_var_mapping_items:
eplist = ps_dispatcher.dispatch(splited_vars)
......
......@@ -66,9 +66,9 @@ An example implementation for multiple item data reader creator:
TODO(yuyang18): Should we add whole design doc here?
"""
import decorator
from decorator import *
import paddle.reader.decorator
from paddle.reader.decorator import *
import creator
import paddle.reader.creator
__all__ = decorator.__all__ + ['creator']
......@@ -20,7 +20,7 @@ __all__ = [
from threading import Thread
import subprocess
from Queue import Queue
from six.moves.queue import Queue
import itertools
import random
import zlib
......
......@@ -8,4 +8,4 @@ scipy>=0.19.0
Pillow
nltk>=3.2.2
graphviz
LinkChecker
six
......@@ -17,7 +17,8 @@ def git_commit():
git_commit = subprocess.Popen(cmd, stdout = subprocess.PIPE).communicate()[0].strip()
except:
git_commit = 'Unknown'
return git_commit
git_commit = git_commit.decode()
return str(git_commit)
def _get_version_detail(idx):
assert idx < 3, "vesion info consists of %(major)d.%(minor)d.%(patch)d, \
......@@ -44,6 +45,7 @@ def is_taged():
try:
cmd = ['git', 'describe', '--exact-match', '--tags', 'HEAD', '2>/dev/null']
git_tag = subprocess.Popen(cmd, stdout = subprocess.PIPE).communicate()[0].strip()
git_tag = git_tag.decode()
except:
return False
......@@ -67,13 +69,13 @@ with_mkl = '%(with_mkl)s'
def show():
if istaged:
print 'full_version:', full_version
print 'major:', major
print 'minor:', minor
print 'patch:', patch
print 'rc:', rc
print('full_version:', full_version)
print('major:', major)
print('minor:', minor)
print('patch:', patch)
print('rc:', rc)
else:
print 'commit:', commit
print('commit:', commit)
def mkl():
return with_mkl
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册