Unverified commit c9dd4e57, authored by Thuan Nguyen, committed by GitHub

Unittests concurrency (#8666)

Python Unit Tests for CSP

* Simple Channel Send and Receive test
* Daisy Chain test with 100 channels/Go ops
Parent 9e1ec8c9
@@ -12,11 +12,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-# TODO: Variables: make_channel
-# TODO: Operators: send, close_channel, recv, go, select
 from layers.control_flow import BlockGuard
-from layer_helper import LayerHelper
+from layer_helper import LayerHelper, unique_name
+from layers import fill_constant
 import core
 
 __all__ = [
     'Go',
     'make_channel',
@@ -46,27 +46,35 @@ class Go(BlockGuard):
         parent_block = main_program.block(main_program.current_block()
                                           .parent_idx)
 
+        inner_outputs = set()
         x_name_list = set()
-        out_vars = []
         for op in go_block.ops:
             # Iterate over all operators, get all the inputs
             # and add as input to the Go operator.
             for iname in op.input_names:
                 for in_var_name in op.input(iname):
-                    x_name_list.add(in_var_name)
+                    if in_var_name not in inner_outputs:
+                        x_name_list.add(in_var_name)
 
-            # Iterate over all operators, get all the outputs
-            # add to the output list of Go operator only if
-            # they exist in the parent block.
             for oname in op.output_names:
                 for out_var_name in op.output(oname):
-                    if out_var_name in parent_block.vars:
-                        out_vars.add(parent_block.var(out_var_name))
+                    inner_outputs.add(out_var_name)
+
+        # Iterate over all operators, get all the outputs,
+        # and add to the output list of the Go operator only
+        # if they exist in the parent block.
+        out_vars = []
+        for inner_out_name in inner_outputs:
+            if inner_out_name in parent_block.vars:
+                out_vars.append(parent_block.var(inner_out_name))
 
         parent_block.append_op(
             type='go',
-            inputs={'X': [parent_block.var(x_name) for x_name in x_name_list]},
-            outputs={'Out': out_vars},
+            inputs={
+                'X':
+                [parent_block.var_recursive(x_name) for x_name in x_name_list]
+            },
+            outputs={},
             attrs={'sub_block': go_block})
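
The reordering above is the substance of this hunk: names produced inside the go_block are tracked in a first pass, so they are no longer re-imported as inputs to the Go operator. A minimal standalone sketch of the scheme, in plain Python with hypothetical (op_type, inputs, outputs) triples standing in for Paddle operators:

    # Hypothetical op triples: (op_type, input names, output names).
    ops = [
        ("recv", ["ch"], ["x"]),          # reads ch, writes x
        ("add", ["x", "one"], ["sum"]),   # x is produced inside; one is external
        ("send", ["ch2", "sum"], []),
    ]

    inner_outputs = set()
    x_name_list = set()
    for _, inputs, outputs in ops:
        for name in inputs:
            if name not in inner_outputs:  # skip names already produced inside
                x_name_list.add(name)
        for name in outputs:
            inner_outputs.add(name)

    # Only externally defined names survive as Go-op inputs.
    print(sorted(x_name_list))  # ['ch', 'ch2', 'one']
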
@@ -88,8 +96,8 @@ def make_channel(dtype, capacity=0):
     `channel_close`, and `Go` to design a concurrent Paddle program.
 
     Args:
-        dtype (ParamAttr|int): Data type of the data sent in the channel.
-            This data type should be one of the Paddle supported data types.
+        dtype (ParamAttr|string): Data type of the data sent in the channel.
+            This data type should be the string name of a numpy data type.
         capacity (ParamAttr|int): Size of the channel. Defaults to 0 to
             create an unbuffered channel.
@@ -106,14 +114,16 @@ def make_channel(dtype, capacity=0):
             fluid.channel_send(ch, 100)
             fluid.channel_close(ch)
     """
-    helper = LayerHelper('make_channel', **locals())
+    helper = LayerHelper('channel_create', **locals())
     main_program = helper.main_program
     make_channel_block = main_program.current_block()
 
     # Make a channel variable (using the channel data type) and make sure it
     # persists into the global scope.
    channel = helper.create_variable(
-        dtype=core.VarDesc.VarType.CHANNEL, persistable=True)
+        name=unique_name.generate('channel'),
+        type=core.VarDesc.VarType.CHANNEL,
+        persistable=True)
 
     create_channel_op = make_channel_block.append_op(
         type="channel_create",
@@ -121,7 +131,7 @@ def make_channel(dtype, capacity=0):
         attrs={"data_type": dtype,
                "capacity": capacity})
 
-    return create_channel_op
+    return channel
 
 
 def channel_send(channel, value):
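
Returning the channel variable itself (rather than the channel_create op) lets callers thread it straight into the other helpers, as the docstring example already does. A hedged sketch of the intended call pattern, assuming fluid re-exports these helpers as the docstrings suggest:

    import paddle.fluid as fluid

    ch = fluid.make_channel(dtype='int32', capacity=10)  # now the channel itself
    fluid.channel_send(ch, 100)
    fluid.channel_close(ch)
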
@@ -133,7 +143,7 @@ def channel_send(channel, value):
     Args:
         channel (Variable|Channel): Channel variable created using
             `make_channel`.
+        value (Variable): Value to send to the channel.
 
     Returns:
         Variable: The boolean status on whether or not the channel
             successfully sent the passed value.
@@ -149,7 +159,11 @@ def channel_send(channel, value):
     helper = LayerHelper('channel_send', **locals())
     main_program = helper.main_program
     channel_send_block = main_program.current_block()
-    status = helper.create_variable(dtype=core.VarDesc.VarType.TENSOR)
+
+    status = helper.create_variable(
+        name=unique_name.generate('status'),
+        type=core.VarDesc.VarType.LOD_TENSOR,
+        dtype=core.VarDesc.VarType.BOOL)
 
     channel_send_op = channel_send_block.append_op(
         type="channel_send",
@@ -159,10 +173,10 @@ def channel_send(channel, value):
         },
         outputs={"Status": status})
 
-    return channel_send_op
+    return status
 
 
-def channel_recv(channel, dtype):
+def channel_recv(channel, return_value):
     """
     Receives a value through a channel variable. Used by an unbuffered or
     buffered channel within a concurrent Go block to get data from originally
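
The send path now surfaces its Status output (a BOOL LoD tensor variable) instead of the op handle. A hedged usage sketch, mirroring the docstring example above:

    import paddle.fluid as fluid

    ch = fluid.make_channel(dtype='int32')
    with fluid.Go():
        # status is True if the value was successfully placed on the channel.
        status = fluid.channel_send(ch, 100)
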
@@ -172,11 +186,10 @@ def channel_recv(channel, dtype):
 
     Args:
         channel (Variable|Channel): Channel variable created using
             `make_channel`.
-        dtype (Variable|int): Data type of the data expected to be read in the
-            channel. This data type should be one of the Paddle supported data
-            types.
+        return_value (Variable): Variable to set as a result of running channel_recv_op
 
     Returns:
+        Variable: The received value from the channel.
         Variable: The boolean status on whether or not the channel
             successfully received the passed value.
@@ -185,7 +198,7 @@ def channel_recv(channel, dtype):
             ch = fluid.make_channel(dtype='int32', capacity=10)
             with fluid.Go():
-                fluid.channel_recv(ch, 'int32')
+                returned_value = fluid.channel_recv(ch, 'int32')
 
             # Code to send data through the channel.
     """
@@ -193,8 +206,10 @@ def channel_recv(channel, dtype):
     main_program = helper.main_program
     channel_recv_block = main_program.current_block()
 
-    return_value = helper.create_variable(dtype=dtype)
-    status = helper.create_variable(dtype=core.VarDesc.VarType.TENSOR)
+    status = helper.create_variable(
+        name=unique_name.generate('status'),
+        type=core.VarDesc.VarType.LOD_TENSOR,
+        dtype=core.VarDesc.VarType.BOOL)
 
     channel_recv_op = channel_recv_block.append_op(
         type="channel_recv",
@@ -202,7 +217,7 @@ def channel_recv(channel, dtype):
         outputs={"Out": return_value,
                  "Status": status})
 
-    return channel_recv_op
+    return return_value, status
 
 
 def channel_close(channel):
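
The receive side now follows a fill-in convention: the caller pre-creates the output variable, passes it in as return_value, and gets back a (value, status) pair. A hedged sketch, mirroring the _create_tensor helper used in the unit test below:

    import paddle.fluid as fluid
    import paddle.fluid.core as core
    from paddle.fluid import framework, unique_name

    ch = fluid.make_channel(dtype=core.VarDesc.VarType.LOD_TENSOR)
    # Caller-allocated placeholder that channel_recv will fill in.
    result = framework.default_main_program().current_block().create_var(
        name=unique_name.generate('return_value'),
        type=core.VarDesc.VarType.LOD_TENSOR,
        dtype=core.VarDesc.VarType.INT64)
    result, status = fluid.channel_recv(ch, result)
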
@@ -228,5 +243,3 @@ def channel_close(channel):
 
     channel_close_op = channel_close_block.append_op(
         type="channel_close", inputs={"Channel": channel})
-
-    return channel_close_op
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest
import paddle.fluid as fluid


class TestCSPFramework(unittest.TestCase):
    def daisy_chain(self):
        n = 10000
        leftmost = fluid.make_channel(dtype=int)
        right = leftmost
        left = leftmost
        with fluid.While(steps=n):
            right = fluid.make_channel(dtype=int)
            with fluid.go():
                fluid.send(left, 1 + fluid.recv(right))
            left = right

        with fluid.go():
            fluid.send(right, 1)
        fluid.Print(fluid.recv(leftmost))


if __name__ == '__main__':
    unittest.main()
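
The daisy_chain above is written against a proposed API (fluid.While with a steps argument, fluid.go, fluid.send, fluid.recv); the TODO in the unit test below notes that fluid.While cannot be used this way until scope capture lands. For reference, a plain-Python analogue of the same topology using only standard-library threads and queues behaves as the tests expect:

    import threading
    import queue

    def daisy_chain_plain(n=100):
        # Each link waits on its right neighbour, adds 1, and passes the
        # result left; seeding the rightmost with 1 yields n + 1 on the left.
        leftmost = queue.Queue()
        left = leftmost
        for _ in range(n):
            right = queue.Queue()
            threading.Thread(
                target=lambda l, r: l.put(1 + r.get()),
                args=(left, right)).start()
            left = right
        left.put(1)            # seed the rightmost channel
        return leftmost.get()  # n + 1

    print(daisy_chain_plain())  # 101
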
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest
import paddle.fluid as fluid
import paddle.fluid.core as core
from paddle.fluid import framework, unique_name
from paddle.fluid.executor import Executor
from paddle.fluid.layers import fill_constant


class TestRoutineOp(unittest.TestCase):
    def test_simple_routine(self):
        ch = fluid.make_channel(dtype=core.VarDesc.VarType.LOD_TENSOR)

        # Create LOD_TENSOR<INT64> and put it into the scope. This placeholder
        # variable will be filled in and returned by fluid.channel_recv
        result = self._create_tensor('return_value',
                                     core.VarDesc.VarType.LOD_TENSOR,
                                     core.VarDesc.VarType.INT64)

        with fluid.Go():
            input_value = fill_constant(
                shape=[1], dtype=core.VarDesc.VarType.FP64, value=1234)
            fluid.channel_send(ch, input_value)

        result, status = fluid.channel_recv(ch, result)
        fluid.channel_close(ch)

        cpu = core.CPUPlace()
        exe = Executor(cpu)

        outs = exe.run(fetch_list=[result])
        self.assertEqual(outs[0], 1234)

    def test_daisy_chain(self):
        '''
        Mimics classic Daisy-chain test: https://talks.golang.org/2012/concurrency.slide#39
        '''
        n = 100

        leftmost = fluid.make_channel(dtype=core.VarDesc.VarType.LOD_TENSOR)
        left = leftmost

        # TODO(thuan): Use fluid.While() after scope capture is implemented.
        # https://github.com/PaddlePaddle/Paddle/issues/8502
        for i in range(n):
            right = fluid.make_channel(dtype=core.VarDesc.VarType.LOD_TENSOR)
            with fluid.Go():
                one_tensor = self._create_one_dim_tensor(1)
                result = self._create_tensor('return_value',
                                             core.VarDesc.VarType.LOD_TENSOR,
                                             core.VarDesc.VarType.INT64)

                result, status = fluid.channel_recv(right, result)
                one_added = fluid.layers.elementwise_add(x=one_tensor, y=result)
                fluid.channel_send(left, one_added)
            left = right

        # Trigger the channel propagation by sending a "1" to the rightmost channel
        with fluid.Go():
            one_tensor = self._create_one_dim_tensor(1)
            fluid.channel_send(right, one_tensor)

        leftmost_result = self._create_tensor('return_value',
                                              core.VarDesc.VarType.LOD_TENSOR,
                                              core.VarDesc.VarType.INT64)
        leftmost_result, status = fluid.channel_recv(leftmost, leftmost_result)

        cpu = core.CPUPlace()
        exe = Executor(cpu)
        leftmost_data = exe.run(fetch_list=[leftmost_result])

        # The leftmost_data should be equal to the number of channels + 1
        self.assertEqual(leftmost_data[0][0], n + 1)

    def _create_one_dim_tensor(self, value):
        one_dim_tensor = fill_constant(
            shape=[1], dtype=core.VarDesc.VarType.INT64, value=value)
        one_dim_tensor.stop_gradient = True
        return one_dim_tensor

    def _create_tensor(self, name, type, dtype):
        return framework.default_main_program().current_block().create_var(
            name=unique_name.generate(name), type=type, dtype=dtype)


if __name__ == '__main__':
    unittest.main()
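
The expected value in test_daisy_chain follows from simple counting: a 1 is seeded into the rightmost channel, and each of the n Go blocks adds exactly one on its way left, so the leftmost reader sees n + 1:

    n = 100
    value = 1               # seeded into the rightmost channel
    for _ in range(n):      # each Go block performs one elementwise_add of 1
        value += 1
    assert value == n + 1   # leftmost receives 101, matching the assertion
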