Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
BaiXuePrincess
Paddle
提交
a2d7e1d7
P
Paddle
项目概览
BaiXuePrincess
/
Paddle
与 Fork 源项目一致
Fork自
PaddlePaddle / Paddle
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
Paddle
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
a2d7e1d7
编写于
1月 03, 2023
作者:
W
wangzhen38
提交者:
GitHub
1月 03, 2023
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[code_style fix] graph_brpc_client cpplint (#49457)
上级
0de94cd9
变更
1
显示空白变更内容
内联
并排
Showing
1 changed file
with
84 addition
and
60 deletion
+84
-60
paddle/fluid/distributed/ps/service/graph_brpc_client.cc
paddle/fluid/distributed/ps/service/graph_brpc_client.cc
+84
-60
未找到文件。
paddle/fluid/distributed/ps/service/graph_brpc_client.cc
浏览文件 @
a2d7e1d7
...
@@ -83,7 +83,7 @@ std::future<int32_t> GraphBrpcClient::get_node_feat(
...
@@ -83,7 +83,7 @@ std::future<int32_t> GraphBrpcClient::get_node_feat(
request_call_num
,
request_call_num
,
[
&
,
node_id_buckets
,
query_idx_buckets
,
request_call_num
](
void
*
done
)
{
[
&
,
node_id_buckets
,
query_idx_buckets
,
request_call_num
](
void
*
done
)
{
int
ret
=
0
;
int
ret
=
0
;
auto
*
closure
=
(
DownpourBrpcClosure
*
)
done
;
auto
*
closure
=
reinterpret_cast
<
DownpourBrpcClosure
*>
(
done
)
;
size_t
fail_num
=
0
;
size_t
fail_num
=
0
;
for
(
size_t
request_idx
=
0
;
request_idx
<
request_call_num
;
for
(
size_t
request_idx
=
0
;
request_idx
<
request_call_num
;
++
request_idx
)
{
++
request_idx
)
{
...
@@ -97,7 +97,8 @@ std::future<int32_t> GraphBrpcClient::get_node_feat(
...
@@ -97,7 +97,8 @@ std::future<int32_t> GraphBrpcClient::get_node_feat(
size_t
bytes_size
=
io_buffer_itr
.
bytes_left
();
size_t
bytes_size
=
io_buffer_itr
.
bytes_left
();
std
::
unique_ptr
<
char
[]
>
buffer_wrapper
(
new
char
[
bytes_size
]);
std
::
unique_ptr
<
char
[]
>
buffer_wrapper
(
new
char
[
bytes_size
]);
char
*
buffer
=
buffer_wrapper
.
get
();
char
*
buffer
=
buffer_wrapper
.
get
();
io_buffer_itr
.
copy_and_forward
((
void
*
)(
buffer
),
bytes_size
);
io_buffer_itr
.
copy_and_forward
(
reinterpret_cast
<
void
*>
(
buffer
),
bytes_size
);
for
(
size_t
feat_idx
=
0
;
feat_idx
<
feature_names
.
size
();
for
(
size_t
feat_idx
=
0
;
feat_idx
<
feature_names
.
size
();
++
feat_idx
)
{
++
feat_idx
)
{
...
@@ -105,7 +106,7 @@ std::future<int32_t> GraphBrpcClient::get_node_feat(
...
@@ -105,7 +106,7 @@ std::future<int32_t> GraphBrpcClient::get_node_feat(
node_idx
<
query_idx_buckets
.
at
(
request_idx
).
size
();
node_idx
<
query_idx_buckets
.
at
(
request_idx
).
size
();
++
node_idx
)
{
++
node_idx
)
{
int
query_idx
=
query_idx_buckets
.
at
(
request_idx
).
at
(
node_idx
);
int
query_idx
=
query_idx_buckets
.
at
(
request_idx
).
at
(
node_idx
);
size_t
feat_len
=
*
(
size_t
*
)
(
buffer
);
size_t
feat_len
=
*
reinterpret_cast
<
size_t
*>
(
buffer
);
buffer
+=
sizeof
(
size_t
);
buffer
+=
sizeof
(
size_t
);
auto
feature
=
std
::
string
(
buffer
,
feat_len
);
auto
feature
=
std
::
string
(
buffer
,
feat_len
);
res
[
feat_idx
][
query_idx
]
=
feature
;
res
[
feat_idx
][
query_idx
]
=
feature
;
...
@@ -132,9 +133,11 @@ std::future<int32_t> GraphBrpcClient::get_node_feat(
...
@@ -132,9 +133,11 @@ std::future<int32_t> GraphBrpcClient::get_node_feat(
closure
->
request
(
request_idx
)
->
set_client_id
(
_client_id
);
closure
->
request
(
request_idx
)
->
set_client_id
(
_client_id
);
size_t
node_num
=
node_id_buckets
[
request_idx
].
size
();
size_t
node_num
=
node_id_buckets
[
request_idx
].
size
();
closure
->
request
(
request_idx
)
->
add_params
((
char
*
)
&
idx_
,
sizeof
(
int
));
closure
->
request
(
request_idx
)
closure
->
request
(
request_idx
)
->
add_params
((
char
*
)
node_id_buckets
[
request_idx
].
data
(),
->
add_params
(
reinterpret_cast
<
char
*>
(
&
idx_
),
sizeof
(
int
));
closure
->
request
(
request_idx
)
->
add_params
(
reinterpret_cast
<
char
*>
(
node_id_buckets
[
request_idx
].
data
()),
sizeof
(
int64_t
)
*
node_num
);
sizeof
(
int64_t
)
*
node_num
);
std
::
string
joint_feature_name
=
std
::
string
joint_feature_name
=
paddle
::
string
::
join_strings
(
feature_names
,
'\t'
);
paddle
::
string
::
join_strings
(
feature_names
,
'\t'
);
...
@@ -158,7 +161,7 @@ std::future<int32_t> GraphBrpcClient::clear_nodes(uint32_t table_id,
...
@@ -158,7 +161,7 @@ std::future<int32_t> GraphBrpcClient::clear_nodes(uint32_t table_id,
DownpourBrpcClosure
*
closure
=
new
DownpourBrpcClosure
(
DownpourBrpcClosure
*
closure
=
new
DownpourBrpcClosure
(
server_size
,
[
&
,
server_size
=
this
->
server_size
](
void
*
done
)
{
server_size
,
[
&
,
server_size
=
this
->
server_size
](
void
*
done
)
{
int
ret
=
0
;
int
ret
=
0
;
auto
*
closure
=
(
DownpourBrpcClosure
*
)
done
;
auto
*
closure
=
reinterpret_cast
<
DownpourBrpcClosure
*>
(
done
)
;
size_t
fail_num
=
0
;
size_t
fail_num
=
0
;
for
(
size_t
request_idx
=
0
;
request_idx
<
server_size
;
++
request_idx
)
{
for
(
size_t
request_idx
=
0
;
request_idx
<
server_size
;
++
request_idx
)
{
if
(
closure
->
check_response
(
request_idx
,
PS_GRAPH_CLEAR
)
!=
0
)
{
if
(
closure
->
check_response
(
request_idx
,
PS_GRAPH_CLEAR
)
!=
0
)
{
...
@@ -177,8 +180,10 @@ std::future<int32_t> GraphBrpcClient::clear_nodes(uint32_t table_id,
...
@@ -177,8 +180,10 @@ std::future<int32_t> GraphBrpcClient::clear_nodes(uint32_t table_id,
closure
->
request
(
server_index
)
->
set_cmd_id
(
PS_GRAPH_CLEAR
);
closure
->
request
(
server_index
)
->
set_cmd_id
(
PS_GRAPH_CLEAR
);
closure
->
request
(
server_index
)
->
set_table_id
(
table_id
);
closure
->
request
(
server_index
)
->
set_table_id
(
table_id
);
closure
->
request
(
server_index
)
->
set_client_id
(
_client_id
);
closure
->
request
(
server_index
)
->
set_client_id
(
_client_id
);
closure
->
request
(
server_index
)
->
add_params
((
char
*
)
&
type_id
,
sizeof
(
int
));
closure
->
request
(
server_index
)
closure
->
request
(
server_index
)
->
add_params
((
char
*
)
&
idx_
,
sizeof
(
int
));
->
add_params
(
reinterpret_cast
<
char
*>
(
&
type_id
),
sizeof
(
int
));
closure
->
request
(
server_index
)
->
add_params
(
reinterpret_cast
<
char
*>
(
&
idx_
),
sizeof
(
int
));
GraphPsService_Stub
rpc_stub
=
getServiceStub
(
GetCmdChannel
(
server_index
));
GraphPsService_Stub
rpc_stub
=
getServiceStub
(
GetCmdChannel
(
server_index
));
closure
->
cntl
(
server_index
)
->
set_log_id
(
butil
::
gettimeofday_ms
());
closure
->
cntl
(
server_index
)
->
set_log_id
(
butil
::
gettimeofday_ms
());
rpc_stub
.
service
(
closure
->
cntl
(
server_index
),
rpc_stub
.
service
(
closure
->
cntl
(
server_index
),
...
@@ -217,7 +222,7 @@ std::future<int32_t> GraphBrpcClient::add_graph_node(
...
@@ -217,7 +222,7 @@ std::future<int32_t> GraphBrpcClient::add_graph_node(
DownpourBrpcClosure
*
closure
=
new
DownpourBrpcClosure
(
DownpourBrpcClosure
*
closure
=
new
DownpourBrpcClosure
(
request_call_num
,
[
&
,
request_call_num
](
void
*
done
)
{
request_call_num
,
[
&
,
request_call_num
](
void
*
done
)
{
int
ret
=
0
;
int
ret
=
0
;
auto
*
closure
=
(
DownpourBrpcClosure
*
)
done
;
auto
*
closure
=
reinterpret_cast
<
DownpourBrpcClosure
*>
(
done
)
;
size_t
fail_num
=
0
;
size_t
fail_num
=
0
;
for
(
size_t
request_idx
=
0
;
request_idx
<
request_call_num
;
for
(
size_t
request_idx
=
0
;
request_idx
<
request_call_num
;
++
request_idx
)
{
++
request_idx
)
{
...
@@ -239,16 +244,18 @@ std::future<int32_t> GraphBrpcClient::add_graph_node(
...
@@ -239,16 +244,18 @@ std::future<int32_t> GraphBrpcClient::add_graph_node(
closure
->
request
(
request_idx
)
->
set_table_id
(
table_id
);
closure
->
request
(
request_idx
)
->
set_table_id
(
table_id
);
closure
->
request
(
request_idx
)
->
set_client_id
(
_client_id
);
closure
->
request
(
request_idx
)
->
set_client_id
(
_client_id
);
size_t
node_num
=
request_bucket
[
request_idx
].
size
();
size_t
node_num
=
request_bucket
[
request_idx
].
size
();
closure
->
request
(
request_idx
)
->
add_params
((
char
*
)
&
idx_
,
sizeof
(
int
));
closure
->
request
(
request_idx
)
closure
->
request
(
request_idx
)
->
add_params
((
char
*
)
request_bucket
[
request_idx
].
data
(),
->
add_params
(
reinterpret_cast
<
char
*>
(
&
idx_
),
sizeof
(
int
));
closure
->
request
(
request_idx
)
->
add_params
(
reinterpret_cast
<
char
*>
(
request_bucket
[
request_idx
].
data
()),
sizeof
(
int64_t
)
*
node_num
);
sizeof
(
int64_t
)
*
node_num
);
if
(
add_weight
)
{
if
(
add_weight
)
{
bool
weighted
[
is_weighted_bucket
[
request_idx
].
size
()
+
1
];
bool
weighted
[
is_weighted_bucket
[
request_idx
].
size
()
+
1
];
for
(
size_t
j
=
0
;
j
<
is_weighted_bucket
[
request_idx
].
size
();
j
++
)
for
(
size_t
j
=
0
;
j
<
is_weighted_bucket
[
request_idx
].
size
();
j
++
)
weighted
[
j
]
=
is_weighted_bucket
[
request_idx
][
j
];
weighted
[
j
]
=
is_weighted_bucket
[
request_idx
][
j
];
closure
->
request
(
request_idx
)
closure
->
request
(
request_idx
)
->
add_params
(
(
char
*
)
weighted
,
->
add_params
(
reinterpret_cast
<
char
*>
(
weighted
)
,
sizeof
(
bool
)
*
is_weighted_bucket
[
request_idx
].
size
());
sizeof
(
bool
)
*
is_weighted_bucket
[
request_idx
].
size
());
}
}
// PsService_Stub rpc_stub(GetCmdChannel(server_index));
// PsService_Stub rpc_stub(GetCmdChannel(server_index));
...
@@ -280,7 +287,7 @@ std::future<int32_t> GraphBrpcClient::remove_graph_node(
...
@@ -280,7 +287,7 @@ std::future<int32_t> GraphBrpcClient::remove_graph_node(
DownpourBrpcClosure
*
closure
=
new
DownpourBrpcClosure
(
DownpourBrpcClosure
*
closure
=
new
DownpourBrpcClosure
(
request_call_num
,
[
&
,
request_call_num
](
void
*
done
)
{
request_call_num
,
[
&
,
request_call_num
](
void
*
done
)
{
int
ret
=
0
;
int
ret
=
0
;
auto
*
closure
=
(
DownpourBrpcClosure
*
)
done
;
auto
*
closure
=
reinterpret_cast
<
DownpourBrpcClosure
*>
(
done
)
;
size_t
fail_num
=
0
;
size_t
fail_num
=
0
;
for
(
size_t
request_idx
=
0
;
request_idx
<
request_call_num
;
for
(
size_t
request_idx
=
0
;
request_idx
<
request_call_num
;
++
request_idx
)
{
++
request_idx
)
{
...
@@ -303,9 +310,11 @@ std::future<int32_t> GraphBrpcClient::remove_graph_node(
...
@@ -303,9 +310,11 @@ std::future<int32_t> GraphBrpcClient::remove_graph_node(
closure
->
request
(
request_idx
)
->
set_client_id
(
_client_id
);
closure
->
request
(
request_idx
)
->
set_client_id
(
_client_id
);
size_t
node_num
=
request_bucket
[
request_idx
].
size
();
size_t
node_num
=
request_bucket
[
request_idx
].
size
();
closure
->
request
(
request_idx
)
->
add_params
((
char
*
)
&
idx_
,
sizeof
(
int
));
closure
->
request
(
request_idx
)
closure
->
request
(
request_idx
)
->
add_params
((
char
*
)
request_bucket
[
request_idx
].
data
(),
->
add_params
(
reinterpret_cast
<
char
*>
(
&
idx_
),
sizeof
(
int
));
closure
->
request
(
request_idx
)
->
add_params
(
reinterpret_cast
<
char
*>
(
request_bucket
[
request_idx
].
data
()),
sizeof
(
int64_t
)
*
node_num
);
sizeof
(
int64_t
)
*
node_num
);
// PsService_Stub rpc_stub(GetCmdChannel(server_index));
// PsService_Stub rpc_stub(GetCmdChannel(server_index));
GraphPsService_Stub
rpc_stub
=
getServiceStub
(
GetCmdChannel
(
server_index
));
GraphPsService_Stub
rpc_stub
=
getServiceStub
(
GetCmdChannel
(
server_index
));
...
@@ -335,7 +344,7 @@ std::future<int32_t> GraphBrpcClient::batch_sample_neighbors(
...
@@ -335,7 +344,7 @@ std::future<int32_t> GraphBrpcClient::batch_sample_neighbors(
}
}
DownpourBrpcClosure
*
closure
=
new
DownpourBrpcClosure
(
1
,
[
&
](
void
*
done
)
{
DownpourBrpcClosure
*
closure
=
new
DownpourBrpcClosure
(
1
,
[
&
](
void
*
done
)
{
int
ret
=
0
;
int
ret
=
0
;
auto
*
closure
=
(
DownpourBrpcClosure
*
)
done
;
auto
*
closure
=
reinterpret_cast
<
DownpourBrpcClosure
*>
(
done
)
;
if
(
closure
->
check_response
(
0
,
PS_GRAPH_SAMPLE_NODES_FROM_ONE_SERVER
)
!=
if
(
closure
->
check_response
(
0
,
PS_GRAPH_SAMPLE_NODES_FROM_ONE_SERVER
)
!=
0
)
{
0
)
{
ret
=
-
1
;
ret
=
-
1
;
...
@@ -345,10 +354,11 @@ std::future<int32_t> GraphBrpcClient::batch_sample_neighbors(
...
@@ -345,10 +354,11 @@ std::future<int32_t> GraphBrpcClient::batch_sample_neighbors(
size_t
bytes_size
=
io_buffer_itr
.
bytes_left
();
size_t
bytes_size
=
io_buffer_itr
.
bytes_left
();
std
::
unique_ptr
<
char
[]
>
buffer_wrapper
(
new
char
[
bytes_size
]);
std
::
unique_ptr
<
char
[]
>
buffer_wrapper
(
new
char
[
bytes_size
]);
char
*
buffer
=
buffer_wrapper
.
get
();
char
*
buffer
=
buffer_wrapper
.
get
();
io_buffer_itr
.
copy_and_forward
((
void
*
)(
buffer
),
bytes_size
);
io_buffer_itr
.
copy_and_forward
(
reinterpret_cast
<
void
*>
(
buffer
),
bytes_size
);
size_t
node_num
=
*
(
size_t
*
)
buffer
;
size_t
node_num
=
*
reinterpret_cast
<
size_t
*>
(
buffer
)
;
int
*
actual_sizes
=
(
int
*
)
(
buffer
+
sizeof
(
size_t
));
int
*
actual_sizes
=
reinterpret_cast
<
int
*>
(
buffer
+
sizeof
(
size_t
));
char
*
node_buffer
=
buffer
+
sizeof
(
size_t
)
+
sizeof
(
int
)
*
node_num
;
char
*
node_buffer
=
buffer
+
sizeof
(
size_t
)
+
sizeof
(
int
)
*
node_num
;
int
offset
=
0
;
int
offset
=
0
;
...
@@ -357,11 +367,11 @@ std::future<int32_t> GraphBrpcClient::batch_sample_neighbors(
...
@@ -357,11 +367,11 @@ std::future<int32_t> GraphBrpcClient::batch_sample_neighbors(
int
start
=
0
;
int
start
=
0
;
while
(
start
<
actual_size
)
{
while
(
start
<
actual_size
)
{
res
[
node_idx
].
emplace_back
(
res
[
node_idx
].
emplace_back
(
*
(
int64_t
*
)
(
node_buffer
+
offset
+
start
));
*
reinterpret_cast
<
int64_t
*>
(
node_buffer
+
offset
+
start
));
start
+=
GraphNode
::
id_size
;
start
+=
GraphNode
::
id_size
;
if
(
need_weight
)
{
if
(
need_weight
)
{
res_weight
[
node_idx
].
emplace_back
(
res_weight
[
node_idx
].
emplace_back
(
*
(
float
*
)
(
node_buffer
+
offset
+
start
));
*
reinterpret_cast
<
float
*>
(
node_buffer
+
offset
+
start
));
start
+=
GraphNode
::
weight_size
;
start
+=
GraphNode
::
weight_size
;
}
}
}
}
...
@@ -373,16 +383,19 @@ std::future<int32_t> GraphBrpcClient::batch_sample_neighbors(
...
@@ -373,16 +383,19 @@ std::future<int32_t> GraphBrpcClient::batch_sample_neighbors(
auto
promise
=
std
::
make_shared
<
std
::
promise
<
int32_t
>>
();
auto
promise
=
std
::
make_shared
<
std
::
promise
<
int32_t
>>
();
closure
->
add_promise
(
promise
);
closure
->
add_promise
(
promise
);
std
::
future
<
int
>
fut
=
promise
->
get_future
();
std
::
future
<
int
>
fut
=
promise
->
get_future
();
;
closure
->
request
(
0
)
->
set_cmd_id
(
PS_GRAPH_SAMPLE_NODES_FROM_ONE_SERVER
);
closure
->
request
(
0
)
->
set_cmd_id
(
PS_GRAPH_SAMPLE_NODES_FROM_ONE_SERVER
);
closure
->
request
(
0
)
->
set_table_id
(
table_id
);
closure
->
request
(
0
)
->
set_table_id
(
table_id
);
closure
->
request
(
0
)
->
set_client_id
(
_client_id
);
closure
->
request
(
0
)
->
set_client_id
(
_client_id
);
closure
->
request
(
0
)
->
add_params
((
char
*
)
&
idx_
,
sizeof
(
int
));
closure
->
request
(
0
)
->
add_params
(
reinterpret_cast
<
char
*>
(
&
idx_
),
closure
->
request
(
0
)
->
add_params
((
char
*
)
node_ids
.
data
(),
sizeof
(
int
));
closure
->
request
(
0
)
->
add_params
(
reinterpret_cast
<
char
*>
(
node_ids
.
data
()),
sizeof
(
int64_t
)
*
node_ids
.
size
());
sizeof
(
int64_t
)
*
node_ids
.
size
());
closure
->
request
(
0
)
->
add_params
((
char
*
)
&
sample_size
,
sizeof
(
int
));
closure
->
request
(
0
)
->
add_params
(
reinterpret_cast
<
char
*>
(
&
sample_size
),
closure
->
request
(
0
)
->
add_params
((
char
*
)
&
need_weight
,
sizeof
(
bool
));
sizeof
(
int
));
;
closure
->
request
(
0
)
->
add_params
(
reinterpret_cast
<
char
*>
(
&
need_weight
),
sizeof
(
bool
));
// PsService_Stub rpc_stub(GetCmdChannel(server_index));
// PsService_Stub rpc_stub(GetCmdChannel(server_index));
GraphPsService_Stub
rpc_stub
=
getServiceStub
(
GetCmdChannel
(
server_index
));
GraphPsService_Stub
rpc_stub
=
getServiceStub
(
GetCmdChannel
(
server_index
));
closure
->
cntl
(
0
)
->
set_log_id
(
butil
::
gettimeofday_ms
());
closure
->
cntl
(
0
)
->
set_log_id
(
butil
::
gettimeofday_ms
());
...
@@ -420,7 +433,7 @@ std::future<int32_t> GraphBrpcClient::batch_sample_neighbors(
...
@@ -420,7 +433,7 @@ std::future<int32_t> GraphBrpcClient::batch_sample_neighbors(
request_call_num
,
request_call_num
,
[
&
,
node_id_buckets
,
query_idx_buckets
,
request_call_num
](
void
*
done
)
{
[
&
,
node_id_buckets
,
query_idx_buckets
,
request_call_num
](
void
*
done
)
{
int
ret
=
0
;
int
ret
=
0
;
auto
*
closure
=
(
DownpourBrpcClosure
*
)
done
;
auto
*
closure
=
reinterpret_cast
<
DownpourBrpcClosure
*>
(
done
)
;
size_t
fail_num
=
0
;
size_t
fail_num
=
0
;
for
(
size_t
request_idx
=
0
;
request_idx
<
request_call_num
;
for
(
size_t
request_idx
=
0
;
request_idx
<
request_call_num
;
++
request_idx
)
{
++
request_idx
)
{
...
@@ -434,10 +447,12 @@ std::future<int32_t> GraphBrpcClient::batch_sample_neighbors(
...
@@ -434,10 +447,12 @@ std::future<int32_t> GraphBrpcClient::batch_sample_neighbors(
size_t
bytes_size
=
io_buffer_itr
.
bytes_left
();
size_t
bytes_size
=
io_buffer_itr
.
bytes_left
();
std
::
unique_ptr
<
char
[]
>
buffer_wrapper
(
new
char
[
bytes_size
]);
std
::
unique_ptr
<
char
[]
>
buffer_wrapper
(
new
char
[
bytes_size
]);
char
*
buffer
=
buffer_wrapper
.
get
();
char
*
buffer
=
buffer_wrapper
.
get
();
io_buffer_itr
.
copy_and_forward
((
void
*
)(
buffer
),
bytes_size
);
io_buffer_itr
.
copy_and_forward
(
reinterpret_cast
<
void
*>
(
buffer
),
bytes_size
);
size_t
node_num
=
*
(
size_t
*
)
buffer
;
size_t
node_num
=
*
reinterpret_cast
<
size_t
*>
(
buffer
);
int
*
actual_sizes
=
(
int
*
)(
buffer
+
sizeof
(
size_t
));
int
*
actual_sizes
=
reinterpret_cast
<
int
*>
(
buffer
+
sizeof
(
size_t
));
char
*
node_buffer
=
char
*
node_buffer
=
buffer
+
sizeof
(
size_t
)
+
sizeof
(
int
)
*
node_num
;
buffer
+
sizeof
(
size_t
)
+
sizeof
(
int
)
*
node_num
;
...
@@ -448,11 +463,11 @@ std::future<int32_t> GraphBrpcClient::batch_sample_neighbors(
...
@@ -448,11 +463,11 @@ std::future<int32_t> GraphBrpcClient::batch_sample_neighbors(
int
start
=
0
;
int
start
=
0
;
while
(
start
<
actual_size
)
{
while
(
start
<
actual_size
)
{
res
[
query_idx
].
emplace_back
(
res
[
query_idx
].
emplace_back
(
*
(
int64_t
*
)
(
node_buffer
+
offset
+
start
));
*
reinterpret_cast
<
int64_t
*>
(
node_buffer
+
offset
+
start
));
start
+=
GraphNode
::
id_size
;
start
+=
GraphNode
::
id_size
;
if
(
need_weight
)
{
if
(
need_weight
)
{
res_weight
[
query_idx
].
emplace_back
(
res_weight
[
query_idx
].
emplace_back
(
*
(
float
*
)
(
node_buffer
+
offset
+
start
));
*
reinterpret_cast
<
float
*>
(
node_buffer
+
offset
+
start
));
start
+=
GraphNode
::
weight_size
;
start
+=
GraphNode
::
weight_size
;
}
}
}
}
...
@@ -477,14 +492,16 @@ std::future<int32_t> GraphBrpcClient::batch_sample_neighbors(
...
@@ -477,14 +492,16 @@ std::future<int32_t> GraphBrpcClient::batch_sample_neighbors(
closure
->
request
(
request_idx
)
->
set_client_id
(
_client_id
);
closure
->
request
(
request_idx
)
->
set_client_id
(
_client_id
);
size_t
node_num
=
node_id_buckets
[
request_idx
].
size
();
size_t
node_num
=
node_id_buckets
[
request_idx
].
size
();
closure
->
request
(
request_idx
)
->
add_params
((
char
*
)
&
idx_
,
sizeof
(
int
));
closure
->
request
(
request_idx
)
closure
->
request
(
request_idx
)
->
add_params
((
char
*
)
node_id_buckets
[
request_idx
].
data
(),
->
add_params
(
reinterpret_cast
<
char
*>
(
&
idx_
),
sizeof
(
int
));
closure
->
request
(
request_idx
)
->
add_params
(
reinterpret_cast
<
char
*>
(
node_id_buckets
[
request_idx
].
data
()),
sizeof
(
int64_t
)
*
node_num
);
sizeof
(
int64_t
)
*
node_num
);
closure
->
request
(
request_idx
)
closure
->
request
(
request_idx
)
->
add_params
(
(
char
*
)
&
sample_size
,
sizeof
(
int
));
->
add_params
(
reinterpret_cast
<
char
*>
(
&
sample_size
)
,
sizeof
(
int
));
closure
->
request
(
request_idx
)
closure
->
request
(
request_idx
)
->
add_params
(
(
char
*
)
&
need_weight
,
sizeof
(
bool
));
->
add_params
(
reinterpret_cast
<
char
*>
(
&
need_weight
)
,
sizeof
(
bool
));
// PsService_Stub rpc_stub(GetCmdChannel(server_index));
// PsService_Stub rpc_stub(GetCmdChannel(server_index));
GraphPsService_Stub
rpc_stub
=
getServiceStub
(
GetCmdChannel
(
server_index
));
GraphPsService_Stub
rpc_stub
=
getServiceStub
(
GetCmdChannel
(
server_index
));
closure
->
cntl
(
request_idx
)
->
set_log_id
(
butil
::
gettimeofday_ms
());
closure
->
cntl
(
request_idx
)
->
set_log_id
(
butil
::
gettimeofday_ms
());
...
@@ -505,7 +522,7 @@ std::future<int32_t> GraphBrpcClient::random_sample_nodes(
...
@@ -505,7 +522,7 @@ std::future<int32_t> GraphBrpcClient::random_sample_nodes(
std
::
vector
<
int64_t
>
&
ids
)
{
std
::
vector
<
int64_t
>
&
ids
)
{
DownpourBrpcClosure
*
closure
=
new
DownpourBrpcClosure
(
1
,
[
&
](
void
*
done
)
{
DownpourBrpcClosure
*
closure
=
new
DownpourBrpcClosure
(
1
,
[
&
](
void
*
done
)
{
int
ret
=
0
;
int
ret
=
0
;
auto
*
closure
=
(
DownpourBrpcClosure
*
)
done
;
auto
*
closure
=
reinterpret_cast
<
DownpourBrpcClosure
*>
(
done
)
;
if
(
closure
->
check_response
(
0
,
PS_GRAPH_SAMPLE_NODES
)
!=
0
)
{
if
(
closure
->
check_response
(
0
,
PS_GRAPH_SAMPLE_NODES
)
!=
0
)
{
ret
=
-
1
;
ret
=
-
1
;
}
else
{
}
else
{
...
@@ -515,7 +532,7 @@ std::future<int32_t> GraphBrpcClient::random_sample_nodes(
...
@@ -515,7 +532,7 @@ std::future<int32_t> GraphBrpcClient::random_sample_nodes(
char
*
buffer
=
new
char
[
bytes_size
];
char
*
buffer
=
new
char
[
bytes_size
];
size_t
index
=
0
;
size_t
index
=
0
;
while
(
index
<
bytes_size
)
{
while
(
index
<
bytes_size
)
{
ids
.
push_back
(
*
(
int64_t
*
)
(
buffer
+
index
));
ids
.
push_back
(
*
reinterpret_cast
<
int64_t
*>
(
buffer
+
index
));
index
+=
GraphNode
::
id_size
;
index
+=
GraphNode
::
id_size
;
}
}
delete
[]
buffer
;
delete
[]
buffer
;
...
@@ -525,14 +542,16 @@ std::future<int32_t> GraphBrpcClient::random_sample_nodes(
...
@@ -525,14 +542,16 @@ std::future<int32_t> GraphBrpcClient::random_sample_nodes(
auto
promise
=
std
::
make_shared
<
std
::
promise
<
int32_t
>>
();
auto
promise
=
std
::
make_shared
<
std
::
promise
<
int32_t
>>
();
closure
->
add_promise
(
promise
);
closure
->
add_promise
(
promise
);
std
::
future
<
int
>
fut
=
promise
->
get_future
();
std
::
future
<
int
>
fut
=
promise
->
get_future
();
;
closure
->
request
(
0
)
->
set_cmd_id
(
PS_GRAPH_SAMPLE_NODES
);
closure
->
request
(
0
)
->
set_cmd_id
(
PS_GRAPH_SAMPLE_NODES
);
closure
->
request
(
0
)
->
set_table_id
(
table_id
);
closure
->
request
(
0
)
->
set_table_id
(
table_id
);
closure
->
request
(
0
)
->
set_client_id
(
_client_id
);
closure
->
request
(
0
)
->
set_client_id
(
_client_id
);
closure
->
request
(
0
)
->
add_params
((
char
*
)
&
type_id
,
sizeof
(
int
));
closure
->
request
(
0
)
->
add_params
(
reinterpret_cast
<
char
*>
(
&
type_id
),
closure
->
request
(
0
)
->
add_params
((
char
*
)
&
idx_
,
sizeof
(
int
));
sizeof
(
int
));
closure
->
request
(
0
)
->
add_params
((
char
*
)
&
sample_size
,
sizeof
(
int
));
closure
->
request
(
0
)
->
add_params
(
reinterpret_cast
<
char
*>
(
&
idx_
),
sizeof
(
int
));
;
closure
->
request
(
0
)
->
add_params
(
reinterpret_cast
<
char
*>
(
&
sample_size
),
sizeof
(
int
));
// PsService_Stub rpc_stub(GetCmdChannel(server_index));
// PsService_Stub rpc_stub(GetCmdChannel(server_index));
GraphPsService_Stub
rpc_stub
=
getServiceStub
(
GetCmdChannel
(
server_index
));
GraphPsService_Stub
rpc_stub
=
getServiceStub
(
GetCmdChannel
(
server_index
));
closure
->
cntl
(
0
)
->
set_log_id
(
butil
::
gettimeofday_ms
());
closure
->
cntl
(
0
)
->
set_log_id
(
butil
::
gettimeofday_ms
());
...
@@ -552,7 +571,7 @@ std::future<int32_t> GraphBrpcClient::pull_graph_list(
...
@@ -552,7 +571,7 @@ std::future<int32_t> GraphBrpcClient::pull_graph_list(
std
::
vector
<
FeatureNode
>
&
res
)
{
std
::
vector
<
FeatureNode
>
&
res
)
{
DownpourBrpcClosure
*
closure
=
new
DownpourBrpcClosure
(
1
,
[
&
](
void
*
done
)
{
DownpourBrpcClosure
*
closure
=
new
DownpourBrpcClosure
(
1
,
[
&
](
void
*
done
)
{
int
ret
=
0
;
int
ret
=
0
;
auto
*
closure
=
(
DownpourBrpcClosure
*
)
done
;
auto
*
closure
=
reinterpret_cast
<
DownpourBrpcClosure
*>
(
done
)
;
if
(
closure
->
check_response
(
0
,
PS_PULL_GRAPH_LIST
)
!=
0
)
{
if
(
closure
->
check_response
(
0
,
PS_PULL_GRAPH_LIST
)
!=
0
)
{
ret
=
-
1
;
ret
=
-
1
;
}
else
{
}
else
{
...
@@ -560,7 +579,8 @@ std::future<int32_t> GraphBrpcClient::pull_graph_list(
...
@@ -560,7 +579,8 @@ std::future<int32_t> GraphBrpcClient::pull_graph_list(
butil
::
IOBufBytesIterator
io_buffer_itr
(
res_io_buffer
);
butil
::
IOBufBytesIterator
io_buffer_itr
(
res_io_buffer
);
size_t
bytes_size
=
io_buffer_itr
.
bytes_left
();
size_t
bytes_size
=
io_buffer_itr
.
bytes_left
();
char
*
buffer
=
new
char
[
bytes_size
];
char
*
buffer
=
new
char
[
bytes_size
];
io_buffer_itr
.
copy_and_forward
((
void
*
)(
buffer
),
bytes_size
);
io_buffer_itr
.
copy_and_forward
(
reinterpret_cast
<
void
*>
(
buffer
),
bytes_size
);
size_t
index
=
0
;
size_t
index
=
0
;
while
(
index
<
bytes_size
)
{
while
(
index
<
bytes_size
)
{
FeatureNode
node
;
FeatureNode
node
;
...
@@ -578,11 +598,13 @@ std::future<int32_t> GraphBrpcClient::pull_graph_list(
...
@@ -578,11 +598,13 @@ std::future<int32_t> GraphBrpcClient::pull_graph_list(
closure
->
request
(
0
)
->
set_cmd_id
(
PS_PULL_GRAPH_LIST
);
closure
->
request
(
0
)
->
set_cmd_id
(
PS_PULL_GRAPH_LIST
);
closure
->
request
(
0
)
->
set_table_id
(
table_id
);
closure
->
request
(
0
)
->
set_table_id
(
table_id
);
closure
->
request
(
0
)
->
set_client_id
(
_client_id
);
closure
->
request
(
0
)
->
set_client_id
(
_client_id
);
closure
->
request
(
0
)
->
add_params
((
char
*
)
&
type_id
,
sizeof
(
int
));
closure
->
request
(
0
)
->
add_params
(
reinterpret_cast
<
char
*>
(
&
type_id
),
closure
->
request
(
0
)
->
add_params
((
char
*
)
&
idx_
,
sizeof
(
int
));
sizeof
(
int
));
closure
->
request
(
0
)
->
add_params
((
char
*
)
&
start
,
sizeof
(
int
));
closure
->
request
(
0
)
->
add_params
(
reinterpret_cast
<
char
*>
(
&
idx_
),
sizeof
(
int
));
closure
->
request
(
0
)
->
add_params
((
char
*
)
&
size
,
sizeof
(
int
));
closure
->
request
(
0
)
->
add_params
(
reinterpret_cast
<
char
*>
(
&
start
),
closure
->
request
(
0
)
->
add_params
((
char
*
)
&
step
,
sizeof
(
int
));
sizeof
(
int
));
closure
->
request
(
0
)
->
add_params
(
reinterpret_cast
<
char
*>
(
&
size
),
sizeof
(
int
));
closure
->
request
(
0
)
->
add_params
(
reinterpret_cast
<
char
*>
(
&
step
),
sizeof
(
int
));
// PsService_Stub rpc_stub(GetCmdChannel(server_index));
// PsService_Stub rpc_stub(GetCmdChannel(server_index));
GraphPsService_Stub
rpc_stub
=
getServiceStub
(
GetCmdChannel
(
server_index
));
GraphPsService_Stub
rpc_stub
=
getServiceStub
(
GetCmdChannel
(
server_index
));
closure
->
cntl
(
0
)
->
set_log_id
(
butil
::
gettimeofday_ms
());
closure
->
cntl
(
0
)
->
set_log_id
(
butil
::
gettimeofday_ms
());
...
@@ -629,7 +651,7 @@ std::future<int32_t> GraphBrpcClient::set_node_feat(
...
@@ -629,7 +651,7 @@ std::future<int32_t> GraphBrpcClient::set_node_feat(
request_call_num
,
request_call_num
,
[
&
,
node_id_buckets
,
query_idx_buckets
,
request_call_num
](
void
*
done
)
{
[
&
,
node_id_buckets
,
query_idx_buckets
,
request_call_num
](
void
*
done
)
{
int
ret
=
0
;
int
ret
=
0
;
auto
*
closure
=
(
DownpourBrpcClosure
*
)
done
;
auto
*
closure
=
reinterpret_cast
<
DownpourBrpcClosure
*>
(
done
)
;
size_t
fail_num
=
0
;
size_t
fail_num
=
0
;
for
(
size_t
request_idx
=
0
;
request_idx
<
request_call_num
;
for
(
size_t
request_idx
=
0
;
request_idx
<
request_call_num
;
++
request_idx
)
{
++
request_idx
)
{
...
@@ -655,9 +677,11 @@ std::future<int32_t> GraphBrpcClient::set_node_feat(
...
@@ -655,9 +677,11 @@ std::future<int32_t> GraphBrpcClient::set_node_feat(
closure
->
request
(
request_idx
)
->
set_client_id
(
_client_id
);
closure
->
request
(
request_idx
)
->
set_client_id
(
_client_id
);
size_t
node_num
=
node_id_buckets
[
request_idx
].
size
();
size_t
node_num
=
node_id_buckets
[
request_idx
].
size
();
closure
->
request
(
request_idx
)
->
add_params
((
char
*
)
&
idx_
,
sizeof
(
int
));
closure
->
request
(
request_idx
)
closure
->
request
(
request_idx
)
->
add_params
((
char
*
)
node_id_buckets
[
request_idx
].
data
(),
->
add_params
(
reinterpret_cast
<
char
*>
(
&
idx_
),
sizeof
(
int
));
closure
->
request
(
request_idx
)
->
add_params
(
reinterpret_cast
<
char
*>
(
node_id_buckets
[
request_idx
].
data
()),
sizeof
(
int64_t
)
*
node_num
);
sizeof
(
int64_t
)
*
node_num
);
std
::
string
joint_feature_name
=
std
::
string
joint_feature_name
=
paddle
::
string
::
join_strings
(
feature_names
,
'\t'
);
paddle
::
string
::
join_strings
(
feature_names
,
'\t'
);
...
@@ -670,7 +694,7 @@ std::future<int32_t> GraphBrpcClient::set_node_feat(
...
@@ -670,7 +694,7 @@ std::future<int32_t> GraphBrpcClient::set_node_feat(
for
(
size_t
node_idx
=
0
;
node_idx
<
node_num
;
++
node_idx
)
{
for
(
size_t
node_idx
=
0
;
node_idx
<
node_num
;
++
node_idx
)
{
size_t
feat_len
=
size_t
feat_len
=
features_idx_buckets
[
request_idx
][
feat_idx
][
node_idx
].
size
();
features_idx_buckets
[
request_idx
][
feat_idx
][
node_idx
].
size
();
set_feature
.
append
(
(
char
*
)
&
feat_len
,
sizeof
(
size_t
));
set_feature
.
append
(
reinterpret_cast
<
char
*>
(
&
feat_len
)
,
sizeof
(
size_t
));
set_feature
.
append
(
set_feature
.
append
(
features_idx_buckets
[
request_idx
][
feat_idx
][
node_idx
].
data
(),
features_idx_buckets
[
request_idx
][
feat_idx
][
node_idx
].
data
(),
feat_len
);
feat_len
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录