diff --git a/paddle/fluid/framework/data_feed.cc b/paddle/fluid/framework/data_feed.cc index 4f8fa005d7b75440b6964bde7cd7b4d9af66fae7..939ca07d6fbf0561afebc3647314476862d104bc 100644 --- a/paddle/fluid/framework/data_feed.cc +++ b/paddle/fluid/framework/data_feed.cc @@ -246,8 +246,8 @@ void InMemoryDataFeed::FillMemoryDataToChannel() { VLOG(3) << "FillMemoryDataToChannel, thread_id=" << thread_id_; auto interval = GetMemoryDataInterval(); VLOG(3) << "memory data size=" << memory_data_->size() - << ", fill data from [" << interval.first << ", " - << interval.second << "), thread_id=" << thread_id_; + << ", fill data from [" << interval.first << ", " << interval.second + << "), thread_id=" << thread_id_; for (int64_t i = interval.first; i < interval.second; ++i) { T& t = (*memory_data_)[i]; shuffled_ins_->Push(std::move(t)); @@ -275,13 +275,13 @@ void InMemoryDataFeed::FillChannelToMemoryData() { channel->Pop(&local_vec[i]); } VLOG(3) << "local_vec size=" << local_vec.size() - <<", thread_id=" << thread_id_; + << ", thread_id=" << thread_id_; { std::lock_guard g(*mutex_for_update_memory_data_); VLOG(3) << "before insert, memory_data_ size=" << memory_data_->size() << ", thread_id=" << thread_id_; memory_data_->insert(memory_data_->end(), local_vec.begin(), - local_vec.end()); + local_vec.end()); VLOG(3) << "after insert memory_data_ size=" << memory_data_->size() << ", thread_id=" << thread_id_; } @@ -308,8 +308,8 @@ void InMemoryDataFeed::LoadIntoMemory() { local_vec.push_back(instance); } timeline.Pause(); - VLOG(3) << "LoadIntoMemory() read all lines, file=" - << filename << ", cost time=" << timeline.ElapsedSec() + VLOG(3) << "LoadIntoMemory() read all lines, file=" << filename + << ", cost time=" << timeline.ElapsedSec() << " seconds, thread_id=" << thread_id_; { std::lock_guard lock(*mutex_for_update_memory_data_); @@ -319,8 +319,7 @@ void InMemoryDataFeed::LoadIntoMemory() { std::make_move_iterator(local_vec.end())); timeline.Pause(); VLOG(3) << "LoadIntoMemory() memory_data insert, cost time=" - << timeline.ElapsedSec() << " seconds, thread_id=" - << thread_id_; + << timeline.ElapsedSec() << " seconds, thread_id=" << thread_id_; } local_vec.clear(); } @@ -358,8 +357,8 @@ void InMemoryDataFeed::GlobalShuffle() { std::string send_str; SerializeIns(send_vec[j], &send_str); VLOG(3) << "send str_length=" << send_str.length() - << ", ins num=" << send_vec[j].size() << " to node_id=" - << j << ", thread_id=" << thread_id_; + << ", ins num=" << send_vec[j].size() << " to node_id=" << j + << ", thread_id=" << thread_id_; auto ret = fleet_ptr->SendClientToClientMsg(0, j, send_str); VLOG(3) << "end send, thread_id=" << thread_id_; send_vec[j].clear(); @@ -371,8 +370,8 @@ void InMemoryDataFeed::GlobalShuffle() { if (send_vec[j].size() != 0) { std::string send_str; SerializeIns(send_vec[j], &send_str); - VLOG(3) << "send str_length=" << send_str.length() - << " to node_id=" << j << ", thread_id=" << thread_id_; + VLOG(3) << "send str_length=" << send_str.length() << " to node_id=" << j + << ", thread_id=" << thread_id_; auto ret = fleet_ptr->SendClientToClientMsg(0, j, send_str); VLOG(3) << "end send, thread_id=" << thread_id_; total_status.push_back(std::move(ret)); @@ -888,15 +887,13 @@ void MultiSlotInMemoryDataFeed::PutToFeedVec( // todo serialize ins in global shuffle void MultiSlotInMemoryDataFeed::SerializeIns( - const std::vector*>& ins, - std::string* str) { + const std::vector*>& ins, std::string* str) { auto fleet_ptr = FleetWrapper::GetInstance(); fleet_ptr->Serialize(ins, str); } // todo deserialize ins in global shuffle void MultiSlotInMemoryDataFeed::DeserializeIns( - std::vector>* ins, - const std::string& str) { + std::vector>* ins, const std::string& str) { auto fleet_ptr = FleetWrapper::GetInstance(); fleet_ptr->Deserialize(ins, str); } diff --git a/paddle/fluid/framework/data_feed.h b/paddle/fluid/framework/data_feed.h index 1c6c44242dbed82b99f8560673699aaaddc08b81..2bc31e6c9b49deb2c01a84ce6491b31a37537b77 100644 --- a/paddle/fluid/framework/data_feed.h +++ b/paddle/fluid/framework/data_feed.h @@ -15,23 +15,23 @@ limitations under the License. */ #pragma once #include +#include // NOLINT #include #include // NOLINT +#include #include #include // NOLINT -#include -#include -#include // NOLINT #include +#include +#include "paddle/fluid/framework/blocking_queue.h" #include "paddle/fluid/framework/data_feed.pb.h" +#include "paddle/fluid/framework/fleet/fleet_wrapper.h" #include "paddle/fluid/framework/lod_tensor.h" #include "paddle/fluid/framework/reader.h" #include "paddle/fluid/framework/variable.h" #include "paddle/fluid/operators/reader/blocking_queue.h" #include "paddle/fluid/string/string_helper.h" -#include "paddle/fluid/framework/blocking_queue.h" -#include "paddle/fluid/framework/fleet/fleet_wrapper.h" namespace paddle { namespace framework { @@ -85,21 +85,19 @@ class DataFeed { virtual void AddFeedVar(Variable* var, const std::string& name); // This function will do nothing at default - virtual void SetMemoryData(void* memory_data) { } + virtual void SetMemoryData(void* memory_data) {} // This function will do nothing at default - virtual void SetMemoryDataMutex(std::mutex* mutex) { } + virtual void SetMemoryDataMutex(std::mutex* mutex) {} // This function will do nothing at default - virtual void SetThreadId(int thread_id) { } + virtual void SetThreadId(int thread_id) {} // This function will do nothing at default - virtual void SetThreadNum(int thread_num) { } + virtual void SetThreadNum(int thread_num) {} // This function will do nothing at default - virtual void SetTrainerNum(int trainer_num) { } + virtual void SetTrainerNum(int trainer_num) {} virtual void SetFileListMutex(std::mutex* mutex) { mutex_for_pick_file_ = mutex; } - virtual void SetFileListIndex(size_t* file_index) { - file_idx_ = file_index; - } + virtual void SetFileListIndex(size_t* file_index) { file_idx_ = file_index; } virtual void LoadIntoMemory() { PADDLE_THROW("This function(LoadIntoMemory) is not implemented."); } @@ -110,11 +108,11 @@ class DataFeed { PADDLE_THROW("This function(GlobalShuffle) is not implemented."); } // This function will do nothing at default - virtual void FillMemoryDataToChannel() { } + virtual void FillMemoryDataToChannel() {} // This function will do nothing at default - virtual void FillChannelToMemoryData() { } + virtual void FillChannelToMemoryData() {} // This function will do nothing at default - virtual void PutInsToChannel(const std::string& ins_str) { } + virtual void PutInsToChannel(const std::string& ins_str) {} protected: // The following three functions are used to check if it is executed in this @@ -222,8 +220,7 @@ class InMemoryDataFeed : public PrivateQueueDataFeed { virtual void GlobalShuffle(); protected: - virtual void AddInstanceToInsVec(T* vec_ins, - const T& instance, + virtual void AddInstanceToInsVec(T* vec_ins, const T& instance, int index) = 0; virtual bool ParseOneInstance(T* instance) = 0; virtual bool ParseOneInstanceFromPipe(T* instance) = 0; @@ -363,6 +360,7 @@ class MultiSlotInMemoryDataFeed MultiSlotInMemoryDataFeed() {} virtual ~MultiSlotInMemoryDataFeed() {} virtual void Init(const paddle::framework::DataFeedDesc& data_feed_desc); + protected: virtual void AddInstanceToInsVec(std::vector* vec_ins, const std::vector& instance, diff --git a/paddle/fluid/framework/data_set.cc b/paddle/fluid/framework/data_set.cc index c7b9ee717ac1504da115b65261037c7ae22ef064..774010e5e60647ab5fdf537a0e5c55f86efb8709 100644 --- a/paddle/fluid/framework/data_set.cc +++ b/paddle/fluid/framework/data_set.cc @@ -18,8 +18,8 @@ #include "google/protobuf/message.h" #include "google/protobuf/text_format.h" #include "paddle/fluid/framework/data_feed_factory.h" -#include "paddle/fluid/platform/timer.h" #include "paddle/fluid/framework/io/fs.h" +#include "paddle/fluid/platform/timer.h" namespace paddle { namespace framework { @@ -248,8 +248,7 @@ template int DatasetImpl::ReceiveFromClient(int msg_type, int client_id, const std::string& msg) { VLOG(3) << "ReceiveFromClient msg_type=" << msg_type - << ", client_id=" << client_id << ", msg length=" - << msg.length(); + << ", client_id=" << client_id << ", msg length=" << msg.length(); auto fleet_ptr = FleetWrapper::GetInstance(); int64_t index = fleet_ptr->LocalRandomEngine()() % thread_num_; VLOG(3) << "ramdom index=" << index; diff --git a/paddle/fluid/framework/data_set.h b/paddle/fluid/framework/data_set.h index 1f08f8eaa887e079c44626a19d460104ba9ee40f..e60ada1d5bd96768d9c0669a3f5e72fa73f32273 100644 --- a/paddle/fluid/framework/data_set.h +++ b/paddle/fluid/framework/data_set.h @@ -19,8 +19,8 @@ #include // NOLINT #include #include // NOLINT -#include #include +#include #include "paddle/fluid/framework/data_feed.h" diff --git a/paddle/fluid/framework/dataset_factory.cc b/paddle/fluid/framework/dataset_factory.cc index 56f425c1ee3866851c68b23e8fbd36566c1dfb8e..60be4cf9a43c01666c94018b7339da5f3ba797e5 100644 --- a/paddle/fluid/framework/dataset_factory.cc +++ b/paddle/fluid/framework/dataset_factory.cc @@ -25,24 +25,23 @@ typedef std::shared_ptr (*CreateDatasetFunction)(); typedef std::unordered_map datasetMap; datasetMap g_dataset_map; -#define REGISTER_DATASET_CLASS(dataset_class) \ - namespace { \ - std::shared_ptr Creator_##dataset_class() { \ - return std::shared_ptr(new dataset_class); \ - } \ - class __Registerer_##dataset_class { \ - public: \ - __Registerer_##dataset_class() { \ +#define REGISTER_DATASET_CLASS(dataset_class) \ + namespace { \ + std::shared_ptr Creator_##dataset_class() { \ + return std::shared_ptr(new dataset_class); \ + } \ + class __Registerer_##dataset_class { \ + public: \ + __Registerer_##dataset_class() { \ g_dataset_map[#dataset_class] = &Creator_##dataset_class; \ - } \ - }; \ - __Registerer_##dataset_class g_registerer_##dataset_class; \ + } \ + }; \ + __Registerer_##dataset_class g_registerer_##dataset_class; \ } // namespace std::string DatasetFactory::DatasetTypeList() { std::string dataset_types; - for (auto iter = g_dataset_map.begin(); iter != g_dataset_map.end(); - ++iter) { + for (auto iter = g_dataset_map.begin(); iter != g_dataset_map.end(); ++iter) { if (iter != g_dataset_map.begin()) { dataset_types += ", "; } diff --git a/paddle/fluid/framework/executor.h b/paddle/fluid/framework/executor.h index d0bd3a4c76c1c191d1fc174222d92a2fde185e5b..e13cf5e2d17f78ebe8b04c703f58c92aaaceab26 100644 --- a/paddle/fluid/framework/executor.h +++ b/paddle/fluid/framework/executor.h @@ -113,8 +113,7 @@ class Executor { void EnableMKLDNN(const ProgramDesc& program); void RunFromDataset(const ProgramDesc& main_program, Scope* scope, - Dataset* dataset, - const std::string& trainer_desc_str); + Dataset* dataset, const std::string& trainer_desc_str); private: const platform::Place place_; diff --git a/paddle/fluid/framework/io/fs.h b/paddle/fluid/framework/io/fs.h index 8a0734bf549d5e87b2eeabd4a4bd3724f8a4ec0b..3f0174701c24cc5a3eac38d12792650bdbd9463b 100644 --- a/paddle/fluid/framework/io/fs.h +++ b/paddle/fluid/framework/io/fs.h @@ -15,9 +15,9 @@ #pragma once #include +#include #include #include -#include #include "glog/logging.h" #include "paddle/fluid/framework/io/shell.h" #include "paddle/fluid/string/string_helper.h" diff --git a/paddle/fluid/framework/pull_dense_worker.cc b/paddle/fluid/framework/pull_dense_worker.cc index 44ac50262ace00cf2c60d786db6971b4dca6da6b..3ebf0d8fb5b450f05b94b16ed49433b3b82e886d 100644 --- a/paddle/fluid/framework/pull_dense_worker.cc +++ b/paddle/fluid/framework/pull_dense_worker.cc @@ -47,7 +47,7 @@ void PullDenseWorker::Initialize(const TrainerDesc& param) { int var_num = table.dense_value_name_size(); dense_value_names_[tid].resize(var_num); for (int j = 0; j < var_num; ++j) { - dense_value_names_[tid][j] = table.dense_value_name(j); + dense_value_names_[tid][j] = table.dense_value_name(j); } // setup training version for each table training_versions_[tid].resize(thread_num_, 0); diff --git a/paddle/fluid/pybind/async_executor_py.cc b/paddle/fluid/pybind/async_executor_py.cc index b0951f0ccd16394a8baf3c901440f566e9664ab0..009d13c243bdb3ee05d79edf9e47a09127bfc10b 100644 --- a/paddle/fluid/pybind/async_executor_py.cc +++ b/paddle/fluid/pybind/async_executor_py.cc @@ -21,9 +21,9 @@ limitations under the License. */ #ifdef _XOPEN_SOURCE #undef _XOPEN_SOURCE #endif +#include #include #include -#include #include "google/protobuf/io/zero_copy_stream_impl.h" #include "google/protobuf/text_format.h" diff --git a/paddle/fluid/pybind/data_set_py.cc b/paddle/fluid/pybind/data_set_py.cc index bc6a39ea9e23dccf0ba9f9076a53f53bd87e0c02..b773fd03c003e4c5b51f4876e6ac999f9e830ce4 100644 --- a/paddle/fluid/pybind/data_set_py.cc +++ b/paddle/fluid/pybind/data_set_py.cc @@ -19,21 +19,21 @@ limitations under the License. */ #ifdef _XOPEN_SOURCE #undef _XOPEN_SOURCE #endif +#include #include #include -#include #include "google/protobuf/io/zero_copy_stream_impl.h" #include "google/protobuf/text_format.h" #include "paddle/fluid/framework/async_executor.h" #include "paddle/fluid/framework/data_feed.h" #include "paddle/fluid/framework/data_feed.pb.h" #include "paddle/fluid/framework/data_set.h" +#include "paddle/fluid/framework/dataset_factory.h" #include "paddle/fluid/framework/scope.h" #include "paddle/fluid/inference/io.h" #include "paddle/fluid/platform/place.h" #include "paddle/fluid/platform/variant.h" #include "paddle/fluid/pybind/data_set_py.h" -#include "paddle/fluid/framework/dataset_factory.h" namespace py = pybind11; namespace pd = paddle::framework; @@ -42,8 +42,8 @@ namespace paddle { namespace pybind { void BindDataset(py::module* m) { - py::class_>(*m, "Dataset") + py::class_>(*m, + "Dataset") .def(py::init([](const std::string& name = "MultiSlotDataset") { return framework::DatasetFactory::CreateDataset(name); })) @@ -58,7 +58,7 @@ void BindDataset(py::module* m) { .def("get_hdfs_config", &framework::Dataset::GetHdfsConfig) .def("get_data_feed_desc", &framework::Dataset::GetDataFeedDesc) .def("register_client2client_msg_handler", - &framework::Dataset::RegisterClientToClientMsgHandler) + &framework::Dataset::RegisterClientToClientMsgHandler) .def("load_into_memory", &framework::Dataset::LoadIntoMemory) .def("release_memory", &framework::Dataset::ReleaseMemory) .def("local_shuffle", &framework::Dataset::LocalShuffle) diff --git a/paddle/fluid/string/string_helper.h b/paddle/fluid/string/string_helper.h index bec11b39f765676d25824764cd122dddfa7eb0a5..e2ded402b1240680684fa6705251dfa4f34e4071 100644 --- a/paddle/fluid/string/string_helper.h +++ b/paddle/fluid/string/string_helper.h @@ -18,8 +18,8 @@ #include #include #include -#include #include +#include #include "boost/lexical_cast.hpp" #include "glog/logging.h" diff --git a/python/paddle/fluid/tests/unittests/test_dataset.py b/python/paddle/fluid/tests/unittests/test_dataset.py index f7580e5c2d84523a71c0779a3f9e7eb71e2a3b5c..adb287fba8240cfa9d255185965ba1a580177654 100644 --- a/python/paddle/fluid/tests/unittests/test_dataset.py +++ b/python/paddle/fluid/tests/unittests/test_dataset.py @@ -80,18 +80,20 @@ class TestDataset(unittest.TestCase): data += "1 7 2 3 6 4 8 8 8 8 1 7\n" f.write(data) - slots = ["slot1","slot2","slot3","slot4"] + slots = ["slot1", "slot2", "slot3", "slot4"] slots_vars = [] for slot in slots: - var = fluid.layers.data(name=slot, shape=[1], - dtype="int64", lod_level=1) + var = fluid.layers.data( + name=slot, shape=[1], dtype="int64", lod_level=1) slots_vars.append(var) dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") dataset.set_batch_size(32) dataset.set_thread(3) - dataset.set_filelist(["test_in_memory_dataset_run_a.txt", - "test_in_memory_dataset_run_b.txt"]) + dataset.set_filelist([ + "test_in_memory_dataset_run_a.txt", + "test_in_memory_dataset_run_b.txt" + ]) dataset.set_pipe_command("cat") dataset.set_use_var(slots_vars) dataset.load_into_memory() @@ -124,18 +126,18 @@ class TestDataset(unittest.TestCase): data += "1 7 2 3 6 4 8 8 8 8 1 7\n" f.write(data) - slots = ["slot1","slot2","slot3","slot4"] + slots = ["slot1", "slot2", "slot3", "slot4"] slots_vars = [] for slot in slots: - var = fluid.layers.data(name=slot, shape=[1], - dtype="int64", lod_level=1) + var = fluid.layers.data( + name=slot, shape=[1], dtype="int64", lod_level=1) slots_vars.append(var) dataset = fluid.DatasetFactory().create_dataset("QueueDataset") dataset.set_batch_size(32) dataset.set_thread(3) - dataset.set_filelist(["test_queue_dataset_run_a.txt", - "test_queue_dataset_run_b.txt"]) + dataset.set_filelist( + ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"]) dataset.set_pipe_command("cat") dataset.set_use_var(slots_vars)