From 8c9119afcd63eedefa93d08339c773a128a285a5 Mon Sep 17 00:00:00 2001 From: gongweibao Date: Fri, 27 Oct 2017 03:45:18 -0500 Subject: [PATCH] add logs and fix a bug (#5074) add logs and fix a python path bug --- go/master/c/client.go | 3 ++- go/master/client.go | 19 ++++++++++++++----- go/master/client_test.go | 1 + python/paddle/v2/reader/creator.py | 11 ++++++++--- 4 files changed, 25 insertions(+), 9 deletions(-) diff --git a/go/master/c/client.go b/go/master/c/client.go index 9a59337108..9a3960d59c 100644 --- a/go/master/c/client.go +++ b/go/master/c/client.go @@ -123,7 +123,8 @@ func paddle_set_dataset(client C.paddle_master_client, path **C.char, size C.int } err := c.SetDataset(paths) if err != nil { - log.Error("error set dataset", log.Ctx{"error": err}) + log.Error("error set dataset", + log.Ctx{"error": err, "paths": paths}) return C.PADDLE_MASTER_ERROR } diff --git a/go/master/client.go b/go/master/client.go index 5d657548c9..7bcf869553 100644 --- a/go/master/client.go +++ b/go/master/client.go @@ -121,6 +121,7 @@ func (c *Client) StartGetRecords(passID int) { } func (c *Client) getRecords(passID int) { + i := 0 for { t, err := c.getTask(passID) if err != nil { @@ -130,12 +131,20 @@ func (c *Client) getRecords(passID int) { c.ch <- record{nil, err} break } - if err.Error() == ErrPassAfter.Error() { - // wait util last pass finishes - time.Sleep(time.Second * 3) - continue + + if i%60 == 0 { + log.Debug("getTask of passID error.", + log.Ctx{"error": err, "passID": passID}) + i = 0 } - log.Error("getTask error.", log.Ctx{"error": err}) + + // if err.Error() == ErrPassAfter.Error() + // wait util last pass finishes + // if other error such as network error + // wait to reconnect or task time out + time.Sleep(time.Second * 3) + i += 3 + continue } for _, chunk := range t.Chunks { diff --git a/go/master/client_test.go b/go/master/client_test.go index 79b9cc844d..1963dbfd73 100644 --- a/go/master/client_test.go +++ b/go/master/client_test.go @@ -117,6 +117,7 @@ func TestNextRecord(t *testing.T) { if e != nil { panic(e) } + // test for n passes for pass := 0; pass < 10; pass++ { c.StartGetRecords(pass) diff --git a/python/paddle/v2/reader/creator.py b/python/paddle/v2/reader/creator.py index 97e844b92c..421f6c933d 100644 --- a/python/paddle/v2/reader/creator.py +++ b/python/paddle/v2/reader/creator.py @@ -61,7 +61,7 @@ def recordio(paths, buf_size=100): """ Creates a data reader from given RecordIO file paths separated by ",", glob pattern is supported. - :path: path of recordio files. + :path: path of recordio files, can be a string or a string list. :returns: data reader of recordio files. """ @@ -92,7 +92,7 @@ def cloud_reader(paths, etcd_endpoints, timeout_sec=5, buf_size=64): """ Create a data reader that yield a record one by one from the paths: - :path: path of recordio files. + :paths: path of recordio files, can be a string or a string list. :etcd_endpoints: the endpoints for etcd cluster :returns: data reader of recordio files. @@ -107,7 +107,12 @@ def cloud_reader(paths, etcd_endpoints, timeout_sec=5, buf_size=64): import cPickle as pickle import paddle.v2.master as master c = master.client(etcd_endpoints, timeout_sec, buf_size) - c.set_dataset(paths) + + if isinstance(paths, basestring): + path = [paths] + else: + path = paths + c.set_dataset(path) def reader(): global pass_num -- GitLab