“f03d48f79bf7ae3a2cb8ffb1a2e641c0523ce6e9”上不存在“runtime/engine/common/frontend/cmvn.cc”
未验证 提交 fc273bfb 编写于 作者: H hong 提交者: GitHub

Merge branch 'develop' into move_slice_to_pten

要显示的变更太多。

To preserve performance only 1000 of 1000+ files are displayed.
......@@ -52,12 +52,12 @@ tools/__pycache__
# This file is automatically generated.
# TODO(zhiqiang) Move this file to build directory.
paddle/infrt/dialect/pd_ops.td
paddle/infrt/dialect/pd/ir/pd_ops.td
paddle/infrt/dialect/phi/ir/phi_cpu_kernels.td
paddle/infrt/dialect/phi/ir/phi_gpu_kernels.td
tools/infrt/kernels.json
tools/infrt/kernel_signature.json
paddle/infrt/dialect/pd_ops_info.h
paddle/infrt/dialect/pd/common/pd_ops_info.h
.lit_test_times.txt
paddle/infrt/tests/dialect/Output
paddle/infrt/tests/lit.cfg.py
......
......@@ -26,7 +26,7 @@ add_definitions(-w)
######################################
include(ExternalProject)
set(CINN_PREFIX_DIR ${THIRD_PARTY_PATH}/CINN)
set(CINN_GIT_TAG release/v0.1)
set(CINN_GIT_TAG 56879b637e2c4db19091eedad03d7cc674e092a2)
set(CINN_OPTIONAL_ARGS -DPY_VERSION=${PY_VERSION}
-DWITH_CUDA=${WITH_GPU}
-DWITH_CUDNN=${WITH_GPU}
......
......@@ -99,7 +99,8 @@ endfunction()
function(mlir_add_rewriter td_base)
set(LLVM_TARGET_DEFINITIONS ${td_base}.td)
mlir_tablegen(${td_base}.cpp.inc -gen-rewriters "-I${CMAKE_SOURCE_DIR}/infrt/dialect/pass")
set(LLVM_TARGET_DEPENDS ${LLVM_TARGET_DEPENDS} ${CMAKE_SOURCE_DIR}/paddle/infrt/dialect/infrt/ir/infrt_base.td)
mlir_tablegen(${td_base}.cpp.inc -gen-rewriters)
add_public_tablegen_target(MLIR${td_base}IncGen)
add_dependencies(mlir-headers MLIR${td_base}IncGen)
endfunction()
......
......@@ -61,6 +61,7 @@ set(PADDLE2ONNX_OPTIONAL_ARGS
-DONNX_CUSTOM_PROTOC_PATH=${PROTOC_BIN_PATH}
-DWITH_STATIC=OFF
-DCMAKE_INSTALL_PREFIX=${PADDLE2ONNX_INSTALL_DIR}
-DCMAKE_INSTALL_LIBDIR=${PADDLE2ONNX_INSTALL_DIR}/${LIBDIR}
-DCMAKE_POSITION_INDEPENDENT_CODE=ON
-DCMAKE_BUILD_TYPE=${THIRD_PARTY_BUILD_TYPE}
${EXTERNAL_OPTIONAL_ARGS}
......
cc_library(processgroup SRCS ProcessGroup.cc DEPS phi phi_api eager_api)
cc_library(eager_reducer SRCS reducer.cc DEPS eager_api processgroup phi phi_api string_helper)
if (WITH_DISTRIBUTE)
cc_library(processgroup_gloo SRCS ProcessGroupGloo.cc DEPS phi phi_api eager_api gloo_wrapper)
endif()
cc_library(eager_reducer SRCS reducer.cc DEPS eager_api processgroup)
if(WITH_NCCL)
cc_library(processgroup_nccl SRCS ProcessGroupNCCL.cc DEPS place cuda_stream enforce collective_helper device_context phi phi_api eager_api)
......
......@@ -171,10 +171,10 @@ ProcessGroupGloo::GlooTask::GlooTask(int rank,
"Only CPU place is supported for ProcessGroupGloo."));
}
ProcessGroupGloo::ProcessGroupGloo(const std::shared_ptr<GlooStore>& store,
int rank, int world_size,
const std::shared_ptr<GlooOptions> options)
: ProcessGroup(rank, world_size), _tag(0), _store(store) {
ProcessGroupGloo::ProcessGroupGloo(
const std::shared_ptr<paddle::distributed::Store>& store, int rank,
int world_size, const std::shared_ptr<GlooOptions> options)
: ProcessGroup(rank, world_size), _tag(0), _store(new GlooStore(store)) {
_context = std::make_shared<gloo::rendezvous::Context>(rank, world_size);
auto prefix_store =
::gloo::rendezvous::PrefixStore(std::to_string(0), *_store);
......
......@@ -52,8 +52,7 @@ class ProcessGroupGloo : public ProcessGroup {
class GlooStore : public ::gloo::rendezvous::Store {
public:
explicit GlooStore(
const std::shared_ptr<paddle::distributed::TCPStore>& store)
explicit GlooStore(const std::shared_ptr<paddle::distributed::Store>& store)
: _store(store) {}
~GlooStore() = default;
......@@ -87,7 +86,7 @@ class ProcessGroupGloo : public ProcessGroup {
}
protected:
std::shared_ptr<paddle::distributed::TCPStore> _store;
std::shared_ptr<paddle::distributed::Store> _store;
};
class GlooOptions {
......@@ -100,9 +99,9 @@ class ProcessGroupGloo : public ProcessGroup {
std::shared_ptr<::gloo::transport::Device> device;
};
explicit ProcessGroupGloo(const std::shared_ptr<GlooStore>& store, int rank,
int world_size,
std::shared_ptr<GlooOptions> options);
explicit ProcessGroupGloo(
const std::shared_ptr<paddle::distributed::Store>& store, int rank,
int world_size, std::shared_ptr<GlooOptions> options);
~ProcessGroupGloo() = default;
......@@ -145,7 +144,7 @@ class ProcessGroupGloo : public ProcessGroup {
protected:
uint32_t _tag;
std::shared_ptr<gloo::rendezvous::Context> _context;
std::shared_ptr<GlooStore> _store;
std::shared_ptr<::gloo::rendezvous::Store> _store;
};
} // namespace distributed
......
......@@ -139,12 +139,10 @@ bool ProcessGroupHCCL::HCCLTask::IsCompleted() {
// TODO(sandyhouse): Add timeout for wait, now timeout unused
bool ProcessGroupHCCL::HCCLTask::Wait(std::chrono::milliseconds timeout) {
SynchronizeStreams();
if (FLAGS_hccl_blocking_wait) {
// NOTE(sandyhouse): It will block host for sync
while (!IsCompleted()) {
std::this_thread::sleep_for(std::chrono::milliseconds(kWaitBlockTImeout));
}
}
return true;
}
......
......@@ -84,29 +84,6 @@ class ProcessGroupHCCL : public ProcessGroup {
std::vector<Tensor>& tensors,
const BroadcastOptions& = BroadcastOptions()) override;
std::shared_ptr<ProcessGroup::Task> Barrier(
const BarrierOptions& = BarrierOptions()) override;
std::shared_ptr<ProcessGroup::Task> Send(std::vector<Tensor>& tensors,
int dst_rank) override;
std::shared_ptr<ProcessGroup::Task> Recv(std::vector<Tensor>& tensors,
int src_rank) override;
std::shared_ptr<ProcessGroup::Task> AllGather(
std::vector<Tensor>& in_tensors,
std::vector<Tensor>& out_tensors) override;
std::shared_ptr<ProcessGroup::Task> AllToAll(
std::vector<Tensor>& in, std::vector<Tensor>& out) override;
std::shared_ptr<ProcessGroup::Task> Reduce(
std::vector<Tensor>& tensors, const ReduceOptions& opts) override;
std::shared_ptr<ProcessGroup::Task> Scatter(std::vector<Tensor>& in_tensors,
std::vector<Tensor>& out_tensors,
const ScatterOptions&) override;
protected:
virtual std::shared_ptr<ProcessGroupHCCL::HCCLTask> CreateTask(
std::vector<Place> places, int rank, CommType opType,
......
......@@ -88,8 +88,8 @@ void SyncDefaultStream(
for (size_t i = 0; i < places.size(); ++i) {
auto* default_ctx = static_cast<platform::CUDADeviceContext*>(
platform::DeviceContextPool::Instance().Get(places[i]));
ncclEvents[i].Record(*dev_ctx[i]);
ncclEvents[i].Block(*default_ctx);
ncclEvents[i].Record(*default_ctx);
ncclEvents[i].Block(*dev_ctx[i]);
}
}
......
......@@ -17,16 +17,126 @@
#include <map>
#include <vector>
#include "paddle/fluid/distributed/collective/ProcessGroup.h"
#include "paddle/fluid/eager/accumulation/accumulation_node.h"
#include "paddle/fluid/eager/api/utils/hook_utils.h"
#include "paddle/fluid/eager/api/utils/tensor_utils.h"
#include "paddle/fluid/eager/autograd_meta.h"
#include "paddle/fluid/eager/utils.h"
#include "paddle/fluid/operators/math/concat_and_split.h"
#include "paddle/fluid/platform/device/gpu/gpu_info.h"
#include "paddle/phi/api/include/api.h"
#include "paddle/phi/api/include/tensor.h"
#include "paddle/phi/api/lib/ext_compat_utils.h"
#include "paddle/phi/common/data_type.h"
#include "paddle/phi/kernels/funcs/math_function.h"
#include "paddle/utils/string/string_helper.h"
namespace paddle {
namespace distributed {
using Tensor = paddle::experimental::Tensor;
using Scalar = paddle::experimental::ScalarBase<paddle::experimental::Tensor>;
using ScalarArray =
paddle::experimental::ScalarArrayBase<paddle::experimental::Tensor>;
using Backend = paddle::experimental::Backend;
std::vector<std::vector<size_t>> Eager_AssignGroupBySize(
const std::vector<Tensor>, const std::vector<bool>& is_sparse_gradient,
const std::vector<size_t>& group_size_limits,
const std::vector<int64_t>& tensor_indices = {});
const std::vector<Tensor>, const std::vector<bool> &is_sparse_gradient,
const std::vector<size_t> &group_size_limits,
const std::vector<int64_t> &tensor_indices = {});
class EagerGroup {
public:
Tensor dense_contents_;
// for concat kernel
std::vector<phi::DenseTensor> dense_tensors_;
std::vector<int64_t> length_;
int64_t all_length_{0};
std::vector<ScalarArray> origin_shapes_;
// Global indices of participating tensors in the group
std::vector<size_t> tensor_indices_;
// Number of params that haven't been ready. When it is 0, it means
// the group is ready.
size_t pending_ = -1;
// external message of group
phi::DataType dtype_;
// help to sync
std::shared_ptr<ProcessGroup::Task> task;
// context is used to select the stream for concat
void ConcatTensors(const platform::Place &);
// context is used to select the stream for split
void SplitTensors(const platform::Place &);
friend std::ostream &operator<<(std::ostream &, const EagerGroup &);
};
struct TensorLocator {
// record the index in groups_
size_t group_index;
size_t inside_group_index;
};
class EagerReducer {
public:
explicit EagerReducer(
const std::vector<Tensor> tensors,
const std::vector<std::vector<size_t>> &group_indices,
const std::vector<bool> &is_sparse_gradient,
std::shared_ptr<distributed::ProcessGroup> process_group,
const std::vector<size_t> &group_size_limits,
bool find_unused_parameters);
virtual ~EagerReducer() {}
std::shared_ptr<egr::GradNodeBase> GetGradNodeFromTensor(Tensor *tensor);
void InitializeGroups(const std::vector<std::vector<size_t>> &group_indices);
void InitializeDenseGroups(const std::vector<size_t> &tensor_indices_,
EagerGroup *p_group);
void PrepareForBackward(const std::vector<Tensor> &outputs);
void AddDistHook(size_t var_index);
void MarkVarReady(const size_t var_index, const bool is_used_var);
void MarkGroupReady(const size_t group_index);
void FusedAllReduceSchedule(EagerGroup *group, const int curr_group_index);
void FinalizeBackward();
void TraverseBackwardGraph(const std::vector<Tensor> &outputs);
void ProcessUnusedDenseVars();
bool HasGrad(size_t var_index);
private:
std::vector<Tensor> tensors_;
std::vector<std::vector<size_t>> group_indices_;
std::vector<bool> is_sparse_gradient_;
std::shared_ptr<distributed::ProcessGroup> process_group_;
std::vector<size_t> group_size_limits_;
std::vector<EagerGroup> groups_;
std::vector<TensorLocator> variable_locators_;
PlaceType place_;
platform::Place inner_place_;
size_t next_group_ = 0;
int64_t nranks_ = -1;
bool grad_need_hooks_{false};
std::vector<bool> vars_marked_ready_;
std::vector<int32_t> local_used_vars_;
// Following variables are to help unused vars
std::vector<size_t> unused_vars_;
std::map<egr::GradNodeBase *, size_t> gradnode_index_map_;
bool has_marked_unused_vars_{false};
bool find_unused_vars_each_step_{false};
bool find_unused_vars_once_{true};
bool groups_need_finalize_{false};
Tensor global_used_vars_;
};
} // namespace distributed
} // namespace paddle
......@@ -4,7 +4,7 @@ if(WITH_PYTHON)
endif()
proto_library(interceptor_message_proto SRCS interceptor_message.proto)
if(WITH_DISTRIBUTE AND WITH_PSCORE AND NOT (WITH_ASCEND OR WITH_ASCEND_CL))
if(WITH_DISTRIBUTE AND WITH_PSCORE)
set(BRPC_DEPS brpc ssl crypto protobuf zlib leveldb snappy gflags glog)
else()
set(BRPC_DEPS "")
......
......@@ -67,8 +67,7 @@ bool MessageBus::IsInit() const { return is_init_; }
MessageBus::~MessageBus() {
VLOG(3) << "Message bus releases resource.";
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
!defined(PADDLE_WITH_ASCEND_CL)
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE)
server_.Stop(1000);
server_.Join();
#endif
......@@ -87,8 +86,7 @@ bool MessageBus::Send(int64_t dst_rank,
IsInit(), true,
platform::errors::PreconditionNotMet(
"Using message bus since it has not been initialized."));
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
!defined(PADDLE_WITH_ASCEND_CL)
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE)
int retry_time = 0; // message bus will retry sending for 10 times
while (retry_time < 10) {
++retry_time;
......@@ -173,8 +171,7 @@ void MessageBus::ListenPort() {
LOG(INFO) << "No need listen to port since training on single card.";
return;
}
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
!defined(PADDLE_WITH_ASCEND_CL)
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE)
// function keep listen the port and handle the message
PADDLE_ENFORCE_EQ(
server_.AddService(&message_service_, brpc::SERVER_DOESNT_OWN_SERVICE), 0,
......@@ -203,8 +200,7 @@ void MessageBus::ListenPort() {
#endif
}
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
!defined(PADDLE_WITH_ASCEND_CL)
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE)
bool MessageBus::SendInterRank(int64_t dst_rank,
const InterceptorMessage& interceptor_message) {
const auto& dst_addr = GetAddr(dst_rank);
......
......@@ -20,8 +20,7 @@
#include <thread>
#include <unordered_map>
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
!defined(PADDLE_WITH_ASCEND_CL)
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE)
#include "brpc/channel.h"
#include "brpc/server.h"
#include "paddle/fluid/distributed/fleet_executor/message_service.h"
......@@ -64,8 +63,7 @@ class MessageBus final {
const std::string& GetAddr(int64_t rank) const;
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
!defined(PADDLE_WITH_ASCEND_CL)
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE)
// send the message inter rank (dst is different rank with src)
bool SendInterRank(int64_t dst_rank,
const InterceptorMessage& interceptor_message);
......@@ -81,8 +79,7 @@ class MessageBus final {
// the ip needs to be listened
std::string addr_;
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
!defined(PADDLE_WITH_ASCEND_CL)
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE)
MessageServiceImpl message_service_;
// brpc server
brpc::Server server_;
......
......@@ -11,8 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
!defined(PADDLE_WITH_ASCEND_CL)
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE)
#include "paddle/fluid/distributed/fleet_executor/message_service.h"
#include "brpc/server.h"
#include "paddle/fluid/distributed/fleet_executor/global.h"
......
......@@ -11,8 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
!defined(PADDLE_WITH_ASCEND_CL)
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE)
#pragma once
#include "brpc/server.h"
......
......@@ -115,6 +115,7 @@ message TableParameter {
optional CommonAccessorParameter common = 6;
optional TableType type = 7;
optional bool compress_in_save = 8 [ default = false ];
optional GraphParameter graph_parameter = 9;
}
message TableAccessorParameter {
......@@ -211,3 +212,25 @@ message SparseAdamSGDParameter { // SparseAdamSGDRule
optional double ada_epsilon = 5 [ default = 1e-08 ];
repeated float weight_bounds = 6;
}
message GraphParameter {
optional int32 task_pool_size = 1 [ default = 24 ];
optional bool gpups_mode = 2 [ default = false ];
optional string gpups_graph_sample_class = 3
[ default = "CompleteGraphSampler" ];
optional string gpups_graph_sample_args = 4 [ default = "" ];
optional bool use_cache = 5 [ default = true ];
optional float cache_ratio = 6 [ default = 0.3 ];
optional int32 cache_ttl = 7 [ default = 5 ];
optional GraphFeature graph_feature = 8;
optional string table_name = 9 [ default = "" ];
optional string table_type = 10 [ default = "" ];
optional int32 gpups_mode_shard_num = 11 [ default = 127 ];
optional int32 gpu_num = 12 [ default = 1 ];
}
message GraphFeature {
repeated string name = 1;
repeated string dtype = 2;
repeated int32 shape = 3;
}
\ No newline at end of file
......@@ -44,7 +44,7 @@ void GraphPsService_Stub::service(
}
}
int GraphBrpcClient::get_server_index_by_id(uint64_t id) {
int GraphBrpcClient::get_server_index_by_id(int64_t id) {
int shard_num = get_shard_num();
int shard_per_server = shard_num % server_size == 0
? shard_num / server_size
......@@ -53,7 +53,7 @@ int GraphBrpcClient::get_server_index_by_id(uint64_t id) {
}
std::future<int32_t> GraphBrpcClient::get_node_feat(
const uint32_t &table_id, const std::vector<uint64_t> &node_ids,
const uint32_t &table_id, const std::vector<int64_t> &node_ids,
const std::vector<std::string> &feature_names,
std::vector<std::vector<std::string>> &res) {
std::vector<int> request2server;
......@@ -66,7 +66,7 @@ std::future<int32_t> GraphBrpcClient::get_node_feat(
}
}
size_t request_call_num = request2server.size();
std::vector<std::vector<uint64_t>> node_id_buckets(request_call_num);
std::vector<std::vector<int64_t>> node_id_buckets(request_call_num);
std::vector<std::vector<int>> query_idx_buckets(request_call_num);
for (int query_idx = 0; query_idx < node_ids.size(); ++query_idx) {
int server_index = get_server_index_by_id(node_ids[query_idx]);
......@@ -129,7 +129,7 @@ std::future<int32_t> GraphBrpcClient::get_node_feat(
closure->request(request_idx)
->add_params((char *)node_id_buckets[request_idx].data(),
sizeof(uint64_t) * node_num);
sizeof(int64_t) * node_num);
std::string joint_feature_name =
paddle::string::join_strings(feature_names, '\t');
closure->request(request_idx)
......@@ -179,9 +179,9 @@ std::future<int32_t> GraphBrpcClient::clear_nodes(uint32_t table_id) {
return fut;
}
std::future<int32_t> GraphBrpcClient::add_graph_node(
uint32_t table_id, std::vector<uint64_t> &node_id_list,
uint32_t table_id, std::vector<int64_t> &node_id_list,
std::vector<bool> &is_weighted_list) {
std::vector<std::vector<uint64_t>> request_bucket;
std::vector<std::vector<int64_t>> request_bucket;
std::vector<std::vector<bool>> is_weighted_bucket;
bool add_weight = is_weighted_list.size() > 0;
std::vector<int> server_index_arr;
......@@ -191,7 +191,7 @@ std::future<int32_t> GraphBrpcClient::add_graph_node(
if (index_mapping[server_index] == -1) {
index_mapping[server_index] = request_bucket.size();
server_index_arr.push_back(server_index);
request_bucket.push_back(std::vector<uint64_t>());
request_bucket.push_back(std::vector<int64_t>());
if (add_weight) is_weighted_bucket.push_back(std::vector<bool>());
}
request_bucket[index_mapping[server_index]].push_back(
......@@ -229,7 +229,7 @@ std::future<int32_t> GraphBrpcClient::add_graph_node(
size_t node_num = request_bucket[request_idx].size();
closure->request(request_idx)
->add_params((char *)request_bucket[request_idx].data(),
sizeof(uint64_t) * node_num);
sizeof(int64_t) * node_num);
if (add_weight) {
bool weighted[is_weighted_bucket[request_idx].size() + 1];
for (size_t j = 0; j < is_weighted_bucket[request_idx].size(); j++)
......@@ -248,8 +248,8 @@ std::future<int32_t> GraphBrpcClient::add_graph_node(
return fut;
}
std::future<int32_t> GraphBrpcClient::remove_graph_node(
uint32_t table_id, std::vector<uint64_t> &node_id_list) {
std::vector<std::vector<uint64_t>> request_bucket;
uint32_t table_id, std::vector<int64_t> &node_id_list) {
std::vector<std::vector<int64_t>> request_bucket;
std::vector<int> server_index_arr;
std::vector<int> index_mapping(server_size, -1);
for (size_t query_idx = 0; query_idx < node_id_list.size(); ++query_idx) {
......@@ -257,7 +257,7 @@ std::future<int32_t> GraphBrpcClient::remove_graph_node(
if (index_mapping[server_index] == -1) {
index_mapping[server_index] = request_bucket.size();
server_index_arr.push_back(server_index);
request_bucket.push_back(std::vector<uint64_t>());
request_bucket.push_back(std::vector<int64_t>());
}
request_bucket[index_mapping[server_index]].push_back(
node_id_list[query_idx]);
......@@ -291,7 +291,7 @@ std::future<int32_t> GraphBrpcClient::remove_graph_node(
closure->request(request_idx)
->add_params((char *)request_bucket[request_idx].data(),
sizeof(uint64_t) * node_num);
sizeof(int64_t) * node_num);
// PsService_Stub rpc_stub(get_cmd_channel(server_index));
GraphPsService_Stub rpc_stub =
getServiceStub(get_cmd_channel(server_index));
......@@ -303,9 +303,9 @@ std::future<int32_t> GraphBrpcClient::remove_graph_node(
}
// char* &buffer,int &actual_size
std::future<int32_t> GraphBrpcClient::batch_sample_neighbors(
uint32_t table_id, std::vector<uint64_t> node_ids, int sample_size,
// std::vector<std::vector<std::pair<uint64_t, float>>> &res,
std::vector<std::vector<uint64_t>> &res,
uint32_t table_id, std::vector<int64_t> node_ids, int sample_size,
// std::vector<std::vector<std::pair<int64_t, float>>> &res,
std::vector<std::vector<int64_t>> &res,
std::vector<std::vector<float>> &res_weight, bool need_weight,
int server_index) {
if (server_index != -1) {
......@@ -337,7 +337,7 @@ std::future<int32_t> GraphBrpcClient::batch_sample_neighbors(
int start = 0;
while (start < actual_size) {
res[node_idx].emplace_back(
*(uint64_t *)(node_buffer + offset + start));
*(int64_t *)(node_buffer + offset + start));
start += GraphNode::id_size;
if (need_weight) {
res_weight[node_idx].emplace_back(
......@@ -358,7 +358,7 @@ std::future<int32_t> GraphBrpcClient::batch_sample_neighbors(
closure->request(0)->set_table_id(table_id);
closure->request(0)->set_client_id(_client_id);
closure->request(0)->add_params((char *)node_ids.data(),
sizeof(uint64_t) * node_ids.size());
sizeof(int64_t) * node_ids.size());
closure->request(0)->add_params((char *)&sample_size, sizeof(int));
closure->request(0)->add_params((char *)&need_weight, sizeof(bool));
;
......@@ -380,14 +380,14 @@ std::future<int32_t> GraphBrpcClient::batch_sample_neighbors(
server2request[server_index] = request2server.size();
request2server.push_back(server_index);
}
// res.push_back(std::vector<std::pair<uint64_t, float>>());
// res.push_back(std::vector<std::pair<int64_t, float>>());
res.push_back({});
if (need_weight) {
res_weight.push_back({});
}
}
size_t request_call_num = request2server.size();
std::vector<std::vector<uint64_t>> node_id_buckets(request_call_num);
std::vector<std::vector<int64_t>> node_id_buckets(request_call_num);
std::vector<std::vector<int>> query_idx_buckets(request_call_num);
for (int query_idx = 0; query_idx < node_ids.size(); ++query_idx) {
int server_index = get_server_index_by_id(node_ids[query_idx]);
......@@ -428,7 +428,7 @@ std::future<int32_t> GraphBrpcClient::batch_sample_neighbors(
int start = 0;
while (start < actual_size) {
res[query_idx].emplace_back(
*(uint64_t *)(node_buffer + offset + start));
*(int64_t *)(node_buffer + offset + start));
start += GraphNode::id_size;
if (need_weight) {
res_weight[query_idx].emplace_back(
......@@ -459,7 +459,7 @@ std::future<int32_t> GraphBrpcClient::batch_sample_neighbors(
closure->request(request_idx)
->add_params((char *)node_id_buckets[request_idx].data(),
sizeof(uint64_t) * node_num);
sizeof(int64_t) * node_num);
closure->request(request_idx)
->add_params((char *)&sample_size, sizeof(int));
closure->request(request_idx)
......@@ -476,7 +476,7 @@ std::future<int32_t> GraphBrpcClient::batch_sample_neighbors(
}
std::future<int32_t> GraphBrpcClient::random_sample_nodes(
uint32_t table_id, int server_index, int sample_size,
std::vector<uint64_t> &ids) {
std::vector<int64_t> &ids) {
DownpourBrpcClosure *closure = new DownpourBrpcClosure(1, [&](void *done) {
int ret = 0;
auto *closure = (DownpourBrpcClosure *)done;
......@@ -490,7 +490,7 @@ std::future<int32_t> GraphBrpcClient::random_sample_nodes(
auto size = io_buffer_itr.copy_and_forward((void *)(buffer), bytes_size);
int index = 0;
while (index < bytes_size) {
ids.push_back(*(uint64_t *)(buffer + index));
ids.push_back(*(int64_t *)(buffer + index));
index += GraphNode::id_size;
}
delete[] buffer;
......@@ -633,7 +633,7 @@ std::future<int32_t> GraphBrpcClient::pull_graph_list(
}
std::future<int32_t> GraphBrpcClient::set_node_feat(
const uint32_t &table_id, const std::vector<uint64_t> &node_ids,
const uint32_t &table_id, const std::vector<int64_t> &node_ids,
const std::vector<std::string> &feature_names,
const std::vector<std::vector<std::string>> &features) {
std::vector<int> request2server;
......@@ -646,7 +646,7 @@ std::future<int32_t> GraphBrpcClient::set_node_feat(
}
}
size_t request_call_num = request2server.size();
std::vector<std::vector<uint64_t>> node_id_buckets(request_call_num);
std::vector<std::vector<int64_t>> node_id_buckets(request_call_num);
std::vector<std::vector<int>> query_idx_buckets(request_call_num);
std::vector<std::vector<std::vector<std::string>>> features_idx_buckets(
request_call_num);
......@@ -696,7 +696,7 @@ std::future<int32_t> GraphBrpcClient::set_node_feat(
closure->request(request_idx)
->add_params((char *)node_id_buckets[request_idx].data(),
sizeof(uint64_t) * node_num);
sizeof(int64_t) * node_num);
std::string joint_feature_name =
paddle::string::join_strings(feature_names, '\t');
closure->request(request_idx)
......
......@@ -63,8 +63,8 @@ class GraphBrpcClient : public BrpcPsClient {
virtual ~GraphBrpcClient() {}
// given a batch of nodes, sample graph_neighbors for each of them
virtual std::future<int32_t> batch_sample_neighbors(
uint32_t table_id, std::vector<uint64_t> node_ids, int sample_size,
std::vector<std::vector<uint64_t>>& res,
uint32_t table_id, std::vector<int64_t> node_ids, int sample_size,
std::vector<std::vector<int64_t>>& res,
std::vector<std::vector<float>>& res_weight, bool need_weight,
int server_index = -1);
......@@ -75,20 +75,20 @@ class GraphBrpcClient : public BrpcPsClient {
virtual std::future<int32_t> random_sample_nodes(uint32_t table_id,
int server_index,
int sample_size,
std::vector<uint64_t>& ids);
std::vector<int64_t>& ids);
virtual std::future<int32_t> get_node_feat(
const uint32_t& table_id, const std::vector<uint64_t>& node_ids,
const uint32_t& table_id, const std::vector<int64_t>& node_ids,
const std::vector<std::string>& feature_names,
std::vector<std::vector<std::string>>& res);
virtual std::future<int32_t> set_node_feat(
const uint32_t& table_id, const std::vector<uint64_t>& node_ids,
const uint32_t& table_id, const std::vector<int64_t>& node_ids,
const std::vector<std::string>& feature_names,
const std::vector<std::vector<std::string>>& features);
virtual std::future<int32_t> clear_nodes(uint32_t table_id);
virtual std::future<int32_t> add_graph_node(
uint32_t table_id, std::vector<uint64_t>& node_id_list,
uint32_t table_id, std::vector<int64_t>& node_id_list,
std::vector<bool>& is_weighted_list);
virtual std::future<int32_t> use_neighbors_sample_cache(uint32_t table_id,
size_t size_limit,
......@@ -96,11 +96,11 @@ class GraphBrpcClient : public BrpcPsClient {
virtual std::future<int32_t> load_graph_split_config(uint32_t table_id,
std::string path);
virtual std::future<int32_t> remove_graph_node(
uint32_t table_id, std::vector<uint64_t>& node_id_list);
uint32_t table_id, std::vector<int64_t>& node_id_list);
virtual int32_t initialize();
int get_shard_num() { return shard_num; }
void set_shard_num(int shard_num) { this->shard_num = shard_num; }
int get_server_index_by_id(uint64_t id);
int get_server_index_by_id(int64_t id);
void set_local_channel(int index) {
this->local_channel = get_cmd_channel(index);
}
......
......@@ -140,9 +140,9 @@ int32_t GraphBrpcService::add_graph_node(Table *table,
return 0;
}
size_t node_num = request.params(0).size() / sizeof(uint64_t);
uint64_t *node_data = (uint64_t *)(request.params(0).c_str());
std::vector<uint64_t> node_ids(node_data, node_data + node_num);
size_t node_num = request.params(0).size() / sizeof(int64_t);
int64_t *node_data = (int64_t *)(request.params(0).c_str());
std::vector<int64_t> node_ids(node_data, node_data + node_num);
std::vector<bool> is_weighted_list;
if (request.params_size() == 2) {
size_t weight_list_size = request.params(1).size() / sizeof(bool);
......@@ -165,9 +165,9 @@ int32_t GraphBrpcService::remove_graph_node(Table *table,
"graph_get_node_feat request requires at least 1 argument");
return 0;
}
size_t node_num = request.params(0).size() / sizeof(uint64_t);
uint64_t *node_data = (uint64_t *)(request.params(0).c_str());
std::vector<uint64_t> node_ids(node_data, node_data + node_num);
size_t node_num = request.params(0).size() / sizeof(int64_t);
int64_t *node_data = (int64_t *)(request.params(0).c_str());
std::vector<int64_t> node_ids(node_data, node_data + node_num);
((GraphTable *)table)->remove_graph_node(node_ids);
return 0;
......@@ -386,9 +386,9 @@ int32_t GraphBrpcService::graph_random_sample_neighbors(
"graph_random_sample_neighbors request requires at least 3 arguments");
return 0;
}
size_t node_num = request.params(0).size() / sizeof(uint64_t);
uint64_t *node_data = (uint64_t *)(request.params(0).c_str());
int sample_size = *(uint64_t *)(request.params(1).c_str());
size_t node_num = request.params(0).size() / sizeof(int64_t);
int64_t *node_data = (int64_t *)(request.params(0).c_str());
int sample_size = *(int64_t *)(request.params(1).c_str());
bool need_weight = *(bool *)(request.params(2).c_str());
std::vector<std::shared_ptr<char>> buffers(node_num);
std::vector<int> actual_sizes(node_num, 0);
......@@ -407,7 +407,7 @@ int32_t GraphBrpcService::graph_random_sample_neighbors(
int32_t GraphBrpcService::graph_random_sample_nodes(
Table *table, const PsRequestMessage &request, PsResponseMessage &response,
brpc::Controller *cntl) {
size_t size = *(uint64_t *)(request.params(0).c_str());
size_t size = *(int64_t *)(request.params(0).c_str());
std::unique_ptr<char[]> buffer;
int actual_size;
if (((GraphTable *)table)->random_sample_nodes(size, buffer, actual_size) ==
......@@ -430,9 +430,9 @@ int32_t GraphBrpcService::graph_get_node_feat(Table *table,
"graph_get_node_feat request requires at least 2 arguments");
return 0;
}
size_t node_num = request.params(0).size() / sizeof(uint64_t);
uint64_t *node_data = (uint64_t *)(request.params(0).c_str());
std::vector<uint64_t> node_ids(node_data, node_data + node_num);
size_t node_num = request.params(0).size() / sizeof(int64_t);
int64_t *node_data = (int64_t *)(request.params(0).c_str());
std::vector<int64_t> node_ids(node_data, node_data + node_num);
std::vector<std::string> feature_names =
paddle::string::split_string<std::string>(request.params(1), "\t");
......@@ -464,16 +464,16 @@ int32_t GraphBrpcService::sample_neighbors_across_multi_servers(
"at least 3 arguments");
return 0;
}
size_t node_num = request.params(0).size() / sizeof(uint64_t),
size_t node_num = request.params(0).size() / sizeof(int64_t),
size_of_size_t = sizeof(size_t);
uint64_t *node_data = (uint64_t *)(request.params(0).c_str());
int sample_size = *(uint64_t *)(request.params(1).c_str());
bool need_weight = *(uint64_t *)(request.params(2).c_str());
// std::vector<uint64_t> res = ((GraphTable
int64_t *node_data = (int64_t *)(request.params(0).c_str());
int sample_size = *(int64_t *)(request.params(1).c_str());
bool need_weight = *(int64_t *)(request.params(2).c_str());
// std::vector<int64_t> res = ((GraphTable
// *)table).filter_out_non_exist_nodes(node_data, sample_size);
std::vector<int> request2server;
std::vector<int> server2request(server_size, -1);
std::vector<uint64_t> local_id;
std::vector<int64_t> local_id;
std::vector<int> local_query_idx;
size_t rank = get_rank();
for (int query_idx = 0; query_idx < node_num; ++query_idx) {
......@@ -496,7 +496,7 @@ int32_t GraphBrpcService::sample_neighbors_across_multi_servers(
std::vector<std::shared_ptr<char>> local_buffers;
std::vector<int> local_actual_sizes;
std::vector<size_t> seq;
std::vector<std::vector<uint64_t>> node_id_buckets(request_call_num);
std::vector<std::vector<int64_t>> node_id_buckets(request_call_num);
std::vector<std::vector<int>> query_idx_buckets(request_call_num);
for (int query_idx = 0; query_idx < node_num; ++query_idx) {
int server_index =
......@@ -583,7 +583,7 @@ int32_t GraphBrpcService::sample_neighbors_across_multi_servers(
closure->request(request_idx)
->add_params((char *)node_id_buckets[request_idx].data(),
sizeof(uint64_t) * node_num);
sizeof(int64_t) * node_num);
closure->request(request_idx)
->add_params((char *)&sample_size, sizeof(int));
closure->request(request_idx)
......@@ -618,9 +618,9 @@ int32_t GraphBrpcService::graph_set_node_feat(Table *table,
"graph_set_node_feat request requires at least 3 arguments");
return 0;
}
size_t node_num = request.params(0).size() / sizeof(uint64_t);
uint64_t *node_data = (uint64_t *)(request.params(0).c_str());
std::vector<uint64_t> node_ids(node_data, node_data + node_num);
size_t node_num = request.params(0).size() / sizeof(int64_t);
int64_t *node_data = (int64_t *)(request.params(0).c_str());
std::vector<int64_t> node_ids(node_data, node_data + node_num);
std::vector<std::string> feature_names =
paddle::string::split_string<std::string>(request.params(1), "\t");
......
......@@ -44,9 +44,9 @@ void GraphPyService::add_table_feat_conf(std::string table_name,
}
}
void add_graph_node(std::vector<uint64_t> node_ids,
void add_graph_node(std::vector<int64_t> node_ids,
std::vector<bool> weight_list) {}
void remove_graph_node(std::vector<uint64_t> node_ids) {}
void remove_graph_node(std::vector<int64_t> node_ids) {}
void GraphPyService::set_up(std::string ips_str, int shard_num,
std::vector<std::string> node_types,
std::vector<std::string> edge_types) {
......@@ -260,7 +260,7 @@ void GraphPyClient::clear_nodes(std::string name) {
}
void GraphPyClient::add_graph_node(std::string name,
std::vector<uint64_t>& node_ids,
std::vector<int64_t>& node_ids,
std::vector<bool>& weight_list) {
if (this->table_id_map.count(name)) {
uint32_t table_id = this->table_id_map[name];
......@@ -271,7 +271,7 @@ void GraphPyClient::add_graph_node(std::string name,
}
void GraphPyClient::remove_graph_node(std::string name,
std::vector<uint64_t>& node_ids) {
std::vector<int64_t>& node_ids) {
if (this->table_id_map.count(name)) {
uint32_t table_id = this->table_id_map[name];
auto status = get_ps_client()->remove_graph_node(table_id, node_ids);
......@@ -290,13 +290,12 @@ void GraphPyClient::load_node_file(std::string name, std::string filepath) {
}
}
std::pair<std::vector<std::vector<uint64_t>>, std::vector<float>>
std::pair<std::vector<std::vector<int64_t>>, std::vector<float>>
GraphPyClient::batch_sample_neighbors(std::string name,
std::vector<uint64_t> node_ids,
std::vector<int64_t> node_ids,
int sample_size, bool return_weight,
bool return_edges) {
// std::vector<std::vector<std::pair<uint64_t, float>>> v;
std::vector<std::vector<uint64_t>> v;
std::vector<std::vector<int64_t>> v;
std::vector<std::vector<float>> v1;
if (this->table_id_map.count(name)) {
uint32_t table_id = this->table_id_map[name];
......@@ -309,7 +308,7 @@ GraphPyClient::batch_sample_neighbors(std::string name,
// res.first[1]: slice index
// res.first[2]: src nodes
// res.second: edges weight
std::pair<std::vector<std::vector<uint64_t>>, std::vector<float>> res;
std::pair<std::vector<std::vector<int64_t>>, std::vector<float>> res;
res.first.push_back({});
res.first.push_back({});
if (return_edges) res.first.push_back({});
......@@ -342,10 +341,10 @@ void GraphPyClient::use_neighbors_sample_cache(std::string name,
status.wait();
}
}
std::vector<uint64_t> GraphPyClient::random_sample_nodes(std::string name,
std::vector<int64_t> GraphPyClient::random_sample_nodes(std::string name,
int server_index,
int sample_size) {
std::vector<uint64_t> v;
std::vector<int64_t> v;
if (this->table_id_map.count(name)) {
uint32_t table_id = this->table_id_map[name];
auto status =
......@@ -357,7 +356,7 @@ std::vector<uint64_t> GraphPyClient::random_sample_nodes(std::string name,
// (name, dtype, ndarray)
std::vector<std::vector<std::string>> GraphPyClient::get_node_feat(
std::string node_type, std::vector<uint64_t> node_ids,
std::string node_type, std::vector<int64_t> node_ids,
std::vector<std::string> feature_names) {
std::vector<std::vector<std::string>> v(
feature_names.size(), std::vector<std::string>(node_ids.size()));
......@@ -371,7 +370,7 @@ std::vector<std::vector<std::string>> GraphPyClient::get_node_feat(
}
void GraphPyClient::set_node_feat(
std::string node_type, std::vector<uint64_t> node_ids,
std::string node_type, std::vector<int64_t> node_ids,
std::vector<std::string> feature_names,
const std::vector<std::vector<std::string>> features) {
if (this->table_id_map.count(node_type)) {
......
......@@ -70,18 +70,34 @@ class GraphPyService {
::paddle::distributed::TableAccessorParameter* accessor_proto =
sparse_table_proto->mutable_accessor();
::paddle::distributed::CommonAccessorParameter* common_proto =
sparse_table_proto->mutable_common();
// ::paddle::distributed::CommonAccessorParameter* common_proto =
// sparse_table_proto->mutable_common();
::paddle::distributed::GraphParameter* graph_proto =
sparse_table_proto->mutable_graph_parameter();
::paddle::distributed::GraphFeature* graph_feature =
graph_proto->mutable_graph_feature();
graph_proto->set_task_pool_size(24);
graph_proto->set_table_name(table_name);
graph_proto->set_table_type(table_type);
graph_proto->set_use_cache(false);
// Set GraphTable Parameter
common_proto->set_table_name(table_name);
common_proto->set_name(table_type);
// common_proto->set_table_name(table_name);
// common_proto->set_name(table_type);
// for (size_t i = 0; i < feat_name.size(); i++) {
// common_proto->add_params(feat_dtype[i]);
// common_proto->add_dims(feat_shape[i]);
// common_proto->add_attributes(feat_name[i]);
// }
for (size_t i = 0; i < feat_name.size(); i++) {
common_proto->add_params(feat_dtype[i]);
common_proto->add_dims(feat_shape[i]);
common_proto->add_attributes(feat_name[i]);
graph_feature->add_dtype(feat_dtype[i]);
graph_feature->add_shape(feat_shape[i]);
graph_feature->add_name(feat_name[i]);
}
accessor_proto->set_accessor_class("CommMergeAccessor");
}
......@@ -143,24 +159,24 @@ class GraphPyClient : public GraphPyService {
void load_edge_file(std::string name, std::string filepath, bool reverse);
void load_node_file(std::string name, std::string filepath);
void clear_nodes(std::string name);
void add_graph_node(std::string name, std::vector<uint64_t>& node_ids,
void add_graph_node(std::string name, std::vector<int64_t>& node_ids,
std::vector<bool>& weight_list);
void remove_graph_node(std::string name, std::vector<uint64_t>& node_ids);
void remove_graph_node(std::string name, std::vector<int64_t>& node_ids);
int get_client_id() { return client_id; }
void set_client_id(int client_id) { this->client_id = client_id; }
void start_client();
std::pair<std::vector<std::vector<uint64_t>>, std::vector<float>>
batch_sample_neighbors(std::string name, std::vector<uint64_t> node_ids,
std::pair<std::vector<std::vector<int64_t>>, std::vector<float>>
batch_sample_neighbors(std::string name, std::vector<int64_t> node_ids,
int sample_size, bool return_weight,
bool return_edges);
std::vector<uint64_t> random_sample_nodes(std::string name, int server_index,
std::vector<int64_t> random_sample_nodes(std::string name, int server_index,
int sample_size);
std::vector<std::vector<std::string>> get_node_feat(
std::string node_type, std::vector<uint64_t> node_ids,
std::string node_type, std::vector<int64_t> node_ids,
std::vector<std::string> feature_names);
void use_neighbors_sample_cache(std::string name, size_t total_size_limit,
size_t ttl);
void set_node_feat(std::string node_type, std::vector<uint64_t> node_ids,
void set_node_feat(std::string node_type, std::vector<int64_t> node_ids,
std::vector<std::string> feature_names,
const std::vector<std::vector<std::string>> features);
std::vector<FeatureNode> pull_graph_list(std::string name, int server_index,
......
......@@ -53,7 +53,6 @@ cc_library(memory_sparse_table SRCS memory_sparse_table.cc DEPS ps_framework_pro
set_source_files_properties(memory_sparse_geo_table.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
cc_library(memory_sparse_geo_table SRCS memory_sparse_geo_table.cc DEPS ps_framework_proto ${TABLE_DEPS} common_table)
cc_library(table SRCS table.cc DEPS memory_sparse_table memory_sparse_geo_table common_table tensor_accessor tensor_table ps_framework_proto string_helper device_context gflags glog boost)
target_link_libraries(table -fopenmp)
......@@ -38,10 +38,14 @@
#include <vector>
#include "paddle/fluid/distributed/ps/table/accessor.h"
#include "paddle/fluid/distributed/ps/table/common_table.h"
#include "paddle/fluid/distributed/ps/table/graph/class_macro.h"
#include "paddle/fluid/distributed/ps/table/graph/graph_node.h"
#include "paddle/fluid/string/string_helper.h"
#include "paddle/phi/core/utils/rw_lock.h"
#ifdef PADDLE_WITH_HETERPS
#include "paddle/fluid/framework/fleet/heter_ps/gpu_graph_node.h"
#endif
namespace paddle {
namespace distributed {
class GraphShard {
......@@ -51,37 +55,37 @@ class GraphShard {
~GraphShard();
std::vector<Node *> &get_bucket() { return bucket; }
std::vector<Node *> get_batch(int start, int end, int step);
std::vector<uint64_t> get_ids_by_range(int start, int end) {
std::vector<uint64_t> res;
std::vector<int64_t> get_ids_by_range(int start, int end) {
std::vector<int64_t> res;
for (int i = start; i < end && i < (int)bucket.size(); i++) {
res.push_back(bucket[i]->get_id());
}
return res;
}
GraphNode *add_graph_node(uint64_t id);
GraphNode *add_graph_node(int64_t id);
GraphNode *add_graph_node(Node *node);
FeatureNode *add_feature_node(uint64_t id);
Node *find_node(uint64_t id);
void delete_node(uint64_t id);
FeatureNode *add_feature_node(int64_t id);
Node *find_node(int64_t id);
void delete_node(int64_t id);
void clear();
void add_neighbor(uint64_t id, uint64_t dst_id, float weight);
std::unordered_map<uint64_t, int> &get_node_location() {
void add_neighbor(int64_t id, int64_t dst_id, float weight);
std::unordered_map<int64_t, int> &get_node_location() {
return node_location;
}
private:
std::unordered_map<uint64_t, int> node_location;
std::unordered_map<int64_t, int> node_location;
std::vector<Node *> bucket;
};
enum LRUResponse { ok = 0, blocked = 1, err = 2 };
struct SampleKey {
uint64_t node_key;
int64_t node_key;
size_t sample_size;
bool is_weighted;
SampleKey(uint64_t _node_key, size_t _sample_size, bool _is_weighted)
SampleKey(int64_t _node_key, size_t _sample_size, bool _is_weighted)
: node_key(_node_key),
sample_size(_sample_size),
is_weighted(_is_weighted) {}
......@@ -300,7 +304,7 @@ class ScaledLRU {
node_size += lru_pool[i].node_size - lru_pool[i].remove_count;
}
if (node_size <= size_t(1.1 * size_limit) + 1) return 0;
if ((size_t)node_size <= size_t(1.1 * size_limit) + 1) return 0;
if (pthread_rwlock_wrlock(&rwlock) == 0) {
// VLOG(0)<"in shrink\n";
global_count = 0;
......@@ -308,9 +312,9 @@ class ScaledLRU {
global_count += lru_pool[i].node_size - lru_pool[i].remove_count;
}
// VLOG(0)<<"global_count "<<global_count<<"\n";
if (global_count > size_limit) {
if ((size_t)global_count > size_limit) {
size_t remove = global_count - size_limit;
for (int i = 0; i < lru_pool.size(); i++) {
for (size_t i = 0; i < lru_pool.size(); i++) {
lru_pool[i].total_diff = 0;
lru_pool[i].remove_count +=
1.0 * (lru_pool[i].node_size - lru_pool[i].remove_count) /
......@@ -352,9 +356,69 @@ class ScaledLRU {
friend class RandomSampleLRU<K, V>;
};
#ifdef PADDLE_WITH_HETERPS
enum GraphSamplerStatus { waiting = 0, running = 1, terminating = 2 };
class GraphTable;
class GraphSampler {
public:
GraphSampler() {
status = GraphSamplerStatus::waiting;
thread_pool.reset(new ::ThreadPool(1));
callback = [](std::vector<paddle::framework::GpuPsCommGraph> &res) {
return;
};
}
virtual int run_graph_sampling() = 0;
virtual int start_graph_sampling() {
if (status != GraphSamplerStatus::waiting) {
return -1;
}
std::promise<int> prom;
std::future<int> fut = prom.get_future();
graph_sample_task_over = thread_pool->enqueue([&prom, this]() {
prom.set_value(0);
status = GraphSamplerStatus::running;
return run_graph_sampling();
});
return fut.get();
}
virtual void init(size_t gpu_num, GraphTable *graph_table,
std::vector<std::string> args) = 0;
virtual void set_graph_sample_callback(
std::function<void(std::vector<paddle::framework::GpuPsCommGraph> &)>
callback) {
this->callback = callback;
}
virtual int end_graph_sampling() {
if (status == GraphSamplerStatus::running) {
status = GraphSamplerStatus::terminating;
return graph_sample_task_over.get();
}
return -1;
}
virtual GraphSamplerStatus get_graph_sampler_status() { return status; }
protected:
std::function<void(std::vector<paddle::framework::GpuPsCommGraph> &)>
callback;
std::shared_ptr<::ThreadPool> thread_pool;
GraphSamplerStatus status;
std::future<int> graph_sample_task_over;
std::vector<paddle::framework::GpuPsCommGraph> sample_res;
};
#endif
class GraphTable : public SparseTable {
public:
GraphTable() { use_cache = false; }
GraphTable() {
use_cache = false;
shard_num = 0;
#ifdef PADDLE_WITH_HETERPS
gpups_mode = false;
#endif
rw_lock.reset(new pthread_rwlock_t());
}
virtual ~GraphTable();
virtual int32_t pull_graph_list(int start, int size,
std::unique_ptr<char[]> &buffer,
......@@ -362,7 +426,7 @@ class GraphTable : public SparseTable {
int step);
virtual int32_t random_sample_neighbors(
uint64_t *node_ids, int sample_size,
int64_t *node_ids, int sample_size,
std::vector<std::shared_ptr<char>> &buffers,
std::vector<int> &actual_sizes, bool need_weight);
......@@ -370,9 +434,11 @@ class GraphTable : public SparseTable {
int &actual_sizes);
virtual int32_t get_nodes_ids_by_ranges(
std::vector<std::pair<int, int>> ranges, std::vector<uint64_t> &res);
virtual int32_t initialize();
std::vector<std::pair<int, int>> ranges, std::vector<int64_t> &res);
virtual int32_t initialize() { return 0; }
virtual int32_t initialize(const TableParameter &config,
const FsClientParameter &fs_config);
virtual int32_t initialize(const GraphParameter &config);
int32_t load(const std::string &path, const std::string &param);
int32_t load_graph_split_config(const std::string &path);
......@@ -380,13 +446,13 @@ class GraphTable : public SparseTable {
int32_t load_nodes(const std::string &path, std::string node_type);
int32_t add_graph_node(std::vector<uint64_t> &id_list,
int32_t add_graph_node(std::vector<int64_t> &id_list,
std::vector<bool> &is_weight_list);
int32_t remove_graph_node(std::vector<uint64_t> &id_list);
int32_t remove_graph_node(std::vector<int64_t> &id_list);
int32_t get_server_index_by_id(uint64_t id);
Node *find_node(uint64_t id);
int32_t get_server_index_by_id(int64_t id);
Node *find_node(int64_t id);
virtual int32_t pull_sparse(float *values,
const PullSparseValue &pull_value) {
......@@ -407,16 +473,27 @@ class GraphTable : public SparseTable {
return 0;
}
virtual int32_t initialize_shard() { return 0; }
virtual uint32_t get_thread_pool_index_by_shard_index(uint64_t shard_index);
virtual uint32_t get_thread_pool_index(uint64_t node_id);
virtual int32_t set_shard(size_t shard_idx, size_t server_num) {
_shard_idx = shard_idx;
/*
_shard_num is not used in graph_table, this following operation is for the
purpose of
being compatible with base class table.
*/
_shard_num = server_num;
this->server_num = server_num;
return 0;
}
virtual uint32_t get_thread_pool_index_by_shard_index(int64_t shard_index);
virtual uint32_t get_thread_pool_index(int64_t node_id);
virtual std::pair<int32_t, std::string> parse_feature(std::string feat_str);
virtual int32_t get_node_feat(const std::vector<uint64_t> &node_ids,
virtual int32_t get_node_feat(const std::vector<int64_t> &node_ids,
const std::vector<std::string> &feature_names,
std::vector<std::vector<std::string>> &res);
virtual int32_t set_node_feat(
const std::vector<uint64_t> &node_ids,
const std::vector<int64_t> &node_ids,
const std::vector<std::string> &feature_names,
const std::vector<std::vector<std::string>> &res);
......@@ -433,11 +510,25 @@ class GraphTable : public SparseTable {
}
return 0;
}
#ifdef PADDLE_WITH_HETERPS
virtual int32_t start_graph_sampling() {
return this->graph_sampler->start_graph_sampling();
}
virtual int32_t end_graph_sampling() {
return this->graph_sampler->end_graph_sampling();
}
virtual int32_t set_graph_sample_callback(
std::function<void(std::vector<paddle::framework::GpuPsCommGraph> &)>
callback) {
graph_sampler->set_graph_sample_callback(callback);
return 0;
}
// virtual GraphSampler *get_graph_sampler() { return graph_sampler.get(); }
#endif
protected:
std::vector<GraphShard *> shards, extra_shards;
size_t shard_start, shard_end, server_num, shard_num_per_server, shard_num;
const int task_pool_size_ = 24;
int task_pool_size_ = 24;
const int random_sample_nodes_ranges = 3;
std::vector<std::string> feat_name;
......@@ -450,11 +541,61 @@ class GraphTable : public SparseTable {
std::vector<std::shared_ptr<::ThreadPool>> _shards_task_pool;
std::vector<std::shared_ptr<std::mt19937_64>> _shards_task_rng_pool;
std::shared_ptr<ScaledLRU<SampleKey, SampleResult>> scaled_lru;
std::unordered_set<uint64_t> extra_nodes;
std::unordered_map<uint64_t, size_t> extra_nodes_to_thread_index;
std::unordered_set<int64_t> extra_nodes;
std::unordered_map<int64_t, size_t> extra_nodes_to_thread_index;
bool use_cache, use_duplicate_nodes;
mutable std::mutex mutex_;
std::shared_ptr<pthread_rwlock_t> rw_lock;
#ifdef PADDLE_WITH_HETERPS
// paddle::framework::GpuPsGraphTable gpu_graph_table;
bool gpups_mode;
// std::shared_ptr<::ThreadPool> graph_sample_pool;
std::shared_ptr<GraphSampler> graph_sampler;
REGISTER_GRAPH_FRIEND_CLASS(2, CompleteGraphSampler, BasicBfsGraphSampler)
#endif
};
#ifdef PADDLE_WITH_HETERPS
REGISTER_PSCORE_REGISTERER(GraphSampler);
class CompleteGraphSampler : public GraphSampler {
public:
CompleteGraphSampler() {}
~CompleteGraphSampler() {}
// virtual pthread_rwlock_t *export_rw_lock();
virtual int run_graph_sampling();
virtual void init(size_t gpu_num, GraphTable *graph_table,
std::vector<std::string> args_);
protected:
GraphTable *graph_table;
std::vector<std::vector<paddle::framework::GpuPsGraphNode>> sample_nodes;
std::vector<std::vector<int64_t>> sample_neighbors;
// std::vector<GpuPsCommGraph> sample_res;
// std::shared_ptr<std::mt19937_64> random;
int gpu_num;
};
class BasicBfsGraphSampler : public GraphSampler {
public:
BasicBfsGraphSampler() {}
~BasicBfsGraphSampler() {}
// virtual pthread_rwlock_t *export_rw_lock();
virtual int run_graph_sampling();
virtual void init(size_t gpu_num, GraphTable *graph_table,
std::vector<std::string> args_);
protected:
GraphTable *graph_table;
// std::vector<std::vector<GpuPsGraphNode>> sample_nodes;
std::vector<std::vector<paddle::framework::GpuPsGraphNode>> sample_nodes;
std::vector<std::vector<int64_t>> sample_neighbors;
size_t gpu_num;
int node_num_for_each_shard, edge_num_for_each_node;
int rounds, interval;
std::vector<std::unordered_map<int64_t, std::vector<int64_t>>>
sample_neighbors_map;
};
#endif
} // namespace distributed
}; // namespace paddle
......
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#define DECLARE_GRAPH_FRIEND_CLASS(a) friend class a;
#define DECLARE_1_FRIEND_CLASS(a, ...) DECLARE_GRAPH_FRIEND_CLASS(a)
#define DECLARE_2_FRIEND_CLASS(a, ...) \
DECLARE_GRAPH_FRIEND_CLASS(a) DECLARE_1_FRIEND_CLASS(__VA_ARGS__)
#define DECLARE_3_FRIEND_CLASS(a, ...) \
DECLARE_GRAPH_FRIEND_CLASS(a) DECLARE_2_FRIEND_CLASS(__VA_ARGS__)
#define DECLARE_4_FRIEND_CLASS(a, ...) \
DECLARE_GRAPH_FRIEND_CLASS(a) DECLARE_3_FRIEND_CLASS(__VA_ARGS__)
#define DECLARE_5_FRIEND_CLASS(a, ...) \
DECLARE_GRAPH_FRIEND_CLASS(a) DECLARE_4_FRIEND_CLASS(__VA_ARGS__)
#define DECLARE_6_FRIEND_CLASS(a, ...) \
DECLARE_GRAPH_FRIEND_CLASS(a) DECLARE_5_FRIEND_CLASS(__VA_ARGS__)
#define DECLARE_7_FRIEND_CLASS(a, ...) \
DECLARE_GRAPH_FRIEND_CLASS(a) DECLARE_6_FRIEND_CLASS(__VA_ARGS__)
#define DECLARE_8_FRIEND_CLASS(a, ...) \
DECLARE_GRAPH_FRIEND_CLASS(a) DECLARE_7_FRIEND_CLASS(__VA_ARGS__)
#define DECLARE_9_FRIEND_CLASS(a, ...) \
DECLARE_GRAPH_FRIEND_CLASS(a) DECLARE_8_FRIEND_CLASS(__VA_ARGS__)
#define DECLARE_10_FRIEND_CLASS(a, ...) \
DECLARE_GRAPH_FRIEND_CLASS(a) DECLARE_9_FRIEND_CLASS(__VA_ARGS__)
#define DECLARE_11_FRIEND_CLASS(a, ...) \
DECLARE_GRAPH_FRIEND_CLASS(a) DECLARE_10_FRIEND_CLASS(__VA_ARGS__)
#define REGISTER_GRAPH_FRIEND_CLASS(n, ...) \
DECLARE_##n##_FRIEND_CLASS(__VA_ARGS__)
......@@ -17,11 +17,11 @@
namespace paddle {
namespace distributed {
void GraphEdgeBlob::add_edge(uint64_t id, float weight = 1) {
void GraphEdgeBlob::add_edge(int64_t id, float weight = 1) {
id_arr.push_back(id);
}
void WeightedGraphEdgeBlob::add_edge(uint64_t id, float weight = 1) {
void WeightedGraphEdgeBlob::add_edge(int64_t id, float weight = 1) {
id_arr.push_back(id);
weight_arr.push_back(weight);
}
......
......@@ -24,19 +24,20 @@ class GraphEdgeBlob {
GraphEdgeBlob() {}
virtual ~GraphEdgeBlob() {}
size_t size() { return id_arr.size(); }
virtual void add_edge(uint64_t id, float weight);
uint64_t get_id(int idx) { return id_arr[idx]; }
virtual void add_edge(int64_t id, float weight);
int64_t get_id(int idx) { return id_arr[idx]; }
virtual float get_weight(int idx) { return 1; }
std::vector<int64_t>& export_id_array() { return id_arr; }
protected:
std::vector<uint64_t> id_arr;
std::vector<int64_t> id_arr;
};
class WeightedGraphEdgeBlob : public GraphEdgeBlob {
public:
WeightedGraphEdgeBlob() {}
virtual ~WeightedGraphEdgeBlob() {}
virtual void add_edge(uint64_t id, float weight);
virtual void add_edge(int64_t id, float weight);
virtual float get_weight(int idx) { return weight_arr[idx]; }
protected:
......
......@@ -48,6 +48,7 @@ class Node {
virtual void set_feature(int idx, std::string str) {}
virtual void set_feature_size(int size) {}
virtual int get_feature_size() { return 0; }
virtual size_t get_neighbor_size() { return 0; }
protected:
uint64_t id;
......@@ -70,6 +71,7 @@ class GraphNode : public Node {
}
virtual uint64_t get_neighbor_id(int idx) { return edges->get_id(idx); }
virtual float get_neighbor_weight(int idx) { return edges->get_weight(idx); }
virtual size_t get_neighbor_size() { return edges->size(); }
protected:
Sampler *sampler;
......
......@@ -37,6 +37,8 @@ REGISTER_PSCORE_CLASS(Table, CommonDenseTable);
REGISTER_PSCORE_CLASS(Table, CommonSparseTable);
#ifdef PADDLE_WITH_HETERPS
REGISTER_PSCORE_CLASS(Table, SSDSparseTable);
REGISTER_PSCORE_CLASS(GraphSampler, CompleteGraphSampler);
REGISTER_PSCORE_CLASS(GraphSampler, BasicBfsGraphSampler);
#endif
REGISTER_PSCORE_CLASS(Table, SparseGeoTable);
REGISTER_PSCORE_CLASS(Table, BarrierTable);
......
......@@ -24,6 +24,9 @@ cc_test(graph_node_test SRCS graph_node_test.cc DEPS graph_py_service scope serv
set_source_files_properties(graph_node_split_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
cc_test(graph_node_split_test SRCS graph_node_split_test.cc DEPS graph_py_service scope server client communicator ps_service boost table ps_framework_proto ${COMMON_DEPS})
set_source_files_properties(graph_table_sample_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
cc_test(graph_table_sample_test SRCS graph_table_sample_test.cc DEPS scope server communicator ps_service boost table ps_framework_proto ${COMMON_DEPS})
set_source_files_properties(feature_value_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
cc_test(feature_value_test SRCS feature_value_test.cc DEPS ${COMMON_DEPS} boost table)
......
......@@ -236,7 +236,7 @@ void RunGraphSplit() {
sleep(2);
std::map<uint64_t, std::vector<paddle::distributed::Region>> dense_regions;
dense_regions.insert(
std::pair<uint64_t, std::vector<paddle::distributed::Region>>(0, {}));
std::pair<int64_t, std::vector<paddle::distributed::Region>>(0, {}));
auto regions = dense_regions[0];
RunClient(dense_regions, 0, pserver_ptr_->get_service());
......@@ -250,16 +250,16 @@ void RunGraphSplit() {
worker_ptr_->load(0, std::string(edge_file_name), std::string("e>"));
srand(time(0));
pull_status.wait();
std::vector<std::vector<uint64_t>> _vs;
std::vector<std::vector<int64_t>> _vs;
std::vector<std::vector<float>> vs;
pull_status = worker_ptr_->batch_sample_neighbors(
0, std::vector<uint64_t>(1, 10240001024), 4, _vs, vs, true);
0, std::vector<int64_t>(1, 10240001024), 4, _vs, vs, true);
pull_status.wait();
ASSERT_EQ(0, _vs[0].size());
_vs.clear();
vs.clear();
pull_status = worker_ptr_->batch_sample_neighbors(
0, std::vector<uint64_t>(1, 97), 4, _vs, vs, true);
0, std::vector<int64_t>(1, 97), 4, _vs, vs, true);
pull_status.wait();
ASSERT_EQ(3, _vs[0].size());
std::remove(edge_file_name);
......
......@@ -48,10 +48,10 @@ namespace distributed = paddle::distributed;
void testSampleNodes(
std::shared_ptr<paddle::distributed::GraphBrpcClient>& worker_ptr_) {
std::vector<uint64_t> ids;
std::vector<int64_t> ids;
auto pull_status = worker_ptr_->random_sample_nodes(0, 0, 6, ids);
std::unordered_set<uint64_t> s;
std::unordered_set<uint64_t> s1 = {37, 59};
std::unordered_set<int64_t> s;
std::unordered_set<int64_t> s1 = {37, 59};
pull_status.wait();
for (auto id : ids) s.insert(id);
ASSERT_EQ(true, s.size() == s1.size());
......@@ -106,14 +106,14 @@ void testFeatureNodeSerializeFloat64() {
void testSingleSampleNeighboor(
std::shared_ptr<paddle::distributed::GraphBrpcClient>& worker_ptr_) {
std::vector<std::vector<uint64_t>> vs;
std::vector<std::vector<int64_t>> vs;
std::vector<std::vector<float>> vs1;
auto pull_status = worker_ptr_->batch_sample_neighbors(
0, std::vector<uint64_t>(1, 37), 4, vs, vs1, true);
0, std::vector<int64_t>(1, 37), 4, vs, vs1, true);
pull_status.wait();
std::unordered_set<uint64_t> s;
std::unordered_set<uint64_t> s1 = {112, 45, 145};
std::unordered_set<int64_t> s;
std::unordered_set<int64_t> s1 = {112, 45, 145};
for (auto g : vs[0]) {
s.insert(g);
}
......@@ -126,7 +126,7 @@ void testSingleSampleNeighboor(
vs.clear();
vs1.clear();
pull_status = worker_ptr_->batch_sample_neighbors(
0, std::vector<uint64_t>(1, 96), 4, vs, vs1, true);
0, std::vector<int64_t>(1, 96), 4, vs, vs1, true);
pull_status.wait();
s1 = {111, 48, 247};
for (auto g : vs[0]) {
......@@ -147,30 +147,30 @@ void testAddNode(
std::shared_ptr<paddle::distributed::GraphBrpcClient>& worker_ptr_) {
worker_ptr_->clear_nodes(0);
int total_num = 270000;
uint64_t id;
std::unordered_set<uint64_t> id_set;
int64_t id;
std::unordered_set<int64_t> id_set;
for (int i = 0; i < total_num; i++) {
while (id_set.find(id = rand()) != id_set.end())
;
id_set.insert(id);
}
std::vector<uint64_t> id_list(id_set.begin(), id_set.end());
std::vector<int64_t> id_list(id_set.begin(), id_set.end());
std::vector<bool> weight_list;
auto status = worker_ptr_->add_graph_node(0, id_list, weight_list);
status.wait();
std::vector<uint64_t> ids[2];
std::vector<int64_t> ids[2];
for (int i = 0; i < 2; i++) {
auto sample_status =
worker_ptr_->random_sample_nodes(0, i, total_num, ids[i]);
sample_status.wait();
}
std::unordered_set<uint64_t> id_set_check(ids[0].begin(), ids[0].end());
std::unordered_set<int64_t> id_set_check(ids[0].begin(), ids[0].end());
for (auto x : ids[1]) id_set_check.insert(x);
ASSERT_EQ(id_set.size(), id_set_check.size());
for (auto x : id_set) {
ASSERT_EQ(id_set_check.find(x) != id_set_check.end(), true);
}
std::vector<uint64_t> remove_ids;
std::vector<int64_t> remove_ids;
for (auto p : id_set_check) {
if (remove_ids.size() == 0)
remove_ids.push_back(p);
......@@ -187,7 +187,7 @@ void testAddNode(
worker_ptr_->random_sample_nodes(0, i, total_num, ids[i]);
sample_status.wait();
}
std::unordered_set<uint64_t> id_set_check1(ids[0].begin(), ids[0].end());
std::unordered_set<int64_t> id_set_check1(ids[0].begin(), ids[0].end());
for (auto x : ids[1]) id_set_check1.insert(x);
ASSERT_EQ(id_set_check1.size(), id_set_check.size());
for (auto x : id_set_check1) {
......@@ -196,14 +196,14 @@ void testAddNode(
}
void testBatchSampleNeighboor(
std::shared_ptr<paddle::distributed::GraphBrpcClient>& worker_ptr_) {
std::vector<std::vector<uint64_t>> vs;
std::vector<std::vector<int64_t>> vs;
std::vector<std::vector<float>> vs1;
std::vector<std::uint64_t> v = {37, 96};
std::vector<std::int64_t> v = {37, 96};
auto pull_status =
worker_ptr_->batch_sample_neighbors(0, v, 4, vs, vs1, false);
pull_status.wait();
std::unordered_set<uint64_t> s;
std::unordered_set<uint64_t> s1 = {112, 45, 145};
std::unordered_set<int64_t> s;
std::unordered_set<int64_t> s1 = {112, 45, 145};
for (auto g : vs[0]) {
s.insert(g);
}
......@@ -417,7 +417,7 @@ void RunBrpcPushSparse() {
std::map<uint64_t, std::vector<paddle::distributed::Region>> dense_regions;
dense_regions.insert(
std::pair<uint64_t, std::vector<paddle::distributed::Region>>(0, {}));
std::pair<int64_t, std::vector<paddle::distributed::Region>>(0, {}));
auto regions = dense_regions[0];
RunClient(dense_regions, 0, pserver_ptr_->get_service());
......@@ -427,14 +427,14 @@ void RunBrpcPushSparse() {
worker_ptr_->load(0, std::string(edge_file_name), std::string("e>"));
srand(time(0));
pull_status.wait();
std::vector<std::vector<uint64_t>> _vs;
std::vector<std::vector<int64_t>> _vs;
std::vector<std::vector<float>> vs;
testSampleNodes(worker_ptr_);
sleep(5);
testSingleSampleNeighboor(worker_ptr_);
testBatchSampleNeighboor(worker_ptr_);
pull_status = worker_ptr_->batch_sample_neighbors(
0, std::vector<uint64_t>(1, 10240001024), 4, _vs, vs, true);
0, std::vector<int64_t>(1, 10240001024), 4, _vs, vs, true);
pull_status.wait();
ASSERT_EQ(0, _vs[0].size());
paddle::distributed::GraphTable* g =
......@@ -445,14 +445,14 @@ void RunBrpcPushSparse() {
while (round--) {
vs.clear();
pull_status = worker_ptr_->batch_sample_neighbors(
0, std::vector<uint64_t>(1, 37), 1, _vs, vs, false);
0, std::vector<int64_t>(1, 37), 1, _vs, vs, false);
pull_status.wait();
for (int i = 0; i < ttl; i++) {
std::vector<std::vector<uint64_t>> vs1;
std::vector<std::vector<int64_t>> vs1;
std::vector<std::vector<float>> vs2;
pull_status = worker_ptr_->batch_sample_neighbors(
0, std::vector<uint64_t>(1, 37), 1, vs1, vs2, false);
0, std::vector<int64_t>(1, 37), 1, vs1, vs2, false);
pull_status.wait();
ASSERT_EQ(_vs[0].size(), vs1[0].size());
......@@ -540,7 +540,7 @@ void RunBrpcPushSparse() {
// Test Pull by step
std::unordered_set<uint64_t> count_item_nodes;
std::unordered_set<int64_t> count_item_nodes;
// pull by step 2
for (int test_step = 1; test_step < 4; test_step++) {
count_item_nodes.clear();
......@@ -558,18 +558,18 @@ void RunBrpcPushSparse() {
ASSERT_EQ(count_item_nodes.size(), 12);
}
std::pair<std::vector<std::vector<uint64_t>>, std::vector<float>> res;
std::pair<std::vector<std::vector<int64_t>>, std::vector<float>> res;
res = client1.batch_sample_neighbors(
std::string("user2item"), std::vector<uint64_t>(1, 96), 4, true, false);
std::string("user2item"), std::vector<int64_t>(1, 96), 4, true, false);
ASSERT_EQ(res.first[0].size(), 3);
std::vector<uint64_t> node_ids;
std::vector<int64_t> node_ids;
node_ids.push_back(96);
node_ids.push_back(37);
res = client1.batch_sample_neighbors(std::string("user2item"), node_ids, 4,
true, false);
ASSERT_EQ(res.first[1].size(), 1);
std::vector<uint64_t> nodes_ids = client2.random_sample_nodes("user", 0, 6);
std::vector<int64_t> nodes_ids = client2.random_sample_nodes("user", 0, 6);
ASSERT_EQ(nodes_ids.size(), 2);
ASSERT_EQ(true, (nodes_ids[0] == 59 && nodes_ids[1] == 37) ||
(nodes_ids[0] == 37 && nodes_ids[1] == 59));
......
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <unistd.h>
#include <condition_variable> // NOLINT
#include <fstream>
#include <iomanip>
#include <string>
#include <thread> // NOLINT
#include <unordered_set>
#include <vector>
#include "google/protobuf/text_format.h"
#include <chrono>
#include "gtest/gtest.h"
#include "paddle/fluid/distributed/ps.pb.h"
#include "paddle/fluid/distributed/ps/service/env.h"
#include "paddle/fluid/distributed/ps/service/sendrecv.pb.h"
#include "paddle/fluid/distributed/ps/table/common_graph_table.h"
#include "paddle/fluid/distributed/ps/table/graph/graph_node.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/tensor_util.h"
#include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/string/printf.h"
#include "paddle/phi/kernels/funcs/math_function.h"
namespace framework = paddle::framework;
namespace platform = paddle::platform;
namespace operators = paddle::operators;
namespace memory = paddle::memory;
namespace distributed = paddle::distributed;
std::vector<std::string> edges = {
std::string("37\t45\t0.34"), std::string("37\t145\t0.31"),
std::string("37\t112\t0.21"), std::string("96\t48\t1.4"),
std::string("96\t247\t0.31"), std::string("96\t111\t1.21"),
std::string("59\t45\t0.34"), std::string("59\t145\t0.31"),
std::string("59\t122\t0.21"), std::string("97\t48\t0.34"),
std::string("97\t247\t0.31"), std::string("97\t111\t0.21")};
// odd id:96 48 122 112
char edge_file_name[] = "edges.txt";
std::vector<std::string> nodes = {
std::string("user\t37\ta 0.34\tb 13 14\tc hello\td abc"),
std::string("user\t96\ta 0.31\tb 15 10\tc 96hello\td abcd"),
std::string("user\t59\ta 0.11\tb 11 14"),
std::string("user\t97\ta 0.11\tb 12 11"),
std::string("item\t45\ta 0.21"),
std::string("item\t145\ta 0.21"),
std::string("item\t112\ta 0.21"),
std::string("item\t48\ta 0.21"),
std::string("item\t247\ta 0.21"),
std::string("item\t111\ta 0.21"),
std::string("item\t46\ta 0.21"),
std::string("item\t146\ta 0.21"),
std::string("item\t122\ta 0.21"),
std::string("item\t49\ta 0.21"),
std::string("item\t248\ta 0.21"),
std::string("item\t113\ta 0.21")};
char node_file_name[] = "nodes.txt";
void prepare_file(char file_name[], std::vector<std::string> data) {
std::ofstream ofile;
ofile.open(file_name);
for (auto x : data) {
ofile << x << std::endl;
}
ofile.close();
}
void testGraphSample() {
#ifdef PADDLE_WITH_HETERPS
::paddle::distributed::GraphParameter table_proto;
table_proto.set_gpups_mode(true);
table_proto.set_gpups_mode_shard_num(127);
table_proto.set_gpu_num(2);
distributed::GraphTable graph_table, graph_table1;
graph_table.initialize(table_proto);
prepare_file(edge_file_name, edges);
graph_table.load(std::string(edge_file_name), std::string("e>"));
std::vector<paddle::framework::GpuPsCommGraph> res;
std::promise<int> prom;
std::future<int> fut = prom.get_future();
graph_table.set_graph_sample_callback(
[&res, &prom](std::vector<paddle::framework::GpuPsCommGraph> &res0) {
res = res0;
prom.set_value(0);
});
graph_table.start_graph_sampling();
fut.get();
graph_table.end_graph_sampling();
ASSERT_EQ(2, res.size());
// 37 59 97
for (int i = 0; i < (int)res[1].node_size; i++) {
std::cout << res[1].node_list[i].node_id << std::endl;
}
ASSERT_EQ(3, res[1].node_size);
::paddle::distributed::GraphParameter table_proto1;
table_proto1.set_gpups_mode(true);
table_proto1.set_gpups_mode_shard_num(127);
table_proto1.set_gpu_num(2);
table_proto1.set_gpups_graph_sample_class("BasicBfsGraphSampler");
table_proto1.set_gpups_graph_sample_args("5,5,1,1");
graph_table1.initialize(table_proto1);
graph_table1.load(std::string(edge_file_name), std::string("e>"));
std::vector<paddle::framework::GpuPsCommGraph> res1;
std::promise<int> prom1;
std::future<int> fut1 = prom1.get_future();
graph_table1.set_graph_sample_callback(
[&res1, &prom1](std::vector<paddle::framework::GpuPsCommGraph> &res0) {
res1 = res0;
prom1.set_value(0);
});
graph_table1.start_graph_sampling();
fut1.get();
graph_table1.end_graph_sampling();
// distributed::BasicBfsGraphSampler *sampler1 =
// (distributed::BasicBfsGraphSampler *)graph_table1.get_graph_sampler();
// sampler1->start_graph_sampling();
// std::this_thread::sleep_for (std::chrono::seconds(1));
// std::vector<paddle::framework::GpuPsCommGraph> res1;// =
// sampler1->fetch_sample_res();
ASSERT_EQ(2, res1.size());
// odd id:96 48 122 112
for (int i = 0; i < (int)res1[0].node_size; i++) {
std::cout << res1[0].node_list[i].node_id << std::endl;
}
ASSERT_EQ(4, res1[0].node_size);
#endif
}
TEST(testGraphSample, Run) { testGraphSample(); }
set(eager_deps phi_api hook_utils tensor_utils utils global_utils backward phi_tensor tracer layer autograd_meta grad_node_info grad_tensor_holder accumulation_node)
set(eager_deps phi_api hook_utils tensor_utils utils global_utils backward phi_tensor tracer layer autograd_meta grad_node_info grad_tensor_holder accumulation_node custom_operator_node)
set(fluid_deps tracer layer proto_desc operator op_registry variable_helper memcpy)
set(generated_deps dygraph_function dygraph_node)
set(generated_deps final_dygraph_function final_dygraph_node dygraph_function dygraph_node)
if(NOT ((NOT WITH_PYTHON) AND ON_INFER))
message("Performing Eager Dygraph Auto Code Generation")
......@@ -9,6 +10,8 @@ endif()
add_subdirectory(api)
add_subdirectory(accumulation)
add_subdirectory(custom_operator)
cc_library(grad_node_info SRCS grad_node_info.cc DEPS phi_api phi_tensor)
cc_library(grad_tensor_holder SRCS grad_tensor_holder.cc DEPS grad_node_info gradient_accumulator)
......
......@@ -24,7 +24,7 @@
#include "paddle/fluid/platform/errors.h"
#include "glog/logging.h"
DECLARE_bool(retain_grad_for_all_tensor);
namespace egr {
static void CopyOrAddTensor(paddle::experimental::Tensor* tensor,
......@@ -39,8 +39,8 @@ static void CopyOrAddTensor(paddle::experimental::Tensor* tensor,
}
std::vector<std::vector<paddle::experimental::Tensor>> GradNodeAccumulation::
operator()(
const std::vector<std::vector<paddle::experimental::Tensor>>& grads) {
operator()(const std::vector<std::vector<paddle::experimental::Tensor>>& grads,
bool create_graph) {
VLOG(3) << "Running Eager Backward Node: GradNodeAccumulation";
PADDLE_ENFORCE(grads.size() == 1,
paddle::platform::errors::Fatal(
......@@ -62,7 +62,7 @@ operator()(
grad_out = grads[0][0];
}
if (!weak_grad_.expired()) {
if (!weak_grad_.expired() && FLAGS_retain_grad_for_all_tensor) {
auto grad = weak_grad_.lock();
CopyOrAddTensor(grad.get(), grad_out);
}
......
......@@ -35,8 +35,15 @@ class GradNodeAccumulation : public GradNodeBase {
// Functor: perform backward computations
virtual std::vector<std::vector<paddle::experimental::Tensor>> operator()(
const std::vector<std::vector<paddle::experimental::Tensor>>& grads)
override;
const std::vector<std::vector<paddle::experimental::Tensor>>& grads,
bool create_graph = false) override;
void ClearTensorWrappers() override { VLOG(6) << "Do nothing here now"; }
bool IsTensorWrappersCleared() override {
VLOG(6) << "Do nothing here now";
return false;
}
std::string name() { return "GradNodeAccumulation"; }
......
......@@ -145,8 +145,8 @@ void GradNodeScale::SetTensorWrappers_X(
void GradNodeScale::SetAttributes_scale(float scale) { scale_ = scale; }
std::vector<std::vector<paddle::experimental::Tensor>> GradNodeScale::
operator()(
const std::vector<std::vector<paddle::experimental::Tensor>>& grads) {
operator()(const std::vector<std::vector<paddle::experimental::Tensor>>& grads,
bool create_graph) {
// 1. Check Output Size
PADDLE_ENFORCE(
((grads.size() == 1) && (grads[0].size() == 1)),
......
......@@ -39,8 +39,15 @@ class GradNodeScale : public GradNodeBase {
// Functor: perform backward computations
virtual std::vector<std::vector<paddle::experimental::Tensor>> operator()(
const std::vector<std::vector<paddle::experimental::Tensor>>& grads)
override;
const std::vector<std::vector<paddle::experimental::Tensor>>& grads,
bool create_graph = false) override;
void ClearTensorWrappers() override { VLOG(6) << "Do nothing here now"; }
bool IsTensorWrappersCleared() override {
VLOG(6) << "Do nothing here now";
return false;
}
void SetTensorWrappers_X(
const std::vector<paddle::experimental::Tensor>& tensors);
......
......@@ -86,9 +86,9 @@ paddle::experimental::Tensor scale(const paddle::experimental::Tensor& x,
scale_node->SetTensorWrappers_X({x});
// Set Grad out rank as same as fwd input and set stop gradient to bwd
scale_node->SetGradOutMeta(p_autograd_in, /*slot id*/ 0);
scale_node->SetGradOutMeta(x, /*slot id*/ 0);
// Set Grad out rank as same as fwd input and set stop gradient to bwd
scale_node->SetGradInMeta(p_autograd_out, /*slot id*/ 0);
scale_node->SetGradInMeta(out, /*slot id*/ 0);
// Set History for output set current Grad Node for
EagerUtils::SetHistory(p_autograd_out, scale_node);
......
......@@ -18,7 +18,7 @@
#include <atomic>
#include <memory>
#include "paddle/fluid/imperative/tracer.h"
#include "paddle/phi/api/ext/op_meta_info.h"
namespace egr {
class UniqueNameGenerator {
......@@ -70,6 +70,21 @@ class Controller {
void SetInEagerMode(bool in_eager_mode) { in_eager_mode_ = in_eager_mode; }
const std::unordered_map<std::string, std::vector<paddle::OpMetaInfo>>&
GetOpMetaInfoMap() {
return op_meta_info_map_;
}
void MergeOpMetaInfoMap(const std::unordered_map<
std::string, std::vector<paddle::OpMetaInfo>>& map) {
op_meta_info_map_.insert(map.begin(), map.end());
}
std::unordered_map<std::string, std::vector<std::unordered_map<int, int>>>&
GetCustomEdgesSlotMap() {
return custom_edges_slot_map_;
}
private:
Controller() = default;
static Controller* controller_;
......@@ -77,6 +92,11 @@ class Controller {
new paddle::imperative::Tracer()};
// TODO(jiabin): remove when we don't need imperative.
bool in_eager_mode_{false};
std::unordered_map<std::string, std::vector<paddle::OpMetaInfo>>
op_meta_info_map_;
/* op_type : {{grad_outputs}, {grad_inputs}, {input}, {output}, {attrs}}*/
std::unordered_map<std::string, std::vector<std::unordered_map<int, int>>>
custom_edges_slot_map_;
DISABLE_COPY_AND_ASSIGN(Controller);
};
......
......@@ -30,7 +30,8 @@ namespace egr_utils_api {
bool IsLeafTensor(const paddle::experimental::Tensor& target) {
std::shared_ptr<GradNodeBase> grad_node = EagerUtils::grad_node(target);
if (std::dynamic_pointer_cast<GradNodeAccumulation>(grad_node)) {
if (!grad_node ||
std::dynamic_pointer_cast<GradNodeAccumulation>(grad_node)) {
return true;
}
......
......@@ -27,6 +27,7 @@ add_custom_target(eager_final_state_codegen
set(tmp_python_c_output_path "${PADDLE_SOURCE_DIR}/paddle/fluid/pybind/tmp_eager_final_state_op_function_impl.h")
set(python_c_output_path "${PADDLE_SOURCE_DIR}/paddle/fluid/pybind/eager_final_state_op_function_impl.h")
add_custom_target(eager_final_state_python_c_codegen
COMMAND "${PYTHON_EXECUTABLE}" "${PADDLE_SOURCE_DIR}/paddle/fluid/eager/auto_code_generator/final_state_generator/python_c_gen.py"
"--api_yaml_path=${api_yaml_path}"
......
......@@ -28,6 +28,7 @@ namespace = ""
yaml_types_mapping = {
'int' : 'int', 'int32' : 'int32_t', 'int64' : 'int64_t', 'size_t' : 'size_t', \
'float' : 'float', 'double' : 'double', 'bool' : 'bool', \
'str' : 'std::string', \
'Backend' : 'paddle::experimental::Backend', 'DataLayout' : 'paddle::experimental::DataLayout', 'DataType' : 'paddle::experimental::DataType', \
'int64[]' : 'std::vector<int64_t>', 'int[]' : 'std::vector<int>',
'Tensor' : 'Tensor',
......@@ -148,6 +149,12 @@ def ReadBwdFile(filepath):
######################
### Yaml Parsers ###
######################
def RemoveSpecialSymbolsInName(string):
# Remove any name after '@'
ret = string.split("@")[0]
return ret
def IntermediateValidationCheck(intermediate_outputs, forward_returns_list):
# intermediate_outputs : [name0, name1, ...]
# forward_returns_list : [[ret_name, type, orig_pos], ...]
......@@ -166,15 +173,19 @@ def IntermediateValidationCheck(intermediate_outputs, forward_returns_list):
def ParseDispensable(string):
# string: "X, Y"
string = RemoveSpecialSymbolsInName(string)
return [v.strip() for v in string.split(",")]
def ParseIntermediate(string):
string = RemoveSpecialSymbolsInName(string)
return [v.strip() for v in string.split(",")]
def ParseNoNeedBuffer(string):
# string: "x, y"
string = RemoveSpecialSymbolsInName(string)
no_need_buffer_set = set()
for name in string.split(","):
no_need_buffer_set.add(name.strip())
......@@ -202,8 +213,11 @@ def ParseYamlArgs(string):
default_value = m.group(3).split("=")[1].strip() if len(
m.group(3).split("=")) > 1 else None
assert arg_type in yaml_types_mapping.keys()
assert arg_type in yaml_types_mapping.keys(
), f"The argument type {arg_type} in yaml config is not supported in yaml_types_mapping."
arg_type = yaml_types_mapping[arg_type]
arg_name = RemoveSpecialSymbolsInName(arg_name)
if "Tensor" in arg_type:
assert default_value is None
inputs_list.append([arg_name, arg_type, i])
......@@ -235,10 +249,12 @@ def ParseYamlReturns(string):
else:
ret_type = ret.strip()
assert ret_type in yaml_types_mapping.keys()
assert ret_type in yaml_types_mapping.keys(
), f"The return type {ret_type} in yaml config is not supported in yaml_types_mapping."
ret_type = yaml_types_mapping[ret_type]
assert "Tensor" in ret_type
ret_name = RemoveSpecialSymbolsInName(ret_name)
returns_list.append([ret_name, ret_type, i])
return returns_list
......@@ -462,6 +478,7 @@ def GenerateNodeDeclaration(fwd_api_name, backward_fwd_input_map,
# SetTensorWrapper Methods & TensorWrapper Members
set_tensor_wrapper_methods_str = ""
tensor_wrapper_members_str = ""
clear_tensor_wrapper_str = ""
for tname, (ttype, is_fwd_input, _) in backward_fwd_input_map.items():
if tname in no_need_buffer_set:
no_need_buffer = "true"
......@@ -483,6 +500,13 @@ def GenerateNodeDeclaration(fwd_api_name, backward_fwd_input_map,
"""
tensor_wrapper_members_str += PLAIN_TENSOR_MEMBER_TEMPLATE.format(
tensor_wrapper_name)
CLEAR_TENSOR_WRAPPERS_TEMPLATE = """
{}.clear();
"""
clear_tensor_wrapper_str += CLEAR_TENSOR_WRAPPERS_TEMPLATE.format(
tensor_wrapper_name)
else:
assert IsVectorTensorType(ttype)
SET_VECTOR_TENSOR_WRAPPER_TEMPLATE = """
......@@ -500,6 +524,15 @@ def GenerateNodeDeclaration(fwd_api_name, backward_fwd_input_map,
"""
tensor_wrapper_members_str += VECTOR_TENSOR_MEMBER_TEMPLATE.format(
tensor_wrapper_name)
CLEAR_TENSOR_WRAPPERS_TEMPLATE = """
for (auto tw: {}) {
tw.clear();
};
"""
clear_tensor_wrapper_str += CLEAR_TENSOR_WRAPPERS_TEMPLATE.format(
tensor_wrapper_name)
# End: SetTensorWrapper Methods & TensorWrapper Members
# SetAttributes & Attribute Members
......@@ -539,25 +572,37 @@ class {} : public egr::GradNodeBase {{
~{}() override = default;
virtual std::vector<std::vector<paddle::experimental::Tensor>> operator()(
const std::vector<std::vector<paddle::experimental::Tensor>>& grads) override;
const std::vector<std::vector<paddle::experimental::Tensor>>& grads, bool create_graph = false) override;
std::string name() override {{ return \" {} \"; }}
void ClearTensorWrappers() override {{
{}
is_tensor_wrappers_cleared = true;
}}
// SetTensorWrapperX, SetTensorWrapperY, ...
{}
// SetAttributes
{}
bool IsTensorWrappersCleared() override {{
return is_tensor_wrappers_cleared;
}}
private:
// TensorWrappers
{}
bool is_tensor_wrappers_cleared = false;
// Attributes
{}
}};
"""
node_declaration_str = NODE_DECLARATION_TEMPLATE.format(
grad_node_name, grad_node_name, grad_node_name, grad_node_name,
grad_node_name, set_tensor_wrapper_methods_str,
set_attribute_methods_str, tensor_wrapper_members_str,
attribute_members_str)
grad_node_name, clear_tensor_wrapper_str,
set_tensor_wrapper_methods_str, set_attribute_methods_str,
tensor_wrapper_members_str, attribute_members_str)
return node_declaration_str
......@@ -611,6 +656,7 @@ def GenerateNodeDefinition(fwd_api_name, bwd_api_name, backward_fwd_input_map,
else:
# Rearrange output order accordingly
returns_str += f"returns[{fwd_position}] = grad_api_returns[{grad_api_position}];\n"
returns_str += f"if(NeedComplexToRealConversion()) HandleComplexGradToRealGrad(&returns);\n"
returns_str += f"return returns;\n"
grad_node_name = GetGradNodeName(fwd_api_name)
......@@ -621,7 +667,7 @@ def GenerateNodeDefinition(fwd_api_name, bwd_api_name, backward_fwd_input_map,
grad_api_namespace = f"paddle::experimental"
FUNCTION_TEMPLATE = """
std::vector<std::vector<paddle::experimental::Tensor>> {}::operator()(const std::vector<std::vector<paddle::experimental::Tensor>>& grads) {{
std::vector<std::vector<paddle::experimental::Tensor>> {}::operator()(const std::vector<std::vector<paddle::experimental::Tensor>>& grads, bool create_graph) {{
// Call grad_api function
auto grad_api_returns = {}::{}({});
{}
......@@ -684,7 +730,7 @@ def GenerateNodeCreationCodes(
else:
# Tuple api_result
if IsPlainTensorType(rtype):
output_autograd_meta = f" egr::AutogradMeta* {output_autograd_meta_name} = egr::EagerUtils::autograd_meta(&api_result[{pos}]);"
output_autograd_meta = f" egr::AutogradMeta* {output_autograd_meta_name} = egr::EagerUtils::autograd_meta(&std::get<{pos}>(api_result));"
else:
assert IsVectorTensorType(rtype)
output_autograd_meta = f" std::vector<egr::AutogradMeta*> {output_autograd_meta_vec_name} = egr::EagerUtils::autograd_meta(&api_result[{pos}]);\n"
......@@ -721,8 +767,11 @@ def GenerateNodeCreationCodes(
else:
set_tensor_wrappers = f" grad_node->SetTensorWrapper{name}({name}, true);"
else:
if IsVectorTensorType(atype):
tw_name = f"api_result[{pos}]"
if num_fwd_outputs > 1:
# Aligned with forward output position
assert name in forward_outputs_position_map.keys()
fwd_output_pos = forward_outputs_position_map[name][1]
tw_name = f"std::get<{fwd_output_pos}>(api_result)"
else:
tw_name = f"api_result"
......@@ -738,7 +787,7 @@ def GenerateNodeCreationCodes(
set_edges_list = []
for name, (_, pos) in forward_inputs_position_map.items():
input_autograd_meta_name = GetAutoGradMetaName(name)
set_grad_out_meta = f" grad_node->SetGradOutMeta({input_autograd_meta_name}, {pos});"
set_grad_out_meta = f" grad_node->SetGradOutMeta({name}, {pos});"
set_edges = f" grad_node->AddEdges({input_autograd_meta_name}, {pos});"
set_grad_out_meta_list.append(set_grad_out_meta)
set_edges_list.append(set_edges)
......@@ -755,17 +804,18 @@ def GenerateNodeCreationCodes(
output_autograd_meta_name = GetAutoGradMetaName(name)
set_out_rank = f" egr::EagerUtils::SetOutRankWithSlot({output_autograd_meta_name}, {pos});"
set_history = f" egr::EagerUtils::SetHistory({output_autograd_meta_name}, grad_node);"
set_grad_in_meta = f" grad_node->SetGradInMeta({output_autograd_meta_name}, {pos});"
if num_outputs == 1:
set_retain_grad = f" egr::EagerUtils::CheckAndRetainGrad(api_result);"
set_grad_in_meta = f" grad_node->SetGradInMeta(api_result, {pos});"
else:
set_retain_grad = f" egr::EagerUtils::CheckAndRetainGrad(std::get<{pos}>(api_result));"
set_grad_in_meta = f" grad_node->SetGradInMeta(std::get<{pos}>(api_result), {pos});"
set_out_rank_list.append(set_out_rank)
set_history_list.append(set_history)
set_grad_in_meta_list.append(set_grad_in_meta)
if num_outputs == 1:
set_retain_grad = f" egr::EagerUtils::CheckAndRetainGrad(api_result);"
else:
set_retain_grad = f" egr::EagerUtils::CheckAndRetainGrad(api_result[{pos}]);"
set_retain_grad_list.append(set_retain_grad)
set_out_rank_str = "\n".join(set_out_rank_list)
set_history_str = "\n".join(set_history_list)
set_grad_in_meta_str = "\n".join(set_grad_in_meta_list)
......@@ -887,7 +937,7 @@ def GenerateForwardDefinition(fwd_api_name, bwd_api_name,
returns_list[0] = f"api_result"
else:
# Tuple api_result
returns_list[pos] = f"api_result[{pos}]"
returns_list[pos] = f"std::get<{pos}>(api_result)"
if IsPlainTensorType(rtype):
returns_type_list[pos] = "paddle::experimental::Tensor"
......@@ -910,8 +960,20 @@ def GenerateForwardDefinition(fwd_api_name, bwd_api_name,
backward_fwd_input_map, backward_grad_input_map,
backward_grad_output_map, backward_attrs_list, optional_inputs)
node_event_name = fwd_api_name + " node_creation"
NODE_CREATION_TEMPLATE = """{{\n
paddle::platform::RecordEvent node_creation_record_event(\"{}\", paddle::platform::TracerEventType::Operator, 1);\n
{}\n
}}"""
node_creation_str = NODE_CREATION_TEMPLATE.format(node_event_name,
node_creation_str)
dygraph_event_str = f"paddle::platform::RecordEvent dygraph_entrance_record_event(\"{fwd_api_name} dygraph\", paddle::platform::TracerEventType::Operator, 1);"
FORWARD_FUNCTION_TEMPLATE = """
{} {}({}) {{
{}
// Forward API Call
{}
......@@ -925,7 +987,7 @@ def GenerateForwardDefinition(fwd_api_name, bwd_api_name,
forward_function_name = GetForwardFunctionName(fwd_api_name)
forward_function_str = FORWARD_FUNCTION_TEMPLATE.format(
returns_type_str, forward_function_name, inputs_args_definition_str,
forward_call_str, node_creation_str, returns_str)
dygraph_event_str, forward_call_str, node_creation_str, returns_str)
forward_function_declaration_str = f"{returns_type_str} {forward_function_name}({inputs_args_declaration_str});"
return forward_function_str, forward_function_declaration_str
......@@ -1025,7 +1087,7 @@ def GenerateNodeCCFile(filepath, node_definition_str):
#include "paddle/fluid/eager/api/generated/eager_generated/backwards/nodes.h"
#include "paddle/fluid/eager/to_static/run_program_op_node.h"
#include "paddle/phi/api/include/sparse_api.h"
#include "paddle/phi/api/backward/sparse_bw_api.h"
"""
file_contents += node_definition_str
with open(filepath, 'a') as f:
......@@ -1052,6 +1114,8 @@ def GenerateForwardCCFile(filepath, forward_definition_str):
#include "paddle/phi/api/include/sparse_api.h"
#include "paddle/fluid/eager/api/utils/global_utils.h"
#include "paddle/fluid/platform/profiler/event_tracing.h"
"""
file_contents += GenerateCoreOpInfoDefinition()
......@@ -1218,7 +1282,7 @@ if __name__ == "__main__":
# Node Definition Generation
definition_declaration_pair = GenerateForwardDefinition(
fwd_api_name, bwd_api_name, forward_inputs_position_map,
forward_outputs_position_map, forward_attrs_list,
forward_outputs_position_map, orig_forward_attrs_list,
backward_fwd_input_map, backward_grad_input_map,
backward_grad_output_map, backward_attrs_list, optional_inputs,
intermediate_outputs)
......@@ -1230,7 +1294,7 @@ if __name__ == "__main__":
# For python-level API dispatch
CollectCoreOpsInformation(fwd_api_name, forward_inputs_position_map,
forward_outputs_position_map,
forward_attrs_list)
orig_forward_attrs_list)
if len(namespace) > 0:
forward_definition_str += f"""namespace {namespace} {{
......
......@@ -145,8 +145,7 @@ class AutogradMeta : public AbstractAutogradMeta {
private:
// TODO(jiabin) :Should we use pointer instead of object?
std::shared_ptr<paddle::experimental::Tensor> grad_{
std::make_shared<paddle::experimental::Tensor>(
egr::Controller::Instance().GenerateUniqueName("@grad"))};
std::make_shared<paddle::experimental::Tensor>()};
// GradNodeBase is base class of all grad op which is a
// wrapper for grad op. This class will make grad op easy
......
此差异已折叠。
......@@ -19,13 +19,21 @@
namespace egr {
// run_backward():
// Backward():
// tensors corresponds to those lived in the backward graph
// each grad_tensors[i] keeps the value for its corresponding tensors[i]
void RunBackward(const std::vector<paddle::experimental::Tensor> &tensors,
const std::vector<paddle::experimental::Tensor> &grad_tensors,
void Backward(const std::vector<paddle::experimental::Tensor>& tensors,
const std::vector<paddle::experimental::Tensor>& grad_tensors,
bool retain_graph = false);
std::vector<paddle::experimental::Tensor> Grad(
const std::vector<paddle::experimental::Tensor>& tensors,
const std::vector<paddle::experimental::Tensor>& inputs,
const std::vector<paddle::experimental::Tensor>& grad_tensors = {},
bool retain_graph = false, bool create_graph = false,
bool only_inputs = false, bool allow_unused = false,
const std::vector<paddle::experimental::Tensor>& no_grad_vars = {});
// Reserved for gradient()
} // namespace egr
cc_library(custom_operator_node SRCS custom_operator_node.cc DEPS phi_tensor phi_api grad_node_info custom_operator op_meta_info)
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/eager/custom_operator/custom_operator_node.h"
#include "paddle/fluid/framework/custom_operator.h"
#include "paddle/fluid/framework/op_meta_info_helper.h"
#include "paddle/phi/api/ext/op_meta_info.h"
#include "paddle/phi/core/dense_tensor.h"
namespace egr {
std::vector<std::vector<paddle::experimental::Tensor>> RunCustomOpNode::
operator()(const std::vector<std::vector<paddle::experimental::Tensor>>& grads,
bool create_graph) {
paddle::CustomOpKernelContext ctx;
auto grad_inputs_name = paddle::framework::OpMetaInfoHelper::GetInputs(
egr::Controller::Instance().GetOpMetaInfoMap().at(op_type_)[1]);
auto grad_outputs_names = paddle::framework::OpMetaInfoHelper::GetOutputs(
egr::Controller::Instance().GetOpMetaInfoMap().at(op_type_)[1]);
auto map = egr::Controller::Instance().GetCustomEdgesSlotMap().at(op_type_);
auto kernel_map = egr::Controller::Instance().GetOpMetaInfoMap();
std::vector<std::vector<paddle::experimental::Tensor>> tmp_ins(
grad_inputs_name.size());
VLOG(7) << " Prepare Backward inputs of grads with size: " << grads.size()
<< ", whose grad_inputs_name size is: " << grad_inputs_name.size();
for (size_t i = 0; i < grads.size(); i++) {
if (map[1].find(i) != map[1].end()) {
VLOG(7) << "Insert grad: " << i << " to grad_inputs: " << map[1][i];
tmp_ins[map[1][i]] = grads[i];
}
}
for (auto it : fwd_outs) {
VLOG(7) << "Insert fwd_outs to grad_inputs: " << it.first;
tmp_ins[it.first] = RunCustomOpNode::Recover(&(it.second));
}
for (auto it : fwd_ins) {
VLOG(7) << "Insert fwd_ins to grad_inputs: " << it.first;
tmp_ins[it.first] = RunCustomOpNode::Recover(&(it.second));
}
VLOG(6) << "Prepare Grad inputs";
for (const auto& in : tmp_ins) {
ctx.EmplaceBackInputs(in);
}
VLOG(6) << "Prepare Grad attrs";
ctx.EmplaceBackAttrs(attrs_);
std::vector<std::vector<paddle::experimental::Tensor>> outs(
GetEdges().size());
std::vector<std::vector<paddle::experimental::Tensor>> tmp_outs(
grad_outputs_names.size());
VLOG(6) << "Prepare Grad outputs for size: " << grad_outputs_names.size();
for (size_t i = 0; i < GetEdges().size(); i++) {
if (map[0].find(i) != map[0].end()) {
VLOG(7) << "Insert grad outputs: " << i
<< " with size: " << GetEdges()[i].size()
<< " to tmp_outputs: " << map[0][i];
for (size_t j = 0; j < GetEdges()[i].size(); j++) {
outs[i].emplace_back(/* init it incase of copy nullptr of shared_ptr */
std::make_shared<phi::DenseTensor>(
phi::DataType::UNDEFINED),
egr::Controller::Instance().GenerateUniqueName(
"custom_tmp_grad"));
}
tmp_outs[map[0][i]] = outs[i];
}
}
for (size_t i = 0; i < tmp_outs.size(); i++) {
VLOG(7) << "Prepare grad outputs size: " << tmp_outs[i].size();
ctx.EmplaceBackOutputs(tmp_outs[i]);
}
VLOG(7) << "Run Kernel of Grad Custom Op: " << op_type_;
(*paddle::framework::OpMetaInfoHelper::GetKernelFn(
kernel_map.at(op_type_)[1]))(&ctx);
return outs;
}
} // namespace egr
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "paddle/fluid/eager/autograd_meta.h"
#include "paddle/fluid/eager/grad_node_info.h"
#include "paddle/fluid/eager/hooks.h"
#include "paddle/fluid/eager/tensor_wrapper.h"
#include "paddle/fluid/framework/custom_operator.h"
#include "paddle/utils/any.h"
namespace egr {
class RunCustomOpNode : public GradNodeBase {
public:
// Constructor: configure fwd input tensors to grad node
explicit RunCustomOpNode(size_t bwd_in_slot_num, size_t bwd_out_slot_num,
const std::string& op_type)
: GradNodeBase(bwd_in_slot_num, bwd_out_slot_num), op_type_(op_type) {
VLOG(6) << "Construct RunCustomOpNode for op: " << op_type;
}
~RunCustomOpNode() override {
VLOG(6) << "Destruct RunCustomOpNode for op: " << op_type_;
}
// Functor: perform backward computations
virtual std::vector<std::vector<paddle::experimental::Tensor>> operator()(
const std::vector<std::vector<paddle::experimental::Tensor>>& grads,
bool create_graph) override;
std::string name() {
return paddle::string::Sprintf("RunCustomOpNode: %s_grad", op_type_);
}
static std::vector<egr::TensorWrapper> ConstructTensorWrapper(
const std::vector<paddle::experimental::Tensor>& fwd_var) {
std::vector<egr::TensorWrapper> res;
for (auto const& var : fwd_var) {
res.emplace_back(var);
}
return res;
}
static std::vector<paddle::experimental::Tensor> Recover(
std::vector<egr::TensorWrapper>* fwd_var) {
std::vector<paddle::experimental::Tensor> res;
for (size_t i = 0; i < fwd_var->size(); i++) {
res.emplace_back(fwd_var->at(i).recover(nullptr));
}
return res;
}
void ClearTensorWrappers() override { VLOG(6) << "Do nothing here now"; }
bool IsTensorWrappersCleared() override {
VLOG(6) << "Do nothing here now";
return false;
}
void SetAttrs(const std::vector<paddle::any>& attr) { attrs_ = attr; }
public:
std::unordered_map<int, std::vector<egr::TensorWrapper>> fwd_outs;
std::unordered_map<int, std::vector<egr::TensorWrapper>> fwd_ins;
std::unordered_map<int, int> grads2grad_in_map;
private:
std::vector<paddle::any> attrs_;
std::string op_type_{""};
};
} // namespace egr
此差异已折叠。
......@@ -21,6 +21,11 @@
namespace egr {
void GradTensorHolder::SetBufferSlotRankZeros(size_t slot_id, size_t rank) {
buffer_[slot_id][rank] =
paddle::experimental::zeros_like(buffer_[slot_id][rank]);
}
void GradTensorHolder::add(size_t slot_id, size_t rank,
const paddle::experimental::Tensor& t,
bool fill_one) {
......
......@@ -26,12 +26,13 @@ namespace egr {
* GradTensorHolder should have as same format as forward output **/
class GradTensorHolder {
public:
explicit GradTensorHolder(const std::vector<GradSlotMeta>& meta) {
VLOG(7) << "Init GradTensorHolder with meta size: " << meta.size();
buffer_.resize(meta.size());
explicit GradTensorHolder(
const std::vector<std::vector<GradSlotMeta>>& metas) {
VLOG(7) << "Init GradTensorHolder with meta size: " << metas.size();
buffer_.resize(metas.size());
for (size_t i = 0; i < buffer_.size(); i++) {
VLOG(7) << "Init GradTensorHolder with meta rank: " << meta[i].Size();
buffer_[i].resize(meta[i].Size());
VLOG(7) << "Init GradTensorHolder with meta rank: " << metas[i].size();
buffer_[i].resize(metas[i].size());
}
}
......@@ -56,6 +57,8 @@ class GradTensorHolder {
return buffer_;
}
void SetBufferSlotRankZeros(size_t slot_id, size_t rank);
private:
std::vector<std::vector<paddle::experimental::Tensor>> buffer_;
};
......
此差异已折叠。
......@@ -32,8 +32,8 @@ class GradTestNode : public egr::GradNodeBase {
GradTestNode() : GradNodeBase() { val_ = 1.0; }
std::string name() override { return "GradTestNode"; }
std::vector<std::vector<paddle::experimental::Tensor>> operator()(
const std::vector<std::vector<paddle::experimental::Tensor>>& grads)
override {
const std::vector<std::vector<paddle::experimental::Tensor>>& grads,
bool create_graph = false) override {
val_ = std::dynamic_pointer_cast<phi::DenseTensor>(grads[0][0].impl())
->data<float>()[0];
phi::DenseTensorMeta meta =
......@@ -49,6 +49,11 @@ class GradTestNode : public egr::GradNodeBase {
std::vector<std::vector<paddle::experimental::Tensor>> res = {{et1}};
return res;
}
void ClearTensorWrappers() override { VLOG(6) << "Do nothing here now"; }
bool IsTensorWrappersCleared() override {
VLOG(6) << "Do nothing here now";
return false;
}
float val_;
};
} // namespace eager_test
......@@ -41,6 +41,8 @@ PD_DECLARE_KERNEL(matmul, CPU, ALL_LAYOUT);
PD_DECLARE_KERNEL(matmul_grad, CPU, ALL_LAYOUT);
PD_DECLARE_KERNEL(add, CPU, ALL_LAYOUT);
PD_DECLARE_KERNEL(add_grad, CPU, ALL_LAYOUT);
PD_DECLARE_KERNEL(sum, CPU, ALL_LAYOUT);
PD_DECLARE_KERNEL(sum_grad, CPU, ALL_LAYOUT);
namespace paddle {
namespace imperative {
......
......@@ -43,6 +43,8 @@ PD_DECLARE_KERNEL(matmul, GPU, ALL_LAYOUT);
PD_DECLARE_KERNEL(matmul_grad, GPU, ALL_LAYOUT);
PD_DECLARE_KERNEL(add, GPU, ALL_LAYOUT);
PD_DECLARE_KERNEL(add_grad, GPU, ALL_LAYOUT);
PD_DECLARE_KERNEL(sum, GPU, ALL_LAYOUT);
PD_DECLARE_KERNEL(sum_grad, GPU, ALL_LAYOUT);
namespace paddle {
namespace imperative {
......
......@@ -5,6 +5,7 @@ cc_test(test_egr_task_backward SRCS backward_test.cc DEPS ${eager_deps} ${fluid_
cc_test(test_egr_task_hook SRCS hook_test.cc DEPS ${eager_deps} ${fluid_deps} eager_scale scale_node)
cc_test(test_egr_task_cross_batch SRCS cross_batch_accumulation_test.cc DEPS ${eager_deps} ${fluid_deps} eager_scale scale_node)
cc_test(test_egr_task_fwd_bwd_joint SRCS fwd_bwd_joint_test.cc DEPS ${eager_deps} ${fluid_deps} eager_scale scale_node)
cc_test(test_egr_task_grad SRCS grad_test.cc DEPS ${eager_deps} ${fluid_deps} eager_scale scale_node)
if(NOT ((NOT WITH_PYTHON) AND ON_INFER))
cc_test(test_egr_task_hook_intermidiate SRCS hook_test_intermidiate.cc DEPS ${eager_deps} ${fluid_deps} ${generated_deps} dygraph_node)
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册