select_op.md 7.4 KB
Newer Older
1 2 3 4
# select_op Design

## Introduction

_青葱's avatar
_青葱 已提交
5 6 7 8
In golang, the [**select**](https://golang.org/ref/spec#Select_statements)
statement lets a goroutine wait on multiple communication operations at the
same time. The **select** blocks until one of its cases can run, then
executes the case.  If multiple cases are ready to run, then one case is
9 10
choosen at random to be executed.

_青葱's avatar
_青葱 已提交
11
With the introduction of CSP for Paddle, we mimic this behavior by
12 13 14 15 16 17 18 19
creating a ***select_op***.

## How to use it

The **select_op** is available as a c++ operator.  However most users
will prefer to use the much simplier Python API.

- **fluid.Select()**: Creates a select operator and adds it to the current
_青葱's avatar
_青葱 已提交
20 21
block within the main program.  Also creates a sub block and adds it to the
main program.  This sub block is used to hold all variables and operators
22
used by the case statements.
_青葱's avatar
_青葱 已提交
23 24

Within the select block, users can add cases by
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
calling **select.case** or **select.default** method.

- **fluid.Select.case(channel_action, channel, result_variable)**: Represents
a fluid channel send/recv case.  This method creates a SelectCase block
guard and adds it to the Select block.  The arguments into this method tells
the select which channel operation to listen to.

- **fluid.Select.default()**: Represents the fluid default case.  This default
case is executed if none of the channel send/recv cases are available to
execute.

**Example:**
```
ch1 = fluid.make_channel(dtype=core.VarDesc.VarType.LOD_TENSOR)
quit_ch = fluid.make_channel(dtype=core.VarDesc.VarType.LOD_TENSOR)
_青葱's avatar
_青葱 已提交
40

41 42
x = fill_constant(shape=[1], dtype=core.VarDesc.VarType.INT32, value=0)
y = fill_constant(shape=[1], dtype=core.VarDesc.VarType.INT32, value=1)
_青葱's avatar
_青葱 已提交
43

44 45
while_cond = fill_constant(shape=[1], dtype=core.VarDesc.VarType.BOOL, value=True)
while_op = While(cond=while_cond)    
_青葱's avatar
_青葱 已提交
46

47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
with while_op.block():
    with fluid.Select() as select:
        with select.case(fluid.channel_send, channel, x):
            # Send x, then perform Fibonacci calculation on x and y
            x_tmp = fill_constant(shape=[1], dtype=core.VarDesc.VarType.INT32, value=0)
            assign(input=x, output=x_tmp)
            assign(input=y, output=x)
            assign(elementwise_add(x=x_tmp, y=y), output=y)
        with select.case(fluid.channel_recv, quit_channel, result2):
            # Exit out of While loop
            while_false = fill_constant(shape=[1], dtype=core.VarDesc.VarType.BOOL, value=False)
            helper = layer_helper.LayerHelper('assign')
            helper.append_op(
                type='assign',
                inputs={'X': [while_false]},
                outputs={'Out': [while_cond]})
```

## How it Works

### Program Description

```
blocks {
  idx: 0
  ...
  // Create "case_to_execute" variable
  ops {
    outputs {
      parameter: "Out"
      arguments: "fill_constant_110.tmp_0"
    }
    type: "fill_constant"
    attrs {
      name: "force_cpu"
      type: BOOLEAN
      b: false
    }
    attrs {
      name: "value"
      type: FLOAT
      f: -1.0
    }
    attrs {
      name: "shape"
      type: INTS
      ints: 1
    }
    attrs {
      name: "dtype"
      type: INT
      i: 2
    }
  }
  // Create "select" operator.
_青葱's avatar
_青葱 已提交
102
  // inputs:
103 104 105 106 107
  //   X: All input variables used by operators within the select block
  //   case_to_execute: Variable filled in by select_op when it determines
  //     which case to execute.
  //  
  // outputs:
_青葱's avatar
_青葱 已提交
108 109
  //   Out: All output variables referenced by operators within select block.
  //
110 111
  // attrs:
  //   sub_block: The block id containing the select "cases"
_青葱's avatar
_青葱 已提交
112
  //   cases:  Serialized list of all cases in the select op.
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
  //     Each case is serialized as: '<index>,<type>,<channel>,<value>'
  //     where type is 0 for default, 1 for send, and 2 for receive.
  //     No channel and values are needed for default cases.
  ops {
    inputs {
      parameter: "X"
      arguments: "fill_constant_103.tmp_0"
      arguments: "fill_constant_104.tmp_0"
    }
    inputs {
      parameter: "case_to_execute"
      arguments: "fill_constant_110.tmp_0"
    }
    outputs {
      parameter: "Out"
      arguments: "fill_constant_110.tmp_0"
    }    
    type: "select"
    attrs {
      name: "sub_block"
      type: BLOCK
      block_idx: 1
    }
    attrs {
      name: "cases"
      type: STRINGS
      strings: "0,1,channel_101,fill_constant_109.tmp_0"
      strings: "1,2,channel_102,fill_constant_108.tmp_0"
    }
  }
  ...
}
```

The python select API will add the **select_op** to the current block.  In addition, it will
iterate through all it's case statements and add any input variables required by case statements
into **X**.  It will also create a temp variable called **case_to_execute**.  This variable is
filled in by the select_op after it has completed processing the case statements.

If there are no available cases to execute (ie: all cases are blocked on channel operations, and
_青葱's avatar
_青葱 已提交
153
there is no default statement), then the select_op will block the current thread.  The thread will
154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249
unblock once there is a channel operation affecting one of the case statements, at which point, the
**select_op** will set the **case_to_execute** variable to the index of the case to execute.

Finally the select_op will call executor.run on the **sub_block**.

```
blocks {
  idx: 1
  parent_idx: 0
  ...
  // Fill a tensor with the case index (ie: 0,1,2,3,ect.)
  ops {
    outputs {
      parameter: "Out"
      arguments: "fill_constant_111.tmp_0"
    }
    type: "fill_constant"
    attrs {
      name: "force_cpu"
      type: BOOLEAN
      b: false
    }
    attrs {
      name: "value"
      type: FLOAT
      f: 0.0
    }
    attrs {
      name: "shape"
      type: INTS
      ints: 1
    }
    attrs {
      name: "dtype"
      type: INT
      i: 2
    }
  }
  // Create an "equal" operator to compare the case index with the "case_to_execute"
  // tensor (which was filled in by the select op).
  ops {
    inputs {
      parameter: "X"
      arguments: "fill_constant_111.tmp_0"  // case 0
    }
    inputs {
      parameter: "Y"
      arguments: "fill_constant_110.tmp_0"  // case_to_execute
    }
    outputs {
      parameter: "Out"
      arguments: "equal_0.tmp_0"
    }
    type: "equal"
    attrs {
      name: "axis"
      type: INT
      i: -1
    }
  }
  // Use the output of the "equal" operator as a condition for the "conditional_block".
  // If the condition evaluates to true, then execute the "sub_block" (which represents
  // the select case's body)
  ops {
    inputs {
      parameter: "Params"
    }
    inputs {
      parameter: "X"
      arguments: "equal_0.tmp_0"
    }
    outputs {
      parameter: "Out"
    }
    outputs {
      parameter: "Scope"
      arguments: "_generated_var_0"
    }
    type: "conditional_block"
    attrs {
      name: "is_scalar_condition"
      type: BOOLEAN
      b: true
    }
    attrs {
      name: "sub_block"
      type: BLOCK
      block_idx: 4
    }
  }
  ...
  // Repeat the above operators for each case statements inside the select body
}

```

_青葱's avatar
_青葱 已提交
250 251
Cases are represented by a **conditional_block operator**, whose's condition is set as the output of
equal(**case_to_execute**, **case_index**).  Since each case index is unique in this sub-block,
252 253 254 255 256
only one case will be executed.

### select_op flow

<p align="center">
_青葱's avatar
_青葱 已提交
257
<img src="https://github.com/PaddlePaddle/Paddle/tree/develop/doc/fluid/images/select_op_workflow.png"/><br/>
258 259
</p>

_青葱's avatar
_青葱 已提交
260
The select algorithm is inspired by golang's select routine.  Please refer to
261 262 263 264 265
http://www.tapirgames.com/blog/golang-concurrent-select-implementation for more information.

## Backward Pass

TODO