Unverified commit 546a0d3c authored by tangwei12, committed by GitHub

trainer from dataset fetch targets (#19760) (#20182)

add executor.FetchHandler for train/infer from the dataset
Parent bad312c3
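
A minimal usage sketch of the executor.FetchHandler introduced by this commit: a FetchHandler subclass is passed to Executor.train_from_dataset via the new fetch_handler argument, and its handler() is called periodically with the fetched variables. The toy network, slot name, and data file below are hypothetical placeholders; the FetchHandler constructor arguments (fetch_target_names, period_secs, return_np) and the train_from_dataset call follow the tests added in this diff.

import paddle.fluid as fluid

class CostHandler(fluid.executor.FetchHandler):
    def handler(self, fetch_target_vars):
        # invoked roughly every `period_secs` seconds; values arrive as
        # numpy arrays (or None if uninitialized) when return_np=True
        print("fetched {}: {}".format(self.fetch_target_names[0],
                                      fetch_target_vars[0]))

# toy single-slot network (names and shapes are placeholders)
slot = fluid.layers.data(name="slot1", shape=[1], dtype="int64", lod_level=1)
pooled = fluid.layers.sequence_pool(
    input=fluid.layers.cast(x=slot, dtype="float32"), pool_type="AVERAGE")
out = fluid.layers.fc(input=pooled, size=32, act="tanh")

dataset = fluid.DatasetFactory().create_dataset("QueueDataset")
dataset.set_batch_size(32)
dataset.set_pipe_command("cat")
dataset.set_use_var([slot])
dataset.set_filelist(["train_data_part_a.txt"])  # hypothetical data file

exe = fluid.Executor(fluid.CPUPlace())
exe.run(fluid.default_startup_program())
exe.train_from_dataset(
    program=fluid.default_main_program(),
    dataset=dataset,
    fetch_handler=CostHandler([out.name], period_secs=2, return_np=True))
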
......@@ -16,7 +16,7 @@
if(WIN32)
if(NOT PYTHON_EXECUTABLE)
FIND_PACKAGE(PythonInterp REQUIRED)
FIND_PACKAGE(PythonInterp REQUIRED)
endif()
endif()
......@@ -78,26 +78,26 @@ add_custom_target(inference_lib_dist DEPENDS ${inference_lib_deps})
set(dst_dir "${FLUID_INFERENCE_INSTALL_DIR}/third_party/eigen3")
copy(inference_lib_dist
SRCS ${EIGEN_INCLUDE_DIR}/Eigen/Core ${EIGEN_INCLUDE_DIR}/Eigen/src ${EIGEN_INCLUDE_DIR}/unsupported/Eigen
DSTS ${dst_dir}/Eigen ${dst_dir}/Eigen ${dst_dir}/unsupported)
SRCS ${EIGEN_INCLUDE_DIR}/Eigen/Core ${EIGEN_INCLUDE_DIR}/Eigen/src ${EIGEN_INCLUDE_DIR}/unsupported/Eigen
DSTS ${dst_dir}/Eigen ${dst_dir}/Eigen ${dst_dir}/unsupported)
set(dst_dir "${FLUID_INFERENCE_INSTALL_DIR}/third_party/boost")
copy(inference_lib_dist
SRCS ${BOOST_INCLUDE_DIR}/boost
DSTS ${dst_dir})
SRCS ${BOOST_INCLUDE_DIR}/boost
DSTS ${dst_dir})
if(WITH_MKLML)
set(dst_dir "${FLUID_INFERENCE_INSTALL_DIR}/third_party/install/mklml")
if(WIN32)
copy(inference_lib_dist
SRCS ${MKLML_LIB} ${MKLML_IOMP_LIB} ${MKLML_SHARED_LIB}
SRCS ${MKLML_LIB} ${MKLML_IOMP_LIB} ${MKLML_SHARED_LIB}
${MKLML_SHARED_LIB_DEPS} ${MKLML_SHARED_IOMP_LIB} ${MKLML_INC_DIR}
DSTS ${dst_dir}/lib ${dst_dir}/lib ${dst_dir}/lib
DSTS ${dst_dir}/lib ${dst_dir}/lib ${dst_dir}/lib
${dst_dir}/lib ${dst_dir}/lib ${dst_dir})
else()
copy(inference_lib_dist
SRCS ${MKLML_LIB} ${MKLML_IOMP_LIB} ${MKLML_INC_DIR}
DSTS ${dst_dir}/lib ${dst_dir}/lib ${dst_dir})
SRCS ${MKLML_LIB} ${MKLML_IOMP_LIB} ${MKLML_INC_DIR}
DSTS ${dst_dir}/lib ${dst_dir}/lib ${dst_dir})
endif()
elseif (NOT CBLAS_FOUND OR WIN32)
set(dst_dir "${FLUID_INFERENCE_INSTALL_DIR}/third_party/install/openblas")
......@@ -107,16 +107,16 @@ elseif (NOT CBLAS_FOUND OR WIN32)
endif ()
if(WITH_MKLDNN)
set(dst_dir "${FLUID_INFERENCE_INSTALL_DIR}/third_party/install/mkldnn")
if(WIN32)
copy(inference_lib_dist
SRCS ${MKLDNN_INC_DIR} ${MKLDNN_SHARED_LIB} ${MKLDNN_LIB}
DSTS ${dst_dir} ${dst_dir}/lib ${dst_dir}/lib)
else()
copy(inference_lib_dist
SRCS ${MKLDNN_INC_DIR} ${MKLDNN_SHARED_LIB}
DSTS ${dst_dir} ${dst_dir}/lib)
endif()
set(dst_dir "${FLUID_INFERENCE_INSTALL_DIR}/third_party/install/mkldnn")
if(WIN32)
copy(inference_lib_dist
SRCS ${MKLDNN_INC_DIR} ${MKLDNN_SHARED_LIB} ${MKLDNN_LIB}
DSTS ${dst_dir} ${dst_dir}/lib ${dst_dir}/lib)
else()
copy(inference_lib_dist
SRCS ${MKLDNN_INC_DIR} ${MKLDNN_SHARED_LIB}
DSTS ${dst_dir} ${dst_dir}/lib)
endif()
endif()
set(dst_dir "${FLUID_INFERENCE_INSTALL_DIR}/third_party/install/gflags")
......@@ -156,20 +156,20 @@ endif ()
if (TENSORRT_FOUND)
set(dst_dir "${FLUID_INFERENCE_INSTALL_DIR}/third_party/install/tensorrt")
copy(inference_lib_dist
SRCS ${TENSORRT_ROOT}/include/Nv*.h ${TENSORRT_ROOT}/lib/*nvinfer*
DSTS ${dst_dir}/include ${dst_dir}/lib)
SRCS ${TENSORRT_ROOT}/include/Nv*.h ${TENSORRT_ROOT}/lib/*nvinfer*
DSTS ${dst_dir}/include ${dst_dir}/lib)
endif ()
if (ANAKIN_FOUND)
set(dst_dir "${FLUID_INFERENCE_INSTALL_DIR}/third_party/install/anakin")
copy(inference_lib_dist
SRCS ${ANAKIN_ROOT}/*
DSTS ${dst_dir})
SRCS ${ANAKIN_ROOT}/*
DSTS ${dst_dir})
endif ()
copy(inference_lib_dist
SRCS ${CMAKE_CURRENT_BINARY_DIR}/CMakeCache.txt
DSTS ${FLUID_INFERENCE_INSTALL_DIR})
SRCS ${CMAKE_CURRENT_BINARY_DIR}/CMakeCache.txt
DSTS ${FLUID_INFERENCE_INSTALL_DIR})
set(src_dir "${PADDLE_SOURCE_DIR}/paddle/fluid")
if(WIN32)
......@@ -179,8 +179,8 @@ else(WIN32)
endif(WIN32)
copy(inference_lib_dist
SRCS ${src_dir}/inference/api/paddle_*.h ${paddle_fluid_lib}
DSTS ${FLUID_INFERENCE_INSTALL_DIR}/paddle/include ${FLUID_INFERENCE_INSTALL_DIR}/paddle/lib)
SRCS ${src_dir}/inference/api/paddle_*.h ${paddle_fluid_lib}
DSTS ${FLUID_INFERENCE_INSTALL_DIR}/paddle/include ${FLUID_INFERENCE_INSTALL_DIR}/paddle/lib)
# fluid library for both train and inference
......@@ -190,17 +190,23 @@ add_custom_target(fluid_lib_dist ALL DEPENDS ${fluid_lib_deps})
set(dst_dir "${FLUID_INSTALL_DIR}/paddle/fluid")
set(module "inference")
copy(fluid_lib_dist
SRCS ${src_dir}/${module}/*.h ${src_dir}/${module}/api/paddle_*.h ${paddle_fluid_lib}
DSTS ${dst_dir}/${module} ${dst_dir}/${module} ${dst_dir}/${module}
)
SRCS ${src_dir}/${module}/*.h ${src_dir}/${module}/api/paddle_*.h ${paddle_fluid_lib}
DSTS ${dst_dir}/${module} ${dst_dir}/${module} ${dst_dir}/${module}
)
set(module "framework")
set(framework_lib_deps framework_proto)
add_dependencies(fluid_lib_dist ${framework_lib_deps})
copy(fluid_lib_dist
SRCS ${src_dir}/${module}/*.h ${src_dir}/${module}/details/*.h ${PADDLE_BINARY_DIR}/paddle/fluid/framework/framework.pb.h ${PADDLE_BINARY_DIR}/paddle/fluid/framework/data_feed.pb.h ${src_dir}/${module}/ir/memory_optimize_pass/*.h
${src_dir}/${module}/ir/*.h ${src_dir}/${module}/fleet/*.h
DSTS ${dst_dir}/${module} ${dst_dir}/${module}/details ${dst_dir}/${module} ${dst_dir}/${module} ${dst_dir}/${module}/ir/memory_optimize_pass ${dst_dir}/${module}/ir ${dst_dir}/${module}/fleet)
SRCS ${src_dir}/${module}/*.h ${src_dir}/${module}/details/*.h ${PADDLE_BINARY_DIR}/paddle/fluid/framework/trainer_desc.pb.h ${PADDLE_BINARY_DIR}/paddle/fluid/framework/framework.pb.h ${PADDLE_BINARY_DIR}/paddle/fluid/framework/data_feed.pb.h ${src_dir}/${module}/ir/memory_optimize_pass/*.h
${src_dir}/${module}/ir/*.h ${src_dir}/${module}/fleet/*.h
DSTS ${dst_dir}/${module} ${dst_dir}/${module}/details ${dst_dir}/${module} ${dst_dir}/${module} ${dst_dir}/${module} ${dst_dir}/${module}/ir/memory_optimize_pass ${dst_dir}/${module}/ir ${dst_dir}/${module}/fleet)
set(module "operators")
copy(fluid_lib_dist
SRCS ${src_dir}/${module}/reader/blocking_queue.h
DSTS ${dst_dir}/${module}/reader/
)
set(module "memory")
copy(fluid_lib_dist
......@@ -252,4 +258,4 @@ function(version version_file)
endif ()
endfunction()
version(${FLUID_INSTALL_DIR}/version.txt)
version(${FLUID_INFERENCE_INSTALL_DIR}/version.txt)
version(${FLUID_INFERENCE_INSTALL_DIR}/version.txt)
\ No newline at end of file
......@@ -30,9 +30,9 @@ paddle.fluid.load_op_library (ArgSpec(args=['lib_filename'], varargs=None, keywo
paddle.fluid.Executor ('paddle.fluid.executor.Executor', ('document', '34e8c1769313fbeff7817212dda6259e'))
paddle.fluid.Executor.__init__ (ArgSpec(args=['self', 'place'], varargs=None, keywords=None, defaults=None), ('document', '6adf97f83acf6453d4a6a4b1070f3754'))
paddle.fluid.Executor.close (ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None), ('document', '3a584496aa1343f36eebf3c46b323a74'))
paddle.fluid.Executor.infer_from_dataset (ArgSpec(args=['self', 'program', 'dataset', 'scope', 'thread', 'debug', 'fetch_list', 'fetch_info', 'print_period'], varargs=None, keywords=None, defaults=(None, None, None, 0, False, None, None, 100)), ('document', 'bedc29ad01c1b911e99032ee1e19ac59'))
paddle.fluid.Executor.infer_from_dataset (ArgSpec(args=['self', 'program', 'dataset', 'scope', 'thread', 'debug', 'fetch_list', 'fetch_info', 'print_period', 'fetch_handler'], varargs=None, keywords=None, defaults=(None, None, None, 0, False, None, None, 100, None)), ('document', '4ff256774ecaeee01c840a5fb5de8f7a'))
paddle.fluid.Executor.run (ArgSpec(args=['self', 'program', 'feed', 'fetch_list', 'feed_var_name', 'fetch_var_name', 'scope', 'return_numpy', 'use_program_cache'], varargs=None, keywords=None, defaults=(None, None, None, 'feed', 'fetch', None, True, False)), ('document', '4cfcd9c15b766a51b584cc46d38f1ad8'))
paddle.fluid.Executor.train_from_dataset (ArgSpec(args=['self', 'program', 'dataset', 'scope', 'thread', 'debug', 'fetch_list', 'fetch_info', 'print_period'], varargs=None, keywords=None, defaults=(None, None, None, 0, False, None, None, 100)), ('document', '28f50904a0213f110947a30e0438529c'))
paddle.fluid.Executor.train_from_dataset (ArgSpec(args=['self', 'program', 'dataset', 'scope', 'thread', 'debug', 'fetch_list', 'fetch_info', 'print_period', 'fetch_handler'], varargs=None, keywords=None, defaults=(None, None, None, 0, False, None, None, 100, None)), ('document', '73024c79f46b4f14f1060edeaa4919c8'))
paddle.fluid.global_scope (ArgSpec(args=[], varargs=None, keywords=None, defaults=None), ('document', 'f65788d9ead293ada47551339df12203'))
paddle.fluid.scope_guard (ArgSpec(args=['scope'], varargs=None, keywords=None, defaults=None), ('document', 'e6c073ed237001aaba7bff976b62b122'))
paddle.fluid.DistributeTranspiler ('paddle.fluid.transpiler.distribute_transpiler.DistributeTranspiler', ('document', 'b2b19821c5dffcd11473d6a4eef089af'))
......
......@@ -123,7 +123,7 @@ cc_library(shape_inference SRCS shape_inference.cc DEPS ddim attribute device_co
cc_library(transfer_scope_cache SRCS transfer_scope_cache.cc DEPS scope framework_proto device_context)
cc_library(op_kernel_type SRCS op_kernel_type.cc DEPS device_context place)
cc_library(operator SRCS operator.cc DEPS op_info device_context tensor scope glog data_feed_proto
cc_library(operator SRCS operator.cc DEPS op_info device_context tensor scope glog trainer_desc_proto data_feed_proto
shape_inference data_transform lod_tensor profiler transfer_scope_cache op_kernel_type op_call_stack)
cc_test(operator_test SRCS operator_test.cc DEPS operator op_registry device_context)
......
......@@ -140,14 +140,14 @@ void Executor::CreateVariables(const ProgramDesc& pdesc, Scope* scope,
}
}
void Executor::RunFromDataset(const ProgramDesc& main_program, Scope* scope,
Dataset* dataset,
const std::string& trainer_desc_str) {
std::shared_ptr<TrainerBase> Executor::InitForDataset(
const ProgramDesc& main_program, const std::string& trainer_desc_str,
Scope* scope, Dataset* dataset) {
VLOG(3) << "Start to RunFromDataset in executor";
TrainerDesc trainer_desc;
bool success = trainer_desc.ParseFromString(trainer_desc_str);
PADDLE_ENFORCE(success, "Fail to parse TrainerDesc from string:\n%s",
trainer_desc_str.c_str());
PADDLE_ENFORCE_EQ(success, true, "Fail to parse TrainerDesc from string:\n%s",
trainer_desc_str.c_str());
VLOG(3) << "Going to create trainer, trainer class is "
<< trainer_desc.class_name();
std::shared_ptr<TrainerBase> trainer;
......@@ -162,12 +162,17 @@ void Executor::RunFromDataset(const ProgramDesc& main_program, Scope* scope,
trainer->InitTrainerEnv(main_program, place_);
VLOG(3) << "Try to init other environment";
trainer->InitOtherEnv(main_program);
return trainer;
}
void Executor::RunFromDataset(std::shared_ptr<TrainerBase> trainer) {
PADDLE_ENFORCE_NE(trainer, nullptr,
"Trainer is nullptr, invoke InitForDataset first");
// training and finalize training
VLOG(3) << "Trainer starts to run";
trainer->Run();
VLOG(3) << "Trainer going to finalize";
trainer->Finalize();
return;
}
void Executor::Run(const ProgramDesc& pdesc, Scope* scope, int block_id,
......
......@@ -26,6 +26,7 @@ limitations under the License. */
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/tensor.h"
#include "paddle/fluid/framework/trainer.h"
#include "paddle/fluid/platform/device_context.h"
namespace paddle {
......@@ -119,8 +120,10 @@ class Executor {
void EnableMKLDNN(const ProgramDesc& program);
void RunFromDataset(const ProgramDesc& main_program, Scope* scope,
Dataset* dataset, const std::string& trainer_desc_str);
std::shared_ptr<TrainerBase> InitForDataset(
const ProgramDesc& main_program, const std::string& trainer_desc_str,
Scope* scope, Dataset* dataset);
void RunFromDataset(std::shared_ptr<TrainerBase> trainer);
private:
const platform::Place place_;
......
......@@ -62,6 +62,10 @@ void MultiTrainer::InitTrainerEnv(const ProgramDesc& main_program,
}
}
Scope* MultiTrainer::GetWorkerScope(int thread_id) {
return workers_[thread_id]->GetThreadScope();
}
void MultiTrainer::Run() {
VLOG(3) << "Going to run";
for (int thidx = 0; thidx < thread_num_; ++thidx) {
......
......@@ -261,6 +261,10 @@ void PipelineTrainer::Finalize() {
root_scope_->DropKids();
}
Scope* PipelineTrainer::GetWorkerScope(int thread_id) {
return pipeline_scopes_[thread_id];
}
} // end namespace framework
} // end namespace paddle
#endif
......@@ -50,6 +50,7 @@ class TrainerBase {
virtual void InitOtherEnv(const ProgramDesc& main_program) = 0;
virtual void Run() = 0;
virtual void Finalize() = 0;
virtual Scope* GetWorkerScope(int thread_id) = 0;
protected:
Scope* root_scope_;
......@@ -70,6 +71,7 @@ class MultiTrainer : public TrainerBase {
virtual void InitOtherEnv(const ProgramDesc& main_program) {}
virtual void Run();
virtual void Finalize();
virtual Scope* GetWorkerScope(int thread_id);
protected:
int thread_num_;
......@@ -92,6 +94,7 @@ class DistMultiTrainer : public MultiTrainer {
virtual void FinalizeDumpEnv();
virtual void InitDumpEnv();
virtual void DumpWork();
virtual Scope* GetWorkerScope(int thread_id) { return root_scope_; }
protected:
std::shared_ptr<paddle::framework::PullDenseWorker> pull_dense_worker_;
......@@ -117,6 +120,7 @@ class PipelineTrainer : public TrainerBase {
void InitOtherEnv(const ProgramDesc& main_program) override {}
void Run() override;
void Finalize() override;
virtual Scope* GetWorkerScope(int thread_id);
protected:
int section_num_;
......
......@@ -55,7 +55,7 @@ endif()
cc_test(rpc_server_test SRCS rpc_server_test.cc
DEPS ${RPC_DEPS} executor proto_desc lookup_sparse_table_op)
DEPS ${RPC_DEPS} executor scope proto_desc lookup_sparse_table_op)
cc_test(varhandle_test SRCS varhandle_test.cc DEPS profiler scope)
cc_library(parameter_prefetch SRCS parameter_prefetch.cc DEPS sendrecvop_rpc memory)
cc_library(parameter_send SRCS parameter_send.cc DEPS sendrecvop_rpc memory)
......
......@@ -41,6 +41,7 @@ limitations under the License. */
#include "paddle/fluid/framework/reader.h"
#include "paddle/fluid/framework/scope_pool.h"
#include "paddle/fluid/framework/selected_rows.h"
#include "paddle/fluid/framework/trainer.h"
#include "paddle/fluid/framework/version.h"
#include "paddle/fluid/memory/allocation/allocator_strategy.h"
#include "paddle/fluid/operators/activation_op.h"
......@@ -1014,11 +1015,31 @@ All parameter, weight, gradient are variables in Paddle.
py::class_<framework::ExecutorPrepareContext>(m, "ExecutorPrepareContext")
.def(py::init<const ProgramDesc &, size_t>());
py::class_<framework::TrainerBase, std::shared_ptr<framework::TrainerBase>>(
m, "TrainerBase")
.def("get_worker_scope",
[](TrainerBase &self, int thread_id) -> Scope * {
return self.GetWorkerScope(thread_id);
},
py::return_value_policy::reference)
.def("finalize", &TrainerBase::Finalize);
py::class_<framework::Executor>(m, "Executor")
.def(py::init<const platform::Place &>())
.def("close", &Executor::Close)
.def("run_from_dataset", &Executor::RunFromDataset,
py::call_guard<py::gil_scoped_release>())
.def("init_for_dataset",
[](Executor &self, const ProgramDesc &prog,
const std::string &trainer_desc, Scope *scope,
Dataset *dataset) -> std::shared_ptr<TrainerBase> {
return self.InitForDataset(prog, trainer_desc, scope, dataset);
})
.def("run_from_dataset",
[](Executor &self, std::shared_ptr<TrainerBase> trainer) {
pybind11::gil_scoped_release release;
self.RunFromDataset(trainer);
})
.def("run_prepared_ctx",
[](Executor &self, ExecutorPrepareContext *ctx, Scope *scope,
std::map<std::string, const LoDTensor *> *feed_targets,
......
......@@ -27,6 +27,7 @@ from . import core
from . import compiler
from .. import compat as cpt
from .trainer_factory import TrainerFactory
from .trainer_factory import FetchHandlerMonitor
__all__ = ['Executor', 'global_scope', 'scope_guard']
......@@ -377,6 +378,27 @@ def _as_lodtensor(data, place):
return tensor
class FetchHandler(object):
def __init__(self, fetch_target_names, period_secs=60, return_np=True):
self.fetch_target_names = fetch_target_names
self.period_secs = period_secs
self.return_np = return_np
def handler(self, fetch_target_vars):
return
@staticmethod
def help():
print("""
class FetchHandlerExample(FetchHandler):
def handler(self, fetch_target_vars):
b_auc = fetch_target_vars[0]
g_auc = fetch_target_vars[1]
print("b_auc: {}, g_auc: {} at time: {}".format(b_auc, g_auc, time.ctime()))
""")
class Executor(object):
"""
An Executor in Python, supports single/multiple-GPU running,
......@@ -918,6 +940,67 @@ class Executor(object):
trainer._set_fetch_var_and_info(fetch_list, fetch_info, print_period)
return scope, trainer
def _run_from_dataset(self,
program=None,
dataset=None,
scope=None,
thread=0,
is_infer=False,
debug=False,
fetch_list=None,
fetch_info=None,
print_period=100,
fetch_handler=None):
if dataset is None:
raise RuntimeError("dataset is need and should be initialized")
if program._pipeline_opt:
thread = self._adjust_pipeline_resource(program._pipeline_opt,
dataset, thread)
dataset._prepare_to_run()
if fetch_handler is not None:
fetch_instance = fetch_handler
elif fetch_handler is None and fetch_list is not None:
class FH(FetchHandler):
def handler(self, fetch_target_vars):
for i in range(len(fetch_target_vars)):
print("{}: \n {}\n".format(fetch_info[i],
fetch_target_vars[i]))
fetch_target_names = [var.name for var in fetch_list]
fetch_instance = FH(fetch_target_names,
period_secs=print_period,
return_np=False)
else:
fetch_instance = FetchHandler([])
scope, trainer = self._prepare_trainer(
program=program,
dataset=dataset,
scope=scope,
thread=thread,
debug=debug)
trainer._set_infer(is_infer)
trainer._gen_trainer_desc()
self._dump_debug_info(program=program, trainer=trainer)
trainer_instance = self._default_executor.init_for_dataset(
program.desc, trainer._desc(), scope, dataset.dataset)
scope0 = trainer_instance.get_worker_scope(0)
fetch_monitor = FetchHandlerMonitor(scope0, fetch_instance)
fetch_monitor.start()
self._default_executor.run_from_dataset(trainer_instance)
fetch_monitor.stop()
dataset._finish_to_run()
return None
def infer_from_dataset(self,
program=None,
dataset=None,
......@@ -926,7 +1009,8 @@ class Executor(object):
debug=False,
fetch_list=None,
fetch_info=None,
print_period=100):
print_period=100,
fetch_handler=None):
"""
The document of infer_from_dataset is almost the same as
train_from_dataset, except that in distributed training,
......@@ -949,6 +1033,7 @@ class Executor(object):
will be printed during training, default is None
fetch_info(String List): print information for each variable, default is None
print_period(int): the number of mini-batches for each print, default is 100
fetch_handler(FetchHandler): a user-defined class to handle fetched output, default is None
Returns:
None
......@@ -973,29 +1058,9 @@ class Executor(object):
dataset=dataset)
"""
if dataset == None:
raise RuntimeError("dataset is needed and should be initialized")
dataset._prepare_to_run()
scope, trainer = self._prepare_trainer(
program=program,
dataset=dataset,
scope=scope,
thread=thread,
debug=debug,
fetch_list=fetch_list,
fetch_info=fetch_info,
print_period=print_period)
trainer._set_infer(True)
trainer._gen_trainer_desc()
self._dump_debug_info(program=program, trainer=trainer)
dataset._dynamic_adjust_before_train(trainer.proto_desc.thread_num)
self._default_executor.run_from_dataset(program.desc, scope,
dataset.dataset,
trainer._desc())
dataset._dynamic_adjust_after_train()
dataset._finish_to_run()
return None
return self._run_from_dataset(program, dataset, scope, thread, True,
debug, fetch_list, fetch_info,
print_period, fetch_handler)
def train_from_dataset(self,
program=None,
......@@ -1005,7 +1070,8 @@ class Executor(object):
debug=False,
fetch_list=None,
fetch_info=None,
print_period=100):
print_period=100,
fetch_handler=None):
"""
Train from a pre-defined Dataset. Dataset is defined in paddle.fluid.dataset.
Given a program, either a program or compiled program, train_from_dataset will
......@@ -1032,6 +1098,7 @@ class Executor(object):
will be printed during training
fetch_info(String List): print information for each variable
print_period(int): the number of mini-batches for each print
fetch_handler(FetchHandler): a user-defined class to handle fetched output, default is None
Returns:
None
......@@ -1056,29 +1123,6 @@ class Executor(object):
dataset=dataset)
"""
if dataset == None:
raise RuntimeError("dataset is need and should be initialized")
if program._pipeline_opt:
thread = self._adjust_pipeline_resource(program._pipeline_opt,
dataset, thread)
dataset._prepare_to_run()
scope, trainer = self._prepare_trainer(
program=program,
dataset=dataset,
scope=scope,
thread=thread,
debug=debug,
fetch_list=fetch_list,
fetch_info=fetch_info,
print_period=print_period)
trainer._gen_trainer_desc()
self._dump_debug_info(program=program, trainer=trainer)
dataset._dynamic_adjust_before_train(trainer.proto_desc.thread_num)
self._default_executor.run_from_dataset(program.desc, scope,
dataset.dataset,
trainer._desc())
dataset._dynamic_adjust_after_train()
dataset._finish_to_run()
return None
return self._run_from_dataset(program, dataset, scope, thread, False,
debug, fetch_list, fetch_info,
print_period, fetch_handler)
......@@ -14,9 +14,11 @@
from __future__ import print_function
import os
import logging
import tarfile
import os
import random
import paddle
import paddle.fluid.incubate.data_generator as data_generator
......@@ -61,14 +63,18 @@ def load_lr_input_record(sent):
class DatasetCtrReader(data_generator.MultiSlotDataGenerator):
def generate_sample(self, line):
def get_rand(low=0.0, high=1.0):
return random.random()
def iter():
fs = line.strip().split('\t')
dnn_input = load_dnn_input_record(fs[0])
lr_input = load_lr_input_record(fs[1])
click = [int(fs[2])]
yield ("dnn_data", dnn_input), \
("lr_data", lr_input), \
("click", click)
if get_rand() < 0.1:
fs = line.strip().split('\t')
dnn_input = load_dnn_input_record(fs[0])
lr_input = load_lr_input_record(fs[1])
click = [int(fs[2])]
yield ("dnn_data", dnn_input), \
("lr_data", lr_input), \
("click", click)
return iter
......
......@@ -147,7 +147,25 @@ class TestDistCTR2x2(FleetDistRunnerBase):
dataset=dataset,
fetch_list=[self.avg_cost],
fetch_info=["cost"],
print_period=100,
print_period=2,
debug=False)
pass_time = time.time() - pass_start
class FH(fluid.executor.FetchHandler):
def handler(self, fetch_target_vars):
for i in range(len(fetch_target_vars)):
print("{}: \n {}\n".format(self.fetch_target_names[0],
fetch_target_vars[0]))
for epoch_id in range(2):
pass_start = time.time()
dataset.set_filelist(filelist)
exe.train_from_dataset(
program=fleet.main_program,
dataset=dataset,
fetch_handler=FH([self.avg_cost.name],
period_secs=2,
return_np=True),
debug=False)
pass_time = time.time() - pass_start
......
......@@ -18,6 +18,7 @@ including create, config, run, etc.
from __future__ import print_function
import paddle.fluid as fluid
import paddle.compat as cpt
import paddle.fluid.core as core
import numpy as np
import os
......@@ -410,5 +411,108 @@ class TestDatasetWithDataLoader(TestDataset):
self.drop_last = False
class TestDatasetWithFetchHandler(unittest.TestCase):
def net(self):
slots = ["slot1", "slot2", "slot3", "slot4"]
slots_vars = []
poolings = []
for slot in slots:
data = fluid.layers.data(
name=slot, shape=[1], dtype="int64", lod_level=1)
var = fluid.layers.cast(x=data, dtype='float32')
pool = fluid.layers.sequence_pool(input=var, pool_type='AVERAGE')
slots_vars.append(data)
poolings.append(pool)
concated = fluid.layers.concat(poolings, axis=1)
fc = fluid.layers.fc(input=concated, act='tanh', size=32)
return slots_vars, fc
def get_dataset(self, inputs, files):
dataset = fluid.DatasetFactory().create_dataset("QueueDataset")
dataset.set_batch_size(32)
dataset.set_thread(3)
dataset.set_filelist(files)
dataset.set_pipe_command("cat")
dataset.set_use_var(inputs)
return dataset
def setUp(self):
with open("test_queue_dataset_run_a.txt", "w") as f:
data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
f.write(data)
with open("test_queue_dataset_run_b.txt", "w") as f:
data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
data += "1 7 2 3 6 4 8 8 8 8 1 7\n"
f.write(data)
def tearDown(self):
os.remove("./test_queue_dataset_run_a.txt")
os.remove("./test_queue_dataset_run_b.txt")
def test_dataset_none(self):
slots_vars, out = self.net()
files = ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"]
dataset = self.get_dataset(slots_vars, files)
exe = fluid.Executor(fluid.CPUPlace())
exe.run(fluid.default_startup_program())
# test dataset->None
try:
exe.train_from_dataset(fluid.default_main_program(), None)
except ImportError as e:
print("warning: we skip trainer_desc_pb2 import problem in windows")
except RuntimeError as e:
error_msg = "dataset is need and should be initialized"
self.assertEqual(error_msg, cpt.get_exception_message(e))
except Exception as e:
self.assertTrue(False)
def test_infer_from_dataset(self):
slots_vars, out = self.net()
files = ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"]
dataset = self.get_dataset(slots_vars, files)
exe = fluid.Executor(fluid.CPUPlace())
exe.run(fluid.default_startup_program())
try:
exe.infer_from_dataset(fluid.default_main_program(), dataset)
except ImportError as e:
print("warning: we skip trainer_desc_pb2 import problem in windows")
except Exception as e:
self.assertTrue(False)
def test_fetch_handler(self):
slots_vars, out = self.net()
files = ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"]
dataset = self.get_dataset(slots_vars, files)
exe = fluid.Executor(fluid.CPUPlace())
exe.run(fluid.default_startup_program())
fh = fluid.executor.FetchHandler(out.name)
fh.help()
try:
exe.train_from_dataset(
program=fluid.default_main_program(),
dataset=dataset,
fetch_handler=fh)
except ImportError as e:
print("warning: we skip trainer_desc_pb2 import problem in windows")
except RuntimeError as e:
error_msg = "dataset is need and should be initialized"
self.assertEqual(error_msg, cpt.get_exception_message(e))
except Exception as e:
self.assertTrue(False)
if __name__ == '__main__':
unittest.main()
......@@ -97,6 +97,7 @@ class FleetDistRunnerBase(object):
optimizer = fluid.optimizer.SGD(LEARNING_RATE)
optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(avg_cost)
out = self.do_training(fleet)
def net(self, batch_size=4, lr=0.01):
......@@ -181,12 +182,13 @@ class TestFleetBase(unittest.TestCase):
def _run_cluster(self, model, envs):
env = {'CPU_NUM': '1'}
env.update(envs)
python_path = self._python_interp
if os.getenv('WITH_COVERAGE', 'OFF') == 'ON':
envs['COVERAGE_FILE'] = os.getenv('COVERAGE_FILE', '')
python_path += " -m coverage run --branch -p"
env.update(envs)
tr_cmd = "{0} {1} --role trainer --endpoints {2} --current_id {{}} --trainers {3}".format(
python_path, model, self._ps_endpoints, self._trainers)
......
......@@ -30,6 +30,7 @@ def skip_ci(func):
return __func__
@skip_ci
class TestDistMnist2x2(TestFleetBase):
def _setup_config(self):
self._sync_mode = False
......
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import time
import unittest
import numpy as np
import paddle.fluid.core as core
import paddle.fluid as fluid
class TestFetchHandler(unittest.TestCase):
def test_fetch_handler(self):
place = core.CPUPlace()
scope = core.Scope()
table = np.random.random((3, 10)).astype("float32")
class FH(fluid.executor.FetchHandler):
def handler(self, fetch_target_vars):
assert len(fetch_target_vars) == 1
table_var = scope.var('emb').get_tensor()
table_var.set(table, place)
fh = FH(['emb'], period_secs=2, return_np=True)
fm = fluid.trainer_factory.FetchHandlerMonitor(scope, fh)
fm.start()
time.sleep(10)
fm.stop()
if __name__ == "__main__":
unittest.main()
......@@ -12,10 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import threading
import time
import numpy as np
from .trainer_desc import MultiTrainer, DistMultiTrainer, PipelineTrainer
from .device_worker import Hogwild, DownpourSGD, Section
__all__ = ["TrainerFactory"]
__all__ = ["TrainerFactory", "FetchHandler", "FetchHandlerMonitor"]
class TrainerFactory(object):
......@@ -48,3 +53,61 @@ class TrainerFactory(object):
trainer._set_adjust_ins_weight(opt_info["adjust_ins_weight"])
trainer._set_device_worker(device_worker)
return trainer
class FetchHandlerMonitor(object):
def __init__(self, scope, handler):
self.fetch_instance = handler
self.fetch_thread = threading.Thread(
target=self.handler_decorator,
args=(scope, self.fetch_instance.handler))
self.running = False
def start(self):
self.running = True
self.fetch_thread.setDaemon(True)
self.fetch_thread.start()
def handler_decorator(self, fetch_scope, fetch_handler):
fetch_target_names = self.fetch_instance.fetch_target_names
period_secs = self.fetch_instance.period_secs
elapsed_secs = 0
while True:
while self.running and elapsed_secs >= period_secs:
elapsed_secs = 0
fetch_vars = [
fetch_scope.find_var(varname)
for varname in fetch_target_names
]
fetch_tensors = [var.get_tensor() for var in fetch_vars]
if self.fetch_instance.return_np:
fetch_nps = []
for tensor in fetch_tensors:
lod = tensor.lod()
if len(lod) > 0:
raise RuntimeError(
"Some of your fetched tensors hold LoD information. \
They can not be completely cast to Python ndarray. We can not \
return the LoDTensor itself directly, please choose other targets"
)
if tensor._is_initialized():
fetch_nps.append(np.array(tensor))
else:
fetch_nps.append(None)
fetch_handler(fetch_nps)
else:
fetch_handler(fetch_tensors)
else:
time.sleep(1)
elapsed_secs += 1
def stop(self):
self.running = False