diff --git a/paddle/fluid/framework/data_feed.cc b/paddle/fluid/framework/data_feed.cc index 14daf9448b3cb25f5d91b68616fa9f375b89d5c9..62f35f205b96e2e736eb28a93ed237d9ef56f6bd 100644 --- a/paddle/fluid/framework/data_feed.cc +++ b/paddle/fluid/framework/data_feed.cc @@ -177,6 +177,9 @@ int InMemoryDataFeed::Next() { } CHECK(in_channel != nullptr); CHECK(out_channel != nullptr); + VLOG(3) << "in_channel size=" << in_channel->Size() + << ", out_channel size=" << out_channel->Size() + << ", thread_id=" << thread_id_; int index = 0; T instance; T ins_vec; @@ -259,14 +262,19 @@ void InMemoryDataFeed::FillChannelToMemoryData() { channel = shuffled_ins_out_; } CHECK(channel != nullptr); - local_vec.reserve(channel->Size()); + local_vec.resize(channel->Size()); for (int64_t i = 0; i < channel->Size(); ++i) { channel->Pop(local_vec[i]); } - std::unique_lock lock(*mutex_for_update_memory_data_); - lock.lock(); - memory_data_->insert(memory_data_->end(), local_vec.begin(), local_vec.end()); - lock.unlock(); + VLOG(3) << "local_vec size=" << local_vec.size() <<", thread_id=" << thread_id_; + { + std::lock_guard g(*mutex_for_update_memory_data_); + VLOG(3) << "before insert, memory_data_ size=" << memory_data_->size() + << ", thread_id=" << thread_id_; + memory_data_->insert(memory_data_->end(), local_vec.begin(), local_vec.end()); + VLOG(3) << "after insert memory_data_ size=" << memory_data_->size() + << ", thread_id=" << thread_id_; + } std::vector().swap(local_vec); } diff --git a/paddle/fluid/framework/data_set.cc b/paddle/fluid/framework/data_set.cc index 28cfbed4f419c1bb2ef3ca49a987abf8f7e260db..1d2a018be4bd24becc014a124c4456cb21d6e39d 100644 --- a/paddle/fluid/framework/data_set.cc +++ b/paddle/fluid/framework/data_set.cc @@ -176,7 +176,6 @@ void DatasetImpl::DestroyReaders() { for (std::thread& t : fill_threads) { t.join(); } - std::vector().swap(filelist_); std::vector>().swap(readers_); } diff --git a/paddle/fluid/framework/data_set.h b/paddle/fluid/framework/data_set.h index 334fceb69928f435183a98cce97dbd50439d388b..41aa636c6b0b9456f517bebbf8a8f86f2757aa72 100644 --- a/paddle/fluid/framework/data_set.h +++ b/paddle/fluid/framework/data_set.h @@ -83,10 +83,6 @@ class DatasetImpl : public Dataset { std::vector> readers_; std::vector memory_data_; std::mutex mutex_for_update_memory_data_; - std::vector>> - shuffled_ins_vec_; - std::vector>> - shuffled_ins_out_vec_; int thread_num_; paddle::framework::DataFeedDesc data_feed_desc_; std::vector filelist_; diff --git a/paddle/fluid/framework/executor.cc b/paddle/fluid/framework/executor.cc index e4fd0062871aac5e6066aef035db28a5d62cbaed..501480876b216b36cfe4b6f0e99a7acd7b555193 100644 --- a/paddle/fluid/framework/executor.cc +++ b/paddle/fluid/framework/executor.cc @@ -118,7 +118,7 @@ void Executor::CreateVariables(const ProgramDesc& pdesc, Scope* scope, } void Executor::RunFromDataset(const ProgramDesc& main_program, Scope* scope, - MultiSlotDataset* dataset, + Dataset* dataset, const std::string& trainer_desc_str) { VLOG(3) << "Start to RunFromDataset in executor"; TrainerDesc trainer_desc; diff --git a/paddle/fluid/framework/executor.h b/paddle/fluid/framework/executor.h index b351b924b7b755b18939dfda92facb07a2bc6e8d..d0bd3a4c76c1c191d1fc174222d92a2fde185e5b 100644 --- a/paddle/fluid/framework/executor.h +++ b/paddle/fluid/framework/executor.h @@ -113,7 +113,7 @@ class Executor { void EnableMKLDNN(const ProgramDesc& program); void RunFromDataset(const ProgramDesc& main_program, Scope* scope, - MultiSlotDataset* dataset, + Dataset* dataset, const std::string& trainer_desc_str); private: diff --git a/paddle/fluid/framework/fleet/fleet_wrapper.cc b/paddle/fluid/framework/fleet/fleet_wrapper.cc index 954920df63d2482e1429a59e2c220e571be73ab8..92b762946a756a4bce454a73f727613b46f184ed 100644 --- a/paddle/fluid/framework/fleet/fleet_wrapper.cc +++ b/paddle/fluid/framework/fleet/fleet_wrapper.cc @@ -297,6 +297,9 @@ void FleetWrapper::PushSparseVarsWithLabelAsync( int FleetWrapper::RegisterClientToClientMsgHandler( int msg_type, MsgHandlerFunc handler) { #ifdef PADDLE_WITH_PSLIB + VLOG(3) << "calling FleetWrapper::RegisterClientToClientMsgHandler"; + VLOG(3) << "pslib_ptr_=" << pslib_ptr_; + VLOG(3) << "_worker_ptr=" << pslib_ptr_->_worker_ptr; pslib_ptr_->_worker_ptr->registe_client2client_msg_handler( msg_type, handler); #else