提交 fe650648 编写于 作者: Q qiaolongfei

Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into refine-prefetch

......@@ -55,12 +55,13 @@ option(WITH_FLUID_ONLY "Compile PaddlePaddle fluid only" OFF)
option(WITH_GOLANG "Compile PaddlePaddle with GOLANG" OFF)
option(GLIDE_INSTALL "Download and install go dependencies " ON)
option(USE_NNPACK "Compile PaddlePaddle with NNPACK library" OFF)
option(WITH_DISTRIBUTE "Compile with grpc distributed support" OFF)
option(WITH_DISTRIBUTE "Compile with distributed support" OFF)
option(USE_EIGEN_FOR_BLAS "Use matrix multiplication in Eigen" OFF)
option(EIGEN_USE_THREADS "Compile with multi-threaded Eigen" OFF)
option(WITH_ARM_FP16 "Use half precision support on armv8.2-a cpu" OFF)
option(WITH_FAST_BUNDLE_TEST "Bundle tests that can be run in a single process together to reduce launch overhead" OFF)
option(WITH_CONTRIB "Compile the third-party contributation" OFF)
option(WITH_GRPC "Use grpc as the default rpc framework" ${WITH_DISTRIBUTE})
# CMAKE_BUILD_TYPE
if(NOT CMAKE_BUILD_TYPE)
......@@ -147,7 +148,16 @@ include(external/any) # download libn::any
include(external/eigen) # download eigen3
include(external/pybind11) # download pybind11
include(external/cares)
include(external/grpc)
if(WITH_DISTRIBUTE)
if(WITH_GRPC)
include(external/grpc)
else()
include(external/leveldb)
include(external/brpc)
endif()
endif()
include(external/snappy) # download snappy
include(external/snappystream)
include(external/threadpool)
......
......@@ -24,10 +24,12 @@ Currently supported `--model` argument include:
* Run the following command to start a benchmark job locally:
```bash
python fluid_benchmark.py --model mnist --device GPU
python fluid_benchmark.py --model mnist --device GPU
```
You can choose to use GPU/CPU training. With GPU training, you can specify
`--gpus <gpu_num>` to run multi GPU training.
You can set async mode parameter server. With async mode, you can specify
`--async_mode` to train model asynchronous.
* Run distributed training with parameter servers:
* see [run_fluid_benchmark.sh](https://github.com/PaddlePaddle/Paddle/blob/develop/benchmark/fluid/run_fluid_benchmark.sh) as an example.
* start parameter servers:
......
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
__all__ = ['parse_args', ]
BENCHMARK_MODELS = [
"machine_translation", "resnet", "vgg", "mnist", "stacked_dynamic_lstm"
]
def parse_args():
parser = argparse.ArgumentParser('Fluid model benchmarks.')
parser.add_argument(
'--model',
type=str,
choices=BENCHMARK_MODELS,
default='resnet',
help='The model to run benchmark with.')
parser.add_argument(
'--batch_size', type=int, default=32, help='The minibatch size.')
# args related to learning rate
parser.add_argument(
'--learning_rate', type=float, default=0.001, help='The learning rate.')
# TODO(wuyi): add "--use_fake_data" option back.
parser.add_argument(
'--skip_batch_num',
type=int,
default=5,
help='The first num of minibatch num to skip, for better performance test'
)
parser.add_argument(
'--iterations', type=int, default=80, help='The number of minibatches.')
parser.add_argument(
'--pass_num', type=int, default=100, help='The number of passes.')
parser.add_argument(
'--data_format',
type=str,
default='NCHW',
choices=['NCHW', 'NHWC'],
help='The data data_format, now only support NCHW.')
parser.add_argument(
'--device',
type=str,
default='GPU',
choices=['CPU', 'GPU'],
help='The device type.')
parser.add_argument(
'--gpus',
type=int,
default=1,
help='If gpus > 1, will use ParallelExecutor to run, else use Executor.')
# this option is available only for vgg and resnet.
parser.add_argument(
'--cpus',
type=int,
default=1,
help='If cpus > 1, will use ParallelDo to run, else use Executor.')
parser.add_argument(
'--data_set',
type=str,
default='flowers',
choices=['cifar10', 'flowers'],
help='Optional dataset for benchmark.')
parser.add_argument(
'--infer_only', action='store_true', help='If set, run forward only.')
parser.add_argument(
'--use_cprof', action='store_true', help='If set, use cProfile.')
parser.add_argument(
'--use_nvprof',
action='store_true',
help='If set, use nvprof for CUDA.')
parser.add_argument(
'--no_test',
action='store_true',
help='If set, do not test the testset during training.')
parser.add_argument(
'--memory_optimize',
action='store_true',
help='If set, optimize runtime memory before start.')
parser.add_argument(
'--use_fake_data',
action='store_true',
help='If set ommit the actual read data operators.')
parser.add_argument(
'--profile', action='store_true', help='If set, profile a few steps.')
parser.add_argument(
'--update_method',
type=str,
default='local',
choices=['local', 'pserver', 'nccl2'],
help='Choose parameter update method, can be local, pserver, nccl2.')
parser.add_argument(
'--no_split_var',
action='store_true',
default=False,
help='Whether split variables into blocks when update_method is pserver')
parser.add_argument(
'--async_mode',
action='store_true',
default=False,
help='Whether start pserver in async mode to support ASGD')
parser.add_argument(
'--use_reader_op',
action='store_true',
help='Whether to use reader op, and must specify the data path if set this to true.'
)
parser.add_argument(
'--data_path',
type=str,
default="",
help='Directory that contains all the training recordio files.')
args = parser.parse_args()
return args
......@@ -24,108 +24,7 @@ import paddle.fluid.core as core
import paddle.fluid.profiler as profiler
import paddle.fluid.transpiler.distribute_transpiler as distribute_transpiler
BENCHMARK_MODELS = [
"machine_translation", "resnet", "vgg", "mnist", "stacked_dynamic_lstm"
]
def parse_args():
parser = argparse.ArgumentParser('Fluid model benchmarks.')
parser.add_argument(
'--model',
type=str,
choices=BENCHMARK_MODELS,
default='resnet',
help='The model to run benchmark with.')
parser.add_argument(
'--batch_size',
type=int,
default=32,
help='The batch size on each gpu.')
parser.add_argument(
'--learning_rate', type=float, default=0.001, help='The learning rate.')
parser.add_argument(
'--skip_batch_num',
type=int,
default=5,
help='The first num of minibatch num to skip, for better performance test'
)
parser.add_argument(
'--iterations',
type=int,
default=80,
help='The number of minibatches, set to -1 to run all batches.')
parser.add_argument(
'--pass_num', type=int, default=100, help='The number of passes.')
parser.add_argument(
'--data_format',
type=str,
default='NCHW',
choices=['NCHW', 'NHWC'],
help='The data data_format, now only support NCHW.')
parser.add_argument(
'--device',
type=str,
default='GPU',
choices=['CPU', 'GPU'],
help='The device type.')
parser.add_argument(
'--gpus',
type=int,
default=1,
help='If gpus > 1, will use ParallelExecutor to run, else use Executor.')
# this option is available only for vgg and resnet.
parser.add_argument(
'--cpus',
type=int,
default=1,
help='If cpus > 1, will use ParallelDo to run, else use Executor.')
parser.add_argument(
'--data_set',
type=str,
default='flowers',
choices=['cifar10', 'flowers', 'imagenet'],
help='Optional dataset for benchmark.')
parser.add_argument(
'--infer_only', action='store_true', help='If set, run forward only.')
parser.add_argument(
'--use_cprof', action='store_true', help='If set, use cProfile.')
parser.add_argument(
'--use_nvprof',
action='store_true',
help='If set, use nvprof for CUDA.')
parser.add_argument(
'--no_test',
action='store_true',
help='If set, do not test the testset during training.')
parser.add_argument(
'--memory_optimize',
action='store_true',
help='If set, optimize runtime memory before start.')
parser.add_argument(
'--use_fake_data',
action='store_true',
help='If set ommit the actual read data operators.')
parser.add_argument(
'--profile', action='store_true', help='If set, profile a few steps.')
parser.add_argument(
'--update_method',
type=str,
default='local',
choices=['local', 'pserver', 'nccl2'],
help='Choose parameter update method, can be local, pserver, nccl2.')
parser.add_argument(
'--use_reader_op',
action='store_true',
help='Whether to use reader op, and must specify the data path if set this to true.'
)
parser.add_argument(
'--data_path',
type=str,
default="",
help='Directory that contains all the training recordio files.')
args = parser.parse_args()
return args
from args import *
def append_nccl2_prepare(trainer_id):
......@@ -160,7 +59,7 @@ def append_nccl2_prepare(trainer_id):
"nccl-based dist train.")
def dist_transpile(trainer_id):
def dist_transpile(trainer_id, args):
if trainer_id < 0:
return None, None
......@@ -182,7 +81,12 @@ def dist_transpile(trainer_id):
training_role = os.getenv("PADDLE_TRAINING_ROLE")
t = distribute_transpiler.DistributeTranspiler()
t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
t.transpile(
trainer_id,
pservers=pserver_endpoints,
trainers=trainers,
sync_mode=not args.async_mode,
slice_var_up=not args.no_split_var)
if training_role == "PSERVER":
pserver_program = t.get_pserver_program(current_endpoint)
pserver_startup_program = t.get_startup_program(current_endpoint,
......@@ -417,7 +321,7 @@ def main():
fluid.memory_optimize(fluid.default_main_program())
if args.update_method == "pserver":
train_prog, startup_prog = dist_transpile(trainer_id)
train_prog, startup_prog = dist_transpile(trainer_id, args)
if not train_prog:
raise Exception(
"Must configure correct environments to run dist train.")
......
......@@ -104,8 +104,9 @@ def get_model(args):
loss = fluid.layers.mean(x=loss)
# add acc
batch_size_tensor = fluid.layers.create_tensor(dtype='int64')
batch_acc = fluid.layers.accuracy(input=logit, label=fluid.layers.data(name='label', \
shape=[1], dtype='int64'))
shape=[1], dtype='int64'), total=batch_size_tensor)
inference_program = fluid.default_main_program().clone()
with fluid.program_guard(inference_program):
......
......@@ -82,7 +82,8 @@ def get_model(args):
data_file, batch_size=args.batch_size))
images, label = fluid.layers.read_file(data_file)
else:
images = fluid.layers.data(name='data', shape=dshape, dtype='float32')
images = fluid.layers.data(
name='data', shape=data_shape, dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
# Train program
......
......@@ -166,3 +166,7 @@ if(WITH_GOLANG)
endif()
endif(WITH_GOLANG)
if(WITH_GRPC)
add_definitions(-DPADDLE_WITH_GRPC)
endif(WITH_GRPC)
# Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
INCLUDE(ExternalProject)
SET(BRPC_SOURCES_DIR ${THIRD_PARTY_PATH}/brpc)
SET(BRPC_INSTALL_DIR ${THIRD_PARTY_PATH}/install/brpc)
SET(BRPC_INCLUDE_DIR "${BRPC_INSTALL_DIR}/include" CACHE PATH "brpc include directory." FORCE)
SET(BRPC_LIBRARIES "${BRPC_INSTALL_DIR}/lib/libbrpc.a" CACHE FILEPATH "brpc library." FORCE)
INCLUDE_DIRECTORIES(${BRPC_INCLUDE_DIR})
# Reference https://stackoverflow.com/questions/45414507/pass-a-list-of-prefix-paths-to-externalproject-add-in-cmake-args
set(prefix_path "${THIRD_PARTY_PATH}/install/gflags|${THIRD_PARTY_PATH}/install/leveldb|${THIRD_PARTY_PATH}/install/snappy|${THIRD_PARTY_PATH}/install/gtest|${THIRD_PARTY_PATH}/install/protobuf")
# If minimal .a is need, you can set WITH_DEBUG_SYMBOLS=OFF
ExternalProject_Add(
extern_brpc
${EXTERNAL_PROJECT_LOG_ARGS}
GIT_REPOSITORY "https://github.com/brpc/brpc"
GIT_TAG "6d153dd7ff00f960ae6895c9c5fff0ce9f07aff2"
PREFIX ${BRPC_SOURCES_DIR}
UPDATE_COMMAND ""
CMAKE_ARGS -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER}
-DCMAKE_C_COMPILER=${CMAKE_C_COMPILER}
-DCMAKE_CXX_FLAGS=${CMAKE_CXX_FLAGS}
-DCMAKE_C_FLAGS=${CMAKE_C_FLAGS}
-DCMAKE_INSTALL_PREFIX=${BRPC_INSTALL_DIR}
-DCMAKE_INSTALL_LIBDIR=${BRPC_INSTALL_DIR}/lib
-DCMAKE_POSITION_INDEPENDENT_CODE=ON
-DCMAKE_BUILD_TYPE=${THIRD_PARTY_BUILD_TYPE}
-DCMAKE_PREFIX_PATH=${prefix_path}
-DBRPC_WITH_GLOG=ON
${EXTERNAL_OPTIONAL_ARGS}
LIST_SEPARATOR |
CMAKE_CACHE_ARGS -DCMAKE_INSTALL_PREFIX:PATH=${BRPC_INSTALL_DIR}
-DCMAKE_INSTALL_LIBDIR:PATH=${BRPC_INSTALL_DIR}/lib
-DCMAKE_POSITION_INDEPENDENT_CODE:BOOL=ON
-DCMAKE_BUILD_TYPE:STRING=${THIRD_PARTY_BUILD_TYPE}
)
ADD_DEPENDENCIES(extern_brpc protobuf leveldb gflags glog gtest snappy)
ADD_LIBRARY(brpc STATIC IMPORTED GLOBAL)
SET_PROPERTY(TARGET brpc PROPERTY IMPORTED_LOCATION ${BRPC_LIBRARIES})
ADD_DEPENDENCIES(brpc extern_brpc)
LIST(APPEND external_project_dependencies brpc)
# Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
INCLUDE(ExternalProject)
SET(LEVELDB_SOURCES_DIR ${THIRD_PARTY_PATH}/leveldb)
SET(LEVELDB_INSTALL_DIR ${THIRD_PARTY_PATH}/install/leveldb)
SET(LEVELDB_INCLUDE_DIR "${LEVELDB_INSTALL_DIR}/include" CACHE PATH "leveldb include directory." FORCE)
SET(LEVELDB_LIBRARIES "${LEVELDB_INSTALL_DIR}/lib/libleveldb.a" CACHE FILEPATH "leveldb library." FORCE)
INCLUDE_DIRECTORIES(${LEVELDB_INCLUDE_DIR})
ExternalProject_Add(
extern_leveldb
${EXTERNAL_PROJECT_LOG_ARGS}
PREFIX ${LEVELDB_SOURCES_DIR}
URL "https://github.com/google/leveldb/archive/v1.18.tar.gz"
URL_MD5 "73770de34a2a5ab34498d2e05b2b7fa0"
CONFIGURE_COMMAND ""
BUILD_COMMAND CXXFLAGS=-fPIC make -j ${NUM_OF_PROCESSOR} libleveldb.a
INSTALL_COMMAND mkdir -p ${LEVELDB_INSTALL_DIR}/lib/
&& cp ${LEVELDB_SOURCES_DIR}/src/extern_leveldb/libleveldb.a ${LEVELDB_LIBRARIES}
&& cp -r ${LEVELDB_SOURCES_DIR}/src/extern_leveldb/include ${LEVELDB_INSTALL_DIR}/
BUILD_IN_SOURCE 1
)
ADD_DEPENDENCIES(extern_leveldb snappy)
ADD_LIBRARY(leveldb STATIC IMPORTED GLOBAL)
SET_PROPERTY(TARGET leveldb PROPERTY IMPORTED_LOCATION ${LEVELDB_LIBRARIES})
ADD_DEPENDENCIES(leveldb extern_leveldb)
LIST(APPEND external_project_dependencies leveldb)
......@@ -610,3 +610,21 @@ function(grpc_library TARGET_NAME)
COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor")
cc_library("${TARGET_NAME}" SRCS "${grpc_library_SRCS}" DEPS "${TARGET_NAME}_grpc" "${TARGET_NAME}_proto" "${grpc_library_DEPS}")
endfunction()
function(brpc_library TARGET_NAME)
set(oneValueArgs PROTO)
set(multiValueArgs SRCS DEPS)
set(options "")
cmake_parse_arguments(brpc_library "${options}" "${oneValueArgs}" "${multiValueArgs}" ${ARGN})
message(STATUS "generating brpc ${brpc_library_PROTO}")
get_filename_component(ABS_PROTO ${brpc_library_PROTO} ABSOLUTE)
get_filename_component(PROTO_WE ${brpc_library_PROTO} NAME_WE)
get_filename_component(PROTO_PATH ${ABS_PROTO} PATH)
protobuf_generate_cpp(brpc_proto_srcs brpc_proto_hdrs "${ABS_PROTO}")
cc_library("${TARGET_NAME}_proto" SRCS "${brpc_proto_srcs}")
cc_library("${TARGET_NAME}" SRCS "${brpc_library_SRCS}" DEPS "${TARGET_NAME}_proto" "${brpc_library_DEPS}")
endfunction()
......@@ -109,7 +109,6 @@ void MainWord2Vec(bool use_gpu) {
void MainImageClassification(bool use_gpu) {
int batch_size = 2;
bool use_mkldnn = false;
bool repeat = false;
NativeConfig config = GetConfig();
config.use_gpu = use_gpu;
......@@ -134,12 +133,8 @@ void MainImageClassification(bool use_gpu) {
std::vector<framework::LoDTensor*> cpu_fetchs1;
cpu_fetchs1.push_back(&output1);
TestInference<platform::CPUPlace, false, true>(config.model_dir,
cpu_feeds,
cpu_fetchs1,
repeat,
is_combined,
use_mkldnn);
TestInference<platform::CPUPlace, false, true>(
config.model_dir, cpu_feeds, cpu_fetchs1, repeat, is_combined);
auto predictor = CreatePaddlePredictor(config);
std::vector<PaddleTensor> paddle_tensor_feeds;
......
......@@ -87,7 +87,7 @@ cc_library(executor SRCS executor.cc DEPS op_registry device_context scope
framework_proto glog lod_rank_table feed_fetch_method)
cc_library(parallel_executor SRCS parallel_executor.cc DEPS graph_builder_factory threaded_ssa_graph_executor scope_buffered_ssa_graph_executor)
cc_library(parallel_executor SRCS parallel_executor.cc DEPS ssa_graph_builder_factory threaded_ssa_graph_executor scope_buffered_ssa_graph_executor)
cc_library(prune SRCS prune.cc DEPS framework_proto)
cc_test(prune_test SRCS prune_test.cc DEPS op_info prune recurrent_op device_context)
......
......@@ -8,6 +8,7 @@ cc_library(rpc_op_handle SRCS rpc_op_handle.cc DEPS framework_proto scope place
cc_library(ssa_graph SRCS ssa_graph.cc DEPS var_handle op_handle_base)
cc_library(ssa_graph_builder SRCS ssa_graph_builder.cc DEPS ssa_graph)
cc_library(ssa_graph_printer SRCS ssa_graph_printer.cc DEPS ssa_graph_builder)
cc_library(ssa_graph_checker SRCS ssa_graph_checker.cc DEPS ssa_graph_builder)
cc_library(variable_visitor SRCS variable_visitor.cc DEPS lod_tensor selected_rows)
......@@ -31,7 +32,7 @@ cc_library(multi_devices_graph_builder SRCS multi_devices_graph_builder.cc DEPS
scale_loss_grad_op_handle rpc_op_handle ${multi_devices_graph_builder_deps} reduce_op_handle broadcast_op_handle)
cc_library(graph_builder_factory SRCS graph_builder_factory.cc DEPS multi_devices_graph_builder ssa_graph_printer)
cc_library(ssa_graph_builder_factory SRCS ssa_graph_builder_factory.cc DEPS multi_devices_graph_builder ssa_graph_printer ssa_graph_checker)
cc_library(ssa_graph_executor SRCS ssa_graph_executor.cc DEPS ssa_graph framework_proto)
cc_library(threaded_ssa_graph_executor SRCS threaded_ssa_graph_executor.cc DEPS fetch_op_handle ssa_graph_executor scope
......
......@@ -89,7 +89,7 @@ std::vector<std::string> MultiDevSSAGraphBuilder::FindDistTrainSendVars(
for (auto *op : program.Block(0).AllOps()) {
// TODO(Yancey1989): use a graceful method to find send op,
// instead of the the hard code string
if (op->Type() == "send_vars") {
if (op->Type() == "send") {
auto op_vars = op->InputArgumentNames();
send_vars.reserve(send_vars.size() +
std::distance(op_vars.begin(), op_vars.end()));
......@@ -468,17 +468,17 @@ void MultiDevSSAGraphBuilder::CreateRPCOp(SSAGraph *result,
new RPCOpHandle(op, local_scopes_[0], op.Type(), places_[0]));
if (op.Type() == "send_barrier") {
ConnectOp(result, result->ops_.back().get(), "send_vars");
ConnectOp(result, result->ops_.back().get(), "send");
} else if (op.Type() == "recv") {
ConnectOp(result, result->ops_.back().get(), "send_barrier");
} else if (op.Type() == "fetch_barrier") {
ConnectOp(result, result->ops_.back().get(), "recv");
} else if (op.Type() == "send_vars") {
} else if (op.Type() == "send") {
// do nothing
} else {
PADDLE_THROW(
"rpc op should be in ["
"send_vars, send_barrier. recv, fetch_barrier]");
"send, send_barrier. recv, fetch_barrier]");
}
// TODO(Yancey1989): schedule rpc op on different place may
......
......@@ -11,8 +11,8 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/framework/details/ssa_graph_builder.h"
#include <utility>
namespace paddle {
namespace framework {
......
......@@ -12,9 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/framework/details/graph_builder_factory.h"
#include "paddle/fluid/framework/details/ssa_graph_builder_factory.h"
#include <fstream>
#include "paddle/fluid/framework/details/multi_devices_graph_builder.h"
#include "paddle/fluid/framework/details/ssa_graph_checker.h"
#include "paddle/fluid/framework/details/ssa_graph_printer.h"
namespace paddle {
......@@ -40,6 +41,8 @@ std::unique_ptr<SSAGraphBuilder> SSAGraphBuilderFactory::Create() {
res.reset(new SSAGraghBuilderWithPrinter(
std::move(fout), std::move(graphviz_printer), std::move(res)));
}
res.reset(new SSAGraghBuilderWithChecker(std::move(res)));
return res;
}
} // namespace details
......
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/framework/details/ssa_graph.h"
#include <string>
#include "paddle/fluid/framework/details/ssa_graph_checker.h"
namespace paddle {
namespace framework {
namespace details {
bool SSAGraghBuilderWithChecker::IsValidGraph(const SSAGraph *graph) const {
std::unordered_map<OpHandleBase *, size_t> pending_ops;
std::unordered_set<VarHandleBase *> pending_vars;
std::unordered_set<VarHandleBase *> ready_vars;
std::unordered_set<OpHandleBase *> ready_ops;
auto insert_pending_var = [&](VarHandleBase *var) {
pending_vars.insert(var);
if (var->generated_op_ == nullptr) {
ready_vars.emplace(var);
}
};
for (auto &var_map : graph->vars_) {
for (auto &name_pair : var_map) {
for (auto &version_pair : name_pair.second) {
insert_pending_var(version_pair.get());
}
}
}
for (auto &var : graph->dep_vars_) {
insert_pending_var(var.get());
}
for (auto &op : graph->ops_) {
if (op->Inputs().empty()) {
ready_ops.insert(op.get());
} else {
pending_ops.insert({op.get(), op.get()->NoDupInputSize()});
}
}
auto run_all_ops = [&](std::unordered_set<OpHandleBase *> &set) {
for (auto *op : set) {
for (auto out : op->Outputs()) {
ready_vars.emplace(out);
}
}
set.clear();
};
while (!pending_vars.empty()) {
run_all_ops(ready_ops);
if (ready_vars.empty()) {
return false;
}
for (auto ready_var : ready_vars) {
pending_vars.erase(ready_var);
for (auto *op : ready_var->pending_ops_) {
auto &deps = --pending_ops[op];
if (deps == 0) {
ready_ops.insert(op);
}
}
}
ready_vars.clear();
}
return true;
}
} // namespace details
} // namespace framework
} // namespace paddle
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "paddle/fluid/framework/details/ssa_graph_builder.h"
namespace paddle {
namespace framework {
namespace details {
class SSAGraph;
class SSAGraghBuilderWithChecker : public SSAGraphBuilder {
public:
explicit SSAGraghBuilderWithChecker(
std::unique_ptr<SSAGraphBuilder>&& builder)
: builder_(std::move(builder)) {}
std::unique_ptr<SSAGraph> Build(const ProgramDesc& program) const override {
auto graph = builder_->Build(program);
PADDLE_ENFORCE(IsValidGraph(graph.get()));
return graph;
}
bool IsValidGraph(const SSAGraph* graph) const;
private:
std::unique_ptr<SSAGraphBuilder> builder_;
};
} // namespace details
} // namespace framework
} // namespace paddle
......@@ -185,6 +185,7 @@ void ThreadedSSAGraphExecutor::InsertPendingVar(
ready_vars->Push(var);
}
}
void ThreadedSSAGraphExecutor::RunOp(
BlockingQueue<VarHandleBase *> *ready_var_q, details::OpHandleBase *op) {
auto op_run = [ready_var_q, op, this] {
......
......@@ -24,6 +24,7 @@ limitations under the License. */
#include "paddle/fluid/platform/profiler.h"
DECLARE_bool(benchmark);
DEFINE_bool(use_mkldnn, false, "Use MKLDNN to run");
namespace paddle {
namespace framework {
......@@ -115,6 +116,7 @@ void Executor::CreateVariables(const ProgramDesc& pdesc, Scope* scope,
void Executor::Run(const ProgramDesc& pdesc, Scope* scope, int block_id,
bool create_local_scope, bool create_vars) {
platform::RecordBlock b(block_id);
if (FLAGS_use_mkldnn) EnableMKLDNN(pdesc);
auto ctx = Prepare(pdesc, block_id);
RunPreparedContext(ctx.get(), scope, create_local_scope, create_vars);
}
......@@ -214,6 +216,7 @@ void Executor::Run(const ProgramDesc& program, Scope* scope,
const std::string& feed_holder_name,
const std::string& fetch_holder_name) {
platform::RecordBlock b(kProgramId);
if (FLAGS_use_mkldnn) EnableMKLDNN(program);
bool has_feed_ops =
has_feed_operators(program.Block(0), *feed_targets, feed_holder_name);
bool has_fetch_ops =
......@@ -225,7 +228,6 @@ void Executor::Run(const ProgramDesc& program, Scope* scope,
unique_ptr_of_copy_program.reset(new ProgramDesc(program));
copy_program = unique_ptr_of_copy_program.get();
}
auto* global_block = copy_program->MutableBlock(0);
if (!has_feed_ops) {
......@@ -378,5 +380,19 @@ void Executor::RunPreparedContext(
}
}
void Executor::EnableMKLDNN(const ProgramDesc& program) {
#ifdef PADDLE_WITH_MKLDNN
VLOG(3) << "use_mkldnn=True";
for (size_t bid = 0; bid < program.Size(); ++bid) {
auto* block = const_cast<ProgramDesc&>(program).MutableBlock(bid);
for (auto* op : block->AllOps()) {
if (op->HasAttr("use_mkldnn")) {
op->SetAttr("use_mkldnn", true);
}
}
}
#endif
}
} // namespace framework
} // namespace paddle
......@@ -81,6 +81,8 @@ class Executor {
const std::string& feed_holder_name = "feed",
const std::string& fetch_holder_name = "fetch");
void EnableMKLDNN(const ProgramDesc& program);
private:
const platform::Place place_;
};
......
......@@ -71,6 +71,7 @@ message OpProto {
optional bool duplicable = 3 [ default = false ];
optional bool intermediate = 4 [ default = false ];
optional bool dispensable = 5 [ default = false ];
optional string reuse = 6;
}
// AttrProto describes the C++ type Attribute.
......
......@@ -17,12 +17,11 @@ limitations under the License. */
namespace paddle {
namespace framework {
static OpInfoMap* g_op_info_map = nullptr;
// C++11 removes the need for manual locking. Concurrent execution shall wait if
// a static local variable is already being initialized.
// https://stackoverflow.com/questions/11711920/how-to-implement-multithread-safe-singleton-in-c11-without-using-mutex
OpInfoMap& OpInfoMap::Instance() {
if (g_op_info_map == nullptr) {
g_op_info_map = new OpInfoMap();
}
static OpInfoMap* g_op_info_map = new OpInfoMap();
return *g_op_info_map;
}
} // namespace framework
......
......@@ -21,6 +21,7 @@ namespace framework {
void OpProtoAndCheckerMaker::Validate() {
validated_ = true;
CheckNoDuplicatedInOutAttrs();
CheckReuseVars();
}
OpProtoAndCheckerMaker::VariableBuilder OpProtoAndCheckerMaker::AddInput(
......@@ -56,6 +57,24 @@ void OpProtoAndCheckerMaker::CheckNoDuplicatedInOutAttrs() {
}
}
void OpProtoAndCheckerMaker::CheckReuseVars() {
std::unordered_set<std::string> names;
for (auto& input : proto_->inputs()) {
names.insert(input.name());
}
auto checker = [&](const std::string& name, const std::string& reused) {
PADDLE_ENFORCE(
names.count(reused),
"Output [%s] reuse Input [%s], but the input is not registered.", name,
reused);
};
for (auto& output : proto_->outputs()) {
if (output.has_reuse()) {
checker(output.name(), output.reuse());
}
}
}
void OpProtoAndCheckerMaker::operator()(proto::OpProto* proto,
OpAttrChecker* attr_checker) {
proto_ = proto;
......
......@@ -14,6 +14,8 @@ limitations under the License. */
#pragma once
#include <string>
#include <unordered_set>
#include "glog/logging.h"
#include "paddle/fluid/framework/attribute.h"
#include "paddle/fluid/framework/framework.pb.h"
......@@ -64,6 +66,11 @@ class OpProtoAndCheckerMaker {
var_->set_dispensable(true);
return *this;
}
VariableBuilder &Reuse(const std::string &name) {
var_->set_reuse(name);
return *this;
}
};
VariableBuilder AddInput(const std::string &name, const std::string &comment);
......@@ -89,6 +96,8 @@ class OpProtoAndCheckerMaker {
void CheckNoDuplicatedInOutAttrs();
void Validate();
void CheckReuseVars();
proto::OpProto *proto_;
OpAttrChecker *op_checker_;
bool validated_{false};
......
......@@ -47,3 +47,23 @@ TEST(ProtoMaker, DuplicatedInOut) {
ASSERT_THROW(proto_maker(&op_proto, &op_checker),
paddle::platform::EnforceNotMet);
}
class TestInplaceProtoMaker : public paddle::framework::OpProtoAndCheckerMaker {
public:
void Make() {
AddInput("X", "input of test op");
AddOutput("XOut", "output of test op").Reuse("X");
AddOutput("NoOut", "output of test op").Reuse("NotExists");
}
};
TEST(ProtoMaker, InplaceOutput) {
paddle::framework::proto::OpProto op_proto;
paddle::framework::OpAttrChecker op_checker;
TestInplaceProtoMaker proto_maker;
ASSERT_THROW(proto_maker(&op_proto, &op_checker),
paddle::platform::EnforceNotMet);
// proto_maker(&op_proto, &op_checker);
// proto_maker.Make();
// ASSERT_THROW(proto_maker.Validate(), paddle::platform::EnforceNotMet);
}
......@@ -22,8 +22,8 @@ limitations under the License. */
#include "paddle/fluid/platform/nccl_helper.h"
#endif
#include "paddle/fluid/framework/details/graph_builder_factory.h"
#include "paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h"
#include "paddle/fluid/framework/details/ssa_graph_builder_factory.h"
#include "paddle/fluid/framework/details/threaded_ssa_graph_executor.h"
#include "paddle/fluid/platform/profiler.h"
......
......@@ -64,7 +64,8 @@ class TRTConvertValidation {
TRTConvertValidation(int batch_size,
const std::unordered_set<std::string>& parameters,
framework::Scope& scope, int workspace_size = 1 << 10)
framework::Scope& scope, // NOLINT
int workspace_size = 1 << 10)
: parameters_(parameters), scope_(scope) {
// create engine.
engine_.reset(new TensorRTEngine(10, 1 << 10, &stream_));
......
......@@ -21,7 +21,6 @@ DEFINE_string(fp16_dirname, "", "Directory of the float16 inference model.");
DEFINE_int32(batch_size, 1, "Batch size of input data");
DEFINE_int32(repeat, 1, "Running the inference program repeat times");
DEFINE_bool(skip_cpu, false, "Skip the cpu test");
DEFINE_bool(use_mkldnn, false, "Use MKLDNN to run inference");
TEST(inference, image_classification) {
if (FLAGS_dirname.empty() || FLAGS_batch_size < 1 || FLAGS_repeat < 1) {
......@@ -59,10 +58,8 @@ TEST(inference, image_classification) {
// Run inference on CPU
LOG(INFO) << "--- CPU Runs: ---";
LOG(INFO) << "Batch size is " << FLAGS_batch_size;
LOG(INFO) << "FLAGS_use_mkldnn: " << FLAGS_use_mkldnn;
TestInference<paddle::platform::CPUPlace, false, true>(
dirname, cpu_feeds, cpu_fetchs1, FLAGS_repeat, is_combined,
FLAGS_use_mkldnn);
dirname, cpu_feeds, cpu_fetchs1, FLAGS_repeat, is_combined);
LOG(INFO) << output1.dims();
}
......
......@@ -27,7 +27,6 @@ limitations under the License. */
DEFINE_string(model_path, "", "Directory of the inference model.");
DEFINE_string(data_file, "", "File of input index data.");
DEFINE_int32(repeat, 100, "Running the inference program repeat times");
DEFINE_bool(use_mkldnn, false, "Use MKLDNN to run inference");
DEFINE_bool(prepare_vars, true, "Prepare variables before executor");
DEFINE_int32(num_threads, 1, "Number of threads should be used");
......@@ -190,9 +189,6 @@ TEST(inference, nlp) {
std::unique_ptr<paddle::framework::ProgramDesc> inference_program;
inference_program = InitProgram(&executor, scope.get(), FLAGS_model_path,
/*model combined*/ false);
if (FLAGS_use_mkldnn) {
EnableMKLDNN(inference_program);
}
// always prepare context
std::unique_ptr<paddle::framework::ExecutorPrepareContext> ctx;
ctx = executor.Prepare(*inference_program, 0);
......
......@@ -22,6 +22,8 @@ limitations under the License. */
#include "paddle/fluid/inference/io.h"
#include "paddle/fluid/platform/profiler.h"
DECLARE_bool(use_mkldnn);
template <typename T>
void SetupTensor(paddle::framework::LoDTensor* input,
paddle::framework::DDim dims, T lower, T upper) {
......@@ -133,24 +135,11 @@ std::vector<std::vector<int64_t>> GetFeedTargetShapes(
return feed_target_shapes;
}
void EnableMKLDNN(
const std::unique_ptr<paddle::framework::ProgramDesc>& program) {
for (size_t bid = 0; bid < program->Size(); ++bid) {
auto* block = program->MutableBlock(bid);
for (auto* op : block->AllOps()) {
if (op->HasAttr("use_mkldnn")) {
op->SetAttr("use_mkldnn", true);
}
}
}
}
template <typename Place, bool CreateVars = true, bool PrepareContext = false>
void TestInference(const std::string& dirname,
const std::vector<paddle::framework::LoDTensor*>& cpu_feeds,
const std::vector<paddle::framework::LoDTensor*>& cpu_fetchs,
const int repeat = 1, const bool is_combined = false,
const bool use_mkldnn = false) {
const int repeat = 1, const bool is_combined = false) {
// 1. Define place, executor, scope
auto place = Place();
auto executor = paddle::framework::Executor(place);
......@@ -182,9 +171,6 @@ void TestInference(const std::string& dirname,
"init_program",
paddle::platform::DeviceContextPool::Instance().Get(place));
inference_program = InitProgram(&executor, scope, dirname, is_combined);
if (use_mkldnn) {
EnableMKLDNN(inference_program);
}
}
// Disable the profiler and print the timing information
paddle::platform::DisableProfiler(paddle::platform::EventSortingKey::kDefault,
......@@ -210,7 +196,10 @@ void TestInference(const std::string& dirname,
fetch_targets[fetch_target_names[i]] = cpu_fetchs[i];
}
// 6. Run the inference program
// 6. If export Flags_use_mkldnn=True, use mkldnn related ops.
if (FLAGS_use_mkldnn) executor.EnableMKLDNN(*inference_program);
// 7. Run the inference program
{
if (!CreateVars) {
// If users don't want to create and destroy variables every time they
......
......@@ -186,19 +186,23 @@ endif()
add_subdirectory(detail)
if(WITH_DISTRIBUTE)
set(DISTRIBUTE_DEPS sendrecvop_grpc grpc++_unsecure grpc_unsecure gpr cares zlib protobuf)
set(DISTRIBUTE_DEPS "")
if(WITH_GRPC)
set(DISTRIBUTE_DEPS sendrecvop_grpc grpc++_unsecure grpc_unsecure gpr cares zlib protobuf)
else()
set(DISTRIBUTE_DEPS sendrecvop_brpc brpc leveldb snappystream snappy protobuf ssl crypto zlib)
endif()
set(DISTRIBUTE_COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor")
op_library(send_op DEPS ${DISTRIBUTE_DEPS})
set_source_files_properties(send_op.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
op_library(prefetch_op DEPS ${DISTRIBUTE_DEPS})
set_source_files_properties(prefetch_op.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
op_library(recv_op DEPS ${DISTRIBUTE_DEPS})
set_source_files_properties(recv_op.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
op_library(listen_and_serv_op DEPS ${DISTRIBUTE_DEPS})
set_source_files_properties(listen_and_serv_op.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
op_library(send_vars_op DEPS ${DISTRIBUTE_DEPS})
set_source_files_properties(send_vars_op.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
op_library(send_op DEPS ${DISTRIBUTE_DEPS})
set_source_files_properties(send_op.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
op_library(send_barrier_op DEPS ${DISTRIBUTE_DEPS})
op_library(fetch_barrier_op DEPS ${DISTRIBUTE_DEPS})
set_source_files_properties(send_barrier_op.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
......@@ -208,15 +212,18 @@ if(WITH_DISTRIBUTE)
# listen_and_serv_op sum_op executor SERIAL)
if(WITH_GPU)
set_source_files_properties(test_send_nccl_id.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
cc_test(test_send_nccl_id SRCS test_send_nccl_id.cc DEPS send_op
listen_and_serv_op executor SERIAL)
op_library(gen_nccl_id_op DEPS nccl_common sendrecvop_grpc)
cc_test(test_send_nccl_id SRCS test_send_nccl_id.cc DEPS listen_and_serv_op executor SERIAL)
if(WITH_GRPC)
op_library(gen_nccl_id_op DEPS nccl_common sendrecvop_grpc)
else()
op_library(gen_nccl_id_op DEPS nccl_common sendrecvop_brpc)
endif()
set_source_files_properties(gen_nccl_id_op.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
else()
set(DEPS_OPS ${DEPS_OPS} gen_nccl_id_op)
endif()
else()
set(DEPS_OPS ${DEPS_OPS} send_op prefetch_op recv_op listen_and_serv_op send_vars_op send_barrier_op fetch_barrier_op gen_nccl_id_op)
set(DEPS_OPS ${DEPS_OPS} prefetch_op recv_op listen_and_serv_op send_op send_barrier_op fetch_barrier_op gen_nccl_id_op)
endif()
op_library(cross_entropy_op DEPS cross_entropy)
......
......@@ -25,7 +25,7 @@ namespace operators {
public: \
void Make() override { \
AddInput("X", "Input of " #OP_NAME " operator"); \
AddOutput("Out", "Output of " #OP_NAME " operator"); \
AddOutput("Out", "Output of " #OP_NAME " operator").Reuse("X"); \
AddAttr<bool>("use_mkldnn", \
"(bool, default false) Only used in mkldnn kernel") \
.SetDefault(false); \
......
......@@ -89,9 +89,9 @@ class AdamOpMaker : public framework::OpProtoAndCheckerMaker {
AddInput("Beta1Pow", "(Tensor) Input beta1 power accumulator");
AddInput("Beta2Pow", "(Tensor) Input beta2 power accumulator");
AddOutput("ParamOut", "(Tensor) Output parameter");
AddOutput("Moment1Out", "(Tensor) Output first moment");
AddOutput("Moment2Out", "(Tensor) Output second moment");
AddOutput("ParamOut", "(Tensor) Output parameter").Reuse("Param");
AddOutput("Moment1Out", "(Tensor) Output first moment").Reuse("Moment1");
AddOutput("Moment2Out", "(Tensor) Output second moment").Reuse("Moment2");
AddAttr<float>("beta1",
"(float, default 0.9) "
......
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/arg_min_max_op_base.h"
REGISTER_OPERATOR(arg_max, paddle::operators::ArgMinMaxOp,
paddle::operators::ArgMaxOpMaker,
paddle::framework::EmptyGradOpMaker);
REGISTER_OP_CPU_KERNEL(
arg_max,
paddle::operators::ArgMaxKernel<paddle::platform::CPUDeviceContext, float>,
paddle::operators::ArgMaxKernel<paddle::platform::CPUDeviceContext, double>,
paddle::operators::ArgMaxKernel<paddle::platform::CPUDeviceContext,
int64_t>,
paddle::operators::ArgMaxKernel<paddle::platform::CPUDeviceContext,
int32_t>,
paddle::operators::ArgMaxKernel<paddle::platform::CPUDeviceContext,
int16_t>,
paddle::operators::ArgMaxKernel<paddle::platform::CPUDeviceContext, size_t>,
paddle::operators::ArgMaxKernel<paddle::platform::CPUDeviceContext,
uint8_t>);
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/arg_min_max_op_base.h"
REGISTER_OP_CUDA_KERNEL(
arg_max,
paddle::operators::ArgMaxKernel<paddle::platform::CUDADeviceContext, float>,
paddle::operators::ArgMaxKernel<paddle::platform::CUDADeviceContext,
double>,
paddle::operators::ArgMaxKernel<paddle::platform::CUDADeviceContext,
int64_t>,
paddle::operators::ArgMaxKernel<paddle::platform::CUDADeviceContext,
int32_t>,
paddle::operators::ArgMaxKernel<paddle::platform::CUDADeviceContext,
int16_t>,
paddle::operators::ArgMaxKernel<paddle::platform::CUDADeviceContext,
size_t>,
paddle::operators::ArgMaxKernel<paddle::platform::CUDADeviceContext,
uint8_t>);
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <string>
#include <type_traits>
#include <vector>
#include "paddle/fluid/framework/ddim.h"
#include "paddle/fluid/framework/eigen.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/operator.h"
#include "paddle/fluid/platform/enforce.h"
#include "paddle/fluid/string/printf.h"
namespace paddle {
namespace operators {
enum ArgMinMaxType { kArgMin, kArgMax };
template <typename DeviceContext, typename T, typename Tout, int64_t Rank,
ArgMinMaxType argMinMaxValue>
struct ArgMinMaxFunctor {};
#define DECLARE_ARG_MIN_MAX_FUNCTOR(eigen_op_type, enum_argminmax_value) \
template <typename DeviceContext, typename T, typename Tout, int64_t Rank> \
struct ArgMinMaxFunctor<DeviceContext, T, Tout, Rank, \
enum_argminmax_value> { \
void operator()(const DeviceContext& ctx, const framework::LoDTensor& in, \
framework::LoDTensor* out, int64_t axis) { \
auto in_eigen = framework::EigenTensor<T, Rank>::From(in); \
auto out_eigen = framework::EigenTensor<Tout, Rank - 1>::From(*out); \
out_eigen.device(*(ctx.eigen_device())) = \
in_eigen.eigen_op_type(axis).template cast<Tout>(); \
} \
}
DECLARE_ARG_MIN_MAX_FUNCTOR(argmin, ArgMinMaxType::kArgMin);
DECLARE_ARG_MIN_MAX_FUNCTOR(argmax, ArgMinMaxType::kArgMax);
template <typename DeviceContext, typename T, typename Tout,
ArgMinMaxType EnumArgMinMaxValue>
class ArgMinMaxKernel : public framework::OpKernel<T> {
public:
void Compute(const framework::ExecutionContext& ctx) const override {
auto& x = *(ctx.Input<framework::LoDTensor>("X"));
auto& out = *(ctx.Output<framework::LoDTensor>("Out"));
out.mutable_data<Tout>(ctx.GetPlace());
auto axis = ctx.Attr<int64_t>("axis");
auto& dev_ctx = ctx.template device_context<DeviceContext>();
#define CALL_ARG_MINMAX_FUNCTOR(rank) \
ArgMinMaxFunctor<DeviceContext, T, Tout, rank, EnumArgMinMaxValue> \
functor##rank; \
functor##rank(dev_ctx, x, &out, axis)
switch (x.dims().size()) {
case 1:
CALL_ARG_MINMAX_FUNCTOR(1);
break;
case 2:
CALL_ARG_MINMAX_FUNCTOR(2);
break;
case 3:
CALL_ARG_MINMAX_FUNCTOR(3);
break;
case 4:
CALL_ARG_MINMAX_FUNCTOR(4);
break;
case 5:
CALL_ARG_MINMAX_FUNCTOR(5);
break;
case 6:
CALL_ARG_MINMAX_FUNCTOR(6);
break;
default:
PADDLE_THROW(
"%s operator doesn't supports tensors whose ranks are greater "
"than 6.",
(EnumArgMinMaxValue == kArgMin ? "argmin" : "argmax"));
break;
#undef CALL_ARG_MINMAX_FUNCTOR
}
}
};
template <typename DeviceContext, typename T>
using ArgMinKernel =
ArgMinMaxKernel<DeviceContext, T, int64_t, ArgMinMaxType::kArgMin>;
template <typename DeviceContext, typename T>
using ArgMaxKernel =
ArgMinMaxKernel<DeviceContext, T, int64_t, ArgMinMaxType::kArgMax>;
class ArgMinMaxOp : public framework::OperatorWithKernel {
public:
using framework::OperatorWithKernel::OperatorWithKernel;
void InferShape(framework::InferShapeContext* ctx) const override {
PADDLE_ENFORCE(ctx->HasInput("X"), "Input(X) should not be null");
PADDLE_ENFORCE(ctx->HasOutput("Out"), "Output(Out) should not be null");
const auto& x_dims = ctx->GetInputDim("X");
int64_t axis = ctx->Attrs().Get<int64_t>("axis");
PADDLE_ENFORCE(axis >= -x_dims.size() && axis < x_dims.size(),
"'axis' must be inside [-Rank(X), Rank(X))");
auto x_rank = x_dims.size();
if (axis < 0) axis += x_rank;
std::vector<int64_t> vec;
for (int64_t i = 0; i < axis; i++) vec.push_back(x_dims[i]);
for (int64_t i = axis + 1; i < x_rank; i++) vec.push_back(x_dims[i]);
ctx->SetOutputDim("Out", framework::make_ddim(vec));
}
};
class BaseArgMinMaxOpMaker : public framework::OpProtoAndCheckerMaker {
protected:
virtual const char* OpName() const = 0;
virtual const char* Name() const = 0;
public:
void Make() override {
AddInput("X", "Input tensor.");
AddOutput("Out", "Output tensor.");
AddAttr<int64_t>("axis", "The axis in which to compute the arg indics.");
AddComment(string::Sprintf(R"DOC(
%s Operator.
Computes the indices of the %s elements of the input tensor's element
along the provided axis.
)DOC",
OpName(), Name()));
}
};
class ArgMinOpMaker : public BaseArgMinMaxOpMaker {
protected:
const char* OpName() const override { return "ArgMin"; }
const char* Name() const override { return "min"; }
};
class ArgMaxOpMaker : public BaseArgMinMaxOpMaker {
protected:
const char* OpName() const override { return "ArgMax"; }
const char* Name() const override { return "max"; }
};
} // namespace operators
} // namespace paddle
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/arg_min_max_op_base.h"
REGISTER_OPERATOR(arg_min, paddle::operators::ArgMinMaxOp,
paddle::operators::ArgMinOpMaker,
paddle::framework::EmptyGradOpMaker);
REGISTER_OP_CPU_KERNEL(
arg_min,
paddle::operators::ArgMinKernel<paddle::platform::CPUDeviceContext, float>,
paddle::operators::ArgMinKernel<paddle::platform::CPUDeviceContext, double>,
paddle::operators::ArgMinKernel<paddle::platform::CPUDeviceContext,
int64_t>,
paddle::operators::ArgMinKernel<paddle::platform::CPUDeviceContext,
int32_t>,
paddle::operators::ArgMinKernel<paddle::platform::CPUDeviceContext,
int16_t>,
paddle::operators::ArgMinKernel<paddle::platform::CPUDeviceContext, size_t>,
paddle::operators::ArgMinKernel<paddle::platform::CPUDeviceContext,
uint8_t>);
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/arg_min_max_op_base.h"
REGISTER_OP_CUDA_KERNEL(
arg_min,
paddle::operators::ArgMinKernel<paddle::platform::CUDADeviceContext, float>,
paddle::operators::ArgMinKernel<paddle::platform::CUDADeviceContext,
double>,
paddle::operators::ArgMinKernel<paddle::platform::CUDADeviceContext,
int64_t>,
paddle::operators::ArgMinKernel<paddle::platform::CUDADeviceContext,
int32_t>,
paddle::operators::ArgMinKernel<paddle::platform::CUDADeviceContext,
int16_t>,
paddle::operators::ArgMinKernel<paddle::platform::CUDADeviceContext,
size_t>,
paddle::operators::ArgMinKernel<paddle::platform::CUDADeviceContext,
uint8_t>);
......@@ -151,13 +151,15 @@ class BatchNormOpMaker : public framework::OpProtoAndCheckerMaker {
AddInput("Variance",
"The global variance (for training) "
"or estimated Variance (for testing)");
AddOutput("Y", "result after normalization");
AddOutput("Y", "result after normalization").Reuse("X");
AddOutput("MeanOut",
"Share memory with Mean. "
"Store the global mean when training");
"Store the global mean when training")
.Reuse("Mean");
AddOutput("VarianceOut",
"Share memory with Variance. "
"Store the global Variance when training");
"Store the global Variance when training")
.Reuse("Variance");
AddOutput("SavedMean",
"Mean of the current mini batch, "
"will apply to output when training")
......
......@@ -54,18 +54,18 @@ class BatchSizeLikeOp : public framework::OperatorWithKernel {
class BatchSizeLikeOpMaker : public framework::OpProtoAndCheckerMaker {
public:
void Make() final {
AddInput("Input",
"(Tensor) Tensor "
"whose input_dim_idx'th dimension specifies the batch_size");
AddInput(
"Input",
"Tensor whose input_dim_idx'th dimension specifies the batch_size");
AddOutput("Out",
"(Tensor) Tensor of specified shape will be filled "
"Tensor of specified shape will be filled "
"with the specified value");
AddAttr<std::vector<int>>("shape", "(vector<int>) The shape of the output");
AddAttr<std::vector<int>>("shape", "The shape of the output");
AddAttr<int>("input_dim_idx",
"(int, default 0) The index of input's batch size dimension")
"default 0. The index of input's batch size dimension")
.SetDefault(0);
AddAttr<int>("output_dim_idx",
"(int, default 0) The index of output's batch size dimension")
"default 0. The index of output's batch size dimension")
.SetDefault(0);
Apply();
}
......
......@@ -56,17 +56,16 @@ class BilinearInterpOpMaker : public framework::OpProtoAndCheckerMaker {
public:
void Make() override {
AddInput("X",
"(Tensor) The input tensor of bilinear interpolation, "
"The input tensor of bilinear interpolation, "
"This is a 4-D tensor with shape of (N x C x h x w)");
AddInput("OutSize",
"(Tensor) This is a 1-D tensor with two number. "
"This is a 1-D tensor with two number. "
"The first number is height and the second number is width.")
.AsDispensable();
AddOutput("Out",
"(Tensor) The dimension of output is (N x C x out_h x out_w]");
AddOutput("Out", "The dimension of output is (N x C x out_h x out_w)");
AddAttr<int>("out_h", "(int) output height of bilinear interpolation op.");
AddAttr<int>("out_w", "(int) output width of bilinear interpolation op.");
AddAttr<int>("out_h", "output height of bilinear interpolation op.");
AddAttr<int>("out_w", "output width of bilinear interpolation op.");
AddComment(R"DOC(
Bilinear interpolation is an extension of linear interpolation for
interpolating functions of two variables (e.g. H-direction and
......
......@@ -125,7 +125,8 @@ void Conv2DOpMaker::Make() {
"input image channels divided by the groups.");
AddOutput("Output",
"(Tensor) The output tensor of convolution operator. "
"The format of output tensor is also NCHW.");
"The format of output tensor is also NCHW.")
.Reuse("Input");
AddAttr<std::vector<int>>("strides",
"(vector<int> default:{1, 1}), the "
"strides(h_stride, w_stride) of "
......@@ -220,7 +221,8 @@ void Conv3DOpMaker::Make() {
"input image channels divided by the groups.");
AddOutput("Output",
"(Tensor) The output tensor of convolution operator."
"The format of output tensor is also NCDHW.");
"The format of output tensor is also NCDHW.")
.Reuse("Input");
AddAttr<std::vector<int>>("strides",
"(vector<int>, default:{1, 1, 1}), the "
"strides(d_stride, h_stride, w_stride) of "
......
......@@ -124,7 +124,8 @@ class CrossEntropyOpMaker : public framework::OpProtoAndCheckerMaker {
"Tensor<float/double> with shape [N x D].");
AddOutput("Y",
"(Tensor, default Tensor<float>), a 2-D tensor with shape "
"[N x 1]. The cross entropy loss.");
"[N x 1]. The cross entropy loss.")
.Reuse("X");
AddAttr<bool>("soft_label",
"(bool, default false), a flag indicating whether to "
"interpretate the given labels as soft labels.")
......
if(WITH_DISTRIBUTE)
if(NOT WITH_DISTRIBUTE)
return()
endif()
if(WITH_GRPC)
grpc_library(sendrecvop_grpc SRCS bytebuffer_stream.cc sendrecvop_utils.cc grpc_client.cc
request_handler_impl.cc rpc_client.cc rpc_server.cc grpc_server.cc variable_response.cc PROTO send_recv.proto DEPS lod_tensor
selected_rows memory)
set(DISTRIBUTE_COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor")
set_source_files_properties(serde_test.cc grpc_server_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
cc_test(serde_test SRCS serde_test.cc variable_response.cc DEPS grpc++_unsecure grpc_unsecure gpr
set_source_files_properties(grpc_serde_test.cc rpc_server_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
cc_test(serde_test SRCS grpc_serde_test.cc variable_response.cc DEPS grpc++_unsecure grpc_unsecure gpr
cares zlib protobuf sendrecvop_grpc SERIAL)
cc_test(grpc_server_test SRCS grpc_server_test.cc DEPS sendrecvop_grpc
cc_test(grpc_server_test SRCS rpc_server_test.cc DEPS sendrecvop_grpc
grpc++_unsecure grpc_unsecure gpr cares zlib protobuf executor
proto_desc lookup_table_op SERIAL)
return()
endif()
set(DISTRIBUTE_COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor")
set_source_files_properties(brpc_server.cc brpc_client.cc rpc_server_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
brpc_library(sendrecvop_brpc SRCS brpc_client.cc brpc_server.cc rpc_server.cc rpc_client.cc request_handler_impl.cc
PROTO send_recv.proto
DEPS lod_tensor selected_rows memory)
find_library(OPENSSL_CRYPTO_LIBRARY_STATIC NAMES libcrypto.so)
ADD_LIBRARY(crypto SHARED IMPORTED GLOBAL)
SET_PROPERTY(TARGET crypto PROPERTY IMPORTED_LOCATION ${OPENSSL_CRYPTO_LIBRARY_STATIC})
find_library(OPENSSL_SSL_LIBRARY_STATIC NAMES libssl.so)
ADD_LIBRARY(ssl SHARED IMPORTED GLOBAL)
SET_PROPERTY(TARGET ssl PROPERTY IMPORTED_LOCATION ${OPENSSL_SSL_LIBRARY_STATIC})
cc_test(brpc_server_test SRCS rpc_server_test.cc DEPS sendrecvop_brpc
brpc protobuf leveldb gflags glog
protobuf executor proto_desc lookup_table_op snappystream snappy ssl crypto SERIAL)
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/operators/detail/brpc_client.h"
#include "paddle/fluid/framework/threadpool.h"
namespace paddle {
namespace operators {
namespace detail {
DEFINE_int32(brpc_channel_num, 24,
"Number of channels to send requests connected to one server");
DEFINE_int32(timeout_ms, 30000, "RPC timeout in milliseconds");
DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
BRPCClient::~BRPCClient() { Wait(); }
void HandleSendResponse(brpc::Controller* cntl,
sendrecv::VoidMessage* response) {
// std::unique_ptr makes sure cntl/response will be deleted before returning.
std::unique_ptr<brpc::Controller> cntl_guard(cntl);
std::unique_ptr<sendrecv::VoidMessage> response_guard(response);
if (cntl->Failed()) {
LOG(WARNING) << "Fail to send EchoRequest, " << cntl->ErrorText();
return;
}
LOG(INFO) << "Received response from " << cntl->remote_side()
<< " latency=" << cntl->latency_us() << "us";
}
bool BRPCClient::AsyncSendVar(const std::string& ep,
const platform::DeviceContext& ctx,
const framework::Scope& scope,
const std::string& var_name, int64_t time_out) {
const platform::DeviceContext* p_ctx = &ctx;
const std::string ep_val = ep;
const std::string var_name_val = var_name;
const framework::Scope* p_scope = &scope;
const auto ch_ptr = GetChannel(ep_val);
framework::AsyncIO(
[var_name_val, p_ctx, ep_val, p_scope, time_out, ch_ptr, this] {
auto ch_ctx = ch_ptr->Pop();
brpc::Controller* cntl = new brpc::Controller();
sendrecv::VoidMessage* response = new sendrecv::VoidMessage();
cntl->set_timeout_ms(time_out);
google::protobuf::Closure* done =
brpc::NewCallback(&HandleSendResponse, cntl, response);
sendrecv::VariableMessage request;
ch_ctx->stub->SendVariable(cntl, &request, response, done);
});
req_count_++;
return true;
}
void HandleGetResponse(brpc::Controller* cntl,
sendrecv::VariableMessage* response) {
// std::unique_ptr makes sure cntl/response will be deleted before returning.
std::unique_ptr<brpc::Controller> cntl_guard(cntl);
std::unique_ptr<sendrecv::VariableMessage> response_guard(response);
if (cntl->Failed()) {
LOG(WARNING) << "Fail to send EchoRequest, " << cntl->ErrorText();
return;
}
LOG(INFO) << "Received response from " << cntl->remote_side()
<< " latency=" << cntl->latency_us() << "us";
// framework::Variable* outvar = nullptr;
// DeserializeFromByteBuffer(ret_msg, *var_h.ctx, var_h.scope, &outvar);
}
bool BRPCClient::AsyncGetVar(const std::string& ep,
const platform::DeviceContext& ctx,
const framework::Scope& scope,
const std::string& var_name, int64_t time_out) {
const platform::DeviceContext* p_ctx = &ctx;
const std::string ep_val = ep;
const std::string var_name_val = var_name;
const framework::Scope* p_scope = &scope;
const auto ch = GetChannel(ep_val);
framework::AsyncIO(
[var_name_val, ep_val, p_scope, p_ctx, time_out, ch, this] {});
req_count_++;
return true;
}
bool BRPCClient::AsyncPrefetchVar(const std::string& ep,
const platform::DeviceContext& ctx,
const framework::Scope& scope,
const std::string& in_var_name,
const std::string& out_var_name,
int64_t time_out) {
const platform::DeviceContext* p_ctx = &ctx;
const std::string ep_val = ep;
const std::string in_var_name_val = in_var_name;
const std::string out_var_name_val = out_var_name;
const framework::Scope* p_scope = &scope;
const auto ch = GetChannel(ep_val);
framework::AsyncIO([in_var_name_val, out_var_name_val, ep_val, p_scope, p_ctx,
time_out, ch, this] {});
req_count_++;
return true;
}
void BRPCClient::AsyncSendBatchBarrier(const std::string& ep,
int64_t time_out) {
req_count_++;
}
void BRPCClient::AsyncSendFetchBarrier(const std::string& ep,
int64_t time_out) {
req_count_++;
}
void BRPCClient::Wait() {
std::unique_lock<std::mutex> lk(sync_mutex_);
sync_cond_.wait(lk, [this] { return req_count_ == 0; });
}
ChannelQueuePtr BRPCClient::GetChannel(const std::string& ep) {
{
std::lock_guard<std::mutex> guard(chan_mutex_);
auto it = channels_.find(ep);
if (it != channels_.end()) {
return it->second;
}
}
ChannelQueuePtr q(new framework::BlockingQueue<ChannelContextPtr>());
brpc::ChannelOptions options;
options.protocol = "baidu_std";
options.connection_type = "pooled";
options.connect_timeout_ms = 100;
options.timeout_ms = FLAGS_timeout_ms /*milliseconds*/;
options.max_retry = FLAGS_max_retry;
for (int i = 0; i < FLAGS_brpc_channel_num; ++i) {
std::shared_ptr<ChannelContext> c(new ChannelContext());
if (c->channel.Init(ep.c_str(), &options) != 0) {
LOG(ERROR) << "Fail to initialize channel";
return nullptr;
}
c->stub.reset(new sendrecv::SendRecvService_Stub(
static_cast<google::protobuf::RpcChannel*>(&c->channel)));
q->Push(c);
}
{
std::lock_guard<std::mutex> guard(chan_mutex_);
channels_[ep] = q;
}
return q;
}
} // namespace detail
} // namespace operators
} // namespace paddle
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <time.h>
#include <chrono> // NOLINT
#include <ctime>
#include <functional>
#include <iostream>
#include <map>
#include <mutex> // NOLINT
#include <string>
#include <vector>
#include "brpc/channel.h"
#include "paddle/fluid/framework/blocking_queue.h"
#include "paddle/fluid/framework/data_type.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/selected_rows.h"
#include "paddle/fluid/operators/detail/rpc_client.h"
#include "paddle/fluid/operators/detail/send_recv.pb.h"
#include "paddle/fluid/platform/macros.h" // for DISABLE_COPY_AND_ASSIGN
namespace paddle {
namespace operators {
namespace detail {
struct ChannelContext {
brpc::Channel channel;
std::shared_ptr<sendrecv::SendRecvService_Stub> stub;
};
typedef std::shared_ptr<ChannelContext> ChannelContextPtr;
typedef std::shared_ptr<framework::BlockingQueue<ChannelContextPtr>>
ChannelQueuePtr;
class BRPCClient : public RPCClient {
public:
BRPCClient() {}
virtual ~BRPCClient();
bool AsyncSendVar(const std::string& ep, const platform::DeviceContext& ctx,
const framework::Scope& scope, const std::string& var_name,
int64_t time_out = RPCClient::rpc_time_out) override;
bool AsyncGetVar(const std::string& ep, const platform::DeviceContext& ctx,
const framework::Scope& scope, const std::string& var_name,
int64_t time_out = RPCClient::rpc_time_out) override;
bool AsyncPrefetchVar(const std::string& ep,
const platform::DeviceContext& ctx,
const framework::Scope& scope,
const std::string& in_var_name,
const std::string& out_var_name,
int64_t time_out = RPCClient::rpc_time_out) override;
void AsyncSendBatchBarrier(
const std::string& ep,
int64_t time_out = RPCClient::rpc_time_out) override;
void AsyncSendFetchBarrier(
const std::string& ep,
int64_t time_out = RPCClient::rpc_time_out) override;
void Wait() override;
private:
void Proceed();
ChannelQueuePtr GetChannel(const std::string& ep);
private:
std::unordered_map<std::string, ChannelQueuePtr> channels_;
// mutex for Wait client sync
std::mutex sync_mutex_;
std::condition_variable sync_cond_;
std::atomic<int64_t> req_count_{0};
// mutex for GetChannel thread safety
std::mutex chan_mutex_;
DISABLE_COPY_AND_ASSIGN(BRPCClient);
};
} // namespace detail
} // namespace operators
} // namespace paddle
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/operators/detail/brpc_server.h"
#include "paddle/fluid/operators/detail/request_handler.h"
namespace sendrecv {
typedef std::unordered_map<std::string,
paddle::operators::detail::RequestHandler*>
HandlerMap;
class BRPCServiceImpl : public SendRecvService {
public:
explicit BRPCServiceImpl(const HandlerMap& rpc_call_map)
: request_send_h_(nullptr),
request_get_h_(nullptr),
request_prefetch_h_(nullptr) {
auto it = rpc_call_map.find(paddle::operators::detail::kRequestSend);
if (it != rpc_call_map.end()) {
request_send_h_ = it->second;
}
it = rpc_call_map.find(paddle::operators::detail::kRequestSend);
if (it != rpc_call_map.end()) {
request_get_h_ = it->second;
}
it = rpc_call_map.find(paddle::operators::detail::kRequestPrefetch);
if (it != rpc_call_map.end()) {
request_prefetch_h_ = it->second;
}
}
virtual ~BRPCServiceImpl() {}
void SendVariable(google::protobuf::RpcController* cntl_butil,
const VariableMessage* request, VoidMessage* response,
google::protobuf::Closure* done) override {
PADDLE_ENFORCE(request_send_h_ != nullptr,
"RequestSend handler should be registed first!");
brpc::ClosureGuard done_guard(done);
paddle::framework::Scope* local_scope = request_send_h_->scope();
paddle::framework::Variable* outvar = nullptr;
paddle::framework::Variable* invar = nullptr;
std::string varname = request->varname();
if (!request_send_h_->sync_mode()) {
local_scope = &request_send_h_->scope()->NewScope();
invar = local_scope->Var(varname);
} else {
invar = local_scope->FindVar(varname);
}
request_send_h_->Handle(varname, local_scope, invar, &outvar);
if (!request_send_h_->sync_mode()) {
request_send_h_->scope()->DeleteScope(local_scope);
}
}
void GetVariable(google::protobuf::RpcController* cntl_butil,
const VariableMessage* request, VariableMessage* response,
google::protobuf::Closure* done) override {
PADDLE_ENFORCE(request_get_h_ != nullptr,
"RequestGet handler should be registed first!");
}
void PrefetchVariable(google::protobuf::RpcController* cntl_butil,
const VariableMessage* request,
VariableMessage* response,
google::protobuf::Closure* done) override {
PADDLE_ENFORCE(request_prefetch_h_ != nullptr,
"kRequestPrefetch handler should be registed first!");
}
private:
paddle::operators::detail::RequestHandler* request_send_h_;
paddle::operators::detail::RequestHandler* request_get_h_;
paddle::operators::detail::RequestHandler* request_prefetch_h_;
};
} // namespace sendrecv
namespace paddle {
namespace operators {
namespace detail {
void AsyncBRPCServer::StartServer() {
// Instance of your service.
sendrecv::BRPCServiceImpl service_impl(rpc_call_map_);
// Add the service into server. Notice the second parameter, because the
// service is put on stack, we don't want server to delete it, otherwise
// use brpc::SERVER_OWNS_SERVICE.
if (server_.AddService(&service_impl, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
LOG(FATAL) << "Fail to add service";
return;
}
brpc::ServerOptions options;
options.idle_timeout_sec = idle_timeout_s_;
options.max_concurrency = max_concurrency_;
if (server_.Start(bind_address_.c_str(), &options) != 0) {
LOG(FATAL) << "Fail to start EchoServer" << bind_address_;
return;
}
butil::EndPoint ep = server_.listen_address();
selected_port_ = ep.port;
{
std::lock_guard<std::mutex> lock(this->mutex_ready_);
ready_ = 1;
}
condition_ready_.notify_all();
server_.Join();
}
void AsyncBRPCServer::ShutDownImpl() { server_.Stop(1000); }
void AsyncBRPCServer::WaitServerReady() {
VLOG(3) << "AsyncGRPCServer is wait server ready";
std::unique_lock<std::mutex> lock(this->mutex_ready_);
condition_ready_.wait(lock, [=] { return this->ready_ == 1; });
VLOG(3) << "AsyncGRPCServer WaitSeverReady";
}
}; // namespace detail
}; // namespace operators
}; // namespace paddle
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <condition_variable> // NOLINT
#include <mutex> // NOLINT
#include <string>
#include "brpc/server.h"
#include "paddle/fluid/operators/detail/rpc_server.h"
#include "paddle/fluid/operators/detail/send_recv.pb.h"
namespace paddle {
namespace operators {
namespace detail {
class AsyncBRPCServer final : public RPCServer {
public:
explicit AsyncBRPCServer(const std::string& address, int client_num)
: RPCServer(address, client_num), ready_(0) {}
virtual ~AsyncBRPCServer() {}
void StartServer() override;
void WaitServerReady() override;
private:
void ShutDownImpl() override;
brpc::Server server_;
static constexpr int idle_timeout_s_ = -1;
static constexpr int max_concurrency_ = 0;
std::mutex mutex_ready_;
std::condition_variable condition_ready_;
int ready_;
};
}; // namespace detail
}; // namespace operators
}; // namespace paddle
......@@ -19,6 +19,7 @@ limitations under the License. */
#include <limits>
#include "paddle/fluid/framework/threadpool.h"
#include "paddle/fluid/operators/detail/request_handler.h"
#include "paddle/fluid/platform/profiler.h"
namespace paddle {
......
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#ifdef PADDLE_WITH_GRPC
#include "paddle/fluid/operators/detail/grpc_client.h"
#include "paddle/fluid/operators/detail/grpc_server.h"
#define RPCSERVER_T detail::AsyncGRPCServer
#define RPCCLIENT_T detail::GRPCClient
#else
#include "paddle/fluid/operators/detail/brpc_client.h"
#include "paddle/fluid/operators/detail/brpc_server.h"
#define RPCSERVER_T detail::AsyncBRPCServer
#define RPCCLIENT_T detail::BRPCClient
#endif
......@@ -28,7 +28,6 @@
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/selected_rows.h"
#include "paddle/fluid/framework/var_type.h"
#include "paddle/fluid/operators/detail/sendrecvop_utils.h"
namespace paddle {
namespace operators {
......@@ -38,6 +37,10 @@ constexpr char kRequestSend[] = "RequestSend";
constexpr char kRequestGet[] = "RequestGet";
constexpr char kRequestPrefetch[] = "RequestPrefetch";
#define LISTEN_TERMINATE_MESSAGE "TERMINATE@RECV"
#define BATCH_BARRIER_MESSAGE "BATCH_BARRIER@RECV"
#define FETCH_BARRIER_MESSAGE "FETCH_BARRIER@RECV"
class RPCServer;
class RequestHandler {
......
......@@ -16,15 +16,12 @@
#include <string>
#include <vector>
#include "paddle/fluid/framework/blocking_queue.h"
#include "paddle/fluid/framework/data_type.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/selected_rows.h"
#include "paddle/fluid/operators/detail/request_handler_impl.h"
#include "paddle/fluid/operators/detail/rpc_server.h"
#include "paddle/fluid/operators/detail/sendrecvop_utils.h"
#include "paddle/fluid/operators/detail/variable_response.h"
namespace paddle {
namespace operators {
......
......@@ -29,7 +29,6 @@
#include "paddle/fluid/framework/selected_rows.h"
#include "paddle/fluid/framework/var_type.h"
#include "paddle/fluid/operators/detail/request_handler.h"
#include "paddle/fluid/operators/detail/sendrecvop_utils.h"
namespace paddle {
namespace operators {
......
......@@ -26,6 +26,8 @@ namespace detail {
class RPCClient {
public:
RPCClient() {}
virtual ~RPCClient() {}
virtual bool AsyncSendVar(const std::string& ep,
const platform::DeviceContext& ctx,
const framework::Scope& scope,
......
......@@ -17,15 +17,14 @@ limitations under the License. */
#include <thread> // NOLINT
#include "gtest/gtest.h"
#include "paddle/fluid/operators/detail/grpc_client.h"
#include "paddle/fluid/operators/detail/grpc_server.h"
#include "paddle/fluid/operators/detail/rpc_client.h"
#include "paddle/fluid/framework/block_desc.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/operator.h"
#include "paddle/fluid/operators/detail/macros.h"
#include "paddle/fluid/operators/detail/request_handler_impl.h"
#include "paddle/fluid/operators/detail/rpc_client.h"
#include "paddle/fluid/operators/detail/rpc_server.h"
namespace framework = paddle::framework;
namespace platform = paddle::platform;
......@@ -33,7 +32,7 @@ namespace detail = paddle::operators::detail;
USE_OP(lookup_table);
std::unique_ptr<detail::AsyncGRPCServer> g_rpc_service;
std::unique_ptr<detail::RPCServer> g_rpc_service;
std::unique_ptr<detail::RequestHandler> g_req_handler;
framework::BlockDesc* AppendPrefetchBlcok(framework::ProgramDesc* program) {
......@@ -118,20 +117,19 @@ void StartServer() {
g_req_handler->SetRPCServer(g_rpc_service.get());
std::thread server_thread(
std::bind(&detail::AsyncGRPCServer::StartServer, g_rpc_service.get()));
std::bind(&detail::RPCServer::StartServer, g_rpc_service.get()));
server_thread.join();
}
TEST(PREFETCH, CPU) {
g_req_handler.reset(new detail::RequestPrefetchHandler(true));
g_rpc_service.reset(new detail::AsyncGRPCServer("127.0.0.1:0", 1));
g_rpc_service.reset(new RPCSERVER_T("127.0.0.1:0", 1));
detail::RPCClient* client = detail::RPCClient::GetInstance<RPCCLIENT_T>();
std::thread server_thread(StartServer);
g_rpc_service->WaitServerReady();
detail::RPCClient* client =
detail::RPCClient::GetInstance<detail::GRPCClient>();
int port = g_rpc_service->GetSelectedPort();
std::string ep = paddle::string::Sprintf("127.0.0.1:%d", port);
......
......@@ -14,6 +14,8 @@ limitations under the License. */
syntax = "proto3";
package sendrecv;
// option cc_generic_services = true;
service SendRecvService {
// For parameter server round-robin like hashing, do not split tensors.
// Send and recv only one tensor
......
......@@ -32,16 +32,6 @@ namespace paddle {
namespace operators {
namespace detail {
#define LISTEN_TERMINATE_MESSAGE "TERMINATE@RECV"
#define BATCH_BARRIER_MESSAGE "BATCH_BARRIER@RECV"
#define FETCH_BARRIER_MESSAGE "FETCH_BARRIER@RECV"
static int64_t GetTimestamp() {
struct timeval tp;
gettimeofday(&tp, NULL);
return tp.tv_sec * 1000 + tp.tv_usec / 1000;
}
typedef void (*DestroyCallback)(void*);
void SerializeToByteBuffer(const std::string& name, framework::Variable* var,
......
......@@ -59,7 +59,7 @@ class ElementwiseOpMaker : public framework::OpProtoAndCheckerMaker {
void Make() final {
AddInput("X", "(Tensor), The first input tensor of elementwise op.");
AddInput("Y", "(Tensor), The second input tensor of elementwise op.");
AddOutput("Out", "The output of elementwise op.");
AddOutput("Out", "The output of elementwise op.").Reuse("X");
AddAttr<int>("axis",
"(int, default -1). The start dimension index "
"for broadcasting Y onto X.")
......
......@@ -19,9 +19,7 @@ limitations under the License. */
#include "paddle/fluid/framework/framework.pb.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/operators/detail/grpc_client.h"
#include "paddle/fluid/operators/detail/rpc_client.h"
#include "paddle/fluid/operators/detail/macros.h"
#include "paddle/fluid/platform/profiler.h"
namespace paddle {
......@@ -45,7 +43,7 @@ class FetchBarrierOp : public framework::OperatorBase {
platform::RecordEvent record_event(Type(), &ctx);
detail::RPCClient* rpc_client =
detail::RPCClient::GetInstance<detail::GRPCClient>();
detail::RPCClient::GetInstance<RPCCLIENT_T>();
rpc_client->Wait();
......
......@@ -32,16 +32,16 @@ class FillConstantBatchSizeLikeOp : public BatchSizeLikeOp {
class FillConstantBatchSizeLikeOpMaker : public BatchSizeLikeOpMaker {
protected:
void Apply() override {
AddAttr<int>("dtype",
"(int, default 5 (FP32)) "
"Output data type")
AddAttr<int>(
"dtype",
"It could be numpy.dtype. Output data type. Default is float32")
.SetDefault(framework::proto::VarType::FP32);
AddAttr<float>("value", "(float, default 0) The value to be filled")
AddAttr<float>("value", "default 0. The value to be filled")
.SetDefault(0.0f);
AddComment(R"DOC(
FillConstantBatchSizeLike Operator.
Fill up a variable with specified constant value.
This function creates a tensor of specified *shape*, *dtype* and batch size,
and initializes this with a constant supplied in *value*. The batch size is
obtained from the `input` tensor.
)DOC");
}
......
......@@ -21,8 +21,7 @@ limitations under the License. */
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/threadpool.h"
#include "paddle/fluid/operators/detail/grpc_client.h"
#include "paddle/fluid/operators/detail/grpc_server.h"
#include "paddle/fluid/operators/detail/macros.h"
#include "paddle/fluid/operators/detail/request_handler_impl.h"
#include "paddle/fluid/platform/nccl_helper.h"
......@@ -61,8 +60,8 @@ class GenNCCLIdOp : public framework::OperatorBase {
std::vector<std::string> endpoint_list =
Attr<std::vector<std::string>>("endpoint_list");
detail::RPCClient* client =
detail::RPCClient::GetInstance<detail::GRPCClient>();
detail::RPCClient* client = detail::RPCClient::GetInstance<RPCCLIENT_T>();
for (auto& ep : endpoint_list) {
VLOG(3) << "sending nccl id to " << ep;
client->AsyncSendVar(ep, dev_ctx, *scope, NCCL_ID_VARNAME);
......@@ -78,9 +77,11 @@ class GenNCCLIdOp : public framework::OperatorBase {
// deleter will call GRPC Server's base class's dtor and
// that will cause a wired crash.
detail::RequestSendHandler rpc_h(true);
detail::AsyncGRPCServer rpc_service(endpoint, 1);
rpc_service.RegisterRPC(detail::kRequestSend, &rpc_h);
rpc_h.SetRPCServer(&rpc_service);
std::unique_ptr<detail::RPCServer> rpc_service(
new RPCSERVER_T(endpoint, 1));
rpc_service->RegisterRPC(detail::kRequestSend, &rpc_h);
rpc_h.SetRPCServer(rpc_service.get());
framework::ProgramDesc empty_program;
framework::Executor executor(dev_ctx.GetPlace());
......@@ -90,12 +91,13 @@ class GenNCCLIdOp : public framework::OperatorBase {
rpc_h.SetExecutor(&executor);
std::thread server_thread(
std::bind(&detail::AsyncGRPCServer::StartServer, &rpc_service));
rpc_service.SetCond(detail::kRequestSend);
std::bind(&detail::RPCServer::StartServer, rpc_service.get()));
rpc_service->SetCond(detail::kRequestSend);
VLOG(3) << "start getting nccl id from trainer 0...";
rpc_service.WaitBarrier(detail::kRequestSend);
rpc_service->WaitBarrier(detail::kRequestSend);
VLOG(3) << "got nccl id and stop server...";
rpc_service.ShutDown();
rpc_service->ShutDown();
VLOG(3) << "rpc server stopped";
server_thread.join();
}
......
......@@ -67,8 +67,6 @@ class LinearChainCRFOpMaker : public framework::OpProtoAndCheckerMaker {
"mini-batch. Note: S is equal to the sequence number in a mini-batch. "
"The output is no longer a LoDTensor.");
AddComment(R"DOC(
LinearChainCRF Operator.
Conditional Random Field defines an undirected probabilistic graph with nodes
denoting random variables and edges denoting dependencies between these
variables. CRF learns the conditional probability $P(Y|X)$, where
......
......@@ -19,7 +19,8 @@ limitations under the License. */
#include <thread> // NOLINT
#include <vector>
#include "paddle/fluid/operators/detail/grpc_server.h"
#include "paddle/fluid/operators/detail/macros.h"
#include "paddle/fluid/operators/detail/request_handler_impl.h"
#include "paddle/fluid/operators/listen_and_serv_op.h"
#include "paddle/fluid/platform/profiler.h"
......@@ -89,6 +90,12 @@ void ListenAndServOp::SavePort() const {
rpc_service_->SavePort();
}
static int64_t GetTimestamp() {
struct timeval tp;
gettimeofday(&tp, NULL);
return tp.tv_sec * 1000 + tp.tv_usec / 1000;
}
void ListenAndServOp::RunSyncLoop(
framework::Executor *executor, framework::ProgramDesc *program,
framework::Scope *recv_scope,
......@@ -130,7 +137,7 @@ void ListenAndServOp::RunSyncLoop(
int32_t last_parent_blkid = program->Block(1).Parent();
std::vector<size_t> parallel_blkids;
parallel_blkids.push_back(1);
double ts = detail::GetTimestamp();
double ts = GetTimestamp();
for (size_t i = 1; i < optimize_block_id_list.size(); ++i) {
// skip the first optimize block because it is already in the
// parallel_blkids.
......@@ -145,7 +152,7 @@ void ListenAndServOp::RunSyncLoop(
}
ParallelExecuteBlocks(parallel_blkids, executor, optimize_prepared, program,
recv_scope);
VLOG(2) << "run all blocks spent " << detail::GetTimestamp() - ts << "(ms)";
VLOG(2) << "run all blocks spent " << GetTimestamp() - ts << "(ms)";
rpc_service_->SetCond(detail::kRequestGet);
rpc_service_->WaitBarrier(detail::kRequestGet);
......@@ -240,8 +247,8 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
LOG(INFO) << "sync_mode:" << sync_mode << ", fan_in:" << fan_in
<< ", end_point:" << endpoint;
// request_handler_.reset(new detail::GRPCRequestSendHandler(sync_mode));
rpc_service_.reset(new detail::AsyncGRPCServer(endpoint, fan_in));
rpc_service_.reset(new RPCSERVER_T(endpoint, fan_in));
request_send_handler_.reset(new detail::RequestSendHandler(sync_mode));
request_get_handler_.reset(new detail::RequestGetHandler(sync_mode));
request_prefetch_handler_.reset(
......
......@@ -74,25 +74,18 @@ class LoadOp : public framework::OperatorBase {
class LoadOpProtoMaker : public framework::OpProtoAndCheckerMaker {
public:
void Make() override {
AddOutput("Out", "(Tensor) The tensor need to be loaded");
AddOutput("Out", "The tensor need to be loaded");
AddAttr<bool>(
"load_as_fp16",
"(boolean, default false)"
"If true, the tensor will be first loaded and then "
"converted to float16 data type. Otherwise, the tensor will be "
"directly loaded without data type conversion.")
"directly loaded without data type conversion. Default is false.")
.SetDefault(false);
AddAttr<std::string>("file_path",
"(string) "
"Variable will be loaded from \"file_path\".")
R"(Variable will be loaded from "file_path")")
.AddCustomChecker(
[](const std::string &path) { return !path.empty(); });
AddComment(R"DOC(
Load Operator.
Load operator will load a tensor variable from disk file.
)DOC");
AddComment("Load operator will load a tensor variable from disk file.");
}
};
} // namespace operators
......
......@@ -42,10 +42,15 @@ class MaxSeqenceLenOp : public framework::OperatorBase {
class MaxSeqenceLenOpProtoMaker : public framework::OpProtoAndCheckerMaker {
public:
void Make() override {
AddInput("RankTable", "The lod_rank_table.");
AddOutput("Out", "The max sequence length.");
AddComment(
R"DOC(Calculate the max sequence length through lod_rank_table.)DOC");
AddInput("RankTable", "Input variable which is a LoDRankTable object");
AddOutput("Out", "The max sequence length");
AddComment(R"DOC(
Given a LoDRankTable object, this layer returns the max length of
a batch of sequences. In fact, a LoDRankTable object contains a list of
tuples(<sequence index, sequence length>) and the list is already sorted by
sequence length in descending order, so the operator just returns the
sequence length of the first tuple element
)DOC");
}
};
......
......@@ -34,7 +34,7 @@ class MeanOpMaker : public framework::OpProtoAndCheckerMaker {
public:
void Make() override {
AddInput("X", "The input of mean op");
AddOutput("Out", "The output of mean op");
AddOutput("Out", "The output of mean op").Reuse("X");
AddComment(R"DOC(
Mean Operator.
......
......@@ -18,9 +18,14 @@ limitations under the License. */
namespace paddle {
namespace operators {
using mkldnn::memory; // Note: paddle has also "memory" namespace
using mkldnn::pooling_forward;
using framework::DataLayout;
using mkldnn::memory;
using mkldnn::pooling_backward;
using mkldnn::pooling_forward;
using mkldnn::primitive;
using mkldnn::reorder;
using mkldnn::stream;
using platform::to_void_cast;
// Generate keys for storing/retriving primitives for this operator
// TODO(jczaja): Make hashing function more optimial
......@@ -55,8 +60,9 @@ class PoolMKLDNNOpKernel : public paddle::framework::OpKernel<T> {
const Tensor* input = ctx.Input<Tensor>("X");
Tensor* output = ctx.Output<Tensor>("Out");
// Get an unique name from "argument" name of "Out" variable
// This name will be used as key when saving info into device context
PADDLE_ENFORCE(input->layout() == DataLayout::kMKLDNN &&
input->format() != memory::format::format_undef,
"Wrong layout/format set for Input tensor");
std::string pooling_type = ctx.Attr<std::string>("pooling_type");
std::vector<int> ksize = ctx.Attr<std::vector<int>>("ksize");
......@@ -82,6 +88,9 @@ class PoolMKLDNNOpKernel : public paddle::framework::OpKernel<T> {
std::vector<int> src_tz = paddle::framework::vectorize2int(input->dims());
std::vector<int> dst_tz = paddle::framework::vectorize2int(output->dims());
auto input_format = input->format();
memory::format output_format{memory::format::format_undef};
const std::string key = gethash(src_tz, pooling_type, ksize, strides,
paddings, ctx.op().Output("Out"));
const std::string key_pool_p = key + "@pool_p";
......@@ -94,16 +103,17 @@ class PoolMKLDNNOpKernel : public paddle::framework::OpKernel<T> {
auto pool_p =
std::static_pointer_cast<pooling_forward>(dev_ctx.GetBlob(key_pool_p));
if (pool_p == nullptr) {
// TODO(pzelazko-intel): support more formats
auto src_md = platform::MKLDNNMemDesc(
src_tz, platform::MKLDNNGetDataType<T>(), input_format);
auto src_md =
platform::MKLDNNMemDesc(src_tz, platform::MKLDNNGetDataType<T>(),
mkldnn::memory::format::nchw);
auto dst_md =
platform::MKLDNNMemDesc(dst_tz, platform::MKLDNNGetDataType<T>(),
mkldnn::memory::format::nchw);
/* create memory descriptor for pooling without specified format
* ('any') which lets a primitive (pooling in this case) choose
* the memory format preferred for best performance
*/
auto dst_md = platform::MKLDNNMemDesc(dst_tz, mkldnn::memory::f32,
mkldnn::memory::format::any);
std::shared_ptr<pooling_forward::primitive_desc> pool_pd =
std::shared_ptr<mkldnn::pooling_forward::primitive_desc> pool_pd =
CreatePrimitiveDesc(src_md, dst_md, strides, paddings, ksize,
pooling_type, mkldnn_engine);
......@@ -116,20 +126,22 @@ class PoolMKLDNNOpKernel : public paddle::framework::OpKernel<T> {
// save pool_workspace_memory to be referred in backward path
dev_ctx.SetBlob(key_pool_workspace_memory, workspace_memory);
auto pool_src_memory_p = std::make_shared<memory>(
memory::primitive_desc{src_md, mkldnn_engine},
static_cast<void*>(const_cast<T*>(input_data)));
dev_ctx.SetBlob(key_pool_src_mem_p, pool_src_memory_p);
auto src_memory = std::make_shared<memory>(pool_pd->src_primitive_desc(),
to_void_cast<T>(input_data));
auto dst_memory =
std::make_shared<memory>(pool_pd->dst_primitive_desc(), output_data);
auto pool_dst_memory_p = std::make_shared<memory>(
memory::primitive_desc{dst_md, mkldnn_engine},
static_cast<void*>(output_data));
dev_ctx.SetBlob(key_pool_dst_mem_p, pool_dst_memory_p);
dev_ctx.SetBlob(key_pool_src_mem_p, src_memory);
dev_ctx.SetBlob(key_pool_dst_mem_p, dst_memory);
pool_p = std::make_shared<pooling_forward>(*pool_pd, *(src_memory.get()),
*(dst_memory.get()),
*workspace_memory);
pool_p = std::make_shared<pooling_forward>(
*pool_pd, *(pool_src_memory_p.get()), *(pool_dst_memory_p.get()),
*workspace_memory);
dev_ctx.SetBlob(key_pool_p, pool_p);
output_format =
(memory::format)dst_memory->get_primitive_desc().desc().data.format;
} else {
// Primitives already exist
auto pool_src_memory_p =
......@@ -140,14 +152,20 @@ class PoolMKLDNNOpKernel : public paddle::framework::OpKernel<T> {
std::static_pointer_cast<memory>(dev_ctx.GetBlob(key_pool_dst_mem_p));
PADDLE_ENFORCE(pool_dst_memory_p != nullptr,
"Fail to find pooling dst mem_p in device context");
pool_src_memory_p->set_data_handle(
reinterpret_cast<void*>(const_cast<T*>(input_data)));
pool_src_memory_p->set_data_handle(to_void_cast<T>(input_data));
pool_dst_memory_p->set_data_handle(output_data);
output_format = (memory::format)pool_dst_memory_p->get_primitive_desc()
.desc()
.data.format;
}
// push primitive to stream and wait until it's executed
std::vector<mkldnn::primitive> pipeline{*(pool_p.get())};
mkldnn::stream(mkldnn::stream::kind::eager).submit(pipeline).wait();
stream(stream::kind::eager).submit(pipeline).wait();
output->set_layout(DataLayout::kMKLDNN);
output->set_format(output_format);
}
private:
......@@ -194,6 +212,13 @@ class PoolMKLDNNGradOpKernel : public paddle::framework::OpKernel<T> {
const Tensor* out_grad = ctx.Input<Tensor>(framework::GradVarName("Out"));
Tensor* in_x_grad = ctx.Output<Tensor>(framework::GradVarName("X"));
PADDLE_ENFORCE(in_x->layout() == DataLayout::kMKLDNN &&
in_x->format() != memory::format::format_undef,
"Wrong layout/format set for Input X tensor");
PADDLE_ENFORCE(out_grad->layout() == DataLayout::kMKLDNN &&
out_grad->format() != memory::format::format_undef,
"Wrong layout/format set for Input output_grad tensor");
std::string pooling_type = ctx.Attr<std::string>("pooling_type");
std::vector<int> ksize = ctx.Attr<std::vector<int>>("ksize");
std::vector<int> strides = ctx.Attr<std::vector<int>>("strides");
......@@ -212,6 +237,7 @@ class PoolMKLDNNGradOpKernel : public paddle::framework::OpKernel<T> {
const T* out_grad_data = out_grad->data<T>();
T* in_x_grad_data = in_x_grad->mutable_data<T>(ctx.GetPlace());
memory::format in_x_grad_format{memory::format::format_undef};
std::vector<int> diff_src_tz =
paddle::framework::vectorize2int(in_x_grad->dims());
......@@ -225,39 +251,48 @@ class PoolMKLDNNGradOpKernel : public paddle::framework::OpKernel<T> {
const std::string key_pool_bwd_p = key + "@pool_bwd_p";
const std::string key_pool_diff_src_mem_p = key + "@pool_diff_src_mem_p";
const std::string key_pool_diff_dst_mem_p = key + "@pool_diff_dst_mem_p";
const std::string key_pool_src_mem_p = key + "@pool_src_mem_p";
const std::string key_pool_dst_mem_p = key + "@pool_dst_mem_p";
const std::string key_pool_pd = key + "@pool_pd";
const std::string key_pool_workspace_memory =
key + "@pool_workspace_memory";
auto user_diff_dst_memory =
memory({{{diff_dst_tz}, memory::data_type::f32, out_grad->format()},
mkldnn_engine},
to_void_cast<T>(out_grad_data));
std::shared_ptr<memory> diff_src_memory;
std::shared_ptr<memory> diff_dst_memory;
auto dst_memory =
std::static_pointer_cast<memory>(dev_ctx.GetBlob(key_pool_dst_mem_p));
PADDLE_ENFORCE(dst_memory != nullptr,
"Fail to find dst_memory in device context");
primitive reorder_diff_dst;
bool is_diff_dst_reordered = false;
auto pool_bwd_p = std::static_pointer_cast<pooling_backward>(
dev_ctx.GetBlob(key_pool_bwd_p));
if (pool_bwd_p == nullptr) {
auto diff_src_md =
platform::MKLDNNMemDesc(diff_src_tz, platform::MKLDNNGetDataType<T>(),
mkldnn::memory::format::nchw);
auto diff_dst_md =
platform::MKLDNNMemDesc(diff_dst_tz, platform::MKLDNNGetDataType<T>(),
mkldnn::memory::format::nchw);
// Retrieve src_memory/dst_memory saved in forward pass
auto src_memory =
std::static_pointer_cast<memory>(dev_ctx.GetBlob(key_pool_src_mem_p));
PADDLE_ENFORCE(src_memory != nullptr,
"Fail to find src_memory in device context");
// Retrieve pool_pd/pool_workspace_memory from device context
auto pool_pd =
std::static_pointer_cast<mkldnn::pooling_forward::primitive_desc>(
dev_ctx.GetBlob(key_pool_pd));
PADDLE_ENFORCE(pool_pd != nullptr,
"Fail to find pool_pd in device context");
auto workspace_memory = std::static_pointer_cast<mkldnn::memory>(
auto workspace_memory = std::static_pointer_cast<memory>(
dev_ctx.GetBlob(key_pool_workspace_memory));
PADDLE_ENFORCE(workspace_memory != nullptr,
"Fail to find workspace_memory in device context");
auto pool_diff_src_memory_p = std::make_shared<memory>(memory(
{diff_src_md, mkldnn_engine}, static_cast<void*>(in_x_grad_data)));
dev_ctx.SetBlob(key_pool_diff_src_mem_p, pool_diff_src_memory_p);
auto pool_diff_dst_memory_p = std::make_shared<memory>(
memory({diff_dst_md, mkldnn_engine},
static_cast<void*>(const_cast<T*>(out_grad_data))));
dev_ctx.SetBlob(key_pool_diff_dst_mem_p, pool_diff_dst_memory_p);
// create memory descriptors for pooling
auto diff_src_md = src_memory.get()->get_primitive_desc().desc();
auto diff_dst_md = dst_memory.get()->get_primitive_desc().desc();
auto pool_bwd_desc = mkldnn::pooling_backward::desc(
pooling_type == "max" ? mkldnn::algorithm::pooling_max
......@@ -267,35 +302,74 @@ class PoolMKLDNNGradOpKernel : public paddle::framework::OpKernel<T> {
auto pool_bwd_pd = mkldnn::pooling_backward::primitive_desc(
pool_bwd_desc, mkldnn_engine, *pool_pd);
// reorder between user_diff_dst and pool diff_dst if needed
diff_dst_memory = std::make_shared<memory>(user_diff_dst_memory);
if (memory::primitive_desc(dst_memory->get_primitive_desc()) !=
user_diff_dst_memory.get_primitive_desc()) {
diff_dst_memory =
std::make_shared<memory>(dst_memory.get()->get_primitive_desc());
reorder_diff_dst = reorder(user_diff_dst_memory, *diff_dst_memory);
is_diff_dst_reordered = true;
}
diff_src_memory = std::make_shared<memory>(
pool_bwd_pd.diff_src_primitive_desc(), in_x_grad_data);
dev_ctx.SetBlob(key_pool_diff_src_mem_p, diff_src_memory);
dev_ctx.SetBlob(key_pool_diff_dst_mem_p, diff_dst_memory);
pool_bwd_p = std::make_shared<pooling_backward>(
pool_bwd_pd, *(pool_diff_dst_memory_p.get()), *workspace_memory,
*(pool_diff_src_memory_p));
pool_bwd_pd, *(diff_dst_memory.get()), *workspace_memory,
*(diff_src_memory));
dev_ctx.SetBlob(key_pool_bwd_p, pool_bwd_p);
} else {
// Primitives already exist
auto pool_diff_src_memory_p = std::static_pointer_cast<memory>(
diff_src_memory = std::static_pointer_cast<memory>(
dev_ctx.GetBlob(key_pool_diff_src_mem_p));
PADDLE_ENFORCE(pool_diff_src_memory_p != nullptr,
PADDLE_ENFORCE(diff_src_memory != nullptr,
"Fail to find pooling src mem_p in device context");
auto pool_diff_dst_memory_p = std::static_pointer_cast<memory>(
diff_dst_memory = std::static_pointer_cast<memory>(
dev_ctx.GetBlob(key_pool_diff_dst_mem_p));
PADDLE_ENFORCE(pool_diff_dst_memory_p != nullptr,
PADDLE_ENFORCE(diff_dst_memory != nullptr,
"Fail to find pooling dst mem_p in device context");
pool_diff_src_memory_p->set_data_handle(
reinterpret_cast<void*>(in_x_grad_data));
pool_diff_dst_memory_p->set_data_handle(const_cast<T*>(out_grad_data));
diff_src_memory->set_data_handle(reinterpret_cast<void*>(in_x_grad_data));
diff_dst_memory->set_data_handle(const_cast<T*>(out_grad_data));
// reorder between user_diff_dst and pool diff_dst if needed
if (memory::primitive_desc(dst_memory->get_primitive_desc()) !=
user_diff_dst_memory.get_primitive_desc()) {
diff_dst_memory =
std::make_shared<memory>(dst_memory.get()->get_primitive_desc());
reorder_diff_dst = reorder(user_diff_dst_memory, *diff_dst_memory);
is_diff_dst_reordered = true;
}
}
in_x_grad_format = (memory::format)diff_src_memory->get_primitive_desc()
.desc()
.data.format;
// push primitive to stream and wait until it's executed
std::vector<mkldnn::primitive> pipeline{*(pool_bwd_p.get())};
std::vector<mkldnn::primitive> pipeline;
if (is_diff_dst_reordered) {
pipeline.push_back(reorder_diff_dst);
}
pipeline.push_back(*(pool_bwd_p.get()));
mkldnn::stream(mkldnn::stream::kind::eager).submit(pipeline).wait();
in_x_grad->set_layout(DataLayout::kMKLDNN);
in_x_grad->set_format(in_x_grad_format);
} // Compute()
};
} // namespace operators
} // namespace paddle
namespace ops = paddle::operators;
REGISTER_OP_KERNEL(pool2d, MKLDNN, ::paddle::platform::CPUPlace,
paddle::operators::PoolMKLDNNOpKernel<float>);
ops::PoolMKLDNNOpKernel<float>);
REGISTER_OP_KERNEL(pool2d_grad, MKLDNN, ::paddle::platform::CPUPlace,
paddle::operators::PoolMKLDNNGradOpKernel<float>);
ops::PoolMKLDNNGradOpKernel<float>);
......@@ -151,7 +151,8 @@ void Pool2dOpMaker::Make() {
"The format of output tensor is also NCHW, "
"where N is batch size, C is the number of channels, "
"H is the height of the feature, "
"and W is the width of the feature.");
"and W is the width of the feature.")
.Reuse("X");
AddAttr<std::string>("pooling_type",
"(string), pooling type, can be \"max\" for max-pooling "
......@@ -244,7 +245,8 @@ void Pool3dOpMaker::Make() {
"The format of output tensor is also NCDHW, "
"where N is batch size, C is "
"the number of channels, and D, H and W is the depth, height and "
"width of the feature, respectively.");
"width of the feature, respectively.")
.Reuse("X");
AddAttr<std::string>("pooling_type",
"(string) Pooling type, can be \"max\" for max-pooling "
......
......@@ -18,7 +18,7 @@ limitations under the License. */
#include "paddle/fluid/framework/data_type.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/operators/detail/grpc_client.h"
#include "paddle/fluid/operators/detail/macros.h"
#include "paddle/fluid/operators/send_recv_util.h"
namespace paddle {
......@@ -42,7 +42,7 @@ class PrefetchOp : public framework::OperatorBase {
auto& ctx = *pool.Get(place);
detail::RPCClient* rpc_client =
detail::RPCClient::GetInstance<detail::GRPCClient>();
detail::RPCClient::GetInstance<RPCCLIENT_T>();
for (size_t i = 0; i < ins.size(); i++) {
if (NeedSend(scope, ins[i])) {
......
......@@ -19,8 +19,7 @@ limitations under the License. */
#include "paddle/fluid/framework/framework.pb.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/operators/detail/grpc_client.h"
#include "paddle/fluid/operators/detail/macros.h"
#include "paddle/fluid/platform/profiler.h"
namespace paddle {
......@@ -45,7 +44,7 @@ class RecvOp : public framework::OperatorBase {
platform::RecordEvent record_event(Type(), &ctx);
detail::RPCClient* rpc_client =
detail::RPCClient::GetInstance<detail::GRPCClient>();
detail::RPCClient::GetInstance<RPCCLIENT_T>();
for (size_t i = 0; i < outs.size(); i++) {
VLOG(3) << "getting " << outs[i] << " from " << epmap[i];
......@@ -78,9 +77,15 @@ This operator can get variables from server side.
}
};
class RecvOpShapeInference : public framework::InferShapeBase {
public:
void operator()(framework::InferShapeContext* ctx) const override {}
};
} // namespace operators
} // namespace paddle
namespace ops = paddle::operators;
REGISTER_OPERATOR(recv, ops::RecvOp, ops::RecvOpMaker);
REGISTER_OPERATOR(recv, ops::RecvOp, paddle::framework::EmptyGradOpMaker,
ops::RecvOpMaker, ops::RecvOpShapeInference);
......@@ -19,8 +19,8 @@ limitations under the License. */
#include "paddle/fluid/framework/framework.pb.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/operators/detail/macros.h"
#include "paddle/fluid/operators/detail/grpc_client.h"
#include "paddle/fluid/platform/profiler.h"
namespace paddle {
......@@ -45,7 +45,7 @@ class SendBarrierOp : public framework::OperatorBase {
platform::RecordEvent record_event(Type(), &ctx);
detail::RPCClient* rpc_client =
detail::RPCClient::GetInstance<detail::GRPCClient>();
detail::RPCClient::GetInstance<RPCCLIENT_T>();
VLOG(3) << "SendBarrierOp sync_mode:" << sync_mode;
......
......@@ -16,10 +16,9 @@ limitations under the License. */
#include <ostream>
#include "paddle/fluid/framework/data_type.h"
#include "paddle/fluid/framework/framework.pb.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/operators/detail/grpc_client.h"
#include "paddle/fluid/operators/detail/macros.h"
#include "paddle/fluid/operators/send_recv_util.h"
#include "paddle/fluid/platform/profiler.h"
......@@ -36,12 +35,9 @@ class SendOp : public framework::OperatorBase {
void RunImpl(const framework::Scope& scope,
const platform::Place& place) const override {
auto ins = Inputs("X");
auto outs = Outputs("Out");
std::vector<std::string> epmap = Attr<std::vector<std::string>>("epmap");
std::vector<std::string> endpoints =
Attr<std::vector<std::string>>("endpoints");
bool sync_mode = Attr<bool>("sync_mode");
std::vector<std::string> epmap = Attr<std::vector<std::string>>("epmap");
int sync_send = Attr<int>("sync_mode");
platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance();
auto& ctx = *pool.Get(place);
......@@ -50,37 +46,19 @@ class SendOp : public framework::OperatorBase {
platform::RecordEvent record_event(Type(), &ctx);
detail::RPCClient* rpc_client =
detail::RPCClient::GetInstance<detail::GRPCClient>();
detail::RPCClient::GetInstance<RPCCLIENT_T>();
for (size_t i = 0; i < ins.size(); i++) {
if (NeedSend(scope, ins[i])) {
VLOG(3) << "sending " << ins[i] << " to " << epmap[i];
// TODO(Yancey1989): we need to use an IO threadpool which has
// a larger number of threads than the computing threadpool.
rpc_client->AsyncSendVar(epmap[i], ctx, scope, ins[i]);
} else {
VLOG(3) << "don't send no-initialied variable: " << ins[i];
}
}
rpc_client->Wait();
if (sync_mode) {
for (auto& ep : endpoints) {
VLOG(3) << "batch barrier, ep: " << ep;
rpc_client->AsyncSendBatchBarrier(ep);
}
rpc_client->Wait();
}
if (outs.size() > 0) {
for (size_t i = 0; i < outs.size(); i++) {
VLOG(2) << "getting " << outs[i] << " from " << epmap[i];
rpc_client->AsyncGetVar(epmap[i], ctx, scope, outs[i]);
}
rpc_client->Wait();
// tell pservers that current trainer have called fetch
for (auto& ep : endpoints) {
VLOG(2) << "send fetch barrier, ep: " << ep;
rpc_client->AsyncSendFetchBarrier(ep);
}
if (sync_send) {
rpc_client->Wait();
}
}
......@@ -89,26 +67,22 @@ class SendOp : public framework::OperatorBase {
class SendOpMaker : public framework::OpProtoAndCheckerMaker {
public:
void Make() {
AddInput("X", "(Tensor) Input tensor to be sent").AsDuplicable();
AddOutput("Out", "(Tensor) Output tensor to be received from server")
AddInput("X", "(Tensor, SelectedRows) Input variables to be sent")
.AsDuplicable();
AddComment(R"DOC(
Send operator
This operator will send tensor to recv_op at the parameter server.
This operator will send variables to listen_and_serve op at the parameter server.
)DOC");
// TODO(typhoonzero): remove this attr generate de-duplicated vector from
// epmap when initializing.
AddAttr<std::vector<std::string>>("endpoints",
"(string vector, default 127.0.0.1:6164)"
"Server endpoints to send variables to.")
.SetDefault({});
AddAttr<int>("sync_mode",
"(int, default 0)"
"sync send or async send.")
.SetDefault(0);
AddAttr<std::vector<std::string>>("epmap",
"(string vector, default 127.0.0.1:6164)"
"Server endpoints in the order of input "
"variables for mapping")
.SetDefault({});
AddAttr<bool>("sync_mode", "work in sync_mode or not").SetDefault(true);
.SetDefault({"127.0.0.1:6164"});
}
};
......
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <future> // NOLINT
#include <ostream>
#include "paddle/fluid/framework/data_type.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/operators/detail/grpc_client.h"
#include "paddle/fluid/operators/send_recv_util.h"
#include "paddle/fluid/platform/profiler.h"
namespace paddle {
namespace operators {
class SendVarsOp : public framework::OperatorBase {
public:
SendVarsOp(const std::string& type, const framework::VariableNameMap& inputs,
const framework::VariableNameMap& outputs,
const framework::AttributeMap& attrs)
: OperatorBase(type, inputs, outputs, attrs) {}
void RunImpl(const framework::Scope& scope,
const platform::Place& place) const override {
auto ins = Inputs("X");
std::vector<std::string> epmap = Attr<std::vector<std::string>>("epmap");
int sync_send = Attr<int>("sync_send");
platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance();
auto& ctx = *pool.Get(place);
// For profiling
platform::RecordEvent record_event(Type(), &ctx);
detail::RPCClient* rpc_client =
detail::RPCClient::GetInstance<detail::GRPCClient>();
for (size_t i = 0; i < ins.size(); i++) {
if (NeedSend(scope, ins[i])) {
VLOG(3) << "sending " << ins[i] << " to " << epmap[i];
// TODO(Yancey1989): we need to use an IO threadpool which has
// a larger number of threads than the computing threadpool.
rpc_client->AsyncSendVar(epmap[i], ctx, scope, ins[i]);
} else {
VLOG(3) << "don't send no-initialied variable: " << ins[i];
}
}
if (sync_send) {
rpc_client->Wait();
}
}
};
class SendVarsOpMaker : public framework::OpProtoAndCheckerMaker {
public:
void Make() {
AddInput("X", "(Tensor, SelectedRows) Input variables to be sent")
.AsDuplicable();
AddComment(R"DOC(
Send operator
This operator will send variables to listen_and_serve op at the parameter server.
)DOC");
AddAttr<int>("sync_send",
"(int, default 0)"
"sync send or async send.")
.SetDefault(0);
AddAttr<std::vector<std::string>>("epmap",
"(string vector, default 127.0.0.1:6164)"
"Server endpoints in the order of input "
"variables for mapping")
.SetDefault({"127.0.0.1:6164"});
}
};
class SendVarsOpShapeInference : public framework::InferShapeBase {
public:
void operator()(framework::InferShapeContext* ctx) const override {}
};
} // namespace operators
} // namespace paddle
namespace ops = paddle::operators;
REGISTER_OPERATOR(send_vars, ops::SendVarsOp,
paddle::framework::EmptyGradOpMaker, ops::SendVarsOpMaker,
ops::SendVarsOpShapeInference);
......@@ -74,7 +74,8 @@ class SGDOpMaker : public framework::OpProtoAndCheckerMaker {
AddInput("Grad", "(Tensor or SelectedRows) Input gradient");
AddOutput("ParamOut",
"(Tensor or SelectedRows, same with Param) "
"Output parameter, should share the same memory with Param");
"Output parameter, should share the same memory with Param")
.Reuse("Param");
AddComment(R"DOC(
SGD operator
......
......@@ -83,7 +83,8 @@ class SoftmaxOpMaker : public framework::OpProtoAndCheckerMaker {
AddInput("X",
"The input tensor of softmax. "
"2-D with shape [batch_size, input_feature_dimensions].");
AddOutput("Out", "The normalized values with the same shape as X.");
AddOutput("Out", "The normalized values with the same shape as X.")
.Reuse("X");
AddAttr<bool>(
"use_cudnn",
"(bool, default false) Only used in cudnn kernel, need install cudnn")
......
......@@ -115,7 +115,7 @@ class SumOpMaker : public framework::OpProtoAndCheckerMaker {
void Make() override {
AddInput("X", "(vector<Tensor>) The input tensors of sum operator.")
.AsDuplicable();
AddOutput("Out", "(Tensor) The output tensor of sum operator.");
AddOutput("Out", "(Tensor) The output tensor of sum operator.").Reuse("X");
AddComment(R"DOC(
Sum operator.
......
......@@ -20,8 +20,7 @@ limitations under the License. */
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/operator.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/operators/detail/grpc_client.h"
#include "paddle/fluid/operators/detail/grpc_server.h"
#include "paddle/fluid/operators/detail/macros.h"
#include "paddle/fluid/operators/detail/request_handler_impl.h"
#include "paddle/fluid/operators/listen_and_serv_op.h"
#include "paddle/fluid/operators/math/math_function.h"
......@@ -29,6 +28,10 @@ limitations under the License. */
#include "paddle/fluid/platform/nccl_helper.h"
#include "paddle/fluid/string/printf.h"
#ifdef PADDLE_WITH_GRPC
#include "paddle/fluid/operators/send_recv_util.h"
#endif
USE_NO_KERNEL_OP(listen_and_serv);
namespace f = paddle::framework;
......@@ -37,7 +40,7 @@ namespace m = paddle::operators::math;
namespace detail = paddle::operators::detail;
namespace string = paddle::string;
std::unique_ptr<detail::AsyncGRPCServer> g_rpc_service;
std::unique_ptr<detail::RPCServer> g_rpc_service;
std::unique_ptr<detail::RequestHandler> g_req_handler;
void StartServer() {
......@@ -58,7 +61,7 @@ void StartServer() {
g_req_handler->SetRPCServer(g_rpc_service.get());
std::thread server_thread(
std::bind(&detail::AsyncGRPCServer::StartServer, g_rpc_service.get()));
std::bind(&detail::RPCServer::StartServer, g_rpc_service.get()));
g_rpc_service->SetCond(detail::kRequestSend);
g_rpc_service->WaitBarrier(detail::kRequestSend);
......@@ -68,9 +71,9 @@ void StartServer() {
server_thread.join();
}
TEST(SendNcclId, GrpcServer) {
TEST(SendNcclId, RPCServer) {
g_req_handler.reset(new detail::RequestSendHandler(true));
g_rpc_service.reset(new detail::AsyncGRPCServer("127.0.0.1:0", 1));
g_rpc_service.reset(new RPCSERVER_T("127.0.0.1:0", 1));
std::thread server_thread(StartServer);
g_rpc_service->WaitServerReady();
......@@ -87,8 +90,9 @@ TEST(SendNcclId, GrpcServer) {
int port = g_rpc_service->GetSelectedPort();
std::string ep = string::Sprintf("127.0.0.1:%d", port);
detail::RPCClient* client =
detail::RPCClient::GetInstance<detail::GRPCClient>();
detail::RPCClient* client = detail::RPCClient::GetInstance<RPCCLIENT_T>();
LOG(INFO) << "connect to server" << ep;
client->AsyncSendVar(ep, dev_ctx, scope, NCCL_ID_VARNAME);
client->Wait();
......
......@@ -50,7 +50,7 @@ class TopkOpMaker : public framework::OpProtoAndCheckerMaker {
public:
void Make() override {
AddInput("X", "(Tensor) The input of Topk op");
AddOutput("Out", "(Tensor) The output tensor of Topk op");
AddOutput("Out", "(Tensor) The output tensor of Topk op").Reuse("X");
AddOutput("Indices", "(Tensor) The indices of Topk elements of input");
AddComment(R"DOC(
Top K operator
......
......@@ -11,6 +11,7 @@ limitations under the License. */
#pragma once
#include <memory>
#include <mutex> // NOLINT
#include <string>
#include <unordered_map>
#include <vector>
......@@ -100,6 +101,7 @@ class CUDADeviceContext : public DeviceContext {
template <typename Callback>
void RecordEvent(cudaEvent_t ev, Callback callback) {
std::lock_guard<std::mutex> guard(mtx_);
callback();
PADDLE_ENFORCE(cudaEventRecord(ev, stream_));
}
......@@ -116,6 +118,8 @@ class CUDADeviceContext : public DeviceContext {
int compute_capability;
int multi_process;
int max_threads_per_mp;
std::mutex mtx_;
};
template <>
......
......@@ -30,7 +30,7 @@ int main(int argc, char** argv) {
new_argv.push_back(
strdup("--tryfromenv=fraction_of_gpu_memory_to_use,use_pinned_memory"));
#else
new_argv.push_back(strdup("--tryfromenv=use_pinned_memory"));
new_argv.push_back(strdup("--tryfromenv=use_pinned_memory,use_mkldnn"));
#endif
int new_argc = static_cast<int>(new_argv.size());
char** new_argv_address = new_argv.data();
......
......@@ -117,7 +117,7 @@ def __bootstrap__():
read_env_flags = [
'use_pinned_memory', 'check_nan_inf', 'benchmark', 'warpctc_dir',
'eager_delete_scope'
'eager_delete_scope', 'use_mkldnn'
]
if core.is_compiled_with_cuda():
read_env_flags += [
......
......@@ -13,7 +13,7 @@
# limitations under the License.
import contextlib
from layer_function_generator import autodoc
from layer_function_generator import autodoc, templatedoc
from tensor import assign, fill_constant
from .. import core
from ..framework import Program, Variable, Operator
......@@ -721,26 +721,22 @@ def lod_rank_table(x, level=0):
return table
@templatedoc()
def max_sequence_len(rank_table):
"""Max Sequence Len Operator. Given a LoDRankTable object, this layer
returns the max length of a batch of sequences. In fact, a LoDRankTable
object contains a list of tuples(<sequence index, sequence length>) and
the list is already sorted by sequence length in descending order, so the
operator just returns the sequence length of the first tuple element.
"""
${comment}
>>> import paddle.fluid as fluid
>>> x = fluid.layers.data(name='x', shape=[10], dtype='float32',
>>> lod_level=1)
>>> rank_table = layers.lod_rank_table(x=x, level=0)
>>> max_seq_len = layers.max_sequence_len(rank_table)
Args:
rank_table (Variable): Input variable which is a LoDRankTable object.
rank_table(${rank_table_type}): ${rank_table_comment}.
Returns:
Variable: The max length of sequence.
Examples:
.. code-block:: python
x = fluid.layers.data(name='x', shape=[10],
dtype='float32', lod_level=1)
rank_table = layers.lod_rank_table(x=x, level=0)
max_seq_len = layers.max_sequence_len(rank_table)
${out_comment}.
"""
helper = LayerHelper("max_seqence_len", **locals())
res = helper.create_tmp_variable(dtype="int64")
......
......@@ -19,11 +19,12 @@ from ..unique_name import generate as unique_name
from control_flow import BlockGuard
from ..layer_helper import LayerHelper
from ..executor import global_scope
from layer_function_generator import generate_layer_fn, templatedoc
__all__ = [
'data', 'BlockGuardServ', 'ListenAndServ', 'Send', 'open_recordio_file',
'open_files', 'read_file', 'shuffle', 'batch', 'double_buffer',
'random_data_generator', 'Preprocessor'
'random_data_generator', 'Preprocessor', 'load'
]
......@@ -662,3 +663,29 @@ class Preprocessor(object):
"sink_var_names": self.sink_var_names
})
return monkey_patch_reader_methods(self.reader)
@templatedoc()
def load(out, file_path, load_as_fp16=None):
"""
${comment}
>>> import paddle.fluid as fluid
>>> tmp_tensor = fluid.layers.create_tensor(dtype='float32')
>>> fluid.layers.load(tmp_tensor, "./tmp_tensor.bin")
Args:
out(${out_type}): ${out_comment}.
file_path(${file_path_type}): ${file_path_comment}.
load_as_fp16(${load_as_fp16_type}): ${load_as_fp16_comment}.
Returns:
None
"""
helper = LayerHelper("load", **locals())
attrs = {"file_path": file_path}
if load_as_fp16 is not None:
attrs['load_as_fp16'] = load_as_fp16
helper.append_op(type="load", inputs={}, output={"Out": out}, args=attrs)
......@@ -224,7 +224,10 @@ def autodoc(comment=""):
return __impl__
def templatedoc():
_inline_math_single_dollar = re.compile(r"\$([^\$]+)\$")
def templatedoc(op_type=None):
"""
Decorator of layer function. It will use the docstring from the layer
function as the template. The template arguments are:
......@@ -238,32 +241,47 @@ def templatedoc():
Decorated function.
"""
def trim_ending_dot(msg):
return msg.rstrip('.')
def escape_inline_math(msg):
return _inline_math_single_dollar.sub(repl=r':math:`\1`', string=msg)
def __impl__(func):
op_proto = OpProtoHolder.instance().get_op_proto(func.__name__)
if op_type is None:
op_type_name = func.__name__
else:
op_type_name = op_type
op_proto = OpProtoHolder.instance().get_op_proto(op_type_name)
tmpl = string.Template(func.__doc__)
comment_lines = op_proto.comment.split("\n")
comment = ""
for line in comment_lines:
line = line.lstrip()
comment += line
comment += "\n"
args = {"comment": comment}
line = line.strip()
if len(line) != 0:
comment += escape_inline_math(line)
comment += " "
elif len(comment) != 0:
comment += "\n \n "
args = {"comment": trim_ending_dot(comment)}
for each_input in op_proto.inputs:
input_name = _convert_(each_input.name)
args["{0}_comment".format(input_name)] = each_input.comment
args["{0}_comment".format(input_name)] = trim_ending_dot(
each_input.comment)
args["{0}_type".format(input_name)] = "Variable"
for each_attr in op_proto.attrs:
input_name = _convert_(each_attr.name)
args["{0}_comment".format(input_name)] = each_attr.comment
args["{0}_comment".format(input_name)] = trim_ending_dot(
each_attr.comment)
args["{0}_type".format(input_name)] = _type_to_str_(each_attr.type)
for each_opt in op_proto.outputs:
output_name = _convert_(each_opt.name)
args["{0}_comment".format(output_name)] = each_opt.comment
args["{0}_comment".format(output_name)] = trim_ending_dot(
each_opt.comment)
args["{0}_type".format(output_name)] = "Variable"
func.__doc__ = tmpl.substitute(args)
return func
......
......@@ -11,6 +11,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
When training a model, it's often useful to decay the
learning rate during training process, this is called
learning_rate_decay. There are many strategies to do
this, this module will provide some classical method.
User can also implement their own learning_rate_decay
strategy according to this module.
"""
import control_flow
import nn
......@@ -22,14 +30,6 @@ __all__ = [
'exponential_decay', 'natural_exp_decay', 'inverse_time_decay',
'polynomial_decay', 'piecewise_decay', 'noam_decay'
]
"""
When training a model, it's often useful to decay the
learning rate during training process, this is called
learning_rate_decay. There are many strategies to do
this, this module will provide some classical method.
User can also implement their own learning_rate_decay
strategy according to this module.
"""
def _decay_step_counter(begin=0):
......@@ -41,18 +41,20 @@ def _decay_step_counter(begin=0):
def noam_decay(d_model, warmup_steps):
"""Apply decay to learning rate.
```python
lr_value = np.power(d_model, -0.5) * np.min([
np.power(current_steps, -0.5),
np.power(warmup_steps, -1.5) * current_steps
])
```
"""
Noam decay method. The numpy implementation of noam decay as follows.
>>> import numpy as np
>>> lr_value = np.power(d_model, -0.5) * np.min([
>>> np.power(current_steps, -0.5),
>>> np.power(warmup_steps, -1.5) * current_steps])
Please reference `attention is all you need
<https://arxiv.org/pdf/1706.03762.pdf>`_.
Args:
d_model(Variable): The dimensionality of input and output of model.
Reference: attention is all you need
https://arxiv.org/pdf/1706.03762.pdf
warmup_steps(Variable): A super parameter.
Returns:
......
......@@ -4037,18 +4037,25 @@ def image_resize(input,
return out
@templatedoc(op_type="bilinear_interp")
def resize_bilinear(input, out_shape=None, scale=None, name=None):
"""
This is an alias of layer 'image_resize' with bilinear interpolation.
${comment}
Args:
input(${x_type}): ${x_comment}.
out_shape(${out_size_type}): ${out_size_comment}.
The mathematical meaning of resize bilinear layer is
Bilinear interpolation.
Bilinear interpolation is an extension of linear interpolation for
interpolating functions of two variables (e.g. H-direction and
W-direction in this layer) on a rectilinear 2D grid.
scale(float|None): The multiplier for the input height or width. At
least one of out_shape or scale must be set. And out_shape has
a higher priority than scale. Default: None.
name(str|None): The output variable name.
Returns:
For details, please refer to Wikipedia:
https://en.wikipedia.org/wiki/Bilinear_interpolation
${out_comment}.
"""
return image_resize(input, out_shape, scale, name, 'BILINEAR')
......
......@@ -18,6 +18,7 @@ from ..framework import convert_np_dtype_to_dtype_
from ..framework import Variable
from ..initializer import Constant, force_init_on_cpu
from ..core import VarDesc
from layer_function_generator import templatedoc
import numpy
__all__ = [
......@@ -30,6 +31,8 @@ __all__ = [
'assign',
'fill_constant_batch_size_like',
'fill_constant',
'argmin',
'argmax',
'ones',
'zeros',
]
......@@ -266,6 +269,7 @@ def fill_constant(shape, dtype, value, force_cpu=False, out=None):
return out
@templatedoc()
def fill_constant_batch_size_like(input,
shape,
dtype,
......@@ -273,30 +277,28 @@ def fill_constant_batch_size_like(input,
input_dim_idx=0,
output_dim_idx=0):
"""
**fill_constant_batch_size_like**
This function creates a tensor of specified *shape*, *dtype* and batch size,
and initializes this with a constant supplied in *value*. The batch size is
obtained from the `input` tensor.
${comment}
It also sets *stop_gradient* to True.
>>> data = fluid.layers.fill_constant_batch_size_like(
>>> input=like, shape=[1], value=0, dtype='int64')
Args:
input(Variable): Tensor whose dimensions will be used to get batch size
shape(tuple|list|None): Shape of output tensor
dtype(np.dtype|core.VarDesc.VarType|str): Data type of output tensor
value(float): Constant value to initialize the output tensor
input_dim_idx(int): Index of input's batch size dimension
output_dim_idx(int): Index of output's batch size dimension
input(${input_type}): ${input_comment}.
Returns:
Variable: The tensor variable storing the output
shape(${shape_type}): ${shape_comment}.
Examples:
.. code-block:: python
dtype(${dtype_type}): ${dtype_comment}.
value(${value_type}): ${value_comment}.
input_dim_idx(${input_dim_idx_type}): ${input_dim_idx_comment}.
output_dim_idx(${output_dim_idx_type}): ${output_dim_idx_comment}.
data = fluid.layers.fill_constant_batch_size_like(
input=like, shape=[1], value=0, dtype='int64')
Returns:
${out_comment}.
"""
helper = LayerHelper("fill_constant_batch_size_like", **locals())
out = helper.create_tmp_variable(dtype=dtype)
......@@ -315,6 +317,68 @@ def fill_constant_batch_size_like(input,
return out
def argmin(x, axis=0):
"""
**argmin**
This function computes the indices of the min elements
of the input tensor's element along the provided axis.
Args:
x(Variable): The input to compute the indices of
the min elements.
axis(int): Axis to compute indices along.
Returns:
Variable: The tensor variable storing the output
Examples:
.. code-block:: python
out = fluid.layers.argmin(x=in, axis=0)
out = fluid.layers.argmin(x=in, axis=-1)
"""
helper = LayerHelper("arg_min", **locals())
out = helper.create_tmp_variable(VarDesc.VarType.INT64)
helper.append_op(
type='arg_min',
inputs={'X': x},
outputs={'Out': [out]},
attrs={'axis': axis})
return out
def argmax(x, axis=0):
"""
**argmax**
This function computes the indices of the max elements
of the input tensor's element along the provided axis.
Args:
x(Variable): The input to compute the indices of
the max elements.
axis(int): Axis to compute indices along.
Returns:
Variable: The tensor variable storing the output
Examples:
.. code-block:: python
out = fluid.layers.argmax(x=in, axis=0)
out = fluid.layers.argmax(x=in, axis=-1)
"""
helper = LayerHelper("arg_max", **locals())
out = helper.create_tmp_variable(VarDesc.VarType.INT64)
helper.append_op(
type='arg_max',
inputs={'X': x},
outputs={'Out': [out]},
attrs={'axis': axis})
return out
def ones(shape, dtype, force_cpu=False):
"""
**ones**
......@@ -437,22 +501,6 @@ def save_combine(x, file_path, overwrite=True):
"overwrite": overwrite})
def load(out, file_path):
"""
Loads a variable from a given file.
Args:
out(variable): The variable to be read from the disk file.
file_path(str): The path of the disk file.
"""
helper = LayerHelper("load", **locals())
helper.append_op(
type="load",
inputs={},
output={"Out": out},
args={"file_path": file_path})
def load_combine(out, file_path):
"""
Loads a list of vairables from a single file.
......
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import numpy as np
from op_test import OpTest
class BaseTestCase(OpTest):
def initTestCase(self):
self.op_type = 'arg_min'
self.dims = (3, 4, 5)
self.dtype = 'float32'
self.axis = 0
def setUp(self):
self.initTestCase()
self.x = (1000 * np.random.random(self.dims)).astype(self.dtype)
self.inputs = {'X': self.x}
self.attrs = {'axis': self.axis}
if self.op_type == "arg_min":
self.outputs = {'Out': np.argmin(self.x, axis=self.axis)}
else:
self.outputs = {'Out': np.argmax(self.x, axis=self.axis)}
def test_check_output(self):
self.check_output()
class TestCase0(BaseTestCase):
def initTestCase(self):
self.op_type = 'arg_max'
self.dims = (3, 4, 5)
self.dtype = 'float32'
self.axis = 0
class TestCase1(BaseTestCase):
def initTestCase(self):
self.op_type = 'arg_min'
self.dims = (3, 4)
self.dtype = 'float64'
self.axis = 1
class TestCase2(BaseTestCase):
def initTestCase(self):
self.op_type = 'arg_max'
self.dims = (3, 4)
self.dtype = 'int64'
self.axis = 0
class TestCase3(BaseTestCase):
def initTestCase(self):
self.op_type = 'arg_max'
self.dims = (3, )
self.dtype = 'int64'
self.axis = 0
class TestCase4(BaseTestCase):
def initTestCase(self):
self.op_type = 'arg_min'
self.dims = (1, )
self.dtype = 'int32'
self.axis = 0
if __name__ == '__main__':
unittest.main()
......@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import paddle.fluid as fluid
from paddle.fluid.transpiler.distribute_transpiler import delete_ops
......@@ -54,10 +55,10 @@ class TestDistTranspiler(TranspilerTest):
delete_ops(trainer.global_block(), optimize_ops)
ops = [op.type for op in trainer.global_block().ops] + [
"split_byref", "send_vars", "send_barrier", "recv", "recv",
"split_byref", "send", "send_barrier", "recv", "recv",
"fetch_barrier", "concat"
]
ops.insert(ops.index("elementwise_add_grad") + 1, "send_vars")
ops.insert(ops.index("elementwise_add_grad") + 1, "send")
return ops
......
......@@ -70,17 +70,18 @@ class TestListenAndServOp(OpTest):
return p.pid
def _wait_ps_ready(self, pid):
retry_times = self.ps_timeout
start_left_time = self.ps_timeout
sleep_time = 0.5
while True:
assert retry_times >= 0, "wait ps ready failed"
time.sleep(0.5)
assert start_left_time >= 0, "wait ps ready failed"
time.sleep(sleep_time)
try:
# the listen_and_serv_op would touch a file which contains the listen port
# on the /tmp directory until it was ready to process all the RPC call.
os.stat("/tmp/paddle.%d.port" % pid)
return
except os.error:
retry_times -= 1
start_left_time -= sleep_time
def test_rpc_interfaces(self):
# TODO(Yancey1989): need to make sure the rpc interface correctly.
......
......@@ -59,9 +59,9 @@ class TestSimpleDistTranspiler(TranspilerTest):
delete_ops(trainer.global_block(), optimize_ops)
ops = [op.type for op in trainer.global_block().ops] + [
"send_vars", "send_barrier", "recv", "recv", "fetch_barrier"
"send", "send_barrier", "recv", "recv", "fetch_barrier"
]
ops.insert(ops.index("elementwise_add_grad") + 1, "send_vars")
ops.insert(ops.index("elementwise_add_grad") + 1, "send")
return ops
def _transpiler_instance(self):
......
......@@ -24,9 +24,9 @@ Steps to transpile trainer:
1. split variable to multiple blocks, aligned by product(dim[1:]) (width).
2. rename splited grad variables to add trainer_id suffix ".trainer_%d".
3. modify trainer program add split_op to each grad variable.
4. append send_op to send splited variables to server and fetch
params(splited blocks or origin param) from server.
5. append concat_op to merge splited blocks to update local weights.
4. append send_op to send splited variables to server and
5. add recv_op to fetch params(splited blocks or origin param) from server.
6. append concat_op to merge splited blocks to update local weights.
Steps to transpile pserver:
1. create new program for parameter server.
......@@ -317,7 +317,7 @@ class DistributeTranspiler:
program.global_block().insert_op(
index=index + 1,
type="send_vars",
type="send",
inputs={"X": splited_vars},
outputs={},
attrs={
......@@ -685,7 +685,7 @@ class DistributeTranspiler:
break
def _split_table_grad_and_add_send_vars(self, program, pserver_endpoints):
# 2. add split_ids_op and send_vars_op to send gradient to pservers
# 2. add split_ids_op and send_op to send gradient to pservers
# there should only be one table_name
all_ops = program.global_block().ops
table_grad_name = grad_var_name(self.table_name)
......@@ -702,11 +702,11 @@ class DistributeTranspiler:
outputs={"Out": self.trainer_side_table_grad_list})
program.global_block().insert_op(
index=op_index + 2,
type="send_vars",
type="send",
inputs={'X': self.trainer_side_table_grad_list},
outputs={},
attrs={
"sync_send": True,
"sync_mode": True,
"epmap": pserver_endpoints,
RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE
})
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册