提交 5ba1e1e1 编写于 作者: T tensor-tang

Merge remote-tracking branch 'upstream/develop' into mkldnn_bn

......@@ -8,7 +8,7 @@ ExternalProject_Add(
extern_eigen3
${EXTERNAL_PROJECT_LOG_ARGS}
GIT_REPOSITORY "https://github.com/RLovelett/eigen.git"
GIT_TAG 4e79cb69b9425f5f8c3a84be4350d4ab75b5fd9d
GIT_TAG 70661066beef694cadf6c304d0d07e0758825c10
PREFIX ${EIGEN_SOURCE_DIR}
UPDATE_COMMAND ""
CONFIGURE_COMMAND ""
......
INCLUDE(ExternalProject)
include(ExternalProject)
SET(NCCL_SOURCE_DIR ${THIRD_PARTY_PATH}/nccl)
INCLUDE_DIRECTORIES(${NCCL_SOURCE_DIR}/src/extern_nccl/src)
set(NCCL_SOURCE_DIR ${THIRD_PARTY_PATH}/nccl)
include_directories(${NCCL_SOURCE_DIR}/src/extern_nccl/src)
if(WITH_DSO)
# If we use DSO, we do not build nccl, just download the dependencies
......@@ -12,39 +11,39 @@ if(WITH_DSO)
set(NCCL_INSTALL_DIR "")
else()
# otherwise, we build nccl and link it.
set(NCCL_INSTALL_DIR ${THIRD_PARTY_PATH}/install/nccl)
# Note: cuda 8.0 is needed to make nccl
# When cuda is not installed on the system directory, need to set CUDA_HOME to your cuda root
set(NCCL_BUILD_COMMAND "make -j 8")
set(NCCL_INSTALL_COMMAND "make install")
SET(NCCL_INSTALL_DIR ${THIRD_PARTY_PATH}/install/nccl)
set(NCCL_INSTALL_COMMAND "make install PREFIX=${NCCL_INSTALL_DIR}")
endif()
ExternalProject_Add(
extern_nccl
${EXTERNAL_PROJECT_LOG_ARGS}
GIT_REPOSITORY "https://github.com/NVIDIA/nccl.git"
GIT_TAG "v1.3.4-1"
PREFIX "${NCCL_SOURCE_DIR}"
UPDATE_COMMAND ""
CONFIGURE_COMMAND ""
BUILD_COMMAND "${NCCL_BUILD_COMMAND}"
INSTALL_COMMAND "${NCCL_INSTALL_COMMAND}"
INSTALL_DIR "${NCCL_INSTALL_DIR}"
TEST_COMMAND ""
extern_nccl
${EXTERNAL_PROJECT_LOG_ARGS}
GIT_REPOSITORY "https://github.com/NVIDIA/nccl.git"
GIT_TAG "v1.3.4-1"
PREFIX "${NCCL_SOURCE_DIR}"
UPDATE_COMMAND ""
CONFIGURE_COMMAND ""
BUILD_COMMAND "${NCCL_BUILD_COMMAND}"
INSTALL_COMMAND "${NCCL_INSTALL_COMMAND}"
INSTALL_DIR "${NCCL_INSTALL_DIR}"
TEST_COMMAND ""
)
if (WITH_DSO)
if (${CMAKE_VERSION} VERSION_LESS "3.3.0")
set(dummyfile ${CMAKE_CURRENT_BINARY_DIR}/lib_any_dummy.c)
file(WRITE ${dummyfile} "const char * dummy_any = \"${dummyfile}\";")
if(WITH_DSO)
if(${CMAKE_VERSION} VERSION_LESS "3.3.0")
set(dummyfile ${CMAKE_CURRENT_BINARY_DIR}/lib_nccl_dummy.c)
file(WRITE ${dummyfile} "const char * dummy_nccl = \"${dummyfile}\";")
add_library(nccl STATIC ${dummyfile})
else()
add_library(nccl INTERFACE)
endif()
else()
ADD_LIBRARY(nccl STATIC IMPORTED GLOBAL)
SET_PROPERTY(TARGET nccl PROPERTY IMPORTED_LOCATION
${NCCL_INSTALL_DIR}/lib/libnccl.a)
add_library(nccl STATIC IMPORTED GLOBAL)
set_property(TARGET nccl PROPERTY IMPORTED_LOCATION
${NCCL_INSTALL_DIR}/lib/libnccl_static.a)
endif()
add_dependencies(nccl extern_nccl)
LIST(APPEND external_project_dependencies nccl)
## Survey on Graph
Neural network framework often provides symbolic API for users to write network topology conveniently. This doc manily focus on symbolic API in most popular neural network frameworks, and try to find out how to parse symbolic configuration to a portable file, such as protobuf or json.
### Mxnet
The core concept of symbolic API is `Symbol`. Mxnet implements `Symbol` class in C++, and export to Python using C-API. Please refer to the comments in Mxnet:
`Symbol` is help class used to represent the operator node in Graph.
`Symbol` acts as an interface for building graphs from different components like Variable, Functor and Group. `Symbol` is also exported to python front-end (while Graph is not) to enable quick test and deployment. Conceptually, symbol is the final operation of a graph and thus including all the information required (the graph) to evaluate its output value.
A simple network topology wrote by Symbol is as follows:
```python
def get_symbol(num_classes=10, **kwargs):
data = mx.symbol.Variable('data')
data = mx.symbol.Flatten(data=data)
fc1 = mx.symbol.FullyConnected(data = data, name='fc1', num_hidden=128)
act1 = mx.symbol.Activation(data = fc1, name='relu1', act_type="relu")
fc2 = mx.symbol.FullyConnected(data = act1, name = 'fc2', num_hidden = 64)
act2 = mx.symbol.Activation(data = fc2, name='relu2', act_type="relu")
fc3 = mx.symbol.FullyConnected(data = act2, name='fc3', num_hidden=num_classes)
mlp = mx.symbol.SoftmaxOutput(data = fc3, name = 'softmax')
return mlp
```
Varible here is actually a Symbol. Every basic Symbol will correspond to one Node, and every Node has its own NodeAttr. There is a op field in NodeAttr class, when a Symbol represents Variable(often input data), the op field is null.
Symbol contains a data member, std::vector<NodeEntry> outputs, and NodeEntry cantains a poniter to Node. We can follow the Node pointer to get all the Graph.
And Symbol can be saved to a Json file.
Here is a detailed example:
```
>>> import mxnet as mx
>>> data = mx.symbol.Variable('data')
>>> print data.debug_str()
Variable:data
>>> data = mx.symbol.Flatten(data=data)
>>> print data.debug_str()
Symbol Outputs:
output[0]=flatten0(0)
Variable:data
--------------------
Op:Flatten, Name=flatten0
Inputs:
arg[0]=data(0) version=0
>>> fc1 = mx.symbol.FullyConnected(data = data, name='fc1', num_hidden=128)
>>> print fc1.debug_str()
Symbol Outputs:
output[0]=fc1(0)
Variable:data
--------------------
Op:Flatten, Name=flatten0
Inputs:
arg[0]=data(0) version=0
Variable:fc1_weight
Variable:fc1_bias
--------------------
Op:FullyConnected, Name=fc1
Inputs:
arg[0]=flatten0(0)
arg[1]=fc1_weight(0) version=0
arg[2]=fc1_bias(0) version=0
Attrs:
num_hidden=128
```
### TensorFlow
The core concept of symbolic API is `Tensor`. Tensorflow defines `Tensor` in Python. Please refer to the comments in TensorFlow:
A `Tensor` is a symbolic handle to one of the outputs of an `Operation`. It does not hold the values of that operation's output, but instead provides a means of computing those values in a TensorFlow [Session](https://www.tensorflow.org/api_docs/python/tf/Session).
A simple example is as follows:
```python
# Build a dataflow graph.
c = tf.constant([[1.0, 2.0], [3.0, 4.0]])
d = tf.constant([[1.0, 1.0], [0.0, 1.0]])
e = tf.matmul(c, d)
# Construct a `Session` to execute the graph.
sess = tf.Session()
# Execute the graph and store the value that `e` represents in `result`.
result = sess.run(e)
```
The main method of `Tensor` is as follows:
```python
@property
def op(self):
"""The `Operation` that produces this tensor as an output."""
return self._op
@property
def dtype(self):
"""The `DType` of elements in this tensor."""
return self._dtype
@property
def graph(self):
"""The `Graph` that contains this tensor."""
return self._op.graph
@property
def name(self):
"""The string name of this tensor."""
if not self._op.name:
raise ValueError("Operation was not named: %s" % self._op)
return "%s:%d" % (self._op.name, self._value_index)
@property
def device(self):
"""The name of the device on which this tensor will be produced, or None."""
return self._op.device
```
Tensor can be taken as target to run by session. Tensor contains all the information of Graph, and tracks data dependency.
Here is a detailed example:
```
>>> import tensorflow as tf
>>> c = tf.constant([[1.0, 2.0], [3.0, 4.0]])
>>> print c.graph
<tensorflow.python.framework.ops.Graph object at 0x10f256d50>
>>> d = tf.constant([[1.0, 1.0], [0.0, 1.0]])
>>> print d.graph
<tensorflow.python.framework.ops.Graph object at 0x10f256d50>
>>> e = tf.matmul(c, d)
>>> print e.graph
<tensorflow.python.framework.ops.Graph object at 0x10f256d50>
```
### Dynet
The core concept of symbolic API is `Expression`, and Dynet defines `Expression` class in C++.
A simple example is as follows:
```cpp
ComputationGraph cg;
Expression W = parameter(cg, pW);
Expression in = input(cg, xs[i]);
Expression label = input(cg, ys[i]);
Expression pred = W * in;
Expression loss = square(pred - label);
```
The input data and parameter are also represented by Expression. Every basci Expression corresponds to a Node. And input data is also a Node.
Expression has a data member ComputationGraph, and ComputationGraph will be modified in users' configuring process. Expression can be a running target, beacuse Expression contains all dependency.
Here is a detailed example:
write topology in C++
```
ComputationGraph cg;
Expression W = parameter(cg, pW);
cg.print_graphviz();
Expression pred = W * xs[i];
cg.print_graphviz();
Expression loss = square(pred - ys[i]);
cg.print_graphviz();
```
compile and print
```
# first print
digraph G {
rankdir=LR;
nodesep=.05;
N0 [label="v0 = parameters({1}) @ 0x7ffe4de00110"];
}
# second print
digraph G {
rankdir=LR;
nodesep=.05;
N0 [label="v0 = parameters({1}) @ 0x7ffe4de00110"];
N1 [label="v1 = v0 * -0.98"];
N0 -> N1;
}
# third print
digraph G {
rankdir=LR;
nodesep=.05;
N0 [label="v0 = parameters({1}) @ 0x7ffe4de00110"];
N1 [label="v1 = v0 * -0.98"];
N0 -> N1;
N2 [label="v2 = -1.88387 - v1"];
N1 -> N2;
N3 [label="v3 = -v2"];
N2 -> N3;
N4 [label="v4 = square(v3)"];
N3 -> N4;
}
```
### Conclusion
Actually, Symbol/Tensor/Expression in Mxnet/TensorFlow/Dynet are the same level concepts. We use a unified name Expression here, this level concept has following features:
- Users wirte topoloy with symbolic API, and all return value is Expression, including input data and parameter.
- Expression corresponds with a global Graph, and Expression can also be composed.
- Expression tracks all dependency and can be taken as a run target
# Design Doc: Model Format
## Motivation
The model is the output of training process. One complete model consists of two parts, namely, the **topology** and the **parameters**. To support industrial deployment, we need to make the model format must be self-completed and do not expose any training source code.
As a result, In PaddlePaddle, the **topology** represents as a [ProgramDesc](https://github.com/PaddlePaddle/Paddle/blob/1c0a4c901c9fc881d120249c703b15d1c50dae7d/doc/design/program.md), which describes the model structure. The **parameters** contain all the trainable weights in the model, we must support large size parameter, and efficient serialization/deserialization.
## Implementation
The topology is saved as a plain text, in detail, a self-contain protobuf file.
The parameters are saved as a binary file. As we all know, the protobuf message has the limits of [64M size](https://developers.google.com/protocol-buffers/docs/reference/cpp/google.protobuf.io.coded_stream#CodedInputStream.SetTotalBytesLimit.details). We do a (benchmark experiment)[https://github.com/PaddlePaddle/Paddle/pull/4610], its result shows protobuf is not fit in this scene.
As a result, we design a particular format for tensor serialization. By default, arbitrary tensor in Paddle is a [LoDTensor](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/framework/lod_tensor.md), and has a description information proto of (LoDTensorDesc)[https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/framework/framework.proto#L99]. We save the DescProto as the byte string header, it contains the necessary information, such as the `dims`, the `name` of the tensor, and the `LoD` information in [LoDTensor](https://github.com/PaddlePaddle/Paddle/blob/1c0a4c901c9fc881d120249c703b15d1c50dae7d/paddle/framework/lod_tensor.md). Tensor stores value in a continuous memory buffer, for speed we dump the raw memory to disk and save it as the byte string content. So, the binary format of one tensor is,
|HeaderLength|ContentLength|**LoDTensorDesc**|**TensorValue**|
In detail, tensor's byte view as the table shows. Note that all the signed value written in little-endian.
```text
[offset] [type] [description]
0004 4 bytes integer HeaderLength, the length of LoDTensorDesc
0008 4 bytes integer ContentLength, the length of LodTensor Buffer
0009 1 bytes char TensorDesc
00010 1 bytes char TensorDesc
...
00100 1 bytes char TensorValue
00101 1 bytes char TensorValue
00102 1 bytes char TensorValue ..
...
```
## Summary
We introduce the model format, the `ProgramDesc` describe the **topology**, and a bunch of particular format binary tensors describes the **parameters**.
......@@ -65,20 +65,6 @@ class Optimizer(object):
def __init__(self):
pass
def create_backward_pass(self, loss, parameter_list=None):
"""
create and add gradient Operators in BlockDesc to Compute gradients of `loss`
for parameters in parameter_list
Args:
loss: an variable generated by cost function.
parameter_list: parameters that need to compute gradient and update to optimize the lost.
Returns:
list of (parameters, gradients) pair.
"""
return None
def create_optimization_pass(self, parameters_and_grads):
"""Add optimization operators to update gradients to variables.
......@@ -93,7 +79,7 @@ class Optimizer(object):
def minimize(self, loss, parameter_list):
"""Add operations to minimize `loss` by updating `parameter_list`.
This method combines interface `create_backward_pass()` and
This method combines interface `append_backward_ops()` and
`create_optimization_pass()` into one.
"""
params_grads = self.create_backward_pass(loss, parameter_list)
......
......@@ -25,9 +25,8 @@ import (
"strings"
"time"
log "github.com/inconshreveable/log15"
"github.com/namsral/flag"
log "github.com/sirupsen/logrus"
"github.com/topicai/candy"
"github.com/PaddlePaddle/Paddle/go/master"
"github.com/PaddlePaddle/Paddle/go/utils/networkhelper"
......@@ -41,16 +40,20 @@ func main() {
taskTimeoutMax := flag.Int("task-timeout-max", 3, "max timtout count for each task before it being declared failed task.")
chunkPerTask := flag.Int("chunk-per-task", 10, "chunk per task.")
logLevel := flag.String("log-level", "info",
"log level, possible values: debug, info, warning, error, fatal, panic")
"log level, possible values: debug, info, warn, error, crit")
flag.Parse()
level, e := log.ParseLevel(*logLevel)
candy.Must(e)
lvl, err := log.LvlFromString(*logLevel)
if err != nil {
panic(err)
}
log.SetLevel(level)
log.Root().SetHandler(
log.LvlFilterHandler(lvl, log.CallerStackHandler("%+v", log.StderrHandler)),
)
if *endpoints == "" {
log.Warningln("-endpoints not set, fault tolerance not be enabled.")
log.Warn("-endpoints not set, fault tolerance not be enabled.")
}
var store master.Store
......@@ -58,23 +61,25 @@ func main() {
eps := strings.Split(*endpoints, ",")
ip, err := networkhelper.GetExternalIP()
if err != nil {
log.Fatal(err)
log.Crit("get external ip error", log.Ctx{"error": err})
panic(err)
}
addr := fmt.Sprintf("%s:%d", ip, *port)
store, err = master.NewEtcdClient(eps, addr, master.DefaultLockPath, master.DefaultAddrPath, master.DefaultStatePath, *ttlSec)
if err != nil {
log.Fatal(err)
log.Crit("error creating etcd client.", log.Ctx{"error": err})
panic(err)
}
} else {
store = &master.InMemStore{}
}
shutdown := func() {
log.Infoln("shutting down gracefully")
log.Info("shutting down gracefully")
err := store.Shutdown()
if err != nil {
log.Errorln(err)
log.Error("shutdown error", log.Ctx{"error": err})
}
}
......@@ -86,24 +91,28 @@ func main() {
s, err := master.NewService(store, *chunkPerTask, *taskTimeoutDur, *taskTimeoutMax)
if err != nil {
log.Fatal(err)
log.Crit("error creating new service.", log.Ctx{"error": err})
panic(err)
}
err = rpc.Register(s)
if err != nil {
log.Fatal(err)
log.Crit("error registering to etcd.", log.Ctx{"error": err})
panic(err)
}
rpc.HandleHTTP()
l, err := net.Listen("tcp", ":"+strconv.Itoa(*port))
if err != nil {
log.Fatal(err)
log.Crit("error listing to port", log.Ctx{"error": err, "port": *port})
panic(err)
}
go func() {
err = http.Serve(l, nil)
if err != nil {
log.Fatal(err)
log.Crit("error serving HTTP", log.Ctx{"error": err})
panic(err)
}
}()
......
......@@ -27,11 +27,11 @@ import (
"github.com/topicai/candy"
"github.com/PaddlePaddle/Paddle/go/pserver"
log "github.com/sirupsen/logrus"
log "github.com/inconshreveable/log15"
)
func main() {
port := flag.Int("port", 0, "port of the pserver")
port := flag.Int("port", 8001, "port of the pserver")
index := flag.Int("index", -1, "index of the pserver, set to -1 if use etcd for auto pserver index registry")
etcdEndpoint := flag.String("etcd-endpoint", "http://127.0.0.1:2379",
"comma separated endpoint string for pserver to connect to etcd")
......@@ -41,13 +41,17 @@ func main() {
checkpointPath := flag.String("checkpoint-path", "/checkpoints/", "save checkpoint path")
checkpointInterval := flag.Duration("checkpoint-interval", 600*time.Second, "save checkpoint per interval seconds")
logLevel := flag.String("log-level", "info",
"log level, possible values: debug, info, warning, error, fatal, panic")
"log level, possible values: debug, info, warn, error, crit")
flag.Parse()
level, err := log.ParseLevel(*logLevel)
candy.Must(err)
lvl, err := log.LvlFromString(*logLevel)
if err != nil {
panic(err)
}
log.SetLevel(level)
log.Root().SetHandler(
log.LvlFilterHandler(lvl, log.CallerStackHandler("%+v", log.StderrHandler)),
)
var idx int
......@@ -63,7 +67,7 @@ func main() {
cp, err = pserver.LoadCheckpoint(e, idx)
if err != nil {
if err == pserver.ErrCheckpointNotFound {
log.Infof("Could not find the pserver checkpoint.")
log.Info("Could not find the pserver checkpoint.")
} else {
panic(err)
}
......@@ -71,10 +75,10 @@ func main() {
}
shutdown := func() {
log.Infoln("shutting down gracefully")
log.Info("shutting down gracefully")
sErr := e.Shutdown()
if sErr != nil {
log.Errorln(sErr)
log.Error("error shutting down", log.Ctx{"error": sErr})
}
}
......@@ -95,7 +99,7 @@ func main() {
candy.Must(err)
go func() {
log.Infof("start pserver at port %d", *port)
log.Info("starting pserver", log.Ctx{"port": *port})
err = http.Serve(l, nil)
candy.Must(err)
}()
......
hash: 328e7b9b7306b45e7b9879139a9f86698115981f6283032e1312093a6a6ddb04
updated: 2017-10-16T08:00:23.484693528Z
hash: 51d9e2e46d7fd9173ff11ecada40f7b7728756be18d5e2f032535f66465e6e15
updated: 2017-10-24T15:04:09.987751592-07:00
imports:
- name: github.com/alecthomas/gometalinter
version: bae2f1293d092fd8167939d5108d1b025eaef9de
......@@ -99,6 +99,8 @@ imports:
version: d2709f9f1f31ebcda9651b03077758c1f3a0018c
- name: github.com/ghodss/yaml
version: 0ca9ea5df5451ffdf184b4428c902747c2c11cd7
- name: github.com/go-stack/stack
version: 817915b46b97fd7bb80e8ab6b69f01a53ac3eebf
- name: github.com/gogo/protobuf
version: 909568be09de550ed094403c2bf8a261b5bb730a
subpackages:
......@@ -120,8 +122,14 @@ imports:
- runtime
- runtime/internal
- utilities
- name: github.com/inconshreveable/log15
version: 0decfc6c20d9ca0ad143b0e89dcaa20f810b4fb3
- name: github.com/jonboulle/clockwork
version: 2eee05ed794112d45db504eb05aa693efd2b8b09
- name: github.com/mattn/go-colorable
version: 5411d3eea5978e6cdc258b30de592b60df6aba96
- name: github.com/mattn/go-isatty
version: 57fdcb988a5c543893cc61bce354a6e24ab70022
- name: github.com/matttproud/golang_protobuf_extensions
version: c12348ce28de40eed0136aa2b644d0ee0650e56c
subpackages:
......@@ -179,11 +187,12 @@ imports:
- lex/httplex
- trace
- name: golang.org/x/sys
version: 0f826bdd13b500be0f1d4004938ad978fcc6031e
version: e48874b42435b4347fc52bdee0424a52abc974d7
repo: https://github.com/golang/sys.git
vcs: git
subpackages:
- unix
- windows
- name: golang.org/x/text
version: 836efe42bb4aa16aaa17b9c155d8813d336ed720
repo: https://github.com/golang/text.git
......@@ -222,4 +231,3 @@ testImports:
version: 05e8a0eda380579888eb53c394909df027f06991
subpackages:
- assert
......@@ -26,3 +26,7 @@ import:
version: v1.1.0
- package: github.com/alecthomas/gometalinter
version: v1.2.1
- package: github.com/inconshreveable/log15
version: v2.13
- package: github.com/go-stack/stack
version: v1.6.0
......@@ -35,13 +35,19 @@ import (
"unsafe"
"github.com/PaddlePaddle/Paddle/go/master"
log "github.com/sirupsen/logrus"
log "github.com/inconshreveable/log15"
)
var mu sync.Mutex
var handleMap = make(map[C.paddle_master_client]*master.Client)
var curHandle C.paddle_master_client
func init() {
log.Root().SetHandler(
log.LvlFilterHandler(log.LvlWarn, log.CallerStackHandler("%+v", log.StderrHandler)),
)
}
func add(c *master.Client) C.paddle_master_client {
mu.Lock()
defer mu.Unlock()
......@@ -117,7 +123,7 @@ func paddle_set_dataset(client C.paddle_master_client, path **C.char, size C.int
}
err := c.SetDataset(paths)
if err != nil {
log.Errorln(err)
log.Error("error set dataset", log.Ctx{"error": err})
return C.PADDLE_MASTER_ERROR
}
......@@ -167,7 +173,7 @@ func paddle_request_save_model(client C.paddle_master_client, trainerID string,
c := get(client)
need, err := c.RequestSaveModel(trainerID, time.Duration(blockMS)*time.Millisecond)
if err != nil {
log.Errorln(err)
log.Error("error request save model", log.Ctx{"error": err})
return C.PADDLE_MASTER_ERROR
}
......
......@@ -21,7 +21,7 @@ import (
"github.com/PaddlePaddle/Paddle/go/connection"
"github.com/PaddlePaddle/recordio"
"github.com/coreos/etcd/clientv3"
log "github.com/sirupsen/logrus"
log "github.com/inconshreveable/log15"
)
// Client is the client of the master server.
......@@ -75,7 +75,7 @@ func WithEtcd(endpoints []string, timeout time.Duration) func(*Client) error {
for {
err := f()
if err != nil {
log.Warningln(err)
log.Warn("create etcd client error", log.Ctx{"error": err})
} else {
break
}
......@@ -135,13 +135,13 @@ func (c *Client) getRecords(passID int) {
time.Sleep(time.Second * 3)
continue
}
log.Errorf("getTask error: %s", err)
log.Error("getTask error.", log.Ctx{"error": err})
}
for _, chunk := range t.Chunks {
f, e := os.Open(chunk.Path)
if e != nil {
log.Errorln(e)
log.Error("error open chunk", log.Ctx{"error": e})
continue
}
......@@ -152,12 +152,15 @@ func (c *Client) getRecords(passID int) {
if s.Err() != nil {
c.ch <- record{nil, s.Err()}
log.Errorln(err, chunk.Path)
log.Error(
"error scan chunk",
log.Ctx{"error": err, "path": chunk.Path},
)
}
err = f.Close()
if err != nil {
log.Errorln(err)
log.Error("error close record file", log.Ctx{"error": err})
}
}
......@@ -166,7 +169,7 @@ func (c *Client) getRecords(passID int) {
// correct, but a reasonable approximation.
err = c.taskFinished(t.Meta.ID)
if err != nil {
log.Errorln(err)
log.Error("task finish callback error.", log.Ctx{"error": err})
}
}
}
......@@ -179,12 +182,12 @@ func (c *Client) monitorMaster(addrCh <-chan string) {
if curMaster == "" {
err := c.conn.Close()
if err != nil {
log.Errorln(err)
log.Error("close old master addr error", log.Ctx{"error": err})
}
} else {
err := c.conn.Connect(curMaster)
if err != nil {
log.Errorln(err)
log.Error("connect to new master addr error", log.Ctx{"error": err})
// connect to addr failed, set
// to last known addr in order
......
......@@ -25,8 +25,6 @@ import (
"testing"
"time"
log "github.com/sirupsen/logrus"
"github.com/PaddlePaddle/Paddle/go/connection"
"github.com/PaddlePaddle/recordio"
)
......@@ -36,10 +34,6 @@ const (
chunkPerTask = 10
)
func init() {
log.SetLevel(log.ErrorLevel)
}
func TestGetFinishTask(t *testing.T) {
const path = "/tmp/master_client_test_0"
......
......@@ -20,7 +20,7 @@ import (
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
log "github.com/sirupsen/logrus"
log "github.com/inconshreveable/log15"
)
const (
......@@ -44,7 +44,7 @@ type EtcdClient struct {
// NewEtcdClient creates a new EtcdClient.
func NewEtcdClient(endpoints []string, addr string, lockPath, addrPath, statePath string, ttlSec int) (*EtcdClient, error) {
log.Debugf("Connecting to etcd at %v", endpoints)
log.Debug("Connecting to etcd", log.Ctx{"endpoint": endpoints})
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: dialTimeout,
......@@ -64,12 +64,12 @@ func NewEtcdClient(endpoints []string, addr string, lockPath, addrPath, statePat
// one master running, but split-brain problem may cause
// multiple master servers running), and the cluster management
// software will kill one of them.
log.Infof("Trying to acquire lock at %s.", lockPath)
log.Info("Trying to acquire lock.", log.Ctx{"path": lockPath})
err = lock.Lock(context.TODO())
if err != nil {
return nil, err
}
log.Infof("Successfully acquired lock at %s.", lockPath)
log.Info("Successfully acquired lock at %s.", log.Ctx{"path": lockPath})
put := clientv3.OpPut(addrPath, addr)
resp, err := cli.Txn(context.Background()).If(lock.IsOwner()).Then(put).Commit()
......@@ -78,7 +78,8 @@ func NewEtcdClient(endpoints []string, addr string, lockPath, addrPath, statePat
}
if !resp.Succeeded {
log.Fatal("No longer owns the master lock. Exiting.")
log.Crit("No longer owns the master lock. Exiting.")
panic("No longer owns the master lock. Exiting.")
}
e := &EtcdClient{
......@@ -102,7 +103,7 @@ func (e *EtcdClient) Save(state []byte) error {
}
if !resp.Succeeded {
log.Errorln("No longer owns the lock, trying to lock again")
log.Error("No longer owns the lock, trying to lock again")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
err := e.lock.Lock(ctx)
cancel()
......@@ -116,9 +117,10 @@ func (e *EtcdClient) Save(state []byte) error {
// to kill current master server. The current
// state is not saved, but the trainer's RPC
// call will fail, so the trainer will retry.
log.Fatalf("Could not acquire the lock at %s: %v. Exiting.", e.lockPath, err)
log.Crit("Could not acquire the lock at %s: %v. Exiting.", log.Ctx{"path": e.lockPath, "error": err})
panic("Could not acquire the lock at %s: %v. Exiting.")
}
log.Infof("Successfully acquired lock at %s.", e.lockPath)
log.Info("Successfully acquired lock at %s.", e.lockPath)
return e.Save(state)
}
......@@ -136,7 +138,7 @@ func (e *EtcdClient) Load() ([]byte, error) {
}
if !resp.Succeeded {
log.Errorln("No longer owns the lock, trying to lock and load again.")
log.Error("No longer owns the lock, trying to lock and load again.")
err = e.lock.Lock(context.Background())
if err != nil {
return nil, err
......@@ -163,7 +165,7 @@ func (e *EtcdClient) Shutdown() error {
if err == nil {
err = newErr
} else {
log.Errorln(newErr)
log.Error("shutdown error", log.Ctx{"error": newErr})
}
}
......@@ -192,7 +194,7 @@ func watchKey(c *clientv3.Client, key string, valChan chan<- string) {
for wresp := range rch {
for _, ev := range wresp.Events {
// if received event is DELETE, the value will be an empty string
log.Infof("received event %s, %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
log.Info("received event.", log.Ctx{"type": ev.Type, "key": ev.Kv.Key, "value": ev.Kv.Value})
valChan <- string(ev.Kv.Value)
}
}
......
......@@ -25,7 +25,7 @@ import (
"sync"
"time"
log "github.com/sirupsen/logrus"
log "github.com/inconshreveable/log15"
"github.com/PaddlePaddle/recordio"
)
......@@ -170,11 +170,11 @@ func (s *Service) recover() (bool, error) {
}
if state == nil {
log.Infoln("No state exists, not recovered.")
log.Info("No state exists, not recovered.")
return false, nil
}
log.Infof("Loaded snapshot of size: %d bytes.", len(state))
log.Info("Loaded snapshot.", log.Ctx{"size": len(state)})
gr, err := gzip.NewReader(bytes.NewReader(state))
if err != nil {
return false, err
......@@ -191,11 +191,11 @@ func (s *Service) recover() (bool, error) {
if err != nil {
// Only close failed, recover actually succeed, so
// just log error.
log.Errorln(err)
log.Error("error close recover file.", log.Ctx{"error": err})
}
s.state = tqs
log.WithFields(s.logFields()).Infof("Master recovered from snapshot, scheduling pending task timeout check.")
log.Info("Master recovered from snapshot, scheduling pending task timeout check.", s.logCtx())
for _, t := range s.state.Pending {
time.AfterFunc(s.timeoutDur, s.checkTimeoutFunc(t.Task.Meta.ID, t.Task.Meta.Epoch))
}
......@@ -224,7 +224,7 @@ func (s *Service) snapshot() error {
}
state := buf.Bytes()
log.Infof("Saving snapshot of size: %d bytes.", len(state))
log.Info("Saving snapshot.", log.Ctx{"size bytes": len(state)})
return s.store.Save(state)
}
......@@ -260,7 +260,7 @@ func readChunks(globPaths []string) ([]Chunk, error) {
}
count := index.NumChunks()
log.Infof("readChunks: file %s has %d chunks", path, count)
log.Info("reading chunks.", log.Ctx{"path": path, "num chunks": count})
for i := 0; i < count; i++ {
chunk := Chunk{
Path: path,
......@@ -300,7 +300,7 @@ func (s *Service) SetDataset(globPaths []string, _ *int) error {
err = s.snapshot()
if err != nil {
log.Errorln(err)
log.Error("snapshot error", log.Ctx{"error": err})
return err
}
close(s.ready)
......@@ -320,7 +320,7 @@ func (s *Service) processFailedTask(t taskEntry, epoch int) {
defer func() {
err := s.snapshot()
if err != nil {
log.Errorln(err)
log.Error("snapshot error", log.Ctx{"error": err})
}
}()
......@@ -328,12 +328,12 @@ func (s *Service) processFailedTask(t taskEntry, epoch int) {
t.NumFailure++
if t.NumFailure > s.failureMax {
log.Warningf("Task %v failed %d times, discard.", t.Task, t.NumFailure)
log.Warn("Task failed to many times, discard.", log.Ctx{"task": t.Task, "num failed": t.NumFailure})
s.state.Failed = append(s.state.Failed, t)
return
}
log.Warningf("Task %v failed %d times, re-dispatch.", t.Task, t.NumFailure)
log.Warn("Task failed, re-dispatch.", log.Ctx{"task": t.Task, "num failed": t.NumFailure})
s.state.Todo = append(s.state.Todo, t)
return
}
......@@ -353,8 +353,8 @@ func (s *Service) checkTimeoutFunc(taskID int, epoch int) func() {
}
// must be called with lock held.
func (s *Service) logFields() log.Fields {
return log.Fields{
func (s *Service) logCtx() log.Ctx {
return log.Ctx{
"todoLen": len(s.state.Todo),
"pendingLen": len(s.state.Pending),
"doneLen": len(s.state.Done),
......@@ -383,10 +383,10 @@ func (s *Service) GetTask(passID int, task *Task) error {
if len(s.state.Todo) == 0 {
if len(s.state.Done) == 0 && len(s.state.Pending) == 0 {
log.WithFields(s.logFields()).Warningln("All tasks failed, may start next pass")
log.Warn("All tasks failed, may start next pass", s.logCtx())
return ErrAllTaskFailed
}
log.WithFields(s.logFields()).Warningln("No more available task.")
log.Warn("No more available task.", s.logCtx())
return ErrNoMoreAvailable
}
......@@ -400,8 +400,9 @@ func (s *Service) GetTask(passID int, task *Task) error {
}
*task = t.Task
log.WithFields(s.logFields()).Infof("Task #%v dispatched.", t.Task.Meta)
ctx := s.logCtx()
ctx["task meta"] = t.Task.Meta
log.Info("Task dispatched.", ctx)
time.AfterFunc(s.timeoutDur, s.checkTimeoutFunc(t.Task.Meta.ID, t.Task.Meta.Epoch))
return nil
}
......@@ -417,7 +418,9 @@ func (s *Service) TaskFinished(taskID int, dummy *int) error {
t, ok := s.state.Pending[taskID]
if !ok {
log.WithFields(s.logFields()).Warningln("Pending task #%d not found.", taskID)
ctx := s.logCtx()
ctx["task id"] = taskID
log.Warn("Pending task not found.", ctx)
return nil
}
......@@ -426,7 +429,9 @@ func (s *Service) TaskFinished(taskID int, dummy *int) error {
s.state.Done = append(s.state.Done, t)
delete(s.state.Pending, taskID)
log.WithFields(s.logFields()).Infof("Task #%d finished.", taskID)
ctx := s.logCtx()
ctx["task id"] = taskID
log.Info("Task finished.", ctx)
if len(s.state.Todo) == 0 && len(s.state.Pending) == 0 {
// increase master side pass count if all tasks finished
s.state.CurPass++
......@@ -434,12 +439,14 @@ func (s *Service) TaskFinished(taskID int, dummy *int) error {
s.state.Done = []taskEntry{}
// TODO(typhoonzero): deal with failed tasks
s.state.Failed = []taskEntry{}
log.WithFields(s.logFields()).Warningf("all task finished, add new pass data, newpass: %d.", s.state.CurPass)
ctx := s.logCtx()
ctx["new pass"] = s.state.CurPass
log.Warn("all task finished, add new pass data.", ctx)
}
err := s.snapshot()
if err != nil {
log.Errorln(err)
log.Error("snapshot error", log.Ctx{"error": err})
}
return err
}
......@@ -455,7 +462,7 @@ func (s *Service) TaskFailed(meta TaskMeta, dummy *int) error {
t, ok := s.state.Pending[meta.ID]
if !ok {
log.WithFields(s.logFields()).Warningln("TaskFailed:Pending task #%v not found.", t.Task.Meta)
log.Warn("TaskFailed:Pending task not found.", log.Ctx{"task": t.Task.Meta})
return nil
}
......
......@@ -45,9 +45,15 @@ import (
"github.com/PaddlePaddle/Paddle/go/pserver"
"github.com/PaddlePaddle/Paddle/go/pserver/client"
log "github.com/sirupsen/logrus"
log "github.com/inconshreveable/log15"
)
func init() {
log.Root().SetHandler(
log.LvlFilterHandler(log.LvlWarn, log.CallerStackHandler("%+v", log.StderrHandler)),
)
}
var mu sync.Mutex
var handleMap = make(map[C.paddle_pserver_client]*client.Client)
var curHandle C.paddle_pserver_client
......@@ -164,10 +170,13 @@ func paddle_init_param(client C.paddle_pserver_client, param C.paddle_parameter,
if err != nil {
if err.Error() == pserver.AlreadyInitialized {
log.Warningf("parameter %s already initialized, treat paddle_init_param as successful.", name)
log.Warn(
"parameter already initialized, treat paddle_init_param as successful.",
log.Ctx{"parameter": name},
)
return C.PSERVER_OK
}
log.Errorln(err)
log.Error("error init param", log.Ctx{"error": err})
return C.PSERVER_ERROR
}
......@@ -180,11 +189,11 @@ func paddle_finish_init_params(client C.paddle_pserver_client) C.int {
err := c.FinishInitParams()
if err != nil {
if err.Error() == pserver.AlreadyInitialized {
log.Warningln("parameters already initialized, treat paddle_finish_init_params as successful.")
log.Warn("parameters already initialized, treat paddle_finish_init_params as successful.")
return C.PSERVER_OK
}
log.Errorln(err)
log.Error("error finish init params", log.Ctx{"error": err})
return C.PSERVER_ERROR
}
......@@ -205,7 +214,7 @@ func paddle_send_grads(client C.paddle_pserver_client, grads **C.paddle_gradient
c := get(client)
err := c.SendGrads(gs)
if err != nil {
log.Errorln(err)
log.Error("error send grads", log.Ctx{"error": err})
return C.PSERVER_ERROR
}
......@@ -222,7 +231,7 @@ func paddle_get_params(client C.paddle_pserver_client, dst **C.paddle_parameter,
c := get(client)
ps, err := c.GetParams(ns)
if err != nil {
log.Errorln(err)
log.Error("error get params", log.Ctx{"error": err})
return C.PSERVER_ERROR
}
......@@ -231,7 +240,13 @@ func paddle_get_params(client C.paddle_pserver_client, dst **C.paddle_parameter,
for i, p := range ps {
pn[i] = p.Name
}
log.Errorf("pserver returned wrong number of parameters. Requested: %s, returned: %s.", strings.Join(pn, ", "), strings.Join(ns, ", "))
log.Error(
"pserver returned wrong number of parameters.",
log.Ctx{
"Requested": strings.Join(pn, ", "),
"Returned": strings.Join(ns, ", "),
},
)
return C.PSERVER_ERROR
}
......@@ -241,7 +256,13 @@ func paddle_get_params(client C.paddle_pserver_client, dst **C.paddle_parameter,
for i, p := range ps {
pn[i] = p.Name
}
log.Errorf("pserver returned wrong parameters, or not in requested order. Requested: %s, returned: %s.", strings.Join(pn, ", "), strings.Join(ns, ", "))
log.Error(
"pserver returned wrong parameters, or not in requested order.",
log.Ctx{
"Requested": strings.Join(pn, ", "),
"Returned": strings.Join(ns, ", "),
},
)
return C.PSERVER_ERROR
}
}
......@@ -251,13 +272,19 @@ func paddle_get_params(client C.paddle_pserver_client, dst **C.paddle_parameter,
param := *(**C.paddle_parameter)(unsafe.Pointer((uintptr(unsafe.Pointer(dst)) + uintptr(i)*unsafe.Sizeof(*dst))))
if unsafe.Pointer(param) == nil {
log.Errorln("must pre-allocate parameter.")
log.Error("must pre-allocate parameter.")
return C.PSERVER_ERROR
}
if unsafe.Pointer(param.content) != nil {
if int(param.content_len) != len(p.Content) {
log.Errorf("the pre-allocated content len does not match parameter content len. Pre-allocated len: %d, returned len: %d", param.content_len, len(p.Content))
log.Error(
"the pre-allocated content len does not match parameter content len.",
log.Ctx{
"Pre-allocated len": param.content_len,
"Returned len": len(p.Content),
},
)
return C.PSERVER_ERROR
}
}
......
......@@ -22,7 +22,7 @@ import (
"github.com/PaddlePaddle/Paddle/go/connection"
"github.com/PaddlePaddle/Paddle/go/pserver"
log "github.com/sirupsen/logrus"
log "github.com/inconshreveable/log15"
)
// TODO(helin): add RPC call retry logic
......@@ -84,7 +84,7 @@ func (c *Client) monitorPservers(l Lister, pserverNum int) {
if curServers[i].Addr == "" {
err := c.pservers[i].Close()
if err != nil {
log.Errorln(err)
log.Error("error closing connection to pserver", log.Ctx{"error": err})
}
continue
......@@ -92,7 +92,7 @@ func (c *Client) monitorPservers(l Lister, pserverNum int) {
err := c.pservers[i].Connect(curServers[i].Addr)
if err != nil {
log.Errorln(err)
log.Error("error connecting to pserver", log.Ctx{"error": err})
// connect to addr failed, set
// to last known addr in order
......
......@@ -30,7 +30,7 @@ import (
"github.com/PaddlePaddle/Paddle/go/pserver"
"github.com/PaddlePaddle/Paddle/go/pserver/client"
"github.com/coreos/etcd/clientv3"
log "github.com/sirupsen/logrus"
log "github.com/inconshreveable/log15"
)
const (
......@@ -90,7 +90,7 @@ func initEtcdClient() {
DialTimeout: time.Second * time.Duration(1),
})
if err != nil {
log.Errorf("err %v", err)
log.Error("error init etcd client", log.Ctx{"error": err})
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
_, err = client.Delete(ctx, pserver.PsDesired)
......
......@@ -25,7 +25,7 @@ import (
"github.com/PaddlePaddle/Paddle/go/pserver"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
log "github.com/sirupsen/logrus"
log "github.com/inconshreveable/log15"
)
const (
......@@ -54,26 +54,29 @@ func (e *Etcd) Desired() int {
resp, err := e.client.Get(ctx, pserver.PsDesired)
cancel()
if err != nil {
log.Errorf("Get ps dresire number failed! recnnectiong..., %v", err)
log.Error(
"Get ps dresire number failed! reconnecting...",
log.Ctx{"error": err},
)
time.Sleep(e.timeout)
continue
}
kvs := resp.Kvs
if len(kvs) == 0 {
log.Infoln("Waiting for ps desired registered ...")
log.Info("Waiting for ps desired registered ...")
time.Sleep(e.timeout)
continue
}
psDesired, err = strconv.Atoi(string(resp.Kvs[0].Value))
if err != nil {
log.Errorf("psDesired %d invalid %v", psDesired, err)
log.Error("atoi failed", log.Ctx{"error": err})
time.Sleep(e.timeout)
continue
}
log.Debugf("Get psDesired number: %d", psDesired)
log.Debug("Got psDesired", log.Ctx{"psDesired": psDesired})
break
}
return psDesired
......@@ -88,17 +91,20 @@ func (e *Etcd) List() []Server {
for i := 0; i < psDesired; i++ {
ctx, cancel := context.WithTimeout(context.Background(), e.timeout)
psKey := pserver.PsPath + strconv.Itoa(i)
log.Debugf("checking %s", psKey)
log.Debug("looking for pserver", log.Ctx{"ps key": psKey})
resp, err := e.client.Get(ctx, psKey)
cancel()
if err != nil {
log.Infof("Get psKey= %s error, %v", psKey, err)
log.Info(
"Get psKey error",
log.Ctx{"ps key": psKey, "error": err},
)
time.Sleep(e.timeout)
continue
}
kvs := resp.Kvs
if len(kvs) == 0 {
log.Infof("Waiting for ps addr registered ...")
log.Info("Waiting for ps addr registered ...")
time.Sleep(e.timeout)
continue
}
......@@ -106,11 +112,17 @@ func (e *Etcd) List() []Server {
psAddr := string(resp.Kvs[0].Value)
// TODO(Longfei) check the ps address
if psAddr == "" {
log.Infof("Get psKey = %s, psAddr is empty", psKey)
log.Info(
"Value under psKey is empty",
log.Ctx{"psKey": psKey},
)
time.Sleep(e.timeout)
continue
}
log.Debugf("got value (%s) for key: %s", psAddr, psKey)
log.Debug(
"got psAddr given psKey",
log.Ctx{"psAddr": psAddr, "psKey": psKey},
)
servers[i].Index = i
servers[i].Addr = psAddr
}
......@@ -130,13 +142,13 @@ func NewEtcd(endpoints string) *Etcd {
DialTimeout: defaultEtcdTimeout,
})
if err != nil {
log.Errorf("Init etcd connection failed: %v", err)
log.Error("Init etcd connection failed", log.Ctx{"error": err})
time.Sleep(defaultEtcdTimeout)
continue
}
break
}
log.Infof("Connected to etcd: %s\n", endpoints)
log.Info("Connected to etcd endpoint", log.Ctx{"endpoint": endpoints})
client := &Etcd{
client: cli,
timeout: defaultEtcdTimeout,
......@@ -154,7 +166,7 @@ func (e *Etcd) Select() (bool, error) {
}
lock := concurrency.NewMutex(sess, initLockPath)
log.Infof("Trying to acquire lock at %s.", initLockPath)
log.Info("Trying to acquire lock", log.Ctx{"lock path": initLockPath})
// Do not use timeout context here, since we don't know how
// long does it take for other trainers to initialize the
// parameters.
......@@ -162,7 +174,7 @@ func (e *Etcd) Select() (bool, error) {
if err != nil {
return false, err
}
log.Infof("Successfully acquired lock at %s.", initLockPath)
log.Info("Successfully acquired lock", log.Ctx{"lock path": initLockPath})
get := clientv3.OpGet(initDonePath)
ctx, cancel := context.WithTimeout(context.Background(), e.timeout)
......@@ -181,17 +193,17 @@ func (e *Etcd) Select() (bool, error) {
if len(resp.Kvs) == 0 {
// Key value not set, select current trainer.
e.lock = lock
log.Infoln("Trainer selected.")
log.Info("Trainer selected.")
return true, nil
}
if string(resp.Kvs[0].Value) == initDoneVal {
log.Infoln("Initialization is already done.")
log.Info("Initialization is already done.")
ctx, cancel = context.WithTimeout(context.Background(), e.timeout)
err = lock.Unlock(ctx)
cancel()
if err != nil {
log.Errorln(err)
log.Error("error unlocking", log.Ctx{"error": err})
}
return false, nil
}
......@@ -221,7 +233,7 @@ func (e *Etcd) Done() error {
err = e.lock.Unlock(ctx)
cancel()
if err != nil {
log.Errorln(err)
log.Error("error unlocking", log.Ctx{"error": err})
} else {
e.lock = nil
}
......@@ -244,7 +256,7 @@ func (e *Etcd) Close() error {
cErr := e.client.Close()
if cErr != nil {
if err != nil {
log.Errorln(cErr)
log.Error("error closing etcd client", log.Ctx{"error": cErr})
return err
}
return cErr
......
......@@ -24,7 +24,7 @@ import (
"github.com/PaddlePaddle/Paddle/go/utils/networkhelper"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
log "github.com/sirupsen/logrus"
log "github.com/inconshreveable/log15"
)
const (
......@@ -82,19 +82,19 @@ func (e *EtcdClient) Register(port int) (int, error) {
DialTimeout: e.dialTimeout,
})
if err != nil {
log.Errorf("connect to etcd error: %v", err)
log.Error("connect to etcd error", log.Ctx{"error": err})
time.Sleep(retryTimeout)
continue
}
e.client = cli
sess, err := concurrency.NewSession(cli, concurrency.WithTTL(e.ttlSec))
if err != nil {
log.Errorf("create etcd session error: %v", err)
log.Error("create etcd session error", log.Ctx{"error": err})
time.Sleep(retryTimeout)
continue
}
e.sess = sess
log.Debugf("inited client to %s", e.endpoints)
log.Debug("connected to etcd", log.Ctx{"endpoint": e.endpoints})
break
}
// init /ps_desired using transaction, for multiple pservers may want to write
......@@ -104,7 +104,7 @@ func (e *EtcdClient) Register(port int) (int, error) {
_, err := e.initDesiredPservers(ctx, e.numPservers)
cancel()
if err != nil {
log.Warn(err)
log.Warn("pserver init error", log.Ctx{"error": err, "num pservers": e.numPservers})
time.Sleep(retryTimeout)
continue
}
......@@ -119,14 +119,17 @@ func (e *EtcdClient) Register(port int) (int, error) {
resp, err := e.client.Get(ctx, PsDesired)
cancel()
if err != nil {
log.Errorf("getting %s error: %v", PsDesired, err)
log.Error("get etcd key error", log.Ctx{"key": PsDesired, "error": err})
time.Sleep(retryTimeout)
continue
}
if len(resp.Kvs) != 0 {
e.desired, err = strconv.Atoi(string(resp.Kvs[0].Value))
if err != nil {
log.Errorf("value of %s invalid %v\n", PsDesired, err)
log.Error(
"psDesired atoi error",
log.Ctx{"error": err, "value": string(resp.Kvs[0].Value)},
)
time.Sleep(retryTimeout)
// NOTE: wait util ps_desired value change
continue
......@@ -143,7 +146,7 @@ func (e *EtcdClient) Register(port int) (int, error) {
pserverIdx, err = e.registerPserverEtcd(ctx, port)
cancel()
if err != nil {
log.Warn(err)
log.Warn("register pserver on etcd error", log.Ctx{"error": err})
time.Sleep(retryTimeout)
continue
}
......@@ -170,16 +173,17 @@ func (e *EtcdClient) registerPserverEtcd(ctx context.Context, port int) (int, er
registered := false
for i := 0; i < e.desired; i++ {
psKey := PsPath + strconv.Itoa(i)
log.Debugf("checking %s", psKey)
ps := c.Get(psKey)
log.Debugf("got value (%s) for key: %s", ps, psKey)
log.Debug(
"register pserver got value",
log.Ctx{"value": ps, "key": psKey},
)
if ps == "" {
// find the first id and write info
pserverAddr := e.externalIP + ":" + strconv.Itoa(port)
c.Put(psKey, pserverAddr, clientv3.WithLease(e.sess.Lease()))
log.Debugf("set pserver node %s with value %s", psKey, pserverAddr)
log.Debug("register finished")
log.Debug("register finished", log.Ctx{"key": psKey, "value": pserverAddr})
idx = i
registered = true
break
......@@ -239,7 +243,7 @@ func (e *EtcdClient) Shutdown() error {
newErr := e.client.Close()
if newErr != nil {
if err != nil {
log.Errorln(newErr)
log.Error("shutdown error", log.Ctx{"error": newErr})
} else {
err = newErr
}
......
......@@ -25,7 +25,7 @@ import (
"fmt"
"unsafe"
log "github.com/sirupsen/logrus"
log "github.com/inconshreveable/log15"
)
type optimizer struct {
......@@ -56,12 +56,12 @@ func newOptimizer(paramWithConfigs ParameterWithConfig, State []byte) *optimizer
c := paramWithConfigs.Config
s := State
paramBufferSize := C.size_t(len(p.Content))
log.WithFields(log.Fields{
log.Info("New Optimizer Created with config", log.Ctx{
"ElementType": p.ElementType,
"ParamSize": paramBufferSize,
"ConfigSize": len(c),
"StateSize": len(s),
}).Info("New Optimizer Created with config:")
})
var cbuffer unsafe.Pointer
cbuffer = C.malloc(paramBufferSize)
......@@ -72,21 +72,34 @@ func newOptimizer(paramWithConfigs ParameterWithConfig, State []byte) *optimizer
}
o.config = c
o.opt = C.paddle_create_optimizer((*C.uchar)(&c[0]), C.int(len(c)),
C.paddle_element_type(p.ElementType), cbuffer, C.int(paramBufferSize), (*C.char)(cstate), C.int(len(s)))
o.opt = C.paddle_create_optimizer(
(*C.uchar)(&c[0]),
C.int(len(c)),
C.paddle_element_type(p.ElementType),
cbuffer,
C.int(paramBufferSize),
(*C.char)(cstate),
C.int(len(s)),
)
return o
}
func (o *optimizer) GetWeights() []byte {
var buffer unsafe.Pointer
// we do not own the buffer, no need to free later.
bufferLen := C.paddle_optimizer_get_weights(o.opt, &buffer)
return cArrayToSlice(buffer, int(bufferLen)*C.sizeof_float)
}
func (o *optimizer) GetStates() []byte {
var cbuffer *C.char
// we owns the state buffer, need to free later.
cbufferLen := C.paddle_optimizer_get_state(o.opt, &cbuffer)
return cArrayToSlice(unsafe.Pointer(cbuffer), int(cbufferLen))
buf := cArrayToSlice(unsafe.Pointer(cbuffer), int(cbufferLen))
cpy := make([]byte, len(buf))
copy(cpy, buf)
C.free(unsafe.Pointer(cbuffer))
return cpy
}
func (o *optimizer) UpdateParameter(g Gradient) error {
......
......@@ -15,8 +15,12 @@
package pserver
import (
"encoding/binary"
"io/ioutil"
"math"
"testing"
"github.com/stretchr/testify/assert"
)
func TestOptimizerCreateRelease(t *testing.T) {
......@@ -36,3 +40,39 @@ func TestOptimizerCreateRelease(t *testing.T) {
o := newOptimizer(param, nil)
o.Cleanup()
}
func float32Bytes(float float32) []byte {
bits := math.Float32bits(float)
bytes := make([]byte, 4)
binary.LittleEndian.PutUint32(bytes, bits)
return bytes
}
func TestOptimizerState(t *testing.T) {
p := Parameter{
Name: "a",
ElementType: Int32,
}
weights := float32Bytes(100)
p.Content = weights
config, err := ioutil.ReadFile("./client/c/test/testdata/optimizer.pb")
if err != nil {
t.Fatalf("read optimizer proto failed")
}
param := ParameterWithConfig{
Param: p,
Config: config,
}
o := newOptimizer(param, nil)
s := o.GetStates()
// clear param content and check if the state is restored.
param.Param.Content = float32Bytes(300)
o1 := newOptimizer(param, s)
s1 := o1.GetStates()
assert.Equal(t, s, s1)
assert.Equal(t, weights, o.GetWeights())
assert.Equal(t, weights, o1.GetWeights())
o.Cleanup()
o1.Cleanup()
}
......@@ -32,7 +32,7 @@ import (
uuid "github.com/satori/go.uuid"
log "github.com/sirupsen/logrus"
log "github.com/inconshreveable/log15"
)
// ElementType is the type of elements of a Parameter.
......@@ -124,6 +124,9 @@ func loadMeta(e *EtcdClient, idx int) (meta checkpointMeta, err error) {
// LoadCheckpoint loads checkpoint from file.
func LoadCheckpoint(e *EtcdClient, idx int) (Checkpoint, error) {
log.Info("Loading checkpoint", "pserver index", idx)
defer traceTime(time.Now(), "load checkpoint")
cpMeta, err := loadMeta(e, idx)
if err != nil {
return nil, err
......@@ -178,6 +181,7 @@ func NewService(idx int, interval time.Duration, path string, client *EtcdClient
func (s *Service) InitParam(paramWithConfigs ParameterWithConfig, _ *int) error {
select {
case <-s.initialized:
log.Warn("init param called but parameters already initialized.")
return errors.New(AlreadyInitialized)
default:
}
......@@ -191,6 +195,13 @@ func (s *Service) InitParam(paramWithConfigs ParameterWithConfig, _ *int) error
// properly memory aligned, if not, make copy to a memory
// aligned region.
s.optMap[paramWithConfigs.Param.Name] = newOptimizer(paramWithConfigs, nil)
log.Info(
"init parameter",
"name", paramWithConfigs.Param.Name,
"config len", len(paramWithConfigs.Config),
"param len", len(paramWithConfigs.Param.Content),
"type", paramWithConfigs.Param.ElementType,
)
return nil
}
......@@ -199,6 +210,7 @@ func (s *Service) InitParam(paramWithConfigs ParameterWithConfig, _ *int) error
func (s *Service) FinishInitParams(_ int, _ *int) error {
select {
case <-s.initialized:
log.Warn("finished init param called but parameters already initialized.")
return errors.New(AlreadyInitialized)
default:
}
......@@ -209,10 +221,12 @@ func (s *Service) FinishInitParams(_ int, _ *int) error {
for range t {
err := s.checkpoint()
if err != nil {
log.Errorln(err)
log.Error("finish init params error", log.Ctx{"error": err})
}
}
}()
log.Info("init parameter finished.")
return nil
}
......@@ -222,6 +236,7 @@ func (s *Service) SendGrad(g Gradient, _ *int) error {
select {
case <-s.initialized:
default:
log.Warn("received gradient before initialization.", "name", g.Name, "size", len(g.Content), "type", g.ElementType)
return errors.New(Uninitialized)
}
......@@ -233,6 +248,7 @@ func (s *Service) SendGrad(g Gradient, _ *int) error {
return fmt.Errorf("parameter: %s does not exist", g.Name)
}
log.Info("received gradient from trainer, updating gradient.", "name", g.Name, "size", len(g.Content), "type", g.ElementType)
return o.UpdateParameter(g)
}
......@@ -244,6 +260,7 @@ func (s *Service) GetParam(name string, parameter *Parameter) error {
opt, ok := s.optMap[name]
if !ok {
log.Warn("trainer wants to get a parameter that does not exist.", "name", name)
return fmt.Errorf("parameter: %s does not exist", name)
}
......@@ -257,12 +274,13 @@ func (s *Service) GetParam(name string, parameter *Parameter) error {
parameter.Name = name
parameter.ElementType = opt.elementType
parameter.Content = opt.GetWeights()
log.Info("sending parameter to the trainer", "name", parameter.Name, "size", len(parameter.Content), "type", parameter.ElementType)
return nil
}
func traceTime(start time.Time, name string) {
elapsed := time.Since(start)
log.Infof("%s took %v", name, elapsed)
log.Info("time elapsed", log.Ctx{"name": name, "elapsed": elapsed})
}
// checkpoint saves checkpoint to disk.
......@@ -270,7 +288,7 @@ func traceTime(start time.Time, name string) {
// checkpoint should be only called after the parameters are
// initialized.
func (s *Service) checkpoint() (err error) {
log.Infoln("Begin save checkpoint.")
log.Info("Begin save checkpoint.")
defer traceTime(time.Now(), "save checkpoint")
s.mu.Lock()
......@@ -297,6 +315,13 @@ func (s *Service) checkpoint() (err error) {
return
}
if _, err = os.Stat(s.checkpointPath); os.IsNotExist(err) {
err = os.MkdirAll(s.checkpointPath, os.ModePerm)
if err != nil {
return
}
}
id := uuid.NewV4().String()
p := path.Join(s.checkpointPath, id)
f, err := os.Create(p)
......@@ -308,7 +333,7 @@ func (s *Service) checkpoint() (err error) {
closeErr := f.Close()
if closeErr != nil {
if err != nil {
log.Errorln(closeErr)
log.Error("error close checkpoint file", log.Ctx{"error": closeErr})
} else {
// Set closeErr as return value.
err = closeErr
......@@ -329,7 +354,7 @@ func (s *Service) checkpoint() (err error) {
oldMeta, err := loadMeta(s.client, s.idx)
if err == ErrCheckpointNotFound {
log.Infoln("Do not have existing checkpoint.")
log.Info("Do not have existing checkpoint.")
err = nil
}
......@@ -361,7 +386,7 @@ func (s *Service) checkpoint() (err error) {
if rmErr != nil {
// log error, but still treat checkpoint as
// successful.
log.Errorln(rmErr)
log.Error("remove old meta file error", log.Ctx{"error": rmErr})
}
}
......
# ddim lib
proto_library(framework_proto SRCS framework.proto)
proto_library(saver_proto SRCS framework.proto saver.proto)
cc_library(ddim SRCS ddim.cc DEPS eigen3)
cc_test(ddim_test SRCS ddim_test.cc DEPS ddim)
nv_test(dim_test SRCS dim_test.cu DEPS ddim)
......@@ -7,8 +10,8 @@ cc_library(tensor SRCS tensor.cc DEPS ddim place paddle_memory device_context)
cc_test(tensor_test SRCS tensor_test.cc DEPS tensor)
cc_test(eigen_test SRCS eigen_test.cc DEPS tensor)
cc_library(lod_tensor SRCS lod_tensor.cc DEPS ddim place tensor)
cc_test(lod_tensor_test SRCS lod_tensor_test.cc DEPS lod_tensor)
cc_library(lod_tensor SRCS lod_tensor.cc DEPS ddim place tensor saver_proto framework_proto)
cc_test(lod_tensor_test SRCS lod_tensor_test.cc DEPS lod_tensor paddle_memory)
nv_test(lod_tensor_gpu_test SRCS lod_tensor_test.cu DEPS lod_tensor)
cc_test(variable_test SRCS variable_test.cc)
......@@ -16,7 +19,6 @@ cc_test(variable_test SRCS variable_test.cc)
cc_library(scope SRCS scope.cc)
cc_test(scope_test SRCS scope_test.cc DEPS scope)
proto_library(framework_proto SRCS framework.proto)
cc_library(attribute SRCS attribute.cc DEPS framework_proto)
cc_test(program_desc_test SRCS program_desc_test.cc DEPS proto_desc)
......
......@@ -115,6 +115,7 @@ message VarDesc {
SELECTED_ROWS = 2;
FEED_MINIBATCH = 3;
FETCH_LIST = 4;
STEP_SCOPES = 5;
}
required string name = 1;
required VarType type = 2;
......
......@@ -13,6 +13,15 @@
limitations under the License. */
#include "paddle/framework/lod_tensor.h"
#include "paddle/framework/saver.pb.h"
#include "paddle/memory/memcpy.h"
#include "paddle/memory/memory.h"
#include <stdint.h>
#include <string.h>
#include <algorithm>
#include <iterator>
#include <glog/logging.h>
......@@ -112,5 +121,140 @@ void LoDTensor::ShrinkInLevel(size_t level, size_t elem_begin,
lod_ = new_lod;
}
std::string LoDTensor::SerializeToString() const {
LoDTensorProto desc;
// set data_type
if (this->type() == typeid(int8_t)) desc.set_data_type(DataType::BOOL);
if (this->type() == typeid(int16_t)) desc.set_data_type(DataType::INT16);
if (this->type() == typeid(int32_t)) desc.set_data_type(DataType::INT32);
if (this->type() == typeid(int64_t)) desc.set_data_type(DataType::INT64);
// FIXME(dzh): there is no fp16 in standard c++
if (this->type() == typeid(float)) // NOLINT
desc.set_data_type(DataType::FP32);
if (this->type() == typeid(double)) // NOLINT
desc.set_data_type(DataType::FP64);
for (int i = 0; i < dims().size(); ++i) {
desc.add_dims(dims()[i]);
}
// set lod information
desc.set_lod_level(this->NumLevels());
for (size_t i = 0; i < this->NumLevels(); ++i) {
LoDInfo* lod = desc.add_levels();
for (size_t j = 0; j < lod_[i].size(); ++j) {
lod->add_level(lod_[i][j]);
}
}
desc.set_version(0);
std::string desc_bytes = desc.SerializeAsString();
// FIXME(dzh) : implement fix chunk size buffer.
size_t DESC_SIZE = desc_bytes.size();
size_t DATA_SIZE = holder_->size() - offset_;
const size_t BUFFER_SIZE = DESC_SIZE + DATA_SIZE + 2 * sizeof(size_t);
char* buffer =
static_cast<char*>(memory::Alloc(platform::CPUPlace(), BUFFER_SIZE));
// format: desc_size data_size, desc_bytes, data_bytes.
platform::CPUPlace src_place;
platform::CPUPlace dst_place;
memory::Copy(dst_place, buffer, src_place, &BUFFER_SIZE, sizeof(size_t));
memory::Copy(dst_place, buffer + sizeof(size_t), src_place, &DESC_SIZE,
sizeof(size_t));
memory::Copy(dst_place, buffer + sizeof(size_t) * 2, src_place,
desc_bytes.c_str(), desc_bytes.size());
PADDLE_ENFORCE(this->numel() != 0, "Serialize a empty Tensor!");
platform::Place place = holder_->place();
int element_width = holder_->size() / this->numel();
if (platform::is_cpu_place(place)) {
memory::Copy(dst_place, buffer + sizeof(size_t) * 2 + desc_bytes.size(),
boost::get<platform::CPUPlace>(place),
static_cast<char*>(holder_->ptr()) + offset_ / element_width,
DATA_SIZE);
}
#ifdef PADDLE_WITH_GPU
if (platform::is_gpu_place(place)) {
memory::Copy(dst_place, buffer + sizeof(size_t) * 2 + desc_bytes.size(),
boost::get<platform::GPUPlace>(place),
static_cast<char*>(holder_->ptr()) + offset_ / element_width,
DATA_SIZE);
}
#endif
std::string ret(buffer, BUFFER_SIZE);
memory::Free(platform::CPUPlace(), buffer);
return ret;
}
void LoDTensor::DeserializeFromString(const std::string& s,
const platform::Place& dst_place) {
size_t DESC_SIZE, BUFFER_SIZE;
platform::CPUPlace src_place;
memory::Copy(src_place, &BUFFER_SIZE, src_place, s.c_str(), sizeof(size_t));
memory::Copy(src_place, &DESC_SIZE, src_place, s.c_str() + sizeof(size_t),
sizeof(size_t));
const size_t DATA_SIZE = BUFFER_SIZE - DESC_SIZE - sizeof(size_t) * 2;
// parse LoDTensorDesc
LoDTensorProto desc;
desc.ParseFromArray(s.c_str() + sizeof(size_t) * 2, DESC_SIZE);
std::vector<int64_t> dims;
std::copy(desc.dims().begin(), desc.dims().end(), std::back_inserter(dims));
this->Resize(make_ddim(dims));
// parse data type
void* ptr = nullptr;
if (desc.data_type() == DataType::BOOL)
ptr = this->mutable_data<bool>(dst_place);
if (desc.data_type() == DataType::INT16)
ptr = this->mutable_data<int16_t>(dst_place);
if (desc.data_type() == DataType::INT32)
ptr = this->mutable_data<int32_t>(dst_place);
if (desc.data_type() == DataType::INT64)
ptr = this->mutable_data<int64_t>(dst_place);
// FIXME(dzh): there is no fp16 in standard c++
if (desc.data_type() == DataType::FP32)
ptr = this->mutable_data<float>(dst_place);
if (desc.data_type() == DataType::FP64)
ptr = this->mutable_data<double>(dst_place);
LoD lod;
std::vector<size_t> levels;
for (int i = 0; i < desc.levels().size(); ++i) {
auto current_level = desc.levels()[i].level();
std::copy(current_level.begin(), current_level.end(),
std::back_inserter(levels));
lod.emplace_back(levels);
levels.clear();
}
this->set_lod(lod);
if (platform::is_cpu_place(dst_place)) {
memory::Copy(boost::get<platform::CPUPlace>(dst_place), ptr, src_place,
s.c_str() + sizeof(size_t) * 2 + DESC_SIZE, DATA_SIZE);
}
#ifdef PADDLE_WITH_GPU
if (platform::is_gpu_place(dst_place)) {
memory::Copy(boost::get<platform::GPUPlace>(dst_place), ptr, src_place,
s.c_str() + sizeof(size_t) * 2 + DESC_SIZE, DATA_SIZE);
}
#endif
}
} // namespace framework
} // namespace paddle
......@@ -25,6 +25,7 @@
#include "paddle/framework/ddim.h"
#include "paddle/framework/tensor.h"
#include "paddle/platform/enforce.h"
#include "paddle/platform/place.h"
namespace paddle {
namespace framework {
......@@ -132,6 +133,27 @@ class LoDTensor : public Tensor {
*/
void ShrinkInLevel(size_t level, size_t elem_begin, size_t elem_end);
/**
* @brief Serialize tensor to char bytes.
* Please check model_format.md for the format detail.
* NOTE: GPUTensor will copy data to cpu implicitly.
* @return return string
*/
// FIXME(dzh) : Currently, this interface should only be used in
// save/restore model and checkpoint. ParameterServer do not use shape
// information to do the optimization, as a result, when we serialize
// parameter/gradient to string, we should serialize the tensor
// to string in the ps trainer instead of LoDTensor.
std::string SerializeToString() const;
/**
* @brief Deserialize char bytes to tensor.
* @return return string
*/
void DeserializeFromString(const std::string& s,
const platform::Place& dst_place);
private:
LoD lod_;
};
......
......@@ -17,10 +17,13 @@
#include <gtest/gtest.h>
#include <algorithm>
#include <memory>
#include <vector>
namespace paddle {
namespace framework {
const int kLodTensorSize = 20 * 128;
class LoDTensorTester : public ::testing::Test {
public:
virtual void SetUp() override {
......@@ -38,7 +41,10 @@ class LoDTensorTester : public ::testing::Test {
lod_tensor_.Resize({20 /*batch size*/, 128 /*dim*/});
// malloc memory
lod_tensor_.mutable_data<float>(place);
float* dst_ptr = lod_tensor_.mutable_data<float>(place);
for (int i = 0; i < kLodTensorSize; ++i) {
dst_ptr[i] = i;
}
lod_tensor_.set_lod(lod);
}
......@@ -101,5 +107,21 @@ TEST_F(LoDTensorTester, ShrinkInLevel) {
ASSERT_EQ(new_lod_tensor.data<float>(), lod_tensor_.data<float>());
}
TEST_F(LoDTensorTester, SerializeDeserialize) {
LoDTensor new_lod_tensor = lod_tensor_;
float* src_ptr = lod_tensor_.data<float>();
std::string s = lod_tensor_.SerializeToString();
LoDTensor dst;
dst.DeserializeFromString(s, platform::CPUPlace());
float* dst_ptr = dst.data<float>();
for (int i = 0; i < kLodTensorSize; ++i) {
EXPECT_EQ(dst_ptr[i], src_ptr[i]);
}
ASSERT_EQ(dst.NumElements(0), 2UL);
ASSERT_EQ(dst.NumElements(1), 3UL);
ASSERT_EQ(dst.NumElements(2), 8UL);
}
} // namespace framework
} // namespace paddle
......@@ -48,3 +48,30 @@ TEST(LoDTensor, LoDInGPU) {
CHECK_EQ(lod[0].data()[i], src_lod[0].data()[i] * 2);
}
}
TEST(LoDTensor, SerializeDeserialize) {
paddle::framework::LoDTensor lod_tensor;
paddle::platform::GPUPlace place(0);
paddle::framework::LoD src_lod;
src_lod.push_back(std::vector<size_t>{0, 2, 4, 6, 8, 10, 12, 14});
lod_tensor.Resize({14, 16});
lod_tensor.mutable_data<float>(place);
lod_tensor.set_lod(src_lod);
CHECK_EQ(lod_tensor.lod_element(0, 2).first, 4UL);
CHECK_EQ(lod_tensor.lod_element(0, 4).first, 8UL);
test<<<1, 8>>>(src_lod[0].data(), src_lod[0].size());
cudaDeviceSynchronize();
std::string s = lod_tensor.SerializeToString();
paddle::framework::LoDTensor dst;
dst.DeserializeFromString(s, place);
paddle::framework::LoD dst_lod = dst.lod();
for (size_t i = 0; i < dst_lod[0].size(); ++i) {
CHECK_EQ(src_lod[0].data()[i], dst_lod[0].data()[i] * 2);
}
}
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
syntax = "proto2";
option optimize_for = LITE_RUNTIME;
package paddle.framework;
import "framework.proto";
/**
* This file contains necessary information for model, checkpoint.
* etc.
*/
message LoDInfo { repeated int64 level = 1; }
/**
* Save the LoDTensorDesc information through LoDTensorProto, its data memory
* is copyed to c buffer immediately. See model_format.md for details.
*/
message LoDTensorProto {
optional DataType data_type = 1;
repeated int64 dims = 2; // [UNK, 640, 480] is saved as [-1, 640, 480]
repeated LoDInfo levels = 3;
optional int32 lod_level = 4 [ default = 0 ];
optional int32 version = 5;
}
......@@ -65,6 +65,23 @@ void Scope::DropKids() {
kids_.clear();
}
std::vector<std::string> Scope::GetAllNames(bool recursive) const {
std::vector<std::string> known_vars(vars_.size());
if (recursive) {
for (auto& kid : kids_) {
auto kid_vars = kid->GetAllNames();
for (auto& p : kid_vars) {
known_vars.emplace_back(p);
}
}
}
for (auto& p : vars_) {
known_vars.emplace_back(p.first);
}
return known_vars;
}
void Scope::DeleteScope(Scope* scope) {
auto it = std::find(this->kids_.begin(), this->kids_.end(), scope);
PADDLE_ENFORCE(it != this->kids_.end(), "Cannot find %p as kid scope", scope);
......
......@@ -17,6 +17,7 @@ limitations under the License. */
#include <list>
#include <string>
#include <unordered_map>
#include <vector>
#include "paddle/framework/variable.h"
#include "paddle/platform/macros.h"
......@@ -64,6 +65,9 @@ class Scope {
/// Drop all kids scopes belonged to this scope.
void DropKids();
// enumerate all the variables current contains.
std::vector<std::string> GetAllNames(bool recursive = false) const;
private:
// Call Scope::NewScope for a sub-scope.
explicit Scope(Scope const* parent) : parent_(parent) {}
......
......@@ -13,6 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/framework/scope.h"
#include "glog/logging.h"
#include "gtest/gtest.h"
using paddle::framework::Scope;
......@@ -54,3 +55,17 @@ TEST(Scope, FindScope) {
EXPECT_EQ(&s, s.FindScope(v));
EXPECT_EQ(&s, ss.FindScope(v));
}
TEST(Scope, GetAllNames) {
Scope s;
Variable* v = s.Var("a");
EXPECT_EQ(&s, s.FindScope(v));
std::vector<std::string> ans = s.GetAllNames();
std::string str;
for (auto& var : ans) {
str += var;
}
EXPECT_STREQ("a", str.c_str());
}
......@@ -31,6 +31,8 @@ namespace paddle {
namespace framework {
class LoDTensor;
class Tensor {
public:
template <typename T, size_t D, int MajorType, typename IndexType>
......@@ -134,6 +136,8 @@ class Tensor {
inline void check_memory_size() const;
private:
friend class LoDTensor;
/**
* @note Placeholder hides type T, so it doesn't appear as a template
* parameter of Variable.
......@@ -181,7 +185,12 @@ class Tensor {
/*! holds the memory block if allocated. */
std::shared_ptr<Placeholder> holder_;
/*! points to dimensions of memory block. */
/**
* @brief points to elements dimensions.
*
* @note dims_ do not indicate the memory block size.
*/
DDim dims_;
/**
......
......@@ -69,6 +69,13 @@ function(op_library TARGET)
file(APPEND ${pybind_file} "USE_OP(max_pool2d_with_index);\n")
endif()
# save_restore_op contains several operators
if ("${TARGET}" STREQUAL "save_restore_op")
set(pybind_flag 1)
# It's enough to just adding one operator to pybind
file(APPEND ${pybind_file} "USE_NO_KERNEL_OP(save);\n")
endif()
# activation_op contains several operators
if ("${TARGET}" STREQUAL "activation_op")
set(pybind_flag 1)
......
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/operators/batch_norm_op.h"
namespace paddle {
namespace operators {
using Tensor = framework::Tensor;
template <typename T, int MajorType = Eigen::RowMajor,
typename IndexType = Eigen::DenseIndex>
using EigenMatrix = framework::EigenMatrix<T, MajorType, IndexType>;
template <typename T>
using EigenArrayMap =
Eigen::Map<Eigen::Array<T, Eigen::Dynamic, Eigen::Dynamic>>;
template <typename T>
using ConstEigenArrayMap =
Eigen::Map<const Eigen::Array<T, Eigen::Dynamic, Eigen::Dynamic>>;
template <typename T>
using EigenVectorArrayMap = Eigen::Map<Eigen::Array<T, Eigen::Dynamic, 1>>;
template <typename T>
using ConstEigenVectorArrayMap =
Eigen::Map<const Eigen::Array<T, Eigen::Dynamic, 1>>;
class BatchNormOp : public framework::OperatorWithKernel {
public:
using framework::OperatorWithKernel::OperatorWithKernel;
void InferShape(framework::InferShapeContext *ctx) const override {
PADDLE_ENFORCE(ctx->HasInput("X"), "");
PADDLE_ENFORCE(ctx->HasInput("Scale"), "");
PADDLE_ENFORCE(ctx->HasInput("Bias"), "");
PADDLE_ENFORCE(ctx->HasInput("Mean"), "");
PADDLE_ENFORCE(ctx->HasInput("Variance"), "");
PADDLE_ENFORCE(ctx->HasOutput("Y"), "");
PADDLE_ENFORCE(ctx->HasOutput("MeanOut"), "");
PADDLE_ENFORCE(ctx->HasOutput("VarianceOut"), "");
PADDLE_ENFORCE(ctx->HasOutput("SavedMean"), "");
PADDLE_ENFORCE(ctx->HasOutput("SavedVariance"), "");
// make sure Mean/MeanOut and Variance/VarianceOut share memory in Python
PADDLE_ENFORCE_EQ(ctx->Inputs("Mean")[0], ctx->Outputs("MeanOut")[0],
"Mean and MeanOut should share the same memory");
PADDLE_ENFORCE_EQ(ctx->Inputs("Variance")[0],
ctx->Outputs("VarianceOut")[0],
"Variance and VarianceOut should share the same memory");
const auto x_dims = ctx->GetInputDim("X");
const TensorFormat tensor_format =
StringToTensorFormat(ctx->Attrs().Get<std::string>("tensor_format"));
const int C =
(tensor_format == TensorFormat::NCHW ? x_dims[1]
: x_dims[x_dims.size() - 1]);
PADDLE_ENFORCE_EQ(ctx->GetInputDim("Scale").size(), 1UL);
PADDLE_ENFORCE_EQ(ctx->GetInputDim("Scale")[0], C);
PADDLE_ENFORCE_EQ(ctx->GetInputDim("Bias").size(), 1UL);
PADDLE_ENFORCE_EQ(ctx->GetInputDim("Bias")[0], C);
ctx->SetOutputDim("Y", x_dims);
ctx->SetOutputDim("MeanOut", {C});
ctx->SetOutputDim("VarianceOut", {C});
ctx->SetOutputDim("SavedMean", {C});
ctx->SetOutputDim("SavedVariance", {C});
}
};
class BatchNormOpMaker : public framework::OpProtoAndCheckerMaker {
public:
BatchNormOpMaker(framework::OpProto *proto,
framework::OpAttrChecker *op_checker)
: OpProtoAndCheckerMaker(proto, op_checker) {
AddAttr<bool>("is_test", "").SetDefault(false);
AddAttr<float>("momentum", "").SetDefault(0.9);
AddAttr<float>("epsilon", "").SetDefault(1e-5);
AddAttr<std::string>("tensor_format", "").SetDefault("NCHW");
AddInput("X", "The input tensor");
AddInput("Scale",
"Scale is a 1-dimensional tensor of size C "
"to be applied to the output");
AddInput("Bias",
"Bias is a 1-dimensional tensor of size C "
"to be applied to the output");
AddInput("Mean",
"The global mean (for training) or the "
"estimated mean (for testing)");
AddInput("Variance",
"The global variance (for training) "
"or the estimated Variance (for testing)");
AddOutput("Y", "result after normalization");
AddOutput("MeanOut",
"Share memory with Mean. "
"Store the global mean when training");
AddOutput("VarianceOut",
"Share memory with Variance. "
"Store the global Variance when training");
AddOutput("SavedMean",
"Mean of the current mini batch, "
"will apply to output when training");
AddOutput("SavedVariance",
"Variance of the current mini batch, "
"will apply to output when training");
AddComment(R"DOC(
https://arxiv.org/pdf/1502.03167.pdf
NHWC `[batch, in_height, in_width, in_channels]`
NCHW `[batch, in_channels, in_height, in_width]`
)DOC");
}
};
template <typename T>
class BatchNormKernel<platform::CPUPlace, T> : public framework::OpKernel<T> {
public:
void Compute(const framework::ExecutionContext &ctx) const override {
const float epsilon = ctx.Attr<float>("epsilon");
const float momentum = ctx.Attr<float>("momentum");
const bool is_test = ctx.Attr<bool>("is_test");
const std::string tensor_format_str =
ctx.Attr<std::string>("tensor_format");
const TensorFormat tensor_format = StringToTensorFormat(tensor_format_str);
const auto *x = ctx.Input<Tensor>("X");
const auto &x_dims = x->dims();
PADDLE_ENFORCE(x_dims.size() >= 3 && x_dims.size() <= 5,
"The Input dim size should be between 3 and 5");
const int N = x_dims[0];
const int C =
(tensor_format == TensorFormat::NCHW ? x_dims[1]
: x_dims[x_dims.size() - 1]);
const int sample_size = x->numel() / N / C;
auto *y = ctx.Output<Tensor>("Y");
auto *mean_out = ctx.Output<Tensor>("MeanOut");
auto *variance_out = ctx.Output<Tensor>("VarianceOut");
auto *saved_mean = ctx.Output<Tensor>("SavedMean");
auto *saved_variance = ctx.Output<Tensor>("SavedVariance");
// alloc memory
y->mutable_data<T>(ctx.GetPlace());
mean_out->mutable_data<T>(ctx.GetPlace());
variance_out->mutable_data<T>(ctx.GetPlace());
saved_mean->mutable_data<T>(ctx.GetPlace());
saved_variance->mutable_data<T>(ctx.GetPlace());
if (!is_test) {
// saved_xx is use just in this batch of data
EigenVectorArrayMap<T> saved_mean_e(
saved_mean->mutable_data<T>(ctx.GetPlace()), C);
EigenVectorArrayMap<T> saved_variance_e(
saved_variance->mutable_data<T>(ctx.GetPlace()), C);
saved_mean_e.setZero();
saved_variance_e.setZero();
switch (tensor_format) {
case TensorFormat::NCHW: {
ConstEigenArrayMap<T> x_arr(x->data<T>(), sample_size, N * C);
for (int nc = 0; nc < N * C; ++nc) {
saved_mean_e(nc % C) += x_arr.col(nc).sum();
}
saved_mean_e /= N * sample_size;
for (int nc = 0; nc < N * C; ++nc) {
saved_variance_e(nc % C) +=
(x_arr.col(nc) - saved_mean_e(nc % C)).matrix().squaredNorm();
}
saved_variance_e /= N * sample_size;
break;
}
case TensorFormat::NHWC: {
ConstEigenArrayMap<T> x_arr(x->data<T>(), C, N * sample_size);
for (int i = 0; i < N * sample_size; ++i) {
saved_mean_e += x_arr.col(i);
}
saved_mean_e /= N * sample_size;
for (int i = 0; i < N * sample_size; ++i) {
saved_variance_e +=
(x_arr.col(i) - saved_mean_e) * (x_arr.col(i) - saved_mean_e);
}
saved_variance_e /= N * sample_size;
break;
}
default:
PADDLE_THROW("Unknown storage order: %s", tensor_format_str);
}
EigenVectorArrayMap<T> running_mean_arr(
mean_out->mutable_data<T>(ctx.GetPlace()), C);
EigenVectorArrayMap<T> running_var_arr(
variance_out->mutable_data<T>(ctx.GetPlace()), C);
running_mean_arr =
running_mean_arr * momentum + saved_mean_e * (1. - momentum);
running_var_arr =
running_var_arr * momentum + saved_variance_e * (1. - momentum);
}
// use SavedMean and SavedVariance to do normalize
Eigen::Array<T, Eigen::Dynamic, 1> inv_std(C);
if (is_test) {
ConstEigenVectorArrayMap<T> var_arr(
ctx.Input<Tensor>("Variance")->data<T>(), C);
inv_std = (var_arr + epsilon).sqrt().inverse();
} else {
EigenVectorArrayMap<T> saved_inv_std(
ctx.Output<Tensor>("SavedVariance")->data<T>(), C);
// inverse SavedVariance first, gradient will use it too.
saved_inv_std = (saved_inv_std + epsilon).inverse().sqrt();
inv_std = saved_inv_std;
}
ConstEigenVectorArrayMap<T> mean_arr(
is_test ? ctx.Input<Tensor>("Mean")->data<T>()
: ctx.Output<Tensor>("SavedMean")->data<T>(),
C);
// ((x - est_mean) * (inv_var) * scale + bias
// formula transform ====>
// (x * inv_var * scale) + (bias - est_mean * inv_var * scale)
const auto *scale = ctx.Input<Tensor>("Scale");
const auto *bias = ctx.Input<Tensor>("Bias");
ConstEigenVectorArrayMap<T> scale_arr(scale->data<T>(), C);
ConstEigenVectorArrayMap<T> bias_arr(bias->data<T>(), C);
Eigen::Array<T, Eigen::Dynamic, 1> new_scale = inv_std * scale_arr;
Eigen::Array<T, Eigen::Dynamic, 1> new_bias =
bias_arr - mean_arr * inv_std * scale_arr;
switch (tensor_format) {
case TensorFormat::NCHW: {
EigenArrayMap<T> y_arr(y->mutable_data<T>(ctx.GetPlace()), sample_size,
N * C);
ConstEigenArrayMap<T> x_arr(x->data<T>(), sample_size, N * C);
for (int nc = 0; nc < N * C; ++nc) {
y_arr.col(nc) = x_arr.col(nc) * new_scale(nc % C) + new_bias(nc % C);
}
break;
}
case TensorFormat::NHWC: {
EigenArrayMap<T>(y->mutable_data<T>(ctx.GetPlace()), C,
N * sample_size) =
(ConstEigenArrayMap<T>(x->data<T>(), C, N * sample_size).colwise() *
new_scale)
.colwise() +
new_bias;
break;
}
default:
PADDLE_THROW("Unknown storage order: %d", tensor_format);
}
}
};
class BatchNormGradOp : public framework::OperatorWithKernel {
public:
using framework::OperatorWithKernel::OperatorWithKernel;
void InferShape(framework::InferShapeContext *ctx) const override {
// check input
PADDLE_ENFORCE(ctx->HasInput("X"));
PADDLE_ENFORCE(ctx->HasInput("Scale"), "");
PADDLE_ENFORCE(ctx->HasInput(framework::GradVarName("Y")), "");
PADDLE_ENFORCE(ctx->HasInput("SavedMean"), "");
PADDLE_ENFORCE(ctx->HasInput("SavedVariance"), "");
// check output
PADDLE_ENFORCE(ctx->HasOutput(framework::GradVarName("X")), "");
PADDLE_ENFORCE(ctx->HasOutput(framework::GradVarName("Scale")), "");
PADDLE_ENFORCE(ctx->HasOutput(framework::GradVarName("Bias")), "");
const auto x_dims = ctx->GetInputDim("X");
const TensorFormat tensor_format =
StringToTensorFormat(ctx->Attrs().Get<std::string>("tensor_format"));
const int C =
(tensor_format == TensorFormat::NCHW ? x_dims[1]
: x_dims[x_dims.size() - 1]);
ctx->SetOutputDim(framework::GradVarName("X"), x_dims);
ctx->SetOutputDim(framework::GradVarName("Scale"), {C});
ctx->SetOutputDim(framework::GradVarName("Bias"), {C});
}
};
template <typename T>
class BatchNormGradKernel<platform::CPUPlace, T>
: public framework::OpKernel<T> {
public:
void Compute(const framework::ExecutionContext &ctx) const override {
const auto *x = ctx.Input<Tensor>("X");
const auto *d_y = ctx.Input<Tensor>(framework::GradVarName("Y"));
const auto *scale = ctx.Input<Tensor>("Scale");
const auto *saved_mean = ctx.Input<Tensor>("SavedMean");
// SavedVariance have been reverted in forward operator
const auto *saved_inv_variance = ctx.Input<Tensor>("SavedVariance");
const std::string tensor_format_str =
ctx.Attr<std::string>("tensor_format");
const TensorFormat tensor_format = StringToTensorFormat(tensor_format_str);
// Get the size for each dimension.
// NCHW [batch_size, in_channels, in_height, in_width]
const auto &x_dims = x->dims();
PADDLE_ENFORCE(x_dims.size() >= 3 && x_dims.size() <= 5,
"The Input dim size should be between 3 and 5");
const int N = x_dims[0];
const int C =
(tensor_format == TensorFormat::NCHW ? x_dims[1]
: x_dims[x_dims.size() - 1]);
const int sample_size = x->numel() / N / C;
ConstEigenVectorArrayMap<T> scale_arr(scale->data<T>(), C);
ConstEigenVectorArrayMap<T> mean_arr(saved_mean->data<T>(), C);
ConstEigenVectorArrayMap<T> inv_var_arr(saved_inv_variance->data<T>(), C);
// init output
auto *d_x = ctx.Output<Tensor>(framework::GradVarName("X"));
auto *d_scale = ctx.Output<Tensor>(framework::GradVarName("Scale"));
auto *d_bias = ctx.Output<Tensor>(framework::GradVarName("Bias"));
d_x->mutable_data<T>(ctx.GetPlace());
d_scale->mutable_data<T>(ctx.GetPlace());
d_bias->mutable_data<T>(ctx.GetPlace());
// d_bias = np.sum(d_y, axis=0)
// d_scale = np.sum((X - mean) / inv_std * dy, axis=0)
// d_x = (1. / N) * scale * inv_var * (N * d_y - np.sum(d_y, axis=0)
// - (X - mean) * inv_var * inv_var * np.sum(d_y * (X - mean), axis=0))
EigenVectorArrayMap<T> d_bias_arr(d_bias->mutable_data<T>(ctx.GetPlace()),
C);
EigenVectorArrayMap<T> d_scale_arr(d_scale->mutable_data<T>(ctx.GetPlace()),
C);
d_bias_arr.setZero();
d_scale_arr.setZero();
const auto scale_inv_var_nhw = scale_arr * inv_var_arr / (N * sample_size);
switch (tensor_format) {
case TensorFormat::NCHW: {
ConstEigenArrayMap<T> x_arr(x->data<T>(), sample_size, N * C);
ConstEigenArrayMap<T> d_y_arr(d_y->data<T>(), sample_size, N * C);
EigenArrayMap<T> d_x_arr(d_x->mutable_data<T>(ctx.GetPlace()),
sample_size, N * C);
d_x_arr.setZero();
for (int nc = 0; nc < N * C; ++nc) {
int c = nc % C;
d_bias_arr(c) += d_y_arr.col(nc).sum();
d_scale_arr(c) +=
((x_arr.col(nc) - mean_arr(c)) * inv_var_arr(c) * d_y_arr.col(nc))
.sum();
}
for (int nc = 0; nc < N * C; ++nc) {
int c = nc % C;
d_x_arr.col(nc) +=
scale_inv_var_nhw(c) *
(d_y_arr.col(nc) * N * sample_size - d_bias_arr(c) -
(x_arr.col(nc) - mean_arr[c]) * d_scale_arr(c) * inv_var_arr(c));
}
break;
}
case TensorFormat::NHWC: {
ConstEigenArrayMap<T> x_arr(x->data<T>(), C, N * sample_size);
ConstEigenArrayMap<T> d_y_arr(d_y->data<T>(), C, N * sample_size);
EigenArrayMap<T> d_x_arr(d_x->mutable_data<T>(ctx.GetPlace()), C,
N * sample_size);
d_x_arr.setZero();
const auto d_y_row_sum = d_y_arr.rowwise().sum();
const auto x_minus_mean = x_arr.colwise() - mean_arr;
const auto d_y_mul_x_minus_mean_row_sum =
(d_y_arr * x_minus_mean).rowwise().sum();
const auto inv_var_sqr = inv_var_arr * inv_var_arr;
for (int nhw = 0; nhw < N * sample_size; ++nhw) {
d_bias_arr += d_y_arr.col(nhw);
d_scale_arr +=
(x_arr.col(nhw) - mean_arr) * inv_var_arr * d_y_arr.col(nhw);
d_x_arr.col(nhw) +=
scale_inv_var_nhw *
(d_y_arr.col(nhw) * N * sample_size - d_y_row_sum -
x_minus_mean.col(nhw) * inv_var_sqr *
d_y_mul_x_minus_mean_row_sum);
}
break;
}
default:
PADDLE_THROW("Unknown storage order: %s", tensor_format_str);
}
}
};
} // namespace operators
} // namespace paddle
namespace ops = paddle::operators;
REGISTER_OP(batch_norm, ops::BatchNormOp, ops::BatchNormOpMaker,
batch_norm_grad, ops::BatchNormGradOp);
REGISTER_OP_CPU_KERNEL(batch_norm,
ops::BatchNormKernel<paddle::platform::CPUPlace, float>);
REGISTER_OP_CPU_KERNEL(
batch_norm_grad,
ops::BatchNormGradKernel<paddle::platform::CPUPlace, float>);
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include "paddle/framework/eigen.h"
#include "paddle/framework/op_registry.h"
namespace paddle {
namespace operators {
enum TensorFormat {
NHWC = 0,
NCHW = 1,
};
inline TensorFormat StringToTensorFormat(const std::string& str) {
if (str == "NHWC" || str == "nhwc") {
return TensorFormat::NHWC;
} else if (str == "NCHW" || str == "nchw") {
return TensorFormat::NCHW;
} else {
PADDLE_THROW("Unknown storage order string: %s", str);
}
}
template <typename Place, typename T>
class BatchNormKernel : public framework::OpKernel<T> {
public:
void Compute(const framework::ExecutionContext& ctx) const override;
};
template <typename Place, typename T>
class BatchNormGradKernel : public framework::OpKernel<T> {
public:
void Compute(const framework::ExecutionContext& ctx) const override;
};
} // namespace operators
} // namespace paddle
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/framework/eigen.h"
#include "paddle/framework/op_registry.h"
#include <fstream>
namespace paddle {
namespace operators {
using framework::Tensor;
using framework::LoDTensor;
inline static std::string VarToFileName(const std::string& folder_path,
const std::string& var_name) {
return folder_path + "/__" + var_name + "__";
}
class SaveOp : public framework::OperatorBase {
public:
SaveOp(const std::string& type, const framework::VariableNameMap& inputs,
const framework::VariableNameMap& outputs,
const framework::AttributeMap& attrs)
: OperatorBase(type, inputs, outputs, attrs) {}
void Run(const framework::Scope& scope,
const platform::DeviceContext& dev_ctx) const override {
const auto& var_names = this->Inputs("X");
for (const auto& name : var_names) {
PADDLE_ENFORCE_NOT_NULL(scope.FindVar(name),
"Can not find variable '%s' in the scope.", name);
}
std::string folder_path = this->Attr<std::string>("folderPath");
PADDLE_ENFORCE(!folder_path.empty(),
"'folderPath' of SaveOp shouldn't be empty.");
VLOG(1) << "Save variables to folder: " << folder_path;
for (const auto& name : var_names) {
std::string file_name = VarToFileName(folder_path, name);
std::ofstream fout(file_name, std::ofstream::out);
PADDLE_ENFORCE(fout.is_open(), "Fail to create file %s.", file_name);
const LoDTensor& tensor = scope.FindVar(name)->Get<LoDTensor>();
std::string bytes = tensor.SerializeToString();
fout << bytes;
fout.close();
}
VLOG(1) << "Compelete saving variables. Items count: " << var_names.size();
}
};
class SaveOpMaker : public framework::OpProtoAndCheckerMaker {
public:
SaveOpMaker(framework::OpProto* proto, framework::OpAttrChecker* op_checker)
: OpProtoAndCheckerMaker(proto, op_checker) {
AddInput("X",
"(tensor), the tensor count can be 1~INT_MAX, tensors names which "
"values will be saved.")
.AsDuplicable();
AddAttr<std::string>("folderPath", "the folderPath for save model.");
AddComment(R"DOC(
Save the input tensors to a binary file based on input tensor names and absolute path.
All the inputs can carry the LoD (Level of Details) information,
or not.
)DOC");
}
};
class RestoreOp : public framework::OperatorBase {
public:
RestoreOp(const std::string& type, const framework::VariableNameMap& inputs,
const framework::VariableNameMap& outputs,
const framework::AttributeMap& attrs)
: OperatorBase(type, inputs, outputs, attrs) {}
void Run(const framework::Scope& scope,
const platform::DeviceContext& dev_ctx) const override {
const auto& var_names = this->Outputs("Out");
for (const auto& name : var_names) {
PADDLE_ENFORCE_NOT_NULL(scope.FindVar(name),
"Can not find variable '%s' in the scope.", name);
}
std::string folder_path = this->Attr<std::string>("folderPath");
PADDLE_ENFORCE(!folder_path.empty(),
"'folderPath' of RestoreOp shouldn't be empty.");
VLOG(1) << "Try loading variables from folder: " << folder_path;
for (const auto& name : var_names) {
std::string file_name = VarToFileName(folder_path, name);
std::ifstream fin(file_name, std::ifstream::in);
PADDLE_ENFORCE(fin.is_open(), "Fail to open file %s.", file_name);
const size_t kBufferSize = 4096; // equal to linux page size
char buffer[kBufferSize];
std::string cache;
while (!fin.eof()) {
fin.read(buffer, kBufferSize);
cache.append(buffer, fin.gcount());
}
LoDTensor* tensor = scope.FindVar(name)->GetMutable<LoDTensor>();
tensor->DeserializeFromString(cache, dev_ctx.GetPlace());
fin.close();
}
VLOG(1) << "Complete loading variables.";
}
};
class RestoreOpMaker : public framework::OpProtoAndCheckerMaker {
public:
RestoreOpMaker(framework::OpProto* proto,
framework::OpAttrChecker* op_checker)
: OpProtoAndCheckerMaker(proto, op_checker) {
AddOutput("Out",
"(tensor), the tensor count can be 1~INT_MAX, tensors which "
"values will be restores.")
.AsDuplicable();
AddAttr<std::string>("folderPath", "the folderPath for model file.");
AddAttr<int>("data_type", "output tensor data type")
.SetDefault(framework::DataType::FP32);
AddComment(R"DOC(
Restore the tensors from model file based on absolute path.
All the tensors outputs may carry the LoD (Level of Details) information,
or not.
)DOC");
}
};
} // namespace operators
} // namespace paddle
REGISTER_OPERATOR(save, paddle::operators::SaveOp,
paddle::framework::EmptyGradOpMaker,
paddle::operators::SaveOpMaker);
REGISTER_OPERATOR(restore, paddle::operators::RestoreOp,
paddle::framework::EmptyGradOpMaker,
paddle::operators::RestoreOpMaker);
......@@ -25,19 +25,17 @@ void AdadeltaOptimizer::Update(const Tensor* gradient) {
}
}
const char* AdadeltaOptimizer::SerializeState(int* state_len) {
std::string AdadeltaOptimizer::SerializeState() {
AdadeltaOptimizerState state;
state.set_num_sample_passed(num_sample_passed_);
std::string lr_str = this->lr_policy_->SerializeState(state_len);
std::string lr_str = this->lr_policy_->SerializeState();
state.mutable_lr_state()->ParseFromString(lr_str);
TensorToProto(*parameter_, state.mutable_parameter());
TensorToProto(*accum_gradient_, state.mutable_accum_gradient());
TensorToProto(*accum_delta_, state.mutable_accum_delta());
TensorToProto(*update_delta_, state.mutable_update_delta());
auto str = state.SerializeAsString();
*state_len += str.size();
return str.c_str();
return state.SerializeAsString();
}
void AdadeltaOptimizer::DeserializeState(const std::string& str) {
......
......@@ -23,7 +23,7 @@ public:
if (update_delta_) delete update_delta_;
}
void Update(const Tensor *gradient);
const char *SerializeState(int *state_len);
std::string SerializeState();
void DeserializeState(const std::string &state);
private:
......
......@@ -17,17 +17,15 @@ void AdagradOptimizer::Update(const Tensor* gradient) {
learning_rate * decay_ * param[i];
}
}
const char* AdagradOptimizer::SerializeState(int* state_len) {
std::string AdagradOptimizer::SerializeState() {
AdagradOptimizerState state;
state.set_num_sample_passed(num_sample_passed_);
std::string lr_str = this->lr_policy_->SerializeState(state_len);
std::string lr_str = this->lr_policy_->SerializeState();
state.mutable_lr_state()->ParseFromString(lr_str);
TensorToProto(*parameter_, state.mutable_parameter());
TensorToProto(*accum_gradient_, state.mutable_accum_gradient());
auto str = state.SerializeAsString();
*state_len += str.size();
return str.c_str();
return state.SerializeAsString();
}
void AdagradOptimizer::DeserializeState(const std::string& str) {
......
......@@ -19,7 +19,7 @@ public:
if (accum_gradient_) delete accum_gradient_;
}
void Update(const Tensor *gradient);
const char *SerializeState(int *state_len);
std::string SerializeState();
void DeserializeState(const std::string &state);
private:
......
......@@ -22,18 +22,16 @@ void AdamOptimizer::Update(const Tensor *gradient) {
}
}
const char *AdamOptimizer::SerializeState(int *state_len) {
std::string AdamOptimizer::SerializeState() {
AdamOptimizerState state;
std::string lr_str = this->lr_policy_->SerializeState(state_len);
std::string lr_str = this->lr_policy_->SerializeState();
state.mutable_lr_state()->ParseFromString(lr_str);
state.set_num_sample_passed(num_sample_passed_);
TensorToProto(*parameter_, state.mutable_parameter());
TensorToProto(*momentums_, state.mutable_momentums());
TensorToProto(*velocitys_, state.mutable_velocitys());
auto str = state.SerializeAsString();
*state_len += str.size();
return str.c_str();
return state.SerializeAsString();
}
void AdamOptimizer::DeserializeState(const std::string &str) {
......
......@@ -25,7 +25,7 @@ public:
if (velocitys_) delete velocitys_;
}
void Update(const Tensor *gradient);
const char *SerializeState(int *state_len);
std::string SerializeState();
void DeserializeState(const std::string &state);
private:
......
......@@ -10,7 +10,7 @@ class LrPolicy {
public:
virtual ~LrPolicy() {}
virtual double LearningRate(const uint64_t num_sample_passed) = 0;
virtual const char *SerializeState(int *state_len) = 0;
virtual std::string SerializeState() = 0;
virtual void DeserializeState(const std::string &state) = 0;
};
......@@ -21,12 +21,10 @@ public:
double LearningRate(const uint64_t num_sample_passed) {
return learning_rate_;
}
const char *SerializeState(int *state_len) {
std::string SerializeState() {
LrPolicyState state;
state.set_learning_rate(learning_rate_);
auto str = state.SerializeAsString();
*state_len = str.size();
return str.c_str();
return state.SerializeAsString();
}
void DeserializeState(const std::string &str) {
LrPolicyState state;
......@@ -46,14 +44,12 @@ public:
return std::max(learning_rate_ - lr_decay_a_ * num_sample_passed,
lr_decay_b_);
}
const char *SerializeState(int *state_len) {
std::string SerializeState() {
LrPolicyState state;
state.set_learning_rate(learning_rate_);
state.set_lr_decay_a(lr_decay_a_);
state.set_lr_decay_b(lr_decay_b_);
auto str = state.SerializeAsString();
*state_len = str.size();
return str.c_str();
return state.SerializeAsString();
}
void DeserializeState(const std::string &str) {
LrPolicyState state;
......
#include "optimizer.h"
#include <glog/logging.h>
#include <cstdlib>
#include <cstring>
#include <string>
#include "parameter_optimizer.h"
......@@ -78,7 +81,13 @@ int paddle_optimizer_get_weights(paddle_optimizer* o, void** param_buffer) {
}
int paddle_optimizer_get_state(paddle_optimizer* o, const char** state) {
int state_len = 0;
*state = o->impl->SerializeState(&state_len);
std::string s = o->impl->SerializeState();
int state_len = s.size();
if (state_len > 0) {
*state = (char*)std::malloc(state_len);
std::memcpy((void*)*state, (const void*)s.c_str(), state_len);
}
return state_len;
}
......@@ -32,6 +32,7 @@ ParameterOptimizer *ParameterOptimizer::Create(const std::string &config_proto,
Tensor *parameter,
const OptimizerConfig &config) -> ParameterOptimizer * {
if (config.optimizer() == OptimizerConfig::SGD) {
LOG(INFO) << "creating SGD optimizer";
return new SGDOptimizer(parameter,
lr,
config.sgd().momentum(),
......@@ -39,6 +40,7 @@ ParameterOptimizer *ParameterOptimizer::Create(const std::string &config_proto,
config.sgd().nesterov());
}
if (config.optimizer() == OptimizerConfig::Adadelta) {
LOG(INFO) << "creating Adadelta optimizer";
return new AdadeltaOptimizer(parameter,
lr,
config.adadelta().rho(),
......@@ -46,10 +48,12 @@ ParameterOptimizer *ParameterOptimizer::Create(const std::string &config_proto,
config.adadelta().decay());
}
if (config.optimizer() == OptimizerConfig::Adagrad) {
LOG(INFO) << "creating Adagrad optimizer";
return new AdagradOptimizer(
parameter, lr, config.adagrad().epsilon(), config.adagrad().decay());
}
if (config.optimizer() == OptimizerConfig::Adam) {
LOG(INFO) << "creating Adam optimizer";
return new AdamOptimizer(parameter,
lr,
config.adam().beta_1(),
......
......@@ -28,7 +28,7 @@ public:
Tensor *parameter);
virtual void Update(const Tensor *gradient) = 0;
virtual float *get_weight(int *param_size) const;
virtual const char *SerializeState(int *state_len) = 0;
virtual std::string SerializeState() = 0;
virtual void DeserializeState(const std::string &state) = 0;
protected:
......
......@@ -85,6 +85,7 @@ public:
for (size_t i = 0; i < opts_.size(); ++i) {
int s = 0;
float* newp = (float*)opts_[i]->get_weight(&s);
EXPECT_EQ(s, kSize);
for (size_t j = 0; j < kSize; ++j) {
EXPECT_EQ(newp[j], (*p)[j]);
}
......@@ -99,10 +100,20 @@ public:
}
void TestCheckPoint() {
paddle::optimizer::Tensor* p = FixedTensor(kSize);
for (size_t i = 0; i < opts_.size(); ++i) {
int state_len = 0;
std::string state = opts_[i]->SerializeState(&state_len);
auto state = opts_[i]->SerializeState();
opts_[i]->DeserializeState(state);
auto state1 = opts_[i]->SerializeState();
opts_[i]->DeserializeState(state);
EXPECT_EQ(state, state1);
int s = 0;
float* newp = (float*)opts_[i]->get_weight(&s);
EXPECT_EQ(s, kSize);
for (size_t j = 0; j < kSize; ++j) {
EXPECT_EQ(newp[j], (*p)[j]);
}
}
}
......
......@@ -21,7 +21,22 @@ TEST(TensorToProto, Case1) {
paddle::optimizer::Tensor t(3), t1(3);
for (size_t i = 0; i < t.size(); ++i) {
t[i] = i;
t1[i] = 0;
t1[i] = 10;
}
paddle::TensorProto proto;
paddle::optimizer::TensorToProto(t, &proto);
paddle::optimizer::ProtoToTensor(proto, &t1);
for (size_t i = 0; i < t1.size(); ++i) {
EXPECT_EQ(t1[i], t[i]);
}
}
TEST(TensorToProto, Case2) {
paddle::optimizer::Tensor t(1), t1(1);
for (size_t i = 0; i < t.size(); ++i) {
t[i] = i;
t1[i] = 10;
}
paddle::TensorProto proto;
......
......@@ -27,16 +27,14 @@ void SGDOptimizer::Update(const Tensor *gradient) {
}
}
const char *SGDOptimizer::SerializeState(int *state_len) {
std::string SGDOptimizer::SerializeState() {
SGDOptimizerState state;
state.set_num_sample_passed(num_sample_passed_);
std::string lr_str = this->lr_policy_->SerializeState(state_len);
std::string lr_str = this->lr_policy_->SerializeState();
state.mutable_lr_state()->ParseFromString(lr_str);
TensorToProto(*parameter_, state.mutable_parameter());
if (momentum_ != 0.0) TensorToProto(*momentums_, state.mutable_momentums());
auto str = state.SerializeAsString();
*state_len += str.size();
return str.c_str();
return state.SerializeAsString();
}
void SGDOptimizer::DeserializeState(const std::string &str) {
......
......@@ -23,7 +23,7 @@ public:
if (momentums_) delete momentums_;
}
void Update(const Tensor* gradient);
const char* SerializeState(int* state_len);
std::string SerializeState();
void DeserializeState(const std::string& state);
private:
......
......@@ -224,7 +224,8 @@ void BindVarDsec(py::module &m) {
.value("LOD_TENSOR", VarDesc::LOD_TENSOR)
.value("SELECTED_ROWS", VarDesc::SELECTED_ROWS)
.value("FEED_MINIBATCH", VarDesc::FEED_MINIBATCH)
.value("FETCH_LIST", VarDesc::FETCH_LIST);
.value("FETCH_LIST", VarDesc::FETCH_LIST)
.value("STEP_SCOPES", VarDesc::STEP_SCOPES);
}
void BindOpDesc(py::module &m) {
......
from paddle.v2.framework import framework as framework
__all__ = ['append_backward_ops']
def append_backward_ops(loss, parameter_list=None, no_grad_set=None):
"""
Create and add gradient Operators in BlockDesc to compute
gradients of `loss` for parameters in parameter_list
:param loss: an variable generated by cost function.
:type loss: Variable
:param no_grad_set: variable that should not create gradient
:type no_grad_set: set
:param parameter_list: parameters that need to compute gradient and
update to optimize the lost.
:type: list
:return: list of (parameters, gradients) pair.
:rtype: list[Variable]
"""
assert isinstance(loss, framework.Variable)
param_grad_map = loss.block.program.append_backward(loss, no_grad_set or
set())
if parameter_list is not None:
parameters = parameter_list
else:
params = loss.block.program.global_block().all_parameters()
parameters = [param.name for param in params]
params_and_grads = []
for param in parameters:
if param not in param_grad_map:
raise ValueError("param %s is not in map" % param)
grad_info = param_grad_map[param]
grad_block = loss.block.program.block(grad_info[1])
if not grad_block.has_var(grad_info[0]):
raise ValueError("grad block[{0}] did not have grad var {1}".format(
grad_info[1], grad_info[0]))
# Get the param var from the global block
param_var = loss.block.program.global_block().var(param)
grad_var = grad_block.var(grad_info[0])
if loss.block.has_var(grad_info[0]):
params_and_grads.append((param_var, grad_var))
else:
params_and_grads.append((param_var, None))
return params_and_grads
......@@ -261,7 +261,8 @@ class Operator(object):
self.desc.set_attr(attr_name, attrs[attr_name])
self.desc.check_attrs()
if type not in {'feed', 'fetch'}:
no_kernel_op_set = {'feed', 'fetch', 'save', 'restore'}
if type not in no_kernel_op_set:
self.desc.infer_var_type(self.block.desc)
self.desc.infer_shape(self.block.desc)
......
import paddle.v2.framework.framework as framework
from collections import defaultdict
__all__ = ['SGDOptimizer', 'MomentumOptimizer', 'AdagradOptimizer']
import paddle.v2.framework.framework as framework
from paddle.v2.framework.backward import append_backward_ops
__all__ = [
'SGDOptimizer', 'MomentumOptimizer', 'AdagradOptimizer', 'AdamOptimizer'
]
class Optimizer(object):
......@@ -43,6 +47,19 @@ class Optimizer(object):
"""
pass
def _finish_update(self, block):
"""Finish any custom updates needed
before completing an optimization step
Args:
block: the block in which the loss variable is present
parameters: list of parameter variables for the optimizer
Returns:
list of finish ops or None
"""
pass
def _add_accumulator(self, block, name, param, dtype=None, fill_value=0.0):
"""Utility function to add an accumulator for a parameter
......@@ -90,45 +107,6 @@ class Optimizer(object):
format(name, param.name))
return self._accumulators[name][param.name]
def create_backward_pass(self, loss, parameter_list=None, no_grad_set=None):
"""Create and add gradient Operators in BlockDesc to compute
gradients of `loss` for parameters in parameter_list
Args:
loss: an variable generated by cost function.
no_grad_set: variable that should not create gradient
parameter_list: parameters that need to compute gradient and
update to optimize the lost.
Returns:
list of (parameters, gradients) pair.
"""
assert isinstance(loss, framework.Variable)
param_grad_map = loss.block.program.append_backward(loss, no_grad_set or
set())
if parameter_list is not None:
parameters = parameter_list
else:
params = loss.block.program.global_block().all_parameters()
parameters = [param.name for param in params]
params_and_grads = []
for param in parameters:
if param not in param_grad_map:
raise Exception("param %s is not in map" % param)
grad_info = param_grad_map[param]
grad_block = loss.block.program.block(grad_info[1])
if not grad_block.has_var(grad_info[0]):
raise Exception("grad block[%d] did not have grad var %s" %
grad_info[1], grad_info[0])
# Get the param var from the global block
param_var = loss.block.program.global_block().var(param)
grad_var = grad_block.var(grad_info[0])
if loss.block.has_var(grad_info[0]):
params_and_grads.append((param_var, grad_var))
else:
params_and_grads.append((param_var, None))
return params_and_grads
def create_optimization_pass(self, parameters_and_grads, loss):
"""Add optimization operators to update gradients to variables.
......@@ -137,15 +115,17 @@ class Optimizer(object):
parameters_and_grads: a list of (variable, gradient) pair to update.
Returns:
optmization_op_list: a list of optimization operator that will update
parameter using gradient.
return_op_list: a list of operators that will complete one step of
optimization. This will include parameter update ops, global step
update ops and any other custom ops required by subclasses to manage
their internal state.
"""
# This is a default implementation of create_optimization_pass that
# can be shared by most optimizers. This implementation assumes that
# the subclass will implement the _append_optimize_op method and the
# _initialize_tensors method. The subclass can extend the
# _create_accumulators method if it needs to create accumulators
# for parameters.
# for parameters and extend _finish_update method to add custom ops.
# Create any accumulators
self._create_accumulators(loss.block,
......@@ -160,16 +140,26 @@ class Optimizer(object):
param_and_grad)
optimize_ops.append(optimize_op)
return optimize_ops
# Returned list of ops can include more ops in addition
# to optimization ops
return_ops = optimize_ops
# Get custom finish ops for subclasses
# FIXME: Need to fix this once we figure out how to handle dependencies
finish_ops = self._finish_update(loss.block)
if finish_ops is not None:
return_ops += finish_ops
return return_ops
def minimize(self, loss, parameter_list=None, no_grad_set=None):
"""Add operations to minimize `loss` by updating `parameter_list`.
This method combines interface `create_backward_pass()` and
This method combines interface `append_backward_ops()` and
`create_optimization_pass()` into one.
"""
params_grads = self.create_backward_pass(loss, parameter_list,
no_grad_set or set())
params_grads = append_backward_ops(loss, parameter_list, no_grad_set or
set())
optimize_ops = self.create_optimization_pass(params_grads, loss)
return optimize_ops
......@@ -329,3 +319,124 @@ class AdagradOptimizer(Optimizer):
attrs={"epsilon": self._epsilon})
return adagrad_op
class AdamOptimizer(Optimizer):
"""Implements the Adam Optimizer
"""
_moment1_acc_str = "moment1"
_moment2_acc_str = "moment2"
def __init__(self,
learning_rate=0.001,
beta1=0.9,
beta2=0.999,
epsilon=1e-8):
assert learning_rate is not None
assert beta1 is not None
assert beta2 is not None
assert epsilon is not None
super(AdamOptimizer, self).__init__()
self.type = "adam"
self._learning_rate = learning_rate
self._beta1 = beta1
self._beta2 = beta2
self._epsilon = epsilon
def _initialize_tensors(self, block):
assert isinstance(block, framework.Block)
lr_shape = [1]
# create a variable for learning_rate
self._lr = block.create_var(
dtype="float32", shape=lr_shape, lod_level=0)
# create an op to init the learning_rate
# FIXME: Fix when Initialization design has been implemented
# https://github.com/PaddlePaddle/Paddle/pull/4852
block.append_op(
type="fill_constant",
outputs={"Out": self._lr},
attrs={"shape": lr_shape,
"value": self._learning_rate})
def _create_accumulators(self, block, parameters):
assert isinstance(block, framework.Block)
global_block = block.program.global_block()
# Create beta1 and beta2 power tensors
beta_shape = [1]
# Create variables for beta1 and beta2 powers
self._beta1_pow_acc = global_block.create_var(
dtype="float32", shape=beta_shape, lod_level=0)
self._beta2_pow_acc = global_block.create_var(
dtype="float32", shape=beta_shape, lod_level=0)
# Initialize beta1 and beta2 power accumulators
# FIXME: Fix when Initialization design has been implemented
# https://github.com/PaddlePaddle/Paddle/pull/4852
global_block.append_op(
type="fill_constant",
outputs={"Out": self._beta1_pow_acc},
attrs={"shape": beta_shape,
"value": self._beta1})
global_block.append_op(
type="fill_constant",
outputs={"Out": self._beta2_pow_acc},
attrs={"shape": beta_shape,
"value": self._beta2})
# Create accumulator tensors for first and second moments
for p in parameters:
self._add_accumulator(block, self._moment1_acc_str, p, 'float32')
self._add_accumulator(block, self._moment2_acc_str, p, 'float32')
def _append_optimize_op(self, block, param_and_grad):
assert isinstance(block, framework.Block)
moment1 = self._get_accumulator(self._moment1_acc_str,
param_and_grad[0])
moment2 = self._get_accumulator(self._moment2_acc_str,
param_and_grad[0])
# create the momentum optimize op
adam_op = block.append_op(
type=self.type,
inputs={
"Param": param_and_grad[0],
"Grad": param_and_grad[1],
"LearningRate": self._lr,
"Moment1": moment1,
"Moment2": moment2,
"Beta1Pow": self._beta1_pow_acc,
"Beta2Pow": self._beta2_pow_acc
},
outputs={
"ParamOut": param_and_grad[0],
"Moment1Out": moment1,
"Moment2Out": moment2
},
attrs={
"beta1": self._beta1,
"beta2": self._beta2,
"epsilon": self._epsilon
})
return adam_op
def _finish_update(self, block):
"""Update Beta1 and Beta2 Power accumulators
"""
assert isinstance(block, framework.Block)
global_block = block.program.global_block()
scale_beta1 = global_block.append_op(
type="scale",
inputs={"X": self._beta1_pow_acc},
outputs={"Out": self._beta1_pow_acc},
attrs={"scale": self._beta1})
scale_beta2 = global_block.append_op(
type="scale",
inputs={"X": self._beta2_pow_acc},
outputs={"Out": self._beta2_pow_acc},
attrs={"scale": self._beta2})
return [scale_beta1, scale_beta2]
......@@ -390,7 +390,8 @@ class OpTest(unittest.TestCase):
output_names,
no_grad_set=None,
in_place=False,
max_relative_error=0.005):
max_relative_error=0.005,
user_defined_grads=None):
self.scope = core.Scope()
op_inputs = self.inputs if hasattr(self, "inputs") else dict()
op_outputs = self.outputs if hasattr(self, "outputs") else dict()
......@@ -403,7 +404,7 @@ class OpTest(unittest.TestCase):
if not type(output_names) is list:
output_names = [output_names]
numeric_grads = [
numeric_grads = user_defined_grads or [
get_numeric_gradient(
self.scope,
self.op,
......
import unittest
import numpy as np
from op_test import OpTest, get_backward_op, grad_var_name
import paddle.v2.framework.core as core
from paddle.v2.framework.op import Operator
def _reference_training(x, scale, offset, epsilon, data_format):
if data_format != "NHWC":
raise ValueError("data_format must be NHWC, got %s." % data_format)
x_square = x * x
x_square_sum = np.sum(x_square, (0, 1, 2))
x_sum = np.sum(x, axis=(0, 1, 2))
element_count = np.size(x) / int(np.shape(x)[-1])
mean = x_sum / element_count
var = x_square_sum / element_count - mean * mean
normalized = (x - mean) / np.sqrt(var + epsilon)
return (normalized * scale + offset), mean, var
def _reference_grad(x, grad_y, scale, mean, var, epsilon, data_format):
# Use the following formulas to calculate gradients:
# grad_scale =
# sum(grad_y * (x - mean)) * rsqrt(var + epsilon)
#
# grad_offset = sum(output_y)
#
# grad_x =
# 1/N * scale * rsqrt(var + epsilon) * (N * grad_y - sum(grad_y) -
# (x - mean) * sum(grad_y * (x - mean)) / (var + epsilon))
if data_format != "NHWC":
raise ValueError("data_format must be NHWC, got %s." % data_format)
grad_x = scale * (grad_y - np.mean(
grad_y, axis=(0, 1, 2)) - (x - mean) * np.mean(
grad_y * (x - mean), axis=(0, 1, 2)) /
(var + epsilon)) / np.sqrt(var + epsilon)
grad_scale = np.sum(grad_y * (x - mean) / np.sqrt(var + epsilon),
axis=(0, 1, 2))
grad_offset = np.sum(grad_y, axis=(0, 1, 2))
return grad_x, grad_scale, grad_offset
def create_or_get_tensor(scope, var_name, var, place):
tensor = scope.var(var_name).get_tensor()
if var is not None:
assert isinstance(var, np.ndarray)
tensor.set_lod([[]])
tensor.set_dims(var.shape)
tensor.set(var, place)
return tensor
def set_output_grad(scope, outputs, place):
def __set_tensor__(name):
out_tensor = scope.find_var(name).get_tensor()
grad_tensor = scope.var(grad_var_name(name)).get_tensor()
out_dtype = out_tensor.dtype()
if out_dtype == core.DataType.FP64:
data = np.ones(out_tensor.shape(), dtype=np.float64)
elif out_dtype == core.DataType.FP32:
data = np.ones(out_tensor.shape(), dtype=np.float32)
else:
raise ValueError("Not supported data type " + str(out_dtype))
grad_tensor.set(data, place)
for output in outputs:
__set_tensor__(output)
class TestBatchNormOp(OpTest):
def __assert_close(self, tensor, np_array, msg, atol=1e-4):
self.assertTrue(np.allclose(np.array(tensor), np_array, atol=atol), msg)
def test_forward_backward(self):
# attr
data_format = "NHWC"
epsilon = 0.00001
momentum = 0.9
channel_num = 2
x_shape = [2, 3, 4, channel_num]
scale_shape = [channel_num]
# input
x_val = np.random.random_sample(x_shape).astype(np.float32)
scale_val = np.random.random_sample(scale_shape).astype(np.float32)
bias_val = np.random.random_sample(scale_shape).astype(np.float32)
mean = np.zeros(scale_shape).astype(np.float32)
variance = np.zeros(scale_shape).astype(np.float32)
# run forward
y_out, saved_mean, var_ref = _reference_training(
x_val, scale_val, bias_val, epsilon, data_format)
# run backward
mean_out = saved_mean * (1 - momentum)
variance_out = var_ref * (1 - momentum)
saved_variance = 1 / np.sqrt(var_ref + epsilon)
# for gradient test
y_grad = np.ones(x_shape).astype(np.float32)
x_grad_ref, scale_grad_ref, bias_grad_ref = _reference_grad(
x_val, y_grad, scale_val, saved_mean, var_ref, epsilon, data_format)
def test_with_place(place):
scope = core.Scope()
# create input
x_tensor = create_or_get_tensor(scope, "x_val", x_val, place)
scale_tensor = create_or_get_tensor(scope, "scale_val", scale_val,
place)
bias_tensor = create_or_get_tensor(scope, "bias_val", bias_val,
place)
mean_tensor = create_or_get_tensor(scope, "mean", mean, place)
variance_tensor = create_or_get_tensor(scope, "variance", variance,
place)
# create output
y_tensor = create_or_get_tensor(scope, "y_out", None, place)
saved_mean_tensor = create_or_get_tensor(scope, "saved_mean", None,
place)
saved_variance_tensor = create_or_get_tensor(
scope, "saved_variance", None, place)
mean_out_tensor = mean_tensor
variance_out_tensor = variance_tensor
batch_norm_op = Operator(
"batch_norm",
# inputs
X="x_val",
Scale="scale_val",
Bias="bias_val",
Mean="mean",
Variance="variance",
# outputs
Y="y_out",
MeanOut="mean",
VarianceOut="variance",
SavedMean="saved_mean",
SavedVariance="saved_variance",
# attrs
is_test=False,
tensor_format=data_format,
momentum=momentum,
epsilon=epsilon)
ctx = core.DeviceContext.create(place)
batch_norm_op.run(scope, ctx)
# check forward result
self.__assert_close(y_tensor, y_out, "y_out")
self.__assert_close(saved_mean_tensor, saved_mean, "saved_mean")
self.__assert_close(saved_variance_tensor, saved_variance,
"saved_variance")
self.__assert_close(mean_out_tensor, mean_out, "mean_out")
# FIXME(qiao) figure out why with cuDNN variance_out have a higher error rate
if isinstance(place, core.GPUPlace):
atol = 5e-2
else:
atol = 1e-4
self.__assert_close(variance_out_tensor, variance_out,
"variance_out", atol)
# run backward
batch_norm_op_grad = get_backward_op(scope, batch_norm_op, set())
set_output_grad(
scope,
["y_out", "mean", "variance", "saved_mean", "saved_variance"],
place)
batch_norm_op_grad.run(scope, ctx)
x_grad_tensor = create_or_get_tensor(scope,
grad_var_name("x_val"), None,
place)
scale_grad_tensor = create_or_get_tensor(scope,
grad_var_name("scale_val"),
None, place)
bias_grad_tensor = create_or_get_tensor(scope,
grad_var_name("bias_val"),
None, place)
# check gradient output
self.__assert_close(x_grad_tensor, x_grad_ref, "x_grad")
self.__assert_close(scale_grad_tensor, scale_grad_ref, "scale_grad")
self.__assert_close(bias_grad_tensor, bias_grad_ref, "bias_grad")
places = [core.CPUPlace()]
if core.is_compile_gpu() and core.op_support_gpu("batch_norm"):
places.append(core.GPUPlace(0))
for place in places:
test_with_place(place)
if __name__ == '__main__':
unittest.main()
......@@ -2,6 +2,7 @@ import unittest
import paddle.v2.framework.framework as framework
import paddle.v2.framework.optimizer as optimizer
from paddle.v2.framework.backward import append_backward_ops
class TestOptimizer(unittest.TestCase):
......@@ -51,7 +52,7 @@ class TestMomentumOptimizer(unittest.TestCase):
outputs={"Out": mul_out},
attrs={"x_num_col_dims": 1})
momentum_optimizer = self.MockMomentum(learning_rate=0.01, momentum=0.2)
params_grads = momentum_optimizer.create_backward_pass(mul_out)
params_grads = append_backward_ops(mul_out)
self.assertEqual(len(params_grads), 1)
self.assertEqual(len(momentum_optimizer.get_accumulators()), 0)
opts = momentum_optimizer.create_optimization_pass(params_grads,
......@@ -93,7 +94,7 @@ class TestAdagradOptimizer(unittest.TestCase):
outputs={"Out": mul_out},
attrs={"x_num_col_dims": 1})
adagrad_optimizer = self.MockAdagrad(learning_rate=0.01, epsilon=1.0e-6)
params_grads = adagrad_optimizer.create_backward_pass(mul_out)
params_grads = append_backward_ops(mul_out)
self.assertEqual(len(params_grads), 1)
self.assertEqual(len(adagrad_optimizer.get_accumulators()), 0)
opts = adagrad_optimizer.create_optimization_pass(params_grads, mul_out)
......@@ -110,5 +111,54 @@ class TestAdagradOptimizer(unittest.TestCase):
self.assertTrue(mul_x.name in moment_acc)
class TestAdamOptimizer(unittest.TestCase):
class MockAdam(optimizer.AdamOptimizer):
def get_accumulators(self):
return self._accumulators
def get_moment1_str(self):
return self._moment1_acc_str
def get_moment2_str(self):
return self._moment2_acc_str
def test_adam_optimizer(self):
program = framework.Program()
block = program.global_block()
mul_x = block.create_parameter(
dtype="float32", shape=[5, 10], lod_level=0, name="mul.x")
mul_y = block.create_var(
dtype="float32", shape=[10, 8], lod_level=0, name="mul.y")
mul_out = block.create_var(
dtype="float32", shape=[5, 8], lod_level=0, name="mul.out")
block.append_op(
type="mul",
inputs={"X": mul_x,
"Y": mul_y},
outputs={"Out": mul_out},
attrs={"x_num_col_dims": 1})
adam_optimizer = self.MockAdam(
learning_rate=0.01, beta1=0.9, beta2=0.999)
params_grads = append_backward_ops(mul_out)
self.assertEqual(len(params_grads), 1)
self.assertEqual(len(adam_optimizer.get_accumulators()), 0)
opts = adam_optimizer.create_optimization_pass(params_grads, mul_out)
self.assertEqual(len(opts), 3)
adam_op = opts[0]
self.assertEqual(adam_op.type, "adam")
# Check accumulators
accumulators = adam_optimizer.get_accumulators()
self.assertEqual(len(accumulators), 2)
self.assertTrue(adam_optimizer.get_moment1_str() in accumulators)
self.assertTrue(adam_optimizer.get_moment2_str() in accumulators)
moment1_acc = accumulators[adam_optimizer.get_moment1_str()]
moment2_acc = accumulators[adam_optimizer.get_moment2_str()]
self.assertEqual(len(moment1_acc), 1)
self.assertEqual(len(moment2_acc), 1)
self.assertTrue(mul_x.name in moment1_acc)
self.assertTrue(mul_x.name in moment2_acc)
if __name__ == '__main__':
unittest.main()
import paddle.v2.framework.core as core
import paddle.v2.framework.framework as framework
import paddle.v2.framework.executor as executor
import numpy as np
import unittest
import os
import sys
import shutil
FOLDER_PATH = "./tmp_test_dir"
class TestSaveRestoreOp(unittest.TestCase):
def test_save_restore_op(self):
tensor_1_val = np.random.rand(3, 9).astype("float32")
tensor_2_val = np.random.randint(0, 20, size=(4, 2)).astype("int32")
place = core.CPUPlace()
program = framework.Program()
block = program.global_block()
v_a = block.create_var(
dtype="float32", shape=[3, 9], lod_level=0, name="tensor_1")
v_b = block.create_var(
dtype="int32", shape=[4, 2], lod_level=0, name="tensor_2")
t_1 = core.LoDTensor()
t_1.set(tensor_1_val, place)
t_2 = core.LoDTensor()
t_2.set(tensor_2_val, place)
block.append_op(
type="save",
inputs={"X": [v_a, v_b]},
attrs={"folderPath": FOLDER_PATH})
block.append_op(
type="fill_constant",
outputs={"Out": [v_a]},
attrs={"shape": [2, 2],
"value": 0.0})
block.append_op(
type="fill_constant",
outputs={"Out": [v_b]},
attrs={"shape": [2, 2],
"value": 0.0})
block.append_op(
type="restore",
outputs={"Out": [v_a, v_b]},
attrs={"folderPath": FOLDER_PATH})
if os.path.exists(FOLDER_PATH):
shutil.rmtree(FOLDER_PATH)
os.makedirs(FOLDER_PATH)
exe = executor.Executor(place)
out = exe.run(program,
feed={"tensor_1": t_1,
"tensor_2": t_2},
fetch_list=[v_a, v_b])
self.assertTrue(os.path.isdir(FOLDER_PATH))
self.assertTrue(os.path.isfile(FOLDER_PATH + "/__tensor_1__"))
self.assertTrue(os.path.isfile(FOLDER_PATH + "/__tensor_2__"))
self.assertTrue(np.array_equal(np.array(out[0]), tensor_1_val))
self.assertTrue(np.array_equal(np.array(out[1]), tensor_2_val))
shutil.rmtree(FOLDER_PATH)
if __name__ == "__main__":
unittest.main()
import unittest
from paddle.v2.framework.framework import Variable, g_program
from paddle.v2.framework.framework import Variable, g_program, Program
import paddle.v2.framework.core as core
import numpy as np
......@@ -36,6 +36,13 @@ class TestVariable(unittest.TestCase):
self.assertRaises(ValueError,
lambda: b.create_var(name="fc.w", shape=(24, 100)))
def test_step_scopes(self):
prog = Program()
b = prog.current_block()
var = b.create_var(
name='step_scopes', type=core.VarDesc.VarType.STEP_SCOPES)
self.assertEqual(core.VarDesc.VarType.STEP_SCOPES, var.type)
if __name__ == '__main__':
unittest.main()
......@@ -49,7 +49,7 @@ def save_model(parameters, path):
' in environment variable.')
etcd_ip = os.environ.get(etcd_name)
client = master.client("http://" + etcd_ip + ":2379", 5, 0)
client = paddle.v2.master.client("http://" + etcd_ip + ":2379", 5, 0)
r = client.request_save_model(trainer_id, 5000)
if r == 0:
# do not need to save
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册