Unverified commit 9f8c8f96 authored by T Thunderbrook, committed by GitHub

heterps support pscore (#32093)

* pscore support heterps

* fleet cmake

* fleet wrapper

* macro

* solve conflict

* solve conflict

* add unitest

* paddle enforce

* unitest

* unitest

* unitest
Parent 668a0d3b
......@@ -184,6 +184,7 @@ option(WITH_XBYAK "Compile with xbyak support" ON)
option(WITH_CONTRIB "Compile the third-party contributation" OFF)
option(WITH_GRPC "Use grpc as the default rpc framework" ${WITH_DISTRIBUTE})
option(WITH_PSCORE "Compile with parameter server support" ${WITH_DISTRIBUTE})
option(WITH_HETERPS "Compile with heterps" OFF)
option(WITH_INFERENCE_API_TEST "Test fluid inference C++ high-level api interface" OFF)
option(PY_VERSION "Compile PaddlePaddle with python3 support" ${PY_VERSION})
option(WITH_DGC "Use DGC(Deep Gradient Compression) or not" ${WITH_DISTRIBUTE})
......
......@@ -173,6 +173,9 @@ if(WITH_PSCORE)
add_definitions(-DPADDLE_WITH_PSCORE)
endif()
if(WITH_HETERPS)
add_definitions(-DPADDLE_WITH_HETERPS)
endif()
if(WITH_GRPC)
add_definitions(-DPADDLE_WITH_GRPC)
......
......@@ -16,6 +16,7 @@ set_source_files_properties(communicator.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUT
set_source_files_properties(service.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
set_source_files_properties(brpc_ps_server.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
set_source_files_properties(brpc_ps_client.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
set_source_files_properties(ps_local_client.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
set_source_files_properties(brpc_utils.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
set_source_files_properties(heter_server.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
......@@ -29,7 +30,8 @@ set_source_files_properties(graph_brpc_client.cc PROPERTIES COMPILE_FLAGS ${DIST
cc_library(brpc_utils SRCS brpc_utils.cc DEPS tensor device_context ${COMMON_DEPS} ${RPC_DEPS})
cc_library(downpour_server SRCS graph_brpc_server.cc brpc_ps_server.cc DEPS boost eigen3 table brpc_utils simple_threadpool ${RPC_DEPS})
cc_library(downpour_client SRCS graph_brpc_client.cc brpc_ps_client.cc DEPS boost eigen3 table brpc_utils simple_threadpool ${RPC_DEPS})
cc_library(downpour_client SRCS graph_brpc_client.cc brpc_ps_client.cc
ps_local_client.cc DEPS boost eigen3 table brpc_utils simple_threadpool ${RPC_DEPS})
cc_library(client SRCS ps_client.cc DEPS downpour_client boost ${RPC_DEPS})
cc_library(server SRCS server.cc DEPS downpour_server boost ${RPC_DEPS})
......
......@@ -880,8 +880,8 @@ std::future<int32_t> BrpcPsClient::send_client2client_msg(
auto promise = std::make_shared<std::promise<int32_t>>();
std::future<int> fut = promise->get_future();
if (to_client_id >= _client_channels.size()) {
LOG(FATAL) << "to_client_id is out of range clients, which size is "
<< _client_channels.size();
VLOG(0) << "to_client_id is out of range clients, which size is "
<< _client_channels.size();
promise->set_value(-1);
return fut;
}
......@@ -1001,4 +1001,4 @@ int32_t BrpcPsClient::recv_and_save_table(const uint64_t table_id,
}
} // namespace distributed
} // namespace paddle
\ No newline at end of file
} // namespace paddle
......@@ -310,6 +310,8 @@ class Communicator {
return _worker_ptr;
}
RecvCtxMap &GetRecvCtxMap() { return recv_varname_to_ctx_; }
std::shared_ptr<PSClient> _worker_ptr; // pointer to worker
protected:
......
......@@ -16,12 +16,15 @@
#include "glog/logging.h"
#include "paddle/fluid/distributed/service/brpc_ps_client.h"
#include "paddle/fluid/distributed/service/graph_brpc_client.h"
#include "paddle/fluid/distributed/service/ps_local_client.h"
#include "paddle/fluid/distributed/table/table.h"
namespace paddle {
namespace distributed {
REGISTER_PSCORE_CLASS(PSClient, BrpcPsClient);
REGISTER_PSCORE_CLASS(PSClient, PsLocalClient);
REGISTER_PSCORE_CLASS(PSClient, GraphBrpcClient);
int32_t PSClient::configure(
const PSParameter &config,
const std::map<uint64_t, std::vector<paddle::distributed::Region>> &regions,
......@@ -83,4 +86,4 @@ PSClient *PSClientFactory::create(const PSParameter &ps_config) {
return client;
}
} // namespace distributed
} // namespace paddle
\ No newline at end of file
} // namespace paddle
......@@ -118,6 +118,17 @@ class PSClient {
const uint64_t *keys, size_t num,
bool is_training) = 0;
virtual ::std::future<int32_t> pull_sparse_ptr(char **select_values,
size_t table_id,
const uint64_t *keys,
size_t num) {
VLOG(0) << "Did not implement";
std::promise<int32_t> promise;
std::future<int> fut = promise.get_future();
promise.set_value(-1);
return fut;
}
virtual std::future<int32_t> print_table_stat(uint32_t table_id) = 0;
// Ensure that all accumulated pending requests are sent out
......@@ -150,7 +161,7 @@ class PSClient {
virtual std::future<int32_t> send_client2client_msg(int msg_type,
int to_client_id,
const std::string &msg) {
LOG(FATAL) << "Did not implement";
VLOG(0) << "Did not implement";
std::promise<int32_t> promise;
std::future<int> fut = promise.get_future();
promise.set_value(-1);
......
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/distributed/service/ps_local_client.h"
#include "paddle/fluid/distributed/table/table.h"
//#define pslib_debug_dense_compress
namespace paddle {
namespace distributed {
int32_t PsLocalClient::initialize() {
const auto& downpour_param = _config.server_param().downpour_server_param();
TableManager::instance().initialize();
for (size_t i = 0; i < downpour_param.downpour_table_param_size(); ++i) {
auto* table = CREATE_PSCORE_CLASS(
Table, downpour_param.downpour_table_param(i).table_class());
table->initialize(downpour_param.downpour_table_param(i),
_config.fs_client_param());
table->set_shard(0, 1);
_table_map[downpour_param.downpour_table_param(i).table_id()].reset(table);
}
return 0;
}
::std::future<int32_t> PsLocalClient::shrink(uint32_t table_id,
const std::string threshold) {
// TODO
return done();
}
::std::future<int32_t> PsLocalClient::load(const std::string& epoch,
const std::string& mode) {
// TODO
// for (auto& it : _table_map) {
// load(it.first, epoch, mode);
//}
return done();
}
::std::future<int32_t> PsLocalClient::load(uint32_t table_id,
const std::string& epoch,
const std::string& mode) {
// TODO
// auto* table_ptr = table(table_id);
// table_ptr->load(epoch, mode);
return done();
}
::std::future<int32_t> PsLocalClient::save(const std::string& epoch,
const std::string& mode) {
// TODO
for (auto& it : _table_map) {
save(it.first, epoch, mode);
}
return done();
}
::std::future<int32_t> PsLocalClient::save(uint32_t table_id,
const std::string& epoch,
const std::string& mode) {
// TODO
auto* table_ptr = table(table_id);
table_ptr->flush();
table_ptr->save(epoch, mode);
return done();
}
::std::future<int32_t> PsLocalClient::clear() {
// TODO
return done();
}
::std::future<int32_t> PsLocalClient::clear(uint32_t table_id) {
// TODO
return done();
}
::std::future<int32_t> PsLocalClient::flush() {
// no need
return done();
}
::std::future<int32_t> PsLocalClient::stop_server() {
// no need
return done();
}
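// pull_dense: copy the single local shard of the dense table into the
// caller-provided regions, splitting the flat buffer across regions by byte size.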
::std::future<int32_t> PsLocalClient::pull_dense(Region* regions,
size_t region_num,
size_t table_id) {
auto* accessor = table_accessor(table_id);
auto* table_ptr = table(table_id);
uint32_t num_per_shard = dense_dim_per_shard(accessor->fea_dim(), 1);
std::vector<float> region_buffer;
region_buffer.resize(num_per_shard);
table_ptr->pull_dense(region_buffer.data(), region_buffer.size());
size_t region_idx = 0;
size_t region_data_idx = 0;
size_t shard_data_size = num_per_shard;
size_t shard_buffer_remain = shard_data_size * sizeof(float);
PADDLE_ENFORCE_EQ(
shard_buffer_remain, region_buffer.size() * sizeof(float),
platform::errors::PreconditionNotMet("pull dense size error."));
size_t index = 0;
while (shard_buffer_remain > 0 && region_idx < region_num) {
auto& region = regions[region_idx];
if (region.size - region_data_idx >= shard_buffer_remain) {
memcpy((void*)(region.data + region_data_idx),
(uint8_t*)(void*)(region_buffer.data()) + index,
shard_buffer_remain);
region_data_idx += shard_buffer_remain;
shard_buffer_remain = 0;
} else if (region.size - region_data_idx == 0) {
++region_idx;
region_data_idx = 0;
} else {
memcpy((void*)(region.data + region_data_idx),
(uint8_t*)(void*)(region_buffer.data()) + index,
region.size - region_data_idx);
shard_buffer_remain -= (region.size - region_data_idx);
index += (region.size - region_data_idx);
++region_idx;
region_data_idx = 0;
}
}
return done();
}
::std::future<int32_t> PsLocalClient::push_dense_param(const Region* regions,
size_t region_num,
size_t table_id) {
auto* accessor = table_accessor(table_id);
auto* table_ptr = table(table_id);
std::vector<float> region_buffer;
region_buffer.resize(dense_dim_per_shard(accessor->fea_dim(), 1), 0);
for (size_t i = 0, offset = 0; i < region_num; ++i) {
uint32_t data_num = regions[i].size / sizeof(float);
memcpy(region_buffer.data() + offset, regions[i].data, regions[i].size);
offset += data_num;
}
// table_ptr->push_dense_param(region_buffer.data(), region_buffer.size());
return done();
}
::std::future<int32_t> PsLocalClient::push_dense_raw_gradient(
int table_id, float* total_send_data, size_t total_send_data_size,
void* callback) {
VLOG(1) << "wxx push_dense_raw_gradient";
PSClientClosure* closure = reinterpret_cast<PSClientClosure*>(callback);
auto* table_ptr = table(table_id);
table_ptr->push_dense(total_send_data, total_send_data_size);
delete closure;
return done();
}
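// push_dense: concatenate every region's gradients into one contiguous buffer
// and apply it to the dense table in a single call.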
::std::future<int32_t> PsLocalClient::push_dense(const Region* regions,
size_t region_num,
size_t table_id) {
auto* accessor = table_accessor(table_id);
auto* table_ptr = table(table_id);
std::vector<float> region_buffer;
region_buffer.resize(dense_dim_per_shard(accessor->fea_dim(), 1));
size_t data_size = region_buffer.size();
for (size_t i = 0, offset = 0; i < region_num; ++i) {
uint32_t data_num = regions[i].size / sizeof(float);
PADDLE_ENFORCE_LE(
offset + data_num, data_size,
platform::errors::PreconditionNotMet(
"invalid dense size, cur pos[%d] data_num[%d] size[%d]", offset,
data_num, data_size));
memcpy(region_buffer.data() + offset, regions[i].data, regions[i].size);
offset += data_num;
}
table_ptr->push_dense(region_buffer.data(), region_buffer.size());
return done();
}
//::std::future<int32_t> PsLocalClient::pull_sparse(float** select_values,
// size_t table_id,
// const uint64_t* keys,
// size_t num) {
// // FIXME
// // auto timer =
// // std::make_shared<CostTimer>("pslib_downpour_client_pull_sparse");
// // auto local_timer =
// // std::make_shared<CostTimer>("pslib_downpour_client_pull_sparse_local");
// // Split the keys into per-shard requests and record the original value pointers they map to
// auto* accessor = table_accessor(table_id);
// auto* table_ptr = table(table_id);
// size_t value_size = accessor->select_size();
//
// // table_ptr->pull_sparse(keys, num);
// std::vector<float> res_data;
// res_data.resize(num * value_size / sizeof(float));
// table_ptr->pull_sparse(res_data.data(), keys, num);
// // memcpy(select_values[0], res_data->data(), res_data->size() *
// // sizeof(float));
// size_t offset = 0;
// for (int i = 0; i < num; ++i) {
// memcpy(select_values[i], (char*)res_data.data() + offset, value_size);
// offset += value_size;
// }
//
// // return fut;
// return done();
//}
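// pull_sparse_ptr: local mode has no RPC, so ask the table directly for the
// raw value pointer of every requested key.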
::std::future<int32_t> PsLocalClient::pull_sparse_ptr(char** select_values,
size_t table_id,
const uint64_t* keys,
size_t num) {
// FIXME
// auto timer =
// std::make_shared<CostTimer>("pslib_downpour_client_pull_sparse");
// auto local_timer =
// std::make_shared<CostTimer>("pslib_downpour_client_pull_sparse_local");
// Split the keys into per-shard requests and record the original value pointers they map to
auto* table_ptr = table(table_id);
table_ptr->pull_sparse_ptr(select_values, keys, num);
return done();
}
::std::future<int32_t> PsLocalClient::push_sparse_raw_gradient(
size_t table_id, const uint64_t* keys, const float** update_values,
size_t num, void* callback) {
VLOG(1) << "wxx push_sparse_raw_gradient";
PSClientClosure* closure = reinterpret_cast<PSClientClosure*>(callback);
auto* accessor = table_accessor(table_id);
auto* table_ptr = table(table_id);
table_ptr->push_sparse(keys, update_values, num);
delete closure;
return done();
}
::std::future<int32_t> PsLocalClient::push_sparse(size_t table_id,
const uint64_t* keys,
const float** update_values,
size_t num) {
auto* accessor = table_accessor(table_id);
auto* table_ptr = table(table_id);
table_ptr->push_sparse(keys, update_values, num);
return done();
}
} // namespace distributed
} // namespace paddle
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "brpc/channel.h"
#include "brpc/controller.h"
#include "brpc/server.h"
#include "paddle/fluid/distributed/service/ps_client.h"
namespace paddle {
namespace distributed {
class Table;
class PsLocalClient : public PSClient {
public:
PsLocalClient() {}
virtual ~PsLocalClient() { _running = false; }
virtual int32_t create_client2client_connection(int pslib_timeout_ms,
int pslib_connect_timeout_ms,
int max_retry) {
return 0;
}
virtual ::std::future<int32_t> shrink(uint32_t table_id,
const std::string threshold) override;
virtual ::std::future<int32_t> load(const std::string& epoch,
const std::string& mode) override;
virtual ::std::future<int32_t> load(uint32_t table_id,
const std::string& epoch,
const std::string& mode) override;
virtual ::std::future<int32_t> save(const std::string& epoch,
const std::string& mode) override;
virtual ::std::future<int32_t> save(uint32_t table_id,
const std::string& epoch,
const std::string& mode) override;
virtual ::std::future<int32_t> clear() override;
virtual ::std::future<int32_t> clear(uint32_t table_id) override;
virtual ::std::future<int32_t> stop_server() override;
virtual void finalize_worker() override {}
virtual ::std::future<int32_t> pull_dense(Region* regions, size_t region_num,
size_t table_id);
virtual ::std::future<int32_t> push_dense(const Region* regions,
size_t region_num, size_t table_id);
virtual ::std::future<int32_t> push_dense_param(const Region* regions,
size_t region_num,
size_t table_id);
virtual ::std::future<int32_t> pull_sparse(float** select_values,
size_t table_id,
const uint64_t* keys, size_t num,
bool is_training) {
std::promise<int32_t> prom;
std::future<int32_t> fut = prom.get_future();
prom.set_value(0);
return fut;
}
virtual ::std::future<int32_t> pull_sparse_ptr(char** select_values,
size_t table_id,
const uint64_t* keys,
size_t num);
virtual ::std::future<int32_t> print_table_stat(uint32_t table_id) {
std::promise<int32_t> prom;
std::future<int32_t> fut = prom.get_future();
prom.set_value(0);
return fut;
}
virtual ::std::future<int32_t> push_sparse(size_t table_id,
const uint64_t* keys,
const float** update_values,
size_t num);
virtual ::std::future<int32_t> flush();
// server profiler
virtual std::future<int32_t> start_profiler() {
std::promise<int32_t> prom;
std::future<int32_t> fut = prom.get_future();
prom.set_value(0);
return fut;
}
virtual std::future<int32_t> stop_profiler() {
std::promise<int32_t> prom;
std::future<int32_t> fut = prom.get_future();
prom.set_value(0);
return fut;
}
virtual std::future<int32_t> barrier(size_t table_id, uint32_t barrier_type) {
std::promise<int32_t> prom;
std::future<int32_t> fut = prom.get_future();
prom.set_value(0);
return fut;
}
virtual std::future<int32_t> pull_geo_param(size_t table_id,
std::vector<float>* values,
std::vector<uint64_t>* keys,
int pserver_idx) {
std::promise<int32_t> prom;
std::future<int32_t> fut = prom.get_future();
prom.set_value(0);
return fut;
}
virtual std::future<int32_t> push_global_step(int table_id,
int64_t* total_send_data,
void* done) {
std::promise<int32_t> prom;
std::future<int32_t> fut = prom.get_future();
prom.set_value(0);
return fut;
}
// recv table from server and save it in LodTensor
virtual int32_t recv_and_save_table(const uint64_t table_id,
const std::string& path) {
return 0;
}
virtual ::std::future<int32_t> send_client2client_msg(
int msg_type, int to_client_id, const std::string& msg) override {
std::promise<int32_t> prom;
std::future<int32_t> fut = prom.get_future();
prom.set_value(0);
return fut;
}
virtual size_t get_server_nums() { return 1; }
virtual std::future<int32_t> push_dense_raw_gradient(
int table_id, float* total_send_data, size_t total_send_data_size,
void* callback) override;
virtual std::future<int32_t> push_sparse_raw_gradient(
size_t table_id, const uint64_t* keys, const float** update_values,
size_t num, void* callback) override;
virtual std::future<int32_t> push_sparse_raw_gradient_partial(
size_t table_id, const uint64_t* keys, const float** update_values,
uint32_t num, void* done, int pserver_idx) override {
std::promise<int32_t> prom;
std::future<int32_t> fut = prom.get_future();
prom.set_value(0);
return fut;
}
virtual std::future<int32_t> push_sparse_param(size_t table_id,
const uint64_t* keys,
const float** update_values,
size_t num,
void* done) override {
std::promise<int32_t> prom;
std::future<int32_t> fut = prom.get_future();
prom.set_value(0);
return fut;
}
private:
virtual int32_t initialize() override;
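// Helper that returns an already-fulfilled future (value 0); every request in
// the local client completes synchronously before this is returned.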
std::future<int32_t> done() {
std::shared_ptr<std::promise<int32_t>> prom =
std::make_shared<std::promise<int32_t>>();
std::future<int32_t> fut = prom->get_future();
prom->set_value(0);
return fut;
}
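// Dense dims are split evenly across shards with one element of padding
// (dense_dim_total / shard_num + 1); the local client always uses one shard.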
inline uint32_t dense_dim_per_shard(uint32_t dense_dim_total,
uint32_t shard_num) {
return dense_dim_total / shard_num + 1;
}
inline std::unordered_map<uint32_t, std::shared_ptr<Table>>* table() {
return &_table_map;
}
inline Table* table(size_t table_id) {
auto itr = _table_map.find(table_id);
if (itr != _table_map.end()) {
return itr->second.get();
}
LOG(ERROR) << "table not found " << table_id;
return NULL;
}
std::unordered_map<uint32_t, std::shared_ptr<Table>> _table_map;
bool _running = false;
bool _flushing = false;
private:
float _mae = 0;
float _mse = 0;
uint16_t _push_times = 0;
};
} // namespace distributed
} // namespace paddle
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <memory>
#include <vector>
#include "paddle/fluid/distributed/service/server.h"
namespace paddle {
namespace distributed {
class PsLocalServer : public PSServer {
public:
PsLocalServer() {}
virtual ~PsLocalServer() {}
virtual uint64_t start() { return 0; }
virtual uint64_t start(const std::string& ip, uint32_t port) { return 0; }
virtual int32_t stop() { return 0; }
virtual int32_t port() { return 0; }
private:
virtual int32_t initialize() { return 0; }
};
} // namespace distributed
} // namespace paddle
......@@ -17,12 +17,14 @@
#include "glog/logging.h"
#include "paddle/fluid/distributed/service/brpc_ps_server.h"
#include "paddle/fluid/distributed/service/graph_brpc_server.h"
#include "paddle/fluid/distributed/service/ps_local_server.h"
#include "paddle/fluid/distributed/table/table.h"
namespace paddle {
namespace distributed {
REGISTER_PSCORE_CLASS(PSServer, BrpcPsServer);
REGISTER_PSCORE_CLASS(PSServer, PsLocalServer);
REGISTER_PSCORE_CLASS(PsBaseService, BrpcPsService);
REGISTER_PSCORE_CLASS(PSServer, GraphBrpcServer);
REGISTER_PSCORE_CLASS(PsBaseService, GraphBrpcService);
......
......@@ -446,6 +446,43 @@ int32_t CommonSparseTable::pull_sparse(float* pull_values,
return 0;
}
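// Bucket the requested keys by shard (key % task_pool_size_), then resolve the
// raw value pointers in parallel on the per-shard thread pools.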
int32_t CommonSparseTable::pull_sparse_ptr(char** pull_values,
const uint64_t* keys, size_t num) {
std::vector<std::vector<uint64_t>> offset_bucket;
offset_bucket.resize(task_pool_size_);
for (size_t x = 0; x < num; ++x) {
auto y = keys[x] % task_pool_size_;
offset_bucket[y].push_back(x);
}
std::vector<std::future<int>> tasks(task_pool_size_);
for (int shard_id = 0; shard_id < task_pool_size_; ++shard_id) {
tasks[shard_id] = _shards_task_pool[shard_id]->enqueue(
[this, shard_id, &keys, &offset_bucket, &pull_values]() -> int {
auto& block = shard_values_[shard_id];
auto& offsets = offset_bucket[shard_id];
for (size_t i = 0; i < offsets.size(); ++i) {
auto offset = offsets[i];
auto id = keys[offset];
auto* value = block->InitGet(id);
// std::copy_n(value + param_offset_, param_dim_,
// pull_values + param_dim_ * offset);
pull_values[offset] = (char*)value;
}
return 0;
});
}
for (size_t shard_id = 0; shard_id < tasks.size(); ++shard_id) {
tasks[shard_id].wait();
}
return 0;
}
int32_t CommonSparseTable::_push_sparse(const uint64_t* keys,
const float* values, size_t num) {
rwlock_->RDLock();
......@@ -502,6 +539,45 @@ int32_t CommonSparseTable::push_sparse(const uint64_t* keys,
return 0;
}
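// Pointer-based push_sparse: forwards the per-key gradient pointers to the
// shard-parallel _push_sparse overload below.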
int32_t CommonSparseTable::push_sparse(const uint64_t* keys,
const float** values, size_t num) {
_push_sparse(keys, values, num);
return 0;
}
int32_t CommonSparseTable::_push_sparse(const uint64_t* keys,
const float** values, size_t num) {
rwlock_->RDLock();
std::vector<std::vector<uint64_t>> offset_bucket;
offset_bucket.resize(task_pool_size_);
for (size_t x = 0; x < num; ++x) {
auto y = keys[x] % task_pool_size_;
offset_bucket[y].push_back(x);
}
std::vector<std::future<int>> tasks(task_pool_size_);
for (int shard_id = 0; shard_id < task_pool_size_; ++shard_id) {
tasks[shard_id] = _shards_task_pool[shard_id]->enqueue(
[this, shard_id, &keys, &values, num, &offset_bucket]() -> int {
auto& offsets = offset_bucket[shard_id];
for (size_t i = 0; i < offsets.size(); ++i) {
std::vector<uint64_t> tmp_off = {0};
optimizer_->update(keys + offsets[i], values[offsets[i]], num,
tmp_off, shard_values_[shard_id].get());
}
return 0;
});
}
for (size_t shard_id = 0; shard_id < tasks.size(); ++shard_id) {
tasks[shard_id].wait();
}
rwlock_->UNLock();
return 0;
}
int32_t CommonSparseTable::push_sparse_param(const uint64_t* keys,
const float* values, size_t num) {
rwlock_->RDLock();
......
......@@ -63,9 +63,15 @@ class CommonSparseTable : public SparseTable {
virtual std::pair<int64_t, int64_t> print_table_stat();
virtual int32_t pull_sparse(float* values, const PullSparseValue& pull_value);
virtual int32_t pull_sparse_ptr(char** pull_values, const uint64_t* keys,
size_t num);
virtual int32_t push_sparse(const uint64_t* keys, const float* values,
size_t num);
virtual int32_t push_sparse(const uint64_t* keys, const float** values,
size_t num);
// only for sparse geo table
virtual int32_t push_sparse_param(const uint64_t* keys, const float* values,
size_t num);
......@@ -80,6 +86,8 @@ class CommonSparseTable : public SparseTable {
protected:
virtual int32_t _push_sparse(const uint64_t* keys, const float* values,
size_t num);
virtual int32_t _push_sparse(const uint64_t* keys, const float** values,
size_t num);
private:
const int task_pool_size_ = 11;
......
......@@ -87,7 +87,7 @@ class ValueBlock {
value_dims_(value_dims),
value_offsets_(value_offsets),
value_idx_(value_idx) {
for (int x = 0; x < value_dims.size(); ++x) {
for (size_t x = 0; x < value_dims.size(); ++x) {
value_length_ += value_dims[x];
}
......@@ -96,13 +96,15 @@ class ValueBlock {
auto slices = string::split_string<std::string>(entry_attr, ":");
if (slices[0] == "none") {
entry_func_ = std::bind(&count_entry, std::placeholders::_1, 0);
threshold_ = 0;
} else if (slices[0] == "count_filter_entry") {
int threshold = std::stoi(slices[1]);
entry_func_ = std::bind(&count_entry, std::placeholders::_1, threshold);
threshold_ = std::stoi(slices[1]);
entry_func_ =
std::bind(&count_entry, std::placeholders::_1, threshold_);
} else if (slices[0] == "probability_entry") {
float threshold = std::stof(slices[1]);
threshold_ = std::stof(slices[1]);
entry_func_ =
std::bind(&probility_entry, std::placeholders::_1, threshold);
std::bind(&probility_entry, std::placeholders::_1, threshold_);
} else {
PADDLE_THROW(platform::errors::InvalidArgument(
"Not supported Entry Type : %s, Only support [CountFilterEntry, "
......@@ -170,6 +172,21 @@ class ValueBlock {
return value->data_.data();
}
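// InitGet: return a raw VALUE* for `id`, creating the entry (and optionally
// updating its attributes) when it does not exist yet.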
VALUE *InitGet(const uint64_t &id, const bool with_update = true,
const int counter = 1) {
if (!Has(id)) {
values_[id] = std::make_shared<VALUE>(value_length_);
}
auto &value = values_.at(id);
if (with_update) {
AttrUpdate(value, counter);
}
return value.get();
}
void AttrUpdate(std::shared_ptr<VALUE> value, const int counter) {
// update state
value->unseen_days_ = 0;
......@@ -179,7 +196,7 @@ class ValueBlock {
value->is_entry_ = entry_func_(value);
if (value->is_entry_) {
// initialize
for (int x = 0; x < value_names_.size(); ++x) {
for (size_t x = 0; x < value_names_.size(); ++x) {
initializers_[x]->GetValue(value->data_.data() + value_offsets_[x],
value_dims_[x]);
}
......@@ -224,6 +241,8 @@ class ValueBlock {
return;
}
float GetThreshold() { return threshold_; }
private:
bool Has(const uint64_t id) {
auto got = values_.find(id);
......@@ -246,6 +265,7 @@ class ValueBlock {
std::function<bool(std::shared_ptr<VALUE>)> entry_func_;
std::vector<std::shared_ptr<Initializer>> initializers_;
float threshold_;
};
} // namespace distributed
......
......@@ -48,10 +48,17 @@ class Table {
return 0;
}
virtual int32_t pull_sparse_ptr(char **pull_values, const uint64_t *keys,
size_t num) {
VLOG(0) << "NOT IMPLEMENT";
return 0;
}
virtual int32_t pull_sparse(float *values,
const PullSparseValue &pull_value) = 0;
virtual int32_t push_sparse(const uint64_t *keys, const float *values,
size_t num) = 0;
virtual int32_t push_sparse(const uint64_t *keys, const float **values,
size_t num) {
return 0;
}
virtual int32_t push_sparse_param(const uint64_t *keys, const float *values,
size_t num) {
return 0;
......
......@@ -562,7 +562,6 @@ class PSGPUWorker : public HogwildWorker {
void ResetStat();
protected:
std::shared_ptr<paddle::framework::FleetWrapper> fleet_ptr_;
void PushGradients();
void DumpParam();
void CopySparseTable();
......
......@@ -124,6 +124,7 @@ message AsyncConfig {
optional bool launch_barrier = 9 [ default = true ];
optional string heter_worker_device_guard = 10 [ default = 'cpu' ];
optional int32 lr_decay_steps = 11 [ default = 10 ];
optional int32 use_ps_gpu = 12 [ default = 0 ];
}
message PipelineConfig {
......
if(WITH_PSLIB)
cc_library(fleet_wrapper SRCS fleet_wrapper.cc DEPS framework_proto variable_helper scope pslib_brpc pslib)
else()
cc_library(fleet_wrapper SRCS fleet_wrapper.cc DEPS framework_proto variable_helper scope)
endif(WITH_PSLIB)
if(WITH_HETERPS)
if(WITH_NCCL)
nv_library(ps_gpu_wrapper SRCS ps_gpu_wrapper.cu ps_gpu_wrapper.cc
DEPS heter_ps)
......@@ -8,13 +13,10 @@ if(WITH_PSLIB)
hip_library(ps_gpu_wrapper SRCS ps_gpu_wrapper.cu ps_gpu_wrapper.cc
DEPS heter_ps)
add_subdirectory(heter_ps)
else()
cc_library(ps_gpu_wrapper SRCS ps_gpu_wrapper.cc)
endif(WITH_NCCL)
else()
cc_library(fleet_wrapper SRCS fleet_wrapper.cc DEPS framework_proto variable_helper scope)
cc_library(ps_gpu_wrapper SRCS ps_gpu_wrapper.cc)
endif(WITH_PSLIB)
endif(WITH_HETERPS)
if(WITH_NCCL OR WITH_RCCL)
cc_library(nccl_wrapper SRCS nccl_wrapper.cc DEPS framework_proto variable_helper scope)
......
......@@ -34,6 +34,9 @@ limitations under the License. */
#include "paddle/fluid/framework/tensor.h"
#include "paddle/fluid/framework/variable_helper.h"
#include "paddle/fluid/platform/macros.h" // for DISABLE_COPY_AND_ASSIGN
#ifdef PADDLE_WITH_HETERPS
#include "paddle/fluid/platform/type_defs.h"
#endif
namespace paddle {
namespace framework {
......
......@@ -14,15 +14,21 @@ limitations under the License. */
#pragma once
#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \
(defined PADDLE_WITH_PSLIB)
#ifdef PADDLE_WITH_HETERPS
#include <algorithm>
#include <map>
#include <unordered_map>
#include <vector>
#ifdef PADDLE_WITH_PSLIB
#include "common_value.h" // NOLINT
#endif
#ifdef PADDLE_WITH_PSCORE
#include "paddle/fluid/distributed/table/depends/large_scale_kv.h"
#endif
#include "paddle/fluid/framework/fleet/heter_ps/feature_value.h"
#include "paddle/fluid/framework/scope.h"
......@@ -39,7 +45,12 @@ class HeterContext {
}
Scope* scope_{nullptr};
std::vector<std::vector<FeatureKey>> feature_keys_;
#ifdef PADDLE_WITH_PSLIB
std::vector<std::vector<paddle::ps::DownpourFixedFeatureValue*>> value_ptr_;
#endif
#ifdef PADDLE_WITH_PSCORE
std::vector<std::vector<paddle::distributed::VALUE*>> value_ptr_;
#endif
std::vector<std::vector<FeatureValue>> device_values_;
std::vector<std::vector<FeatureKey>> device_keys_;
std::vector<std::mutex*> mutex_;
......
......@@ -14,7 +14,7 @@ limitations under the License. */
#pragma once
#ifdef PADDLE_WITH_PSLIB
#ifdef PADDLE_WITH_HETERPS
#include <iostream>
......
......@@ -17,11 +17,17 @@ limitations under the License. */
#include <limits>
#include <memory>
#include <vector>
#ifdef PADDLE_WITH_PSLIB
#include "common_value.h" // NOLINT
#endif
#ifdef PADDLE_WITH_PSCORE
#endif
#include "thrust/pair.h"
//#include "cudf/concurrent_unordered_map.cuh.h"
#include "paddle/fluid/framework/fleet/heter_ps/cudf/concurrent_unordered_map.cuh.h"
#ifdef PADDLE_WITH_PSLIB
#ifdef PADDLE_WITH_HETERPS
#include "paddle/fluid/distributed/table/depends/large_scale_kv.h"
#include "paddle/fluid/platform/type_defs.h"
namespace paddle {
namespace framework {
......
......@@ -12,7 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#ifdef PADDLE_WITH_PSLIB
#ifdef PADDLE_WITH_HETERPS
namespace paddle {
namespace framework {
......@@ -119,6 +119,7 @@ void HashTable<KeyType, ValType>::dump_to_cpu(int devid, cudaStream_t stream) {
continue;
}
ValType& gpu_val = kv[i].second;
#ifdef PADDLE_WITH_PSLIB
auto* downpour_value =
(paddle::ps::DownpourFixedFeatureValue*)(gpu_val.cpu_ptr);
int downpour_value_size = downpour_value->size();
......@@ -138,6 +139,14 @@ void HashTable<KeyType, ValType>::dump_to_cpu(int devid, cudaStream_t stream) {
cpu_val[x + 7] = gpu_val.mf[x];
}
}
#endif
#ifdef PADDLE_WITH_PSCORE
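// pscore path: write the show count and the embedding back into the
// distributed::VALUE that backs this GPU slot.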
auto* downpour_value = (paddle::distributed::VALUE*)(gpu_val.cpu_ptr);
downpour_value->count_ = gpu_val.show;
for (int x = 0; x < gpu_val.mf_size; x++) {
downpour_value->data_[x] = gpu_val.mf[x];
}
#endif
}
container_->prefetch(devid, stream);
......
......@@ -25,7 +25,7 @@ limitations under the License. */
#include "paddle/fluid/platform/place.h"
#include "thrust/pair.h"
#ifdef PADDLE_WITH_PSLIB
#ifdef PADDLE_WITH_HETERPS
namespace paddle {
namespace framework {
......@@ -182,7 +182,7 @@ class HeterComm {
std::vector<std::vector<Path>> path_;
std::vector<LocalStorage> storage_;
int feanum_{1800 * 2048};
int multi_node_{1};
int multi_node_{0};
std::vector<ncclComm_t> nccl_inner_comms_;
std::vector<ncclComm_t> nccl_inter_comms_;
int node_size_;
......
......@@ -12,9 +12,9 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#ifdef PADDLE_WITH_HETERPS
#include <queue>
#ifdef PADDLE_WITH_PSLIB
namespace paddle {
namespace framework {
......
......@@ -15,7 +15,7 @@ limitations under the License. */
#include <vector>
#include "paddle/fluid/framework/fleet/heter_ps/heter_ps.h"
#ifdef PADDLE_WITH_PSLIB
#ifdef PADDLE_WITH_HETERPS
namespace paddle {
namespace framework {
......@@ -54,8 +54,8 @@ void HeterPs::show_one_table(int gpu_num) { comm_->show_one_table(gpu_num); }
void HeterPs::push_sparse(int num, FeatureKey* d_keys,
FeaturePushValue* d_grads, size_t len) {
// comm_->push_sparse(num, d_keys, d_grads, len, opt_);
comm_->push_sparse_multi_node(num, d_keys, d_grads, len, opt_);
comm_->push_sparse(num, d_keys, d_grads, len, opt_);
// comm_->push_sparse_multi_node(num, d_keys, d_grads, len, opt_);
}
void HeterPs::set_nccl_comm_and_size(const std::vector<ncclComm_t>& inner_comms,
......
......@@ -18,7 +18,7 @@ limitations under the License. */
#include "paddle/fluid/framework/fleet/heter_ps/heter_ps_base.h"
#include "paddle/fluid/framework/fleet/heter_ps/optimizer.cuh.h"
#ifdef PADDLE_WITH_PSLIB
#ifdef PADDLE_WITH_HETERPS
namespace paddle {
namespace framework {
......
......@@ -17,7 +17,7 @@ limitations under the License. */
#include "paddle/fluid/framework/fleet/heter_ps/feature_value.h"
#include "paddle/fluid/framework/fleet/heter_ps/heter_resource.h"
#ifdef PADDLE_WITH_PSLIB
#ifdef PADDLE_WITH_HETERPS
namespace paddle {
namespace framework {
......
......@@ -12,7 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#ifdef PADDLE_WITH_PSLIB
#ifdef PADDLE_WITH_HETERPS
#include "heter_resource.h"
#include "paddle/fluid/platform/cuda_device_guard.h"
......
......@@ -20,7 +20,7 @@ limitations under the License. */
#include "paddle/fluid/platform/cuda_device_guard.h"
#include "paddle/fluid/platform/enforce.h"
#ifdef PADDLE_WITH_PSLIB
#ifdef PADDLE_WITH_HETERPS
namespace paddle {
namespace framework {
......
......@@ -18,7 +18,7 @@ limitations under the License. */
#include "optimizer_conf.h"
#include "paddle/fluid/framework/fleet/heter_ps/feature_value.h"
#ifdef PADDLE_WITH_PSLIB
#ifdef PADDLE_WITH_HETERPS
namespace paddle {
namespace framework {
......
......@@ -26,8 +26,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \
(defined PADDLE_WITH_PSLIB)
#ifdef PADDLE_WITH_HETERPS
#include <algorithm>
#include <deque>
......@@ -58,7 +57,12 @@ void PSGPUWrapper::BuildTask(std::shared_ptr<HeterContext> gpu_task,
auto& device_mutex = gpu_task->mutex_;
std::vector<std::thread> threads;
#ifdef PADDLE_WITH_PSLIB
auto fleet_ptr = FleetWrapper::GetInstance();
#endif
#ifdef PADDLE_WITH_PSCORE
auto fleet_ptr = paddle::distributed::Communicator::GetInstance();
#endif
// data should be in input channel
thread_keys_.resize(thread_keys_thread_num_);
......@@ -124,9 +128,16 @@ void PSGPUWrapper::BuildTask(std::shared_ptr<HeterContext> gpu_task,
auto ptl_func = [this, &local_keys, &local_ptr, &table_id,
&fleet_ptr](int i) {
size_t key_size = local_keys[i].size();
#ifdef PADDLE_WITH_PSLIB
auto tt = fleet_ptr->pslib_ptr_->_worker_ptr->pull_sparse_ptr(
reinterpret_cast<char**>(local_ptr[i].data()), table_id,
local_keys[i].data(), key_size);
#endif
#ifdef PADDLE_WITH_PSCORE
auto tt = fleet_ptr->_worker_ptr->pull_sparse_ptr(
reinterpret_cast<char**>(local_ptr[i].data()), table_id,
local_keys[i].data(), key_size);
#endif
tt.wait();
auto status = tt.get();
// auto status = 0;
......@@ -153,8 +164,14 @@ void PSGPUWrapper::BuildTask(std::shared_ptr<HeterContext> gpu_task,
auto build_func = [device_num, &local_keys, &local_ptr, &device_keys,
&device_vals, &device_mutex](int i) {
std::vector<std::vector<FeatureKey>> task_keys(device_num);
#ifdef PADDLE_WITH_PSLIB
std::vector<std::vector<paddle::ps::DownpourFixedFeatureValue*>> task_ptrs(
device_num);
#endif
#ifdef PADDLE_WITH_PSCORE
std::vector<std::vector<paddle::distributed::VALUE*>> task_ptrs(device_num);
#endif
for (size_t j = 0; j < local_keys[i].size(); j++) {
int shard = local_keys[i][j] % device_num;
......@@ -169,7 +186,7 @@ void PSGPUWrapper::BuildTask(std::shared_ptr<HeterContext> gpu_task,
int cur = device_keys[dev].size();
device_keys[dev].resize(device_keys[dev].size() + len);
device_vals[dev].resize(device_vals[dev].size() + len);
#ifdef PADDLE_WITH_PSLIB
for (int j = 0; j < len; ++j) {
device_keys[dev][cur + j] = task_keys[dev][j];
float* ptr_val = task_ptrs[dev][j]->data();
......@@ -196,6 +213,35 @@ void PSGPUWrapper::BuildTask(std::shared_ptr<HeterContext> gpu_task,
}
}
}
#endif
#ifdef PADDLE_WITH_PSCORE
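// pscore path: convert each distributed::VALUE into the GPU FeatureValue
// layout (show <- count_, embedding copied from data_).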
for (int j = 0; j < len; ++j) {
device_keys[dev][cur + j] = task_keys[dev][j];
distributed::VALUE* ptr_val = task_ptrs[dev][j];
FeatureValue& val = device_vals[dev][cur + j];
bool has_mf = 1;
val.delta_score = 0;
val.show = ptr_val->count_;
val.clk = 0;
val.slot = 0;
val.lr = 0;
val.lr_g2sum = 0;
val.cpu_ptr = (uint64_t)(task_ptrs[dev][j]);
if (has_mf) {
val.mf_size = MF_DIM + 1;
for (int x = 0; x < val.mf_size; x++) {
val.mf[x] = ptr_val->data_[x];
}
} else {
val.mf_size = 0;
for (int x = 0; x < MF_DIM + 1; x++) {
val.mf[x] = 0;
}
}
}
#endif
VLOG(1) << "GpuPs build hbmps done";
device_mutex[dev]->unlock();
}
......
......@@ -12,7 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#ifdef PADDLE_WITH_PSLIB
#ifdef PADDLE_WITH_HETERPS
#include <algorithm>
#include <ctime>
#include <memory>
......
......@@ -14,8 +14,7 @@ limitations under the License. */
#pragma once
#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \
(defined PADDLE_WITH_PSLIB)
#ifdef PADDLE_WITH_HETERPS
#include <atomic>
#include <ctime>
......@@ -26,7 +25,6 @@ limitations under the License. */
#include <unordered_map>
#include <unordered_set>
#include <vector>
#ifdef PADDLE_WITH_GLOO
#include <gloo/broadcast.h>
#include "paddle/fluid/framework/fleet/gloo_wrapper.h"
......@@ -42,6 +40,9 @@ limitations under the License. */
#include "paddle/fluid/platform/gpu_info.h"
#include "paddle/fluid/platform/macros.h" // for DISABLE_COPY_AND_ASSIGN
#include "paddle/fluid/platform/place.h"
#ifdef PADDLE_WITH_PSCORE
#include "paddle/fluid/distributed/service/communicator.h"
#endif
namespace paddle {
namespace framework {
......@@ -219,7 +220,7 @@ class PSGPUWrapper {
std::shared_ptr<HeterPsResource> resource_;
int32_t sleep_seconds_before_fail_exit_;
std::vector<int> slot_vector_;
int multi_node_{1};
int multi_node_{0};
int node_size_;
std::vector<ncclComm_t> inner_comms_;
std::vector<ncclComm_t> inter_comms_;
......
......@@ -30,10 +30,12 @@ limitations under the License. */
#include "brpc/controller.h"
#include "brpc/server.h"
#include "paddle/fluid/platform/timer.h"
#endif
namespace paddle {
namespace framework {
#ifdef PADDLE_WITH_PSLIB
typedef std::function<int(const HeterRequest*, HeterResponse*)>
HeterServiceHandler;
class DataFeed;
......@@ -142,7 +144,7 @@ class HeterTask {
double cpu_2_gpu_time{0};
platform::Timer timeline;
};
#endif
template <class T>
class HeterObjectPool {
public:
......@@ -153,7 +155,7 @@ class HeterObjectPool {
if (pool_.empty()) {
num_ += 1;
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
VLOG(0) << "pool construct size: " << num_;
VLOG(3) << "pool construct size: " << num_;
#endif
return std::make_shared<T>();
} else {
......@@ -178,6 +180,7 @@ class HeterObjectPool {
int num_{0};
};
#ifdef PADDLE_WITH_PSLIB
struct BthreadMutextGuard {
BthreadMutextGuard(bthread_mutex_t* rho) {
mutex_ = rho;
......@@ -258,7 +261,6 @@ class HeterList {
std::unique_lock<std::mutex> lock(mutex_);
cond_.wait(lock, [this] { return size < cap_; });
if (task_map_.find(key) != task_map_.end()) {
// std::cout << "try put key=" << key << " false" << std::endl;
task_map_.erase(key);
return false;
} else {
......@@ -267,7 +269,6 @@ class HeterList {
node->value = value;
map_[node->key] = node;
attach(node);
// std::cout << "try put key=" << key << " true" << std::endl;
return true;
}
}
......@@ -276,7 +277,6 @@ class HeterList {
std::unique_lock<std::mutex> lock(mutex_);
cond_.wait(lock, [this] { return size < cap_; });
HeterNode<K, T>* node = new HeterNode<K, T>;
// std::cout << "put key=" << key << " true" << std::endl;
node->key = key;
node->value = value;
map_[node->key] = node;
......@@ -288,7 +288,6 @@ class HeterList {
std::lock_guard<std::mutex> lock(mutex_);
auto iter = map_.find(key);
if (iter != map_.end()) {
// std::cout << "try get key=" << key << " true" << std::endl;
HeterNode<K, T>* node = iter->second;
detach(node);
cond_.notify_one();
......@@ -298,7 +297,6 @@ class HeterList {
return ret;
}
task_map_.insert(key);
// std::cout << "try get key=" << key << " false" << std::endl;
return nullptr;
}
......@@ -306,7 +304,6 @@ class HeterList {
std::lock_guard<std::mutex> lock(mutex_);
auto iter = map_.find(key);
if (iter != map_.end()) {
// std::cout << "get key=" << key << " true" << std::endl;
HeterNode<K, T>* node = iter->second;
detach(node);
cond_.notify_one();
......@@ -315,7 +312,6 @@ class HeterList {
delete node;
return ret;
}
// std::cout << "get key=" << key << " false" << std::endl;
return nullptr;
}
......@@ -323,14 +319,12 @@ class HeterList {
std::lock_guard<std::mutex> lock(mutex_);
HeterNode<K, T>* node = head_->next;
if (node == tail_) {
// std::cout << "get2 false" << std::endl;
return nullptr;
} else {
detach(node);
cond_.notify_one();
T ret = std::move(node->value);
map_.erase(node->key);
// std::cout << "get2 key=" << node->key << " true" << std::endl;
delete node;
return ret;
}
......@@ -371,7 +365,7 @@ class HeterList {
int cap_;
int size;
};
#endif
} // namespace framework
} // namespace paddle
#endif
......@@ -38,6 +38,13 @@ void MultiTrainer::Initialize(const TrainerDesc& trainer_desc,
need_merge_var_names_.push_back(
trainer_desc.downpour_param().stat_var_names(i));
}
#ifdef PADDLE_WITH_HETERPS
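// Collect the CUDA places listed in trainer_desc.worker_places; each worker
// thread is later bound to one of them.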
for (int i = 0; i < thread_num_; ++i) {
int num = trainer_desc.worker_places(i);
platform::CUDAPlace place = platform::CUDAPlace(num);
places_.push_back(place);
}
#endif
// get filelist from trainer_desc here
const std::vector<paddle::framework::DataFeed*> readers =
dataset->GetReaders();
......@@ -102,13 +109,42 @@ void MultiTrainer::InitDumpEnv() {
void MultiTrainer::InitTrainerEnv(const ProgramDesc& main_program,
const platform::Place& place) {
for (int i = 0; i < thread_num_; ++i) {
#ifdef PADDLE_WITH_HETERPS
workers_[i]->SetPlace(places_[i]);
workers_[i]->SetReaderPlace(places_[i]);
#else
workers_[i]->SetPlace(place);
workers_[i]->SetReaderPlace(place);
#endif
workers_[i]->SetRootScope(root_scope_);
workers_[i]->CreateDeviceResource(main_program); // Program
workers_[i]->BindingDataFeedMemory();
workers_[i]->CacheProgram(main_program);
}
#ifdef PADDLE_WITH_HETERPS
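// Copy every persistable dense LoDTensor from the root scope into each
// worker's device scope.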
for (int num = 0; num < thread_num_; ++num) {
auto place = places_[num];
Scope* scope = workers_[num]->GetThreadScope();
auto& block = main_program.Block(0);
for (auto& var : block.AllVars()) {
if (var->Persistable()) {
auto name = var->Name();
Variable* root_var = root_scope_->FindVar(name);
if (!root_var) {
continue;
}
if (root_var->IsType<SelectedRows>()) {
continue;
}
LoDTensor* root_tensor = root_var->GetMutable<LoDTensor>();
auto* ptr = scope->Var(name);
InitializeVariable(ptr, proto::VarType::LOD_TENSOR);
LoDTensor* thread_tensor = ptr->GetMutable<LoDTensor>();
TensorCopy(*root_tensor, place, thread_tensor);
}
}
}
#endif
}
void MultiTrainer::InitOtherEnv(const ProgramDesc& main_program) {
......@@ -138,10 +174,77 @@ void MultiTrainer::Run() {
}
}
#ifdef PADDLE_WITH_HETERPS
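// Copy the dense parameters trained in the first worker's thread scope back
// into the root scope.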
void MultiTrainer::MergeDenseParam() {
auto communicator = paddle::distributed::Communicator::GetInstance();
auto& recv_ctx = communicator->GetRecvCtxMap();
Scope* thread_scope = workers_[0]->GetThreadScope();
for (auto& iter : recv_ctx) {
auto& varnames = iter.second;
for (auto& name : varnames) {
Variable* root_var = root_scope_->FindVar(name);
LoDTensor* root_tensor = root_var->GetMutable<LoDTensor>();
Variable* var = thread_scope->FindVar(name);
LoDTensor* tensor = var->GetMutable<LoDTensor>();
TensorCopy((*tensor), root_tensor->place(), root_tensor);
}
}
}
#endif
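// Accumulate a thread-local tensor into the root-scope tensor element-wise on
// the CPU.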
template <typename T>
void MultiTrainer::MergeToRootScope(LoDTensor* root_tensor, LoDTensor* tensor) {
LoDTensor tmp_root;
TensorCopy(*root_tensor, platform::CPUPlace(), &tmp_root);
T* tmp_root_data = tmp_root.data<T>();
LoDTensor tmp_tensor;
TensorCopy(*tensor, platform::CPUPlace(), &tmp_tensor);
T* data = tmp_tensor.data<T>();
for (int i = 0; i < tmp_tensor.numel(); i++) {
tmp_root_data[i] += data[i];
}
TensorCopy(tmp_root, platform::CPUPlace(), root_tensor);
}
void MultiTrainer::Finalize() {
if (need_dump_field_ || need_dump_param_) {
FinalizeDumpEnv();
}
#ifdef PADDLE_WITH_HETERPS
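// Sum each worker's copy of the stat vars back into the root scope, then merge
// the dense parameters.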
for (size_t i = 0; i < need_merge_var_names_.size(); i++) {
Variable* root_var = root_scope_->FindVar(need_merge_var_names_[i]);
if (root_var == nullptr) {
continue;
}
LoDTensor* root_tensor = root_var->GetMutable<LoDTensor>();
for (size_t j = 0; j < places_.size(); j++) {
Scope* cur_thread_scope = workers_[j]->GetThreadScope();
Variable* thread_var =
cur_thread_scope->FindVar(need_merge_var_names_[i]);
if (thread_var == nullptr) {
continue;
}
LoDTensor* thread_tensor = thread_var->GetMutable<LoDTensor>();
#define MergeCallback(cpp_type, proto_type) \
do { \
if (root_tensor->type() == proto_type) { \
if (thread_tensor->type() != proto_type) { \
VLOG(0) << "Error: thread id=" << j << ", need_merge_var_names_[" << i \
<< "] " << need_merge_var_names_[i] \
<< ", root tensor type=" << root_tensor->type() \
<< ", thread tensor type=" << thread_tensor->type(); \
exit(-1); \
} \
MergeToRootScope<cpp_type>(root_tensor, thread_tensor); \
} \
} while (0)
_ForEachDataType_(MergeCallback);
}
}
MergeDenseParam();
#endif
root_scope_->DropKids();
}
......
......@@ -19,10 +19,6 @@ limitations under the License. */
#include "paddle/fluid/framework/data_feed_factory.h"
#include "paddle/fluid/framework/data_set.h"
#include "paddle/fluid/framework/device_worker_factory.h"
#include "paddle/fluid/framework/fleet/fleet_wrapper.h"
#include "paddle/fluid/framework/fleet/heter_context.h"
#include "paddle/fluid/framework/fleet/heter_ps/feature_value.h"
#include "paddle/fluid/framework/fleet/ps_gpu_wrapper.h"
#include "paddle/fluid/framework/trainer.h"
#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \
(defined PADDLE_WITH_PSLIB)
......@@ -64,7 +60,6 @@ void PSGPUTrainer::Initialize(const TrainerDesc& trainer_desc,
pull_dense_worker_ = PullDenseWorker::GetInstance();
pull_dense_worker_->Initialize(trainer_desc);
SetDebug(trainer_desc.debug());
fleet_ptr_ = FleetWrapper::GetInstance();
trainer_desc_ = trainer_desc;
workers_.resize(place_num);
for (int i = 0; i < place_num; ++i) {
......
......@@ -14,7 +14,6 @@ limitations under the License. */
#include "paddle/fluid/framework/device_worker.h"
#include "paddle/fluid/framework/device_worker_factory.h"
#include "paddle/fluid/framework/fleet/fleet_wrapper.h"
#include "paddle/fluid/framework/fleet/heter_wrapper.h"
#include "paddle/fluid/platform/cpu_helper.h"
#include "paddle/fluid/string/string_helper.h"
......
......@@ -109,13 +109,22 @@ class MultiTrainer : public TrainerBase {
virtual Scope* GetWorkerScope(int thread_id);
virtual std::string GetDumpPath(int tid);
template <typename T>
void MergeToRootScope(LoDTensor* root_tensor, LoDTensor* thread_tensor);
#ifdef PADDLE_WITH_HETERPS
void MergeDenseParam();
#endif
protected:
int thread_num_;
std::vector<std::thread> threads_;
std::vector<DataFeed*> readers_;
std::vector<std::shared_ptr<DeviceWorker>> workers_;
std::vector<std::string> need_merge_var_names_;
#ifdef PADDLE_WITH_HETERPS
std::vector<platform::Place> places_;
#endif
int mpi_rank_;
int mpi_size_;
int dump_file_num_;
......@@ -313,7 +322,6 @@ class PSGPUTrainer : public TrainerBase {
float scale_datanorm_;
paddle::platform::Place place_;
ProgramDesc program_;
std::shared_ptr<paddle::framework::FleetWrapper> fleet_ptr_;
std::shared_ptr<paddle::framework::PullDenseWorker> pull_dense_worker_;
std::vector<std::shared_ptr<DeviceWorker>> workers_;
std::vector<platform::Place> places_;
......
......@@ -47,8 +47,7 @@ static void PullBoxSparseFunctor(const framework::ExecutionContext &ctx) {
box_ptr->PullSparse(ctx.GetPlace(), all_keys, all_values, slot_lengths,
hidden_size, 0);
#endif
#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \
(defined PADDLE_WITH_PSLIB)
#ifdef PADDLE_WITH_HETERPS
auto hidden_size = ctx.Attr<int>("size");
auto gpu_ps_ptr = paddle::framework::PSGPUWrapper::GetInstance();
gpu_ps_ptr->PullSparse(ctx.GetPlace(), 0, all_keys, all_values, slot_lengths,
......@@ -91,8 +90,7 @@ static void PushBoxSparseFunctor(const framework::ExecutionContext &ctx) {
box_ptr->PushSparseGrad(ctx.GetPlace(), all_keys, all_grad_values,
slot_lengths, hidden_size, 0, batch_size);
#endif
#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \
(defined PADDLE_WITH_PSLIB)
#ifdef PADDLE_WITH_HETERPS
auto hidden_size = ctx.Attr<int>("size");
auto gpu_ps_ptr = paddle::framework::PSGPUWrapper::GetInstance();
gpu_ps_ptr->PushSparseGrad(ctx.GetPlace(), 0, all_keys, all_grad_values,
......
......@@ -32,8 +32,7 @@ namespace py = pybind11;
namespace paddle {
namespace pybind {
#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \
(defined PADDLE_WITH_PSLIB)
#ifdef PADDLE_WITH_HETERPS
void BindPSGPUWrapper(py::module* m) {
py::class_<framework::PSGPUWrapper, std::shared_ptr<framework::PSGPUWrapper>>(
*m, "PSGPU")
......
......@@ -22,8 +22,7 @@ namespace py = pybind11;
namespace paddle {
namespace pybind {
#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \
(defined PADDLE_WITH_PSLIB)
#ifdef PADDLE_WITH_HETERPS
void BindPSGPUWrapper(py::module* m);
#endif
} // namespace pybind
......
......@@ -3052,8 +3052,7 @@ All parameter, weight, gradient are variables in Paddle.
#ifdef PADDLE_WITH_PSLIB
BindHeterWrapper(&m);
#endif
#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \
(defined PADDLE_WITH_PSLIB)
#ifdef PADDLE_WITH_HETERPS
BindPSGPUWrapper(&m);
#endif
BindGlooWrapper(&m);
......
......@@ -38,6 +38,23 @@ class ParameterServerOptimizer(MetaOptimizerBase):
k_steps = self.user_defined_strategy.a_sync_configs["k_steps"]
return True if k_steps >= 0 else False
def get_dist_env(self):
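# Gather the trainer rank, endpoint list, and current endpoint from the
# PADDLE_TRAINER_* environment variables.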
trainer_id = int(os.getenv('PADDLE_TRAINER_ID', '0'))
trainer_endpoints = ''
current_endpoint = ''
num_trainers = 0
if os.getenv('PADDLE_TRAINER_ENDPOINTS'):
trainer_endpoints = os.getenv('PADDLE_TRAINER_ENDPOINTS')
current_endpoint = trainer_endpoints.split(',')[trainer_id]
num_trainers = len(trainer_endpoints.split(','))
return {
'trainer_id': trainer_id,
'num_trainers': num_trainers,
'current_endpoint': current_endpoint,
'trainer_endpoints': trainer_endpoints
}
def _get_distributed_strategy(self):
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
......@@ -64,6 +81,8 @@ class ParameterServerOptimizer(MetaOptimizerBase):
_main = compiled_config.origin_main_program.clone()
_startup = compiled_config.origin_startup_program.clone()
use_ps_gpu = self.user_defined_strategy.a_sync_configs["use_ps_gpu"]
if not compiled_config.is_geo_mode():
from paddle.fluid.incubate.fleet.parameter_server.ir.public import _add_lr_decay_table_pass
_add_lr_decay_table_pass(
......@@ -71,14 +90,28 @@ class ParameterServerOptimizer(MetaOptimizerBase):
self.user_defined_strategy.a_sync_configs["lr_decay_steps"])
# for main program
_main = worker.delete_optimizer_pass(_main, compiled_config)
_main = worker.distributed_ops_pass(_main, compiled_config)
_main = worker.append_send_ops_pass(_main, compiled_config)
# for startup program
_main = worker.distributed_ops_pass(_main, compiled_config,
use_ps_gpu)
if not use_ps_gpu:
_main = worker.delete_optimizer_pass(_main, compiled_config)
_main = worker.append_send_ops_pass(_main, compiled_config)
_startup = worker.delet_extra_optimizes_pass(_startup,
compiled_config)
# for startup program
_startup = worker.fake_init_ops_pass(_startup, compiled_config)
_startup = worker.delet_extra_optimizes_pass(_startup,
compiled_config)
if use_ps_gpu:
_main = worker.ps_gpu_pass(_main)
from paddle.fluid.transpiler.collective import SingleProcessMultiThread
t = SingleProcessMultiThread()
env = self.get_dist_env()
t.transpile(
startup_program=_startup,
main_program=_main,
rank=env["trainer_id"],
endpoints=env["trainer_endpoints"],
current_endpoint=env['current_endpoint'],
wait_port=False)
compiled_config.set_origin_ps_main_program(_main)
compiled_config.set_origin_ps_startup_program(_startup)
......
......@@ -453,6 +453,17 @@ class TheOnePSRuntime(RuntimeBase):
worker = self._get_fleet_proto(is_server=False, is_sync=is_sync)
server = self._get_fleet_proto(is_server=True, is_sync=is_sync)
dist_strategy = self.context["valid_strategy"]
use_ps_gpu = dist_strategy.a_sync_configs["use_ps_gpu"]
if use_ps_gpu:
main_program = self.context['loss'].block.program
if not main_program._fleet_opt:
main_program._fleet_opt = {}
main_program._fleet_opt["use_ps_gpu"] = True
gpus_env = os.getenv("FLAGS_selected_gpus")
main_program._fleet_opt[
"worker_places"] = [int(s) for s in gpus_env.split(",")]
def sync_strategy_envs():
kwargs = {}
kwargs[
......@@ -741,6 +752,11 @@ class TheOnePSRuntime(RuntimeBase):
downpour_server = DownpourServer()
service = Service()
dist_strategy = self.context["valid_strategy"]
use_ps_gpu = dist_strategy.a_sync_configs["use_ps_gpu"]
if use_ps_gpu:
service.server_class = "PsLocalServer"
service.client_class = "PsLocalClient"
downpour_server.set_service_param(service)
tables = _get_tables()
......
......@@ -102,6 +102,12 @@ class Hogwild(DeviceWorker):
# when opt_info is None or empty dict, it should return
if not opt_info:
return
downpour = trainer_desc.downpour_param
hogwild = trainer_desc.hogwild_param
if opt_info["stat_var_names"]:
for i in opt_info["stat_var_names"]:
hogwild.stat_var_names.extend([i])
downpour.stat_var_names.extend([i])
from paddle.fluid.incubate.fleet.parameter_server import version
......@@ -109,8 +115,6 @@ class Hogwild(DeviceWorker):
return
program_configs = opt_info["program_configs"]
downpour = trainer_desc.downpour_param
hogwild = trainer_desc.hogwild_param
for pid in program_configs:
if pid == program_id:
......@@ -161,10 +165,6 @@ class Hogwild(DeviceWorker):
sparse_table.emb_dim = -1
# not use hard code click
sparse_table.label_var_name = ""
if opt_info["stat_var_names"]:
for i in opt_info["stat_var_names"]:
hogwild.stat_var_names.extend([i])
downpour.stat_var_names.extend([i])
for i in worker.get_desc().dense_table:
if i.table_id in dense_table_set:
......
......@@ -1373,11 +1373,14 @@ class Executor(object):
fetch_info=None,
print_period=100):
is_heter = 0
use_ps_gpu = 0
if not program._fleet_opt is None:
if program._fleet_opt.get("worker_class", "") == "HeterCpuWorker":
is_heter = 1
if program._fleet_opt.get("trainer", "") == "HeterXpuTrainer":
is_heter = 1
if program._fleet_opt.get("use_ps_gpu", False):
use_ps_gpu = True
if scope is None:
scope = global_scope()
if fetch_list is None:
......@@ -1412,7 +1415,9 @@ class Executor(object):
trainer._set_program(program.program)
if thread <= 0:
if dataset.thread_num <= 0:
if use_ps_gpu:
trainer._set_thread(len(program._fleet_opt["worker_places"]))
elif dataset.thread_num <= 0:
raise RuntimeError(
"You should set thread num first, either in Dataset"
"or in Executor.train_from_dataset")
......
......@@ -24,6 +24,7 @@ from functools import reduce
import paddle.fluid as fluid
import paddle.fluid.core as core
import paddle.fluid.framework as framework
import paddle.compat as cpt
from paddle.fluid.transpiler.details.program_utils import delete_ops
from paddle.fluid.incubate.fleet.parameter_server.ir.public import _get_optimize_ops
......@@ -93,7 +94,7 @@ def delete_optimizer_pass(program, config):
return program
def distributed_ops_pass(program, config):
def distributed_ops_pass(program, config, use_ps_gpu=False):
trainer_id = config.get_role_id()
send_ctx = config.get_the_one_send_context(
split_dense_table=config.is_heter_ps_mode)
......@@ -109,7 +110,7 @@ def distributed_ops_pass(program, config):
pull_sparse_ops[param_name] = ops
return pull_sparse_ops
def _pull_sparse_fuse(_program, pull_sparse_ops):
def _pull_sparse_fuse(_program, pull_sparse_ops, use_ps_gpu):
for param, ops in pull_sparse_ops.items():
all_ops = program.global_block().ops
op_idxs = [all_ops.index(op) for op in ops]
......@@ -159,18 +160,31 @@ def distributed_ops_pass(program, config):
if min(outputs_idxs) - max(inputs_idxs) >= 1:
distributed_idx = max(inputs_idxs) + 1
program.global_block()._insert_op(
index=distributed_idx,
type="distributed_lookup_table",
inputs={"Ids": inputs,
'W': w},
outputs={"Outputs": outputs},
attrs={
"is_distributed": is_distributed,
"padding_idx": padding_idx,
"table_id": table_id,
"lookup_table_version": op_type
})
if use_ps_gpu:
program.global_block()._insert_op(
index=distributed_idx,
type="pull_box_sparse",
inputs={"Ids": inputs,
'W': w},
outputs={"Out": outputs},
attrs={
"size": w.shape[1],
"is_distributed": True,
"is_sparse": True
})
else:
program.global_block()._insert_op(
index=distributed_idx,
type="distributed_lookup_table",
inputs={"Ids": inputs,
'W': w},
outputs={"Outputs": outputs},
attrs={
"is_distributed": is_distributed,
"padding_idx": padding_idx,
"table_id": table_id,
"lookup_table_version": op_type
})
else:
for i in range(len(inputs_idxs)):
distributed_idx = op_idxs[i] + 1
......@@ -189,7 +203,7 @@ def distributed_ops_pass(program, config):
})
pull_sparse_ops = _get_pull_sparse_ops(program)
_pull_sparse_fuse(program, pull_sparse_ops)
_pull_sparse_fuse(program, pull_sparse_ops, use_ps_gpu)
return program
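For orientation, a minimal sketch of how the two passes are meant to be chained in ps_gpu mode (the `compiled_config` object and the direct calls are assumptions for illustration only, not the runtime's exact wiring; `ps_gpu_pass` is defined below):

    # sketch: compile-time rewriting for ps_gpu, assuming a compiled_config as in public.py
    main_program = distributed_ops_pass(main_program, compiled_config, use_ps_gpu=True)
    main_program = ps_gpu_pass(main_program)  # rewrites the backward ops, see below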
......@@ -308,6 +322,54 @@ def fake_init_ops_pass(program, config):
return program
def ps_gpu_pass(program):
def _add_push_box_sparse_op(program):
op_role_attr_name = core.op_proto_and_checker_maker.kOpRoleAttrName()
backward = core.op_proto_and_checker_maker.OpRole.Backward
for op in program.global_block().ops:
if op.type != "pull_box_sparse":
continue
grad_op_desc, op_grad_to_var = core.get_grad_op_desc(
op.desc, cpt.to_text(set()), [])
for op_desc in grad_op_desc:
new_op_desc = program.global_block().desc.append_op()
new_op_desc.copy_from(op_desc)
new_op_desc._set_attr(op_role_attr_name, backward)
def _remove_lookup_table_grad_op_and_var(program):
lookup_table_grad_var = {}
remove_op_index = []
remove_var = []
for idx, op in list(enumerate(program.global_block().ops)):
if op.type == "lookup_table_grad":
for name in op.output("W@GRAD"):
lookup_table_grad_var[name] = 1
remove_op_index.append(idx)
remove_var.append(name)
for name in op.input("W"):
lookup_table_grad_var[name] = 1
for idx, op in list(enumerate(program.global_block().ops)):
if op.type == "pull_box_sparse":
continue
for key_name in op.input_names:
for var in op.input(key_name):
if var in lookup_table_grad_var:
remove_op_index.append(idx)
break
remove_op_index = list(set(remove_op_index))
remove_op_index.sort(reverse=True)
for idx in remove_op_index:
program.global_block()._remove_op(idx)
for name in remove_var:
program.global_block()._remove_var(name)
_add_push_box_sparse_op(program)
_remove_lookup_table_grad_op_and_var(program)
return program
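In effect, `ps_gpu_pass` patches the backward graph for ps_gpu mode: for every `pull_box_sparse` forward op it appends the gradient op generated by `core.get_grad_op_desc` with the Backward op-role, and it prunes the now-redundant `lookup_table_grad` ops together with their `W@GRAD` output variables. A schematic before/after of the backward block (signatures abbreviated, names illustrative):

    # before: ... lookup_table_grad(W, Ids, Out@GRAD) -> W@GRAD ...
    # after:  ... generated pull_box_sparse gradient op appended at the end of
    #             the block; lookup_table_grad ops and their W@GRAD vars removed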
def delet_extra_optimizes_pass(program, config):
optimize_vars = []
optimize_op_role_vars = []
......
......@@ -30,6 +30,7 @@ list(APPEND MIXED_DIST_TEST_OPS test_simple_dist_transpiler)
list(APPEND MIXED_DIST_TEST_OPS test_recv_save_op)
list(APPEND MIXED_DIST_TEST_OPS test_c_comm_init_op)
list(APPEND MIXED_DIST_TEST_OPS test_communicator_async)
list(APPEND MIXED_DIST_TEST_OPS test_communicator_ps_gpu)
list(APPEND MIXED_DIST_TEST_OPS test_communicator_geo)
list(APPEND MIXED_DIST_TEST_OPS test_communicator_half_async)
list(APPEND MIXED_DIST_TEST_OPS test_communicator_sync)
......@@ -483,6 +484,7 @@ if(WITH_DISTRIBUTE)
py_test_modules(test_recv_save_op MODULES test_recv_save_op ENVS ${dist_ENVS})
py_test_modules(test_communicator_async MODULES test_communicator_async ENVS ${dist_ENVS})
py_test_modules(test_communicator_ps_gpu MODULES test_communicator_ps_gpu ENVS ${dist_ENVS})
py_test_modules(test_communicator_geo MODULES test_communicator_geo ENVS ${dist_ENVS})
py_test_modules(test_communicator_half_async MODULES test_communicator_half_async ENVS ${dist_ENVS} FLAGS_communicator_send_queue_size=1 FLAGS_communicator_max_merge_var_num=1)
py_test_modules(test_communicator_sync MODULES test_communicator_sync ENVS ${dist_ENVS} FLAGS_communicator_send_queue_size=1 FLAGS_communicator_max_merge_var_num=1)
......
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import os
import unittest
import time
import threading
import numpy
import paddle
paddle.enable_static()
import paddle.fluid as fluid
import paddle.distributed.fleet.base.role_maker as role_maker
import paddle.distributed.fleet as fleet
class TestCommunicator(unittest.TestCase):
def test_communicator_ps_gpu(self):
with open("test_communicator_ps_gpu.txt", "w") as f:
data = "1 0.6 1 0.7\n"
f.write(data)
os.environ["PADDLE_PSERVER_NUMS"] = "2"
os.environ["PADDLE_TRAINERS_NUM"] = "2"
os.environ["POD_IP"] = "127.0.0.1"
os.environ["PADDLE_PORT"] = "36001"
os.environ["PADDLE_TRAINER_ID"] = "0"
os.environ["PADDLE_TRAINERS_NUM"] = "2"
os.environ[
"PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001,127.0.0.2:36001"
os.environ[
"PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36002,127.0.0.2:36002"
os.environ["TRAINING_ROLE"] = "TRAINER"
os.environ["FLAGS_selected_gpus"] = "0"
role = role_maker.PaddleCloudRoleMaker()
fleet.init(role)
x = fluid.layers.data(name='x', shape=[1], dtype='float32')
y = fluid.layers.data(name='y', shape=[1], dtype='float32')
slots_vars = [x, y]
cost = fluid.layers.square_error_cost(input=x, label=y)
avg_cost = fluid.layers.mean(cost)
optimizer = fluid.optimizer.Adam(0.01)
strategy = paddle.distributed.fleet.DistributedStrategy()
strategy.a_sync = True
strategy.a_sync_configs = {
"launch_barrier": False,
"use_ps_gpu": 1,
}
startup_program = paddle.static.Program()
main_program = paddle.static.Program()
optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(avg_cost)
dataset = paddle.distributed.InMemoryDataset()
dataset.init(
batch_size=32, thread_num=1, pipe_command="cat", use_var=slots_vars)
dataset.set_filelist(["test_communicator_ps_gpu.txt"])
dataset.load_into_memory()
os.environ["TEST_MODE"] = "1"
exe = fluid.Executor(fluid.CPUPlace())
exe.run(startup_program)
main_program._fleet_opt = {"stat_var_names": [x.name]}
fleet.init_worker()
try:
exe.train_from_dataset(main_program, dataset)
except ImportError as e:
pass
except Exception as e:
self.assertTrue(False)
time.sleep(10)
fleet.stop_worker()
os.remove("./test_communicator_ps_gpu.txt")
if __name__ == '__main__':
unittest.main()
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import os
import unittest
import tempfile
import shutil
import paddle
import paddle.fluid as fluid
import paddle.distributed.fleet.base.role_maker as role_maker
import paddle.distributed.fleet as fleet
paddle.enable_static()
# For Net
base_lr = 0.2
emb_lr = base_lr * 3
dict_dim = 1500
emb_dim = 128
hid_dim = 128
margin = 0.1
sample_rate = 1
batch_size = 4
class TestPSPassWithBow(unittest.TestCase):
def net(self):
def get_acc(cos_q_nt, cos_q_pt, batch_size):
cond = fluid.layers.less_than(cos_q_nt, cos_q_pt)
cond = fluid.layers.cast(cond, dtype='float64')
cond_3 = fluid.layers.reduce_sum(cond)
acc = fluid.layers.elementwise_div(
cond_3,
fluid.layers.fill_constant(
shape=[1], value=batch_size * 1.0, dtype='float64'),
name="simnet_acc")
return acc
def get_loss(cos_q_pt, cos_q_nt):
loss_op1 = fluid.layers.elementwise_sub(
fluid.layers.fill_constant_batch_size_like(
input=cos_q_pt,
shape=[-1, 1],
value=margin,
dtype='float32'),
cos_q_pt)
loss_op2 = fluid.layers.elementwise_add(loss_op1, cos_q_nt)
loss_op3 = fluid.layers.elementwise_max(
fluid.layers.fill_constant_batch_size_like(
input=loss_op2, shape=[-1, 1], value=0.0, dtype='float32'),
loss_op2)
avg_cost = fluid.layers.mean(loss_op3)
return avg_cost
is_distributed = False
is_sparse = True
# query
q = fluid.layers.data(
name="query_ids", shape=[1], dtype="int64", lod_level=1)
# embedding
q_emb = fluid.contrib.layers.sparse_embedding(
input=q,
size=[dict_dim, emb_dim],
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.Constant(value=0.01),
name="__emb__",
learning_rate=emb_lr))
q_emb = fluid.layers.reshape(q_emb, [-1, emb_dim])
# vsum
q_sum = fluid.layers.sequence_pool(input=q_emb, pool_type='sum')
q_ss = fluid.layers.softsign(q_sum)
# fc layer after conv
q_fc = fluid.layers.fc(
input=q_ss,
size=hid_dim,
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.Constant(value=0.01),
name="__q_fc__",
learning_rate=base_lr))
# label data
label = fluid.layers.data(name="label", shape=[1], dtype="int64")
# pt
pt = fluid.layers.data(
name="pos_title_ids", shape=[1], dtype="int64", lod_level=1)
# embedding
pt_emb = fluid.contrib.layers.sparse_embedding(
input=pt,
size=[dict_dim, emb_dim],
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.Constant(value=0.01),
name="__emb__",
learning_rate=emb_lr))
pt_emb = fluid.layers.reshape(pt_emb, [-1, emb_dim])
# vsum
pt_sum = fluid.layers.sequence_pool(input=pt_emb, pool_type='sum')
pt_ss = fluid.layers.softsign(pt_sum)
# fc layer
pt_fc = fluid.layers.fc(
input=pt_ss,
size=hid_dim,
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.Constant(value=0.01),
name="__fc__",
learning_rate=base_lr),
bias_attr=fluid.ParamAttr(name="__fc_b__"))
# nt
nt = fluid.layers.data(
name="neg_title_ids", shape=[1], dtype="int64", lod_level=1)
# embedding
nt_emb = fluid.contrib.layers.sparse_embedding(
input=nt,
size=[dict_dim, emb_dim],
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.Constant(value=0.01),
name="__emb__",
learning_rate=emb_lr))
nt_emb = fluid.layers.reshape(nt_emb, [-1, emb_dim])
# vsum
nt_sum = fluid.layers.sequence_pool(input=nt_emb, pool_type='sum')
nt_ss = fluid.layers.softsign(nt_sum)
# fc layer
nt_fc = fluid.layers.fc(
input=nt_ss,
size=hid_dim,
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.Constant(value=0.01),
name="__fc__",
learning_rate=base_lr),
bias_attr=fluid.ParamAttr(name="__fc_b__"))
cos_q_pt = fluid.layers.cos_sim(q_fc, pt_fc)
cos_q_nt = fluid.layers.cos_sim(q_fc, nt_fc)
# loss
avg_cost = get_loss(cos_q_pt, cos_q_nt)
# acc
acc = get_acc(cos_q_nt, cos_q_pt, batch_size)
return [avg_cost, acc, cos_q_pt]
def test(self):
os.environ["PADDLE_PSERVER_NUMS"] = "2"
os.environ["PADDLE_TRAINERS_NUM"] = "2"
os.environ["POD_IP"] = "127.0.0.1"
os.environ["PADDLE_PORT"] = "36001"
os.environ["PADDLE_TRAINER_ID"] = "0"
os.environ["PADDLE_TRAINERS_NUM"] = "2"
os.environ[
"PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001,127.0.0.2:36001"
os.environ[
"PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36002,127.0.0.2:36002"
os.environ["TRAINING_ROLE"] = "TRAINER"
os.environ["FLAGS_selected_gpus"] = "0"
role = role_maker.PaddleCloudRoleMaker()
fleet.init(role)
loss, acc, _ = self.net()
strategy = paddle.distributed.fleet.DistributedStrategy()
configs = {"use_ps_gpu": 1, "launch_barrier": False}
strategy.a_sync_configs = configs
strategy.a_sync = True
optimizer = paddle.fluid.optimizer.Adam(learning_rate=0.01)
optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
optimizer.minimize(loss)
if __name__ == '__main__':
unittest.main()
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import os
import unittest
import tempfile
import shutil
import paddle
import paddle.fluid as fluid
import paddle.distributed.fleet.base.role_maker as role_maker
import paddle.distributed.fleet as fleet
paddle.enable_static()
# For Net
base_lr = 0.2
emb_lr = base_lr * 3
dict_dim = 1500
emb_dim = 128
hid_dim = 128
margin = 0.1
sample_rate = 1
batch_size = 4
class TestPSPassWithBow(unittest.TestCase):
def net(self):
def get_acc(cos_q_nt, cos_q_pt, batch_size):
cond = fluid.layers.less_than(cos_q_nt, cos_q_pt)
cond = fluid.layers.cast(cond, dtype='float64')
cond_3 = fluid.layers.reduce_sum(cond)
acc = fluid.layers.elementwise_div(
cond_3,
fluid.layers.fill_constant(
shape=[1], value=batch_size * 1.0, dtype='float64'),
name="simnet_acc")
return acc
def get_loss(cos_q_pt, cos_q_nt):
loss_op1 = fluid.layers.elementwise_sub(
fluid.layers.fill_constant_batch_size_like(
input=cos_q_pt,
shape=[-1, 1],
value=margin,
dtype='float32'),
cos_q_pt)
loss_op2 = fluid.layers.elementwise_add(loss_op1, cos_q_nt)
loss_op3 = fluid.layers.elementwise_max(
fluid.layers.fill_constant_batch_size_like(
input=loss_op2, shape=[-1, 1], value=0.0, dtype='float32'),
loss_op2)
avg_cost = fluid.layers.mean(loss_op3)
return avg_cost
is_distributed = False
is_sparse = True
# query
q = fluid.layers.data(
name="query_ids", shape=[1], dtype="int64", lod_level=1)
# embedding
q_emb = fluid.contrib.layers.sparse_embedding(
input=q,
size=[dict_dim, emb_dim],
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.Constant(value=0.01),
name="__emb__",
learning_rate=emb_lr))
q_emb = fluid.layers.reshape(q_emb, [-1, emb_dim])
# vsum
q_sum = fluid.layers.sequence_pool(input=q_emb, pool_type='sum')
q_ss = fluid.layers.softsign(q_sum)
# fc layer after conv
q_fc = fluid.layers.fc(
input=q_ss,
size=hid_dim,
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.Constant(value=0.01),
name="__q_fc__",
learning_rate=base_lr))
# label data
label = fluid.layers.data(name="label", shape=[1], dtype="int64")
# pt
pt = fluid.layers.data(
name="pos_title_ids", shape=[1], dtype="int64", lod_level=1)
# embedding
pt_emb = fluid.contrib.layers.sparse_embedding(
input=pt,
size=[dict_dim, emb_dim],
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.Constant(value=0.01),
name="__emb__",
learning_rate=emb_lr))
pt_emb = fluid.layers.reshape(pt_emb, [-1, emb_dim])
# vsum
pt_sum = fluid.layers.sequence_pool(input=pt_emb, pool_type='sum')
pt_ss = fluid.layers.softsign(pt_sum)
# fc layer
pt_fc = fluid.layers.fc(
input=pt_ss,
size=hid_dim,
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.Constant(value=0.01),
name="__fc__",
learning_rate=base_lr),
bias_attr=fluid.ParamAttr(name="__fc_b__"))
# nt
nt = fluid.layers.data(
name="neg_title_ids", shape=[1], dtype="int64", lod_level=1)
# embedding
nt_emb = fluid.contrib.layers.sparse_embedding(
input=nt,
size=[dict_dim, emb_dim],
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.Constant(value=0.01),
name="__emb__",
learning_rate=emb_lr))
nt_emb = fluid.layers.reshape(nt_emb, [-1, emb_dim])
# vsum
nt_sum = fluid.layers.sequence_pool(input=nt_emb, pool_type='sum')
nt_ss = fluid.layers.softsign(nt_sum)
# fc layer
nt_fc = fluid.layers.fc(
input=nt_ss,
size=hid_dim,
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.Constant(value=0.01),
name="__fc__",
learning_rate=base_lr),
bias_attr=fluid.ParamAttr(name="__fc_b__"))
cos_q_pt = fluid.layers.cos_sim(q_fc, pt_fc)
cos_q_nt = fluid.layers.cos_sim(q_fc, nt_fc)
# loss
avg_cost = get_loss(cos_q_pt, cos_q_nt)
# acc
acc = get_acc(cos_q_nt, cos_q_pt, batch_size)
return [avg_cost, acc, cos_q_pt]
def test(self):
os.environ["PADDLE_PSERVER_NUMS"] = "2"
os.environ["PADDLE_TRAINERS_NUM"] = "2"
os.environ["POD_IP"] = "127.0.0.1"
os.environ["PADDLE_PORT"] = "36001"
os.environ["PADDLE_TRAINER_ID"] = "0"
os.environ["PADDLE_TRAINERS_NUM"] = "2"
os.environ[
"PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001,127.0.0.2:36001"
os.environ["TRAINING_ROLE"] = "PSERVER"
role = role_maker.PaddleCloudRoleMaker()
fleet.init(role)
loss, acc, _ = self.net()
strategy = paddle.distributed.fleet.DistributedStrategy()
configs = {"use_ps_gpu": 1}
strategy.a_sync_configs = configs
strategy.a_sync = True
optimizer = paddle.fluid.optimizer.Adam(learning_rate=0.01)
optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
optimizer.minimize(loss)
fleet.init_server()
if __name__ == '__main__':
unittest.main()
......@@ -49,8 +49,8 @@ class TrainerFactory(object):
device_worker = Hogwild()
trainer._set_device_worker(device_worker)
else:
trainer_class = opt_info["trainer"]
device_worker_class = opt_info["device_worker"]
trainer_class = opt_info.get("trainer", "MultiTrainer")
device_worker_class = opt_info.get("device_worker", "Hogwild")
trainer = globals()[trainer_class]()
device_worker = globals()[device_worker_class]()
......
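The `TrainerFactory` change above makes the factory tolerant of a minimal `opt_info`: if `_fleet_opt` omits `trainer` or `device_worker` (as in the ps_gpu communicator test, which only sets `stat_var_names`), it falls back to `MultiTrainer` and `Hogwild` instead of raising a `KeyError`. A small sketch:

    # sketch: opt_info carrying only stat_var_names, as in test_communicator_ps_gpu
    opt_info = {"stat_var_names": ["x"]}
    trainer_class = opt_info.get("trainer", "MultiTrainer")          # -> "MultiTrainer"
    device_worker_class = opt_info.get("device_worker", "Hogwild")   # -> "Hogwild"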