# Design Doc: CSP in PaddlePaddle Fluid

## Motivation

Concurrent programming is important for deep learning.  A few example applications are:

1.  The main thread keeps reading the next mini-batch while another thread uses the GPU for computing.
2.  The main thread performs the computation while another thread uploads the local gradients from each trainer to the parameter server.

Most DL systems, including TensorFlow, Caffe2, and MXNet, can asynchronously execute operators in a graph. However, Fluid doesn't have the concept of a graph at all, because Fluid is designed as a programming language.

## Concurrent Programming Models

There are many concurrent programming models, implemented in various forms:

<table>
<thead>
<tr>
<th>concurrent programming model</th>
<th>implementation</th>
</tr>
</thead>
<tbody>
<tr>
<td>mutex </td>
<td>types and functions in standard libraries </td>
</tr>
<tr>
<td>semaphore </td>
<td> types and functions in standard libraries </td>
</tr>
<tr>
<td> communicating sequential processes (CSP)  </td>
<td> Go programming language </td>
</tr>
<tr>
<td> actor model  </td>
<td> Erlang programming language </td>
</tr>
<tr>
<td> message passing  </td>
<td> MPI </td>
</tr>
<tr>
<td> bulk synchronous parallel (BSP)   </td>
<td> Pregel distributed programming framework </td>
</tr>
</tbody>
</table>


Since Fluid was designed to be a programming language, we would like to implement CSP in Fluid.

### CSP vs. Actor Model

A well-known implementation of the Actor Model is the Erlang programming language.  In the Actor Model, *processes* send messages to and receive messages from other processes, addressed by process ID.  The same three ingredients (processes with IDs, send, and recv) also appear in MPI.  Indeed, we can rewrite Erlang programs in Python + MPI, possibly with fewer lines of code.  Our concern with the Actor Model is that it doesn't seem reasonable to implement process management in a programming language's runtime library; instead, managing processes should be the operating system's responsibility, and send/recv should be left to libraries like MPI.

## CSP in Fluid

Fluid has two fundamental control-flows: *if-else* and *while*.  If we are to implement CSP, we need the following:

1. a new data type: *channel* and operators *send* and *recv*,
1. *goroutine* or thread, and
1. a new control-flow: select.

We also need Python wrappers for the above components.

The type *channel* is conceptually a blocking queue.  In Go, it is implemented as a [blocking circular queue](https://github.com/golang/go/blob/68ce117cf17b8debf5754bfd476345779b5b6616/src/runtime/chan.go#L31-L50), which supports send and recv.
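To make the blocking-queue idea concrete, here is a minimal sketch in plain Python, not Fluid or Go code. The names `Channel` and `ClosedChannelError` are hypothetical. It assumes a fixed-capacity circular buffer guarded by a `threading.Condition`, and it returns `None` where Go would return the element type's zero value.

```python
import threading


class ClosedChannelError(Exception):
    """Raised when sending to a closed channel (Go panics in this case)."""


class Channel:
    """Hypothetical bounded blocking channel backed by a circular buffer."""

    def __init__(self, capacity):
        if capacity < 1:
            raise ValueError("this sketch only covers buffered channels")
        self._buf = [None] * capacity
        self._head = 0   # index of the oldest element
        self._size = 0   # number of stored elements
        self._closed = False
        self._cond = threading.Condition()

    def send(self, value):
        with self._cond:
            # Block while the buffer is full and the channel is still open.
            while self._size == len(self._buf) and not self._closed:
                self._cond.wait()
            if self._closed:
                raise ClosedChannelError("send on closed channel")
            tail = (self._head + self._size) % len(self._buf)
            self._buf[tail] = value
            self._size += 1
            self._cond.notify_all()

    def recv(self):
        """Return (value, ok); ok is False once the channel is closed and drained."""
        with self._cond:
            # Block while the buffer is empty and the channel is still open.
            while self._size == 0 and not self._closed:
                self._cond.wait()
            if self._size == 0:
                return None, False
            value = self._buf[self._head]
            self._buf[self._head] = None
            self._head = (self._head + 1) % len(self._buf)
            self._size -= 1
            self._cond.notify_all()
            return value, True

    def close(self):
        with self._cond:
            self._closed = True
            self._cond.notify_all()


# Fill the buffer, close the channel, then drain it.
ch = Channel(capacity=3)
for i in range(3):
    ch.send(i)
ch.close()
received = [ch.recv() for _ in range(4)]
```

After the channel is closed, the three buffered values are still delivered, and the fourth `recv` reports `ok == False` instead of blocking.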

The `select` operation existed in OS kernels long before the Go language.  All Unix kernels implement the system calls *poll* and *select*, which monitor multiple file descriptors to see whether I/O is possible on any of them.  This takes O(N) time.  Since Linux 2.6, a new system call, *epoll*, can do the same in O(1) time.  BSD systems have a similar system call, *kqueue*.  Go's runtime on Linux uses epoll for its network poller.

It might be a good idea to implement Fluid's select using epoll too.  In this design doc, we start from the O(N) way so that we could focus on Python binding and the syntax.
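For readers unfamiliar with the OS-level call, the following sketch uses Python's standard `select.select` wrapper to monitor two descriptors. It is only an illustration of the system call discussed above, not part of the proposed Fluid API. The socket pairs and their names are stand-ins for arbitrary I/O sources.

```python
import select
import socket

# Two connected socket pairs stand in for two independent I/O sources.
a_recv, a_send = socket.socketpair()
b_recv, b_send = socket.socketpair()

try:
    # Nothing has been written yet, so a zero-timeout select reports
    # no readable descriptors.
    readable, _, _ = select.select([a_recv, b_recv], [], [], 0)
    none_ready = list(readable)

    # Write to the second pair; select now reports only b_recv as readable.
    b_send.sendall(b"x")
    readable, _, _ = select.select([a_recv, b_recv], [], [], 1.0)
    ready_after_write = list(readable)
    payload = ready_after_write[0].recv(1)
finally:
    for s in (a_recv, a_send, b_recv, b_send):
        s.close()
```

The kernel has to inspect every descriptor in the list on each call, which is where the O(N) cost comes from.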

### Type Channel

Fluid supports many data types:

1. Tensor,
1. Row-sparse Tensor,
1. LoD Tensor,
1. Tensor array, etc.

Each data type is registered in [`framework.proto`](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/framework/framework.proto#L117-L127) as an enum value.  To add the new channel type, we need to add a new enum value.

To expose a C++ type to Python, we need to edit the [`pybind.cc`](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/pybind/pybind.cc) file.  [Here](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/pybind/pybind.cc#L120-L164) is an example of how we expose the C++ class LoDTensor.

## Syntax Design

### Create Channel

In Go, we create a channel by specifying the element type and buffer size:

```go
ch  := make(chan int)       // a channel without buffer
ch1 := make(chan int, 100)  // a channel that can buffer 100 ints.
```

In Fluid, we should be able to do the same:

```python
ch  = fluid.make_channel(dtype=INT)
ch1 = fluid.make_channel(dtype=INT, capacity=100)  # keyword name `capacity` is illustrative
```

In addition to that, we want channels that can hold more complex element types, e.g., Tensors of float16:

```python
ch = fluid.make_channel(dtype=Tensor, etype=float16)
```

or Tensors of Tensors of float16 etc.

The point here is that we need a consistent way to compose types, like in C++ we can have `Tensor<Tensor<...<float16>...> >`.
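One way to picture such composition is a recursive type descriptor. The sketch below is plain Python and purely hypothetical: `TypeDesc`, `channel_of`, and `tensor_of` are illustrative names, not part of Fluid or `framework.proto`.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class TypeDesc:
    """Hypothetical recursive type descriptor: a kind plus an optional element type."""
    kind: str
    elem: Optional["TypeDesc"] = None

    def __str__(self):
        return self.kind if self.elem is None else f"{self.kind}<{self.elem}>"


def channel_of(elem):
    """Describe a channel whose elements have type `elem`."""
    return TypeDesc("Channel", elem)


def tensor_of(elem):
    """Describe a tensor whose elements have type `elem`."""
    return TypeDesc("Tensor", elem)


float16 = TypeDesc("float16")
nested = channel_of(tensor_of(tensor_of(float16)))
description = str(nested)
```

Because the element type is itself a descriptor, nesting to any depth needs no special cases; here `description` is `Channel<Tensor<Tensor<float16>>>`.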

### Send and Recv

Go's CSP implementation depends on the data type *channel*. There are two types of channels:

1. The buffered channel, also called the unblocked channel, is a blocking queue with a non-zero-sized buffer. A send to a buffered channel blocks if the buffer is full, and a receive blocks if the buffer is empty.
1. The unbuffered channel, also called the blocked channel, is a blocking queue with no buffer.  A send blocks until a receiver is ready, and a receive blocks until a sender is ready.

There are four types of actions with a channel:

1. Create a channel

   ```go
   ch := make(chan int)       // an unbuffered channel
   ch1 := make(chan int, 100) // a buffered channel of 100 ints
   ```

1. Send

   ```go
   ch <- 111
   ```

1. Recv

   ```go
   y, ok := <-ch
   ```

1. Close

   ```go
   close(ch)
   ```

   Note that a closed channel is not the same as a nil channel. A nil channel is one that has been declared but never initialized, e.g., `var ch chan int`.

There are some [axioms with channels](https://dave.cheney.net/2014/03/19/channel-axioms):

1. A send to a nil channel blocks forever

1. A receive from a nil channel blocks forever

1. A send to a closed channel panics

1. A receive from a closed channel returns the residual values and then zeros.

In Fluid, we have [buffered channels](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/framework/details/buffered_channel.h) and [unbuffered channels](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/framework/details/unbuffered_channel.h).

The following program illustrates the Python syntax for accessing Fluid buffered channels.

```python
import fluid

buffer_size = 10
ch = fluid.make_channel(dtype=INT, capacity=buffer_size)  # keyword name `capacity` is illustrative

# Now write buffer_size elements to the channel
with fluid.while(steps=buffer_size):
  fluid.send(ch, step)

fluid.close_channel(ch)

with fluid.while(steps=buffer_size):
  fluid.print(fluid.recv(ch))
```

The following example shows that to avoid the always-blocking behavior of unbuffered channels, we need to use Fluid's goroutines.

```python
import fluid

ch = fluid.make_channel(dtype=INT)

with fluid.go():
  fluid.send(ch)

y = fluid.recv(ch)

fluid.close_channel(ch)
```
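The rendezvous behavior can be imitated in plain Python with a thread playing the role of the goroutine. This is a rough sketch rather than Fluid code: `UnbufferedChannel` is a hypothetical name, it assumes a single sender and a single receiver, and it uses `queue.Queue.join` so that `send` returns only after the receiver has taken the value.

```python
import queue
import threading


class UnbufferedChannel:
    """Hypothetical rendezvous channel: send returns only after a receiver took the value."""

    def __init__(self):
        self._slot = queue.Queue(maxsize=1)

    def send(self, value):
        self._slot.put(value)   # hand the value over
        self._slot.join()       # block until the receiver acknowledges it

    def recv(self):
        value = self._slot.get()  # block until a sender arrives
        self._slot.task_done()    # release the waiting sender
        return value


ch = UnbufferedChannel()
events = []


def sender():
    ch.send(42)
    events.append("send returned")


# The sender runs in its own thread, like `with fluid.go():` above.
t = threading.Thread(target=sender)
t.start()
y = ch.recv()
t.join()
```

If `ch.send(42)` were called on the main thread before any receiver existed, it would block forever, which is why the send has to live in a separate thread.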

### Select

In Go, the `select` statement lets a goroutine wait on multiple communication operations. A `select` blocks until one of its cases can run, then it executes that case. It chooses one at random if multiple are ready.

```go

ch1 := make(chan int)
ch2 := make(chan int, 100)

x := 0

for {
    select {
    case ch1 <- x: // send x on ch1
        x = x + 1
    case y := <-ch2: // receive from ch2 into y
        fmt.Println("Received on channel:", y)
    default: // runs when no other case is ready
        fmt.Println("Default")
    }
}

```

In Fluid, we should be able to do the same:

```python
ch1 = fluid.make_channel(dtype=INT)
ch2 = fluid.make_channel(dtype=INT, capacity=100)  # keyword name `capacity` is illustrative

sel = fluid.select()

with sel.case(ch1, 'w', X):
    fluid.layers.increment(X)

with sel.case(ch2, 'r', Y):
    fluid.print("Received on Channel")

with sel.default():
    fluid.print("Default")

```

In the above code snippet, `X` and `Y` are variables. Now let us look at each of these statements one by one.

- `sel.case(ch1, 'w', X)` : This specifies that we are writing to `ch1` and that we want to write the integer in variable `X` to the channel. The character `w` mirrors the write mode in Python file I/O.

- `sel.case(ch2, 'r', Y)` : This specifies that we would like to read the result from `ch2` into variable `Y`. The character `r` mirrors the read mode in Python file I/O.

- `sel.default()` : This is equivalent to the default in Go `select`. If none of the channels are ready for read or write, then the fluid code in the default block will be executed.
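The semantics of these three statements can be approximated in plain Python with the O(N) scan mentioned earlier. The sketch below is an illustration under assumptions, not the proposed implementation: the `select` function and its case tuples are hypothetical, `queue.Queue` stands in for a channel, and a default handler is required because the sketch never blocks.

```python
import queue
import random


def select(cases, default):
    """Hypothetical O(N) select: try each case once, in random order.

    Each case is (channel, mode, value, handler), where mode is 'w' or 'r'.
    Returns the result of the handler that ran, or of `default` if no
    case was ready.
    """
    for ch, mode, value, handler in random.sample(cases, len(cases)):
        try:
            if mode == "w":
                ch.put_nowait(value)      # ready only if the channel has room
                result = value
            else:
                result = ch.get_nowait()  # ready only if a value is waiting
        except (queue.Full, queue.Empty):
            continue  # this case is not ready; try the next one
        return handler(result)
    return default()


ch1 = queue.Queue(maxsize=1)
ch2 = queue.Queue(maxsize=100)
ch1.put(0)  # ch1 is now full, so the write case is not ready

cases = [
    (ch1, "w", 1, lambda v: f"sent {v}"),
    (ch2, "r", None, lambda v: f"received {v}"),
]

first = select(cases, default=lambda: "default")   # neither case is ready
ch2.put(7)
second = select(cases, default=lambda: "default")  # only the read case is ready
```

Shuffling the cases before the scan imitates Go's random choice when several cases are ready at once.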

## Example Programs

### 1. RPC between Trainers and Parameter Servers

### 2. Concurrent Minibatch Loading