From 9f8c8f967fec782669b97ced3bea6238db7e4a88 Mon Sep 17 00:00:00 2001 From: Thunderbrook <52529258+Thunderbrook@users.noreply.github.com> Date: Thu, 15 Apr 2021 14:14:23 +0800 Subject: [PATCH] heterps support pscore (#32093) * pscore support heterps * fleet cmake * fleet wrapper * macro * solve conflict * solve conflict * add unitest * paddle enforce * unitest * unitest * unitest --- CMakeLists.txt | 1 + cmake/configure.cmake | 3 + .../fluid/distributed/service/CMakeLists.txt | 4 +- .../distributed/service/brpc_ps_client.cc | 6 +- .../fluid/distributed/service/communicator.h | 2 + paddle/fluid/distributed/service/ps_client.cc | 5 +- paddle/fluid/distributed/service/ps_client.h | 13 +- .../distributed/service/ps_local_client.cc | 269 ++++++++++++++++++ .../distributed/service/ps_local_client.h | 226 +++++++++++++++ .../distributed/service/ps_local_server.h | 37 +++ paddle/fluid/distributed/service/server.cc | 2 + .../distributed/table/common_sparse_table.cc | 76 +++++ .../distributed/table/common_sparse_table.h | 8 + .../table/depends/large_scale_kv.h | 32 ++- paddle/fluid/distributed/table/table.h | 7 + paddle/fluid/framework/device_worker.h | 1 - .../framework/distributed_strategy.proto | 1 + paddle/fluid/framework/fleet/CMakeLists.txt | 10 +- paddle/fluid/framework/fleet/fleet_wrapper.h | 3 + paddle/fluid/framework/fleet/heter_context.h | 15 +- .../framework/fleet/heter_ps/feature_value.h | 2 +- .../framework/fleet/heter_ps/hashtable.h | 8 +- .../framework/fleet/heter_ps/hashtable_inl.h | 11 +- .../framework/fleet/heter_ps/heter_comm.h | 4 +- .../framework/fleet/heter_ps/heter_comm_inl.h | 2 +- .../framework/fleet/heter_ps/heter_ps.cu | 6 +- .../fluid/framework/fleet/heter_ps/heter_ps.h | 2 +- .../framework/fleet/heter_ps/heter_ps_base.h | 2 +- .../fleet/heter_ps/heter_resource.cc | 2 +- .../framework/fleet/heter_ps/heter_resource.h | 2 +- .../framework/fleet/heter_ps/optimizer.cuh.h | 2 +- .../fluid/framework/fleet/ps_gpu_wrapper.cc | 52 +++- .../fluid/framework/fleet/ps_gpu_wrapper.cu | 2 +- paddle/fluid/framework/fleet/ps_gpu_wrapper.h | 9 +- paddle/fluid/framework/heter_service.h | 18 +- paddle/fluid/framework/multi_trainer.cc | 103 +++++++ paddle/fluid/framework/ps_gpu_trainer.cc | 5 - paddle/fluid/framework/ps_gpu_worker.cc | 1 - paddle/fluid/framework/trainer.h | 12 +- paddle/fluid/operators/pull_box_sparse_op.h | 6 +- paddle/fluid/pybind/ps_gpu_wrapper_py.cc | 3 +- paddle/fluid/pybind/ps_gpu_wrapper_py.h | 3 +- paddle/fluid/pybind/pybind.cc | 3 +- .../parameter_server_optimizer.py | 47 ++- .../distributed/fleet/runtime/the_one_ps.py | 16 ++ python/paddle/fluid/device_worker.py | 12 +- python/paddle/fluid/executor.py | 7 +- .../fleet/parameter_server/ir/trainer_pass.py | 92 +++++- .../fluid/tests/unittests/CMakeLists.txt | 2 + .../unittests/test_communicator_ps_gpu.py | 97 +++++++ .../tests/unittests/test_dist_fleet_ps11.py | 180 ++++++++++++ .../tests/unittests/test_dist_fleet_ps12.py | 180 ++++++++++++ python/paddle/fluid/trainer_factory.py | 4 +- 53 files changed, 1516 insertions(+), 102 deletions(-) create mode 100644 paddle/fluid/distributed/service/ps_local_client.cc create mode 100644 paddle/fluid/distributed/service/ps_local_client.h create mode 100644 paddle/fluid/distributed/service/ps_local_server.h create mode 100644 python/paddle/fluid/tests/unittests/test_communicator_ps_gpu.py create mode 100644 python/paddle/fluid/tests/unittests/test_dist_fleet_ps11.py create mode 100644 python/paddle/fluid/tests/unittests/test_dist_fleet_ps12.py diff --git a/CMakeLists.txt 
b/CMakeLists.txt index ddd6df1eb2..30f9e3a3dc 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -184,6 +184,7 @@ option(WITH_XBYAK "Compile with xbyak support" ON) option(WITH_CONTRIB "Compile the third-party contributation" OFF) option(WITH_GRPC "Use grpc as the default rpc framework" ${WITH_DISTRIBUTE}) option(WITH_PSCORE "Compile with parameter server support" ${WITH_DISTRIBUTE}) +option(WITH_HETERPS "Compile with heterps" OFF) option(WITH_INFERENCE_API_TEST "Test fluid inference C++ high-level api interface" OFF) option(PY_VERSION "Compile PaddlePaddle with python3 support" ${PY_VERSION}) option(WITH_DGC "Use DGC(Deep Gradient Compression) or not" ${WITH_DISTRIBUTE}) diff --git a/cmake/configure.cmake b/cmake/configure.cmake index 9f1eb16fcf..bf1352d4e1 100644 --- a/cmake/configure.cmake +++ b/cmake/configure.cmake @@ -173,6 +173,9 @@ if(WITH_PSCORE) add_definitions(-DPADDLE_WITH_PSCORE) endif() +if(WITH_HETERPS) + add_definitions(-DPADDLE_WITH_HETERPS) +endif() if(WITH_GRPC) add_definitions(-DPADDLE_WITH_GRPC) diff --git a/paddle/fluid/distributed/service/CMakeLists.txt b/paddle/fluid/distributed/service/CMakeLists.txt index 843dea9eea..d1f04e26ad 100644 --- a/paddle/fluid/distributed/service/CMakeLists.txt +++ b/paddle/fluid/distributed/service/CMakeLists.txt @@ -16,6 +16,7 @@ set_source_files_properties(communicator.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUT set_source_files_properties(service.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) set_source_files_properties(brpc_ps_server.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) set_source_files_properties(brpc_ps_client.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) +set_source_files_properties(ps_local_client.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) set_source_files_properties(brpc_utils.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) set_source_files_properties(heter_server.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) @@ -29,7 +30,8 @@ set_source_files_properties(graph_brpc_client.cc PROPERTIES COMPILE_FLAGS ${DIST cc_library(brpc_utils SRCS brpc_utils.cc DEPS tensor device_context ${COMMON_DEPS} ${RPC_DEPS}) cc_library(downpour_server SRCS graph_brpc_server.cc brpc_ps_server.cc DEPS boost eigen3 table brpc_utils simple_threadpool ${RPC_DEPS}) -cc_library(downpour_client SRCS graph_brpc_client.cc brpc_ps_client.cc DEPS boost eigen3 table brpc_utils simple_threadpool ${RPC_DEPS}) +cc_library(downpour_client SRCS graph_brpc_client.cc brpc_ps_client.cc +ps_local_client.cc DEPS boost eigen3 table brpc_utils simple_threadpool ${RPC_DEPS}) cc_library(client SRCS ps_client.cc DEPS downpour_client boost ${RPC_DEPS}) cc_library(server SRCS server.cc DEPS downpour_server boost ${RPC_DEPS}) diff --git a/paddle/fluid/distributed/service/brpc_ps_client.cc b/paddle/fluid/distributed/service/brpc_ps_client.cc index b49a71ab0c..a6ad9d08f5 100644 --- a/paddle/fluid/distributed/service/brpc_ps_client.cc +++ b/paddle/fluid/distributed/service/brpc_ps_client.cc @@ -880,8 +880,8 @@ std::future BrpcPsClient::send_client2client_msg( auto promise = std::make_shared>(); std::future fut = promise->get_future(); if (to_client_id >= _client_channels.size()) { - LOG(FATAL) << "to_client_id is out of range clients, which size is " - << _client_channels.size(); + VLOG(0) << "to_client_id is out of range clients, which size is " + << _client_channels.size(); promise->set_value(-1); return fut; } @@ -1001,4 +1001,4 @@ int32_t BrpcPsClient::recv_and_save_table(const uint64_t table_id, } } //
namespace distributed -} // namespace paddle \ No newline at end of file +} // namespace paddle diff --git a/paddle/fluid/distributed/service/communicator.h b/paddle/fluid/distributed/service/communicator.h index 043fe9d83d..fa60cab2b5 100644 --- a/paddle/fluid/distributed/service/communicator.h +++ b/paddle/fluid/distributed/service/communicator.h @@ -310,6 +310,8 @@ class Communicator { return _worker_ptr; } + RecvCtxMap &GetRecvCtxMap() { return recv_varname_to_ctx_; } + std::shared_ptr _worker_ptr; // pointer to worker protected: diff --git a/paddle/fluid/distributed/service/ps_client.cc b/paddle/fluid/distributed/service/ps_client.cc index 3f78908baa..d45f41a0f5 100644 --- a/paddle/fluid/distributed/service/ps_client.cc +++ b/paddle/fluid/distributed/service/ps_client.cc @@ -16,12 +16,15 @@ #include "glog/logging.h" #include "paddle/fluid/distributed/service/brpc_ps_client.h" #include "paddle/fluid/distributed/service/graph_brpc_client.h" +#include "paddle/fluid/distributed/service/ps_local_client.h" #include "paddle/fluid/distributed/table/table.h" namespace paddle { namespace distributed { REGISTER_PSCORE_CLASS(PSClient, BrpcPsClient); +REGISTER_PSCORE_CLASS(PSClient, PsLocalClient); REGISTER_PSCORE_CLASS(PSClient, GraphBrpcClient); + int32_t PSClient::configure( const PSParameter &config, const std::map> ®ions, @@ -83,4 +86,4 @@ PSClient *PSClientFactory::create(const PSParameter &ps_config) { return client; } } // namespace distributed -} // namespace paddle \ No newline at end of file +} // namespace paddle diff --git a/paddle/fluid/distributed/service/ps_client.h b/paddle/fluid/distributed/service/ps_client.h index 1c8abc6c2e..74a1e0dde7 100644 --- a/paddle/fluid/distributed/service/ps_client.h +++ b/paddle/fluid/distributed/service/ps_client.h @@ -118,6 +118,17 @@ class PSClient { const uint64_t *keys, size_t num, bool is_training) = 0; + virtual ::std::future pull_sparse_ptr(char **select_values, + size_t table_id, + const uint64_t *keys, + size_t num) { + VLOG(0) << "Did not implement"; + std::promise promise; + std::future fut = promise.get_future(); + promise.set_value(-1); + return fut; + } + virtual std::future print_table_stat(uint32_t table_id) = 0; // 确保所有积攒中的请求都发起发送 @@ -150,7 +161,7 @@ class PSClient { virtual std::future send_client2client_msg(int msg_type, int to_client_id, const std::string &msg) { - LOG(FATAL) << "Did not implement"; + VLOG(0) << "Did not implement"; std::promise promise; std::future fut = promise.get_future(); promise.set_value(-1); diff --git a/paddle/fluid/distributed/service/ps_local_client.cc b/paddle/fluid/distributed/service/ps_local_client.cc new file mode 100644 index 0000000000..2acc845a50 --- /dev/null +++ b/paddle/fluid/distributed/service/ps_local_client.cc @@ -0,0 +1,269 @@ +// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include "paddle/fluid/distributed/service/ps_local_client.h" +#include "paddle/fluid/distributed/table/table.h" + +//#define pslib_debug_dense_compress + +namespace paddle { +namespace distributed { +int32_t PsLocalClient::initialize() { + const auto& downpour_param = _config.server_param().downpour_server_param(); + TableManager::instance().initialize(); + for (size_t i = 0; i < downpour_param.downpour_table_param_size(); ++i) { + auto* table = CREATE_PSCORE_CLASS( + Table, downpour_param.downpour_table_param(i).table_class()); + table->initialize(downpour_param.downpour_table_param(i), + _config.fs_client_param()); + table->set_shard(0, 1); + _table_map[downpour_param.downpour_table_param(i).table_id()].reset(table); + } + return 0; +} + +::std::future PsLocalClient::shrink(uint32_t table_id, + const std::string threshold) { + // TODO + return done(); +} + +::std::future PsLocalClient::load(const std::string& epoch, + const std::string& mode) { + // TODO + // for (auto& it : _table_map) { + // load(it.first, epoch, mode); + //} + return done(); +} +::std::future PsLocalClient::load(uint32_t table_id, + const std::string& epoch, + const std::string& mode) { + // TODO + // auto* table_ptr = table(table_id); + // table_ptr->load(epoch, mode); + return done(); +} + +::std::future PsLocalClient::save(const std::string& epoch, + const std::string& mode) { + // TODO + for (auto& it : _table_map) { + save(it.first, epoch, mode); + } + return done(); +} +::std::future PsLocalClient::save(uint32_t table_id, + const std::string& epoch, + const std::string& mode) { + // TODO + auto* table_ptr = table(table_id); + table_ptr->flush(); + table_ptr->save(epoch, mode); + return done(); +} + +::std::future PsLocalClient::clear() { + // TODO + return done(); +} +::std::future PsLocalClient::clear(uint32_t table_id) { + // TODO + return done(); +} + +::std::future PsLocalClient::flush() { + // no need + return done(); +} + +::std::future PsLocalClient::stop_server() { + // no need + return done(); +} + +::std::future PsLocalClient::pull_dense(Region* regions, + size_t region_num, + size_t table_id) { + auto* accessor = table_accessor(table_id); + auto* table_ptr = table(table_id); + + uint32_t num_per_shard = dense_dim_per_shard(accessor->fea_dim(), 1); + std::vector region_buffer; + region_buffer.resize(num_per_shard); + table_ptr->pull_dense(region_buffer.data(), region_buffer.size()); + + size_t region_idx = 0; + size_t region_data_idx = 0; + size_t shard_data_size = num_per_shard; + size_t shard_buffer_remain = shard_data_size * sizeof(float); + PADDLE_ENFORCE_EQ( + shard_buffer_remain, region_buffer.size() * sizeof(float), + platform::errors::PreconditionNotMet("pull dense size error.")); + size_t index = 0; + while (shard_buffer_remain > 0 && region_idx < region_num) { + auto& region = regions[region_idx]; + if (region.size - region_data_idx >= shard_buffer_remain) { + memcpy((void*)(region.data + region_data_idx), + (uint8_t*)(void*)(region_buffer.data()) + index, + shard_buffer_remain); + region_data_idx += shard_buffer_remain; + shard_buffer_remain = 0; + } else if (region.size - region_data_idx == 0) { + ++region_idx; + region_data_idx = 0; + } else { + memcpy((void*)(region.data + region_data_idx), + (uint8_t*)(void*)(region_buffer.data()) + index, + region.size - region_data_idx); + shard_buffer_remain -= (region.size - region_data_idx); + index += (region.size - region_data_idx); + ++region_idx; + region_data_idx = 0; + } + } + + return done(); +} + +::std::future 
PsLocalClient::push_dense_param(const Region* regions, + size_t region_num, + size_t table_id) { + auto* accessor = table_accessor(table_id); + auto* table_ptr = table(table_id); + + std::vector region_buffer; + region_buffer.resize(dense_dim_per_shard(accessor->fea_dim(), 1), 0); + for (size_t i = 0, offset = 0; i < region_num; ++i) { + uint32_t data_num = regions[i].size / sizeof(float); + memcpy(region_buffer.data() + offset, regions[i].data, regions[i].size); + offset += data_num; + } + + // table_ptr->push_dense_param(region_buffer.data(), region_buffer.size()); + + return done(); +} + +::std::future PsLocalClient::push_dense_raw_gradient( + int table_id, float* total_send_data, size_t total_send_data_size, + void* callback) { + VLOG(1) << "wxx push_dense_raw_gradient"; + + PSClientClosure* closure = reinterpret_cast(callback); + + auto* table_ptr = table(table_id); + + table_ptr->push_dense(total_send_data, total_send_data_size); + delete closure; + return done(); +} + +::std::future PsLocalClient::push_dense(const Region* regions, + size_t region_num, + size_t table_id) { + auto* accessor = table_accessor(table_id); + auto* table_ptr = table(table_id); + + std::vector region_buffer; + region_buffer.resize(dense_dim_per_shard(accessor->fea_dim(), 1)); + size_t data_size = region_buffer.size(); + for (size_t i = 0, offset = 0; i < region_num; ++i) { + uint32_t data_num = regions[i].size / sizeof(float); + PADDLE_ENFORCE_LE( + offset + data_num, data_size, + platform::errors::PreconditionNotMet( + "invalid dense size, cur pos[%d] data_num[%d] size[%d]", offset, + data_num, data_size)); + memcpy(region_buffer.data() + offset, regions[i].data, regions[i].size); + offset += data_num; + } + + table_ptr->push_dense(region_buffer.data(), region_buffer.size()); + + return done(); +} + +//::std::future PsLocalClient::pull_sparse(float** select_values, +// size_t table_id, +// const uint64_t* keys, +// size_t num) { +// // FIXME +// // auto timer = +// // std::make_shared("pslib_downpour_client_pull_sparse"); +// // auto local_timer = +// // std::make_shared("pslib_downpour_client_pull_sparse_local"); +// //将key拆分到各shard请求,并记录原始对应value指针 +// auto* accessor = table_accessor(table_id); +// auto* table_ptr = table(table_id); +// size_t value_size = accessor->select_size(); +// +// // table_ptr->pull_sparse(keys, num); +// std::vector res_data; +// res_data.resize(num * value_size / sizeof(float)); +// table_ptr->pull_sparse(res_data.data(), keys, num); +// // memcpy(select_values[0], res_data->data(), res_data->size() * +// // sizeof(float)); +// size_t offset = 0; +// for (int i = 0; i < num; ++i) { +// memcpy(select_values[i], (char*)res_data.data() + offset, value_size); +// offset += value_size; +// } +// +// // return fut; +// return done(); +//} + +::std::future PsLocalClient::pull_sparse_ptr(char** select_values, + size_t table_id, + const uint64_t* keys, + size_t num) { + // FIXME + // auto timer = + // std::make_shared("pslib_downpour_client_pull_sparse"); + // auto local_timer = + // std::make_shared("pslib_downpour_client_pull_sparse_local"); + //将key拆分到各shard请求,并记录原始对应value指针 + auto* table_ptr = table(table_id); + + table_ptr->pull_sparse_ptr(select_values, keys, num); + + return done(); +} + +::std::future PsLocalClient::push_sparse_raw_gradient( + size_t table_id, const uint64_t* keys, const float** update_values, + size_t num, void* callback) { + VLOG(1) << "wxx push_sparse_raw_gradient"; + PSClientClosure* closure = reinterpret_cast(callback); + auto* accessor = 
table_accessor(table_id); + auto* table_ptr = table(table_id); + + table_ptr->push_sparse(keys, update_values, num); + delete closure; + return done(); +} + +::std::future PsLocalClient::push_sparse(size_t table_id, + const uint64_t* keys, + const float** update_values, + size_t num) { + auto* accessor = table_accessor(table_id); + auto* table_ptr = table(table_id); + + table_ptr->push_sparse(keys, update_values, num); + return done(); +} +} +} diff --git a/paddle/fluid/distributed/service/ps_local_client.h b/paddle/fluid/distributed/service/ps_local_client.h new file mode 100644 index 0000000000..9d2b01a45f --- /dev/null +++ b/paddle/fluid/distributed/service/ps_local_client.h @@ -0,0 +1,226 @@ +// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once +#include "brpc/channel.h" +#include "brpc/controller.h" +#include "brpc/server.h" +#include "paddle/fluid/distributed/service/ps_client.h" + +namespace paddle { +namespace distributed { + +class Table; + +class PsLocalClient : public PSClient { + public: + PsLocalClient() {} + virtual ~PsLocalClient() { _running = false; } + virtual int32_t create_client2client_connection(int pslib_timeout_ms, + int pslib_connect_timeout_ms, + int max_retry) { + return 0; + } + + virtual ::std::future shrink(uint32_t table_id, + const std::string threshold) override; + virtual ::std::future load(const std::string& epoch, + const std::string& mode) override; + virtual ::std::future load(uint32_t table_id, + const std::string& epoch, + const std::string& mode) override; + + virtual ::std::future save(const std::string& epoch, + const std::string& mode) override; + virtual ::std::future save(uint32_t table_id, + const std::string& epoch, + const std::string& mode) override; + + virtual ::std::future clear() override; + virtual ::std::future clear(uint32_t table_id) override; + + virtual ::std::future stop_server() override; + + virtual void finalize_worker() override {} + virtual ::std::future pull_dense(Region* regions, size_t region_num, + size_t table_id); + + virtual ::std::future push_dense(const Region* regions, + size_t region_num, size_t table_id); + + virtual ::std::future push_dense_param(const Region* regions, + size_t region_num, + size_t table_id); + + virtual ::std::future pull_sparse(float** select_values, + size_t table_id, + const uint64_t* keys, size_t num, + bool is_training) { + std::promise prom; + std::future fut = prom.get_future(); + prom.set_value(0); + + return fut; + } + + virtual ::std::future pull_sparse_ptr(char** select_values, + size_t table_id, + const uint64_t* keys, + size_t num); + + virtual ::std::future print_table_stat(uint32_t table_id) { + std::promise prom; + std::future fut = prom.get_future(); + prom.set_value(0); + + return fut; + } + virtual ::std::future push_sparse(size_t table_id, + const uint64_t* keys, + const float** update_values, + size_t num); + + virtual ::std::future flush(); + // server profilera + virtual std::future
start_profiler() { + std::promise prom; + std::future fut = prom.get_future(); + prom.set_value(0); + + return fut; + }; + + virtual std::future stop_profiler() { + std::promise prom; + std::future fut = prom.get_future(); + prom.set_value(0); + + return fut; + } + + virtual std::future barrier(size_t table_id, uint32_t barrier_type) { + std::promise prom; + std::future fut = prom.get_future(); + prom.set_value(0); + + return fut; + } + + virtual std::future pull_geo_param(size_t table_id, + std::vector* values, + std::vector* keys, + int pserver_idx) { + std::promise prom; + std::future fut = prom.get_future(); + prom.set_value(0); + + return fut; + } + + virtual std::future push_global_step(int table_id, + int64_t* total_send_data, + void* done) { + std::promise prom; + std::future fut = prom.get_future(); + prom.set_value(0); + + return fut; + } + + // recv table from server and save it in LodTensor + virtual int32_t recv_and_save_table(const uint64_t table_id, + const std::string& path) { + return 0; + } + + virtual ::std::future send_client2client_msg( + int msg_type, int to_client_id, const std::string& msg) override { + std::promise prom; + std::future fut = prom.get_future(); + prom.set_value(0); + + return fut; + } + virtual size_t get_server_nums() { return 1; } + + virtual std::future push_dense_raw_gradient( + int table_id, float* total_send_data, size_t total_send_data_size, + void* callback) override; + + virtual std::future push_sparse_raw_gradient( + size_t table_id, const uint64_t* keys, const float** update_values, + size_t num, void* callback) override; + + virtual std::future push_sparse_raw_gradient_partial( + size_t table_id, const uint64_t* keys, const float** update_values, + uint32_t num, void* done, int pserver_idx) override { + std::promise prom; + std::future fut = prom.get_future(); + prom.set_value(0); + + return fut; + } + + virtual std::future push_sparse_param(size_t table_id, + const uint64_t* keys, + const float** update_values, + size_t num, + void* done) override { + std::promise prom; + std::future fut = prom.get_future(); + prom.set_value(0); + + return fut; + } + + private: + virtual int32_t initialize() override; + + std::future done() { + std::shared_ptr> prom = + std::make_shared>(); + std::future fut = prom->get_future(); + prom->set_value(0); + return fut; + } + + inline uint32_t dense_dim_per_shard(uint32_t dense_dim_total, + uint32_t shard_num) { + return dense_dim_total / shard_num + 1; + } + + inline std::unordered_map>* table() { + return &_table_map; + } + + inline Table* table(size_t table_id) { + auto itr = _table_map.find(table_id); + if (itr != _table_map.end()) { + return itr->second.get(); + } + LOG(ERROR) << "table not found " << table_id; + return NULL; + } + + std::unordered_map> _table_map; + + bool _running = false; + bool _flushing = false; + + private: + float _mae = 0; + float _mse = 0; + uint16_t _push_times = 0; +}; +} +} diff --git a/paddle/fluid/distributed/service/ps_local_server.h b/paddle/fluid/distributed/service/ps_local_server.h new file mode 100644 index 0000000000..dfbccc7090 --- /dev/null +++ b/paddle/fluid/distributed/service/ps_local_server.h @@ -0,0 +1,37 @@ +// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include "paddle/fluid/distributed/service/server.h" + +namespace paddle { +namespace distributed { + +class PsLocalServer : public PSServer { + public: + PsLocalServer() {} + virtual ~PsLocalServer() {} + virtual uint64_t start() { return 0; } + virtual uint64_t start(const std::string& ip, uint32_t port) { return 0; } + virtual int32_t stop() { return 0; } + virtual int32_t port() { return 0; } + + private: + virtual int32_t initialize() { return 0; } +}; +} +} diff --git a/paddle/fluid/distributed/service/server.cc b/paddle/fluid/distributed/service/server.cc index 9324adad69..e44876e3d2 100644 --- a/paddle/fluid/distributed/service/server.cc +++ b/paddle/fluid/distributed/service/server.cc @@ -17,12 +17,14 @@ #include "glog/logging.h" #include "paddle/fluid/distributed/service/brpc_ps_server.h" #include "paddle/fluid/distributed/service/graph_brpc_server.h" +#include "paddle/fluid/distributed/service/ps_local_server.h" #include "paddle/fluid/distributed/table/table.h" namespace paddle { namespace distributed { REGISTER_PSCORE_CLASS(PSServer, BrpcPsServer); +REGISTER_PSCORE_CLASS(PSServer, PsLocalServer); REGISTER_PSCORE_CLASS(PsBaseService, BrpcPsService); REGISTER_PSCORE_CLASS(PSServer, GraphBrpcServer); REGISTER_PSCORE_CLASS(PsBaseService, GraphBrpcService); diff --git a/paddle/fluid/distributed/table/common_sparse_table.cc b/paddle/fluid/distributed/table/common_sparse_table.cc index a25a90aa9a..2e8c257b6a 100644 --- a/paddle/fluid/distributed/table/common_sparse_table.cc +++ b/paddle/fluid/distributed/table/common_sparse_table.cc @@ -446,6 +446,43 @@ int32_t CommonSparseTable::pull_sparse(float* pull_values, return 0; } +int32_t CommonSparseTable::pull_sparse_ptr(char** pull_values, + const uint64_t* keys, size_t num) { + std::vector> offset_bucket; + offset_bucket.resize(task_pool_size_); + + for (int x = 0; x < num; ++x) { + auto y = keys[x] % task_pool_size_; + offset_bucket[y].push_back(x); + } + + std::vector> tasks(task_pool_size_); + + for (int shard_id = 0; shard_id < task_pool_size_; ++shard_id) { + tasks[shard_id] = _shards_task_pool[shard_id]->enqueue( + [this, shard_id, &keys, &offset_bucket, &pull_values]() -> int { + auto& block = shard_values_[shard_id]; + auto& offsets = offset_bucket[shard_id]; + + for (int i = 0; i < offsets.size(); ++i) { + auto offset = offsets[i]; + auto id = keys[offset]; + auto* value = block->InitGet(id); + // std::copy_n(value + param_offset_, param_dim_, + // pull_values + param_dim_ * offset); + pull_values[offset] = (char*)value; + } + + return 0; + }); + } + + for (size_t shard_id = 0; shard_id < tasks.size(); ++shard_id) { + tasks[shard_id].wait(); + } + return 0; +} + int32_t CommonSparseTable::_push_sparse(const uint64_t* keys, const float* values, size_t num) { rwlock_->RDLock(); @@ -502,6 +539,45 @@ int32_t CommonSparseTable::push_sparse(const uint64_t* keys, return 0; } +int32_t CommonSparseTable::push_sparse(const uint64_t* keys, + const float** values, size_t num) { + _push_sparse(keys, values, num); + return 0; +} + +int32_t CommonSparseTable::_push_sparse(const uint64_t* 
keys, + const float** values, size_t num) { + rwlock_->RDLock(); + std::vector> offset_bucket; + offset_bucket.resize(task_pool_size_); + + for (int x = 0; x < num; ++x) { + auto y = keys[x] % task_pool_size_; + offset_bucket[y].push_back(x); + } + + std::vector> tasks(task_pool_size_); + + for (int shard_id = 0; shard_id < task_pool_size_; ++shard_id) { + tasks[shard_id] = _shards_task_pool[shard_id]->enqueue( + [this, shard_id, &keys, &values, num, &offset_bucket]() -> int { + auto& offsets = offset_bucket[shard_id]; + for (size_t i = 0; i < offsets.size(); ++i) { + std::vector tmp_off = {0}; + optimizer_->update(keys + offsets[i], values[offsets[i]], num, + tmp_off, shard_values_[shard_id].get()); + } + return 0; + }); + } + + for (size_t shard_id = 0; shard_id < tasks.size(); ++shard_id) { + tasks[shard_id].wait(); + } + rwlock_->UNLock(); + return 0; +} + int32_t CommonSparseTable::push_sparse_param(const uint64_t* keys, const float* values, size_t num) { rwlock_->RDLock(); diff --git a/paddle/fluid/distributed/table/common_sparse_table.h b/paddle/fluid/distributed/table/common_sparse_table.h index 31f4dabcdf..50c295da53 100644 --- a/paddle/fluid/distributed/table/common_sparse_table.h +++ b/paddle/fluid/distributed/table/common_sparse_table.h @@ -63,9 +63,15 @@ class CommonSparseTable : public SparseTable { virtual std::pair print_table_stat(); virtual int32_t pull_sparse(float* values, const PullSparseValue& pull_value); + virtual int32_t pull_sparse_ptr(char** pull_values, const uint64_t* keys, + size_t num); + virtual int32_t push_sparse(const uint64_t* keys, const float* values, size_t num); + virtual int32_t push_sparse(const uint64_t* keys, const float** values, + size_t num); + // only for sparse geo table virtual int32_t push_sparse_param(const uint64_t* keys, const float* values, size_t num); @@ -80,6 +86,8 @@ class CommonSparseTable : public SparseTable { protected: virtual int32_t _push_sparse(const uint64_t* keys, const float* values, size_t num); + virtual int32_t _push_sparse(const uint64_t* keys, const float** values, + size_t num); private: const int task_pool_size_ = 11; diff --git a/paddle/fluid/distributed/table/depends/large_scale_kv.h b/paddle/fluid/distributed/table/depends/large_scale_kv.h index cb077033ca..68d252661e 100644 --- a/paddle/fluid/distributed/table/depends/large_scale_kv.h +++ b/paddle/fluid/distributed/table/depends/large_scale_kv.h @@ -87,7 +87,7 @@ class ValueBlock { value_dims_(value_dims), value_offsets_(value_offsets), value_idx_(value_idx) { - for (int x = 0; x < value_dims.size(); ++x) { + for (size_t x = 0; x < value_dims.size(); ++x) { value_length_ += value_dims[x]; } @@ -96,13 +96,15 @@ class ValueBlock { auto slices = string::split_string(entry_attr, ":"); if (slices[0] == "none") { entry_func_ = std::bind(&count_entry, std::placeholders::_1, 0); + threshold_ = 0; } else if (slices[0] == "count_filter_entry") { - int threshold = std::stoi(slices[1]); - entry_func_ = std::bind(&count_entry, std::placeholders::_1, threshold); + threshold_ = std::stoi(slices[1]); + entry_func_ = + std::bind(&count_entry, std::placeholders::_1, threshold_); } else if (slices[0] == "probability_entry") { - float threshold = std::stof(slices[1]); + threshold_ = std::stof(slices[1]); entry_func_ = - std::bind(&probility_entry, std::placeholders::_1, threshold); + std::bind(&probility_entry, std::placeholders::_1, threshold_); } else { PADDLE_THROW(platform::errors::InvalidArgument( "Not supported Entry Type : %s, Only support [CountFilterEntry, " @@ -170,6 
+172,21 @@ class ValueBlock { return value->data_.data(); } + VALUE *InitGet(const uint64_t &id, const bool with_update = true, + const int counter = 1) { + if (!Has(id)) { + values_[id] = std::make_shared(value_length_); + } + + auto &value = values_.at(id); + + if (with_update) { + AttrUpdate(value, counter); + } + + return value.get(); + } + void AttrUpdate(std::shared_ptr value, const int counter) { // update state value->unseen_days_ = 0; @@ -179,7 +196,7 @@ class ValueBlock { value->is_entry_ = entry_func_(value); if (value->is_entry_) { // initialize - for (int x = 0; x < value_names_.size(); ++x) { + for (size_t x = 0; x < value_names_.size(); ++x) { initializers_[x]->GetValue(value->data_.data() + value_offsets_[x], value_dims_[x]); } @@ -224,6 +241,8 @@ class ValueBlock { return; } + float GetThreshold() { return threshold_; } + private: bool Has(const uint64_t id) { auto got = values_.find(id); @@ -246,6 +265,7 @@ class ValueBlock { std::function)> entry_func_; std::vector> initializers_; + float threshold_; }; } // namespace distributed diff --git a/paddle/fluid/distributed/table/table.h b/paddle/fluid/distributed/table/table.h index 5bc818ff47..81a1ff5ece 100644 --- a/paddle/fluid/distributed/table/table.h +++ b/paddle/fluid/distributed/table/table.h @@ -48,10 +48,17 @@ class Table { return 0; } + virtual int32_t pull_sparse_ptr(char **pull_values, const uint64_t *keys, + size_t num) { + VLOG(0) << "NOT IMPLEMENT"; + return 0; + } virtual int32_t pull_sparse(float *values, const PullSparseValue &pull_value) = 0; virtual int32_t push_sparse(const uint64_t *keys, const float *values, size_t num) = 0; + virtual int32_t push_sparse(const uint64_t *keys, const float **values, + size_t num){}; virtual int32_t push_sparse_param(const uint64_t *keys, const float *values, size_t num) { return 0; diff --git a/paddle/fluid/framework/device_worker.h b/paddle/fluid/framework/device_worker.h index 9ced4221e1..628b9f0d70 100644 --- a/paddle/fluid/framework/device_worker.h +++ b/paddle/fluid/framework/device_worker.h @@ -562,7 +562,6 @@ class PSGPUWorker : public HogwildWorker { void ResetStat(); protected: - std::shared_ptr fleet_ptr_; void PushGradients(); void DumpParam(); void CopySparseTable(); diff --git a/paddle/fluid/framework/distributed_strategy.proto b/paddle/fluid/framework/distributed_strategy.proto index 6363eedc80..59af35465a 100755 --- a/paddle/fluid/framework/distributed_strategy.proto +++ b/paddle/fluid/framework/distributed_strategy.proto @@ -124,6 +124,7 @@ message AsyncConfig { optional bool launch_barrier = 9 [ default = true ]; optional string heter_worker_device_guard = 10 [ default = 'cpu' ]; optional int32 lr_decay_steps = 11 [ default = 10 ]; + optional int32 use_ps_gpu = 12 [ default = 0 ]; } message PipelineConfig { diff --git a/paddle/fluid/framework/fleet/CMakeLists.txt b/paddle/fluid/framework/fleet/CMakeLists.txt index ce0a905afc..c8517b9503 100644 --- a/paddle/fluid/framework/fleet/CMakeLists.txt +++ b/paddle/fluid/framework/fleet/CMakeLists.txt @@ -1,5 +1,10 @@ if(WITH_PSLIB) cc_library(fleet_wrapper SRCS fleet_wrapper.cc DEPS framework_proto variable_helper scope pslib_brpc pslib) +else() + cc_library(fleet_wrapper SRCS fleet_wrapper.cc DEPS framework_proto variable_helper scope) +endif(WITH_PSLIB) + +if(WITH_HETERPS) if(WITH_NCCL) nv_library(ps_gpu_wrapper SRCS ps_gpu_wrapper.cu ps_gpu_wrapper.cc DEPS heter_ps) @@ -8,13 +13,10 @@ if(WITH_PSLIB) hip_library(ps_gpu_wrapper SRCS ps_gpu_wrapper.cu ps_gpu_wrapper.cc DEPS heter_ps) add_subdirectory(heter_ps) - 
else() - cc_library(ps_gpu_wrapper SRCS ps_gpu_wrapper.cc) endif(WITH_NCCL) else() - cc_library(fleet_wrapper SRCS fleet_wrapper.cc DEPS framework_proto variable_helper scope) cc_library(ps_gpu_wrapper SRCS ps_gpu_wrapper.cc) -endif(WITH_PSLIB) +endif(WITH_HETERPS) if(WITH_NCCL OR WITH_RCCL) cc_library(nccl_wrapper SRCS nccl_wrapper.cc DEPS framework_proto variable_helper scope) diff --git a/paddle/fluid/framework/fleet/fleet_wrapper.h b/paddle/fluid/framework/fleet/fleet_wrapper.h index e584fb5e2b..613b280363 100644 --- a/paddle/fluid/framework/fleet/fleet_wrapper.h +++ b/paddle/fluid/framework/fleet/fleet_wrapper.h @@ -34,6 +34,9 @@ limitations under the License. */ #include "paddle/fluid/framework/tensor.h" #include "paddle/fluid/framework/variable_helper.h" #include "paddle/fluid/platform/macros.h" // for DISABLE_COPY_AND_ASSIGN +#ifdef PADDLE_WITH_HETERPS +#include "paddle/fluid/platform/type_defs.h" +#endif namespace paddle { namespace framework { diff --git a/paddle/fluid/framework/fleet/heter_context.h b/paddle/fluid/framework/fleet/heter_context.h index a02931b3f5..6f063e830c 100644 --- a/paddle/fluid/framework/fleet/heter_context.h +++ b/paddle/fluid/framework/fleet/heter_context.h @@ -14,15 +14,21 @@ limitations under the License. */ #pragma once -#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \ - (defined PADDLE_WITH_PSLIB) +#ifdef PADDLE_WITH_HETERPS #include #include #include #include +#ifdef PADDLE_WITH_PSLIB #include "common_value.h" // NOLINT +#endif + +#ifdef PADDLE_WITH_PSCORE +#include "paddle/fluid/distributed/table/depends/large_scale_kv.h" +#endif + #include "paddle/fluid/framework/fleet/heter_ps/feature_value.h" #include "paddle/fluid/framework/scope.h" @@ -39,7 +45,12 @@ class HeterContext { } Scope* scope_{nullptr}; std::vector> feature_keys_; +#ifdef PADDLE_WITH_PSLIB std::vector> value_ptr_; +#endif +#ifdef PADDLE_WITH_PSCORE + std::vector> value_ptr_; +#endif std::vector> device_values_; std::vector> device_keys_; std::vector mutex_; diff --git a/paddle/fluid/framework/fleet/heter_ps/feature_value.h b/paddle/fluid/framework/fleet/heter_ps/feature_value.h index 698ece09de..c3bf33b32c 100644 --- a/paddle/fluid/framework/fleet/heter_ps/feature_value.h +++ b/paddle/fluid/framework/fleet/heter_ps/feature_value.h @@ -14,7 +14,7 @@ limitations under the License. */ #pragma once -#ifdef PADDLE_WITH_PSLIB +#ifdef PADDLE_WITH_HETERPS #include diff --git a/paddle/fluid/framework/fleet/heter_ps/hashtable.h b/paddle/fluid/framework/fleet/heter_ps/hashtable.h index e5c0972763..089130f6da 100644 --- a/paddle/fluid/framework/fleet/heter_ps/hashtable.h +++ b/paddle/fluid/framework/fleet/heter_ps/hashtable.h @@ -17,11 +17,17 @@ limitations under the License. 
*/ #include #include #include +#ifdef PADDLE_WTIH_PSLIB #include "common_value.h" // NOLINT +#endif +#ifdef PADDLE_WITH_PSCORE +#endif #include "thrust/pair.h" //#include "cudf/concurrent_unordered_map.cuh.h" #include "paddle/fluid/framework/fleet/heter_ps/cudf/concurrent_unordered_map.cuh.h" -#ifdef PADDLE_WITH_PSLIB +#ifdef PADDLE_WITH_HETERPS +#include "paddle/fluid/distributed/table/depends/large_scale_kv.h" +#include "paddle/fluid/platform/type_defs.h" namespace paddle { namespace framework { diff --git a/paddle/fluid/framework/fleet/heter_ps/hashtable_inl.h b/paddle/fluid/framework/fleet/heter_ps/hashtable_inl.h index 871f9c7857..098c795fc7 100644 --- a/paddle/fluid/framework/fleet/heter_ps/hashtable_inl.h +++ b/paddle/fluid/framework/fleet/heter_ps/hashtable_inl.h @@ -12,7 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -#ifdef PADDLE_WITH_PSLIB +#ifdef PADDLE_WITH_HETERPS namespace paddle { namespace framework { @@ -119,6 +119,7 @@ void HashTable::dump_to_cpu(int devid, cudaStream_t stream) { continue; } ValType& gpu_val = kv[i].second; +#ifdef PADDLE_WITH_PSLIB auto* downpour_value = (paddle::ps::DownpourFixedFeatureValue*)(gpu_val.cpu_ptr); int downpour_value_size = downpour_value->size(); @@ -138,6 +139,14 @@ void HashTable::dump_to_cpu(int devid, cudaStream_t stream) { cpu_val[x + 7] = gpu_val.mf[x]; } } +#endif +#ifdef PADDLE_WITH_PSCORE + auto* downpour_value = (paddle::distributed::VALUE*)(gpu_val.cpu_ptr); + downpour_value->count_ = gpu_val.show; + for (int x = 0; x < gpu_val.mf_size; x++) { + downpour_value->data_[x] = gpu_val.mf[x]; + } +#endif } container_->prefetch(devid, stream); diff --git a/paddle/fluid/framework/fleet/heter_ps/heter_comm.h b/paddle/fluid/framework/fleet/heter_ps/heter_comm.h index 0e38ebbd7f..2ec2a8a1f1 100644 --- a/paddle/fluid/framework/fleet/heter_ps/heter_comm.h +++ b/paddle/fluid/framework/fleet/heter_ps/heter_comm.h @@ -25,7 +25,7 @@ limitations under the License. */ #include "paddle/fluid/platform/place.h" #include "thrust/pair.h" -#ifdef PADDLE_WITH_PSLIB +#ifdef PADDLE_WITH_HETERPS namespace paddle { namespace framework { @@ -182,7 +182,7 @@ class HeterComm { std::vector> path_; std::vector storage_; int feanum_{1800 * 2048}; - int multi_node_{1}; + int multi_node_{0}; std::vector nccl_inner_comms_; std::vector nccl_inter_comms_; int node_size_; diff --git a/paddle/fluid/framework/fleet/heter_ps/heter_comm_inl.h b/paddle/fluid/framework/fleet/heter_ps/heter_comm_inl.h index 2f1c809c01..1b4205e3c3 100644 --- a/paddle/fluid/framework/fleet/heter_ps/heter_comm_inl.h +++ b/paddle/fluid/framework/fleet/heter_ps/heter_comm_inl.h @@ -12,9 +12,9 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ #pragma once +#ifdef PADDLE_WITH_HETERPS #include -#ifdef PADDLE_WITH_PSLIB namespace paddle { namespace framework { diff --git a/paddle/fluid/framework/fleet/heter_ps/heter_ps.cu b/paddle/fluid/framework/fleet/heter_ps/heter_ps.cu index f2e129ded9..581b0d511c 100644 --- a/paddle/fluid/framework/fleet/heter_ps/heter_ps.cu +++ b/paddle/fluid/framework/fleet/heter_ps/heter_ps.cu @@ -15,7 +15,7 @@ limitations under the License. 
*/ #include #include "paddle/fluid/framework/fleet/heter_ps/heter_ps.h" -#ifdef PADDLE_WITH_PSLIB +#ifdef PADDLE_WITH_HETERPS namespace paddle { namespace framework { @@ -54,8 +54,8 @@ void HeterPs::show_one_table(int gpu_num) { comm_->show_one_table(gpu_num); } void HeterPs::push_sparse(int num, FeatureKey* d_keys, FeaturePushValue* d_grads, size_t len) { - // comm_->push_sparse(num, d_keys, d_grads, len, opt_); - comm_->push_sparse_multi_node(num, d_keys, d_grads, len, opt_); + comm_->push_sparse(num, d_keys, d_grads, len, opt_); + // comm_->push_sparse_multi_node(num, d_keys, d_grads, len, opt_); } void HeterPs::set_nccl_comm_and_size(const std::vector& inner_comms, diff --git a/paddle/fluid/framework/fleet/heter_ps/heter_ps.h b/paddle/fluid/framework/fleet/heter_ps/heter_ps.h index 142f4a93b9..d78b6b4920 100644 --- a/paddle/fluid/framework/fleet/heter_ps/heter_ps.h +++ b/paddle/fluid/framework/fleet/heter_ps/heter_ps.h @@ -18,7 +18,7 @@ limitations under the License. */ #include "paddle/fluid/framework/fleet/heter_ps/heter_ps_base.h" #include "paddle/fluid/framework/fleet/heter_ps/optimizer.cuh.h" -#ifdef PADDLE_WITH_PSLIB +#ifdef PADDLE_WITH_HETERPS namespace paddle { namespace framework { diff --git a/paddle/fluid/framework/fleet/heter_ps/heter_ps_base.h b/paddle/fluid/framework/fleet/heter_ps/heter_ps_base.h index 7980220eab..05b3ecf9c3 100644 --- a/paddle/fluid/framework/fleet/heter_ps/heter_ps_base.h +++ b/paddle/fluid/framework/fleet/heter_ps/heter_ps_base.h @@ -17,7 +17,7 @@ limitations under the License. */ #include "paddle/fluid/framework/fleet/heter_ps/feature_value.h" #include "paddle/fluid/framework/fleet/heter_ps/heter_resource.h" -#ifdef PADDLE_WITH_PSLIB +#ifdef PADDLE_WITH_HETERPS namespace paddle { namespace framework { diff --git a/paddle/fluid/framework/fleet/heter_ps/heter_resource.cc b/paddle/fluid/framework/fleet/heter_ps/heter_resource.cc index f65b664f83..0f2af2a522 100644 --- a/paddle/fluid/framework/fleet/heter_ps/heter_resource.cc +++ b/paddle/fluid/framework/fleet/heter_ps/heter_resource.cc @@ -12,7 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -#ifdef PADDLE_WITH_PSLIB +#ifdef PADDLE_WITH_HETERPS #include "heter_resource.h" #include "paddle/fluid/platform/cuda_device_guard.h" diff --git a/paddle/fluid/framework/fleet/heter_ps/heter_resource.h b/paddle/fluid/framework/fleet/heter_ps/heter_resource.h index ad7649a8a3..7b23379994 100644 --- a/paddle/fluid/framework/fleet/heter_ps/heter_resource.h +++ b/paddle/fluid/framework/fleet/heter_ps/heter_resource.h @@ -20,7 +20,7 @@ limitations under the License. */ #include "paddle/fluid/platform/cuda_device_guard.h" #include "paddle/fluid/platform/enforce.h" -#ifdef PADDLE_WITH_PSLIB +#ifdef PADDLE_WITH_HETERPS namespace paddle { namespace framework { diff --git a/paddle/fluid/framework/fleet/heter_ps/optimizer.cuh.h b/paddle/fluid/framework/fleet/heter_ps/optimizer.cuh.h index b3ec9e752e..7e82a8e014 100644 --- a/paddle/fluid/framework/fleet/heter_ps/optimizer.cuh.h +++ b/paddle/fluid/framework/fleet/heter_ps/optimizer.cuh.h @@ -18,7 +18,7 @@ limitations under the License. 
*/ #include "optimizer_conf.h" #include "paddle/fluid/framework/fleet/heter_ps/feature_value.h" -#ifdef PADDLE_WITH_PSLIB +#ifdef PADDLE_WITH_HETERPS namespace paddle { namespace framework { diff --git a/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc b/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc index 4274876c99..b7bb511074 100644 --- a/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc +++ b/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc @@ -26,8 +26,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \ - (defined PADDLE_WITH_PSLIB) +#ifdef PADDLE_WITH_HETERPS #include #include @@ -58,7 +57,12 @@ void PSGPUWrapper::BuildTask(std::shared_ptr gpu_task, auto& device_mutex = gpu_task->mutex_; std::vector threads; +#ifdef PADDLE_WITH_PSLIB auto fleet_ptr = FleetWrapper::GetInstance(); +#endif +#ifdef PADDLE_WITH_PSCORE + auto fleet_ptr = paddle::distributed::Communicator::GetInstance(); +#endif // data should be in input channel thread_keys_.resize(thread_keys_thread_num_); @@ -124,9 +128,16 @@ void PSGPUWrapper::BuildTask(std::shared_ptr gpu_task, auto ptl_func = [this, &local_keys, &local_ptr, &table_id, &fleet_ptr](int i) { size_t key_size = local_keys[i].size(); +#ifdef PADDLE_WITH_PSLIB auto tt = fleet_ptr->pslib_ptr_->_worker_ptr->pull_sparse_ptr( reinterpret_cast(local_ptr[i].data()), table_id, local_keys[i].data(), key_size); +#endif +#ifdef PADDLE_WITH_PSCORE + auto tt = fleet_ptr->_worker_ptr->pull_sparse_ptr( + reinterpret_cast(local_ptr[i].data()), table_id, + local_keys[i].data(), key_size); +#endif tt.wait(); auto status = tt.get(); // auto status = 0; @@ -153,8 +164,14 @@ void PSGPUWrapper::BuildTask(std::shared_ptr gpu_task, auto build_func = [device_num, &local_keys, &local_ptr, &device_keys, &device_vals, &device_mutex](int i) { std::vector> task_keys(device_num); +#ifdef PADDLE_WITH_PSLIB std::vector> task_ptrs( device_num); +#endif + +#ifdef PADDLE_WITH_PSCORE + std::vector> task_ptrs(device_num); +#endif for (size_t j = 0; j < local_keys[i].size(); j++) { int shard = local_keys[i][j] % device_num; @@ -169,7 +186,7 @@ void PSGPUWrapper::BuildTask(std::shared_ptr gpu_task, int cur = device_keys[dev].size(); device_keys[dev].resize(device_keys[dev].size() + len); device_vals[dev].resize(device_vals[dev].size() + len); - +#ifdef PADDLE_WITH_PSLIB for (int j = 0; j < len; ++j) { device_keys[dev][cur + j] = task_keys[dev][j]; float* ptr_val = task_ptrs[dev][j]->data(); @@ -196,6 +213,35 @@ void PSGPUWrapper::BuildTask(std::shared_ptr gpu_task, } } } +#endif +#ifdef PADDLE_WITH_PSCORE + for (int j = 0; j < len; ++j) { + device_keys[dev][cur + j] = task_keys[dev][j]; + distributed::VALUE* ptr_val = task_ptrs[dev][j]; + FeatureValue& val = device_vals[dev][cur + j]; + bool has_mf = 1; + val.delta_score = 0; + val.show = ptr_val->count_; + val.clk = 0; + val.slot = 0; + val.lr = 0; + val.lr_g2sum = 0; + val.cpu_ptr = (uint64_t)(task_ptrs[dev][j]); + + if (has_mf) { + val.mf_size = MF_DIM + 1; + for (int x = 0; x < val.mf_size; x++) { + val.mf[x] = ptr_val->data_[x]; + } + } else { + val.mf_size = 0; + for (int x = 0; x < MF_DIM + 1; x++) { + val.mf[x] = 0; + } + } + } +#endif + VLOG(1) << "GpuPs build hbmps done"; device_mutex[dev]->unlock(); } diff --git a/paddle/fluid/framework/fleet/ps_gpu_wrapper.cu b/paddle/fluid/framework/fleet/ps_gpu_wrapper.cu index 2eedcd5f1c..2bf564d3f7 100644 --- 
a/paddle/fluid/framework/fleet/ps_gpu_wrapper.cu +++ b/paddle/fluid/framework/fleet/ps_gpu_wrapper.cu @@ -12,7 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -#ifdef PADDLE_WITH_PSLIB +#ifdef PADDLE_WITH_HETERPS #include #include #include diff --git a/paddle/fluid/framework/fleet/ps_gpu_wrapper.h b/paddle/fluid/framework/fleet/ps_gpu_wrapper.h index ef586b41fe..cfb23d1be2 100644 --- a/paddle/fluid/framework/fleet/ps_gpu_wrapper.h +++ b/paddle/fluid/framework/fleet/ps_gpu_wrapper.h @@ -14,8 +14,7 @@ limitations under the License. */ #pragma once -#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \ - (defined PADDLE_WITH_PSLIB) +#ifdef PADDLE_WITH_HETERPS #include #include @@ -26,7 +25,6 @@ limitations under the License. */ #include #include #include - #ifdef PADDLE_WITH_GLOO #include #include "paddle/fluid/framework/fleet/gloo_wrapper.h" @@ -42,6 +40,9 @@ limitations under the License. */ #include "paddle/fluid/platform/gpu_info.h" #include "paddle/fluid/platform/macros.h" // for DISABLE_COPY_AND_ASSIGN #include "paddle/fluid/platform/place.h" +#ifdef PADDLE_WITH_PSCORE +#include "paddle/fluid/distributed/service/communicator.h" +#endif namespace paddle { namespace framework { @@ -219,7 +220,7 @@ class PSGPUWrapper { std::shared_ptr resource_; int32_t sleep_seconds_before_fail_exit_; std::vector slot_vector_; - int multi_node_{1}; + int multi_node_{0}; int node_size_; std::vector inner_comms_; std::vector inter_comms_; diff --git a/paddle/fluid/framework/heter_service.h b/paddle/fluid/framework/heter_service.h index 8f52235c96..3f65eaf3aa 100644 --- a/paddle/fluid/framework/heter_service.h +++ b/paddle/fluid/framework/heter_service.h @@ -30,10 +30,12 @@ limitations under the License. 
*/ #include "brpc/controller.h" #include "brpc/server.h" #include "paddle/fluid/platform/timer.h" +#endif namespace paddle { namespace framework { +#ifdef PADDLE_WITH_PSLIB typedef std::function HeterServiceHandler; class DataFeed; @@ -142,7 +144,7 @@ class HeterTask { double cpu_2_gpu_time{0}; platform::Timer timeline; }; - +#endif template class HeterObjectPool { public: @@ -153,7 +155,7 @@ class HeterObjectPool { if (pool_.empty()) { num_ += 1; #if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP) - VLOG(0) << "pool construct size: " << num_; + VLOG(3) << "pool construct size: " << num_; #endif return std::make_shared(); } else { @@ -178,6 +180,7 @@ class HeterObjectPool { int num_{0}; }; +#ifdef PADDLE_WITH_PSLIB struct BthreadMutextGuard { BthreadMutextGuard(bthread_mutex_t* rho) { mutex_ = rho; @@ -258,7 +261,6 @@ class HeterList { std::unique_lock lock(mutex_); cond_.wait(lock, [this] { return size < cap_; }); if (task_map_.find(key) != task_map_.end()) { - // std::cout << "try put key=" << key << " false" << std::endl; task_map_.erase(key); return false; } else { @@ -267,7 +269,6 @@ class HeterList { node->value = value; map_[node->key] = node; attach(node); - // std::cout << "try put key=" << key << " true" << std::endl; return true; } } @@ -276,7 +277,6 @@ class HeterList { std::unique_lock lock(mutex_); cond_.wait(lock, [this] { return size < cap_; }); HeterNode* node = new HeterNode; - // std::cout << "put key=" << key << " true" << std::endl; node->key = key; node->value = value; map_[node->key] = node; @@ -288,7 +288,6 @@ class HeterList { std::lock_guard lock(mutex_); auto iter = map_.find(key); if (iter != map_.end()) { - // std::cout << "try get key=" << key << " true" << std::endl; HeterNode* node = iter->second; detach(node); cond_.notify_one(); @@ -298,7 +297,6 @@ class HeterList { return ret; } task_map_.insert(key); - // std::cout << "try get key=" << key << " false" << std::endl; return nullptr; } @@ -306,7 +304,6 @@ class HeterList { std::lock_guard lock(mutex_); auto iter = map_.find(key); if (iter != map_.end()) { - // std::cout << "get key=" << key << " true" << std::endl; HeterNode* node = iter->second; detach(node); cond_.notify_one(); @@ -315,7 +312,6 @@ class HeterList { delete node; return ret; } - // std::cout << "get key=" << key << " false" << std::endl; return nullptr; } @@ -323,14 +319,12 @@ class HeterList { std::lock_guard lock(mutex_); HeterNode* node = head_->next; if (node == tail_) { - // std::cout << "get2 false" << std::endl; return nullptr; } else { detach(node); cond_.notify_one(); T ret = std::move(node->value); map_.erase(node->key); - // std::cout << "get2 key=" << node->key << " true" << std::endl; delete node; return ret; } @@ -371,7 +365,7 @@ class HeterList { int cap_; int size; }; +#endif } // namespace framework } // namespace paddle -#endif diff --git a/paddle/fluid/framework/multi_trainer.cc b/paddle/fluid/framework/multi_trainer.cc index ff8e71b92e..198bb65863 100644 --- a/paddle/fluid/framework/multi_trainer.cc +++ b/paddle/fluid/framework/multi_trainer.cc @@ -38,6 +38,13 @@ void MultiTrainer::Initialize(const TrainerDesc& trainer_desc, need_merge_var_names_.push_back( trainer_desc.downpour_param().stat_var_names(i)); } +#ifdef PADDLE_WITH_HETERPS + for (int i = 0; i < thread_num_; ++i) { + int num = trainer_desc.worker_places(i); + platform::CUDAPlace place = platform::CUDAPlace(num); + places_.push_back(place); + } +#endif // get filelist from trainer_desc here const std::vector readers = dataset->GetReaders(); @@ -102,13 
+109,42 @@ void MultiTrainer::InitDumpEnv() { void MultiTrainer::InitTrainerEnv(const ProgramDesc& main_program, const platform::Place& place) { for (int i = 0; i < thread_num_; ++i) { +#ifdef PADDLE_WITH_HETERPS + workers_[i]->SetPlace(places_[i]); + workers_[i]->SetReaderPlace(places_[i]); +#else workers_[i]->SetPlace(place); workers_[i]->SetReaderPlace(place); +#endif workers_[i]->SetRootScope(root_scope_); workers_[i]->CreateDeviceResource(main_program); // Program workers_[i]->BindingDataFeedMemory(); workers_[i]->CacheProgram(main_program); } +#ifdef PADDLE_WITH_HETERPS + for (int num = 0; num < thread_num_; ++num) { + auto place = places_[num]; + Scope* scope = workers_[num]->GetThreadScope(); + auto& block = main_program.Block(0); + for (auto& var : block.AllVars()) { + if (var->Persistable()) { + auto name = var->Name(); + Variable* root_var = root_scope_->FindVar(name); + if (!root_var) { + continue; + } + if (root_var->IsType()) { + continue; + } + LoDTensor* root_tensor = root_var->GetMutable(); + auto* ptr = scope->Var(name); + InitializeVariable(ptr, proto::VarType::LOD_TENSOR); + LoDTensor* thread_tensor = ptr->GetMutable(); + TensorCopy(*root_tensor, place, thread_tensor); + } + } + } +#endif } void MultiTrainer::InitOtherEnv(const ProgramDesc& main_program) { @@ -138,10 +174,77 @@ void MultiTrainer::Run() { } } +#ifdef PADDLE_WITH_HETERPS +void MultiTrainer::MergeDenseParam() { + auto communicator = paddle::distributed::Communicator::GetInstance(); + auto& recv_ctx = communicator->GetRecvCtxMap(); + Scope* thread_scope = workers_[0]->GetThreadScope(); + for (auto& iter : recv_ctx) { + auto& varnames = iter.second; + for (auto& name : varnames) { + Variable* root_var = root_scope_->FindVar(name); + LoDTensor* root_tensor = root_var->GetMutable(); + Variable* var = thread_scope->FindVar(name); + LoDTensor* tensor = var->GetMutable(); + TensorCopy((*tensor), root_tensor->place(), root_tensor); + } + } +} +#endif + +template +void MultiTrainer::MergeToRootScope(LoDTensor* root_tensor, LoDTensor* tensor) { + LoDTensor tmp_root; + TensorCopy(*root_tensor, platform::CPUPlace(), &tmp_root); + T* tmp_root_data = tmp_root.data(); + LoDTensor tmp_tensor; + TensorCopy(*tensor, platform::CPUPlace(), &tmp_tensor); + T* data = tmp_tensor.data(); + for (int i = 0; i < tmp_tensor.numel(); i++) { + tmp_root_data[i] += data[i]; + } + TensorCopy(tmp_root, platform::CPUPlace(), root_tensor); +} + void MultiTrainer::Finalize() { if (need_dump_field_ || need_dump_param_) { FinalizeDumpEnv(); } +#ifdef PADDLE_WITH_HETERPS + for (size_t i = 0; i < need_merge_var_names_.size(); i++) { + Variable* root_var = root_scope_->FindVar(need_merge_var_names_[i]); + if (root_var == nullptr) { + continue; + } + LoDTensor* root_tensor = root_var->GetMutable(); + + for (size_t j = 0; j < places_.size(); j++) { + Scope* cur_thread_scope = workers_[j]->GetThreadScope(); + Variable* thread_var = + cur_thread_scope->FindVar(need_merge_var_names_[i]); + if (thread_var == nullptr) { + continue; + } + LoDTensor* thread_tensor = thread_var->GetMutable(); +#define MergeCallback(cpp_type, proto_type) \ + do { \ + if (root_tensor->type() == proto_type) { \ + if (thread_tensor->type() != proto_type) { \ + VLOG(0) << "Error: thread id=" << j << ", need_merge_var_names_[" << i \ + << "] " << need_merge_var_names_[i] \ + << ", root tensor type=" << root_tensor->type() \ + << ", thread tensor type=" << thread_tensor->type(); \ + exit(-1); \ + } \ + MergeToRootScope(root_tensor, thread_tensor); \ + } \ + } while (0) + 
_ForEachDataType_(MergeCallback); + } + } + MergeDenseParam(); + +#endif root_scope_->DropKids(); } diff --git a/paddle/fluid/framework/ps_gpu_trainer.cc b/paddle/fluid/framework/ps_gpu_trainer.cc index e77932fa5f..39bc3f0406 100644 --- a/paddle/fluid/framework/ps_gpu_trainer.cc +++ b/paddle/fluid/framework/ps_gpu_trainer.cc @@ -19,10 +19,6 @@ limitations under the License. */ #include "paddle/fluid/framework/data_feed_factory.h" #include "paddle/fluid/framework/data_set.h" #include "paddle/fluid/framework/device_worker_factory.h" -#include "paddle/fluid/framework/fleet/fleet_wrapper.h" -#include "paddle/fluid/framework/fleet/heter_context.h" -#include "paddle/fluid/framework/fleet/heter_ps/feature_value.h" -#include "paddle/fluid/framework/fleet/ps_gpu_wrapper.h" #include "paddle/fluid/framework/trainer.h" #if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \ (defined PADDLE_WITH_PSLIB) @@ -64,7 +60,6 @@ void PSGPUTrainer::Initialize(const TrainerDesc& trainer_desc, pull_dense_worker_ = PullDenseWorker::GetInstance(); pull_dense_worker_->Initialize(trainer_desc); SetDebug(trainer_desc.debug()); - fleet_ptr_ = FleetWrapper::GetInstance(); trainer_desc_ = trainer_desc; workers_.resize(place_num); for (int i = 0; i < place_num; ++i) { diff --git a/paddle/fluid/framework/ps_gpu_worker.cc b/paddle/fluid/framework/ps_gpu_worker.cc index 2597901d91..d178c4e556 100644 --- a/paddle/fluid/framework/ps_gpu_worker.cc +++ b/paddle/fluid/framework/ps_gpu_worker.cc @@ -14,7 +14,6 @@ limitations under the License. */ #include "paddle/fluid/framework/device_worker.h" #include "paddle/fluid/framework/device_worker_factory.h" -#include "paddle/fluid/framework/fleet/fleet_wrapper.h" #include "paddle/fluid/framework/fleet/heter_wrapper.h" #include "paddle/fluid/platform/cpu_helper.h" #include "paddle/fluid/string/string_helper.h" diff --git a/paddle/fluid/framework/trainer.h b/paddle/fluid/framework/trainer.h index ca290a50b4..7efb89ad7d 100644 --- a/paddle/fluid/framework/trainer.h +++ b/paddle/fluid/framework/trainer.h @@ -109,13 +109,22 @@ class MultiTrainer : public TrainerBase { virtual Scope* GetWorkerScope(int thread_id); virtual std::string GetDumpPath(int tid); + template + void MergeToRootScope(LoDTensor* root_tensor, LoDTensor* thread_tensor); +#ifdef PADDLE_WITH_HETERPS + + void MergeDenseParam(); +#endif + protected: int thread_num_; std::vector threads_; std::vector readers_; std::vector> workers_; std::vector need_merge_var_names_; - +#ifdef PADDLE_WITH_HETERPS + std::vector places_; +#endif int mpi_rank_; int mpi_size_; int dump_file_num_; @@ -313,7 +322,6 @@ class PSGPUTrainer : public TrainerBase { float scale_datanorm_; paddle::platform::Place place_; ProgramDesc program_; - std::shared_ptr fleet_ptr_; std::shared_ptr pull_dense_worker_; std::vector> workers_; std::vector places_; diff --git a/paddle/fluid/operators/pull_box_sparse_op.h b/paddle/fluid/operators/pull_box_sparse_op.h index 48903012b5..77021b8961 100644 --- a/paddle/fluid/operators/pull_box_sparse_op.h +++ b/paddle/fluid/operators/pull_box_sparse_op.h @@ -47,8 +47,7 @@ static void PullBoxSparseFunctor(const framework::ExecutionContext &ctx) { box_ptr->PullSparse(ctx.GetPlace(), all_keys, all_values, slot_lengths, hidden_size, 0); #endif -#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \ - (defined PADDLE_WITH_PSLIB) +#ifdef PADDLE_WITH_HETERPS auto hidden_size = ctx.Attr("size"); auto gpu_ps_ptr = paddle::framework::PSGPUWrapper::GetInstance(); gpu_ps_ptr->PullSparse(ctx.GetPlace(), 0, all_keys, 
all_values, slot_lengths, @@ -91,8 +90,7 @@ static void PushBoxSparseFunctor(const framework::ExecutionContext &ctx) { box_ptr->PushSparseGrad(ctx.GetPlace(), all_keys, all_grad_values, slot_lengths, hidden_size, 0, batch_size); #endif -#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \ - (defined PADDLE_WITH_PSLIB) +#ifdef PADDLE_WITH_HETERPS auto hidden_size = ctx.Attr("size"); auto gpu_ps_ptr = paddle::framework::PSGPUWrapper::GetInstance(); gpu_ps_ptr->PushSparseGrad(ctx.GetPlace(), 0, all_keys, all_grad_values, diff --git a/paddle/fluid/pybind/ps_gpu_wrapper_py.cc b/paddle/fluid/pybind/ps_gpu_wrapper_py.cc index 5bff9178fd..0c239f8157 100644 --- a/paddle/fluid/pybind/ps_gpu_wrapper_py.cc +++ b/paddle/fluid/pybind/ps_gpu_wrapper_py.cc @@ -32,8 +32,7 @@ namespace py = pybind11; namespace paddle { namespace pybind { -#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \ - (defined PADDLE_WITH_PSLIB) +#ifdef PADDLE_WITH_HETERPS void BindPSGPUWrapper(py::module* m) { py::class_>( *m, "PSGPU") diff --git a/paddle/fluid/pybind/ps_gpu_wrapper_py.h b/paddle/fluid/pybind/ps_gpu_wrapper_py.h index 8bd6ee13cf..ba4f146389 100644 --- a/paddle/fluid/pybind/ps_gpu_wrapper_py.h +++ b/paddle/fluid/pybind/ps_gpu_wrapper_py.h @@ -22,8 +22,7 @@ namespace py = pybind11; namespace paddle { namespace pybind { -#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \ - (defined PADDLE_WITH_PSLIB) +#ifdef PADDLE_WITH_HETERPS void BindPSGPUWrapper(py::module* m); #endif } // namespace pybind diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index 428c7c2420..d96384ddb4 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -3052,8 +3052,7 @@ All parameter, weight, gradient are variables in Paddle. 
#ifdef PADDLE_WITH_PSLIB BindHeterWrapper(&m); #endif -#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \ - (defined PADDLE_WITH_PSLIB) +#ifdef PADDLE_WITH_HETERPS BindPSGPUWrapper(&m); #endif BindGlooWrapper(&m); diff --git a/python/paddle/distributed/fleet/meta_optimizers/parameter_server_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/parameter_server_optimizer.py index dd13f9bc5d..f6d2af0b41 100644 --- a/python/paddle/distributed/fleet/meta_optimizers/parameter_server_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/parameter_server_optimizer.py @@ -38,6 +38,23 @@ class ParameterServerOptimizer(MetaOptimizerBase): k_steps = self.user_defined_strategy.a_sync_configs["k_steps"] return True if k_steps >= 0 else False + def get_dist_env(self): + trainer_id = int(os.getenv('PADDLE_TRAINER_ID', '0')) + trainer_endpoints = '' + current_endpoint = '' + num_trainers = 0 + if os.getenv('PADDLE_TRAINER_ENDPOINTS'): + trainer_endpoints = os.getenv('PADDLE_TRAINER_ENDPOINTS') + current_endpoint = trainer_endpoints.split(',')[trainer_id] + num_trainers = len(trainer_endpoints.split(',')) + + return { + 'trainer_id': trainer_id, + 'num_trainers': num_trainers, + 'current_endpoint': current_endpoint, + 'trainer_endpoints': trainer_endpoints + } + def _get_distributed_strategy(self): from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory @@ -64,6 +81,8 @@ class ParameterServerOptimizer(MetaOptimizerBase): _main = compiled_config.origin_main_program.clone() _startup = compiled_config.origin_startup_program.clone() + use_ps_gpu = self.user_defined_strategy.a_sync_configs["use_ps_gpu"] + if not compiled_config.is_geo_mode(): from paddle.fluid.incubate.fleet.parameter_server.ir.public import _add_lr_decay_table_pass _add_lr_decay_table_pass( @@ -71,14 +90,28 @@ class ParameterServerOptimizer(MetaOptimizerBase): self.user_defined_strategy.a_sync_configs["lr_decay_steps"]) # for main program - _main = worker.delete_optimizer_pass(_main, compiled_config) - _main = worker.distributed_ops_pass(_main, compiled_config) - _main = worker.append_send_ops_pass(_main, compiled_config) - - # for startup program + _main = worker.distributed_ops_pass(_main, compiled_config, + use_ps_gpu) + if not use_ps_gpu: + _main = worker.delete_optimizer_pass(_main, compiled_config) + _main = worker.append_send_ops_pass(_main, compiled_config) + _startup = worker.delet_extra_optimizes_pass(_startup, + compiled_config) + + # for startup program _startup = worker.fake_init_ops_pass(_startup, compiled_config) - _startup = worker.delet_extra_optimizes_pass(_startup, - compiled_config) + if use_ps_gpu: + _main = worker.ps_gpu_pass(_main) + from paddle.fluid.transpiler.collective import SingleProcessMultiThread + t = SingleProcessMultiThread() + env = self.get_dist_env() + t.transpile( + startup_program=_startup, + main_program=_main, + rank=env["trainer_id"], + endpoints=env["trainer_endpoints"], + current_endpoint=env['current_endpoint'], + wait_port=False) compiled_config.set_origin_ps_main_program(_main) compiled_config.set_origin_ps_startup_program(_startup) diff --git a/python/paddle/distributed/fleet/runtime/the_one_ps.py b/python/paddle/distributed/fleet/runtime/the_one_ps.py index aa7df57e3c..df07a7a6e7 100644 --- a/python/paddle/distributed/fleet/runtime/the_one_ps.py +++ b/python/paddle/distributed/fleet/runtime/the_one_ps.py @@ -453,6 +453,17 @@ class TheOnePSRuntime(RuntimeBase): worker = 
self._get_fleet_proto(is_server=False, is_sync=is_sync) server = self._get_fleet_proto(is_server=True, is_sync=is_sync) + dist_strategy = self.context["valid_strategy"] + use_ps_gpu = dist_strategy.a_sync_configs["use_ps_gpu"] + if use_ps_gpu: + main_program = self.context['loss'].block.program + if not main_program._fleet_opt: + main_program._fleet_opt = {} + main_program._fleet_opt["use_ps_gpu"] = True + gpus_env = os.getenv("FLAGS_selected_gpus") + main_program._fleet_opt[ + "worker_places"] = [int(s) for s in gpus_env.split(",")] + def sync_strategy_envs(): kwargs = {} kwargs[ @@ -741,6 +752,11 @@ class TheOnePSRuntime(RuntimeBase): downpour_server = DownpourServer() service = Service() + dist_strategy = self.context["valid_strategy"] + use_ps_gpu = dist_strategy.a_sync_configs["use_ps_gpu"] + if use_ps_gpu: + service.server_class = "PsLocalServer" + service.client_class = "PsLocalClient" downpour_server.set_service_param(service) tables = _get_tables() diff --git a/python/paddle/fluid/device_worker.py b/python/paddle/fluid/device_worker.py index 0f98af5772..7bcd10a726 100644 --- a/python/paddle/fluid/device_worker.py +++ b/python/paddle/fluid/device_worker.py @@ -102,6 +102,12 @@ class Hogwild(DeviceWorker): # when opt_info is None or empty dict, it should return if not opt_info: return + downpour = trainer_desc.downpour_param + hogwild = trainer_desc.hogwild_param + if opt_info["stat_var_names"]: + for i in opt_info["stat_var_names"]: + hogwild.stat_var_names.extend([i]) + downpour.stat_var_names.extend([i]) from paddle.fluid.incubate.fleet.parameter_server import version @@ -109,8 +115,6 @@ class Hogwild(DeviceWorker): return program_configs = opt_info["program_configs"] - downpour = trainer_desc.downpour_param - hogwild = trainer_desc.hogwild_param for pid in program_configs: if pid == program_id: @@ -161,10 +165,6 @@ class Hogwild(DeviceWorker): sparse_table.emb_dim = -1 # not use hard code click sparse_table.label_var_name = "" - if opt_info["stat_var_names"]: - for i in opt_info["stat_var_names"]: - hogwild.stat_var_names.extend([i]) - downpour.stat_var_names.extend([i]) for i in worker.get_desc().dense_table: if i.table_id in dense_table_set: diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py index 9c85cc6cd5..76bc68f24d 100644 --- a/python/paddle/fluid/executor.py +++ b/python/paddle/fluid/executor.py @@ -1373,11 +1373,14 @@ class Executor(object): fetch_info=None, print_period=100): is_heter = 0 + use_ps_gpu = 0 if not program._fleet_opt is None: if program._fleet_opt.get("worker_class", "") == "HeterCpuWorker": is_heter = 1 if program._fleet_opt.get("trainer", "") == "HeterXpuTrainer": is_heter = 1 + if program._fleet_opt.get("use_ps_gpu", False): + use_ps_gpu = True if scope is None: scope = global_scope() if fetch_list is None: @@ -1412,7 +1415,9 @@ class Executor(object): trainer._set_program(program.program) if thread <= 0: - if dataset.thread_num <= 0: + if use_ps_gpu: + trainer._set_thread(len(program._fleet_opt["worker_places"])) + elif dataset.thread_num <= 0: raise RuntimeError( "You should set thread num first, either in Dataset" "or in Executor.train_from_dataset") diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py b/python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py index 08e64c15c4..5f32749704 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py @@ -24,6 +24,7 @@ from 
functools import reduce import paddle.fluid as fluid import paddle.fluid.core as core import paddle.fluid.framework as framework +import paddle.compat as cpt from paddle.fluid.transpiler.details.program_utils import delete_ops from paddle.fluid.incubate.fleet.parameter_server.ir.public import _get_optimize_ops @@ -93,7 +94,7 @@ def delete_optimizer_pass(program, config): return program -def distributed_ops_pass(program, config): +def distributed_ops_pass(program, config, use_ps_gpu=False): trainer_id = config.get_role_id() send_ctx = config.get_the_one_send_context( split_dense_table=config.is_heter_ps_mode) @@ -109,7 +110,7 @@ def distributed_ops_pass(program, config): pull_sparse_ops[param_name] = ops return pull_sparse_ops - def _pull_sparse_fuse(_program, pull_sparse_ops): + def _pull_sparse_fuse(_program, pull_sparse_ops, use_ps_gpu): for param, ops in pull_sparse_ops.items(): all_ops = program.global_block().ops op_idxs = [all_ops.index(op) for op in ops] @@ -159,18 +160,31 @@ def distributed_ops_pass(program, config): if min(outputs_idxs) - max(inputs_idxs) >= 1: distributed_idx = max(inputs_idxs) + 1 - program.global_block()._insert_op( - index=distributed_idx, - type="distributed_lookup_table", - inputs={"Ids": inputs, - 'W': w}, - outputs={"Outputs": outputs}, - attrs={ - "is_distributed": is_distributed, - "padding_idx": padding_idx, - "table_id": table_id, - "lookup_table_version": op_type - }) + if use_ps_gpu: + program.global_block()._insert_op( + index=distributed_idx, + type="pull_box_sparse", + inputs={"Ids": inputs, + 'W': w}, + outputs={"Out": outputs}, + attrs={ + "size": w.shape[1], + "is_distributed": True, + "is_sparse": True + }) + else: + program.global_block()._insert_op( + index=distributed_idx, + type="distributed_lookup_table", + inputs={"Ids": inputs, + 'W': w}, + outputs={"Outputs": outputs}, + attrs={ + "is_distributed": is_distributed, + "padding_idx": padding_idx, + "table_id": table_id, + "lookup_table_version": op_type + }) else: for i in range(len(inputs_idxs)): distributed_idx = op_idxs[i] + 1 @@ -189,7 +203,7 @@ def distributed_ops_pass(program, config): }) pull_sparse_ops = _get_pull_sparse_ops(program) - _pull_sparse_fuse(program, pull_sparse_ops) + _pull_sparse_fuse(program, pull_sparse_ops, use_ps_gpu) return program @@ -308,6 +322,54 @@ def fake_init_ops_pass(program, config): return program +def ps_gpu_pass(program): + def _add_push_box_sparse_op(program): + op_role_attr_name = core.op_proto_and_checker_maker.kOpRoleAttrName() + backward = core.op_proto_and_checker_maker.OpRole.Backward + for op in program.global_block().ops: + if op.type != "pull_box_sparse": + continue + grad_op_desc, op_grad_to_var = core.get_grad_op_desc( + op.desc, cpt.to_text(set()), []) + for op_desc in grad_op_desc: + new_op_desc = program.global_block().desc.append_op() + new_op_desc.copy_from(op_desc) + new_op_desc._set_attr(op_role_attr_name, backward) + + def _remove_lookup_table_grad_op_and_var(program): + lookup_table_grad_var = {} + remove_op_index = [] + remove_var = [] + for idx, op in list(enumerate(program.global_block().ops)): + if op.type == "lookup_table_grad": + for name in op.output("W@GRAD"): + lookup_table_grad_var[name] = 1 + remove_op_index.append(idx) + remove_var.append(name) + for name in op.input("W"): + lookup_table_grad_var[name] = 1 + + for idx, op in list(enumerate(program.global_block().ops)): + if op.type == "pull_box_sparse": + continue + for key_name in op.input_names: + for var in op.input(key_name): + if var in lookup_table_grad_var: + 
remove_op_index.append(idx) + break + + remove_op_index = list(set(remove_op_index)) + remove_op_index.sort(reverse=True) + for idx in remove_op_index: + program.global_block()._remove_op(idx) + for name in remove_var: + program.global_block()._remove_var(name) + + _add_push_box_sparse_op(program) + _remove_lookup_table_grad_op_and_var(program) + return program + + def delet_extra_optimizes_pass(program, config): optimize_vars = [] optimize_op_role_vars = [] diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index 604d50b8ed..679f765164 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -30,6 +30,7 @@ list(APPEND MIXED_DIST_TEST_OPS test_simple_dist_transpiler) list(APPEND MIXED_DIST_TEST_OPS test_recv_save_op) list(APPEND MIXED_DIST_TEST_OPS test_c_comm_init_op) list(APPEND MIXED_DIST_TEST_OPS test_communicator_async) +list(APPEND MIXED_DIST_TEST_OPS test_communicator_ps_gpu) list(APPEND MIXED_DIST_TEST_OPS test_communicator_geo) list(APPEND MIXED_DIST_TEST_OPS test_communicator_half_async) list(APPEND MIXED_DIST_TEST_OPS test_communicator_sync) @@ -483,6 +484,7 @@ if(WITH_DISTRIBUTE) py_test_modules(test_recv_save_op MODULES test_recv_save_op ENVS ${dist_ENVS}) py_test_modules(test_communicator_async MODULES test_communicator_async ENVS ${dist_ENVS}) + py_test_modules(test_communicator_ps_gpu MODULES test_communicator_ps_gpu ENVS ${dist_ENVS}) py_test_modules(test_communicator_geo MODULES test_communicator_geo ENVS ${dist_ENVS}) py_test_modules(test_communicator_half_async MODULES test_communicator_half_async ENVS ${dist_ENVS} FLAGS_communicator_send_queue_size=1 FLAGS_communicator_max_merge_var_num=1) py_test_modules(test_communicator_sync MODULES test_communicator_sync ENVS ${dist_ENVS} FLAGS_communicator_send_queue_size=1 FLAGS_communicator_max_merge_var_num=1) diff --git a/python/paddle/fluid/tests/unittests/test_communicator_ps_gpu.py b/python/paddle/fluid/tests/unittests/test_communicator_ps_gpu.py new file mode 100644 index 0000000000..5de1ebf581 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_communicator_ps_gpu.py @@ -0,0 +1,97 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import print_function + +import os +import unittest +import time +import threading +import numpy + +import paddle +paddle.enable_static() + +import paddle.fluid as fluid +import paddle.distributed.fleet.base.role_maker as role_maker +import paddle.distributed.fleet as fleet + + +class TestCommunicator(unittest.TestCase): + def test_communicator_ps_gpu(self): + with open("test_communicator_ps_gpu.txt", "w") as f: + data = "1 0.6 1 0.7\n" + f.write(data) + + os.environ["PADDLE_PSERVER_NUMS"] = "2" + os.environ["PADDLE_TRAINERS_NUM"] = "2" + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_PORT"] = "36001" + os.environ["PADDLE_TRAINER_ID"] = "0" + os.environ["PADDLE_TRAINERS_NUM"] = "2" + os.environ[ + "PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001,127.0.0.2:36001" + os.environ[ + "PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36002,127.0.0.2:36002" + os.environ["TRAINING_ROLE"] = "TRAINER" + os.environ["FLAGS_selected_gpus"] = "0" + role = role_maker.PaddleCloudRoleMaker() + + fleet.init(role) + x = fluid.layers.data(name='x', shape=[1], dtype='float32') + y = fluid.layers.data(name='y', shape=[1], dtype='float32') + slots_vars = [x, y] + + cost = fluid.layers.square_error_cost(input=x, label=y) + avg_cost = fluid.layers.mean(cost) + + optimizer = fluid.optimizer.Adam(0.01) + + strategy = paddle.distributed.fleet.DistributedStrategy() + strategy.a_sync = True + strategy.a_sync_configs = { + "launch_barrier": False, + "use_ps_gpu": 1, + } + startup_program = paddle.static.Program() + main_program = paddle.static.Program() + optimizer = fleet.distributed_optimizer(optimizer, strategy) + optimizer.minimize(avg_cost) + + dataset = paddle.distributed.InMemoryDataset() + dataset.init( + batch_size=32, thread_num=1, pipe_command="cat", use_var=slots_vars) + dataset.set_filelist(["test_communicator_ps_gpu.txt"]) + dataset.load_into_memory() + + os.environ["TEST_MODE"] = "1" + exe = fluid.Executor(fluid.CPUPlace()) + exe.run(startup_program) + main_program._fleet_opt = {"stat_var_names": [x.name]} + fleet.init_worker() + + try: + exe.train_from_dataset(main_program, dataset) + except ImportError as e: + pass + except Exception as e: + self.assertTrue(False) + + time.sleep(10) + fleet.stop_worker() + os.remove("./test_communicator_ps_gpu.txt") + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_ps11.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_ps11.py new file mode 100644 index 0000000000..cad7d067e9 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_ps11.py @@ -0,0 +1,180 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import print_function + +import os +import unittest +import tempfile +import shutil + +import paddle +import paddle.fluid as fluid +import paddle.distributed.fleet.base.role_maker as role_maker +import paddle.distributed.fleet as fleet + +paddle.enable_static() + +# For Net +base_lr = 0.2 +emb_lr = base_lr * 3 +dict_dim = 1500 +emb_dim = 128 +hid_dim = 128 +margin = 0.1 +sample_rate = 1 +batch_size = 4 + + +class TestPSPassWithBow(unittest.TestCase): + def net(self): + def get_acc(cos_q_nt, cos_q_pt, batch_size): + cond = fluid.layers.less_than(cos_q_nt, cos_q_pt) + cond = fluid.layers.cast(cond, dtype='float64') + cond_3 = fluid.layers.reduce_sum(cond) + acc = fluid.layers.elementwise_div( + cond_3, + fluid.layers.fill_constant( + shape=[1], value=batch_size * 1.0, dtype='float64'), + name="simnet_acc") + return acc + + def get_loss(cos_q_pt, cos_q_nt): + loss_op1 = fluid.layers.elementwise_sub( + fluid.layers.fill_constant_batch_size_like( + input=cos_q_pt, + shape=[-1, 1], + value=margin, + dtype='float32'), + cos_q_pt) + loss_op2 = fluid.layers.elementwise_add(loss_op1, cos_q_nt) + loss_op3 = fluid.layers.elementwise_max( + fluid.layers.fill_constant_batch_size_like( + input=loss_op2, shape=[-1, 1], value=0.0, dtype='float32'), + loss_op2) + avg_cost = fluid.layers.mean(loss_op3) + return avg_cost + + is_distributed = False + is_sparse = True + + # query + q = fluid.layers.data( + name="query_ids", shape=[1], dtype="int64", lod_level=1) + # embedding + q_emb = fluid.contrib.layers.sparse_embedding( + input=q, + size=[dict_dim, emb_dim], + param_attr=fluid.ParamAttr( + initializer=fluid.initializer.Constant(value=0.01), + name="__emb__", + learning_rate=emb_lr)) + q_emb = fluid.layers.reshape(q_emb, [-1, emb_dim]) + # vsum + q_sum = fluid.layers.sequence_pool(input=q_emb, pool_type='sum') + q_ss = fluid.layers.softsign(q_sum) + # fc layer after conv + q_fc = fluid.layers.fc( + input=q_ss, + size=hid_dim, + param_attr=fluid.ParamAttr( + initializer=fluid.initializer.Constant(value=0.01), + name="__q_fc__", + learning_rate=base_lr)) + # label data + label = fluid.layers.data(name="label", shape=[1], dtype="int64") + # pt + pt = fluid.layers.data( + name="pos_title_ids", shape=[1], dtype="int64", lod_level=1) + # embedding + pt_emb = fluid.contrib.layers.sparse_embedding( + input=pt, + size=[dict_dim, emb_dim], + param_attr=fluid.ParamAttr( + initializer=fluid.initializer.Constant(value=0.01), + name="__emb__", + learning_rate=emb_lr)) + pt_emb = fluid.layers.reshape(pt_emb, [-1, emb_dim]) + # vsum + pt_sum = fluid.layers.sequence_pool(input=pt_emb, pool_type='sum') + pt_ss = fluid.layers.softsign(pt_sum) + # fc layer + pt_fc = fluid.layers.fc( + input=pt_ss, + size=hid_dim, + param_attr=fluid.ParamAttr( + initializer=fluid.initializer.Constant(value=0.01), + name="__fc__", + learning_rate=base_lr), + bias_attr=fluid.ParamAttr(name="__fc_b__")) + # nt + nt = fluid.layers.data( + name="neg_title_ids", shape=[1], dtype="int64", lod_level=1) + # embedding + nt_emb = fluid.contrib.layers.sparse_embedding( + input=nt, + size=[dict_dim, emb_dim], + param_attr=fluid.ParamAttr( + initializer=fluid.initializer.Constant(value=0.01), + name="__emb__", + learning_rate=emb_lr)) + nt_emb = fluid.layers.reshape(nt_emb, [-1, emb_dim]) + # vsum + nt_sum = fluid.layers.sequence_pool(input=nt_emb, pool_type='sum') + nt_ss = fluid.layers.softsign(nt_sum) + # fc layer + nt_fc = fluid.layers.fc( + input=nt_ss, + size=hid_dim, + param_attr=fluid.ParamAttr( + 
initializer=fluid.initializer.Constant(value=0.01), + name="__fc__", + learning_rate=base_lr), + bias_attr=fluid.ParamAttr(name="__fc_b__")) + cos_q_pt = fluid.layers.cos_sim(q_fc, pt_fc) + cos_q_nt = fluid.layers.cos_sim(q_fc, nt_fc) + # loss + avg_cost = get_loss(cos_q_pt, cos_q_nt) + # acc + acc = get_acc(cos_q_nt, cos_q_pt, batch_size) + return [avg_cost, acc, cos_q_pt] + + def test(self): + os.environ["PADDLE_PSERVER_NUMS"] = "2" + os.environ["PADDLE_TRAINERS_NUM"] = "2" + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_PORT"] = "36001" + os.environ["PADDLE_TRAINER_ID"] = "0" + os.environ["PADDLE_TRAINERS_NUM"] = "2" + os.environ[ + "PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001,127.0.0.2:36001" + os.environ[ + "PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36002,127.0.0.2:36002" + os.environ["TRAINING_ROLE"] = "TRAINER" + os.environ["FLAGS_selected_gpus"] = "0" + role = role_maker.PaddleCloudRoleMaker() + fleet.init(role) + loss, acc, _ = self.net() + + strategy = paddle.distributed.fleet.DistributedStrategy() + configs = {"use_ps_gpu": 1, "launch_barrier": False} + strategy.a_sync_configs = configs + strategy.a_sync = True + optimizer = paddle.fluid.optimizer.Adam(learning_rate=0.01) + optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy) + optimizer.minimize(loss) + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_ps12.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_ps12.py new file mode 100644 index 0000000000..74c1ccd8a8 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_ps12.py @@ -0,0 +1,180 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import print_function + +import os +import unittest +import tempfile +import shutil + +import paddle +import paddle.fluid as fluid +import paddle.distributed.fleet.base.role_maker as role_maker +import paddle.distributed.fleet as fleet + +paddle.enable_static() + +# For Net +base_lr = 0.2 +emb_lr = base_lr * 3 +dict_dim = 1500 +emb_dim = 128 +hid_dim = 128 +margin = 0.1 +sample_rate = 1 +batch_size = 4 + + +class TestPSPassWithBow(unittest.TestCase): + def net(self): + def get_acc(cos_q_nt, cos_q_pt, batch_size): + cond = fluid.layers.less_than(cos_q_nt, cos_q_pt) + cond = fluid.layers.cast(cond, dtype='float64') + cond_3 = fluid.layers.reduce_sum(cond) + acc = fluid.layers.elementwise_div( + cond_3, + fluid.layers.fill_constant( + shape=[1], value=batch_size * 1.0, dtype='float64'), + name="simnet_acc") + return acc + + def get_loss(cos_q_pt, cos_q_nt): + loss_op1 = fluid.layers.elementwise_sub( + fluid.layers.fill_constant_batch_size_like( + input=cos_q_pt, + shape=[-1, 1], + value=margin, + dtype='float32'), + cos_q_pt) + loss_op2 = fluid.layers.elementwise_add(loss_op1, cos_q_nt) + loss_op3 = fluid.layers.elementwise_max( + fluid.layers.fill_constant_batch_size_like( + input=loss_op2, shape=[-1, 1], value=0.0, dtype='float32'), + loss_op2) + avg_cost = fluid.layers.mean(loss_op3) + return avg_cost + + is_distributed = False + is_sparse = True + + # query + q = fluid.layers.data( + name="query_ids", shape=[1], dtype="int64", lod_level=1) + # embedding + q_emb = fluid.contrib.layers.sparse_embedding( + input=q, + size=[dict_dim, emb_dim], + param_attr=fluid.ParamAttr( + initializer=fluid.initializer.Constant(value=0.01), + name="__emb__", + learning_rate=emb_lr)) + q_emb = fluid.layers.reshape(q_emb, [-1, emb_dim]) + # vsum + q_sum = fluid.layers.sequence_pool(input=q_emb, pool_type='sum') + q_ss = fluid.layers.softsign(q_sum) + # fc layer after conv + q_fc = fluid.layers.fc( + input=q_ss, + size=hid_dim, + param_attr=fluid.ParamAttr( + initializer=fluid.initializer.Constant(value=0.01), + name="__q_fc__", + learning_rate=base_lr)) + # label data + label = fluid.layers.data(name="label", shape=[1], dtype="int64") + # pt + pt = fluid.layers.data( + name="pos_title_ids", shape=[1], dtype="int64", lod_level=1) + # embedding + pt_emb = fluid.contrib.layers.sparse_embedding( + input=pt, + size=[dict_dim, emb_dim], + param_attr=fluid.ParamAttr( + initializer=fluid.initializer.Constant(value=0.01), + name="__emb__", + learning_rate=emb_lr)) + pt_emb = fluid.layers.reshape(pt_emb, [-1, emb_dim]) + # vsum + pt_sum = fluid.layers.sequence_pool(input=pt_emb, pool_type='sum') + pt_ss = fluid.layers.softsign(pt_sum) + # fc layer + pt_fc = fluid.layers.fc( + input=pt_ss, + size=hid_dim, + param_attr=fluid.ParamAttr( + initializer=fluid.initializer.Constant(value=0.01), + name="__fc__", + learning_rate=base_lr), + bias_attr=fluid.ParamAttr(name="__fc_b__")) + # nt + nt = fluid.layers.data( + name="neg_title_ids", shape=[1], dtype="int64", lod_level=1) + # embedding + nt_emb = fluid.contrib.layers.sparse_embedding( + input=nt, + size=[dict_dim, emb_dim], + param_attr=fluid.ParamAttr( + initializer=fluid.initializer.Constant(value=0.01), + name="__emb__", + learning_rate=emb_lr)) + nt_emb = fluid.layers.reshape(nt_emb, [-1, emb_dim]) + # vsum + nt_sum = fluid.layers.sequence_pool(input=nt_emb, pool_type='sum') + nt_ss = fluid.layers.softsign(nt_sum) + # fc layer + nt_fc = fluid.layers.fc( + input=nt_ss, + size=hid_dim, + param_attr=fluid.ParamAttr( + 
initializer=fluid.initializer.Constant(value=0.01), + name="__fc__", + learning_rate=base_lr), + bias_attr=fluid.ParamAttr(name="__fc_b__")) + cos_q_pt = fluid.layers.cos_sim(q_fc, pt_fc) + cos_q_nt = fluid.layers.cos_sim(q_fc, nt_fc) + # loss + avg_cost = get_loss(cos_q_pt, cos_q_nt) + # acc + acc = get_acc(cos_q_nt, cos_q_pt, batch_size) + return [avg_cost, acc, cos_q_pt] + + def test(self): + os.environ["PADDLE_PSERVER_NUMS"] = "2" + os.environ["PADDLE_TRAINERS_NUM"] = "2" + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_PORT"] = "36001" + os.environ["PADDLE_TRAINER_ID"] = "0" + os.environ["PADDLE_TRAINERS_NUM"] = "2" + os.environ[ + "PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001,127.0.0.2:36001" + os.environ["TRAINING_ROLE"] = "PSERVER" + + role = role_maker.PaddleCloudRoleMaker() + fleet.init(role) + loss, acc, _ = self.net() + + strategy = paddle.distributed.fleet.DistributedStrategy() + configs = {"use_ps_gpu": 1} + strategy.a_sync_configs = configs + strategy.a_sync = True + optimizer = paddle.fluid.optimizer.Adam(learning_rate=0.01) + optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy) + optimizer.minimize(loss) + + fleet.init_server() + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/fluid/trainer_factory.py b/python/paddle/fluid/trainer_factory.py index c61141bcd3..00dea8d125 100644 --- a/python/paddle/fluid/trainer_factory.py +++ b/python/paddle/fluid/trainer_factory.py @@ -49,8 +49,8 @@ class TrainerFactory(object): device_worker = Hogwild() trainer._set_device_worker(device_worker) else: - trainer_class = opt_info["trainer"] - device_worker_class = opt_info["device_worker"] + trainer_class = opt_info.get("trainer", "MultiTrainer") + device_worker_class = opt_info.get("device_worker", "Hogwild") trainer = globals()[trainer_class]() device_worker = globals()[device_worker_class]() -- GitLab
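
Note: the end-to-end way to exercise the new PS-GPU path is captured by the new unit tests in this patch; the block below is only a minimal usage sketch distilled from test_communicator_ps_gpu.py, not part of the diff. It assumes a GPU build configured with -DWITH_HETERPS=ON and the usual PaddleCloud environment variables (PADDLE_TRAINER_ENDPOINTS, PADDLE_PSERVERS_IP_PORT_LIST, TRAINING_ROLE, FLAGS_selected_gpus, ...) already exported; the toy network and the training-file name are illustrative only.

    # Minimal sketch (assumptions: WITH_HETERPS=ON build, cluster/GPU env vars
    # exported as in test_communicator_ps_gpu.py; file name is illustrative).
    import paddle
    import paddle.fluid as fluid
    import paddle.distributed.fleet as fleet
    import paddle.distributed.fleet.base.role_maker as role_maker

    paddle.enable_static()
    fleet.init(role_maker.PaddleCloudRoleMaker())

    x = fluid.layers.data(name='x', shape=[1], dtype='float32')
    y = fluid.layers.data(name='y', shape=[1], dtype='float32')
    cost = fluid.layers.square_error_cost(input=x, label=y)
    avg_cost = fluid.layers.mean(cost)

    # "use_ps_gpu" is the new a_sync_configs switch added by this patch: with it,
    # distributed_ops_pass inserts pull_box_sparse for sparse lookups, ps_gpu_pass
    # appends the backward push_box_sparse ops, and TheOnePSRuntime selects
    # PsLocalServer/PsLocalClient, filling worker_places from FLAGS_selected_gpus.
    strategy = paddle.distributed.fleet.DistributedStrategy()
    strategy.a_sync = True
    strategy.a_sync_configs = {"use_ps_gpu": 1, "launch_barrier": False}

    optimizer = fleet.distributed_optimizer(fluid.optimizer.Adam(0.01), strategy)
    optimizer.minimize(avg_cost)

    # Dataset setup mirrors the unit test.
    dataset = paddle.distributed.InMemoryDataset()
    dataset.init(batch_size=32, thread_num=1, pipe_command="cat", use_var=[x, y])
    dataset.set_filelist(["train_data.txt"])  # illustrative file name
    dataset.load_into_memory()

    exe = fluid.Executor(fluid.CPUPlace())
    exe.run(fluid.default_startup_program())
    fleet.init_worker()
    # With use_ps_gpu set, Executor.train_from_dataset derives the thread count
    # from _fleet_opt["worker_places"] rather than the dataset's thread_num.
    exe.train_from_dataset(fluid.default_main_program(), dataset)
    fleet.stop_worker()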