Unverified commit fbbc3394, authored by T Thunderbrook, committed by GitHub

[pslib] pslib with cmake (#32800)

* pslib with cmake

* heter util

* vlog

* heter server test

* add dtor

* cmake
Parent 5aa8faa2
@@ -261,6 +261,14 @@ if(WITH_PSLIB)
if(WITH_PSLIB_BRPC)
include(external/pslib_brpc) # download, build, install pslib_brpc
list(APPEND third_party_deps extern_pslib_brpc)
else()
include(external/snappy)
list(APPEND third_party_deps extern_snappy)
include(external/leveldb)
list(APPEND third_party_deps extern_leveldb)
include(external/brpc)
list(APPEND third_party_deps extern_brpc)
endif()
endif(WITH_PSLIB)
......
@@ -100,8 +100,16 @@ if (WITH_GPU)
endif()
cc_test(var_type_traits_test SRCS var_type_traits_test.cc DEPS var_type_traits)
set(BRPC_DEPS "")
if(WITH_PSLIB OR WITH_PSCORE)
set(BRPC_DEPS brpc)
if(WITH_PSLIB_BRPC)
set(BRPC_DEPS pslib_brpc)
endif()
endif()
cc_library(scope SRCS scope.cc DEPS glog threadpool xxhash var_type_traits)
-cc_library(device_worker SRCS device_worker.cc DEPS trainer_desc_proto lod_tensor scope)
+cc_library(device_worker SRCS device_worker.cc DEPS trainer_desc_proto lod_tensor scope ${BRPC_DEPS})
cc_test(device_worker_test SRCS device_worker_test.cc DEPS device_worker)
cc_library(scope_pool SRCS scope_pool.cc DEPS scope)
@@ -243,9 +251,16 @@ if(WITH_DISTRIBUTE)
fleet_wrapper heter_wrapper ps_gpu_wrapper box_wrapper lodtensor_printer
lod_rank_table feed_fetch_method collective_helper ${GLOB_DISTRIBUTE_DEPS}
graph_to_program_pass variable_helper data_feed_proto timer monitor
-heter_service_proto pslib_brpc)
+heter_service_proto ${BRPC_DEPS})
set(DISTRIBUTE_COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor")
if (CMAKE_CXX_COMPILER_VERSION VERSION_GREATER 7.0)
set(DISTRIBUTE_COMPILE_FLAGS
"${DISTRIBUTE_COMPILE_FLAGS} -faligned-new")
endif()
set_source_files_properties(executor.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
set_source_files_properties(device_worker.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
set_source_files_properties(hetercpu_worker.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
set_source_files_properties(heterxpu_trainer.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
elseif(WITH_PSCORE)
cc_library(executor SRCS executor.cc multi_trainer.cc pipeline_trainer.cc dataset_factory.cc
dist_multi_trainer.cc trainer_factory.cc trainer.cc data_feed_factory.cc
@@ -280,7 +295,7 @@ elseif(WITH_PSLIB)
pull_dense_worker.cc section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry
device_context scope framework_proto data_feed_proto heter_service_proto trainer_desc_proto glog
lod_rank_table fs shell fleet_wrapper heter_wrapper ps_gpu_wrapper box_wrapper lodtensor_printer feed_fetch_method
-graph_to_program_pass variable_helper timer monitor pslib_brpc )
+graph_to_program_pass variable_helper timer monitor ${BRPC_DEPS})
else()
cc_library(executor SRCS executor.cc multi_trainer.cc pipeline_trainer.cc dataset_factory.cc
dist_multi_trainer.cc trainer_factory.cc trainer.cc data_feed_factory.cc
......
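The -faligned-new flag added in the hunks above matters on GCC 7 and newer: under -std=c++14, allocating an over-aligned type through plain operator new triggers the -Waligned-new diagnostic, which the strict distribute flags would otherwise turn into a build failure. A minimal standalone sketch of the pattern the flag unlocks (the type is hypothetical, not from this PR):

// Sketch: why GCC 7+ builds here need -faligned-new (hypothetical type).
#include <cstdio>

struct alignas(64) CacheLinePadded {  // 64 > alignof(std::max_align_t)
  double data[8];
};

int main() {
  // With -std=c++14 on GCC 7+, this allocation warns under -Waligned-new
  // unless -faligned-new enables the C++17 aligned operator new.
  CacheLinePadded* p = new CacheLinePadded();
  std::printf("alignment = %zu\n", alignof(CacheLinePadded));
  delete p;
  return 0;
}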
@@ -29,7 +29,7 @@ limitations under the License. */
#include "paddle/fluid/framework/data_feed.h"
#include "paddle/fluid/framework/executor_gc_helper.h"
-#include "paddle/fluid/framework/heter_service.h"
+#include "paddle/fluid/framework/heter_util.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/program_desc.h"
......
@@ -20,14 +20,12 @@ limitations under the License. */
#include <unordered_map>
#include <vector>
-#include "paddle/fluid/framework/data_set.h"
#include "paddle/fluid/framework/executor_gc_helper.h"
#include "paddle/fluid/framework/garbage_collector.h"
#include "paddle/fluid/framework/op_info.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/tensor.h"
-#include "paddle/fluid/framework/trainer.h"
#include "paddle/fluid/platform/device_context.h"
namespace paddle {
......
@@ -22,8 +22,10 @@
#include <vector>
#include "paddle/fluid/framework/executor.h"
#include "paddle/fluid/framework/op_proto_maker.h"
#include "paddle/fluid/framework/program_desc.h" #include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/platform/macros.h" #include "paddle/fluid/platform/macros.h"
#include "paddle/fluid/string/string_helper.h"
namespace paddle {
namespace framework {
......
if(WITH_PSLIB)
-cc_library(fleet_wrapper SRCS fleet_wrapper.cc DEPS framework_proto variable_helper scope pslib_brpc pslib)
+if(WITH_PSLIB_BRPC)
set(BRPC_DEPS pslib_brpc)
else()
set(BRPC_DEPS brpc)
endif(WITH_PSLIB_BRPC)
cc_library(fleet_wrapper SRCS fleet_wrapper.cc DEPS framework_proto variable_helper scope ${BRPC_DEPS} pslib)
else()
cc_library(fleet_wrapper SRCS fleet_wrapper.cc DEPS framework_proto variable_helper scope)
endif(WITH_PSLIB)
@@ -7,11 +12,11 @@ endif(WITH_PSLIB)
if(WITH_HETERPS)
if(WITH_NCCL)
nv_library(ps_gpu_wrapper SRCS ps_gpu_wrapper.cu ps_gpu_wrapper.cc
-DEPS heter_ps)
+DEPS heter_ps ${BRPC_DEPS})
add_subdirectory(heter_ps)
elseif(WITH_RCCL)
hip_library(ps_gpu_wrapper SRCS ps_gpu_wrapper.cu ps_gpu_wrapper.cc
-DEPS heter_ps)
+DEPS heter_ps ${BRPC_DEPS})
add_subdirectory(heter_ps)
endif(WITH_NCCL)
else()
@@ -39,7 +44,17 @@ else()
cc_library(gloo_wrapper SRCS gloo_wrapper.cc DEPS framework_proto variable_helper scope)
endif(WITH_GLOO)
-cc_library(heter_wrapper SRCS heter_wrapper.cc DEPS framework_proto device_context heter_service_proto)
+if(WITH_PSLIB)
set(DISTRIBUTE_COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor")
if (CMAKE_CXX_COMPILER_VERSION VERSION_GREATER 7.0)
set(DISTRIBUTE_COMPILE_FLAGS
"${DISTRIBUTE_COMPILE_FLAGS} -faligned-new")
endif()
set_source_files_properties(heter_wrapper.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
endif()
cc_library(heter_wrapper SRCS heter_wrapper.cc DEPS framework_proto
device_context heter_service_proto ${BRPC_DEPS})
cc_test(test_fleet_cc SRCS test_fleet.cc DEPS fleet_wrapper gloo_wrapper fs shell)
......
@@ -28,7 +28,7 @@ limitations under the License. */
#include <unordered_map>
#include <vector>
-#include "paddle/fluid/framework/heter_service.h"
+#include "paddle/fluid/framework/heter_util.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/tensor.h"
......
@@ -17,16 +17,16 @@ limitations under the License. */
#include <limits>
#include <memory>
#include <vector>
-#ifdef PADDLE_WTIH_PSLIB
+#ifdef PADDLE_WITH_PSLIB
#include "common_value.h"  // NOLINT
#endif
#ifdef PADDLE_WITH_PSCORE
#include "paddle/fluid/distributed/table/depends/large_scale_kv.h"
#endif
#include "thrust/pair.h"
//#include "cudf/concurrent_unordered_map.cuh.h"
#include "paddle/fluid/framework/fleet/heter_ps/cudf/concurrent_unordered_map.cuh.h"
#ifdef PADDLE_WITH_HETERPS
#include "paddle/fluid/distributed/table/depends/large_scale_kv.h"
#include "paddle/fluid/platform/type_defs.h" #include "paddle/fluid/platform/type_defs.h"
namespace paddle { namespace paddle {
......
@@ -25,6 +25,7 @@ limitations under the License. */
#ifdef PADDLE_WITH_PSLIB
#include "paddle/fluid/framework/heter_service.h"
#include "paddle/fluid/framework/heter_util.h"
#include "paddle/fluid/framework/scope.h" #include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/tensor.h" #include "paddle/fluid/framework/tensor.h"
#include "paddle/fluid/framework/variable_helper.h" #include "paddle/fluid/framework/variable_helper.h"
......
@@ -72,299 +72,6 @@ class HeterXpuService : public HeterService {
std::unordered_map<int, HeterServiceHandler> handler_map_;
};
enum HeterTaskState { PULL_SPARSE, OP_RUN, XPU, OP_RUN_END, PUSH_GRAD, DONE };
class HeterTask {
public:
void Update() {
if (state_ == PULL_SPARSE) {
state_ = OP_RUN;
} else if (state_ == OP_RUN) {
state_ = XPU;
// state_ = PUSH_GRAD;
// state_ = PUSH_GRAD;
} else if (state_ == XPU) {
state_ = OP_RUN_END;
} else if (state_ == OP_RUN_END) {
state_ = PUSH_GRAD;
} else if (state_ == PUSH_GRAD) {
state_ = DONE;
}
}
void Reset() {
total_time = 0;
read_time = 0;
pack_time = 0;
pull_sparse_local_time = 0;
op_all_time = 0;
xpu_op_time = 0;
xpu_wait_time = 0;
cpu_op_time = 0;
collect_label_time = 0;
fill_sparse_time = 0;
push_sparse_time = 0;
gpu_2_cpu_time = 0;
cpu_2_gpu_time = 0;
timeline.Reset();
}
void Show() {
std::cout << "features size " << features_.size() << std::endl;
for (size_t i = 0; i < features_.size(); ++i) {
std::cout << "features[" << i << "] size " << features_[i].size()
<< std::endl;
}
}
void PackTask(Scope* scope, int taskid, DataFeed* reader, int cur_batch,
const ProgramDesc& program);
void PackGpuTask(Scope* thread_scope, DataFeed* reader,
const ProgramDesc& program);
Scope* scope_{nullptr};
int taskid_;
int cur_batch_;
HeterTaskState state_;
// cache
std::map<uint64_t, std::vector<uint64_t>> features_;
std::map<uint64_t, std::vector<float>> feature_labels_;
std::map<uint64_t, std::vector<std::vector<float>>> feature_values_;
std::map<uint64_t, std::vector<std::vector<float>>> feature_grads_;
std::map<uint64_t, std::vector<uint64_t>> sparse_push_keys_;
double total_time{0};
double read_time{0};
double pack_time{0};
double pull_sparse_local_time{0};
double op_all_time{0};
double xpu_op_time{0};
double xpu_wait_time{0};
double cpu_op_time{0};
double collect_label_time{0};
double fill_sparse_time{0};
double push_sparse_time{0};
double gpu_2_cpu_time{0};
double cpu_2_gpu_time{0};
platform::Timer timeline;
};
#endif
template <class T>
class HeterObjectPool {
public:
HeterObjectPool() {}
virtual ~HeterObjectPool(){};
std::shared_ptr<T> Get() {
std::lock_guard<std::mutex> lock(mutex_);
if (pool_.empty()) {
num_ += 1;
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
VLOG(3) << "pool construct size: " << num_;
#endif
return std::make_shared<T>();
} else {
auto ret = pool_.back();
pool_.pop_back();
return ret;
}
}
void Push(std::shared_ptr<T> data) {
std::lock_guard<std::mutex> lock(mutex_);
pool_.push_back(std::move(data));
}
int Size() {
std::lock_guard<std::mutex> lock(mutex_);
return pool_.size();
}
std::shared_ptr<T>& GetElement(int i) { return pool_[i]; }
private:
std::vector<std::shared_ptr<T>> pool_;
std::mutex mutex_;
int num_{0};
};
#ifdef PADDLE_WITH_PSLIB
struct BthreadMutextGuard {
BthreadMutextGuard(bthread_mutex_t* rho) {
mutex_ = rho;
bthread_mutex_lock(mutex_);
}
~BthreadMutextGuard() { bthread_mutex_unlock(mutex_); }
bthread_mutex_t* mutex_;
};
template <class T>
class BtObjectPool {
public:
BtObjectPool() {
bthread_mutex_init(&mutex_, NULL);
bthread_cond_init(&cond_, NULL);
}
virtual ~BtObjectPool() {
bthread_cond_destroy(&cond_);
bthread_mutex_destroy(&mutex_);
};
std::shared_ptr<T> Get() {
BthreadMutextGuard guard(&mutex_);
while (pool_.empty()) {
bthread_cond_wait(&cond_, &mutex_);
}
auto ret = pool_.back();
pool_.pop_back();
return ret;
}
void Push(std::shared_ptr<T> data) {
BthreadMutextGuard guard(&mutex_);
pool_.push_back(std::move(data));
bthread_cond_signal(&cond_);
}
int Size() { return pool_.size(); }
std::shared_ptr<T>& GetElement(int i) { return pool_[i]; }
private:
std::vector<std::shared_ptr<T>> pool_;
bthread_mutex_t mutex_;
bthread_cond_t cond_;
int num_{0};
};
template <class K, class T>
struct HeterNode {
K key;
T value;
HeterNode* prev;
HeterNode* next;
};
template <class K, class T>
class HeterList {
public:
HeterList() : head_(new HeterNode<K, T>), tail_(new HeterNode<K, T>) {
head_->prev = NULL;
head_->next = tail_;
tail_->prev = head_;
tail_->next = NULL;
size = 0;
cap_ = 1e9;
}
~HeterList() {
delete head_;
delete tail_;
}
void SetCap(int num) { cap_ = num; }
bool TryPut(K& key, T& value) {
std::unique_lock<std::mutex> lock(mutex_);
cond_.wait(lock, [this] { return size < cap_; });
if (task_map_.find(key) != task_map_.end()) {
task_map_.erase(key);
return false;
} else {
HeterNode<K, T>* node = new HeterNode<K, T>;
node->key = key;
node->value = value;
map_[node->key] = node;
attach(node);
return true;
}
}
bool Put(K& key, T& value) {
std::unique_lock<std::mutex> lock(mutex_);
cond_.wait(lock, [this] { return size < cap_; });
HeterNode<K, T>* node = new HeterNode<K, T>;
node->key = key;
node->value = value;
map_[node->key] = node;
attach(node);
return true;
}
T TryGet(const K& key) {
std::lock_guard<std::mutex> lock(mutex_);
auto iter = map_.find(key);
if (iter != map_.end()) {
HeterNode<K, T>* node = iter->second;
detach(node);
cond_.notify_one();
T ret = std::move(node->value);
map_.erase(key);
delete node;
return ret;
}
task_map_.insert(key);
return nullptr;
}
T Get(const K& key) {
std::lock_guard<std::mutex> lock(mutex_);
auto iter = map_.find(key);
if (iter != map_.end()) {
HeterNode<K, T>* node = iter->second;
detach(node);
cond_.notify_one();
T ret = std::move(node->value);
map_.erase(key);
delete node;
return ret;
}
return nullptr;
}
T Get() {
std::lock_guard<std::mutex> lock(mutex_);
HeterNode<K, T>* node = head_->next;
if (node == tail_) {
return nullptr;
} else {
detach(node);
cond_.notify_one();
T ret = std::move(node->value);
map_.erase(node->key);
delete node;
return ret;
}
}
bool Empty() {
std::lock_guard<std::mutex> lock(mutex_);
return head_->next == tail_;
}
int Size() {
std::lock_guard<std::mutex> lock(mutex_);
return size;
}
private:
void detach(HeterNode<K, T>* node) {
node->prev->next = node->next;
node->next->prev = node->prev;
size--;
}
void attach(HeterNode<K, T>* node) {
node->prev = head_;
node->next = head_->next;
head_->next->prev = node;
head_->next = node;
size++;
}
private:
HeterNode<K, T>* head_;
HeterNode<K, T>* tail_;
std::unordered_map<K, HeterNode<K, T>*> map_;
std::unordered_set<K> task_map_;
std::mutex mutex_;
std::condition_variable cond_;
int cap_;
int size;
};
#endif
}  // namespace framework
......
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#ifdef PADDLE_WITH_PSLIB
#include <fstream>
#include <memory>
#include <mutex> // NOLINT
#include <string>
#include <thread> // NOLINT
#include <unordered_map> // NOLINT
#include <unordered_set> // NOLINT
#include <vector>
#include "bthread/bthread.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/platform/timer.h"
namespace paddle {
namespace framework {
class DataFeed;
enum HeterTaskState { PULL_SPARSE, OP_RUN, XPU, OP_RUN_END, PUSH_GRAD, DONE };
class HeterTask {
public:
HeterTask() {}
virtual ~HeterTask(){};
void Update() {
if (state_ == PULL_SPARSE) {
state_ = OP_RUN;
} else if (state_ == OP_RUN) {
state_ = XPU;
// state_ = PUSH_GRAD;
// state_ = PUSH_GRAD;
} else if (state_ == XPU) {
state_ = OP_RUN_END;
} else if (state_ == OP_RUN_END) {
state_ = PUSH_GRAD;
} else if (state_ == PUSH_GRAD) {
state_ = DONE;
}
}
void Reset() {
total_time = 0;
read_time = 0;
pack_time = 0;
pull_sparse_local_time = 0;
op_all_time = 0;
xpu_op_time = 0;
xpu_wait_time = 0;
cpu_op_time = 0;
collect_label_time = 0;
fill_sparse_time = 0;
push_sparse_time = 0;
gpu_2_cpu_time = 0;
cpu_2_gpu_time = 0;
timeline.Reset();
}
void Show() {
std::cout << "features size " << features_.size() << std::endl;
for (size_t i = 0; i < features_.size(); ++i) {
std::cout << "features[" << i << "] size " << features_[i].size()
<< std::endl;
}
}
void PackTask(Scope* scope, int taskid, DataFeed* reader, int cur_batch,
const ProgramDesc& program);
void PackGpuTask(Scope* thread_scope, DataFeed* reader,
const ProgramDesc& program);
Scope* scope_{nullptr};
int taskid_;
int cur_batch_;
HeterTaskState state_;
// cache
std::map<uint64_t, std::vector<uint64_t>> features_;
std::map<uint64_t, std::vector<float>> feature_labels_;
std::map<uint64_t, std::vector<std::vector<float>>> feature_values_;
std::map<uint64_t, std::vector<std::vector<float>>> feature_grads_;
std::map<uint64_t, std::vector<uint64_t>> sparse_push_keys_;
double total_time{0};
double read_time{0};
double pack_time{0};
double pull_sparse_local_time{0};
double op_all_time{0};
double xpu_op_time{0};
double xpu_wait_time{0};
double cpu_op_time{0};
double collect_label_time{0};
double fill_sparse_time{0};
double push_sparse_time{0};
double gpu_2_cpu_time{0};
double cpu_2_gpu_time{0};
platform::Timer timeline;
};
#endif
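Aside (not part of heter_util.h): HeterTask::Update() above walks a fixed pipeline, PULL_SPARSE -> OP_RUN -> XPU -> OP_RUN_END -> PUSH_GRAD -> DONE. A self-contained sketch, mirroring the enum and transition table for illustration only:

// Sketch: driving a task through the HeterTask pipeline states.
#include <iostream>

enum HeterTaskState { PULL_SPARSE, OP_RUN, XPU, OP_RUN_END, PUSH_GRAD, DONE };

int main() {
  HeterTaskState state = PULL_SPARSE;
  while (state != DONE) {
    // Same transitions as HeterTask::Update().
    switch (state) {
      case PULL_SPARSE: state = OP_RUN;     break;
      case OP_RUN:      state = XPU;        break;
      case XPU:         state = OP_RUN_END; break;
      case OP_RUN_END:  state = PUSH_GRAD;  break;
      case PUSH_GRAD:   state = DONE;       break;
      default:          break;
    }
    std::cout << "state -> " << state << "\n";
  }
  return 0;
}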
template <class T>
class HeterObjectPool {
public:
HeterObjectPool() {}
virtual ~HeterObjectPool(){};
std::shared_ptr<T> Get() {
std::lock_guard<std::mutex> lock(mutex_);
if (pool_.empty()) {
num_ += 1;
return std::make_shared<T>();
} else {
auto ret = pool_.back();
pool_.pop_back();
return ret;
}
}
void Push(std::shared_ptr<T> data) {
std::lock_guard<std::mutex> lock(mutex_);
pool_.push_back(std::move(data));
}
int Size() {
std::lock_guard<std::mutex> lock(mutex_);
return pool_.size();
}
std::shared_ptr<T>& GetElement(int i) { return pool_[i]; }
private:
std::vector<std::shared_ptr<T>> pool_;
std::mutex mutex_;
int num_{0};
};
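Aside (not part of heter_util.h): HeterObjectPool<T> recycles heap objects behind a mutex. Get() allocates only when the freelist is empty; Push() returns an object for reuse. A usage sketch, assuming heter_util.h is in scope in a PADDLE_WITH_PSLIB build:

// Usage sketch for HeterObjectPool<T> from the header above.
#include <cassert>
#include <memory>
#include <string>

void PoolDemo() {
  HeterObjectPool<std::string> pool;
  std::shared_ptr<std::string> s = pool.Get();  // empty pool: allocates
  *s = "recycled";
  pool.Push(s);                                 // hand the object back
  assert(pool.Size() == 1);
  std::shared_ptr<std::string> t = pool.Get();  // reuses the pushed object
  assert(*t == "recycled");
}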
#ifdef PADDLE_WITH_PSLIB
struct BthreadMutextGuard {
BthreadMutextGuard(bthread_mutex_t* rho) {
mutex_ = rho;
bthread_mutex_lock(mutex_);
}
~BthreadMutextGuard() { bthread_mutex_unlock(mutex_); }
bthread_mutex_t* mutex_;
};
template <class T>
class BtObjectPool {
public:
BtObjectPool() {
bthread_mutex_init(&mutex_, NULL);
bthread_cond_init(&cond_, NULL);
}
virtual ~BtObjectPool() {
bthread_cond_destroy(&cond_);
bthread_mutex_destroy(&mutex_);
};
std::shared_ptr<T> Get() {
BthreadMutextGuard guard(&mutex_);
while (pool_.empty()) {
bthread_cond_wait(&cond_, &mutex_);
}
auto ret = pool_.back();
pool_.pop_back();
return ret;
}
void Push(std::shared_ptr<T> data) {
BthreadMutextGuard guard(&mutex_);
pool_.push_back(std::move(data));
bthread_cond_signal(&cond_);
}
int Size() { return pool_.size(); }
std::shared_ptr<T>& GetElement(int i) { return pool_[i]; }
private:
std::vector<std::shared_ptr<T>> pool_;
bthread_mutex_t mutex_;
bthread_cond_t cond_;
int num_{0};
};
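Aside (not part of heter_util.h): BtObjectPool<T> is the bthread-aware variant. Unlike HeterObjectPool, an empty pool does not allocate; Get() blocks until another fiber or thread calls Push(). A sketch of that contract, assuming bthread from Apache bRPC is linked:

// Contract sketch for BtObjectPool<T> from the header above.
#include <memory>

void BtPoolDemo(BtObjectPool<int>& pool) {
  pool.Push(std::make_shared<int>(42));  // bthread_cond_signal wakes a waiter
  std::shared_ptr<int> v = pool.Get();   // non-empty pool: returns at once
  // On an empty pool, Get() parks the calling bthread in bthread_cond_wait()
  // instead of busy-waiting, freeing the underlying worker thread.
  (void)v;
}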
template <class K, class T>
struct HeterNode {
K key;
T value;
HeterNode* prev;
HeterNode* next;
};
template <class K, class T>
class HeterList {
public:
HeterList() : head_(new HeterNode<K, T>), tail_(new HeterNode<K, T>) {
head_->prev = NULL;
head_->next = tail_;
tail_->prev = head_;
tail_->next = NULL;
size = 0;
cap_ = 1e9;
}
~HeterList() {
delete head_;
delete tail_;
}
void SetCap(int num) { cap_ = num; }
bool TryPut(K& key, T& value) {
std::unique_lock<std::mutex> lock(mutex_);
cond_.wait(lock, [this] { return size < cap_; });
if (task_map_.find(key) != task_map_.end()) {
task_map_.erase(key);
return false;
} else {
HeterNode<K, T>* node = new HeterNode<K, T>;
node->key = key;
node->value = value;
map_[node->key] = node;
attach(node);
return true;
}
}
bool Put(K& key, T& value) {
std::unique_lock<std::mutex> lock(mutex_);
cond_.wait(lock, [this] { return size < cap_; });
HeterNode<K, T>* node = new HeterNode<K, T>;
node->key = key;
node->value = value;
map_[node->key] = node;
attach(node);
return true;
}
T TryGet(const K& key) {
std::lock_guard<std::mutex> lock(mutex_);
auto iter = map_.find(key);
if (iter != map_.end()) {
HeterNode<K, T>* node = iter->second;
detach(node);
cond_.notify_one();
T ret = std::move(node->value);
map_.erase(key);
delete node;
return ret;
}
task_map_.insert(key);
return nullptr;
}
T Get(const K& key) {
std::lock_guard<std::mutex> lock(mutex_);
auto iter = map_.find(key);
if (iter != map_.end()) {
HeterNode<K, T>* node = iter->second;
detach(node);
cond_.notify_one();
T ret = std::move(node->value);
map_.erase(key);
delete node;
return ret;
}
return nullptr;
}
T Get() {
std::lock_guard<std::mutex> lock(mutex_);
HeterNode<K, T>* node = head_->next;
if (node == tail_) {
return nullptr;
} else {
detach(node);
cond_.notify_one();
T ret = std::move(node->value);
map_.erase(node->key);
delete node;
return ret;
}
}
bool Empty() {
std::lock_guard<std::mutex> lock(mutex_);
return head_->next == tail_;
}
int Size() {
std::lock_guard<std::mutex> lock(mutex_);
return size;
}
private:
void detach(HeterNode<K, T>* node) {
node->prev->next = node->next;
node->next->prev = node->prev;
size--;
}
void attach(HeterNode<K, T>* node) {
node->prev = head_;
node->next = head_->next;
head_->next->prev = node;
head_->next = node;
size++;
}
private:
HeterNode<K, T>* head_;
HeterNode<K, T>* tail_;
std::unordered_map<K, HeterNode<K, T>*> map_;
std::unordered_set<K> task_map_;
std::mutex mutex_;
std::condition_variable cond_;
int cap_;
int size;
};
} // namespace framework
} // namespace paddle
#endif
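Aside (not part of heter_util.h): HeterList<K, T> above is a capacity-gated doubly linked list with a key map. Put()/TryPut() block while size >= cap_, Get() detaches a node and returns its value, and a failed TryGet() leaves a one-shot marker in task_map_ that makes the next TryPut() of the same key return false. A usage sketch (T must be comparable against nullptr, e.g. a shared_ptr):

// Usage sketch for HeterList<K, T> from the header above.
#include <iostream>
#include <memory>
#include <string>

void ListDemo() {
  HeterList<int, std::shared_ptr<std::string>> list;
  int key = 7;
  auto value = std::make_shared<std::string>("task-7");
  list.Put(key, value);                 // inserts at the head (MRU position)
  auto hit = list.Get(key);             // detaches the node, returns the value
  std::cout << (hit ? *hit : std::string("miss")) << "\n";  // "task-7"
  auto miss = list.TryGet(key);         // empty now: returns nullptr and
                                        // records key in task_map_
  bool accepted = list.TryPut(key, value);  // consumes the marker -> false
  std::cout << accepted << "\n";        // 0
}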
@@ -15,7 +15,7 @@ limitations under the License. */
#include "paddle/fluid/framework/device_worker.h"
#include "paddle/fluid/framework/device_worker_factory.h"
#include "paddle/fluid/framework/fleet/fleet_wrapper.h"
-#include "paddle/fluid/framework/fleet/heter_wrapper.h"
+#include "paddle/fluid/framework/heter_util.h"
#include "paddle/fluid/platform/cpu_helper.h"
#include "paddle/fluid/string/string_helper.h"
......
@@ -21,6 +21,7 @@ limitations under the License. */
#include "paddle/fluid/framework/data_set.h"
#include "paddle/fluid/framework/device_worker_factory.h"
#include "paddle/fluid/framework/fleet/fleet_wrapper.h"
#include "paddle/fluid/framework/fleet/heter_wrapper.h"
#include "paddle/fluid/framework/trainer.h" #include "paddle/fluid/framework/trainer.h"
#if (defined PADDLE_WITH_CUDA || defined PADDLE_WITH_XPU) && \ #if (defined PADDLE_WITH_CUDA || defined PADDLE_WITH_XPU) && \
(defined PADDLE_WITH_PSLIB) (defined PADDLE_WITH_PSLIB)
......
@@ -176,6 +176,7 @@ void MultiTrainer::Run() {
#ifdef PADDLE_WITH_HETERPS
void MultiTrainer::MergeDenseParam() {
#ifdef PADDLE_WITH_PSCORE
auto communicator = paddle::distributed::Communicator::GetInstance();
auto& recv_ctx = communicator->GetRecvCtxMap();
Scope* thread_scope = workers_[0]->GetThreadScope();
@@ -189,6 +190,7 @@ void MultiTrainer::MergeDenseParam() {
TensorCopy((*tensor), root_tensor->place(), root_tensor);
}
}
#endif
}
#endif
......
@@ -14,7 +14,6 @@ limitations under the License. */
#include "paddle/fluid/framework/device_worker.h"
#include "paddle/fluid/framework/device_worker_factory.h"
-#include "paddle/fluid/framework/fleet/heter_wrapper.h"
#include "paddle/fluid/platform/cpu_helper.h"
#include "paddle/fluid/string/string_helper.h"
@@ -129,8 +128,6 @@ void PSGPUWorker::Initialize(const TrainerDesc& desc) {
}
}
}
-// pull_queue_ = paddle::framework::MakeChannel<std::shared_ptr<HeterTask>>();
-// push_queue_ = paddle::framework::MakeChannel<std::shared_ptr<HeterTask>>();
}
void PSGPUWorker::SetChannelWriter(ChannelObject<std::string>* queue) {
......
@@ -26,8 +26,9 @@ limitations under the License. */
#include "paddle/fluid/framework/data_feed.h"
#include "paddle/fluid/framework/data_set.h"
#include "paddle/fluid/framework/device_worker.h"
-#include "paddle/fluid/framework/fleet/heter_wrapper.h"
-#include "paddle/fluid/framework/heter_service.h"
+#include "paddle/fluid/framework/fleet/heter_context.h"
+//#include "paddle/fluid/framework/fleet/heter_wrapper.h"
#include "paddle/fluid/framework/heter_util.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/reader.h"
@@ -46,6 +47,10 @@ class PullDenseWorker;
class Scope;
class VarDesc;
class DeviceWorker;
class HeterWrapper;
class HeterRequest;
class HeterResponse;
template <class T>
class ChannelObject;
......
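The forward declarations added above (HeterWrapper, HeterRequest, HeterResponse) are what let this header drop the heavy fleet/heter_wrapper.h include: a bare declaration suffices wherever the type is only named through a pointer or reference, which breaks the include cycle and shrinks rebuild fan-out. A minimal sketch of the pattern, with a hypothetical stand-in class:

// Sketch: forward declaration instead of a full include.
#include <memory>

class HeterWrapper;  // declaration only: size and members not needed here

class DeviceWorkerSketch {  // hypothetical stand-in, not from this PR
 public:
  void SetHeterWrapper(std::shared_ptr<HeterWrapper> w) { wrapper_ = w; }

 private:
  std::shared_ptr<HeterWrapper> wrapper_;  // pointer member: fwd-decl is enough
};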
@@ -19,6 +19,7 @@
#include "paddle/fluid/framework/operator.h"
#include "paddle/fluid/operators/controlflow/conditional_block_op.h"
#include "paddle/fluid/string/string_helper.h"
namespace paddle {
namespace framework {
......
@@ -20,6 +20,8 @@ limitations under the License. */
#include "gtest/gtest.h"
#include "paddle/fluid/distributed/service/heter_client.h"
#include "paddle/fluid/distributed/service/heter_server.h"
#include "paddle/fluid/framework/op_registry.h"
namespace framework = paddle::framework;
namespace platform = paddle::platform;
namespace distributed = paddle::distributed;
......
@@ -73,6 +73,14 @@ if (WITH_CRYPTO)
set(PYBIND_SRCS ${PYBIND_SRCS} crypto.cc)
endif (WITH_CRYPTO)
if (WITH_PSLIB)
set(DISTRIBUTE_COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor -Wno-error=sign-compare -Wno-error=unused-variable -Wno-error=return-type -Wno-error=unused-but-set-variable -Wno-error=type-limits -Wno-error=unknown-pragmas -Wno-error=parentheses -Wno-error=unused-result")
if (CMAKE_CXX_COMPILER_VERSION VERSION_GREATER 7.0)
set(DISTRIBUTE_COMPILE_FLAGS
"${DISTRIBUTE_COMPILE_FLAGS} -faligned-new")
endif()
set_source_files_properties(heter_wrapper_py.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
endif(WITH_PSLIB)
if (WITH_PSCORE)
set(DISTRIBUTE_COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor -Wno-error=sign-compare -Wno-error=unused-variable -Wno-error=return-type -Wno-error=unused-but-set-variable -Wno-error=type-limits -Wno-error=unknown-pragmas -Wno-error=parentheses -Wno-error=unused-result")
set_source_files_properties(fleet_py.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
......