Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
PaddlePaddle
Paddle
提交
0678073c
P
Paddle
项目概览
PaddlePaddle
/
Paddle
大约 1 年 前同步成功
通知
2298
Star
20931
Fork
5422
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1423
列表
看板
标记
里程碑
合并请求
543
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
Paddle
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1,423
Issue
1,423
列表
看板
标记
里程碑
合并请求
543
合并请求
543
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
0678073c
编写于
10月 24, 2017
作者:
H
helinwang
提交者:
GitHub
10月 24, 2017
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #5059 from helinwang/log15
Go master, pserver, trainer: switch to log15, away from logrus
上级
6c0b3836
60238a1b
变更
16
隐藏空白更改
内联
并排
Showing
16 changed file
with
218 addition
and
138 deletion
+218
-138
go/cmd/master/master.go
go/cmd/master/master.go
+24
-15
go/cmd/pserver/pserver.go
go/cmd/pserver/pserver.go
+14
-10
go/glide.lock
go/glide.lock
+12
-4
go/glide.yaml
go/glide.yaml
+4
-0
go/master/c/client.go
go/master/c/client.go
+9
-3
go/master/client.go
go/master/client.go
+12
-9
go/master/client_internal_test.go
go/master/client_internal_test.go
+0
-6
go/master/etcd_client.go
go/master/etcd_client.go
+13
-11
go/master/service.go
go/master/service.go
+29
-22
go/pserver/client/c/cclient.go
go/pserver/client/c/cclient.go
+38
-11
go/pserver/client/client.go
go/pserver/client/client.go
+3
-3
go/pserver/client/client_test.go
go/pserver/client/client_test.go
+2
-2
go/pserver/client/etcd_client.go
go/pserver/client/etcd_client.go
+31
-19
go/pserver/etcd_client.go
go/pserver/etcd_client.go
+17
-13
go/pserver/optimizer.go
go/pserver/optimizer.go
+3
-3
go/pserver/service.go
go/pserver/service.go
+7
-7
未找到文件。
go/cmd/master/master.go
浏览文件 @
0678073c
...
@@ -25,9 +25,8 @@ import (
...
@@ -25,9 +25,8 @@ import (
"strings"
"strings"
"time"
"time"
log
"github.com/inconshreveable/log15"
"github.com/namsral/flag"
"github.com/namsral/flag"
log
"github.com/sirupsen/logrus"
"github.com/topicai/candy"
"github.com/PaddlePaddle/Paddle/go/master"
"github.com/PaddlePaddle/Paddle/go/master"
"github.com/PaddlePaddle/Paddle/go/utils/networkhelper"
"github.com/PaddlePaddle/Paddle/go/utils/networkhelper"
...
@@ -41,16 +40,20 @@ func main() {
...
@@ -41,16 +40,20 @@ func main() {
taskTimeoutMax
:=
flag
.
Int
(
"task-timeout-max"
,
3
,
"max timtout count for each task before it being declared failed task."
)
taskTimeoutMax
:=
flag
.
Int
(
"task-timeout-max"
,
3
,
"max timtout count for each task before it being declared failed task."
)
chunkPerTask
:=
flag
.
Int
(
"chunk-per-task"
,
10
,
"chunk per task."
)
chunkPerTask
:=
flag
.
Int
(
"chunk-per-task"
,
10
,
"chunk per task."
)
logLevel
:=
flag
.
String
(
"log-level"
,
"info"
,
logLevel
:=
flag
.
String
(
"log-level"
,
"info"
,
"log level, possible values: debug, info, warn
ing, error, fatal, panic
"
)
"log level, possible values: debug, info, warn
, error, crit
"
)
flag
.
Parse
()
flag
.
Parse
()
level
,
e
:=
log
.
ParseLevel
(
*
logLevel
)
lvl
,
err
:=
log
.
LvlFromString
(
*
logLevel
)
candy
.
Must
(
e
)
if
err
!=
nil
{
panic
(
err
)
}
log
.
SetLevel
(
level
)
log
.
Root
()
.
SetHandler
(
log
.
LvlFilterHandler
(
lvl
,
log
.
CallerStackHandler
(
"%+v"
,
log
.
StderrHandler
)),
)
if
*
endpoints
==
""
{
if
*
endpoints
==
""
{
log
.
Warn
ingln
(
"-endpoints not set, fault tolerance not be enabled."
)
log
.
Warn
(
"-endpoints not set, fault tolerance not be enabled."
)
}
}
var
store
master
.
Store
var
store
master
.
Store
...
@@ -58,23 +61,25 @@ func main() {
...
@@ -58,23 +61,25 @@ func main() {
eps
:=
strings
.
Split
(
*
endpoints
,
","
)
eps
:=
strings
.
Split
(
*
endpoints
,
","
)
ip
,
err
:=
networkhelper
.
GetExternalIP
()
ip
,
err
:=
networkhelper
.
GetExternalIP
()
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Fatal
(
err
)
log
.
Crit
(
"get external ip error"
,
log
.
Ctx
{
"error"
:
err
})
panic
(
err
)
}
}
addr
:=
fmt
.
Sprintf
(
"%s:%d"
,
ip
,
*
port
)
addr
:=
fmt
.
Sprintf
(
"%s:%d"
,
ip
,
*
port
)
store
,
err
=
master
.
NewEtcdClient
(
eps
,
addr
,
master
.
DefaultLockPath
,
master
.
DefaultAddrPath
,
master
.
DefaultStatePath
,
*
ttlSec
)
store
,
err
=
master
.
NewEtcdClient
(
eps
,
addr
,
master
.
DefaultLockPath
,
master
.
DefaultAddrPath
,
master
.
DefaultStatePath
,
*
ttlSec
)
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Fatal
(
err
)
log
.
Crit
(
"error creating etcd client."
,
log
.
Ctx
{
"error"
:
err
})
panic
(
err
)
}
}
}
else
{
}
else
{
store
=
&
master
.
InMemStore
{}
store
=
&
master
.
InMemStore
{}
}
}
shutdown
:=
func
()
{
shutdown
:=
func
()
{
log
.
Info
ln
(
"shutting down gracefully"
)
log
.
Info
(
"shutting down gracefully"
)
err
:=
store
.
Shutdown
()
err
:=
store
.
Shutdown
()
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Error
ln
(
err
)
log
.
Error
(
"shutdown error"
,
log
.
Ctx
{
"error"
:
err
}
)
}
}
}
}
...
@@ -86,24 +91,28 @@ func main() {
...
@@ -86,24 +91,28 @@ func main() {
s
,
err
:=
master
.
NewService
(
store
,
*
chunkPerTask
,
*
taskTimeoutDur
,
*
taskTimeoutMax
)
s
,
err
:=
master
.
NewService
(
store
,
*
chunkPerTask
,
*
taskTimeoutDur
,
*
taskTimeoutMax
)
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Fatal
(
err
)
log
.
Crit
(
"error creating new service."
,
log
.
Ctx
{
"error"
:
err
})
panic
(
err
)
}
}
err
=
rpc
.
Register
(
s
)
err
=
rpc
.
Register
(
s
)
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Fatal
(
err
)
log
.
Crit
(
"error registering to etcd."
,
log
.
Ctx
{
"error"
:
err
})
panic
(
err
)
}
}
rpc
.
HandleHTTP
()
rpc
.
HandleHTTP
()
l
,
err
:=
net
.
Listen
(
"tcp"
,
":"
+
strconv
.
Itoa
(
*
port
))
l
,
err
:=
net
.
Listen
(
"tcp"
,
":"
+
strconv
.
Itoa
(
*
port
))
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Fatal
(
err
)
log
.
Crit
(
"error listing to port"
,
log
.
Ctx
{
"error"
:
err
,
"port"
:
*
port
})
panic
(
err
)
}
}
go
func
()
{
go
func
()
{
err
=
http
.
Serve
(
l
,
nil
)
err
=
http
.
Serve
(
l
,
nil
)
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Fatal
(
err
)
log
.
Crit
(
"error serving HTTP"
,
log
.
Ctx
{
"error"
:
err
})
panic
(
err
)
}
}
}()
}()
...
...
go/cmd/pserver/pserver.go
浏览文件 @
0678073c
...
@@ -27,11 +27,11 @@ import (
...
@@ -27,11 +27,11 @@ import (
"github.com/topicai/candy"
"github.com/topicai/candy"
"github.com/PaddlePaddle/Paddle/go/pserver"
"github.com/PaddlePaddle/Paddle/go/pserver"
log
"github.com/
sirupsen/logrus
"
log
"github.com/
inconshreveable/log15
"
)
)
func
main
()
{
func
main
()
{
port
:=
flag
.
Int
(
"port"
,
0
,
"port of the pserver"
)
port
:=
flag
.
Int
(
"port"
,
8001
,
"port of the pserver"
)
index
:=
flag
.
Int
(
"index"
,
-
1
,
"index of the pserver, set to -1 if use etcd for auto pserver index registry"
)
index
:=
flag
.
Int
(
"index"
,
-
1
,
"index of the pserver, set to -1 if use etcd for auto pserver index registry"
)
etcdEndpoint
:=
flag
.
String
(
"etcd-endpoint"
,
"http://127.0.0.1:2379"
,
etcdEndpoint
:=
flag
.
String
(
"etcd-endpoint"
,
"http://127.0.0.1:2379"
,
"comma separated endpoint string for pserver to connect to etcd"
)
"comma separated endpoint string for pserver to connect to etcd"
)
...
@@ -41,13 +41,17 @@ func main() {
...
@@ -41,13 +41,17 @@ func main() {
checkpointPath
:=
flag
.
String
(
"checkpoint-path"
,
"/checkpoints/"
,
"save checkpoint path"
)
checkpointPath
:=
flag
.
String
(
"checkpoint-path"
,
"/checkpoints/"
,
"save checkpoint path"
)
checkpointInterval
:=
flag
.
Duration
(
"checkpoint-interval"
,
600
*
time
.
Second
,
"save checkpoint per interval seconds"
)
checkpointInterval
:=
flag
.
Duration
(
"checkpoint-interval"
,
600
*
time
.
Second
,
"save checkpoint per interval seconds"
)
logLevel
:=
flag
.
String
(
"log-level"
,
"info"
,
logLevel
:=
flag
.
String
(
"log-level"
,
"info"
,
"log level, possible values: debug, info, warn
ing, error, fatal, panic
"
)
"log level, possible values: debug, info, warn
, error, crit
"
)
flag
.
Parse
()
flag
.
Parse
()
level
,
err
:=
log
.
ParseLevel
(
*
logLevel
)
lvl
,
err
:=
log
.
LvlFromString
(
*
logLevel
)
candy
.
Must
(
err
)
if
err
!=
nil
{
panic
(
err
)
}
log
.
SetLevel
(
level
)
log
.
Root
()
.
SetHandler
(
log
.
LvlFilterHandler
(
lvl
,
log
.
CallerStackHandler
(
"%+v"
,
log
.
StderrHandler
)),
)
var
idx
int
var
idx
int
...
@@ -63,7 +67,7 @@ func main() {
...
@@ -63,7 +67,7 @@ func main() {
cp
,
err
=
pserver
.
LoadCheckpoint
(
e
,
idx
)
cp
,
err
=
pserver
.
LoadCheckpoint
(
e
,
idx
)
if
err
!=
nil
{
if
err
!=
nil
{
if
err
==
pserver
.
ErrCheckpointNotFound
{
if
err
==
pserver
.
ErrCheckpointNotFound
{
log
.
Info
f
(
"Could not find the pserver checkpoint."
)
log
.
Info
(
"Could not find the pserver checkpoint."
)
}
else
{
}
else
{
panic
(
err
)
panic
(
err
)
}
}
...
@@ -71,10 +75,10 @@ func main() {
...
@@ -71,10 +75,10 @@ func main() {
}
}
shutdown
:=
func
()
{
shutdown
:=
func
()
{
log
.
Info
ln
(
"shutting down gracefully"
)
log
.
Info
(
"shutting down gracefully"
)
sErr
:=
e
.
Shutdown
()
sErr
:=
e
.
Shutdown
()
if
sErr
!=
nil
{
if
sErr
!=
nil
{
log
.
Error
ln
(
sErr
)
log
.
Error
(
"error shutting down"
,
log
.
Ctx
{
"error"
:
sErr
}
)
}
}
}
}
...
@@ -95,7 +99,7 @@ func main() {
...
@@ -95,7 +99,7 @@ func main() {
candy
.
Must
(
err
)
candy
.
Must
(
err
)
go
func
()
{
go
func
()
{
log
.
Info
f
(
"start pserver at port %d"
,
*
port
)
log
.
Info
(
"starting pserver"
,
log
.
Ctx
{
"port"
:
*
port
}
)
err
=
http
.
Serve
(
l
,
nil
)
err
=
http
.
Serve
(
l
,
nil
)
candy
.
Must
(
err
)
candy
.
Must
(
err
)
}()
}()
...
...
go/glide.lock
浏览文件 @
0678073c
hash:
328e7b9b7306b45e7b9879139a9f86698115981f6283032e1312093a6a6ddb04
hash:
51d9e2e46d7fd9173ff11ecada40f7b7728756be18d5e2f032535f66465e6e15
updated: 2017-10-
16T08:00:23.484693528Z
updated: 2017-10-
24T15:04:09.987751592-07:00
imports:
imports:
- name: github.com/alecthomas/gometalinter
- name: github.com/alecthomas/gometalinter
version: bae2f1293d092fd8167939d5108d1b025eaef9de
version: bae2f1293d092fd8167939d5108d1b025eaef9de
...
@@ -99,6 +99,8 @@ imports:
...
@@ -99,6 +99,8 @@ imports:
version: d2709f9f1f31ebcda9651b03077758c1f3a0018c
version: d2709f9f1f31ebcda9651b03077758c1f3a0018c
- name: github.com/ghodss/yaml
- name: github.com/ghodss/yaml
version: 0ca9ea5df5451ffdf184b4428c902747c2c11cd7
version: 0ca9ea5df5451ffdf184b4428c902747c2c11cd7
- name: github.com/go-stack/stack
version: 817915b46b97fd7bb80e8ab6b69f01a53ac3eebf
- name: github.com/gogo/protobuf
- name: github.com/gogo/protobuf
version: 909568be09de550ed094403c2bf8a261b5bb730a
version: 909568be09de550ed094403c2bf8a261b5bb730a
subpackages:
subpackages:
...
@@ -120,8 +122,14 @@ imports:
...
@@ -120,8 +122,14 @@ imports:
- runtime
- runtime
- runtime/internal
- runtime/internal
- utilities
- utilities
- name: github.com/inconshreveable/log15
version: 0decfc6c20d9ca0ad143b0e89dcaa20f810b4fb3
- name: github.com/jonboulle/clockwork
- name: github.com/jonboulle/clockwork
version: 2eee05ed794112d45db504eb05aa693efd2b8b09
version: 2eee05ed794112d45db504eb05aa693efd2b8b09
- name: github.com/mattn/go-colorable
version: 5411d3eea5978e6cdc258b30de592b60df6aba96
- name: github.com/mattn/go-isatty
version: 57fdcb988a5c543893cc61bce354a6e24ab70022
- name: github.com/matttproud/golang_protobuf_extensions
- name: github.com/matttproud/golang_protobuf_extensions
version: c12348ce28de40eed0136aa2b644d0ee0650e56c
version: c12348ce28de40eed0136aa2b644d0ee0650e56c
subpackages:
subpackages:
...
@@ -179,11 +187,12 @@ imports:
...
@@ -179,11 +187,12 @@ imports:
- lex/httplex
- lex/httplex
- trace
- trace
- name: golang.org/x/sys
- name: golang.org/x/sys
version:
0f826bdd13b500be0f1d4004938ad978fcc6031e
version:
e48874b42435b4347fc52bdee0424a52abc974d7
repo: https://github.com/golang/sys.git
repo: https://github.com/golang/sys.git
vcs: git
vcs: git
subpackages:
subpackages:
- unix
- unix
- windows
- name: golang.org/x/text
- name: golang.org/x/text
version: 836efe42bb4aa16aaa17b9c155d8813d336ed720
version: 836efe42bb4aa16aaa17b9c155d8813d336ed720
repo: https://github.com/golang/text.git
repo: https://github.com/golang/text.git
...
@@ -222,4 +231,3 @@ testImports:
...
@@ -222,4 +231,3 @@ testImports:
version: 05e8a0eda380579888eb53c394909df027f06991
version: 05e8a0eda380579888eb53c394909df027f06991
subpackages:
subpackages:
- assert
- assert
go/glide.yaml
浏览文件 @
0678073c
...
@@ -26,3 +26,7 @@ import:
...
@@ -26,3 +26,7 @@ import:
version
:
v1.1.0
version
:
v1.1.0
-
package
:
github.com/alecthomas/gometalinter
-
package
:
github.com/alecthomas/gometalinter
version
:
v1.2.1
version
:
v1.2.1
-
package
:
github.com/inconshreveable/log15
version
:
v2.13
-
package
:
github.com/go-stack/stack
version
:
v1.6.0
go/master/c/client.go
浏览文件 @
0678073c
...
@@ -35,13 +35,19 @@ import (
...
@@ -35,13 +35,19 @@ import (
"unsafe"
"unsafe"
"github.com/PaddlePaddle/Paddle/go/master"
"github.com/PaddlePaddle/Paddle/go/master"
log
"github.com/
sirupsen/logrus
"
log
"github.com/
inconshreveable/log15
"
)
)
var
mu
sync
.
Mutex
var
mu
sync
.
Mutex
var
handleMap
=
make
(
map
[
C
.
paddle_master_client
]
*
master
.
Client
)
var
handleMap
=
make
(
map
[
C
.
paddle_master_client
]
*
master
.
Client
)
var
curHandle
C
.
paddle_master_client
var
curHandle
C
.
paddle_master_client
func
init
()
{
log
.
Root
()
.
SetHandler
(
log
.
LvlFilterHandler
(
log
.
LvlWarn
,
log
.
CallerStackHandler
(
"%+v"
,
log
.
StderrHandler
)),
)
}
func
add
(
c
*
master
.
Client
)
C
.
paddle_master_client
{
func
add
(
c
*
master
.
Client
)
C
.
paddle_master_client
{
mu
.
Lock
()
mu
.
Lock
()
defer
mu
.
Unlock
()
defer
mu
.
Unlock
()
...
@@ -117,7 +123,7 @@ func paddle_set_dataset(client C.paddle_master_client, path **C.char, size C.int
...
@@ -117,7 +123,7 @@ func paddle_set_dataset(client C.paddle_master_client, path **C.char, size C.int
}
}
err
:=
c
.
SetDataset
(
paths
)
err
:=
c
.
SetDataset
(
paths
)
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Error
ln
(
err
)
log
.
Error
(
"error set dataset"
,
log
.
Ctx
{
"error"
:
err
}
)
return
C
.
PADDLE_MASTER_ERROR
return
C
.
PADDLE_MASTER_ERROR
}
}
...
@@ -167,7 +173,7 @@ func paddle_request_save_model(client C.paddle_master_client, trainerID string,
...
@@ -167,7 +173,7 @@ func paddle_request_save_model(client C.paddle_master_client, trainerID string,
c
:=
get
(
client
)
c
:=
get
(
client
)
need
,
err
:=
c
.
RequestSaveModel
(
trainerID
,
time
.
Duration
(
blockMS
)
*
time
.
Millisecond
)
need
,
err
:=
c
.
RequestSaveModel
(
trainerID
,
time
.
Duration
(
blockMS
)
*
time
.
Millisecond
)
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Error
ln
(
err
)
log
.
Error
(
"error request save model"
,
log
.
Ctx
{
"error"
:
err
}
)
return
C
.
PADDLE_MASTER_ERROR
return
C
.
PADDLE_MASTER_ERROR
}
}
...
...
go/master/client.go
浏览文件 @
0678073c
...
@@ -21,7 +21,7 @@ import (
...
@@ -21,7 +21,7 @@ import (
"github.com/PaddlePaddle/Paddle/go/connection"
"github.com/PaddlePaddle/Paddle/go/connection"
"github.com/PaddlePaddle/recordio"
"github.com/PaddlePaddle/recordio"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3"
log
"github.com/
sirupsen/logrus
"
log
"github.com/
inconshreveable/log15
"
)
)
// Client is the client of the master server.
// Client is the client of the master server.
...
@@ -75,7 +75,7 @@ func WithEtcd(endpoints []string, timeout time.Duration) func(*Client) error {
...
@@ -75,7 +75,7 @@ func WithEtcd(endpoints []string, timeout time.Duration) func(*Client) error {
for
{
for
{
err
:=
f
()
err
:=
f
()
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Warn
ingln
(
err
)
log
.
Warn
(
"create etcd client error"
,
log
.
Ctx
{
"error"
:
err
}
)
}
else
{
}
else
{
break
break
}
}
...
@@ -135,13 +135,13 @@ func (c *Client) getRecords(passID int) {
...
@@ -135,13 +135,13 @@ func (c *Client) getRecords(passID int) {
time
.
Sleep
(
time
.
Second
*
3
)
time
.
Sleep
(
time
.
Second
*
3
)
continue
continue
}
}
log
.
Error
f
(
"getTask error: %s"
,
err
)
log
.
Error
(
"getTask error."
,
log
.
Ctx
{
"error"
:
err
}
)
}
}
for
_
,
chunk
:=
range
t
.
Chunks
{
for
_
,
chunk
:=
range
t
.
Chunks
{
f
,
e
:=
os
.
Open
(
chunk
.
Path
)
f
,
e
:=
os
.
Open
(
chunk
.
Path
)
if
e
!=
nil
{
if
e
!=
nil
{
log
.
Error
ln
(
e
)
log
.
Error
(
"error open chunk"
,
log
.
Ctx
{
"error"
:
e
}
)
continue
continue
}
}
...
@@ -152,12 +152,15 @@ func (c *Client) getRecords(passID int) {
...
@@ -152,12 +152,15 @@ func (c *Client) getRecords(passID int) {
if
s
.
Err
()
!=
nil
{
if
s
.
Err
()
!=
nil
{
c
.
ch
<-
record
{
nil
,
s
.
Err
()}
c
.
ch
<-
record
{
nil
,
s
.
Err
()}
log
.
Errorln
(
err
,
chunk
.
Path
)
log
.
Error
(
"error scan chunk"
,
log
.
Ctx
{
"error"
:
err
,
"path"
:
chunk
.
Path
},
)
}
}
err
=
f
.
Close
()
err
=
f
.
Close
()
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Error
ln
(
err
)
log
.
Error
(
"error close record file"
,
log
.
Ctx
{
"error"
:
err
}
)
}
}
}
}
...
@@ -166,7 +169,7 @@ func (c *Client) getRecords(passID int) {
...
@@ -166,7 +169,7 @@ func (c *Client) getRecords(passID int) {
// correct, but a reasonable approximation.
// correct, but a reasonable approximation.
err
=
c
.
taskFinished
(
t
.
Meta
.
ID
)
err
=
c
.
taskFinished
(
t
.
Meta
.
ID
)
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Error
ln
(
err
)
log
.
Error
(
"task finish callback error."
,
log
.
Ctx
{
"error"
:
err
}
)
}
}
}
}
}
}
...
@@ -179,12 +182,12 @@ func (c *Client) monitorMaster(addrCh <-chan string) {
...
@@ -179,12 +182,12 @@ func (c *Client) monitorMaster(addrCh <-chan string) {
if
curMaster
==
""
{
if
curMaster
==
""
{
err
:=
c
.
conn
.
Close
()
err
:=
c
.
conn
.
Close
()
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Error
ln
(
err
)
log
.
Error
(
"close old master addr error"
,
log
.
Ctx
{
"error"
:
err
}
)
}
}
}
else
{
}
else
{
err
:=
c
.
conn
.
Connect
(
curMaster
)
err
:=
c
.
conn
.
Connect
(
curMaster
)
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Error
ln
(
err
)
log
.
Error
(
"connect to new master addr error"
,
log
.
Ctx
{
"error"
:
err
}
)
// connect to addr failed, set
// connect to addr failed, set
// to last known addr in order
// to last known addr in order
...
...
go/master/client_internal_test.go
浏览文件 @
0678073c
...
@@ -25,8 +25,6 @@ import (
...
@@ -25,8 +25,6 @@ import (
"testing"
"testing"
"time"
"time"
log
"github.com/sirupsen/logrus"
"github.com/PaddlePaddle/Paddle/go/connection"
"github.com/PaddlePaddle/Paddle/go/connection"
"github.com/PaddlePaddle/recordio"
"github.com/PaddlePaddle/recordio"
)
)
...
@@ -36,10 +34,6 @@ const (
...
@@ -36,10 +34,6 @@ const (
chunkPerTask
=
10
chunkPerTask
=
10
)
)
func
init
()
{
log
.
SetLevel
(
log
.
ErrorLevel
)
}
func
TestGetFinishTask
(
t
*
testing
.
T
)
{
func
TestGetFinishTask
(
t
*
testing
.
T
)
{
const
path
=
"/tmp/master_client_test_0"
const
path
=
"/tmp/master_client_test_0"
...
...
go/master/etcd_client.go
浏览文件 @
0678073c
...
@@ -20,7 +20,7 @@ import (
...
@@ -20,7 +20,7 @@ import (
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
"github.com/coreos/etcd/clientv3/concurrency"
log
"github.com/
sirupsen/logrus
"
log
"github.com/
inconshreveable/log15
"
)
)
const
(
const
(
...
@@ -44,7 +44,7 @@ type EtcdClient struct {
...
@@ -44,7 +44,7 @@ type EtcdClient struct {
// NewEtcdClient creates a new EtcdClient.
// NewEtcdClient creates a new EtcdClient.
func
NewEtcdClient
(
endpoints
[]
string
,
addr
string
,
lockPath
,
addrPath
,
statePath
string
,
ttlSec
int
)
(
*
EtcdClient
,
error
)
{
func
NewEtcdClient
(
endpoints
[]
string
,
addr
string
,
lockPath
,
addrPath
,
statePath
string
,
ttlSec
int
)
(
*
EtcdClient
,
error
)
{
log
.
Debug
f
(
"Connecting to etcd at %v"
,
endpoints
)
log
.
Debug
(
"Connecting to etcd"
,
log
.
Ctx
{
"endpoint"
:
endpoints
}
)
cli
,
err
:=
clientv3
.
New
(
clientv3
.
Config
{
cli
,
err
:=
clientv3
.
New
(
clientv3
.
Config
{
Endpoints
:
endpoints
,
Endpoints
:
endpoints
,
DialTimeout
:
dialTimeout
,
DialTimeout
:
dialTimeout
,
...
@@ -64,12 +64,12 @@ func NewEtcdClient(endpoints []string, addr string, lockPath, addrPath, statePat
...
@@ -64,12 +64,12 @@ func NewEtcdClient(endpoints []string, addr string, lockPath, addrPath, statePat
// one master running, but split-brain problem may cause
// one master running, but split-brain problem may cause
// multiple master servers running), and the cluster management
// multiple master servers running), and the cluster management
// software will kill one of them.
// software will kill one of them.
log
.
Info
f
(
"Trying to acquire lock at %s."
,
lockPath
)
log
.
Info
(
"Trying to acquire lock."
,
log
.
Ctx
{
"path"
:
lockPath
}
)
err
=
lock
.
Lock
(
context
.
TODO
())
err
=
lock
.
Lock
(
context
.
TODO
())
if
err
!=
nil
{
if
err
!=
nil
{
return
nil
,
err
return
nil
,
err
}
}
log
.
Info
f
(
"Successfully acquired lock at %s."
,
lockPath
)
log
.
Info
(
"Successfully acquired lock at %s."
,
log
.
Ctx
{
"path"
:
lockPath
}
)
put
:=
clientv3
.
OpPut
(
addrPath
,
addr
)
put
:=
clientv3
.
OpPut
(
addrPath
,
addr
)
resp
,
err
:=
cli
.
Txn
(
context
.
Background
())
.
If
(
lock
.
IsOwner
())
.
Then
(
put
)
.
Commit
()
resp
,
err
:=
cli
.
Txn
(
context
.
Background
())
.
If
(
lock
.
IsOwner
())
.
Then
(
put
)
.
Commit
()
...
@@ -78,7 +78,8 @@ func NewEtcdClient(endpoints []string, addr string, lockPath, addrPath, statePat
...
@@ -78,7 +78,8 @@ func NewEtcdClient(endpoints []string, addr string, lockPath, addrPath, statePat
}
}
if
!
resp
.
Succeeded
{
if
!
resp
.
Succeeded
{
log
.
Fatal
(
"No longer owns the master lock. Exiting."
)
log
.
Crit
(
"No longer owns the master lock. Exiting."
)
panic
(
"No longer owns the master lock. Exiting."
)
}
}
e
:=
&
EtcdClient
{
e
:=
&
EtcdClient
{
...
@@ -102,7 +103,7 @@ func (e *EtcdClient) Save(state []byte) error {
...
@@ -102,7 +103,7 @@ func (e *EtcdClient) Save(state []byte) error {
}
}
if
!
resp
.
Succeeded
{
if
!
resp
.
Succeeded
{
log
.
Error
ln
(
"No longer owns the lock, trying to lock again"
)
log
.
Error
(
"No longer owns the lock, trying to lock again"
)
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
5
*
time
.
Second
)
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
5
*
time
.
Second
)
err
:=
e
.
lock
.
Lock
(
ctx
)
err
:=
e
.
lock
.
Lock
(
ctx
)
cancel
()
cancel
()
...
@@ -116,9 +117,10 @@ func (e *EtcdClient) Save(state []byte) error {
...
@@ -116,9 +117,10 @@ func (e *EtcdClient) Save(state []byte) error {
// to kill current master server. The current
// to kill current master server. The current
// state is not saved, but the trainer's RPC
// state is not saved, but the trainer's RPC
// call will fail, so the trainer will retry.
// call will fail, so the trainer will retry.
log
.
Fatalf
(
"Could not acquire the lock at %s: %v. Exiting."
,
e
.
lockPath
,
err
)
log
.
Crit
(
"Could not acquire the lock at %s: %v. Exiting."
,
log
.
Ctx
{
"path"
:
e
.
lockPath
,
"error"
:
err
})
panic
(
"Could not acquire the lock at %s: %v. Exiting."
)
}
}
log
.
Info
f
(
"Successfully acquired lock at %s."
,
e
.
lockPath
)
log
.
Info
(
"Successfully acquired lock at %s."
,
e
.
lockPath
)
return
e
.
Save
(
state
)
return
e
.
Save
(
state
)
}
}
...
@@ -136,7 +138,7 @@ func (e *EtcdClient) Load() ([]byte, error) {
...
@@ -136,7 +138,7 @@ func (e *EtcdClient) Load() ([]byte, error) {
}
}
if
!
resp
.
Succeeded
{
if
!
resp
.
Succeeded
{
log
.
Error
ln
(
"No longer owns the lock, trying to lock and load again."
)
log
.
Error
(
"No longer owns the lock, trying to lock and load again."
)
err
=
e
.
lock
.
Lock
(
context
.
Background
())
err
=
e
.
lock
.
Lock
(
context
.
Background
())
if
err
!=
nil
{
if
err
!=
nil
{
return
nil
,
err
return
nil
,
err
...
@@ -163,7 +165,7 @@ func (e *EtcdClient) Shutdown() error {
...
@@ -163,7 +165,7 @@ func (e *EtcdClient) Shutdown() error {
if
err
==
nil
{
if
err
==
nil
{
err
=
newErr
err
=
newErr
}
else
{
}
else
{
log
.
Error
ln
(
newErr
)
log
.
Error
(
"shutdown error"
,
log
.
Ctx
{
"error"
:
newErr
}
)
}
}
}
}
...
@@ -192,7 +194,7 @@ func watchKey(c *clientv3.Client, key string, valChan chan<- string) {
...
@@ -192,7 +194,7 @@ func watchKey(c *clientv3.Client, key string, valChan chan<- string) {
for
wresp
:=
range
rch
{
for
wresp
:=
range
rch
{
for
_
,
ev
:=
range
wresp
.
Events
{
for
_
,
ev
:=
range
wresp
.
Events
{
// if received event is DELETE, the value will be an empty string
// if received event is DELETE, the value will be an empty string
log
.
Info
f
(
"received event %s, %q : %q
\n
"
,
ev
.
Type
,
ev
.
Kv
.
Key
,
ev
.
Kv
.
Value
)
log
.
Info
(
"received event."
,
log
.
Ctx
{
"type"
:
ev
.
Type
,
"key"
:
ev
.
Kv
.
Key
,
"value"
:
ev
.
Kv
.
Value
}
)
valChan
<-
string
(
ev
.
Kv
.
Value
)
valChan
<-
string
(
ev
.
Kv
.
Value
)
}
}
}
}
...
...
go/master/service.go
浏览文件 @
0678073c
...
@@ -25,7 +25,7 @@ import (
...
@@ -25,7 +25,7 @@ import (
"sync"
"sync"
"time"
"time"
log
"github.com/
sirupsen/logrus
"
log
"github.com/
inconshreveable/log15
"
"github.com/PaddlePaddle/recordio"
"github.com/PaddlePaddle/recordio"
)
)
...
@@ -170,11 +170,11 @@ func (s *Service) recover() (bool, error) {
...
@@ -170,11 +170,11 @@ func (s *Service) recover() (bool, error) {
}
}
if
state
==
nil
{
if
state
==
nil
{
log
.
Info
ln
(
"No state exists, not recovered."
)
log
.
Info
(
"No state exists, not recovered."
)
return
false
,
nil
return
false
,
nil
}
}
log
.
Info
f
(
"Loaded snapshot of size: %d bytes."
,
len
(
state
)
)
log
.
Info
(
"Loaded snapshot."
,
log
.
Ctx
{
"size"
:
len
(
state
)}
)
gr
,
err
:=
gzip
.
NewReader
(
bytes
.
NewReader
(
state
))
gr
,
err
:=
gzip
.
NewReader
(
bytes
.
NewReader
(
state
))
if
err
!=
nil
{
if
err
!=
nil
{
return
false
,
err
return
false
,
err
...
@@ -191,11 +191,11 @@ func (s *Service) recover() (bool, error) {
...
@@ -191,11 +191,11 @@ func (s *Service) recover() (bool, error) {
if
err
!=
nil
{
if
err
!=
nil
{
// Only close failed, recover actually succeed, so
// Only close failed, recover actually succeed, so
// just log error.
// just log error.
log
.
Error
ln
(
err
)
log
.
Error
(
"error close recover file."
,
log
.
Ctx
{
"error"
:
err
}
)
}
}
s
.
state
=
tqs
s
.
state
=
tqs
log
.
WithFields
(
s
.
logFields
())
.
Infof
(
"Master recovered from snapshot, scheduling pending task timeout check."
)
log
.
Info
(
"Master recovered from snapshot, scheduling pending task timeout check."
,
s
.
logCtx
()
)
for
_
,
t
:=
range
s
.
state
.
Pending
{
for
_
,
t
:=
range
s
.
state
.
Pending
{
time
.
AfterFunc
(
s
.
timeoutDur
,
s
.
checkTimeoutFunc
(
t
.
Task
.
Meta
.
ID
,
t
.
Task
.
Meta
.
Epoch
))
time
.
AfterFunc
(
s
.
timeoutDur
,
s
.
checkTimeoutFunc
(
t
.
Task
.
Meta
.
ID
,
t
.
Task
.
Meta
.
Epoch
))
}
}
...
@@ -224,7 +224,7 @@ func (s *Service) snapshot() error {
...
@@ -224,7 +224,7 @@ func (s *Service) snapshot() error {
}
}
state
:=
buf
.
Bytes
()
state
:=
buf
.
Bytes
()
log
.
Info
f
(
"Saving snapshot of size: %d bytes."
,
len
(
state
)
)
log
.
Info
(
"Saving snapshot."
,
log
.
Ctx
{
"size bytes"
:
len
(
state
)}
)
return
s
.
store
.
Save
(
state
)
return
s
.
store
.
Save
(
state
)
}
}
...
@@ -260,7 +260,7 @@ func readChunks(globPaths []string) ([]Chunk, error) {
...
@@ -260,7 +260,7 @@ func readChunks(globPaths []string) ([]Chunk, error) {
}
}
count
:=
index
.
NumChunks
()
count
:=
index
.
NumChunks
()
log
.
Info
f
(
"readChunks: file %s has %d chunks"
,
path
,
count
)
log
.
Info
(
"reading chunks."
,
log
.
Ctx
{
"path"
:
path
,
"num chunks"
:
count
}
)
for
i
:=
0
;
i
<
count
;
i
++
{
for
i
:=
0
;
i
<
count
;
i
++
{
chunk
:=
Chunk
{
chunk
:=
Chunk
{
Path
:
path
,
Path
:
path
,
...
@@ -300,7 +300,7 @@ func (s *Service) SetDataset(globPaths []string, _ *int) error {
...
@@ -300,7 +300,7 @@ func (s *Service) SetDataset(globPaths []string, _ *int) error {
err
=
s
.
snapshot
()
err
=
s
.
snapshot
()
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Error
ln
(
err
)
log
.
Error
(
"snapshot error"
,
log
.
Ctx
{
"error"
:
err
}
)
return
err
return
err
}
}
close
(
s
.
ready
)
close
(
s
.
ready
)
...
@@ -320,7 +320,7 @@ func (s *Service) processFailedTask(t taskEntry, epoch int) {
...
@@ -320,7 +320,7 @@ func (s *Service) processFailedTask(t taskEntry, epoch int) {
defer
func
()
{
defer
func
()
{
err
:=
s
.
snapshot
()
err
:=
s
.
snapshot
()
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Error
ln
(
err
)
log
.
Error
(
"snapshot error"
,
log
.
Ctx
{
"error"
:
err
}
)
}
}
}()
}()
...
@@ -328,12 +328,12 @@ func (s *Service) processFailedTask(t taskEntry, epoch int) {
...
@@ -328,12 +328,12 @@ func (s *Service) processFailedTask(t taskEntry, epoch int) {
t
.
NumFailure
++
t
.
NumFailure
++
if
t
.
NumFailure
>
s
.
failureMax
{
if
t
.
NumFailure
>
s
.
failureMax
{
log
.
Warn
ingf
(
"Task %v failed %d times, discard."
,
t
.
Task
,
t
.
NumFailure
)
log
.
Warn
(
"Task failed to many times, discard."
,
log
.
Ctx
{
"task"
:
t
.
Task
,
"num failed"
:
t
.
NumFailure
}
)
s
.
state
.
Failed
=
append
(
s
.
state
.
Failed
,
t
)
s
.
state
.
Failed
=
append
(
s
.
state
.
Failed
,
t
)
return
return
}
}
log
.
Warn
ingf
(
"Task %v failed %d times, re-dispatch."
,
t
.
Task
,
t
.
NumFailure
)
log
.
Warn
(
"Task failed, re-dispatch."
,
log
.
Ctx
{
"task"
:
t
.
Task
,
"num failed"
:
t
.
NumFailure
}
)
s
.
state
.
Todo
=
append
(
s
.
state
.
Todo
,
t
)
s
.
state
.
Todo
=
append
(
s
.
state
.
Todo
,
t
)
return
return
}
}
...
@@ -353,8 +353,8 @@ func (s *Service) checkTimeoutFunc(taskID int, epoch int) func() {
...
@@ -353,8 +353,8 @@ func (s *Service) checkTimeoutFunc(taskID int, epoch int) func() {
}
}
// must be called with lock held.
// must be called with lock held.
func
(
s
*
Service
)
log
Fields
()
log
.
Fields
{
func
(
s
*
Service
)
log
Ctx
()
log
.
Ctx
{
return
log
.
Fields
{
return
log
.
Ctx
{
"todoLen"
:
len
(
s
.
state
.
Todo
),
"todoLen"
:
len
(
s
.
state
.
Todo
),
"pendingLen"
:
len
(
s
.
state
.
Pending
),
"pendingLen"
:
len
(
s
.
state
.
Pending
),
"doneLen"
:
len
(
s
.
state
.
Done
),
"doneLen"
:
len
(
s
.
state
.
Done
),
...
@@ -383,10 +383,10 @@ func (s *Service) GetTask(passID int, task *Task) error {
...
@@ -383,10 +383,10 @@ func (s *Service) GetTask(passID int, task *Task) error {
if
len
(
s
.
state
.
Todo
)
==
0
{
if
len
(
s
.
state
.
Todo
)
==
0
{
if
len
(
s
.
state
.
Done
)
==
0
&&
len
(
s
.
state
.
Pending
)
==
0
{
if
len
(
s
.
state
.
Done
)
==
0
&&
len
(
s
.
state
.
Pending
)
==
0
{
log
.
W
ithFields
(
s
.
logFields
())
.
Warningln
(
"All tasks failed, may start next pass"
)
log
.
W
arn
(
"All tasks failed, may start next pass"
,
s
.
logCtx
()
)
return
ErrAllTaskFailed
return
ErrAllTaskFailed
}
}
log
.
W
ithFields
(
s
.
logFields
())
.
Warningln
(
"No more available task."
)
log
.
W
arn
(
"No more available task."
,
s
.
logCtx
()
)
return
ErrNoMoreAvailable
return
ErrNoMoreAvailable
}
}
...
@@ -400,8 +400,9 @@ func (s *Service) GetTask(passID int, task *Task) error {
...
@@ -400,8 +400,9 @@ func (s *Service) GetTask(passID int, task *Task) error {
}
}
*
task
=
t
.
Task
*
task
=
t
.
Task
log
.
WithFields
(
s
.
logFields
())
.
Infof
(
"Task #%v dispatched."
,
t
.
Task
.
Meta
)
ctx
:=
s
.
logCtx
()
ctx
[
"task meta"
]
=
t
.
Task
.
Meta
log
.
Info
(
"Task dispatched."
,
ctx
)
time
.
AfterFunc
(
s
.
timeoutDur
,
s
.
checkTimeoutFunc
(
t
.
Task
.
Meta
.
ID
,
t
.
Task
.
Meta
.
Epoch
))
time
.
AfterFunc
(
s
.
timeoutDur
,
s
.
checkTimeoutFunc
(
t
.
Task
.
Meta
.
ID
,
t
.
Task
.
Meta
.
Epoch
))
return
nil
return
nil
}
}
...
@@ -417,7 +418,9 @@ func (s *Service) TaskFinished(taskID int, dummy *int) error {
...
@@ -417,7 +418,9 @@ func (s *Service) TaskFinished(taskID int, dummy *int) error {
t
,
ok
:=
s
.
state
.
Pending
[
taskID
]
t
,
ok
:=
s
.
state
.
Pending
[
taskID
]
if
!
ok
{
if
!
ok
{
log
.
WithFields
(
s
.
logFields
())
.
Warningln
(
"Pending task #%d not found."
,
taskID
)
ctx
:=
s
.
logCtx
()
ctx
[
"task id"
]
=
taskID
log
.
Warn
(
"Pending task not found."
,
ctx
)
return
nil
return
nil
}
}
...
@@ -426,7 +429,9 @@ func (s *Service) TaskFinished(taskID int, dummy *int) error {
...
@@ -426,7 +429,9 @@ func (s *Service) TaskFinished(taskID int, dummy *int) error {
s
.
state
.
Done
=
append
(
s
.
state
.
Done
,
t
)
s
.
state
.
Done
=
append
(
s
.
state
.
Done
,
t
)
delete
(
s
.
state
.
Pending
,
taskID
)
delete
(
s
.
state
.
Pending
,
taskID
)
log
.
WithFields
(
s
.
logFields
())
.
Infof
(
"Task #%d finished."
,
taskID
)
ctx
:=
s
.
logCtx
()
ctx
[
"task id"
]
=
taskID
log
.
Info
(
"Task finished."
,
ctx
)
if
len
(
s
.
state
.
Todo
)
==
0
&&
len
(
s
.
state
.
Pending
)
==
0
{
if
len
(
s
.
state
.
Todo
)
==
0
&&
len
(
s
.
state
.
Pending
)
==
0
{
// increase master side pass count if all tasks finished
// increase master side pass count if all tasks finished
s
.
state
.
CurPass
++
s
.
state
.
CurPass
++
...
@@ -434,12 +439,14 @@ func (s *Service) TaskFinished(taskID int, dummy *int) error {
...
@@ -434,12 +439,14 @@ func (s *Service) TaskFinished(taskID int, dummy *int) error {
s
.
state
.
Done
=
[]
taskEntry
{}
s
.
state
.
Done
=
[]
taskEntry
{}
// TODO(typhoonzero): deal with failed tasks
// TODO(typhoonzero): deal with failed tasks
s
.
state
.
Failed
=
[]
taskEntry
{}
s
.
state
.
Failed
=
[]
taskEntry
{}
log
.
WithFields
(
s
.
logFields
())
.
Warningf
(
"all task finished, add new pass data, newpass: %d."
,
s
.
state
.
CurPass
)
ctx
:=
s
.
logCtx
()
ctx
[
"new pass"
]
=
s
.
state
.
CurPass
log
.
Warn
(
"all task finished, add new pass data."
,
ctx
)
}
}
err
:=
s
.
snapshot
()
err
:=
s
.
snapshot
()
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Error
ln
(
err
)
log
.
Error
(
"snapshot error"
,
log
.
Ctx
{
"error"
:
err
}
)
}
}
return
err
return
err
}
}
...
@@ -455,7 +462,7 @@ func (s *Service) TaskFailed(meta TaskMeta, dummy *int) error {
...
@@ -455,7 +462,7 @@ func (s *Service) TaskFailed(meta TaskMeta, dummy *int) error {
t
,
ok
:=
s
.
state
.
Pending
[
meta
.
ID
]
t
,
ok
:=
s
.
state
.
Pending
[
meta
.
ID
]
if
!
ok
{
if
!
ok
{
log
.
W
ithFields
(
s
.
logFields
())
.
Warningln
(
"TaskFailed:Pending task #%v not found."
,
t
.
Task
.
Meta
)
log
.
W
arn
(
"TaskFailed:Pending task not found."
,
log
.
Ctx
{
"task"
:
t
.
Task
.
Meta
}
)
return
nil
return
nil
}
}
...
...
go/pserver/client/c/cclient.go
浏览文件 @
0678073c
...
@@ -45,9 +45,15 @@ import (
...
@@ -45,9 +45,15 @@ import (
"github.com/PaddlePaddle/Paddle/go/pserver"
"github.com/PaddlePaddle/Paddle/go/pserver"
"github.com/PaddlePaddle/Paddle/go/pserver/client"
"github.com/PaddlePaddle/Paddle/go/pserver/client"
log
"github.com/
sirupsen/logrus
"
log
"github.com/
inconshreveable/log15
"
)
)
func
init
()
{
log
.
Root
()
.
SetHandler
(
log
.
LvlFilterHandler
(
log
.
LvlWarn
,
log
.
CallerStackHandler
(
"%+v"
,
log
.
StderrHandler
)),
)
}
var
mu
sync
.
Mutex
var
mu
sync
.
Mutex
var
handleMap
=
make
(
map
[
C
.
paddle_pserver_client
]
*
client
.
Client
)
var
handleMap
=
make
(
map
[
C
.
paddle_pserver_client
]
*
client
.
Client
)
var
curHandle
C
.
paddle_pserver_client
var
curHandle
C
.
paddle_pserver_client
...
@@ -164,10 +170,13 @@ func paddle_init_param(client C.paddle_pserver_client, param C.paddle_parameter,
...
@@ -164,10 +170,13 @@ func paddle_init_param(client C.paddle_pserver_client, param C.paddle_parameter,
if
err
!=
nil
{
if
err
!=
nil
{
if
err
.
Error
()
==
pserver
.
AlreadyInitialized
{
if
err
.
Error
()
==
pserver
.
AlreadyInitialized
{
log
.
Warningf
(
"parameter %s already initialized, treat paddle_init_param as successful."
,
name
)
log
.
Warn
(
"parameter already initialized, treat paddle_init_param as successful."
,
log
.
Ctx
{
"parameter"
:
name
},
)
return
C
.
PSERVER_OK
return
C
.
PSERVER_OK
}
}
log
.
Error
ln
(
err
)
log
.
Error
(
"error init param"
,
log
.
Ctx
{
"error"
:
err
}
)
return
C
.
PSERVER_ERROR
return
C
.
PSERVER_ERROR
}
}
...
@@ -180,11 +189,11 @@ func paddle_finish_init_params(client C.paddle_pserver_client) C.int {
...
@@ -180,11 +189,11 @@ func paddle_finish_init_params(client C.paddle_pserver_client) C.int {
err
:=
c
.
FinishInitParams
()
err
:=
c
.
FinishInitParams
()
if
err
!=
nil
{
if
err
!=
nil
{
if
err
.
Error
()
==
pserver
.
AlreadyInitialized
{
if
err
.
Error
()
==
pserver
.
AlreadyInitialized
{
log
.
Warn
ingln
(
"parameters already initialized, treat paddle_finish_init_params as successful."
)
log
.
Warn
(
"parameters already initialized, treat paddle_finish_init_params as successful."
)
return
C
.
PSERVER_OK
return
C
.
PSERVER_OK
}
}
log
.
Error
ln
(
err
)
log
.
Error
(
"error finish init params"
,
log
.
Ctx
{
"error"
:
err
}
)
return
C
.
PSERVER_ERROR
return
C
.
PSERVER_ERROR
}
}
...
@@ -205,7 +214,7 @@ func paddle_send_grads(client C.paddle_pserver_client, grads **C.paddle_gradient
...
@@ -205,7 +214,7 @@ func paddle_send_grads(client C.paddle_pserver_client, grads **C.paddle_gradient
c
:=
get
(
client
)
c
:=
get
(
client
)
err
:=
c
.
SendGrads
(
gs
)
err
:=
c
.
SendGrads
(
gs
)
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Error
ln
(
err
)
log
.
Error
(
"error send grads"
,
log
.
Ctx
{
"error"
:
err
}
)
return
C
.
PSERVER_ERROR
return
C
.
PSERVER_ERROR
}
}
...
@@ -222,7 +231,7 @@ func paddle_get_params(client C.paddle_pserver_client, dst **C.paddle_parameter,
...
@@ -222,7 +231,7 @@ func paddle_get_params(client C.paddle_pserver_client, dst **C.paddle_parameter,
c
:=
get
(
client
)
c
:=
get
(
client
)
ps
,
err
:=
c
.
GetParams
(
ns
)
ps
,
err
:=
c
.
GetParams
(
ns
)
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Error
ln
(
err
)
log
.
Error
(
"error get params"
,
log
.
Ctx
{
"error"
:
err
}
)
return
C
.
PSERVER_ERROR
return
C
.
PSERVER_ERROR
}
}
...
@@ -231,7 +240,13 @@ func paddle_get_params(client C.paddle_pserver_client, dst **C.paddle_parameter,
...
@@ -231,7 +240,13 @@ func paddle_get_params(client C.paddle_pserver_client, dst **C.paddle_parameter,
for
i
,
p
:=
range
ps
{
for
i
,
p
:=
range
ps
{
pn
[
i
]
=
p
.
Name
pn
[
i
]
=
p
.
Name
}
}
log
.
Errorf
(
"pserver returned wrong number of parameters. Requested: %s, returned: %s."
,
strings
.
Join
(
pn
,
", "
),
strings
.
Join
(
ns
,
", "
))
log
.
Error
(
"pserver returned wrong number of parameters."
,
log
.
Ctx
{
"Requested"
:
strings
.
Join
(
pn
,
", "
),
"Returned"
:
strings
.
Join
(
ns
,
", "
),
},
)
return
C
.
PSERVER_ERROR
return
C
.
PSERVER_ERROR
}
}
...
@@ -241,7 +256,13 @@ func paddle_get_params(client C.paddle_pserver_client, dst **C.paddle_parameter,
...
@@ -241,7 +256,13 @@ func paddle_get_params(client C.paddle_pserver_client, dst **C.paddle_parameter,
for
i
,
p
:=
range
ps
{
for
i
,
p
:=
range
ps
{
pn
[
i
]
=
p
.
Name
pn
[
i
]
=
p
.
Name
}
}
log
.
Errorf
(
"pserver returned wrong parameters, or not in requested order. Requested: %s, returned: %s."
,
strings
.
Join
(
pn
,
", "
),
strings
.
Join
(
ns
,
", "
))
log
.
Error
(
"pserver returned wrong parameters, or not in requested order."
,
log
.
Ctx
{
"Requested"
:
strings
.
Join
(
pn
,
", "
),
"Returned"
:
strings
.
Join
(
ns
,
", "
),
},
)
return
C
.
PSERVER_ERROR
return
C
.
PSERVER_ERROR
}
}
}
}
...
@@ -251,13 +272,19 @@ func paddle_get_params(client C.paddle_pserver_client, dst **C.paddle_parameter,
...
@@ -251,13 +272,19 @@ func paddle_get_params(client C.paddle_pserver_client, dst **C.paddle_parameter,
param
:=
*
(
**
C
.
paddle_parameter
)(
unsafe
.
Pointer
((
uintptr
(
unsafe
.
Pointer
(
dst
))
+
uintptr
(
i
)
*
unsafe
.
Sizeof
(
*
dst
))))
param
:=
*
(
**
C
.
paddle_parameter
)(
unsafe
.
Pointer
((
uintptr
(
unsafe
.
Pointer
(
dst
))
+
uintptr
(
i
)
*
unsafe
.
Sizeof
(
*
dst
))))
if
unsafe
.
Pointer
(
param
)
==
nil
{
if
unsafe
.
Pointer
(
param
)
==
nil
{
log
.
Error
ln
(
"must pre-allocate parameter."
)
log
.
Error
(
"must pre-allocate parameter."
)
return
C
.
PSERVER_ERROR
return
C
.
PSERVER_ERROR
}
}
if
unsafe
.
Pointer
(
param
.
content
)
!=
nil
{
if
unsafe
.
Pointer
(
param
.
content
)
!=
nil
{
if
int
(
param
.
content_len
)
!=
len
(
p
.
Content
)
{
if
int
(
param
.
content_len
)
!=
len
(
p
.
Content
)
{
log
.
Errorf
(
"the pre-allocated content len does not match parameter content len. Pre-allocated len: %d, returned len: %d"
,
param
.
content_len
,
len
(
p
.
Content
))
log
.
Error
(
"the pre-allocated content len does not match parameter content len."
,
log
.
Ctx
{
"Pre-allocated len"
:
param
.
content_len
,
"Returned len"
:
len
(
p
.
Content
),
},
)
return
C
.
PSERVER_ERROR
return
C
.
PSERVER_ERROR
}
}
}
}
...
...
go/pserver/client/client.go
浏览文件 @
0678073c
...
@@ -22,7 +22,7 @@ import (
...
@@ -22,7 +22,7 @@ import (
"github.com/PaddlePaddle/Paddle/go/connection"
"github.com/PaddlePaddle/Paddle/go/connection"
"github.com/PaddlePaddle/Paddle/go/pserver"
"github.com/PaddlePaddle/Paddle/go/pserver"
log
"github.com/
sirupsen/logrus
"
log
"github.com/
inconshreveable/log15
"
)
)
// TODO(helin): add RPC call retry logic
// TODO(helin): add RPC call retry logic
...
@@ -84,7 +84,7 @@ func (c *Client) monitorPservers(l Lister, pserverNum int) {
...
@@ -84,7 +84,7 @@ func (c *Client) monitorPservers(l Lister, pserverNum int) {
if
curServers
[
i
]
.
Addr
==
""
{
if
curServers
[
i
]
.
Addr
==
""
{
err
:=
c
.
pservers
[
i
]
.
Close
()
err
:=
c
.
pservers
[
i
]
.
Close
()
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Error
ln
(
err
)
log
.
Error
(
"error closing connection to pserver"
,
log
.
Ctx
{
"error"
:
err
}
)
}
}
continue
continue
...
@@ -92,7 +92,7 @@ func (c *Client) monitorPservers(l Lister, pserverNum int) {
...
@@ -92,7 +92,7 @@ func (c *Client) monitorPservers(l Lister, pserverNum int) {
err
:=
c
.
pservers
[
i
]
.
Connect
(
curServers
[
i
]
.
Addr
)
err
:=
c
.
pservers
[
i
]
.
Connect
(
curServers
[
i
]
.
Addr
)
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Error
ln
(
err
)
log
.
Error
(
"error connecting to pserver"
,
log
.
Ctx
{
"error"
:
err
}
)
// connect to addr failed, set
// connect to addr failed, set
// to last known addr in order
// to last known addr in order
...
...
go/pserver/client/client_test.go
浏览文件 @
0678073c
...
@@ -30,7 +30,7 @@ import (
...
@@ -30,7 +30,7 @@ import (
"github.com/PaddlePaddle/Paddle/go/pserver"
"github.com/PaddlePaddle/Paddle/go/pserver"
"github.com/PaddlePaddle/Paddle/go/pserver/client"
"github.com/PaddlePaddle/Paddle/go/pserver/client"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3"
log
"github.com/
sirupsen/logrus
"
log
"github.com/
inconshreveable/log15
"
)
)
const
(
const
(
...
@@ -90,7 +90,7 @@ func initEtcdClient() {
...
@@ -90,7 +90,7 @@ func initEtcdClient() {
DialTimeout
:
time
.
Second
*
time
.
Duration
(
1
),
DialTimeout
:
time
.
Second
*
time
.
Duration
(
1
),
})
})
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Error
f
(
"err %v"
,
err
)
log
.
Error
(
"error init etcd client"
,
log
.
Ctx
{
"error"
:
err
}
)
}
}
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
timeout
)
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
timeout
)
_
,
err
=
client
.
Delete
(
ctx
,
pserver
.
PsDesired
)
_
,
err
=
client
.
Delete
(
ctx
,
pserver
.
PsDesired
)
...
...
go/pserver/client/etcd_client.go
浏览文件 @
0678073c
...
@@ -25,7 +25,7 @@ import (
...
@@ -25,7 +25,7 @@ import (
"github.com/PaddlePaddle/Paddle/go/pserver"
"github.com/PaddlePaddle/Paddle/go/pserver"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
"github.com/coreos/etcd/clientv3/concurrency"
log
"github.com/
sirupsen/logrus
"
log
"github.com/
inconshreveable/log15
"
)
)
const
(
const
(
...
@@ -54,26 +54,29 @@ func (e *Etcd) Desired() int {
...
@@ -54,26 +54,29 @@ func (e *Etcd) Desired() int {
resp
,
err
:=
e
.
client
.
Get
(
ctx
,
pserver
.
PsDesired
)
resp
,
err
:=
e
.
client
.
Get
(
ctx
,
pserver
.
PsDesired
)
cancel
()
cancel
()
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Errorf
(
"Get ps dresire number failed! recnnectiong..., %v"
,
err
)
log
.
Error
(
"Get ps dresire number failed! reconnecting..."
,
log
.
Ctx
{
"error"
:
err
},
)
time
.
Sleep
(
e
.
timeout
)
time
.
Sleep
(
e
.
timeout
)
continue
continue
}
}
kvs
:=
resp
.
Kvs
kvs
:=
resp
.
Kvs
if
len
(
kvs
)
==
0
{
if
len
(
kvs
)
==
0
{
log
.
Info
ln
(
"Waiting for ps desired registered ..."
)
log
.
Info
(
"Waiting for ps desired registered ..."
)
time
.
Sleep
(
e
.
timeout
)
time
.
Sleep
(
e
.
timeout
)
continue
continue
}
}
psDesired
,
err
=
strconv
.
Atoi
(
string
(
resp
.
Kvs
[
0
]
.
Value
))
psDesired
,
err
=
strconv
.
Atoi
(
string
(
resp
.
Kvs
[
0
]
.
Value
))
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Error
f
(
"psDesired %d invalid %v"
,
psDesired
,
err
)
log
.
Error
(
"atoi failed"
,
log
.
Ctx
{
"error"
:
err
}
)
time
.
Sleep
(
e
.
timeout
)
time
.
Sleep
(
e
.
timeout
)
continue
continue
}
}
log
.
Debug
f
(
"Get psDesired number: %d"
,
psDesired
)
log
.
Debug
(
"Got psDesired"
,
log
.
Ctx
{
"psDesired"
:
psDesired
}
)
break
break
}
}
return
psDesired
return
psDesired
...
@@ -88,17 +91,20 @@ func (e *Etcd) List() []Server {
...
@@ -88,17 +91,20 @@ func (e *Etcd) List() []Server {
for
i
:=
0
;
i
<
psDesired
;
i
++
{
for
i
:=
0
;
i
<
psDesired
;
i
++
{
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
e
.
timeout
)
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
e
.
timeout
)
psKey
:=
pserver
.
PsPath
+
strconv
.
Itoa
(
i
)
psKey
:=
pserver
.
PsPath
+
strconv
.
Itoa
(
i
)
log
.
Debug
f
(
"checking %s"
,
psKey
)
log
.
Debug
(
"looking for pserver"
,
log
.
Ctx
{
"ps key"
:
psKey
}
)
resp
,
err
:=
e
.
client
.
Get
(
ctx
,
psKey
)
resp
,
err
:=
e
.
client
.
Get
(
ctx
,
psKey
)
cancel
()
cancel
()
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Infof
(
"Get psKey= %s error, %v"
,
psKey
,
err
)
log
.
Info
(
"Get psKey error"
,
log
.
Ctx
{
"ps key"
:
psKey
,
"error"
:
err
},
)
time
.
Sleep
(
e
.
timeout
)
time
.
Sleep
(
e
.
timeout
)
continue
continue
}
}
kvs
:=
resp
.
Kvs
kvs
:=
resp
.
Kvs
if
len
(
kvs
)
==
0
{
if
len
(
kvs
)
==
0
{
log
.
Info
f
(
"Waiting for ps addr registered ..."
)
log
.
Info
(
"Waiting for ps addr registered ..."
)
time
.
Sleep
(
e
.
timeout
)
time
.
Sleep
(
e
.
timeout
)
continue
continue
}
}
...
@@ -106,11 +112,17 @@ func (e *Etcd) List() []Server {
...
@@ -106,11 +112,17 @@ func (e *Etcd) List() []Server {
psAddr
:=
string
(
resp
.
Kvs
[
0
]
.
Value
)
psAddr
:=
string
(
resp
.
Kvs
[
0
]
.
Value
)
// TODO(Longfei) check the ps address
// TODO(Longfei) check the ps address
if
psAddr
==
""
{
if
psAddr
==
""
{
log
.
Infof
(
"Get psKey = %s, psAddr is empty"
,
psKey
)
log
.
Info
(
"Value under psKey is empty"
,
log
.
Ctx
{
"psKey"
:
psKey
},
)
time
.
Sleep
(
e
.
timeout
)
time
.
Sleep
(
e
.
timeout
)
continue
continue
}
}
log
.
Debugf
(
"got value (%s) for key: %s"
,
psAddr
,
psKey
)
log
.
Debug
(
"got psAddr given psKey"
,
log
.
Ctx
{
"psAddr"
:
psAddr
,
"psKey"
:
psKey
},
)
servers
[
i
]
.
Index
=
i
servers
[
i
]
.
Index
=
i
servers
[
i
]
.
Addr
=
psAddr
servers
[
i
]
.
Addr
=
psAddr
}
}
...
@@ -130,13 +142,13 @@ func NewEtcd(endpoints string) *Etcd {
...
@@ -130,13 +142,13 @@ func NewEtcd(endpoints string) *Etcd {
DialTimeout
:
defaultEtcdTimeout
,
DialTimeout
:
defaultEtcdTimeout
,
})
})
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Error
f
(
"Init etcd connection failed: %v"
,
err
)
log
.
Error
(
"Init etcd connection failed"
,
log
.
Ctx
{
"error"
:
err
}
)
time
.
Sleep
(
defaultEtcdTimeout
)
time
.
Sleep
(
defaultEtcdTimeout
)
continue
continue
}
}
break
break
}
}
log
.
Info
f
(
"Connected to etcd: %s
\n
"
,
endpoints
)
log
.
Info
(
"Connected to etcd endpoint"
,
log
.
Ctx
{
"endpoint"
:
endpoints
}
)
client
:=
&
Etcd
{
client
:=
&
Etcd
{
client
:
cli
,
client
:
cli
,
timeout
:
defaultEtcdTimeout
,
timeout
:
defaultEtcdTimeout
,
...
@@ -154,7 +166,7 @@ func (e *Etcd) Select() (bool, error) {
...
@@ -154,7 +166,7 @@ func (e *Etcd) Select() (bool, error) {
}
}
lock
:=
concurrency
.
NewMutex
(
sess
,
initLockPath
)
lock
:=
concurrency
.
NewMutex
(
sess
,
initLockPath
)
log
.
Info
f
(
"Trying to acquire lock at %s."
,
initLockPath
)
log
.
Info
(
"Trying to acquire lock"
,
log
.
Ctx
{
"lock path"
:
initLockPath
}
)
// Do not use timeout context here, since we don't know how
// Do not use timeout context here, since we don't know how
// long does it take for other trainers to initialize the
// long does it take for other trainers to initialize the
// parameters.
// parameters.
...
@@ -162,7 +174,7 @@ func (e *Etcd) Select() (bool, error) {
...
@@ -162,7 +174,7 @@ func (e *Etcd) Select() (bool, error) {
if
err
!=
nil
{
if
err
!=
nil
{
return
false
,
err
return
false
,
err
}
}
log
.
Info
f
(
"Successfully acquired lock at %s."
,
initLockPath
)
log
.
Info
(
"Successfully acquired lock"
,
log
.
Ctx
{
"lock path"
:
initLockPath
}
)
get
:=
clientv3
.
OpGet
(
initDonePath
)
get
:=
clientv3
.
OpGet
(
initDonePath
)
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
e
.
timeout
)
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
e
.
timeout
)
...
@@ -181,17 +193,17 @@ func (e *Etcd) Select() (bool, error) {
...
@@ -181,17 +193,17 @@ func (e *Etcd) Select() (bool, error) {
if
len
(
resp
.
Kvs
)
==
0
{
if
len
(
resp
.
Kvs
)
==
0
{
// Key value not set, select current trainer.
// Key value not set, select current trainer.
e
.
lock
=
lock
e
.
lock
=
lock
log
.
Info
ln
(
"Trainer selected."
)
log
.
Info
(
"Trainer selected."
)
return
true
,
nil
return
true
,
nil
}
}
if
string
(
resp
.
Kvs
[
0
]
.
Value
)
==
initDoneVal
{
if
string
(
resp
.
Kvs
[
0
]
.
Value
)
==
initDoneVal
{
log
.
Info
ln
(
"Initialization is already done."
)
log
.
Info
(
"Initialization is already done."
)
ctx
,
cancel
=
context
.
WithTimeout
(
context
.
Background
(),
e
.
timeout
)
ctx
,
cancel
=
context
.
WithTimeout
(
context
.
Background
(),
e
.
timeout
)
err
=
lock
.
Unlock
(
ctx
)
err
=
lock
.
Unlock
(
ctx
)
cancel
()
cancel
()
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Error
ln
(
err
)
log
.
Error
(
"error unlocking"
,
log
.
Ctx
{
"error"
:
err
}
)
}
}
return
false
,
nil
return
false
,
nil
}
}
...
@@ -221,7 +233,7 @@ func (e *Etcd) Done() error {
...
@@ -221,7 +233,7 @@ func (e *Etcd) Done() error {
err
=
e
.
lock
.
Unlock
(
ctx
)
err
=
e
.
lock
.
Unlock
(
ctx
)
cancel
()
cancel
()
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Error
ln
(
err
)
log
.
Error
(
"error unlocking"
,
log
.
Ctx
{
"error"
:
err
}
)
}
else
{
}
else
{
e
.
lock
=
nil
e
.
lock
=
nil
}
}
...
@@ -244,7 +256,7 @@ func (e *Etcd) Close() error {
...
@@ -244,7 +256,7 @@ func (e *Etcd) Close() error {
cErr
:=
e
.
client
.
Close
()
cErr
:=
e
.
client
.
Close
()
if
cErr
!=
nil
{
if
cErr
!=
nil
{
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Error
ln
(
cErr
)
log
.
Error
(
"error closing etcd client"
,
log
.
Ctx
{
"error"
:
cErr
}
)
return
err
return
err
}
}
return
cErr
return
cErr
...
...
go/pserver/etcd_client.go
浏览文件 @
0678073c
...
@@ -24,7 +24,7 @@ import (
...
@@ -24,7 +24,7 @@ import (
"github.com/PaddlePaddle/Paddle/go/utils/networkhelper"
"github.com/PaddlePaddle/Paddle/go/utils/networkhelper"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
"github.com/coreos/etcd/clientv3/concurrency"
log
"github.com/
sirupsen/logrus
"
log
"github.com/
inconshreveable/log15
"
)
)
const
(
const
(
...
@@ -82,19 +82,19 @@ func (e *EtcdClient) Register(port int) (int, error) {
...
@@ -82,19 +82,19 @@ func (e *EtcdClient) Register(port int) (int, error) {
DialTimeout
:
e
.
dialTimeout
,
DialTimeout
:
e
.
dialTimeout
,
})
})
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Error
f
(
"connect to etcd error: %v"
,
err
)
log
.
Error
(
"connect to etcd error"
,
log
.
Ctx
{
"error"
:
err
}
)
time
.
Sleep
(
retryTimeout
)
time
.
Sleep
(
retryTimeout
)
continue
continue
}
}
e
.
client
=
cli
e
.
client
=
cli
sess
,
err
:=
concurrency
.
NewSession
(
cli
,
concurrency
.
WithTTL
(
e
.
ttlSec
))
sess
,
err
:=
concurrency
.
NewSession
(
cli
,
concurrency
.
WithTTL
(
e
.
ttlSec
))
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Error
f
(
"create etcd session error: %v"
,
err
)
log
.
Error
(
"create etcd session error"
,
log
.
Ctx
{
"error"
:
err
}
)
time
.
Sleep
(
retryTimeout
)
time
.
Sleep
(
retryTimeout
)
continue
continue
}
}
e
.
sess
=
sess
e
.
sess
=
sess
log
.
Debug
f
(
"inited client to %s"
,
e
.
endpoints
)
log
.
Debug
(
"connected to etcd"
,
log
.
Ctx
{
"endpoint"
:
e
.
endpoints
}
)
break
break
}
}
// init /ps_desired using transaction, for multiple pservers may want to write
// init /ps_desired using transaction, for multiple pservers may want to write
...
@@ -104,7 +104,7 @@ func (e *EtcdClient) Register(port int) (int, error) {
...
@@ -104,7 +104,7 @@ func (e *EtcdClient) Register(port int) (int, error) {
_
,
err
:=
e
.
initDesiredPservers
(
ctx
,
e
.
numPservers
)
_
,
err
:=
e
.
initDesiredPservers
(
ctx
,
e
.
numPservers
)
cancel
()
cancel
()
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Warn
(
err
)
log
.
Warn
(
"pserver init error"
,
log
.
Ctx
{
"error"
:
err
,
"num pservers"
:
e
.
numPservers
}
)
time
.
Sleep
(
retryTimeout
)
time
.
Sleep
(
retryTimeout
)
continue
continue
}
}
...
@@ -119,14 +119,17 @@ func (e *EtcdClient) Register(port int) (int, error) {
...
@@ -119,14 +119,17 @@ func (e *EtcdClient) Register(port int) (int, error) {
resp
,
err
:=
e
.
client
.
Get
(
ctx
,
PsDesired
)
resp
,
err
:=
e
.
client
.
Get
(
ctx
,
PsDesired
)
cancel
()
cancel
()
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Error
f
(
"getting %s error: %v"
,
PsDesired
,
err
)
log
.
Error
(
"get etcd key error"
,
log
.
Ctx
{
"key"
:
PsDesired
,
"error"
:
err
}
)
time
.
Sleep
(
retryTimeout
)
time
.
Sleep
(
retryTimeout
)
continue
continue
}
}
if
len
(
resp
.
Kvs
)
!=
0
{
if
len
(
resp
.
Kvs
)
!=
0
{
e
.
desired
,
err
=
strconv
.
Atoi
(
string
(
resp
.
Kvs
[
0
]
.
Value
))
e
.
desired
,
err
=
strconv
.
Atoi
(
string
(
resp
.
Kvs
[
0
]
.
Value
))
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Errorf
(
"value of %s invalid %v
\n
"
,
PsDesired
,
err
)
log
.
Error
(
"psDesired atoi error"
,
log
.
Ctx
{
"error"
:
err
,
"value"
:
string
(
resp
.
Kvs
[
0
]
.
Value
)},
)
time
.
Sleep
(
retryTimeout
)
time
.
Sleep
(
retryTimeout
)
// NOTE: wait util ps_desired value change
// NOTE: wait util ps_desired value change
continue
continue
...
@@ -143,7 +146,7 @@ func (e *EtcdClient) Register(port int) (int, error) {
...
@@ -143,7 +146,7 @@ func (e *EtcdClient) Register(port int) (int, error) {
pserverIdx
,
err
=
e
.
registerPserverEtcd
(
ctx
,
port
)
pserverIdx
,
err
=
e
.
registerPserverEtcd
(
ctx
,
port
)
cancel
()
cancel
()
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Warn
(
err
)
log
.
Warn
(
"register pserver on etcd error"
,
log
.
Ctx
{
"error"
:
err
}
)
time
.
Sleep
(
retryTimeout
)
time
.
Sleep
(
retryTimeout
)
continue
continue
}
}
...
@@ -170,16 +173,17 @@ func (e *EtcdClient) registerPserverEtcd(ctx context.Context, port int) (int, er
...
@@ -170,16 +173,17 @@ func (e *EtcdClient) registerPserverEtcd(ctx context.Context, port int) (int, er
registered
:=
false
registered
:=
false
for
i
:=
0
;
i
<
e
.
desired
;
i
++
{
for
i
:=
0
;
i
<
e
.
desired
;
i
++
{
psKey
:=
PsPath
+
strconv
.
Itoa
(
i
)
psKey
:=
PsPath
+
strconv
.
Itoa
(
i
)
log
.
Debugf
(
"checking %s"
,
psKey
)
ps
:=
c
.
Get
(
psKey
)
ps
:=
c
.
Get
(
psKey
)
log
.
Debugf
(
"got value (%s) for key: %s"
,
ps
,
psKey
)
log
.
Debug
(
"register pserver got value"
,
log
.
Ctx
{
"value"
:
ps
,
"key"
:
psKey
},
)
if
ps
==
""
{
if
ps
==
""
{
// find the first id and write info
// find the first id and write info
pserverAddr
:=
e
.
externalIP
+
":"
+
strconv
.
Itoa
(
port
)
pserverAddr
:=
e
.
externalIP
+
":"
+
strconv
.
Itoa
(
port
)
c
.
Put
(
psKey
,
pserverAddr
,
clientv3
.
WithLease
(
e
.
sess
.
Lease
()))
c
.
Put
(
psKey
,
pserverAddr
,
clientv3
.
WithLease
(
e
.
sess
.
Lease
()))
log
.
Debugf
(
"set pserver node %s with value %s"
,
psKey
,
pserverAddr
)
log
.
Debug
(
"register finished"
,
log
.
Ctx
{
"key"
:
psKey
,
"value"
:
pserverAddr
})
log
.
Debug
(
"register finished"
)
idx
=
i
idx
=
i
registered
=
true
registered
=
true
break
break
...
@@ -239,7 +243,7 @@ func (e *EtcdClient) Shutdown() error {
...
@@ -239,7 +243,7 @@ func (e *EtcdClient) Shutdown() error {
newErr
:=
e
.
client
.
Close
()
newErr
:=
e
.
client
.
Close
()
if
newErr
!=
nil
{
if
newErr
!=
nil
{
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Error
ln
(
newErr
)
log
.
Error
(
"shutdown error"
,
log
.
Ctx
{
"error"
:
newErr
}
)
}
else
{
}
else
{
err
=
newErr
err
=
newErr
}
}
...
...
go/pserver/optimizer.go
浏览文件 @
0678073c
...
@@ -25,7 +25,7 @@ import (
...
@@ -25,7 +25,7 @@ import (
"fmt"
"fmt"
"unsafe"
"unsafe"
log
"github.com/
sirupsen/logrus
"
log
"github.com/
inconshreveable/log15
"
)
)
type
optimizer
struct
{
type
optimizer
struct
{
...
@@ -56,12 +56,12 @@ func newOptimizer(paramWithConfigs ParameterWithConfig, State []byte) *optimizer
...
@@ -56,12 +56,12 @@ func newOptimizer(paramWithConfigs ParameterWithConfig, State []byte) *optimizer
c
:=
paramWithConfigs
.
Config
c
:=
paramWithConfigs
.
Config
s
:=
State
s
:=
State
paramBufferSize
:=
C
.
size_t
(
len
(
p
.
Content
))
paramBufferSize
:=
C
.
size_t
(
len
(
p
.
Content
))
log
.
WithFields
(
log
.
Fields
{
log
.
Info
(
"New Optimizer Created with config"
,
log
.
Ctx
{
"ElementType"
:
p
.
ElementType
,
"ElementType"
:
p
.
ElementType
,
"ParamSize"
:
paramBufferSize
,
"ParamSize"
:
paramBufferSize
,
"ConfigSize"
:
len
(
c
),
"ConfigSize"
:
len
(
c
),
"StateSize"
:
len
(
s
),
"StateSize"
:
len
(
s
),
})
.
Info
(
"New Optimizer Created with config:"
)
})
var
cbuffer
unsafe
.
Pointer
var
cbuffer
unsafe
.
Pointer
cbuffer
=
C
.
malloc
(
paramBufferSize
)
cbuffer
=
C
.
malloc
(
paramBufferSize
)
...
...
go/pserver/service.go
浏览文件 @
0678073c
...
@@ -32,7 +32,7 @@ import (
...
@@ -32,7 +32,7 @@ import (
uuid
"github.com/satori/go.uuid"
uuid
"github.com/satori/go.uuid"
log
"github.com/
sirupsen/logrus
"
log
"github.com/
inconshreveable/log15
"
)
)
// ElementType is the type of elements of a Parameter.
// ElementType is the type of elements of a Parameter.
...
@@ -209,7 +209,7 @@ func (s *Service) FinishInitParams(_ int, _ *int) error {
...
@@ -209,7 +209,7 @@ func (s *Service) FinishInitParams(_ int, _ *int) error {
for
range
t
{
for
range
t
{
err
:=
s
.
checkpoint
()
err
:=
s
.
checkpoint
()
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Error
ln
(
err
)
log
.
Error
(
"finish init params error"
,
log
.
Ctx
{
"error"
:
err
}
)
}
}
}
}
}()
}()
...
@@ -262,7 +262,7 @@ func (s *Service) GetParam(name string, parameter *Parameter) error {
...
@@ -262,7 +262,7 @@ func (s *Service) GetParam(name string, parameter *Parameter) error {
func
traceTime
(
start
time
.
Time
,
name
string
)
{
func
traceTime
(
start
time
.
Time
,
name
string
)
{
elapsed
:=
time
.
Since
(
start
)
elapsed
:=
time
.
Since
(
start
)
log
.
Info
f
(
"%s took %v"
,
name
,
elapsed
)
log
.
Info
(
"time elapsed"
,
log
.
Ctx
{
"name"
:
name
,
"elapsed"
:
elapsed
}
)
}
}
// checkpoint saves checkpoint to disk.
// checkpoint saves checkpoint to disk.
...
@@ -270,7 +270,7 @@ func traceTime(start time.Time, name string) {
...
@@ -270,7 +270,7 @@ func traceTime(start time.Time, name string) {
// checkpoint should be only called after the parameters are
// checkpoint should be only called after the parameters are
// initialized.
// initialized.
func
(
s
*
Service
)
checkpoint
()
(
err
error
)
{
func
(
s
*
Service
)
checkpoint
()
(
err
error
)
{
log
.
Info
ln
(
"Begin save checkpoint."
)
log
.
Info
(
"Begin save checkpoint."
)
defer
traceTime
(
time
.
Now
(),
"save checkpoint"
)
defer
traceTime
(
time
.
Now
(),
"save checkpoint"
)
s
.
mu
.
Lock
()
s
.
mu
.
Lock
()
...
@@ -315,7 +315,7 @@ func (s *Service) checkpoint() (err error) {
...
@@ -315,7 +315,7 @@ func (s *Service) checkpoint() (err error) {
closeErr
:=
f
.
Close
()
closeErr
:=
f
.
Close
()
if
closeErr
!=
nil
{
if
closeErr
!=
nil
{
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Error
ln
(
closeErr
)
log
.
Error
(
"error close checkpoint file"
,
log
.
Ctx
{
"error"
:
closeErr
}
)
}
else
{
}
else
{
// Set closeErr as return value.
// Set closeErr as return value.
err
=
closeErr
err
=
closeErr
...
@@ -336,7 +336,7 @@ func (s *Service) checkpoint() (err error) {
...
@@ -336,7 +336,7 @@ func (s *Service) checkpoint() (err error) {
oldMeta
,
err
:=
loadMeta
(
s
.
client
,
s
.
idx
)
oldMeta
,
err
:=
loadMeta
(
s
.
client
,
s
.
idx
)
if
err
==
ErrCheckpointNotFound
{
if
err
==
ErrCheckpointNotFound
{
log
.
Info
ln
(
"Do not have existing checkpoint."
)
log
.
Info
(
"Do not have existing checkpoint."
)
err
=
nil
err
=
nil
}
}
...
@@ -368,7 +368,7 @@ func (s *Service) checkpoint() (err error) {
...
@@ -368,7 +368,7 @@ func (s *Service) checkpoint() (err error) {
if
rmErr
!=
nil
{
if
rmErr
!=
nil
{
// log error, but still treat checkpoint as
// log error, but still treat checkpoint as
// successful.
// successful.
log
.
Error
ln
(
rmErr
)
log
.
Error
(
"remove old meta file error"
,
log
.
Ctx
{
"error"
:
rmErr
}
)
}
}
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录