magicwindyyd / mindspore (forked from MindSpore / mindspore, in sync with the fork source)
Commit 4dddee57
Authored Aug 31, 2020 by mindspore-ci-bot; committed by Gitee, Aug 31, 2020
!5551 Fix master ps stuck
Merge pull request !5551 from ZPaC/master-fix-stuck-bug
Parents: aa20f1c4, 442b38dc
Showing 4 changed files with 94 additions and 93 deletions (+94 −93).
mindspore/ccsrc/backend/kernel_compiler/cpu/ps/sparse_apply_ftrl_ps_kernel.cc       +0  −4
mindspore/ccsrc/backend/kernel_compiler/cpu/ps/sparse_apply_lazy_adam_ps_kernel.cc  +0  −4
mindspore/ccsrc/frontend/parallel/ps/parameter_server.h                             +38 −36
mindspore/ccsrc/frontend/parallel/ps/worker_proxy.h                                 +56 −49
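Reviewer note: taken together, the four diffs below suggest one fix. A worker that ended up with zero sparse-gradient indices for a server shard previously sent nothing (its slice was marked invalid), so server-side response and accumulation counters never reached their targets and the parameter server hung. The commit makes the worker always send a shard, using a one-element payload of -100 as an "empty sparse gradient" sentinel that the server recognizes in AccumGrad. Below is a minimal, self-contained sketch of that convention; the names Payload, MakeShardPayload, and IsNoSparseGrad are illustrative only and do not appear in MindSpore (the real logic lives in WorkerProxy<T>::SparseSlicer and ParameterServer<T>::AccumGrad).

#include <iostream>
#include <vector>

// Sentinel meaning "this worker has no sparse gradient for this shard".
constexpr float kNoSparseGrad = -100;

struct Payload {
  std::vector<float> vals;
  std::vector<int> lens;
};

// Worker side: always produce a payload, even when there is nothing to send.
Payload MakeShardPayload(const std::vector<float> &grads) {
  Payload kvs;
  if (grads.empty()) {
    kvs.vals.push_back(kNoSparseGrad);  // empty shard: send the sentinel instead of nothing
  } else {
    kvs.vals = grads;
    kvs.lens.push_back(static_cast<int>(grads.size()));
  }
  return kvs;
}

// Server side: mirrors the check added in AccumGrad,
// values.size() == 1 && values[0] == -100.
bool IsNoSparseGrad(const Payload &kvs) {
  return kvs.vals.size() == 1 && kvs.vals[0] == kNoSparseGrad;
}

int main() {
  std::cout << IsNoSparseGrad(MakeShardPayload({})) << '\n';             // 1: sentinel detected
  std::cout << IsNoSparseGrad(MakeShardPayload({0.1f, 0.2f})) << '\n';   // 0: real gradient
  return 0;
}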
mindspore/ccsrc/backend/kernel_compiler/cpu/ps/sparse_apply_ftrl_ps_kernel.cc
@@ -88,10 +88,6 @@ void SparseApplyFtrlPSKernel::ReInit(const std::vector<AddressPtr> &inputs) {
 bool SparseApplyFtrlPSKernel::Execute(const std::vector<AddressPtr> &inputs,
                                       const std::vector<AddressPtr> &workspace,
                                       const std::vector<AddressPtr> &outputs) {
   ReInit(inputs);
-  int *indices = reinterpret_cast<int *>(inputs[4]->addr);
-  for (size_t i = 0; i < inputs[4]->size / sizeof(int); i++) {
-    indices[i] -= row_offset_;
-  }
   return Launch(inputs, workspace, outputs);
 }
mindspore/ccsrc/backend/kernel_compiler/cpu/ps/sparse_apply_lazy_adam_ps_kernel.cc
@@ -86,10 +86,6 @@ bool SparseApplyLazyAdamPSKernel::Execute(const std::vector<AddressPtr> &inputs,
                                           const std::vector<AddressPtr> &workspace,
                                           const std::vector<AddressPtr> &outputs) {
   ReInit(inputs);
-  int *indices = reinterpret_cast<int *>(inputs[10]->addr);
-  for (size_t i = 0; i < inputs[10]->size / sizeof(int); i++) {
-    indices[i] -= row_offset_;
-  }
   return Launch(inputs, workspace, outputs);
 }
mindspore/ccsrc/frontend/parallel/ps/parameter_server.h
@@ -511,28 +511,27 @@ void ParameterServer<T>::UpdateWeights() {
     MS_EXCEPTION_IF_NULL(optimizer);
 
     std::shared_ptr<OptimizerInfo> optim_info = optim_infos_[key];
-    if (optim_info == nullptr) {
-      continue;
-    }
-    const std::vector<kernel::AddressPtr> &inputs = optim_info->inputs();
-    const std::vector<kernel::AddressPtr> &workspaces = optim_info->workspaces();
-    const std::vector<kernel::AddressPtr> &outputs = optim_info->outputs();
-    std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>> shapes =
-      std::make_shared<std::vector<std::shared_ptr<std::vector<size_t>>>>();
-    std::shared_ptr<std::vector<size_t>> indices_shape = std::make_shared<std::vector<size_t>>();
-    indices_shape->emplace_back(optim_info->indice_size());
-    shapes->push_back(indices_shape);
-    if (original_optim_inputs_shape_.count(key) != 0) {
-      for (auto &input_shapes : *(original_optim_inputs_shape_[key])) {
-        shapes->push_back(input_shapes);
-      }
-    }
-    optimizer->ReInit(shapes);
-    optim_info->ComputeMean(shapes, worker_num_, pserver_num_, rank_id_);
-    optimizer->Execute(inputs, workspaces, outputs);
-    optim_info->Reset();
+    if (optim_info != nullptr) {
+      const std::vector<kernel::AddressPtr> &inputs = optim_info->inputs();
+      const std::vector<kernel::AddressPtr> &workspaces = optim_info->workspaces();
+      const std::vector<kernel::AddressPtr> &outputs = optim_info->outputs();
+      std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>> shapes =
+        std::make_shared<std::vector<std::shared_ptr<std::vector<size_t>>>>();
+      std::shared_ptr<std::vector<size_t>> indices_shape = std::make_shared<std::vector<size_t>>();
+      indices_shape->emplace_back(optim_info->indice_size());
+      shapes->push_back(indices_shape);
+      if (original_optim_inputs_shape_.count(key) != 0) {
+        for (auto &input_shapes : *(original_optim_inputs_shape_[key])) {
+          shapes->push_back(input_shapes);
+        }
+      }
+      optimizer->ReInit(shapes);
+      optim_info->ComputeMean(shapes, worker_num_, pserver_num_, rank_id_);
+      optimizer->Execute(inputs, workspaces, outputs);
+      optim_info->Reset();
+    }
     if (!is_embedding_[key]) {
       tokens_[key] = worker_num_;
     }
@@ -545,23 +544,26 @@ template <typename T>
 void ParameterServer<T>::AccumGrad(const Keys &keys, const Values &values, const Lengths &lengths) {
   std::unique_lock<std::mutex> lock(mutex_);
   const Key &key = keys[0];
-  std::shared_ptr<OptimizerInfo> optim_info = optim_infos_[key];
-  // Create or update the optimizer info
-  if (optim_info == nullptr) {
-    const std::shared_ptr<OptimizerInfoBuilder> &builder = optim_info_builders_[weight_key_to_optims_[key]];
-    std::shared_ptr<kernel::ps::PServerKernel> pserver_kernel = optimizers_[key];
-    if (pserver_kernel == nullptr) {
-      MS_LOG(EXCEPTION) << "no optimizer found for key " << key << " optim name " << weight_key_to_optims_[key];
-    }
-    MS_EXCEPTION_IF_NULL(pserver_kernel);
-    OptimizerInfo *optim =
-      builder->Build(pserver_kernel, weights_[key], keys, values, lengths, optim_inputs_shape_[key], worker_num_);
-    optim_info.reset(optim);
-    optim_infos_[key] = optim_info;
-  } else {
-    optim_info->Update(values, lengths);
-    optim_info->Accumulate(values, lengths);
-  }
+  bool no_sparse_grad = values.size() == 1 && values[0] == -100;
+  if (!no_sparse_grad) {
+    std::shared_ptr<OptimizerInfo> optim_info = optim_infos_[key];
+    // Create or update the optimizer info
+    if (optim_info == nullptr) {
+      const std::shared_ptr<OptimizerInfoBuilder> &builder = optim_info_builders_[weight_key_to_optims_[key]];
+      std::shared_ptr<kernel::ps::PServerKernel> pserver_kernel = optimizers_[key];
+      if (pserver_kernel == nullptr) {
+        MS_LOG(EXCEPTION) << "no optimizer found for key " << key << " optim name " << weight_key_to_optims_[key];
+      }
+      MS_EXCEPTION_IF_NULL(pserver_kernel);
+      OptimizerInfo *optim =
+        builder->Build(pserver_kernel, weights_[key], keys, values, lengths, optim_inputs_shape_[key], worker_num_);
+      optim_info.reset(optim);
+      optim_infos_[key] = optim_info;
+    } else {
+      optim_info->Update(values, lengths);
+      optim_info->Accumulate(values, lengths);
+    }
+  }
 
   grads_accum_counter_[key] += 1;
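Reviewer note: the hunk above leaves grads_accum_counter_[key] += 1 outside the no_sparse_grad branch, so a sentinel push still counts as a worker report. A minimal sketch of the resulting barrier behavior, with illustrative worker_num and key values (not MindSpore's real types):

#include <cstdio>
#include <map>

int main() {
  const int worker_num = 3;
  std::map<int, int> grads_accum_counter;  // key -> number of workers that reported
  const int key = 7;

  for (int w = 0; w < worker_num; ++w) {
    bool no_sparse_grad = (w == 2);        // say worker 2 had no sparse indices this step
    if (!no_sparse_grad) {
      // build/update optimizer info here, as in AccumGrad
    }
    grads_accum_counter[key] += 1;         // runs for sentinel pushes too
  }

  // Before the fix, worker 2 would have sent nothing at all, the counter would
  // have stalled at 2, and UpdateWeights would never have been triggered.
  std::printf("reports for key %d: %d of %d\n", key, grads_accum_counter[key], worker_num);
  return 0;
}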
mindspore/ccsrc/frontend/parallel/ps/worker_proxy.h
@@ -112,7 +112,7 @@ class WorkerProxy : public ::ps::KVWorker<T> {
   std::unique_ptr<::ps::Customer> general_customer_;
   std::unordered_map<::ps::Key, std::shared_ptr<std::vector<::ps::Range>>> embedding_table_ranges_;
   std::unordered_map<int, std::vector<::ps::KVPairs<T>>> lookup_results_;
-  std::unordered_map<int, ::ps::KVPairs<T>> gathered_response_;
+  std::unordered_map<int, std::map<int, ::ps::KVPairs<T>>> gathered_response_;
   std::mutex mutex_;
   Slicer lookup_slicer_;
   Slicer sparse_slicer_;
@@ -337,12 +337,19 @@ int WorkerProxy<T>::AddGeneralRspCB(const ::ps::SArray<::ps::Key> &keys, ::ps::S
   int ts = general_customer_->NewRequest(::ps::kServerGroup);
   const auto &callback = [this, ts, keys, vals, lens, cb]() mutable {
     mutex_.lock();
-    auto &kvs = gathered_response_[ts];
+    std::map<int, ::ps::KVPairs<T>> server_kvs = gathered_response_[ts];
     mutex_.unlock();
-    *vals = kvs.vals;
-    if (lens) {
-      *lens = kvs.lens;
+
+    vals->clear();
+    for (auto kvs : server_kvs) {
+      for (auto val : kvs.second.vals) {
+        vals->push_back(val);
+      }
+      if (lens) {
+        for (auto len : kvs.second.lens) {
+          lens->push_back(len);
+        }
+      }
     }
 
     mutex_.lock();
@@ -464,43 +471,50 @@ void WorkerProxy<T>::SparseSlicer(int timestamp, const ::ps::KVPairs<T> &send, c
       }
     }
     size_t indices_size = indice_ids.size();
-    int slice_segment_size = indices_size * segment_size;
-    T *src_grad_data = new T[slice_segment_size];
-    int *src_indice_data = new int[indices_size];
-    PrepareSparseGradient(begin, end, distinct_ids, indice_to_grads, indice_data, segment_size, src_grad_data,
-                          src_indice_data);
-
-    // Reduce the sparse gradient and indice
-    T *new_grad = new T[slice_segment_size];
-    int *new_indices = new int[indices_size];
-    mindspore::kernel::SparseGradient<int> unique_sparse_grad({new_grad, new_indices, indices_size});
-    Util::ReduceSparseGradient(src_grad_data, src_indice_data, indices_size, segment_size, first_dim_size,
-                               outer_dim_size, &unique_sparse_grad);
-
-    // Update the length of reduce sparse gradient and indice
-    ::ps::SArray<int> reduced_lens;
-    reduced_lens.CopyFrom(kvs.lens);
-    reduced_lens[grad_index] = unique_sparse_grad.indices_size_ * segment_size;
-    reduced_lens[indice_index] = unique_sparse_grad.indices_size_;
-
-    // Build the sparse value to be sent
-    size_t total_size = 0;
-    for (auto size : reduced_lens) {
-      total_size += size;
-    }
-    ::ps::SArray<T> reduced_data(total_size, 0);
-    BuildSparseValue(reduced_lens, grad_index, indice_index, data, unique_sparse_grad.value_,
-                     unique_sparse_grad.indices_, &reduced_data);
-
-    kvs.lens = reduced_lens;
-    kvs.vals = reduced_data;
+    if (indices_size > 0) {
+      int slice_segment_size = indices_size * segment_size;
+      T *src_grad_data = new T[slice_segment_size];
+      int *src_indice_data = new int[indices_size];
+      PrepareSparseGradient(begin, end, distinct_ids, indice_to_grads, indice_data, segment_size, src_grad_data,
+                            src_indice_data);
+
+      // Reduce the sparse gradient and indice
+      T *new_grad = new T[slice_segment_size];
+      int *new_indices = new int[indices_size];
+      mindspore::kernel::SparseGradient<int> unique_sparse_grad({new_grad, new_indices, indices_size});
+      Util::ReduceSparseGradient(src_grad_data, src_indice_data, indices_size, segment_size, first_dim_size,
+                                 outer_dim_size, &unique_sparse_grad);
+
+      // Update the length of reduce sparse gradient and indice
+      ::ps::SArray<int> reduced_lens;
+      reduced_lens.CopyFrom(kvs.lens);
+      reduced_lens[grad_index] = unique_sparse_grad.indices_size_ * segment_size;
+      reduced_lens[indice_index] = unique_sparse_grad.indices_size_;
+
+      // Build the sparse value to be sent
+      size_t total_size = 0;
+      for (auto size : reduced_lens) {
+        total_size += size;
+      }
+      ::ps::SArray<T> reduced_data(total_size, 0);
+      BuildSparseValue(reduced_lens, grad_index, indice_index, data, unique_sparse_grad.value_,
+                       unique_sparse_grad.indices_, &reduced_data);
+
+      kvs.lens = reduced_lens;
+      kvs.vals = reduced_data;
+    }
 
     if (indices_size <= 0) {
-      sliced->at(i).first = false;
-    } else {
-      sliced->at(i).first = true;
-      expected_result_count_[timestamp] += 1;
+      ::ps::SArray<T> no_keys;
+      ::ps::SArray<T> no_vals;
+      ::ps::SArray<T> no_lens;
+      no_keys.push_back(key);
+      no_vals.push_back(-100);
+      kvs.vals = no_vals;
+      kvs.lens = no_lens;
     }
+    sliced->at(i).first = true;
+    expected_result_count_[timestamp] += 1;
   }
 }
@@ -554,8 +568,8 @@ void WorkerProxy<T>::BuildSparseValue(const ::ps::SArray<int> &lengths, const si
   }
 
   // Fill the reduced indice
-  int indice_offset = grad_offset + lengths[grad_index];
   data_size = lengths[indice_index] * sizeof(T);
+  int indice_offset = grad_offset + data_size;
   T *indice_data = reduced_data->data() + indice_offset;
   T *convert = new T[lengths[indice_index]];
   for (int i = 0; i < lengths[indice_index]; i++) {
@@ -656,7 +670,7 @@ void WorkerProxy<T>::ProcessLookupResult(const ::ps::Message &msg) {
     lookup_results_[ts].push_back(kvs);
     mutex_.unlock();
   }
-  if (lookup_customer_->NumResponse(ts) == expected_result_count_[ts] - 1) {
+  if (lookup_customer_->NumResponse(ts) + 1 == server_num_) {
     const auto &cb = lookup_callbacks_[ts];
     cb();
     lookup_callbacks_.erase(ts);
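Reviewer note: with every shard now always sent, each of the server_num_ servers replies exactly once per timestamp, so the completion test can simply ask whether the current reply is the server_num_-th one, instead of tracking a per-timestamp expected_result_count_ that an empty shard could leave short. A minimal sketch of that test with an illustrative server count:

#include <cassert>

int main() {
  const int server_num = 4;
  // num_response counts replies already recorded before handling the current one,
  // mirroring lookup_customer_->NumResponse(ts) in the hunk above.
  for (int num_response = 0; num_response < server_num; ++num_response) {
    bool is_last_reply = (num_response + 1 == server_num);
    assert(is_last_reply == (num_response == server_num - 1));  // fires only on the final reply
  }
  return 0;
}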
@@ -676,15 +690,8 @@ void WorkerProxy<T>::ProcessResponse(const ::ps::Message &msg) {
     kvs.lens = msg.data[2];
   }
   mutex_.lock();
-  for (auto key : kvs.keys) {
-    gathered_response_[ts].keys.push_back(key);
-  }
-  for (auto val : kvs.vals) {
-    gathered_response_[ts].vals.push_back(val);
-  }
-  for (auto len : kvs.lens) {
-    gathered_response_[ts].lens.push_back(len);
-  }
+  int rsp_server_rank = ::ps::Postoffice::Get()->IDtoRank(msg.meta.sender);
+  gathered_response_[ts][rsp_server_rank] = kvs;
   mutex_.unlock();
   if (general_customer_->NumResponse(ts) + 1 == server_num_) {
     const auto &cb = general_callbacks_[ts];
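Reviewer note: storing each reply under the responder's rank, as the hunk above does, pairs with the new concatenation loop in AddGeneralRspCB: replies may arrive in any order, but a std::map iterates in ascending rank order, so the reassembled vals/lens come out the same regardless of network timing. A sketch with simplified, illustrative types (not the real ::ps::KVPairs):

#include <iostream>
#include <map>
#include <vector>

struct KVPairs {
  std::vector<float> vals;
  std::vector<int> lens;
};

int main() {
  std::map<int, KVPairs> gathered_response;   // server rank -> that server's reply
  gathered_response[2] = {{5.f, 6.f}, {2}};   // rank 2 happened to answer first
  gathered_response[0] = {{1.f, 2.f}, {2}};
  gathered_response[1] = {{3.f, 4.f}, {2}};

  std::vector<float> vals;
  std::vector<int> lens;
  for (const auto &server_kvs : gathered_response) {  // iterates ranks 0, 1, 2
    for (float v : server_kvs.second.vals) vals.push_back(v);
    for (int l : server_kvs.second.lens) lens.push_back(l);
  }
  for (float v : vals) std::cout << v << ' ';  // 1 2 3 4 5 6, deterministically
  std::cout << '\n';
  return 0;
}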