提交 79d555b9 编写于 作者: L Luo Tao

Merge branch 'develop' into mkldnn

......@@ -106,7 +106,7 @@ PaddlePaddle需要使用Docker环境完成编译,这样可以免去单独安
- 学习 Docker 有多难?
理解 Docker 并不难,大概花十分钟看一下 `这篇文章 <https://zhuanlan.zhihu.com/p/19902938>`_ 。这可以帮您省掉花一小时安装和配置各种开发工具,以及切换机器时需要新安装的辛苦。别忘了 PaddlePaddle 更新可能导致需要新的开发工具。更别提简化问题复现带来的好处了。
理解 Docker 并不难,大概花十分钟看一下 `如何使用Docker <https://zhuanlan.zhihu.com/p/19902938>`_ 。这可以帮您省掉花一小时安装和配置各种开发工具,以及切换机器时需要新安装的辛苦。别忘了 PaddlePaddle 更新可能导致需要新的开发工具。更别提简化问题复现带来的好处了。
- 我可以用 IDE 吗?
......@@ -123,7 +123,7 @@ PaddlePaddle需要使用Docker环境完成编译,这样可以免去单独安
- 可以并行编译吗?
是的。我们的 Docker image 运行一个 `Bash脚本 <https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/scripts/docker/build.sh>`_ 。这个脚本调用 `make -j$(nproc)` 来启动和 CPU 核一样多的进程来并行编译。
是的。我们的 Docker image 运行一个 `Paddle编译Bash脚本 <https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/scripts/docker/build.sh>`_ 。这个脚本调用 `make -j$(nproc)` 来启动和 CPU 核一样多的进程来并行编译。
- Docker 需要 sudo
......@@ -131,11 +131,11 @@ PaddlePaddle需要使用Docker环境完成编译,这样可以免去单独安
- 在 Windows/MacOS 上编译很慢
Docker 在 Windows 和 MacOS 都可以运行。不过实际上是运行在一个 Linux 虚拟机上。可能需要注意给这个虚拟机多分配一些 CPU 和内存,以保证编译高效。具体做法请参考 `这个issue <https://github.com/PaddlePaddle/Paddle/issues/627>`_ 。
Docker 在 Windows 和 MacOS 都可以运行。不过实际上是运行在一个 Linux 虚拟机上。可能需要注意给这个虚拟机多分配一些 CPU 和内存,以保证编译高效。具体做法请参考 `如何为Windows/Mac计算机上的Docker增加内存和虚拟机 <https://github.com/PaddlePaddle/Paddle/issues/627>`_ 。
- 磁盘不够
本文中的例子里,`docker run` 命令里都用了 `--rm` 参数,这样保证运行结束之后的 containers 不会保留在磁盘上。可以用 `docker ps -a` 命令看到停止后但是没有删除的 containers。`docker build` 命令有时候会产生一些中间结果,是没有名字的 images,也会占用磁盘。可以参考 `这篇文章 <https://zaiste.net/posts/removing_docker_containers/>`_ 来清理这些内容。
本文中的例子里,`docker run` 命令里都用了 `--rm` 参数,这样保证运行结束之后的 containers 不会保留在磁盘上。可以用 `docker ps -a` 命令看到停止后但是没有删除的 containers。`docker build` 命令有时候会产生一些中间结果,是没有名字的 images,也会占用磁盘。可以参考 `如何删除Docker Container <https://zaiste.net/posts/removing_docker_containers/>`_ 来清理这些内容。
.. _compile_deps:
......@@ -195,7 +195,7 @@ BLAS
PaddlePaddle支持 `MKL <https://software.intel.com/en-us/intel-mkl>`_ 和
`OpenBlAS <http://www.openblas.net/>`_ 两种BLAS库。默认使用MKL。如果使用MKL并且机器含有AVX2指令集,
还会下载MKL-DNN数学库,详细参考 `这里 <https://github.com/PaddlePaddle/Paddle/tree/develop/doc/design/mkldnn#cmake>`_ 。
还会下载MKL-DNN数学库,详细参考 `mkldnn设计文档 <https://github.com/PaddlePaddle/Paddle/tree/develop/doc/design/mkldnn#cmake>`_ 。
如果关闭MKL,则会使用OpenBLAS作为BLAS库。
......
......@@ -28,6 +28,9 @@ struct DataTypeMap {
};
static DataTypeMap* InitDataTypeMap();
// C++11 removes the need for manual locking. Concurrent execution shall wait if
// a static local variable is already being initialized.
// https://stackoverflow.com/questions/11711920/how-to-implement-multithread-safe-singleton-in-c11-without-using-mutex
static DataTypeMap& gDataTypeMap() {
static DataTypeMap* g_data_type_map_ = InitDataTypeMap();
return *g_data_type_map_;
......
......@@ -42,7 +42,7 @@ void FuseVarsOpHandle::RunImpl() {
out_t->ShareDataWith(out_tensor->Slice(s, s + numel));
s += numel;
}
this->RunAndRecordEvent([this] {});
this->RunAndRecordEvent([] {});
}
std::string FuseVarsOpHandle::Name() const { return "fuse vars"; }
......
......@@ -151,7 +151,8 @@ class TRTConvertValidation {
// Compare two output
ASSERT_FALSE(fluid_out.empty());
for (size_t i = 0; i < fluid_out.size(); i++) {
EXPECT_LT(std::abs(fluid_out[i] - trt_out[i]), 1e-6);
// Loose the threshold for CI in different machine model.
EXPECT_LT(std::abs(fluid_out[i] - trt_out[i]), 2e-5);
}
}
}
......
......@@ -24,12 +24,12 @@ namespace operators {
: public ::paddle::framework::OpProtoAndCheckerMaker { \
public: \
void Make() override { \
AddInput("X", "Input of " #OP_NAME "operator"); \
AddOutput("Out", "Output of" #OP_NAME "operator"); \
AddInput("X", "Input of " #OP_NAME " operator"); \
AddOutput("Out", "Output of " #OP_NAME " operator"); \
AddAttr<bool>("use_mkldnn", \
"(bool, default false) Only used in mkldnn kernel") \
.SetDefault(false); \
AddComment(#OP_COMMENT); \
AddComment(OP_COMMENT); \
} \
}
......
......@@ -48,6 +48,13 @@ class CropOp : public framework::OperatorWithKernel {
ctx->SetOutputDim("Out", y_dim);
}
}
framework::OpKernelType GetExpectedKernelType(
const framework::ExecutionContext& ctx) const override {
return framework::OpKernelType(
framework::ToDataType(ctx.Input<framework::LoDTensor>("X")->type()),
ctx.device_context());
}
};
class CropOpMaker : public framework::OpProtoAndCheckerMaker {
......@@ -60,13 +67,19 @@ class CropOpMaker : public framework::OpProtoAndCheckerMaker {
"The input used as reference for cropping, "
"which is of the same dimensions as X.")
.AsDispensable();
AddInput("Offsets",
"The input used to describe offsets in runtime, which is a "
"1-D vector whose size equals to the rank of input 'X'. The "
"elements data type must be int.")
.AsDispensable();
AddOutput("Out",
"The output of crop op, "
"which is of the same dimensions as X.");
AddAttr<std::vector<int>>("offsets",
"A list<int> describing offsets to be cropped. "
"The size of offsets list should be the same as "
"the dimension size of input X.");
"the dimension size of input X.")
.SetDefault(std::vector<int>());
AddAttr<std::vector<int>>("shape",
"A list<int> describing the shape of output. "
"The size of shape list should be the same as "
......@@ -77,6 +90,17 @@ Crop Operator.
Crop input into output, as specified by offsets and shape.
There are two ways to set the offsets:
1. In runtime: Using the input 'Offsets', which is a Vairbale and can be
output of other operators. This way is suitable for
dynamic offsets.
2. In network configuration: Using the attribute 'offsets', which will be
set in Python configure script. This way is
suitable for fixed offsets.
You CANNOT use these two ways at the same time. An exception will be raised
if input 'Offset' is configured and meanwhile the attribute 'offsets' is
not empty.
There are two ways to set shape:
1. reference input: crop input X into the same shape as reference input.
The dimension of reference input should
......@@ -146,6 +170,15 @@ class CropOpGrad : public framework::OperatorWithKernel {
ctx->SetOutputDim(x_grad_name, x_dims);
}
}
framework::OpKernelType GetExpectedKernelType(
const framework::ExecutionContext& ctx) const override {
return framework::OpKernelType(
framework::ToDataType(
ctx.Input<framework::LoDTensor>(framework::GradVarName("Out"))
->type()),
ctx.device_context());
}
};
} // namespace operators
......
......@@ -27,6 +27,37 @@ template <typename T, size_t D, int MajorType = Eigen::RowMajor,
using EigenTensor = framework::EigenTensor<T, D, MajorType, IndexType>;
using framework::Tensor;
static std::vector<int> GetOffsets(const framework::ExecutionContext& ctx) {
std::vector<int> res;
int rank = ctx.Input<Tensor>("X")->dims().size();
if (ctx.HasInput("Offsets")) {
PADDLE_ENFORCE(ctx.Attr<std::vector<int>>("offsets").empty(),
"Input 'Offsets' and attribute 'offsets' should not be used "
"at the same time.");
const auto* offsets_tensor = ctx.Input<Tensor>("Offsets");
PADDLE_ENFORCE_EQ(offsets_tensor->dims().size(), 1);
PADDLE_ENFORCE_EQ(
rank, offsets_tensor->dims()[0],
"Offsets size should be equal to dimension size of input tensor.");
const int* offsets_data;
framework::Tensor cpu_tmp_tensor;
if (platform::is_cpu_place(offsets_tensor->place())) {
offsets_data = offsets_tensor->data<int>();
} else {
framework::TensorCopySync(*offsets_tensor, platform::CPUPlace(),
&cpu_tmp_tensor);
offsets_data = cpu_tmp_tensor.data<int>();
}
res = std::vector<int>(offsets_data, offsets_data + rank);
} else {
res = ctx.Attr<std::vector<int>>("offsets");
PADDLE_ENFORCE_EQ(
rank, res.size(),
"Offsets size should be equal to dimension size of input tensor.");
}
return res;
}
template <typename T>
class CropKernel : public framework::OpKernel<T> {
public:
......@@ -37,10 +68,7 @@ class CropKernel : public framework::OpKernel<T> {
T* out_data = out->mutable_data<T>(context.GetPlace());
auto x_stride = framework::stride(x->dims());
auto out_stride = framework::stride(out->dims());
auto offsets = context.Attr<std::vector<int>>("offsets");
PADDLE_ENFORCE_EQ(
x->dims().size(), static_cast<int64_t>(offsets.size()),
"Offsets size should be equal to dimension size of input tensor.");
auto offsets = GetOffsets(context);
int64_t offset = 0;
for (size_t i = 0; i < offsets.size(); ++i) {
offset += (x_stride[i] * offsets[i]);
......@@ -56,7 +84,7 @@ void CropGradFunction(const framework::ExecutionContext& context) {
if (d_x != nullptr) {
auto* d_out = context.Input<Tensor>(framework::GradVarName("Out"));
d_x->mutable_data<T>(context.GetPlace());
auto offsets = context.Attr<std::vector<int>>("offsets");
auto offsets = GetOffsets(context);
Eigen::array<std::pair<int, int>, D> paddings;
for (size_t i = 0; i < D; ++i) {
paddings[i].first = offsets[i];
......
......@@ -80,7 +80,6 @@ class RequestHandler {
}
framework::ProgramDesc* program() { return program_; }
framework::Executor* executor() { return executor_; }
std::vector<framework::Variable*>& sparse_vars() { return sparse_vars_; }
// This function processes user's rpc request.
// The implemention is in request_handler_impl.
......@@ -113,13 +112,7 @@ class RequestHandler {
std::unordered_map<std::string,
std::shared_ptr<framework::ExecutorPrepareContext>>*
grad_to_prepared_ctx_;
// Record received sparse variables, so that
// we could reset those after execute optimize program
std::vector<framework::Variable*> sparse_vars_;
RPCServer* rpc_server_;
std::mutex sparse_var_mutex_;
};
} // namespace detail
......
......@@ -63,16 +63,22 @@ bool RequestSendHandler::Handle(const std::string& varname,
PADDLE_THROW("sync: Can not find server side var");
return false;
}
if (invar->IsType<framework::SelectedRows>()) {
std::unique_lock<std::mutex> lock(sparse_var_mutex_);
std::unique_lock<std::mutex> lock(mutex_sparse_vars_);
sparse_vars_.push_back(invar);
}
}
return true;
}
void RequestSendHandler::ResetSparseVarRecorder() {
std::unique_lock<std::mutex> lock(mutex_sparse_vars_);
for (auto* var : sparse_vars_) {
var->GetMutable<framework::SelectedRows>()->mutable_rows()->clear();
}
sparse_vars_.clear();
}
bool RequestGetHandler::Handle(const std::string& varname,
framework::Scope* scope,
framework::Variable* invar,
......
......@@ -41,6 +41,11 @@ class RequestSendHandler final : public RequestHandler {
virtual ~RequestSendHandler() {}
bool Handle(const std::string& varname, framework::Scope* scope,
framework::Variable* var, framework::Variable** outvar) override;
void ResetSparseVarRecorder();
private:
std::mutex mutex_sparse_vars_;
std::vector<framework::Variable*> sparse_vars_;
};
class RequestGetHandler final : public RequestHandler {
......
......@@ -60,6 +60,7 @@ class RPCServer {
void SetCond(const std::string& rpc_name);
void WaitCond(const std::string& rpc_name);
void IncreaseBatchBarrier(const std::string rpc_name);
void ResetBarrierCounter();
protected:
......
......@@ -43,7 +43,8 @@ TEST(Gather, GatherData) {
auto* cpu_place = new paddle::platform::CPUPlace();
paddle::platform::CPUDeviceContext ctx(*cpu_place);
paddle::operators::CPUGather<int>(ctx, *src, *index, output);
delete cpu_place;
cpu_place = NULL;
for (int i = 0; i < 4; ++i) EXPECT_EQ(p_output[i], i + 4);
for (int i = 4; i < 8; ++i) EXPECT_EQ(p_output[i], i - 4);
......
......@@ -108,9 +108,6 @@ void ListenAndServOp::RunSyncLoop(framework::Executor *executor,
std::shared_ptr<framework::ExecutorPrepareContext>(nullptr));
rpc_service_->ResetBarrierCounter();
// Record received sparse variables, so that
// we could reset those after execute optimize program
std::vector<framework::Variable *> sparse_vars;
while (true) {
// Get from multiple trainers, we don't care about the order in which
// the gradients arrives, just add suffix 0~n and merge the gradient.
......@@ -146,18 +143,12 @@ void ListenAndServOp::RunSyncLoop(framework::Executor *executor,
recv_scope);
VLOG(2) << "run all blocks spent " << detail::GetTimestamp() - ts << "(ms)";
// Reset the received sparse variables, the sum operator would not
// sum the input sparse variables which rows is empty at the next
// mini-batch.
// TODO(Yancey1989): move the reset action into an operator, we couldn't
// have any hide logic in the operator.
for (framework::Variable *var : sparse_vars) {
var->GetMutable<framework::SelectedRows>()->mutable_rows()->clear();
}
rpc_service_->SetCond(detail::kRequestGet);
rpc_service_->WaitBarrier(detail::kRequestGet);
rpc_service_->ResetBarrierCounter();
// reset received sparse vars to avoid reuse it in the next mini-batch
dynamic_cast<detail::RequestSendHandler *>(request_send_handler_.get())
->ResetSparseVarRecorder();
} // while(true)
}
......
......@@ -77,6 +77,8 @@ TEST(math_function, gemm_trans_clbas) {
paddle::platform::CPUDeviceContext context(*cpu_place);
GetBlas<float>(context).GEMM(false, true, m, n, k, 1, input1_ptr, 3,
input2_ptr + 3, 3, 1, input3_ptr + 1, 4);
delete cpu_place;
cpu_place = NULL;
EXPECT_EQ(input3_ptr[0], 0);
EXPECT_EQ(input3_ptr[1], 24);
......
......@@ -20,7 +20,6 @@ class RandomCropOp : public framework::OperatorWithKernel {
public:
using framework::OperatorWithKernel::OperatorWithKernel;
protected:
framework::OpKernelType GetExpectedKernelType(
const framework::ExecutionContext& ctx) const override {
return framework::OpKernelType(
......@@ -36,7 +35,7 @@ class RandomCropOpMaker : public framework::OpProtoAndCheckerMaker {
AddInput("Seed", "The random seed.");
AddOutput("Out", "The cropped instance batch.");
AddOutput("SeedOut", "The random seed after random cropping.")
.AsDispensable();
.AsIntermediate();
AddAttr<std::vector<int>>("shape", "The shape of a cropped instance.");
AddComment(R"DOC(
This operator takes a batch of instance, and do random cropping on each instance.
......
......@@ -26,6 +26,7 @@ from trainer import BeginEpochEvent
from trainer import EndEpochEvent
from trainer import BeginStepEvent
from trainer import EndStepEvent
from trainer import CheckpointConfig
import inferencer
from inferencer import Inferencer
......
......@@ -363,6 +363,13 @@ class OpProtoHolder(object):
raise ValueError("Operator \"%s\" has not been registered." % type)
return self.op_proto_map[type]
@staticmethod
def generated_op_attr_names():
return {
core.op_proto_and_checker_maker.kOpRoleAttrName(),
core.op_proto_and_checker_maker.kOpRoleVarAttrName()
}
class Operator(object):
"""
......
......@@ -56,6 +56,8 @@ class Inferencer(object):
else:
self.exe = executor.Executor(self.place)
self.inference_program = self.inference_program.clone(for_test=True)
def infer(self, inputs, return_numpy=True):
"""
:param inputs: a map of {"input_name": input_var} that will be feed into the inference program
......
......@@ -24,7 +24,8 @@ __all__ = [
'save_vars', 'save_params', 'save_persistables', 'load_vars', 'load_params',
'load_persistables', 'save_inference_model', 'load_inference_model',
'get_inference_program', 'save_checkpoint', 'load_checkpoint',
'clean_checkpoint'
'clean_checkpoint', 'load_persist_vars_without_grad',
'save_persist_vars_without_grad', 'get_latest_checkpoint_serial'
]
......@@ -457,95 +458,161 @@ def get_parameter_value_by_name(name, executor, program=None):
SUCCESS_MARK_FILENAME = "_SUCCESS"
CHECKPOINT_PREFIX = "checkpoint"
MODEL_DIR = "__model__"
TRAINER_PREFIX = "trainer"
CHECKPOINT_SEPARATOR = "_"
def save_checkpoint(executor,
checkpoint_dir=None,
max_num_checkpoints=3,
save_interval_secs=600,
main_program=None):
checkpoint_dir,
trainer_id,
trainer_args=None,
main_program=None,
max_num_checkpoints=3):
"""
Save Checkpoint will save persistable LodTensor variables from main_program in checkpoint directory,
the directory named by serial number from 0 to (n -1), save_checkpoint use LRU strategy
to keep numbers of checkpoint directory, the numbers of checkpoint directory are max_num_checkpoints at most,
The interval between two saved checkpoints must greater than save_interval_secs.
:param executor
:param checkpoint_dir
:param max_num_checkpoints
:param save_interval_secs
:param main_program
:param executor executor for save the value
:param checkpoint_dir the checkpoint directory
:param trainer_id currect trainer id, if id is equal to 0, the trainer is chief
:param main_program will save all variables in program
:param max_num_checkpoints will keep numbers of checkpoint serials not bigger than max_num_checkpoints
"""
if checkpoint_dir is None:
checkpoint_dir = os.getcwd()
raise ValueError("'checkpoint_dir' should not be None")
if trainer_args:
assert isinstance(trainer_args, dict)
if not os.path.isdir(checkpoint_dir):
os.makedirs(checkpoint_dir)
serial = _get_lastest_checkpoint_dir(checkpoint_dir)
if serial >= 0 and not _interval_secs_exceed(
_get_serial_dir(serial, checkpoint_dir), save_interval_secs):
return
serial = get_latest_checkpoint_serial(checkpoint_dir) + 1
cur_dir = _get_serial_dir(checkpoint_dir, serial)
serial += 1
cur_dir = _get_serial_dir(serial, checkpoint_dir)
save_trainer_args(cur_dir, trainer_id, trainer_args)
save_vars(
executor,
dirname=cur_dir,
main_program=main_program,
vars=None,
predicate=_is_checkpoint_var,
filename=None)
_write_success(cur_dir)
_lru_delete(checkpoint_dir, max_num_checkpoints)
if trainer_id == 0:
save_persist_vars_without_grad(executor, cur_dir, main_program)
_scroll_delete(checkpoint_dir, max_num_checkpoints)
def load_checkpoint(executor, checkpoint_dir=None, main_program=None):
def load_checkpoint(executor, checkpoint_dir, serial, main_program):
"""
Load checkpoint from a directory by executor,
it will find the most recent saved checkpoint file and load it auto.
:param executor
:param checkpoint_dir
:param main_program
:param executor executor for load the value
:param checkpoint_dir the checkpoint directory
:param serial the serial folder in checkpoint directory will be load
:param main_program will load all variables in program
"""
if checkpoint_dir is None:
checkpoint_dir = os.getcwd()
raise ValueError("'checkpoint_dir' should not be None")
serial = _get_lastest_checkpoint_dir(checkpoint_dir)
if serial is None or serial < 0:
raise ValueError("'serial' should not be None or <0 ")
if serial < 0:
return
if main_program is None:
raise ValueError('main_program should not be None.')
cur_dir = _get_serial_dir(serial, checkpoint_dir)
load_vars(
executor,
dirname=cur_dir,
main_program=main_program,
predicate=_is_checkpoint_var,
filename=None)
cur_dir = _get_serial_dir(checkpoint_dir, serial)
load_persist_vars_without_grad(executor, cur_dir, main_program, True)
def clean_checkpoint(checkpoint_dir, delete_dir=False):
"""
clean the checkpoint dir, when the train exits normally, the trainer will call clean_checkpoint to delete checkpoint directory saved before.
delete_dir only works when the directory is empty, otherwise, OSError is raised.
:param checkpoint_dir
:param delete_dir
"""
if checkpoint_dir is None:
checkpoint_dir = os.getcwd()
_lru_delete(checkpoint_dir, max_num_checkpoints=0)
raise ValueError("'checkpoint_dir' should not be None")
_scroll_delete(checkpoint_dir, max_num_checkpoints=0)
if delete_dir and not os.listdir(checkpoint_dir):
os.rmdir(checkpoint_dir)
def _get_serial_dir(serial, checkpoint_dir):
serial_folder = CHECKPOINT_PREFIX + CHECKPOINT_SEPARATOR + str(serial)
return os.path.join(checkpoint_dir, serial_folder)
def load_persist_vars_without_grad(executor,
dirname,
program,
has_model_dir=False):
"""
load_persist_vars_without_grad will load variables from a directory by an executor,
the variable named end with "@GRAD" will not be loaded.
:param executor executor for load the value
:param dirname the checkpoint directory
:param program will load all variables in program
:param has_model_dir if has_model_dir is True, will load variables from sub directory named __model__
"""
if has_model_dir:
dirname = _get_model_dir(dirname)
load_vars(
executor,
dirname=dirname,
main_program=program,
predicate=_is_checkpoint_var,
filename=None)
def save_persist_vars_without_grad(executor, dirname, program):
"""
save_persist_vars_without_grad will save variables to a directory by an executor,
the variable named end with "@GRAD" will not be saved.
:param executor executor for load the value
:param dirname the checkpoint directory
:param program will load all variables in program
"""
cur_dir = _get_model_dir(dirname)
save_vars(
executor,
dirname=cur_dir,
main_program=program,
vars=None,
predicate=_is_checkpoint_var,
filename=None)
_write_success(cur_dir)
def save_trainer_args(dirname, trainer_id, trainer_args):
assert isinstance(trainer_args, dict)
cur_dir = _get_trainer_dir(dirname, trainer_id)
for name, value in trainer_args.iteritems():
args_file = os.path.join(cur_dir, name)
with open(args_file, 'w') as f:
f.write(str(value))
_write_success(cur_dir)
def load_trainer_args(checkpoint_dir, serial, trainer_id, trainer_args):
assert isinstance(trainer_args, list)
cur_dir = _get_serial_dir(checkpoint_dir, serial)
cur_dir = _get_trainer_dir(cur_dir, trainer_id)
ret_values = []
for arg in trainer_args:
cur_file = os.path.join(cur_dir, arg)
with open(cur_file, 'r') as f:
contents = f.read()
ret_values.append(contents.strip())
return ret_values
def _is_checkpoint_var(var):
......@@ -559,36 +626,74 @@ def _is_checkpoint_var(var):
var.desc.type() == core.VarDesc.VarType.FETCH_LIST or \
var.desc.type() == core.VarDesc.VarType.RAW:
return False
# @GRAD are named for gradient variables, checkpoint will not save it.
if "@GRAD" in var.name:
return False
# .trainer_ are named for distribute train variables, checkpoint will not save it.
if ".trainer_" in var.name:
return False
if var.name.endswith("@GRAD"):
# .block is named for distribute train variables, checkpoint will not save it.
if ".block" in var.name:
return False
return var.persistable
def _interval_secs_exceed(dirname, save_interval_secs):
dir_time = os.path.getmtime(dirname)
if save_interval_secs > (time.time() - dir_time):
return False
return True
def _get_dir_serial(dirname):
_, serial = dirname.split(CHECKPOINT_SEPARATOR)
try:
serial_num = int(serial)
except ValueError:
serial_num = -1
return serial_num
def _lru_delete(dirname, max_num_checkpoints=3):
def _get_serial_dir(dirname, serial):
serial_folder = CHECKPOINT_PREFIX + CHECKPOINT_SEPARATOR + str(serial)
serial_dir = os.path.join(dirname, serial_folder)
if not os.path.isdir(serial_dir):
os.makedirs(serial_dir)
return serial_dir
def _get_model_dir(dirname):
model_dir = os.path.join(dirname, MODEL_DIR)
if not os.path.isdir(model_dir):
os.makedirs(model_dir)
return model_dir
def _get_trainer_dir(dirname, trainer_id):
trainer_folder = TRAINER_PREFIX + CHECKPOINT_SEPARATOR + str(trainer_id)
trainer_dir = os.path.join(dirname, trainer_folder)
if not os.path.isdir(trainer_dir):
os.makedirs(trainer_dir)
return trainer_dir
def _scroll_delete(dirname, max_num_checkpoints=3):
dirs = os.listdir(dirname)
serials = []
serial_map = {}
for serial in dirs:
try:
serials.append(int(serial))
except ValueError:
continue
serial_num = _get_dir_serial(serial)
serial_map[serial_num] = serial
if len(serials) <= max_num_checkpoints:
if len(serial_map.keys()) <= max_num_checkpoints:
return
serials = serial_map.keys()
serials.sort(reverse=True)
serials = serials[max_num_checkpoints:]
for serial in serials:
cur_dir = os.path.join(dirname, str(serial))
cur_dir = _get_serial_dir(dirname, serial)
shutil.rmtree(cur_dir)
......@@ -604,33 +709,30 @@ def _write_success(dirname):
f.write(now)
def _get_lastest_checkpoint_dir(checkpoint_dir):
def get_latest_checkpoint_serial(checkpoint_dir):
"""
get the latest file in checkpoint directory, the _SUCCESS file must exist in the directory
:param checkpoint_dir
"""
if not checkpoint_dir.strip():
if not checkpoint_dir:
return -1
def has_success(checkpoint_dir, cur_dir):
"""
is _SUCCESS in this dir
"""
_, serial = cur_dir.split(CHECKPOINT_SEPARATOR)
try:
int(serial)
except ValueError:
return -1
if not os.path.isdir(os.path.join(checkpoint_dir, cur_dir)):
serial = _get_dir_serial(cur_dir)
if serial == -1 or not os.path.isdir(
os.path.join(checkpoint_dir, cur_dir)):
return -1
success_path = os.path.join(
_get_serial_dir(serial, checkpoint_dir), SUCCESS_MARK_FILENAME)
_get_serial_dir(checkpoint_dir, serial), MODEL_DIR,
SUCCESS_MARK_FILENAME)
if os.path.isfile(success_path):
return int(serial)
return serial
if not os.path.isdir(checkpoint_dir):
return -1
......
......@@ -15,16 +15,13 @@ import re
import cStringIO
import functools
import warnings
import string
from ..proto import framework_pb2
from ..framework import OpProtoHolder, Variable
from ..layer_helper import LayerHelper
__all__ = [
'deprecated',
'generate_layer_fn',
'autodoc',
]
__all__ = ['deprecated', 'generate_layer_fn', 'autodoc', 'templatedoc']
def _convert_(name):
......@@ -43,6 +40,10 @@ def _convert_(name):
return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()
def _type_to_str_(tp):
return framework_pb2.AttrType.Name(tp)
def _generate_doc_string_(op_proto):
"""
Generate docstring by OpProto
......@@ -54,9 +55,6 @@ def _generate_doc_string_(op_proto):
str: the document string
"""
def _type_to_str_(tp):
return framework_pb2.AttrType.Name(tp)
if not isinstance(op_proto, framework_pb2.OpProto):
raise TypeError("OpProto should be `framework_pb2.OpProto`")
......@@ -75,7 +73,11 @@ def _generate_doc_string_(op_proto):
buf.write(str(each_input.dispensable))
buf.write('\n')
skip_attrs = OpProtoHolder.generated_op_attr_names()
for each_attr in op_proto.attrs:
if each_attr.name in skip_attrs:
continue
buf.write(' ')
buf.write(each_attr.name)
buf.write(' (')
......@@ -220,3 +222,49 @@ def autodoc(comment=""):
return func
return __impl__
def templatedoc():
"""
Decorator of layer function. It will use the docstring from the layer
function as the template. The template arguments are:
* ${comment}: The operator comment written in CPP.
* ${{name}_comment}: The comment of ${name} written with AddAttr, AddOutput,
and AddInput. The ${name} is Python snake style. i.e., xxx_xxx.
* ${{name}_type}: The type of ${name}.
Returns:
Decorated function.
"""
def __impl__(func):
op_proto = OpProtoHolder.instance().get_op_proto(func.__name__)
tmpl = string.Template(func.__doc__)
comment_lines = op_proto.comment.split("\n")
comment = ""
for line in comment_lines:
line = line.lstrip()
comment += line
comment += "\n"
args = {"comment": comment}
for each_input in op_proto.inputs:
input_name = _convert_(each_input.name)
args["{0}_comment".format(input_name)] = each_input.comment
args["{0}_type".format(input_name)] = "Variable"
for each_attr in op_proto.attrs:
input_name = _convert_(each_attr.name)
args["{0}_comment".format(input_name)] = each_attr.comment
args["{0}_type".format(input_name)] = _type_to_str_(each_attr.type)
for each_opt in op_proto.outputs:
output_name = _convert_(each_opt.name)
args["{0}_comment".format(output_name)] = each_opt.comment
args["{0}_type".format(output_name)] = "Variable"
func.__doc__ = tmpl.substitute(args)
return func
return __impl__
......@@ -64,10 +64,6 @@ def auc(input, label, curve='ROC', num_thresholds=200):
topk_indices = helper.create_tmp_variable(dtype="int64")
topk_out, topk_indices = nn.topk(input, k=k)
auc_out = helper.create_tmp_variable(dtype="float32")
if correct is None:
correct = helper.create_tmp_variable(dtype="int64")
if total is None:
total = helper.create_tmp_variable(dtype="int64")
helper.append_op(
type="accuracy",
inputs={
......
......@@ -19,9 +19,10 @@ from ..layer_helper import LayerHelper
from ..initializer import Normal, Constant
from ..framework import Variable
from ..param_attr import ParamAttr
from layer_function_generator import autodoc
from layer_function_generator import autodoc, templatedoc
from tensor import concat
import utils
import random
__all__ = [
'fc',
......@@ -801,7 +802,22 @@ def gru_unit(input,
return updated_hidden, reset_hidden_pre, gate
@templatedoc()
def linear_chain_crf(input, label, param_attr=None):
"""
Linear Chain CRF.
${comment}
Args:
input(${emission_type}): ${emission_comment}
label(${label_type}): ${label_comment}
param_attr(ParamAttr): The attribute of the learnable parameter.
Returns:
${log_likelihood_comment}
"""
helper = LayerHelper('linear_chain_crf', **locals())
size = input.shape[1]
transition = helper.create_parameter(
......@@ -827,7 +843,19 @@ def linear_chain_crf(input, label, param_attr=None):
return log_likelihood
@templatedoc()
def crf_decoding(input, param_attr, label=None):
"""
${comment}
Args:
input(${emission_type}): ${emission_comment}
param_attr(ParamAttr): The parameter attribute for training.
label(${label_type}): ${label_comment}
Returns:
${viterbi_path_comment}
"""
helper = LayerHelper('crf_decoding', **locals())
transition = helper.get_parameter(param_attr.name)
viterbi_path = helper.create_tmp_variable(dtype=helper.input_dtype())
......@@ -4107,10 +4135,31 @@ def gather(input, index):
return out
def random_crop(input, shape, seed=1):
@templatedoc()
def random_crop(x, shape, seed=None):
"""
${comment}
Examples:
>>> img = fluid.layers.data("img", [3, 256, 256])
>>> cropped_img = fluid.layers.random_crop(img, shape=[3, 224, 224])
Args:
x(${x_type}): ${x_comment}
shape(${shape_type}): ${shape_comment}
seed(int|${seed_type}|None): ${seed_comment} By default, the seed will
get from `random.randint(-65536, 65535)`.
Returns:
${out_comment}
"""
helper = LayerHelper("random_crop", **locals())
dtype = helper.input_dtype()
out = helper.create_tmp_variable(dtype)
if seed is None:
seed = random.randint(-65536, 65535)
if isinstance(seed, int):
seed_value = seed
seed = helper.create_tmp_variable(dtype="int64")
......
......@@ -73,6 +73,7 @@ __all__ = [
'sum',
'polygon_box_transform',
'shape',
'maxout',
] + __activations__
for _OP in set(__all__):
......
......@@ -38,7 +38,7 @@ def inference_program():
return y_predict
def linear():
def train_program():
y = fluid.layers.data(name='y', shape=[1], dtype='float32')
y_predict = inference_program()
......@@ -104,7 +104,7 @@ def main(use_cuda):
# Directory for saving the trained model
params_dirname = "fit_a_line.inference.model"
train(use_cuda, linear, params_dirname)
train(use_cuda, train_program, params_dirname)
infer(use_cuda, inference_program, params_dirname)
......
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.fluid as fluid
import unittest
import os
import tempfile
class TestCheckpoint(unittest.TestCase):
def setUp(self):
self.dirname = tempfile.mktemp()
self.max_num_checkpoints = 3
self.epoch_interval = 1
self.step_interval = 1
self.trainer_id = 0
self.chief = self.trainer_id == 0
self.place = fluid.CPUPlace()
self.epoch_id = 100
self.step_id = 20
def test_checkpoint(self):
self.save_checkpoint()
serial = fluid.io.get_latest_checkpoint_serial(self.dirname)
self.assertTrue(serial >= 0)
trainer_args = ["epoch_id", "step_id"]
epoch_id, step_id = fluid.io.load_trainer_args(
self.dirname, serial, self.trainer_id, trainer_args)
self.assertEqual(self.step_id, int(step_id))
self.assertEqual(self.epoch_id, int(epoch_id))
program = fluid.Program()
with fluid.program_guard(program):
exe = fluid.Executor(self.place)
fluid.io.load_checkpoint(exe, self.dirname, serial, program)
fluid.io.clean_checkpoint(self.dirname, delete_dir=True)
self.assertFalse(os.path.isdir(self.dirname))
def save_checkpoint(self):
config = fluid.CheckpointConfig(self.dirname, self.max_num_checkpoints,
self.epoch_interval, self.step_interval)
trainer_args = {}
trainer_args["epoch_id"] = self.epoch_id
trainer_args["step_id"] = self.step_id
program = fluid.Program()
with fluid.program_guard(program):
program.global_block().create_var(
name="scale_0",
psersistable=True,
dtype="float32",
shape=[32, 32])
exe = fluid.Executor(self.place)
for i in xrange(10):
fluid.io.save_checkpoint(exe, config.checkpoint_dir,
self.trainer_id, trainer_args, program,
config.max_num_checkpoints)
if __name__ == '__main__':
unittest.main()
......@@ -42,9 +42,9 @@ class TestCropOp(OpTest):
def setUp(self):
self.op_type = "crop"
self.crop_by_input = False
self.offset_by_input = False
self.attrs = {}
self.initTestCase()
self.attrs['offsets'] = self.offsets
if self.crop_by_input:
self.inputs = {
'X': np.random.random(self.x_shape).astype("float32"),
......@@ -55,6 +55,10 @@ class TestCropOp(OpTest):
self.inputs = {
'X': np.random.random(self.x_shape).astype("float32"),
}
if self.offset_by_input:
self.inputs['Offsets'] = np.array(self.offsets).astype('int32')
else:
self.attrs['offsets'] = self.offsets
self.outputs = {
'Out': crop(self.inputs['X'], self.offsets, self.crop_shape)
}
......@@ -101,5 +105,22 @@ class TestCase4(TestCropOp):
self.crop_by_input = True
class TestCase5(TestCropOp):
def initTestCase(self):
self.x_shape = (3, 4, 5)
self.crop_shape = [2, 2, 3]
self.offsets = [1, 0, 2]
self.offset_by_input = True
class TestCase6(TestCropOp):
def initTestCase(self):
self.x_shape = (10, 9, 14)
self.crop_shape = [3, 3, 5]
self.offsets = [3, 5, 4]
self.crop_by_input = True
self.offset_by_input = True
if __name__ == '__main__':
unittest.main()
......@@ -30,9 +30,6 @@ class Memory(object):
assert val.dtype == self.ex.dtype
self.cur = val
def ex(self):
return self.ex
def next(self):
self.ex = self.cur
self.cur = None
......
......@@ -387,6 +387,14 @@ class TestBook(unittest.TestCase):
self.assertIsNotNone(output)
print(str(program))
def test_maxout(self):
program = Program()
with program_guard(program):
data = layers.data(name='x', shape=[8, 6, 6], dtype="float32")
output = layers.maxout(x=data, groups=2)
self.assertIsNotNone(output)
print(str(program))
if __name__ == '__main__':
unittest.main()
......@@ -27,11 +27,8 @@ import parallel_executor
from transpiler import distribute_transpiler
__all__ = [
'Trainer',
'BeginEpochEvent',
'EndEpochEvent',
'BeginStepEvent',
'EndStepEvent',
'Trainer', 'BeginEpochEvent', 'EndEpochEvent', 'BeginStepEvent',
'EndStepEvent', 'CheckpointConfig'
]
......@@ -59,6 +56,35 @@ class EndStepEvent(object):
self.metrics = metrics
class CheckpointConfig(object):
def __init__(self,
checkpoint_dir=None,
max_num_checkpoints=3,
epoch_interval=1,
step_interval=10):
if checkpoint_dir is None:
self.checkpoint_dir = os.getcwd()
else:
self.checkpoint_dir = checkpoint_dir
self.max_num_checkpoints = max_num_checkpoints
if epoch_interval < 1:
self.epoch_interval = 1
else:
self.epoch_interval = epoch_interval
if step_interval < 1:
self.step_interval = 10
else:
self.step_interval = step_interval
self.epoch_id = 0
self.step_id = 0
self.load_serial = None
self.is_pserver = False
def check_and_get_place(place):
"""
Check the type of place or get the default place
......@@ -99,13 +125,24 @@ class Trainer(object):
optimizer_func,
param_path=None,
place=None,
parallel=False):
parallel=False,
checkpoint_config=None):
self.__stop = False
self.parallel = parallel
# 1. we need to generate a framework.Program by calling
# program_func. Reference: fluid.program_guard in
# test_word2vec.py
# config for checkpoint
# only chief worker will save variables
self.trainer_id = 0
self.checkpoint_cfg = checkpoint_config
if self.checkpoint_cfg:
assert isinstance(self.checkpoint_cfg, CheckpointConfig)
serial = io.get_latest_checkpoint_serial(
self.checkpoint_cfg.checkpoint_dir)
self.checkpoint_cfg.load_serial = serial if serial >= 0 else None
self.scope = core.Scope()
self.startup_program = framework.Program()
......@@ -115,9 +152,9 @@ class Trainer(object):
program_func_outs = train_func()
self.train_func_outputs = program_func_outs if isinstance(
program_func_outs, list) else [program_func_outs]
self.test_program = self.train_program.clone()
self.test_program = self.train_program.clone(for_test=True)
# The fisrt element of program_func_outs is loss.
# The first element of program_func_outs is loss.
loss = self.train_func_outputs[0]
optimizer = optimizer_func()
......@@ -137,9 +174,25 @@ class Trainer(object):
exe = executor.Executor(place)
exe.run(self.startup_program)
if param_path:
if self.checkpoint_cfg and self.checkpoint_cfg.load_serial:
with self._prog_and_scope_guard():
exe = executor.Executor(place)
io.load_checkpoint(exe, self.checkpoint_cfg.checkpoint_dir,
self.checkpoint_cfg.load_serial,
self.startup_program)
if not self.checkpoint_cfg.is_pserver:
epoch_id, step_id = io.load_trainer_args(
self.checkpoint_cfg.checkpoint_dir,
self.checkpoint_cfg.load_serial, self.trainer_id,
self._get_checkpoint_load_args())
self.checkpoint_cfg.epoch_id = int(epoch_id)
self.checkpoint_cfg.step_id = int(step_id)
if param_path and os.path.isdir(param_path):
# load params from param_path into scope
io.load_persistables(exe, dirname=param_path)
io.load_persist_vars_without_grad(
exe, dirname=param_path, program=self.startup_program)
def _transpile_nccl2_dist(self):
# PADDLE_TRAINER_IPS
......@@ -194,14 +247,18 @@ class Trainer(object):
current_endpoint = os.getenv("PADDLE_CURRENT_IP", "") + ":" + port
# the unique trainer id, starting from 0, needed by trainer
# only
trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
self.trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
# the role, should be either PSERVER or TRAINER
training_role = os.getenv("PADDLE_TRAINING_ROLE")
with self._prog_and_scope_guard():
t = distribute_transpiler.DistributeTranspiler()
t.transpile(
trainer_id, pservers=pserver_endpoints, trainers=trainers)
self.trainer_id, pservers=pserver_endpoints, trainers=trainers)
if training_role == "PSERVER":
if self.checkpoint_cfg:
self.is_pserver = True
self.train_program = t.get_pserver_program(current_endpoint)
self.startup_program = t.get_startup_program(current_endpoint,
self.train_program)
......@@ -294,11 +351,26 @@ class Trainer(object):
self._train_by_any_executor(event_handler, exe, num_epochs, reader)
def _train_by_any_executor(self, event_handler, exe, num_epochs, reader):
for epoch_id in range(num_epochs):
if self.checkpoint_cfg:
epochs = [
epoch_id for epoch_id in range(num_epochs)
if epoch_id >= self.checkpoint_cfg.epoch_id
]
else:
epochs = [epoch_id for epoch_id in range(num_epochs)]
for epoch_id in epochs:
event_handler(BeginEpochEvent(epoch_id))
for step_id, data in enumerate(reader()):
if self.__stop:
if self.checkpoint_cfg:
self._clean_checkpoint()
return
if self.checkpoint_cfg and self.checkpoint_cfg.load_serial \
and self.checkpoint_cfg.step_id >= step_id and self.checkpoint_cfg.epoch_id == epoch_id:
continue
begin_event = BeginStepEvent(epoch_id, step_id)
event_handler(begin_event)
if begin_event.fetch_metrics:
......@@ -309,8 +381,13 @@ class Trainer(object):
])
else:
metrics = exe.run(feed=data, fetch_list=[])
if self.checkpoint_cfg:
self._save_checkpoint(epoch_id, step_id)
event_handler(EndStepEvent(epoch_id, step_id, metrics))
event_handler(EndEpochEvent(epoch_id))
if self.checkpoint_cfg:
self._clean_checkpoint()
def _test_by_executor(self, reader, feed_order, fetch_list):
with executor.scope_guard(self.scope):
......@@ -349,6 +426,38 @@ class Trainer(object):
loss_name=self.train_func_outputs[0].name)
return self._get_parallel_executor()
def _clean_checkpoint(self):
assert self.checkpoint_cfg
io.clean_checkpoint(checkpoint_dir=self.checkpoint_cfg.checkpoint_dir)
def _get_checkpoint_load_args(self):
"""
epoch_id and step_id are runtime arguments, they are not variables, will load them independently.
"""
return ["epoch_id", "step_id"]
def _get_checkpoint_save_args(self, epoch_id, step_id):
"""
epoch_id and step_id are runtime arguments, they are not variables, will save them independently.
"""
trainer_args = {}
trainer_args["epoch_id"] = epoch_id
trainer_args["step_id"] = step_id
return trainer_args
def _save_checkpoint(self, epoch_id, step_id):
assert self.checkpoint_cfg
if epoch_id % self.checkpoint_cfg.epoch_interval == 0 and step_id % self.checkpoint_cfg.step_interval == 0:
exe = executor.Executor(self.place)
io.save_checkpoint(
executor=exe,
checkpoint_dir=self.checkpoint_cfg.checkpoint_dir,
trainer_id=self.trainer_id,
trainer_args=self._get_checkpoint_save_args(epoch_id, step_id),
main_program=self.train_program,
max_num_checkpoints=self.checkpoint_cfg.max_num_checkpoints)
def build_feed_var_list(program, feed_order):
if not isinstance(program, framework.Program):
......
......@@ -177,6 +177,7 @@ class DistributeTranspiler:
dtype=table_grad_var.dtype)
for index in range(len(self.pserver_endpoints))
]
return param_list, grad_list
def _init_splited_vars(self, slice_var_up):
# update these mappings for further transpile:
......@@ -199,8 +200,8 @@ class DistributeTranspiler:
grad_list.append(g)
param_grad_set.add(g.name)
self._update_dist_lookup_table_vars(param_list, grad_list,
self.params_grads)
param_list, grad_list = self._update_dist_lookup_table_vars(
param_list, grad_list, self.params_grads)
if slice_var_up:
# when we slice var up into blocks, we will slice the var according to
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册