diff --git a/cmake/inference_lib.cmake b/cmake/inference_lib.cmake index 9503d1dc76a574df15a5f473c007d4a9c21f0d5e..a1a9dbbbd84a16993c7148b17bf21e33d1def57c 100644 --- a/cmake/inference_lib.cmake +++ b/cmake/inference_lib.cmake @@ -16,7 +16,7 @@ if(WIN32) if(NOT PYTHON_EXECUTABLE) - FIND_PACKAGE(PythonInterp REQUIRED) + FIND_PACKAGE(PythonInterp REQUIRED) endif() endif() @@ -78,26 +78,26 @@ add_custom_target(inference_lib_dist DEPENDS ${inference_lib_deps}) set(dst_dir "${FLUID_INFERENCE_INSTALL_DIR}/third_party/eigen3") copy(inference_lib_dist - SRCS ${EIGEN_INCLUDE_DIR}/Eigen/Core ${EIGEN_INCLUDE_DIR}/Eigen/src ${EIGEN_INCLUDE_DIR}/unsupported/Eigen - DSTS ${dst_dir}/Eigen ${dst_dir}/Eigen ${dst_dir}/unsupported) + SRCS ${EIGEN_INCLUDE_DIR}/Eigen/Core ${EIGEN_INCLUDE_DIR}/Eigen/src ${EIGEN_INCLUDE_DIR}/unsupported/Eigen + DSTS ${dst_dir}/Eigen ${dst_dir}/Eigen ${dst_dir}/unsupported) set(dst_dir "${FLUID_INFERENCE_INSTALL_DIR}/third_party/boost") copy(inference_lib_dist - SRCS ${BOOST_INCLUDE_DIR}/boost - DSTS ${dst_dir}) + SRCS ${BOOST_INCLUDE_DIR}/boost + DSTS ${dst_dir}) if(WITH_MKLML) set(dst_dir "${FLUID_INFERENCE_INSTALL_DIR}/third_party/install/mklml") if(WIN32) copy(inference_lib_dist - SRCS ${MKLML_LIB} ${MKLML_IOMP_LIB} ${MKLML_SHARED_LIB} + SRCS ${MKLML_LIB} ${MKLML_IOMP_LIB} ${MKLML_SHARED_LIB} ${MKLML_SHARED_LIB_DEPS} ${MKLML_SHARED_IOMP_LIB} ${MKLML_INC_DIR} - DSTS ${dst_dir}/lib ${dst_dir}/lib ${dst_dir}/lib + DSTS ${dst_dir}/lib ${dst_dir}/lib ${dst_dir}/lib ${dst_dir}/lib ${dst_dir}/lib ${dst_dir}) else() copy(inference_lib_dist - SRCS ${MKLML_LIB} ${MKLML_IOMP_LIB} ${MKLML_INC_DIR} - DSTS ${dst_dir}/lib ${dst_dir}/lib ${dst_dir}) + SRCS ${MKLML_LIB} ${MKLML_IOMP_LIB} ${MKLML_INC_DIR} + DSTS ${dst_dir}/lib ${dst_dir}/lib ${dst_dir}) endif() elseif (NOT CBLAS_FOUND OR WIN32) set(dst_dir "${FLUID_INFERENCE_INSTALL_DIR}/third_party/install/openblas") @@ -107,16 +107,16 @@ elseif (NOT CBLAS_FOUND OR WIN32) endif () if(WITH_MKLDNN) -set(dst_dir "${FLUID_INFERENCE_INSTALL_DIR}/third_party/install/mkldnn") -if(WIN32) - copy(inference_lib_dist - SRCS ${MKLDNN_INC_DIR} ${MKLDNN_SHARED_LIB} ${MKLDNN_LIB} - DSTS ${dst_dir} ${dst_dir}/lib ${dst_dir}/lib) -else() - copy(inference_lib_dist - SRCS ${MKLDNN_INC_DIR} ${MKLDNN_SHARED_LIB} - DSTS ${dst_dir} ${dst_dir}/lib) -endif() + set(dst_dir "${FLUID_INFERENCE_INSTALL_DIR}/third_party/install/mkldnn") + if(WIN32) + copy(inference_lib_dist + SRCS ${MKLDNN_INC_DIR} ${MKLDNN_SHARED_LIB} ${MKLDNN_LIB} + DSTS ${dst_dir} ${dst_dir}/lib ${dst_dir}/lib) + else() + copy(inference_lib_dist + SRCS ${MKLDNN_INC_DIR} ${MKLDNN_SHARED_LIB} + DSTS ${dst_dir} ${dst_dir}/lib) + endif() endif() set(dst_dir "${FLUID_INFERENCE_INSTALL_DIR}/third_party/install/gflags") @@ -156,20 +156,20 @@ endif () if (TENSORRT_FOUND) set(dst_dir "${FLUID_INFERENCE_INSTALL_DIR}/third_party/install/tensorrt") copy(inference_lib_dist - SRCS ${TENSORRT_ROOT}/include/Nv*.h ${TENSORRT_ROOT}/lib/*nvinfer* - DSTS ${dst_dir}/include ${dst_dir}/lib) + SRCS ${TENSORRT_ROOT}/include/Nv*.h ${TENSORRT_ROOT}/lib/*nvinfer* + DSTS ${dst_dir}/include ${dst_dir}/lib) endif () if (ANAKIN_FOUND) set(dst_dir "${FLUID_INFERENCE_INSTALL_DIR}/third_party/install/anakin") copy(inference_lib_dist - SRCS ${ANAKIN_ROOT}/* - DSTS ${dst_dir}) + SRCS ${ANAKIN_ROOT}/* + DSTS ${dst_dir}) endif () copy(inference_lib_dist - SRCS ${CMAKE_CURRENT_BINARY_DIR}/CMakeCache.txt - DSTS ${FLUID_INFERENCE_INSTALL_DIR}) + SRCS ${CMAKE_CURRENT_BINARY_DIR}/CMakeCache.txt + DSTS ${FLUID_INFERENCE_INSTALL_DIR}) set(src_dir "${PADDLE_SOURCE_DIR}/paddle/fluid") if(WIN32) @@ -179,8 +179,8 @@ else(WIN32) endif(WIN32) copy(inference_lib_dist - SRCS ${src_dir}/inference/api/paddle_*.h ${paddle_fluid_lib} - DSTS ${FLUID_INFERENCE_INSTALL_DIR}/paddle/include ${FLUID_INFERENCE_INSTALL_DIR}/paddle/lib) + SRCS ${src_dir}/inference/api/paddle_*.h ${paddle_fluid_lib} + DSTS ${FLUID_INFERENCE_INSTALL_DIR}/paddle/include ${FLUID_INFERENCE_INSTALL_DIR}/paddle/lib) # fluid library for both train and inference @@ -190,17 +190,23 @@ add_custom_target(fluid_lib_dist ALL DEPENDS ${fluid_lib_deps}) set(dst_dir "${FLUID_INSTALL_DIR}/paddle/fluid") set(module "inference") copy(fluid_lib_dist - SRCS ${src_dir}/${module}/*.h ${src_dir}/${module}/api/paddle_*.h ${paddle_fluid_lib} - DSTS ${dst_dir}/${module} ${dst_dir}/${module} ${dst_dir}/${module} -) + SRCS ${src_dir}/${module}/*.h ${src_dir}/${module}/api/paddle_*.h ${paddle_fluid_lib} + DSTS ${dst_dir}/${module} ${dst_dir}/${module} ${dst_dir}/${module} + ) set(module "framework") set(framework_lib_deps framework_proto) add_dependencies(fluid_lib_dist ${framework_lib_deps}) copy(fluid_lib_dist - SRCS ${src_dir}/${module}/*.h ${src_dir}/${module}/details/*.h ${PADDLE_BINARY_DIR}/paddle/fluid/framework/framework.pb.h ${PADDLE_BINARY_DIR}/paddle/fluid/framework/data_feed.pb.h ${src_dir}/${module}/ir/memory_optimize_pass/*.h - ${src_dir}/${module}/ir/*.h ${src_dir}/${module}/fleet/*.h - DSTS ${dst_dir}/${module} ${dst_dir}/${module}/details ${dst_dir}/${module} ${dst_dir}/${module} ${dst_dir}/${module}/ir/memory_optimize_pass ${dst_dir}/${module}/ir ${dst_dir}/${module}/fleet) + SRCS ${src_dir}/${module}/*.h ${src_dir}/${module}/details/*.h ${PADDLE_BINARY_DIR}/paddle/fluid/framework/trainer_desc.pb.h ${PADDLE_BINARY_DIR}/paddle/fluid/framework/framework.pb.h ${PADDLE_BINARY_DIR}/paddle/fluid/framework/data_feed.pb.h ${src_dir}/${module}/ir/memory_optimize_pass/*.h + ${src_dir}/${module}/ir/*.h ${src_dir}/${module}/fleet/*.h + DSTS ${dst_dir}/${module} ${dst_dir}/${module}/details ${dst_dir}/${module} ${dst_dir}/${module} ${dst_dir}/${module} ${dst_dir}/${module}/ir/memory_optimize_pass ${dst_dir}/${module}/ir ${dst_dir}/${module}/fleet) + +set(module "operators") +copy(fluid_lib_dist + SRCS ${src_dir}/${module}/reader/blocking_queue.h + DSTS ${dst_dir}/${module}/reader/ + ) set(module "memory") copy(fluid_lib_dist @@ -252,4 +258,4 @@ function(version version_file) endif () endfunction() version(${FLUID_INSTALL_DIR}/version.txt) -version(${FLUID_INFERENCE_INSTALL_DIR}/version.txt) +version(${FLUID_INFERENCE_INSTALL_DIR}/version.txt) \ No newline at end of file diff --git a/paddle/fluid/API.spec b/paddle/fluid/API.spec index fd72e1751d86061a04a92648b16be3e781db7f59..889420b6a09e4fe7ab0510d8b6a74350a3d5c7a7 100644 --- a/paddle/fluid/API.spec +++ b/paddle/fluid/API.spec @@ -30,9 +30,9 @@ paddle.fluid.load_op_library (ArgSpec(args=['lib_filename'], varargs=None, keywo paddle.fluid.Executor ('paddle.fluid.executor.Executor', ('document', '34e8c1769313fbeff7817212dda6259e')) paddle.fluid.Executor.__init__ (ArgSpec(args=['self', 'place'], varargs=None, keywords=None, defaults=None), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) paddle.fluid.Executor.close (ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None), ('document', '3a584496aa1343f36eebf3c46b323a74')) -paddle.fluid.Executor.infer_from_dataset (ArgSpec(args=['self', 'program', 'dataset', 'scope', 'thread', 'debug', 'fetch_list', 'fetch_info', 'print_period'], varargs=None, keywords=None, defaults=(None, None, None, 0, False, None, None, 100)), ('document', 'bedc29ad01c1b911e99032ee1e19ac59')) +paddle.fluid.Executor.infer_from_dataset (ArgSpec(args=['self', 'program', 'dataset', 'scope', 'thread', 'debug', 'fetch_list', 'fetch_info', 'print_period', 'fetch_handler'], varargs=None, keywords=None, defaults=(None, None, None, 0, False, None, None, 100, None)), ('document', '4ff256774ecaeee01c840a5fb5de8f7a')) paddle.fluid.Executor.run (ArgSpec(args=['self', 'program', 'feed', 'fetch_list', 'feed_var_name', 'fetch_var_name', 'scope', 'return_numpy', 'use_program_cache'], varargs=None, keywords=None, defaults=(None, None, None, 'feed', 'fetch', None, True, False)), ('document', '4cfcd9c15b766a51b584cc46d38f1ad8')) -paddle.fluid.Executor.train_from_dataset (ArgSpec(args=['self', 'program', 'dataset', 'scope', 'thread', 'debug', 'fetch_list', 'fetch_info', 'print_period'], varargs=None, keywords=None, defaults=(None, None, None, 0, False, None, None, 100)), ('document', '28f50904a0213f110947a30e0438529c')) +paddle.fluid.Executor.train_from_dataset (ArgSpec(args=['self', 'program', 'dataset', 'scope', 'thread', 'debug', 'fetch_list', 'fetch_info', 'print_period', 'fetch_handler'], varargs=None, keywords=None, defaults=(None, None, None, 0, False, None, None, 100, None)), ('document', '73024c79f46b4f14f1060edeaa4919c8')) paddle.fluid.global_scope (ArgSpec(args=[], varargs=None, keywords=None, defaults=None), ('document', 'f65788d9ead293ada47551339df12203')) paddle.fluid.scope_guard (ArgSpec(args=['scope'], varargs=None, keywords=None, defaults=None), ('document', 'e6c073ed237001aaba7bff976b62b122')) paddle.fluid.DistributeTranspiler ('paddle.fluid.transpiler.distribute_transpiler.DistributeTranspiler', ('document', 'b2b19821c5dffcd11473d6a4eef089af')) diff --git a/paddle/fluid/framework/CMakeLists.txt b/paddle/fluid/framework/CMakeLists.txt index 02e3cd099a783585f954d02179061e78a5b5824a..cbdb8e87e575ef4ac533dfbb7bcc6031695e79dd 100644 --- a/paddle/fluid/framework/CMakeLists.txt +++ b/paddle/fluid/framework/CMakeLists.txt @@ -123,7 +123,7 @@ cc_library(shape_inference SRCS shape_inference.cc DEPS ddim attribute device_co cc_library(transfer_scope_cache SRCS transfer_scope_cache.cc DEPS scope framework_proto device_context) cc_library(op_kernel_type SRCS op_kernel_type.cc DEPS device_context place) -cc_library(operator SRCS operator.cc DEPS op_info device_context tensor scope glog data_feed_proto +cc_library(operator SRCS operator.cc DEPS op_info device_context tensor scope glog trainer_desc_proto data_feed_proto shape_inference data_transform lod_tensor profiler transfer_scope_cache op_kernel_type op_call_stack) cc_test(operator_test SRCS operator_test.cc DEPS operator op_registry device_context) diff --git a/paddle/fluid/framework/executor.cc b/paddle/fluid/framework/executor.cc index df9b53d6a4045489e6f402fdca91ec0d758af0ea..44646f7ab1ae3ef823c8d12bd6283b042df5a814 100644 --- a/paddle/fluid/framework/executor.cc +++ b/paddle/fluid/framework/executor.cc @@ -140,14 +140,14 @@ void Executor::CreateVariables(const ProgramDesc& pdesc, Scope* scope, } } -void Executor::RunFromDataset(const ProgramDesc& main_program, Scope* scope, - Dataset* dataset, - const std::string& trainer_desc_str) { +std::shared_ptr Executor::InitForDataset( + const ProgramDesc& main_program, const std::string& trainer_desc_str, + Scope* scope, Dataset* dataset) { VLOG(3) << "Start to RunFromDataset in executor"; TrainerDesc trainer_desc; bool success = trainer_desc.ParseFromString(trainer_desc_str); - PADDLE_ENFORCE(success, "Fail to parse TrainerDesc from string:\n%s", - trainer_desc_str.c_str()); + PADDLE_ENFORCE_EQ(success, true, "Fail to parse TrainerDesc from string:\n%s", + trainer_desc_str.c_str()); VLOG(3) << "Going to create trainer, trainer class is " << trainer_desc.class_name(); std::shared_ptr trainer; @@ -162,12 +162,17 @@ void Executor::RunFromDataset(const ProgramDesc& main_program, Scope* scope, trainer->InitTrainerEnv(main_program, place_); VLOG(3) << "Try to init other environment"; trainer->InitOtherEnv(main_program); + return trainer; +} + +void Executor::RunFromDataset(std::shared_ptr trainer) { + PADDLE_ENFORCE_NE(trainer, nullptr, + "Trainer is nullptr, invoke InitForDataset first"); // training and finalize training VLOG(3) << "Trainer starts to run"; trainer->Run(); VLOG(3) << "Trainer going to finalize"; trainer->Finalize(); - return; } void Executor::Run(const ProgramDesc& pdesc, Scope* scope, int block_id, diff --git a/paddle/fluid/framework/executor.h b/paddle/fluid/framework/executor.h index a6db5c8d4136f726106f0ce4debd145d6d14fb45..587ac1a8a6f694bdceea691704da368b72dededb 100644 --- a/paddle/fluid/framework/executor.h +++ b/paddle/fluid/framework/executor.h @@ -26,6 +26,7 @@ limitations under the License. */ #include "paddle/fluid/framework/program_desc.h" #include "paddle/fluid/framework/scope.h" #include "paddle/fluid/framework/tensor.h" +#include "paddle/fluid/framework/trainer.h" #include "paddle/fluid/platform/device_context.h" namespace paddle { @@ -119,8 +120,10 @@ class Executor { void EnableMKLDNN(const ProgramDesc& program); - void RunFromDataset(const ProgramDesc& main_program, Scope* scope, - Dataset* dataset, const std::string& trainer_desc_str); + std::shared_ptr InitForDataset( + const ProgramDesc& main_program, const std::string& trainer_desc_str, + Scope* scope, Dataset* dataset); + void RunFromDataset(std::shared_ptr trainer); private: const platform::Place place_; diff --git a/paddle/fluid/framework/multi_trainer.cc b/paddle/fluid/framework/multi_trainer.cc index be25672b4c7d29bc3bb7eca039a3c735994f0777..b270604afd3cbc0aefd3974d29cdd2f548edf71f 100644 --- a/paddle/fluid/framework/multi_trainer.cc +++ b/paddle/fluid/framework/multi_trainer.cc @@ -62,6 +62,10 @@ void MultiTrainer::InitTrainerEnv(const ProgramDesc& main_program, } } +Scope* MultiTrainer::GetWorkerScope(int thread_id) { + return workers_[thread_id]->GetThreadScope(); +} + void MultiTrainer::Run() { VLOG(3) << "Going to run"; for (int thidx = 0; thidx < thread_num_; ++thidx) { diff --git a/paddle/fluid/framework/pipeline_trainer.cc b/paddle/fluid/framework/pipeline_trainer.cc index 3617a8f18865729e5fac0d6340d436cef2158ee8..9ae91589c0f3237fb32f3feb026b338026ea52e4 100644 --- a/paddle/fluid/framework/pipeline_trainer.cc +++ b/paddle/fluid/framework/pipeline_trainer.cc @@ -261,6 +261,10 @@ void PipelineTrainer::Finalize() { root_scope_->DropKids(); } +Scope* PipelineTrainer::GetWorkerScope(int thread_id) { + return pipeline_scopes_[thread_id]; +} + } // end namespace framework } // end namespace paddle #endif diff --git a/paddle/fluid/framework/trainer.h b/paddle/fluid/framework/trainer.h index 170ceb50fda20501fe03de170568043de71af3cc..25ce93cdc92e3dd312927a42e5d7646db3f1211e 100644 --- a/paddle/fluid/framework/trainer.h +++ b/paddle/fluid/framework/trainer.h @@ -50,6 +50,7 @@ class TrainerBase { virtual void InitOtherEnv(const ProgramDesc& main_program) = 0; virtual void Run() = 0; virtual void Finalize() = 0; + virtual Scope* GetWorkerScope(int thread_id) = 0; protected: Scope* root_scope_; @@ -70,6 +71,7 @@ class MultiTrainer : public TrainerBase { virtual void InitOtherEnv(const ProgramDesc& main_program) {} virtual void Run(); virtual void Finalize(); + virtual Scope* GetWorkerScope(int thread_id); protected: int thread_num_; @@ -92,6 +94,7 @@ class DistMultiTrainer : public MultiTrainer { virtual void FinalizeDumpEnv(); virtual void InitDumpEnv(); virtual void DumpWork(); + virtual Scope* GetWorkerScope(int thread_id) { return root_scope_; } protected: std::shared_ptr pull_dense_worker_; @@ -117,6 +120,7 @@ class PipelineTrainer : public TrainerBase { void InitOtherEnv(const ProgramDesc& main_program) override {} void Run() override; void Finalize() override; + virtual Scope* GetWorkerScope(int thread_id); protected: int section_num_; diff --git a/paddle/fluid/operators/distributed/CMakeLists.txt b/paddle/fluid/operators/distributed/CMakeLists.txt index b8b82180b3a6aaedec8e18aa86ccaa7ca19204bf..9869603923cd0e881a49e67d285a60381146a86e 100644 --- a/paddle/fluid/operators/distributed/CMakeLists.txt +++ b/paddle/fluid/operators/distributed/CMakeLists.txt @@ -55,7 +55,7 @@ endif() cc_test(rpc_server_test SRCS rpc_server_test.cc - DEPS ${RPC_DEPS} executor proto_desc lookup_sparse_table_op) + DEPS ${RPC_DEPS} executor scope proto_desc lookup_sparse_table_op) cc_test(varhandle_test SRCS varhandle_test.cc DEPS profiler scope) cc_library(parameter_prefetch SRCS parameter_prefetch.cc DEPS sendrecvop_rpc memory) cc_library(parameter_send SRCS parameter_send.cc DEPS sendrecvop_rpc memory) diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index f6e56b4f8a0f23fbe1d4b12ea32d9078aeff1c5d..ad375aebd0f391e7064595b85cacdb52efdcf869 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -41,6 +41,7 @@ limitations under the License. */ #include "paddle/fluid/framework/reader.h" #include "paddle/fluid/framework/scope_pool.h" #include "paddle/fluid/framework/selected_rows.h" +#include "paddle/fluid/framework/trainer.h" #include "paddle/fluid/framework/version.h" #include "paddle/fluid/memory/allocation/allocator_strategy.h" #include "paddle/fluid/operators/activation_op.h" @@ -1014,11 +1015,31 @@ All parameter, weight, gradient are variables in Paddle. py::class_(m, "ExecutorPrepareContext") .def(py::init()); + py::class_>( + m, "TrainerBase") + .def("get_worker_scope", + [](TrainerBase &self, int thread_id) -> Scope * { + return self.GetWorkerScope(thread_id); + }, + py::return_value_policy::reference) + .def("finalize", &TrainerBase::Finalize); + py::class_(m, "Executor") .def(py::init()) .def("close", &Executor::Close) .def("run_from_dataset", &Executor::RunFromDataset, py::call_guard()) + .def("init_for_dataset", + [](Executor &self, const ProgramDesc &prog, + const std::string &trainer_desc, Scope *scope, + Dataset *dataset) -> std::shared_ptr { + return self.InitForDataset(prog, trainer_desc, scope, dataset); + }) + .def("run_from_dataset", + [](Executor &self, std::shared_ptr trainer) { + pybind11::gil_scoped_release release; + self.RunFromDataset(trainer); + }) .def("run_prepared_ctx", [](Executor &self, ExecutorPrepareContext *ctx, Scope *scope, std::map *feed_targets, diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py index 33617bae90ead2a55159a5e9154ca617c2da7e12..460f1eae741b7c61a542453cb5eef95fa1f8202b 100644 --- a/python/paddle/fluid/executor.py +++ b/python/paddle/fluid/executor.py @@ -27,6 +27,7 @@ from . import core from . import compiler from .. import compat as cpt from .trainer_factory import TrainerFactory +from .trainer_factory import FetchHandlerMonitor __all__ = ['Executor', 'global_scope', 'scope_guard'] @@ -377,6 +378,27 @@ def _as_lodtensor(data, place): return tensor +class FetchHandler(object): + def __init__(self, fetch_target_names, period_secs=60, return_np=True): + self.fetch_target_names = fetch_target_names + self.period_secs = period_secs + self.return_np = return_np + + def handler(self, fetch_target_vars): + return + + @staticmethod + def help(): + print(""" +class FetchHandlerExamlpe(FetchHandler): + def handler(self, fetch_target_vars): + b_auc = fetch_target_vars[0] + g_auc = fetch_target_vars[1] + + print("b_auc: {}, g_auc: {} at time: {}".format(b_auc, g_auc, time.ctime())) +""") + + class Executor(object): """ An Executor in Python, supports single/multiple-GPU running, @@ -918,6 +940,67 @@ class Executor(object): trainer._set_fetch_var_and_info(fetch_list, fetch_info, print_period) return scope, trainer + def _run_from_dataset(self, + program=None, + dataset=None, + scope=None, + thread=0, + is_infer=False, + debug=False, + fetch_list=None, + fetch_info=None, + print_period=100, + fetch_handler=None): + if dataset is None: + raise RuntimeError("dataset is need and should be initialized") + + if program._pipeline_opt: + thread = self._adjust_pipeline_resource(program._pipeline_opt, + dataset, thread) + + dataset._prepare_to_run() + + if fetch_handler is not None: + fetch_instance = fetch_handler + elif fetch_handler is None and fetch_list is not None: + + class FH(FetchHandler): + def handler(self, fetch_target_vars): + for i in range(len(fetch_target_vars)): + print("{}: \n {}\n".format(fetch_info[i], + fetch_target_vars[i])) + + fetch_target_names = [var.name for var in fetch_list] + fetch_instance = FH(fetch_target_names, + period_secs=print_period, + return_np=False) + else: + fetch_instance = FetchHandler([]) + + scope, trainer = self._prepare_trainer( + program=program, + dataset=dataset, + scope=scope, + thread=thread, + debug=debug) + + trainer._set_infer(is_infer) + trainer._gen_trainer_desc() + + self._dump_debug_info(program=program, trainer=trainer) + + trainer_instance = self._default_executor.init_for_dataset( + program.desc, trainer._desc(), scope, dataset.dataset) + + scope0 = trainer_instance.get_worker_scope(0) + + fetch_monitor = FetchHandlerMonitor(scope0, fetch_instance) + fetch_monitor.start() + self._default_executor.run_from_dataset(trainer_instance) + fetch_monitor.stop() + dataset._finish_to_run() + return None + def infer_from_dataset(self, program=None, dataset=None, @@ -926,7 +1009,8 @@ class Executor(object): debug=False, fetch_list=None, fetch_info=None, - print_period=100): + print_period=100, + fetch_handler=None): """ The document of infer_from_dataset is almost the same as train_from_dataset, except that in distributed training, @@ -949,6 +1033,7 @@ class Executor(object): will be printed during training, default is None fetch_info(String List): print information for each variable, default is None print_period(int): the number of mini-batches for each print, default is 100 + fetch_handler(FetchHandler): a user define class for fetch output. Returns: None @@ -973,29 +1058,9 @@ class Executor(object): dataset=dataset) """ - if dataset == None: - raise RuntimeError("dataset is needed and should be initialized") - - dataset._prepare_to_run() - scope, trainer = self._prepare_trainer( - program=program, - dataset=dataset, - scope=scope, - thread=thread, - debug=debug, - fetch_list=fetch_list, - fetch_info=fetch_info, - print_period=print_period) - trainer._set_infer(True) - trainer._gen_trainer_desc() - self._dump_debug_info(program=program, trainer=trainer) - dataset._dynamic_adjust_before_train(trainer.proto_desc.thread_num) - self._default_executor.run_from_dataset(program.desc, scope, - dataset.dataset, - trainer._desc()) - dataset._dynamic_adjust_after_train() - dataset._finish_to_run() - return None + return self._run_from_dataset(program, dataset, scope, thread, True, + debug, fetch_list, fetch_info, + print_period, fetch_handler) def train_from_dataset(self, program=None, @@ -1005,7 +1070,8 @@ class Executor(object): debug=False, fetch_list=None, fetch_info=None, - print_period=100): + print_period=100, + fetch_handler=None): """ Train from a pre-defined Dataset. Dataset is defined in paddle.fluid.dataset. Given a program, either a program or compiled program, train_from_dataset will @@ -1032,6 +1098,7 @@ class Executor(object): will be printed during training fetch_info(String List): print information for each variable print_period(int): the number of mini-batches for each print + fetch_handler(FetchHandler): a user define class for fetch output. Returns: None @@ -1056,29 +1123,6 @@ class Executor(object): dataset=dataset) """ - if dataset == None: - raise RuntimeError("dataset is need and should be initialized") - - if program._pipeline_opt: - thread = self._adjust_pipeline_resource(program._pipeline_opt, - dataset, thread) - - dataset._prepare_to_run() - scope, trainer = self._prepare_trainer( - program=program, - dataset=dataset, - scope=scope, - thread=thread, - debug=debug, - fetch_list=fetch_list, - fetch_info=fetch_info, - print_period=print_period) - trainer._gen_trainer_desc() - self._dump_debug_info(program=program, trainer=trainer) - dataset._dynamic_adjust_before_train(trainer.proto_desc.thread_num) - self._default_executor.run_from_dataset(program.desc, scope, - dataset.dataset, - trainer._desc()) - dataset._dynamic_adjust_after_train() - dataset._finish_to_run() - return None + return self._run_from_dataset(program, dataset, scope, thread, False, + debug, fetch_list, fetch_info, + print_period, fetch_handler) diff --git a/python/paddle/fluid/tests/unittests/ctr_dataset_reader.py b/python/paddle/fluid/tests/unittests/ctr_dataset_reader.py index ace4b01144b41d9ac404d086838e759cf279ac28..6c1672a708f1ff39b5cbfa0c88da546a5bfe5dec 100644 --- a/python/paddle/fluid/tests/unittests/ctr_dataset_reader.py +++ b/python/paddle/fluid/tests/unittests/ctr_dataset_reader.py @@ -14,9 +14,11 @@ from __future__ import print_function +import os import logging import tarfile -import os + +import random import paddle import paddle.fluid.incubate.data_generator as data_generator @@ -61,14 +63,18 @@ def load_lr_input_record(sent): class DatasetCtrReader(data_generator.MultiSlotDataGenerator): def generate_sample(self, line): + def get_rand(low=0.0, high=1.0): + return random.random() + def iter(): - fs = line.strip().split('\t') - dnn_input = load_dnn_input_record(fs[0]) - lr_input = load_lr_input_record(fs[1]) - click = [int(fs[2])] - yield ("dnn_data", dnn_input), \ - ("lr_data", lr_input), \ - ("click", click) + if get_rand() < 0.1: + fs = line.strip().split('\t') + dnn_input = load_dnn_input_record(fs[0]) + lr_input = load_lr_input_record(fs[1]) + click = [int(fs[2])] + yield ("dnn_data", dnn_input), \ + ("lr_data", lr_input), \ + ("click", click) return iter diff --git a/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py b/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py index a477e38d6edbddd89fc22ee50c8abe7e737c591f..5edaa71700ca38847f15f439002c079a5c542c2e 100644 --- a/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py +++ b/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py @@ -147,7 +147,25 @@ class TestDistCTR2x2(FleetDistRunnerBase): dataset=dataset, fetch_list=[self.avg_cost], fetch_info=["cost"], - print_period=100, + print_period=2, + debug=False) + pass_time = time.time() - pass_start + + class FH(fluid.executor.FetchHandler): + def handler(self, fetch_target_vars): + for i in range(len(fetch_target_vars)): + print("{}: \n {}\n".format(self.fetch_target_names[0], + fetch_target_vars[0])) + + for epoch_id in range(2): + pass_start = time.time() + dataset.set_filelist(filelist) + exe.train_from_dataset( + program=fleet.main_program, + dataset=dataset, + fetch_handler=FH([self.avg_cost.name], + period_secs=2, + return_np=True), debug=False) pass_time = time.time() - pass_start diff --git a/python/paddle/fluid/tests/unittests/test_dataset.py b/python/paddle/fluid/tests/unittests/test_dataset.py index 8bfa88dc2c75b74060e0640664e6e73fff6b5144..737e16b3a1ac9384d6679b2cb8787073aa51ac6b 100644 --- a/python/paddle/fluid/tests/unittests/test_dataset.py +++ b/python/paddle/fluid/tests/unittests/test_dataset.py @@ -18,6 +18,7 @@ including create, config, run, etc. from __future__ import print_function import paddle.fluid as fluid +import paddle.compat as cpt import paddle.fluid.core as core import numpy as np import os @@ -410,5 +411,108 @@ class TestDatasetWithDataLoader(TestDataset): self.drop_last = False +class TestDatasetWithFetchHandler(unittest.TestCase): + def net(self): + slots = ["slot1", "slot2", "slot3", "slot4"] + slots_vars = [] + poolings = [] + for slot in slots: + data = fluid.layers.data( + name=slot, shape=[1], dtype="int64", lod_level=1) + var = fluid.layers.cast(x=data, dtype='float32') + pool = fluid.layers.sequence_pool(input=var, pool_type='AVERAGE') + + slots_vars.append(data) + poolings.append(pool) + + concated = fluid.layers.concat(poolings, axis=1) + fc = fluid.layers.fc(input=concated, act='tanh', size=32) + return slots_vars, fc + + def get_dataset(self, inputs, files): + dataset = fluid.DatasetFactory().create_dataset("QueueDataset") + dataset.set_batch_size(32) + dataset.set_thread(3) + dataset.set_filelist(files) + dataset.set_pipe_command("cat") + dataset.set_use_var(inputs) + return dataset + + def setUp(self): + with open("test_queue_dataset_run_a.txt", "w") as f: + data = "1 1 2 3 3 4 5 5 5 5 1 1\n" + data += "1 2 2 3 4 4 6 6 6 6 1 2\n" + data += "1 3 2 3 5 4 7 7 7 7 1 3\n" + f.write(data) + with open("test_queue_dataset_run_b.txt", "w") as f: + data = "1 4 2 3 3 4 5 5 5 5 1 4\n" + data += "1 5 2 3 4 4 6 6 6 6 1 5\n" + data += "1 6 2 3 5 4 7 7 7 7 1 6\n" + data += "1 7 2 3 6 4 8 8 8 8 1 7\n" + f.write(data) + + def tearDown(self): + os.remove("./test_queue_dataset_run_a.txt") + os.remove("./test_queue_dataset_run_b.txt") + + def test_dataset_none(self): + slots_vars, out = self.net() + files = ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"] + dataset = self.get_dataset(slots_vars, files) + + exe = fluid.Executor(fluid.CPUPlace()) + exe.run(fluid.default_startup_program()) + + # test dataset->None + try: + exe.train_from_dataset(fluid.default_main_program(), None) + except ImportError as e: + print("warning: we skip trainer_desc_pb2 import problem in windows") + except RuntimeError as e: + error_msg = "dataset is need and should be initialized" + self.assertEqual(error_msg, cpt.get_exception_message(e)) + except Exception as e: + self.assertTrue(False) + + def test_infer_from_dataset(self): + slots_vars, out = self.net() + files = ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"] + dataset = self.get_dataset(slots_vars, files) + + exe = fluid.Executor(fluid.CPUPlace()) + exe.run(fluid.default_startup_program()) + + try: + exe.infer_from_dataset(fluid.default_main_program(), dataset) + except ImportError as e: + print("warning: we skip trainer_desc_pb2 import problem in windows") + except Exception as e: + self.assertTrue(False) + + def test_fetch_handler(self): + slots_vars, out = self.net() + files = ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"] + dataset = self.get_dataset(slots_vars, files) + + exe = fluid.Executor(fluid.CPUPlace()) + exe.run(fluid.default_startup_program()) + + fh = fluid.executor.FetchHandler(out.name) + fh.help() + + try: + exe.train_from_dataset( + program=fluid.default_main_program(), + dataset=dataset, + fetch_handler=fh) + except ImportError as e: + print("warning: we skip trainer_desc_pb2 import problem in windows") + except RuntimeError as e: + error_msg = "dataset is need and should be initialized" + self.assertEqual(error_msg, cpt.get_exception_message(e)) + except Exception as e: + self.assertTrue(False) + + if __name__ == '__main__': unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_base.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_base.py index f7f823c3da1af9166c219ff99cae57d0bc609267..0774f618c8b36a6e0a91c3decba3f5def4e1b3af 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_fleet_base.py +++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_base.py @@ -97,6 +97,7 @@ class FleetDistRunnerBase(object): optimizer = fluid.optimizer.SGD(LEARNING_RATE) optimizer = fleet.distributed_optimizer(optimizer, strategy) optimizer.minimize(avg_cost) + out = self.do_training(fleet) def net(self, batch_size=4, lr=0.01): @@ -181,12 +182,13 @@ class TestFleetBase(unittest.TestCase): def _run_cluster(self, model, envs): env = {'CPU_NUM': '1'} + env.update(envs) + python_path = self._python_interp if os.getenv('WITH_COVERAGE', 'OFF') == 'ON': envs['COVERAGE_FILE'] = os.getenv('COVERAGE_FILE', '') python_path += " -m coverage run --branch -p" - env.update(envs) tr_cmd = "{0} {1} --role trainer --endpoints {2} --current_id {{}} --trainers {3}".format( python_path, model, self._ps_endpoints, self._trainers) diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_ctr.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_ctr.py index acefd65b56b94e6b0862d8e2676bc9cb8826981b..9bad641a8cbd867c6c64467991b00ff9d7aa3011 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_fleet_ctr.py +++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_ctr.py @@ -30,6 +30,7 @@ def skip_ci(func): return __func__ +@skip_ci class TestDistMnist2x2(TestFleetBase): def _setup_config(self): self._sync_mode = False diff --git a/python/paddle/fluid/tests/unittests/test_fetch_handler.py b/python/paddle/fluid/tests/unittests/test_fetch_handler.py new file mode 100644 index 0000000000000000000000000000000000000000..c0fb2825e0a23e0263030de7ed60de67eef14945 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_fetch_handler.py @@ -0,0 +1,48 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import time +import unittest +import numpy as np + +import paddle.fluid.core as core +import paddle.fluid as fluid + + +class TestFetchHandler(unittest.TestCase): + def test_fetch_handler(self): + place = core.CPUPlace() + scope = core.Scope() + + table = np.random.random((3, 10)).astype("float32") + + class FH(fluid.executor.FetchHandler): + def handler(self, fetch_target_vars): + assert len(fetch_target_vars) == 1 + + table_var = scope.var('emb').get_tensor() + table_var.set(table, place) + + fh = FH(['emb'], period_secs=2, return_np=True) + fm = fluid.trainer_factory.FetchHandlerMonitor(scope, fh) + + fm.start() + time.sleep(10) + fm.stop() + + +if __name__ == "__main__": + unittest.main() diff --git a/python/paddle/fluid/trainer_factory.py b/python/paddle/fluid/trainer_factory.py index 5f312ea075ba7b3f30441645a46bb43b5d882bd5..145df910bf6184587e057aaf01c756856c218311 100644 --- a/python/paddle/fluid/trainer_factory.py +++ b/python/paddle/fluid/trainer_factory.py @@ -12,10 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. +import threading +import time + +import numpy as np + from .trainer_desc import MultiTrainer, DistMultiTrainer, PipelineTrainer from .device_worker import Hogwild, DownpourSGD, Section -__all__ = ["TrainerFactory"] +__all__ = ["TrainerFactory", "FetchHandler", "FetchHandlerMonitor"] class TrainerFactory(object): @@ -48,3 +53,61 @@ class TrainerFactory(object): trainer._set_adjust_ins_weight(opt_info["adjust_ins_weight"]) trainer._set_device_worker(device_worker) return trainer + + +class FetchHandlerMonitor(object): + def __init__(self, scope, handler): + self.fetch_instance = handler + self.fetch_thread = threading.Thread( + target=self.handler_decorator, + args=(scope, self.fetch_instance.handler)) + self.running = False + + def start(self): + self.running = True + self.fetch_thread.setDaemon(True) + self.fetch_thread.start() + + def handler_decorator(self, fetch_scope, fetch_handler): + fetch_target_names = self.fetch_instance.fetch_target_names + period_secs = self.fetch_instance.period_secs + + elapsed_secs = 0 + while True: + while self.running and elapsed_secs >= period_secs: + elapsed_secs = 0 + + fetch_vars = [ + fetch_scope.find_var(varname) + for varname in fetch_target_names + ] + + fetch_tensors = [var.get_tensor() for var in fetch_vars] + + if self.fetch_instance.return_np: + fetch_nps = [] + + for tensor in fetch_tensors: + lod = tensor.lod() + + if len(lod) > 0: + raise RuntimeError( + "Some of your fetched tensors hold LoD information. \ + They can not be completely cast to Python ndarray. We can not \ + return LoDTensor itself directly, please choose another targets" + ) + + if tensor._is_initialized(): + fetch_nps.append(np.array(tensor)) + else: + fetch_nps.append(None) + + fetch_handler(fetch_nps) + else: + fetch_handler(fetch_tensors) + else: + time.sleep(1) + elapsed_secs += 1 + + def stop(self): + self.running = False