magicwindyyd / mindspore (forked from MindSpore / mindspore)
Commit 2b86c92f
Authored on Aug 28, 2020 by mindspore-ci-bot; committed by Gitee on Aug 28, 2020
!5413 Combine sparse embedding gradient
Merge pull request !5413 from chengang/combine_grad
Parents: 5314702c, 50510863

Showing 11 changed files with 444 additions and 50 deletions (+444, -50)
- mindspore/ccsrc/backend/kernel_compiler/cpu/ps/sparse_apply_adam_ps_kernel.cc (+2, -6)
- mindspore/ccsrc/backend/kernel_compiler/cpu/ps/sparse_apply_ftrl_ps_kernel.cc (+4, -4)
- mindspore/ccsrc/backend/kernel_compiler/cpu/ps/sparse_apply_lazy_adam_ps_kernel.cc (+4, -4)
- mindspore/ccsrc/frontend/parallel/ps/optimizer_info.cc (+84, -5)
- mindspore/ccsrc/frontend/parallel/ps/optimizer_info.h (+9, -2)
- mindspore/ccsrc/frontend/parallel/ps/optimizer_info_builder.cc (+21, -9)
- mindspore/ccsrc/frontend/parallel/ps/parameter_server.h (+20, -2)
- mindspore/ccsrc/frontend/parallel/ps/util.cc (+26, -0)
- mindspore/ccsrc/frontend/parallel/ps/util.h (+5, -0)
- mindspore/ccsrc/frontend/parallel/ps/worker.h (+39, -1)
- mindspore/ccsrc/frontend/parallel/ps/worker_proxy.h (+230, -17)
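Note on the change as a whole: this commit makes workers combine (deduplicate and sum) sparse embedding gradients per server shard before pushing them, and teaches the server side to accumulate integer indices from all workers and bucket-reduce them once per update. The recurring operation across the diffs below is the same: gradient rows that share an embedding row index are summed into one row. A minimal standalone sketch of that combine step (illustrative only, not MindSpore code; all values invented):

```cpp
// Toy combine: sum gradient rows that share the same embedding row index.
#include <cstdio>
#include <map>
#include <vector>

int main() {
  const int segment_size = 2;  // floats per embedding row
  std::vector<int> indices = {3, 1, 3};  // row 3 appears twice -> combined
  std::vector<float> grads = {0.1f, 0.2f, 0.5f, 0.5f, 0.3f, 0.1f};
  std::map<int, std::vector<float>> combined;
  for (size_t i = 0; i < indices.size(); ++i) {
    auto &row = combined[indices[i]];
    row.resize(segment_size, 0.0f);
    for (int j = 0; j < segment_size; ++j) {
      row[j] += grads[i * segment_size + j];
    }
  }
  for (const auto &kv : combined) {
    std::printf("row %d:", kv.first);
    for (float v : kv.second) std::printf(" %.2f", v);
    std::printf("\n");
  }
  return 0;
}
```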
mindspore/ccsrc/backend/kernel_compiler/cpu/ps/sparse_apply_adam_ps_kernel.cc

@@ -71,8 +71,8 @@ void SparseApplyAdamPSKernel::ReInit(const std::shared_ptr<std::vector<std::shar
   const std::vector<std::shared_ptr<std::vector<size_t>>> &shape_vec = *shapes;
   const std::vector<size_t> &indices_shape = *(shape_vec[0]);
   indices_size_ = indices_shape[0];
-  workspace_size_list_[0] = indices_size_ * var_outer_dim_size_ * sizeof(float);
-  workspace_size_list_[1] = indices_size_ * sizeof(int);
+  workspace_size_list_[0] = indices_size_ * var_outer_dim_size_ * sizeof(float) * worker_num_;
+  workspace_size_list_[1] = indices_size_ * sizeof(int) * worker_num_;
 }

 void SparseApplyAdamPSKernel::ReInit(const std::vector<AddressPtr> &inputs) {
@@ -85,10 +85,6 @@ void SparseApplyAdamPSKernel::ReInit(const std::vector<AddressPtr> &inputs) {
 bool SparseApplyAdamPSKernel::Execute(const std::vector<AddressPtr> &inputs,
                                       const std::vector<AddressPtr> &workspace,
                                       const std::vector<AddressPtr> &outputs) {
   ReInit(inputs);
-  int *indices = reinterpret_cast<int *>(inputs[10]->addr);
-  for (size_t i = 0; i < inputs[10]->size / sizeof(int); i++) {
-    indices[i] -= row_offset_;
-  }
   return Launch(inputs, workspace, outputs);
 }
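The workspace buffers in all three PS kernels are scaled by worker_num_ because, with combining now done per update on the server, one update can carry the concatenated indices of every worker rather than a single worker's batch. A quick sanity check of the sizing arithmetic (numbers invented for illustration):

```cpp
#include <cstdio>

int main() {
  // Hypothetical values: 1024 indices, 80 floats per row, 8 workers.
  const size_t indices_size = 1024, var_outer_dim_size = 80, worker_num = 8;
  size_t grad_ws = indices_size * var_outer_dim_size * sizeof(float) * worker_num;
  size_t idx_ws = indices_size * sizeof(int) * worker_num;
  std::printf("gradient workspace: %zu bytes, index workspace: %zu bytes\n", grad_ws, idx_ws);
  return 0;
}
```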
mindspore/ccsrc/backend/kernel_compiler/cpu/ps/sparse_apply_ftrl_ps_kernel.cc

@@ -74,15 +74,15 @@ void SparseApplyFtrlPSKernel::ReInit(const std::shared_ptr<std::vector<std::shar
   const std::vector<std::shared_ptr<std::vector<size_t>>> &shape_vec = *shapes;
   std::vector<size_t> indices_shape = *(shape_vec[0]);
   indices_size_ = indices_shape[0];
-  workspace_size_list_[0] = indices_size_ * var_outer_dim_size_ * sizeof(float);
-  workspace_size_list_[1] = indices_size_ * sizeof(int);
+  workspace_size_list_[0] = indices_size_ * var_outer_dim_size_ * sizeof(float) * worker_num_;
+  workspace_size_list_[1] = indices_size_ * sizeof(int) * worker_num_;
 }

 void SparseApplyFtrlPSKernel::ReInit(const std::vector<AddressPtr> &inputs) {
   const auto &indices_addr = inputs[4];
   indices_size_ = indices_addr->size / sizeof(int);
-  workspace_size_list_[0] = indices_size_ * var_outer_dim_size_ * sizeof(float);
-  workspace_size_list_[1] = indices_size_ * sizeof(int);
+  workspace_size_list_[0] = indices_size_ * var_outer_dim_size_ * sizeof(float) * worker_num_;
+  workspace_size_list_[1] = indices_size_ * sizeof(int) * worker_num_;
 }

 bool SparseApplyFtrlPSKernel::Execute(const std::vector<AddressPtr> &inputs,
                                       const std::vector<AddressPtr> &workspace,
mindspore/ccsrc/backend/kernel_compiler/cpu/ps/sparse_apply_lazy_adam_ps_kernel.cc

@@ -71,15 +71,15 @@ void SparseApplyLazyAdamPSKernel::ReInit(
   const std::vector<std::shared_ptr<std::vector<size_t>>> &shape_vec = *shapes;
   const std::vector<size_t> &indices_shape = *(shape_vec[0]);
   indices_size_ = indices_shape[0];
-  workspace_size_list_[0] = indices_size_ * var_outer_dim_size_ * sizeof(float);
-  workspace_size_list_[1] = indices_size_ * sizeof(int);
+  workspace_size_list_[0] = indices_size_ * var_outer_dim_size_ * sizeof(float) * worker_num_;
+  workspace_size_list_[1] = indices_size_ * sizeof(int) * worker_num_;
 }

 void SparseApplyLazyAdamPSKernel::ReInit(const std::vector<AddressPtr> &inputs) {
   const auto &indices_addr = inputs[10];
   indices_size_ = indices_addr->size / sizeof(int);
-  workspace_size_list_[0] = indices_size_ * var_outer_dim_size_ * sizeof(float);
-  workspace_size_list_[1] = indices_size_ * sizeof(int);
+  workspace_size_list_[0] = indices_size_ * var_outer_dim_size_ * sizeof(float) * worker_num_;
+  workspace_size_list_[1] = indices_size_ * sizeof(int) * worker_num_;
 }

 bool SparseApplyLazyAdamPSKernel::Execute(const std::vector<AddressPtr> &inputs,
mindspore/ccsrc/frontend/parallel/ps/optimizer_info.cc

@@ -16,6 +16,7 @@
 #include "frontend/parallel/ps/optimizer_info.h"
 #include <memory>
+#include "frontend/parallel/ps/util.h"

 namespace mindspore {
 namespace parallel {
@@ -30,6 +31,8 @@ const std::vector<AddressPtr> &OptimizerInfo::outputs() { return outputs_; }
 bool OptimizerInfo::IsSparse() const { return false; }

+const size_t OptimizerInfo::indice_size() const { return 0; }
+
 size_t OptimizerInfo::grad_index() { return 0; }

 size_t OptimizerInfo::indices_index() { return 0; }
@@ -57,7 +60,8 @@ void DenseOptimInfo::Accumulate(const Values &values, const Lengths &lengths) {
   }
 }

-void DenseOptimInfo::ComputeMean(size_t n) {
+void DenseOptimInfo::ComputeMean(const std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>> &, size_t n,
+                                 size_t server_num, size_t rank_id) {
   if (n > 1) {
     float *accum_grad_data = reinterpret_cast<float *>(gradient()->addr);
     size_t size = gradient()->size / sizeof(float);
@@ -96,15 +100,88 @@ void SparseOptimInfo::Accumulate(const Values &values, const Lengths &lengths) {
   for (size_t i = 0; i < indices_index; i++) {
     indice_offset += lengths[i];
   }
-  int *incr_indice_data = reinterpret_cast<int *>(values.data() + indice_offset);
-  size_t incr_indice_size = lengths[indices_index] * sizeof(float);
+  float *incr_indice_data = values.data() + indice_offset;
+  size_t incr_indice_size = lengths[indices_index];
+  size_t incr_indice_data_size = incr_indice_size * sizeof(int);
+  int *converted_indices = new int[incr_indice_size];
+  for (size_t i = 0; i < incr_indice_size; i++) {
+    converted_indices[i] = static_cast<int>(incr_indice_data[i]);
+  }
-  auto ret2 = memcpy_s(accum_indices_data + indices_offset_, incr_indice_size, incr_indice_data, incr_indice_size);
+  auto ret2 =
+    memcpy_s(accum_indices_data + indices_offset_, incr_indice_data_size, converted_indices, incr_indice_data_size);
   if (ret2 != 0) {
     MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret2 << ")";
   }
+  delete[] converted_indices;
   indices_offset_ += lengths[indices_index];
-  indices()->size += incr_indice_size;
+  indices()->size += incr_indice_data_size;
 }
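SparseOptimInfo::Accumulate now treats the incoming index segment as float data (that is how it travels inside the values blob) and converts it element-wise into a temporary int buffer before appending to the accumulated indices. A minimal sketch of that conversion, with made-up values:

```cpp
#include <cstdio>
#include <vector>

int main() {
  // Indices arrive encoded as floats inside the value payload.
  std::vector<float> incr_indice_data = {0.0f, 5.0f, 12.0f};
  std::vector<int> converted_indices(incr_indice_data.size());
  for (size_t i = 0; i < incr_indice_data.size(); ++i) {
    converted_indices[i] = static_cast<int>(incr_indice_data[i]);
  }
  for (int v : converted_indices) std::printf("%d ", v);
  std::printf("\n");
  return 0;
}
```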

+void SparseOptimInfo::ComputeMean(const std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>> &shapes,
+                                  size_t n, size_t server_num, size_t rank_id) {
+  size_t indices_size = static_cast<size_t>(indices()->size / sizeof(int));
+  int segment_size = gradient()->size / indices()->size;
+  float *new_grad = new float[indices_size * segment_size];
+  int *new_indices = new int[indices_size];
+  mindspore::kernel::SparseGradient<int> unique_sparse_grad({new_grad, new_indices, indices_size});
+
+  const std::vector<std::shared_ptr<std::vector<size_t>>> &shape_vec = *shapes;
+  if (shape_vec.size() < 2 || shape_vec[1] == nullptr) {
+    MS_LOG(EXCEPTION) << "No input shape found";
+  }
+  auto input_shapes = shape_vec.size() > 0 ? shape_vec[1] : nullptr;
+  MS_EXCEPTION_IF_NULL(input_shapes);
+  if (input_shapes->size() == 0) {
+    MS_LOG(EXCEPTION) << "Invalid input shapes";
+  }
+  int first_dim_size = input_shapes->front();
+  int outer_dim_size = segment_size;
+  if (first_dim_size == 0 || outer_dim_size == 0) {
+    MS_LOG(ERROR) << "Invalid first dim size";
+  }
+
+  float *grad_data = reinterpret_cast<float *>(gradient()->addr);
+  int *indices_data = reinterpret_cast<int *>(indices()->addr);
+
+  size_t original_row_count = input_shapes->front();
+  if (original_row_count > 0) {
+    size_t offset = 0;
+    if ((original_row_count % server_num) == 0) {
+      offset = original_row_count / server_num * rank_id;
+    } else {
+      offset = std::round((static_cast<float>(original_row_count)) / server_num) * rank_id;
+    }
+    for (size_t i = 0; i < indices_size; i++) {
+      indices_data[i] -= offset;
+    }
+  }
+
+  Util::ReduceSparseGradient(grad_data, indices_data, indices_size, segment_size, first_dim_size, outer_dim_size,
+                             &unique_sparse_grad);
+
+  int reduced_grad_size = unique_sparse_grad.indices_size_ * segment_size * sizeof(float);
+  auto ret = memcpy_s(gradient()->addr, reduced_grad_size, unique_sparse_grad.value_, reduced_grad_size);
+  if (ret != 0) {
+    MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
+  }
+  int reduced_indice_size = unique_sparse_grad.indices_size_ * sizeof(int);
+  ret = memcpy_s(indices()->addr, reduced_indice_size, unique_sparse_grad.indices_, reduced_indice_size);
+  if (ret != 0) {
+    MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
+  }
+
+  gradient()->size = reduced_grad_size;
+  indices()->size = reduced_indice_size;
+
+  for (size_t i = 0; i < unique_sparse_grad.indices_size_ * segment_size; i++) {
+    grad_data[i] = grad_data[i] / n;
+  }
+  delete[] new_grad;
+  delete[] new_indices;
+}
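Each server owns a contiguous shard of the embedding table, so before reducing, ComputeMean rebases the global row ids onto the shard's starting row. A standalone sketch of that offset rule (row and server counts invented):

```cpp
#include <cmath>
#include <cstdio>

int main() {
  const size_t rows = 10, server_num = 3;  // rows not divisible by servers
  for (size_t rank_id = 0; rank_id < server_num; ++rank_id) {
    size_t offset = (rows % server_num == 0)
                      ? rows / server_num * rank_id
                      : static_cast<size_t>(std::round(static_cast<float>(rows) / server_num)) * rank_id;
    std::printf("rank %zu owns rows starting at %zu\n", rank_id, offset);
  }
  return 0;
}
```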
 void SparseOptimInfo::Reset() {
@@ -135,6 +212,8 @@ void MomentumOptimInfo::Update(const Values &values, const Lengths &lens) {
   }
 }

+const size_t SparseOptimInfo::indice_size() const { return indices_offset_; }
+
 const AddressPtr &MomentumOptimInfo::gradient() { return inputs_[3]; }

 const AddressPtr &MomentumOptimInfo::indices() { return inputs_[3]; }
mindspore/ccsrc/frontend/parallel/ps/optimizer_info.h

@@ -18,6 +18,7 @@
 #define MINDSPORE_CCSRC_FRONTEND_PARALLEL_PS_OPTIMIZER_INFO_H_

 #include <vector>
+#include <memory>
 #include "backend/kernel_compiler/kernel.h"
 #include "frontend/parallel/ps/common.h"
@@ -33,12 +34,14 @@ class OptimizerInfo {
   virtual void Update(const Values &values, const Lengths &lengths) {}
   virtual void UpdateWeight(const WeightPtr &weight);
   virtual void Accumulate(const Values &values, const Lengths &lengths) = 0;
-  virtual void ComputeMean(size_t n) {}
+  virtual void ComputeMean(const std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>> &shapes, size_t n,
+                           size_t server_num, size_t rank_id) {}
   virtual void Reset() {}
   void AddWorkspace(const AddressPtr &workspace);

   virtual const AddressPtr &gradient() = 0;
   virtual const AddressPtr &indices() = 0;
+  virtual const size_t indice_size() const;
   const std::vector<AddressPtr> &inputs();
   const std::vector<AddressPtr> &workspaces();
   const std::vector<AddressPtr> &outputs();
@@ -59,7 +62,8 @@ class DenseOptimInfo : public OptimizerInfo {
   ~DenseOptimInfo() override = default;

   void Accumulate(const Values &values, const Lengths &lens) override;
-  void ComputeMean(size_t n) override;
+  void ComputeMean(const std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>> &shapes, size_t n,
+                   size_t server_num, size_t rank_id) override;
   void Reset() override;
 };
@@ -69,7 +73,10 @@ class SparseOptimInfo : public OptimizerInfo {
   ~SparseOptimInfo() override = default;

   void Accumulate(const Values &values, const Lengths &lens) override;
+  void ComputeMean(const std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>> &shapes, size_t n,
+                   size_t server_num, size_t rank_id) override;
   void Reset() override;
+  const size_t indice_size() const override;

  protected:
   size_t grads_offset_{0};
mindspore/ccsrc/frontend/parallel/ps/optimizer_info_builder.cc

@@ -136,15 +136,21 @@ OptimizerInfo *SparseAdamOptimInfoBuilder::BuildInputs(const WeightPtr &weight,
   const std::shared_ptr<std::vector<size_t>> &indices_shape = (*inputs_shape)[10];
   size_t total_indice_size =
-    std::accumulate((*indices_shape).begin(), (*indices_shape).end(), sizeof(float), std::multiplies<size_t>());
+    std::accumulate((*indices_shape).begin(), (*indices_shape).end(), sizeof(int), std::multiplies<size_t>());
   AddressPtr indices = std::make_shared<kernel::Address>();
-  indices->addr = new float[total_indice_size * worker_num];
-  ret = memcpy_s(indices->addr, lens[7] * sizeof(float),
-                 reinterpret_cast<float *>(epsilon->addr) + lens[5] + lens[6], lens[7] * sizeof(float));
+  indices->addr = new int[total_indice_size * worker_num];
+  int *converted_indices = new int[lens[7]];
+  size_t indices_data_size = lens[7] * sizeof(int);
+  float *indices_data = reinterpret_cast<float *>(epsilon->addr) + lens[5] + lens[6];
+  for (int i = 0; i < lens[7]; i++) {
+    converted_indices[i] = static_cast<int>(indices_data[i]);
+  }
+  ret = memcpy_s(indices->addr, indices_data_size, converted_indices, indices_data_size);
   if (ret != 0) {
     MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
   }
-  indices->size = lens[7] * sizeof(int);
+  indices->size = indices_data_size;
+  delete[] converted_indices;

   return new SparseAdamOptimInfo(weight_addr, m, v, beta1_power, beta2_power, learning_rate, beta1, beta2, epsilon,
                                  grad, indices);
@@ -185,13 +191,19 @@ OptimizerInfo *SparseFtrlOptimInfoBuilder::BuildInputs(const WeightPtr &weight,
   size_t total_indice_size =
     std::accumulate((*indices_shape).begin(), (*indices_shape).end(), 1, std::multiplies<size_t>());
   AddressPtr indices = std::make_shared<kernel::Address>();
-  indices->addr = new float[total_indice_size * worker_num];
-  ret = memcpy_s(indices->addr, lens[1] * sizeof(float), reinterpret_cast<float *>(values.data()) + lens[0],
-                 lens[1] * sizeof(float));
+  indices->addr = new int[total_indice_size * worker_num];
+  int *converted_indices = new int[lens[1]];
+  size_t indices_data_size = lens[1] * sizeof(int);
+  float *indices_data = reinterpret_cast<float *>(values.data()) + lens[0];
+  for (int i = 0; i < lens[1]; i++) {
+    converted_indices[i] = static_cast<int>(indices_data[i]);
+  }
+  ret = memcpy_s(indices->addr, indices_data_size, converted_indices, indices_data_size);
   if (ret != 0) {
     MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
   }
-  indices->size = lens[1] * sizeof(int);
+  indices->size = indices_data_size;
+  delete[] converted_indices;

   return new SparseFtrlOptimInfo(weight_addr, accum, linear, grad, indices);
 }
mindspore/ccsrc/frontend/parallel/ps/parameter_server.h

@@ -145,6 +145,7 @@ class ParameterServer {
   std::unordered_map<Key, std::shared_ptr<PServerKernel>> optimizers_;
   std::unordered_map<Key, InputsShapePtr> optim_inputs_shape_;
+  std::unordered_map<Key, InputsShapePtr> original_optim_inputs_shape_;
   std::unordered_map<Key, std::shared_ptr<OptimizerInfo>> optim_infos_;
   std::unordered_map<std::string, std::shared_ptr<OptimizerInfoBuilder>> optim_info_builders_;
   std::unordered_map<Key, std::string> weight_key_to_optims_;
@@ -366,19 +367,24 @@ void ParameterServer<T>::InitWeightKeyToOptims(const Key &key, const int &optim_
 template <typename T>
 void ParameterServer<T>::InitOptimInputsShape(const Keys &keys, const Values &values, const Lengths &lengths) {
   InputsShapePtr inputs_shape = std::make_shared<InputsShape>();
+  InputsShapePtr original_inputs_shape = std::make_shared<InputsShape>();
   int val_idx = 0;
   const Key &key = keys[0];
   MS_LOG(INFO) << "Initializing optimizer inputs shape for key:" << key;
   if (optim_inputs_shape_.count(key) == 0) {
+    original_optim_inputs_shape_[key] = original_inputs_shape;
     optim_inputs_shape_[key] = inputs_shape;
   }
   for (size_t i = 0; i < keys.size(); i++) {
     auto shape = std::make_shared<std::vector<size_t>>();
+    auto original_shape = std::make_shared<std::vector<size_t>>();
     inputs_shape->push_back(shape);
+    original_inputs_shape->push_back(original_shape);
     int len = lengths[i];
     for (int j = 0; j < len; j++) {
-      shape->push_back(values[val_idx++]);
+      shape->push_back(values[val_idx]);
+      original_shape->push_back(values[val_idx++]);
     }
   }
   if (weight_key_to_optims_.count(key) > 0) {
@@ -512,7 +518,19 @@ void ParameterServer<T>::UpdateWeights() {
       const std::vector<kernel::AddressPtr> &workspaces = optim_info->workspaces();
       const std::vector<kernel::AddressPtr> &outputs = optim_info->outputs();

-      optim_info->ComputeMean(worker_num_);
+      std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>> shapes =
+        std::make_shared<std::vector<std::shared_ptr<std::vector<size_t>>>>();
+      std::shared_ptr<std::vector<size_t>> indices_shape = std::make_shared<std::vector<size_t>>();
+      indices_shape->emplace_back(optim_info->indice_size());
+      shapes->push_back(indices_shape);
+
+      if (original_optim_inputs_shape_.count(key) != 0) {
+        for (auto &input_shapes : *(original_optim_inputs_shape_[key])) {
+          shapes->push_back(input_shapes);
+        }
+      }
+      optimizer->ReInit(shapes);
+      optim_info->ComputeMean(shapes, worker_num_, pserver_num_, rank_id_);
       optimizer->Execute(inputs, workspaces, outputs);
       optim_info->Reset();
       if (!is_embedding_[key]) {
mindspore/ccsrc/frontend/parallel/ps/util.cc

@@ -146,6 +146,32 @@ int Util::LocalShard(int first_dim, int rank_id, int server_num) {
 void Util::SetRankId(int rank_id) { rank_id_ = rank_id; }

 int Util::GetRankId() { return rank_id_; }

+void Util::ReduceSparseGradient(float *gradients, int *indices, const size_t indices_size, size_t segment_size,
+                                const size_t first_dim_size, const size_t outer_dim_size,
+                                mindspore::kernel::SparseGradient<int> *unique_sparse_grad) {
+  size_t slice_segment_size = indices_size * segment_size;
+  auto workspace_grad = new float[slice_segment_size];
+  auto workspace_indices = new int[indices_size];
+
+  MS_EXCEPTION_IF_NULL(gradients);
+  MS_EXCEPTION_IF_NULL(indices);
+  MS_EXCEPTION_IF_NULL(workspace_grad);
+  MS_EXCEPTION_IF_NULL(workspace_indices);
+
+  mindspore::kernel::SparseGradient<int> workspace_sparse_grad({workspace_grad, workspace_indices, indices_size});
+  mindspore::kernel::SparseGradient<int> input_sparse_grad({gradients, indices, indices_size});
+  mindspore::kernel::ReduceSparseGradientParam<int> param;
+  param.input_grad_ = &input_sparse_grad;
+  param.workspace_grad_ = &workspace_sparse_grad;
+  param.output_grad_ = unique_sparse_grad;
+  param.max_index_ = first_dim_size;
+  param.value_stride_ = outer_dim_size;
+
+  mindspore::kernel::SparseOptimizerCPUKernel::BucketReduceSparseGradient(param);
+  delete[] workspace_grad;
+  delete[] workspace_indices;
+}
 }  // namespace ps
 }  // namespace parallel
 }  // namespace mindspore
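Util::ReduceSparseGradient is a thin wrapper over BucketReduceSparseGradient: the caller sizes the output and workspace buffers for the worst case in which every index is unique, runs the reduce, and reads the actual combined row count back from indices_size_. A simplified stand-in showing that calling pattern (local toy types, not the MindSpore kernel API):

```cpp
#include <cstdio>
#include <map>
#include <vector>

struct ToySparseGrad {
  float *value_;
  int *indices_;
  size_t indices_size_;  // set by the reduce to the combined row count
};

// Toy reduce: sums rows with equal indices; the real kernel uses bucketed workspaces.
void ToyReduce(const float *grad, const int *idx, size_t n, size_t seg, ToySparseGrad *out) {
  std::map<int, size_t> slot;  // embedding row id -> output slot
  out->indices_size_ = 0;
  for (size_t i = 0; i < n; ++i) {
    size_t row;
    auto it = slot.find(idx[i]);
    if (it == slot.end()) {
      row = out->indices_size_++;
      slot[idx[i]] = row;
      out->indices_[row] = idx[i];
      for (size_t j = 0; j < seg; ++j) out->value_[row * seg + j] = 0.0f;
    } else {
      row = it->second;
    }
    for (size_t j = 0; j < seg; ++j) out->value_[row * seg + j] += grad[i * seg + j];
  }
}

int main() {
  const size_t n = 3, seg = 2;
  float grad[] = {1, 1, 2, 2, 3, 3};
  int idx[] = {4, 9, 4};
  std::vector<float> out_val(n * seg);  // worst case: all indices unique
  std::vector<int> out_idx(n);
  ToySparseGrad out{out_val.data(), out_idx.data(), 0};
  ToyReduce(grad, idx, n, seg, &out);
  for (size_t r = 0; r < out.indices_size_; ++r) {
    std::printf("row %d: %.0f %.0f\n", out.indices_[r], out.value_[r * seg], out.value_[r * seg + 1]);
  }
  return 0;
}
```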
mindspore/ccsrc/frontend/parallel/ps/util.h

@@ -21,6 +21,8 @@
 #include <string>
 #include <unordered_map>
 #include "backend/session/anf_runtime_algorithm.h"
+#include "backend/kernel_compiler/common_utils.h"
+#include "backend/kernel_compiler/cpu/sparse_optimizer_cpu_kernel.h"

 namespace mindspore {
 namespace parallel {
@@ -39,6 +41,9 @@ class Util {
   static int LocalShard(int first_dim, int rank_id, int server_num);
   static void SetRankId(int rank_id);
   static int GetRankId();
+  static void ReduceSparseGradient(float *gradients, int *indices, const size_t indices_size, size_t segment_size,
+                                   const size_t first_dim_size, const size_t outer_dim_size,
+                                   mindspore::kernel::SparseGradient<int> *unique_sparse_grad);

  private:
   static std::unordered_map<std::string, int> optimizer_to_ids;
mindspore/ccsrc/frontend/parallel/ps/worker.h

@@ -96,6 +96,32 @@ void Worker<T>::Run() {
 template <typename T>
 void Worker<T>::Push(const std::vector<size_t> &keys, std::vector<uintptr_t> addrs, const ShapeVector &sizes) {
+  if (keys.size() == 0) {
+    MS_LOG(EXCEPTION) << "key size should be greater than zero";
+  }
+  if (key_to_optimId_.count(keys[0]) == 0) {
+    MS_LOG(EXCEPTION) << "no optim id found for key" << keys[0];
+  }
+  Key key = keys[0];
+  int optim_id = key_to_optimId_[key];
+  bool is_sparse = false;
+  if (optim_id == 1 || optim_id == 2 || optim_id == 3) {
+    is_sparse = true;
+  }
+  int grad_index = -1;
+  int indice_index = -1;
+
+  // Sparse adam gradient
+  if (optim_id == 1 || optim_id == 2) {
+    grad_index = 6;
+    indice_index = 7;
+
+    // Sparse ftrl gradient
+  } else if (optim_id == 3) {
+    grad_index = 0;
+    indice_index = 1;
+  }
+
   size_t total_size = 0;
   for (auto size : sizes) {
     total_size += size;
@@ -110,10 +136,22 @@ void Worker<T>::Push(const std::vector<size_t> &keys, std::vector<uintptr_t> add
     }
     offset += sizes[i] * sizeof(T);
   }

   while (!kv_worker_->IsReadyForPush(keys[0])) {
     continue;
   }
-  kv_worker_->PushData(::ps::SArray<::ps::Key>(keys), total_buffer, ::ps::SArray<int>(sizes));
+  if (!is_sparse) {
+    kv_worker_->PushData(::ps::SArray<::ps::Key>(keys), total_buffer, ::ps::SArray<int>(sizes));
+  } else {
+    std::vector<int> &var_shape = key_to_optim_shapes_[key][0];
+    int first_dim_size = var_shape[0];
+    int outer_dim_size = 1;
+    for (size_t i = 1; i < var_shape.size(); ++i) {
+      outer_dim_size *= var_shape[i];
+    }
+    kv_worker_->PushSparseData(::ps::SArray<::ps::Key>(keys), total_buffer, ::ps::SArray<int>(sizes), grad_index,
+                               indice_index, first_dim_size, outer_dim_size);
+  }
 }

 template <typename T>
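On the worker side, first_dim_size and outer_dim_size are derived from the variable's shape: the first dimension is the embedding row count, and everything after it is flattened into the per-row segment width. A sketch with an invented shape:

```cpp
#include <cstdio>
#include <vector>

int main() {
  std::vector<int> var_shape = {16000, 8, 10};  // hypothetical embedding table shape
  int first_dim_size = var_shape[0];
  int outer_dim_size = 1;
  for (size_t i = 1; i < var_shape.size(); ++i) {
    outer_dim_size *= var_shape[i];  // flatten trailing dims: 8 * 10 = 80
  }
  std::printf("first_dim_size=%d outer_dim_size=%d\n", first_dim_size, outer_dim_size);
  return 0;
}
```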
mindspore/ccsrc/frontend/parallel/ps/worker_proxy.h

@@ -17,14 +17,16 @@
 #ifndef MINDSPORE_CCSRC_FRONTEND_PARALLEL_PS_WORKER_PROXY_H_
 #define MINDSPORE_CCSRC_FRONTEND_PARALLEL_PS_WORKER_PROXY_H_

+#include <map>
 #include <unordered_map>
+#include <unordered_set>
 #include <algorithm>
 #include <utility>
 #include <memory>
 #include <vector>
-#include <unordered_set>
 #include "ps/ps.h"
 #include "frontend/parallel/ps/util.h"
+#include "backend/kernel_compiler/common_utils.h"

 namespace mindspore {
 namespace parallel {
@@ -36,7 +38,7 @@ class WorkerProxy : public ::ps::KVWorker<T> {
   using Callback = std::function<void()>;
   using SlicedKVs = std::vector<std::pair<bool, ::ps::KVPairs<T>>>;
   using Slicer = std::function<void(int ts, const ::ps::KVPairs<T> &send, const std::vector<::ps::Range> &ranges,
-                                    SlicedKVs *sliced)>;
+                                    SlicedKVs *sliced, const std::map<int, int> &attrs)>;
   using ::ps::SimpleApp::obj_;
   explicit WorkerProxy(int app_id, int customer_id, int lookup_customer_id, int general_customer_id)
       : Worker(app_id, customer_id) {
@@ -46,14 +48,16 @@ class WorkerProxy : public ::ps::KVWorker<T> {
     using std::placeholders::_2;
     using std::placeholders::_3;
     using std::placeholders::_4;
+    using std::placeholders::_5;
     lookup_customer_ = std::unique_ptr<::ps::Customer>(
       new ::ps::Customer(app_id, lookup_customer_id, std::bind(&WorkerProxy<T>::ProcessLookupResult, this, _1)));
     general_customer_ = std::unique_ptr<::ps::Customer>(
       new ::ps::Customer(app_id, general_customer_id, std::bind(&WorkerProxy<T>::ProcessResponse, this, _1)));
-    lookup_slicer_ = std::bind(&WorkerProxy<T>::LookupIdSlicer, this, _1, _2, _3, _4);
-    broadcast_slicer_ = std::bind(&WorkerProxy<T>::BroadcastSlicer, this, _1, _2, _3, _4);
-    round_robin_slicer_ = std::bind(&WorkerProxy<T>::RoundRobinSlicer, this, _1, _2, _3, _4);
-    worker_init_embedding_slicer_ = std::bind(&WorkerProxy<T>::WorkerInitEmbeddingSlicer, this, _1, _2, _3, _4);
+    lookup_slicer_ = std::bind(&WorkerProxy<T>::LookupIdSlicer, this, _1, _2, _3, _4, _5);
+    sparse_slicer_ = std::bind(&WorkerProxy<T>::SparseSlicer, this, _1, _2, _3, _4, _5);
+    broadcast_slicer_ = std::bind(&WorkerProxy<T>::BroadcastSlicer, this, _1, _2, _3, _4, _5);
+    round_robin_slicer_ = std::bind(&WorkerProxy<T>::RoundRobinSlicer, this, _1, _2, _3, _4, _5);
+    worker_init_embedding_slicer_ = std::bind(&WorkerProxy<T>::WorkerInitEmbeddingSlicer, this, _1, _2, _3, _4, _5);
   }
   ~WorkerProxy() override = default;
@@ -68,6 +72,8 @@ class WorkerProxy : public ::ps::KVWorker<T> {
   bool IsReadyForPull(const Key &key);
   void PushData(const ::ps::SArray<::ps::Key> &keys, const ::ps::SArray<T> &vals, const ::ps::SArray<int> &lens = {},
                 int cmd = 0, int priority = 0);
+  void PushSparseData(const ::ps::SArray<::ps::Key> &keys, const ::ps::SArray<T> &vals, const ::ps::SArray<int> &lens,
+                      size_t grad_index, size_t indice_index, size_t first_dim_size, size_t outer_dim_size);
   void PullData(const ::ps::SArray<::ps::Key> &keys, ::ps::SArray<T> *vals, ::ps::SArray<int> *lens = nullptr,
                 int cmd = 0, int priority = 0);
   void Finalize();
@@ -79,19 +85,28 @@ class WorkerProxy : public ::ps::KVWorker<T> {
   int AddGeneralRspCB(const ::ps::SArray<::ps::Key> &keys, ::ps::SArray<T> *vals, ::ps::SArray<int> *lens, int cmd,
                       const Callback &cb);
   void LookupIdSlicer(int timestamp, const ::ps::KVPairs<T> &send, const std::vector<::ps::Range> &,
-                      std::vector<std::pair<bool, ::ps::KVPairs<T>>> *sliced);
+                      std::vector<std::pair<bool, ::ps::KVPairs<T>>> *sliced, const std::map<int, int> &attrs);
+  void SparseSlicer(int timestamp, const ::ps::KVPairs<T> &send, const std::vector<::ps::Range> &,
+                    std::vector<std::pair<bool, ::ps::KVPairs<T>>> *sliced, const std::map<int, int> &attrs);
   void BroadcastSlicer(int timestamp, const ::ps::KVPairs<T> &send, const std::vector<::ps::Range> &,
-                       std::vector<std::pair<bool, ::ps::KVPairs<T>>> *sliced);
+                       std::vector<std::pair<bool, ::ps::KVPairs<T>>> *sliced, const std::map<int, int> &attrs);
   void RoundRobinSlicer(int timestamp, const ::ps::KVPairs<T> &send, const std::vector<::ps::Range> &,
-                        std::vector<std::pair<bool, ::ps::KVPairs<T>>> *sliced);
+                        std::vector<std::pair<bool, ::ps::KVPairs<T>>> *sliced, const std::map<int, int> &attrs);
   void WorkerInitEmbeddingSlicer(int timestamp, const ::ps::KVPairs<T> &send, const std::vector<::ps::Range> &,
-                                 std::vector<std::pair<bool, ::ps::KVPairs<T>>> *sliced);
+                                 std::vector<std::pair<bool, ::ps::KVPairs<T>>> *sliced,
+                                 const std::map<int, int> &attrs);
   void ProcessLookupResult(const ::ps::Message &msg);
   void ProcessResponse(const ::ps::Message &msg);
   void Send(::ps::Customer *customer, int timestamp, bool push, bool pull, int cmd, const ::ps::KVPairs<T> &kvs,
-            const Slicer &slicer);
+            const Slicer &slicer, std::map<int, int> attrs = {});
   void AddKeyByHashMod(const ::ps::Key &key);
+  void PrepareSparseGradient(const size_t begin, const size_t end, const std::unordered_set<int> &distinct_ids,
+                             const std::vector<std::pair<int, T *>> &indice_to_grad, const int *all_indice,
+                             const size_t segment_size, T *gradient, int *indice);
+  void BuildSparseValue(const ::ps::SArray<int> &lengths, const size_t grad_index, const size_t indice_index,
+                        const T *original_data, const T *grads, int *indices, ::ps::SArray<T> *reduced_data);

   int server_num_;
   std::unique_ptr<::ps::Customer> lookup_customer_;
   std::unique_ptr<::ps::Customer> general_customer_;
@@ -100,6 +115,7 @@ class WorkerProxy : public ::ps::KVWorker<T> {
   std::unordered_map<int, ::ps::KVPairs<T>> gathered_response_;
   std::mutex mutex_;
   Slicer lookup_slicer_;
+  Slicer sparse_slicer_;
   Slicer broadcast_slicer_;
   Slicer round_robin_slicer_;
   Slicer worker_init_embedding_slicer_;
@@ -221,6 +237,28 @@ void WorkerProxy<T>::PushData(const ::ps::SArray<::ps::Key> &keys, const ::ps::S
   general_customer_->WaitRequest(ts);
 }

+template <typename T>
+void WorkerProxy<T>::PushSparseData(const ::ps::SArray<::ps::Key> &keys, const ::ps::SArray<T> &vals,
+                                    const ::ps::SArray<int> &lens, size_t grad_index, size_t indice_index,
+                                    size_t first_dim_size, size_t outer_dim_size) {
+  int ts = AddGeneralRspCB(keys, nullptr, nullptr, 0, nullptr);
+  ::ps::KVPairs<T> kvs;
+  kvs.keys = keys;
+  kvs.vals = vals;
+  kvs.lens = lens;
+  int cmd = 0;
+  if (embedding_table_ranges_.count(keys[0])) {
+    std::map<int, int> attrs{{0, grad_index}, {1, indice_index}, {2, first_dim_size}, {3, outer_dim_size}};
+    Send(general_customer_.get(), ts, true, false, cmd, kvs, sparse_slicer_, attrs);
+  } else {
+    Send(general_customer_.get(), ts, true, false, cmd, kvs, round_robin_slicer_);
+  }
+  if (expected_result_count_[ts] < server_num_) {
+    general_customer_->AddResponse(ts, server_num_ - expected_result_count_[ts]);
+  }
+  general_customer_->WaitRequest(ts);
+}
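PushSparseData threads its sparse metadata through the generic Slicer interface as a std::map<int, int> with fixed keys: 0 = grad_index, 1 = indice_index, 2 = first_dim_size, 3 = outer_dim_size, which SparseSlicer unpacks on the receiving end. A tiny sketch of the encode/decode convention (values invented):

```cpp
#include <cstdio>
#include <map>

int main() {
  // Keys 0..3 carry grad_index, indice_index, first_dim_size, outer_dim_size.
  std::map<int, int> attrs{{0, 6}, {1, 7}, {2, 16000}, {3, 80}};
  std::printf("grad_index=%d indice_index=%d first_dim_size=%d outer_dim_size=%d\n",
              attrs.at(0), attrs.at(1), attrs.at(2), attrs.at(3));
  return 0;
}
```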

 template <typename T>
 void WorkerProxy<T>::PullData(const ::ps::SArray<::ps::Key> &keys, ::ps::SArray<T> *vals, ::ps::SArray<int> *lens,
                               int cmd, int priority) {
@@ -320,7 +358,8 @@ int WorkerProxy<T>::AddGeneralRspCB(const ::ps::SArray<::ps::Key> &keys, ::ps::S
 template <typename T>
 void WorkerProxy<T>::LookupIdSlicer(int timestamp, const ::ps::KVPairs<T> &send, const std::vector<::ps::Range> &,
-                                    std::vector<std::pair<bool, ::ps::KVPairs<T>>> *sliced) {
+                                    std::vector<std::pair<bool, ::ps::KVPairs<T>>> *sliced,
+                                    const std::map<int, int> &attrs) {
   int *lookup_ids = send.lens.data();
   size_t id_size = send.lens.size();
@@ -358,9 +397,181 @@ void WorkerProxy<T>::LookupIdSlicer(int timestamp, const ::ps::KVPairs<T> &send,
   }
 }

+template <typename T>
+void WorkerProxy<T>::SparseSlicer(int timestamp, const ::ps::KVPairs<T> &send, const std::vector<::ps::Range> &,
+                                  std::vector<std::pair<bool, ::ps::KVPairs<T>>> *sliced,
+                                  const std::map<int, int> &attrs) {
+  // Init variables
+  T *data = send.vals.data();
+
+  if (attrs.count(0) == 0 || attrs.count(1) == 0 || attrs.count(2) == 0 || attrs.count(3) == 0) {
+    MS_LOG(EXCEPTION) << "Invalid attrs keys";
+  }
+  auto iter = attrs.find(0);
+  size_t grad_index = static_cast<size_t>(iter->second);
+  iter = attrs.find(1);
+  size_t indice_index = static_cast<size_t>(iter->second);
+  iter = attrs.find(2);
+  size_t first_dim_size = static_cast<size_t>(iter->second);
+  iter = attrs.find(3);
+  size_t outer_dim_size = static_cast<size_t>(iter->second);
+
+  int grad_size = send.lens[grad_index];
+  int indice_size = send.lens[indice_index];
+  int segment_size = grad_size / indice_size;
+
+  int grad_offset = 0;
+  int indice_offset = 0;
+  for (size_t i = 0; i < grad_index; i++) {
+    grad_offset += send.lens[i];
+  }
+  for (size_t j = 0; j < indice_index; j++) {
+    indice_offset += send.lens[j];
+  }
+  T *grad_data = data + grad_offset;
+  int *indice_data = reinterpret_cast<int *>(data) + indice_offset;
+
+  // Build the mappings of indice to gradient
+  std::vector<std::pair<int, T *>> indice_to_grads;
+  for (int i = 0; i < indice_size; i++) {
+    int indice = indice_data[i];
+    T *grad = grad_data + i * segment_size;
+    indice_to_grads.push_back(std::make_pair(indice, grad));
+  }
+
+  const Key &key = send.keys[0];
+  const std::vector<::ps::Range> &ranges = *(embedding_table_ranges_[key]);
+  sliced->resize(ranges.size());
+
+  // Construct reduced sparse data for each server
+  for (size_t i = 0; i < ranges.size(); i++) {
+    const ::ps::Range &range = ranges[i];
+    const auto &begin = range.begin();
+    const auto &end = range.end();
+    auto &kvs = sliced->at(i).second;
+    kvs.keys = send.keys;
+    kvs.lens = send.lens;
+
+    // Prepare the sparse gradient and indice
+    std::vector<int> indice_ids;
+    std::unordered_set<int> distinct_ids;
+    for (int j = 0; j < indice_size; j++) {
+      size_t indice = static_cast<size_t>(indice_data[j]);
+      if (indice >= begin && indice <= end) {
+        indice_ids.push_back(indice);
+        distinct_ids.insert(indice);
+      }
+    }
+    size_t indices_size = indice_ids.size();
+    int slice_segment_size = indices_size * segment_size;
+    T *src_grad_data = new T[slice_segment_size];
+    int *src_indice_data = new int[indices_size];
+    PrepareSparseGradient(begin, end, distinct_ids, indice_to_grads, indice_data, segment_size, src_grad_data,
+                          src_indice_data);
+
+    // Reduce the sparse gradient and indice
+    T *new_grad = new T[slice_segment_size];
+    int *new_indices = new int[indices_size];
+    mindspore::kernel::SparseGradient<int> unique_sparse_grad({new_grad, new_indices, indices_size});
+    Util::ReduceSparseGradient(src_grad_data, src_indice_data, indices_size, segment_size, first_dim_size,
+                               outer_dim_size, &unique_sparse_grad);
+
+    // Update the length of reduce sparse gradient and indice
+    ::ps::SArray<int> reduced_lens;
+    reduced_lens.CopyFrom(kvs.lens);
+    reduced_lens[grad_index] = unique_sparse_grad.indices_size_ * segment_size;
+    reduced_lens[indice_index] = unique_sparse_grad.indices_size_;
+
+    // Build the sparse value to be sent
+    size_t total_size = 0;
+    for (auto size : reduced_lens) {
+      total_size += size;
+    }
+    ::ps::SArray<T> reduced_data(total_size, 0);
+    BuildSparseValue(reduced_lens, grad_index, indice_index, data, unique_sparse_grad.value_,
+                     unique_sparse_grad.indices_, &reduced_data);
+
+    kvs.lens = reduced_lens;
+    kvs.vals = reduced_data;
+
+    if (indices_size <= 0) {
+      sliced->at(i).first = false;
+    } else {
+      sliced->at(i).first = true;
+      expected_result_count_[timestamp] += 1;
+    }
+  }
+}
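SparseSlicer partitions one push into per-server slices: for each server's key range [begin, end], it keeps only the indices (and their gradient rows) that fall inside that range, reduces duplicates, and marks empty slices so they are never sent. A standalone sketch of the range filter (ranges invented):

```cpp
#include <cstdio>
#include <vector>

struct Range { size_t begin, end; };  // stand-in for ::ps::Range

int main() {
  std::vector<Range> ranges = {{0, 4}, {5, 9}};  // two servers' shards
  std::vector<int> indices = {1, 7, 3, 8, 8};
  for (size_t s = 0; s < ranges.size(); ++s) {
    std::printf("server %zu gets:", s);
    for (int id : indices) {
      size_t u = static_cast<size_t>(id);
      if (u >= ranges[s].begin && u <= ranges[s].end) std::printf(" %d", id);
    }
    std::printf("\n");
  }
  return 0;
}
```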
+template <typename T>
+void WorkerProxy<T>::PrepareSparseGradient(const size_t begin, const size_t end,
+                                           const std::unordered_set<int> &distinct_ids,
+                                           const std::vector<std::pair<int, T *>> &indice_to_grads,
+                                           const int *all_indice, const size_t segment_size, T *gradient,
+                                           int *indices) {
+  int offset = 0;
+  int index = 0;
+  size_t segment_data_size = segment_size * sizeof(T);
+  for (auto &pair : indice_to_grads) {
+    if (distinct_ids.count(pair.first) == 0) {
+      continue;
+    }
+    indices[index++] = pair.first;
+    auto ret = memcpy_s(gradient + offset, segment_data_size, pair.second, segment_data_size);
+    if (ret != 0) {
+      MS_LOG(ERROR) << "memcpy_s error, errorno(" << ret << ")";
+    }
+    offset += segment_size;
+  }
+}
+
+template <typename T>
+void WorkerProxy<T>::BuildSparseValue(const ::ps::SArray<int> &lengths, const size_t grad_index,
+                                      const size_t indice_index, const T *original_data, const T *grads,
+                                      int *indices, ::ps::SArray<T> *reduced_data) {
+  int offset = 0;
+  for (size_t i = 0; i < lengths.size(); i++) {
+    if (i != grad_index && i != indice_index) {
+      int data_size = lengths[i] * sizeof(T);
+      auto ret = memcpy_s(reduced_data->data() + offset, data_size, original_data + offset, data_size);
+      if (ret != 0) {
+        MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
+      }
+    }
+    offset += lengths[i];
+  }
+
+  // Fill the reduced gradient
+  int grad_offset = 0;
+  for (size_t i = 0; i < grad_index; i++) {
+    grad_offset += lengths[i];
+  }
+  int data_size = lengths[grad_index] * sizeof(T);
+  auto ret = memcpy_s(reduced_data->data() + grad_offset, data_size, grads, data_size);
+  if (ret != 0) {
+    MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
+  }
+
+  // Fill the reduced indice
+  data_size = lengths[indice_index] * sizeof(T);
+  int indice_offset = grad_offset + data_size;
+  T *indice_data = reduced_data->data() + indice_offset;
+  T *convert = new T[lengths[indice_index]];
+  for (int i = 0; i < lengths[indice_index]; i++) {
+    convert[i] = static_cast<T>(indices[i]);
+  }
+  ret = memcpy_s(indice_data, data_size, convert, data_size);
+  if (ret != 0) {
+    MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
+  }
+  delete[] convert;
+}
+
 template <typename T>
 void WorkerProxy<T>::BroadcastSlicer(int timestamp, const ::ps::KVPairs<T> &send, const std::vector<::ps::Range> &,
-                                     std::vector<std::pair<bool, ::ps::KVPairs<T>>> *sliced) {
+                                     std::vector<std::pair<bool, ::ps::KVPairs<T>>> *sliced,
+                                     const std::map<int, int> &attr) {
   sliced->resize(server_num_);
   for (int i = 0; i < server_num_; i++) {
     sliced->at(i).first = true;
@@ -371,7 +582,8 @@ void WorkerProxy<T>::BroadcastSlicer(int timestamp, const ::ps::KVPairs<T> &send
 template <typename T>
 void WorkerProxy<T>::RoundRobinSlicer(int timestamp, const ::ps::KVPairs<T> &send, const std::vector<::ps::Range> &,
-                                      std::vector<std::pair<bool, ::ps::KVPairs<T>>> *sliced) {
+                                      std::vector<std::pair<bool, ::ps::KVPairs<T>>> *sliced,
+                                      const std::map<int, int> &attr) {
   sliced->resize(server_num_);
   auto keys = send.keys;
   auto vals = send.vals;
@@ -408,7 +620,8 @@ void WorkerProxy<T>::RoundRobinSlicer(int timestamp, const ::ps::KVPairs<T> &sen
 template <typename T>
 void WorkerProxy<T>::WorkerInitEmbeddingSlicer(int timestamp, const ::ps::KVPairs<T> &send,
                                                const std::vector<::ps::Range> &,
-                                               std::vector<std::pair<bool, ::ps::KVPairs<T>>> *sliced) {
+                                               std::vector<std::pair<bool, ::ps::KVPairs<T>>> *sliced,
+                                               const std::map<int, int> &attrs) {
   sliced->resize(server_num_);
   auto keys = send.keys;
   auto vals = send.vals;
@@ -483,9 +696,9 @@ void WorkerProxy<T>::ProcessResponse(const ::ps::Message &msg) {
 template <typename T>
 void WorkerProxy<T>::Send(::ps::Customer *customer, int timestamp, bool push, bool pull, int cmd,
-                          const ::ps::KVPairs<T> &kvs, const Slicer &slicer) {
+                          const ::ps::KVPairs<T> &kvs, const Slicer &slicer, std::map<int, int> attrs) {
   SlicedKVs sliced;
-  slicer(timestamp, kvs, ::ps::Postoffice::Get()->GetServerKeyRanges(), &sliced);
+  slicer(timestamp, kvs, ::ps::Postoffice::Get()->GetServerKeyRanges(), &sliced, attrs);

   for (size_t i = 0; i < sliced.size(); i++) {
     const auto &s = sliced[i];