Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
BaiXuePrincess
Paddle
提交
c49f35cf
P
Paddle
项目概览
BaiXuePrincess
/
Paddle
与 Fork 源项目一致
Fork自
PaddlePaddle / Paddle
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
Paddle
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
c49f35cf
编写于
6月 09, 2022
作者:
Z
zhangchunle
提交者:
GitHub
6月 09, 2022
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[part1] fix sign-compare warning (#43276)
* fix sign-compare warning * fix sign-compare 2
上级
caa57498
变更
19
隐藏空白更改
内联
并排
Showing
19 changed file
with
114 addition
and
113 deletion
+114
-113
paddle/fluid/distributed/ps/service/brpc_ps_client.cc
paddle/fluid/distributed/ps/service/brpc_ps_client.cc
+14
-14
paddle/fluid/distributed/ps/service/graph_brpc_client.cc
paddle/fluid/distributed/ps/service/graph_brpc_client.cc
+15
-15
paddle/fluid/distributed/ps/service/graph_brpc_server.cc
paddle/fluid/distributed/ps/service/graph_brpc_server.cc
+3
-3
paddle/fluid/distributed/ps/table/ctr_accessor.cc
paddle/fluid/distributed/ps/table/ctr_accessor.cc
+3
-3
paddle/fluid/distributed/ps/table/ctr_accessor.h
paddle/fluid/distributed/ps/table/ctr_accessor.h
+1
-1
paddle/fluid/distributed/ps/table/ctr_double_accessor.cc
paddle/fluid/distributed/ps/table/ctr_double_accessor.cc
+5
-5
paddle/fluid/distributed/ps/table/ctr_dymf_accessor.cc
paddle/fluid/distributed/ps/table/ctr_dymf_accessor.cc
+1
-1
paddle/fluid/distributed/ps/table/ctr_dymf_accessor.h
paddle/fluid/distributed/ps/table/ctr_dymf_accessor.h
+1
-1
paddle/fluid/distributed/ps/table/memory_dense_table.cc
paddle/fluid/distributed/ps/table/memory_dense_table.cc
+8
-8
paddle/fluid/distributed/ps/table/memory_sparse_geo_table.cc
paddle/fluid/distributed/ps/table/memory_sparse_geo_table.cc
+5
-5
paddle/fluid/distributed/ps/table/memory_sparse_table.cc
paddle/fluid/distributed/ps/table/memory_sparse_table.cc
+16
-16
paddle/fluid/distributed/ps/table/sparse_accessor.cc
paddle/fluid/distributed/ps/table/sparse_accessor.cc
+2
-2
paddle/fluid/distributed/ps/table/sparse_accessor.h
paddle/fluid/distributed/ps/table/sparse_accessor.h
+1
-1
paddle/fluid/distributed/ps/table/sparse_sgd_rule.cc
paddle/fluid/distributed/ps/table/sparse_sgd_rule.cc
+7
-7
paddle/fluid/distributed/ps/table/ssd_sparse_table.cc
paddle/fluid/distributed/ps/table/ssd_sparse_table.cc
+19
-18
paddle/fluid/distributed/ps/table/ssd_sparse_table.h
paddle/fluid/distributed/ps/table/ssd_sparse_table.h
+2
-2
paddle/fluid/distributed/ps/wrapper/fleet.cc
paddle/fluid/distributed/ps/wrapper/fleet.cc
+3
-3
paddle/fluid/distributed/test/brpc_service_dense_sgd_test.cc
paddle/fluid/distributed/test/brpc_service_dense_sgd_test.cc
+3
-3
paddle/fluid/distributed/test/sparse_sgd_rule_test.cc
paddle/fluid/distributed/test/sparse_sgd_rule_test.cc
+5
-5
未找到文件。
paddle/fluid/distributed/ps/service/brpc_ps_client.cc
浏览文件 @
c49f35cf
...
...
@@ -197,7 +197,7 @@ int32_t BrpcPsClient::Initialize() {
// 异步push 请求队列初始化
const
auto
&
worker_param
=
_config
.
worker_param
().
downpour_worker_param
();
for
(
size_
t
i
=
0
;
i
<
worker_param
.
downpour_table_param_size
();
++
i
)
{
for
(
in
t
i
=
0
;
i
<
worker_param
.
downpour_table_param_size
();
++
i
)
{
auto
type
=
worker_param
.
downpour_table_param
(
i
).
type
();
auto
table_id
=
worker_param
.
downpour_table_param
(
i
).
table_id
();
if
(
type
==
PS_DENSE_TABLE
)
{
...
...
@@ -662,7 +662,7 @@ std::future<int32_t> BrpcPsClient::PushSparseParam(size_t table_id,
char
*
push_data_ptr
=
const_cast
<
char
*>
(
push_data
->
data
());
memcpy
(
push_data_ptr
,
kvs
.
data
(),
kv_size
*
sizeof
(
uint64_t
));
push_data_ptr
+=
kv_size
*
sizeof
(
uint64_t
);
for
(
in
t
i
=
0
;
i
<
kv_size
;
++
i
)
{
for
(
size_
t
i
=
0
;
i
<
kv_size
;
++
i
)
{
memcpy
(
push_data_ptr
,
value_ptr
[
i
],
value_size
);
push_data_ptr
+=
value_size
;
}
...
...
@@ -882,7 +882,7 @@ std::future<int32_t> BrpcPsClient::PushSparseRawGradient(
memcpy
(
push_data_ptr
,
kvs
.
data
(),
kv_size
*
sizeof
(
uint64_t
));
push_data_ptr
+=
kv_size
*
sizeof
(
uint64_t
);
for
(
in
t
i
=
0
;
i
<
kv_size
;
++
i
)
{
for
(
size_
t
i
=
0
;
i
<
kv_size
;
++
i
)
{
memcpy
(
push_data_ptr
,
value_ptr
[
i
],
value_size
);
push_data_ptr
+=
value_size
;
}
...
...
@@ -1237,7 +1237,7 @@ std::future<int32_t> BrpcPsClient::PushSparseRawGradientPartial(
char
*
push_data_ptr
=
const_cast
<
char
*>
(
push_data
->
data
());
memcpy
(
push_data_ptr
,
keys
,
num
*
sizeof
(
uint64_t
));
push_data_ptr
+=
num
*
sizeof
(
uint64_t
);
for
(
in
t
i
=
0
;
i
<
num
;
++
i
)
{
for
(
uint32_
t
i
=
0
;
i
<
num
;
++
i
)
{
memcpy
(
push_data_ptr
,
update_values
[
i
],
value_size
);
push_data_ptr
+=
value_size
;
}
...
...
@@ -1257,7 +1257,7 @@ int32_t BrpcPsClient::RecvAndSaveTable(const uint64_t table_id,
int64_t
var_shape
=
0
;
std
::
string
table_class
;
const
auto
&
worker_param
=
_config
.
worker_param
().
downpour_worker_param
();
for
(
size_
t
i
=
0
;
i
<
worker_param
.
downpour_table_param_size
();
++
i
)
{
for
(
in
t
i
=
0
;
i
<
worker_param
.
downpour_table_param_size
();
++
i
)
{
if
(
worker_param
.
downpour_table_param
(
i
).
table_id
()
==
table_id
)
{
var_name
=
worker_param
.
downpour_table_param
(
i
).
common
().
table_name
();
var_num
=
worker_param
.
downpour_table_param
(
i
).
common
().
table_num
();
...
...
@@ -1481,13 +1481,13 @@ void BrpcPsClient::PushSparseTaskConsume() {
closure
->
add_timer
(
rpc_timer
);
std
::
vector
<
std
::
future
<
int
>>
merge_status
(
request_call_num
);
for
(
in
t
shard_idx
=
0
;
shard_idx
<
request_call_num
;
++
shard_idx
)
{
for
(
size_
t
shard_idx
=
0
;
shard_idx
<
request_call_num
;
++
shard_idx
)
{
merge_status
[
shard_idx
]
=
async_push_sparse_shard_threads
.
enqueue
(
std
::
bind
(
&
BrpcPsClient
::
PushSparseAsyncShardPush
,
this
,
task_list
,
request_kv_num
,
table_id
,
shard_idx
,
closure
,
accessor
));
}
for
(
in
t
shard_idx
=
0
;
shard_idx
<
request_call_num
;
++
shard_idx
)
{
for
(
size_
t
shard_idx
=
0
;
shard_idx
<
request_call_num
;
++
shard_idx
)
{
merge_status
[
shard_idx
].
wait
();
}
merge_status
.
clear
();
...
...
@@ -1497,13 +1497,13 @@ void BrpcPsClient::PushSparseTaskConsume() {
auto
queue_size
=
task_queue
->
Size
();
}
else
{
// 未达到阈值 只做多路归并
std
::
vector
<
std
::
future
<
int
>>
merge_status
(
request_call_num
);
for
(
in
t
shard_idx
=
0
;
shard_idx
<
request_call_num
;
++
shard_idx
)
{
for
(
size_
t
shard_idx
=
0
;
shard_idx
<
request_call_num
;
++
shard_idx
)
{
merge_status
[
shard_idx
]
=
async_push_sparse_shard_threads
.
enqueue
(
std
::
bind
(
&
BrpcPsClient
::
PushSparseAsyncShardMerge
,
this
,
task_list
,
request_kv_num
,
table_id
,
shard_idx
,
accessor
));
}
for
(
in
t
shard_idx
=
0
;
shard_idx
<
request_call_num
;
++
shard_idx
)
{
for
(
size_
t
shard_idx
=
0
;
shard_idx
<
request_call_num
;
++
shard_idx
)
{
merge_status
[
shard_idx
].
wait
();
}
...
...
@@ -1529,7 +1529,7 @@ void sparse_local_merge(ValueAccessor *accessor, float *merge_data,
size_t
col_num
=
accessor
->
GetAccessorInfo
().
update_dim
;
float
*
merge_data_shell
[
col_num
];
const
float
*
another_data_shell
[
col_num
];
for
(
in
t
i
=
0
;
i
<
col_num
;
++
i
)
{
for
(
size_
t
i
=
0
;
i
<
col_num
;
++
i
)
{
merge_data_shell
[
i
]
=
merge_data
+
i
;
another_data_shell
[
i
]
=
another_data
+
i
;
}
...
...
@@ -1546,12 +1546,12 @@ int BrpcPsClient::PushSparseAsyncShardMerge(
thread_local
std
::
vector
<
std
::
pair
<
uint64_t
,
const
float
*>>
sorted_kv_list
;
sorted_kv_list
.
clear
();
for
(
in
t
i
=
1
;
i
<
task_list
.
size
();
++
i
)
{
for
(
size_
t
i
=
1
;
i
<
task_list
.
size
();
++
i
)
{
size_t
kv_num
=
task_list
[
i
]
->
data
()
->
shared_data
[
shard_idx
].
kv_num
;
auto
&
key_list
=
task_list
[
i
]
->
data
()
->
shared_data
[
shard_idx
].
key_list
;
auto
&
value_list
=
task_list
[
i
]
->
data
()
->
shared_data
[
shard_idx
].
value_list
;
for
(
in
t
j
=
0
;
j
<
kv_num
;
++
j
)
{
for
(
size_
t
j
=
0
;
j
<
kv_num
;
++
j
)
{
if
(
value_list
[
j
].
size
()
<
value_size
)
{
LOG
(
WARNING
)
<<
"value_list["
<<
j
<<
"]: "
<<
value_list
[
j
].
c_str
()
<<
"is invalid."
;
...
...
@@ -1654,7 +1654,7 @@ int BrpcPsClient::PushSparseAsyncShardPush(
memcpy
(
push_data_ptr
,
merged_key_list
.
data
(),
merged_kv_count
*
sizeof
(
uint64_t
));
push_data_ptr
+=
merged_kv_count
*
sizeof
(
uint64_t
);
for
(
in
t
i
=
0
;
i
<
merged_kv_count
;
++
i
)
{
for
(
size_
t
i
=
0
;
i
<
merged_kv_count
;
++
i
)
{
const
char
*
task_data_ptr
=
merged_value_list
[
i
].
data
();
memcpy
(
push_data_ptr
,
(
float
*
)(
task_data_ptr
),
// NOLINT
...
...
@@ -1778,7 +1778,7 @@ void BrpcPsClient::PushDenseTaskConsume() {
});
++
merge_count
;
}
for
(
in
t
i
=
0
;
i
<
merge_count
;
++
i
)
{
for
(
uint32_
t
i
=
0
;
i
<
merge_count
;
++
i
)
{
merge_status
[
i
].
wait
();
}
...
...
paddle/fluid/distributed/ps/service/graph_brpc_client.cc
浏览文件 @
c49f35cf
...
...
@@ -60,7 +60,7 @@ std::future<int32_t> GraphBrpcClient::get_node_feat(
std
::
vector
<
std
::
vector
<
std
::
string
>>
&
res
)
{
std
::
vector
<
int
>
request2server
;
std
::
vector
<
int
>
server2request
(
server_size
,
-
1
);
for
(
in
t
query_idx
=
0
;
query_idx
<
node_ids
.
size
();
++
query_idx
)
{
for
(
size_
t
query_idx
=
0
;
query_idx
<
node_ids
.
size
();
++
query_idx
)
{
int
server_index
=
get_server_index_by_id
(
node_ids
[
query_idx
]);
if
(
server2request
[
server_index
]
==
-
1
)
{
server2request
[
server_index
]
=
request2server
.
size
();
...
...
@@ -70,7 +70,7 @@ std::future<int32_t> GraphBrpcClient::get_node_feat(
size_t
request_call_num
=
request2server
.
size
();
std
::
vector
<
std
::
vector
<
int64_t
>>
node_id_buckets
(
request_call_num
);
std
::
vector
<
std
::
vector
<
int
>>
query_idx_buckets
(
request_call_num
);
for
(
in
t
query_idx
=
0
;
query_idx
<
node_ids
.
size
();
++
query_idx
)
{
for
(
size_
t
query_idx
=
0
;
query_idx
<
node_ids
.
size
();
++
query_idx
)
{
int
server_index
=
get_server_index_by_id
(
node_ids
[
query_idx
]);
int
request_idx
=
server2request
[
server_index
];
node_id_buckets
[
request_idx
].
push_back
(
node_ids
[
query_idx
]);
...
...
@@ -83,7 +83,7 @@ std::future<int32_t> GraphBrpcClient::get_node_feat(
int
ret
=
0
;
auto
*
closure
=
(
DownpourBrpcClosure
*
)
done
;
size_t
fail_num
=
0
;
for
(
in
t
request_idx
=
0
;
request_idx
<
request_call_num
;
for
(
size_
t
request_idx
=
0
;
request_idx
<
request_call_num
;
++
request_idx
)
{
if
(
closure
->
check_response
(
request_idx
,
PS_GRAPH_GET_NODE_FEAT
)
!=
0
)
{
...
...
@@ -122,7 +122,7 @@ std::future<int32_t> GraphBrpcClient::get_node_feat(
closure
->
add_promise
(
promise
);
std
::
future
<
int
>
fut
=
promise
->
get_future
();
for
(
in
t
request_idx
=
0
;
request_idx
<
request_call_num
;
++
request_idx
)
{
for
(
size_
t
request_idx
=
0
;
request_idx
<
request_call_num
;
++
request_idx
)
{
int
server_index
=
request2server
[
request_idx
];
closure
->
request
(
request_idx
)
->
set_cmd_id
(
PS_GRAPH_GET_NODE_FEAT
);
closure
->
request
(
request_idx
)
->
set_table_id
(
table_id
);
...
...
@@ -271,7 +271,7 @@ std::future<int32_t> GraphBrpcClient::remove_graph_node(
request_call_num
,
[
&
,
request_call_num
](
void
*
done
)
{
int
ret
=
0
;
auto
*
closure
=
(
DownpourBrpcClosure
*
)
done
;
in
t
fail_num
=
0
;
size_
t
fail_num
=
0
;
for
(
size_t
request_idx
=
0
;
request_idx
<
request_call_num
;
++
request_idx
)
{
if
(
closure
->
check_response
(
request_idx
,
...
...
@@ -378,7 +378,7 @@ std::future<int32_t> GraphBrpcClient::batch_sample_neighbors(
std
::
vector
<
int
>
server2request
(
server_size
,
-
1
);
res
.
clear
();
res_weight
.
clear
();
for
(
in
t
query_idx
=
0
;
query_idx
<
node_ids
.
size
();
++
query_idx
)
{
for
(
size_
t
query_idx
=
0
;
query_idx
<
node_ids
.
size
();
++
query_idx
)
{
int
server_index
=
get_server_index_by_id
(
node_ids
[
query_idx
]);
if
(
server2request
[
server_index
]
==
-
1
)
{
server2request
[
server_index
]
=
request2server
.
size
();
...
...
@@ -393,7 +393,7 @@ std::future<int32_t> GraphBrpcClient::batch_sample_neighbors(
size_t
request_call_num
=
request2server
.
size
();
std
::
vector
<
std
::
vector
<
int64_t
>>
node_id_buckets
(
request_call_num
);
std
::
vector
<
std
::
vector
<
int
>>
query_idx_buckets
(
request_call_num
);
for
(
in
t
query_idx
=
0
;
query_idx
<
node_ids
.
size
();
++
query_idx
)
{
for
(
size_
t
query_idx
=
0
;
query_idx
<
node_ids
.
size
();
++
query_idx
)
{
int
server_index
=
get_server_index_by_id
(
node_ids
[
query_idx
]);
int
request_idx
=
server2request
[
server_index
];
node_id_buckets
[
request_idx
].
push_back
(
node_ids
[
query_idx
]);
...
...
@@ -454,7 +454,7 @@ std::future<int32_t> GraphBrpcClient::batch_sample_neighbors(
closure
->
add_promise
(
promise
);
std
::
future
<
int
>
fut
=
promise
->
get_future
();
for
(
in
t
request_idx
=
0
;
request_idx
<
request_call_num
;
++
request_idx
)
{
for
(
size_
t
request_idx
=
0
;
request_idx
<
request_call_num
;
++
request_idx
)
{
int
server_index
=
request2server
[
request_idx
];
closure
->
request
(
request_idx
)
->
set_cmd_id
(
PS_GRAPH_SAMPLE_NEIGHBORS
);
closure
->
request
(
request_idx
)
->
set_table_id
(
table_id
);
...
...
@@ -492,7 +492,7 @@ std::future<int32_t> GraphBrpcClient::random_sample_nodes(
size_t
bytes_size
=
io_buffer_itr
.
bytes_left
();
char
*
buffer
=
new
char
[
bytes_size
];
auto
size
=
io_buffer_itr
.
copy_and_forward
((
void
*
)(
buffer
),
bytes_size
);
in
t
index
=
0
;
size_
t
index
=
0
;
while
(
index
<
bytes_size
)
{
ids
.
push_back
(
*
(
int64_t
*
)(
buffer
+
index
));
index
+=
GraphNode
::
id_size
;
...
...
@@ -534,7 +534,7 @@ std::future<int32_t> GraphBrpcClient::pull_graph_list(
size_t
bytes_size
=
io_buffer_itr
.
bytes_left
();
char
*
buffer
=
new
char
[
bytes_size
];
io_buffer_itr
.
copy_and_forward
((
void
*
)(
buffer
),
bytes_size
);
in
t
index
=
0
;
size_
t
index
=
0
;
while
(
index
<
bytes_size
)
{
FeatureNode
node
;
node
.
recover_from_buffer
(
buffer
+
index
);
...
...
@@ -570,7 +570,7 @@ std::future<int32_t> GraphBrpcClient::set_node_feat(
const
std
::
vector
<
std
::
vector
<
std
::
string
>>
&
features
)
{
std
::
vector
<
int
>
request2server
;
std
::
vector
<
int
>
server2request
(
server_size
,
-
1
);
for
(
in
t
query_idx
=
0
;
query_idx
<
node_ids
.
size
();
++
query_idx
)
{
for
(
size_
t
query_idx
=
0
;
query_idx
<
node_ids
.
size
();
++
query_idx
)
{
int
server_index
=
get_server_index_by_id
(
node_ids
[
query_idx
]);
if
(
server2request
[
server_index
]
==
-
1
)
{
server2request
[
server_index
]
=
request2server
.
size
();
...
...
@@ -582,7 +582,7 @@ std::future<int32_t> GraphBrpcClient::set_node_feat(
std
::
vector
<
std
::
vector
<
int
>>
query_idx_buckets
(
request_call_num
);
std
::
vector
<
std
::
vector
<
std
::
vector
<
std
::
string
>>>
features_idx_buckets
(
request_call_num
);
for
(
in
t
query_idx
=
0
;
query_idx
<
node_ids
.
size
();
++
query_idx
)
{
for
(
size_
t
query_idx
=
0
;
query_idx
<
node_ids
.
size
();
++
query_idx
)
{
int
server_index
=
get_server_index_by_id
(
node_ids
[
query_idx
]);
int
request_idx
=
server2request
[
server_index
];
node_id_buckets
[
request_idx
].
push_back
(
node_ids
[
query_idx
]);
...
...
@@ -590,7 +590,7 @@ std::future<int32_t> GraphBrpcClient::set_node_feat(
if
(
features_idx_buckets
[
request_idx
].
size
()
==
0
)
{
features_idx_buckets
[
request_idx
].
resize
(
feature_names
.
size
());
}
for
(
in
t
feat_idx
=
0
;
feat_idx
<
feature_names
.
size
();
++
feat_idx
)
{
for
(
size_
t
feat_idx
=
0
;
feat_idx
<
feature_names
.
size
();
++
feat_idx
)
{
features_idx_buckets
[
request_idx
][
feat_idx
].
push_back
(
features
[
feat_idx
][
query_idx
]);
}
...
...
@@ -602,7 +602,7 @@ std::future<int32_t> GraphBrpcClient::set_node_feat(
int
ret
=
0
;
auto
*
closure
=
(
DownpourBrpcClosure
*
)
done
;
size_t
fail_num
=
0
;
for
(
in
t
request_idx
=
0
;
request_idx
<
request_call_num
;
for
(
size_
t
request_idx
=
0
;
request_idx
<
request_call_num
;
++
request_idx
)
{
if
(
closure
->
check_response
(
request_idx
,
PS_GRAPH_SET_NODE_FEAT
)
!=
0
)
{
...
...
@@ -619,7 +619,7 @@ std::future<int32_t> GraphBrpcClient::set_node_feat(
closure
->
add_promise
(
promise
);
std
::
future
<
int
>
fut
=
promise
->
get_future
();
for
(
in
t
request_idx
=
0
;
request_idx
<
request_call_num
;
++
request_idx
)
{
for
(
size_
t
request_idx
=
0
;
request_idx
<
request_call_num
;
++
request_idx
)
{
int
server_index
=
request2server
[
request_idx
];
closure
->
request
(
request_idx
)
->
set_cmd_id
(
PS_GRAPH_SET_NODE_FEAT
);
closure
->
request
(
request_idx
)
->
set_table_id
(
table_id
);
...
...
paddle/fluid/distributed/ps/service/graph_brpc_server.cc
浏览文件 @
c49f35cf
...
...
@@ -516,7 +516,7 @@ int32_t GraphBrpcService::sample_neighbors_across_multi_servers(
std
::
vector
<
int64_t
>
local_id
;
std
::
vector
<
int
>
local_query_idx
;
size_t
rank
=
GetRank
();
for
(
in
t
query_idx
=
0
;
query_idx
<
node_num
;
++
query_idx
)
{
for
(
size_
t
query_idx
=
0
;
query_idx
<
node_num
;
++
query_idx
)
{
int
server_index
=
((
GraphTable
*
)
table
)
->
get_server_index_by_id
(
node_data
[
query_idx
]);
if
(
server2request
[
server_index
]
==
-
1
)
{
...
...
@@ -538,7 +538,7 @@ int32_t GraphBrpcService::sample_neighbors_across_multi_servers(
std
::
vector
<
size_t
>
seq
;
std
::
vector
<
std
::
vector
<
int64_t
>>
node_id_buckets
(
request_call_num
);
std
::
vector
<
std
::
vector
<
int
>>
query_idx_buckets
(
request_call_num
);
for
(
in
t
query_idx
=
0
;
query_idx
<
node_num
;
++
query_idx
)
{
for
(
size_
t
query_idx
=
0
;
query_idx
<
node_num
;
++
query_idx
)
{
int
server_index
=
((
GraphTable
*
)
table
)
->
get_server_index_by_id
(
node_data
[
query_idx
]);
int
request_idx
=
server2request
[
server_index
];
...
...
@@ -614,7 +614,7 @@ int32_t GraphBrpcService::sample_neighbors_across_multi_servers(
closure
->
add_promise
(
promise
);
std
::
future
<
int
>
fut
=
promise
->
get_future
();
for
(
in
t
request_idx
=
0
;
request_idx
<
remote_call_num
;
++
request_idx
)
{
for
(
size_
t
request_idx
=
0
;
request_idx
<
remote_call_num
;
++
request_idx
)
{
int
server_index
=
request2server
[
request_idx
];
closure
->
request
(
request_idx
)
->
set_cmd_id
(
PS_GRAPH_SAMPLE_NEIGHBORS
);
closure
->
request
(
request_idx
)
->
set_table_id
(
request
.
table_id
());
...
...
paddle/fluid/distributed/ps/table/ctr_accessor.cc
浏览文件 @
c49f35cf
...
...
@@ -196,7 +196,7 @@ bool CtrCommonAccessor::NeedExtendMF(float* value) {
return
score
>=
_config
.
embedx_threshold
();
}
bool
CtrCommonAccessor
::
HasMF
(
size_
t
size
)
{
bool
CtrCommonAccessor
::
HasMF
(
in
t
size
)
{
return
size
>
common_feature_value
.
EmbedxG2SumIndex
();
}
...
...
@@ -227,11 +227,11 @@ int32_t CtrCommonAccessor::Merge(float** update_values,
const
float
**
other_update_values
,
size_t
num
)
{
auto
embedx_dim
=
_config
.
embedx_dim
();
size_
t
total_dim
=
CtrCommonPushValue
::
Dim
(
embedx_dim
);
in
t
total_dim
=
CtrCommonPushValue
::
Dim
(
embedx_dim
);
for
(
size_t
value_item
=
0
;
value_item
<
num
;
++
value_item
)
{
float
*
update_value
=
update_values
[
value_item
];
const
float
*
other_update_value
=
other_update_values
[
value_item
];
for
(
auto
i
=
0u
;
i
<
total_dim
;
++
i
)
{
for
(
int
i
=
0
;
i
<
total_dim
;
++
i
)
{
if
(
i
!=
CtrCommonPushValue
::
SlotIndex
())
{
update_value
[
i
]
+=
other_update_value
[
i
];
}
...
...
paddle/fluid/distributed/ps/table/ctr_accessor.h
浏览文件 @
c49f35cf
...
...
@@ -143,7 +143,7 @@ class CtrCommonAccessor : public ValueAccessor {
// 判断该value是否保存到ssd
// virtual bool save_ssd(float* value);
virtual
bool
NeedExtendMF
(
float
*
value
);
virtual
bool
HasMF
(
size_
t
size
);
virtual
bool
HasMF
(
in
t
size
);
// 判断该value是否在save阶段dump,
// param作为参数用于标识save阶段,如downpour的xbox与batch_model
// param = 0, save all feature
...
...
paddle/fluid/distributed/ps/table/ctr_double_accessor.cc
浏览文件 @
c49f35cf
...
...
@@ -139,7 +139,7 @@ bool CtrDoubleAccessor::Save(float* value, int param) {
}
default:
return
true
;
}
;
}
}
void
CtrDoubleAccessor
::
UpdateStatAfterSave
(
float
*
value
,
int
param
)
{
...
...
@@ -166,7 +166,7 @@ void CtrDoubleAccessor::UpdateStatAfterSave(float* value, int param) {
return
;
default:
return
;
}
;
}
}
int32_t
CtrDoubleAccessor
::
Create
(
float
**
values
,
size_t
num
)
{
...
...
@@ -175,7 +175,7 @@ int32_t CtrDoubleAccessor::Create(float** values, size_t num) {
float
*
value
=
values
[
value_item
];
value
[
CtrDoubleFeatureValue
::
UnseenDaysIndex
()]
=
0
;
value
[
CtrDoubleFeatureValue
::
DeltaScoreIndex
()]
=
0
;
*
(
double
*
)
(
value
+
CtrDoubleFeatureValue
::
ShowIndex
())
=
0
;
*
reinterpret_cast
<
double
*>
(
value
+
CtrDoubleFeatureValue
::
ShowIndex
())
=
0
;
*
(
double
*
)(
value
+
CtrDoubleFeatureValue
::
ClickIndex
())
=
0
;
value
[
CtrDoubleFeatureValue
::
SlotIndex
()]
=
-
1
;
_embed_sgd_rule
->
InitValue
(
...
...
@@ -233,7 +233,7 @@ int32_t CtrDoubleAccessor::Merge(float** update_values,
for (auto i = 3u; i < total_dim; ++i) {
update_value[i] += other_update_value[i];
}*/
for
(
auto
i
=
0u
;
i
<
total_dim
;
++
i
)
{
for
(
size_t
i
=
0
;
i
<
total_dim
;
++
i
)
{
if
(
i
!=
CtrDoublePushValue
::
SlotIndex
())
{
update_value
[
i
]
+=
other_update_value
[
i
];
}
...
...
@@ -320,7 +320,7 @@ std::string CtrDoubleAccessor::ParseToString(const float* v, int param_size) {
auto
score
=
ShowClickScore
(
show
,
click
);
if
(
score
>=
_config
.
embedx_threshold
()
&&
param_size
>
9
)
{
os
<<
" "
<<
v
[
9
];
for
(
auto
i
=
0
;
i
<
_config
.
embedx_dim
();
++
i
)
{
for
(
size_t
i
=
0
;
i
<
_config
.
embedx_dim
();
++
i
)
{
os
<<
" "
<<
v
[
10
+
i
];
}
}
...
...
paddle/fluid/distributed/ps/table/ctr_dymf_accessor.cc
浏览文件 @
c49f35cf
...
...
@@ -198,7 +198,7 @@ bool CtrDymfAccessor::NeedExtendMF(float* value) {
return
score
>=
_config
.
embedx_threshold
();
}
bool
CtrDymfAccessor
::
HasMF
(
size_
t
size
)
{
bool
CtrDymfAccessor
::
HasMF
(
in
t
size
)
{
return
size
>
common_feature_value
.
EmbedxG2SumIndex
();
}
...
...
paddle/fluid/distributed/ps/table/ctr_dymf_accessor.h
浏览文件 @
c49f35cf
...
...
@@ -158,7 +158,7 @@ class CtrDymfAccessor : public ValueAccessor {
// 判断该value是否保存到ssd
// virtual bool save_ssd(float* value);
virtual
bool
NeedExtendMF
(
float
*
value
);
virtual
bool
HasMF
(
size_
t
size
);
virtual
bool
HasMF
(
in
t
size
);
// 判断该value是否在save阶段dump,
// param作为参数用于标识save阶段,如downpour的xbox与batch_model
// param = 0, save all feature
...
...
paddle/fluid/distributed/ps/table/memory_dense_table.cc
浏览文件 @
c49f35cf
...
...
@@ -41,7 +41,7 @@ void MemoryDenseTable::CreateInitializer(const std::string& attr,
int32_t
MemoryDenseTable
::
Initialize
()
{
_shards_task_pool
.
resize
(
task_pool_size_
);
for
(
in
t
i
=
0
;
i
<
_shards_task_pool
.
size
();
++
i
)
{
for
(
size_
t
i
=
0
;
i
<
_shards_task_pool
.
size
();
++
i
)
{
_shards_task_pool
[
i
].
reset
(
new
::
ThreadPool
(
1
));
}
...
...
@@ -74,14 +74,14 @@ int32_t MemoryDenseTable::InitializeValue() {
values_
[
x
].
resize
(
dim
);
names_index_
[
varname
]
=
x
;
for
(
in
t
y
=
0
;
y
<
dim
;
++
y
)
{
for
(
size_
t
y
=
0
;
y
<
dim
;
++
y
)
{
values_
[
x
][
y
]
=
initializers_
[
varname
]
->
GetValue
();
}
}
fixed_len_params_dim_
=
0
;
for
(
int
x
=
0
;
x
<
size
;
++
x
)
{
auto
&
dim
=
common
.
dims
()[
x
];
int
dim
=
common
.
dims
()[
x
];
if
(
dim
!=
param_dim_
)
{
fixed_len_params_dim_
+=
dim
;
}
else
{
...
...
@@ -245,14 +245,14 @@ int32_t MemoryDenseTable::Load(const std::string& path,
do
{
is_read_failed
=
false
;
try
{
size_
t
dim_idx
=
0
;
in
t
dim_idx
=
0
;
float
data_buffer
[
5
];
float
*
data_buff_ptr
=
data_buffer
;
std
::
string
line_data
;
int
size
=
static_cast
<
int
>
(
values_
.
size
());
auto
common
=
_config
.
common
();
for
(
in
t
i
=
start_file_idx
;
i
<
end_file_idx
+
1
;
++
i
)
{
for
(
size_
t
i
=
start_file_idx
;
i
<
end_file_idx
+
1
;
++
i
)
{
channel_config
.
path
=
file_list
[
i
];
err_no
=
0
;
auto
read_channel
=
_afs_client
.
open_r
(
channel_config
,
0
,
&
err_no
);
...
...
@@ -271,12 +271,12 @@ int32_t MemoryDenseTable::Load(const std::string& path,
if
(
file_dim_idx
<
file_start_idx
)
{
continue
;
}
auto
str_len
=
size_t
str_len
=
paddle
::
string
::
str_to_float
(
line_data
.
data
(),
data_buff_ptr
);
CHECK
(
str_len
==
param_col_ids_
.
size
())
<<
"expect "
<<
param_col_ids_
.
size
()
<<
" float, but got "
<<
str_len
;
for
(
size_
t
col_idx
=
0
;
col_idx
<
str_len
;
++
col_idx
)
{
for
(
in
t
col_idx
=
0
;
col_idx
<
str_len
;
++
col_idx
)
{
if
(
param_col_ids_
[
col_idx
]
<
0
)
{
continue
;
}
...
...
@@ -355,7 +355,7 @@ int32_t MemoryDenseTable::Save(const std::string& path,
std
::
ostringstream
os
;
for
(
int
x
=
0
;
x
<
size
;
++
x
)
{
auto
&
varname
=
common
.
params
()[
x
];
auto
&
dim
=
common
.
dims
()[
x
];
int
dim
=
common
.
dims
()[
x
];
VLOG
(
3
)
<<
"MemoryDenseTable::save dim "
<<
x
<<
" size: "
<<
dim
;
for
(
int
y
=
0
;
y
<
dim
;
++
y
)
{
os
.
clear
();
...
...
paddle/fluid/distributed/ps/table/memory_sparse_geo_table.cc
浏览文件 @
c49f35cf
...
...
@@ -49,7 +49,7 @@ int32_t MemorySparseGeoTable::PushSparseParam(const uint64_t* keys,
std
::
vector
<
std
::
vector
<
uint64_t
>>
offset_bucket
;
offset_bucket
.
resize
(
shard_num
);
for
(
in
t
x
=
0
;
x
<
num
;
++
x
)
{
for
(
size_
t
x
=
0
;
x
<
num
;
++
x
)
{
auto
y
=
keys
[
x
]
%
shard_num
;
offset_bucket
[
y
].
push_back
(
x
);
if
(
x
<
10
)
{
...
...
@@ -66,7 +66,7 @@ int32_t MemorySparseGeoTable::PushSparseParam(const uint64_t* keys,
auto
&
local_shard
=
_local_shards
[
shard_id
];
auto
&
offsets
=
offset_bucket
[
shard_id
];
for
(
in
t
i
=
0
;
i
<
offsets
.
size
();
++
i
)
{
for
(
size_
t
i
=
0
;
i
<
offsets
.
size
();
++
i
)
{
auto
offset
=
offsets
[
i
];
auto
id
=
keys
[
offset
];
auto
&
feature_value
=
local_shard
[
id
];
...
...
@@ -132,7 +132,7 @@ int32_t MemorySparseGeoTable::Initialize() {
_dim
=
_config
.
common
().
dims
()[
0
];
_shards_task_pool
.
resize
(
_task_pool_size
);
for
(
in
t
i
=
0
;
i
<
_shards_task_pool
.
size
();
++
i
)
{
for
(
size_
t
i
=
0
;
i
<
_shards_task_pool
.
size
();
++
i
)
{
_shards_task_pool
[
i
].
reset
(
new
::
ThreadPool
(
1
));
}
...
...
@@ -200,14 +200,14 @@ int32_t MemorySparseGeoTable::_PushSparse(const uint64_t* keys,
task_keys
[
shard_id
].
push_back
({
keys
[
i
],
i
});
}
for
(
size_
t
shard_id
=
0
;
shard_id
<
shard_num
;
++
shard_id
)
{
for
(
in
t
shard_id
=
0
;
shard_id
<
shard_num
;
++
shard_id
)
{
tasks
[
shard_id
]
=
_shards_task_pool
[
shard_id
]
->
enqueue
(
[
this
,
shard_id
,
values
,
&
task_keys
]()
->
int
{
auto
&
keys
=
task_keys
[
shard_id
];
auto
&
local_shard
=
_local_shards
[
shard_id
];
auto
blas
=
GetBlas
<
float
>
();
for
(
in
t
i
=
0
;
i
<
keys
.
size
();
++
i
)
{
for
(
size_
t
i
=
0
;
i
<
keys
.
size
();
++
i
)
{
uint64_t
key
=
keys
[
i
].
first
;
uint64_t
push_data_idx
=
keys
[
i
].
second
;
const
float
*
update_data
=
values
+
push_data_idx
*
_dim
;
...
...
paddle/fluid/distributed/ps/table/memory_sparse_table.cc
浏览文件 @
c49f35cf
...
...
@@ -37,7 +37,7 @@ namespace distributed {
int32_t
MemorySparseTable
::
Initialize
()
{
_shards_task_pool
.
resize
(
_task_pool_size
);
for
(
in
t
i
=
0
;
i
<
_shards_task_pool
.
size
();
++
i
)
{
for
(
size_
t
i
=
0
;
i
<
_shards_task_pool
.
size
();
++
i
)
{
_shards_task_pool
[
i
].
reset
(
new
::
ThreadPool
(
1
));
}
auto
&
profiler
=
CostProfiler
::
instance
();
...
...
@@ -79,7 +79,7 @@ int32_t MemorySparseTable::Load(const std::string& path,
}
int
load_param
=
atoi
(
param
.
c_str
());
auto
expect_shard_num
=
_sparse_table_shard_num
;
size_t
expect_shard_num
=
_sparse_table_shard_num
;
if
(
file_list
.
size
()
!=
expect_shard_num
)
{
LOG
(
WARNING
)
<<
"MemorySparseTable file_size:"
<<
file_list
.
size
()
<<
" not equal to expect_shard_num:"
<<
expect_shard_num
;
...
...
@@ -98,7 +98,7 @@ int32_t MemorySparseTable::Load(const std::string& path,
int
thread_num
=
_real_local_shard_num
<
15
?
_real_local_shard_num
:
15
;
omp_set_num_threads
(
thread_num
);
#pragma omp parallel for schedule(dynamic)
for
(
size_
t
i
=
0
;
i
<
_real_local_shard_num
;
++
i
)
{
for
(
in
t
i
=
0
;
i
<
_real_local_shard_num
;
++
i
)
{
FsChannelConfig
channel_config
;
channel_config
.
path
=
file_list
[
file_start_idx
+
i
];
VLOG
(
1
)
<<
"MemorySparseTable::load begin load "
<<
channel_config
.
path
...
...
@@ -164,7 +164,7 @@ int32_t MemorySparseTable::LoadLocalFS(const std::string& path,
auto
file_list
=
paddle
::
framework
::
localfs_list
(
table_path
);
int
load_param
=
atoi
(
param
.
c_str
());
auto
expect_shard_num
=
_sparse_table_shard_num
;
size_t
expect_shard_num
=
_sparse_table_shard_num
;
if
(
file_list
.
size
()
!=
expect_shard_num
)
{
LOG
(
WARNING
)
<<
"MemorySparseTable file_size:"
<<
file_list
.
size
()
<<
" not equal to expect_shard_num:"
<<
expect_shard_num
;
...
...
@@ -183,7 +183,7 @@ int32_t MemorySparseTable::LoadLocalFS(const std::string& path,
int
thread_num
=
_real_local_shard_num
<
15
?
_real_local_shard_num
:
15
;
omp_set_num_threads
(
thread_num
);
#pragma omp parallel for schedule(dynamic)
for
(
size_
t
i
=
0
;
i
<
_real_local_shard_num
;
++
i
)
{
for
(
in
t
i
=
0
;
i
<
_real_local_shard_num
;
++
i
)
{
bool
is_read_failed
=
false
;
int
retry_num
=
0
;
int
err_no
=
0
;
...
...
@@ -244,7 +244,7 @@ int32_t MemorySparseTable::Save(const std::string& dirname,
int
thread_num
=
_real_local_shard_num
<
20
?
_real_local_shard_num
:
20
;
omp_set_num_threads
(
thread_num
);
#pragma omp parallel for schedule(dynamic)
for
(
size_
t
i
=
0
;
i
<
_real_local_shard_num
;
++
i
)
{
for
(
in
t
i
=
0
;
i
<
_real_local_shard_num
;
++
i
)
{
FsChannelConfig
channel_config
;
if
(
_config
.
compress_in_save
()
&&
(
save_param
==
0
||
save_param
==
3
))
{
channel_config
.
path
=
paddle
::
string
::
format_string
(
...
...
@@ -326,7 +326,7 @@ int32_t MemorySparseTable::SaveLocalFS(const std::string& dirname,
omp_set_num_threads
(
thread_num
);
#pragma omp parallel for schedule(dynamic)
for
(
size_
t
i
=
0
;
i
<
_real_local_shard_num
;
++
i
)
{
for
(
in
t
i
=
0
;
i
<
_real_local_shard_num
;
++
i
)
{
feasign_cnt
=
0
;
auto
&
shard
=
_local_shards
[
i
];
std
::
string
file_name
=
paddle
::
string
::
format_string
(
...
...
@@ -354,7 +354,7 @@ int32_t MemorySparseTable::SaveLocalFS(const std::string& dirname,
int64_t
MemorySparseTable
::
LocalSize
()
{
int64_t
local_size
=
0
;
for
(
size_
t
i
=
0
;
i
<
_real_local_shard_num
;
++
i
)
{
for
(
in
t
i
=
0
;
i
<
_real_local_shard_num
;
++
i
)
{
local_size
+=
_local_shards
[
i
].
size
();
}
return
local_size
;
...
...
@@ -364,7 +364,7 @@ int64_t MemorySparseTable::LocalMFSize() {
std
::
vector
<
int64_t
>
size_arr
(
_real_local_shard_num
,
0
);
std
::
vector
<
std
::
future
<
int
>>
tasks
(
_real_local_shard_num
);
int64_t
ret_size
=
0
;
for
(
size_
t
shard_id
=
0
;
shard_id
<
_real_local_shard_num
;
++
shard_id
)
{
for
(
in
t
shard_id
=
0
;
shard_id
<
_real_local_shard_num
;
++
shard_id
)
{
tasks
[
shard_id
]
=
_shards_task_pool
[
shard_id
%
_shards_task_pool
.
size
()]
->
enqueue
(
[
this
,
shard_id
,
&
size_arr
]()
->
int
{
...
...
@@ -378,7 +378,7 @@ int64_t MemorySparseTable::LocalMFSize() {
return
0
;
});
}
for
(
size_
t
i
=
0
;
i
<
_real_local_shard_num
;
++
i
)
{
for
(
in
t
i
=
0
;
i
<
_real_local_shard_num
;
++
i
)
{
tasks
[
i
].
wait
();
}
for
(
auto
x
:
size_arr
)
{
...
...
@@ -469,7 +469,7 @@ int32_t MemorySparseTable::PullSparse(float* pull_values,
memcpy
(
data_buffer_ptr
,
itr
.
value
().
data
(),
data_size
*
sizeof
(
float
));
}
for
(
in
t
mf_idx
=
data_size
;
mf_idx
<
value_size
;
++
mf_idx
)
{
for
(
size_
t
mf_idx
=
data_size
;
mf_idx
<
value_size
;
++
mf_idx
)
{
data_buffer
[
mf_idx
]
=
0.0
;
}
auto
offset
=
keys
[
i
].
second
;
...
...
@@ -503,7 +503,7 @@ int32_t MemorySparseTable::PullSparsePtr(char** pull_values,
task_keys
[
shard_id
].
push_back
({
keys
[
i
],
i
});
}
// std::atomic<uint32_t> missed_keys{0};
for
(
size_
t
shard_id
=
0
;
shard_id
<
_real_local_shard_num
;
++
shard_id
)
{
for
(
in
t
shard_id
=
0
;
shard_id
<
_real_local_shard_num
;
++
shard_id
)
{
tasks
[
shard_id
]
=
_shards_task_pool
[
shard_id
%
_shards_task_pool
.
size
()]
->
enqueue
(
[
this
,
shard_id
,
&
task_keys
,
pull_values
,
value_size
,
...
...
@@ -512,7 +512,7 @@ int32_t MemorySparseTable::PullSparsePtr(char** pull_values,
auto
&
local_shard
=
_local_shards
[
shard_id
];
float
data_buffer
[
value_size
];
float
*
data_buffer_ptr
=
data_buffer
;
for
(
in
t
i
=
0
;
i
<
keys
.
size
();
++
i
)
{
for
(
size_
t
i
=
0
;
i
<
keys
.
size
();
++
i
)
{
uint64_t
key
=
keys
[
i
].
first
;
auto
itr
=
local_shard
.
find
(
key
);
size_t
data_size
=
value_size
-
mf_value_size
;
...
...
@@ -558,7 +558,7 @@ int32_t MemorySparseTable::PushSparse(const uint64_t* keys, const float* values,
size_t
update_value_col
=
_value_accesor
->
GetAccessorInfo
().
update_size
/
sizeof
(
float
);
for
(
size_
t
shard_id
=
0
;
shard_id
<
_real_local_shard_num
;
++
shard_id
)
{
for
(
in
t
shard_id
=
0
;
shard_id
<
_real_local_shard_num
;
++
shard_id
)
{
tasks
[
shard_id
]
=
_shards_task_pool
[
shard_id
%
_task_pool_size
]
->
enqueue
(
[
this
,
shard_id
,
value_col
,
mf_value_col
,
update_value_col
,
values
,
&
task_keys
]()
->
int
{
...
...
@@ -566,7 +566,7 @@ int32_t MemorySparseTable::PushSparse(const uint64_t* keys, const float* values,
auto
&
local_shard
=
_local_shards
[
shard_id
];
float
data_buffer
[
value_col
];
// NOLINT
float
*
data_buffer_ptr
=
data_buffer
;
for
(
in
t
i
=
0
;
i
<
keys
.
size
();
++
i
)
{
for
(
size_
t
i
=
0
;
i
<
keys
.
size
();
++
i
)
{
uint64_t
key
=
keys
[
i
].
first
;
uint64_t
push_data_idx
=
keys
[
i
].
second
;
const
float
*
update_data
=
...
...
@@ -639,7 +639,7 @@ int32_t MemorySparseTable::PushSparse(const uint64_t* keys,
auto
&
local_shard
=
_local_shards
[
shard_id
];
float
data_buffer
[
value_col
];
// NOLINT
float
*
data_buffer_ptr
=
data_buffer
;
for
(
in
t
i
=
0
;
i
<
keys
.
size
();
++
i
)
{
for
(
size_
t
i
=
0
;
i
<
keys
.
size
();
++
i
)
{
uint64_t
key
=
keys
[
i
].
first
;
uint64_t
push_data_idx
=
keys
[
i
].
second
;
const
float
*
update_data
=
values
[
push_data_idx
];
...
...
paddle/fluid/distributed/ps/table/sparse_accessor.cc
浏览文件 @
c49f35cf
...
...
@@ -171,7 +171,7 @@ bool SparseAccessor::NeedExtendMF(float* value) {
return
score
>=
_config
.
embedx_threshold
();
}
bool
SparseAccessor
::
HasMF
(
size_
t
size
)
{
bool
SparseAccessor
::
HasMF
(
in
t
size
)
{
return
size
>
sparse_feature_value
.
EmbedxG2SumIndex
();
}
...
...
@@ -201,7 +201,7 @@ int32_t SparseAccessor::Merge(float** update_values,
for
(
size_t
value_item
=
0
;
value_item
<
num
;
++
value_item
)
{
float
*
update_value
=
update_values
[
value_item
];
const
float
*
other_update_value
=
other_update_values
[
value_item
];
for
(
auto
i
=
0u
;
i
<
total_dim
;
++
i
)
{
for
(
size_t
i
=
0
;
i
<
total_dim
;
++
i
)
{
if
(
i
!=
SparsePushValue
::
SlotIndex
())
{
update_value
[
i
]
+=
other_update_value
[
i
];
}
...
...
paddle/fluid/distributed/ps/table/sparse_accessor.h
浏览文件 @
c49f35cf
...
...
@@ -130,7 +130,7 @@ class SparseAccessor : public ValueAccessor {
// 判断该value是否保存到ssd
// virtual bool save_ssd(float* value);
virtual
bool
NeedExtendMF
(
float
*
value
);
virtual
bool
HasMF
(
size_
t
size
);
virtual
bool
HasMF
(
in
t
size
);
// 判断该value是否在save阶段dump,
// param作为参数用于标识save阶段,如downpour的xbox与batch_model
// param = 0, save all feature
...
...
paddle/fluid/distributed/ps/table/sparse_sgd_rule.cc
浏览文件 @
c49f35cf
...
...
@@ -90,7 +90,7 @@ void SparseAdaGradSGDRule::UpdateValueWork(float* w, float* sgd,
float
&
g2sum
=
sgd
[
G2SumIndex
()];
double
add_g2sum
=
0
;
for
(
in
t
i
=
0
;
i
<
_embedding_dim
;
i
++
)
{
for
(
size_
t
i
=
0
;
i
<
_embedding_dim
;
i
++
)
{
double
scaled_grad
=
grad
[
i
]
/
scale
;
w
[
i
]
-=
learning_rate_
*
scaled_grad
*
sqrt
(
_initial_g2sum
/
(
_initial_g2sum
+
g2sum
));
...
...
@@ -103,7 +103,7 @@ void SparseAdaGradSGDRule::UpdateValueWork(float* w, float* sgd,
void
SparseAdaGradSGDRule
::
InitValueWork
(
float
*
value
,
float
*
sgd
,
bool
zero_init
)
{
for
(
in
t
i
=
0
;
i
<
_embedding_dim
;
++
i
)
{
for
(
size_
t
i
=
0
;
i
<
_embedding_dim
;
++
i
)
{
if
(
zero_init
)
{
value
[
i
]
=
0.0
;
BoundValue
(
value
[
i
]);
...
...
@@ -141,7 +141,7 @@ void StdAdaGradSGDRule::LoadConfig(const SparseCommonSGDRuleParameter& param,
void
StdAdaGradSGDRule
::
UpdateValueWork
(
float
*
w
,
float
*
sgd
,
const
float
*
grad
,
float
scale
)
{
for
(
in
t
i
=
0
;
i
<
_embedding_dim
;
i
++
)
{
for
(
size_
t
i
=
0
;
i
<
_embedding_dim
;
i
++
)
{
float
&
g2sum
=
sgd
[
G2SumIndex
()
+
i
];
double
scaled_grad
=
grad
[
i
]
/
scale
;
w
[
i
]
-=
learning_rate_
*
scaled_grad
*
...
...
@@ -153,7 +153,7 @@ void StdAdaGradSGDRule::UpdateValueWork(float* w, float* sgd, const float* grad,
void
StdAdaGradSGDRule
::
InitValueWork
(
float
*
value
,
float
*
sgd
,
bool
zero_init
)
{
for
(
in
t
i
=
0
;
i
<
_embedding_dim
;
++
i
)
{
for
(
size_
t
i
=
0
;
i
<
_embedding_dim
;
++
i
)
{
if
(
zero_init
)
{
value
[
i
]
=
0.0
;
BoundValue
(
value
[
i
]);
...
...
@@ -204,7 +204,7 @@ void SparseAdamSGDRule::UpdateValueWork(float* w, float* sgd, const float* grad,
// lr not change in one update
lr
*=
sqrt
(
1
-
beta2_pow_
)
/
(
1
-
beta1_pow_
);
for
(
in
t
i
=
0
;
i
<
_embedding_dim
;
i
++
)
{
for
(
size_
t
i
=
0
;
i
<
_embedding_dim
;
i
++
)
{
// Calculation
gsum
[
i
]
=
_beta1_decay_rate
*
gsum
[
i
]
+
(
1
-
_beta1_decay_rate
)
*
g
[
i
];
g2sum
[
i
]
=
...
...
@@ -219,7 +219,7 @@ void SparseAdamSGDRule::UpdateValueWork(float* w, float* sgd, const float* grad,
void
SparseAdamSGDRule
::
InitValueWork
(
float
*
value
,
float
*
sgd
,
bool
zero_init
)
{
for
(
in
t
i
=
0
;
i
<
_embedding_dim
;
++
i
)
{
for
(
size_
t
i
=
0
;
i
<
_embedding_dim
;
++
i
)
{
if
(
zero_init
)
{
value
[
i
]
=
0.0
;
BoundValue
(
value
[
i
]);
...
...
@@ -233,7 +233,7 @@ void SparseAdamSGDRule::InitValueWork(float* value, float* sgd,
}
}
// init rule gsum and g2sum
for
(
in
t
i
=
GSumIndex
();
i
<
Beta1PowIndex
();
i
++
)
{
for
(
size_
t
i
=
GSumIndex
();
i
<
Beta1PowIndex
();
i
++
)
{
sgd
[
i
]
=
0.0
;
}
// init beta1_pow and beta2_pow
...
...
paddle/fluid/distributed/ps/table/ssd_sparse_table.cc
浏览文件 @
c49f35cf
...
...
@@ -58,7 +58,7 @@ int32_t SSDSparseTable::PullSparse(float* pull_values, const uint64_t* keys,
}
std
::
atomic
<
uint32_t
>
missed_keys
{
0
};
for
(
size_
t
shard_id
=
0
;
shard_id
<
_real_local_shard_num
;
++
shard_id
)
{
for
(
in
t
shard_id
=
0
;
shard_id
<
_real_local_shard_num
;
++
shard_id
)
{
tasks
[
shard_id
]
=
_shards_task_pool
[
shard_id
%
_shards_task_pool
.
size
()]
->
enqueue
(
[
this
,
shard_id
,
&
task_keys
,
value_size
,
mf_value_size
,
...
...
@@ -67,7 +67,7 @@ int32_t SSDSparseTable::PullSparse(float* pull_values, const uint64_t* keys,
auto
&
local_shard
=
_local_shards
[
shard_id
];
float
data_buffer
[
value_size
];
float
*
data_buffer_ptr
=
data_buffer
;
for
(
in
t
i
=
0
;
i
<
keys
.
size
();
++
i
)
{
for
(
size_
t
i
=
0
;
i
<
keys
.
size
();
++
i
)
{
uint64_t
key
=
keys
[
i
].
first
;
auto
itr
=
local_shard
.
find
(
key
);
size_t
data_size
=
value_size
-
mf_value_size
;
...
...
@@ -105,7 +105,8 @@ int32_t SSDSparseTable::PullSparse(float* pull_values, const uint64_t* keys,
memcpy
(
data_buffer_ptr
,
itr
.
value
().
data
(),
data_size
*
sizeof
(
float
));
}
for
(
int
mf_idx
=
data_size
;
mf_idx
<
value_size
;
++
mf_idx
)
{
for
(
size_t
mf_idx
=
data_size
;
mf_idx
<
value_size
;
++
mf_idx
)
{
data_buffer
[
mf_idx
]
=
0.0
;
}
int
pull_data_idx
=
keys
[
i
].
second
;
...
...
@@ -117,7 +118,7 @@ int32_t SSDSparseTable::PullSparse(float* pull_values, const uint64_t* keys,
return
0
;
});
}
for
(
size_
t
i
=
0
;
i
<
_real_local_shard_num
;
++
i
)
{
for
(
in
t
i
=
0
;
i
<
_real_local_shard_num
;
++
i
)
{
tasks
[
i
].
wait
();
}
if
(
FLAGS_pserver_print_missed_key_num_every_push
)
{
...
...
@@ -145,7 +146,7 @@ int32_t SSDSparseTable::PushSparse(const uint64_t* keys, const float* values,
int
shard_id
=
(
keys
[
i
]
%
_sparse_table_shard_num
)
%
_avg_local_shard_num
;
task_keys
[
shard_id
].
push_back
({
keys
[
i
],
i
});
}
for
(
size_
t
shard_id
=
0
;
shard_id
<
_real_local_shard_num
;
++
shard_id
)
{
for
(
in
t
shard_id
=
0
;
shard_id
<
_real_local_shard_num
;
++
shard_id
)
{
tasks
[
shard_id
]
=
_shards_task_pool
[
shard_id
%
_shards_task_pool
.
size
()]
->
enqueue
(
[
this
,
shard_id
,
value_col
,
mf_value_col
,
update_value_col
,
...
...
@@ -154,7 +155,7 @@ int32_t SSDSparseTable::PushSparse(const uint64_t* keys, const float* values,
auto
&
local_shard
=
_local_shards
[
shard_id
];
float
data_buffer
[
value_col
];
float
*
data_buffer_ptr
=
data_buffer
;
for
(
in
t
i
=
0
;
i
<
keys
.
size
();
++
i
)
{
for
(
size_
t
i
=
0
;
i
<
keys
.
size
();
++
i
)
{
uint64_t
key
=
keys
[
i
].
first
;
uint64_t
push_data_idx
=
keys
[
i
].
second
;
const
float
*
update_data
=
...
...
@@ -196,7 +197,7 @@ int32_t SSDSparseTable::PushSparse(const uint64_t* keys, const float* values,
return
0
;
});
}
for
(
size_
t
i
=
0
;
i
<
_real_local_shard_num
;
++
i
)
{
for
(
in
t
i
=
0
;
i
<
_real_local_shard_num
;
++
i
)
{
tasks
[
i
].
wait
();
}
}
...
...
@@ -228,7 +229,7 @@ int32_t SSDSparseTable::Shrink(const std::string& param) {
int
thread_num
=
_real_local_shard_num
<
20
?
_real_local_shard_num
:
20
;
omp_set_num_threads
(
thread_num
);
#pragma omp parallel for schedule(dynamic)
for
(
size_
t
i
=
0
;
i
<
_real_local_shard_num
;
++
i
)
{
for
(
in
t
i
=
0
;
i
<
_real_local_shard_num
;
++
i
)
{
uint64_t
mem_count
=
0
;
uint64_t
ssd_count
=
0
;
...
...
@@ -264,7 +265,7 @@ int32_t SSDSparseTable::Shrink(const std::string& param) {
int32_t
SSDSparseTable
::
UpdateTable
()
{
// TODO implement with multi-thread
int
count
=
0
;
for
(
size_
t
i
=
0
;
i
<
_real_local_shard_num
;
++
i
)
{
for
(
in
t
i
=
0
;
i
<
_real_local_shard_num
;
++
i
)
{
auto
&
shard
=
_local_shards
[
i
];
// from mem to ssd
for
(
auto
it
=
shard
.
begin
();
it
!=
shard
.
end
();)
{
...
...
@@ -285,7 +286,7 @@ int32_t SSDSparseTable::UpdateTable() {
int64_t
SSDSparseTable
::
LocalSize
()
{
int64_t
local_size
=
0
;
for
(
size_
t
i
=
0
;
i
<
_real_local_shard_num
;
++
i
)
{
for
(
in
t
i
=
0
;
i
<
_real_local_shard_num
;
++
i
)
{
local_size
+=
_local_shards
[
i
].
size
();
}
// TODO rocksdb size
...
...
@@ -328,7 +329,7 @@ int32_t SSDSparseTable::Save(const std::string& path,
omp_set_num_threads
(
thread_num
);
#pragma omp parallel for schedule(dynamic)
for
(
size_
t
i
=
0
;
i
<
_real_local_shard_num
;
++
i
)
{
for
(
in
t
i
=
0
;
i
<
_real_local_shard_num
;
++
i
)
{
FsChannelConfig
channel_config
;
if
(
_config
.
compress_in_save
()
&&
(
save_param
==
0
||
save_param
==
3
))
{
channel_config
.
path
=
paddle
::
string
::
format_string
(
...
...
@@ -484,14 +485,14 @@ int64_t SSDSparseTable::CacheShuffle(
int
feasign_size
=
0
;
std
::
vector
<
paddle
::
framework
::
Channel
<
std
::
pair
<
uint64_t
,
std
::
string
>>>
tmp_channels
;
for
(
size_
t
i
=
0
;
i
<
_real_local_shard_num
;
++
i
)
{
for
(
in
t
i
=
0
;
i
<
_real_local_shard_num
;
++
i
)
{
tmp_channels
.
push_back
(
paddle
::
framework
::
MakeChannel
<
std
::
pair
<
uint64_t
,
std
::
string
>>
());
}
omp_set_num_threads
(
thread_num
);
#pragma omp parallel for schedule(dynamic)
for
(
size_
t
i
=
0
;
i
<
_real_local_shard_num
;
++
i
)
{
for
(
in
t
i
=
0
;
i
<
_real_local_shard_num
;
++
i
)
{
paddle
::
framework
::
ChannelWriter
<
std
::
pair
<
uint64_t
,
std
::
string
>>&
writer
=
writers
[
i
];
// std::shared_ptr<paddle::framework::ChannelObject<std::pair<uint64_t,
...
...
@@ -520,7 +521,7 @@ int64_t SSDSparseTable::CacheShuffle(
<<
" and start sparse cache data shuffle real local shard num: "
<<
_real_local_shard_num
;
std
::
vector
<
std
::
pair
<
uint64_t
,
std
::
string
>>
local_datas
;
for
(
size_
t
idx_shard
=
0
;
idx_shard
<
_real_local_shard_num
;
++
idx_shard
)
{
for
(
in
t
idx_shard
=
0
;
idx_shard
<
_real_local_shard_num
;
++
idx_shard
)
{
paddle
::
framework
::
ChannelWriter
<
std
::
pair
<
uint64_t
,
std
::
string
>>&
writer
=
writers
[
idx_shard
];
auto
channel
=
writer
.
channel
();
...
...
@@ -543,8 +544,8 @@ int64_t SSDSparseTable::CacheShuffle(
send_index
[
i
]
=
i
;
}
std
::
random_shuffle
(
send_index
.
begin
(),
send_index
.
end
());
for
(
auto
index
=
0u
;
index
<
shuffle_node_num
;
++
index
)
{
in
t
i
=
send_index
[
index
];
for
(
int
index
=
0
;
index
<
shuffle_node_num
;
++
index
)
{
size_
t
i
=
send_index
[
index
];
if
(
i
==
_shard_idx
)
{
continue
;
}
...
...
@@ -624,7 +625,7 @@ int32_t SSDSparseTable::Load(const std::string& path,
}
//加载path目录下数据[start_idx, end_idx)
int32_t
SSDSparseTable
::
Load
(
size_t
start_idx
,
size_
t
end_idx
,
int32_t
SSDSparseTable
::
Load
(
size_t
start_idx
,
in
t
end_idx
,
const
std
::
vector
<
std
::
string
>&
file_list
,
const
std
::
string
&
param
)
{
if
(
start_idx
>=
file_list
.
size
())
{
...
...
@@ -688,7 +689,7 @@ int32_t SSDSparseTable::Load(size_t start_idx, size_t end_idx,
continue
;
}
}
in
t
value_size
=
size_
t
value_size
=
_value_accesor
->
ParseFromString
(
++
end
,
data_buffer_ptr
);
// ssd or mem
if
(
_value_accesor
->
SaveSSD
(
data_buffer_ptr
))
{
...
...
paddle/fluid/distributed/ps/table/ssd_sparse_table.h
浏览文件 @
c49f35cf
...
...
@@ -55,7 +55,7 @@ class SSDSparseTable : public MemorySparseTable {
int32_t
Flush
()
override
{
return
0
;
}
virtual
int32_t
Shrink
(
const
std
::
string
&
param
)
override
;
virtual
void
Clear
()
override
{
for
(
size_
t
i
=
0
;
i
<
_real_local_shard_num
;
++
i
)
{
for
(
in
t
i
=
0
;
i
<
_real_local_shard_num
;
++
i
)
{
_local_shards
[
i
].
clear
();
}
}
...
...
@@ -79,7 +79,7 @@ class SSDSparseTable : public MemorySparseTable {
virtual
int32_t
Load
(
const
std
::
string
&
path
,
const
std
::
string
&
param
)
override
;
//加载path目录下数据[start_idx, end_idx)
virtual
int32_t
Load
(
size_t
start_idx
,
size_
t
end_idx
,
virtual
int32_t
Load
(
size_t
start_idx
,
in
t
end_idx
,
const
std
::
vector
<
std
::
string
>&
file_list
,
const
std
::
string
&
param
);
int64_t
LocalSize
();
...
...
paddle/fluid/distributed/ps/wrapper/fleet.cc
浏览文件 @
c49f35cf
...
...
@@ -536,8 +536,8 @@ void FleetWrapper::PushSparseFromTensorAsync(
output_len
=
0
;
if
(
tensor
->
lod
().
size
()
>
0
)
{
for
(
size_
t
i
=
0
;
i
<
tensor
->
lod
()[
0
].
size
()
-
1
;
++
i
)
{
for
(
in
t
j
=
tensor
->
lod
()[
0
][
i
];
j
<
tensor
->
lod
()[
0
][
i
+
1
];
for
(
in
t
i
=
0
;
i
<
tensor
->
lod
()[
0
].
size
()
-
1
;
++
i
)
{
for
(
size_
t
j
=
tensor
->
lod
()[
0
][
i
];
j
<
tensor
->
lod
()[
0
][
i
+
1
];
++
j
,
output_len
+=
fea_dim
)
{
uint64_t
real_id
=
static_cast
<
uint64_t
>
(
ids
[
j
]);
if
(
real_id
==
padding_id
)
{
...
...
@@ -566,7 +566,7 @@ void FleetWrapper::PushSparseFromTensorAsync(
}
}
}
else
{
for
(
size_
t
i
=
0
;
i
<
len
;
++
i
,
output_len
+=
fea_dim
)
{
for
(
in
t
i
=
0
;
i
<
len
;
++
i
,
output_len
+=
fea_dim
)
{
uint64_t
real_id
=
static_cast
<
uint64_t
>
(
ids
[
i
]);
if
(
real_id
==
padding_id
)
{
continue
;
...
...
paddle/fluid/distributed/test/brpc_service_dense_sgd_test.cc
浏览文件 @
c49f35cf
...
...
@@ -222,7 +222,7 @@ void RunBrpcPushDense() {
worker_ptr_
->
PullDense
(
temp_region
.
data
(),
temp_region
.
size
(),
0
);
pull_status
.
wait
();
for
(
size
_t
idx
=
0
;
idx
<
tensor
->
numel
();
++
idx
)
{
for
(
int64
_t
idx
=
0
;
idx
<
tensor
->
numel
();
++
idx
)
{
EXPECT_FLOAT_EQ
(
temp
[
idx
],
1.0
);
}
...
...
@@ -236,7 +236,7 @@ void RunBrpcPushDense() {
pull_status
=
worker_ptr_
->
PullDense
(
regions
.
data
(),
regions
.
size
(),
0
);
pull_status
.
wait
();
for
(
size
_t
idx
=
0
;
idx
<
tensor
->
numel
();
++
idx
)
{
for
(
int64
_t
idx
=
0
;
idx
<
tensor
->
numel
();
++
idx
)
{
EXPECT_FLOAT_EQ
(
w
[
idx
],
float
(
idx
));
}
...
...
@@ -265,7 +265,7 @@ void RunBrpcPushDense() {
worker_ptr_
->
PullDense
(
regions
.
data
(),
regions
.
size
(),
0
);
pull_update_status
.
wait
();
for
(
size
_t
idx
=
0
;
idx
<
tensor
->
numel
();
++
idx
)
{
for
(
int64
_t
idx
=
0
;
idx
<
tensor
->
numel
();
++
idx
)
{
EXPECT_FLOAT_EQ
(
w
[
idx
],
float
(
idx
)
-
1.0
);
}
...
...
paddle/fluid/distributed/test/sparse_sgd_rule_test.cc
浏览文件 @
c49f35cf
...
...
@@ -89,25 +89,25 @@ TEST(downpour_sparse_adagrad_test, test_init_and_update) {
rule
.
InitValue
(
w
,
w
+
10
,
true
);
for
(
auto
i
=
0u
;
i
<
kEmbSize
;
++
i
)
{
for
(
int
i
=
0
;
i
<
kEmbSize
;
++
i
)
{
ASSERT_FLOAT_EQ
(
w
[
i
],
0
);
}
ASSERT_FLOAT_EQ
(
w
[
kEmbSize
],
0
);
// check init_value for random
rule
.
InitValue
(
w
,
w
+
10
,
false
);
for
(
auto
i
=
0u
;
i
<
kEmbSize
;
++
i
)
{
for
(
int
i
=
0
;
i
<
kEmbSize
;
++
i
)
{
ASSERT_TRUE
(
w
[
i
]
>=
rule
.
MinBound
()
&&
w
[
i
]
<=
rule
.
MaxBound
());
}
ASSERT_FLOAT_EQ
(
w
[
kEmbSize
],
0
);
// check update_value for one field
for
(
auto
i
=
0u
;
i
<
kEmbSize
;
++
i
)
{
for
(
int
i
=
0
;
i
<
kEmbSize
;
++
i
)
{
w
[
i
]
=
0
;
}
w
[
kEmbSize
]
=
0
;
float
grad
[
kEmbSize
];
for
(
auto
i
=
0u
;
i
<
kEmbSize
;
++
i
)
{
for
(
int
i
=
0
;
i
<
kEmbSize
;
++
i
)
{
grad
[
i
]
=
(
i
+
1
)
*
1.0
;
}
...
...
@@ -185,7 +185,7 @@ TEST(downpour_sparse_adam_test, test_init_and_update) {
rule
.
UpdateValue
(
value
,
value
+
embed_dim
,
grad
);
for
(
auto
i
=
0u
;
i
<
value_dim
;
++
i
)
{
// check update
for
(
int
i
=
0
;
i
<
value_dim
;
++
i
)
{
// check update
ASSERT_FLOAT_EQ
(
value
[
i
],
label
[
i
])
<<
"i is "
<<
i
;
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录