Unverified commit be528f98 authored by: Y yuyang18

Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into feature/combine_open_files_and_double_buffer
......@@ -210,7 +210,7 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader,
# generate fake:
if args.use_fake_data:
for var in feed_var_list:
v = startup_prog.global_block().clone_variable(var)
v = startup_prog.global_block()._clone_variable(var)
var.persistable = True
v.persistable = True
......
......@@ -98,13 +98,13 @@ class Block(objects):
def append_operator(self, ...):
self.ops.append(Operator(self, ...))
def prepend_operator(self, ...): # Parameter's ctor prepends initialize operators.
def _prepend_operator(self, ...): # Parameter's ctor prepends initialize operators.
self.ops.prepend(Operator(self, ...))
```
`create_parameter` is necessary because parameters are global variables, defined in the global block, but can be created in some sub-blocks. For example, an FC layer in the step block of an RNN operator.
`prepend_operator` is necessary because the constructor of `Parameter` needs to create the initialize (or load) operator of the parameter, and would like to put it in the *preamble* of the global block.
`_prepend_operator` is necessary because the constructor of `Parameter` needs to create the initialize (or load) operator of the parameter, and would like to put it in the *preamble* of the global block.
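To make the ordering concrete, here is a minimal Python sketch (toy stand-ins, not Fluid's real classes): regular layer ops are appended to a block, while a parameter's constructor prepends its initialization op so it lands in the preamble and runs before anything that reads the parameter.

```python
# Toy sketch only: illustrates append vs. prepend ordering, not Fluid's actual implementation.
class Operator(object):
    def __init__(self, block, op_type):
        self.type = op_type

class Block(object):
    def __init__(self):
        self.ops = []

    def append_op(self, op_type):
        op = Operator(self, op_type)
        self.ops.append(op)        # regular layer ops go to the end
        return op

    def _prepend_op(self, op_type):
        op = Operator(self, op_type)
        self.ops.insert(0, op)     # initialization ops go to the preamble
        return op

class Parameter(object):
    def __init__(self, global_block, name):
        self.name = name
        # The ctor prepends the initialize (or load) op of the parameter.
        global_block._prepend_op("fill_constant")

global_block = Block()
global_block.append_op("mul")      # e.g. an FC layer's compute op
Parameter(global_block, "fc.w")    # its init op is prepended
print([op.type for op in global_block.ops])  # ['fill_constant', 'mul']
```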
### Operator
......
......@@ -78,7 +78,7 @@ def error_clip_callback(block, context):
op_desc = block.desc.op(block.desc.op_size() - 1)
for grad_n in filter(lambda n: grad_to_var.has_key(n),
op_desc.output_arg_names()):
fwd_var = block.var_recursive(grad_to_var[grad_n])
fwd_var = block._var_recursive(grad_to_var[grad_n])
error_clip = getattr(fwd_var, "error_clip", None)
if not (error_clip is None or isinstance(error_clip,
BaseErrorClipAttr)):
......
......@@ -35,11 +35,16 @@ PaddlePaddle需要使用Docker环境完成编译,这样可以免去单独安
# 2. Optional: build the Docker image used to compile PaddlePaddle from source
docker build -t paddle:dev .
# 3. Run the following command to build a CPU-only binary
docker run -it -v $PWD:/paddle -w /paddle -e "WITH_GPU=OFF" -e "WITH_TESTING=OFF" paddlepaddle/paddle_manylinux_devel:cuda8.0_cudnn5 ./paddle/scripts/paddle_build.sh build
docker run -it -v $PWD:/paddle -w /paddle -e "PYTHON_ABI=cp27-cp27mu" -e "WITH_GPU=OFF" -e "WITH_TESTING=OFF" paddlepaddle/paddle_manylinux_devel:cuda8.0_cudnn5 ./paddle/scripts/paddle_build.sh build
# 4. Or use the image built in the optional step above (step 2 must be run first)
docker run -it -v $PWD:/paddle -w /paddle -e "WITH_GPU=OFF" -e "WITH_TESTING=OFF" paddle:dev ./paddle/scripts/paddle_build.sh build
Note: the command above mounts the current directory (the root of the source tree) as the :code:`/paddle` directory inside the container.
Notes:
- The command above mounts the current directory (the root of the source tree) as the :code:`/paddle` directory inside the container.
- If you build with a manylinux image, you need to specify a `Python ABI <https://www.python.org/dev/peps/pep-0425/#id8>`__ via the environment variable :code:`PYTHON_ABI`.
  The Python ABIs currently supported by PaddlePaddle are :code:`cp27-cp27m` and :code:`cp27-cp27mu`.
After the build finishes, the output whl package is generated under build/python/dist; you can either install it on the current machine or copy it to the target machine:
......
......@@ -36,13 +36,18 @@ If you don't wish to use docker,you need to install several compile dependenci
# 2. Optional: build development docker image from source
docker build -t paddle:dev .
# 3. Run the following command to build a CPU-only binary
docker run -it -v $PWD:/paddle -w /paddle -e "WITH_GPU=OFF" -e "WITH_TESTING=OFF" paddlepaddle/paddle_manylinux_devel:cuda8.0_cudnn5 ./paddle/scripts/paddle_build.sh build
docker run -it -v $PWD:/paddle -w /paddle -e "PYTHON_ABI=cp27-cp27mu" -e "WITH_GPU=OFF" -e "WITH_TESTING=OFF" paddlepaddle/paddle_manylinux_devel:cuda8.0_cudnn5 ./paddle/scripts/paddle_build.sh build
# 4. Or, use your built Docker image to build PaddlePaddle (must run step 2)
docker run -it -v $PWD:/paddle -w /paddle -e "WITH_GPU=OFF" -e "WITH_TESTING=OFF" paddle:dev ./paddle/scripts/paddle_build.sh build
NOTE: The above command tries to mount the current working directory (root directory of the source code)
NOTE:
- The above command tries to mount the current working directory (root directory of the source code)
into the :code:`/paddle` directory inside the Docker container.
- If you build with a manylinux image, you need to set the environment variable :code:`PYTHON_ABI` to specify a `Python ABI <https://www.python.org/dev/peps/pep-0425/#id8>`__; a quick check for your local ABI is sketched below.
Currently, the Python ABIs supported by PaddlePaddle are :code:`cp27-cp27m` and :code:`cp27-cp27mu`.
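As an aside (an illustrative addition, not part of the original note), your local CPython 2.7's ABI can be inferred from its unicode build width: a wide (UCS-4) build corresponds to :code:`cp27-cp27mu`, a narrow (UCS-2) build to :code:`cp27-cp27m`::

    # Illustrative check only; not part of the PaddlePaddle docs.
    import sys
    print("cp27-cp27mu" if sys.maxunicode > 0xFFFF else "cp27-cp27m")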
When the compilation finishes, the output whl package is generated under
build/python/dist; you can either install the whl on the local machine
or copy it to the target machine.
......
......@@ -118,7 +118,7 @@ class Float16Transpiler:
for var in self.block.vars.keys():
if var not in args:
self.block.remove_var(var)
self.block._remove_var(var)
def _modify_feed_fetch(self):
'''
......@@ -165,7 +165,7 @@ class Float16Transpiler:
dtype=core.VarDesc.VarType.FP16,
shape=var.shape,
persistable=var.persistable)
self.block.insert_op(
self.block._insert_op(
i + 1,
type="cast",
inputs={"X": var},
......@@ -188,7 +188,7 @@ class Float16Transpiler:
persistable=var.persistable)
find_op(var)
var.op.rename_output(var_name, tmp_var_name)
self.block.insert_op(
self.block._insert_op(
i,
type="cast",
inputs={"X": tmp_var},
......@@ -253,4 +253,4 @@ class Float16Transpiler:
# old var will be replaced by the fp16 var in program desc
self.input_map[var.name] = fp16_var_name
self.block.remove_var(var.name)
self.block._remove_var(var.name)
......@@ -104,7 +104,3 @@ if (WITH_ANAKIN) # only needed in CI
target_compile_options(inference_anakin_test BEFORE PUBLIC ${ANAKIN_COMPILE_EXTRA_FLAGS})
endif(WITH_TESTING)
endif()
if(WITH_TESTING)
add_subdirectory(demo)
endif()
# Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
option(WITH_INFERENCE_DEMO "Compile with Inference demo" OFF)
if(NOT WITH_INFERENCE_DEMO)
return()
endif()
set(DEMO_INSTALL_DIR "${PADDLE_BINARY_DIR}/inference_demo")
set(URL_ROOT http://paddlemodels.bj.bcebos.com/inference-vis-demos%2F)
function(inference_download_test_demo TARGET)
if (NOT WITH_TESTING)
return()
endif()
set(options "")
set(oneValueArgs URL)
set(multiValueArgs SRCS)
cmake_parse_arguments(tests "${options}" "${oneValueArgs}" "${multiValueArgs}" ${ARGN})
set(test_dir "${DEMO_INSTALL_DIR}/${TARGET}")
message(STATUS "inference demo ${test_dir}")
if(NOT EXISTS "${test_dir}")
message(STATUS "Download ${TARGET} model from ${tests_URL}")
execute_process(COMMAND bash -c "mkdir -p ${test_dir}")
execute_process(COMMAND bash -c "cd ${test_dir}; wget -q ${tests_URL}")
execute_process(COMMAND bash -c "cd ${test_dir}; tar xzf *.tar.gz")
endif()
cc_test(${TARGET} SRCS "${tests_SRCS}"
DEPS paddle_inference_api paddle_fluid
ARGS --data=${test_dir}/data.txt
--modeldir=${test_dir}/model
--refer=${test_dir}/result.txt)
endfunction()
# disable mobilenet test
#inference_download_test_demo(mobilenet_inference_demo
# SRCS vis_demo.cc
# URL ${URL_ROOT}mobilenet.tar.gz)
inference_download_test_demo(se_resnext50_inference_demo
SRCS vis_demo.cc
URL ${URL_ROOT}se_resnext50.tar.gz)
inference_download_test_demo(ocr_inference_demo
SRCS vis_demo.cc
URL ${URL_ROOT}ocr.tar.gz)
# Inference Demos
Input data format:
- Each line contains a single record.
- Each record has the format
```
<space splitted floats as data>\t<space splitted ints as shape>
```
See the C++ code in `vis_demo.cc`.
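For illustration only (this helper is not part of the repository, and the tensor values and shape below are made up), a record in the format above could be generated with a few lines of Python:

```python
# Hypothetical helper: writes one record in the expected
# "<space splitted floats as data>\t<space splitted ints as shape>" format.
import numpy as np

def write_record(path, tensor):
    data = " ".join(str(float(x)) for x in tensor.flatten())
    shape = " ".join(str(int(d)) for d in tensor.shape)
    with open(path, "w") as f:
        f.write(data + "\t" + shape + "\n")

write_record("data.txt", np.random.rand(1, 3, 2, 2).astype("float32"))
```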
## MobileNet
To execute the demo, simply run
```sh
./mobilenet_inference_demo --modeldir <model> --data <datafile>
```
## SE-ResNeXt-50
To execute the demo, simply run
```sh
./se_resnext50_inference_demo --modeldir <model> --data <datafile>
```
## OCR
To execute the demo, simply run
```sh
./ocr_inference_demo --modeldir <model> --data <datafile>
```
......@@ -52,14 +52,12 @@ else()
set(MATH_LIB ${PADDLE_LIB}/third_party/install/openblas/lib/libopenblas.a)
endif()
# Note: libpaddle_inference_api.so/a must put before libpaddle_fluid.so/a
if(WITH_STATIC_LIB)
set(DEPS
"-Wl,--whole-archive"
${PADDLE_LIB}/paddle/fluid/inference/libpaddle_fluid.a
"-Wl,--no-whole-archive"
${PADDLE_LIB}/contrib/inference/libpaddle_inference_api.a)
${PADDLE_LIB}/contrib/inference/libpaddle_inference_api.a
${PADDLE_LIB}/paddle/fluid/inference/libpaddle_fluid.a)
else()
# Note: libpaddle_inference_api.so must put before libpaddle_fluid.so
set(DEPS
${PADDLE_LIB}/contrib/inference/libpaddle_inference_api.so
${PADDLE_LIB}/paddle/fluid/inference/libpaddle_fluid.so)
......
# Inference Demos
There are several demos:
- simple_on_word2vec:
  - The C++ code is in `simple_on_word2vec.cc`.
  - It is suitable for the word2vec model.
- vis_demo:
  - The C++ code is in `vis_demo.cc`.
  - It is suitable for three models: mobilenet, se_resnext50, and ocr.
  - Input data format:
    - Each line contains a single record.
    - Each record has the format
    ```
    <space splitted floats as data>\t<space splitted ints as shape>
    ```
To build and execute the demos, simply run
```
./run.sh $PADDLE_ROOT $TURN_ON_MKL $TEST_GPU_CPU
```
- It builds and executes the demos with both the static and the shared library.
- `$PADDLE_ROOT`: paddle library path
- `$TURN_ON_MKL`: use MKL or Openblas
- `$TEST_GPU_CPU`: test both GPU/CPU mode or only CPU mode
- NOTE: for simple_on_word2vec, you must run `ctest -R test_word2vec -R` first to obtain the word2vec model.
set -x
PADDLE_ROOT=$1
WITH_MKL=$2
WITH_GPU=$3
if [ $3 == "ON" ]; then
TURN_ON_MKL=$2 # use MKL or Openblas
TEST_GPU_CPU=$3 # test both GPU/CPU mode or only CPU mode
if [ $2 == ON ]; then
# You can export it yourself if you move the install path
MKL_LIB=${PADDLE_ROOT}/build/fluid_install_dir/third_party/install/mklml/lib
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:${MKL_LIB}
fi
if [ $3 == ON ]; then
use_gpu_list='true false'
else
use_gpu_list='false'
fi
# download vis_demo data
function download() {
dir_name=$1
mkdir -p $dir_name
cd $dir_name
wget -q ${URL_ROOT}$dir_name.tar.gz
tar xzf *.tar.gz
cd ..
}
URL_ROOT=http://paddlemodels.bj.bcebos.com/inference-vis-demos%2F
mkdir -p data
cd data
vis_demo_list='se_resnext50 ocr mobilenet'
for vis_demo_name in $vis_demo_list; do
download $vis_demo_name
done
cd ..
# compile and test the demo
mkdir -p build
cd build
for WITH_STATIC_LIB in false; do
for WITH_STATIC_LIB in ON OFF; do
# -----simple_on_word2vec-----
rm -rf *
cmake .. -DPADDLE_LIB=${PADDLE_ROOT}/build/fluid_install_dir/ \
-DWITH_MKL=$WITH_MKL \
-DWITH_MKL=$TURN_ON_MKL \
-DDEMO_NAME=simple_on_word2vec \
-DWITH_GPU=$WITH_GPU \
-DWITH_GPU=$TEST_GPU_CPU \
-DWITH_STATIC_LIB=$WITH_STATIC_LIB
make
for use_gpu in $use_gpu_list; do
./simple_on_word2vec \
--dirname=${PADDLE_ROOT}/build/python/paddle/fluid/tests/book/word2vec.inference.model \
--use_gpu=$use_gpu
make -j
word2vec_model=${PADDLE_ROOT}'/build/python/paddle/fluid/tests/book/word2vec.inference.model'
if [ -d $word2vec_model ]; then
for use_gpu in $use_gpu_list; do
./simple_on_word2vec \
--dirname=$word2vec_model \
--use_gpu=$use_gpu
if [ $? -ne 0 ]; then
echo "simple_on_word2vec demo runs fail."
exit 1
fi
done
fi
# ---------vis_demo---------
rm -rf *
cmake .. -DPADDLE_LIB=${PADDLE_ROOT}/build/fluid_install_dir/ \
-DWITH_MKL=$TURN_ON_MKL \
-DDEMO_NAME=vis_demo \
-DWITH_GPU=$TEST_GPU_CPU \
-DWITH_STATIC_LIB=$WITH_STATIC_LIB
make -j
for use_gpu in false; do
for vis_demo_name in $vis_demo_list; do
./vis_demo \
--modeldir=../data/$vis_demo_name/model \
--data=../data/$vis_demo_name/data.txt \
--refer=../data/$vis_demo_name/result.txt \
--use_gpu=$use_gpu
if [ $? -ne 0 ]; then
echo "vis demo $vis_demo_name runs fail."
exit 1
fi
done
done
done
if [ $? -eq 0 ]; then
exit 0
else
echo "inference demo runs fail."
exit 1
fi
set +x
......@@ -16,7 +16,7 @@
#include <string>
#include <vector>
#include "paddle/contrib/inference/paddle_inference_api.h"
#include "contrib/inference/paddle_inference_api.h"
namespace paddle {
namespace demo {
......
......@@ -18,19 +18,14 @@ limitations under the License. */
#include <gflags/gflags.h>
#include <glog/logging.h> // use glog instead of PADDLE_ENFORCE to avoid importing other paddle header files.
#include <gtest/gtest.h>
#include <fstream>
#include <iostream>
#include "paddle/contrib/inference/demo/utils.h"
#include "paddle/contrib/inference/paddle_inference_api.h"
#include "paddle/fluid/platform/enforce.h"
#include "utils.h"
#ifdef PADDLE_WITH_CUDA
DECLARE_double(fraction_of_gpu_memory_to_use);
#endif
namespace paddle {
namespace demo {
DEFINE_string(modeldir, "", "Directory of the inference model.");
DEFINE_string(refer, "", "path to reference result for comparison.");
DEFINE_string(
......@@ -38,6 +33,10 @@ DEFINE_string(
"",
"path of data; each line is a record, format is "
"'<space splitted floats as data>\t<space splitted ints as shape'");
DEFINE_bool(use_gpu, false, "Whether use gpu.");
namespace paddle {
namespace demo {
struct Record {
std::vector<float> data;
......@@ -47,7 +46,7 @@ struct Record {
void split(const std::string& str, char sep, std::vector<std::string>* pieces);
Record ProcessALine(const std::string& line) {
LOG(INFO) << "process a line";
VLOG(3) << "process a line";
std::vector<std::string> columns;
split(line, '\t', &columns);
CHECK_EQ(columns.size(), 2UL)
......@@ -65,8 +64,8 @@ Record ProcessALine(const std::string& line) {
for (auto& s : shape_strs) {
record.shape.push_back(std::stoi(s));
}
LOG(INFO) << "data size " << record.data.size();
LOG(INFO) << "data shape size " << record.shape.size();
VLOG(3) << "data size " << record.data.size();
VLOG(3) << "data shape size " << record.shape.size();
return record;
}
......@@ -78,20 +77,22 @@ void CheckOutput(const std::string& referfile, const PaddleTensor& output) {
file.close();
size_t numel = output.data.length() / PaddleDtypeSize(output.dtype);
LOG(INFO) << "predictor output numel " << numel;
LOG(INFO) << "reference output numel " << refer.data.size();
EXPECT_EQ(numel, refer.data.size());
VLOG(3) << "predictor output numel " << numel;
VLOG(3) << "reference output numel " << refer.data.size();
PADDLE_ENFORCE_EQ(numel, refer.data.size());
switch (output.dtype) {
case PaddleDType::INT64: {
for (size_t i = 0; i < numel; ++i) {
EXPECT_EQ(static_cast<int64_t*>(output.data.data())[i], refer.data[i]);
PADDLE_ENFORCE_EQ(static_cast<int64_t*>(output.data.data())[i],
refer.data[i]);
}
break;
}
case PaddleDType::FLOAT32:
for (size_t i = 0; i < numel; ++i) {
EXPECT_NEAR(
static_cast<float*>(output.data.data())[i], refer.data[i], 1e-5);
PADDLE_ENFORCE_LT(
fabs(static_cast<float*>(output.data.data())[i] - refer.data[i]),
1e-5);
}
break;
}
......@@ -106,15 +107,15 @@ void Main(bool use_gpu) {
config.prog_file = FLAGS_modeldir + "/__model__";
config.use_gpu = use_gpu;
config.device = 0;
#ifdef PADDLE_WITH_CUDA
config.fraction_of_gpu_memory = FLAGS_fraction_of_gpu_memory_to_use;
#endif
if (FLAGS_use_gpu) {
config.fraction_of_gpu_memory = 0.1; // set by yourself
}
LOG(INFO) << "init predictor";
VLOG(3) << "init predictor";
auto predictor =
CreatePaddlePredictor<NativeConfig, PaddleEngineKind::kNative>(config);
LOG(INFO) << "begin to process data";
VLOG(3) << "begin to process data";
// Just a single batch of data.
std::string line;
std::ifstream file(FLAGS_data);
......@@ -129,21 +130,26 @@ void Main(bool use_gpu) {
.data = PaddleBuf(record.data.data(), record.data.size() * sizeof(float)),
.dtype = PaddleDType::FLOAT32};
LOG(INFO) << "run executor";
VLOG(3) << "run executor";
std::vector<PaddleTensor> output;
predictor->Run({input}, &output);
LOG(INFO) << "output.size " << output.size();
VLOG(3) << "output.size " << output.size();
auto& tensor = output.front();
LOG(INFO) << "output: " << SummaryTensor(tensor);
VLOG(3) << "output: " << SummaryTensor(tensor);
// compare with reference result
CheckOutput(FLAGS_refer, tensor);
}
TEST(demo, vis_demo_cpu) { Main(false /*use_gpu*/); }
#ifdef PADDLE_WITH_CUDA
TEST(demo, vis_demo_gpu) { Main(true /*use_gpu*/); }
#endif
} // namespace demo
} // namespace paddle
int main(int argc, char** argv) {
google::ParseCommandLineFlags(&argc, &argv, true);
paddle::demo::Main(false /* use_gpu*/);
if (FLAGS_use_gpu) {
paddle::demo::Main(true /*use_gpu*/);
}
return 0;
}
......@@ -54,6 +54,7 @@ bool PaddleInferenceAnakinPredictor::Run(
LOG(ERROR) << "copy data from CPU to GPU error";
return false;
}
cudaStreamSynchronize(NULL);
}
executor_.prediction();
......@@ -76,6 +77,7 @@ bool PaddleInferenceAnakinPredictor::Run(
LOG(ERROR) << "copy data from GPU to CPU error";
return false;
}
cudaStreamSynchronize(NULL);
}
return true;
}
......
......@@ -104,7 +104,7 @@ ParallelExecutor::ParallelExecutor(
}
if (member_->local_scopes_.size() != 1 && local_scopes.empty()) {
BCastParamsToDevs(bcast_vars);
BCastParamsToDevices(bcast_vars);
}
// Startup Program has been run. All local scopes has correct parameters.
......@@ -140,7 +140,7 @@ ParallelExecutor::ParallelExecutor(
member_->places_, std::move(member_->executor_)));
}
void ParallelExecutor::BCastParamsToDevs(
void ParallelExecutor::BCastParamsToDevices(
const std::unordered_set<std::string> &vars) const {
// the initializing bcast, all vars would be bcast from device(0),
// otherwise
......@@ -218,7 +218,10 @@ void ParallelExecutor::BCastParamsToDevs(
auto local_scope = member_->local_scopes_[i];
auto *t = local_scope->Var(var)->GetMutable<LoDTensor>();
if (member_->use_all_reduce_ || member_->use_cuda_) {
// FIXME(zcd): LR_DECAY_COUNTER should not be shared. This is a hot fix.
if (member_->use_all_reduce_ || member_->use_cuda_ ||
var == "@LR_DECAY_COUNTER@") {
t->Resize(dims);
t->mutable_data(cpu, main_tensor.type());
paddle::framework::TensorCopy(main_tensor, cpu, t);
......
......@@ -66,7 +66,7 @@ class ParallelExecutor {
void Run(const std::vector<std::string> &fetch_tensors,
const std::string &fetched_var_name);
void BCastParamsToDevs(const std::unordered_set<std::string> &vars) const;
void BCastParamsToDevices(const std::unordered_set<std::string> &vars) const;
private:
ParallelExecutorPrivate *member_;
......
# analysis and tensorrt must be added before creating static library,
# otherwise, there would be undefined reference to them in static library.
add_subdirectory(analysis)
if (TENSORRT_FOUND)
add_subdirectory(tensorrt)
endif()
set(FLUID_CORE_MODULES proto_desc memory lod_tensor executor )
# TODO(panyx0718): Should this be called paddle_fluid_inference_api_internal?
......@@ -7,10 +14,6 @@ cc_library(paddle_fluid_api
get_property(fluid_modules GLOBAL PROPERTY FLUID_MODULES)
if(WITH_CONTRIB)
set(fluid_modules "${fluid_modules}" paddle_inference_api)
endif()
# Create static library
cc_library(paddle_fluid DEPS ${fluid_modules} paddle_fluid_api)
if(NOT APPLE)
......@@ -35,9 +38,3 @@ if(WITH_TESTING)
# both tests/book and analysis depends the models that generated by python/paddle/fluid/tests/book
add_subdirectory(tests/book)
endif()
add_subdirectory(analysis)
if (TENSORRT_FOUND)
add_subdirectory(tensorrt)
endif()
......@@ -35,7 +35,14 @@ class AucOp : public framework::OperatorWithKernel {
PADDLE_ENFORCE_EQ(inference_height, label_height,
"Out and Label should have same height.");
int num_thres = ctx->Attrs().Get<int>("num_thresholds");
ctx->SetOutputDim("AUC", {1});
ctx->SetOutputDim("TPOut", {num_thres});
ctx->SetOutputDim("TNOut", {num_thres});
ctx->SetOutputDim("FPOut", {num_thres});
ctx->SetOutputDim("FNOut", {num_thres});
ctx->ShareLoD("Out", /*->*/ "AUC");
}
......@@ -63,10 +70,18 @@ class AucOpMaker : public framework::OpProtoAndCheckerMaker {
AddInput("Label",
"A 2D int tensor indicating the label of the training data."
"The height is batch size and width is always 1.");
AddInput("TP", "True-Positive value.");
AddInput("FP", "False-Positive value.");
AddInput("TN", "True-Negative value.");
AddInput("FN", "False-Negative value.");
// TODO(typhoonzero): support weight input
AddOutput("AUC",
"A scalar representing the "
"current area-under-the-curve.");
AddOutput("TPOut", "True-Positive value.");
AddOutput("FPOut", "False-Positive value.");
AddOutput("TNOut", "True-Negative value.");
AddOutput("FNOut", "False-Negative value.");
AddAttr<std::string>("curve", "Curve type, can be 'ROC' or 'PR'.")
.SetDefault("ROC");
......
......@@ -34,6 +34,12 @@ class AucKernel : public framework::OpKernel<T> {
auto* inference = ctx.Input<Tensor>("Out");
auto* label = ctx.Input<Tensor>("Label");
auto* auc = ctx.Output<Tensor>("AUC");
// Only use output var for now, make sure it's persistable and
// not cleaned up for each batch.
auto* true_positive = ctx.Output<Tensor>("TPOut");
auto* false_positive = ctx.Output<Tensor>("FPOut");
auto* true_negative = ctx.Output<Tensor>("TNOut");
auto* false_negative = ctx.Output<Tensor>("FNOut");
float* auc_data = auc->mutable_data<float>(ctx.GetPlace());
......@@ -54,19 +60,10 @@ class AucKernel : public framework::OpKernel<T> {
const T* inference_data = inference->data<T>();
const int64_t* label_data = label->data<int64_t>();
// Create local tensor for storing the curve: TP, FN, TN, FP
// TODO(typhoonzero): use eigen op to caculate these values.
Tensor true_positive, false_positive, true_negative, false_negative;
true_positive.Resize({num_thresholds});
false_negative.Resize({num_thresholds});
true_negative.Resize({num_thresholds});
false_positive.Resize({num_thresholds});
int64_t* tp_data = true_positive.mutable_data<int64_t>(ctx.GetPlace());
int64_t* fn_data = false_negative.mutable_data<int64_t>(ctx.GetPlace());
int64_t* tn_data = true_negative.mutable_data<int64_t>(ctx.GetPlace());
int64_t* fp_data = false_positive.mutable_data<int64_t>(ctx.GetPlace());
auto* tp_data = true_positive->mutable_data<int64_t>(ctx.GetPlace());
auto* fn_data = false_negative->mutable_data<int64_t>(ctx.GetPlace());
auto* tn_data = true_negative->mutable_data<int64_t>(ctx.GetPlace());
auto* fp_data = false_positive->mutable_data<int64_t>(ctx.GetPlace());
for (int idx_thresh = 0; idx_thresh < num_thresholds; idx_thresh++) {
// caculate TP, FN, TN, FP for current thresh
......@@ -91,10 +88,10 @@ class AucKernel : public framework::OpKernel<T> {
}
}
// store rates
tp_data[idx_thresh] = tp;
fn_data[idx_thresh] = fn;
tn_data[idx_thresh] = tn;
fp_data[idx_thresh] = fp;
tp_data[idx_thresh] += tp;
fn_data[idx_thresh] += fn;
tn_data[idx_thresh] += tn;
fp_data[idx_thresh] += fp;
}
// epsilon to avoid divide by zero.
float epsilon = 1e-6;
......
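The change above keeps TP/FP/TN/FN in persistable output variables and accumulates them with `+=`, so the AUC reflects all batches seen so far rather than only the current minibatch. As a rough illustration (a NumPy sketch, not the actual kernel; the function and variable names here are made up), the accumulated per-threshold counters feed the curve like this:

```python
# Illustrative NumPy sketch of ROC-AUC from accumulated per-threshold counters.
# tp/fn/tn/fp are int64 arrays of length num_thresholds, accumulated across batches.
import numpy as np

def roc_auc(tp, fn, tn, fp, eps=1e-6):
    tpr = tp.astype("float64") / (tp + fn + eps)  # true positive rate per threshold
    fpr = fp.astype("float64") / (fp + tn + eps)  # false positive rate per threshold
    # Trapezoidal rule over the (fpr, tpr) curve, mirroring test_auc_op.py.
    x = fpr[:-1] - fpr[1:]
    y = (tpr[:-1] + tpr[1:]) / 2.0
    return float(np.sum(x * y))
```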
......@@ -48,7 +48,7 @@ class CheckpointNotifyOp : public framework::OperatorBase {
VLOG(3) << "checkpoint notify sending lookup table: " << lookup_table_name
<< " and dir:" << dir << " to " << epmap[i];
}
rpc_client->Wait();
PADDLE_ENFORCE(rpc_client->Wait(), "internal error in RPCClient");
}
};
......
......@@ -281,9 +281,10 @@ void GRPCClient::AsyncCheckpointNotify(const std::string& ep,
req_count_++;
}
void GRPCClient::Wait() {
bool GRPCClient::Wait() {
std::unique_lock<std::mutex> lk(sync_mutex_);
sync_cond_.wait(lk, [this] { return req_count_ == 0; });
sync_cond_.wait(lk, [this] { return (req_count_ == 0 || ok_ == false); });
return ok_;
}
void GRPCClient::Proceed() {
......@@ -297,6 +298,14 @@ void GRPCClient::Proceed() {
if (c->status_.ok()) {
VLOG(3) << c->var_h_.String() << " process";
c->Process();
} else if (c->status_.error_code() == grpc::StatusCode::DEADLINE_EXCEEDED) {
LOG(ERROR) << c->var_h_.String()
<< " meets grpc error:" << c->status_.error_message();
{
std::lock_guard<std::mutex> lk(sync_mutex_);
ok_ = false;
}
sync_cond_.notify_all();
} else {
LOG(FATAL) << c->var_h_.String()
<< " meets grpc error:" << c->status_.error_message();
......
......@@ -188,7 +188,7 @@ class CheckpointNotifyProcessor : public BaseProcessor {
class GRPCClient : public RPCClient {
public:
GRPCClient() {}
GRPCClient() : ok_(true) {}
virtual ~GRPCClient();
bool AsyncSendVar(const std::string& ep, const platform::DeviceContext& ctx,
......@@ -221,7 +221,7 @@ class GRPCClient : public RPCClient {
void AsyncSendEndPass(const std::string& ep,
int64_t time_out = FLAGS_rpc_deadline) override;
void Wait() override;
bool Wait() override;
void SendBeginPass() override;
......@@ -247,6 +247,7 @@ class GRPCClient : public RPCClient {
std::mutex sync_mutex_;
std::condition_variable sync_cond_;
std::atomic<int64_t> req_count_{0};
bool ok_;
// mutex for GetChannel thread safety
std::mutex chan_mutex_;
......
......@@ -72,7 +72,7 @@ class RPCClient {
virtual void SendBeginPass() = 0;
virtual void SendEndPass() = 0;
virtual void Wait() = 0;
virtual bool Wait() = 0;
template <typename T>
static RPCClient* GetInstance() {
......
......@@ -45,13 +45,13 @@ class FetchBarrierOp : public framework::OperatorBase {
distributed::RPCClient* rpc_client =
distributed::RPCClient::GetInstance<RPCCLIENT_T>();
rpc_client->Wait();
PADDLE_ENFORCE(rpc_client->Wait(), "internal error in RPCClient");
for (auto& ep : eps) {
VLOG(3) << "fetch barrier, ep: " << ep;
rpc_client->AsyncSendFetchBarrier(ep);
}
rpc_client->Wait();
PADDLE_ENFORCE(rpc_client->Wait(), "internal error in RPCClient");
}
};
......
......@@ -61,6 +61,8 @@ static void ParallelExecuteBlocks(
framework::Async([&executor, &prepared, &program, &scope, idx]() {
int run_block = idx; // thread local
try {
VLOG(3) << "running server block: " << run_block
<< "pointer: " << prepared[run_block].get();
executor->RunPreparedContext(prepared[run_block].get(), scope);
} catch (const std::exception &e) {
LOG(ERROR) << "run sub program error " << e.what();
......@@ -107,12 +109,14 @@ void ListenAndServOp::RunSyncLoop(
PADDLE_ENFORCE_GE(num_blocks, 2,
"server program should have at least 2 blocks");
std::vector<int> optimize_blocks_idx;
for (auto blk : optimize_blocks) {
optimize_blocks_idx.push_back(blk->ID());
// Prepare all the server block
std::vector<int> optimize_blocks_list;
for (size_t i = 1; i < program->Size(); ++i) {
optimize_blocks_list.push_back(i);
}
auto optimize_prepared = executor->Prepare(*program, optimize_blocks_idx);
// Insert placeholder for block0 which holds current op itself.
auto optimize_prepared = executor->Prepare(*program, optimize_blocks_list);
// Insert placeholder for block0 which holds current op itself,
// NOTE the first block in `optimize_prepared` should never be ran.
optimize_prepared.insert(
optimize_prepared.begin(),
std::shared_ptr<framework::ExecutorPrepareContext>(nullptr));
......
......@@ -53,7 +53,7 @@ class PrefetchOp : public framework::OperatorBase {
VLOG(3) << "don't send no-initialied variable: " << ins[i];
}
}
rpc_client->Wait();
PADDLE_ENFORCE(rpc_client->Wait(), "internal error in RPCClient");
}
};
......
......@@ -51,7 +51,7 @@ class RecvOp : public framework::OperatorBase {
rpc_client->AsyncGetVar(epmap[i], ctx, scope, outs[i]);
}
if (sync_mode) {
rpc_client->Wait();
PADDLE_ENFORCE(rpc_client->Wait(), "internal error in RPCClient");
}
}
};
......
......@@ -50,13 +50,13 @@ class SendBarrierOp : public framework::OperatorBase {
VLOG(3) << "SendBarrierOp sync_mode:" << sync_mode;
// need to wait before sending send_barrier message
rpc_client->Wait();
PADDLE_ENFORCE(rpc_client->Wait(), "internal error in RPCClient");
if (sync_mode) {
for (auto& ep : eps) {
VLOG(3) << "send barrier, ep: " << ep;
rpc_client->AsyncSendBatchBarrier(ep);
}
rpc_client->Wait();
PADDLE_ENFORCE(rpc_client->Wait(), "internal error in RPCClient");
}
}
};
......
......@@ -59,7 +59,7 @@ class SendOp : public framework::OperatorBase {
}
}
if (sync_send) {
rpc_client->Wait();
PADDLE_ENFORCE(rpc_client->Wait(), "internal error in RPCClient");
}
}
};
......
......@@ -145,14 +145,14 @@ void BindBlockDesc(pybind11::module *m) {
.def_property_readonly("id", &pd::BlockDesc::ID)
.def_property_readonly("parent", &pd::BlockDesc::Parent)
.def("get_forward_block_idx", &pd::BlockDesc::ForwardBlockID)
.def("set_forward_block_idx", &pd::BlockDesc::SetForwardBlockID)
.def("_set_forward_block_idx", &pd::BlockDesc::SetForwardBlockID)
.def("append_op", &pd::BlockDesc::AppendOp,
pybind11::return_value_policy::reference)
.def("prepend_op", &pd::BlockDesc::PrependOp,
.def("_prepend_op", &pd::BlockDesc::PrependOp,
pybind11::return_value_policy::reference)
.def("insert_op", &pd::BlockDesc::InsertOp,
.def("_insert_op", &pd::BlockDesc::InsertOp,
pybind11::return_value_policy::reference)
.def("remove_op", &pd::BlockDesc::RemoveOp)
.def("_remove_op", &pd::BlockDesc::RemoveOp)
.def("var",
[](pd::BlockDesc &self, pybind11::bytes byte_name) {
std::string name = byte_name;
......@@ -165,7 +165,7 @@ void BindBlockDesc(pybind11::module *m) {
return self.HasVar(name);
},
pybind11::return_value_policy::reference)
.def("rename_var",
.def("_rename_var",
[](pd::BlockDesc &self, const pybind11::bytes &byte_name,
const pybind11::bytes &byte_name_new) {
std::string name = byte_name;
......@@ -189,7 +189,7 @@ void BindBlockDesc(pybind11::module *m) {
return self.FindVarRecursive(name);
},
pybind11::return_value_policy::reference)
.def("remove_var",
.def("_remove_var",
[](pd::BlockDesc &self, pybind11::bytes byte_name) {
std::string name = byte_name;
return self.RemoveVar(name);
......
......@@ -68,7 +68,7 @@ bool IsCompiledWithCUDA() {
}
bool IsCompiledWithDIST() {
#ifdef PADDLE_WITH_DIST
#ifdef PADDLE_WITH_DISTRIBUTE
return true;
#else
return false;
......@@ -669,7 +669,7 @@ All parameter, weight, gradient are variables in Paddle.
const std::string &, Scope *, std::vector<Scope *> &,
const ExecutionStrategy &, const BuildStrategy &, size_t,
size_t>())
.def("bcast_params", &ParallelExecutor::BCastParamsToDevs)
.def("bcast_params", &ParallelExecutor::BCastParamsToDevices)
// NOTE: even we return a vec<Scope*>* to Python use reference policy.
// We still cannot get local_scope from this vector, since the element
// of vec<Scope*> will be freed by Python GC. We can only return Scope*
......
......@@ -516,6 +516,7 @@ function gen_fluid_inference_lib() {
Deploying fluid inference library ...
========================================
EOF
cmake .. -DWITH_DISTRIBUTE=OFF
make -j `nproc` inference_lib_dist
cd ${PADDLE_ROOT}/build
cp -r fluid_install_dir fluid
......@@ -531,7 +532,7 @@ function test_fluid_inference_lib() {
========================================
EOF
cd ${PADDLE_ROOT}/paddle/contrib/inference/demo_ci
sh run.sh ${PADDLE_ROOT} ${WITH_MKL:-ON} ${WITH_GPU:-OFF}
./run.sh ${PADDLE_ROOT} ${WITH_MKL:-ON} ${WITH_GPU:-OFF}
fi
}
......@@ -577,6 +578,7 @@ function main() {
fluid_inference_lib)
cmake_gen ${PYTHON_ABI:-""}
gen_fluid_inference_lib
test_fluid_inference_lib
;;
check_style)
check_style
......
......@@ -46,7 +46,7 @@ from param_attr import ParamAttr, WeightNormParamAttr
from data_feeder import DataFeeder
from core import LoDTensor, LoDTensorArray, CPUPlace, CUDAPlace, CUDAPinnedPlace, Scope
from transpiler import DistributeTranspiler, InferenceTranspiler, \
memory_optimize, release_memory
memory_optimize, release_memory, DistributeTranspilerConfig
from concurrency import (Go, make_channel, channel_send, channel_recv,
channel_close, Select)
from lod_tensor import create_lod_tensor, create_random_int_lodtensor
......@@ -56,6 +56,7 @@ import unique_name
import recordio_writer
import parallel_executor
from parallel_executor import *
from paddle.fluid.layers.math_op_patch import monkey_patch_variable
Tensor = LoDTensor
......@@ -138,5 +139,5 @@ def __bootstrap__():
# TODO(panyx0718): Avoid doing complex initialization logic in __init__.py.
# Consider paddle.init(args) or paddle.main(args)
layers.monkey_patch_variable()
monkey_patch_variable()
__bootstrap__()
......@@ -328,7 +328,7 @@ def _append_backward_ops_(block,
if op.has_attr("sub_block"):
sub_block = program.block(op.block_attr("sub_block"))
grad_sub_block = program.create_block()
grad_sub_block.set_forward_block_idx(sub_block.idx)
grad_sub_block._set_forward_block_idx(sub_block.idx)
cb = _callback_lookup_(op)
if cb is not None:
if callbacks is None:
......@@ -571,7 +571,7 @@ def append_backward(loss, parameter_list=None, no_grad_set=None,
_append_backward_vars_(root_block, fwd_op_num, grad_to_var, grad_info_map)
program.current_block_idx = current_block_idx
program.sync_with_cpp()
program._sync_with_cpp()
# FIXME(zcd): prevent loss.grad optimized by mem_opt.
loss.block.var(_append_grad_suffix_(loss.name)).persistable = True
......@@ -744,7 +744,7 @@ def calc_gradient(targets, inputs, target_gradients=None, no_grad_set=None):
_rename_grad_(block, fwd_op_num, grad_to_var, target_grad_map)
_append_backward_vars_(block, fwd_op_num, grad_to_var, grad_info_map)
prog.sync_with_cpp()
prog._sync_with_cpp()
grad_vars = []
for input_var in inputs:
......
......@@ -82,7 +82,7 @@ def error_clip_callback(block, context):
op_desc = block.desc.op(block.desc.op_size() - 1)
for grad_n in filter(lambda n: grad_to_var.has_key(n),
op_desc.output_arg_names()):
fwd_var = block.var_recursive(grad_to_var[grad_n])
fwd_var = block._var_recursive(grad_to_var[grad_n])
error_clip = getattr(fwd_var, "error_clip", None)
if not (error_clip is None or isinstance(error_clip,
BaseErrorClipAttr)):
......
......@@ -69,8 +69,10 @@ class Go(BlockGuard):
parent_block.append_op(
type='go',
inputs={
'X':
[parent_block.var_recursive(x_name) for x_name in x_name_list]
'X': [
parent_block._var_recursive(x_name)
for x_name in x_name_list
]
},
outputs={},
attrs={'sub_block': go_block})
......@@ -259,7 +261,7 @@ class Select(BlockGuard):
if var_name in intermediate
]
X = [select_block.var_recursive(x_name) for x_name in params]
X = [select_block._var_recursive(x_name) for x_name in params]
# Needs to be used by `equal` inside the cases block.
X.append(self.case_to_execute)
......
......@@ -309,7 +309,7 @@ class Executor(object):
if not has_feed_operators(global_block, feed, feed_var_name):
for i, name in enumerate(feed):
out = global_block.var(name)
global_block.prepend_op(
global_block._prepend_op(
type='feed',
inputs={'X': [feed_var]},
outputs={'Out': [out]},
......
......@@ -32,7 +32,6 @@ except Exception, e:
import unique_name
__all__ = [
'Block',
'Variable',
'Program',
'Operator',
......@@ -447,7 +446,7 @@ class Operator(object):
Notes:
The constructor of operator should not be invoked directly. Use
Block.append_op or Block.prepend_op instead.
Block.append_op or Block._prepend_op instead.
Examples:
.. code-block:: python
......@@ -870,7 +869,7 @@ class Block(object):
def forward_block_idx(self):
return self.desc.get_forward_block_idx()
def set_forward_block_idx(self, idx):
def _set_forward_block_idx(self, idx):
"""
Set the forward block Idx.
......@@ -880,7 +879,7 @@ class Block(object):
Returns:
None
"""
self.desc.set_forward_block_idx(idx)
self.desc._set_forward_block_idx(idx)
@property
def idx(self):
......@@ -909,7 +908,7 @@ class Block(object):
raise ValueError("var %s not in this block" % name)
return v
def var_recursive(self, name):
def _var_recursive(self, name):
"""
Get a Variable by name from this block recursively.
......@@ -951,9 +950,9 @@ class Block(object):
raise ValueError("Var {0} is not found recursively".format(name))
def all_parameters(self):
return list(self.iter_parameters())
return list(self._iter_parameters())
def iter_parameters(self):
def _iter_parameters(self):
return (item[1] for item in self.vars.iteritems()
if isinstance(item[1], Parameter))
......@@ -966,7 +965,7 @@ class Block(object):
def has_var(self, name):
return name in self.vars
def rename_var(self, name, new_name):
def _rename_var(self, name, new_name):
"""
Rename variable in vars and ops' inputs and outputs
......@@ -1000,8 +999,8 @@ class Block(object):
else:
raise ValueError("unsupported var type: %s", type(v))
orig_var_type = v.type
self.desc.rename_var(name, new_name)
# NOTE: v is destroyed by C++ after calling rename_var.
self.desc._rename_var(name, new_name)
# NOTE: v is destroyed by C++ after calling _rename_var.
d = self.desc.find_var(new_name)
if var_type == "Parameter":
var = Parameter(
......@@ -1024,16 +1023,16 @@ class Block(object):
error_clip=error_clip,
stop_gradient=stop_gradient)
# rename the python side, sync_with_cpp will only add
# rename the python side, _sync_with_cpp will only add
# new vars/ops to python side.
self.vars[new_name] = var
del self.vars[name]
self.sync_with_cpp()
self._sync_with_cpp()
return var
def remove_var(self, name):
self.sync_with_cpp()
self.desc.remove_var(name)
def _remove_var(self, name):
self._sync_with_cpp()
self.desc._remove_var(name)
del self.vars[name]
def create_parameter(self, *args, **kwargs):
......@@ -1055,7 +1054,7 @@ class Block(object):
self.ops.append(op)
return op
def insert_op(self, index, *args, **kwargs):
def _insert_op(self, index, *args, **kwargs):
"""
Insert a Operator according to the giving arguments.
......@@ -1065,13 +1064,13 @@ class Block(object):
Returns:
Operator: the insert Operator.
"""
self.sync_with_cpp()
op_desc = self.desc.insert_op(index)
self._sync_with_cpp()
op_desc = self.desc._insert_op(index)
op = Operator(block=self, desc=op_desc, *args, **kwargs)
self.ops.insert(index, op)
return op
def remove_op(self, index):
def _remove_op(self, index):
"""
Remove the specific position operator.
......@@ -1081,11 +1080,11 @@ class Block(object):
Returns:
None
"""
self.sync_with_cpp()
self.desc.remove_op(index, index + 1)
self._sync_with_cpp()
self.desc._remove_op(index, index + 1)
del self.ops[index]
def slice_ops(self, start, end):
def _slice_ops(self, start, end):
"""
Return the Operator between start and end.
......@@ -1098,13 +1097,13 @@ class Block(object):
"""
return self.ops[start:end]
def prepend_op(self, *args, **kwargs):
op_desc = self.desc.prepend_op()
def _prepend_op(self, *args, **kwargs):
op_desc = self.desc._prepend_op()
op = Operator(self, op_desc, *args, **kwargs)
self.ops.insert(0, op)
return op
def sync_with_cpp(self):
def _sync_with_cpp(self):
"""
Sync from the desc on the c++ end. This method is used to synchronize
the c++ desc instance generated by backward.
......@@ -1170,7 +1169,7 @@ class Block(object):
for index in range(len(self.ops)):
assert self.ops[index].desc == ops_in_cpp[index]
def copy_param_info_from(self, other):
def _copy_param_info_from(self, other):
"""
Copy the information of parameters from the other block.
......@@ -1185,12 +1184,13 @@ class Block(object):
None
"""
if not isinstance(other, Block):
raise TypeError("copy_param_info_from should be invoked with Block")
for p in other.iter_parameters():
raise TypeError(
"_copy_param_info_from should be invoked with Block")
for p in other._iter_parameters():
assert isinstance(p, Parameter)
v = self.vars.get(p.name, None)
if v is None:
raise ValueError("copy_param_info_from should be invoked with "
raise ValueError("_copy_param_info_from should be invoked with "
"same topology")
assert isinstance(v, Variable)
new_p = Parameter(
......@@ -1208,7 +1208,7 @@ class Block(object):
name=v.name)
self.vars[new_p.name] = new_p
def clone_variable(self, var):
def _clone_variable(self, var):
"""
Clone a variable into current block.
......@@ -1484,9 +1484,9 @@ class Program(object):
p = Program()
p.desc = core.ProgramDesc(self.desc)
p.blocks = [Block(p, i) for i in xrange(self.desc.num_blocks())]
p.sync_with_cpp()
p._sync_with_cpp()
p.copy_param_info_from(self)
p._copy_param_info_from(self)
p.copy_data_info_from(self)
return p
......@@ -1536,7 +1536,7 @@ class Program(object):
res = Program()
res.desc = core.prune(self.desc, targets_idx)
res.blocks = [Block(res, i) for i in xrange(res.desc.num_blocks())]
res.sync_with_cpp()
res._sync_with_cpp()
return res
def inference_optimize(self):
......@@ -1562,7 +1562,7 @@ class Program(object):
if op.has_attr('is_test'):
op.set_attr('is_test', True)
res.blocks = [Block(res, i) for i in xrange(res.desc.num_blocks())]
res.sync_with_cpp()
res._sync_with_cpp()
return res
@staticmethod
......@@ -1582,7 +1582,7 @@ class Program(object):
p = Program()
p.desc = core.ProgramDesc(binary_str)
p.blocks = [Block(p, i) for i in xrange(p.desc.num_blocks())]
p.sync_with_cpp()
p._sync_with_cpp()
return p
@property
......@@ -1662,7 +1662,7 @@ class Program(object):
"""
self.current_block_idx = self.current_block().parent_idx
def sync_with_cpp(self):
def _sync_with_cpp(self):
"""
Synchronize Python instance to its binding C++ object instance.
If the program is modified in C++ space, this method should be invoked.
......@@ -1676,9 +1676,9 @@ class Program(object):
for block_idx in range(len(self.blocks), self.desc.num_blocks()):
self.blocks.append(Block(self, block_idx))
for block in self.blocks:
block.sync_with_cpp()
block._sync_with_cpp()
def copy_param_info_from(self, other):
def _copy_param_info_from(self, other):
"""
Copy the information of parameters from other program.
......@@ -1692,13 +1692,13 @@ class Program(object):
None
"""
if not isinstance(other, Program):
raise TypeError("copy_param_info_from should be invoked with "
raise TypeError("_copy_param_info_from should be invoked with "
"Program")
if len(self.blocks) != len(other.blocks):
raise ValueError("copy_param_info_from should be invoked with two "
raise ValueError("_copy_param_info_from should be invoked with two "
"program, with represent the same topology")
self.global_block().copy_param_info_from(other.global_block())
self.global_block()._copy_param_info_from(other.global_block())
def copy_data_info_from(self, other):
"""
......@@ -1714,11 +1714,11 @@ class Program(object):
None
"""
if not isinstance(other, Program):
raise TypeError("copy_param_info_from should be invoked with "
raise TypeError("_copy_param_info_from should be invoked with "
"Program")
if len(self.blocks) != len(other.blocks):
raise ValueError("copy_param_info_from should be invoked with two "
raise ValueError("_copy_param_info_from should be invoked with two "
"program, with represent the same topology")
for var in other.global_block().vars.itervalues():
if var.is_data:
......
......@@ -148,7 +148,7 @@ class ConstantInitializer(Initializer):
assert isinstance(var, framework.Variable)
assert isinstance(block, framework.Block)
# Initialization Ops should be prepended and not appended
op = block.prepend_op(
op = block._prepend_op(
type="fill_constant",
outputs={"Out": var},
attrs={
......@@ -202,7 +202,7 @@ class UniformInitializer(Initializer):
# Initialization Ops should be prepended and not appended
if self._seed == 0:
self._seed = block.program.random_seed
op = block.prepend_op(
op = block._prepend_op(
type="uniform_random",
outputs={"Out": var},
attrs={
......@@ -256,7 +256,7 @@ class NormalInitializer(Initializer):
# Initialization Ops should be prepended and not appended
if self._seed == 0:
self._seed = block.program.random_seed
op = block.prepend_op(
op = block._prepend_op(
type="gaussian_random",
outputs={"Out": var},
attrs={
......@@ -346,7 +346,7 @@ class XavierInitializer(Initializer):
if self._uniform:
limit = np.sqrt(6.0 / float(fan_in + fan_out))
op = block.prepend_op(
op = block._prepend_op(
type="uniform_random",
outputs={"Out": var},
attrs={
......@@ -359,7 +359,7 @@ class XavierInitializer(Initializer):
else:
std = np.sqrt(2.0 / float(fan_in + fan_out))
op = block.prepend_op(
op = block._prepend_op(
type="gaussian_random",
outputs={"Out": var},
attrs={
......@@ -444,7 +444,7 @@ class MSRAInitializer(Initializer):
if self._uniform:
limit = np.sqrt(6.0 / float(fan_in))
op = block.prepend_op(
op = block._prepend_op(
type="uniform_random",
outputs={"Out": var},
attrs={
......@@ -457,7 +457,7 @@ class MSRAInitializer(Initializer):
else:
std = np.sqrt(2.0 / float(fan_in))
op = block.prepend_op(
op = block._prepend_op(
type="gaussian_random",
outputs={"Out": var},
attrs={
......
......@@ -523,7 +523,7 @@ def prepend_feed_ops(inference_program,
for i, name in enumerate(feed_target_names):
out = global_block.var(name)
global_block.prepend_op(
global_block._prepend_op(
type='feed',
inputs={'X': [feed_var]},
outputs={'Out': [out]},
......@@ -625,7 +625,7 @@ def save_inference_model(dirname,
for i, op in enumerate(global_block.ops):
op.desc.set_is_target(False)
if op.type == "feed" or op.type == "fetch":
global_block.remove_op(i)
global_block._remove_op(i)
copy_program.desc.flush()
pruned_program = copy_program.prune(targets=target_vars)
......@@ -874,7 +874,7 @@ def get_test_program(filelist, program=None, startup_program=None):
main_block = program.global_block()
for var in main_block.vars.values():
if var.type == core.VarDesc.VarType.READER:
main_block.rename_var(
main_block._rename_var(
str(var.name), str(_get_test_reader_name(var.name)))
for op in main_block.ops:
......@@ -883,7 +883,7 @@ def get_test_program(filelist, program=None, startup_program=None):
if op.type == "create_multi_pass_reader":
test_op.set_attr("pass_num", 1)
startup_program.sync_with_cpp()
program.sync_with_cpp()
startup_program._sync_with_cpp()
program._sync_with_cpp()
return program
......@@ -33,7 +33,6 @@ from metric_op import *
from learning_rate_scheduler import *
__all__ = []
__all__ += math_op_patch.__all__
__all__ += nn.__all__
__all__ += io.__all__
__all__ += tensor.__all__
......
......@@ -730,8 +730,10 @@ class While(object):
parent_block.append_op(
type='while',
inputs={
'X':
[parent_block.var_recursive(x_name) for x_name in x_name_list],
'X': [
parent_block._var_recursive(x_name)
for x_name in x_name_list
],
'Condition': [self.cond_var]
},
outputs={'Out': out_vars,
......@@ -1259,7 +1261,7 @@ class ConditionalBlock(object):
input_set = set([ipt.name for ipt in self.inputs])
param_list = [
parent_block.var_recursive(each_name) for each_name in params
parent_block._var_recursive(each_name) for each_name in params
if each_name not in input_set
]
......
......@@ -16,8 +16,6 @@ from ..framework import Variable, unique_name
from layer_function_generator import OpProtoHolder
from ..initializer import force_init_on_cpu
__all__ = ['monkey_patch_variable']
def monkey_patch_variable():
def unique_tmp_name():
......
......@@ -76,7 +76,7 @@ def accuracy(input, label, k=1, correct=None, total=None):
return acc_out
def auc(input, label, curve='ROC', num_thresholds=200):
def auc(input, label, curve='ROC', num_thresholds=200, topk=1):
"""
**Area Under the Curve (AUC) Layer**
......@@ -102,6 +102,7 @@ def auc(input, label, curve='ROC', num_thresholds=200):
curve(str): Curve type, can be 'ROC' or 'PR'. Default 'ROC'.
num_thresholds(int): The number of thresholds to use when discretizing
the roc curve. Default 200.
topk(int): only topk number of prediction output will be used for auc.
Returns:
Variable: A scalar representing the current AUC.
......@@ -115,7 +116,7 @@ def auc(input, label, curve='ROC', num_thresholds=200):
"""
warnings.warn(
"This interface not recommended, fluid.layers.auc compute the auc at every minibatch, \
"This interface is not recommended, fluid.layers.auc compute the auc at every minibatch, \
but can not aggregate them and get the pass AUC, because pass \
auc can not be averaged with weighted from the minibatch auc value. \
Please use fluid.metrics.Auc, it can compute the auc value via Python natively, \
......@@ -125,14 +126,34 @@ def auc(input, label, curve='ROC', num_thresholds=200):
topk_indices = helper.create_tmp_variable(dtype="int64")
topk_out, topk_indices = nn.topk(input, k=k)
auc_out = helper.create_tmp_variable(dtype="float32")
# make tp, tn, fp, fn persistable, so that can accumulate all batches.
tp = helper.create_global_variable(persistable=True)
tn = helper.create_global_variable(persistable=True)
fp = helper.create_global_variable(persistable=True)
fn = helper.create_global_variable(persistable=True)
for var in [tp, tn, fp, fn]:
helper.set_variable_initializer(
var, Constant(
value=0.0, force_cpu=True))
helper.append_op(
type="auc",
inputs={
"Out": [topk_out],
"Indices": [topk_indices],
"Label": [label]
"Label": [label],
"TP": [tp],
"TN": [tn],
"FP": [fp],
"FN": [fn]
},
attrs={"curve": curve,
"num_thresholds": num_thresholds},
outputs={"AUC": [auc_out], })
outputs={
"AUC": [auc_out],
"TPOut": [tp],
"TNOut": [tn],
"FPOut": [fp],
"FNOut": [fn]
})
return auc_out
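# Illustrative usage (not part of this diff; `prediction`, `image`, and `label`
# below are placeholder names): with the change above, TP/TN/FP/FN are persistable
# global variables, so calling fluid.layers.auc on every minibatch accumulates the
# counters across batches. A hedged sketch:
#
#     prediction = fluid.layers.fc(input=image, size=2, act="softmax")
#     acc = fluid.layers.auc(input=prediction, label=label, num_thresholds=200)
#
# Resetting the accumulated state between passes is left to the caller, for example
# by re-running the startup program, which re-runs the fill_constant initializers.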
......@@ -4367,7 +4367,7 @@ def autoincreased_step_counter(counter_name=None, begin=1, step=1):
helper.set_variable_initializer(
counter, initializer=Constant(
value=begin - 1, force_cpu=True))
helper.main_program.global_block().prepend_op(
helper.main_program.global_block()._prepend_op(
type='increment',
inputs={'X': [counter]},
outputs={'Out': [counter]},
......
......@@ -240,7 +240,7 @@ class Optimizer(object):
self._finish_update(loss.block, parameters_and_grads)
end = len(global_block.ops)
return global_block.slice_ops(start, end)
return global_block._slice_ops(start, end)
def minimize(self,
loss,
......
......@@ -152,7 +152,7 @@ class ParallelExecutor(object):
self.executor = core.ParallelExecutor(
self._places,
set([
p.name for p in main.global_block().iter_parameters()
p.name for p in main.global_block()._iter_parameters()
if not p.stop_gradient
]),
set(self.persistable_vars), main.desc, loss_name
......
......@@ -35,7 +35,8 @@ class TestParallelExecutorBase(unittest.TestCase):
feed_dict=None,
seed=None,
use_parallel_executor=True,
use_reduce=False):
use_reduce=False,
optimizer=fluid.optimizer.Adam):
def run_executor(exe, feed, fetch_list, program=None):
if isinstance(exe, fluid.ParallelExecutor):
res = exe.run(fetch_list=fetch_list, feed=feed)
......@@ -57,8 +58,8 @@ class TestParallelExecutorBase(unittest.TestCase):
main.random_seed = seed
loss = method(use_feed=feed_dict is not None)
adam = fluid.optimizer.Adam()
adam.minimize(loss)
optimizer().minimize(loss)
if memory_opt:
fluid.memory_optimize(main)
......
......@@ -24,7 +24,20 @@ class TestAucOp(OpTest):
indices = np.random.randint(0, 2, (128, 2))
labels = np.random.randint(0, 2, (128, 1))
num_thresholds = 200
self.inputs = {'Out': pred, 'Indices': indices, 'Label': labels}
tp = np.zeros((num_thresholds, )).astype("int64")
tn = np.zeros((num_thresholds, )).astype("int64")
fp = np.zeros((num_thresholds, )).astype("int64")
fn = np.zeros((num_thresholds, )).astype("int64")
self.inputs = {
'Out': pred,
'Indices': indices,
'Label': labels,
'TP': tp,
'TN': tn,
'FP': fp,
'FN': fn
}
self.attrs = {'curve': 'ROC', 'num_thresholds': num_thresholds}
# NOTE: sklearn use a different way to generate thresholds
# which will cause the result differs slightly:
......@@ -71,7 +84,13 @@ class TestAucOp(OpTest):
y = (tpr[:num_thresholds - 1] + tpr[1:]) / 2.0
auc_value = np.sum(x * y)
self.outputs = {'AUC': auc_value}
self.outputs = {
'AUC': auc_value,
'TPOut': tp_list,
'FNOut': fn_list,
'TNOut': tn_list,
'FPOut': fp_list
}
def test_check_output(self):
self.check_output()
......
......@@ -27,7 +27,6 @@ class TranspilerTest(unittest.TestCase):
self.pserver_eps = "127.0.0.1:6174,127.0.0.1:6175"
self.pserver1_ep = "127.0.0.1:6174"
self.pserver2_ep = "127.0.0.1:6175"
self.slice_var_up = True
self.sync_mode = True
self.transpiler = None
......@@ -52,27 +51,26 @@ class TranspilerTest(unittest.TestCase):
self.origin_prog = main.clone()
return main
def get_trainer(self):
t = self._transpiler_instance()
def get_trainer(self, config=None):
t = self._transpiler_instance(config)
return t.get_trainer_program()
def get_pserver(self, ep):
t = self._transpiler_instance()
def get_pserver(self, ep, config=None):
t = self._transpiler_instance(config)
pserver = t.get_pserver_program(ep)
startup = t.get_startup_program(ep, pserver)
return pserver, startup
def _transpiler_instance(self):
def _transpiler_instance(self, config=None):
if not self.transpiler:
main = self.get_main_program()
self.transpiler = fluid.DistributeTranspiler()
self.transpiler = fluid.DistributeTranspiler(config=config)
self.transpiler.transpile(
self.trainer_id,
program=main,
pservers=self.pserver_eps,
trainers=self.trainers,
slice_var_up=self.slice_var_up,
sync_mode=self.sync_mode)
trainers=self.trainers)
return self.transpiler
......@@ -124,14 +122,67 @@ class TestBasicModel(TranspilerTest):
self.assertEqual(set(pserver_params), set(trainer_params))
class TestBasicModelWithLargeBlockSize(TranspilerTest):
def test_transpiler(self):
config = fluid.DistributeTranspilerConfig()
config.min_block_size = 1048576
pserver, startup = self.get_pserver(self.pserver1_ep, config)
pserver2, startup2 = self.get_pserver(self.pserver2_ep, config)
trainer = self.get_trainer(config)
self.assertEqual([op.type for op in trainer.global_block().ops], [
'mul', 'elementwise_add', 'elementwise_sub', 'square', 'mean',
'fill_constant', 'mean_grad', 'square_grad', 'elementwise_sub_grad',
'elementwise_add_grad', 'send', 'mul_grad', 'send', 'send_barrier',
'recv', 'recv', 'fetch_barrier'
])
self.assertEqual(len(pserver.blocks), 2)
# block0: listen_and_serv
self.assertEqual([op.type for op in pserver.blocks[0].ops],
["listen_and_serv"])
# block1~2: optimize pass
self.assertEqual([op.type for op in pserver.blocks[1].ops],
["sum", "scale", "sgd"])
# confirm startup program
self.assertEqual([op.type for op in startup.global_block().ops],
["fill_constant", "fill_constant", "fill_constant"])
# the variable #fc_w will be split into two blocks
fc_w_var = startup2.global_block().var("fc_w")
self.assertEqual(fc_w_var.shape, (1000L, 1000L))
# all parameters should be optimized on pserver
pserver_params = []
for prog in [pserver, pserver2]:
for blk in prog.blocks:
for op in blk.ops:
if "Param" in op.input_names:
param_name = op.input("Param")[0]
is_block_idx = param_name.find(".block")
if is_block_idx != -1:
origin_param_name = param_name[:is_block_idx]
else:
origin_param_name = param_name
pserver_params.append(origin_param_name)
trainer_params = []
for op in self.origin_prog.global_block().ops:
if "Param" in op.input_names:
trainer_params.append(op.input("Param")[0])
self.assertEqual(set(pserver_params), set(trainer_params))
class TestNoSliceVar(TranspilerTest):
def setUp(self):
super(TestNoSliceVar, self).setUp()
self.slice_var_up = False
def test_transpiler(self):
_, startup = self.get_pserver(self.pserver1_ep)
_, startup2 = self.get_pserver(self.pserver2_ep)
config = fluid.DistributeTranspilerConfig()
config.slice_var_up = False
_, startup = self.get_pserver(self.pserver1_ep, config)
_, startup2 = self.get_pserver(self.pserver2_ep, config)
if startup.global_block().vars.has_key("fc_w"):
fc_w_var = startup.global_block().vars["fc_w"]
......@@ -253,10 +304,50 @@ class TestL2Decay(TranspilerTest):
# TODO(typhoonzero): test clipping and L2Decay ops are removed from trainer
# FIXME(typhoonzero): need to add test for async case:
# see https://github.com/PaddlePaddle/Paddle/issues/11691
class TestAsyncSGD(TranspilerTest):
pass
class TestL2DecayWithPiecewise(TranspilerTest):
def net_conf(self):
x = fluid.layers.data(name='x', shape=[1000], dtype='float32')
y_predict = fluid.layers.fc(input=x,
size=1000,
act=None,
param_attr=fluid.ParamAttr(name='fc_w'),
bias_attr=fluid.ParamAttr(name='fc_b'))
y = fluid.layers.data(name='y', shape=[1], dtype='float32')
cost = fluid.layers.square_error_cost(input=y_predict, label=y)
avg_cost = fluid.layers.mean(cost)
base_lr = 1.0
bd = [1, 10, 20, 30]
lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)]
sgd_optimizer = fluid.optimizer.Momentum(
learning_rate=fluid.layers.piecewise_decay(
boundaries=bd, values=lr),
momentum=0.9,
regularization=fluid.regularizer.L2Decay(1e-4))
sgd_optimizer.minimize(avg_cost)
return
def test_transpiler(self):
pserver, startup = self.get_pserver(self.pserver1_ep)
trainer = self.get_trainer()
self.assertEqual(len(pserver.blocks), 9)
self.assertEqual([op.type for op in pserver.blocks[1].ops], [
"increment", "cast", "fill_constant", "fill_constant", "less_than",
"logical_not", "conditional_block", "fill_constant",
"fill_constant", "less_than", "logical_not", "logical_and",
"logical_and", "conditional_block", "fill_constant",
"fill_constant", "less_than", "logical_not", "logical_and",
"logical_and", "conditional_block", "fill_constant",
"fill_constant", "less_than", "logical_not", "logical_and",
"logical_and", "conditional_block", "fill_constant",
"conditional_block"
])
self.assertEqual(
[op.type for op in pserver.blocks[7].ops],
["sum", "scale", "scale", "elementwise_add", "momentum"])
self.assertEqual(
[op.type for op in pserver.blocks[8].ops],
["sum", "scale", "scale", "elementwise_add", "momentum"])
if __name__ == "__main__":
......
......@@ -13,8 +13,12 @@
# limitations under the License.
import paddle.fluid as fluid
import paddle.fluid.layers.ops as ops
from paddle.fluid.initializer import init_on_cpu
from paddle.fluid.layers.learning_rate_scheduler import _decay_step_counter
from parallel_executor_test_base import TestParallelExecutorBase
import unittest
import math
import os
......@@ -131,27 +135,71 @@ def SE_ResNeXt50Small(batch_size=2, use_feed=False):
class TestResnet(TestParallelExecutorBase):
def check_resnet_convergence(self, use_cuda, use_reduce=False, iter=20):
def check_resnet_convergence_with_learning_rate_decay(self,
use_cuda=True,
use_reduce=False,
iter=20):
os.environ['CPU_NUM'] = str(4)
def _cosine_decay(learning_rate, step_each_epoch, epochs=120):
"""
Applies cosine decay to the learning rate.
lr = learning_rate * (math.cos(epoch * (math.pi / epochs)) + 1) / 2
"""
global_step = _decay_step_counter()
with init_on_cpu():
epoch = ops.floor(global_step / step_each_epoch)
decayed_lr = learning_rate * \
(ops.cos(epoch * (math.pi / epochs)) + 1)/2
return decayed_lr
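# Illustrative values (assumed, not asserted by the test): with the arguments
# used below (step_each_epoch=2, epochs=1), epoch 0 (steps 0-1) yields
# lr = learning_rate * (cos(0) + 1) / 2 = learning_rate, while epoch 1
# (steps 2-3) yields lr = learning_rate * (cos(pi) + 1) / 2 = 0.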
def _optimizer(learning_rate=0.01):
optimizer = fluid.optimizer.Momentum(
learning_rate=_cosine_decay(
learning_rate=learning_rate, step_each_epoch=2, epochs=1),
momentum=0.9,
regularization=fluid.regularizer.L2Decay(1e-4))
return optimizer
import functools
batch_size = 2
self.check_network_convergence(
single_first_loss, single_last_loss = self.check_network_convergence(
functools.partial(
SE_ResNeXt50Small, batch_size=batch_size),
iter=iter,
batch_size=batch_size,
use_cuda=use_cuda,
use_reduce=use_reduce)
def test_resnet(self):
self.check_resnet_convergence(True)
self.check_resnet_convergence(False, iter=5)
use_reduce=use_reduce,
optimizer=_optimizer,
use_parallel_executor=False)
def test_resnet_with_new_strategy(self):
# use_cuda, use_reduce
self.check_resnet_convergence(True, True)
self.check_resnet_convergence(False, True, iter=5)
parallel_first_loss, parallel_last_loss = self.check_network_convergence(
functools.partial(
SE_ResNeXt50Small, batch_size=batch_size),
iter=iter,
batch_size=batch_size,
use_cuda=use_cuda,
use_reduce=use_reduce,
optimizer=_optimizer)
for p_f in parallel_first_loss:
self.assertAlmostEquals(p_f, single_first_loss[0], delta=1e-6)
for p_l in parallel_last_loss:
self.assertAlmostEquals(p_l, single_last_loss[0], delta=1e-6)
def test_seresnext_with_learning_rate_decay(self):
self.check_resnet_convergence_with_learning_rate_decay(True, False)
self.check_resnet_convergence_with_learning_rate_decay(
False, False, iter=5)
def test_seresnext_with_new_strategy_with_learning_rate_decay(self):
self.check_resnet_convergence_with_learning_rate_decay(True, True)
self.check_resnet_convergence_with_learning_rate_decay(
False, True, iter=5)
if __name__ == '__main__':
......
......@@ -181,13 +181,13 @@ class TestBlockDesc(unittest.TestCase):
self.assertIsNotNone(block)
op1 = block.append_op()
op2 = block.append_op()
op0 = block.prepend_op()
op0 = block._prepend_op()
all_ops = []
for idx in xrange(0, block.op_size()):
all_ops.append(block.op(idx))
self.assertEqual(all_ops, [op0, op1, op2])
def test_remove_op(self):
def test__remove_op(self):
program = Program()
program_desc = program.desc
self.assertIsNotNone(program_desc)
......@@ -201,8 +201,8 @@ class TestBlockDesc(unittest.TestCase):
op1.set_type("test")
op2.set_type("test")
block.remove_op(1, 2)
program.sync_with_cpp()
block._remove_op(1, 2)
program._sync_with_cpp()
all_ops = []
for idx in xrange(0, block.op_size()):
......
......@@ -12,12 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from distribute_transpiler import DistributeTranspiler
from distribute_transpiler import DistributeTranspiler, DistributeTranspilerConfig
from inference_transpiler import InferenceTranspiler
from memory_optimization_transpiler import memory_optimize, release_memory
from ps_dispatcher import HashName, RoundRobin
__all__ = [
"DistributeTranspiler", "InferenceTranspiler", "memory_optimize",
"release_memory", "HashName", "RoundRobin"
"release_memory", "HashName", "RoundRobin", "DistributeTranspilerConfig"
]
......@@ -17,10 +17,10 @@ def delete_ops(block, ops):
try:
start = list(block.ops).index(ops[0])
end = list(block.ops).index(ops[-1])
[block.remove_op(start) for _ in xrange(end - start + 1)]
[block._remove_op(start) for _ in xrange(end - start + 1)]
except Exception, e:
raise e
block.program.sync_with_cpp()
block.program._sync_with_cpp()
def find_op_by_input_arg(block, arg_name):
......
......@@ -31,6 +31,7 @@ Steps to transpile pserver:
from __future__ import print_function
import math
import random
import numpy as np
from ps_dispatcher import RoundRobin, HashName, PSDispatcher
......@@ -63,7 +64,7 @@ def same_or_split_var(p_name, var_name):
return p_name == var_name or p_name.startswith(var_name + ".block")
def slice_variable(var_list, slice_count, min_block_size=8192):
def slice_variable(var_list, slice_count, min_block_size):
"""
We may need to split dense tensor to one or more blocks and put
them equally onto parameter server. One block is a sub-tensor
......@@ -109,6 +110,22 @@ def slice_variable(var_list, slice_count, min_block_size=8192):
return blocks
class DistributeTranspilerConfig(object):
"""
slice_var_up (bool): Whether to slice Tensors up for pservers, default is True.
split_method (PSDispatcher): RoundRobin or HashName can be used;
try to choose the best method to balance loads for pservers.
min_block_size (int): Minimum number of splitted elements in a block.
According to https://github.com/PaddlePaddle/Paddle/issues/8638#issuecomment-369912156,
bandwidth is used efficiently when the data size is larger than 2MB. If you
want to change it, please make sure you read the slice_variable function first.
"""
slice_var_up = True
split_method = None
min_block_size = 8192
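# A minimal usage sketch of the config-based API (illustrative, not part of
# this diff): the endpoints and trainer_id are assumptions, and the main
# program is expected to already contain an optimizer, as in the tests above.
# The old slice_var_up/split_method arguments of transpile() move into this
# config object.
import paddle.fluid as fluid

config = fluid.DistributeTranspilerConfig()
config.slice_var_up = False            # keep each param/grad in a single block
config.min_block_size = 8192           # default; see slice_variable()

t = fluid.DistributeTranspiler(config=config)
t.transpile(trainer_id=0,
            pservers="127.0.0.1:6174,127.0.0.1:6175",
            trainers=2)
pserver_prog = t.get_pserver_program("127.0.0.1:6174")
startup_prog = t.get_startup_program("127.0.0.1:6174", pserver_prog)
trainer_prog = t.get_trainer_program()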
class DistributeTranspiler(object):
"""
**DistributeTranspiler**
......@@ -145,13 +162,23 @@ class DistributeTranspiler(object):
trainer_program = t.get_trainer_program()
"""
def __init__(self, config=None):
if config is not None:
self.config = config
else:
self.config = DistributeTranspilerConfig()
if self.config.split_method is None:
self.config.split_method = RoundRobin
assert (self.config.min_block_size >= 8192)
assert (self.config.split_method.__bases__[0] == PSDispatcher)
def transpile(self,
trainer_id,
program=None,
pservers="127.0.0.1:6174",
trainers=1,
slice_var_up=True,
split_method=RoundRobin,
sync_mode=True):
"""
Run the transpiler.
......@@ -164,12 +191,8 @@ class DistributeTranspiler(object):
pservers (str): comma separated ip:port string for the pserver
list.
trainers (int): number of trainers in the distributed job.
slice_var_up (bool): Do Tensor slice for pservers, default is True.
split_method (PSDispatcher): RoundRobin or HashName can be used
try to choose the best method to balance loads for pservers.
sync_mode (bool): Do sync training or not, default is True.
"""
assert (split_method.__bases__[0] == PSDispatcher)
if program is None:
program = default_main_program()
self.origin_program = program
......@@ -180,11 +203,11 @@ class DistributeTranspiler(object):
self.pserver_endpoints = pserver_endpoints
self.optimize_ops, self.params_grads = self._get_optimize_pass()
ps_dispatcher = split_method(self.pserver_endpoints)
ps_dispatcher = self.config.split_method(self.pserver_endpoints)
self.has_distributed_lookup_table = self._has_distributed_lookup_table()
# split and create vars, then put splited vars in dicts for later use.
self._init_splited_vars(slice_var_up)
self._init_splited_vars()
# step 3.1: insert send op to send gradient vars to parameter servers
ps_dispatcher.reset()
......@@ -196,13 +219,14 @@ class DistributeTranspiler(object):
# fc_b@GRAD_trainer_0, fc_b@GRAD_trainer_1 --> pserver2
# shuffle the map will avoid the uneven distribution above
grad_var_mapping_items = self.grad_var_mapping.items()
if not slice_var_up:
np.random.shuffle(grad_var_mapping_items)
if not self.config.slice_var_up:
random.seed(self.trainer_num)
random.shuffle(grad_var_mapping_items)
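# Note (assumption, not stated in the diff): seeding with trainer_num, which
# is identical on every trainer, should make the shuffle deterministic, so all
# trainers dispatch the same splited grad var to the same pserver endpoint.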
for orig_varname, splited_vars in grad_var_mapping_items:
eplist = ps_dispatcher.dispatch(splited_vars)
if not slice_var_up:
if not self.config.slice_var_up:
assert (len(splited_vars) == 1)
if len(splited_vars) == 1:
......@@ -219,7 +243,7 @@ class DistributeTranspiler(object):
AssertionError("Can not insert the send op by original "
"variable name :", orig_varname)
program.global_block().insert_op(
program.global_block()._insert_op(
index=index + 1,
type="send",
inputs={"X": splited_vars},
......@@ -405,7 +429,7 @@ class DistributeTranspiler(object):
# clone vars
for var in origin_block.vars:
new_sub_block.clone_variable(var)
new_sub_block._clone_variable(var)
# clone ops
for origin_op in origin_block.ops:
......@@ -437,6 +461,8 @@ class DistributeTranspiler(object):
per_opt_block = pserver_program.create_block(pre_block_idx)
optimize_blocks.append(per_opt_block)
# append grad merging ops before clip and weight decay ops;
# a typical per-parameter block may look like:
# L2Decay op -> clip op -> optimize op
for _, op in enumerate(self.optimize_ops):
# find the origin @GRAD var before clipping
grad_varname_for_block = __op_have_grad_input__(op)
......@@ -444,6 +470,7 @@ class DistributeTranspiler(object):
merged_var = self._append_pserver_grad_merge_ops(
per_opt_block, grad_varname_for_block, endpoint,
grad_to_block_id, self.origin_program)
break  # append the optimize op only once, then append the other ops.
for _, op in enumerate(self.optimize_ops):
# optimizer is connected to itself
if ufind.is_connected(op, opt_op) and op not in global_ops:
......@@ -498,7 +525,7 @@ class DistributeTranspiler(object):
outputs={},
attrs=attrs)
pserver_program.sync_with_cpp()
pserver_program._sync_with_cpp()
return pserver_program
def get_startup_program(self, endpoint, pserver_program):
......@@ -530,7 +557,7 @@ class DistributeTranspiler(object):
pserver_vars = pserver_program.global_block().vars
created_var_map = dict()
for _, var in pserver_vars.iteritems():
tmpvar = s_prog.global_block().clone_variable(var)
tmpvar = s_prog.global_block()._clone_variable(var)
created_var_map[var.name] = tmpvar
# 2. rename op outputs
......@@ -625,7 +652,7 @@ class DistributeTranspiler(object):
]
return param_list, grad_list
def _init_splited_vars(self, slice_var_up):
def _init_splited_vars(self):
# update these mappings for further transpile:
# 1. param_var_mapping: param var name -> [splited params vars]
# 2. grad_var_mapping: grad var name -> [splited grads vars]
......@@ -649,17 +676,22 @@ class DistributeTranspiler(object):
param_list, grad_list = self._update_dist_lookup_table_vars(
param_list, grad_list, self.params_grads)
if slice_var_up:
if self.config.slice_var_up:
# when we slice the var up into blocks, we slice it according to the
# number of pserver services. A pserver may have two or more listening ports.
grad_blocks = slice_variable(grad_list, len(self.pserver_endpoints))
grad_blocks = slice_variable(grad_list,
len(self.pserver_endpoints),
self.config.min_block_size)
param_blocks = slice_variable(param_list,
len(self.pserver_endpoints))
len(self.pserver_endpoints),
self.config.min_block_size)
else:
# when we do NOT slice the var up into blocks, each param and
# grad always goes into a single block.
grad_blocks = slice_variable(grad_list, 1)
param_blocks = slice_variable(param_list, 1)
grad_blocks = slice_variable(grad_list, 1,
self.config.min_block_size)
param_blocks = slice_variable(param_list, 1,
self.config.min_block_size)
assert (len(grad_blocks) == len(param_blocks))
# origin_varname -> [splited_var]
......@@ -728,7 +760,7 @@ class DistributeTranspiler(object):
self.all_prefetch_output_vars.append(prefetch_output_vars)
# insert split_ids_op
program.global_block().insert_op(
program.global_block()._insert_op(
index=lookup_table_op_index,
type="split_ids",
inputs={
......@@ -740,7 +772,7 @@ class DistributeTranspiler(object):
outputs={"Out": prefetch_input_vars})
# insert prefetch_op
program.global_block().insert_op(
program.global_block()._insert_op(
index=lookup_table_op_index + 1,
type="prefetch",
inputs={'X': prefetch_input_vars},
......@@ -751,7 +783,7 @@ class DistributeTranspiler(object):
})
# insert concat_op
program.global_block().insert_op(
program.global_block()._insert_op(
index=lookup_table_op_index + 2,
type="merge_ids",
inputs={
......@@ -782,14 +814,14 @@ class DistributeTranspiler(object):
if table_grad_name in op.output_arg_names:
op_index = list(all_ops).index(op)
# insert split_ids_op
program.global_block().insert_op(
program.global_block()._insert_op(
index=op_index + 1,
type="split_ids",
inputs={
'Ids': [program.global_block().vars[table_grad_name]]
},
outputs={"Out": self.trainer_side_table_grad_list})
program.global_block().insert_op(
program.global_block()._insert_op(
index=op_index + 2,
type="send",
inputs={'X': self.trainer_side_table_grad_list},
......@@ -848,7 +880,7 @@ class DistributeTranspiler(object):
persistable=True)
# parameter must be selected rows
param_var.desc.set_type(core.VarDesc.VarType.SELECTED_ROWS)
grad_var = pserver_program.global_block().clone_variable(
grad_var = pserver_program.global_block()._clone_variable(
self.origin_program.global_block().vars[grad_var_name(
self.table_name)])
......@@ -888,7 +920,7 @@ class DistributeTranspiler(object):
if not splited_grad_name.startswith(origin_grad_name):
raise ValueError("origin_grad_var: " + splited_grad_name +
" grad_var:" + grad_var.name)
grad_var = pserver_program.global_block().rename_var(
grad_var = pserver_program.global_block()._rename_var(
origin_grad_name, splited_grad_name)
lr_var = pserver_program.global_block().vars[table_opt_op.input(
......@@ -964,7 +996,7 @@ class DistributeTranspiler(object):
if self.sync_mode and add_trainer_suffix:
new_var_name = "%s.trainer_%d" % \
(orig_var.name, self.trainer_id)
program.global_block().rename_var(varname, new_var_name)
program.global_block()._rename_var(varname, new_var_name)
var_mapping[varname] = \
[program.global_block().var(new_var_name)]
else:
......@@ -998,7 +1030,7 @@ class DistributeTranspiler(object):
type=orig_var.type,
shape=splited_shape) # flattend splited var
var_mapping[varname].append(var)
program.global_block().sync_with_cpp()
program.global_block()._sync_with_cpp()
return var_mapping
def create_splited_vars(self, source_var, block, tag):
......@@ -1026,7 +1058,7 @@ class DistributeTranspiler(object):
height_sections = []
for v in splited_vars:
height_sections.append(v.shape[0])
program.global_block().insert_op(
program.global_block()._insert_op(
index=index + 1,
type="split_selected_rows",
inputs={"X": orig_var},
......@@ -1036,7 +1068,7 @@ class DistributeTranspiler(object):
sections = []
for v in splited_vars:
sections.append(v.shape[0])
program.global_block().insert_op(
program.global_block()._insert_op(
index=index + 1,
type="split_byref",
inputs={"X": orig_var},
......@@ -1225,7 +1257,7 @@ class DistributeTranspiler(object):
varlist = [varlist]
for var in varlist:
if var not in program.global_block().vars:
block.clone_variable(var)
block._clone_variable(var)
outputs = self._get_output_map_from_op(
self.origin_program.global_block().vars, op)
......@@ -1234,7 +1266,7 @@ class DistributeTranspiler(object):
varlist = [varlist]
for var in varlist:
if var not in program.global_block().vars:
block.clone_variable(var)
block._clone_variable(var)
return block.append_op(
type=op.type, inputs=inputs, outputs=outputs, attrs=op.attrs)
......@@ -1272,7 +1304,7 @@ class DistributeTranspiler(object):
if grad_block:
outputs[key] = grad_block
elif not program.global_block().vars.has_key(var.name):
program.global_block().clone_variable(var)
program.global_block()._clone_variable(var)
return optimize_block.append_op(
type=opt_op.type,
......
......@@ -95,7 +95,7 @@ class InferenceTranspiler(object):
# modify bnorm OP to include relu
current_op.set_attr("fuse_with_relu", True)
# remove relu OP
self.block.remove_op(i + 1)
self.block._remove_op(i + 1)
i = i + 1
self._remove_unused_var()
......@@ -171,7 +171,7 @@ class InferenceTranspiler(object):
# fuse batch_norm
self._fuse_param(current_op, next_op, bias_op, 0)
# remove batch_norm_op
self.block.remove_op(i + 2)
self.block._remove_op(i + 2)
i = i + 1
# conv2d with bias, the next_op.type is elementwise_add
elif (next_op.type == 'elementwise_add'):
......@@ -180,7 +180,7 @@ class InferenceTranspiler(object):
# fuse batch_norm
self._fuse_param(current_op, next_next_op, next_op, 1)
# remove batch_norm_op
self.block.remove_op(i + 2)
self.block._remove_op(i + 2)
i = i + 1
i = i + 1
......@@ -212,7 +212,7 @@ class InferenceTranspiler(object):
y_var = self.block.var(bn_op.input("Bias")[0])
out_var = self.block.var(bn_op.output("Y")[0])
bias_op = self.block.insert_op(
bias_op = self.block._insert_op(
index,
type="elementwise_add",
inputs={"X": x_var,
......@@ -307,4 +307,4 @@ class InferenceTranspiler(object):
for var in self.block.vars.keys():
if var not in args:
self.block.remove_var(var)
self.block._remove_var(var)
......@@ -177,7 +177,7 @@ class ControlFlowGraph(object):
in_diff)
if can_optimize:
index = i + fwd_id + 1 if is_forward else i - self._forward_num + bwd_id + 1
delete_op = block_desc.insert_op(index)
delete_op = block_desc._insert_op(index)
delete_op.set_type("delete_var")
delete_op.set_input("X", can_optimize)
if is_forward:
......
......@@ -182,7 +182,7 @@ def resize_short(im, size):
h_new = size * h / w
else:
w_new = size * w / h
im = cv2.resize(im, (h_new, w_new), interpolation=cv2.INTER_CUBIC)
im = cv2.resize(im, (w_new, h_new), interpolation=cv2.INTER_CUBIC)
return im
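# A quick sanity check (illustrative, not part of this diff): cv2.resize takes
# its target size as (width, height), which is why the argument order above is
# (w_new, h_new). Assumed input: a 480x640 (HxW) image with size=256.
import cv2
import numpy as np

im = np.zeros((480, 640, 3), dtype=np.uint8)   # H=480, W=640
size = 256
h, w = im.shape[:2]
h_new, w_new = size, size * w // h             # short side (height) -> 256, width -> 341
out = cv2.resize(im, (w_new, h_new), interpolation=cv2.INTER_CUBIC)
assert out.shape[:2] == (h_new, w_new)         # rows=height=256, cols=width=341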
......@@ -324,7 +324,6 @@ def simple_transform(im,
if np.random.randint(2) == 0:
im = left_right_flip(im, is_color)
else:
im = center_crop(im, crop_size, is_color)
im = center_crop(im, crop_size, is_color=is_color)
if len(im.shape) == 3:
im = to_chw(im)
......