diff --git a/benchmark/fluid/fluid_benchmark.py b/benchmark/fluid/fluid_benchmark.py
index 94ea7bd6aca7c9595037a2dacc5e36d4c77827e7..f8aed5a5e06c5e29dbdfb5db9f2ea0344c7eed6d 100644
--- a/benchmark/fluid/fluid_benchmark.py
+++ b/benchmark/fluid/fluid_benchmark.py
@@ -210,7 +210,7 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader,
     # generate fake:
     if args.use_fake_data:
         for var in feed_var_list:
-            v = startup_prog.global_block().clone_variable(var)
+            v = startup_prog.global_block()._clone_variable(var)
             var.persistable = True
             v.persistable = True

diff --git a/doc/fluid/design/modules/python_api.md b/doc/fluid/design/modules/python_api.md
index 265732a348ea77d21005e335390d99abcdfbd045..83af4e55485c079265d3f2b1e15070825b532c02 100644
--- a/doc/fluid/design/modules/python_api.md
+++ b/doc/fluid/design/modules/python_api.md
@@ -98,13 +98,13 @@ class Block(objects):
     def append_operator(self, ...):
         self.ops.append(Operator(self, ...))
 
-    def prepend_operator(self, ...): # Parameter's ctor prepands initialize operators.
+    def _prepend_operator(self, ...): # Parameter's ctor prepends initialize operators.
         self.ops.prepend(Operator(self, ...))
 ```
 
 `create_parameter` is necessary because parameters are global variables, defined in the global block, but can be created in some sub-blocks. For example, an FC layer in the step block of an RNN operator.
 
-`prepend_operator` is necessary because the constructor of `Parameter` needs to create the initialize (or load) operator of the parameter, and would like to put it in the *preamble* of the global block.
+`_prepend_operator` is necessary because the constructor of `Parameter` needs to create the initialize (or load) operator of the parameter, and would like to put it in the *preamble* of the global block.
 
 ### Operator
diff --git a/doc/fluid/howto/performance/error_clip.md b/doc/fluid/howto/performance/error_clip.md
index 58aa73b8cd38d01e2426278a3479714e4fb6a3b0..749cf7693c75696feb17f8556224ed03649baa80 100644
--- a/doc/fluid/howto/performance/error_clip.md
+++ b/doc/fluid/howto/performance/error_clip.md
@@ -78,7 +78,7 @@ def error_clip_callback(block, context):
     op_desc = block.desc.op(block.desc.op_size() - 1)
     for grad_n in filter(lambda n: grad_to_var.has_key(n),
                          op_desc.output_arg_names()):
-        fwd_var = block.var_recursive(grad_to_var[grad_n])
+        fwd_var = block._var_recursive(grad_to_var[grad_n])
         error_clip = getattr(fwd_var, "error_clip", None)
         if not (error_clip is None or isinstance(error_clip,
                                                   BaseErrorClipAttr)):
diff --git a/doc/v2/build_and_install/build_from_source_cn.rst b/doc/v2/build_and_install/build_from_source_cn.rst
index 6421c5308271c2508597d849c79709255caf349a..d0dacb104f148c2aeb323365cbd6f014ae00ed5a 100644
--- a/doc/v2/build_and_install/build_from_source_cn.rst
+++ b/doc/v2/build_and_install/build_from_source_cn.rst
@@ -35,11 +35,16 @@ PaddlePaddle需要使用Docker环境完成编译,这样可以免去单独安
       # 2. 可选步骤:源码中构建用于编译PaddlePaddle的Docker镜像
       docker build -t paddle:dev .
       # 3. 执行下面的命令编译CPU-Only的二进制
-      docker run -it -v $PWD:/paddle -w /paddle -e "WITH_GPU=OFF" -e "WITH_TESTING=OFF" paddlepaddle/paddle_manylinux_devel:cuda8.0_cudnn5 ./paddle/scripts/paddle_build.sh build
+      docker run -it -v $PWD:/paddle -w /paddle -e "PYTHON_ABI=cp27-cp27mu" -e "WITH_GPU=OFF" -e "WITH_TESTING=OFF" paddlepaddle/paddle_manylinux_devel:cuda8.0_cudnn5 ./paddle/scripts/paddle_build.sh build
       # 4. 或者也可以使用为上述可选步骤构建的镜像(必须先执行第2步)
       docker run -it -v $PWD:/paddle -w /paddle -e "WITH_GPU=OFF" -e "WITH_TESTING=OFF" paddle:dev ./paddle/scripts/paddle_build.sh build
 
-注:上述命令把当前目录(源码树根目录)映射为 container 里的 :code:`/paddle` 目录。
+注:
+
+- 上述命令把当前目录(源码树根目录)映射为 container 里的 :code:`/paddle` 目录。
+
+- 如果您使用的是 manylinux 的镜像进行编译, 那么您需要通过环境变量 :code:`PYTHON_ABI` 来指定一个 `Python ABI `__.
+PaddlePaddle目前支持的 Python ABI 有 :code:`cp27-cp27m` 和 :code:`cp27-cp27mu`.
 
 编译完成后会在build/python/dist目录下生成输出的whl包,可以选在在当前机器安装也可以拷贝到目标机器安装:
 
diff --git a/doc/v2/build_and_install/build_from_source_en.rst b/doc/v2/build_and_install/build_from_source_en.rst
index b08b45d43ec7f1deb2889832079a731ee724a44c..664b68da8b7dd3e005ebf3ec34de77729e5ab355 100644
--- a/doc/v2/build_and_install/build_from_source_en.rst
+++ b/doc/v2/build_and_install/build_from_source_en.rst
@@ -36,13 +36,18 @@ If you don't wish to use docker,you need to install several compile dependenci
       # 2. Optional: build development docker image from source
       docker build -t paddle:dev .
       # 3. Run the following command to build a CPU-Only binaries
-      docker run -it -v $PWD:/paddle -w /paddle -e "WITH_GPU=OFF" -e "WITH_TESTING=OFF" paddlepaddle/paddle_manylinux_devel:cuda8.0_cudnn5 ./paddle/scripts/paddle_build.sh build
+      docker run -it -v $PWD:/paddle -w /paddle -e "PYTHON_ABI=cp27-cp27mu" -e "WITH_GPU=OFF" -e "WITH_TESTING=OFF" paddlepaddle/paddle_manylinux_devel:cuda8.0_cudnn5 ./paddle/scripts/paddle_build.sh build
       # 4. Or, use your built Docker image to build PaddlePaddle (must run step 2)
       docker run -it -v $PWD:/paddle -w /paddle -e "WITH_GPU=OFF" -e "WITH_TESTING=OFF" paddle:dev ./paddle/scripts/paddle_build.sh build
 
-NOTE: The above command try to mount the current working directory (root directory of source code)
+NOTE:
+
+- The above command tries to mount the current working directory (root directory of source code)
 into :code:`/paddle` directory inside docker container.
+- You need to pass in the required environment variable :code:`PYTHON_ABI` to specify a `Python ABI `__.
+Currently, the Python ABIs supported by PaddlePaddle include :code:`cp27-cp27m` and :code:`cp27-cp27mu`.
+
 When the compile finishes, you can get the output whl package under build/python/dist,
 then you can choose to install the whl on local machine or copy it to the target machine.
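The value passed through `PYTHON_ABI` above has to match the CPython 2.7 interpreter that will install the resulting whl: `cp27-cp27mu` corresponds to a wide-unicode (UCS-4) build and `cp27-cp27m` to a narrow-unicode one. The snippet below is not part of this patch; it is only a hedged sketch of how one could check which of the two supported ABIs applies to the local interpreter.

```python
# check_python_abi.py -- hypothetical helper, NOT part of this patch.
# Prints the PYTHON_ABI value matching the local CPython 2.7 build:
# wide-unicode (UCS-4) builds need cp27-cp27mu, narrow builds need cp27-cp27m.
import sys

if __name__ == "__main__":
    assert sys.version_info[:2] == (2, 7), "this check targets CPython 2.7"
    print("cp27-cp27mu" if sys.maxunicode > 0xFFFF else "cp27-cp27m")
```

The printed value can then be passed to the build container via `-e "PYTHON_ABI=..."` as shown in the docker commands above.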
diff --git a/paddle/contrib/float16/float16_transpiler.py b/paddle/contrib/float16/float16_transpiler.py index 91ba101edb65cd45bd5e37a0c6ad25e515593a81..66e0345c299730c113ffbdc8dd3c1fa32f872f3d 100644 --- a/paddle/contrib/float16/float16_transpiler.py +++ b/paddle/contrib/float16/float16_transpiler.py @@ -118,7 +118,7 @@ class Float16Transpiler: for var in self.block.vars.keys(): if var not in args: - self.block.remove_var(var) + self.block._remove_var(var) def _modify_feed_fetch(self): ''' @@ -165,7 +165,7 @@ class Float16Transpiler: dtype=core.VarDesc.VarType.FP16, shape=var.shape, persistable=var.persistable) - self.block.insert_op( + self.block._insert_op( i + 1, type="cast", inputs={"X": var}, @@ -188,7 +188,7 @@ class Float16Transpiler: persistable=var.persistable) find_op(var) var.op.rename_output(var_name, tmp_var_name) - self.block.insert_op( + self.block._insert_op( i, type="cast", inputs={"X": tmp_var}, @@ -253,4 +253,4 @@ class Float16Transpiler: # old var will be replaced by the fp16 var in program desc self.input_map[var.name] = fp16_var_name - self.block.remove_var(var.name) + self.block._remove_var(var.name) diff --git a/paddle/contrib/inference/CMakeLists.txt b/paddle/contrib/inference/CMakeLists.txt index 87173fc42a46c8218fbf0beb4ebf7760f69b7c24..df470a0752121e44f4305fd3609c5c41985dfaf7 100644 --- a/paddle/contrib/inference/CMakeLists.txt +++ b/paddle/contrib/inference/CMakeLists.txt @@ -104,7 +104,3 @@ if (WITH_ANAKIN) # only needed in CI target_compile_options(inference_anakin_test BEFORE PUBLIC ${ANAKIN_COMPILE_EXTRA_FLAGS}) endif(WITH_TESTING) endif() - -if(WITH_TESTING) - add_subdirectory(demo) -endif() diff --git a/paddle/contrib/inference/demo/CMakeLists.txt b/paddle/contrib/inference/demo/CMakeLists.txt deleted file mode 100644 index 2d501bf0085b1bd4c39ee1a6dfaaa9622fd72ce1..0000000000000000000000000000000000000000 --- a/paddle/contrib/inference/demo/CMakeLists.txt +++ /dev/null @@ -1,59 +0,0 @@ -# Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -option(WITH_INFERENCE_DEMO "Compile with Inference demo" OFF) -if(NOT WITH_INFERENCE_DEMO) - return() -endif() - -set(DEMO_INSTALL_DIR "${PADDLE_BINARY_DIR}/inference_demo") -set(URL_ROOT http://paddlemodels.bj.bcebos.com/inference-vis-demos%2F) - -function(inference_download_test_demo TARGET) - if (NOT WITH_TESTING) - return() - endif() - set(options "") - set(oneValueArgs URL) - set(multiValueArgs SRCS) - cmake_parse_arguments(tests "${options}" "${oneValueArgs}" "${multiValueArgs}" ${ARGN}) - - set(test_dir "${DEMO_INSTALL_DIR}/${TARGET}") - message(STATUS "inference demo ${test_dir}") - - if(NOT EXISTS "${test_dir}") - message(STATUS "Download ${TARGET} model from ${tests_URL}") - execute_process(COMMAND bash -c "mkdir -p ${test_dir}") - execute_process(COMMAND bash -c "cd ${test_dir}; wget -q ${tests_URL}") - execute_process(COMMAND bash -c "cd ${test_dir}; tar xzf *.tar.gz") - endif() - - cc_test(${TARGET} SRCS "${tests_SRCS}" - DEPS paddle_inference_api paddle_fluid - ARGS --data=${test_dir}/data.txt - --modeldir=${test_dir}/model - --refer=${test_dir}/result.txt) -endfunction() - -# disable mobilenet test -#inference_download_test_demo(mobilenet_inference_demo -# SRCS vis_demo.cc -# URL ${URL_ROOT}mobilenet.tar.gz) -inference_download_test_demo(se_resnext50_inference_demo - SRCS vis_demo.cc - URL ${URL_ROOT}se_resnext50.tar.gz) -inference_download_test_demo(ocr_inference_demo - SRCS vis_demo.cc - URL ${URL_ROOT}ocr.tar.gz) diff --git a/paddle/contrib/inference/demo/README.md b/paddle/contrib/inference/demo/README.md deleted file mode 100644 index f1d256660299a68dc5d9d73dbe4a401a0e7d9680..0000000000000000000000000000000000000000 --- a/paddle/contrib/inference/demo/README.md +++ /dev/null @@ -1,36 +0,0 @@ -# Infernce Demos - -Input data format: - -- Each line contains a single record -- Each record's format is - -``` -\t -``` - -Follow the C++ codes in `vis_demo.cc`. 
-
-## MobileNet
-
-To execute the demo, simply run
-
-```sh
-./mobilenet_inference_demo --modeldir --data
-```
-
-## SE-ResNeXt-50
-
-To execute the demo, simply run
-
-```sh
-./se_resnext50_inference_demo --modeldir --data
-```
-
-## OCR
-
-To execute the demo, simply run
-
-```sh
-./ocr_inference_demo --modeldir --data
-```
diff --git a/paddle/contrib/inference/demo_ci/.gitignore b/paddle/contrib/inference/demo_ci/.gitignore
new file mode 100644
index 0000000000000000000000000000000000000000..1269488f7fb1f4b56a8c0e5eb48cecbfadfa9219
--- /dev/null
+++ b/paddle/contrib/inference/demo_ci/.gitignore
@@ -0,0 +1 @@
+data
diff --git a/paddle/contrib/inference/demo_ci/CMakeLists.txt b/paddle/contrib/inference/demo_ci/CMakeLists.txt
index 789bff7f23cd89bfaeba180efa95972cef6fc58c..33a0dfaf88b692f5d290920d97a650ad67f3dbca 100644
--- a/paddle/contrib/inference/demo_ci/CMakeLists.txt
+++ b/paddle/contrib/inference/demo_ci/CMakeLists.txt
@@ -52,14 +52,12 @@ else()
   set(MATH_LIB ${PADDLE_LIB}/third_party/install/openblas/lib/libopenblas.a)
 endif()
 
+# Note: libpaddle_inference_api.so/a must be put before libpaddle_fluid.so/a
 if(WITH_STATIC_LIB)
   set(DEPS
-      "-Wl,--whole-archive"
-      ${PADDLE_LIB}/paddle/fluid/inference/libpaddle_fluid.a
-      "-Wl,--no-whole-archive"
-      ${PADDLE_LIB}/contrib/inference/libpaddle_inference_api.a)
+      ${PADDLE_LIB}/contrib/inference/libpaddle_inference_api.a
+      ${PADDLE_LIB}/paddle/fluid/inference/libpaddle_fluid.a)
 else()
-  # Note: libpaddle_inference_api.so must put before libpaddle_fluid.so
   set(DEPS
       ${PADDLE_LIB}/contrib/inference/libpaddle_inference_api.so
       ${PADDLE_LIB}/paddle/fluid/inference/libpaddle_fluid.so)
diff --git a/paddle/contrib/inference/demo_ci/README.md b/paddle/contrib/inference/demo_ci/README.md
new file mode 100644
index 0000000000000000000000000000000000000000..7f013da7f30acd84ec484773f4ea716a08efa0ff
--- /dev/null
+++ b/paddle/contrib/inference/demo_ci/README.md
@@ -0,0 +1,26 @@
+# Inference Demos
+
+There are several demos:
+
+- simple_on_word2vec:
+  - The C++ code is in `simple_on_word2vec.cc`.
+  - It is suitable for the word2vec model.
+- vis_demo:
+  - The C++ code is in `vis_demo.cc`.
+  - It is suitable for three models: mobilenet, se_resnext50 and ocr.
+  - Input data format:
+    - Each line contains a single record
+    - Each record's format is
+    ```
+    \t
+    ```
+
+To build and execute the demos, simply run
+```
+./run.sh $PADDLE_ROOT $TURN_ON_MKL $TEST_GPU_CPU
+```
+- It builds and runs the demos with both the static and the shared library.
+- `$PADDLE_ROOT`: paddle library path
+- `$TURN_ON_MKL`: use MKL or Openblas
+- `$TEST_GPU_CPU`: test both GPU/CPU mode or only CPU mode
+- NOTE: for simple_on_word2vec, you must run `ctest -R test_word2vec -R` first to obtain the word2vec model.
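For reference, each input record that `vis_demo` consumes is a line holding space-separated float values (the tensor data), a tab, and then space-separated integers (the tensor shape); this is what the `ProcessALine` parser in `vis_demo.cc` below expects. The following Python sketch only illustrates that layout under those assumptions and is not code from this patch:

```python
# parse_record.py -- hypothetical illustration of the vis_demo input format:
# one record per line, "space-separated floats<TAB>space-separated ints".
def parse_record(line):
    data_str, shape_str = line.rstrip("\n").split("\t")
    data = [float(x) for x in data_str.split()]   # first column: flattened tensor data
    shape = [int(x) for x in shape_str.split()]   # second column: tensor shape
    return data, shape

if __name__ == "__main__":
    sample = "0.1 0.2 0.3 0.4\t1 1 2 2"
    data, shape = parse_record(sample)
    print(data)   # [0.1, 0.2, 0.3, 0.4]
    print(shape)  # [1, 1, 2, 2]
```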
diff --git a/paddle/contrib/inference/demo_ci/run.sh b/paddle/contrib/inference/demo_ci/run.sh index e3a7269af795b05c296423cb2dc92b753397c6b3..87690ac9603e1e54a46a761149798b75ea8a7990 100755 --- a/paddle/contrib/inference/demo_ci/run.sh +++ b/paddle/contrib/inference/demo_ci/run.sh @@ -1,34 +1,81 @@ set -x PADDLE_ROOT=$1 -WITH_MKL=$2 -WITH_GPU=$3 -if [ $3 == "ON" ]; then +TURN_ON_MKL=$2 # use MKL or Openblas +TEST_GPU_CPU=$3 # test both GPU/CPU mode or only CPU mode +if [ $2 == ON ]; then + # You can export yourself if move the install path + MKL_LIB=${PADDLE_ROOT}/build/fluid_install_dir/third_party/install/mklml/lib + export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:${MKL_LIB} +fi +if [ $3 == ON ]; then use_gpu_list='true false' else use_gpu_list='false' fi +# download vis_demo data +function download() { + dir_name=$1 + mkdir -p $dir_name + cd $dir_name + wget -q ${URL_ROOT}$dir_name.tar.gz + tar xzf *.tar.gz + cd .. +} +URL_ROOT=http://paddlemodels.bj.bcebos.com/inference-vis-demos%2F +mkdir -p data +cd data +vis_demo_list='se_resnext50 ocr mobilenet' +for vis_demo_name in $vis_demo_list; do + download $vis_demo_name +done +cd .. + +# compile and test the demo mkdir -p build cd build -for WITH_STATIC_LIB in false; do +for WITH_STATIC_LIB in ON OFF; do + # -----simple_on_word2vec----- rm -rf * cmake .. -DPADDLE_LIB=${PADDLE_ROOT}/build/fluid_install_dir/ \ - -DWITH_MKL=$WITH_MKL \ + -DWITH_MKL=$TURN_ON_MKL \ -DDEMO_NAME=simple_on_word2vec \ - -DWITH_GPU=$WITH_GPU \ + -DWITH_GPU=$TEST_GPU_CPU \ -DWITH_STATIC_LIB=$WITH_STATIC_LIB - make - for use_gpu in $use_gpu_list; do - ./simple_on_word2vec \ - --dirname=${PADDLE_ROOT}/build/python/paddle/fluid/tests/book/word2vec.inference.model \ - --use_gpu=$use_gpu + make -j + word2vec_model=${PADDLE_ROOT}'/build/python/paddle/fluid/tests/book/word2vec.inference.model' + if [ -d $word2vec_model ]; then + for use_gpu in $use_gpu_list; do + ./simple_on_word2vec \ + --dirname=$word2vec_model \ + --use_gpu=$use_gpu + if [ $? -ne 0 ]; then + echo "simple_on_word2vec demo runs fail." + exit 1 + fi + done + fi + # ---------vis_demo--------- + rm -rf * + cmake .. -DPADDLE_LIB=${PADDLE_ROOT}/build/fluid_install_dir/ \ + -DWITH_MKL=$TURN_ON_MKL \ + -DDEMO_NAME=vis_demo \ + -DWITH_GPU=$TEST_GPU_CPU \ + -DWITH_STATIC_LIB=$WITH_STATIC_LIB + make -j + for use_gpu in false; do + for vis_demo_name in $vis_demo_list; do + ./vis_demo \ + --modeldir=../data/$vis_demo_name/model \ + --data=../data/$vis_demo_name/data.txt \ + --refer=../data/$vis_demo_name/result.txt \ + --use_gpu=$use_gpu + if [ $? -ne 0 ]; then + echo "vis demo $vis_demo_name runs fail." + exit 1 + fi + done done done -if [ $? -eq 0 ]; then - exit 0 -else - echo "inference demo runs fail." 
- exit 1 -fi set +x diff --git a/paddle/contrib/inference/demo/utils.h b/paddle/contrib/inference/demo_ci/utils.h similarity index 96% rename from paddle/contrib/inference/demo/utils.h rename to paddle/contrib/inference/demo_ci/utils.h index b5330d8d9d89260cfe3d5214e5a4ceb720cffdf1..017b39edaf1dd8fb9d3b66f278abcf9ba7991ba9 100644 --- a/paddle/contrib/inference/demo/utils.h +++ b/paddle/contrib/inference/demo_ci/utils.h @@ -16,7 +16,7 @@ #include #include -#include "paddle/contrib/inference/paddle_inference_api.h" +#include "contrib/inference/paddle_inference_api.h" namespace paddle { namespace demo { diff --git a/paddle/contrib/inference/demo/vis_demo.cc b/paddle/contrib/inference/demo_ci/vis_demo.cc similarity index 75% rename from paddle/contrib/inference/demo/vis_demo.cc rename to paddle/contrib/inference/demo_ci/vis_demo.cc index 45575f9a862de430236ae20cf498e542a45b1f4b..1c570a9f658be5641ac2b3a288376bdf19b293a1 100644 --- a/paddle/contrib/inference/demo/vis_demo.cc +++ b/paddle/contrib/inference/demo_ci/vis_demo.cc @@ -18,19 +18,14 @@ limitations under the License. */ #include #include // use glog instead of PADDLE_ENFORCE to avoid importing other paddle header files. -#include #include #include -#include "paddle/contrib/inference/demo/utils.h" -#include "paddle/contrib/inference/paddle_inference_api.h" +#include "paddle/fluid/platform/enforce.h" +#include "utils.h" #ifdef PADDLE_WITH_CUDA DECLARE_double(fraction_of_gpu_memory_to_use); #endif - -namespace paddle { -namespace demo { - DEFINE_string(modeldir, "", "Directory of the inference model."); DEFINE_string(refer, "", "path to reference result for comparison."); DEFINE_string( @@ -38,6 +33,10 @@ DEFINE_string( "", "path of data; each line is a record, format is " "'\t data; @@ -47,7 +46,7 @@ struct Record { void split(const std::string& str, char sep, std::vector* pieces); Record ProcessALine(const std::string& line) { - LOG(INFO) << "process a line"; + VLOG(3) << "process a line"; std::vector columns; split(line, '\t', &columns); CHECK_EQ(columns.size(), 2UL) @@ -65,8 +64,8 @@ Record ProcessALine(const std::string& line) { for (auto& s : shape_strs) { record.shape.push_back(std::stoi(s)); } - LOG(INFO) << "data size " << record.data.size(); - LOG(INFO) << "data shape size " << record.shape.size(); + VLOG(3) << "data size " << record.data.size(); + VLOG(3) << "data shape size " << record.shape.size(); return record; } @@ -78,20 +77,22 @@ void CheckOutput(const std::string& referfile, const PaddleTensor& output) { file.close(); size_t numel = output.data.length() / PaddleDtypeSize(output.dtype); - LOG(INFO) << "predictor output numel " << numel; - LOG(INFO) << "reference output numel " << refer.data.size(); - EXPECT_EQ(numel, refer.data.size()); + VLOG(3) << "predictor output numel " << numel; + VLOG(3) << "reference output numel " << refer.data.size(); + PADDLE_ENFORCE_EQ(numel, refer.data.size()); switch (output.dtype) { case PaddleDType::INT64: { for (size_t i = 0; i < numel; ++i) { - EXPECT_EQ(static_cast(output.data.data())[i], refer.data[i]); + PADDLE_ENFORCE_EQ(static_cast(output.data.data())[i], + refer.data[i]); } break; } case PaddleDType::FLOAT32: for (size_t i = 0; i < numel; ++i) { - EXPECT_NEAR( - static_cast(output.data.data())[i], refer.data[i], 1e-5); + PADDLE_ENFORCE_LT( + fabs(static_cast(output.data.data())[i] - refer.data[i]), + 1e-5); } break; } @@ -106,15 +107,15 @@ void Main(bool use_gpu) { config.prog_file = FLAGS_modeldir + "/__model__"; config.use_gpu = use_gpu; config.device = 0; -#ifdef PADDLE_WITH_CUDA 
- config.fraction_of_gpu_memory = FLAGS_fraction_of_gpu_memory_to_use; -#endif + if (FLAGS_use_gpu) { + config.fraction_of_gpu_memory = 0.1; // set by yourself + } - LOG(INFO) << "init predictor"; + VLOG(3) << "init predictor"; auto predictor = CreatePaddlePredictor(config); - LOG(INFO) << "begin to process data"; + VLOG(3) << "begin to process data"; // Just a single batch of data. std::string line; std::ifstream file(FLAGS_data); @@ -129,21 +130,26 @@ void Main(bool use_gpu) { .data = PaddleBuf(record.data.data(), record.data.size() * sizeof(float)), .dtype = PaddleDType::FLOAT32}; - LOG(INFO) << "run executor"; + VLOG(3) << "run executor"; std::vector output; predictor->Run({input}, &output); - LOG(INFO) << "output.size " << output.size(); + VLOG(3) << "output.size " << output.size(); auto& tensor = output.front(); - LOG(INFO) << "output: " << SummaryTensor(tensor); + VLOG(3) << "output: " << SummaryTensor(tensor); // compare with reference result CheckOutput(FLAGS_refer, tensor); } -TEST(demo, vis_demo_cpu) { Main(false /*use_gpu*/); } -#ifdef PADDLE_WITH_CUDA -TEST(demo, vis_demo_gpu) { Main(true /*use_gpu*/); } -#endif } // namespace demo } // namespace paddle + +int main(int argc, char** argv) { + google::ParseCommandLineFlags(&argc, &argv, true); + paddle::demo::Main(false /* use_gpu*/); + if (FLAGS_use_gpu) { + paddle::demo::Main(true /*use_gpu*/); + } + return 0; +} diff --git a/paddle/contrib/inference/paddle_inference_api_anakin_engine.cc b/paddle/contrib/inference/paddle_inference_api_anakin_engine.cc index ba2d30314715a57c5ab85e5ae1d8ac0512bbc74f..daf0b56825498d48ca48d1d5b0ea6a3d7314bfc4 100644 --- a/paddle/contrib/inference/paddle_inference_api_anakin_engine.cc +++ b/paddle/contrib/inference/paddle_inference_api_anakin_engine.cc @@ -54,6 +54,7 @@ bool PaddleInferenceAnakinPredictor::Run( LOG(ERROR) << "copy data from CPU to GPU error"; return false; } + cudaStreamSynchronize(NULL); } executor_.prediction(); @@ -76,6 +77,7 @@ bool PaddleInferenceAnakinPredictor::Run( LOG(ERROR) << "copy data from GPU to CPU error"; return false; } + cudaStreamSynchronize(NULL); } return true; } diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index 58be61362cabf22a3543af364f1b0bd180df826a..9a72e1baa34274201c40bd83a7aace549a7fc6ae 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -104,7 +104,7 @@ ParallelExecutor::ParallelExecutor( } if (member_->local_scopes_.size() != 1 && local_scopes.empty()) { - BCastParamsToDevs(bcast_vars); + BCastParamsToDevices(bcast_vars); } // Startup Program has been run. All local scopes has correct parameters. @@ -140,7 +140,7 @@ ParallelExecutor::ParallelExecutor( member_->places_, std::move(member_->executor_))); } -void ParallelExecutor::BCastParamsToDevs( +void ParallelExecutor::BCastParamsToDevices( const std::unordered_set &vars) const { // the initializing bcast, all vars would be bcast from device(0), // otherwise @@ -218,7 +218,10 @@ void ParallelExecutor::BCastParamsToDevs( auto local_scope = member_->local_scopes_[i]; auto *t = local_scope->Var(var)->GetMutable(); - if (member_->use_all_reduce_ || member_->use_cuda_) { + + // FIXME(zcd): LR_DECAY_COUNTER should not be shared. This is a hot fix. 
+ if (member_->use_all_reduce_ || member_->use_cuda_ || + var == "@LR_DECAY_COUNTER@") { t->Resize(dims); t->mutable_data(cpu, main_tensor.type()); paddle::framework::TensorCopy(main_tensor, cpu, t); diff --git a/paddle/fluid/framework/parallel_executor.h b/paddle/fluid/framework/parallel_executor.h index 6985b6540690c6218bcee51ba0e69f3d34812bfc..ffb9934a2d702b2bf6db7ad75a6bf9867e1e9901 100644 --- a/paddle/fluid/framework/parallel_executor.h +++ b/paddle/fluid/framework/parallel_executor.h @@ -66,7 +66,7 @@ class ParallelExecutor { void Run(const std::vector &fetch_tensors, const std::string &fetched_var_name); - void BCastParamsToDevs(const std::unordered_set &vars) const; + void BCastParamsToDevices(const std::unordered_set &vars) const; private: ParallelExecutorPrivate *member_; diff --git a/paddle/fluid/inference/CMakeLists.txt b/paddle/fluid/inference/CMakeLists.txt index b1c33c3415f49f9b1160655034350087432d0cb0..86643b9aa111a9d73c184097e0c43bbec30c6b31 100644 --- a/paddle/fluid/inference/CMakeLists.txt +++ b/paddle/fluid/inference/CMakeLists.txt @@ -1,3 +1,10 @@ +# analysis and tensorrt must be added before creating static library, +# otherwise, there would be undefined reference to them in static library. +add_subdirectory(analysis) +if (TENSORRT_FOUND) + add_subdirectory(tensorrt) +endif() + set(FLUID_CORE_MODULES proto_desc memory lod_tensor executor ) # TODO(panyx0718): Should this be called paddle_fluid_inference_api_internal? @@ -7,10 +14,6 @@ cc_library(paddle_fluid_api get_property(fluid_modules GLOBAL PROPERTY FLUID_MODULES) -if(WITH_CONTRIB) - set(fluid_modules "${fluid_modules}" paddle_inference_api) -endif() - # Create static library cc_library(paddle_fluid DEPS ${fluid_modules} paddle_fluid_api) if(NOT APPLE) @@ -35,9 +38,3 @@ if(WITH_TESTING) # both tests/book and analysis depends the models that generated by python/paddle/fluid/tests/book add_subdirectory(tests/book) endif() - -add_subdirectory(analysis) - -if (TENSORRT_FOUND) - add_subdirectory(tensorrt) -endif() diff --git a/paddle/fluid/operators/auc_op.cc b/paddle/fluid/operators/auc_op.cc index c9871a9fe6b3b0d0cf671c2d155715f92c94fd8f..6bd3e491bccb037406b784147dc9f91049b34d53 100644 --- a/paddle/fluid/operators/auc_op.cc +++ b/paddle/fluid/operators/auc_op.cc @@ -35,7 +35,14 @@ class AucOp : public framework::OperatorWithKernel { PADDLE_ENFORCE_EQ(inference_height, label_height, "Out and Label should have same height."); + int num_thres = ctx->Attrs().Get("num_thresholds"); + ctx->SetOutputDim("AUC", {1}); + ctx->SetOutputDim("TPOut", {num_thres}); + ctx->SetOutputDim("TNOut", {num_thres}); + ctx->SetOutputDim("FPOut", {num_thres}); + ctx->SetOutputDim("FNOut", {num_thres}); + ctx->ShareLoD("Out", /*->*/ "AUC"); } @@ -63,10 +70,18 @@ class AucOpMaker : public framework::OpProtoAndCheckerMaker { AddInput("Label", "A 2D int tensor indicating the label of the training data." 
"The height is batch size and width is always 1."); + AddInput("TP", "True-Positive value."); + AddInput("FP", "False-Positive value."); + AddInput("TN", "True-Negative value."); + AddInput("FN", "False-Negative value."); // TODO(typhoonzero): support weight input AddOutput("AUC", "A scalar representing the " "current area-under-the-curve."); + AddOutput("TPOut", "True-Positive value."); + AddOutput("FPOut", "False-Positive value."); + AddOutput("TNOut", "True-Negative value."); + AddOutput("FNOut", "False-Negative value."); AddAttr("curve", "Curve type, can be 'ROC' or 'PR'.") .SetDefault("ROC"); diff --git a/paddle/fluid/operators/auc_op.h b/paddle/fluid/operators/auc_op.h index 8b016c3d31ad83e66baeb298c61840cc529efa1e..58fefc1600dfb7df3e3d71959c047865ed5e2e39 100644 --- a/paddle/fluid/operators/auc_op.h +++ b/paddle/fluid/operators/auc_op.h @@ -34,6 +34,12 @@ class AucKernel : public framework::OpKernel { auto* inference = ctx.Input("Out"); auto* label = ctx.Input("Label"); auto* auc = ctx.Output("AUC"); + // Only use output var for now, make sure it's persistable and + // not cleaned up for each batch. + auto* true_positive = ctx.Output("TPOut"); + auto* false_positive = ctx.Output("FPOut"); + auto* true_negative = ctx.Output("TNOut"); + auto* false_negative = ctx.Output("FNOut"); float* auc_data = auc->mutable_data(ctx.GetPlace()); @@ -54,19 +60,10 @@ class AucKernel : public framework::OpKernel { const T* inference_data = inference->data(); const int64_t* label_data = label->data(); - // Create local tensor for storing the curve: TP, FN, TN, FP - // TODO(typhoonzero): use eigen op to caculate these values. - Tensor true_positive, false_positive, true_negative, false_negative; - - true_positive.Resize({num_thresholds}); - false_negative.Resize({num_thresholds}); - true_negative.Resize({num_thresholds}); - false_positive.Resize({num_thresholds}); - - int64_t* tp_data = true_positive.mutable_data(ctx.GetPlace()); - int64_t* fn_data = false_negative.mutable_data(ctx.GetPlace()); - int64_t* tn_data = true_negative.mutable_data(ctx.GetPlace()); - int64_t* fp_data = false_positive.mutable_data(ctx.GetPlace()); + auto* tp_data = true_positive->mutable_data(ctx.GetPlace()); + auto* fn_data = false_negative->mutable_data(ctx.GetPlace()); + auto* tn_data = true_negative->mutable_data(ctx.GetPlace()); + auto* fp_data = false_positive->mutable_data(ctx.GetPlace()); for (int idx_thresh = 0; idx_thresh < num_thresholds; idx_thresh++) { // caculate TP, FN, TN, FP for current thresh @@ -91,10 +88,10 @@ class AucKernel : public framework::OpKernel { } } // store rates - tp_data[idx_thresh] = tp; - fn_data[idx_thresh] = fn; - tn_data[idx_thresh] = tn; - fp_data[idx_thresh] = fp; + tp_data[idx_thresh] += tp; + fn_data[idx_thresh] += fn; + tn_data[idx_thresh] += tn; + fp_data[idx_thresh] += fp; } // epsilon to avoid divide by zero. 
float epsilon = 1e-6; diff --git a/paddle/fluid/operators/checkpoint_notify_op.cc b/paddle/fluid/operators/checkpoint_notify_op.cc index c4219a429a53eb4869426a2674109555fb784b85..3a2527e407bb179c4873fa3ffe2e8f22fb47faf7 100644 --- a/paddle/fluid/operators/checkpoint_notify_op.cc +++ b/paddle/fluid/operators/checkpoint_notify_op.cc @@ -48,7 +48,7 @@ class CheckpointNotifyOp : public framework::OperatorBase { VLOG(3) << "checkpoint notify sending lookup table: " << lookup_table_name << " and dir:" << dir << " to " << epmap[i]; } - rpc_client->Wait(); + PADDLE_ENFORCE(rpc_client->Wait(), "internal error in RPCClient"); } }; diff --git a/paddle/fluid/operators/conv_mkldnn_op.cc b/paddle/fluid/operators/conv_mkldnn_op.cc index 5bfa1aaa696d5cbe8bdcb94d708746259952740f..5098bd8700e11c9a2faeba90c38ed2d9499b17cf 100644 --- a/paddle/fluid/operators/conv_mkldnn_op.cc +++ b/paddle/fluid/operators/conv_mkldnn_op.cc @@ -18,9 +18,6 @@ namespace paddle { namespace operators { -using conv_bwd_data = mkldnn::convolution_backward_data; -using conv_bwd_weights = mkldnn::convolution_backward_weights; -using conv_fwd = mkldnn::convolution_forward; using framework::DataLayout; using mkldnn::memory; using mkldnn::primitive; @@ -39,6 +36,72 @@ class ConvMKLDNNHandler : public platform::MKLDNNHandler { conv_pd_ = conv_pd; } + ConvMKLDNNHandler( + std::shared_ptr conv_pd, + std::shared_ptr + conv_bwd_data_pd, + std::shared_ptr + conv_bwd_weights_pd, + const platform::MKLDNNDeviceContext& dev_ctx, mkldnn::engine engine, + const std::string& base_key) + : platform::MKLDNNHandler(dev_ctx, engine, base_key), + conv_pd_(conv_pd), + conv_bwd_weights_pd_(conv_bwd_weights_pd), + conv_bwd_data_pd_(conv_bwd_data_pd) { + // If we are in Grad operatgor then update a key with BWD suffix to + // distinguish from FWD memory primitives + key_ += "-BWD"; + } + + std::shared_ptr AcquireSrcMemoryFromWeightsPrimitive( + const std::shared_ptr user_memory_p, + std::vector& pipeline) { + auto src_pd = conv_bwd_weights_pd_->src_primitive_desc(); + auto user_pd = user_memory_p->get_primitive_desc(); + return this->AcquireMemory(src_pd, user_pd, user_memory_p, + "@weights-src_mem_p", pipeline); + } + + std::shared_ptr AcquireDiffDstMemoryFromWeightsPrimitive( + const std::shared_ptr user_memory_p, + std::vector& pipeline) { + auto diff_dst_pd = conv_bwd_weights_pd_->diff_dst_primitive_desc(); + auto user_pd = user_memory_p->get_primitive_desc(); + return this->AcquireMemory(diff_dst_pd, user_pd, user_memory_p, + "@weights-diff_dst_mem_p", pipeline); + } + + std::shared_ptr AcquireDiffWeightsMemoryFromWeightsPrimitive( + void* ptr) { + return this->AcquireMemoryFromPrimitive( + conv_bwd_weights_pd_->diff_weights_primitive_desc(), ptr, + "@diff_weights_mem_p"); + } + + std::shared_ptr AcquireDiffDstMemoryFromDataPrimitive( + const std::shared_ptr user_memory_p, + std::vector& pipeline) { + auto diff_dst_pd = conv_bwd_data_pd_->diff_dst_primitive_desc(); + auto user_pd = user_memory_p->get_primitive_desc(); + return this->AcquireMemory(diff_dst_pd, user_pd, user_memory_p, + "@data-diff_dst_mem_p", pipeline); + } + + std::shared_ptr AcquireWeightsMemoryFromDataPrimitive( + const std::shared_ptr user_weights_memory_p, + std::vector& pipeline) { + auto weights_pd = conv_bwd_data_pd_->weights_primitive_desc(); + auto user_pd = user_weights_memory_p->get_primitive_desc(); + return this->AcquireMemory(weights_pd, user_pd, user_weights_memory_p, + "@data-weights_mem_p", pipeline); + } + + std::shared_ptr AcquireDiffSrcMemoryFromDataPrimitive( + void* 
ptr) { + return this->AcquireMemoryFromPrimitive( + conv_bwd_data_pd_->diff_src_primitive_desc(), ptr, "@diff_src_mem_p"); + } + std::shared_ptr AcquireDstMemoryFromPrimitive(void* ptr) { return this->AcquireMemoryFromPrimitive(conv_pd_->dst_primitive_desc(), ptr, "@dst_mem_p"); @@ -68,7 +131,6 @@ class ConvMKLDNNHandler : public platform::MKLDNNHandler { std::shared_ptr weights_memory_p, std::shared_ptr dst_memory_p) { auto prim_key = key_ + "@conv_p"; - auto prim_desc_key = key_ + "@conv_pd"; auto conv_p = std::static_pointer_cast( dev_ctx_.GetBlob(prim_key)); PADDLE_ENFORCE((conv_p != nullptr) || (is_reusing_ == false), @@ -85,6 +147,54 @@ class ConvMKLDNNHandler : public platform::MKLDNNHandler { return conv_p; } + std::shared_ptr + AcquireConvolutionBackwardWeights( + std::shared_ptr src_memory_p, + std::shared_ptr diff_dst_memory_p, + std::shared_ptr diff_weights_memory_p) { + auto prim_key = key_ + "@conv_bwd_weights_p"; + auto conv_bwd_weights_p = + std::static_pointer_cast( + dev_ctx_.GetBlob(prim_key)); + PADDLE_ENFORCE( + (conv_bwd_weights_p != nullptr) || (is_reusing_ == false), + "Fail to find convolution bwd weights primitive in device context"); + if (conv_bwd_weights_p == nullptr) { + // create backward conv primitive for weights + conv_bwd_weights_p = + std::make_shared( + *conv_bwd_weights_pd_, *src_memory_p, *diff_dst_memory_p, + *diff_weights_memory_p); + dev_ctx_.SetBlob(prim_key, conv_bwd_weights_p); + } else { + is_reusing_ = true; + } + return conv_bwd_weights_p; + } + + std::shared_ptr + AcquireConvolutionBackwardData( + std::shared_ptr diff_dst_memory_p, + std::shared_ptr weights_memory_p, + std::shared_ptr diff_src_memory_p) { + auto prim_key = key_ + "@conv_bwd_data_p"; + auto conv_bwd_data_p = + std::static_pointer_cast( + dev_ctx_.GetBlob(prim_key)); + PADDLE_ENFORCE( + (conv_bwd_data_p != nullptr) || (is_reusing_ == false), + "Fail to find convolution bwd data primitive in device context"); + if (conv_bwd_data_p == nullptr) { + conv_bwd_data_p = std::make_shared( + *conv_bwd_data_pd_, *diff_dst_memory_p, *weights_memory_p, + *diff_src_memory_p); + dev_ctx_.SetBlob(prim_key, conv_bwd_data_p); + } else { + is_reusing_ = true; + } + return conv_bwd_data_p; + } + // Generate keys for storing/retriving primitives for this operator // TODO(jczaja): Make hashing function more optimial static std::string GetHash(memory::dims& input_dims, @@ -100,6 +210,10 @@ class ConvMKLDNNHandler : public platform::MKLDNNHandler { private: std::shared_ptr conv_pd_; + std::shared_ptr + conv_bwd_weights_pd_; + std::shared_ptr + conv_bwd_data_pd_; }; template @@ -174,8 +288,9 @@ class ConvMKLDNNOpKernel : public paddle::framework::OpKernel { dst_tz, platform::MKLDNNGetDataType(), memory::format::any); // create a conv primitive descriptor and save it for usage in backward - std::shared_ptr conv_pd = ConvFwdPrimitiveDesc( - src_md, weights_md, dst_md, strides, paddings, mkldnn_engine); + std::shared_ptr conv_pd = + ConvFwdPrimitiveDesc(src_md, weights_md, dst_md, strides, paddings, + mkldnn_engine); // Save conv_pd/src_memory/weights_memory for backward pass dev_ctx.SetBlob(key_conv_pd, conv_pd); @@ -208,21 +323,24 @@ class ConvMKLDNNOpKernel : public paddle::framework::OpKernel { } private: - std::unique_ptr ConvFwdPrimitiveDesc( - const memory::desc& src, const memory::desc& weights, - const memory::desc& dst, const std::vector& strides, - const std::vector& paddings, const mkldnn::engine& engine) const { + std::unique_ptr + ConvFwdPrimitiveDesc(const memory::desc& src, const 
memory::desc& weights, + const memory::desc& dst, const std::vector& strides, + const std::vector& paddings, + const mkldnn::engine& engine) const { memory::dims stride_dims = {strides[0], strides[1]}; memory::dims padding_dims = {paddings[0], paddings[1]}; - auto conv_desc = - conv_fwd::desc(mkldnn::prop_kind::forward, mkldnn::convolution_direct, - src, weights, dst, stride_dims, padding_dims, - padding_dims, mkldnn::padding_kind::zero); + auto conv_desc = mkldnn::convolution_forward::desc( + mkldnn::prop_kind::forward, mkldnn::convolution_direct, src, weights, + dst, stride_dims, padding_dims, padding_dims, + mkldnn::padding_kind::zero); - auto p_conv_pd = new conv_fwd::primitive_desc(conv_desc, engine); + auto p_conv_pd = + new mkldnn::convolution_forward::primitive_desc(conv_desc, engine); - return std::unique_ptr(p_conv_pd); + return std::unique_ptr( + p_conv_pd); } }; @@ -284,153 +402,115 @@ class ConvMKLDNNGradOpKernel : public paddle::framework::OpKernel { std::vector dst_tz = paddle::framework::vectorize2int(output->dims()); // Get an unique name from "argument" name of "Output" variable + // as well as attributes of primitive to be created // This name will be used as key when saving info into device context const std::string key = ConvMKLDNNHandler::GetHash(src_tz, weights_tz, strides, paddings, dilations, groups, ctx.op().Input("Output")); const std::string key_conv_pd = key + "@conv_pd"; + std::vector pipeline; - // create mkldnn memory from input tensors (input/weights/output_grad) - auto user_src_memory = memory( - {{{src_tz}, memory::data_type::f32, input->format()}, mkldnn_engine}, - to_void_cast(input_data)); - auto user_weights_memory = - memory({{{weights_tz}, memory::data_type::f32, filter->format()}, - mkldnn_engine}, - to_void_cast(filter_data)); - auto user_diff_dst_memory = - memory({{{dst_tz}, memory::data_type::f32, output_grad->format()}, - mkldnn_engine}, - to_void_cast(output_grad_data)); + // Create user memory descriptors + auto user_src_md = platform::MKLDNNMemDesc( + {src_tz}, platform::MKLDNNGetDataType(), input->format()); + auto user_weights_md = platform::MKLDNNMemDesc( + {weights_tz}, platform::MKLDNNGetDataType(), filter->format()); + auto user_diff_dst_md = platform::MKLDNNMemDesc( + {dst_tz}, platform::MKLDNNGetDataType(), output_grad->format()); /* create memory descriptor for conv backward without specified format * ('any') which lets a primitive (conv backward in this case) choose * the memory format preferred for best performance */ - auto src_md = platform::MKLDNNMemDesc(src_tz, memory::data_type::f32, - memory::format::any); - auto diff_src_md = platform::MKLDNNMemDesc(src_tz, memory::data_type::f32, - memory::format::any); + auto src_md = platform::MKLDNNMemDesc( + src_tz, platform::MKLDNNGetDataType(), memory::format::any); + auto diff_src_md = platform::MKLDNNMemDesc( + src_tz, platform::MKLDNNGetDataType(), memory::format::any); auto weights_md = platform::MKLDNNMemDesc( - weights_tz, memory::data_type::f32, memory::format::any); + weights_tz, platform::MKLDNNGetDataType(), memory::format::any); auto diff_weights_md = platform::MKLDNNMemDesc( - weights_tz, memory::data_type::f32, memory::format::any); - auto diff_dst_md = platform::MKLDNNMemDesc(dst_tz, memory::data_type::f32, - memory::format::any); + weights_tz, platform::MKLDNNGetDataType(), memory::format::any); + auto diff_dst_md = platform::MKLDNNMemDesc( + dst_tz, platform::MKLDNNGetDataType(), memory::format::any); // Retrieve conv_pd from device context - auto conv_pd = 
std::static_pointer_cast( - dev_ctx.GetBlob(key_conv_pd)); + auto conv_pd = + std::static_pointer_cast( + dev_ctx.GetBlob(key_conv_pd)); PADDLE_ENFORCE(conv_pd != nullptr, "Fail to find conv_pd in device context"); + // create backward convolution weights primitive descriptor + auto conv_bwd_weights_desc = mkldnn::convolution_backward_weights::desc( + mkldnn::convolution_direct, src_md, diff_weights_md, diff_dst_md, + strides, paddings, paddings, mkldnn::padding_kind::zero); + auto conv_bwd_weights_pd = + std::make_shared( + conv_bwd_weights_desc, mkldnn_engine, *conv_pd); + + // create backward convolution data primitive descriptor + auto conv_bwd_data_desc = mkldnn::convolution_backward_data::desc( + mkldnn::convolution_direct, diff_src_md, weights_md, diff_dst_md, + strides, paddings, paddings, mkldnn::padding_kind::zero); + auto conv_bwd_data_pd = + std::make_shared( + conv_bwd_data_desc, mkldnn_engine, *conv_pd); + + ConvMKLDNNHandler handler(conv_pd, conv_bwd_data_pd, conv_bwd_weights_pd, + dev_ctx, mkldnn_engine, key); + + // create mkldnn memory from input tensors (data/weights) + auto user_src_memory_p = + handler.AcquireSrcMemory(user_src_md, to_void_cast(input_data)); + auto user_weights_memory_p = handler.AcquireWeightsMemory( + user_weights_md, to_void_cast(filter_data)); + auto user_diff_dst_memory_p = handler.AcquireDiffDstMemory( + user_diff_dst_md, to_void_cast(output_grad_data)); + // create backward conv primitive for weights if (filter_grad) { - // create backward convolution primitive descriptor - auto conv_bwd_weights_desc = conv_bwd_weights::desc( - mkldnn::convolution_direct, src_md, diff_weights_md, diff_dst_md, - strides, paddings, paddings, mkldnn::padding_kind::zero); - auto conv_bwd_weights_pd = conv_bwd_weights::primitive_desc( - conv_bwd_weights_desc, mkldnn_engine, *conv_pd); - - // create reorder primitive if the input format is not the preferred one - auto src_memory = user_src_memory; - primitive reorder_src; - bool is_src_reordered = false; - if (memory::primitive_desc(conv_bwd_weights_pd.src_primitive_desc()) != - user_src_memory.get_primitive_desc()) { - src_memory = memory(conv_bwd_weights_pd.src_primitive_desc()); - reorder_src = reorder(user_src_memory, src_memory); - is_src_reordered = true; - } - - auto diff_dst_memory_4filter = user_diff_dst_memory; - primitive reorder_diff_dst_4filter; - bool is_diff_dst_reordered_4filter = false; - if (memory::primitive_desc( - conv_bwd_weights_pd.diff_dst_primitive_desc()) != - user_diff_dst_memory.get_primitive_desc()) { - diff_dst_memory_4filter = - memory(conv_bwd_weights_pd.diff_dst_primitive_desc()); - reorder_diff_dst_4filter = - reorder(user_diff_dst_memory, diff_dst_memory_4filter); - is_diff_dst_reordered_4filter = true; - } - - // create mkldnn memory for output (i.e. 
diff weights) - auto diff_weights_memory = - memory(conv_bwd_weights_pd.diff_weights_primitive_desc(), - reinterpret_cast(filter_grad_data)); + auto src_memory_p = handler.AcquireSrcMemoryFromWeightsPrimitive( + user_src_memory_p, pipeline); - // create backward conv primitive for weights - auto conv_bwd_weights_prim = - conv_bwd_weights(conv_bwd_weights_pd, src_memory, - diff_dst_memory_4filter, diff_weights_memory); - - // push primitive and execute it - std::vector pipeline; - if (is_src_reordered) pipeline.push_back(reorder_src); - if (is_diff_dst_reordered_4filter) - pipeline.push_back(reorder_diff_dst_4filter); - pipeline.push_back(conv_bwd_weights_prim); - stream(stream::kind::eager).submit(pipeline).wait(); + auto diff_dst_memory_4filter_p = + handler.AcquireDiffDstMemoryFromWeightsPrimitive( + user_diff_dst_memory_p, pipeline); + + auto diff_weights_memory_p = + handler.AcquireDiffWeightsMemoryFromWeightsPrimitive( + reinterpret_cast(filter_grad_data)); + + auto conv_bwd_weights_p = handler.AcquireConvolutionBackwardWeights( + src_memory_p, diff_dst_memory_4filter_p, diff_weights_memory_p); + + // push primitive to stream and wait until it's executed + pipeline.push_back(*conv_bwd_weights_p); filter_grad->set_layout(DataLayout::kMKLDNN); - filter_grad->set_format(GetMKLDNNFormat(diff_weights_memory)); + filter_grad->set_format(GetMKLDNNFormat(*diff_weights_memory_p)); } if (input_grad) { - // create backward convolution primitive descriptor - auto conv_bwd_data_desc = conv_bwd_data::desc( - mkldnn::convolution_direct, diff_src_md, weights_md, diff_dst_md, - strides, paddings, paddings, mkldnn::padding_kind::zero); - auto conv_bwd_data_pd = conv_bwd_data::primitive_desc( - conv_bwd_data_desc, mkldnn_engine, *conv_pd); - - // create reorder primitive if the input format is not the preferred one - auto weights_memory = user_weights_memory; - primitive reorder_weights; - bool is_weights_reordered = false; - if (memory::primitive_desc(conv_bwd_data_pd.weights_primitive_desc()) != - user_weights_memory.get_primitive_desc()) { - weights_memory = memory(conv_bwd_data_pd.weights_primitive_desc()); - reorder_weights = reorder(user_weights_memory, weights_memory); - is_weights_reordered = true; - } - - auto diff_dst_memory_4data = user_diff_dst_memory; - primitive reorder_diff_dst_4data; - bool is_diff_dst_reordered_4data = false; - if (memory::primitive_desc(conv_bwd_data_pd.diff_dst_primitive_desc()) != - user_diff_dst_memory.get_primitive_desc()) { - diff_dst_memory_4data = - memory(conv_bwd_data_pd.diff_dst_primitive_desc()); - reorder_diff_dst_4data = - reorder(user_diff_dst_memory, diff_dst_memory_4data); - is_diff_dst_reordered_4data = true; - } - - // create mkldnn memory for output (i.e. 
diff src) - auto diff_src_memory = memory(conv_bwd_data_pd.diff_src_primitive_desc(), - reinterpret_cast(input_grad_data)); - - // create backward conv primitive for data - auto conv_bwd_data_prim = - conv_bwd_data(conv_bwd_data_pd, diff_dst_memory_4data, weights_memory, - diff_src_memory); - - // push primitive and execute it - std::vector pipeline; - if (is_weights_reordered) pipeline.push_back(reorder_weights); - if (is_diff_dst_reordered_4data) - pipeline.push_back(reorder_diff_dst_4data); - pipeline.push_back(conv_bwd_data_prim); - stream(stream::kind::eager).submit(pipeline).wait(); + auto weights_memory_p = handler.AcquireWeightsMemoryFromDataPrimitive( + user_weights_memory_p, pipeline); + + auto diff_dst_memory_4data_p = + handler.AcquireDiffDstMemoryFromDataPrimitive(user_diff_dst_memory_p, + pipeline); + + auto diff_src_memory_p = handler.AcquireDiffSrcMemoryFromDataPrimitive( + reinterpret_cast(input_grad_data)); + + auto conv_bwd_data_p = handler.AcquireConvolutionBackwardData( + diff_dst_memory_4data_p, weights_memory_p, diff_src_memory_p); + + pipeline.push_back(*conv_bwd_data_p); input_grad->set_layout(DataLayout::kMKLDNN); - input_grad->set_format(GetMKLDNNFormat(diff_src_memory)); + input_grad->set_format(GetMKLDNNFormat(*diff_src_memory_p)); } + stream(stream::kind::eager).submit(pipeline).wait(); } // Compute() }; diff --git a/paddle/fluid/operators/distributed/grpc_client.cc b/paddle/fluid/operators/distributed/grpc_client.cc index 35318a805898de645c844a2224f6df8c458d346c..4d60801b6a6ecaabf1165321e0cb19044d27aa34 100644 --- a/paddle/fluid/operators/distributed/grpc_client.cc +++ b/paddle/fluid/operators/distributed/grpc_client.cc @@ -281,9 +281,10 @@ void GRPCClient::AsyncCheckpointNotify(const std::string& ep, req_count_++; } -void GRPCClient::Wait() { +bool GRPCClient::Wait() { std::unique_lock lk(sync_mutex_); - sync_cond_.wait(lk, [this] { return req_count_ == 0; }); + sync_cond_.wait(lk, [this] { return (req_count_ == 0 || ok_ == false); }); + return ok_; } void GRPCClient::Proceed() { @@ -297,6 +298,14 @@ void GRPCClient::Proceed() { if (c->status_.ok()) { VLOG(3) << c->var_h_.String() << " process"; c->Process(); + } else if (c->status_.error_code() == grpc::StatusCode::DEADLINE_EXCEEDED) { + LOG(ERROR) << c->var_h_.String() + << " meets grpc error:" << c->status_.error_message(); + { + std::lock_guard lk(sync_mutex_); + ok_ = false; + } + sync_cond_.notify_all(); } else { LOG(FATAL) << c->var_h_.String() << " meets grpc error:" << c->status_.error_message(); diff --git a/paddle/fluid/operators/distributed/grpc_client.h b/paddle/fluid/operators/distributed/grpc_client.h index 5dae20155edcf9edd746a5d9a9bbe0ccd789f431..d03a3e56aedbe4a008ee9ff187111f7635d14b58 100644 --- a/paddle/fluid/operators/distributed/grpc_client.h +++ b/paddle/fluid/operators/distributed/grpc_client.h @@ -188,7 +188,7 @@ class CheckpointNotifyProcessor : public BaseProcessor { class GRPCClient : public RPCClient { public: - GRPCClient() {} + GRPCClient() : ok_(true) {} virtual ~GRPCClient(); bool AsyncSendVar(const std::string& ep, const platform::DeviceContext& ctx, @@ -221,7 +221,7 @@ class GRPCClient : public RPCClient { void AsyncSendEndPass(const std::string& ep, int64_t time_out = FLAGS_rpc_deadline) override; - void Wait() override; + bool Wait() override; void SendBeginPass() override; @@ -247,6 +247,7 @@ class GRPCClient : public RPCClient { std::mutex sync_mutex_; std::condition_variable sync_cond_; std::atomic req_count_{0}; + bool ok_; // mutex for GetChannel thread safety 
std::mutex chan_mutex_; diff --git a/paddle/fluid/operators/distributed/rpc_client.h b/paddle/fluid/operators/distributed/rpc_client.h index 6479d3a97bafba37b74a1d1c04852a6e60e01be8..4d87376fbf776e29156b78d826f5012bc53460df 100644 --- a/paddle/fluid/operators/distributed/rpc_client.h +++ b/paddle/fluid/operators/distributed/rpc_client.h @@ -72,7 +72,7 @@ class RPCClient { virtual void SendBeginPass() = 0; virtual void SendEndPass() = 0; - virtual void Wait() = 0; + virtual bool Wait() = 0; template static RPCClient* GetInstance() { diff --git a/paddle/fluid/operators/fetch_barrier_op.cc b/paddle/fluid/operators/fetch_barrier_op.cc index 02beb80fc8a9f451393dcdd54492c4f88f908497..680fde19eefe57475b7526ebc29d4ff977a16977 100644 --- a/paddle/fluid/operators/fetch_barrier_op.cc +++ b/paddle/fluid/operators/fetch_barrier_op.cc @@ -45,13 +45,13 @@ class FetchBarrierOp : public framework::OperatorBase { distributed::RPCClient* rpc_client = distributed::RPCClient::GetInstance(); - rpc_client->Wait(); + PADDLE_ENFORCE(rpc_client->Wait(), "internal error in RPCClient"); for (auto& ep : eps) { VLOG(3) << "fetch barrier, ep: " << ep; rpc_client->AsyncSendFetchBarrier(ep); } - rpc_client->Wait(); + PADDLE_ENFORCE(rpc_client->Wait(), "internal error in RPCClient"); } }; diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc index 56e39649b409f7eed108027f6df58c19dd3c8ab8..438b44b42aaf4c7e3ff05a5f7c52bbfd850e92c7 100644 --- a/paddle/fluid/operators/listen_and_serv_op.cc +++ b/paddle/fluid/operators/listen_and_serv_op.cc @@ -61,6 +61,8 @@ static void ParallelExecuteBlocks( framework::Async([&executor, &prepared, &program, &scope, idx]() { int run_block = idx; // thread local try { + VLOG(3) << "running server block: " << run_block + << "pointer: " << prepared[run_block].get(); executor->RunPreparedContext(prepared[run_block].get(), scope); } catch (const std::exception &e) { LOG(ERROR) << "run sub program error " << e.what(); @@ -107,12 +109,14 @@ void ListenAndServOp::RunSyncLoop( PADDLE_ENFORCE_GE(num_blocks, 2, "server program should have at least 2 blocks"); - std::vector optimize_blocks_idx; - for (auto blk : optimize_blocks) { - optimize_blocks_idx.push_back(blk->ID()); + // Prepare all the server block + std::vector optimize_blocks_list; + for (size_t i = 1; i < program->Size(); ++i) { + optimize_blocks_list.push_back(i); } - auto optimize_prepared = executor->Prepare(*program, optimize_blocks_idx); - // Insert placeholder for block0 which holds current op itself. + auto optimize_prepared = executor->Prepare(*program, optimize_blocks_list); + // Insert placeholder for block0 which holds current op itself, + // NOTE the first block in `optimize_prepared` should never be ran. 
optimize_prepared.insert( optimize_prepared.begin(), std::shared_ptr(nullptr)); diff --git a/paddle/fluid/operators/prefetch_op.cc b/paddle/fluid/operators/prefetch_op.cc index 8734282fe496b8e90af19abd5549566d62316fc3..4b804740a06f9e29704f2b3f58a90191e3559347 100644 --- a/paddle/fluid/operators/prefetch_op.cc +++ b/paddle/fluid/operators/prefetch_op.cc @@ -53,7 +53,7 @@ class PrefetchOp : public framework::OperatorBase { VLOG(3) << "don't send no-initialied variable: " << ins[i]; } } - rpc_client->Wait(); + PADDLE_ENFORCE(rpc_client->Wait(), "internal error in RPCClient"); } }; diff --git a/paddle/fluid/operators/recv_op.cc b/paddle/fluid/operators/recv_op.cc index 9854a31f5b10f5ecd940c0d41c2c3e468fc17bad..1ba684014904e61a86bebacd7d29d7e10d313092 100644 --- a/paddle/fluid/operators/recv_op.cc +++ b/paddle/fluid/operators/recv_op.cc @@ -51,7 +51,7 @@ class RecvOp : public framework::OperatorBase { rpc_client->AsyncGetVar(epmap[i], ctx, scope, outs[i]); } if (sync_mode) { - rpc_client->Wait(); + PADDLE_ENFORCE(rpc_client->Wait(), "internal error in RPCClient"); } } }; diff --git a/paddle/fluid/operators/send_barrier_op.cc b/paddle/fluid/operators/send_barrier_op.cc index 6b4572dcccc21e783f1df0b9bcde11d532ff4ba8..d7f8e994afd7e656bd5a9dd7c5ab45f0d52fe88b 100644 --- a/paddle/fluid/operators/send_barrier_op.cc +++ b/paddle/fluid/operators/send_barrier_op.cc @@ -50,13 +50,13 @@ class SendBarrierOp : public framework::OperatorBase { VLOG(3) << "SendBarrierOp sync_mode:" << sync_mode; // need to wait before sending send_barrier message - rpc_client->Wait(); + PADDLE_ENFORCE(rpc_client->Wait(), "internal error in RPCClient"); if (sync_mode) { for (auto& ep : eps) { VLOG(3) << "send barrier, ep: " << ep; rpc_client->AsyncSendBatchBarrier(ep); } - rpc_client->Wait(); + PADDLE_ENFORCE(rpc_client->Wait(), "internal error in RPCClient"); } } }; diff --git a/paddle/fluid/operators/send_op.cc b/paddle/fluid/operators/send_op.cc index 0cac329aafa8c4c67cae48ba62a48575f5edba92..829f310d4233c01a7fbb9ccf7427f6e47ce8d384 100644 --- a/paddle/fluid/operators/send_op.cc +++ b/paddle/fluid/operators/send_op.cc @@ -59,7 +59,7 @@ class SendOp : public framework::OperatorBase { } } if (sync_send) { - rpc_client->Wait(); + PADDLE_ENFORCE(rpc_client->Wait(), "internal error in RPCClient"); } } }; diff --git a/paddle/fluid/pybind/protobuf.cc b/paddle/fluid/pybind/protobuf.cc index fcd3356d44ee592233c3883d439d0677714900b8..2199f5311fd3728e624fc222a1b876eb947cc0aa 100644 --- a/paddle/fluid/pybind/protobuf.cc +++ b/paddle/fluid/pybind/protobuf.cc @@ -145,14 +145,14 @@ void BindBlockDesc(pybind11::module *m) { .def_property_readonly("id", &pd::BlockDesc::ID) .def_property_readonly("parent", &pd::BlockDesc::Parent) .def("get_forward_block_idx", &pd::BlockDesc::ForwardBlockID) - .def("set_forward_block_idx", &pd::BlockDesc::SetForwardBlockID) + .def("_set_forward_block_idx", &pd::BlockDesc::SetForwardBlockID) .def("append_op", &pd::BlockDesc::AppendOp, pybind11::return_value_policy::reference) - .def("prepend_op", &pd::BlockDesc::PrependOp, + .def("_prepend_op", &pd::BlockDesc::PrependOp, pybind11::return_value_policy::reference) - .def("insert_op", &pd::BlockDesc::InsertOp, + .def("_insert_op", &pd::BlockDesc::InsertOp, pybind11::return_value_policy::reference) - .def("remove_op", &pd::BlockDesc::RemoveOp) + .def("_remove_op", &pd::BlockDesc::RemoveOp) .def("var", [](pd::BlockDesc &self, pybind11::bytes byte_name) { std::string name = byte_name; @@ -165,7 +165,7 @@ void BindBlockDesc(pybind11::module *m) { return 
self.HasVar(name); }, pybind11::return_value_policy::reference) - .def("rename_var", + .def("_rename_var", [](pd::BlockDesc &self, const pybind11::bytes &byte_name, const pybind11::bytes &byte_name_new) { std::string name = byte_name; @@ -189,7 +189,7 @@ void BindBlockDesc(pybind11::module *m) { return self.FindVarRecursive(name); }, pybind11::return_value_policy::reference) - .def("remove_var", + .def("_remove_var", [](pd::BlockDesc &self, pybind11::bytes byte_name) { std::string name = byte_name; return self.RemoveVar(name); diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index d8dc421bed711cfc1a149592c24b11c4ef115ec9..216c4666c0a311f93f29692b2ca1d17bf9dafab8 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -68,7 +68,7 @@ bool IsCompiledWithCUDA() { } bool IsCompiledWithDIST() { -#ifdef PADDLE_WITH_DIST +#ifdef PADDLE_WITH_DISTRIBUTE return true; #else return false; @@ -669,7 +669,7 @@ All parameter, weight, gradient are variables in Paddle. const std::string &, Scope *, std::vector &, const ExecutionStrategy &, const BuildStrategy &, size_t, size_t>()) - .def("bcast_params", &ParallelExecutor::BCastParamsToDevs) + .def("bcast_params", &ParallelExecutor::BCastParamsToDevices) // NOTE: even we return a vec* to Python use reference policy. // We still cannot get local_scope from this vector, since the element // of vec will be freed by Python GC. We can only return Scope* diff --git a/paddle/scripts/paddle_build.sh b/paddle/scripts/paddle_build.sh index f21fd5cd6ec1a7c526694bfc3e7fc574cbbc365d..4bdd86990f82be96085dca640cd52ffb843b2c32 100755 --- a/paddle/scripts/paddle_build.sh +++ b/paddle/scripts/paddle_build.sh @@ -516,6 +516,7 @@ function gen_fluid_inference_lib() { Deploying fluid inference library ... ======================================== EOF + cmake .. -DWITH_DISTRIBUTE=OFF make -j `nproc` inference_lib_dist cd ${PADDLE_ROOT}/build cp -r fluid_install_dir fluid @@ -531,7 +532,7 @@ function test_fluid_inference_lib() { ======================================== EOF cd ${PADDLE_ROOT}/paddle/contrib/inference/demo_ci - sh run.sh ${PADDLE_ROOT} ${WITH_MKL:-ON} ${WITH_GPU:-OFF} + ./run.sh ${PADDLE_ROOT} ${WITH_MKL:-ON} ${WITH_GPU:-OFF} fi } @@ -577,6 +578,7 @@ function main() { fluid_inference_lib) cmake_gen ${PYTHON_ABI:-""} gen_fluid_inference_lib + test_fluid_inference_lib ;; check_style) check_style diff --git a/python/paddle/fluid/__init__.py b/python/paddle/fluid/__init__.py index a06e041c1e8aaa8897ac77f2ec1275824849e7ef..329d3c5ace2ee59a1f7eb5cb52965720a290880e 100644 --- a/python/paddle/fluid/__init__.py +++ b/python/paddle/fluid/__init__.py @@ -46,7 +46,7 @@ from param_attr import ParamAttr, WeightNormParamAttr from data_feeder import DataFeeder from core import LoDTensor, LoDTensorArray, CPUPlace, CUDAPlace, CUDAPinnedPlace, Scope from transpiler import DistributeTranspiler, InferenceTranspiler, \ - memory_optimize, release_memory + memory_optimize, release_memory, DistributeTranspilerConfig from concurrency import (Go, make_channel, channel_send, channel_recv, channel_close, Select) from lod_tensor import create_lod_tensor, create_random_int_lodtensor @@ -56,6 +56,7 @@ import unique_name import recordio_writer import parallel_executor from parallel_executor import * +from paddle.fluid.layers.math_op_patch import monkey_patch_variable Tensor = LoDTensor @@ -138,5 +139,5 @@ def __bootstrap__(): # TODO(panyx0718): Avoid doing complex initialization logic in __init__.py. 
# Consider paddle.init(args) or paddle.main(args) -layers.monkey_patch_variable() +monkey_patch_variable() __bootstrap__() diff --git a/python/paddle/fluid/backward.py b/python/paddle/fluid/backward.py index ddcde04716d21df1f18e7202936f470d3d58a661..812f68bdd849544456b2e0ebf0b739f4f92b09ea 100644 --- a/python/paddle/fluid/backward.py +++ b/python/paddle/fluid/backward.py @@ -328,7 +328,7 @@ def _append_backward_ops_(block, if op.has_attr("sub_block"): sub_block = program.block(op.block_attr("sub_block")) grad_sub_block = program.create_block() - grad_sub_block.set_forward_block_idx(sub_block.idx) + grad_sub_block._set_forward_block_idx(sub_block.idx) cb = _callback_lookup_(op) if cb is not None: if callbacks is None: @@ -571,7 +571,7 @@ def append_backward(loss, parameter_list=None, no_grad_set=None, _append_backward_vars_(root_block, fwd_op_num, grad_to_var, grad_info_map) program.current_block_idx = current_block_idx - program.sync_with_cpp() + program._sync_with_cpp() # FIXME(zcd): prevent loss.grad optimized by mem_opt. loss.block.var(_append_grad_suffix_(loss.name)).persistable = True @@ -744,7 +744,7 @@ def calc_gradient(targets, inputs, target_gradients=None, no_grad_set=None): _rename_grad_(block, fwd_op_num, grad_to_var, target_grad_map) _append_backward_vars_(block, fwd_op_num, grad_to_var, grad_info_map) - prog.sync_with_cpp() + prog._sync_with_cpp() grad_vars = [] for input_var in inputs: diff --git a/python/paddle/fluid/clip.py b/python/paddle/fluid/clip.py index d9acfef58c3ba92c763d195c88f1323b3c6512b9..c029662ebc1b7e7f7d1ea44b4ebd4b08b812a579 100644 --- a/python/paddle/fluid/clip.py +++ b/python/paddle/fluid/clip.py @@ -82,7 +82,7 @@ def error_clip_callback(block, context): op_desc = block.desc.op(block.desc.op_size() - 1) for grad_n in filter(lambda n: grad_to_var.has_key(n), op_desc.output_arg_names()): - fwd_var = block.var_recursive(grad_to_var[grad_n]) + fwd_var = block._var_recursive(grad_to_var[grad_n]) error_clip = getattr(fwd_var, "error_clip", None) if not (error_clip is None or isinstance(error_clip, BaseErrorClipAttr)): diff --git a/python/paddle/fluid/concurrency.py b/python/paddle/fluid/concurrency.py index 470dd0df524936a773f6e740c8079f0efa8ef7b4..b8fe9bd4c1988dd3f6fa82df391c3059dfbfcf93 100644 --- a/python/paddle/fluid/concurrency.py +++ b/python/paddle/fluid/concurrency.py @@ -69,8 +69,10 @@ class Go(BlockGuard): parent_block.append_op( type='go', inputs={ - 'X': - [parent_block.var_recursive(x_name) for x_name in x_name_list] + 'X': [ + parent_block._var_recursive(x_name) + for x_name in x_name_list + ] }, outputs={}, attrs={'sub_block': go_block}) @@ -259,7 +261,7 @@ class Select(BlockGuard): if var_name in intermediate ] - X = [select_block.var_recursive(x_name) for x_name in params] + X = [select_block._var_recursive(x_name) for x_name in params] # Needs to be used by `equal` inside the cases block. 
X.append(self.case_to_execute) diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py index b436dfe70afdb52299222f8ba3f5bdff2842d103..f9e600cb4cb252baead87025db0e0db71e8169d2 100644 --- a/python/paddle/fluid/executor.py +++ b/python/paddle/fluid/executor.py @@ -309,7 +309,7 @@ class Executor(object): if not has_feed_operators(global_block, feed, feed_var_name): for i, name in enumerate(feed): out = global_block.var(name) - global_block.prepend_op( + global_block._prepend_op( type='feed', inputs={'X': [feed_var]}, outputs={'Out': [out]}, diff --git a/python/paddle/fluid/framework.py b/python/paddle/fluid/framework.py index d89cb246a939f247b94bc49f39198a909b1c30ea..03e0ac757586150610aee275620d9eee77323c99 100644 --- a/python/paddle/fluid/framework.py +++ b/python/paddle/fluid/framework.py @@ -32,7 +32,6 @@ except Exception, e: import unique_name __all__ = [ - 'Block', 'Variable', 'Program', 'Operator', @@ -447,7 +446,7 @@ class Operator(object): Notes: The constructor of operator should not be invoked directly. Use - Block.append_op or Block.prepend_op instead. + Block.append_op or Block._prepend_op instead. Examples: .. code-block:: python @@ -870,7 +869,7 @@ class Block(object): def forward_block_idx(self): return self.desc.get_forward_block_idx() - def set_forward_block_idx(self, idx): + def _set_forward_block_idx(self, idx): """ Set the forward block Idx. @@ -880,7 +879,7 @@ class Block(object): Returns: None """ - self.desc.set_forward_block_idx(idx) + self.desc._set_forward_block_idx(idx) @property def idx(self): @@ -909,7 +908,7 @@ class Block(object): raise ValueError("var %s not in this block" % name) return v - def var_recursive(self, name): + def _var_recursive(self, name): """ Get a Variable by name from this block recursively. @@ -951,9 +950,9 @@ class Block(object): raise ValueError("Var {0} is not found recursively".format(name)) def all_parameters(self): - return list(self.iter_parameters()) + return list(self._iter_parameters()) - def iter_parameters(self): + def _iter_parameters(self): return (item[1] for item in self.vars.iteritems() if isinstance(item[1], Parameter)) @@ -966,7 +965,7 @@ class Block(object): def has_var(self, name): return name in self.vars - def rename_var(self, name, new_name): + def _rename_var(self, name, new_name): """ Rename variable in vars and ops' inputs and outputs @@ -1000,8 +999,8 @@ class Block(object): else: raise ValueError("unsupported var type: %s", type(v)) orig_var_type = v.type - self.desc.rename_var(name, new_name) - # NOTE: v is destroyed by C++ after calling rename_var. + self.desc._rename_var(name, new_name) + # NOTE: v is destroyed by C++ after calling _rename_var. d = self.desc.find_var(new_name) if var_type == "Parameter": var = Parameter( @@ -1024,16 +1023,16 @@ class Block(object): error_clip=error_clip, stop_gradient=stop_gradient) - # rename the python side, sync_with_cpp will only add + # rename the python side, _sync_with_cpp will only add # new vars/ops to python side. 
self.vars[new_name] = var del self.vars[name] - self.sync_with_cpp() + self._sync_with_cpp() return var - def remove_var(self, name): - self.sync_with_cpp() - self.desc.remove_var(name) + def _remove_var(self, name): + self._sync_with_cpp() + self.desc._remove_var(name) del self.vars[name] def create_parameter(self, *args, **kwargs): @@ -1055,7 +1054,7 @@ class Block(object): self.ops.append(op) return op - def insert_op(self, index, *args, **kwargs): + def _insert_op(self, index, *args, **kwargs): """ Insert a Operator according to the giving arguments. @@ -1065,13 +1064,13 @@ class Block(object): Returns: Operator: the insert Operator. """ - self.sync_with_cpp() - op_desc = self.desc.insert_op(index) + self._sync_with_cpp() + op_desc = self.desc._insert_op(index) op = Operator(block=self, desc=op_desc, *args, **kwargs) self.ops.insert(index, op) return op - def remove_op(self, index): + def _remove_op(self, index): """ Remove the specific position operator. @@ -1081,11 +1080,11 @@ class Block(object): Returns: None """ - self.sync_with_cpp() - self.desc.remove_op(index, index + 1) + self._sync_with_cpp() + self.desc._remove_op(index, index + 1) del self.ops[index] - def slice_ops(self, start, end): + def _slice_ops(self, start, end): """ Return the Operator between start and end. @@ -1098,13 +1097,13 @@ class Block(object): """ return self.ops[start:end] - def prepend_op(self, *args, **kwargs): - op_desc = self.desc.prepend_op() + def _prepend_op(self, *args, **kwargs): + op_desc = self.desc._prepend_op() op = Operator(self, op_desc, *args, **kwargs) self.ops.insert(0, op) return op - def sync_with_cpp(self): + def _sync_with_cpp(self): """ Sync from the desc on the c++ end. This method is used to synchronize the c++ desc instance generated by backward. @@ -1170,7 +1169,7 @@ class Block(object): for index in range(len(self.ops)): assert self.ops[index].desc == ops_in_cpp[index] - def copy_param_info_from(self, other): + def _copy_param_info_from(self, other): """ Copy the information of parameters from the other block. @@ -1185,12 +1184,13 @@ class Block(object): None """ if not isinstance(other, Block): - raise TypeError("copy_param_info_from should be invoked with Block") - for p in other.iter_parameters(): + raise TypeError( + "_copy_param_info_from should be invoked with Block") + for p in other._iter_parameters(): assert isinstance(p, Parameter) v = self.vars.get(p.name, None) if v is None: - raise ValueError("copy_param_info_from should be invoked with " + raise ValueError("_copy_param_info_from should be invoked with " "same topology") assert isinstance(v, Variable) new_p = Parameter( @@ -1208,7 +1208,7 @@ class Block(object): name=v.name) self.vars[new_p.name] = new_p - def clone_variable(self, var): + def _clone_variable(self, var): """ Clone a variable into current block. 
@@ -1484,9 +1484,9 @@ class Program(object): p = Program() p.desc = core.ProgramDesc(self.desc) p.blocks = [Block(p, i) for i in xrange(self.desc.num_blocks())] - p.sync_with_cpp() + p._sync_with_cpp() - p.copy_param_info_from(self) + p._copy_param_info_from(self) p.copy_data_info_from(self) return p @@ -1536,7 +1536,7 @@ class Program(object): res = Program() res.desc = core.prune(self.desc, targets_idx) res.blocks = [Block(res, i) for i in xrange(res.desc.num_blocks())] - res.sync_with_cpp() + res._sync_with_cpp() return res def inference_optimize(self): @@ -1562,7 +1562,7 @@ class Program(object): if op.has_attr('is_test'): op.set_attr('is_test', True) res.blocks = [Block(res, i) for i in xrange(res.desc.num_blocks())] - res.sync_with_cpp() + res._sync_with_cpp() return res @staticmethod @@ -1582,7 +1582,7 @@ class Program(object): p = Program() p.desc = core.ProgramDesc(binary_str) p.blocks = [Block(p, i) for i in xrange(p.desc.num_blocks())] - p.sync_with_cpp() + p._sync_with_cpp() return p @property @@ -1662,7 +1662,7 @@ class Program(object): """ self.current_block_idx = self.current_block().parent_idx - def sync_with_cpp(self): + def _sync_with_cpp(self): """ Synchronize Python instance to its binding C++ object instance. If the program is modified in C++ space, this method should be invoked. @@ -1676,9 +1676,9 @@ class Program(object): for block_idx in range(len(self.blocks), self.desc.num_blocks()): self.blocks.append(Block(self, block_idx)) for block in self.blocks: - block.sync_with_cpp() + block._sync_with_cpp() - def copy_param_info_from(self, other): + def _copy_param_info_from(self, other): """ Copy the information of parameters from other program. @@ -1692,13 +1692,13 @@ class Program(object): None """ if not isinstance(other, Program): - raise TypeError("copy_param_info_from should be invoked with " + raise TypeError("_copy_param_info_from should be invoked with " "Program") if len(self.blocks) != len(other.blocks): - raise ValueError("copy_param_info_from should be invoked with two " + raise ValueError("_copy_param_info_from should be invoked with two " "program, with represent the same topology") - self.global_block().copy_param_info_from(other.global_block()) + self.global_block()._copy_param_info_from(other.global_block()) def copy_data_info_from(self, other): """ @@ -1714,11 +1714,11 @@ class Program(object): None """ if not isinstance(other, Program): - raise TypeError("copy_param_info_from should be invoked with " + raise TypeError("_copy_param_info_from should be invoked with " "Program") if len(self.blocks) != len(other.blocks): - raise ValueError("copy_param_info_from should be invoked with two " + raise ValueError("_copy_param_info_from should be invoked with two " "program, with represent the same topology") for var in other.global_block().vars.itervalues(): if var.is_data: diff --git a/python/paddle/fluid/initializer.py b/python/paddle/fluid/initializer.py index 373e9c060de1ee27c165ccd2380cd8c38612c4d9..0e640bf280d396504deec1183821da3e8a156530 100644 --- a/python/paddle/fluid/initializer.py +++ b/python/paddle/fluid/initializer.py @@ -148,7 +148,7 @@ class ConstantInitializer(Initializer): assert isinstance(var, framework.Variable) assert isinstance(block, framework.Block) # Initialization Ops should be prepended and not appended - op = block.prepend_op( + op = block._prepend_op( type="fill_constant", outputs={"Out": var}, attrs={ @@ -202,7 +202,7 @@ class UniformInitializer(Initializer): # Initialization Ops should be prepended and not appended if self._seed 
== 0: self._seed = block.program.random_seed - op = block.prepend_op( + op = block._prepend_op( type="uniform_random", outputs={"Out": var}, attrs={ @@ -256,7 +256,7 @@ class NormalInitializer(Initializer): # Initialization Ops should be prepended and not appended if self._seed == 0: self._seed = block.program.random_seed - op = block.prepend_op( + op = block._prepend_op( type="gaussian_random", outputs={"Out": var}, attrs={ @@ -346,7 +346,7 @@ class XavierInitializer(Initializer): if self._uniform: limit = np.sqrt(6.0 / float(fan_in + fan_out)) - op = block.prepend_op( + op = block._prepend_op( type="uniform_random", outputs={"Out": var}, attrs={ @@ -359,7 +359,7 @@ class XavierInitializer(Initializer): else: std = np.sqrt(2.0 / float(fan_in + fan_out)) - op = block.prepend_op( + op = block._prepend_op( type="gaussian_random", outputs={"Out": var}, attrs={ @@ -444,7 +444,7 @@ class MSRAInitializer(Initializer): if self._uniform: limit = np.sqrt(6.0 / float(fan_in)) - op = block.prepend_op( + op = block._prepend_op( type="uniform_random", outputs={"Out": var}, attrs={ @@ -457,7 +457,7 @@ class MSRAInitializer(Initializer): else: std = np.sqrt(2.0 / float(fan_in)) - op = block.prepend_op( + op = block._prepend_op( type="gaussian_random", outputs={"Out": var}, attrs={ diff --git a/python/paddle/fluid/io.py b/python/paddle/fluid/io.py index 0eb1194e2754331dcbc8436f6680ab776a999c29..cf43998228a06d73f4d7d6dfc85dcd002078ba0f 100644 --- a/python/paddle/fluid/io.py +++ b/python/paddle/fluid/io.py @@ -523,7 +523,7 @@ def prepend_feed_ops(inference_program, for i, name in enumerate(feed_target_names): out = global_block.var(name) - global_block.prepend_op( + global_block._prepend_op( type='feed', inputs={'X': [feed_var]}, outputs={'Out': [out]}, @@ -625,7 +625,7 @@ def save_inference_model(dirname, for i, op in enumerate(global_block.ops): op.desc.set_is_target(False) if op.type == "feed" or op.type == "fetch": - global_block.remove_op(i) + global_block._remove_op(i) copy_program.desc.flush() pruned_program = copy_program.prune(targets=target_vars) @@ -874,7 +874,7 @@ def get_test_program(filelist, program=None, startup_program=None): main_block = program.global_block() for var in main_block.vars.values(): if var.type == core.VarDesc.VarType.READER: - main_block.rename_var( + main_block._rename_var( str(var.name), str(_get_test_reader_name(var.name))) for op in main_block.ops: @@ -883,7 +883,7 @@ def get_test_program(filelist, program=None, startup_program=None): if op.type == "create_multi_pass_reader": test_op.set_attr("pass_num", 1) - startup_program.sync_with_cpp() - program.sync_with_cpp() + startup_program._sync_with_cpp() + program._sync_with_cpp() return program diff --git a/python/paddle/fluid/layers/__init__.py b/python/paddle/fluid/layers/__init__.py index cd1492da24d5e9d09a9eaac0b1b9c7aaffac6250..4917e67de0d20ff9e8f9a27f38e1bd2abef5c503 100644 --- a/python/paddle/fluid/layers/__init__.py +++ b/python/paddle/fluid/layers/__init__.py @@ -33,7 +33,6 @@ from metric_op import * from learning_rate_scheduler import * __all__ = [] -__all__ += math_op_patch.__all__ __all__ += nn.__all__ __all__ += io.__all__ __all__ += tensor.__all__ diff --git a/python/paddle/fluid/layers/control_flow.py b/python/paddle/fluid/layers/control_flow.py index 849474dc58461ac3772f439da7bf5d57592daa8c..782aa933f2ee86274e800045c9356d8072915fc1 100644 --- a/python/paddle/fluid/layers/control_flow.py +++ b/python/paddle/fluid/layers/control_flow.py @@ -730,8 +730,10 @@ class While(object): parent_block.append_op( 
type='while', inputs={ - 'X': - [parent_block.var_recursive(x_name) for x_name in x_name_list], + 'X': [ + parent_block._var_recursive(x_name) + for x_name in x_name_list + ], 'Condition': [self.cond_var] }, outputs={'Out': out_vars, @@ -1259,7 +1261,7 @@ class ConditionalBlock(object): input_set = set([ipt.name for ipt in self.inputs]) param_list = [ - parent_block.var_recursive(each_name) for each_name in params + parent_block._var_recursive(each_name) for each_name in params if each_name not in input_set ] diff --git a/python/paddle/fluid/layers/math_op_patch.py b/python/paddle/fluid/layers/math_op_patch.py index 1754061c4ba6f5b97bced3548bc412dfb1b7932c..f814c41633fbac76eb9411e2f418f521e8e9679d 100644 --- a/python/paddle/fluid/layers/math_op_patch.py +++ b/python/paddle/fluid/layers/math_op_patch.py @@ -16,8 +16,6 @@ from ..framework import Variable, unique_name from layer_function_generator import OpProtoHolder from ..initializer import force_init_on_cpu -__all__ = ['monkey_patch_variable'] - def monkey_patch_variable(): def unique_tmp_name(): diff --git a/python/paddle/fluid/layers/metric_op.py b/python/paddle/fluid/layers/metric_op.py index 99e82fdd04282177fae63f1fb94b5e32d41c612e..194a16b123c441ac1318b8ce58158f67e2a8093d 100644 --- a/python/paddle/fluid/layers/metric_op.py +++ b/python/paddle/fluid/layers/metric_op.py @@ -76,7 +76,7 @@ def accuracy(input, label, k=1, correct=None, total=None): return acc_out -def auc(input, label, curve='ROC', num_thresholds=200): +def auc(input, label, curve='ROC', num_thresholds=200, topk=1): """ **Area Under the Curve (AUC) Layer** @@ -102,6 +102,7 @@ def auc(input, label, curve='ROC', num_thresholds=200): curve(str): Curve type, can be 'ROC' or 'PR'. Default 'ROC'. num_thresholds(int): The number of thresholds to use when discretizing the roc curve. Default 200. + topk(int): only topk number of prediction output will be used for auc. Returns: Variable: A scalar representing the current AUC. @@ -115,7 +116,7 @@ def auc(input, label, curve='ROC', num_thresholds=200): """ warnings.warn( - "This interface not recommended, fluid.layers.auc compute the auc at every minibatch, \ + "This interface is not recommended, fluid.layers.auc compute the auc at every minibatch, \ but can not aggregate them and get the pass AUC, because pass \ auc can not be averaged with weighted from the minibatch auc value. \ Please use fluid.metrics.Auc, it can compute the auc value via Python natively, \ @@ -125,14 +126,34 @@ def auc(input, label, curve='ROC', num_thresholds=200): topk_indices = helper.create_tmp_variable(dtype="int64") topk_out, topk_indices = nn.topk(input, k=k) auc_out = helper.create_tmp_variable(dtype="float32") + # make tp, tn, fp, fn persistable, so that can accumulate all batches. 
+ tp = helper.create_global_variable(persistable=True) + tn = helper.create_global_variable(persistable=True) + fp = helper.create_global_variable(persistable=True) + fn = helper.create_global_variable(persistable=True) + for var in [tp, tn, fp, fn]: + helper.set_variable_initializer( + var, Constant( + value=0.0, force_cpu=True)) + helper.append_op( type="auc", inputs={ "Out": [topk_out], "Indices": [topk_indices], - "Label": [label] + "Label": [label], + "TP": [tp], + "TN": [tn], + "FP": [fp], + "FN": [fn] }, attrs={"curve": curve, "num_thresholds": num_thresholds}, - outputs={"AUC": [auc_out], }) + outputs={ + "AUC": [auc_out], + "TPOut": [tp], + "TNOut": [tn], + "FPOut": [fp], + "FNOut": [fn] + }) return auc_out diff --git a/python/paddle/fluid/layers/nn.py b/python/paddle/fluid/layers/nn.py index cc223899c73deb173701db0fba4123c8442bfd43..56124663929d1e33b7144ab57ae3b3c55e1652b3 100644 --- a/python/paddle/fluid/layers/nn.py +++ b/python/paddle/fluid/layers/nn.py @@ -4367,7 +4367,7 @@ def autoincreased_step_counter(counter_name=None, begin=1, step=1): helper.set_variable_initializer( counter, initializer=Constant( value=begin - 1, force_cpu=True)) - helper.main_program.global_block().prepend_op( + helper.main_program.global_block()._prepend_op( type='increment', inputs={'X': [counter]}, outputs={'Out': [counter]}, diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py index 94e78d155f1c9aa5b7abda0e83db528ad5e2aafb..b1762c2b46cef4f84216d88f66c68b6fbcb0b476 100644 --- a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -240,7 +240,7 @@ class Optimizer(object): self._finish_update(loss.block, parameters_and_grads) end = len(global_block.ops) - return global_block.slice_ops(start, end) + return global_block._slice_ops(start, end) def minimize(self, loss, diff --git a/python/paddle/fluid/parallel_executor.py b/python/paddle/fluid/parallel_executor.py index 6baf648198585022f992709c519038688af293e1..10028a8c6e33edcea27650d925ca7378b770f143 100644 --- a/python/paddle/fluid/parallel_executor.py +++ b/python/paddle/fluid/parallel_executor.py @@ -152,7 +152,7 @@ class ParallelExecutor(object): self.executor = core.ParallelExecutor( self._places, set([ - p.name for p in main.global_block().iter_parameters() + p.name for p in main.global_block()._iter_parameters() if not p.stop_gradient ]), set(self.persistable_vars), main.desc, loss_name diff --git a/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py b/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py index f5c93319de02249a22981b50733da05bb8658e3a..fcf86cc5839113b75855ce97459b2ee4881238cd 100644 --- a/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py +++ b/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py @@ -35,7 +35,8 @@ class TestParallelExecutorBase(unittest.TestCase): feed_dict=None, seed=None, use_parallel_executor=True, - use_reduce=False): + use_reduce=False, + optimizer=fluid.optimizer.Adam): def run_executor(exe, feed, fetch_list, program=None): if isinstance(exe, fluid.ParallelExecutor): res = exe.run(fetch_list=fetch_list, feed=feed) @@ -57,8 +58,8 @@ class TestParallelExecutorBase(unittest.TestCase): main.random_seed = seed loss = method(use_feed=feed_dict is not None) - adam = fluid.optimizer.Adam() - adam.minimize(loss) + + optimizer().minimize(loss) if memory_opt: fluid.memory_optimize(main) diff --git a/python/paddle/fluid/tests/unittests/test_auc_op.py b/python/paddle/fluid/tests/unittests/test_auc_op.py index 
948836039be48ad74d5556100f06231bb89f26d3..6bd5e2332a99693f5e53e147491aa83c35859548 100644 --- a/python/paddle/fluid/tests/unittests/test_auc_op.py +++ b/python/paddle/fluid/tests/unittests/test_auc_op.py @@ -24,7 +24,20 @@ class TestAucOp(OpTest): indices = np.random.randint(0, 2, (128, 2)) labels = np.random.randint(0, 2, (128, 1)) num_thresholds = 200 - self.inputs = {'Out': pred, 'Indices': indices, 'Label': labels} + tp = np.zeros((num_thresholds, )).astype("int64") + tn = np.zeros((num_thresholds, )).astype("int64") + fp = np.zeros((num_thresholds, )).astype("int64") + fn = np.zeros((num_thresholds, )).astype("int64") + + self.inputs = { + 'Out': pred, + 'Indices': indices, + 'Label': labels, + 'TP': tp, + 'TN': tn, + 'FP': fp, + 'FN': fn + } self.attrs = {'curve': 'ROC', 'num_thresholds': num_thresholds} # NOTE: sklearn use a different way to generate thresholds # which will cause the result differs slightly: @@ -71,7 +84,13 @@ class TestAucOp(OpTest): y = (tpr[:num_thresholds - 1] + tpr[1:]) / 2.0 auc_value = np.sum(x * y) - self.outputs = {'AUC': auc_value} + self.outputs = { + 'AUC': auc_value, + 'TPOut': tp_list, + 'FNOut': fn_list, + 'TNOut': tn_list, + 'FPOut': fp_list + } def test_check_output(self): self.check_output() diff --git a/python/paddle/fluid/tests/unittests/test_dist_transpiler.py b/python/paddle/fluid/tests/unittests/test_dist_transpiler.py index 75b4b4e50da04521021dcb1e97cfe495f2619433..9dbef0693bb129186dfc50f6efdd0896deedda81 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_transpiler.py +++ b/python/paddle/fluid/tests/unittests/test_dist_transpiler.py @@ -27,7 +27,6 @@ class TranspilerTest(unittest.TestCase): self.pserver_eps = "127.0.0.1:6174,127.0.0.1:6175" self.pserver1_ep = "127.0.0.1:6174" self.pserver2_ep = "127.0.0.1:6175" - self.slice_var_up = True self.sync_mode = True self.transpiler = None @@ -52,27 +51,26 @@ class TranspilerTest(unittest.TestCase): self.origin_prog = main.clone() return main - def get_trainer(self): - t = self._transpiler_instance() + def get_trainer(self, config=None): + t = self._transpiler_instance(config) return t.get_trainer_program() - def get_pserver(self, ep): - t = self._transpiler_instance() + def get_pserver(self, ep, config=None): + t = self._transpiler_instance(config) pserver = t.get_pserver_program(ep) startup = t.get_startup_program(ep, pserver) return pserver, startup - def _transpiler_instance(self): + def _transpiler_instance(self, config=None): if not self.transpiler: main = self.get_main_program() - self.transpiler = fluid.DistributeTranspiler() + self.transpiler = fluid.DistributeTranspiler(config=config) self.transpiler.transpile( self.trainer_id, program=main, pservers=self.pserver_eps, - trainers=self.trainers, - slice_var_up=self.slice_var_up, - sync_mode=self.sync_mode) + trainers=self.trainers) + return self.transpiler @@ -124,14 +122,67 @@ class TestBasicModel(TranspilerTest): self.assertEqual(set(pserver_params), set(trainer_params)) +class TestBasicModelWithLargeBlockSize(TranspilerTest): + def test_transpiler(self): + config = fluid.DistributeTranspilerConfig() + config.min_block_size = 1048576 + + pserver, startup = self.get_pserver(self.pserver1_ep, config) + pserver2, startup2 = self.get_pserver(self.pserver2_ep, config) + + trainer = self.get_trainer(config) + + self.assertEqual([op.type for op in trainer.global_block().ops], [ + 'mul', 'elementwise_add', 'elementwise_sub', 'square', 'mean', + 'fill_constant', 'mean_grad', 'square_grad', 'elementwise_sub_grad', + 'elementwise_add_grad', 
'send', 'mul_grad', 'send', 'send_barrier', + 'recv', 'recv', 'fetch_barrier' + ]) + + self.assertEqual(len(pserver.blocks), 2) + # block0: listen_and_serv + self.assertEqual([op.type for op in pserver.blocks[0].ops], + ["listen_and_serv"]) + # block1~2: optimize pass + self.assertEqual([op.type for op in pserver.blocks[1].ops], + ["sum", "scale", "sgd"]) + # confirm startup program + self.assertEqual([op.type for op in startup.global_block().ops], + ["fill_constant", "fill_constant", "fill_constant"]) + # the variable #fc_w will be split into two blocks + fc_w_var = startup2.global_block().var("fc_w") + self.assertEqual(fc_w_var.shape, (1000L, 1000L)) + # all parameters should be optimized on pserver + + pserver_params = [] + for prog in [pserver, pserver2]: + for blk in prog.blocks: + for op in blk.ops: + if "Param" in op.input_names: + param_name = op.input("Param")[0] + is_block_idx = param_name.find(".block") + if is_block_idx != -1: + origin_param_name = param_name[:is_block_idx] + else: + origin_param_name = param_name + pserver_params.append(origin_param_name) + trainer_params = [] + for op in self.origin_prog.global_block().ops: + if "Param" in op.input_names: + trainer_params.append(op.input("Param")[0]) + self.assertEqual(set(pserver_params), set(trainer_params)) + + class TestNoSliceVar(TranspilerTest): def setUp(self): super(TestNoSliceVar, self).setUp() - self.slice_var_up = False def test_transpiler(self): - _, startup = self.get_pserver(self.pserver1_ep) - _, startup2 = self.get_pserver(self.pserver2_ep) + config = fluid.DistributeTranspilerConfig() + config.slice_var_up = False + + _, startup = self.get_pserver(self.pserver1_ep, config) + _, startup2 = self.get_pserver(self.pserver2_ep, config) if startup.global_block().vars.has_key("fc_w"): fc_w_var = startup.global_block().vars["fc_w"] @@ -253,10 +304,50 @@ class TestL2Decay(TranspilerTest): # TODO(typhoonzero): test clipping and L2Decay ops are removed from trainer - # FIXME(typhoonzero): need to add test for async case: - # see https://github.com/PaddlePaddle/Paddle/issues/11691 -class TestAsyncSGD(TranspilerTest): - pass +class TestL2DecayWithPiecewise(TranspilerTest): + def net_conf(self): + x = fluid.layers.data(name='x', shape=[1000], dtype='float32') + y_predict = fluid.layers.fc(input=x, + size=1000, + act=None, + param_attr=fluid.ParamAttr(name='fc_w'), + bias_attr=fluid.ParamAttr(name='fc_b')) + y = fluid.layers.data(name='y', shape=[1], dtype='float32') + cost = fluid.layers.square_error_cost(input=y_predict, label=y) + avg_cost = fluid.layers.mean(cost) + base_lr = 1.0 + bd = [1, 10, 20, 30] + lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)] + sgd_optimizer = fluid.optimizer.Momentum( + learning_rate=fluid.layers.piecewise_decay( + boundaries=bd, values=lr), + momentum=0.9, + regularization=fluid.regularizer.L2Decay(1e-4)) + sgd_optimizer.minimize(avg_cost) + return + + def test_transpiler(self): + pserver, startup = self.get_pserver(self.pserver1_ep) + trainer = self.get_trainer() + + self.assertEqual(len(pserver.blocks), 9) + self.assertEqual([op.type for op in pserver.blocks[1].ops], [ + "increment", "cast", "fill_constant", "fill_constant", "less_than", + "logical_not", "conditional_block", "fill_constant", + "fill_constant", "less_than", "logical_not", "logical_and", + "logical_and", "conditional_block", "fill_constant", + "fill_constant", "less_than", "logical_not", "logical_and", + "logical_and", "conditional_block", "fill_constant", + "fill_constant", "less_than", "logical_not", "logical_and", + 
"logical_and", "conditional_block", "fill_constant", + "conditional_block" + ]) + self.assertEqual( + [op.type for op in pserver.blocks[7].ops], + ["sum", "scale", "scale", "elementwise_add", "momentum"]) + self.assertEqual( + [op.type for op in pserver.blocks[8].ops], + ["sum", "scale", "scale", "elementwise_add", "momentum"]) if __name__ == "__main__": diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py index 57ae36dbdd401afd34d06f460ae613db18240a2e..4d39505b66abf44249e0ea160b82aaf7be0638cb 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py @@ -13,8 +13,12 @@ # limitations under the License. import paddle.fluid as fluid +import paddle.fluid.layers.ops as ops +from paddle.fluid.initializer import init_on_cpu +from paddle.fluid.layers.learning_rate_scheduler import _decay_step_counter from parallel_executor_test_base import TestParallelExecutorBase import unittest +import math import os @@ -131,27 +135,71 @@ def SE_ResNeXt50Small(batch_size=2, use_feed=False): class TestResnet(TestParallelExecutorBase): - def check_resnet_convergence(self, use_cuda, use_reduce=False, iter=20): + def check_resnet_convergence_with_learning_rate_decay(self, + use_cuda=True, + use_reduce=False, + iter=20): + os.environ['CPU_NUM'] = str(4) + def _cosine_decay(learning_rate, step_each_epoch, epochs=120): + """ + Applies cosine decay to the learning rate. + lr = 0.05 * (math.cos(epoch * (math.pi / 120)) + 1) + """ + global_step = _decay_step_counter() + + with init_on_cpu(): + epoch = ops.floor(global_step / step_each_epoch) + decayed_lr = learning_rate * \ + (ops.cos(epoch * (math.pi / epochs)) + 1)/2 + return decayed_lr + + def _optimizer(learning_rate=0.01): + optimizer = fluid.optimizer.Momentum( + learning_rate=_cosine_decay( + learning_rate=learning_rate, step_each_epoch=2, epochs=1), + momentum=0.9, + regularization=fluid.regularizer.L2Decay(1e-4)) + return optimizer + import functools + batch_size = 2 - self.check_network_convergence( + + single_first_loss, single_last_loss = self.check_network_convergence( functools.partial( SE_ResNeXt50Small, batch_size=batch_size), iter=iter, batch_size=batch_size, use_cuda=use_cuda, - use_reduce=use_reduce) - - def test_resnet(self): - self.check_resnet_convergence(True) - self.check_resnet_convergence(False, iter=5) + use_reduce=use_reduce, + optimizer=_optimizer, + use_parallel_executor=False) - def test_resnet_with_new_strategy(self): - # use_cuda, use_reduce - self.check_resnet_convergence(True, True) - self.check_resnet_convergence(False, True, iter=5) + parallel_first_loss, parallel_last_loss = self.check_network_convergence( + functools.partial( + SE_ResNeXt50Small, batch_size=batch_size), + iter=iter, + batch_size=batch_size, + use_cuda=use_cuda, + use_reduce=use_reduce, + optimizer=_optimizer) + + for p_f in parallel_first_loss: + self.assertAlmostEquals(p_f, single_first_loss[0], delta=1e-6) + for p_l in parallel_last_loss: + self.assertAlmostEquals(p_l, single_last_loss[0], delta=1e-6) + + def test_seresnext_with_learning_rate_decay(self): + self.check_resnet_convergence_with_learning_rate_decay(True, False) + self.check_resnet_convergence_with_learning_rate_decay( + False, False, iter=5) + + def test_seresnext_with_new_strategy_with_learning_rate_decay(self): + self.check_resnet_convergence_with_learning_rate_decay(True, True) + 
self.check_resnet_convergence_with_learning_rate_decay( + False, True, iter=5) if __name__ == '__main__': diff --git a/python/paddle/fluid/tests/unittests/test_protobuf_descs.py b/python/paddle/fluid/tests/unittests/test_protobuf_descs.py index 3f9059fb5b31cd009c068ccddc9a8938adae5772..f75a79bfa42405747df9e6f4f4ab743014e303b9 100644 --- a/python/paddle/fluid/tests/unittests/test_protobuf_descs.py +++ b/python/paddle/fluid/tests/unittests/test_protobuf_descs.py @@ -181,13 +181,13 @@ class TestBlockDesc(unittest.TestCase): self.assertIsNotNone(block) op1 = block.append_op() op2 = block.append_op() - op0 = block.prepend_op() + op0 = block._prepend_op() all_ops = [] for idx in xrange(0, block.op_size()): all_ops.append(block.op(idx)) self.assertEqual(all_ops, [op0, op1, op2]) - def test_remove_op(self): + def test__remove_op(self): program = Program() program_desc = program.desc self.assertIsNotNone(program_desc) @@ -201,8 +201,8 @@ class TestBlockDesc(unittest.TestCase): op1.set_type("test") op2.set_type("test") - block.remove_op(1, 2) - program.sync_with_cpp() + block._remove_op(1, 2) + program._sync_with_cpp() all_ops = [] for idx in xrange(0, block.op_size()): diff --git a/python/paddle/fluid/transpiler/__init__.py b/python/paddle/fluid/transpiler/__init__.py index cf18090f71f34be5105498f5846dbcdf15ab2e3f..eae13b50398f791d4a203b72a0e96f3e87cc2a88 100644 --- a/python/paddle/fluid/transpiler/__init__.py +++ b/python/paddle/fluid/transpiler/__init__.py @@ -12,12 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -from distribute_transpiler import DistributeTranspiler +from distribute_transpiler import DistributeTranspiler, DistributeTranspilerConfig from inference_transpiler import InferenceTranspiler from memory_optimization_transpiler import memory_optimize, release_memory from ps_dispatcher import HashName, RoundRobin __all__ = [ "DistributeTranspiler", "InferenceTranspiler", "memory_optimize", - "release_memory", "HashName", "RoundRobin" + "release_memory", "HashName", "RoundRobin", "DistributeTranspilerConfig" ] diff --git a/python/paddle/fluid/transpiler/details/program_utils.py b/python/paddle/fluid/transpiler/details/program_utils.py index f10b496306a002ee131d01798a0698b807d379ca..2ca1d4716b103d17117ae3ee958667c3a9747cdf 100644 --- a/python/paddle/fluid/transpiler/details/program_utils.py +++ b/python/paddle/fluid/transpiler/details/program_utils.py @@ -17,10 +17,10 @@ def delete_ops(block, ops): try: start = list(block.ops).index(ops[0]) end = list(block.ops).index(ops[-1]) - [block.remove_op(start) for _ in xrange(end - start + 1)] + [block._remove_op(start) for _ in xrange(end - start + 1)] except Exception, e: raise e - block.program.sync_with_cpp() + block.program._sync_with_cpp() def find_op_by_input_arg(block, arg_name): diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py index 121c36e477327d4d0e7b1cba1713e68ce4d06e03..c2044bf03135dd9c5256021d87866cfbbc598dad 100644 --- a/python/paddle/fluid/transpiler/distribute_transpiler.py +++ b/python/paddle/fluid/transpiler/distribute_transpiler.py @@ -31,6 +31,7 @@ Steps to transpile pserver: from __future__ import print_function import math +import random import numpy as np from ps_dispatcher import RoundRobin, HashName, PSDispatcher @@ -63,7 +64,7 @@ def same_or_split_var(p_name, var_name): return p_name == var_name or p_name.startswith(var_name + ".block") -def slice_variable(var_list, slice_count, 
min_block_size=8192): +def slice_variable(var_list, slice_count, min_block_size): """ We may need to split dense tensor to one or more blocks and put them equally onto parameter server. One block is a sub-tensor @@ -109,6 +110,22 @@ def slice_variable(var_list, slice_count, min_block_size=8192): return blocks +class DistributeTranspilerConfig(object): + """ + slice_var_up (bool): Whether to slice Tensors into blocks for the pservers, default is True. + split_method (PSDispatcher): RoundRobin or HashName can be used; + try to choose the best method to balance loads for pservers. + min_block_size (int): Minimum number of elements in a split block. + According to: https://github.com/PaddlePaddle/Paddle/issues/8638#issuecomment-369912156 + Bandwidth is used more efficiently when the data size is larger than 2MB. If you + want to change it, please make sure you understand the slice_variable function. + """ + + slice_var_up = True + split_method = None + min_block_size = 8192 + + class DistributeTranspiler(object): """ **DistributeTranspiler** @@ -145,13 +162,23 @@ class DistributeTranspiler(object): trainer_program = t.get_trainer_program() """ + def __init__(self, config=None): + if config is not None: + self.config = config + else: + self.config = DistributeTranspilerConfig() + + if self.config.split_method is None: + self.config.split_method = RoundRobin + + assert (self.config.min_block_size >= 8192) + assert (self.config.split_method.__bases__[0] == PSDispatcher) + def transpile(self, trainer_id, program=None, pservers="127.0.0.1:6174", trainers=1, - slice_var_up=True, - split_method=RoundRobin, sync_mode=True): """ Run the transpiler. @@ -164,12 +191,8 @@ class DistributeTranspiler(object): pservers (str): comma separated ip:port string for the pserver list. trainers (int): number of trainers in the distributed job. - slice_var_up (bool): Do Tensor slice for pservers, default is True. - split_method (PSDispatcher): RoundRobin or HashName can be used - try to choose the best method to balance loads for pservers. sync_mode (bool): Do sync training or not, default is True. """ - assert (split_method.__bases__[0] == PSDispatcher) if program is None: program = default_main_program() self.origin_program = program @@ -180,11 +203,11 @@ class DistributeTranspiler(object): self.pserver_endpoints = pserver_endpoints self.optimize_ops, self.params_grads = self._get_optimize_pass() - ps_dispatcher = split_method(self.pserver_endpoints) + ps_dispatcher = self.config.split_method(self.pserver_endpoints) self.has_distributed_lookup_table = self._has_distributed_lookup_table() # split and create vars, then put splited vars in dicts for later use.
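Note: the hunks above replace the old slice_var_up/split_method arguments of transpile() with a DistributeTranspilerConfig object. A minimal usage sketch of the new flow, mirroring the updated unit tests in this patch; the endpoint strings and trainer count are illustrative, and transpile() falls back to default_main_program() when no program is passed:

    import paddle.fluid as fluid

    config = fluid.DistributeTranspilerConfig()
    config.min_block_size = 1048576    # split only variables larger than this many elements
    # config.slice_var_up = False      # or: keep each variable in a single block

    t = fluid.DistributeTranspiler(config=config)
    t.transpile(trainer_id=0,
                pservers="127.0.0.1:6174,127.0.0.1:6175",
                trainers=2)
    pserver_prog = t.get_pserver_program("127.0.0.1:6174")
    trainer_prog = t.get_trainer_program()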
- self._init_splited_vars(slice_var_up) + self._init_splited_vars() # step 3.1: insert send op to send gradient vars to parameter servers ps_dispatcher.reset() @@ -196,13 +219,14 @@ class DistributeTranspiler(object): # fc_b@GRAD_trainer_0, fc_b@GRAD_trainer_1 --> pserver2 # shuffle the map will avoid the uneven distribution above grad_var_mapping_items = self.grad_var_mapping.items() - if not slice_var_up: - np.random.shuffle(grad_var_mapping_items) + if not self.config.slice_var_up: + random.seed(self.trainer_num) + random.shuffle(grad_var_mapping_items) for orig_varname, splited_vars in grad_var_mapping_items: eplist = ps_dispatcher.dispatch(splited_vars) - if not slice_var_up: + if not self.config.slice_var_up: assert (len(splited_vars) == 1) if len(splited_vars) == 1: @@ -219,7 +243,7 @@ class DistributeTranspiler(object): AssertionError("Can not insert the send op by original " "variable name :", orig_varname) - program.global_block().insert_op( + program.global_block()._insert_op( index=index + 1, type="send", inputs={"X": splited_vars}, @@ -405,7 +429,7 @@ class DistributeTranspiler(object): # clone vars for var in origin_block.vars: - new_sub_block.clone_variable(var) + new_sub_block._clone_variable(var) # clone ops for origin_op in origin_block.ops: @@ -437,6 +461,8 @@ class DistributeTranspiler(object): per_opt_block = pserver_program.create_block(pre_block_idx) optimize_blocks.append(per_opt_block) # append grad merging ops before clip and weight decay + # cases may like: + # L2Decay op -> clip op -> optimize for _, op in enumerate(self.optimize_ops): # find the origin @GRAD var before clipping grad_varname_for_block = __op_have_grad_input__(op) @@ -444,6 +470,7 @@ class DistributeTranspiler(object): merged_var = self._append_pserver_grad_merge_ops( per_opt_block, grad_varname_for_block, endpoint, grad_to_block_id, self.origin_program) + break # append optimize op once then append other ops. for _, op in enumerate(self.optimize_ops): # optimizer is connected to itself if ufind.is_connected(op, opt_op) and op not in global_ops: @@ -498,7 +525,7 @@ class DistributeTranspiler(object): outputs={}, attrs=attrs) - pserver_program.sync_with_cpp() + pserver_program._sync_with_cpp() return pserver_program def get_startup_program(self, endpoint, pserver_program): @@ -530,7 +557,7 @@ class DistributeTranspiler(object): pserver_vars = pserver_program.global_block().vars created_var_map = dict() for _, var in pserver_vars.iteritems(): - tmpvar = s_prog.global_block().clone_variable(var) + tmpvar = s_prog.global_block()._clone_variable(var) created_var_map[var.name] = tmpvar # 2. rename op outputs @@ -625,7 +652,7 @@ class DistributeTranspiler(object): ] return param_list, grad_list - def _init_splited_vars(self, slice_var_up): + def _init_splited_vars(self): # update these mappings for further transpile: # 1. param_var_mapping: param var name -> [splited params vars] # 2. grad_var_mapping: grad var name -> [splited grads vars] @@ -649,17 +676,22 @@ class DistributeTranspiler(object): param_list, grad_list = self._update_dist_lookup_table_vars( param_list, grad_list, self.params_grads) - if slice_var_up: + if self.config.slice_var_up: # when we slice var up into blocks, we will slice the var according to # pserver services' count. A pserver may have two or more listening ports. 
- grad_blocks = slice_variable(grad_list, len(self.pserver_endpoints)) + grad_blocks = slice_variable(grad_list, + len(self.pserver_endpoints), + self.config.min_block_size) param_blocks = slice_variable(param_list, - len(self.pserver_endpoints)) + len(self.pserver_endpoints), + self.config.min_block_size) else: # when we do NOT slice var up into blocks, we will always slice params # grads into one block. - grad_blocks = slice_variable(grad_list, 1) - param_blocks = slice_variable(param_list, 1) + grad_blocks = slice_variable(grad_list, 1, + self.config.min_block_size) + param_blocks = slice_variable(param_list, 1, + self.config.min_block_size) assert (len(grad_blocks) == len(param_blocks)) # origin_varname -> [splited_var] @@ -728,7 +760,7 @@ class DistributeTranspiler(object): self.all_prefetch_output_vars.append(prefetch_output_vars) # insert split_ids_op - program.global_block().insert_op( + program.global_block()._insert_op( index=lookup_table_op_index, type="split_ids", inputs={ @@ -740,7 +772,7 @@ class DistributeTranspiler(object): outputs={"Out": prefetch_input_vars}) # insert prefetch_op - program.global_block().insert_op( + program.global_block()._insert_op( index=lookup_table_op_index + 1, type="prefetch", inputs={'X': prefetch_input_vars}, @@ -751,7 +783,7 @@ class DistributeTranspiler(object): }) # insert concat_op - program.global_block().insert_op( + program.global_block()._insert_op( index=lookup_table_op_index + 2, type="merge_ids", inputs={ @@ -782,14 +814,14 @@ class DistributeTranspiler(object): if table_grad_name in op.output_arg_names: op_index = list(all_ops).index(op) # insert split_ids_op - program.global_block().insert_op( + program.global_block()._insert_op( index=op_index + 1, type="split_ids", inputs={ 'Ids': [program.global_block().vars[table_grad_name]] }, outputs={"Out": self.trainer_side_table_grad_list}) - program.global_block().insert_op( + program.global_block()._insert_op( index=op_index + 2, type="send", inputs={'X': self.trainer_side_table_grad_list}, @@ -848,7 +880,7 @@ class DistributeTranspiler(object): persistable=True) # parameter must be selected rows param_var.desc.set_type(core.VarDesc.VarType.SELECTED_ROWS) - grad_var = pserver_program.global_block().clone_variable( + grad_var = pserver_program.global_block()._clone_variable( self.origin_program.global_block().vars[grad_var_name( self.table_name)]) @@ -888,7 +920,7 @@ class DistributeTranspiler(object): if not splited_grad_name.startswith(origin_grad_name): raise ValueError("origin_grad_var: " + splited_grad_name + " grad_var:" + grad_var.name) - grad_var = pserver_program.global_block().rename_var( + grad_var = pserver_program.global_block()._rename_var( origin_grad_name, splited_grad_name) lr_var = pserver_program.global_block().vars[table_opt_op.input( @@ -964,7 +996,7 @@ class DistributeTranspiler(object): if self.sync_mode and add_trainer_suffix: new_var_name = "%s.trainer_%d" % \ (orig_var.name, self.trainer_id) - program.global_block().rename_var(varname, new_var_name) + program.global_block()._rename_var(varname, new_var_name) var_mapping[varname] = \ [program.global_block().var(new_var_name)] else: @@ -998,7 +1030,7 @@ class DistributeTranspiler(object): type=orig_var.type, shape=splited_shape) # flattend splited var var_mapping[varname].append(var) - program.global_block().sync_with_cpp() + program.global_block()._sync_with_cpp() return var_mapping def create_splited_vars(self, source_var, block, tag): @@ -1026,7 +1058,7 @@ class DistributeTranspiler(object): height_sections = [] 
for v in splited_vars: height_sections.append(v.shape[0]) - program.global_block().insert_op( + program.global_block()._insert_op( index=index + 1, type="split_selected_rows", inputs={"X": orig_var}, @@ -1036,7 +1068,7 @@ class DistributeTranspiler(object): sections = [] for v in splited_vars: sections.append(v.shape[0]) - program.global_block().insert_op( + program.global_block()._insert_op( index=index + 1, type="split_byref", inputs={"X": orig_var}, @@ -1225,7 +1257,7 @@ class DistributeTranspiler(object): varlist = [varlist] for var in varlist: if var not in program.global_block().vars: - block.clone_variable(var) + block._clone_variable(var) outputs = self._get_output_map_from_op( self.origin_program.global_block().vars, op) @@ -1234,7 +1266,7 @@ class DistributeTranspiler(object): varlist = [varlist] for var in varlist: if var not in program.global_block().vars: - block.clone_variable(var) + block._clone_variable(var) return block.append_op( type=op.type, inputs=inputs, outputs=outputs, attrs=op.attrs) @@ -1272,7 +1304,7 @@ class DistributeTranspiler(object): if grad_block: outputs[key] = grad_block elif not program.global_block().vars.has_key(var.name): - program.global_block().clone_variable(var) + program.global_block()._clone_variable(var) return optimize_block.append_op( type=opt_op.type, diff --git a/python/paddle/fluid/transpiler/inference_transpiler.py b/python/paddle/fluid/transpiler/inference_transpiler.py index b8afeae5ebd6ef7948a7c0c2775f419af461da04..f1905f08787da7a58a41d840ea68fb6c07f4028f 100644 --- a/python/paddle/fluid/transpiler/inference_transpiler.py +++ b/python/paddle/fluid/transpiler/inference_transpiler.py @@ -95,7 +95,7 @@ class InferenceTranspiler(object): # modify bnorm OP to include relu current_op.set_attr("fuse_with_relu", True) # remove relu OP - self.block.remove_op(i + 1) + self.block._remove_op(i + 1) i = i + 1 self._remove_unused_var() @@ -171,7 +171,7 @@ class InferenceTranspiler(object): # fuse batch_norm self._fuse_param(current_op, next_op, bias_op, 0) # remove batch_norm_op - self.block.remove_op(i + 2) + self.block._remove_op(i + 2) i = i + 1 # conv2d with bias, the next_op.type is elementwise_add elif (next_op.type == 'elementwise_add'): @@ -180,7 +180,7 @@ class InferenceTranspiler(object): # fuse batch_norm self._fuse_param(current_op, next_next_op, next_op, 1) # remove batch_norm_op - self.block.remove_op(i + 2) + self.block._remove_op(i + 2) i = i + 1 i = i + 1 @@ -212,7 +212,7 @@ class InferenceTranspiler(object): y_var = self.block.var(bn_op.input("Bias")[0]) out_var = self.block.var(bn_op.output("Y")[0]) - bias_op = self.block.insert_op( + bias_op = self.block._insert_op( index, type="elementwise_add", inputs={"X": x_var, @@ -307,4 +307,4 @@ class InferenceTranspiler(object): for var in self.block.vars.keys(): if var not in args: - self.block.remove_var(var) + self.block._remove_var(var) diff --git a/python/paddle/fluid/transpiler/memory_optimization_transpiler.py b/python/paddle/fluid/transpiler/memory_optimization_transpiler.py index 999ef43ca0feacbddff5f9db59589ce7097fe77e..dd90d66110e6233806b04bb726636a915f2ad84a 100644 --- a/python/paddle/fluid/transpiler/memory_optimization_transpiler.py +++ b/python/paddle/fluid/transpiler/memory_optimization_transpiler.py @@ -177,7 +177,7 @@ class ControlFlowGraph(object): in_diff) if can_optimize: index = i + fwd_id + 1 if is_forward else i - self._forward_num + bwd_id + 1 - delete_op = block_desc.insert_op(index) + delete_op = block_desc._insert_op(index) delete_op.set_type("delete_var") 
delete_op.set_input("X", can_optimize) if is_forward: diff --git a/python/paddle/v2/image.py b/python/paddle/v2/image.py index 9235c41e9eb95b25a0dc53a494a203e7a4525981..08d8bd68f9b7eb703c15f7cb5ad1300969db5713 100644 --- a/python/paddle/v2/image.py +++ b/python/paddle/v2/image.py @@ -182,7 +182,7 @@ def resize_short(im, size): h_new = size * h / w else: w_new = size * w / h - im = cv2.resize(im, (h_new, w_new), interpolation=cv2.INTER_CUBIC) + im = cv2.resize(im, (w_new, h_new), interpolation=cv2.INTER_CUBIC) return im @@ -324,7 +324,6 @@ def simple_transform(im, if np.random.randint(2) == 0: im = left_right_flip(im, is_color) else: - im = center_crop(im, crop_size, is_color) im = center_crop(im, crop_size, is_color=is_color) if len(im.shape) == 3: im = to_chw(im)
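Note: the resize_short change above is a genuine bug fix rather than a rename: cv2.resize takes its dsize argument as (width, height), while NumPy image shapes are (height, width). A small sketch of the convention (array sizes are illustrative):

    import cv2
    import numpy as np

    im = np.zeros((100, 200, 3), dtype=np.uint8)   # shape is (height=100, width=200)
    out = cv2.resize(im, (50, 25))                 # dsize is (width, height)
    assert out.shape[:2] == (25, 50)               # result has 25 rows and 50 columns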