Commit b858c103 authored by: W weixing02

Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into data_reader

# An image for building paddle binaries
# Use cuda devel base image for both cpu and gpu environment
FROM nvidia/cuda:8.0-cudnn5-devel-ubuntu16.04
FROM nvidia/cuda:8.0-cudnn7-devel-ubuntu16.04
MAINTAINER PaddlePaddle Authors <paddle-dev@baidu.com>
ARG UBUNTU_MIRROR
......
......@@ -62,29 +62,33 @@ endif()
## Then find the reference-cblas. www.netlib.org/blas/
set(REFERENCE_CBLAS_ROOT $ENV{REFERENCE_CBLAS_ROOT} CACHE PATH
"Folder contains reference-cblas")
set(REFERENCE_CBLAS_INCLUDE_SEARCH_PATHS
if(NOT CMAKE_CROSSCOMPILING)
set(REFERENCE_CBLAS_INCLUDE_SEARCH_PATHS
${REFERENCE_CBLAS_ROOT}/include
/usr/include
/usr/include/cblas
)
)
set(REFERENCE_CBLAS_LIB_SEARCH_PATHS
set(REFERENCE_CBLAS_LIB_SEARCH_PATHS
${REFERENCE_CBLAS_ROOT}/lib
/usr/lib
/usr/lib/blas/reference/
/usr/lib/reference/
)
)
else()
# Disable the finding of reference cblas under host's system path
set(REFERENCE_CBLAS_INCLUDE_SEARCH_PATHS ${REFERENCE_CBLAS_ROOT}/include)
set(REFERENCE_CBLAS_LIB_SEARCH_PATHS ${REFERENCE_CBLAS_ROOT}/lib)
endif()
find_path(REFERENCE_CBLAS_INCLUDE_DIR NAMES cblas.h PATHS
${REFERENCE_CBLAS_INCLUDE_SEARCH_PATHS})
find_library(REFERENCE_CBLAS_LIBRARY NAMES cblas PATHS
${REFERENCE_CBLAS_LIB_SEARCH_PATHS})
if (REFERENCE_CBLAS_INCLUDE_DIR AND REFERENCE_CBLAS_LIBRARY)
if(REFERENCE_CBLAS_INCLUDE_DIR AND REFERENCE_CBLAS_LIBRARY)
set(CBLAS_FOUND ON)
set(CBLAS_PROVIDER REFERENCE)
set(CBLAS_INC_DIR ${REFERENCE_CBLAS_INCLUDE_DIR})
......
......@@ -24,16 +24,16 @@ SET(GRPC_INSTALL_DIR ${THIRD_PARTY_PATH}/install/grpc)
SET(GRPC_INCLUDE_DIR "${GRPC_INSTALL_DIR}/include/" CACHE PATH "grpc include directory." FORCE)
SET(GRPC_CPP_PLUGIN "${GRPC_INSTALL_DIR}/bin/grpc_cpp_plugin" CACHE FILEPATH "GRPC_CPP_PLUGIN" FORCE)
IF(APPLE)
SET(BUILD_CMD make -n HAS_SYSTEM_PROTOBUF=false -s -j8 static grpc_cpp_plugin | sed "s/-Werror//g" | sh)
SET(BUILD_CMD make -n HAS_SYSTEM_PROTOBUF=false -s -j static grpc_cpp_plugin | sed "s/-Werror//g" | sh)
ELSE()
SET(BUILD_CMD make HAS_SYSTEM_PROTOBUF=false -s -j8 static grpc_cpp_plugin)
SET(BUILD_CMD make HAS_SYSTEM_PROTOBUF=false -s -j static grpc_cpp_plugin)
ENDIF()
ExternalProject_Add(
extern_grpc
DEPENDS protobuf zlib
GIT_REPOSITORY "https://github.com/grpc/grpc.git"
GIT_TAG "v1.8.x"
GIT_TAG "v1.11.x"
PREFIX ${GRPC_SOURCES_DIR}
UPDATE_COMMAND ""
CONFIGURE_COMMAND ""
......
......@@ -11,19 +11,20 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
IF(MOBILE_INFERENCE)
if(MOBILE_INFERENCE OR RPI)
return()
ENDIF()
endif()
include (ExternalProject)
# NOTE: snappy is needed when linking with recordio
SET(SNAPPY_SOURCES_DIR ${THIRD_PARTY_PATH}/snappy)
SET(SNAPPY_INSTALL_DIR ${THIRD_PARTY_PATH}/install/snappy)
SET(SNAPPY_INCLUDE_DIR "${SNAPPY_INSTALL_DIR}/include/" CACHE PATH "snappy include directory." FORCE)
set(SNAPPY_SOURCES_DIR ${THIRD_PARTY_PATH}/snappy)
set(SNAPPY_INSTALL_DIR ${THIRD_PARTY_PATH}/install/snappy)
set(SNAPPY_INCLUDE_DIR "${SNAPPY_INSTALL_DIR}/include" CACHE PATH "snappy include directory." FORCE)
set(SNAPPY_LIBRARIES "${SNAPPY_INSTALL_DIR}/lib/libsnappy.a")
ExternalProject_Add(
extern_snappy
......@@ -51,8 +52,7 @@ ExternalProject_Add(
)
add_library(snappy STATIC IMPORTED GLOBAL)
set_property(TARGET snappy PROPERTY IMPORTED_LOCATION
"${SNAPPY_INSTALL_DIR}/lib/libsnappy.a")
set_property(TARGET snappy PROPERTY IMPORTED_LOCATION ${SNAPPY_LIBRARIES})
include_directories(${SNAPPY_INCLUDE_DIR})
add_dependencies(snappy extern_snappy)
......@@ -11,9 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
IF(MOBILE_INFERENCE)
IF(MOBILE_INFERENCE OR RPI)
return()
ENDIF()
......@@ -21,9 +20,11 @@ include (ExternalProject)
# NOTE: snappy is needed when linking with recordio
SET(SNAPPYSTREAM_SOURCES_DIR ${THIRD_PARTY_PATH}/snappy_stream)
SET(SNAPPYSTREAM_INSTALL_DIR ${THIRD_PARTY_PATH}/install/snappy_stream)
SET(SNAPPYSTREAM_INCLUDE_DIR "${SNAPPYSTREAM_INSTALL_DIR}/include/" CACHE PATH "snappy stream include directory." FORCE)
set(SNAPPYSTREAM_SOURCES_DIR ${THIRD_PARTY_PATH}/snappy_stream)
set(SNAPPYSTREAM_INSTALL_DIR ${THIRD_PARTY_PATH}/install/snappy_stream)
set(SNAPPYSTREAM_INCLUDE_DIR "${SNAPPYSTREAM_INSTALL_DIR}/include" CACHE PATH "snappy stream include directory." FORCE)
set(SNAPPYSTREAM_LIBRARIES "${SNAPPYSTREAM_INSTALL_DIR}/lib/libsnappystream.a")
ExternalProject_Add(
extern_snappystream
......@@ -51,8 +52,7 @@ ExternalProject_Add(
)
add_library(snappystream STATIC IMPORTED GLOBAL)
set_property(TARGET snappystream PROPERTY IMPORTED_LOCATION
"${SNAPPYSTREAM_INSTALL_DIR}/lib/libsnappystream.a")
set_property(TARGET snappystream PROPERTY IMPORTED_LOCATION ${SNAPPYSTREAM_LIBRARIES})
include_directories(${SNAPPYSTREAM_INCLUDE_DIR}) # For snappystream to include its own headers.
include_directories(${THIRD_PARTY_PATH}/install) # For Paddle to include snappy stream headers.
......
......@@ -195,14 +195,7 @@ function(cc_library TARGET_NAME)
list(REMOVE_ITEM cc_library_DEPS warpctc)
add_dependencies(${TARGET_NAME} warpctc)
endif()
if("${cc_library_DEPS}" MATCHES "ARCHIVE_START")
# Support linking flags: --whole-archive (Linux) / -force_load (MacOS).
# WARNING: Please don't use ARCHIVE_START&ARCHIVE_END if TARGET_NAME will be linked by other libraries.
target_circle_link_libraries(${TARGET_NAME} ${cc_library_DEPS})
list(REMOVE_ITEM cc_library_DEPS ARCHIVE_START ARCHIVE_END)
else()
target_link_libraries(${TARGET_NAME} ${cc_library_DEPS})
endif()
add_dependencies(${TARGET_NAME} ${cc_library_DEPS})
endif()
......@@ -243,11 +236,7 @@ function(cc_test TARGET_NAME)
set(multiValueArgs SRCS DEPS ARGS)
cmake_parse_arguments(cc_test "${options}" "${oneValueArgs}" "${multiValueArgs}" ${ARGN})
add_executable(${TARGET_NAME} ${cc_test_SRCS})
# Support linking flags: --whole-archive (Linux) / -force_load (MacOS)
target_circle_link_libraries(${TARGET_NAME} ${cc_test_DEPS} paddle_gtest_main memory gtest gflags glog)
if("${cc_test_DEPS}" MATCHES "ARCHIVE_START")
list(REMOVE_ITEM cc_test_DEPS ARCHIVE_START ARCHIVE_END)
endif()
target_link_libraries(${TARGET_NAME} ${cc_test_DEPS} paddle_gtest_main memory gtest gflags glog)
add_dependencies(${TARGET_NAME} ${cc_test_DEPS} paddle_gtest_main memory gtest gflags glog)
add_test(NAME ${TARGET_NAME}
COMMAND ${TARGET_NAME} ${cc_test_ARGS}
......
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set_property(GLOBAL PROPERTY FLUID_MODULES "")
# find all fluid modules used for the paddle fluid static library
function(find_fluid_modules TARGET_NAME)
get_filename_component(__target_path ${TARGET_NAME} ABSOLUTE)
string(REGEX REPLACE "^${PADDLE_SOURCE_DIR}/" "" __target_path ${__target_path})
string(FIND "${__target_path}" "fluid" pos)
if(pos GREATER 1)
get_property(fluid_modules GLOBAL PROPERTY FLUID_MODULES)
......@@ -77,6 +92,23 @@ elseif (WITH_MKLML)
)
endif()
if(NOT MOBILE_INFERENCE AND NOT RPI)
set(dst_dir "${CMAKE_INSTALL_PREFIX}/third_party/install/snappy")
copy(snappy_lib
SRCS ${SNAPPY_INCLUDE_DIR} ${SNAPPY_LIBRARIES}
DSTS ${dst_dir} ${dst_dir}/lib)
set(dst_dir "${CMAKE_INSTALL_PREFIX}/third_party/install/snappystream")
copy(snappystream_lib
SRCS ${SNAPPYSTREAM_INCLUDE_DIR} ${SNAPPYSTREAM_LIBRARIES}
DSTS ${dst_dir} ${dst_dir}/lib)
set(dst_dir "${CMAKE_INSTALL_PREFIX}/third_party/install/zlib")
copy(zlib_lib
SRCS ${ZLIB_INCLUDE_DIR} ${ZLIB_LIBRARIES}
DSTS ${dst_dir} ${dst_dir}/lib)
endif()
# paddle fluid module
set(src_dir "${PADDLE_SOURCE_DIR}/paddle/fluid")
set(dst_dir "${CMAKE_INSTALL_PREFIX}/paddle/fluid")
......
......@@ -119,7 +119,7 @@ An actual Fluid example is described [here](https://github.com/PaddlePaddle/Pad
From the example, the Fluid programs look very similar to their PyTorch equivalent programs, except that Fluid's loop structure, wrapped with Python's `with` statement, could run much faster than just a Python loop.
We have more examples of the [`if-then-else`](https://github.com/PaddlePaddle/Paddle/blob/develop/doc/design/if_else_op.md) structure of Fluid.
We have more examples of the [`if-then-else`](https://github.com/PaddlePaddle/Paddle/blob/develop/doc/fluid/design/execution/if_else_op.md) structure of Fluid.
## Turing Completeness
......
......@@ -24,6 +24,6 @@ if(NOT WITH_FLUID_ONLY)
endif()
add_subdirectory(testing)
if(NOT MOBILE_INFERENCE AND NOT ANDROID AND NOT IOS)
if(NOT MOBILE_INFERENCE AND NOT RPI)
add_subdirectory(fluid)
endif()
......@@ -3,6 +3,7 @@ add_subdirectory(platform)
add_subdirectory(framework)
add_subdirectory(operators)
add_subdirectory(pybind)
add_subdirectory(inference)
add_subdirectory(string)
add_subdirectory(recordio)
# NOTE: please add the inference subdirectory last.
add_subdirectory(inference)
......@@ -92,7 +92,7 @@ class BlockDesc {
/*
* Remove Op and its input/output variables.
* Note that for either input or ouput variable, if it is also an input or
* Note that for either input or output variable, if it is also an input or
* output variable of other ops, we should keep it.
*/
void RemoveOp(size_t s, size_t e);
......
......@@ -14,6 +14,8 @@
#include "paddle/fluid/framework/details/computation_op_handle.h"
#include <string>
namespace paddle {
namespace framework {
namespace details {
......@@ -33,7 +35,7 @@ void ComputationOpHandle::RunImpl() {
}
}
op_->Run(*scope_->FindVar("@TMP_SCOPE@")->Get<Scope *>(), place_);
op_->Run(*scope_->FindVar(kLocalExecScopeName)->Get<Scope *>(), place_);
}
std::string ComputationOpHandle::Name() const { return op_->Type(); }
......
......@@ -14,6 +14,9 @@
#include "paddle/fluid/framework/details/fetch_op_handle.h"
#include <string>
#include <vector>
namespace paddle {
namespace framework {
namespace details {
......@@ -57,7 +60,10 @@ void FetchOpHandle::RunImpl() {
for (size_t i = 0; i < scopes.size(); ++i) {
auto &scope = scopes[i];
auto &t = scope->FindVar(var_name)->Get<framework::LoDTensor>();
auto &t = scope->FindVar(kLocalExecScopeName)
->Get<Scope *>()
->FindVar(var_name)
->Get<framework::LoDTensor>();
if (platform::is_gpu_place(var->place_)) {
#ifdef PADDLE_WITH_CUDA
TensorCopy(t, cpu, *dev_ctxes_[t.place()], &tensors_[i]);
......
......@@ -24,6 +24,8 @@ namespace paddle {
namespace framework {
namespace details {
constexpr char kLocalExecScopeName[] = "@LCOAL_SCOPE@";
class OpHandleBase {
private:
DISABLE_COPY_AND_ASSIGN(OpHandleBase);
......
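A note on the new `kLocalExecScopeName` constant: this commit moves per-run local scope management out of the SSA graph executor and into `ParallelExecutor::Run` (shown further down). A minimal sketch of the pattern, assuming only the `Scope`/`Variable` API visible elsewhere in this diff (the helper name is hypothetical):

```cpp
// Sketch only: publish a per-run child scope under kLocalExecScopeName so
// that op handles can look it up later.
Scope* PrepareLocalExecScope(Scope* scope) {  // hypothetical helper
  Scope& local_scope = scope->NewScope();     // child scope for one Run()
  *scope->Var(details::kLocalExecScopeName)->GetMutable<Scope*>() =
      &local_scope;
  return &local_scope;
}
// An op handle then resolves its actual execution scope with:
//   scope->FindVar(kLocalExecScopeName)->Get<Scope*>()
```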
......@@ -15,13 +15,15 @@
#pragma once
#include <memory>
#include <string>
#include <vector>
#include "paddle/fluid/framework/details/ssa_graph.h"
#include "paddle/fluid/framework/feed_fetch_type.h"
namespace paddle {
namespace framework {
namespace details {
class SSAGraphExecutor {
DISABLE_COPY_AND_ASSIGN(SSAGraphExecutor);
......
......@@ -136,12 +136,6 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
ready_ops.clear();
};
// Create local scopes.
for (auto &scope : local_scopes_) {
auto &local_scope = scope->NewScope();
*scope->Var("@TMP_SCOPE@")->GetMutable<Scope *>() = &local_scope;
}
// Step 3. Execution
while (!pending_vars.empty() || !ready_ops.empty() || !delayed_ops.empty()) {
// 1. Run All Ready ops
......@@ -189,34 +183,10 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
PADDLE_ENFORCE(ready_ops.empty());
PADDLE_ENFORCE(delayed_ops.empty());
PADDLE_ENFORCE(blocked_by_delayed_ops.empty());
++computation_count_;
auto sync_computation = [&] {
computation_count_ = 0;
// Wait All computational streams
for (auto p : this->places_) {
platform::DeviceContextPool::Instance().Get(p)->Wait();
}
for (auto &scope : local_scopes_) {
scope->DropKids();
}
};
// Wait FetchOps.
if (!fetch_ops.empty()) {
fetch_ops.clear();
sync_computation();
}
if (computation_count_ == max_async_computation) {
sync_computation();
}
// NOTE: the temp scope can be dropped lazily if needed.
// Drop tmp scopes;
for (auto &scope : local_scopes_) {
auto &kid = *scope->Var("@TMP_SCOPE@")->GetMutable<Scope *>();
kid = nullptr;
}
return fetch_data;
......
......@@ -99,9 +99,6 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor {
std::unique_ptr<platform::EnforceNotMet> exception_;
std::atomic<int> running_ops_;
bool allow_op_delay_;
size_t computation_count_{0};
size_t max_async_computation{100};
};
} // namespace details
......
......@@ -83,8 +83,8 @@ static void CheckTensorNANOrInf(const std::string& name,
if (tensor.memory_size() == 0) {
return;
}
if (tensor.type().hash_code() != typeid(float).hash_code() &&
tensor.type().hash_code() != typeid(double).hash_code()) {
if (tensor.type().hash_code() != typeid(float).hash_code() && // NOLINT
tensor.type().hash_code() != typeid(double).hash_code()) { // NOLINT
return;
}
PADDLE_ENFORCE(!framework::TensorContainsInf(tensor),
......@@ -145,12 +145,13 @@ void Executor::Run(const ProgramDesc& pdesc, Scope* scope, int block_id,
// Return true if the block has feed operators and holder of matching info.
static bool has_feed_operators(
const BlockDesc& block,
std::map<std::string, const LoDTensor*>& feed_targets,
const std::map<std::string, const LoDTensor*>& feed_targets,
const std::string& feed_holder_name) {
size_t feed_count = 0;
for (auto* op : block.AllOps()) {
if (op->Type() == kFeedOpType) {
feed_count++;
// The input variable's name of feed_op should be feed_holder_name.
PADDLE_ENFORCE_EQ(op->Input("X")[0], feed_holder_name,
"Input to feed op should be '%s'", feed_holder_name);
std::string feed_target_name = op->Output("Out")[0];
......@@ -166,7 +167,8 @@ static bool has_feed_operators(
feed_count, feed_targets.size(),
"The number of feed operators should match 'feed_targets'");
// When feed operator are present, so should be feed_holder
if (!feed_holder_name.empty()) {
// When feed operators are present, so should be feed_holder.
auto var = block.FindVar(feed_holder_name);
PADDLE_ENFORCE_NOT_NULL(var, "Block should already have a '%s' variable",
feed_holder_name);
......@@ -174,6 +176,7 @@ static bool has_feed_operators(
"'%s' variable should be 'FEED_MINIBATCH' type",
feed_holder_name);
}
}
return feed_count > 0;
}
......@@ -185,12 +188,14 @@ static bool has_feed_operators(
// and fetch_holder_name. Raise exception when any mismatch is found.
// Return true if the block has fetch operators and holder of matching info.
static bool has_fetch_operators(
const BlockDesc& block, std::map<std::string, LoDTensor*>& fetch_targets,
const BlockDesc& block,
const std::map<std::string, LoDTensor*>& fetch_targets,
const std::string& fetch_holder_name) {
size_t fetch_count = 0;
for (auto* op : block.AllOps()) {
if (op->Type() == kFetchOpType) {
fetch_count++;
// The output variable's name of fetch_op should be fetch_holder_name.
PADDLE_ENFORCE_EQ(op->Output("Out")[0], fetch_holder_name,
"Output of fetch op should be '%s'", fetch_holder_name);
std::string fetch_target_name = op->Input("X")[0];
......@@ -206,7 +211,8 @@ static bool has_fetch_operators(
fetch_count, fetch_targets.size(),
"The number of fetch operators should match 'fetch_targets'");
// When fetch operator are present, so should be fetch_holder
if (!fetch_holder_name.empty()) {
// When fetch operators are present, so should be fetch_holder.
auto var = block.FindVar(fetch_holder_name);
PADDLE_ENFORCE_NOT_NULL(var, "Block should already have a '%s' variable",
fetch_holder_name);
......@@ -214,6 +220,7 @@ static bool has_fetch_operators(
"'%s' variable should be 'FETCH_LIST' type",
fetch_holder_name);
}
}
return fetch_count > 0;
}
......@@ -259,16 +266,6 @@ void Executor::Run(const ProgramDesc& program, Scope* scope,
}
}
// map the data of feed_targets to feed_holder
for (auto* op : global_block->AllOps()) {
if (op->Type() == kFeedOpType) {
std::string feed_target_name = op->Output("Out")[0];
int idx = boost::get<int>(op->GetAttr("col"));
SetFeedVariable(scope, *feed_targets[feed_target_name], feed_holder_name,
idx);
}
}
if (!has_fetch_ops) {
// create fetch_holder variable
auto* fetch_holder = global_block->Var(fetch_holder_name);
......@@ -292,17 +289,9 @@ void Executor::Run(const ProgramDesc& program, Scope* scope,
}
}
Run(*copy_program, scope, 0, create_vars, create_vars);
// obtain the data of fetch_targets from fetch_holder
for (auto* op : global_block->AllOps()) {
if (op->Type() == kFetchOpType) {
std::string fetch_target_name = op->Input("X")[0];
int idx = boost::get<int>(op->GetAttr("col"));
*fetch_targets[fetch_target_name] =
GetFetchVariable(*scope, fetch_holder_name, idx);
}
}
auto ctx = Prepare(*copy_program, 0);
RunPreparedContext(ctx.get(), scope, feed_targets, fetch_targets, create_vars,
feed_holder_name, fetch_holder_name);
}
std::unique_ptr<ExecutorPrepareContext> Executor::Prepare(
......@@ -370,5 +359,42 @@ void Executor::RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope,
}
}
void Executor::RunPreparedContext(
ExecutorPrepareContext* ctx, Scope* scope,
std::map<std::string, const LoDTensor*>& feed_targets,
std::map<std::string, LoDTensor*>& fetch_targets, bool create_vars,
const std::string& feed_holder_name, const std::string& fetch_holder_name) {
auto& global_block = ctx->prog_.Block(ctx->block_id_);
PADDLE_ENFORCE(
has_feed_operators(global_block, feed_targets, feed_holder_name),
"Program in ExecutorPrepareContext should has feed_ops.");
PADDLE_ENFORCE(
has_fetch_operators(global_block, fetch_targets, fetch_holder_name),
"Program in the prepared context should has fetch_ops.");
// map the data of feed_targets to feed_holder
for (auto* op : global_block.AllOps()) {
if (op->Type() == kFeedOpType) {
std::string feed_target_name = op->Output("Out")[0];
int idx = boost::get<int>(op->GetAttr("col"));
SetFeedVariable(scope, *feed_targets[feed_target_name], feed_holder_name,
idx);
}
}
RunPreparedContext(ctx, scope, create_vars, create_vars);
// obtain the data of fetch_targets from fetch_holder
for (auto* op : global_block.AllOps()) {
if (op->Type() == kFetchOpType) {
std::string fetch_target_name = op->Input("X")[0];
int idx = boost::get<int>(op->GetAttr("col"));
*fetch_targets[fetch_target_name] =
GetFetchVariable(*scope, fetch_holder_name, idx);
}
}
}
} // namespace framework
} // namespace paddle
......@@ -14,6 +14,9 @@ limitations under the License. */
#pragma once
#include <map>
#include <string>
#include <vector>
#include "paddle/fluid/framework/op_info.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/scope.h"
......@@ -70,6 +73,13 @@ class Executor {
bool create_local_scope = true,
bool create_vars = true);
void RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope,
std::map<std::string, const LoDTensor*>& feed_targets,
std::map<std::string, LoDTensor*>& fetch_targets,
bool create_vars = true,
const std::string& feed_holder_name = "feed",
const std::string& fetch_holder_name = "fetch");
private:
const platform::Place place_;
};
......
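Roughly, the new overload lets a caller prepare a program once and re-run it with explicit feed/fetch maps. A hedged usage sketch, mirroring the inference test changes later in this commit (`executor`, `scope`, and `inference_program` are placeholders for caller-owned objects):

```cpp
// Sketch: prepare once, then run with feed/fetch targets.
std::map<std::string, const paddle::framework::LoDTensor*> feed_targets;
std::map<std::string, paddle::framework::LoDTensor*> fetch_targets;
// ... fill feed_targets / fetch_targets ...
auto ctx = executor.Prepare(*inference_program, /*block_id=*/0);
// The prepared block must already contain matching feed/fetch ops,
// otherwise the PADDLE_ENFORCE checks above will fire.
executor.RunPreparedContext(ctx.get(), scope, feed_targets, fetch_targets);
```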
......@@ -46,7 +46,8 @@ proto::VarType::Type GetDataTypeOfVar(const Variable* var) {
}
}
static DDim GetDims(const Scope& scope, const std::string& name) {
static DDim GetDims(const Scope& scope, const std::string& name,
bool get_actual_dim = false) {
Variable* var = scope.FindVar(name);
if (var == nullptr) {
return DDim({-1});
......@@ -55,7 +56,11 @@ static DDim GetDims(const Scope& scope, const std::string& name) {
if (var->IsType<LoDTensor>()) {
return var->Get<LoDTensor>().dims();
} else if (var->IsType<SelectedRows>()) {
if (get_actual_dim) {
return var->Get<SelectedRows>().value().dims();
} else {
return var->Get<SelectedRows>().GetCompleteDims();
}
} else {
return DDim({-1});
}
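To illustrate the new `get_actual_dim` flag (an assumed example based on `SelectedRows` semantics): for a `SelectedRows` variable with `height = 100` that currently holds 3 rows of width 8,

```cpp
// get_actual_dim == true  -> value().dims()    == [3, 8]   (rows in memory)
// get_actual_dim == false -> GetCompleteDims() == [100, 8] (logical shape)
```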
......@@ -129,7 +134,7 @@ std::string OperatorBase::DebugStringEx(const Scope* scope) const {
for (size_t i = 0; i < input.second.size(); ++i) {
ss << input.second[i];
if (scope) {
ss << "[" << GetDims(*scope, input.second[i]) << "]";
ss << "[" << GetDims(*scope, input.second[i], true) << "]";
ss << "(" << GetLoD(*scope, input.second[i]) << ")";
}
if (i != input.second.size() - 1) {
......@@ -149,7 +154,7 @@ std::string OperatorBase::DebugStringEx(const Scope* scope) const {
for (size_t i = 0; i < output.second.size(); ++i) {
ss << output.second[i];
if (scope) {
ss << "[" << GetDims(*scope, output.second[i]) << "]";
ss << "[" << GetDims(*scope, output.second[i], true) << "]";
ss << "(" << GetLoD(*scope, output.second[i]) << ")";
}
if (i != output.second.size() - 1) {
......
......@@ -15,6 +15,7 @@ limitations under the License. */
#include "paddle/fluid/framework/parallel_executor.h"
#include <string>
#include <tuple>
#include <vector>
#ifdef PADDLE_WITH_CUDA
......@@ -41,6 +42,8 @@ class ParallelExecutorPrivate {
#ifdef PADDLE_WITH_CUDA
std::unique_ptr<platform::NCCLContextMap> nccl_ctxs_;
#endif
std::vector<std::tuple<std::string, proto::VarType::Type, bool>> var_types_;
};
std::vector<Scope *> &ParallelExecutor::GetLocalScopes() {
......@@ -97,14 +100,9 @@ ParallelExecutor::ParallelExecutor(
allow_op_delay));
// Step 3. Create vars in each scope;
for (auto *scope : member_->local_scopes_) {
for (auto *var : main_program.Block(0).AllVars()) {
if (scope->FindVar(var->Name()) != nullptr) {
continue;
}
InitializeVariable(scope->Var(var->Name()), var->GetType());
}
member_->var_types_.emplace_back(var->Name(), var->GetType(),
var->Persistable());
}
}
......@@ -163,9 +161,42 @@ void ParallelExecutor::Run(
const std::unordered_map<std::string, LoDTensor> &feed_tensors) {
platform::RecordBlock b(0);
SplitTensorToPlaces(feed_tensors);
// Create local scopes.
for (auto &scope : member_->local_scopes_) {
Scope &local_scope = scope->NewScope();
*scope->Var(details::kLocalExecScopeName)->GetMutable<Scope *>() =
&local_scope;
for (auto &name_type_pair : member_->var_types_) {
if (scope->FindVar(std::get<0>(name_type_pair)) != nullptr) {
continue;
}
if (std::get<2>(name_type_pair)) { // Persistable
InitializeVariable(scope->Var(std::get<0>(name_type_pair)),
std::get<1>(name_type_pair));
} else {
InitializeVariable(scope->Var(std::get<0>(name_type_pair)),
std::get<1>(name_type_pair));
}
}
}
auto fetch_data = member_->executor_->Run(fetch_tensors);
*member_->global_scope_->Var(fetched_var_name)->GetMutable<FeedFetchList>() =
fetch_data;
// Wait All computational streams
for (auto p : member_->places_) {
platform::DeviceContextPool::Instance().Get(p)->Wait();
}
for (auto &scope : member_->local_scopes_) {
auto &local_scope =
*scope->Var(details::kLocalExecScopeName)->GetMutable<Scope *>();
scope->DeleteScope(local_scope);
local_scope = nullptr;
}
}
void ParallelExecutor::SplitTensorToPlaces(
......
......@@ -14,8 +14,12 @@
#include "paddle/fluid/framework/threadpool.h"
#include "gflags/gflags.h"
#include "paddle/fluid/platform/enforce.h"
DEFINE_int32(io_threadpool_size, 100,
"number of threads used for doing IO, default 100");
namespace paddle {
namespace framework {
......@@ -91,5 +95,20 @@ void ThreadPool::TaskLoop() {
}
}
std::unique_ptr<ThreadPool> ThreadPoolIO::io_threadpool_(nullptr);
std::once_flag ThreadPoolIO::io_init_flag_;
ThreadPool* ThreadPoolIO::GetInstanceIO() {
std::call_once(io_init_flag_, &ThreadPoolIO::InitIO);
return io_threadpool_.get();
}
void ThreadPoolIO::InitIO() {
if (io_threadpool_.get() == nullptr) {
// TODO(typhoonzero1986): make this configurable
io_threadpool_.reset(new ThreadPool(FLAGS_io_threadpool_size));
}
}
} // namespace framework
} // namespace paddle
......@@ -14,12 +14,12 @@ limitations under the License. */
#pragma once
#include <condition_variable>
#include <condition_variable> // NOLINT
#include <functional>
#include <future>
#include <mutex>
#include <future> // NOLINT
#include <mutex> // NOLINT
#include <queue>
#include <thread>
#include <thread> // NOLINT
#include <vector>
#include "glog/logging.h"
#include "paddle/fluid/platform/enforce.h"
......@@ -28,6 +28,22 @@ limitations under the License. */
namespace paddle {
namespace framework {
struct ExceptionHandler {
mutable std::future<std::unique_ptr<platform::EnforceNotMet>> future_;
explicit ExceptionHandler(
std::future<std::unique_ptr<platform::EnforceNotMet>>&& f)
: future_(std::move(f)) {}
void operator()() const {
auto ex = this->future_.get();
if (ex != nullptr) {
LOG(FATAL) << "The exception is thrown inside the thread pool. You "
"should use RunAndGetException to handle the exception.\n"
"The default exception handler is LOG(FATAL)."
<< ex->what();
}
}
};
// ThreadPool maintains a queue of tasks, and runs them using a fixed
// number of threads.
class ThreadPool {
......@@ -87,22 +103,6 @@ class ThreadPool {
void Wait();
private:
struct ExceptionHandler {
mutable std::future<std::unique_ptr<platform::EnforceNotMet>> future_;
explicit ExceptionHandler(
std::future<std::unique_ptr<platform::EnforceNotMet>>&& f)
: future_(std::move(f)) {}
void operator()() const {
auto ex = this->future_.get();
if (ex != nullptr) {
LOG(FATAL) << "The exception is thrown inside the thread pool. You "
"should use RunAndGetException to handle the exception.\n"
"The default exception handler is LOG(FATAL)."
<< ex->what();
}
}
};
DISABLE_COPY_AND_ASSIGN(ThreadPool);
// If the task queue is empty and available is equal to the number of
......@@ -135,6 +135,17 @@ class ThreadPool {
std::condition_variable completed_;
};
class ThreadPoolIO : ThreadPool {
public:
static ThreadPool* GetInstanceIO();
static void InitIO();
private:
// NOTE: threadpool in base will be inherited here.
static std::unique_ptr<ThreadPool> io_threadpool_;
static std::once_flag io_init_flag_;
};
// Run a function asynchronously.
// NOTE: The function must return void. If the function needs to return a value,
// you can use lambda to capture a value pointer.
......@@ -143,5 +154,10 @@ std::future<void> Async(Callback callback) {
return ThreadPool::GetInstance()->Run(callback);
}
template <typename Callback>
std::future<void> AsyncIO(Callback callback) {
return ThreadPoolIO::GetInstanceIO()->Run(callback);
}
} // namespace framework
} // namespace paddle
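A brief usage sketch of the pool API added here (the function and task bodies are hypothetical): `Async`/`AsyncIO` take a void-returning callable and return a `std::future<void>`, so results come back via captures, as the NOTE above suggests.

```cpp
#include "paddle/fluid/framework/threadpool.h"

void Demo() {  // hypothetical
  int result = 0;
  auto cpu_task = paddle::framework::Async([&result] { result = 42; });
  auto io_task = paddle::framework::AsyncIO([] { /* e.g. an RPC send */ });
  cpu_task.wait();  // result == 42 once the task has run
  io_task.wait();   // executed on the dedicated IO pool added here
}
```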
set(FLUID_CORE_MODULES proto_desc memory lod_tensor executor prune init)
set(FLUID_CORE_MODULES proto_desc memory lod_tensor executor init)
cc_library(paddle_fluid_api
SRCS io.cc
......@@ -11,7 +11,7 @@ cc_library(paddle_fluid DEPS ${fluid_modules})
# Create shared library
cc_library(paddle_fluid_shared SHARED
SRCS io.cc
DEPS ARCHIVE_START ${GLOB_OP_LIB} ${FLUID_CORE_MODULES} ARCHIVE_END)
DEPS ${fluid_modules})
set_target_properties(paddle_fluid_shared PROPERTIES OUTPUT_NAME paddle_fluid)
if(NOT APPLE)
# TODO(liuyiqun): Temporarily disable the link flag because it is not support on Mac.
......
......@@ -17,10 +17,16 @@ limitations under the License. */
#include <fstream>
#include "paddle/fluid/framework/block_desc.h"
#include "paddle/fluid/framework/feed_fetch_type.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/pybind/pybind.h"
namespace paddle {
namespace inference {
// Temporarily add this function for exposing framework::InitDevices() when
// linking the inference shared library.
void Init(bool init_p2p) { framework::InitDevices(init_p2p); }
void ReadBinaryFile(const std::string& filename, std::string& contents) {
std::ifstream fin(filename, std::ios::in | std::ios::binary);
PADDLE_ENFORCE(static_cast<bool>(fin), "Cannot open file %s", filename);
......
......@@ -18,12 +18,15 @@ limitations under the License. */
#include <string>
#include <vector>
#include "paddle/fluid/framework/executor.h"
#include "paddle/fluid/framework/init.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/scope.h"
namespace paddle {
namespace inference {
void Init(bool init_p2p);
void LoadPersistables(framework::Executor& executor, framework::Scope& scope,
const framework::ProgramDesc& main_program,
const std::string& dirname,
......
......@@ -17,7 +17,7 @@ function(inference_test TARGET_NAME)
string(REGEX REPLACE "^_$" "" arg "${arg}")
cc_test(test_inference_${TARGET_NAME}${arg}
SRCS test_inference_${TARGET_NAME}.cc
DEPS ARCHIVE_START paddle_fluid ARCHIVE_END
DEPS paddle_fluid
ARGS --dirname=${PYTHON_TESTS_DIR}/book/${TARGET_NAME}${arg}.inference.model)
set_tests_properties(test_inference_${TARGET_NAME}${arg}
PROPERTIES DEPENDS test_${TARGET_NAME})
......
......@@ -46,8 +46,8 @@ TEST(inference, image_classification) {
// Run inference on CPU
LOG(INFO) << "--- CPU Runs: ---";
TestInference<paddle::platform::CPUPlace, false>(dirname, cpu_feeds,
cpu_fetchs1, FLAGS_repeat);
TestInference<paddle::platform::CPUPlace, false, true>(
dirname, cpu_feeds, cpu_fetchs1, FLAGS_repeat);
LOG(INFO) << output1.dims();
#ifdef PADDLE_WITH_CUDA
......@@ -57,8 +57,8 @@ TEST(inference, image_classification) {
// Run inference on CUDA GPU
LOG(INFO) << "--- GPU Runs: ---";
TestInference<paddle::platform::CUDAPlace, false>(dirname, cpu_feeds,
cpu_fetchs2, FLAGS_repeat);
TestInference<paddle::platform::CUDAPlace, false, true>(
dirname, cpu_feeds, cpu_fetchs2, FLAGS_repeat);
LOG(INFO) << output2.dims();
CheckError<float>(output1, output2);
......
......@@ -89,7 +89,7 @@ void CheckError(const paddle::framework::LoDTensor& output1,
EXPECT_EQ(count, 0U) << "There are " << count << " different elements.";
}
template <typename Place, bool CreateVars = true>
template <typename Place, bool CreateVars = true, bool PrepareContext = false>
void TestInference(const std::string& dirname,
const std::vector<paddle::framework::LoDTensor*>& cpu_feeds,
const std::vector<paddle::framework::LoDTensor*>& cpu_fetchs,
......@@ -175,8 +175,15 @@ void TestInference(const std::string& dirname,
}
// Ignore the profiling results of the first run
std::unique_ptr<paddle::framework::ExecutorPrepareContext> ctx;
if (PrepareContext) {
ctx = executor.Prepare(*inference_program, 0);
executor.RunPreparedContext(ctx.get(), scope, feed_targets, fetch_targets,
CreateVars);
} else {
executor.Run(*inference_program, scope, feed_targets, fetch_targets,
CreateVars);
}
// Enable the profiler
paddle::platform::EnableProfiler(state);
......@@ -187,9 +194,16 @@ void TestInference(const std::string& dirname,
"run_inference",
paddle::platform::DeviceContextPool::Instance().Get(place));
if (PrepareContext) {
// Note: if you change the inference_program, you need to call
// executor.Prepare() again to get a new ExecutorPrepareContext.
executor.RunPreparedContext(ctx.get(), scope, feed_targets,
fetch_targets, CreateVars);
} else {
executor.Run(*inference_program, scope, feed_targets, fetch_targets,
CreateVars);
}
}
// Disable the profiler and print the timing information
paddle::platform::DisableProfiler(
......
......@@ -245,9 +245,17 @@ op_library(channel_send_op DEPS concurrency)
op_library(channel_recv_op DEPS concurrency)
list(REMOVE_ITEM GENERAL_OPS ${DEPS_OPS})
# The fully connected layer is removed from the build when the WITH_MKLDNN flag is OFF,
# because the fully connected layer only has an MKLDNN operator.
if(NOT WITH_MKLDNN)
list(REMOVE_ITEM GENERAL_OPS fc_op)
endif(NOT WITH_MKLDNN)
foreach(src ${GENERAL_OPS})
op_library(${src})
endforeach()
file(APPEND ${pybind_file} "USE_OP(less_than);\nUSE_OP(logical_and);\nUSE_NO_KERNEL_OP(read_from_array);\n")
add_subdirectory(reader)
......
......@@ -114,23 +114,11 @@ class BatchNormKernel<platform::CUDADeviceContext, T>
const auto *bias = ctx.Input<Tensor>("Bias");
auto *y = ctx.Output<Tensor>("Y");
auto *mean_out = ctx.Output<Tensor>("MeanOut");
auto *variance_out = ctx.Output<Tensor>("VarianceOut");
auto *saved_mean = ctx.Output<Tensor>("SavedMean");
auto *saved_variance = ctx.Output<Tensor>("SavedVariance");
// alloc memory
y->mutable_data<T>(ctx.GetPlace());
mean_out->mutable_data<BatchNormParamType<T>>(ctx.GetPlace());
variance_out->mutable_data<BatchNormParamType<T>>(ctx.GetPlace());
saved_mean->mutable_data<BatchNormParamType<T>>(ctx.GetPlace());
saved_variance->mutable_data<BatchNormParamType<T>>(ctx.GetPlace());
auto &dev_ctx = ctx.template device_context<platform::CUDADeviceContext>();
math::SetConstant<platform::CUDADeviceContext, BatchNormParamType<T>>
functor;
functor(dev_ctx, saved_mean, static_cast<BatchNormParamType<T>>(0));
functor(dev_ctx, saved_variance, static_cast<BatchNormParamType<T>>(0));
auto handle = dev_ctx.cudnn_handle();
......@@ -159,6 +147,21 @@ class BatchNormKernel<platform::CUDADeviceContext, T>
// Run training mode.
// obtain running mean and running inv var, and see if we need to
// initialize them.
auto *mean_out = ctx.Output<Tensor>("MeanOut");
auto *variance_out = ctx.Output<Tensor>("VarianceOut");
mean_out->mutable_data<BatchNormParamType<T>>(ctx.GetPlace());
variance_out->mutable_data<BatchNormParamType<T>>(ctx.GetPlace());
auto *saved_mean = ctx.Output<Tensor>("SavedMean");
auto *saved_variance = ctx.Output<Tensor>("SavedVariance");
saved_mean->mutable_data<BatchNormParamType<T>>(ctx.GetPlace());
saved_variance->mutable_data<BatchNormParamType<T>>(ctx.GetPlace());
math::SetConstant<platform::CUDADeviceContext, BatchNormParamType<T>>
functor;
functor(dev_ctx, saved_mean, static_cast<BatchNormParamType<T>>(0));
functor(dev_ctx, saved_variance, static_cast<BatchNormParamType<T>>(0));
double this_factor = 1. - momentum;
CUDNN_ENFORCE(platform::dynload::cudnnBatchNormalizationForwardTraining(
......
......@@ -13,6 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/concat_op.h"
#include <string>
#include <vector>
......@@ -34,7 +35,10 @@ class ConcatOp : public framework::OperatorWithKernel {
size_t axis = static_cast<size_t>(ctx->Attrs().Get<int>("axis"));
const size_t n = ins.size();
PADDLE_ENFORCE_GT(n, 1, "Input tensors count should > 1.");
PADDLE_ENFORCE_GT(n, 0, "Input tensors count should > 0.");
if (n == 1) {
VLOG(3) << "Warning: concat op have only one input, may waste memory";
}
auto out_dims = ins[0];
size_t in_zero_dims_size = out_dims.size();
......
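For reference, a worked example of the shape inference above (illustrative values):

```cpp
// Concatenating inputs with dims [2, 3] and [2, 4] along axis = 1 yields
// out_dims == [2, 7]; all dimensions other than `axis` must match, and the
// relaxed check now also accepts a single input (out_dims == in_dims).
```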
......@@ -35,7 +35,8 @@ bool RPCClient::AsyncSendVariable(const std::string& ep,
const framework::Scope* p_scope = &scope;
const auto ch = GetChannel(ep_val);
framework::Async([var_name_val, p_ctx, ep_val, p_scope, time_out, ch, this] {
framework::AsyncIO([var_name_val, p_ctx, ep_val, p_scope, time_out, ch,
this] {
auto* var = p_scope->FindVar(var_name_val);
::grpc::ByteBuffer req;
......@@ -89,7 +90,8 @@ bool RPCClient::AsyncGetVariable(const std::string& ep,
const framework::Scope* p_scope = &scope;
const auto ch = GetChannel(ep_val);
framework::Async([var_name_val, ep_val, p_scope, p_ctx, time_out, ch, this] {
framework::AsyncIO([var_name_val, ep_val, p_scope, p_ctx, time_out, ch,
this] {
// prepare input
sendrecv::VariableMessage req;
req.set_varname(var_name_val);
......@@ -132,7 +134,7 @@ bool RPCClient::AsyncPrefetchVariable(const std::string& ep,
const framework::Scope* p_scope = &scope;
const auto ch = GetChannel(ep_val);
framework::Async([in_var_name_val, out_var_name_val, ep_val, p_scope, p_ctx,
framework::AsyncIO([in_var_name_val, out_var_name_val, ep_val, p_scope, p_ctx,
time_out, ch, this] {
auto* var = p_scope->FindVar(in_var_name_val);
......@@ -196,7 +198,7 @@ bool RPCClient::Wait() {
std::vector<std::future<void>> waits(req_count_);
for (int i = 0; i < req_count_; i++) {
waits[i] = framework::Async([i, &a, this] { a[i] = Proceed(); });
waits[i] = framework::AsyncIO([i, &a, this] { a[i] = Proceed(); });
}
for (int i = 0; i < req_count_; i++) {
......
......@@ -161,6 +161,7 @@ class RequestPrefetch final : public RequestBase {
::grpc::ByteBuffer reply;
std::string var_name = request_->OutVarname();
VLOG(3) << "prefetch var " << var_name;
auto var_desc = program_->Block(0).FindVar(var_name);
framework::Scope* local_scope = &scope_->NewScope();
auto* var = local_scope->FindVar(var_name);
......@@ -216,10 +217,10 @@ void AsyncGRPCServer::RunSyncUpdate() {
std::function<void()> prefetch_register =
std::bind(&AsyncGRPCServer::TryToRegisterNewPrefetchOne, this);
// TODO(wuyi): Run these "HandleRequest" in thread pool
t_send_.reset(
new std::thread(std::bind(&AsyncGRPCServer::HandleRequest, this,
cq_send_.get(), "cq_send", send_register)));
t_get_.reset(
new std::thread(std::bind(&AsyncGRPCServer::HandleRequest, this,
cq_get_.get(), "cq_get", get_register)));
......
......@@ -13,7 +13,8 @@ See the License for the specific language governing permissions and
limitations under the License. */
#include <ostream>
#include <thread>
#include <thread> // NOLINT
#include <vector>
#include "paddle/fluid/operators/listen_and_serv_op.h"
......@@ -88,8 +89,9 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
auto ins = Inputs("X");
auto fan_in = Attr<int>("Fanin");
auto *block = Attr<framework::BlockDesc *>(kOptimizeBlock);
auto *program = block->Program();
auto *optimize_block = Attr<framework::BlockDesc *>(kOptimizeBlock);
auto *prefetch_block = Attr<framework::BlockDesc *>(kPrefetchBlock);
auto *program = optimize_block->Program();
size_t num_blocks = program->Size();
PADDLE_ENFORCE_GE(num_blocks, 2,
"server program should have at least 2 blocks");
......@@ -97,18 +99,25 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
framework::Executor executor(dev_place);
std::vector<int> block_list;
for (size_t blkid = 1; blkid < num_blocks; ++blkid) {
if (blkid != prefetch_block->ID()) {
block_list.push_back(blkid);
}
auto prepared = executor.Prepare(*program, block_list);
}
auto optimize_prepared = executor.Prepare(*program, block_list);
// Insert placeholder for block0 which holds current op itself.
prepared.insert(prepared.begin(),
optimize_prepared.insert(
optimize_prepared.begin(),
std::shared_ptr<framework::ExecutorPrepareContext>(nullptr));
rpc_service_->SetScope(&recv_scope);
rpc_service_->SetDevCtx(&dev_ctx);
// TODO(qiao) set proper fields for table lookup and update
rpc_service_->SetExecutor(&executor);
rpc_service_->SetPrefetchBlkdId(0);
VLOG(3) << "prefetch block id is " << prefetch_block->ID();
auto prefetch_prepared = executor.Prepare(*program, prefetch_block->ID());
rpc_service_->SetPrefetchBlkdId(prefetch_block->ID());
rpc_service_->SetPrefetchPreparedCtx(prefetch_prepared.get());
prefetch_prepared.release();
rpc_service_->SetProgram(program);
// start the server listening after all member initialized.
server_thread_.reset(new std::thread(RunServer, rpc_service_));
......@@ -166,16 +175,18 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
parallel_blkids.push_back(1);
double ts = detail::GetTimestamp();
for (size_t blkid = 2; blkid < num_blocks; ++blkid) {
if (blkid != prefetch_block->ID()) {
if (program->Block(blkid).Parent() != last_parent_blkid) {
ParallelExecuteBlocks(parallel_blkids, &executor, prepared, program,
&recv_scope);
ParallelExecuteBlocks(parallel_blkids, &executor, optimize_prepared,
program, &recv_scope);
parallel_blkids.clear();
last_parent_blkid = program->Block(blkid).Parent();
}
parallel_blkids.push_back(blkid);
}
ParallelExecuteBlocks(parallel_blkids, &executor, prepared, program,
&recv_scope);
}
ParallelExecuteBlocks(parallel_blkids, &executor, optimize_prepared,
program, &recv_scope);
VLOG(2) << "run all blocks spent " << detail::GetTimestamp() - ts << "(ms)";
// Reset the received sparse variables, the sum operator would not
......@@ -211,6 +222,8 @@ from send_op and send back variables to recv_op.
.AddCustomChecker([](const std::string &ip) { return !ip.empty(); });
AddAttr<framework::BlockDesc *>(kOptimizeBlock,
"BlockID to run on server side.");
AddAttr<framework::BlockDesc *>(kPrefetchBlock,
"prefetch block to run on server side.");
AddAttr<int>("Fanin", "How many clients send to this server.")
.SetDefault(1);
}
......
......@@ -16,6 +16,7 @@ limitations under the License. */
#include <stdint.h>
#include <ostream>
#include <string>
#include "paddle/fluid/framework/executor.h"
#include "paddle/fluid/framework/lod_tensor.h"
......@@ -27,6 +28,7 @@ namespace paddle {
namespace operators {
constexpr char kOptimizeBlock[] = "OptimizeBlock";
constexpr char kPrefetchBlock[] = "PrefetchBlock";
void RunServer(std::shared_ptr<detail::AsyncGRPCServer> service);
......
......@@ -78,6 +78,9 @@ class LookupTableOpMaker : public framework::OpProtoAndCheckerMaker {
"(boolean, default false) "
"Sparse update.")
.SetDefault(false);
AddAttr<bool>("is_distributed",
"(boolean, default false) distributed lookup table.")
.SetDefault(false);
AddAttr<int64_t>("padding_idx",
"(int64, default -1) "
"If the value is -1, it makes no effect to lookup. "
......
......@@ -288,9 +288,14 @@ void batched_gemm<platform::CUDADeviceContext, float16>(
// TODO(kexinzhao): add processing code for compute capability < 53 case
PADDLE_ENFORCE_GE(context.GetComputeCapability(), 53,
"cublas Hgemm requires GPU compute capability >= 53");
#if CUDA_VERSION >= 8000
PADDLE_ENFORCE(platform::dynload::cublasHgemmStridedBatched(
context.cublas_handle(), cuTransB, cuTransA, N, M, K, &h_alpha, h_B, ldb,
strideB, h_A, lda, strideA, &h_beta, h_C, ldc, strideC, batchCount));
#else
PADDLE_ENFORCE(false, "HgemmStridedBatched is not supported on cuda <= 7.5");
#endif
}
template <>
......@@ -310,9 +315,13 @@ void batched_gemm<platform::CUDADeviceContext, float>(
(transB == CblasNoTrans) ? CUBLAS_OP_N : CUBLAS_OP_T;
const int strideC = M * N;
#if CUDA_VERSION >= 8000
PADDLE_ENFORCE(platform::dynload::cublasSgemmStridedBatched(
context.cublas_handle(), cuTransB, cuTransA, N, M, K, &alpha, B, ldb,
strideB, A, lda, strideA, &beta, C, ldc, strideC, batchCount));
#else
PADDLE_ENFORCE(false, "SgemmStridedBatched is not supported on cuda <= 7.5");
#endif
}
template <>
......@@ -332,9 +341,13 @@ void batched_gemm<platform::CUDADeviceContext, double>(
(transB == CblasNoTrans) ? CUBLAS_OP_N : CUBLAS_OP_T;
const int strideC = M * N;
#if CUDA_VERSION >= 8000
PADDLE_ENFORCE(platform::dynload::cublasDgemmStridedBatched(
context.cublas_handle(), cuTransB, cuTransA, N, M, K, &alpha, B, ldb,
strideB, A, lda, strideA, &beta, C, ldc, strideC, batchCount));
#else
PADDLE_ENFORCE(false, "DgemmStridedBatched is not supported on cuda <= 7.5");
#endif
}
template <>
......
......@@ -14,6 +14,8 @@ limitations under the License. */
#pragma once
#include <utility>
#include <vector>
#include "paddle/fluid/framework/eigen.h"
#include "paddle/fluid/framework/op_registry.h"
......
......@@ -83,9 +83,11 @@ class PoolMKLDNNOpKernel : public paddle::framework::OpKernel<T> {
dev_ctx.SetBlob(key_pool_workspace_memory, workspace_memory);
auto src_memory =
mkldnn::memory({src_md, mkldnn_engine}, (void*)input_data);
mkldnn::memory({src_md, mkldnn_engine},
static_cast<void*>(const_cast<T*>(input_data)));
auto dst_memory =
mkldnn::memory({dst_md, mkldnn_engine}, (void*)output_data);
mkldnn::memory({dst_md, mkldnn_engine},
static_cast<void*>(const_cast<T*>(output_data)));
auto pool_prim = mkldnn::pooling_forward(*pool_pd, src_memory, dst_memory,
*workspace_memory);
......@@ -195,9 +197,11 @@ class PoolMKLDNNGradOpKernel : public paddle::framework::OpKernel<T> {
pool_bwd_desc, mkldnn_engine, *pool_pd);
auto diff_src_memory =
mkldnn::memory({diff_src_md, mkldnn_engine}, (void*)in_x_grad_data);
mkldnn::memory({diff_src_md, mkldnn_engine},
static_cast<void*>(const_cast<T*>(in_x_grad_data)));
auto diff_dst_memory =
mkldnn::memory({diff_dst_md, mkldnn_engine}, (void*)out_grad_data);
mkldnn::memory({diff_dst_md, mkldnn_engine},
static_cast<void*>(const_cast<T*>(out_grad_data)));
auto bwd_prim = mkldnn::pooling_backward(
pool_bwd_pd, diff_dst_memory, *workspace_memory, diff_src_memory);
......
......@@ -14,6 +14,8 @@ limitations under the License. */
#pragma once
#include <string>
#include <vector>
#include "paddle/fluid/framework/eigen.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/operators/math/math_function.h"
......
......@@ -14,6 +14,7 @@ limitations under the License. */
#pragma once
#include <vector>
#include "paddle/fluid/framework/eigen.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/operators/math/math_function.h"
......
......@@ -12,7 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <future>
#include <future> // NOLINT
#include <ostream>
#include "paddle/fluid/framework/data_type.h"
......@@ -50,8 +50,8 @@ class PrefetchOp : public framework::OperatorBase {
for (size_t i = 0; i < ins.size(); i++) {
if (NeedSend(scope, ins[i])) {
VLOG(3) << "sending " << ins[i] << " to " << epmap[i] << "to get "
<< outs[i] << "back";
VLOG(3) << "sending " << ins[i] << " to " << epmap[i] << " to get "
<< outs[i] << " back";
rpc_client->AsyncPrefetchVariable(epmap[i], ctx, scope, ins[i],
outs[i]);
} else {
......@@ -71,7 +71,7 @@ class PrefetchOpMaker : public framework::OpProtoAndCheckerMaker {
"(RPCClient) The RPC client object which will be"
"initialized at most once.");
AddOutput("Out",
"(SelectedRows) result "
"(LoDTensor) result "
"to be fetched from parameter server")
.AsDuplicable();
AddAttr<std::vector<std::string>>(
......
......@@ -13,7 +13,6 @@ See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/prelu_op.h"
#include <string>
namespace paddle {
......
......@@ -45,7 +45,7 @@ class PriorBoxOp : public framework::OperatorWithKernel {
bool flip = ctx->Attrs().Get<bool>("flip");
std::vector<float> aspect_ratios_vec;
ExpandAspectRatios(aspect_ratios, flip, aspect_ratios_vec);
ExpandAspectRatios(aspect_ratios, flip, &aspect_ratios_vec);
size_t num_priors = aspect_ratios_vec.size() * min_sizes.size();
if (max_sizes.size() > 0) {
......
......@@ -96,7 +96,7 @@ class PriorBoxOpCUDAKernel : public framework::OpKernel<T> {
auto clip = ctx.Attr<bool>("clip");
std::vector<float> aspect_ratios;
ExpandAspectRatios(input_aspect_ratio, flip, aspect_ratios);
ExpandAspectRatios(input_aspect_ratio, flip, &aspect_ratios);
T step_w = static_cast<T>(ctx.Attr<float>("step_w"));
T step_h = static_cast<T>(ctx.Attr<float>("step_h"));
......
......@@ -13,6 +13,8 @@ See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <algorithm>
#include <vector>
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/operators/math/math_function.h"
#include "paddle/fluid/platform/transform.h"
......@@ -22,23 +24,23 @@ namespace operators {
inline void ExpandAspectRatios(const std::vector<float>& input_aspect_ratior,
bool flip,
std::vector<float>& output_aspect_ratior) {
std::vector<float>* output_aspect_ratior) {
constexpr float epsilon = 1e-6;
output_aspect_ratior.clear();
output_aspect_ratior.push_back(1.0f);
output_aspect_ratior->clear();
output_aspect_ratior->push_back(1.0f);
for (size_t i = 0; i < input_aspect_ratior.size(); ++i) {
float ar = input_aspect_ratior[i];
bool already_exist = false;
for (size_t j = 0; j < output_aspect_ratior.size(); ++j) {
if (fabs(ar - output_aspect_ratior[j]) < epsilon) {
for (size_t j = 0; j < output_aspect_ratior->size(); ++j) {
if (fabs(ar - output_aspect_ratior->at(j)) < epsilon) {
already_exist = true;
break;
}
}
if (!already_exist) {
output_aspect_ratior.push_back(ar);
output_aspect_ratior->push_back(ar);
if (flip) {
output_aspect_ratior.push_back(1.0f / ar);
output_aspect_ratior->push_back(1.0f / ar);
}
}
}
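A quick worked call of the updated pointer-based signature (illustrative values):

```cpp
std::vector<float> ratios;
ExpandAspectRatios({2.0f}, /*flip=*/true, &ratios);
// ratios == {1.0f, 2.0f, 0.5f}: 1.0 is always inserted first; each new
// ratio ar is appended, plus 1/ar when flip is set.
```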
......@@ -68,7 +70,7 @@ class PriorBoxOpKernel : public framework::OpKernel<T> {
auto clip = ctx.Attr<bool>("clip");
std::vector<float> aspect_ratios;
ExpandAspectRatios(input_aspect_ratio, flip, aspect_ratios);
ExpandAspectRatios(input_aspect_ratio, flip, &aspect_ratios);
T step_w = static_cast<T>(ctx.Attr<float>("step_w"));
T step_h = static_cast<T>(ctx.Attr<float>("step_h"));
......
......@@ -13,6 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/rank_loss_op.h"
#include <string>
namespace paddle {
namespace operators {
......
......@@ -12,6 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <future> // NOLINT
#include <ostream>
#include "paddle/fluid/framework/data_type.h"
......@@ -19,7 +20,6 @@ limitations under the License. */
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/op_registry.h"
#include <future>
#include "paddle/fluid/operators/detail/grpc_client.h"
namespace paddle {
......
......@@ -60,7 +60,7 @@ class ReshapeOp : public framework::OperatorWithKernel {
static framework::DDim ValidateShape(const std::vector<int> shape,
const framework::DDim &in_dims) {
const int64_t in_size = framework::product(in_dims);
// only one dimension canbe set to -1, whose size will be automatically
// only one dimension can be set to -1, whose size will be automatically
// inferred.
const int64_t unk_dim_val = -1;
const int64_t copy_dim_val = 0;
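Two illustrative cases of the convention above (`-1` inferred, `0` copied):

```cpp
// With in_dims = [6, 4] (in_size = 24):
//   shape = {-1, 8}   -> the -1 is inferred as 24 / 8 = 3 -> out = [3, 8]
//   shape = {2, 0, 3} -> the 0 copies in_dims[1] = 4      -> out = [2, 4, 3]
```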
......@@ -119,13 +119,15 @@ class ReshapeKernel : public framework::OpKernel<T> {
auto *shape_tensor = ctx.Input<framework::LoDTensor>("Shape");
framework::DDim out_dims = out->dims();
if (shape_tensor) {
auto *shape_data = shape_tensor->data<int>();
if (platform::is_gpu_place(ctx.GetPlace())) {
framework::Tensor cpu_shape_tensor;
if (platform::is_gpu_place(ctx.GetPlace())) {
TensorCopy(*shape_tensor, platform::CPUPlace(), ctx.device_context(),
&cpu_shape_tensor);
shape_data = cpu_shape_tensor.data<int>();
ctx.device_context().Wait();
}
auto shape =
std::vector<int>(shape_data, shape_data + shape_tensor->numel());
......
......@@ -13,6 +13,8 @@ See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <algorithm>
#include <limits>
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/operators/math/math_function.h"
......
......@@ -14,7 +14,7 @@ limitations under the License. */
#include <unistd.h>
#include <string>
#include <thread>
#include <thread> // NOLINT
#include "gtest/gtest.h"
#include "paddle/fluid/framework/op_registry.h"
......@@ -37,11 +37,11 @@ namespace m = paddle::operators::math;
std::unique_ptr<f::OperatorBase> listen_and_serv_op;
int selected_port;
void InitTensorsInScope(f::Scope &scope, p::CPUPlace &place) {
void InitTensorsInScope(const p::CPUPlace &place, f::Scope *scope) {
p::CPUDeviceContext ctx(place);
for (int i = 0; i < 2; ++i) {
auto var_name = paddle::string::Sprintf("x%d", i);
auto var = scope.Var(var_name);
auto var = scope->Var(var_name);
auto tensor = var->GetMutable<f::LoDTensor>();
tensor->Resize({10, 10});
float *expect = tensor->mutable_data<float>(place);
......@@ -50,20 +50,20 @@ void InitTensorsInScope(f::Scope &scope, p::CPUPlace &place) {
}
}
auto out_var = scope.Var("Out");
auto out_var = scope->Var("Out");
auto out_tensor = out_var->GetMutable<f::LoDTensor>();
out_tensor->Resize({10, 10});
out_tensor->mutable_data<float>(place); // allocate
}
void InitSelectedRowsInScope(f::Scope &scope, p::CPUPlace &place) {
void InitSelectedRowsInScope(const p::CPUPlace &place, f::Scope *scope) {
p::CPUDeviceContext ctx(place);
int64_t height = 10;
int64_t row_numel = 10;
m::SetConstant<p::CPUDeviceContext, float> set_one;
// init x0
std::vector<int64_t> rows0{0, 4, 7};
auto x0_var = scope.Var("x0");
auto x0_var = scope->Var("x0");
auto x0 = x0_var->GetMutable<f::SelectedRows>();
x0->set_rows(rows0);
x0->set_height(height);
......@@ -74,7 +74,7 @@ void InitSelectedRowsInScope(f::Scope &scope, p::CPUPlace &place) {
// init x1
std::vector<int64_t> rows1{2, 9};
auto x1_var = scope.Var("x1");
auto x1_var = scope->Var("x1");
auto x1 = x1_var->GetMutable<f::SelectedRows>();
x1->set_rows(rows1);
x1->set_height(height);
......@@ -83,7 +83,7 @@ void InitSelectedRowsInScope(f::Scope &scope, p::CPUPlace &place) {
f::make_ddim({static_cast<int64_t>(rows1.size()), row_numel}), place);
set_one(ctx, x1_value, 1.0);
auto out_var = scope.Var("Out");
auto out_var = scope->Var("Out");
auto out = out_var->GetMutable<f::SelectedRows>();
auto out_value = out->mutable_value();
out->set_height(height);
......@@ -117,15 +117,16 @@ void StartServerNet(bool is_sparse) {
f::Scope scope;
p::CPUPlace place;
if (is_sparse) {
InitSelectedRowsInScope(scope, place);
InitSelectedRowsInScope(place, &scope);
} else {
InitTensorsInScope(scope, place);
InitTensorsInScope(place, &scope);
}
// sub program run in listen_and_serv_op, for simple test we use sum
f::ProgramDesc program;
const auto &root_block = program.Block(0);
auto *optimize_block = program.AppendBlock(root_block);
auto *prefetch_block = program.AppendBlock(root_block);
// X for server side tensors, RX for received tensors, must be of same shape.
AddOp("sum", {{"X", {"x0", "x1"}}}, {{"Out", {"Out"}}}, {}, optimize_block);
......@@ -135,6 +136,7 @@ void StartServerNet(bool is_sparse) {
attrs.insert({"ParamList", std::vector<std::string>({"Out"})});
attrs.insert({"GradList", std::vector<std::string>({"x1"})});
attrs.insert({"OptimizeBlock", optimize_block});
attrs.insert({"PrefetchBlock", prefetch_block});
listen_and_serv_op =
f::OpRegistry::CreateOp("listen_and_serv", {{"X", {"x1"}}}, {}, attrs);
LOG(INFO) << "selected port before run " << selected_port;
......@@ -148,7 +150,7 @@ TEST(SendRecvOp, CPUDense) {
// local net
f::Scope scope;
p::CPUPlace place;
InitTensorsInScope(scope, place);
InitTensorsInScope(place, &scope);
// create rpc client var
scope.Var("RPC_CLIENT_VAR");
......@@ -191,7 +193,7 @@ TEST(SendRecvOp, CPUSparse) {
f::Scope scope;
p::CPUPlace place;
p::CPUDeviceContext ctx(place);
InitSelectedRowsInScope(scope, place);
InitSelectedRowsInScope(place, &scope);
scope.Var("RPC_CLIENT_VAR");
f::AttributeMap attrs;
selected_port = static_cast<paddle::operators::ListenAndServOp *>(
......
......@@ -12,7 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <future>
#include <future> // NOLINT
#include <ostream>
#include "paddle/fluid/framework/data_type.h"
......@@ -36,7 +36,7 @@ class SendVarsOp : public framework::OperatorBase {
auto ins = Inputs("X");
std::vector<std::string> epmap = Attr<std::vector<std::string>>("epmap");
int sync_send = Attr<int>("sync_sent");
int sync_send = Attr<int>("sync_send");
platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance();
auto& ctx = *pool.Get(place);
......
......@@ -35,8 +35,8 @@ class SGDOp : public framework::OperatorWithKernel {
PADDLE_ENFORCE_EQ(framework::product(lr_dims), 1,
"Learning rate should have 1 element");
auto param_dim = ctx->GetInputDim("Param");
// TODO(qijun): check dimensions of Param and Grad at complie
// and run time.
// TODO(qijun): check dimensions of Param and Grad at compile
// and runtime.
ctx->SetOutputDim("ParamOut", param_dim);
}
......
......@@ -65,7 +65,8 @@ class SGDOpKernel : public framework::OpKernel<T> {
auto &grad_rows = grad->rows();
size_t grad_row_numel = grad_value.numel() / grad_rows.size();
PADDLE_ENFORCE_EQ(grad_row_numel, param_out->numel() / grad_height);
PADDLE_ENFORCE_EQ(static_cast<int64_t>(grad_row_numel),
param_out->numel() / grad_height);
auto *grad_data = grad_value.data<T>();
auto *out_data = param_out->data<T>();
......@@ -73,7 +74,7 @@ class SGDOpKernel : public framework::OpKernel<T> {
for (size_t i = 0; i < grad_rows.size(); i++) {
PADDLE_ENFORCE(grad_rows[i] < grad_height,
"Input rows index should less than height");
for (int64_t j = 0; j < grad_row_numel; j++) {
for (size_t j = 0; j < grad_row_numel; j++) {
out_data[grad_rows[i] * grad_row_numel + j] -=
lr[0] * grad_data[i * grad_row_numel + j];
}
......@@ -107,7 +108,7 @@ class SGDOpKernel : public framework::OpKernel<T> {
PADDLE_ENFORCE(grad.rows()[i] < grad.height(),
"Input rows index should less than height");
int64_t id_index = param.index(grad.rows()[i]);
for (int64_t j = 0; j < grad_row_width; j++) {
for (size_t j = 0; j < grad_row_width; j++) {
out_data[id_index * grad_row_width + j] -=
lr[0] * grad_data[i * grad_row_width + j];
}
......
......@@ -48,20 +48,21 @@ class SplitIdsOp : public framework::OperatorWithKernel {
PADDLE_ENFORCE(ctx->HasOutputs("Out"), "SplitIdsOp must have output Out.");
auto ids_var_type = ctx->GetInputsVarType("Ids").front();
PADDLE_ENFORCE_EQ(ids_var_type, framework::proto::VarType::LOD_TENSOR);
auto ids_dims = ctx->GetInputDim("Ids");
if (ids_var_type == framework::proto::VarType::LOD_TENSOR) {
PADDLE_ENFORCE_EQ(ids_dims.size(), 2);
PADDLE_ENFORCE_EQ(ids_dims[1], 1);
}
}
};
class SplitIdsOpInferVarType : public framework::VarTypeInference {
public:
void operator()(const framework::OpDesc &op_desc,
framework::BlockDesc *block) const override {
auto *input_var = block->Var(op_desc.Input("Ids")[0]);
for (auto &out_var : op_desc.Output("Out")) {
block->Var(out_var)->SetType(framework::proto::VarType::LOD_TENSOR);
block->Var(out_var)->SetType(input_var->GetType());
}
}
};
......@@ -73,4 +74,5 @@ namespace ops = paddle::operators;
REGISTER_OPERATOR(split_ids, ops::SplitIdsOp, ops::SplitIdsOpMaker,
ops::SplitIdsOpInferVarType);
REGISTER_OP_CPU_KERNEL(
split_ids, ops::SplitIdsOpKernel<paddle::platform::CPUPlace, int64_t>);
split_ids, ops::SplitIdsOpKernel<paddle::platform::CPUPlace, int64_t>,
ops::SplitIdsOpKernel<paddle::platform::CPUPlace, float>);
......@@ -24,14 +24,16 @@ namespace operators {
template <typename DeviceContext, typename T>
class SplitIdsOpKernel : public framework::OpKernel<T> {
public:
void Compute(const framework::ExecutionContext& ctx) const override {
void Compute(const framework::ExecutionContext &ctx) const override {
auto place = ctx.GetPlace();
if (!platform::is_cpu_place(place)) {
PADDLE_THROW("SplitIds does not support GPU kernel");
}
auto& ids_dims = ctx.Input<framework::LoDTensor>("Ids")->dims();
const T* ids = ctx.Input<framework::LoDTensor>("Ids")->data<T>();
const auto *ids_var = ctx.InputVar("Ids");
if (ids_var->IsType<framework::LoDTensor>()) {
const auto &ids_dims = ctx.Input<framework::LoDTensor>("Ids")->dims();
const T *ids = ctx.Input<framework::LoDTensor>("Ids")->data<T>();
auto outs = ctx.MultiOutput<framework::LoDTensor>("Out");
const size_t shard_num = outs.size();
......@@ -47,14 +49,40 @@ class SplitIdsOpKernel : public framework::OpKernel<T> {
// create tensor for each shard and send to parameter server
for (size_t i = 0; i < out_ids.size(); ++i) {
auto* shard_t = outs[i];
auto *shard_t = outs[i];
std::vector<T> ids = out_ids[i];
auto* shard_data = shard_t->mutable_data<T>(
auto *shard_data = shard_t->mutable_data<T>(
framework::make_ddim({static_cast<int64_t>(ids.size()), 1}), place);
for (size_t i = 0; i < ids.size(); ++i) {
shard_data[i] = ids[i];
}
}
} else if (ids_var->IsType<framework::SelectedRows>()) {
const auto *ids_selected_rows = ctx.Input<framework::SelectedRows>("Ids");
auto &ids_dims = ids_selected_rows->value().dims();
PADDLE_ENFORCE_EQ(ids_dims[0], ids_selected_rows->rows().size(), "");
const T *ids = ids_selected_rows->value().data<T>();
const auto &ids_rows = ids_selected_rows->rows();
auto outs = ctx.MultiOutput<framework::SelectedRows>("Out");
const size_t shard_num = outs.size();
// get rows for outputs
for (auto &id : ids_rows) {
size_t shard_id = static_cast<size_t>(id) % shard_num;
outs[shard_id]->mutable_rows()->push_back(id);
}
int64_t row_width = ids_dims[1];
for (auto &out : outs) {
out->set_height(ids_selected_rows->height());
framework::DDim ddim = framework::make_ddim(
{static_cast<int64_t>(out->rows().size()), row_width});
T *output = out->mutable_value()->mutable_data<T>(ddim, place);
for (size_t i = 0; i < ddim[0]; ++i) {
memcpy(output + i * row_width, ids + out->rows()[i] * row_width,
row_width * sizeof(T));
}
}
}
}
};
......
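Both the LoDTensor branch and the new SelectedRows branch route ids with the same rule, shard_id = id % shard_num. A small Python sketch of that routing (assumed helper name, for illustration):
def split_ids(ids, shard_num):
    # Route each id to a shard by modulo, preserving per-shard input order,
    # mirroring the kernel's `id % shard_num` rule.
    shards = [[] for _ in range(shard_num)]
    for id_ in ids:
        shards[id_ % shard_num].append(id_)
    return shards

print(split_ids([0, 5, 7, 2, 9], shard_num=3))  # [[0, 9], [7], [5, 2]]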
......@@ -37,8 +37,8 @@ inline void StridedMemcpy(const platform::DeviceContext& dev_ctx, const T* src,
const framework::DDim& src_stride,
const framework::DDim& dst_dim,
const framework::DDim& dst_stride, T* dst) {
using namespace detail;
StridedCopyDimVisitor<T> func(dev_ctx, src, src_stride, dst_stride, dst);
paddle::operators::detail::StridedCopyDimVisitor<T> func(
dev_ctx, src, src_stride, dst_stride, dst);
boost::apply_visitor(func, dst_dim);
}
......
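StridedMemcpy copies a dst_dim-shaped block between two buffers whose strides differ. A NumPy sketch of the rank-2 case (the real visitor handles arbitrary rank and device contexts; names here are illustrative):
import numpy as np

def strided_copy_2d(src, src_stride, dst, dst_stride, dims):
    # Copy a dims[0] x dims[1] block, where each flat buffer advances by
    # its own row stride -- the essence of StridedCopyDimVisitor for rank 2.
    rows, cols = dims
    for i in range(rows):
        dst[i * dst_stride:i * dst_stride + cols] = \
            src[i * src_stride:i * src_stride + cols]

src = np.arange(12.0)   # a 3x4 row-major buffer
dst = np.zeros(6)       # a 3x2 destination buffer
strided_copy_2d(src, 4, dst, 2, (3, 2))
print(dst)              # first two columns of each source row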
......@@ -10,9 +10,11 @@ See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/sum_op.h"
#include <algorithm>
#include <string>
#include <vector>
#include "paddle/fluid/framework/var_type_inference.h"
#include "paddle/fluid/operators/detail/safe_ref.h"
......@@ -37,7 +39,10 @@ class SumOp : public framework::OperatorWithKernel {
auto x_dims = ctx->GetInputsDim("X");
size_t N = x_dims.size();
PADDLE_ENFORCE_GT(N, 1, "Input tensors count should > 1.");
PADDLE_ENFORCE_GT(N, 0, "Input tensors count should be > 0.");
if (N == 1) {
VLOG(3) << "Warning: sum has only one input, which may waste memory";
}
framework::DDim in_dim({0});
for (auto& x_dim : x_dims) {
......
......@@ -13,6 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/operators/top_k_op.h"
#include "paddle/fluid/platform/assert.h"
namespace paddle {
......@@ -133,71 +134,71 @@ __device__ __forceinline__ void GetTopK(Pair<T> topk[], const T* val, int* col,
}
template <typename T, int MaxLength, int BlockSize>
__device__ __forceinline__ void ThreadGetTopK(Pair<T> topk[], int& beam,
__device__ __forceinline__ void ThreadGetTopK(Pair<T> topk[], int* beam,
int beam_size, const T* src,
bool& firstStep, bool& is_empty,
Pair<T>& max, int dim,
bool* firstStep, bool* is_empty,
Pair<T>* max, int dim,
const int tid) {
if (beam > 0) {
int length = beam < beam_size ? beam : beam_size;
if (firstStep) {
firstStep = false;
if (*beam > 0) {
int length = (*beam) < beam_size ? *beam : beam_size;
if (*firstStep) {
*firstStep = false;
GetTopK<T, BlockSize>(topk, src, tid, dim, length);
} else {
for (int k = 0; k < MaxLength; k++) {
if (k < MaxLength - beam) {
topk[k] = topk[k + beam];
if (k < MaxLength - (*beam)) {
topk[k] = topk[k + *beam];
} else {
topk[k].set(-INFINITY, -1);
}
}
if (!is_empty) {
GetTopK<T, BlockSize>(topk + MaxLength - beam, src, tid, dim, max,
if (!(*is_empty)) {
GetTopK<T, BlockSize>(topk + MaxLength - *beam, src, tid, dim, *max,
length);
}
}
max = topk[MaxLength - 1];
if (max.v == -1) is_empty = true;
beam = 0;
*max = topk[MaxLength - 1];
if ((*max).v == -1) *is_empty = true;
*beam = 0;
}
}
template <typename T, int MaxLength, int BlockSize>
__device__ __forceinline__ void ThreadGetTopK(Pair<T> topk[], int& beam,
__device__ __forceinline__ void ThreadGetTopK(Pair<T> topk[], int* beam,
int beam_size, const T* val,
int* col, bool& firstStep,
bool& is_empty, Pair<T>& max,
int* col, bool* firstStep,
bool* is_empty, Pair<T>* max,
int dim, const int tid) {
if (beam > 0) {
int length = beam < beam_size ? beam : beam_size;
if (firstStep) {
firstStep = false;
if (*beam > 0) {
int length = (*beam) < beam_size ? *beam : beam_size;
if (*firstStep) {
*firstStep = false;
GetTopK<T, BlockSize>(topk, val, col, tid, dim, length);
} else {
for (int k = 0; k < MaxLength; k++) {
if (k < MaxLength - beam) {
topk[k] = topk[k + beam];
if (k < MaxLength - *beam) {
topk[k] = topk[k + *beam];
} else {
topk[k].set(-INFINITY, -1);
}
}
if (!is_empty) {
GetTopK<T, BlockSize>(topk + MaxLength - beam, val, col, tid, dim, max,
if (!(*is_empty)) {
GetTopK<T, BlockSize>(topk + MaxLength - *beam, val, col, tid, dim, max,
length);
}
}
max = topk[MaxLength - 1];
if (max.v == -1) is_empty = true;
beam = 0;
*max = topk[MaxLength - 1];
if ((*max).v == -1) *is_empty = true;
*beam = 0;
}
}
template <typename T, int MaxLength, int BlockSize>
__device__ __forceinline__ void BlockReduce(Pair<T>* sh_topk, int* maxid,
Pair<T> topk[], T** topVal,
int64_t** topIds, int& beam, int& k,
int64_t** topIds, int* beam, int* k,
const int tid, const int warp) {
while (true) {
__syncthreads();
......@@ -225,17 +226,17 @@ __device__ __forceinline__ void BlockReduce(Pair<T>* sh_topk, int* maxid,
(*topVal)++;
(*topIds)++;
}
if (tid == maxid[0]) beam++;
if (--k == 0) break;
if (tid == maxid[0]) (*beam)++;
if (--(*k) == 0) break;
__syncthreads();
if (tid == maxid[0]) {
if (beam < MaxLength) {
sh_topk[tid] = topk[beam];
if (*beam < MaxLength) {
sh_topk[tid] = topk[*beam];
}
}
if (maxid[0] / 32 == warp) {
if (__shfl(beam, (maxid[0]) % 32, 32) == MaxLength) break;
if (__shfl(*beam, (maxid[0]) % 32, 32) == MaxLength) break;
}
}
}
......@@ -268,13 +269,13 @@ __global__ void KeMatrixTopK(T* output, int output_stride, int64_t* indices,
topk[k].set(-INFINITY, -1);
}
while (k) {
ThreadGetTopK<T, MaxLength, BlockSize>(topk, beam, k,
src + blockIdx.x * lds, firststep,
is_empty, max, dim, tid);
ThreadGetTopK<T, MaxLength, BlockSize>(topk, &beam, k,
src + blockIdx.x * lds, &firststep,
&is_empty, &max, dim, tid);
sh_topk[tid] = topk[0];
BlockReduce<T, MaxLength, BlockSize>(sh_topk, maxid, topk, &output,
&indices, beam, k, tid, warp);
&indices, &beam, &k, tid, warp);
}
}
......@@ -308,9 +309,9 @@ class TopkOpCUDAKernel : public framework::OpKernel<T> {
KeMatrixTopK<T, 5, 256><<<
grid, threads, 0, reinterpret_cast<const platform::CUDADeviceContext&>(
ctx.device_context())
.stream()>>>(output_data, output->dims()[1],
indices_data, input_data,
input_width, input_width, int(k));
.stream()>>>(
output_data, output->dims()[1], indices_data, input_data, input_width,
input_width, static_cast<int>(k));
}
};
......
......@@ -13,14 +13,17 @@
# limitations under the License.
from __future__ import print_function
import framework
from framework import Program, default_main_program, default_startup_program, Parameter, Variable
import optimizer
from layer_helper import LayerHelper
import distributed_splitter as splitter
import math
import distributed_splitter as splitter
import framework
from framework import Program, default_main_program, Variable
from . import core
import debuger
LOOKUP_TABLE_TYPE = "lookup_table"
LOOKUP_TABLE_GRAD_TYPE = "lookup_table_grad"
RPC_CLIENT_VAR_NAME = "RPC_CLIENT_VAR"
class VarBlock:
......@@ -35,9 +38,9 @@ class VarBlock:
class UnionFind(object):
""" Union-find data struct.
""" Union-find data structure.
Union-find is a data struct that keeps track of a set of elements partitioned
Union-find is a data structure that keeps track of a set of elements partitioned
into a number of disjoint (non-overlapping) subsets.
Reference:
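For context, union-find keeps a set of elements partitioned into disjoint subsets and answers "are these two connected?" with a root comparison. A generic sketch with path compression (not Paddle's actual UnionFind class):
class UnionFind(object):
    # Minimal union-find with path compression.
    def __init__(self, elems):
        self.parent = {e: e for e in elems}

    def find(self, x):
        if self.parent[x] != x:
            self.parent[x] = self.find(self.parent[x])  # path compression
        return self.parent[x]

    def union(self, a, b):
        self.parent[self.find(a)] = self.find(b)

    def is_connected(self, a, b):
        return self.find(a) == self.find(b)

uf = UnionFind(["op1", "op2", "op3"])
uf.union("op1", "op2")
print(uf.is_connected("op1", "op2"), uf.is_connected("op1", "op3"))  # True False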
......@@ -185,19 +188,66 @@ class DistributeTranspiler:
assert (callable(split_method))
if program is None:
program = default_main_program()
self.program = program
self.trainers = trainers
self.origin_program = program
self.trainer_num = trainers
self.optimize_ops = optimize_ops
# TODO(typhoonzero): currently trainer_id is fetched from cluster system
# like Kubernetes, we should port this to use etcd later when developing
# fluid distributed training with fault-tolerance.
self.trainer_id = trainer_id
pserver_endpoints = pservers.split(",")
self.pserver_endpoints = pserver_endpoints
# process lookup_table_op
# 1. check all lookup_table_op is distributed
# 2. check all lookup_table_op share the same table.
distributed_lookup_table_ops = []
# support only one distributed_lookup_table now
self.table_name = None
for op in program.global_block().ops:
if op.type == LOOKUP_TABLE_TYPE:
if op.attrs['is_distributed'] is True:
if self.table_name is None:
self.table_name = op.input("W")[0]
if self.table_name != op.input("W")[0]:
raise RuntimeError("all distributed lookup_table_ops"
" should have only one table")
distributed_lookup_table_ops.append(op)
else:
if self.table_name is not None:
assert op.input("W")[0] != self.table_name
self.has_distributed_lookup_table = len(
distributed_lookup_table_ops) > 0
# step1: For large parameters and gradients, split them into smaller
# blocks.
param_list = [pg[0] for pg in params_grads]
grad_list = [pg[1] for pg in params_grads]
if self.has_distributed_lookup_table:
param_list = [
param for param in param_list if param.name != self.table_name
]
grad_list = [
grad for grad in grad_list
if grad.name != framework.grad_var_name(self.table_name)
]
self.table_param_grad = [
param_grad for param_grad in params_grads
if param_grad[0].name == self.table_name
][0]
table_grad_var = self.table_param_grad[1]
self.table_grad_list = [
program.global_block().create_var(
name="%s.trainer_%d.pserver_%d" %
(table_grad_var.name, trainer_id, index),
type=table_grad_var.type,
shape=table_grad_var.shape,
dtype=table_grad_var.dtype)
for index in range(len(self.pserver_endpoints))
]
grad_blocks = split_dense_variable(grad_list, len(pserver_endpoints))
param_blocks = split_dense_variable(param_list, len(pserver_endpoints))
# step2: Create new vars for the parameters and gradients blocks and
......@@ -229,7 +279,7 @@ class DistributeTranspiler:
self.param_grad_ep_mapping[ep]["grads"].append(grad)
rpc_client_var = program.global_block().create_var(
name="RPC_CLIENT_VAR",
name=RPC_CLIENT_VAR_NAME,
persistable=True,
type=core.VarDesc.VarType.RAW)
......@@ -252,13 +302,19 @@ class DistributeTranspiler:
outputs={"Out": [orig_param]},
attrs={"axis": 0})
if self.has_distributed_lookup_table:
self._replace_lookup_table_op_with_prefetch(program, rpc_client_var,
eplist)
self._split_table_grad_and_add_send_vars(program, rpc_client_var,
pserver_endpoints)
def get_trainer_program(self):
# remove optimize ops and add a send op to main_program
self.program.global_block().delete_ops(self.optimize_ops)
self.program.sync_with_cpp()
self.origin_program.global_block().delete_ops(self.optimize_ops)
self.origin_program.sync_with_cpp()
# FIXME(typhoonzero): serialize once will fix error occurs when clone.
self.program.__str__()
return self.program
self.origin_program.__str__()
return self.origin_program
def get_pserver_program(self, endpoint):
"""
......@@ -294,8 +350,8 @@ class DistributeTranspiler:
type=v.type,
dtype=v.dtype,
shape=v.shape)
if self.trainers > 1:
for trainer_id in xrange(self.trainers):
if self.trainer_num > 1:
for trainer_id in xrange(self.trainer_num):
var = pserver_program.global_block().create_var(
name="%s.trainer_%d" % (orig_var_name, trainer_id),
persistable=False,
......@@ -309,7 +365,7 @@ class DistributeTranspiler:
# step3
optimize_block = pserver_program.create_block(0)
# step 4
# Create a union-find data struct from optimize ops,
# Create a union-find data structure from optimize ops,
# If two ops are connected, we could add these two ops
# into one set.
ufind = self._create_ufind(self.optimize_ops)
......@@ -384,6 +440,23 @@ class DistributeTranspiler:
# __append_optimize_op__(glb_op, optimize_block)
# break
# process distributed lookup_table
prefetch_block = None
if self.has_distributed_lookup_table:
pserver_index = self.pserver_endpoints.index(endpoint)
self._create_table_optimize_block(pserver_index, pserver_program,
append_block)
prefetch_block = self._create_prefetch_block(
pserver_index, pserver_program, optimize_block)
# NOTE: if has_distributed_lookup_table is False, then prefetch_block will
# not be executed, so it's safe to use optimize_block to hold the place
if self.has_distributed_lookup_table:
assert prefetch_block is not None
else:
assert prefetch_block is None
prefetch_block = pserver_program.global_block()
# step5 append the listen_and_serv op
pserver_program.global_block().append_op(
type="listen_and_serv",
......@@ -392,8 +465,10 @@ class DistributeTranspiler:
attrs={
"OptimizeBlock": optimize_block,
"endpoint": endpoint,
"Fanin": self.trainers
"Fanin": self.trainer_num,
"PrefetchBlock": prefetch_block
})
pserver_program.sync_with_cpp()
return pserver_program
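A hedged usage sketch tying get_trainer_program and get_pserver_program together; the argument order is inferred from the transpile body above, and the endpoints and role switch are assumptions for illustration:
# Assumes `optimize_ops`, `params_grads`, and `training_role` come from the
# surrounding training script; endpoint strings are illustrative.
t = DistributeTranspiler()
t.transpile(
    optimize_ops,
    params_grads,
    trainer_id=0,
    pservers="127.0.0.1:6174,127.0.0.1:6175",
    trainers=2)
if training_role == "PSERVER":
    prog = t.get_pserver_program("127.0.0.1:6174")
else:
    prog = t.get_trainer_program()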
......@@ -451,6 +526,197 @@ class DistributeTranspiler:
attrs=op.attrs)
return s_prog
# transpiler function for dis lookup_table
def _replace_lookup_table_op_with_prefetch(self, program, rpc_client_var,
eplist):
# 1. replace lookup_table_op with split_ids_op -> prefetch_op -> sum_op
self.prefetch_input_vars = None
self.prefetch_output_vars = None
continue_search_lookup_table_op = True
while continue_search_lookup_table_op:
continue_search_lookup_table_op = False
all_ops = program.global_block().ops
for op in all_ops:
if op.type == LOOKUP_TABLE_TYPE:
continue_search_lookup_table_op = True
op_index = list(all_ops).index(op)
ids_name = op.input("Ids")
out_name = op.output("Out")
if self.prefetch_input_vars is None:
ids_var = program.global_block().vars[ids_name[0]]
self.prefetch_input_vars = self.create_splited_vars(
source_var=ids_var,
block=program.global_block(),
tag="_prefetch_in_")
if self.prefetch_output_vars is None:
out_var = program.global_block().vars[out_name[0]]
self.prefetch_output_vars = self.create_splited_vars(
source_var=out_var,
block=program.global_block(),
tag="_prefetch_out_")
# insert split_ids_op
program.global_block().insert_op(
index=op_index,
type="split_ids",
inputs={
'Ids': [
program.global_block().vars[varname]
for varname in ids_name
]
},
outputs={"Out": self.prefetch_input_vars})
# insert prefetch_op
program.global_block().insert_op(
index=op_index + 1,
type="prefetch",
inputs={'X': self.prefetch_input_vars},
outputs={
"Out": self.prefetch_output_vars,
"RPCClient": rpc_client_var
},
attrs={"epmap": eplist})
# insert concat_op
program.global_block().insert_op(
index=op_index + 2,
type="concat",
inputs={'X': self.prefetch_output_vars},
outputs={
"Out": [
program.global_block().vars[varname]
for varname in out_name
]
},
attrs={"axis": 0})
# delete lookup_table_op
program.global_block().delete_ops([op])
program.sync_with_cpp()
# break for loop
break
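This rewrite replaces each distributed lookup_table with a split_ids -> prefetch -> concat pipeline. Its data flow can be emulated in plain Python (a conceptual sketch with assumed names; rows come back in shard order, matching the concat over per-shard outputs):
import numpy as np

def prefetch_lookup(ids, shard_tables, shard_num):
    # Emulate split_ids -> prefetch -> concat: shard the ids by modulo,
    # look each shard up on "its" parameter server's table slice, then
    # concatenate the per-shard results along axis 0.
    out_per_shard = []
    for s in range(shard_num):
        shard_ids = [i for i in ids if i % shard_num == s]  # split_ids
        rows = [shard_tables[s][i] for i in shard_ids]      # remote lookup
        out_per_shard.append(np.stack(rows))
    return np.concatenate(out_per_shard, axis=0)            # concat

tables = [{0: np.array([0., 0.]), 2: np.array([2., 2.])},  # pserver 0
          {1: np.array([1., 1.]), 3: np.array([3., 3.])}]  # pserver 1
print(prefetch_lookup([0, 1, 2, 3], tables, shard_num=2))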
def _split_table_grad_and_add_send_vars(self, program, rpc_client_var,
pserver_endpoints):
# 2. add split_ids_op and send_vars_op to send gradient to pservers
# there should only be one table_name
all_ops = program.global_block().ops
table_grad_name = framework.grad_var_name(self.table_name)
for op in all_ops:
if table_grad_name in op.output_arg_names:
op_index = list(all_ops).index(op)
# insert split_ids_op
program.global_block().insert_op(
index=op_index + 1,
type="split_ids",
inputs={
'Ids': [program.global_block().vars[table_grad_name]]
},
outputs={"Out": self.table_grad_list})
program.global_block().insert_op(
index=op_index + 2,
type="send_vars",
inputs={'X': self.table_grad_list},
outputs={"RPCClient": rpc_client_var},
attrs={"sync_send": True,
"epmap": pserver_endpoints})
break
def _create_prefetch_block(self, pserver_index, pserver_program,
optimize_block):
# STEP: create prefetch block
table_var = pserver_program.global_block().vars[self.table_name]
prefetch_block = pserver_program.create_block(optimize_block.idx)
trainer_ids = self.prefetch_input_vars[pserver_index]
pserver_ids = pserver_program.global_block().create_var(
name=trainer_ids.name,
type=trainer_ids.type,
shape=trainer_ids.shape,
dtype=trainer_ids.dtype)
trainer_out = self.prefetch_output_vars[pserver_index]
pserver_out = pserver_program.global_block().create_var(
name=trainer_out.name,
type=trainer_out.type,
shape=trainer_out.shape,
dtype=trainer_out.dtype)
prefetch_block.append_op(
type=LOOKUP_TABLE_TYPE,
inputs={'Ids': pserver_ids,
"W": table_var},
outputs={"Out": pserver_out},
attrs={
"is_sparse": True, # has no effect on lookup_table op
"is_distributed": True,
"padding_idx": -1
})
return prefetch_block
def _create_table_optimize_block(self, pserver_index, pserver_program,
append_block):
def _clone_var(block, var, persistable=True):
assert isinstance(var, Variable)
return block.create_var(
name=var.name,
shape=var.shape,
dtype=var.dtype,
type=var.type,
persistable=persistable)
# STEP: create table optimize block
# create table param and grad var in pserver program
param_var = _clone_var(
pserver_program.global_block(),
self.origin_program.global_block().vars[self.table_name])
grad_var = _clone_var(
pserver_program.global_block(),
self.origin_program.global_block().vars[framework.grad_var_name(
self.table_name)],
persistable=False)
# create grad vars in pserver program
table_grad_var = self.table_param_grad[1]
table_grad_list = [
pserver_program.global_block().create_var(
name="%s.trainer_%d.pserver_%d" %
(table_grad_var.name, index, pserver_index),
type=table_grad_var.type,
shape=table_grad_var.shape,
dtype=table_grad_var.dtype) for index in range(self.trainer_num)
]
# create table optimize block in pserver program
table_opt_op = [
op for op in self.optimize_ops
if op.input("Param")[0] == self.table_name
][0]
table_opt_block = pserver_program.create_block(append_block.idx)
# only support sgd now
assert table_opt_op.type == "sgd"
# append sum op for table_grad_list
table_opt_block.append_op(
type="sum",
inputs={"X": table_grad_list},
outputs={"Out": [grad_var]})
lr_var = pserver_program.global_block().vars[table_opt_op.input(
"LearningRate")[0]]
inputs = {
"Param": [param_var],
"Grad": [grad_var],
"LearningRate": [lr_var]
}
outputs = {"ParamOut": [param_var]}
table_opt_block.append_op(
type=table_opt_op.type,
inputs=inputs,
outputs=outputs,
attrs=table_opt_op.attrs)
# ====================== private transpiler functions =====================
def _create_vars_from_blocklist(self,
program,
......@@ -512,7 +778,17 @@ class DistributeTranspiler:
program.global_block().sync_with_cpp()
return var_mapping
def _clone_var(self, block, var):
def create_splited_vars(self, source_var, block, tag):
return [
block.create_var(
name=str(source_var.name + tag + str(index)),
type=source_var.type,
shape=source_var.shape,
dtype=source_var.dtype)
for index in range(len(self.pserver_endpoints))
]
def _clone_var(self, block, var, persistable=True):
assert isinstance(var, Variable)
return block.create_var(
name=var.name,
......@@ -520,12 +796,12 @@ class DistributeTranspiler:
dtype=var.dtype,
type=var.type,
lod_level=var.lod_level,
persistable=True)
persistable=persistable)
def _append_split_op(self, program, gradblocks):
# Split variables that need to be split and append respective ops
add_suffix = False
if self.trainers > 1:
if self.trainer_num > 1:
add_suffix = True
var_mapping = self._create_vars_from_blocklist(
program, gradblocks, add_trainer_suffix=add_suffix)
......@@ -616,9 +892,9 @@ class DistributeTranspiler:
return
merged_var = \
pserver_block.vars[self._orig_varname(grad_block.name)]
if self.trainers > 1:
if self.trainer_num > 1:
vars2merge = []
for i in xrange(self.trainers):
for i in xrange(self.trainer_num):
per_trainer_name = "%s.trainer_%d" % \
(self._orig_varname(grad_block.name), i)
vars2merge.append(pserver_block.vars[per_trainer_name])
......@@ -633,7 +909,7 @@ class DistributeTranspiler:
type="scale",
inputs={"X": merged_var},
outputs={"Out": merged_var},
attrs={"scale": 1.0 / float(self.trainers)})
attrs={"scale": 1.0 / float(self.trainer_num)})
new_inputs[key] = merged_var
elif key == "Param":
# param is already created on global program
......@@ -669,7 +945,7 @@ class DistributeTranspiler:
new_shape = None
if key in ["Param", "Grad", "LearningRate"]:
continue
var = self.program.global_block().vars[opt_op.input(key)[0]]
var = self.origin_program.global_block().vars[opt_op.input(key)[0]]
# update accumulator variable shape
param_shape = new_inputs["Param"].shape
new_shape = self._get_optimizer_input_shape(opt_op.type, key,
......@@ -682,8 +958,8 @@ class DistributeTranspiler:
new_inputs[key] = tmpvar
# change output's ParamOut variable
outputs = self._get_output_map_from_op(self.program.global_block().vars,
opt_op)
outputs = self._get_output_map_from_op(
self.origin_program.global_block().vars, opt_op)
outputs["ParamOut"] = new_inputs["Param"]
optimize_block.append_op(
......@@ -695,8 +971,8 @@ class DistributeTranspiler:
def _append_pserver_non_opt_ops(self, optimize_block, opt_op):
program = optimize_block.program
# Append the ops for parameters that do not need to be optimized/updated
inputs = self._get_input_map_from_op(self.program.global_block().vars,
opt_op)
inputs = self._get_input_map_from_op(
self.origin_program.global_block().vars, opt_op)
for varlist in inputs.itervalues():
if not isinstance(varlist, list):
varlist = [varlist]
......@@ -709,8 +985,8 @@ class DistributeTranspiler:
dtype=var.dtype,
shape=var.shape)
outputs = self._get_output_map_from_op(self.program.global_block().vars,
opt_op)
outputs = self._get_output_map_from_op(
self.origin_program.global_block().vars, opt_op)
for varlist in outputs.itervalues():
if not isinstance(varlist, list):
......@@ -783,7 +1059,6 @@ class DistributeTranspiler:
if same_or_split_var(n, param) and n != param:
return True
return False
return False
def _get_input_map_from_op(self, varmap, op):
"""Returns a dict from op input name to the vars in varmap."""
......@@ -821,7 +1096,7 @@ class DistributeTranspiler:
find_ops = []
# find ops which output is lr var
block = self.program.global_block()
block = self.origin_program.global_block()
for op in block.ops:
if set(op.output_arg_names) & lr_vars:
find_ops.append(op)
......
......@@ -1183,6 +1183,8 @@ class Parameter(Variable):
self.gradient_clip_attr = kwargs.get('gradient_clip_attr', None)
self.do_model_average = kwargs.get('do_model_average', None)
def __str__(self):
return self.to_string(True)
......@@ -1203,7 +1205,7 @@ class Parameter(Variable):
if with_details:
res_str = Variable.to_string(self, throw_on_error, True)
additional_attr = ("trainable", "optimize_attr", "regularizer",
"gradient_clip_attr")
"gradient_clip_attr", "do_model_average")
for attr_name in additional_attr:
res_str += "%s: %s\n" % (attr_name,
str(getattr(self, attr_name)))
......
......@@ -218,6 +218,7 @@ def fc(input,
def embedding(input,
size,
is_sparse=False,
is_distributed=False,
padding_idx=None,
param_attr=None,
dtype='float32'):
......@@ -268,8 +269,11 @@ def embedding(input,
inputs={'Ids': input,
'W': w},
outputs={'Out': tmp},
attrs={'is_sparse': is_sparse,
'padding_idx': padding_idx})
attrs={
'is_sparse': is_sparse,
'is_distributed': is_distributed,
'padding_idx': padding_idx
})
return tmp
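With the new is_distributed flag, a table can be marked for the transpiler to shard across parameter servers. A hedged usage sketch (the input variable and sizes are assumptions):
import paddle.fluid as fluid

words = fluid.layers.data(name='words', shape=[1], dtype='int64')
emb = fluid.layers.embedding(
    input=words,
    size=[10000, 64],         # assumed [vocab_size, embedding_dim]
    is_sparse=True,
    is_distributed=True,      # let the transpiler shard this table
    param_attr=fluid.ParamAttr(name='emb'))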
......@@ -1516,7 +1520,8 @@ def batch_norm(input,
in_place=False,
name=None,
moving_mean_name=None,
moving_variance_name=None):
moving_variance_name=None,
do_model_average_for_mean_and_var=False):
"""
This function helps create an operator to implement
the BatchNorm layer using the configurations from the input parameters.
......@@ -1547,7 +1552,10 @@ def batch_norm(input,
mean = helper.create_parameter(
attr=ParamAttr(
name=moving_mean_name, initializer=Constant(0.0), trainable=False),
name=moving_mean_name,
initializer=Constant(0.0),
trainable=False,
do_model_average=do_model_average_for_mean_and_var),
shape=param_shape,
dtype=input.dtype)
mean.stop_gradient = True
......@@ -1556,7 +1564,8 @@ def batch_norm(input,
attr=ParamAttr(
name=moving_variance_name,
initializer=Constant(1.0),
trainable=False),
trainable=False,
do_model_average=do_model_average_for_mean_and_var),
shape=param_shape,
dtype=input.dtype)
variance.stop_gradient = True
......
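The new flag threads do_model_average down to the moving mean and variance parameters. A hedged usage sketch (the upstream layers are assumptions):
import paddle.fluid as fluid

image = fluid.layers.data(name='image', shape=[784], dtype='float32')
hidden = fluid.layers.fc(input=image, size=128, act='relu')
hidden = fluid.layers.batch_norm(
    input=hidden,
    do_model_average_for_mean_and_var=True)  # include moving stats in averaging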
......@@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
from collections import defaultdict
from paddle.fluid.framework import Program
import framework
......@@ -818,8 +818,8 @@ class ModelAverage(Optimizer):
min_average_window, max_average_window and current update times.
Args:
params_grads: A list of parameter-grad variable pairs.
average_window_rate: The rate of average window.
params_grads: A list of parameter-grad variable pairs.
min_average_window: The minimum size of average window.
max_average_window: The maximum size of average window.
......@@ -840,8 +840,8 @@ class ModelAverage(Optimizer):
"""
def __init__(self,
params_grads,
average_window_rate,
params_grads=None,
min_average_window=10000,
max_average_window=10000,
**kwargs):
......@@ -849,23 +849,36 @@ class ModelAverage(Optimizer):
self.average_window = average_window_rate
self.min_average_window = min_average_window
self.max_average_window = max_average_window
self.params_grads = params_grads
self.params_grads = [] if params_grads is None else params_grads
params = {}
for param, grad in self.params_grads:
if param.do_model_average != False:
params[param.name] = (param, grad)
for param in framework.default_main_program().global_block(
).all_parameters():
if param.name not in params and param.do_model_average != False:
grad = param.block.create_var(
name=unique_name.generate(".".join([param.name, 'tmp'])),
dtype=param.dtype,
persistable=False,
stop_gradient=True)
params[param.name] = (param, grad)
self.params_grads = params.values()
for param, grad in self.params_grads:
if grad is not None:
self._append_average_accumulate_op(param)
self.apply_program = Program()
block = self.apply_program.global_block()
with program_guard(main_program=self.apply_program):
for param_grad in self.params_grads:
if param_grad[1] is not None:
self._add_average_apply_op(block, param_grad)
self.restore_program = Program()
block = self.restore_program.global_block()
with program_guard(main_program=self.restore_program):
for param_grad in self.params_grads:
if param_grad[1] is not None:
self._add_average_restore_op(block, param_grad)
def _add_average_apply_op(self, block, param_grad):
......
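With params_grads now optional and average_window_rate leading the signature, the optimizer can be built from the default program's parameters alone. A hedged sketch (window values are illustrative; `exe` and the evaluation routine are assumed context):
# Construct after the main optimizer has run minimize().
model_average = fluid.optimizer.ModelAverage(
    0.15,                        # average_window_rate
    min_average_window=10000,
    max_average_window=20000)
# ... train for a while, then evaluate with averaged weights via the
# apply/restore programs built above:
with model_average.apply(exe):   # temporarily swap in averaged weights
    run_evaluation()             # assumed evaluation routine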
......@@ -28,13 +28,15 @@ class ParamAttr(object):
learning_rate=1.0,
regularizer=None,
trainable=True,
gradient_clip=None):
gradient_clip=None,
do_model_average=None):
self.name = name
self.initializer = initializer
self.learning_rate = learning_rate
self.regularizer = regularizer
self.trainable = trainable
self.gradient_clip = gradient_clip
self.model_average = do_model_average
def set_default_initializer(self, initializer):
if initializer is None:
......@@ -80,7 +82,8 @@ class ParamAttr(object):
},
'regularizer': self.regularizer,
'trainable': self.trainable,
'gradient_clip_attr': self.gradient_clip
'gradient_clip_attr': self.gradient_clip,
'do_model_average': self.model_average
}
if with_initializer:
kwargs['initializer'] = self.initializer
......
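The new do_model_average field lets individual parameters opt out of (or into) averaging. A hedged sketch (`hidden` is assumed upstream context):
import paddle.fluid as fluid

# Exclude this weight from ModelAverage accumulation.
w_attr = fluid.ParamAttr(name='fc_w', do_model_average=False)
out = fluid.layers.fc(input=hidden, size=10, param_attr=w_attr)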
......@@ -37,7 +37,7 @@ depth = 8
mix_hidden_lr = 1e-3
IS_SPARSE = True
PASS_NUM = 10
PASS_NUM = 100
BATCH_SIZE = 10
embedding_name = 'emb'
......@@ -77,7 +77,8 @@ def db_lstm(word, predicate, ctx_n2, ctx_n1, ctx_0, ctx_p1, ctx_p2, mark,
emb_layers.append(mark_embedding)
hidden_0_layers = [
fluid.layers.fc(input=emb, size=hidden_dim) for emb in emb_layers
fluid.layers.fc(input=emb, size=hidden_dim, act='tanh')
for emb in emb_layers
]
hidden_0 = fluid.layers.sums(input=hidden_0_layers)
......@@ -94,8 +95,8 @@ def db_lstm(word, predicate, ctx_n2, ctx_n1, ctx_0, ctx_p1, ctx_p2, mark,
for i in range(1, depth):
mix_hidden = fluid.layers.sums(input=[
fluid.layers.fc(input=input_tmp[0], size=hidden_dim),
fluid.layers.fc(input=input_tmp[1], size=hidden_dim)
fluid.layers.fc(input=input_tmp[0], size=hidden_dim, act='tanh'),
fluid.layers.fc(input=input_tmp[1], size=hidden_dim, act='tanh')
])
lstm = fluid.layers.dynamic_lstm(
......@@ -109,8 +110,8 @@ def db_lstm(word, predicate, ctx_n2, ctx_n1, ctx_0, ctx_p1, ctx_p2, mark,
input_tmp = [mix_hidden, lstm]
feature_out = fluid.layers.sums(input=[
fluid.layers.fc(input=input_tmp[0], size=label_dict_len),
fluid.layers.fc(input=input_tmp[1], size=label_dict_len)
fluid.layers.fc(input=input_tmp[0], size=label_dict_len, act='tanh'),
fluid.layers.fc(input=input_tmp[1], size=label_dict_len, act='tanh')
])
return feature_out
......@@ -171,7 +172,7 @@ def train(use_cuda, save_dirname=None, is_local=True):
# check other optimizers and check why out will be NAN
sgd_optimizer = fluid.optimizer.SGD(
learning_rate=fluid.layers.exponential_decay(
learning_rate=0.0001,
learning_rate=0.01,
decay_steps=100000,
decay_rate=0.5,
staircase=True))
......@@ -233,7 +234,7 @@ def train(use_cuda, save_dirname=None, is_local=True):
print("second per batch: " + str((time.time(
) - start_time) / batch_id))
# Set the threshold low to speed up the CI test
if float(pass_precision) > 0.05:
if float(pass_precision) > 0.01:
if save_dirname is not None:
# TODO(liuyiqun): Change the target to crf_decode
fluid.io.save_inference_model(save_dirname, [
......
......@@ -157,7 +157,6 @@ def train(nn_type,
for ip in pserver_ips.split(","):
eplist.append(':'.join([ip, port]))
pserver_endpoints = ",".join(eplist) # ip:port,ip:port...
pserver_endpoints = os.getenv("PSERVERS")
trainers = int(os.getenv("TRAINERS"))
current_endpoint = os.getenv("POD_IP") + ":" + port
trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
......
file(GLOB TEST_OPS RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "test_*.py")
string(REPLACE ".py" "" TEST_OPS "${TEST_OPS}")
# The fully connected test is removed when the WITH_MKLDNN flag is OFF
# because the fully connected layer has only one kernel (MKLDNN)
if(NOT WITH_MKLDNN)
list(REMOVE_ITEM TEST_OPS test_fc_op)
endif(NOT WITH_MKLDNN)
if(NOT WITH_DISTRIBUTE)
list(REMOVE_ITEM TEST_OPS test_recv_op)
endif(NOT WITH_DISTRIBUTE)
......