提交 8b154c17 编写于 作者: M minqiyang

Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into fix_dist_resnet_ut_in_py36

test=develop
...@@ -22,6 +22,27 @@ ENV HOME /root ...@@ -22,6 +22,27 @@ ENV HOME /root
# Add bash enhancements # Add bash enhancements
COPY ./paddle/scripts/docker/root/ /root/ COPY ./paddle/scripts/docker/root/ /root/
# Prepare packages for Python
RUN apt-get update && \
apt-get install -y make build-essential libssl-dev zlib1g-dev libbz2-dev \
libreadline-dev libsqlite3-dev wget curl llvm libncurses5-dev libncursesw5-dev \
xz-utils tk-dev libffi-dev liblzma-dev
# Install Python3.6
RUN mkdir -p /root/python_build/ && wget -q https://www.sqlite.org/2018/sqlite-autoconf-3250300.tar.gz && \
tar -zxf sqlite-autoconf-3250300.tar.gz && cd sqlite-autoconf-3250300 && \
./configure -prefix=/usr/local && make -j8 && make install && cd ../ && rm sqlite-autoconf-3250300.tar.gz && \
wget -q https://www.python.org/ftp/python/3.6.0/Python-3.6.0.tgz && \
tar -xzf Python-3.6.0.tgz && cd Python-3.6.0 && \
CFLAGS="-Wformat" ./configure --prefix=/usr/local/ --enable-shared > /dev/null && \
make -j8 > /dev/null && make altinstall > /dev/null
# Install Python3.7
RUN wget -q https://www.python.org/ftp/python/3.7.0/Python-3.7.0.tgz && \
tar -xzf Python-3.7.0.tgz && cd Python-3.7.0 && \
CFLAGS="-Wformat" ./configure --prefix=/usr/local/ --enable-shared > /dev/null && \
make -j8 > /dev/null && make altinstall > /dev/null
RUN apt-get update && \ RUN apt-get update && \
apt-get install -y --allow-downgrades patchelf \ apt-get install -y --allow-downgrades patchelf \
python3 python3-dev python3-pip \ python3 python3-dev python3-pip \
...@@ -74,6 +95,12 @@ RUN localedef -i en_US -f UTF-8 en_US.UTF-8 ...@@ -74,6 +95,12 @@ RUN localedef -i en_US -f UTF-8 en_US.UTF-8
RUN pip3 install -U wheel && \ RUN pip3 install -U wheel && \
pip3 install -U docopt PyYAML sphinx==1.5.6 && \ pip3 install -U docopt PyYAML sphinx==1.5.6 && \
pip3 install sphinx-rtd-theme==0.1.9 recommonmark && \ pip3 install sphinx-rtd-theme==0.1.9 recommonmark && \
pip3.6 install -U wheel && \
pip3.6 install -U docopt PyYAML sphinx==1.5.6 && \
pip3.6 install sphinx-rtd-theme==0.1.9 recommonmark && \
pip3.7 install -U wheel && \
pip3.7 install -U docopt PyYAML sphinx==1.5.6 && \
pip3.7 install sphinx-rtd-theme==0.1.9 recommonmark && \
easy_install -U pip && \ easy_install -U pip && \
pip install -U pip setuptools wheel && \ pip install -U pip setuptools wheel && \
pip install -U docopt PyYAML sphinx==1.5.6 && \ pip install -U docopt PyYAML sphinx==1.5.6 && \
...@@ -82,22 +109,34 @@ RUN pip3 install -U wheel && \ ...@@ -82,22 +109,34 @@ RUN pip3 install -U wheel && \
RUN pip3 install 'pre-commit==1.10.4' 'ipython==5.3.0' && \ RUN pip3 install 'pre-commit==1.10.4' 'ipython==5.3.0' && \
pip3 install 'ipykernel==4.6.0' 'jupyter==1.0.0' && \ pip3 install 'ipykernel==4.6.0' 'jupyter==1.0.0' && \
pip3 install opencv-python && \ pip3 install opencv-python && \
pip3.6 install 'pre-commit==1.10.4' 'ipython==5.3.0' && \
pip3.6 install 'ipykernel==4.6.0' 'jupyter==1.0.0' && \
pip3.6 install opencv-python && \
pip3.7 install 'pre-commit==1.10.4' 'ipython==5.3.0' && \
pip3.7 install 'ipykernel==4.6.0' 'jupyter==1.0.0' && \
pip3.7 install opencv-python && \
pip install 'pre-commit==1.10.4' 'ipython==5.3.0' && \ pip install 'pre-commit==1.10.4' 'ipython==5.3.0' && \
pip install 'ipykernel==4.6.0' 'jupyter==1.0.0' && \ pip install 'ipykernel==4.6.0' 'jupyter==1.0.0' && \
pip install opencv-python pip install opencv-python
#For docstring checker #For docstring checker
RUN pip3 install pylint pytest astroid isort RUN pip3 install pylint pytest astroid isort
RUN pip3.6 install pylint pytest astroid isort
RUN pip3.7 install pylint pytest astroid isort
RUN pip install pylint pytest astroid isort LinkChecker RUN pip install pylint pytest astroid isort LinkChecker
COPY ./python/requirements.txt /root/ COPY ./python/requirements.txt /root/
RUN pip3 install -r /root/requirements.txt RUN pip3 install -r /root/requirements.txt
RUN pip3.6 install -r /root/requirements.txt
RUN pip3.7 install -r /root/requirements.txt
RUN pip install -r /root/requirements.txt RUN pip install -r /root/requirements.txt
# To fix https://github.com/PaddlePaddle/Paddle/issues/1954, we use # To fix https://github.com/PaddlePaddle/Paddle/issues/1954, we use
# the solution in https://urllib3.readthedocs.io/en/latest/user-guide.html#ssl-py2 # the solution in https://urllib3.readthedocs.io/en/latest/user-guide.html#ssl-py2
RUN apt-get install -y libssl-dev libffi-dev RUN apt-get install -y libssl-dev libffi-dev
RUN pip3 install certifi urllib3[secure] RUN pip3 install certifi urllib3[secure]
RUN pip3.6 install certifi urllib3[secure]
RUN pip3.7 install certifi urllib3[secure]
RUN pip install certifi urllib3[secure] RUN pip install certifi urllib3[secure]
......
...@@ -116,8 +116,14 @@ cc_test(op_proto_maker_test SRCS op_proto_maker_test.cc DEPS op_proto_maker) ...@@ -116,8 +116,14 @@ cc_test(op_proto_maker_test SRCS op_proto_maker_test.cc DEPS op_proto_maker)
cc_library(op_info SRCS op_info.cc DEPS attribute framework_proto) cc_library(op_info SRCS op_info.cc DEPS attribute framework_proto)
cc_library(shape_inference SRCS shape_inference.cc DEPS ddim attribute device_context) cc_library(shape_inference SRCS shape_inference.cc DEPS ddim attribute device_context)
if (NOT WIN32)
cc_library(transfer_scope_cache SRCS transfer_scope_cache.cc DEPS scope framework_proto)
cc_library(operator SRCS operator.cc DEPS op_info device_context tensor scope glog cc_library(operator SRCS operator.cc DEPS op_info device_context tensor scope glog
shape_inference data_transform lod_tensor profiler) shape_inference data_transform lod_tensor profiler transfer_scope_cache)
else()
cc_library(operator SRCS operator.cc DEPS op_info device_context tensor scope glog
shape_inference data_transform lod_tensor)
endif(NOT WIN32)
cc_test(operator_test SRCS operator_test.cc DEPS operator op_registry device_context) cc_test(operator_test SRCS operator_test.cc DEPS operator op_registry device_context)
......
...@@ -20,6 +20,7 @@ limitations under the License. */ ...@@ -20,6 +20,7 @@ limitations under the License. */
#include "paddle/fluid/framework/ngraph_operator.h" #include "paddle/fluid/framework/ngraph_operator.h"
#include "paddle/fluid/framework/op_registry.h" #include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/reader.h" #include "paddle/fluid/framework/reader.h"
#include "paddle/fluid/framework/transfer_scope_cache.h"
#include "paddle/fluid/operators/detail/macros.h" #include "paddle/fluid/operators/detail/macros.h"
#include "paddle/fluid/platform/place.h" #include "paddle/fluid/platform/place.h"
#include "paddle/fluid/platform/profiler.h" #include "paddle/fluid/platform/profiler.h"
...@@ -391,8 +392,8 @@ void Executor::RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope, ...@@ -391,8 +392,8 @@ void Executor::RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope,
int64_t max_memory_size = GetEagerDeletionThreshold(); int64_t max_memory_size = GetEagerDeletionThreshold();
std::unique_ptr<GarbageCollector<Tensor>> gc; std::unique_ptr<GarbageCollector<Tensor>> gc;
// WhileOp would set keep_kids to false // WhileOp would set keep_kids to true,
// WhileGradOp would need the scopes created in WhileOp // because WhileGradOp needs the scopes created in WhileOp.
// Perhaps, we should not perform eager deletion in WhileOp // Perhaps, we should not perform eager deletion in WhileOp
// The scopes and variables created by WhileOp would be deleted // The scopes and variables created by WhileOp would be deleted
// in WhileGradOp. // in WhileGradOp.
......
...@@ -83,6 +83,7 @@ void NaiveExecutor::Run() { ...@@ -83,6 +83,7 @@ void NaiveExecutor::Run() {
for (auto &op : ops_) { for (auto &op : ops_) {
VLOG(3) << std::this_thread::get_id() << " run " << op->Type() VLOG(3) << std::this_thread::get_id() << " run " << op->Type()
<< " on scope " << scope_; << " on scope " << scope_;
op->SetIsCalledByExecutor(false);
op->Run(*scope_, place_); op->Run(*scope_, place_);
} }
} }
......
...@@ -22,6 +22,7 @@ limitations under the License. */ ...@@ -22,6 +22,7 @@ limitations under the License. */
#include "paddle/fluid/framework/lod_tensor.h" #include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/operator.h" #include "paddle/fluid/framework/operator.h"
#include "paddle/fluid/framework/shape_inference.h" #include "paddle/fluid/framework/shape_inference.h"
#include "paddle/fluid/framework/transfer_scope_cache.h"
#include "paddle/fluid/framework/var_type.h" #include "paddle/fluid/framework/var_type.h"
#include "paddle/fluid/platform/profiler.h" #include "paddle/fluid/platform/profiler.h"
...@@ -33,11 +34,6 @@ DEFINE_bool(check_nan_inf, false, ...@@ -33,11 +34,6 @@ DEFINE_bool(check_nan_inf, false,
namespace paddle { namespace paddle {
namespace framework { namespace framework {
// Combine two hash values to a single hash.
inline size_t CombineHash(size_t seed, size_t a) {
return (seed ^ a) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
}
std::vector<std::tuple<platform::Place, LibraryType>> kKernelPriority = { std::vector<std::tuple<platform::Place, LibraryType>> kKernelPriority = {
std::make_tuple(platform::CUDAPlace(0), LibraryType::kCUDNN), std::make_tuple(platform::CUDAPlace(0), LibraryType::kCUDNN),
std::make_tuple(platform::CUDAPlace(0), LibraryType::kPlain), std::make_tuple(platform::CUDAPlace(0), LibraryType::kPlain),
...@@ -797,17 +793,6 @@ void OperatorWithKernel::TransferInplaceVarsBack( ...@@ -797,17 +793,6 @@ void OperatorWithKernel::TransferInplaceVarsBack(
Scope* OperatorWithKernel::TryTransferData( Scope* OperatorWithKernel::TryTransferData(
const Scope& scope, const OpKernelType& expected_kernel_key, const Scope& scope, const OpKernelType& expected_kernel_key,
std::vector<std::string>* transfered_inplace_vars) const { std::vector<std::string>* transfered_inplace_vars) const {
// In the inference scenerio, the scopes will be reused across the batches, so
// the `new_scope` here will result in GPU memroy explosion over the running of
// operators.
// We use a thread_local cache to fix that issue, the key in the cache is the
// combination of the `scope` argument, from_kernel_type, target_kernel_type.
// Have a discussion with @Superjomn or the inference developers if some changes
// on this logic for this macro might not tested on the other scenerios.
#ifdef PADDLE_ON_INFERENCE
thread_local std::unordered_map<size_t, Scope*> infer_transfer_scope_cache;
#endif
Scope* new_scope = nullptr; Scope* new_scope = nullptr;
for (auto& var_name_item : Inputs()) { for (auto& var_name_item : Inputs()) {
for (auto& var_name : var_name_item.second) { for (auto& var_name : var_name_item.second) {
...@@ -838,23 +823,23 @@ Scope* OperatorWithKernel::TryTransferData( ...@@ -838,23 +823,23 @@ Scope* OperatorWithKernel::TryTransferData(
VLOG(30) << "Transform Variable " << var_name << " from " VLOG(30) << "Transform Variable " << var_name << " from "
<< kernel_type_for_var << " to " << expected_kernel_key; << kernel_type_for_var << " to " << expected_kernel_key;
#ifdef PADDLE_ON_INFERENCE // In the inference scenerio, the scopes will be reused across the
size_t infer_cache_key = // batches, so the `new_scope` here will result in GPU memroy explosion
CombineHash(OpKernelType::Hash()(kernel_type_for_var), // over the running of operators.
OpKernelType::Hash()(expected_kernel_key)); // We use a thread_local cache to fix that issue, the key in the cache is
infer_cache_key = // the combination of the `scope` argument, from_kernel_type,
CombineHash(infer_cache_key, std::hash<const Scope*>()(&scope)); // target_kernel_type.
// Have a discussion with @Superjomn or the inference developers if some
auto it = infer_transfer_scope_cache.find(infer_cache_key); // changes on this logic for this macro might not tested on the other
if (it != infer_transfer_scope_cache.end()) { // scenerios.
new_scope = infer_transfer_scope_cache[infer_cache_key]; // If this op is not called by an Executor or ParallelExecutor, it should
} else { // called by a NaiveExecutor, the NaiveExecutor will cache the scopes and
new_scope = &scope.NewScope(); // variables, that behavior a lot different.
infer_transfer_scope_cache[infer_cache_key] = new_scope; if (!run_by_executor_) {
} new_scope = TryCreateTransferScope(kernel_type_for_var,
#endif expected_kernel_key, &scope);
}
if (new_scope == nullptr) { if (!new_scope) {
new_scope = &scope.NewScope(); new_scope = &scope.NewScope();
} }
......
...@@ -127,6 +127,8 @@ class OperatorBase { ...@@ -127,6 +127,8 @@ class OperatorBase {
//! Get all outputs variable names //! Get all outputs variable names
virtual std::vector<std::string> OutputVars(bool has_intermediate) const; virtual std::vector<std::string> OutputVars(bool has_intermediate) const;
void SetIsCalledByExecutor(bool x) { run_by_executor_ = x; }
protected: protected:
std::string type_; std::string type_;
// NOTE: in case of OpGrad, inputs_ contains: // NOTE: in case of OpGrad, inputs_ contains:
...@@ -139,6 +141,8 @@ class OperatorBase { ...@@ -139,6 +141,8 @@ class OperatorBase {
// IG (Inputs Gradients) // IG (Inputs Gradients)
VariableNameMap outputs_; VariableNameMap outputs_;
AttributeMap attrs_; AttributeMap attrs_;
// Whether this operator executes in an Executor.
bool run_by_executor_{true};
private: private:
void GenerateTemporaryNames(); void GenerateTemporaryNames();
......
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/framework/transfer_scope_cache.h"
namespace paddle {
namespace framework {
std::unordered_map<size_t, Scope*>& global_transfer_data_cache() {
thread_local auto* x = new std::unordered_map<size_t, Scope*>;
return *x;
}
std::unordered_set<Scope*>& global_transfer_scope_cache() {
thread_local auto* x = new std::unordered_set<Scope*>;
return *x;
}
Scope* TryCreateTransferScope(OpKernelType type0, OpKernelType type1,
const Scope* scope) {
Scope* new_scope{nullptr};
size_t infer_cache_key =
CombineHash(OpKernelType::Hash()(type0), OpKernelType::Hash()(type1));
infer_cache_key =
CombineHash(infer_cache_key, std::hash<const Scope*>()(scope));
auto it = global_transfer_data_cache().find(infer_cache_key);
if (it != global_transfer_data_cache().end()) {
new_scope = global_transfer_data_cache()[infer_cache_key];
} else {
new_scope = &scope->NewScope();
global_transfer_data_cache()[infer_cache_key] = new_scope;
}
global_transfer_scope_cache().insert(new_scope);
return new_scope;
}
void RemoveKidsFromTransferScopeCache(Scope* scope) {
auto it = global_transfer_scope_cache().find(scope);
if (it != global_transfer_scope_cache().end()) {
global_transfer_scope_cache().erase(it);
}
for (auto* s : scope->kids()) {
auto it = global_transfer_scope_cache().find(s);
if (it != global_transfer_scope_cache().end()) {
global_transfer_scope_cache().erase(it);
}
}
// remove global transfer data cache
auto& cache = global_transfer_data_cache();
for (auto it = cache.begin(); it != cache.end();) {
if (it->second == scope)
it = cache.erase(it);
else
it++;
}
}
} // namespace framework
} // namespace paddle
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <thread> // NOLINT
#include <unordered_map>
#include <unordered_set>
#include "paddle/fluid/framework/op_kernel_type.h"
#include "paddle/fluid/framework/scope.h"
namespace paddle {
namespace framework {
std::unordered_map<size_t, Scope*>& global_transfer_data_cache();
std::unordered_set<Scope*>& global_transfer_scope_cache();
// Combine two hash values to a single hash.
static size_t CombineHash(size_t seed, size_t a) {
return (seed ^ a) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
}
Scope* TryCreateTransferScope(OpKernelType type0, OpKernelType type1,
const Scope* scope);
void RemoveKidsFromTransferScopeCache(Scope* scope);
} // namespace framework
} // namespace paddle
...@@ -4,6 +4,7 @@ endif() ...@@ -4,6 +4,7 @@ endif()
# analysis and tensorrt must be added before creating static library, # analysis and tensorrt must be added before creating static library,
# otherwise, there would be undefined reference to them in static library. # otherwise, there would be undefined reference to them in static library.
add_subdirectory(analysis) add_subdirectory(analysis)
add_subdirectory(utils)
if (TENSORRT_FOUND) if (TENSORRT_FOUND)
add_subdirectory(tensorrt) add_subdirectory(tensorrt)
endif() endif()
......
...@@ -30,7 +30,9 @@ cc_library(paddle_pass_builder SRCS paddle_pass_builder.cc) ...@@ -30,7 +30,9 @@ cc_library(paddle_pass_builder SRCS paddle_pass_builder.cc)
cc_library(analysis_predictor SRCS analysis_predictor.cc DEPS paddle_inference_api analysis naive_executor zero_copy_tensor reset_tensor_array analysis_config paddle_pass_builder ir_pass_manager) cc_library(analysis_predictor SRCS analysis_predictor.cc DEPS paddle_inference_api analysis naive_executor zero_copy_tensor reset_tensor_array analysis_config paddle_pass_builder ir_pass_manager)
cc_library(zero_copy_tensor SRCS details/zero_copy_tensor.cc DEPS scope lod_tensor enforce) cc_library(zero_copy_tensor SRCS details/zero_copy_tensor.cc DEPS scope lod_tensor enforce)
cc_library(zero_copy_tensor_dummy SRCS details/zero_copy_tensor_dummy.cc) cc_library(zero_copy_tensor_dummy SRCS details/zero_copy_tensor_dummy.cc)
cc_library(paddle_inference_api SRCS api.cc api_impl.cc helper.cc DEPS lod_tensor scope paddle_pass_builder reset_tensor_array analysis_config analysis_config paddle_pass_builder DEPS zero_copy_tensor) cc_library(paddle_inference_api SRCS api.cc api_impl.cc helper.cc DEPS
lod_tensor scope paddle_pass_builder reset_tensor_array analysis_config
analysis_config paddle_pass_builder zero_copy_tensor reset_tensor_array)
cc_test(test_paddle_inference_api cc_test(test_paddle_inference_api
SRCS api_tester.cc SRCS api_tester.cc
......
...@@ -31,6 +31,7 @@ ...@@ -31,6 +31,7 @@
#include "paddle/fluid/inference/tensorrt/convert/op_converter.h" #include "paddle/fluid/inference/tensorrt/convert/op_converter.h"
#endif #endif
#include "paddle/fluid/inference/utils/singleton.h" #include "paddle/fluid/inference/utils/singleton.h"
#include "paddle/fluid/memory/memcpy.h"
#include "paddle/fluid/platform/cpu_helper.h" #include "paddle/fluid/platform/cpu_helper.h"
#include "paddle/fluid/platform/profiler.h" #include "paddle/fluid/platform/profiler.h"
...@@ -174,7 +175,6 @@ bool AnalysisPredictor::Run(const std::vector<PaddleTensor> &inputs, ...@@ -174,7 +175,6 @@ bool AnalysisPredictor::Run(const std::vector<PaddleTensor> &inputs,
inference::Timer timer; inference::Timer timer;
timer.tic(); timer.tic();
// set feed variable // set feed variable
std::vector<framework::LoDTensor> feeds;
framework::Scope *scope = sub_scope_ ? sub_scope_ : scope_.get(); framework::Scope *scope = sub_scope_ ? sub_scope_ : scope_.get();
if (!SetFeed(inputs, scope)) { if (!SetFeed(inputs, scope)) {
LOG(ERROR) << "fail to set feed"; LOG(ERROR) << "fail to set feed";
...@@ -215,17 +215,29 @@ bool AnalysisPredictor::SetFeed(const std::vector<PaddleTensor> &inputs, ...@@ -215,17 +215,29 @@ bool AnalysisPredictor::SetFeed(const std::vector<PaddleTensor> &inputs,
framework::DDim ddim = framework::make_ddim(inputs[i].shape); framework::DDim ddim = framework::make_ddim(inputs[i].shape);
void *input_ptr; void *input_ptr;
if (inputs[i].dtype == PaddleDType::INT64) { if (inputs[i].dtype == PaddleDType::INT64) {
input_ptr = input.mutable_data<int64_t>(ddim, platform::CPUPlace()); input_ptr = input.mutable_data<int64_t>(ddim, place_);
} else if (inputs[i].dtype == PaddleDType::FLOAT32) { } else if (inputs[i].dtype == PaddleDType::FLOAT32) {
input_ptr = input.mutable_data<float>(ddim, platform::CPUPlace()); input_ptr = input.mutable_data<float>(ddim, place_);
} else { } else {
LOG(ERROR) << "unsupported feed type " << inputs[i].dtype; LOG(ERROR) << "unsupported feed type " << inputs[i].dtype;
return false; return false;
} }
if (platform::is_cpu_place(place_)) {
// TODO(panyx0718): Init LoDTensor from existing memcpy to save a copy. // TODO(panyx0718): Init LoDTensor from existing memcpy to save a copy.
std::memcpy(static_cast<void *>(input_ptr), inputs[i].data.data(), std::memcpy(static_cast<void *>(input_ptr), inputs[i].data.data(),
inputs[i].data.length()); inputs[i].data.length());
} else {
#ifdef PADDLE_WITH_CUDA
auto dst_gpu_place = boost::get<platform::CUDAPlace>(place_);
memory::Copy(dst_gpu_place, static_cast<void *>(input_ptr),
platform::CPUPlace(), inputs[i].data.data(),
inputs[i].data.length(),
0); // stream 0 for sync copy
#else
PADDLE_THROW("Not compile with CUDA, should not reach here.");
#endif
}
// TODO(Superjomn) Low performance, need optimization for heavy LoD copy. // TODO(Superjomn) Low performance, need optimization for heavy LoD copy.
framework::LoD lod; framework::LoD lod;
for (auto &level : inputs[i].lod) { for (auto &level : inputs[i].lod) {
......
...@@ -24,6 +24,7 @@ limitations under the License. */ ...@@ -24,6 +24,7 @@ limitations under the License. */
#include "paddle/fluid/inference/api/api_impl.h" #include "paddle/fluid/inference/api/api_impl.h"
#include "paddle/fluid/inference/api/details/reset_tensor_array.h" #include "paddle/fluid/inference/api/details/reset_tensor_array.h"
#include "paddle/fluid/inference/api/helper.h" #include "paddle/fluid/inference/api/helper.h"
#include "paddle/fluid/memory/memcpy.h"
#include "paddle/fluid/platform/cpu_helper.h" #include "paddle/fluid/platform/cpu_helper.h"
#include "paddle/fluid/platform/profiler.h" #include "paddle/fluid/platform/profiler.h"
...@@ -138,7 +139,6 @@ bool NativePaddlePredictor::Run(const std::vector<PaddleTensor> &inputs, ...@@ -138,7 +139,6 @@ bool NativePaddlePredictor::Run(const std::vector<PaddleTensor> &inputs,
Timer timer; Timer timer;
timer.tic(); timer.tic();
// set feed variable // set feed variable
std::vector<framework::LoDTensor> feeds;
framework::Scope *scope = sub_scope_ != nullptr ? sub_scope_ : scope_.get(); framework::Scope *scope = sub_scope_ != nullptr ? sub_scope_ : scope_.get();
if (!SetFeed(inputs, scope)) { if (!SetFeed(inputs, scope)) {
LOG(ERROR) << "fail to set feed"; LOG(ERROR) << "fail to set feed";
...@@ -194,17 +194,30 @@ bool NativePaddlePredictor::SetFeed(const std::vector<PaddleTensor> &inputs, ...@@ -194,17 +194,30 @@ bool NativePaddlePredictor::SetFeed(const std::vector<PaddleTensor> &inputs,
framework::DDim ddim = framework::make_ddim(inputs[i].shape); framework::DDim ddim = framework::make_ddim(inputs[i].shape);
void *input_ptr; void *input_ptr;
if (inputs[i].dtype == PaddleDType::INT64) { if (inputs[i].dtype == PaddleDType::INT64) {
input_ptr = input.mutable_data<int64_t>(ddim, platform::CPUPlace()); input_ptr = input.mutable_data<int64_t>(ddim, place_);
} else if (inputs[i].dtype == PaddleDType::FLOAT32) { } else if (inputs[i].dtype == PaddleDType::FLOAT32) {
input_ptr = input.mutable_data<float>(ddim, platform::CPUPlace()); input_ptr = input.mutable_data<float>(ddim, place_);
} else { } else {
LOG(ERROR) << "unsupported feed type " << inputs[i].dtype; LOG(ERROR) << "unsupported feed type " << inputs[i].dtype;
return false; return false;
} }
if (platform::is_cpu_place(place_)) {
// TODO(panyx0718): Init LoDTensor from existing memcpy to save a copy. // TODO(panyx0718): Init LoDTensor from existing memcpy to save a copy.
std::memcpy(static_cast<void *>(input_ptr), inputs[i].data.data(), std::memcpy(static_cast<void *>(input_ptr), inputs[i].data.data(),
inputs[i].data.length()); inputs[i].data.length());
} else {
#ifdef PADDLE_WITH_CUDA
auto dst_gpu_place = boost::get<platform::CUDAPlace>(place_);
memory::Copy(dst_gpu_place, static_cast<void *>(input_ptr),
platform::CPUPlace(), inputs[i].data.data(),
inputs[i].data.length(),
0); // stream 0 for sync copy
#else
PADDLE_THROW("Not compile with CUDA, should not reach here.");
#endif
}
// TODO(Superjomn) Low performance, need optimization for heavy LoD copy. // TODO(Superjomn) Low performance, need optimization for heavy LoD copy.
framework::LoD lod; framework::LoD lod;
for (auto &level : inputs[i].lod) { for (auto &level : inputs[i].lod) {
......
...@@ -46,8 +46,6 @@ if(WITH_GPU) ...@@ -46,8 +46,6 @@ if(WITH_GPU)
endif() endif()
endif(NOT WIN32) endif(NOT WIN32)
endif() endif()
include_directories("D:/Paddle/")
include_directories("${PADDLE_LIB}") include_directories("${PADDLE_LIB}")
include_directories("${PADDLE_LIB}/third_party/install/protobuf/include") include_directories("${PADDLE_LIB}/third_party/install/protobuf/include")
include_directories("${PADDLE_LIB}/third_party/install/glog/include") include_directories("${PADDLE_LIB}/third_party/install/glog/include")
......
cc_library(benchmark SRCS benchmark.cc DEPS enforce)
cc_test(test_benchmark SRCS benchmark_tester.cc DEPS benchmark)
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/inference/utils/benchmark.h"
#include <sstream>
#include "paddle/fluid/platform/enforce.h"
namespace paddle {
namespace inference {
std::string Benchmark::SerializeToString() const {
std::stringstream ss;
ss << "-----------------------------------------------------\n";
ss << "name\t";
ss << "batch_size\t";
ss << "num_threads\t";
ss << "latency\t";
ss << "qps";
ss << '\n';
ss << name_ << "\t";
ss << batch_size_ << "\t";
ss << num_threads_ << "\t";
ss << latency_ << "\t";
ss << 1000 / latency_;
ss << '\n';
return ss.str();
}
void Benchmark::PersistToFile(const std::string &path) const {
std::ofstream file(path, std::ios::app);
PADDLE_ENFORCE(file.is_open(), "Can not open %s to add benchmark", path);
file << SerializeToString();
file.flush();
file.close();
}
} // namespace inference
} // namespace paddle
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <fstream>
#include <iostream>
namespace paddle {
namespace inference {
/*
* Helper class to calculate the performance.
*/
struct Benchmark {
int batch_size() const { return batch_size_; }
void SetBatchSize(int x) { batch_size_ = x; }
int num_threads() const { return num_threads_; }
void SetNumThreads(int x) { num_threads_ = x; }
bool use_gpu() const { return use_gpu_; }
void SetUseGpu() { use_gpu_ = true; }
int latency() const { return latency_; }
void SetLatency(int x) { latency_ = x; }
const std::string& name() const { return name_; }
void SetName(const std::string& name) { name_ = name; }
std::string SerializeToString() const;
void PersistToFile(const std::string& path) const;
private:
bool use_gpu_{false};
int batch_size_{0};
int latency_;
int num_threads_{1};
std::string name_;
};
} // namespace inference
} // namespace paddle
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/inference/utils/benchmark.h"
#include <glog/logging.h>
#include <gtest/gtest.h>
using namespace paddle::inference;
TEST(Benchmark, basic) {
Benchmark benchmark;
benchmark.SetName("key0");
benchmark.SetBatchSize(10);
benchmark.SetUseGpu();
benchmark.SetLatency(220);
LOG(INFO) << "benchmark:\n" << benchmark.SerializeToString();
}
TEST(Benchmark, PersistToFile) {
Benchmark benchmark;
benchmark.SetName("key0");
benchmark.SetBatchSize(10);
benchmark.SetUseGpu();
benchmark.SetLatency(220);
benchmark.PersistToFile("1.log");
benchmark.PersistToFile("1.log");
benchmark.PersistToFile("1.log");
}
\ No newline at end of file
...@@ -41,7 +41,7 @@ TEST(RetryAllocator, RetryAllocator) { ...@@ -41,7 +41,7 @@ TEST(RetryAllocator, RetryAllocator) {
size_t thread_num = 32; size_t thread_num = 32;
size_t sleep_time = 40; size_t sleep_time = 40;
size_t extra_time = 2; size_t extra_time = 10;
// Reserve to perform more tests in the future // Reserve to perform more tests in the future
std::vector<std::shared_ptr<Allocator>> allocators; std::vector<std::shared_ptr<Allocator>> allocators;
......
...@@ -22,6 +22,8 @@ limitations under the License. */ ...@@ -22,6 +22,8 @@ limitations under the License. */
#include "paddle/fluid/operators/distributed/request_handler.h" #include "paddle/fluid/operators/distributed/request_handler.h"
#include "paddle/fluid/platform/profiler.h" #include "paddle/fluid/platform/profiler.h"
DECLARE_bool(rpc_disable_reuse_port);
namespace paddle { namespace paddle {
namespace operators { namespace operators {
namespace distributed { namespace distributed {
...@@ -383,6 +385,9 @@ std::shared_ptr<grpc::Channel> GRPCClient::GetChannel(const std::string& ep) { ...@@ -383,6 +385,9 @@ std::shared_ptr<grpc::Channel> GRPCClient::GetChannel(const std::string& ep) {
// Channel configurations: // Channel configurations:
grpc::ChannelArguments args; grpc::ChannelArguments args;
args.SetInt(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS, 2000); args.SetInt(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS, 2000);
if (FLAGS_rpc_disable_reuse_port) {
args.SetInt(GRPC_ARG_ALLOW_REUSEPORT, 0);
}
args.SetCompressionAlgorithm(GRPC_COMPRESS_NONE); args.SetCompressionAlgorithm(GRPC_COMPRESS_NONE);
args.SetMaxSendMessageSize(std::numeric_limits<int>::max()); args.SetMaxSendMessageSize(std::numeric_limits<int>::max());
args.SetMaxReceiveMessageSize(std::numeric_limits<int>::max()); args.SetMaxReceiveMessageSize(std::numeric_limits<int>::max());
......
...@@ -20,6 +20,8 @@ limitations under the License. */ ...@@ -20,6 +20,8 @@ limitations under the License. */
using ::grpc::ServerAsyncResponseWriter; using ::grpc::ServerAsyncResponseWriter;
DECLARE_bool(rpc_disable_reuse_port);
namespace paddle { namespace paddle {
namespace operators { namespace operators {
namespace distributed { namespace distributed {
...@@ -252,6 +254,20 @@ void AsyncGRPCServer::WaitServerReady() { ...@@ -252,6 +254,20 @@ void AsyncGRPCServer::WaitServerReady() {
VLOG(40) << "AsyncGRPCServer WaitSeverReady"; VLOG(40) << "AsyncGRPCServer WaitSeverReady";
} }
// Define an option subclass in order to disable SO_REUSEPORT for the
// server socket.
// Come from:
// https://github.com/tensorflow/tensorflow/blob/master/tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc
class NoReusePortOption : public ::grpc::ServerBuilderOption {
public:
void UpdateArguments(::grpc::ChannelArguments* args) override {
args->SetInt(GRPC_ARG_ALLOW_REUSEPORT, 0);
}
void UpdatePlugins(std::vector<std::unique_ptr<::grpc::ServerBuilderPlugin>>*
plugins) override {}
};
void AsyncGRPCServer::StartServer() { void AsyncGRPCServer::StartServer() {
::grpc::ServerBuilder builder; ::grpc::ServerBuilder builder;
builder.AddListeningPort(bind_address_, ::grpc::InsecureServerCredentials(), builder.AddListeningPort(bind_address_, ::grpc::InsecureServerCredentials(),
...@@ -259,6 +275,10 @@ void AsyncGRPCServer::StartServer() { ...@@ -259,6 +275,10 @@ void AsyncGRPCServer::StartServer() {
builder.SetMaxSendMessageSize(std::numeric_limits<int>::max()); builder.SetMaxSendMessageSize(std::numeric_limits<int>::max());
builder.SetMaxReceiveMessageSize(std::numeric_limits<int>::max()); builder.SetMaxReceiveMessageSize(std::numeric_limits<int>::max());
if (FLAGS_rpc_disable_reuse_port) {
builder.SetOption(
std::unique_ptr<::grpc::ServerBuilderOption>(new NoReusePortOption));
}
builder.RegisterService(&service_); builder.RegisterService(&service_);
for (auto t : rpc_call_map_) { for (auto t : rpc_call_map_) {
......
...@@ -22,6 +22,8 @@ limitations under the License. */ ...@@ -22,6 +22,8 @@ limitations under the License. */
#include "paddle/fluid/operators/distributed/sendrecvop_utils.h" #include "paddle/fluid/operators/distributed/sendrecvop_utils.h"
#include "paddle/fluid/operators/distributed/variable_response.h" #include "paddle/fluid/operators/distributed/variable_response.h"
DEFINE_bool(rpc_disable_reuse_port, false, "Disable SO_REUSEPORT or not.");
namespace paddle { namespace paddle {
namespace operators { namespace operators {
namespace distributed { namespace distributed {
......
...@@ -67,6 +67,7 @@ class LookupSparseTableOp : public framework::OperatorBase { ...@@ -67,6 +67,7 @@ class LookupSparseTableOp : public framework::OperatorBase {
framework::proto::VarType::FP32, framework::proto::VarType::FP32,
"The sparse table only support FP32"); "The sparse table only support FP32");
w_t->Get(ids_t, out_t, true, is_test); w_t->Get(ids_t, out_t, true, is_test);
out_t->set_lod(ids_t.lod());
} }
}; };
......
...@@ -127,6 +127,9 @@ class SumKernel : public framework::OpKernel<T> { ...@@ -127,6 +127,9 @@ class SumKernel : public framework::OpKernel<T> {
math::scatter::MergeAdd<DeviceContext, T> merge_add; math::scatter::MergeAdd<DeviceContext, T> merge_add;
merge_add(context.template device_context<DeviceContext>(), inputs, merge_add(context.template device_context<DeviceContext>(), inputs,
out); out);
out->SyncIndex();
} else { } else {
// no data, just set a empty out tensor. // no data, just set a empty out tensor.
out->mutable_value()->mutable_data<T>(framework::make_ddim({0}), out->mutable_value()->mutable_data<T>(framework::make_ddim({0}),
......
...@@ -106,9 +106,9 @@ class LoDTensorArray2TensorOp : public framework::OperatorBase { ...@@ -106,9 +106,9 @@ class LoDTensorArray2TensorOp : public framework::OperatorBase {
out_inx_dim[0] = inx.size(); out_inx_dim[0] = inx.size();
out_inx.Resize(out_inx_dim); out_inx.Resize(out_inx_dim);
auto &local_scope = scope.NewScope();
std::string var_name = "out_index"; std::string var_name = "out_index";
framework::Variable *tmp_index_var = framework::Variable *tmp_index_var = local_scope.Var(var_name);
const_cast<framework::Scope &>(scope).Var(var_name);
auto &tmp_index_tensor = auto &tmp_index_tensor =
*(tmp_index_var->GetMutable<paddle::framework::LoDTensor>()); *(tmp_index_var->GetMutable<paddle::framework::LoDTensor>());
tmp_index_tensor.Resize(out_inx_dim); tmp_index_tensor.Resize(out_inx_dim);
...@@ -128,12 +128,12 @@ class LoDTensorArray2TensorOp : public framework::OperatorBase { ...@@ -128,12 +128,12 @@ class LoDTensorArray2TensorOp : public framework::OperatorBase {
out_dims[axis] = out_dim_sum; out_dims[axis] = out_dim_sum;
out.Resize(out_dims); out.Resize(out_dims);
LodTensorArray2LodTensorVector(scope, base_name, Input("X"), &names); LodTensorArray2LodTensorVector(local_scope, base_name, Input("X"), &names);
// Invoke Reshape Op // Invoke concat Op
auto concat_op = framework::OpRegistry::CreateOp( auto concat_op = framework::OpRegistry::CreateOp(
"concat", {{"X", names}}, {{"Out", {Output("Out")}}}, attrs); "concat", {{"X", names}}, {{"Out", {Output("Out")}}}, attrs);
concat_op->Run(scope, place); concat_op->Run(local_scope, place);
} }
}; };
......
...@@ -32,6 +32,9 @@ CUBLAS_BLAS_ROUTINE_EACH_R2(DEFINE_WRAP); ...@@ -32,6 +32,9 @@ CUBLAS_BLAS_ROUTINE_EACH_R2(DEFINE_WRAP);
CUBLAS_BLAS_ROUTINE_EACH_R3(DEFINE_WRAP); CUBLAS_BLAS_ROUTINE_EACH_R3(DEFINE_WRAP);
#endif #endif
#ifdef CUBLAS_BLAS_ROUTINE_EACH_R4
CUBLAS_BLAS_ROUTINE_EACH_R4(DEFINE_WRAP);
#endif
} // namespace dynload } // namespace dynload
} // namespace platform } // namespace platform
} // namespace paddle } // namespace paddle
...@@ -90,23 +90,33 @@ CUBLAS_BLAS_ROUTINE_EACH(DECLARE_DYNAMIC_LOAD_CUBLAS_WRAP) ...@@ -90,23 +90,33 @@ CUBLAS_BLAS_ROUTINE_EACH(DECLARE_DYNAMIC_LOAD_CUBLAS_WRAP)
// APIs available after CUDA 8.0 // APIs available after CUDA 8.0
#if CUDA_VERSION >= 8000 #if CUDA_VERSION >= 8000
DECLARE_DYNAMIC_LOAD_CUBLAS_WRAP(cublasGemmEx); #define CUBLAS_BLAS_ROUTINE_EACH_R2(__macro) \
DECLARE_DYNAMIC_LOAD_CUBLAS_WRAP(cublasSgemmStridedBatched); __macro(cublasGemmEx); \
DECLARE_DYNAMIC_LOAD_CUBLAS_WRAP(cublasDgemmStridedBatched); __macro(cublasSgemmStridedBatched); \
DECLARE_DYNAMIC_LOAD_CUBLAS_WRAP(cublasCgemmStridedBatched); __macro(cublasDgemmStridedBatched); \
DECLARE_DYNAMIC_LOAD_CUBLAS_WRAP(cublasZgemmStridedBatched); __macro(cublasCgemmStridedBatched); \
DECLARE_DYNAMIC_LOAD_CUBLAS_WRAP(cublasHgemmStridedBatched); __macro(cublasZgemmStridedBatched); \
__macro(cublasHgemmStridedBatched);
CUBLAS_BLAS_ROUTINE_EACH_R2(DECLARE_DYNAMIC_LOAD_CUBLAS_WRAP)
#endif #endif
// APIs available after CUDA 9.0 // APIs available after CUDA 9.0
#if CUDA_VERSION >= 9000 #if CUDA_VERSION >= 9000
DECLARE_DYNAMIC_LOAD_CUBLAS_WRAP(cublasSetMathMode); #define CUBLAS_BLAS_ROUTINE_EACH_R3(__macro) \
DECLARE_DYNAMIC_LOAD_CUBLAS_WRAP(cublasGetMathMode); __macro(cublasSetMathMode); \
__macro(cublasGetMathMode);
CUBLAS_BLAS_ROUTINE_EACH_R3(DECLARE_DYNAMIC_LOAD_CUBLAS_WRAP)
#endif #endif
// APIs available after CUDA 9.1
#if CUDA_VERSION >= 9010 #if CUDA_VERSION >= 9010
DECLARE_DYNAMIC_LOAD_CUBLAS_WRAP(cublasGemmBatchedEx); #define CUBLAS_BLAS_ROUTINE_EACH_R4(__macro) \
DECLARE_DYNAMIC_LOAD_CUBLAS_WRAP(cublasGemmStridedBatchedEx); __macro(cublasGemmBatchedEx); \
__macro(cublasGemmStridedBatchedEx);
CUBLAS_BLAS_ROUTINE_EACH_R4(DECLARE_DYNAMIC_LOAD_CUBLAS_WRAP)
#endif #endif
#undef DECLARE_DYNAMIC_LOAD_CUBLAS_WRAP #undef DECLARE_DYNAMIC_LOAD_CUBLAS_WRAP
......
...@@ -31,6 +31,11 @@ int main(int argc, char** argv) { ...@@ -31,6 +31,11 @@ int main(int argc, char** argv) {
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP) #if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
new_argv.push_back( new_argv.push_back(
strdup("--tryfromenv=fraction_of_gpu_memory_to_use,allocator_strategy")); strdup("--tryfromenv=fraction_of_gpu_memory_to_use,allocator_strategy"));
#elif __clang__
new_argv.push_back(
strdup("--tryfromenv=use_mkldnn,initial_cpu_memory_in_"
"mb,allocator_strategy"));
new_argv.push_back(strdup("--undefok=use_mkldnn,initial_cpu_memory_in_mb"));
#else #else
new_argv.push_back( new_argv.push_back(
strdup("--tryfromenv=use_pinned_memory,use_mkldnn,initial_cpu_memory_in_" strdup("--tryfromenv=use_pinned_memory,use_mkldnn,initial_cpu_memory_in_"
......
...@@ -91,6 +91,7 @@ def __bootstrap__(): ...@@ -91,6 +91,7 @@ def __bootstrap__():
""" """
import sys import sys
import os import os
import platform
from . import core from . import core
in_test = 'unittest' in sys.modules in_test = 'unittest' in sys.modules
...@@ -110,14 +111,17 @@ def __bootstrap__(): ...@@ -110,14 +111,17 @@ def __bootstrap__():
print('PLEASE USE OMP_NUM_THREADS WISELY.', file=sys.stderr) print('PLEASE USE OMP_NUM_THREADS WISELY.', file=sys.stderr)
os.environ['OMP_NUM_THREADS'] = str(num_threads) os.environ['OMP_NUM_THREADS'] = str(num_threads)
sysstr = platform.system()
read_env_flags = [ read_env_flags = [
'use_pinned_memory', 'check_nan_inf', 'benchmark', 'eager_delete_scope', 'check_nan_inf', 'benchmark', 'eager_delete_scope', 'use_mkldnn',
'use_mkldnn', 'use_ngraph', 'initial_cpu_memory_in_mb', 'use_ngraph', 'initial_cpu_memory_in_mb', 'init_allocated_mem',
'init_allocated_mem', 'free_idle_memory', 'paddle_num_threads', 'free_idle_memory', 'paddle_num_threads', "dist_threadpool_size",
"dist_threadpool_size", 'eager_delete_tensor_gb', 'allocator_strategy', 'eager_delete_tensor_gb', 'allocator_strategy',
'reader_queue_speed_test_mode', 'print_sub_graph_dir' 'reader_queue_speed_test_mode', 'print_sub_graph_dir'
] ]
if 'Darwin' not in sysstr:
read_env_flags.append('use_pinned_memory')
if os.name != 'nt': if os.name != 'nt':
read_env_flags.append('warpctc_dir') read_env_flags.append('warpctc_dir')
read_env_flags.append('cpu_deterministic') read_env_flags.append('cpu_deterministic')
...@@ -129,6 +133,7 @@ def __bootstrap__(): ...@@ -129,6 +133,7 @@ def __bootstrap__():
read_env_flags.append('rpc_send_thread_num') read_env_flags.append('rpc_send_thread_num')
read_env_flags.append('rpc_get_thread_num') read_env_flags.append('rpc_get_thread_num')
read_env_flags.append('rpc_prefetch_thread_num') read_env_flags.append('rpc_prefetch_thread_num')
read_env_flags.append('rpc_disable_reuse_port')
if core.is_compiled_with_cuda(): if core.is_compiled_with_cuda():
read_env_flags += [ read_env_flags += [
......
...@@ -13,8 +13,10 @@ ...@@ -13,8 +13,10 @@
# limitations under the License. # limitations under the License.
from __future__ import print_function from __future__ import print_function
from . import lookup_table_utils
from .lookup_table_utils import *
from . import hdfs_utils from . import hdfs_utils
from .hdfs_utils import * from .hdfs_utils import *
__all__ = lookup_table_utils.__all__
__all__ = hdfs_utils.__all__ __all__ = hdfs_utils.__all__
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import os
import time
import logging
import paddle
import paddle.fluid as fluid
from paddle.fluid import core
from paddle.fluid import io
from paddle.fluid import Program
__all__ = [
"load_inference_model", "load_persistable_vars",
"convert_dist_to_sparse_program"
]
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s')
_logger = logging.getLogger("lookup_table_utils")
_logger.setLevel(logging.INFO)
model_filename = "__model__"
lookup_table_dir = "__lookup_table__"
def __insert_lookup_sparse_table_op(main_program, idx, ids, w, out):
main_program.global_block()._insert_op(
index=idx,
type="lookup_sparse_table",
inputs={"Ids": [ids],
"W": [w]},
outputs={"Out": [out]},
attrs={
"is_distributed": False,
"is_sparse": True,
"grad_inplace": False
})
def __get_prefetch_op_tuples(main_program):
# current lookup tables op is split_ids->prefetch->merge_ids
prefetch_op_tuples = None
op_types = [op.type for op in main_program.global_block().ops]
for i in range(len(op_types)):
if op_types[i] == "prefetch":
if op_types[i - 1] == "split_ids" and op_types[i +
1] == "merge_ids":
split_ids_op_id = i - 1
split_ids_inputs = main_program.global_block().ops[i - 1].input(
"Ids")
prefetch_op_inputs = main_program.global_block().ops[i].input(
"X")
prefetch_op_outputs = main_program.global_block().ops[i].output(
"Out")
merge_ids_outputs = main_program.global_block().ops[
i + 1].output("Out")
need_delete_vars = []
need_delete_vars.extend(prefetch_op_inputs)
need_delete_vars.extend(prefetch_op_outputs)
prefetch_op_tuples = (split_ids_op_id, split_ids_inputs,
merge_ids_outputs, need_delete_vars)
break
return prefetch_op_tuples
def convert_dist_to_sparse_program(main_program):
if not main_program._distributed_lookup_table:
_logger.warn(
"There are no distributed lookup tables need to be converted")
return
# create table param and grad var in pserver program
origin_emb_var = "{}.origin".format(main_program._distributed_lookup_table)
emb_var = main_program._distributed_lookup_table
main_program.global_block()._rename_var(emb_var, origin_emb_var)
origin_param_var = main_program.global_block().vars[origin_emb_var]
param_var = main_program.global_block().create_var(
name=emb_var,
shape=origin_param_var.shape,
dtype=origin_param_var.dtype,
type=core.VarDesc.VarType.SELECTED_ROWS,
persistable=True)
# parameter must be selected rows
param_var.desc.set_type(core.VarDesc.VarType.SELECTED_ROWS)
main_program._sync_with_cpp()
prefetch_op_tuples = __get_prefetch_op_tuples(main_program)
split_ids_id = prefetch_op_tuples[0]
for idx in range(split_ids_id + 2, split_ids_id - 1, -1):
main_program.global_block()._remove_op(idx)
main_program.desc.flush()
in_out_pairs = zip(prefetch_op_tuples[1], prefetch_op_tuples[2])
for in_out_pair in in_out_pairs:
idx = split_ids_id
ids = main_program.global_block().vars[in_out_pair[0]]
out = main_program.global_block().vars[in_out_pair[1]]
__insert_lookup_sparse_table_op(main_program, idx, ids, param_var, out)
main_program.desc.flush()
return main_program
def load_persistable_vars(executor, dirname, program, lookup_table_var):
def _is_checkpoint_var(exclude_fluid_vars=None):
"""
the checkpoint will not save or load all the variables.
var type is FEED_MINIBATCH/FETCH_LIST/RAW or var name ends with @GRAD are discarded.
: param var(Variable)
"""
if exclude_fluid_vars is None:
exclude_fluid_vars = []
def is_valid(var):
if var.desc.type() == core.VarDesc.VarType.FEED_MINIBATCH or \
var.desc.type() == core.VarDesc.VarType.FETCH_LIST or \
var.desc.type() == core.VarDesc.VarType.RAW:
return False
# @GRAD are named for gradient variables, checkpoint will not save it.
if "@GRAD" in var.name:
return False
# .trainer_ are named for distribute train variables, checkpoint will not save it.
if ".trainer_" in var.name:
return False
# .block is named for distribute train variables, checkpoint will not save it.
if ".block" in var.name:
return False
if "tmp_" in var.name:
return False
if var.name in exclude_fluid_vars:
return False
return var.persistable
return is_valid
def _load_lookup_table_vars(executor, dirname, main_program,
lookup_table_vars):
if not os.path.isdir(dirname):
raise ValueError("There is no directory named '%s'", dirname)
lookup_table_dirname = os.path.join(dirname, lookup_table_dir)
emb_var_name = lookup_table_vars[0]
emb_var = main_program.global_block().var(emb_var_name)
emb_files = []
for emb_name in os.listdir(lookup_table_dirname):
if emb_var_name in emb_name:
emb_files.append(emb_name)
convert_program = Program()
global_block = convert_program.global_block()
emb_var = global_block.create_var(
name=emb_var.name,
shape=emb_var.shape,
dtype=emb_var.dtype,
type=core.VarDesc.VarType.SELECTED_ROWS,
persistable=True)
emb_var.desc.set_type(core.VarDesc.VarType.SELECTED_ROWS)
sums = []
for i, emb_file in enumerate(emb_files):
var_name = "{}_{}".format(emb_var.name, i)
param_var = global_block.create_var(
name=var_name,
shape=emb_var.shape,
dtype=emb_var.dtype,
type=core.VarDesc.VarType.SELECTED_ROWS,
persistable=True)
param_var.desc.set_type(core.VarDesc.VarType.SELECTED_ROWS)
global_block.append_op(
type='load',
inputs={},
outputs={'Out': [param_var]},
attrs={
'file_path': os.path.join(lookup_table_dirname, var_name)
})
sums.append(param_var)
global_block.append_op(
type='sum', inputs={"X": sums}, outputs={'Out': emb_var}, attrs={})
global_block.append_op(type='delete_var', inputs={'X': sums})
executor.run(convert_program)
_logger.info("Start Load Sparse Program With "
"Distributed Lookup Table Vars from {}, time = {}".format(
dirname, time.ctime()))
lookup_table_vars = [lookup_table_var]
io.load_vars(
executor,
dirname=dirname,
main_program=program,
predicate=_is_checkpoint_var(lookup_table_vars),
filename=None)
_load_lookup_table_vars(executor, dirname, program, lookup_table_vars)
_logger.info("Finish Load Sparse Program With "
"Distributed Lookup Table Vars from {}, time = {}".format(
dirname, time.ctime()))
def load_inference_model(dirname, executor, lookup_table_var_name):
if not os.path.isdir(dirname):
raise ValueError("There is no directory named '%s'", dirname)
local_model = os.path.join(dirname, model_filename)
with open(local_model, "rb") as f:
program_desc_str = f.read()
program = Program.parse_from_string(program_desc_str)
if not core._is_program_version_supported(program._version()):
raise ValueError("Unsupported program version: %d\n" %
program._version())
# Binary data also need version.
load_persistable_vars(executor, dirname, program, lookup_table_var_name)
feed_target_names = program.desc.get_feed_target_names()
fetch_target_names = program.desc.get_fetch_target_names()
fetch_targets = [
program.global_block().var(name) for name in fetch_target_names
]
return [program, feed_target_names, fetch_targets]
...@@ -1698,6 +1698,7 @@ class Program(object): ...@@ -1698,6 +1698,7 @@ class Program(object):
p._copy_param_info_from(self) p._copy_param_info_from(self)
p._copy_data_info_from(self) p._copy_data_info_from(self)
p._copy_dist_param_info_from(self)
return p return p
def _prune(self, targets): def _prune(self, targets):
...@@ -1938,6 +1939,25 @@ class Program(object): ...@@ -1938,6 +1939,25 @@ class Program(object):
"program, with represent the same topology") "program, with represent the same topology")
self.global_block()._copy_param_info_from(other.global_block()) self.global_block()._copy_param_info_from(other.global_block())
def _copy_dist_param_info_from(self, other):
"""
Copy the information of distributed information from other program.
Args:
other(Program): Other program
Returns:
None
"""
if not isinstance(other, Program):
raise TypeError("_copy_dist_param_info_from should be invoked with "
"Program")
self._is_distributed = other._is_distributed
self._is_chief = other._is_chief
self._slice_vars_and_attrs = other._slice_vars_and_attrs
self._endpoints = other._endpoints
self._distributed_lookup_table = other._distributed_lookup_table
def _copy_data_info_from(self, other): def _copy_data_info_from(self, other):
""" """
Copy the information of data variables from other program. Copy the information of data variables from other program.
......
...@@ -165,6 +165,7 @@ def save_vars(executor, ...@@ -165,6 +165,7 @@ def save_vars(executor,
save_vars( save_vars(
executor, executor,
main_program=main_program,
dirname=dirname, dirname=dirname,
vars=list(filter(predicate, main_program.list_vars())), vars=list(filter(predicate, main_program.list_vars())),
filename=filename) filename=filename)
...@@ -172,11 +173,18 @@ def save_vars(executor, ...@@ -172,11 +173,18 @@ def save_vars(executor,
save_program = Program() save_program = Program()
save_block = save_program.global_block() save_block = save_program.global_block()
if main_program is None:
main_program = default_main_program()
if not isinstance(main_program, Program):
raise TypeError("program should be as Program type or None")
save_var_map = {} save_var_map = {}
for each_var in vars: for each_var in vars:
# NOTE: don't save the variable which type is RAW # NOTE: don't save the variable which type is RAW
if each_var.type == core.VarDesc.VarType.RAW: if each_var.type == core.VarDesc.VarType.RAW:
continue continue
if each_var.name == main_program._distributed_lookup_table:
continue
new_var = _clone_var_in_block_(save_block, each_var) new_var = _clone_var_in_block_(save_block, each_var)
if filename is None: if filename is None:
save_block.append_op( save_block.append_op(
...@@ -198,6 +206,16 @@ def save_vars(executor, ...@@ -198,6 +206,16 @@ def save_vars(executor,
outputs={}, outputs={},
attrs={'file_path': os.path.join(dirname, filename)}) attrs={'file_path': os.path.join(dirname, filename)})
# if there is lookup table, the trainer 0 will notify all pserver to save.
if main_program._is_distributed and main_program._is_chief and main_program._distributed_lookup_table:
lookup_table_filename = os.path.join(dirname, "__lookup_table__")
attrs = {}
attrs['epmap'] = main_program._endpoints
attrs['dir'] = lookup_table_filename
attrs['lookup_table'] = main_program._distributed_lookup_table
save_block.append_op(
type='checkpoint_notify', inputs={}, outputs={}, attrs=attrs)
executor.run(save_program) executor.run(save_program)
...@@ -379,11 +397,22 @@ def load_vars(executor, ...@@ -379,11 +397,22 @@ def load_vars(executor,
load_prog = Program() load_prog = Program()
load_block = load_prog.global_block() load_block = load_prog.global_block()
if main_program is None:
main_program = default_main_program()
if not isinstance(main_program, Program):
raise TypeError("program should be as Program type or None")
load_slice_vars = []
for each_var in main_program._slice_vars_and_attrs:
load_slice_vars.append(each_var[2].name)
load_var_map = {} load_var_map = {}
for each_var in vars: for each_var in vars:
assert isinstance(each_var, Variable) assert isinstance(each_var, Variable)
if each_var.type == core.VarDesc.VarType.RAW: if each_var.type == core.VarDesc.VarType.RAW:
continue continue
if each_var.name in load_slice_vars:
continue
new_var = _clone_var_in_block_(load_block, each_var) new_var = _clone_var_in_block_(load_block, each_var)
if filename is None: if filename is None:
load_block.append_op( load_block.append_op(
...@@ -406,9 +435,6 @@ def load_vars(executor, ...@@ -406,9 +435,6 @@ def load_vars(executor,
attrs={'file_path': os.path.join(dirname, filename)}) attrs={'file_path': os.path.join(dirname, filename)})
executor.run(load_prog) executor.run(load_prog)
if main_program is None:
main_program = default_main_program()
# load slice vars on pserver, if have it. # load slice vars on pserver, if have it.
_load_slice_up_vars(executor, dirname, _load_slice_up_vars(executor, dirname,
main_program._slice_vars_and_attrs) main_program._slice_vars_and_attrs)
...@@ -618,13 +644,6 @@ def save_inference_model(dirname, ...@@ -618,13 +644,6 @@ def save_inference_model(dirname,
if main_program is None: if main_program is None:
main_program = default_main_program() main_program = default_main_program()
# if there is lookup table, the trainer 0 will notify all pserver to save.
if main_program._is_distributed and main_program._is_chief and main_program._distributed_lookup_table:
lookup_table_filename = os.path.join(dirname, "__lookup_table__")
_save_lookup_tables_by_notify(executor, lookup_table_filename,
main_program._distributed_lookup_table,
main_program._endpoints)
# when a pserver and a trainer running on the same machine, mkdir may conflict # when a pserver and a trainer running on the same machine, mkdir may conflict
try: try:
os.makedirs(dirname) os.makedirs(dirname)
...@@ -642,6 +661,9 @@ def save_inference_model(dirname, ...@@ -642,6 +661,9 @@ def save_inference_model(dirname,
# it can only be loaded for inference directly. If it's false, the whole # it can only be loaded for inference directly. If it's false, the whole
# original program and related meta are saved so that future usage can be # original program and related meta are saved so that future usage can be
# more flexible. # more flexible.
origin_program = main_program.clone()
if export_for_deployment: if export_for_deployment:
main_program = main_program.clone() main_program = main_program.clone()
global_block = main_program.global_block() global_block = main_program.global_block()
...@@ -666,8 +688,11 @@ def save_inference_model(dirname, ...@@ -666,8 +688,11 @@ def save_inference_model(dirname,
with open(model_basename + ".main_program", "wb") as f: with open(model_basename + ".main_program", "wb") as f:
f.write(main_program.desc.serialize_to_string()) f.write(main_program.desc.serialize_to_string())
main_program._copy_dist_param_info_from(origin_program)
if params_filename is not None: if params_filename is not None:
params_filename = os.path.basename(params_filename) params_filename = os.path.basename(params_filename)
save_persistables(executor, dirname, main_program, params_filename) save_persistables(executor, dirname, main_program, params_filename)
...@@ -897,6 +922,9 @@ def _load_slice_up_vars(executor, dirname, slice_vars_and_attrs): ...@@ -897,6 +922,9 @@ def _load_slice_up_vars(executor, dirname, slice_vars_and_attrs):
slice_var = var_tuple[2] slice_var = var_tuple[2]
end = start + slice_var.shape[0] end = start + slice_var.shape[0]
orig_var_name = orig_var.name
orig_var.name = "{}.origin".format(orig_var_name)
clone_orig_var = load_block.create_var( clone_orig_var = load_block.create_var(
name=orig_var.name, name=orig_var.name,
type=orig_var.type, type=orig_var.type,
...@@ -915,7 +943,7 @@ def _load_slice_up_vars(executor, dirname, slice_vars_and_attrs): ...@@ -915,7 +943,7 @@ def _load_slice_up_vars(executor, dirname, slice_vars_and_attrs):
type='load', type='load',
inputs={}, inputs={},
outputs={'Out': [clone_orig_var]}, outputs={'Out': [clone_orig_var]},
attrs={'file_path': os.path.join(dirname, clone_orig_var.name)}) attrs={'file_path': os.path.join(dirname, orig_var_name)})
load_block.append_op( load_block.append_op(
type="slice", type="slice",
inputs={'Input': clone_orig_var}, inputs={'Input': clone_orig_var},
...@@ -924,6 +952,7 @@ def _load_slice_up_vars(executor, dirname, slice_vars_and_attrs): ...@@ -924,6 +952,7 @@ def _load_slice_up_vars(executor, dirname, slice_vars_and_attrs):
'starts': [start], 'starts': [start],
'ends': [end]}) 'ends': [end]})
need_delete_vars.append(clone_orig_var) need_delete_vars.append(clone_orig_var)
load_block.append_op( load_block.append_op(
type='delete_var', type='delete_var',
inputs={'X': need_delete_vars}, ) inputs={'X': need_delete_vars}, )
......
...@@ -896,9 +896,10 @@ def array_to_lod_tensor(x, table): ...@@ -896,9 +896,10 @@ def array_to_lod_tensor(x, table):
def increment(x, value=1.0, in_place=True): def increment(x, value=1.0, in_place=True):
""" """
This function performs an operation that increments each value in the This function performs an operation that increments the value in the
input :math:`x` by an amount: :math:`value` as mentioned in the input input :math:`x` by an amount: :math:`value` as mentioned in the input
parameter. This operation is performed in-place by default. parameter. This operation is performed in-place by default. Notice that
the number of elements in :math:`x` must be equal to 1.
Args: Args:
x (Variable|list): The tensor that has the input values. x (Variable|list): The tensor that has the input values.
...@@ -911,7 +912,8 @@ def increment(x, value=1.0, in_place=True): ...@@ -911,7 +912,8 @@ def increment(x, value=1.0, in_place=True):
Examples: Examples:
.. code-block:: python .. code-block:: python
data = fluid.layers.data(name='data', shape=[32, 32], dtype='float32') data = fluid.layers.data(name='data', shape=[1], dtype='float32',
append_batch_size=False)
data = fluid.layers.increment(x=data, value=3.0, in_place=True) data = fluid.layers.increment(x=data, value=3.0, in_place=True)
""" """
helper = LayerHelper("increment", **locals()) helper = LayerHelper("increment", **locals())
......
...@@ -6972,14 +6972,14 @@ def prelu(x, mode, param_attr=None, name=None): ...@@ -6972,14 +6972,14 @@ def prelu(x, mode, param_attr=None, name=None):
""" """
Equation: Equation:
y = \max(0, x) + alpha \min(0, x) y = \max(0, x) + alpha * \min(0, x)
Args: Args:
x (Variable): The input tensor. x (Variable): The input tensor.
param_attr(ParamAttr|None): The parameter attribute for the learnable param_attr(ParamAttr|None): The parameter attribute for the learnable
weight (alpha). weight (alpha).
mode (string): The mode for weight sharing mode (string): The mode for weight sharing. It supports all, channel
all: all elements share same weight and element. all: all elements share same weight
channel:elements in a channel share same weight channel:elements in a channel share same weight
element:each element has a weight element:each element has a weight
name(str|None): A name for this layer(optional). If set None, the layer name(str|None): A name for this layer(optional). If set None, the layer
......
...@@ -239,7 +239,7 @@ def infer(use_cuda, save_dirname=None): ...@@ -239,7 +239,7 @@ def infer(use_cuda, save_dirname=None):
assert len(results[0]) == len(transpiler_results[0]) assert len(results[0]) == len(transpiler_results[0])
for i in range(len(results[0])): for i in range(len(results[0])):
np.testing.assert_almost_equal( np.testing.assert_almost_equal(
results[0][i], transpiler_results[0][i], decimal=5) results[0][i], transpiler_results[0][i], decimal=4)
print("infer results: ", results[0]) print("infer results: ", results[0])
......
...@@ -145,10 +145,15 @@ def batched_multiclass_nms(boxes, scores, background, score_threshold, ...@@ -145,10 +145,15 @@ def batched_multiclass_nms(boxes, scores, background, score_threshold,
lod.append(nmsed_num) lod.append(nmsed_num)
if nmsed_num == 0: continue if nmsed_num == 0: continue
tmp_det_out = []
for c, indices in nmsed_outs.items(): for c, indices in nmsed_outs.items():
for idx in indices: for idx in indices:
xmin, ymin, xmax, ymax = boxes[n][idx][:] xmin, ymin, xmax, ymax = boxes[n][idx][:]
det_outs.append([c, scores[n][c][idx], xmin, ymin, xmax, ymax]) tmp_det_out.append(
[c, scores[n][c][idx], xmin, ymin, xmax, ymax])
sorted_det_out = sorted(
tmp_det_out, key=lambda tup: tup[0], reverse=False)
det_outs.extend(sorted_det_out)
return det_outs, lod return det_outs, lod
......
...@@ -644,6 +644,9 @@ in a single call.") ...@@ -644,6 +644,9 @@ in a single call.")
else: else:
recv_inputs.append(single_trainer_var) recv_inputs.append(single_trainer_var)
self._slice_params_and_optimizes = self._get_slice_vars_and_attrs(
endpoint)
# step 3 # step 3
# Create a union-find data structure from optimize ops, # Create a union-find data structure from optimize ops,
# If two ops are connected, we could add these two ops # If two ops are connected, we could add these two ops
...@@ -766,7 +769,7 @@ in a single call.") ...@@ -766,7 +769,7 @@ in a single call.")
grad_to_block_id, merged_var, grad_to_block_id, merged_var,
lr_ops) lr_ops)
# dedup grad to ids list # dedup grad to ids list
grad_to_block_id = list(set(grad_to_block_id)) grad_to_block_id = list(set(grad_to_block_id))
# append global ops # append global ops
if global_ops: if global_ops:
...@@ -827,8 +830,8 @@ in a single call.") ...@@ -827,8 +830,8 @@ in a single call.")
attrs=attrs) attrs=attrs)
# add distributed attrs # add distributed attrs
pserver_program._slice_vars_and_attrs = self._get_slice_vars_and_attrs( pserver_program._slice_vars_and_attrs = list(
endpoint) self._slice_params_and_optimizes.values())
pserver_program._sync_with_cpp() pserver_program._sync_with_cpp()
# save pserver program to generate pserver side startup relatively. # save pserver program to generate pserver side startup relatively.
...@@ -941,12 +944,12 @@ to transpile() call.") ...@@ -941,12 +944,12 @@ to transpile() call.")
outputs={"Out": startup_tmpvar}) outputs={"Out": startup_tmpvar})
# add slice vars # add slice vars
s_prog._slice_vars_and_attrs = self._get_slice_vars_and_attrs(endpoint) s_prog._slice_vars_and_attrs = pserver_program._slice_vars_and_attrs
return s_prog return s_prog
def _get_slice_vars_and_attrs(self, endpoint): def _get_slice_vars_and_attrs(self, endpoint):
slice_vars_and_attrs = [] slice_vars_and_attrs = {}
block_suffix = "block" block_suffix = "block"
for param in self.param_grad_ep_mapping[endpoint]["params"]: for param in self.param_grad_ep_mapping[endpoint]["params"]:
orig_var_name, block_name, _ = self._get_varname_parts(param.name) orig_var_name, block_name, _ = self._get_varname_parts(param.name)
...@@ -960,8 +963,7 @@ to transpile() call.") ...@@ -960,8 +963,7 @@ to transpile() call.")
slice_vars = self.param_var_mapping[orig_var_name] slice_vars = self.param_var_mapping[orig_var_name]
for slice_var in slice_vars[:block_idx]: for slice_var in slice_vars[:block_idx]:
skip_dim0 += slice_var.shape[0] skip_dim0 += slice_var.shape[0]
slice_vars_and_attrs.append([orig_var, skip_dim0, param]) slice_vars_and_attrs[param.name] = [orig_var, skip_dim0, param]
return slice_vars_and_attrs return slice_vars_and_attrs
# ====================== private transpiler functions ===================== # ====================== private transpiler functions =====================
...@@ -1662,10 +1664,10 @@ to transpile() call.") ...@@ -1662,10 +1664,10 @@ to transpile() call.")
if key in ["Param", "Grad", "LearningRate"]: if key in ["Param", "Grad", "LearningRate"]:
continue continue
var = self.origin_program.global_block().vars[opt_op.input(key)[0]] var = self.origin_program.global_block().vars[opt_op.input(key)[0]]
param_var = new_inputs["Param"]
# update accumulator variable shape # update accumulator variable shape
param_shape = new_inputs["Param"].shape new_shape = self._get_optimizer_input_shape(
new_shape = self._get_optimizer_input_shape(opt_op.type, key, opt_op.type, key, var.shape, param_var.shape)
var.shape, param_shape)
tmpvar = pserver_block.create_var( tmpvar = pserver_block.create_var(
name=var.name, name=var.name,
persistable=var.persistable, persistable=var.persistable,
...@@ -1673,6 +1675,13 @@ to transpile() call.") ...@@ -1673,6 +1675,13 @@ to transpile() call.")
shape=new_shape) shape=new_shape)
new_inputs[key] = tmpvar new_inputs[key] = tmpvar
# var shape been changed
if new_shape != var.shape:
slice_var_args = self._slice_params_and_optimizes[
param_var.name]
self._slice_params_and_optimizes[
var.name] = [var, slice_var_args[1], tmpvar]
# change output's ParamOut variable # change output's ParamOut variable
outputs = self._get_output_map_from_op( outputs = self._get_output_map_from_op(
self.origin_program.global_block().vars, opt_op) self.origin_program.global_block().vars, opt_op)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册