Unverified commit 630f5b89, authored by wangguanqun, committed by GitHub

delete commonsparsetable and communicator from gpups (#40973)

* trainer and worker

* delete commonsparsetable from gpups

* delete vlog

* codestyle

* delete communicator from gpups
Parent 62af5903
......@@ -25,9 +25,9 @@ int32_t PsLocalClient::initialize() {
for (size_t i = 0; i < downpour_param.downpour_table_param_size(); ++i) {
auto* table = CREATE_PSCORE_CLASS(
Table, downpour_param.downpour_table_param(i).table_class());
+ table->set_shard(0, 1);
table->initialize(downpour_param.downpour_table_param(i),
_config.fs_client_param());
- table->set_shard(0, 1);
_table_map[downpour_param.downpour_table_param(i).table_id()].reset(table);
}
return 0;
......
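The reordering in the hunk above puts set_shard() before initialize(), presumably because table initialization derives per-shard sizing from the shard index and shard count recorded by set_shard(). A minimal sketch of that kind of ordering dependency, using a hypothetical ToySparseTable rather than the real Table API:

#include <cstdint>
#include <cstdio>

// Hypothetical table: initialize() reads the shard count recorded by
// set_shard(), so the two calls only work in that order.
class ToySparseTable {
 public:
  void set_shard(uint32_t shard_idx, uint32_t shard_num) {
    shard_idx_ = shard_idx;
    shard_num_ = shard_num;
  }
  int initialize(uint32_t table_shard_num) {
    if (shard_num_ == 0) return -1;  // set_shard() has not run yet
    local_shard_num_ = (table_shard_num + shard_num_ - 1) / shard_num_;
    std::printf("shard %u serves %u local shards\n", shard_idx_,
                local_shard_num_);
    return 0;
  }

 private:
  uint32_t shard_idx_ = 0;
  uint32_t shard_num_ = 0;
  uint32_t local_shard_num_ = 0;
};

int main() {
  ToySparseTable table;
  table.set_shard(0, 1);        // as in the hunk: shard info first,
  return table.initialize(64);  // then initialization can size itself
}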
......@@ -57,6 +57,7 @@ struct alignas(64) SparseTableShard {
}
const KEY& key() const { return it->first; }
VALUE& value() const { return *(VALUE*)(void*)it->second; } // NOLINT
+ VALUE* value_ptr() const { return (VALUE*)(void*)it->second; } // NOLINT
iterator& operator++() {
++it;
......
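The new value_ptr() accessor complements value(): the reference form is for in-place reads and writes, while the pointer form lets a caller keep the stored address itself, which is what pull_sparse_ptr below hands back as char*. A toy illustration (hypothetical ToyShard, not the real SparseTableShard):

#include <cstdint>
#include <unordered_map>

struct ToyValue {
  float data[8];
};

// Hypothetical shard: pointers to unordered_map values stay valid across
// rehashing, so handing out value_ptr() results is safe while the key lives.
class ToyShard {
 public:
  ToyValue& value(uint64_t key) { return map_[key]; }
  ToyValue* value_ptr(uint64_t key) { return &map_[key]; }

 private:
  std::unordered_map<uint64_t, ToyValue> map_;
};

int main() {
  ToyShard shard;
  shard.value(42).data[0] = 1.0f;          // mutate in place
  char* raw = (char*)shard.value_ptr(42);  // address escapes, as in
  return raw != nullptr ? 0 : 1;           // pull_sparse_ptr
}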
......@@ -481,6 +481,52 @@ int32_t MemorySparseTable::pull_sparse(float* pull_values,
int32_t MemorySparseTable::pull_sparse_ptr(char** pull_values,
const uint64_t* keys, size_t num) {
+ CostTimer timer("pscore_sparse_select_all");
+ size_t value_size = _value_accesor->size() / sizeof(float);
+ size_t mf_value_size = _value_accesor->mf_size() / sizeof(float);
+ std::vector<std::future<int>> tasks(_real_local_shard_num);
+ std::vector<std::vector<std::pair<uint64_t, int>>> task_keys(
+ _real_local_shard_num);
+ for (size_t i = 0; i < num; ++i) {
+ int shard_id = (keys[i] % _sparse_table_shard_num) % _avg_local_shard_num;
+ task_keys[shard_id].push_back({keys[i], i});
+ }
+ // std::atomic<uint32_t> missed_keys{0};
+ for (size_t shard_id = 0; shard_id < _real_local_shard_num; ++shard_id) {
+ tasks[shard_id] =
+ _shards_task_pool[shard_id % _shards_task_pool.size()]->enqueue(
+ [this, shard_id, &task_keys, pull_values, value_size,
+ mf_value_size]() -> int {
+ auto& keys = task_keys[shard_id];
+ auto& local_shard = _local_shards[shard_id];
+ float data_buffer[value_size];
+ float* data_buffer_ptr = data_buffer;
+ for (int i = 0; i < keys.size(); ++i) {
+ uint64_t key = keys[i].first;
+ auto itr = local_shard.find(key);
+ size_t data_size = value_size - mf_value_size;
+ FixedFeatureValue* ret = NULL;
+ if (itr == local_shard.end()) {
+ // ++missed_keys;
+ auto& feature_value = local_shard[key];
+ feature_value.resize(data_size);
+ float* data_ptr = feature_value.data();
+ _value_accesor->create(&data_buffer_ptr, 1);
+ memcpy(data_ptr, data_buffer_ptr, data_size * sizeof(float));
+ ret = &feature_value;
+ } else {
+ ret = itr.value_ptr();
+ }
+ int pull_data_idx = keys[i].second;
+ pull_values[pull_data_idx] = (char*)ret;
+ }
+ return 0;
+ });
+ }
+ for (size_t shard_id = 0; shard_id < tasks.size(); ++shard_id) {
+ tasks[shard_id].wait();
+ }
return 0;
}
......
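The new pull_sparse_ptr body follows a bucket-then-fan-out pattern: each key is routed to a local shard by (key % _sparse_table_shard_num) % _avg_local_shard_num, each shard's bucket is served on that shard's task queue, and every result is written back at the caller's original index. A condensed standalone sketch of the same pattern, with simplified names and std::async standing in for the per-shard thread pools:

#include <cstdint>
#include <future>
#include <unordered_map>
#include <utility>
#include <vector>

// Route keys to shard buckets, resolve each bucket concurrently, and place
// every value pointer at the index the caller asked for.
void pull_by_shard(const std::vector<uint64_t>& keys,
                   uint64_t table_shard_num, int local_shard_num,
                   std::vector<std::unordered_map<uint64_t, float>>& shards,
                   std::vector<float*>& pull_values) {
  std::vector<std::vector<std::pair<uint64_t, int>>> buckets(local_shard_num);
  for (int i = 0; i < static_cast<int>(keys.size()); ++i) {
    int shard_id =
        static_cast<int>(keys[i] % table_shard_num) % local_shard_num;
    buckets[shard_id].push_back({keys[i], i});
  }
  std::vector<std::future<void>> tasks;
  for (int s = 0; s < local_shard_num; ++s) {
    tasks.push_back(std::async(std::launch::async, [&, s] {
      for (auto& [key, idx] : buckets[s]) {
        // operator[] default-constructs on a miss, echoing the
        // create-on-miss branch in the real function
        pull_values[idx] = &shards[s][key];
      }
    }));
  }
  for (auto& t : tasks) t.wait();
}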
......@@ -379,10 +379,12 @@ void FleetWrapper::PullDenseVarsSync(
for (auto& t : var_names) {
Variable* var = scope.FindVar(t);
LoDTensor* tensor = var->GetMutable<LoDTensor>();
+ if (!platform::is_gpu_place(tensor->place())) {
float* w = tensor->data<float>();
paddle::distributed::Region reg(w, tensor->numel());
regions.emplace_back(std::move(reg));
+ }
}
auto status = worker_ptr_->pull_dense(regions.data(), regions.size(), tid);
status.wait();
}
......@@ -396,10 +398,12 @@ void FleetWrapper::PushDenseParamSync(
Variable* var = scope.FindVar(t);
CHECK(var != nullptr) << "var[" << t << "] not found";
LoDTensor* tensor = var->GetMutable<LoDTensor>();
+ if (!platform::is_gpu_place(tensor->place())) {
float* g = tensor->mutable_data<float>(place);
paddle::distributed::Region reg(g, tensor->numel());
regions.emplace_back(std::move(reg));
+ }
}
auto push_status =
worker_ptr_->push_dense_param(regions.data(), regions.size(), table_id);
push_status.wait();
......
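Both hunks above add the same guard: only CPU-resident tensors are packed into regions for the dense pull/push RPC, while GPU-placed parameters are presumably left to the GPUPS tables. The shape of that filter as a sketch, where Place, ToyTensor, and Region are illustrative stand-ins rather than the Paddle types:

#include <vector>

enum class Place { kCPU, kGPU };

struct ToyTensor {
  Place place;
  float* data;
  long numel;
};

struct Region {
  float* ptr;
  long size;
};

// Pack only CPU-resident tensors into RPC regions; GPU-placed parameters
// are left for the GPU parameter-server path.
std::vector<Region> collect_cpu_regions(
    const std::vector<ToyTensor>& tensors) {
  std::vector<Region> regions;
  for (const auto& t : tensors) {
    if (t.place != Place::kGPU) {
      regions.push_back({t.data, t.numel});
    }
  }
  return regions;
}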
......@@ -26,7 +26,7 @@ limitations under the License. */
#endif
#ifdef PADDLE_WITH_PSCORE
#include "paddle/fluid/distributed/ps/table/depends/large_scale_kv.h"
#include "paddle/fluid/distributed/ps/table/depends/feature_value.h"
#endif
#include "paddle/fluid/distributed/ps/thirdparty/round_robin.h"
......@@ -65,10 +65,10 @@ class HeterContext {
device_dim_ptr_;
#endif
#ifdef PADDLE_WITH_PSCORE
- std::vector<std::vector<paddle::distributed::VALUE*>> value_ptr_;
- std::vector<std::vector<std::vector<paddle::distributed::VALUE*>>>
+ std::vector<std::vector<paddle::distributed::FixedFeatureValue*>> value_ptr_;
+ std::vector<std::vector<std::vector<paddle::distributed::FixedFeatureValue*>>>
value_dim_ptr_;
- std::vector<std::vector<std::vector<paddle::distributed::VALUE*>>>
+ std::vector<std::vector<std::vector<paddle::distributed::FixedFeatureValue*>>>
device_dim_ptr_;
#endif
std::vector<std::vector<FeatureValue>> device_values_;
......
......@@ -21,7 +21,7 @@ limitations under the License. */
#include "common_value.h" // NOLINT
#endif
#ifdef PADDLE_WITH_PSCORE
#include "paddle/fluid/distributed/ps/table/depends/large_scale_kv.h"
#include "paddle/fluid/distributed/ps/table/depends/feature_value.h"
#endif
#include "paddle/phi/core/utils/rw_lock.h"
#include "thrust/pair.h"
......
......@@ -224,10 +224,24 @@ void HashTable<KeyType, ValType>::dump_to_cpu(int devid, cudaStream_t stream) {
}
#endif
#ifdef PADDLE_WITH_PSCORE
- auto* downpour_value = (paddle::distributed::VALUE*)(gpu_val.cpu_ptr);
- downpour_value->count_ = gpu_val.show;
+ auto* downpour_value =
+ (paddle::distributed::FixedFeatureValue*)(gpu_val.cpu_ptr);
+ int downpour_value_size = downpour_value->size();
+ if (gpu_val.mf_size > 0 && downpour_value_size == 7) {
+ downpour_value->resize(gpu_val.mf_size + downpour_value_size);
+ }
+ float* cpu_val = downpour_value->data();
+ // cpu_val[0] = 0;
+ cpu_val[2] = gpu_val.delta_score;
+ cpu_val[3] = gpu_val.show;
+ cpu_val[4] = gpu_val.clk;
+ cpu_val[5] = gpu_val.lr;
+ cpu_val[6] = gpu_val.lr_g2sum;
+ cpu_val[0] = gpu_val.slot;
+ if (gpu_val.mf_size > 0) {
for (int x = 0; x < gpu_val.mf_size; x++) {
- downpour_value->data_[x] = gpu_val.mf[x];
+ cpu_val[x + 7] = gpu_val.mf[x];
}
+ }
#endif
}
......
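The assignments in dump_to_cpu imply a fixed float layout inside FixedFeatureValue. Spelled out as constants, derived only from the hunk above (index 1 is not written here, and the authoritative layout lives in the accessor):

// Offsets into the CPU-side float array as written by dump_to_cpu:
enum CpuValOffset : int {
  kSlot = 0,        // cpu_val[0] = gpu_val.slot
  kDeltaScore = 2,  // cpu_val[2] = gpu_val.delta_score
  kShow = 3,        // cpu_val[3] = gpu_val.show
  kClick = 4,       // cpu_val[4] = gpu_val.clk
  kLr = 5,          // cpu_val[5] = gpu_val.lr
  kLrG2Sum = 6,     // cpu_val[6] = gpu_val.lr_g2sum
  kMfStart = 7,     // embedding (mf) values follow when mf_size > 0
};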
......@@ -286,7 +286,7 @@ void PSGPUWrapper::BuildPull(std::shared_ptr<HeterContext> gpu_task) {
auto fleet_ptr = FleetWrapper::GetInstance();
#endif
#ifdef PADDLE_WITH_PSCORE
- auto fleet_ptr = paddle::distributed::Communicator::GetInstance();
+ auto fleet_ptr = paddle::distributed::FleetWrapper::GetInstance();
#endif
#if (defined PADDLE_WITH_PSLIB) && (defined PADDLE_WITH_HETERPS)
......@@ -343,7 +343,7 @@ void PSGPUWrapper::BuildPull(std::shared_ptr<HeterContext> gpu_task) {
#ifdef PADDLE_WITH_PSCORE
int32_t cnt = 0;
while (true) {
- auto tt = fleet_ptr->_worker_ptr->pull_sparse_ptr(
+ auto tt = fleet_ptr->worker_ptr_->pull_sparse_ptr(
reinterpret_cast<char**>(local_ptr[i].data()), this->table_id_,
local_keys[i].data(), key_size);
bool flag = true;
......@@ -506,7 +506,8 @@ void PSGPUWrapper::BuildPull(std::shared_ptr<HeterContext> gpu_task) {
#endif
#ifdef PADDLE_WITH_PSCORE
- std::vector<std::vector<paddle::distributed::VALUE*>> task_ptrs(device_num);
+ std::vector<std::vector<paddle::distributed::FixedFeatureValue*>> task_ptrs(
+ device_num);
#endif
for (size_t j = 0; j < local_keys[i].size(); j++) {
......@@ -569,21 +570,21 @@ void PSGPUWrapper::BuildPull(std::shared_ptr<HeterContext> gpu_task) {
#ifdef PADDLE_WITH_PSCORE
for (int j = 0; j < len; ++j) {
device_keys[dev][cur + j] = task_keys[dev][j];
- distributed::VALUE* ptr_val = task_ptrs[dev][j];
+ float* ptr_val = task_ptrs[dev][j]->data();
FeatureValue& val = device_vals[dev][cur + j];
- bool has_mf = 1;
- val.delta_score = 0;
- val.show = ptr_val->count_;
- val.clk = 0;
- val.slot = 0;
- val.lr = 0;
- val.lr_g2sum = 0;
+ size_t dim = task_ptrs[dev][j]->size();
+ val.delta_score = ptr_val[2];
+ val.show = ptr_val[3];
+ val.clk = ptr_val[4];
+ val.slot = ptr_val[0];
+ val.lr = ptr_val[5];
+ val.lr_g2sum = ptr_val[6];
val.cpu_ptr = (uint64_t)(task_ptrs[dev][j]);
- if (has_mf) {
+ if (dim > 7) {
val.mf_size = MF_DIM + 1;
for (int x = 0; x < val.mf_size; x++) {
- val.mf[x] = ptr_val->data_[x];
+ val.mf[x] = ptr_val[x + 7];
}
} else {
val.mf_size = 0;
......
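This copy is the mirror image of dump_to_cpu: BuildPull reads the same offsets out of the CPU value when staging it for the GPU table, and a value longer than seven floats (dim > 7) signals that the embedding block exists. A condensed sketch with a hypothetical GpuVal struct, where kMfDim stands in for MF_DIM:

#include <cstddef>

constexpr int kMfDim = 8;  // stand-in for MF_DIM

struct GpuVal {
  float delta_score, show, clk, slot, lr, lr_g2sum;
  int mf_size;
  float mf[kMfDim + 1];
};

// Read a CPU-side float array (offsets as in the enum above) into the
// GPU-side value; dim > 7 means the embedding block was materialized.
void cpu_to_gpu(const float* cpu_val, size_t dim, GpuVal* val) {
  val->slot = cpu_val[0];
  val->delta_score = cpu_val[2];
  val->show = cpu_val[3];
  val->clk = cpu_val[4];
  val->lr = cpu_val[5];
  val->lr_g2sum = cpu_val[6];
  if (dim > 7) {
    val->mf_size = kMfDim + 1;
    for (int x = 0; x < val->mf_size; ++x) val->mf[x] = cpu_val[x + 7];
  } else {
    val->mf_size = 0;
  }
}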
......@@ -43,7 +43,7 @@ limitations under the License. */
#include "paddle/fluid/platform/macros.h" // for DISABLE_COPY_AND_ASSIGN
#include "paddle/fluid/platform/place.h"
#ifdef PADDLE_WITH_PSCORE
#include "paddle/fluid/distributed/ps/service/communicator/communicator.h"
#include "paddle/fluid/distributed/ps/wrapper/fleet.h"
#endif
#ifdef PADDLE_WITH_PSLIB
#include "afs_api.h"
......@@ -177,10 +177,8 @@ class PSGPUWrapper {
current_task_ = nullptr;
gpu_free_channel_->Put(current_task_);
- table_id_ = 1;
- #ifdef PADDLE_WITH_PSLIB
table_id_ = 0;
- #endif
// start build cpu&gpu ps thread
start_build_thread();
}
......
......@@ -134,6 +134,9 @@ class Accessor:
if not accessor_proto.HasField("accessor_class"):
# DownpourSparseValueAccessor
+ if context['use_ps_gpu']:
+ accessor_proto.accessor_class = "CtrCommonAccessor"
+ else:
accessor_proto.accessor_class = "SparseAccessor"
if not accessor_proto.HasField("fea_dim"):
if accessor_proto.accessor_class == "SparseAccessor":
......@@ -1010,6 +1013,7 @@ class TheOnePSRuntime(RuntimeBase):
self._init_params(scopes, send_ctx, dense_map)
fleet.util.barrier()
self._pull_all_dense(scopes, send_ctx, dense_map)
+ fleet.util.barrier()
......
......@@ -40,32 +40,8 @@ class PsProgramBuilder(object):
def _build_trainer_desc(self):
opt_info = self.loss.block.program._fleet_opt
opt_info = {} if opt_info is None else opt_info
opt_info["trainer"] = opt_info.get("trainer", "DistMultiTrainer")
opt_info["device_worker"] = opt_info.get("device_worker",
"DownpourLite")
pid = str(id(self.cloned_main))
program_configs = {
pid: {
'pull_dense': [],
'push_dense': [],
'pull_sparse': [],
'push_sparse': []
}
}
dense_table_config = {}
send_ctx = get_the_one_send_context(self.attrs)
recv_ctx = get_the_one_recv_context(self.attrs)
for name, ctx in send_ctx.items():
if ctx.program_id() != id(self.loss.block.program):
continue
if ctx.is_sparse():
continue
if not ctx.is_tensor_table():
program_configs[pid]['pull_dense'].append(ctx.table_id())
program_configs[pid]['push_dense'].append(ctx.table_id())
dense_table_config[ctx.table_id()] = recv_ctx[ctx.table_id()]
opt_info['program_configs'] = program_configs
opt_info['dense_table_config'] = dense_table_config
opt_info["trainer"] = opt_info.get("trainer", "MultiTrainer")
opt_info["device_worker"] = opt_info.get("device_worker", "Hogwild")
self.cloned_main._fleet_opt = opt_info
def _optimize_programs(self):
......@@ -188,6 +164,37 @@ class CpuAsyncPsProgramBuilder(CpuSyncPsProgramBuilder):
logger.info("start building cpu-async-ps program")
super(CpuAsyncPsProgramBuilder, self).__init__(pass_ctx)
+ def _build_trainer_desc(self):
+ opt_info = self.loss.block.program._fleet_opt
+ opt_info = {} if opt_info is None else opt_info
+ opt_info["trainer"] = opt_info.get("trainer", "DistMultiTrainer")
+ opt_info["device_worker"] = opt_info.get("device_worker",
+ "DownpourLite")
+ pid = str(id(self.cloned_main))
+ program_configs = {
+ pid: {
+ 'pull_dense': [],
+ 'push_dense': [],
+ 'pull_sparse': [],
+ 'push_sparse': []
+ }
+ }
+ dense_table_config = {}
+ send_ctx = get_the_one_send_context(self.attrs)
+ recv_ctx = get_the_one_recv_context(self.attrs)
+ for name, ctx in send_ctx.items():
+ if ctx.program_id() != id(self.loss.block.program):
+ continue
+ if ctx.is_sparse():
+ continue
+ if not ctx.is_tensor_table():
+ program_configs[pid]['pull_dense'].append(ctx.table_id())
+ program_configs[pid]['push_dense'].append(ctx.table_id())
+ dense_table_config[ctx.table_id()] = recv_ctx[ctx.table_id()]
+ opt_info['program_configs'] = program_configs
+ opt_info['dense_table_config'] = dense_table_config
+ self.cloned_main._fleet_opt = opt_info
class GpuPsProgramBuilder(PsProgramBuilder):
def __init__(self, pass_ctx):
......