/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ #pragma once #include #include #include #include // NOLINT #include // local_random_engine #include #include #include // NOLINT #include #include #include "paddle/fluid/framework/data_feed.pb.h" #include "paddle/fluid/framework/executor.h" #include "paddle/fluid/framework/executor_thread_worker.h" #include "paddle/fluid/framework/program_desc.h" #include "paddle/fluid/framework/scope.h" namespace paddle { namespace framework { inline double current_realtime() { #if !defined(_WIN32) struct timespec tp; clock_gettime(CLOCK_REALTIME, &tp); return tp.tv_sec + tp.tv_nsec * 1e-9; #else return 0.0; #endif } inline std::default_random_engine& local_random_engine() { struct engine_wrapper_t { std::default_random_engine engine; engine_wrapper_t() { static std::atomic x(0); std::seed_seq sseq = {x++, x++, x++, static_cast(current_realtime() * 1000)}; engine.seed(sseq); } }; thread_local engine_wrapper_t r; return r.engine; } class AsyncExecutor { public: AsyncExecutor(Scope* scope, const platform::Place& place); virtual ~AsyncExecutor() {} void RunFromFile(const ProgramDesc& main_program, const std::string& data_feed_desc_str, const std::vector& filelist, const int thread_num, const std::vector& fetch_names, const std::string& mode, const bool debug = false); void InitServer(const std::string& dist_desc, int index); void InitWorker(const std::string& dist_desc, const std::vector& host_sign_list, int node_num, int index); uint64_t StartServer(); void StopServer(); void GatherServers(const std::vector& host_sign_list, int node_num); void InitModel(); void SaveModel(const std::string& path); private: void CreateThreads(ExecutorThreadWorker* worker, const ProgramDesc& main_program, const std::shared_ptr& reader, const std::vector& fetch_var_names, Scope* root_scope, const int thread_index, const bool debug); public: std::shared_ptr fleet_ptr_; Scope* root_scope_; platform::Place place_; private: int actual_thread_num_; }; } // namespace framework } // namespace paddle