diff --git a/cmake/generic.cmake b/cmake/generic.cmake
index c749c97f13649fe8432091414b56f7d0ea8ace8b..3fe750f47efc149bb1af6086841bffd5dd8e85fd 100644
--- a/cmake/generic.cmake
+++ b/cmake/generic.cmake
@@ -587,6 +587,9 @@ function(grpc_library TARGET_NAME)
get_filename_component(PROTO_WE ${grpc_library_PROTO} NAME_WE)
get_filename_component(PROTO_PATH ${ABS_PROTO} PATH)
+ # FIXME(putcn): the following line is supposed to generate *.pb.h and *.pb.cc,
+ # but somehow it didn't. Lines 602 to 604 below patch this. Leaving this here
+ # for now to enable dist CI.
protobuf_generate_cpp(grpc_proto_srcs grpc_proto_hdrs "${ABS_PROTO}")
set(grpc_grpc_srcs "${CMAKE_CURRENT_BINARY_DIR}/${PROTO_WE}.grpc.pb.cc")
set(grpc_grpc_hdrs "${CMAKE_CURRENT_BINARY_DIR}/${PROTO_WE}.grpc.pb.h")
@@ -597,6 +600,9 @@ function(grpc_library TARGET_NAME)
COMMAND ${PROTOBUF_PROTOC_EXECUTABLE}
ARGS --grpc_out "${CMAKE_CURRENT_BINARY_DIR}" -I "${PROTO_PATH}"
--plugin=protoc-gen-grpc="${GRPC_CPP_PLUGIN}" "${ABS_PROTO}"
+ COMMAND ${PROTOBUF_PROTOC_EXECUTABLE}
+ ARGS --cpp_out "${CMAKE_CURRENT_BINARY_DIR}" -I "${PROTO_PATH}"
+ "${ABS_PROTO}"
DEPENDS "${ABS_PROTO}" ${PROTOBUF_PROTOC_EXECUTABLE} extern_grpc)
# FIXME(typhoonzero): grpc generated code do not generate virtual-dtor, mark it
diff --git a/doc/fluid/design/concurrent/channel.md b/doc/fluid/design/concurrent/channel.md
new file mode 100644
index 0000000000000000000000000000000000000000..a00a3325e7b49381f0f82ebbf32b74683f02de5f
--- /dev/null
+++ b/doc/fluid/design/concurrent/channel.md
@@ -0,0 +1,139 @@
+# Channel Design
+
+## Introduction
+
+A Channel is a data structure that allows for synchronous interprocess
+communication via message passing. It is a fundamental component of CSP
+(communicating sequential processes), and allows users to pass data
+between threads without having to worry about synchronization.
+
+## How to use it
+
+Paddle offers Python APIs to create and close channels, along with sending
+data to and receiving data from a channel.
+
+### Create a channel
+
+Creates a new channel that takes in variables of a specific dtype.
+
+- **fluid.make_channel(dtype, capacity=0)**
+ - **dtype**: The data type of variables being sent/received through channel
+ - **capacity**: The capacity of the channel. A capacity of 0 represents
+ an unbuffered channel. Capacity > 0 represents a buffered channel
+
+```
+ch = fluid.make_channel(dtype=core.VarDesc.VarType.LOD_TENSOR, capacity=10)
+```
+
+### Close a channel
+
+Closes a channel. Any pending senders and receivers are woken up when the
+channel is closed. Receivers can still receive from a closed channel, but
+senders are not allowed to send any additional data to it (Paddle will raise
+an exception if users try to send to a closed channel).
+
+- **fluid.channel_close(channel)**
+
+```
+fluid.channel_close(ch)
+```
+
+### Send data to a channel
+
+Sends a variable to a channel. Currently, variables of dtype `LoDTensor`,
+`LoDRankTable`, `LoDTensorArray`, `SelectedRows`, `ReaderHolder`, and
+`ChannelHolder` are supported.
+
+By default, the data of the Variable is moved from the sender to the receiver;
+however, the user can optionally copy the data before performing the send.
+
+- **channel_send(channel, variable, is_copy=False)**
+ - **channel**: The channel to send the variable to
+ - **variable**: The variable to send to the channel
+ - **is_copy**: If set to True, channel_send will perform a variable assign
+ to copy the source variable to a new variable to be sent.
+
+```
+ch = fluid.make_channel(dtype=core.VarDesc.VarType.LOD_TENSOR)
+var = fill_constant(shape=[1], dtype=core.VarDesc.VarType.INT32, value=100)
+fluid.channel_send(ch, var, True)
+```
+
+### Receive data from a channel
+
+Receives a variable from a channel. The data of the variable is moved to the
+receiving variable.
+
+- **channel_recv(channel, return_variable)**
+ - **channel**: The channel to receive the variable from
+ - **return_variable**: The destination variable used to store the data of the
+ variable received from the channel
+
+```
+ch = fluid.make_channel(dtype=core.VarDesc.VarType.LOD_TENSOR)
+var = fill_constant(shape=[1], dtype=core.VarDesc.VarType.INT32, value=-1)
+fluid.channel_recv(ch, var)
+```
+
+## How it Works
+
+Channels provide a simple interface for different threads to share data.
+To support the synchronization requirements, channels utilize a set of
+internal queues, locks, and condition variables.
+
+### QueueMessage
+
+QueueMessage encapsulates the state of a channel send/receive operation to be
+put in the **sendq/recvq**. It contains a condition variable used to block the
+thread (when there is no matching send/receive available). In addition, it
+contains a callback function to notify a thread when the QueueMessage is
+processed by the channel.
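+
+The following is a minimal Python sketch of this idea. It is purely
+illustrative (the actual implementation is C++ code inside the framework, and
+none of the names below are Paddle APIs): a QueueMessage bundles the data
+being transferred with a condition variable the blocked thread waits on, plus
+an optional callback invoked when the message is processed.
+
+```
+import threading
+
+
+class QueueMessage(object):
+    """Illustrative stand-in for the channel's internal QueueMessage."""
+
+    def __init__(self, data=None, callback=None):
+        self.data = data
+        self.callback = callback  # invoked when the message is processed
+        self.cond = threading.Condition()
+        self.done = False
+
+    def wait(self):
+        # The sending/receiving thread parks here until the channel wakes it.
+        with self.cond:
+            while not self.done:
+                self.cond.wait()
+
+    def notify(self):
+        # Called by the channel when this pending send/receive is fulfilled.
+        with self.cond:
+            self.done = True
+            if self.callback is not None:
+                self.callback(self)
+            self.cond.notify()
+```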
+
+### Queues
+
+- **buff_**: This queue holds the data buffer in a buffered channel. The
+capacity is set to the capacity of the channel. This data buffer is not
+used in an unbuffered channel.
+
+- **sendq**: This queue holds the QueueMessage of any pending senders of a
+channel. When a thread performs a channel_send operation on the channel, the
+channel_send operation will put a new QueueMessage on the sendq and block the
+current thread under two conditions:
+ 1. The channel is buffered and is full
+ 2. The channel is unbuffered and does not have a receiver
+
+- **recvq**: This queue holds the QueueMessage of any pending receivers of a
+channel. When a thread performs a channel_recv operation on the channel, the
+channel_recv operation will put a new QueueMessage on the recvq and block the
+current thread under two conditions (a simplified sketch of this blocking
+logic follows this list):
+ 1. The channel is buffered and there is no data in buff_
+ 2. The channel is unbuffered and does not have a sender
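+
+Putting these pieces together, below is a simplified Python sketch of the
+decision channel_send makes against buff_, sendq, and recvq (it reuses the
+QueueMessage sketch above; channel_recv mirrors it with the roles of sendq and
+recvq swapped). Again, this is illustrative only and not Paddle's actual C++
+implementation:
+
+```
+import collections
+import threading
+
+
+class ChannelSketch(object):
+    def __init__(self, capacity=0):
+        self.capacity = capacity          # 0 means unbuffered
+        self.buff_ = collections.deque()  # data buffer, used only if capacity > 0
+        self.sendq = collections.deque()  # QueueMessages of blocked senders
+        self.recvq = collections.deque()  # QueueMessages of blocked receivers
+        self.lock = threading.Lock()
+
+    def send(self, data):
+        msg = None
+        with self.lock:
+            if self.recvq:
+                # A receiver is already waiting: hand the data over directly.
+                receiver = self.recvq.popleft()
+                receiver.data = data
+                receiver.notify()
+            elif self.capacity > 0 and len(self.buff_) < self.capacity:
+                # Buffered channel with free space: enqueue and return.
+                self.buff_.append(data)
+            else:
+                # Buffered and full, or unbuffered with no waiting receiver:
+                # park a QueueMessage on sendq and block below.
+                msg = QueueMessage(data)
+                self.sendq.append(msg)
+        if msg is not None:
+            msg.wait()
+```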
+
+### State diagram
+
+#### Channel Send
+
+![Channel Send](./images/channel_send.png)
+
+#### Channel Receive
+
+![Channel Receive](./images/channel_recv.png)
+
+## Limitations and Considerations
+
+### Variable Copy
+
+In Go, variables in channels are copied from the sender to the receiver.
+In Paddle, the data of a variable is **moved** from the sender to the receiver.
+As a result, these variables should not be used after they are sent. We
+provide a flag in the channel_send method that allows users to copy the
+variable before it is sent.
+
+Please note that this is achieved by adding an **assign** operator and creating
+a temporary variable that is sent in place of the original variable. Also note
+that the **assign** operator only supports certain variable datatypes.
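+
+As a rough illustration of what the is_copy flag amounts to (following the
+import-free style of the examples above; this is a sketch, not the actual
+internal code of channel_send), copying before a send is equivalent to
+assigning the variable into a temporary and sending the temporary instead:
+
+```
+ch = fluid.make_channel(dtype=core.VarDesc.VarType.LOD_TENSOR)
+var = fill_constant(shape=[1], dtype=core.VarDesc.VarType.INT32, value=100)
+
+# Roughly what channel_send(ch, var, is_copy=True) does: copy `var` into a
+# temporary variable with the assign operator, then send the temporary, so
+# `var` itself remains usable after the send.
+copied_var = fluid.layers.create_tensor(dtype='int32')
+fluid.layers.assign(input=var, output=copied_var)
+fluid.channel_send(ch, copied_var)
+```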
diff --git a/doc/fluid/design/concurrent/images/channel_recv.png b/doc/fluid/design/concurrent/images/channel_recv.png
new file mode 100644
index 0000000000000000000000000000000000000000..c06cd15ae7b8a8c94d5742f6675e389081fcf789
Binary files /dev/null and b/doc/fluid/design/concurrent/images/channel_recv.png differ
diff --git a/doc/fluid/design/concurrent/images/channel_send.png b/doc/fluid/design/concurrent/images/channel_send.png
new file mode 100644
index 0000000000000000000000000000000000000000..006ebb4a5a4bcd32c97847e9fb7729a740255f7c
Binary files /dev/null and b/doc/fluid/design/concurrent/images/channel_send.png differ
diff --git a/paddle/fluid/framework/CMakeLists.txt b/paddle/fluid/framework/CMakeLists.txt
index a4ea74a6d2fbc29dc33a6b57ee453f49ed36c7fa..8c8def6bf47f00a59519f5c6aebcfd0900ca38cf 100644
--- a/paddle/fluid/framework/CMakeLists.txt
+++ b/paddle/fluid/framework/CMakeLists.txt
@@ -100,7 +100,7 @@ cc_test(init_test SRCS init_test.cc DEPS init)
cc_test(op_kernel_type_test SRCS op_kernel_type_test.cc DEPS place device_context framework_proto)
cc_test(cow_ptr_tests SRCS details/cow_ptr_test.cc)
-cc_test(channel_test SRCS channel_test.cc)
+# cc_test(channel_test SRCS channel_test.cc)
cc_test(tuple_test SRCS tuple_test.cc )
cc_test(concurrency_test SRCS concurrency_test.cc DEPS go_op channel_close_op channel_create_op
channel_send_op channel_recv_op sum_op select_op elementwise_add_op compare_op
diff --git a/paddle/fluid/framework/block_desc.cc b/paddle/fluid/framework/block_desc.cc
index 3693bc25d81a8309df1a6ddf3d9b08d484596ea9..fbe08349c37c4fde115ceea954ba2b84880088d7 100644
--- a/paddle/fluid/framework/block_desc.cc
+++ b/paddle/fluid/framework/block_desc.cc
@@ -147,15 +147,52 @@ void BlockDesc::RemoveOp(size_t s, size_t e) {
if (ops_.begin() + s == ops_.end() || ops_.begin() + e == ops_.end()) {
return;
}
+ auto get_vars = [](std::deque<std::unique_ptr<OpDesc>>::iterator &op,
+ std::vector<std::string> &v) {
+ auto in_names = (*op)->InputArgumentNames();
+ v.insert(v.end(), in_names.begin(), in_names.end());
+ auto out_names = (*op)->OutputArgumentNames();
+ v.insert(v.end(), out_names.begin(), out_names.end());
+ std::sort(v.begin(), v.end());
+ auto last = std::unique(v.begin(), v.end());
+ v.erase(last, v.end());
+ };
need_update_ = true;
- for (auto it = ops_.begin() + s; it != ops_.begin() + e; it++) {
- auto names = (*it)->InputArgumentNames();
- for (auto n : names) {
- // TODO(typhoonzero): delete vars if no other op use it.
- VLOG(3) << "deleting var " << n;
+
+ for (size_t i = s; i < e; i++) {
+ // since remove op one by one, every time remove the first op.
+ auto op = ops_.begin() + s;
+
+ // collect input and output variables from current delete op
+ std::vector<std::string> cur_vars;
+ get_vars(op, cur_vars);
+
+ // remove current op
+ ops_.erase(ops_.begin() + s);
+
+ // collect input and output variables from other ops
+ std::vector<std::string> other_vars;
+ for (auto it = ops_.begin(); it != ops_.end(); it++) {
+ get_vars(it, other_vars);
+ }
+
+ // variables should be deleted
+ std::vector<std::string> delete_vars;
+ // delete_vars = cur_vars - cur_vars ^ other_input_vars
+ std::set_difference(cur_vars.begin(), cur_vars.end(), other_vars.begin(),
+ other_vars.end(),
+ std::inserter(delete_vars, delete_vars.end()));
+ // remove variables
+ for (size_t i = 0; i < delete_vars.size(); i++) {
+ auto name = delete_vars[i];
+ auto it = vars_.find(name);
+ PADDLE_ENFORCE(it != vars_.end(),
+ "%s is not in variable list, it should not be deleted",
+ name);
+ vars_.erase(it);
+ VLOG(3) << "deleting variable " << name;
}
}
- ops_.erase(ops_.begin() + s, ops_.begin() + e);
}
std::vector<OpDesc *> BlockDesc::AllOps() const {
diff --git a/paddle/fluid/framework/block_desc.h b/paddle/fluid/framework/block_desc.h
index 185f018ac1b5863e0ee86fdaa17df1ccbc6e030e..468423e0e8e7b8c9ebc14b7568c9c3bd21645ea7 100644
--- a/paddle/fluid/framework/block_desc.h
+++ b/paddle/fluid/framework/block_desc.h
@@ -89,6 +89,11 @@ class BlockDesc {
OpDesc *InsertOp(size_t index);
+ /*
+ * Remove Op and its input/output variables.
+ * Note that if an input or output variable is also an input or output
+ * variable of other ops, it will be kept.
+ */
void RemoveOp(size_t s, size_t e);
std::vector<OpDesc *> AllOps() const;
diff --git a/paddle/scripts/submit_local.sh.in b/paddle/scripts/submit_local.sh.in
index 80fa0c72af65cbdc21ba955389318a233e02657c..1283de9d957a46b848c7bb6caf9c5f49398468e2 100755
--- a/paddle/scripts/submit_local.sh.in
+++ b/paddle/scripts/submit_local.sh.in
@@ -153,9 +153,15 @@ if [ $? -ne 0 ]; then
exit 1
fi
-INSTALLED_VERSION=`pip freeze 2>/dev/null | grep '^paddle' | sed 's/.*==//g'`
+if [ "@WITH_GPU@" == "ON" ]; then
+ PADDLE_NAME="paddlepaddle-gpu"
+else
+ PADDLE_NAME="paddlepaddle"
+fi
+
+INSTALLED_VERSION=`pip freeze 2>/dev/null | grep "^${PADDLE_NAME}==" | sed 's/.*==//g'`
-if [ -z ${INSTALLED_VERSION} ]; then
+if [ -z "${INSTALLED_VERSION}" ]; then
INSTALLED_VERSION="0.0.0" # not installed
fi
cat < 1e-5:
tmp = layers.dropout(x=tmp, dropout_prob=drop_rate)
diff --git a/python/paddle/fluid/tests/unittests/test_protobuf_descs.py b/python/paddle/fluid/tests/unittests/test_protobuf_descs.py
index 309ea2b9b7ede442da3ac897ce8d1a4b9aa68233..da85786d0c085a4e97d9ac272feed251296ad52d 100644
--- a/python/paddle/fluid/tests/unittests/test_protobuf_descs.py
+++ b/python/paddle/fluid/tests/unittests/test_protobuf_descs.py
@@ -186,6 +186,34 @@ class TestBlockDesc(unittest.TestCase):
all_ops.append(block.op(idx))
self.assertEqual(all_ops, [op0, op1, op2])
+ def test_remove_op(self):
+ prog = core.ProgramDesc()
+ self.assertIsNotNone(prog)
+ block = prog.block(0)
+ self.assertIsNotNone(block)
+ op1 = block.append_op()
+ op2 = block.append_op()
+ var1 = block.var("var1")
+ var2 = block.var("var2")
+ var3 = block.var("var3")
+ var4 = block.var("var4")
+ var5 = block.var("var5")
+ op1.set_input("X", ["var1", "var2"])
+ op1.set_output("Y", ["var3", "var4"])
+ op2.set_input("X", ["var1"])
+ op2.set_output("Y", ["var4", "var5"])
+
+ # Remove op1; its input var2 and output var3 will be removed at the same time,
+ # but its input var1 and output var4 will be kept since they are used by op2.
+ block.remove_op(0, 1)
+
+ all_ops = []
+ for idx in xrange(0, block.op_size()):
+ all_ops.append(block.op(idx))
+ self.assertEqual(all_ops, [op2])
+ all_vars = block.all_vars()
+ self.assertEqual(set(all_vars), {var1, var4, var5})
+
if __name__ == '__main__':
unittest.main()