未验证 提交 7a07c4a5 编写于 作者: Z zhaocaibei123 提交者: GitHub

Unittest recover (#41431)

* update name

* update name

* fix test

* fix fleet bind

* update name

* update name

* fix test

* fix gpups wrapper

* remove Push/Pull/Load/Save with context in client and wrapper base class

* fix

* fix

* remove some interface

* fix

* remove

* code style

* recover

* fix

* remove code unused

* remove some unused table & accessor & CommonDenseTable => MemoryDenseTable

* fix

* fix

* fix

* recover

* remove unused code

* recover unittest

* fix

* remove

* fix

* remove code unuseful

* remove

* fix

* recover

* remove
Co-authored-by: Nesythan <esythan@126.com>
上级 ff2fba39
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <sys/time.h>
#include <iostream>
#include <ostream>
#include <string>
#include <thread> // NOLINT
#include <vector>
#include <ThreadPool.h>
#include "glog/logging.h"
#include "paddle/fluid/distributed/common/utils.h"
#include "paddle/fluid/framework/blocking_queue.h"
#include "paddle/fluid/framework/framework.pb.h"
#include "paddle/fluid/framework/tensor.h"
#include "paddle/fluid/framework/tensor_util.h"
#include "paddle/fluid/string/split.h"
#include "paddle/phi/core/utils/dim.h"
constexpr int FG = 256 * 1024 * 1024;
constexpr int Q_SIZE = 10000;
constexpr int BUCKET = 10;
constexpr char XEOF[] = "EOF";
inline double GetCurrentUS() {
struct timeval time;
gettimeofday(&time, NULL);
return 1e+6 * time.tv_sec + time.tv_usec;
}
namespace paddle {
namespace distributed {
class ShardingMerge {
public:
ShardingMerge() {}
~ShardingMerge() {}
void Merge(const std::vector<std::string> &inputs,
const std::vector<int64_t> &feasigns, const std::string &output,
const int embedding_dim) {
pool_.reset(new ::ThreadPool(inputs.size()));
std::vector<std::future<int>> tasks(inputs.size());
std::vector<std::vector<int64_t>> rows;
rows.resize(inputs.size());
auto begin = GetCurrentUS();
for (int x = 0; x < inputs.size(); ++x) {
tasks[x] = pool_->enqueue([this, x, &rows, &inputs, &feasigns]() -> int {
DeserializeRowsFromFile(inputs[x], feasigns[x], &rows[x]);
return 0;
});
}
for (size_t x = 0; x < tasks.size(); ++x) {
tasks[x].wait();
}
int64_t total_rows = 0;
for (auto x = 0; x < rows.size(); x++) {
total_rows += rows[x].size();
}
auto end = GetCurrentUS();
VLOG(0) << "got " << total_rows
<< " feasigin ids from sparse embedding using " << end - begin;
std::vector<int64_t> total_dims = {total_rows,
static_cast<int64_t>(embedding_dim)};
std::vector<std::vector<int>> batch_buckets;
batch_buckets.resize(inputs.size());
for (int x = 0; x < rows.size(); ++x) {
batch_buckets[x] = bucket(rows[x].size(), BUCKET);
}
std::ofstream out(output, std::ios::binary);
begin = GetCurrentUS();
SerializeRowsToStream(out, rows, batch_buckets, total_rows);
end = GetCurrentUS();
VLOG(0) << "write rows to oostrream using " << end - begin;
begin = GetCurrentUS();
SerializePreTensorToStream(out, total_dims);
end = GetCurrentUS();
VLOG(0) << "write pretensor to oostrream using " << end - begin;
begin = GetCurrentUS();
SerializeValueToStream(out, inputs, batch_buckets, embedding_dim);
end = GetCurrentUS();
VLOG(0) << "write values to oostrream using " << end - begin;
}
private:
void SerializeRowsToStream(std::ostream &os,
const std::vector<std::vector<int64_t>> &rows,
const std::vector<std::vector<int>> &batch_buckets,
int64_t total_rows) {
{ // the 1st field, uint32_t version
constexpr uint32_t version = 0;
os.write(reinterpret_cast<const char *>(&version), sizeof(version));
}
{
// the 2st field, rows information
os.write(reinterpret_cast<const char *>(&total_rows), sizeof(total_rows));
for (int b = 0; b < BUCKET; ++b) {
for (int x = 0; x < batch_buckets.size(); ++x) {
auto begin = batch_buckets[x][b];
auto end = batch_buckets[x][b + 1];
if (end - begin == 0) continue;
os.write(reinterpret_cast<const char *>(rows[x].data() + begin),
sizeof(int64_t) * (end - begin));
}
}
// the 3st field, the height of SelectedRows
int64_t height = total_rows;
os.write(reinterpret_cast<const char *>(&height), sizeof(height));
}
}
void SerializePreTensorToStream(std::ostream &os,
const std::vector<int64_t> &dims) {
{ // the 1st field, uint32_t version
constexpr uint32_t version = 0;
os.write(reinterpret_cast<const char *>(&version), sizeof(version));
}
{ // the 2nd field, tensor description
// int32_t size
framework::proto::VarType::TensorDesc desc;
desc.set_data_type(framework::proto::VarType::FP32);
auto *pb_dims = desc.mutable_dims();
pb_dims->Resize(static_cast<int>(dims.size()), 0);
std::copy(dims.begin(), dims.end(), pb_dims->begin());
int32_t size = desc.ByteSize();
os.write(reinterpret_cast<const char *>(&size), sizeof(size));
auto out = desc.SerializeAsString();
os.write(out.data(), size);
}
}
void SerializeValueToVec(std::ifstream &in, const int batch,
const int embedding_dim, std::vector<float> *out) {
auto queue =
std::make_shared<framework::BlockingQueue<std::vector<std::string>>>();
auto read = [batch, &in, &queue]() {
std::string line;
std::vector<std::string> columns;
std::vector<std::string> values_str;
int count = 0;
while (std::getline(in, line)) {
++count;
columns = string::Split(line, '\t');
if (columns.size() != 5) {
VLOG(0) << "unexpected line: " << line << ", skip it";
continue;
}
values_str = string::Split(columns[4], ',');
queue->Push(values_str);
if (count >= batch) {
break;
}
}
queue->Push({});
};
auto write = [embedding_dim, &out, &queue]() {
std::vector<std::string> values_str;
std::string line;
while (true) {
queue->Pop(&values_str);
if (values_str.size() == 0) {
break;
}
for (int x = 0; x < embedding_dim; ++x) {
float v = 0.0;
try {
v = std::stof(values_str[x]);
} catch (std::invalid_argument &e) {
VLOG(0) << " get unexpected line: " << line;
} catch (std::out_of_range &e) {
VLOG(0) << " get unexpected line: " << line;
}
out->push_back(v);
}
}
};
std::thread p_read(read);
std::thread p_write(write);
p_read.join();
p_write.join();
}
void SerializeVecToStream(std::ostream &out,
const std::vector<float> &value) {
out.write(reinterpret_cast<const char *>(value.data()),
static_cast<std::streamsize>(sizeof(float) * value.size()));
}
void SerializeValueToStream(
std::ostream &out, const std::vector<std::string> &ins,
const std::vector<std::vector<int>> &batch_buckets,
const int embedding_dim) {
std::vector<std::shared_ptr<std::ifstream>> in_streams;
for (int x = 0; x < ins.size(); ++x) {
in_streams.emplace_back(std::make_shared<std::ifstream>(ins[x]));
}
std::vector<std::future<int>> tasks(ins.size());
for (int b = 0; b < BUCKET; ++b) {
std::vector<std::vector<float>> values;
values.resize(tasks.size());
auto begin = GetCurrentUS();
for (int x = 0; x < tasks.size(); ++x) {
auto batch = batch_buckets[x][b + 1] - batch_buckets[x][b];
values[x].clear();
values[x].reserve(batch * embedding_dim);
}
for (int x = 0; x < tasks.size(); ++x) {
tasks[x] =
pool_->enqueue([this, b, x, &out, &in_streams, &batch_buckets,
&values, embedding_dim]() -> int {
auto batch = batch_buckets[x][b + 1] - batch_buckets[x][b];
if (batch == 0) return 0;
SerializeValueToVec(*(in_streams[x].get()), batch, embedding_dim,
&values[x]);
return 0;
});
}
for (size_t x = 0; x < tasks.size(); ++x) {
tasks[x].wait();
}
auto end = GetCurrentUS();
auto begin1 = GetCurrentUS();
for (size_t x = 0; x < tasks.size(); ++x) {
SerializeVecToStream(out, values[x]);
}
auto end1 = GetCurrentUS();
VLOG(0) << "serialize buckets " << b << " read using " << end - begin
<< ", to oostream using " << end1 - begin1;
}
}
void DeserializeRowsFromFile(const std::string &input_file,
const int64_t feasigns,
std::vector<int64_t> *rows) {
std::string line;
std::vector<std::string> columns;
std::ifstream file(input_file);
rows->reserve(feasigns);
while (std::getline(file, line)) {
columns = string::Split(line, '\t');
if (columns.size() != 5) {
VLOG(0) << "unexpected line: " << line << ", skip it";
continue;
}
rows->push_back(std::stoull(columns[0]));
}
VLOG(0) << "parse " << rows->size() << " embedding rows from "
<< input_file;
}
private:
std::unique_ptr<::ThreadPool> pool_;
};
} // namespace distributed
} // namespace paddle
# 目录说明
> 成型之后,上级目录的 table、thirdparty、table、service 目录可以删除,communicator_common.h 、fleet.cc、fleet.h 删除
Table: for param storage and update
-----MemorySparseTable: table for sparse param, used in cpu async mode
-----MemoryDenseTable: table for dense param, used in cpu async/geo mode
-----MemorySparseGeoTable: table for sparse param, used in cpu async mode
-----CommonGraphTable: table used for graph learning
-----BarrierTable: table for barrier function, used in cpu sync mode
-----TensorTable: table which run program, used for learning rate decay only
ValueAccessor: for pull param and push gradient
-----CtrCommonAccessor: pull/push value with show/click, float type
-----DownpourCtrDoubleAccessor: same as CtrCommonAccessor, other than show/click with double type
-----SparseAccessor: used for common embedding, pull value without show/click, push value with show/click
-----CommMergeAccessor: used for dense table only, for get param dim
PsService(proto): for server to handle request
-----PsBaseService
----------BrpcPsService: for cpu dnn training task
----------GraphBrpcService: for graph learning
-----HeterService: for dnn training task with heterogeneous computing resources
PSServer: recv request from trainer and handle it by service
-----BrpcPsServer: for cpu dnn training task
-----GraphBrpcServer: for graph learning
-----PsLocalServer: for GpuPS
HeterServer: for HeterPS
PSClient: pull param and push gradient for trainer
-----BrpcPsClient: for cpu dnn training task
----------GraphBrpcClient: for graph learning
-----PsLocalClient: for GpuPS
HeterClient: for HeterPS
PSCore: Wrapper for InitServer
GraphPyService: for graph learning
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <ThreadPool.h>
#include <assert.h>
#include <pthread.h>
#include <memory>
#include <mutex> // NOLINT
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include "Eigen/Dense"
#include "paddle/fluid/distributed/ps/table/accessor.h"
#include "paddle/fluid/distributed/ps/table/common_table.h"
#include "paddle/fluid/distributed/ps/table/depends/initializers.h"
#include "paddle/fluid/distributed/ps/table/depends/large_scale_kv.h"
#include "paddle/fluid/distributed/ps/table/depends/sparse.h"
#include "paddle/fluid/string/string_helper.h"
#include "paddle/phi/core/utils/rw_lock.h"
#define PSERVER_SAVE_SUFFIX ".shard"
namespace paddle {
namespace distributed {
class SparseOptimizer;
enum SaveMode { all, base, delta };
struct Meta {
std::string param;
int shard_id;
std::vector<std::string> names;
std::vector<int> dims;
uint64_t count;
std::unordered_map<std::string, int> dims_map;
explicit Meta(const std::string& metapath) {
std::ifstream file(metapath);
std::string line;
int num_lines = 0;
while (std::getline(file, line)) {
if (StartWith(line, "#")) {
continue;
}
auto pairs = paddle::string::split_string<std::string>(line, "=");
PADDLE_ENFORCE_EQ(
pairs.size(), 2,
paddle::platform::errors::InvalidArgument(
"info in %s except k=v, but got %s", metapath, line));
if (pairs[0] == "param") {
param = pairs[1];
}
if (pairs[0] == "shard_id") {
shard_id = std::stoi(pairs[1]);
}
if (pairs[0] == "row_names") {
names = paddle::string::split_string<std::string>(pairs[1], ",");
}
if (pairs[0] == "row_dims") {
auto dims_strs =
paddle::string::split_string<std::string>(pairs[1], ",");
for (auto& str : dims_strs) {
dims.push_back(std::stoi(str));
}
}
if (pairs[0] == "count") {
count = std::stoull(pairs[1]);
}
}
for (int x = 0; x < names.size(); ++x) {
dims_map[names[x]] = dims[x];
}
}
Meta(std::string param, int shard_id, std::vector<std::string> row_names,
std::vector<int> dims, uint64_t count) {
this->param = param;
this->shard_id = shard_id;
this->names = row_names;
this->dims = dims;
this->count = count;
}
std::string ToString() {
std::stringstream ss;
ss << "param=" << param << "\n";
ss << "shard_id=" << shard_id << "\n";
ss << "row_names=" << paddle::string::join_strings(names, ',') << "\n";
ss << "row_dims=" << paddle::string::join_strings(dims, ',') << "\n";
ss << "count=" << count << "\n";
return ss.str();
}
};
class CommonSparseTable : public Table {
public:
CommonSparseTable() { rwlock_.reset(new phi::RWLock); }
virtual ~CommonSparseTable() {}
// unused method begin
// virtual int32_t PullDense(float* pull_values, size_t num) { return 0; }
// virtual int32_t PushDenseParam(const float* values, size_t num) { return
// 0; }
// virtual int32_t PushDense(const float* values, size_t num) { return 0; }
// unused method end
virtual int32_t Pull(TableContext& context);
virtual int32_t Push(TableContext& context);
virtual int32_t Initialize();
virtual int32_t InitializeShard() { return 0; }
virtual int32_t InitializeValue();
virtual int32_t InitializeOptimizer();
virtual int32_t InitializeRecorder();
virtual int32_t Load(const std::string& path, const std::string& param);
virtual int32_t Save(const std::string& path, const std::string& param);
void SaveMetaToText(std::ostream* os, const CommonAccessorParameter& common,
const size_t shard_idx, const int64_t total);
int64_t SaveValueToText(std::ostream* os, std::shared_ptr<ValueBlock> block,
std::shared_ptr<::ThreadPool> pool, const int mode,
int shard_id);
virtual void ProcessALine(const std::vector<std::string>& columns,
const Meta& meta, const int64_t id,
std::vector<std::vector<float>>* values);
virtual int64_t LoadFromText(
const std::string& valuepath, const std::string& metapath,
const int pserver_id, const int pserver_num, const int local_shard_num,
std::vector<std::shared_ptr<ValueBlock>>* blocks);
virtual std::pair<int64_t, int64_t> PrintTableStat();
virtual int32_t PullSparse(float* values, const PullSparseValue& pull_value);
virtual int32_t PullSparsePtr(char** pull_values, const uint64_t* keys,
size_t num);
virtual int32_t PushSparse(const uint64_t* keys, const float* values,
size_t num);
virtual int32_t PushSparse(const uint64_t* keys, const float** values,
size_t num);
// only for sparse geo table
virtual int32_t PushSparseParam(const uint64_t* keys, const float* values,
size_t num);
virtual int32_t SetGlobalLR(float* lr);
virtual int32_t Pour();
virtual int32_t Flush();
virtual int32_t Shrink(const std::string& param);
virtual void Clear();
virtual void* GetShard(size_t shard_idx) { return 0; }
protected:
virtual int32_t _PushSparse(const uint64_t* keys, const float* values,
size_t num);
virtual int32_t _PushSparse(const uint64_t* keys, const float** values,
size_t num);
protected:
const int task_pool_size_ = 11;
std::vector<std::shared_ptr<::ThreadPool>> _shards_task_pool;
bool sync = false;
int param_dim_ = 0;
int param_offset_ = 0;
std::unordered_map<std::string, int> value_idx_;
std::vector<std::string> value_names_;
std::vector<int> value_dims_;
std::vector<int> value_offsets_;
std::vector<std::string> initializer_attrs_;
std::shared_ptr<SparseOptimizer> optimizer_;
std::vector<std::shared_ptr<ValueBlock>> shard_values_;
std::unordered_map<uint64_t, ReservoirValue<float>> pull_reservoir_;
std::unique_ptr<phi::RWLock> rwlock_{nullptr};
};
} // namespace distributed
} // namespace paddle
......@@ -186,6 +186,7 @@ class CtrCommonAccessor : public ValueAccessor {
// CtrCommonFeatureValue common_feature_value;
float _show_click_decay_rate;
int32_t _ssd_unseenday_threshold;
bool _show_scale = false;
public: // TODO(zhaocaibei123): it should be private, but we make it public
// for unit test
......
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <ThreadPool.h>
#include <functional>
#include <future> // NOLINT
#include <memory>
#include <string>
#include <thread> // NOLINT
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include "gflags/gflags.h"
#include "butil/object_pool.h"
#include "paddle/fluid/distributed/common/utils.h"
#include "paddle/fluid/distributed/ps/table/depends/initializers.h"
#include "paddle/fluid/distributed/ps/thirdparty/round_robin.h"
#include "paddle/fluid/framework/generator.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/selected_rows_utils.h"
#include "paddle/fluid/framework/tensor.h"
#include "paddle/fluid/framework/threadpool.h"
#include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/enforce.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/string/printf.h"
#include "paddle/fluid/string/string_helper.h"
#include "paddle/phi/backends/dynload/port.h"
#include "paddle/phi/core/utils/rw_lock.h"
namespace paddle {
namespace distributed {
enum Mode { training, infer };
static const int SPARSE_SHARD_BUCKET_NUM_BITS = 6;
static const size_t SPARSE_SHARD_BUCKET_NUM = (size_t)1
<< SPARSE_SHARD_BUCKET_NUM_BITS;
struct VALUE {
explicit VALUE(size_t length)
: length_(length),
count_(0),
unseen_days_(0),
need_save_(false),
is_entry_(false) {
data_.resize(length);
memset(data_.data(), 0, sizeof(float) * length);
}
size_t length_;
std::vector<float> data_;
int count_;
int unseen_days_; // use to check knock-out
bool need_save_; // whether need to save
bool is_entry_; // whether knock-in
};
inline bool count_entry(VALUE *value, int threshold) {
return value->count_ >= threshold;
}
inline bool probility_entry(VALUE *value, float threshold) {
UniformInitializer uniform = UniformInitializer({"uniform", "0", "0", "1"});
return uniform.GetValue() >= threshold;
}
class ValueBlock {
public:
typedef typename robin_hood::unordered_map<uint64_t, VALUE *> map_type;
explicit ValueBlock(const std::vector<std::string> &value_names,
const std::vector<int> &value_dims,
const std::vector<int> &value_offsets,
const std::unordered_map<std::string, int> &value_idx,
const std::vector<std::string> &init_attrs,
const std::string &entry_attr)
: value_names_(value_names),
value_dims_(value_dims),
value_offsets_(value_offsets),
value_idx_(value_idx) {
for (size_t x = 0; x < value_dims.size(); ++x) {
value_length_ += value_dims[x];
}
// for Entry
{
auto slices = string::split_string<std::string>(entry_attr, ":");
if (slices[0] == "none") {
entry_func_ = std::bind(&count_entry, std::placeholders::_1, 0);
threshold_ = 0;
} else if (slices[0] == "count_filter_entry") {
threshold_ = std::stoi(slices[1]);
entry_func_ =
std::bind(&count_entry, std::placeholders::_1, threshold_);
} else if (slices[0] == "probability_entry") {
threshold_ = std::stof(slices[1]);
entry_func_ =
std::bind(&probility_entry, std::placeholders::_1, threshold_);
} else {
PADDLE_THROW(platform::errors::InvalidArgument(
"Not supported Entry Type : %s, Only support [CountFilterEntry, "
"ProbabilityEntry]",
slices[0]));
}
}
// for Initializer
{
for (auto &attr : init_attrs) {
auto slices = string::split_string<std::string>(attr, "&");
if (slices[0] == "gaussian_random") {
initializers_.emplace_back(
std::make_shared<GaussianInitializer>(slices));
} else if (slices[0] == "fill_constant") {
initializers_.emplace_back(
std::make_shared<FillConstantInitializer>(slices));
} else if (slices[0] == "uniform_random") {
initializers_.emplace_back(
std::make_shared<UniformInitializer>(slices));
} else if (slices[0] == "truncated_gaussian_random") {
initializers_.emplace_back(
std::make_shared<TruncatedGaussianInitializer>(slices));
} else {
PADDLE_THROW(platform::errors::InvalidArgument(
"%s can not be supported", attr));
}
}
}
}
~ValueBlock() {}
std::vector<float *> Get(const uint64_t &id,
const std::vector<std::string> &value_names,
const std::vector<int> &value_dims) {
auto pts = std::vector<float *>();
pts.reserve(value_names.size());
auto values = GetValue(id);
for (int i = 0; i < static_cast<int>(value_names.size()); i++) {
PADDLE_ENFORCE_EQ(
value_dims[i], value_dims_[i],
platform::errors::InvalidArgument("value dims is not match"));
pts.push_back(values->data_.data() +
value_offsets_.at(value_idx_.at(value_names[i])));
}
return pts;
}
// pull
float *Init(const uint64_t &id, const bool with_update = true,
const int counter = 1) {
size_t hash = _hasher(id);
size_t bucket = compute_bucket(hash);
auto &table = values_[bucket];
auto res = table.find(id);
VALUE *value = nullptr;
if (res == table.end()) {
value = butil::get_object<VALUE>(value_length_);
table[id] = value;
} else {
value = res->second;
}
if (with_update) {
AttrUpdate(value, counter);
}
return value->data_.data();
}
VALUE *InitGet(const uint64_t &id, const bool with_update = true,
const int counter = 1) {
size_t hash = _hasher(id);
size_t bucket = compute_bucket(hash);
auto &table = values_[bucket];
auto res = table.find(id);
VALUE *value = nullptr;
if (res == table.end()) {
value = butil::get_object<VALUE>(value_length_);
// value = _alloc.acquire(value_length_);
table[id] = value;
} else {
value = (VALUE *)(void *)(res->second); // NOLINT
}
return value;
}
void AttrUpdate(VALUE *value, const int counter) {
// update state
value->unseen_days_ = 0;
value->count_ += counter;
if (!value->is_entry_) {
value->is_entry_ = entry_func_(value);
if (value->is_entry_) {
// initialize
for (size_t x = 0; x < value_names_.size(); ++x) {
initializers_[x]->GetValue(value->data_.data() + value_offsets_[x],
value_dims_[x]);
}
value->need_save_ = true;
}
} else {
value->need_save_ = true;
}
return;
}
// dont jude if (has(id))
float *Get(const uint64_t &id) {
size_t hash = _hasher(id);
size_t bucket = compute_bucket(hash);
auto &table = values_[bucket];
// auto &value = table.at(id);
// return value->data_.data();
auto res = table.find(id);
VALUE *value = res->second;
return value->data_.data();
}
// for load, to reset count, unseen_days
VALUE *GetValue(const uint64_t &id) {
size_t hash = _hasher(id);
size_t bucket = compute_bucket(hash);
auto &table = values_[bucket];
auto res = table.find(id);
return res->second;
}
bool GetEntry(const uint64_t &id) {
auto value = GetValue(id);
return value->is_entry_;
}
void SetEntry(const uint64_t &id, const bool state) {
auto value = GetValue(id);
value->is_entry_ = state;
}
void erase(uint64_t feasign) {
size_t hash = _hasher(feasign);
size_t bucket = compute_bucket(hash);
auto &table = values_[bucket];
auto iter = table.find(feasign);
if (iter != table.end()) {
butil::return_object(iter->second);
iter = table.erase(iter);
}
}
void Shrink(const int threshold) {
for (auto &table : values_) {
for (auto iter = table.begin(); iter != table.end();) {
// VALUE* value = (VALUE*)(void*)(iter->second);
VALUE *value = iter->second;
value->unseen_days_++;
if (value->unseen_days_ >= threshold) {
butil::return_object(iter->second);
// _alloc.release(iter->second);
// _alloc.release(value);
iter = table.erase(iter);
} else {
++iter;
}
}
}
return;
}
float GetThreshold() { return threshold_; }
size_t compute_bucket(size_t hash) {
if (SPARSE_SHARD_BUCKET_NUM == 1) {
return 0;
} else {
return hash >> (sizeof(size_t) * 8 - SPARSE_SHARD_BUCKET_NUM_BITS);
}
}
map_type::iterator end() {
return values_[SPARSE_SHARD_BUCKET_NUM - 1].end();
}
map_type::iterator Find(uint64_t id) {
size_t hash = _hasher(id);
size_t bucket = compute_bucket(hash);
auto &table = values_[bucket];
auto got = table.find(id);
if (got == table.end()) {
return end();
} else {
return got;
}
}
private:
bool Has(const uint64_t id) {
size_t hash = _hasher(id);
size_t bucket = compute_bucket(hash);
auto &table = values_[bucket];
auto got = table.find(id);
if (got == table.end()) {
return false;
} else {
return true;
}
}
public:
map_type values_[SPARSE_SHARD_BUCKET_NUM];
size_t value_length_ = 0;
std::hash<uint64_t> _hasher;
private:
const std::vector<std::string> &value_names_;
const std::vector<int> &value_dims_;
const std::vector<int> &value_offsets_;
const std::unordered_map<std::string, int> &value_idx_;
std::function<bool(VALUE *)> entry_func_;
std::vector<std::shared_ptr<Initializer>> initializers_;
float threshold_;
};
} // namespace distributed
} // namespace paddle
......@@ -117,6 +117,9 @@ void DistMultiTrainer::InitOtherEnv(const ProgramDesc &main_program) {
InitDumpEnv();
}
pull_dense_worker_->SetRootScope(root_scope_);
#if defined(PADDLE_WITH_PSCORE) && defined(PADDLE_WITH_CUDA)
pull_dense_worker_->CreatePinVar();
#endif
pull_dense_worker_->Start();
#if defined(PADDLE_WITH_PSLIB) || defined(PADDLE_WITH_PSCORE)
for (int i = 0; i < thread_num_; ++i) {
......
......@@ -28,7 +28,6 @@ limitations under the License. */
#include <string>
#include <vector>
#include "paddle/fluid/distributed/common/sparse_sharding_merge.h"
#include "paddle/fluid/distributed/index_dataset/index_sampler.h"
#include "paddle/fluid/distributed/index_dataset/index_wrapper.h"
#include "paddle/fluid/distributed/ps/service/communicator/communicator.h"
......@@ -49,7 +48,6 @@ using paddle::distributed::GraphNode;
using paddle::distributed::GraphPyServer;
using paddle::distributed::GraphPyClient;
using paddle::distributed::FeatureNode;
using paddle::distributed::ShardingMerge;
namespace paddle {
namespace pybind {
......@@ -93,12 +91,6 @@ void BindPSHost(py::module* m) {
.def("to_string", &distributed::PSHost::ToString);
}
void BindSparseShardingTools(py::module* m) {
py::class_<ShardingMerge>(*m, "ShardingMerge")
.def(py::init<>())
.def("merge", &ShardingMerge::Merge);
}
void BindCommunicatorContext(py::module* m) {
py::class_<CommContext>(*m, "CommContext")
.def(
......
......@@ -36,6 +36,5 @@ void BindIndexNode(py::module* m);
void BindTreeIndex(py::module* m);
void BindIndexWrapper(py::module* m);
void BindIndexSampler(py::module* m);
void BindSparseShardingTools(py::module* m);
} // namespace pybind
} // namespace paddle
......@@ -4544,7 +4544,6 @@ All parameter, weight, gradient are variables in Paddle.
BindTreeIndex(&m);
BindIndexWrapper(&m);
BindIndexSampler(&m);
BindSparseShardingTools(&m);
#endif
}
} // namespace pybind
......
......@@ -58,7 +58,7 @@ DATA_NORM_GRAD_NAME = [x + "@GRAD" for x in DATA_NORM_NAME]
def logger_config(log_path, logging_name):
logger = logging.getLogger(logging_name)
logger.setLevel(level=logging.DEBUG)
logger.setLevel(level=logging.WARNING)
handler = logging.FileHandler(
log_path, mode='a', encoding='UTF-8', delay=True)
handler.setLevel(logging.INFO)
......
......@@ -51,9 +51,8 @@ class TestDistMnistAsyncInMemoryDataset2x2(TestFleetBase):
tr0_losses, tr1_losses = self._run_cluster(model_file, required_envs)
def test_dist_train(self):
# self.check_with_place(
# "dist_fleet_ctr.py", delta=1e-5, check_error_log=False)
print('recover later')
self.check_with_place(
"dist_fleet_ctr.py", delta=1e-5, check_error_log=False)
class TestDistMnistAsync2x2(TestFleetBase):
......@@ -86,9 +85,8 @@ class TestDistMnistAsync2x2(TestFleetBase):
tr0_losses, tr1_losses = self._run_cluster(model_file, required_envs)
def test_dist_train(self):
# self.check_with_place(
# "dist_fleet_ctr.py", delta=1e-5, check_error_log=False)
print('recover later')
self.check_with_place(
"dist_fleet_ctr.py", delta=1e-5, check_error_log=False)
class TestDistCtrHalfAsync2x2(TestFleetBase):
......@@ -124,9 +122,8 @@ class TestDistCtrHalfAsync2x2(TestFleetBase):
tr0_losses, tr1_losses = self._run_cluster(model_file, required_envs)
def test_dist_train(self):
# self.check_with_place(
# "dist_fleet_ctr.py", delta=1e-5, check_error_log=False)
print('recover later')
self.check_with_place(
"dist_fleet_ctr.py", delta=1e-5, check_error_log=False)
if __name__ == "__main__":
......
......@@ -52,9 +52,8 @@ class TestDistMnistSync2x2(TestFleetBase):
tr0_losses, tr1_losses = self._run_cluster(model_file, required_envs)
def test_dist_train(self):
# self.check_with_place(
# "dist_fleet_ctr.py", delta=1e-5, check_error_log=False)
print('recover later')
self.check_with_place(
"dist_fleet_ctr.py", delta=1e-5, check_error_log=False)
# @unittest.skip(reason="Skip unstable ut, reader need to be rewrite")
......@@ -92,9 +91,8 @@ class TestDistMnistAsyncDataset2x2(TestFleetBase):
tr0_losses, tr1_losses = self._run_cluster(model_file, required_envs)
def test_dist_train(self):
# self.check_with_place(
# "dist_fleet_ctr.py", delta=1e-5, check_error_log=False)
print('recover later')
self.check_with_place(
"dist_fleet_ctr.py", delta=1e-5, check_error_log=False)
if __name__ == "__main__":
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册