Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
BaiXuePrincess
Paddle
提交
d5c5bbc3
P
Paddle
项目概览
BaiXuePrincess
/
Paddle
与 Fork 源项目一致
Fork自
PaddlePaddle / Paddle
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
Paddle
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
d5c5bbc3
编写于
1月 13, 2023
作者:
W
wangzhen38
提交者:
GitHub
1月 13, 2023
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[cpplint fix] under ps (#49759)
* [cpplint fix] under ps
上级
8447f876
变更
15
隐藏空白更改
内联
并排
Showing
15 changed file
with
101 addition
and
91 deletion
+101
-91
paddle/fluid/distributed/common/registerer.h
paddle/fluid/distributed/common/registerer.h
+5
-2
paddle/fluid/distributed/ps/service/ps_service/graph_py_service.cc
...uid/distributed/ps/service/ps_service/graph_py_service.cc
+8
-8
paddle/fluid/distributed/ps/service/ps_service/graph_py_service.h
...luid/distributed/ps/service/ps_service/graph_py_service.h
+7
-5
paddle/fluid/distributed/ps/table/graph/graph_node.cc
paddle/fluid/distributed/ps/table/graph/graph_node.cc
+1
-1
paddle/fluid/distributed/ps/table/graph/graph_weighted_sampler.cc
...luid/distributed/ps/table/graph/graph_weighted_sampler.cc
+2
-1
paddle/fluid/framework/fleet/heter_ps/graph_sampler.h
paddle/fluid/framework/fleet/heter_ps/graph_sampler.h
+0
-2
paddle/fluid/framework/fleet/heter_ps/graph_sampler_inl.h
paddle/fluid/framework/fleet/heter_ps/graph_sampler_inl.h
+3
-3
paddle/fluid/framework/fleet/heter_ps/test_comm.cu
paddle/fluid/framework/fleet/heter_ps/test_comm.cu
+1
-1
paddle/fluid/framework/fleet/heter_ps/test_cpu_graph_sample.cu
...e/fluid/framework/fleet/heter_ps/test_cpu_graph_sample.cu
+4
-3
paddle/fluid/framework/fleet/heter_ps/test_cpu_query.cu
paddle/fluid/framework/fleet/heter_ps/test_cpu_query.cu
+33
-35
paddle/fluid/framework/fleet/heter_ps/test_graph.cu
paddle/fluid/framework/fleet/heter_ps/test_graph.cu
+4
-3
paddle/fluid/framework/fleet/heter_ps/test_sample_rate.cu
paddle/fluid/framework/fleet/heter_ps/test_sample_rate.cu
+20
-15
paddle/fluid/framework/fleet/heter_wrapper.cc
paddle/fluid/framework/fleet/heter_wrapper.cc
+6
-5
paddle/fluid/framework/fleet/heter_wrapper.h
paddle/fluid/framework/fleet/heter_wrapper.h
+3
-3
paddle/phi/kernels/cpu/graph_sample_neighbors_kernel.cc
paddle/phi/kernels/cpu/graph_sample_neighbors_kernel.cc
+4
-4
未找到文件。
paddle/fluid/distributed/common/registerer.h
浏览文件 @
d5c5bbc3
...
@@ -29,7 +29,8 @@ class Any {
...
@@ -29,7 +29,8 @@ class Any {
Any
()
:
content_
(
NULL
)
{}
Any
()
:
content_
(
NULL
)
{}
template
<
typename
ValueType
>
template
<
typename
ValueType
>
Any
(
const
ValueType
&
value
)
:
content_
(
new
Holder
<
ValueType
>
(
value
))
{}
explicit
Any
(
const
ValueType
&
value
)
:
content_
(
new
Holder
<
ValueType
>
(
value
))
{}
Any
(
const
Any
&
other
)
Any
(
const
Any
&
other
)
:
content_
(
other
.
content_
?
other
.
content_
->
clone
()
:
NULL
)
{}
:
content_
(
other
.
content_
?
other
.
content_
->
clone
()
:
NULL
)
{}
...
@@ -38,7 +39,9 @@ class Any {
...
@@ -38,7 +39,9 @@ class Any {
template
<
typename
ValueType
>
template
<
typename
ValueType
>
ValueType
*
any_cast
()
{
ValueType
*
any_cast
()
{
return
content_
?
&
static_cast
<
Holder
<
ValueType
>
*>
(
content_
)
->
held_
:
NULL
;
return
content_
?
&
static_cast
<
Holder
<
ValueType
>
*>
(
content_
)
->
held_
// NOLINT
:
NULL
;
}
}
private:
private:
...
...
paddle/fluid/distributed/ps/service/ps_service/graph_py_service.cc
浏览文件 @
d5c5bbc3
...
@@ -23,7 +23,7 @@
...
@@ -23,7 +23,7 @@
#include "paddle/fluid/platform/profiler/event_tracing.h"
#include "paddle/fluid/platform/profiler/event_tracing.h"
namespace
paddle
{
namespace
paddle
{
namespace
distributed
{
namespace
distributed
{
std
::
vector
<
std
::
string
>
GraphPyService
::
split
(
std
::
string
&
str
,
std
::
vector
<
std
::
string
>
GraphPyService
::
split
(
const
std
::
string
&
str
,
const
char
pattern
)
{
const
char
pattern
)
{
std
::
vector
<
std
::
string
>
res
;
std
::
vector
<
std
::
string
>
res
;
std
::
stringstream
input
(
str
);
std
::
stringstream
input
(
str
);
...
@@ -44,7 +44,7 @@ void GraphPyService::add_table_feat_conf(std::string table_name,
...
@@ -44,7 +44,7 @@ void GraphPyService::add_table_feat_conf(std::string table_name,
if
(
table_feat_mapping
[
idx
].
find
(
feat_name
)
==
if
(
table_feat_mapping
[
idx
].
find
(
feat_name
)
==
table_feat_mapping
[
idx
].
end
())
{
table_feat_mapping
[
idx
].
end
())
{
VLOG
(
0
)
<<
"for table name not found,make a new one"
;
VLOG
(
0
)
<<
"for table name not found,make a new one"
;
int
res
=
(
int
)
table_feat_mapping
[
idx
].
size
(
);
int
res
=
static_cast
<
int
>
(
table_feat_mapping
[
idx
].
size
()
);
table_feat_mapping
[
idx
][
feat_name
]
=
res
;
table_feat_mapping
[
idx
][
feat_name
]
=
res
;
VLOG
(
0
)
<<
"seq id = "
<<
table_feat_mapping
[
idx
][
feat_name
];
VLOG
(
0
)
<<
"seq id = "
<<
table_feat_mapping
[
idx
][
feat_name
];
}
}
...
@@ -72,8 +72,8 @@ void add_graph_node(std::string name,
...
@@ -72,8 +72,8 @@ void add_graph_node(std::string name,
void
remove_graph_node
(
std
::
string
name
,
std
::
vector
<
int64_t
>
node_ids
)
{}
void
remove_graph_node
(
std
::
string
name
,
std
::
vector
<
int64_t
>
node_ids
)
{}
void
GraphPyService
::
set_up
(
std
::
string
ips_str
,
void
GraphPyService
::
set_up
(
std
::
string
ips_str
,
int
shard_num
,
int
shard_num
,
std
::
vector
<
std
::
string
>
node_types
,
const
std
::
vector
<
std
::
string
>
node_types
,
std
::
vector
<
std
::
string
>
edge_types
)
{
const
std
::
vector
<
std
::
string
>
edge_types
)
{
set_shard_num
(
shard_num
);
set_shard_num
(
shard_num
);
set_num_node_types
(
node_types
.
size
());
set_num_node_types
(
node_types
.
size
());
/*
/*
...
@@ -86,12 +86,12 @@ void GraphPyService::set_up(std::string ips_str,
...
@@ -86,12 +86,12 @@ void GraphPyService::set_up(std::string ips_str,
*/
*/
id_to_edge
=
edge_types
;
id_to_edge
=
edge_types
;
for
(
size_t
table_id
=
0
;
table_id
<
edge_types
.
size
();
table_id
++
)
{
for
(
size_t
table_id
=
0
;
table_id
<
edge_types
.
size
();
table_id
++
)
{
int
res
=
(
int
)
edge_to_id
.
size
(
);
int
res
=
static_cast
<
int
>
(
edge_to_id
.
size
()
);
edge_to_id
[
edge_types
[
table_id
]]
=
res
;
edge_to_id
[
edge_types
[
table_id
]]
=
res
;
}
}
id_to_feature
=
node_types
;
id_to_feature
=
node_types
;
for
(
size_t
table_id
=
0
;
table_id
<
node_types
.
size
();
table_id
++
)
{
for
(
size_t
table_id
=
0
;
table_id
<
node_types
.
size
();
table_id
++
)
{
int
res
=
(
int
)
feature_to_id
.
size
(
);
int
res
=
static_cast
<
int
>
(
feature_to_id
.
size
()
);
feature_to_id
[
node_types
[
table_id
]]
=
res
;
feature_to_id
[
node_types
[
table_id
]]
=
res
;
}
}
table_feat_mapping
.
resize
(
node_types
.
size
());
table_feat_mapping
.
resize
(
node_types
.
size
());
...
@@ -312,8 +312,8 @@ void GraphPyClient::clear_nodes(std::string name) {
...
@@ -312,8 +312,8 @@ void GraphPyClient::clear_nodes(std::string name) {
}
}
void
GraphPyClient
::
add_graph_node
(
std
::
string
name
,
void
GraphPyClient
::
add_graph_node
(
std
::
string
name
,
std
::
vector
<
int64_t
>&
node_ids
,
std
::
vector
<
int64_t
>&
node_ids
,
// NOLINT
std
::
vector
<
bool
>&
weight_list
)
{
std
::
vector
<
bool
>&
weight_list
)
{
// NOLINT
// if (this->table_id_map.count(name)) {
// if (this->table_id_map.count(name)) {
// uint32_t table_id = this->table_id_map[name];
// uint32_t table_id = this->table_id_map[name];
// auto status =
// auto status =
...
...
paddle/fluid/distributed/ps/service/ps_service/graph_py_service.h
浏览文件 @
d5c5bbc3
...
@@ -116,7 +116,7 @@ class GraphPyService {
...
@@ -116,7 +116,7 @@ class GraphPyService {
this
->
num_node_types
=
num_node_types
;
this
->
num_node_types
=
num_node_types
;
}
}
int
get_server_size
(
int
server_size
)
{
return
server_size
;
}
int
get_server_size
(
int
server_size
)
{
return
server_size
;
}
std
::
vector
<
std
::
string
>
split
(
std
::
string
&
str
,
const
char
pattern
);
std
::
vector
<
std
::
string
>
split
(
const
std
::
string
&
str
,
const
char
pattern
);
void
set_up
(
std
::
string
ips_str
,
void
set_up
(
std
::
string
ips_str
,
int
shard_num
,
int
shard_num
,
std
::
vector
<
std
::
string
>
node_types
,
std
::
vector
<
std
::
string
>
node_types
,
...
@@ -165,7 +165,8 @@ class GraphPyClient : public GraphPyService {
...
@@ -165,7 +165,8 @@ class GraphPyClient : public GraphPyService {
std
::
shared_ptr
<
paddle
::
distributed
::
GraphBrpcClient
>
get_ps_client
()
{
std
::
shared_ptr
<
paddle
::
distributed
::
GraphBrpcClient
>
get_ps_client
()
{
return
worker_ptr
;
return
worker_ptr
;
}
}
void
bind_local_server
(
int
local_channel_index
,
GraphPyServer
&
server
)
{
void
bind_local_server
(
int
local_channel_index
,
GraphPyServer
&
server
)
{
// NOLINT
worker_ptr
->
set_local_channel
(
local_channel_index
);
worker_ptr
->
set_local_channel
(
local_channel_index
);
worker_ptr
->
set_local_graph_service
(
worker_ptr
->
set_local_graph_service
(
(
paddle
::
distributed
::
GraphBrpcService
*
)
server
.
get_ps_server
()
(
paddle
::
distributed
::
GraphBrpcService
*
)
server
.
get_ps_server
()
...
@@ -177,9 +178,10 @@ class GraphPyClient : public GraphPyService {
...
@@ -177,9 +178,10 @@ class GraphPyClient : public GraphPyService {
void
load_node_file
(
std
::
string
name
,
std
::
string
filepath
);
void
load_node_file
(
std
::
string
name
,
std
::
string
filepath
);
void
clear_nodes
(
std
::
string
name
);
void
clear_nodes
(
std
::
string
name
);
void
add_graph_node
(
std
::
string
name
,
void
add_graph_node
(
std
::
string
name
,
std
::
vector
<
int64_t
>&
node_ids
,
std
::
vector
<
int64_t
>&
node_ids
,
// NOLINT
std
::
vector
<
bool
>&
weight_list
);
std
::
vector
<
bool
>&
weight_list
);
// NOLINT
void
remove_graph_node
(
std
::
string
name
,
std
::
vector
<
int64_t
>&
node_ids
);
void
remove_graph_node
(
std
::
string
name
,
std
::
vector
<
int64_t
>&
node_ids
);
// NOLINT
int
get_client_id
()
{
return
client_id
;
}
int
get_client_id
()
{
return
client_id
;
}
void
set_client_id
(
int
client_id
)
{
this
->
client_id
=
client_id
;
}
void
set_client_id
(
int
client_id
)
{
this
->
client_id
=
client_id
;
}
void
start_client
();
void
start_client
();
...
...
paddle/fluid/distributed/ps/table/graph/graph_node.cc
浏览文件 @
d5c5bbc3
...
@@ -110,7 +110,7 @@ void FeatureNode::recover_from_buffer(char* buffer) {
...
@@ -110,7 +110,7 @@ void FeatureNode::recover_from_buffer(char* buffer) {
memcpy
(
&
feat_len
,
buffer
,
sizeof
(
int
));
memcpy
(
&
feat_len
,
buffer
,
sizeof
(
int
));
buffer
+=
sizeof
(
int
);
buffer
+=
sizeof
(
int
);
char
str
[
feat_len
+
1
];
char
str
[
feat_len
+
1
];
// NOLINT
memcpy
(
str
,
buffer
,
feat_len
);
memcpy
(
str
,
buffer
,
feat_len
);
buffer
+=
feat_len
;
buffer
+=
feat_len
;
str
[
feat_len
]
=
'\0'
;
str
[
feat_len
]
=
'\0'
;
...
...
paddle/fluid/distributed/ps/table/graph/graph_weighted_sampler.cc
浏览文件 @
d5c5bbc3
...
@@ -84,7 +84,8 @@ void WeightedSampler::build(GraphEdgeBlob *edges) {
...
@@ -84,7 +84,8 @@ void WeightedSampler::build(GraphEdgeBlob *edges) {
delete
right
;
delete
right
;
right
=
nullptr
;
right
=
nullptr
;
}
}
return
build_one
((
WeightedGraphEdgeBlob
*
)
edges
,
0
,
edges
->
size
());
return
build_one
(
reinterpret_cast
<
WeightedGraphEdgeBlob
*>
(
edges
),
0
,
edges
->
size
());
}
}
void
WeightedSampler
::
build_one
(
WeightedGraphEdgeBlob
*
edges
,
void
WeightedSampler
::
build_one
(
WeightedGraphEdgeBlob
*
edges
,
...
...
paddle/fluid/framework/fleet/heter_ps/graph_sampler.h
浏览文件 @
d5c5bbc3
...
@@ -63,9 +63,7 @@ class GraphSampler {
...
@@ -63,9 +63,7 @@ class GraphSampler {
}
}
~
GraphSampler
()
{
end_graph_sampling
();
}
~
GraphSampler
()
{
end_graph_sampling
();
}
virtual
int
load_from_ssd
(
std
::
string
path
)
=
0
;
virtual
int
load_from_ssd
(
std
::
string
path
)
=
0
;
;
virtual
int
run_graph_sampling
()
=
0
;
virtual
int
run_graph_sampling
()
=
0
;
;
virtual
void
init
(
GpuPsGraphTable
*
gpu_table
,
virtual
void
init
(
GpuPsGraphTable
*
gpu_table
,
std
::
vector
<
std
::
string
>
args_
)
=
0
;
std
::
vector
<
std
::
string
>
args_
)
=
0
;
std
::
shared_ptr
<::
ThreadPool
>
thread_pool
;
std
::
shared_ptr
<::
ThreadPool
>
thread_pool
;
...
...
paddle/fluid/framework/fleet/heter_ps/graph_sampler_inl.h
浏览文件 @
d5c5bbc3
...
@@ -12,7 +12,7 @@
...
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// See the License for the specific language governing permissions and
// limitations under the License.
// limitations under the License.
#
ifdef PADDLE_WITH_HETERPS
#
pragma once
namespace
paddle
{
namespace
paddle
{
namespace
framework
{
namespace
framework
{
int
CommonGraphSampler
::
load_from_ssd
(
std
::
string
path
)
{
int
CommonGraphSampler
::
load_from_ssd
(
std
::
string
path
)
{
...
@@ -30,9 +30,9 @@ int CommonGraphSampler::load_from_ssd(std::string path) {
...
@@ -30,9 +30,9 @@ int CommonGraphSampler::load_from_ssd(std::string path) {
}
}
auto
src_id
=
std
::
stoll
(
values
[
0
]);
auto
src_id
=
std
::
stoll
(
values
[
0
]);
_db
->
put
(
0
,
_db
->
put
(
0
,
(
char
*
)
&
src_id
,
reinterpret_cast
<
char
*>
(
&
src_id
)
,
sizeof
(
uint64_t
),
sizeof
(
uint64_t
),
(
char
*
)
neighbor_data
.
data
(
),
reinterpret_cast
<
char
*>
(
neighbor_data
.
data
()
),
sizeof
(
uint64_t
)
*
neighbor_data
.
size
());
sizeof
(
uint64_t
)
*
neighbor_data
.
size
());
int
gpu_shard
=
src_id
%
gpu_num
;
int
gpu_shard
=
src_id
%
gpu_num
;
if
(
gpu_edges_count
[
gpu_shard
]
+
neighbor_data
.
size
()
<=
if
(
gpu_edges_count
[
gpu_shard
]
+
neighbor_data
.
size
()
<=
...
...
paddle/fluid/framework/fleet/heter_ps/test_comm.cu
浏览文件 @
d5c5bbc3
...
@@ -22,7 +22,7 @@ limitations under the License. */
...
@@ -22,7 +22,7 @@ limitations under the License. */
#include "paddle/fluid/framework/fleet/heter_ps/optimizer.cuh.h"
#include "paddle/fluid/framework/fleet/heter_ps/optimizer.cuh.h"
#include "paddle/fluid/platform/cuda_device_guard.h"
#include "paddle/fluid/platform/cuda_device_guard.h"
using
namespace
paddle
::
framework
;
using
paddle
::
framework
;
TEST
(
TEST_FLEET
,
heter_comm
)
{
TEST
(
TEST_FLEET
,
heter_comm
)
{
int
gpu_count
=
3
;
int
gpu_count
=
3
;
...
...
paddle/fluid/framework/fleet/heter_ps/test_cpu_graph_sample.cu
浏览文件 @
d5c5bbc3
...
@@ -24,7 +24,7 @@
...
@@ -24,7 +24,7 @@
#include "paddle/fluid/framework/fleet/heter_ps/optimizer.cuh.h"
#include "paddle/fluid/framework/fleet/heter_ps/optimizer.cuh.h"
#include "paddle/fluid/platform/cuda_device_guard.h"
#include "paddle/fluid/platform/cuda_device_guard.h"
using
namespace
paddle
::
framework
;
using
paddle
::
framework
;
void
prepare_file
(
char
file_name
[],
std
::
vector
<
std
::
string
>
data
)
{
void
prepare_file
(
char
file_name
[],
std
::
vector
<
std
::
string
>
data
)
{
std
::
ofstream
ofile
;
std
::
ofstream
ofile
;
ofile
.
open
(
file_name
);
ofile
.
open
(
file_name
);
...
@@ -91,9 +91,10 @@ TEST(TEST_FLEET, graph_sample) {
...
@@ -91,9 +91,10 @@ TEST(TEST_FLEET, graph_sample) {
*/
*/
int64_t
cpu_key
[
3
]
=
{
7
,
0
,
6
};
int64_t
cpu_key
[
3
]
=
{
7
,
0
,
6
};
void
*
key
;
void
*
key
;
cudaMalloc
(
(
void
**
)
&
key
,
3
*
sizeof
(
int64_t
));
cudaMalloc
(
reinterpret_cast
<
void
**>
(
&
key
)
,
3
*
sizeof
(
int64_t
));
cudaMemcpy
(
key
,
cpu_key
,
3
*
sizeof
(
int64_t
),
cudaMemcpyHostToDevice
);
cudaMemcpy
(
key
,
cpu_key
,
3
*
sizeof
(
int64_t
),
cudaMemcpyHostToDevice
);
auto
neighbor_sample_res
=
g
.
graph_neighbor_sample
(
0
,
(
int64_t
*
)
key
,
3
,
3
);
auto
neighbor_sample_res
=
g
.
graph_neighbor_sample
(
0
,
reinterpret_cast
<
int64_t
**>
(
key
),
3
,
3
);
int64_t
*
res
=
new
int64_t
[
7
];
int64_t
*
res
=
new
int64_t
[
7
];
/*
/*
cudaMemcpy(res, neighbor_sample_res->val, 56, cudaMemcpyDeviceToHost);
cudaMemcpy(res, neighbor_sample_res->val, 56, cudaMemcpyDeviceToHost);
...
...
paddle/fluid/framework/fleet/heter_ps/test_cpu_query.cu
浏览文件 @
d5c5bbc3
...
@@ -28,44 +28,42 @@
...
@@ -28,44 +28,42 @@
using
namespace
paddle
::
framework
;
// NOLINT
using
namespace
paddle
::
framework
;
// NOLINT
namespace
platform
=
paddle
::
platform
;
namespace
platform
=
paddle
::
platform
;
std
::
string
edges
[]
=
{
const
char
*
edges
[]
=
{
// NOLINT
"0
\t
1"
,
std
::
string
(
"0
\t
1"
),
"0
\t
9"
,
std
::
string
(
"0
\t
9"
),
"1
\t
2"
,
std
::
string
(
"1
\t
2"
),
"1
\t
0"
,
std
::
string
(
"1
\t
0"
),
"2
\t
1"
,
std
::
string
(
"2
\t
1"
),
"2
\t
3"
,
std
::
string
(
"2
\t
3"
),
"3
\t
2"
,
std
::
string
(
"3
\t
2"
),
"3
\t
4"
,
std
::
string
(
"3
\t
4"
),
"4
\t
3"
,
std
::
string
(
"4
\t
3"
),
"4
\t
5"
,
std
::
string
(
"4
\t
5"
),
"5
\t
4"
,
std
::
string
(
"5
\t
4"
),
"5
\t
6"
,
std
::
string
(
"5
\t
6"
),
"6
\t
5"
,
std
::
string
(
"6
\t
5"
),
"6
\t
7"
,
std
::
string
(
"6
\t
7"
),
"7
\t
6"
,
std
::
string
(
"7
\t
6"
),
"7
\t
8"
,
std
::
string
(
"7
\t
8"
),
};
};
char
edge_file_name
[]
=
"edges1.txt"
;
char
edge_file_name
[]
=
"edges1.txt"
;
std
::
string
nodes
[]
=
{
// NOLINT
const
char
*
nodes
[]
=
{
"user
\t
37
\t
a 0.34
\t
b 13 14
\t
c hello
\t
d abc"
,
std
::
string
(
"user
\t
37
\t
a 0.34
\t
b 13 14
\t
c hello
\t
d abc"
),
"user
\t
96
\t
a 0.31
\t
b 15 10
\t
c 96hello
\t
d abcd"
,
std
::
string
(
"user
\t
96
\t
a 0.31
\t
b 15 10
\t
c 96hello
\t
d abcd"
),
"user
\t
59
\t
a 0.11
\t
b 11 14"
,
std
::
string
(
"user
\t
59
\t
a 0.11
\t
b 11 14"
),
"user
\t
97
\t
a 0.11
\t
b 12 11"
,
std
::
string
(
"user
\t
97
\t
a 0.11
\t
b 12 11"
),
"item
\t
45
\t
a 0.21"
,
std
::
string
(
"item
\t
45
\t
a 0.21"
),
"item
\t
145
\t
a 0.21"
,
std
::
string
(
"item
\t
145
\t
a 0.21"
),
"item
\t
112
\t
a 0.21"
,
std
::
string
(
"item
\t
112
\t
a 0.21"
),
"item
\t
48
\t
a 0.21"
,
std
::
string
(
"item
\t
48
\t
a 0.21"
),
"item
\t
247
\t
a 0.21"
,
std
::
string
(
"item
\t
247
\t
a 0.21"
),
"item
\t
111
\t
a 0.21"
,
std
::
string
(
"item
\t
111
\t
a 0.21"
),
"item
\t
46
\t
a 0.21"
,
std
::
string
(
"item
\t
46
\t
a 0.21"
),
"item
\t
146
\t
a 0.21"
,
std
::
string
(
"item
\t
146
\t
a 0.21"
),
"item
\t
122
\t
a 0.21"
,
std
::
string
(
"item
\t
122
\t
a 0.21"
),
"item
\t
49
\t
a 0.21"
,
std
::
string
(
"item
\t
49
\t
a 0.21"
),
"item
\t
248
\t
a 0.21"
,
std
::
string
(
"item
\t
248
\t
a 0.21"
),
"item
\t
113
\t
a 0.21"
};
std
::
string
(
"item
\t
113
\t
a 0.21"
)};
char
node_file_name
[]
=
"nodes.txt"
;
char
node_file_name
[]
=
"nodes.txt"
;
std
::
vector
<
std
::
string
>
user_feature_name
=
{
"a"
,
"b"
,
"c"
,
"d"
};
std
::
vector
<
std
::
string
>
user_feature_name
=
{
"a"
,
"b"
,
"c"
,
"d"
};
std
::
vector
<
std
::
string
>
item_feature_name
=
{
"a"
};
std
::
vector
<
std
::
string
>
item_feature_name
=
{
"a"
};
...
...
paddle/fluid/framework/fleet/heter_ps/test_graph.cu
浏览文件 @
d5c5bbc3
...
@@ -23,7 +23,7 @@ limitations under the License. */
...
@@ -23,7 +23,7 @@ limitations under the License. */
#include "paddle/fluid/framework/fleet/heter_ps/optimizer.cuh.h"
#include "paddle/fluid/framework/fleet/heter_ps/optimizer.cuh.h"
#include "paddle/fluid/platform/cuda_device_guard.h"
#include "paddle/fluid/platform/cuda_device_guard.h"
using
namespace
paddle
::
framework
;
using
paddle
::
framework
;
TEST
(
TEST_FLEET
,
graph_comm
)
{
TEST
(
TEST_FLEET
,
graph_comm
)
{
int
gpu_count
=
3
;
int
gpu_count
=
3
;
std
::
vector
<
int
>
dev_ids
;
std
::
vector
<
int
>
dev_ids
;
...
@@ -100,9 +100,10 @@ TEST(TEST_FLEET, graph_comm) {
...
@@ -100,9 +100,10 @@ TEST(TEST_FLEET, graph_comm) {
int64_t
cpu_key
[
3
]
=
{
7
,
0
,
6
};
int64_t
cpu_key
[
3
]
=
{
7
,
0
,
6
};
void
*
key
;
void
*
key
;
cudaMalloc
(
(
void
**
)
&
key
,
3
*
sizeof
(
int64_t
));
cudaMalloc
(
reinterpret_cast
<
void
**>
(
&
key
)
,
3
*
sizeof
(
int64_t
));
cudaMemcpy
(
key
,
cpu_key
,
3
*
sizeof
(
int64_t
),
cudaMemcpyHostToDevice
);
cudaMemcpy
(
key
,
cpu_key
,
3
*
sizeof
(
int64_t
),
cudaMemcpyHostToDevice
);
auto
neighbor_sample_res
=
g
.
graph_neighbor_sample
(
0
,
(
int64_t
*
)
key
,
3
,
3
);
auto
neighbor_sample_res
=
g
.
graph_neighbor_sample
(
0
,
reinterpret_cast
<
int64_t
*>
(
key
),
3
,
3
);
res
=
new
int64_t
[
7
];
res
=
new
int64_t
[
7
];
cudaMemcpy
(
res
,
neighbor_sample_res
->
val
,
56
,
cudaMemcpyDeviceToHost
);
cudaMemcpy
(
res
,
neighbor_sample_res
->
val
,
56
,
cudaMemcpyDeviceToHost
);
int
*
actual_sample_size
=
new
int
[
3
];
int
*
actual_sample_size
=
new
int
[
3
];
...
...
paddle/fluid/framework/fleet/heter_ps/test_sample_rate.cu
浏览文件 @
d5c5bbc3
...
@@ -46,19 +46,19 @@
...
@@ -46,19 +46,19 @@
#include "paddle/fluid/string/printf.h"
#include "paddle/fluid/string/printf.h"
#include "paddle/phi/kernels/funcs/math_function.h"
#include "paddle/phi/kernels/funcs/math_function.h"
using
namespace
paddle
::
framework
;
using
paddle
::
framework
;
namespace
platform
=
paddle
::
platform
;
namespace
platform
=
paddle
::
platform
;
namespace
operators
=
paddle
::
operators
;
namespace
operators
=
paddle
::
operators
;
namespace
memory
=
paddle
::
memory
;
namespace
memory
=
paddle
::
memory
;
namespace
distributed
=
paddle
::
distributed
;
namespace
distributed
=
paddle
::
distributed
;
std
::
string
input_file
;
const
char
*
input_file
;
int
exe_count
=
100
;
int
exe_count
=
100
;
int
use_nv
=
1
;
int
use_nv
=
1
;
int
fixed_key_size
=
50000
,
sample_size
=
32
,
int
fixed_key_size
=
50000
,
sample_size
=
32
,
bfs_sample_nodes_in_each_shard
=
10000
,
init_search_size
=
1
,
bfs_sample_nodes_in_each_shard
=
10000
,
init_search_size
=
1
,
bfs_sample_edges
=
20
,
gpu_num1
=
8
,
gpu_num
=
8
;
bfs_sample_edges
=
20
,
gpu_num1
=
8
,
gpu_num
=
8
;
std
::
string
gpu_str
=
"0,1,2,3,4,5,6,7"
;
const
char
*
gpu_str
=
"0,1,2,3,4,5,6,7"
;
int64_t
*
key
[
8
];
int64_t
*
key
[
8
];
std
::
vector
<
std
::
string
>
edges
=
{
std
::
string
(
"37
\t
45
\t
0.34"
),
std
::
vector
<
std
::
string
>
edges
=
{
std
::
string
(
"37
\t
45
\t
0.34"
),
std
::
string
(
"37
\t
145
\t
0.31"
),
std
::
string
(
"37
\t
145
\t
0.31"
),
...
@@ -115,8 +115,8 @@ void testSampleRate() {
...
@@ -115,8 +115,8 @@ void testSampleRate() {
index
+=
node
.
get_size
(
false
);
index
+=
node
.
get_size
(
false
);
// res.push_back(node);
// res.push_back(node);
ids
.
push_back
(
node
.
get_id
());
ids
.
push_back
(
node
.
get_id
());
int
swap_pos
=
rand
()
%
ids
.
size
();
int
swap_pos
=
rand
_r
()
%
ids
.
size
();
std
::
swap
(
ids
[
swap_pos
],
ids
[
(
int
)
ids
.
size
(
)
-
1
]);
std
::
swap
(
ids
[
swap_pos
],
ids
[
static_cast
<
int
>
(
ids
.
size
()
)
-
1
]);
}
}
cur
=
ids
.
size
();
cur
=
ids
.
size
();
// if (sample_actual_size == 0) break;
// if (sample_actual_size == 0) break;
...
@@ -161,8 +161,8 @@ void testSampleRate() {
...
@@ -161,8 +161,8 @@ void testSampleRate() {
actual_size
[
i
].
push_back
(
ac
[
j
-
s
]
/
sizeof
(
int64_t
));
actual_size
[
i
].
push_back
(
ac
[
j
-
s
]
/
sizeof
(
int64_t
));
int
ss
=
ac
[
j
-
s
]
/
sizeof
(
int64_t
);
int
ss
=
ac
[
j
-
s
]
/
sizeof
(
int64_t
);
for
(
int
k
=
0
;
k
<
ss
;
k
++
)
{
for
(
int
k
=
0
;
k
<
ss
;
k
++
)
{
sample_neighbors
[
i
].
push_back
(
sample_neighbors
[
i
].
push_back
(
*
(
reinterpret_cast
<
int64_t
*>
(
*
((
int64_t
*
)(
buffers
[
j
-
s
].
get
()
+
k
*
sizeof
(
int64_t
))));
buffers
[
j
-
s
].
get
()
+
k
*
sizeof
(
int64_t
))));
}
}
}
}
}
}
...
@@ -252,7 +252,8 @@ void testSampleRate() {
...
@@ -252,7 +252,8 @@ void testSampleRate() {
*/
*/
for
(
int
i
=
0
;
i
<
gpu_num1
;
i
++
)
{
for
(
int
i
=
0
;
i
<
gpu_num1
;
i
++
)
{
platform
::
CUDADeviceGuard
guard
(
device_id_mapping
[
i
]);
platform
::
CUDADeviceGuard
guard
(
device_id_mapping
[
i
]);
cudaMalloc
((
void
**
)
&
key
[
i
],
ids
.
size
()
*
sizeof
(
int64_t
));
cudaMalloc
(
reinterpret_cast
<
void
**>
(
&
key
[
i
]),
ids
.
size
()
*
sizeof
(
int64_t
));
cudaMemcpy
(
key
[
i
],
cudaMemcpy
(
key
[
i
],
ids
.
data
(),
ids
.
data
(),
ids
.
size
()
*
sizeof
(
int64_t
),
ids
.
size
()
*
sizeof
(
int64_t
),
...
@@ -285,16 +286,16 @@ void testSampleRate() {
...
@@ -285,16 +286,16 @@ void testSampleRate() {
for
(
int
k
=
0
;
k
<
exe_count
;
k
++
)
{
for
(
int
k
=
0
;
k
<
exe_count
;
k
++
)
{
st
=
0
;
st
=
0
;
while
(
st
<
size
)
{
while
(
st
<
size
)
{
int
len
=
std
::
min
(
fixed_key_size
,
(
int
)
ids
.
size
(
)
-
st
);
int
len
=
std
::
min
(
fixed_key_size
,
static_cast
<
int
>
(
ids
.
size
()
)
-
st
);
auto
r
=
g
.
graph_neighbor_sample
(
auto
r
=
g
.
graph_neighbor_sample
(
i
,
(
int64_t
*
)
(
key
[
i
]
+
st
),
sample_size
,
len
);
i
,
reinterpret_cast
<
int64_t
*>
(
key
[
i
]
+
st
),
sample_size
,
len
);
st
+=
len
;
st
+=
len
;
delete
r
;
delete
r
;
}
}
}
}
};
};
auto
start1
=
std
::
chrono
::
steady_clock
::
now
();
auto
start1
=
std
::
chrono
::
steady_clock
::
now
();
std
::
thread
thr
[
gpu_num1
];
std
::
thread
thr
[
gpu_num1
];
// NOLINT
for
(
int
i
=
0
;
i
<
gpu_num1
;
i
++
)
{
for
(
int
i
=
0
;
i
<
gpu_num1
;
i
++
)
{
thr
[
i
]
=
std
::
thread
(
func
,
i
);
thr
[
i
]
=
std
::
thread
(
func
,
i
);
}
}
...
@@ -313,16 +314,20 @@ void testSampleRate() {
...
@@ -313,16 +314,20 @@ void testSampleRate() {
for
(
int
k
=
0
;
k
<
exe_count
;
k
++
)
{
for
(
int
k
=
0
;
k
<
exe_count
;
k
++
)
{
st
=
0
;
st
=
0
;
while
(
st
<
size
)
{
while
(
st
<
size
)
{
int
len
=
std
::
min
(
fixed_key_size
,
(
int
)
ids
.
size
()
-
st
);
int
len
=
std
::
min
(
fixed_key_size
,
static_cast
<
int
>
(
ids
.
size
())
-
st
);
auto
r
=
g
.
graph_neighbor_sample_v2
(
auto
r
=
i
,
(
int64_t
*
)(
key
[
i
]
+
st
),
sample_size
,
len
,
false
);
g
.
graph_neighbor_sample_v2
(
i
,
reinterpret_cast
<
int64_t
*>
(
key
[
i
]
+
st
),
sample_size
,
len
,
false
);
st
+=
len
;
st
+=
len
;
delete
r
;
delete
r
;
}
}
}
}
};
};
auto
start2
=
std
::
chrono
::
steady_clock
::
now
();
auto
start2
=
std
::
chrono
::
steady_clock
::
now
();
std
::
thread
thr2
[
gpu_num1
];
std
::
thread
thr2
[
gpu_num1
];
// NOLINT
for
(
int
i
=
0
;
i
<
gpu_num1
;
i
++
)
{
for
(
int
i
=
0
;
i
<
gpu_num1
;
i
++
)
{
thr2
[
i
]
=
std
::
thread
(
func2
,
i
);
thr2
[
i
]
=
std
::
thread
(
func2
,
i
);
}
}
...
...
paddle/fluid/framework/fleet/heter_wrapper.cc
浏览文件 @
d5c5bbc3
...
@@ -286,7 +286,7 @@ void HeterWrapper::EndPass(Scope* scope, int num) {
...
@@ -286,7 +286,7 @@ void HeterWrapper::EndPass(Scope* scope, int num) {
void
HeterWrapper
::
CallRemoteXpu
(
std
::
shared_ptr
<
HeterTask
>
task
,
void
HeterWrapper
::
CallRemoteXpu
(
std
::
shared_ptr
<
HeterTask
>
task
,
HeterCpuWorker
*
worker
,
HeterCpuWorker
*
worker
,
int
mpi_rank
,
int
mpi_rank
,
std
::
vector
<
std
::
string
>&
send_vars
)
{
const
std
::
vector
<
std
::
string
>&
send_vars
)
{
HeterRequest
request
;
HeterRequest
request
;
request
.
set_cmd
(
0
);
request
.
set_cmd
(
0
);
request
.
set_cur_batch
(
task
->
cur_batch_
);
request
.
set_cur_batch
(
task
->
cur_batch_
);
...
@@ -329,10 +329,11 @@ void HeterWrapper::CallRemoteXpu(std::shared_ptr<HeterTask> task,
...
@@ -329,10 +329,11 @@ void HeterWrapper::CallRemoteXpu(std::shared_ptr<HeterTask> task,
stub
.
service
(
&
done
->
cntl
,
&
request
,
&
done
->
response
,
done
);
stub
.
service
(
&
done
->
cntl
,
&
request
,
&
done
->
response
,
done
);
}
}
void
HeterWrapper
::
CallRemoteXpuSync
(
std
::
shared_ptr
<
HeterTask
>
task
,
void
HeterWrapper
::
CallRemoteXpuSync
(
HeterCpuWorker
*
worker
,
std
::
shared_ptr
<
HeterTask
>
task
,
int
mpi_rank
,
HeterCpuWorker
*
worker
,
std
::
vector
<
std
::
string
>&
send_vars
)
{
int
mpi_rank
,
const
std
::
vector
<
std
::
string
>&
send_vars
)
{
HeterRequest
request
;
HeterRequest
request
;
HeterResponse
response
;
HeterResponse
response
;
brpc
::
Controller
cntl
;
brpc
::
Controller
cntl
;
...
...
paddle/fluid/framework/fleet/heter_wrapper.h
浏览文件 @
d5c5bbc3
...
@@ -40,7 +40,7 @@ typedef std::function<void(void*)> HeterRpcCallbackFunc;
...
@@ -40,7 +40,7 @@ typedef std::function<void(void*)> HeterRpcCallbackFunc;
class
OnHeterRpcDone
:
public
google
::
protobuf
::
Closure
{
class
OnHeterRpcDone
:
public
google
::
protobuf
::
Closure
{
public:
public:
OnHeterRpcDone
(
HeterRpcCallbackFunc
func
)
:
handler_
(
func
)
{}
explicit
OnHeterRpcDone
(
HeterRpcCallbackFunc
func
)
:
handler_
(
func
)
{}
virtual
~
OnHeterRpcDone
()
{}
virtual
~
OnHeterRpcDone
()
{}
void
Run
()
{
void
Run
()
{
std
::
unique_ptr
<
OnHeterRpcDone
>
self_guard
(
this
);
std
::
unique_ptr
<
OnHeterRpcDone
>
self_guard
(
this
);
...
@@ -75,12 +75,12 @@ class HeterWrapper {
...
@@ -75,12 +75,12 @@ class HeterWrapper {
void
CallRemoteXpu
(
std
::
shared_ptr
<
HeterTask
>
task
,
void
CallRemoteXpu
(
std
::
shared_ptr
<
HeterTask
>
task
,
HeterCpuWorker
*
worker
,
HeterCpuWorker
*
worker
,
int
mpi_rank
,
int
mpi_rank
,
std
::
vector
<
std
::
string
>&
send_vars
);
const
std
::
vector
<
std
::
string
>&
send_vars
);
void
CallRemoteXpuSync
(
std
::
shared_ptr
<
HeterTask
>
task
,
void
CallRemoteXpuSync
(
std
::
shared_ptr
<
HeterTask
>
task
,
HeterCpuWorker
*
worker
,
HeterCpuWorker
*
worker
,
int
mpi_rank
,
int
mpi_rank
,
std
::
vector
<
std
::
string
>&
send_vars
);
const
std
::
vector
<
std
::
string
>&
send_vars
);
void
StopXpuService
(
int
num
);
void
StopXpuService
(
int
num
);
...
...
paddle/phi/kernels/cpu/graph_sample_neighbors_kernel.cc
浏览文件 @
d5c5bbc3
...
@@ -26,8 +26,8 @@ void SampleUniqueNeighbors(
...
@@ -26,8 +26,8 @@ void SampleUniqueNeighbors(
bidiiter
begin
,
bidiiter
begin
,
bidiiter
end
,
bidiiter
end
,
int
num_samples
,
int
num_samples
,
std
::
mt19937
&
rng
,
std
::
mt19937
&
rng
,
// NOLINT
std
::
uniform_int_distribution
<
int
>&
dice_distribution
)
{
std
::
uniform_int_distribution
<
int
>&
dice_distribution
)
{
// NOLINT
int
left_num
=
std
::
distance
(
begin
,
end
);
int
left_num
=
std
::
distance
(
begin
,
end
);
for
(
int
i
=
0
;
i
<
num_samples
;
i
++
)
{
for
(
int
i
=
0
;
i
<
num_samples
;
i
++
)
{
bidiiter
r
=
begin
;
bidiiter
r
=
begin
;
...
@@ -46,8 +46,8 @@ void SampleUniqueNeighborsWithEids(
...
@@ -46,8 +46,8 @@ void SampleUniqueNeighborsWithEids(
bidiiter
eid_begin
,
bidiiter
eid_begin
,
bidiiter
eid_end
,
bidiiter
eid_end
,
int
num_samples
,
int
num_samples
,
std
::
mt19937
&
rng
,
std
::
mt19937
&
rng
,
// NOLINT
std
::
uniform_int_distribution
<
int
>&
dice_distribution
)
{
std
::
uniform_int_distribution
<
int
>&
dice_distribution
)
{
// NOLINT
int
left_num
=
std
::
distance
(
src_begin
,
src_end
);
int
left_num
=
std
::
distance
(
src_begin
,
src_end
);
for
(
int
i
=
0
;
i
<
num_samples
;
i
++
)
{
for
(
int
i
=
0
;
i
<
num_samples
;
i
++
)
{
bidiiter
r1
=
src_begin
,
r2
=
eid_begin
;
bidiiter
r1
=
src_begin
,
r2
=
eid_begin
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录