未验证 提交 d714bf03 编写于 作者: G guru4elephant 提交者: GitHub

remove async executor and add data_feed.proto to the deps of train demo (#18659)

* remove async executor and add data_feed.proto to the deps of train demo
上级 ce1ec332
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
| emailweixu | Wei Xu | | emailweixu | Wei Xu |
| gangliao | Gang Liao | | gangliao | Gang Liao |
| gongweibao | Wei-Bao Gong | | gongweibao | Wei-Bao Gong |
| guru4elephant | Daxiang Dong |
| Guo Sheng | Sheng Guo | | Guo Sheng | Sheng Guo |
| Haichao-Zhang | Hai-Chao Zhang | | Haichao-Zhang | Hai-Chao Zhang |
| hedaoyuan | Dao-Yuan He | | hedaoyuan | Dao-Yuan He |
......
...@@ -192,10 +192,11 @@ set(module "framework") ...@@ -192,10 +192,11 @@ set(module "framework")
if (NOT WIN32) if (NOT WIN32)
set(framework_lib_deps framework_py_proto) set(framework_lib_deps framework_py_proto)
endif (NOT WIN32) endif (NOT WIN32)
copy(framework_lib DEPS ${framework_lib_deps} copy(framework_lib DEPS ${framework_lib_deps}
SRCS ${src_dir}/${module}/*.h ${src_dir}/${module}/details/*.h ${PADDLE_BINARY_DIR}/paddle/fluid/framework/framework.pb.h SRCS ${src_dir}/${module}/*.h ${src_dir}/${module}/details/*.h ${PADDLE_BINARY_DIR}/paddle/fluid/framework/framework.pb.h ${PADDLE_BINARY_DIR}/paddle/fluid/framework/data_feed.pb.h ${src_dir}/${module}/ir/memory_optimize_pass/*.h
${src_dir}/${module}/ir/*.h ${src_dir}/${module}/ir/*.h ${src_dir}/${module}/fleet/*.h
DSTS ${dst_dir}/${module} ${dst_dir}/${module}/details ${dst_dir}/${module} ${dst_dir}/${module}/ir DSTS ${dst_dir}/${module} ${dst_dir}/${module}/details ${dst_dir}/${module} ${dst_dir}/${module} ${dst_dir}/${module}/ir/memory_optimize_pass ${dst_dir}/${module}/ir ${dst_dir}/${module}/fleet
) )
set(module "memory") set(module "memory")
......
...@@ -28,7 +28,6 @@ add_subdirectory(io) ...@@ -28,7 +28,6 @@ add_subdirectory(io)
#ddim lib #ddim lib
proto_library(framework_proto SRCS framework.proto) proto_library(framework_proto SRCS framework.proto)
proto_library(data_feed_proto SRCS data_feed.proto) proto_library(data_feed_proto SRCS data_feed.proto)
proto_library(async_executor_proto SRCS data_feed.proto)
proto_library(trainer_desc_proto SRCS trainer_desc.proto DEPS framework_proto proto_library(trainer_desc_proto SRCS trainer_desc.proto DEPS framework_proto
data_feed_proto) data_feed_proto)
...@@ -190,7 +189,7 @@ else() ...@@ -190,7 +189,7 @@ else()
pull_dense_worker.cc section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry pull_dense_worker.cc section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry
device_context scope framework_proto data_feed_proto trainer_desc_proto glog device_context scope framework_proto data_feed_proto trainer_desc_proto glog
lod_rank_table fs shell fleet_wrapper lodtensor_printer feed_fetch_method lod_rank_table fs shell fleet_wrapper lodtensor_printer feed_fetch_method
graph_to_program_pass variable_helper ${NGRAPH_EXE_DEPS} timer data_feed_proto) graph_to_program_pass variable_helper ${NGRAPH_EXE_DEPS} timer)
cc_test(test_naive_executor SRCS naive_executor_test.cc DEPS naive_executor elementwise_add_op) cc_test(test_naive_executor SRCS naive_executor_test.cc DEPS naive_executor elementwise_add_op)
endif() endif()
...@@ -201,18 +200,6 @@ cc_library(parallel_executor SRCS parallel_executor.cc DEPS ...@@ -201,18 +200,6 @@ cc_library(parallel_executor SRCS parallel_executor.cc DEPS
graph build_strategy graph build_strategy
fast_threaded_ssa_graph_executor variable_helper) fast_threaded_ssa_graph_executor variable_helper)
cc_library(async_executor SRCS async_executor.cc data_feed.cc data_feed_factory.cc
executor_thread_worker.cc multi_trainer.cc dist_multi_trainer.cc pipeline_trainer.cc
trainer_factory.cc trainer.cc device_worker.cc hogwild_worker.cc
downpour_worker.cc pull_dense_worker.cc section_worker.cc
device_worker_factory.cc data_set.cc dataset_factory.cc
DEPS op_registry device_context scope framework_proto
trainer_desc_proto glog lod_rank_table fleet_wrapper lodtensor_printer
feed_fetch_method graph_to_program_pass data_feed_proto
variable_helper timer fs shell)
cc_test(data_feed_test SRCS data_feed_test.cc DEPS async_executor)
cc_library(prune SRCS prune.cc DEPS framework_proto) cc_library(prune SRCS prune.cc DEPS framework_proto)
cc_test(prune_test SRCS prune_test.cc DEPS op_info prune recurrent_op device_context) cc_test(prune_test SRCS prune_test.cc DEPS op_info prune recurrent_op device_context)
cc_test(var_type_inference_test SRCS var_type_inference_test.cc DEPS op_registry cc_test(var_type_inference_test SRCS var_type_inference_test.cc DEPS op_registry
......
...@@ -101,8 +101,7 @@ template <typename T> ...@@ -101,8 +101,7 @@ template <typename T>
void PrivateQueueDataFeed<T>::SetQueueSize(int queue_size) { void PrivateQueueDataFeed<T>::SetQueueSize(int queue_size) {
PADDLE_ENFORCE(queue_size > 0, "Illegal queue size: %d.", queue_size); PADDLE_ENFORCE(queue_size > 0, "Illegal queue size: %d.", queue_size);
queue_size_ = queue_size; queue_size_ = queue_size;
queue_ = std::unique_ptr<paddle::operators::reader::BlockingQueue<T>>( queue_ = paddle::framework::MakeChannel<T>();
new paddle::operators::reader::BlockingQueue<T>(queue_size_));
} }
template <typename T> template <typename T>
...@@ -125,7 +124,7 @@ void PrivateQueueDataFeed<T>::ReadThread() { ...@@ -125,7 +124,7 @@ void PrivateQueueDataFeed<T>::ReadThread() {
__fsetlocking(&*fp_, FSETLOCKING_BYCALLER); __fsetlocking(&*fp_, FSETLOCKING_BYCALLER);
T instance; T instance;
while (ParseOneInstanceFromPipe(&instance)) { while (ParseOneInstanceFromPipe(&instance)) {
queue_->Send(instance); queue_->Put(instance);
} }
} }
queue_->Close(); queue_->Close();
...@@ -137,10 +136,10 @@ int PrivateQueueDataFeed<T>::Next() { ...@@ -137,10 +136,10 @@ int PrivateQueueDataFeed<T>::Next() {
#ifdef _LINUX #ifdef _LINUX
CheckStart(); CheckStart();
int index = 0; int index = 0;
T instance;
T ins_vec; T ins_vec;
while (index < default_batch_size_) { while (index < default_batch_size_) {
if (!queue_->Receive(&instance)) { T instance;
if (!queue_->Get(instance)) {
break; break;
} }
AddInstanceToInsVec(&ins_vec, instance, index++); AddInstanceToInsVec(&ins_vec, instance, index++);
...@@ -345,7 +344,7 @@ void MultiSlotDataFeed::ReadThread() { ...@@ -345,7 +344,7 @@ void MultiSlotDataFeed::ReadThread() {
int ins_num = 0; int ins_num = 0;
while (ParseOneInstanceFromPipe(&instance)) { while (ParseOneInstanceFromPipe(&instance)) {
ins_num++; ins_num++;
queue_->Send(instance); queue_->Put(instance);
} }
VLOG(3) << "filename: " << filename << " inst num: " << ins_num; VLOG(3) << "filename: " << filename << " inst num: " << ins_num;
} }
......
...@@ -33,11 +33,9 @@ limitations under the License. */ ...@@ -33,11 +33,9 @@ limitations under the License. */
#include "paddle/fluid/framework/blocking_queue.h" #include "paddle/fluid/framework/blocking_queue.h"
#include "paddle/fluid/framework/channel.h" #include "paddle/fluid/framework/channel.h"
#include "paddle/fluid/framework/data_feed.pb.h" #include "paddle/fluid/framework/data_feed.pb.h"
#include "paddle/fluid/framework/fleet/fleet_wrapper.h"
#include "paddle/fluid/framework/lod_tensor.h" #include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/reader.h" #include "paddle/fluid/framework/reader.h"
#include "paddle/fluid/framework/variable.h" #include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/operators/reader/blocking_queue.h"
#include "paddle/fluid/string/string_helper.h" #include "paddle/fluid/string/string_helper.h"
namespace paddle { namespace paddle {
...@@ -198,7 +196,7 @@ class PrivateQueueDataFeed : public DataFeed { ...@@ -198,7 +196,7 @@ class PrivateQueueDataFeed : public DataFeed {
size_t queue_size_; size_t queue_size_;
string::LineFileReader reader_; string::LineFileReader reader_;
// The queue for store parsed data // The queue for store parsed data
std::unique_ptr<paddle::operators::reader::BlockingQueue<T>> queue_; std::shared_ptr<paddle::framework::ChannelObject<T>> queue_;
}; };
template <typename T> template <typename T>
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "google/protobuf/message.h" #include "google/protobuf/message.h"
#include "google/protobuf/text_format.h" #include "google/protobuf/text_format.h"
#include "paddle/fluid/framework/data_feed_factory.h" #include "paddle/fluid/framework/data_feed_factory.h"
#include "paddle/fluid/framework/fleet/fleet_wrapper.h"
#include "paddle/fluid/framework/io/fs.h" #include "paddle/fluid/framework/io/fs.h"
#include "paddle/fluid/platform/timer.h" #include "paddle/fluid/platform/timer.h"
......
...@@ -2,7 +2,7 @@ if(WITH_TESTING) ...@@ -2,7 +2,7 @@ if(WITH_TESTING)
include(tests/test.cmake) # some generic cmake funtion for inference include(tests/test.cmake) # some generic cmake funtion for inference
endif() endif()
set(FLUID_CORE_MODULES proto_desc memory lod_tensor executor) set(FLUID_CORE_MODULES proto_desc memory lod_tensor executor data_feed_proto)
# TODO(panyx0718): Should this be called paddle_fluid_inference_api_internal? # TODO(panyx0718): Should this be called paddle_fluid_inference_api_internal?
cc_library(paddle_fluid_api cc_library(paddle_fluid_api
......
set(PYBIND_DEPS pybind python proto_desc memory executor async_executor fleet_wrapper nccl_wrapper prune set(PYBIND_DEPS pybind python proto_desc memory executor fleet_wrapper nccl_wrapper prune
feed_fetch_method pass_builder parallel_executor profiler layer scope_pool feed_fetch_method pass_builder parallel_executor profiler layer scope_pool
tracer analysis_predictor imperative_profiler nccl_context) tracer analysis_predictor imperative_profiler nccl_context)
...@@ -17,7 +17,6 @@ set(PYBIND_SRCS ...@@ -17,7 +17,6 @@ set(PYBIND_SRCS
const_value.cc const_value.cc
recordio.cc recordio.cc
reader_py.cc reader_py.cc
async_executor_py.cc
fleet_wrapper_py.cc fleet_wrapper_py.cc
nccl_wrapper_py.cc nccl_wrapper_py.cc
data_set_py.cc data_set_py.cc
......
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <fcntl.h>
// To avoid conflicting definition in gcc-4.8.2 headers and pyconfig.h (2.7.3)
#ifdef _POSIX_C_SOURCE
#undef _POSIX_C_SOURCE
#endif
#ifdef _XOPEN_SOURCE
#undef _XOPEN_SOURCE
#endif
#include <memory>
#include <string>
#include <vector>
#include "google/protobuf/io/zero_copy_stream_impl.h"
#include "google/protobuf/text_format.h"
#include "paddle/fluid/framework/async_executor.h"
#include "paddle/fluid/framework/data_feed.h"
#include "paddle/fluid/framework/data_feed.pb.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/inference/io.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/platform/variant.h"
#include "paddle/fluid/pybind/async_executor_py.h"
namespace py = pybind11;
namespace pd = paddle::framework;
namespace paddle {
namespace pybind {
using set_name_func = void (pd::DataFeedDesc::*)(const std::string&);
#ifdef PADDLE_WITH_PSLIB
void BindAsyncExecutor(py::module* m) {
py::class_<framework::AsyncExecutor>(*m, "AsyncExecutor")
.def(py::init([](framework::Scope* scope, const platform::Place& place) {
return std::unique_ptr<framework::AsyncExecutor>(
new framework::AsyncExecutor(scope, place));
}))
.def("run_from_files", &framework::AsyncExecutor::RunFromFile)
.def("init_server", &framework::AsyncExecutor::InitServer)
.def("init_worker", &framework::AsyncExecutor::InitWorker)
.def("start_server", &framework::AsyncExecutor::StartServer)
.def("stop_server", &framework::AsyncExecutor::StopServer)
.def("gather_servers", &framework::AsyncExecutor::GatherServers)
.def("init_model", &framework::AsyncExecutor::InitModel)
.def("save_model", &framework::AsyncExecutor::SaveModel);
} // end BindAsyncExecutor
#else
void BindAsyncExecutor(py::module* m) {
py::class_<framework::AsyncExecutor>(*m, "AsyncExecutor")
.def(py::init([](framework::Scope* scope, const platform::Place& place) {
return std::unique_ptr<framework::AsyncExecutor>(
new framework::AsyncExecutor(scope, place));
}))
.def("run_from_files", &framework::AsyncExecutor::RunFromFile);
} // end BindAsyncExecutor
#endif
} // end namespace pybind
} // end namespace paddle
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "pybind11/pybind11.h"
#include "pybind11/stl.h"
namespace py = pybind11;
namespace paddle {
namespace pybind {
void BindAsyncExecutor(py::module* m);
} // namespace pybind
} // namespace paddle
...@@ -51,7 +51,6 @@ limitations under the License. */ ...@@ -51,7 +51,6 @@ limitations under the License. */
#include "paddle/fluid/platform/init.h" #include "paddle/fluid/platform/init.h"
#include "paddle/fluid/platform/place.h" #include "paddle/fluid/platform/place.h"
#include "paddle/fluid/platform/profiler.h" #include "paddle/fluid/platform/profiler.h"
#include "paddle/fluid/pybind/async_executor_py.h"
#include "paddle/fluid/pybind/const_value.h" #include "paddle/fluid/pybind/const_value.h"
#include "paddle/fluid/pybind/data_set_py.h" #include "paddle/fluid/pybind/data_set_py.h"
#include "paddle/fluid/pybind/exception.h" #include "paddle/fluid/pybind/exception.h"
...@@ -1609,7 +1608,6 @@ All parameter, weight, gradient are variables in Paddle. ...@@ -1609,7 +1608,6 @@ All parameter, weight, gradient are variables in Paddle.
}); });
BindRecordIOWriter(&m); BindRecordIOWriter(&m);
BindAsyncExecutor(&m);
BindFleetWrapper(&m); BindFleetWrapper(&m);
#ifndef _WIN32 #ifndef _WIN32
BindNCCLWrapper(&m); BindNCCLWrapper(&m);
......
...@@ -70,7 +70,7 @@ endif(APPLE) ...@@ -70,7 +70,7 @@ endif(APPLE)
target_link_libraries(demo_trainer target_link_libraries(demo_trainer
${MACOS_LD_FLAGS} ${MACOS_LD_FLAGS}
${ARCHIVE_START} ${ARCHIVE_START}
${PADDLE_LIB}/paddle/fluid/inference/libpaddle_fluid.a ${PADDLE_LIB}/paddle/fluid/inference/libpaddle_fluid.so
${ARCHIVE_END} ${ARCHIVE_END}
${MATH_LIB} ${MATH_LIB}
${MKLDNN_LIB} ${MKLDNN_LIB}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册