diff --git a/paddle/fluid/framework/fleet/fleet_wrapper.cc b/paddle/fluid/framework/fleet/fleet_wrapper.cc index ac6ee6c0246afebaa746a21c2c4e09e4fd092736..954920df63d2482e1429a59e2c220e571be73ab8 100644 --- a/paddle/fluid/framework/fleet/fleet_wrapper.cc +++ b/paddle/fluid/framework/fleet/fleet_wrapper.cc @@ -38,10 +38,9 @@ std::shared_ptr FleetWrapper::s_instance_ = NULL; bool FleetWrapper::is_initialized_ = false; #ifdef PADDLE_WITH_PSLIB -template -paddle::ps::Archive& operator << ( - paddle::ps::Archive& ar, - const MultiSlotType& ins) { +template +paddle::ps::Archive& operator<<(paddle::ps::Archive& ar, + const MultiSlotType& ins) { ar << ins.GetType(); ar << ins.GetOffset(); ar << ins.GetFloatData(); @@ -49,10 +48,9 @@ paddle::ps::Archive& operator << ( return ar; } -template -paddle::ps::Archive& operator >> ( - paddle::ps::Archive& ar, - MultiSlotType& ins) { +template +paddle::ps::Archive& operator>>(paddle::ps::Archive& ar, + MultiSlotType& ins) { ar >> ins.MutableType(); ar >> ins.MutableOffset(); ar >> ins.MutableFloatData(); @@ -205,6 +203,10 @@ void FleetWrapper::PullDenseVarsSync( #endif } +void FleetWrapper::PushDenseVarsSync( + Scope* scope, const uint64_t table_id, + const std::vector& var_names) {} + void FleetWrapper::PushDenseVarsAsync( const Scope& scope, const uint64_t table_id, const std::vector& var_names, @@ -324,8 +326,7 @@ std::default_random_engine& FleetWrapper::LocalRandomEngine() { clock_gettime(CLOCK_REALTIME, &tp); double cur_time = tp.tv_sec + tp.tv_nsec * 1e-9; static std::atomic x(0); - std::seed_seq sseq = {x++, x++, x++, - (uint64_t)(cur_time * 1000)}; + std::seed_seq sseq = {x++, x++, x++, (uint64_t)(cur_time * 1000)}; engine.seed(sseq); } }; @@ -333,7 +334,7 @@ std::default_random_engine& FleetWrapper::LocalRandomEngine() { return r.engine; } -template +template void FleetWrapper::Serialize(const T& t, std::string* str) { #ifdef PADDLE_WITH_PSLIB paddle::ps::BinaryArchive ar; @@ -344,7 +345,7 @@ void FleetWrapper::Serialize(const T& t, std::string* str) { #endif } -template +template void FleetWrapper::Deserialize(T* t, const std::string& str) { #ifdef PADDLE_WITH_PSLIB paddle::ps::BinaryArchive ar; @@ -357,8 +358,8 @@ void FleetWrapper::Deserialize(T* t, const std::string& str) { template void FleetWrapper::Serialize>( const std::vector&, std::string*); -template void FleetWrapper::Deserialize( - std::vector*, const std::string&); +template void FleetWrapper::Deserialize(std::vector*, + const std::string&); } // end namespace framework } // end namespace paddle diff --git a/paddle/fluid/framework/fleet/fleet_wrapper.h b/paddle/fluid/framework/fleet/fleet_wrapper.h index a649679b0d829ed9692bf9b4b9fadec474a1b49c..deab3bc1db1f5198ee6785698bf4a6a4d6351612 100644 --- a/paddle/fluid/framework/fleet/fleet_wrapper.h +++ b/paddle/fluid/framework/fleet/fleet_wrapper.h @@ -16,12 +16,12 @@ limitations under the License. */ #include #ifdef PADDLE_WITH_PSLIB -#include #include +#include #endif -#include #include #include +#include #include #include #include "paddle/fluid/framework/scope.h" @@ -79,6 +79,9 @@ class FleetWrapper { const std::vector& var_names, std::vector<::std::future>* push_sparse_status); + void PushDenseVarsSync(Scope* scope, const uint64_t table_id, + const std::vector& var_names); + // Push sparse variables with labels to server in Async mode // This is specially designed for click/show stats in server // Param: scope, table_id, var_grad_names, @@ -121,9 +124,9 @@ class FleetWrapper { const std::string& msg); std::default_random_engine& LocalRandomEngine(); - template + template void Serialize(const T& t, std::string* str); - template + template void Deserialize(T* t, const std::string& str); static std::shared_ptr GetInstance() { diff --git a/paddle/fluid/pybind/fleet_wrapper_py.cc b/paddle/fluid/pybind/fleet_wrapper_py.cc index 65f71096e9e091f24c94c9542253b7cbbf5da8aa..3c91e004f7e8d52b8a5ee93c4f0e97454f95cfdb 100644 --- a/paddle/fluid/pybind/fleet_wrapper_py.cc +++ b/paddle/fluid/pybind/fleet_wrapper_py.cc @@ -43,6 +43,7 @@ namespace pybind { void BindFleetWrapper(py::module* m) { py::class_(*m, "Fleet") .def(py::init()) + .def("push_dense", &framework::FleetWrapper::PushDenseVarsSync) .def("init_server", &framework::FleetWrapper::InitServer) .def("init_worker", &framework::FleetWrapper::InitWorker) .def("stop_server", &framework::FleetWrapper::StopServer) diff --git a/python/paddle/fluid/incubate/__init__.py b/python/paddle/fluid/incubate/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..76c5c6391fde3cafbd9a94e1d11e0ef4401420ed --- /dev/null +++ b/python/paddle/fluid/incubate/__init__.py @@ -0,0 +1,17 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and + +# incubate directory is mainly for internal use +# after we have tested incubate APIs in industrial application for a period +# we will move stable functions into fluid +__version__ = '0.1.0' diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/__init__.py index ec9b803b62c88f57cc76ed6df2237b8373ff4198..e7cf56474ec06d13e2da185ad3c7f318784084d5 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/__init__.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/__init__.py @@ -142,4 +142,4 @@ class DistributedOptimizer(paddle.fluid.Optimizer): no_grad_set) fleet_instance._set_opt_info(opt_info) - return [a, b] + return [optimize_ops, param_grads]