diff --git a/python/paddle/fluid/concurrency.py b/python/paddle/fluid/concurrency.py index 978f1891e3463ed29fa2e5233bf55860e58d7155..dec224fc886cd0739add0ebb6488625ef5063b8d 100644 --- a/python/paddle/fluid/concurrency.py +++ b/python/paddle/fluid/concurrency.py @@ -12,11 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -# TODO: Variables: make_channel -# TODO: Operators: send, close_channel, recv, go, select from layers.control_flow import BlockGuard -from layer_helper import LayerHelper +from layer_helper import LayerHelper, unique_name +from layers import fill_constant import core + __all__ = [ 'Go', 'make_channel', @@ -46,27 +46,35 @@ class Go(BlockGuard): parent_block = main_program.block(main_program.current_block() .parent_idx) + inner_outputs = set() x_name_list = set() - out_vars = [] for op in go_block.ops: # Iterate over all operators, get all the inputs # and add as input to the Go operator. for iname in op.input_names: for in_var_name in op.input(iname): - x_name_list.add(in_var_name) + if in_var_name not in inner_outputs: + x_name_list.add(in_var_name) - # Iterate over all operators , get all the outputs - # add to the output list of Go operator only if - # they exist in the parent block. for oname in op.output_names: for out_var_name in op.output(oname): - if out_var_name in parent_block.vars: - out_vars.add(parent_block.var(out_var_name)) + inner_outputs.add(out_var_name) + + # Iterate over all operators , get all the outputs + # add to the output list of Go operator only if + # they exist in the parent block. + out_vars = [] + for inner_out_name in inner_outputs: + if inner_out_name in parent_block.vars: + out_vars.append(parent_block.var(inner_out_name)) parent_block.append_op( type='go', - inputs={'X': [parent_block.var(x_name) for x_name in x_name_list]}, - outputs={'Out': out_vars}, + inputs={ + 'X': + [parent_block.var_recursive(x_name) for x_name in x_name_list] + }, + outputs={}, attrs={'sub_block': go_block}) @@ -88,8 +96,8 @@ def make_channel(dtype, capacity=0): `channel_close`, and `Go` to design a concurrent Paddle program. Args: - dtype (ParamAttr|int): Data type of the data sent in the channel. - This data type should be one of the Paddle supported data types. + dtype (ParamAttr|string): Data type of the data sent in the channel. + This data type should be the string name of a numpy data type. capacity (ParamAttr|int): Size of the channel. Defaults to 0 for to create an unbuffered channel. @@ -106,14 +114,16 @@ def make_channel(dtype, capacity=0): fluid.channel_send(ch, 100) fluid.channel_close(ch) """ - helper = LayerHelper('make_channel', **locals()) + helper = LayerHelper('channel_create', **locals()) main_program = helper.main_program make_channel_block = main_program.current_block() # Make a channel variable (using the channel data type) and make sure it # persists into the global scope. channel = helper.create_variable( - dtype=core.VarDesc.VarType.CHANNEL, persistable=True) + name=unique_name.generate('channel'), + type=core.VarDesc.VarType.CHANNEL, + persistable=True) create_channel_op = make_channel_block.append_op( type="channel_create", @@ -121,7 +131,7 @@ def make_channel(dtype, capacity=0): attrs={"data_type": dtype, "capacity": capacity}) - return create_channel_op + return channel def channel_send(channel, value): @@ -133,7 +143,7 @@ def channel_send(channel, value): Args: channel (Variable|Channel): Channel variable created using `make_channel`. - + value (Variable): Value to send to channel Returns: Variable: The boolean status on whether or not the channel successfully sent the passed value. @@ -149,7 +159,11 @@ def channel_send(channel, value): helper = LayerHelper('channel_send', **locals()) main_program = helper.main_program channel_send_block = main_program.current_block() - status = helper.create_variable(dtype=core.VarDesc.VarType.TENSOR) + + status = helper.create_variable( + name=unique_name.generate('status'), + type=core.VarDesc.VarType.LOD_TENSOR, + dtype=core.VarDesc.VarType.BOOL) channel_send_op = channel_send_block.append_op( type="channel_send", @@ -159,10 +173,10 @@ def channel_send(channel, value): }, outputs={"Status": status}) - return channel_send_op + return status -def channel_recv(channel, dtype): +def channel_recv(channel, return_value): """ Receives a value through a channel variable. Used by an unbuffered or buffered channel within a concurrent Go block to get data from originally @@ -172,11 +186,10 @@ def channel_recv(channel, dtype): Args: channel (Variable|Channel): Channel variable created using `make_channel`. - dtype (Variable|int): Data type of the data expected to be read in the - channel. This data type should be one of the Paddle supported data - types. + return_value (Variable): Variable to set as a result of running channel_recv_op Returns: + Variable: The received value from the channel. Variable: The boolean status on whether or not the channel successfully received the passed value. @@ -185,7 +198,7 @@ def channel_recv(channel, dtype): ch = fluid.make_channel(dtype='int32', capacity=10) with fluid.Go(): - fluid.channel_recv(ch, 'int32') + returned_value = fluid.channel_recv(ch, 'int32') # Code to send data through the channel. """ @@ -193,8 +206,10 @@ def channel_recv(channel, dtype): main_program = helper.main_program channel_recv_block = main_program.current_block() - return_value = helper.create_variable(dtype=dtype) - status = helper.create_variable(dtype=core.VarDesc.VarType.TENSOR) + status = helper.create_variable( + name=unique_name.generate('status'), + type=core.VarDesc.VarType.LOD_TENSOR, + dtype=core.VarDesc.VarType.BOOL) channel_recv_op = channel_recv_block.append_op( type="channel_recv", @@ -202,7 +217,7 @@ def channel_recv(channel, dtype): outputs={"Out": return_value, "Status": status}) - return channel_recv_op + return return_value, status def channel_close(channel): @@ -228,5 +243,3 @@ def channel_close(channel): channel_close_op = channel_close_block.append_op( type="channel_close", inputs={"Channel": channel}) - - return channel_close_op diff --git a/python/paddle/fluid/tests/notest_csp.py b/python/paddle/fluid/tests/notest_csp.py deleted file mode 100644 index f4be833deebd2c82e060e66a8bcf590020625cf8..0000000000000000000000000000000000000000 --- a/python/paddle/fluid/tests/notest_csp.py +++ /dev/null @@ -1,37 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import unittest -import paddle.fluid as fluid - - -class TestCSPFramework(unittest.TestCase): - def daisy_chain(self): - n = 10000 - leftmost = fluid.make_channel(dtype=int) - right = leftmost - left = leftmost - with fluid.While(steps=n): - right = fluid.make_channel(dtype=int) - with fluid.go(): - fluid.send(left, 1 + fluid.recv(right)) - left = right - - with fluid.go(): - fluid.send(right, 1) - fluid.Print(fluid.recv(leftmost)) - - -if __name__ == '__main__': - unittest.main() diff --git a/python/paddle/fluid/tests/test_concurrency.py b/python/paddle/fluid/tests/test_concurrency.py new file mode 100644 index 0000000000000000000000000000000000000000..9f7bf63c5e017251e87af94690ff32c47c538c6b --- /dev/null +++ b/python/paddle/fluid/tests/test_concurrency.py @@ -0,0 +1,100 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import paddle.fluid as fluid +import paddle.fluid.core as core +from paddle.fluid import framework, unique_name +from paddle.fluid.executor import Executor +from paddle.fluid.layers import fill_constant + + +class TestRoutineOp(unittest.TestCase): + def test_simple_routine(self): + ch = fluid.make_channel(dtype=core.VarDesc.VarType.LOD_TENSOR) + + # Create LOD_TENSOR and put it into the scope. This placeholder + # variable will be filled in and returned by fluid.channel_recv + result = self._create_tensor('return_value', + core.VarDesc.VarType.LOD_TENSOR, + core.VarDesc.VarType.INT64) + + with fluid.Go(): + input_value = fill_constant( + shape=[1], dtype=core.VarDesc.VarType.FP64, value=1234) + fluid.channel_send(ch, input_value) + + result, status = fluid.channel_recv(ch, result) + fluid.channel_close(ch) + + cpu = core.CPUPlace() + exe = Executor(cpu) + + outs = exe.run(fetch_list=[result]) + self.assertEqual(outs[0], 1234) + + def test_daisy_chain(self): + ''' + Mimics classic Daisy-chain test: https://talks.golang.org/2012/concurrency.slide#39 + ''' + n = 100 + + leftmost = fluid.make_channel(dtype=core.VarDesc.VarType.LOD_TENSOR) + left = leftmost + + # TODO(thuan): Use fluid.While() after scope capture is implemented. + # https://github.com/PaddlePaddle/Paddle/issues/8502 + for i in range(n): + right = fluid.make_channel(dtype=core.VarDesc.VarType.LOD_TENSOR) + with fluid.Go(): + one_tensor = self._create_one_dim_tensor(1) + result = self._create_tensor('return_value', + core.VarDesc.VarType.LOD_TENSOR, + core.VarDesc.VarType.INT64) + + result, status = fluid.channel_recv(right, result) + one_added = fluid.layers.elementwise_add(x=one_tensor, y=result) + fluid.channel_send(left, one_added) + left = right + + # Trigger the channel propagation by sending a "1" to rightmost channel + with fluid.Go(): + one_tensor = self._create_one_dim_tensor(1) + fluid.channel_send(right, one_tensor) + + leftmost_result = self._create_tensor('return_value', + core.VarDesc.VarType.LOD_TENSOR, + core.VarDesc.VarType.INT64) + leftmost_result, status = fluid.channel_recv(leftmost, leftmost_result) + + cpu = core.CPUPlace() + exe = Executor(cpu) + leftmost_data = exe.run(fetch_list=[leftmost_result]) + + # The leftmost_data should be equal to the number of channels + 1 + self.assertEqual(leftmost_data[0][0], n + 1) + + def _create_one_dim_tensor(self, value): + one_dim_tensor = fill_constant( + shape=[1], dtype=core.VarDesc.VarType.INT64, value=value) + one_dim_tensor.stop_gradient = True + return one_dim_tensor + + def _create_tensor(self, name, type, dtype): + return framework.default_main_program().current_block().create_var( + name=unique_name.generate(name), type=type, dtype=dtype) + + +if __name__ == '__main__': + unittest.main()