Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
机器未来
Paddle
提交
5f924007
P
Paddle
项目概览
机器未来
/
Paddle
与 Fork 源项目一致
Fork自
PaddlePaddle / Paddle
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
Paddle
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
5f924007
编写于
4月 24, 2017
作者:
J
jacquesqiao
提交者:
GitHub
4月 24, 2017
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #1782 from jacquesqiao/support-remote-updater
support distribute training in python v2 API
上级
f22302ad
cb84cbab
变更
7
显示空白变更内容
内联
并排
Showing
7 changed file
with
157 addition
and
35 deletion
+157
-35
demo/word2vec/api_train_v2.py
demo/word2vec/api_train_v2.py
+26
-6
paddle/api/PaddleAPI.h
paddle/api/PaddleAPI.h
+14
-3
paddle/api/ParameterUpdater.cpp
paddle/api/ParameterUpdater.cpp
+19
-3
paddle/gserver/gradientmachines/MultiGradientMachine.cpp
paddle/gserver/gradientmachines/MultiGradientMachine.cpp
+1
-1
python/paddle/v2/optimizer.py
python/paddle/v2/optimizer.py
+27
-4
python/paddle/v2/topology.py
python/paddle/v2/topology.py
+12
-0
python/paddle/v2/trainer.py
python/paddle/v2/trainer.py
+58
-18
未找到文件。
demo/word2vec/train_v2.py
→
demo/word2vec/
api_
train_v2.py
浏览文件 @
5f924007
import
gzip
import
math
import
paddle.v2
as
paddle
dictsize
=
1953
embsize
=
32
hiddensize
=
256
N
=
5
def
wordemb
(
inlayer
):
wordemb
=
paddle
.
layer
.
table_projection
(
wordemb
=
paddle
.
layer
.
embedding
(
input
=
inlayer
,
size
=
embsize
,
param_attr
=
paddle
.
attr
.
Param
(
name
=
"_proj"
,
initial_std
=
0.001
,
learning_rate
=
1
,
l2_rate
=
0
,
))
l2_rate
=
0
,
sparse_update
=
True
))
return
wordemb
def
main
():
# for local training
cluster_train
=
False
if
not
cluster_train
:
paddle
.
init
(
use_gpu
=
False
,
trainer_count
=
1
)
else
:
paddle
.
init
(
use_gpu
=
False
,
trainer_count
=
2
,
port
=
7164
,
ports_num
=
1
,
ports_num_for_sparse
=
1
,
num_gradient_servers
=
1
)
word_dict
=
paddle
.
dataset
.
imikolov
.
build_dict
()
dict_size
=
len
(
word_dict
)
firstword
=
paddle
.
layer
.
data
(
...
...
@@ -57,6 +70,9 @@ def main():
def
event_handler
(
event
):
if
isinstance
(
event
,
paddle
.
event
.
EndIteration
):
if
event
.
batch_id
%
100
==
0
:
with
gzip
.
open
(
"batch-"
+
str
(
event
.
batch_id
)
+
".tar.gz"
,
'w'
)
as
f
:
trainer
.
save_parameter_to_tar
(
f
)
result
=
trainer
.
test
(
paddle
.
batch
(
paddle
.
dataset
.
imikolov
.
test
(
word_dict
,
N
),
32
))
...
...
@@ -65,11 +81,15 @@ def main():
result
.
metrics
)
cost
=
paddle
.
layer
.
classification_cost
(
input
=
predictword
,
label
=
nextword
)
parameters
=
paddle
.
parameters
.
create
(
cost
)
ada
m_optimizer
=
paddle
.
optimizer
.
Adam
(
ada
grad
=
paddle
.
optimizer
.
AdaGrad
(
learning_rate
=
3e-3
,
regularization
=
paddle
.
optimizer
.
L2Regularization
(
8e-4
))
trainer
=
paddle
.
trainer
.
SGD
(
cost
,
parameters
,
adam_optimizer
)
trainer
=
paddle
.
trainer
.
SGD
(
cost
,
parameters
,
adagrad
,
is_local
=
not
cluster_train
)
trainer
.
train
(
paddle
.
batch
(
paddle
.
dataset
.
imikolov
.
train
(
word_dict
,
N
),
32
),
num_passes
=
30
,
...
...
paddle/api/PaddleAPI.h
浏览文件 @
5f924007
...
...
@@ -19,6 +19,7 @@ limitations under the License. */
#include <stdexcept>
#include <string>
#include <vector>
#include "paddle/gserver/gradientmachines/GradientMachine.h"
#include "paddle/utils/Common.h"
#include "paddle/utils/GlobalConstants.h"
...
...
@@ -468,8 +469,10 @@ private:
};
enum
GradientMatchineCreateMode
{
CREATE_MODE_NORMAL
=
0
,
CREATE_MODE_TESTING
=
4
CREATE_MODE_NORMAL
=
paddle
::
GradientMachine
::
kNormal
,
CREATE_MODE_SGD_SPARSE_CPU_TRAINING
=
paddle
::
GradientMachine
::
kSgdSparseCpuTraining
,
CREATE_MODE_TESTING
=
paddle
::
GradientMachine
::
kTesting
};
struct
ParameterConfigPrivate
;
...
...
@@ -817,7 +820,8 @@ private:
public:
static
ParameterUpdater
*
createLocalUpdater
(
OptimizationConfig
*
config
);
static
ParameterUpdater
*
createRemoteUpdater
(
OptimizationConfig
*
config
,
int
passCount
);
int
passCount
,
bool
useSparseUpdater
);
~
ParameterUpdater
();
/**
...
...
@@ -855,6 +859,13 @@ public:
*/
void
update
(
Parameter
*
param
);
/**
* @breif only get required sparse rows by default.
* @param fullSize: get full matrix parameter if *fullSize* set
* @param apply: get PARAMETER_APPLY on pserver if *apply* set
*/
void
getParametersRemote
(
bool
fullSize
=
false
,
bool
apply
=
false
);
/**
* @brief restore the average parameter.
* @note It is only used in AverageOptimizer. Restore will get the current
...
...
paddle/api/ParameterUpdater.cpp
浏览文件 @
5f924007
...
...
@@ -29,10 +29,22 @@ ParameterUpdater *ParameterUpdater::createLocalUpdater(
}
ParameterUpdater
*
ParameterUpdater
::
createRemoteUpdater
(
OptimizationConfig
*
config
,
int
passCount
)
{
OptimizationConfig
*
config
,
int
passCount
,
bool
useSparseUpdater
)
{
auto
updater
=
new
ParameterUpdater
();
updater
->
m
->
updater
.
reset
(
new
paddle
::
RemoteParameterUpdater
(
config
->
m
->
getConfig
(),
passCount
,
nullptr
));
auto
remoteUpdater
=
new
paddle
::
RemoteParameterUpdater
(
config
->
m
->
getConfig
(),
passCount
,
nullptr
);
if
(
useSparseUpdater
)
{
std
::
unique_ptr
<
paddle
::
ParameterUpdater
>
remoteUpdaterPtr
(
remoteUpdater
);
auto
sparseRemoteUpdater
=
new
paddle
::
SparseRemoteParameterUpdaterComposite
(
config
->
m
->
getConfig
(),
passCount
,
false
,
std
::
move
(
remoteUpdaterPtr
));
updater
->
m
->
updater
.
reset
(
sparseRemoteUpdater
);
}
else
{
updater
->
m
->
updater
.
reset
(
remoteUpdater
);
}
return
updater
;
}
...
...
@@ -59,6 +71,10 @@ void ParameterUpdater::update(Parameter *param) {
m
->
updater
->
update
(
paddleParam
);
}
void
ParameterUpdater
::
getParametersRemote
(
bool
fullSize
,
bool
apply
)
{
m
->
updater
->
getParametersRemote
(
fullSize
,
apply
);
}
void
ParameterUpdater
::
restore
()
{
m
->
updater
->
restore
();
}
void
ParameterUpdater
::
apply
()
{
m
->
updater
->
apply
();
}
...
...
paddle/gserver/gradientmachines/MultiGradientMachine.cpp
浏览文件 @
5f924007
...
...
@@ -518,7 +518,7 @@ void TrainerThread::computeThread() {
backward
();
break
;
case
MultiGradientMachine
::
TASK_COPY_IN_ARGS
:
copyInArgs
();
batchSize_
=
copyInArgs
();
inArgsCopied_
=
true
;
multiMachine_
->
waitForCopyInArgs
();
break
;
...
...
python/paddle/v2/optimizer.py
浏览文件 @
5f924007
...
...
@@ -38,12 +38,35 @@ class Optimizer(object):
assert
isinstance
(
tmp
,
swig_api
.
ParameterOptimizer
)
return
tmp
.
getParameterTypes
()
def
create_local_updater
(
self
):
def
__create_local_updater__
(
self
):
return
swig_api
.
ParameterUpdater
.
createLocalUpdater
(
self
.
__opt_conf__
)
def
create_remote_updater
(
self
,
pass_num
):
return
swig_api
.
ParameterUpdater
.
createRemoteUpdater
(
self
.
__opt_conf__
,
pass_num
)
def
__create_remote_updater__
(
self
,
pass_num
,
use_sparse_updater
):
return
swig_api
.
ParameterUpdater
.
createRemoteUpdater
(
self
.
__opt_conf__
,
pass_num
,
use_sparse_updater
)
def
create_updater
(
self
,
is_local
,
num_passes
,
use_sparse_updater
):
"""
create proper parameter_updater by configuration.
:param is_local: create local or remote parameter updater
:param num_passes: remote parameter updater will use this to config
parameter server.
:param use_sparse_updater: when use remote updater, if some parameter is
sparse, updater should do some extra thing:
.. code-block:: python
if use_sparse_remote_updater:
gradient_machine.prefetch(in_args)
parameter_updater.getParametersRemote()
:return: parameter_updater
"""
if
is_local
:
parameter_updater
=
self
.
__create_local_updater__
()
else
:
parameter_updater
=
self
.
__create_remote_updater__
(
num_passes
,
use_sparse_updater
)
return
parameter_updater
class
Momentum
(
Optimizer
):
...
...
python/paddle/v2/topology.py
浏览文件 @
5f924007
...
...
@@ -73,6 +73,18 @@ class Topology(object):
assert
isinstance
(
self
.
__model_config__
,
ModelConfig
)
def
use_sparse_updater
(
self
):
"""
check if any parameter require to use sparse_update
:return:
"""
use_sparse
=
False
for
parameter
in
self
.
__model_config__
.
parameters
:
if
parameter
.
sparse_update
or
parameter
.
sparse_remote_update
:
use_sparse
=
True
break
return
use_sparse
def
proto
(
self
):
return
self
.
__model_config__
...
...
python/paddle/v2/trainer.py
浏览文件 @
5f924007
...
...
@@ -2,6 +2,8 @@
Module Trainer
"""
import
collections
import
gzip
import
os
import
py_paddle.swig_paddle
as
api
...
...
@@ -42,7 +44,12 @@ class SGD(object):
:type extra_layers: paddle.v2.config_base.Layer
"""
def
__init__
(
self
,
cost
,
parameters
,
update_equation
,
extra_layers
=
None
):
def
__init__
(
self
,
cost
,
parameters
,
update_equation
,
extra_layers
=
None
,
is_local
=
True
):
if
not
isinstance
(
parameters
,
v2_parameters
.
Parameters
):
raise
TypeError
(
'parameters should be parameters'
)
...
...
@@ -55,20 +62,48 @@ class SGD(object):
self
.
__topology__
=
topology
self
.
__parameters__
=
parameters
self
.
__topology_in_proto__
=
topology
.
proto
()
self
.
__is_local__
=
is_local
# In local mode, disable sparse_remote_update.
self
.
__use_sparse_updater__
=
self
.
__topology__
.
use_sparse_updater
()
# # In local mode, disable sparse_remote_update.
if
is_local
:
for
param
in
self
.
__topology_in_proto__
.
parameters
:
if
param
.
sparse_remote_update
:
param
.
sparse_remote_update
=
False
self
.
__gm_create_mode__
=
api
.
CREATE_MODE_NORMAL
if
not
\
self
.
__use_sparse_updater__
else
api
.
CREATE_MODE_SGD_SPARSE_CPU_TRAINING
self
.
__data_types__
=
topology
.
data_type
()
gm
=
api
.
GradientMachine
.
createFromConfigProto
(
self
.
__topology_in_proto__
,
api
.
CREATE_MODE_NORMAL
,
self
.
__topology_in_proto__
,
self
.
__gm_create_mode__
,
self
.
__optimizer__
.
enable_types
())
assert
isinstance
(
gm
,
api
.
GradientMachine
)
self
.
__gradient_machine__
=
gm
self
.
__gradient_machine__
.
randParameters
()
parameters
.
append_gradient_machine
(
gm
)
self
.
__parameters__
.
append_gradient_machine
(
gm
)
self
.
__parameter_updater__
=
None
def
__use_remote_sparse_updater__
(
self
):
return
self
.
__use_sparse_updater__
and
not
self
.
__is_local__
def
__prepare_parameter__
(
self
,
in_args
):
"""
prepare parameter before forward backward.
1. When use remote sparse updater, parameters should be got
from ps according to input arguments.
:param in_args: input arguments of this batch.
:return:
"""
if
self
.
__use_remote_sparse_updater__
():
self
.
__gradient_machine__
.
prefetch
(
in_args
)
self
.
__parameter_updater__
.
getParametersRemote
()
def
save_parameter_to_tar
(
self
,
f
):
self
.
__parameter_updater__
.
catchUpWith
()
self
.
__parameter_updater__
.
apply
()
self
.
__parameter_updater__
.
getParametersRemote
(
True
,
True
)
self
.
__parameters__
.
to_tar
(
f
)
self
.
__parameter_updater__
.
restore
()
def
train
(
self
,
reader
,
num_passes
=
1
,
event_handler
=
None
,
feeding
=
None
):
"""
...
...
@@ -90,8 +125,9 @@ class SGD(object):
event_handler
=
default_event_handler
__check_train_args__
(
**
locals
())
updater
=
self
.
__optimizer__
.
create_local_updater
()
updater
.
init
(
self
.
__gradient_machine__
)
self
.
__parameter_updater__
=
self
.
__optimizer__
.
create_updater
(
self
.
__is_local__
,
num_passes
,
self
.
__use_sparse_updater__
)
self
.
__parameter_updater__
.
init
(
self
.
__gradient_machine__
)
self
.
__gradient_machine__
.
start
()
batch_evaluator
=
self
.
__gradient_machine__
.
makeEvaluator
()
...
...
@@ -103,23 +139,26 @@ class SGD(object):
for
pass_id
in
xrange
(
num_passes
):
event_handler
(
v2_event
.
BeginPass
(
pass_id
))
pass_evaluator
.
start
()
updater
.
startPass
()
self
.
__parameter_updater__
.
startPass
()
for
batch_id
,
data_batch
in
enumerate
(
reader
()):
batch_evaluator
.
start
()
event_handler
(
v2_event
.
BeginIteration
(
pass_id
=
pass_id
,
batch_id
=
batch_id
))
pass_type
=
updater
.
startBatch
(
len
(
data_batch
))
self
.
__gradient_machine__
.
forwardBackward
(
feeder
(
data_batch
),
out_args
,
pass_type
)
pass_type
=
self
.
__parameter_updater__
.
startBatch
(
len
(
data_batch
))
in_args
=
feeder
(
data_batch
)
self
.
__prepare_parameter__
(
in_args
)
self
.
__gradient_machine__
.
forwardBackward
(
in_args
,
out_args
,
pass_type
)
self
.
__gradient_machine__
.
eval
(
pass_evaluator
)
self
.
__gradient_machine__
.
eval
(
batch_evaluator
)
for
each_param
in
self
.
__gradient_machine__
.
getNonStaticParameters
(
):
updater
.
update
(
each_param
)
self
.
__parameter_updater__
.
update
(
each_param
)
cost_sum
=
out_args
.
sum
()
cost
=
cost_sum
/
len
(
data_batch
)
updater
.
finishBatch
(
cost
)
self
.
__parameter_updater__
.
finishBatch
(
cost
)
batch_evaluator
.
finish
()
event_handler
(
v2_event
.
EndIteration
(
...
...
@@ -128,7 +167,7 @@ class SGD(object):
cost
=
cost
,
evaluator
=
batch_evaluator
))
updater
.
finishPass
()
self
.
__parameter_updater__
.
finishPass
()
pass_evaluator
.
finish
()
event_handler
(
v2_event
.
EndPass
(
pass_id
,
evaluator
=
pass_evaluator
))
self
.
__gradient_machine__
.
finish
()
...
...
@@ -152,8 +191,9 @@ class SGD(object):
num_samples
=
0.0
for
data_batch
in
reader
():
num_samples
+=
len
(
data_batch
)
self
.
__gradient_machine__
.
forward
(
feeder
(
data_batch
),
out_args
,
api
.
PASS_TEST
)
in_args
=
feeder
(
data_batch
)
self
.
__prepare_parameter__
(
in_args
)
self
.
__gradient_machine__
.
forward
(
in_args
,
out_args
,
api
.
PASS_TEST
)
total_cost
+=
out_args
.
sum
()
self
.
__gradient_machine__
.
eval
(
evaluator
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录