diff --git a/go/master/c/client.go b/go/master/c/client.go index 9a59337108d1aa33929abb480af686a96514655b..9a3960d59cd950ba68213ac53a51bfc4e68c0546 100644 --- a/go/master/c/client.go +++ b/go/master/c/client.go @@ -123,7 +123,8 @@ func paddle_set_dataset(client C.paddle_master_client, path **C.char, size C.int } err := c.SetDataset(paths) if err != nil { - log.Error("error set dataset", log.Ctx{"error": err}) + log.Error("error set dataset", + log.Ctx{"error": err, "paths": paths}) return C.PADDLE_MASTER_ERROR } diff --git a/go/master/client.go b/go/master/client.go index 5d657548c9039dfdacf61dd1145deb9777596d9f..7bcf86955348fad14cbe86e2180539372fcb82cf 100644 --- a/go/master/client.go +++ b/go/master/client.go @@ -121,6 +121,7 @@ func (c *Client) StartGetRecords(passID int) { } func (c *Client) getRecords(passID int) { + i := 0 for { t, err := c.getTask(passID) if err != nil { @@ -130,12 +131,20 @@ func (c *Client) getRecords(passID int) { c.ch <- record{nil, err} break } - if err.Error() == ErrPassAfter.Error() { - // wait util last pass finishes - time.Sleep(time.Second * 3) - continue + + if i%60 == 0 { + log.Debug("getTask of passID error.", + log.Ctx{"error": err, "passID": passID}) + i = 0 } - log.Error("getTask error.", log.Ctx{"error": err}) + + // if err.Error() == ErrPassAfter.Error() + // wait util last pass finishes + // if other error such as network error + // wait to reconnect or task time out + time.Sleep(time.Second * 3) + i += 3 + continue } for _, chunk := range t.Chunks { diff --git a/go/master/client_test.go b/go/master/client_test.go index 79b9cc844d1ff938915a622bf19a7d772682becf..1963dbfd732605d3b2612f10a047c3a03faa53be 100644 --- a/go/master/client_test.go +++ b/go/master/client_test.go @@ -117,6 +117,7 @@ func TestNextRecord(t *testing.T) { if e != nil { panic(e) } + // test for n passes for pass := 0; pass < 10; pass++ { c.StartGetRecords(pass) diff --git a/python/paddle/v2/reader/creator.py b/python/paddle/v2/reader/creator.py index 97e844b92c77a7c58105dc5df2b4092fa5571d6f..421f6c933d7032e4103f504fc509e2d5c89149b2 100644 --- a/python/paddle/v2/reader/creator.py +++ b/python/paddle/v2/reader/creator.py @@ -61,7 +61,7 @@ def recordio(paths, buf_size=100): """ Creates a data reader from given RecordIO file paths separated by ",", glob pattern is supported. - :path: path of recordio files. + :path: path of recordio files, can be a string or a string list. :returns: data reader of recordio files. """ @@ -92,7 +92,7 @@ def cloud_reader(paths, etcd_endpoints, timeout_sec=5, buf_size=64): """ Create a data reader that yield a record one by one from the paths: - :path: path of recordio files. + :paths: path of recordio files, can be a string or a string list. :etcd_endpoints: the endpoints for etcd cluster :returns: data reader of recordio files. @@ -107,7 +107,12 @@ def cloud_reader(paths, etcd_endpoints, timeout_sec=5, buf_size=64): import cPickle as pickle import paddle.v2.master as master c = master.client(etcd_endpoints, timeout_sec, buf_size) - c.set_dataset(paths) + + if isinstance(paths, basestring): + path = [paths] + else: + path = paths + c.set_dataset(path) def reader(): global pass_num