From 705cc93336abe74a748c6a6e01759772db4bbdb9 Mon Sep 17 00:00:00 2001
From: "A. Unique TensorFlower"
Date: Fri, 13 Jan 2017 13:28:17 -0800
Subject: [PATCH] Do parallel_stack as a graph rewrite instead of python code.

Change: 144478254
---
 .../core/common_runtime/graph_optimizer.cc | 129 ++++++++++++++++++
 tensorflow/core/kernels/inplace_ops.cc     |  26 ++++
 tensorflow/core/ops/array_ops.cc           |  65 +++++++++
 tensorflow/python/ops/array_ops.py         |  13 +-
 tensorflow/python/ops/hidden_ops.txt       |   1 +
 5 files changed, 224 insertions(+), 10 deletions(-)

diff --git a/tensorflow/core/common_runtime/graph_optimizer.cc b/tensorflow/core/common_runtime/graph_optimizer.cc
index cbf16fa513a..929c196259b 100644
--- a/tensorflow/core/common_runtime/graph_optimizer.cc
+++ b/tensorflow/core/common_runtime/graph_optimizer.cc
@@ -21,6 +21,130 @@ limitations under the License.
 #include "tensorflow/core/graph/optimizer_cse.h"
 
 namespace tensorflow {
+namespace {
+
+// Replaces occurrences of parallel_concat with the implementation based on
+// unsafe ops. Sets removed_any to true if any parallel_concats were removed;
+// leaves it untouched otherwise.
+// TODO(apassos): Use NodeBuilder.
+Status RemoveParallelConcat(bool* removed_any, Graph* g) {
+  gtl::InlinedVector<Node*, 2> matches;
+  for (Node* n : g->nodes()) {
+    if (n->type_string() == "ParallelConcat") {
+      matches.push_back(n);
+    }
+  }
+  for (Node* n : matches) {
+    AttrSlice n_attrs(n->def());
+    auto make_node = [n, g, &n_attrs](string op) {
+      NodeDef node;
+      node.set_op(op);
+      node.set_name(g->NewName(n->name()));
+      node.set_device(n->def().device());
+      string colo;
+      if (GetNodeAttr(n_attrs, "_class", &colo).ok()) {
+        AddNodeAttr("_class", colo, &node);
+      }
+      return node;
+    };
+    DataType dtype;
+    TF_RETURN_IF_ERROR(GetNodeAttr(n_attrs, "T", &dtype));
+    TensorShapeProto shape;
+    TF_RETURN_IF_ERROR(GetNodeAttr(n_attrs, "shape", &shape));
+    // Add the constant shape input to the _empty node.
+    NodeDef shape_node_def = make_node("Const");
+    AddNodeAttr("dtype", DT_INT32, &shape_node_def);
+    TensorProto shape_tensor;
+    shape_tensor.set_dtype(DT_INT32);
+    shape_tensor.mutable_tensor_shape()->add_dim()->set_size(shape.dim_size());
+    for (int i = 0; i < shape.dim_size(); ++i) {
+      shape_tensor.add_int_val(shape.dim(i).size());
+    }
+    AddNodeAttr("value", shape_tensor, &shape_node_def);
+    Status status = Status::OK();
+    Node* shape_node = g->AddNode(shape_node_def, &status);
+    if (!status.ok()) return status;
+
+    // Add the _empty node.
+    // TODO(apassos): create and use _ParallelStackBegin instead of empty, and
+    // something similar for InplaceUpdate.
+    NodeDef empty_def = make_node("Empty");
+    AddNodeAttr("dtype", dtype, &empty_def);
+    AddNodeAttr("Tshape", DT_INT32, &empty_def);
+    AddNodeAttr("init", false, &empty_def);
+    empty_def.add_input(shape_node_def.name());
+    Node* empty = g->AddNode(empty_def, &status);
+    if (!status.ok()) return status;
+    // TODO(apassos): make the shape an attr of _ParallelStackBegin.
+    g->AddEdge(shape_node, 0, empty, 0);
+
+    // Add all the inplace_updates.
+    std::vector<string> control_dependencies;
+    std::vector<Node*> control_nodes;
+    int i = 0;
+    for (const Edge* input_edge : n->in_edges()) {
+      if (input_edge->IsControlEdge()) {
+        g->AddControlEdge(input_edge->src(), empty);
+        continue;
+      }
+      // Constant index for the inplace node.
+      // TODO(apassos): make _ParallelStackUpdate take this as an attr.
+      NodeDef inplace_idx_def = make_node("Const");
+      AddNodeAttr("dtype", DT_INT64, &inplace_idx_def);
+      TensorProto index_tensor;
+      index_tensor.set_dtype(DT_INT64);
+      index_tensor.mutable_tensor_shape()->add_dim()->set_size(1);
+      index_tensor.add_int64_val(i);
+      AddNodeAttr("value", index_tensor, &inplace_idx_def);
+      Node* index = g->AddNode(inplace_idx_def, &status);
+      if (!status.ok()) return status;
+
+      NodeDef inplace_def = make_node("InplaceUpdate");
+      control_dependencies.push_back(inplace_def.name());
+      AddNodeAttr("T", dtype, &inplace_def);
+      AddNodeAttr("Tshape", DT_INT64, &inplace_def);
+      inplace_def.add_input(empty_def.name());
+      inplace_def.add_input(inplace_idx_def.name());
+      inplace_def.add_input(strings::StrCat(input_edge->src()->name(), ":",
+                                            input_edge->src_output()));
+      Node* inplace = g->AddNode(inplace_def, &status);
+      if (!status.ok()) return status;
+      g->AddEdge(empty, 0, inplace, 0);
+      g->AddEdge(index, 0, inplace, 1);
+      g->AddEdge(input_edge->src(), input_edge->src_output(), inplace, 2);
+      control_nodes.push_back(inplace);
+
+      ++i;
+    }
+
+    // Add the final identity.
+    NodeDef identity_def = make_node("Identity");
+    AddNodeAttr("T", dtype, &identity_def);
+    identity_def.add_input(empty_def.name());
+    for (const string& s : control_dependencies) {
+      identity_def.add_input(strings::StrCat("^", s));
+    }
+    Node* identity_node = g->AddNode(identity_def, &status);
+    if (!status.ok()) return status;
+    g->AddEdge(empty, 0, identity_node, 0);
+    for (Node* inp : control_nodes) {
+      g->AddControlEdge(inp, identity_node);
+    }
+
+    // Remove the node and redirect edges.
+    for (auto* e : n->out_edges()) {
+      if (e->IsControlEdge()) {
+        g->AddControlEdge(identity_node, e->dst());
+      } else {
+        g->AddEdge(identity_node, 0, e->dst(), e->dst_input());
+      }
+    }
+    g->RemoveNode(n);
+    *removed_any = true;
+  }
+  return Status::OK();
+}
+}
 
 GraphOptimizer::GraphOptimizer(const OptimizerOptions& opts) : opts_(opts) {
   if (opts_.opt_level() >= OptimizerOptions::L1) {
@@ -44,6 +168,11 @@ void GraphOptimizer::Optimize(FunctionLibraryRuntime* runtime, Env* env,
       DumpGraph("RemoveListArrayConverter", g);
       changed = true;
     }
+    auto s = RemoveParallelConcat(&changed, g);
+    if (!s.ok()) {
+      // TODO(apassos): figure out how to halt here.
+      LOG(WARNING) << s;
+    }
     if (opts_.do_function_inlining() && RemoveDeadNodes(g)) {
       DumpGraph("RemoveDeadNodes", g);
       changed = true;
diff --git a/tensorflow/core/kernels/inplace_ops.cc b/tensorflow/core/kernels/inplace_ops.cc
index a56524f3697..a86c4bd0117 100644
--- a/tensorflow/core/kernels/inplace_ops.cc
+++ b/tensorflow/core/kernels/inplace_ops.cc
@@ -27,6 +27,7 @@ namespace tensorflow {
 
 typedef Eigen::ThreadPoolDevice CPUDevice;
 
+// TODO(apassos): validate the shapes better.
 class InplaceOpBase : public OpKernel {
  public:
   explicit InplaceOpBase(OpKernelConstruction* ctx) : OpKernel(ctx) {}
@@ -159,6 +160,17 @@ class EmptyOp : public OpKernel {
   bool init_;
 };
 
+class FailureKernel : public OpKernel {
+ public:
+  explicit FailureKernel(OpKernelConstruction* ctx) : OpKernel(ctx) {
+    OP_REQUIRES_OK(ctx,
+                   errors::Internal("Found instance of parallel_stack which "
+                                    "could not be properly replaced."));
+  }
+
+  void Compute(OpKernelContext*) {}
+};
+
 #define REGISTER(type)                                                    \
   REGISTER_KERNEL_BUILDER(                                                \
       Name("InplaceUpdate").Device(DEVICE_CPU).TypeConstraint<type>("T"), \
@@ -182,6 +194,13 @@ TF_CALL_NUMBER_TYPES(REGISTER)
 TF_CALL_POD_STRING_TYPES(REGISTER_EMPTY)
 #undef REGISTER_EMPTY
 
+#define REGISTER_PARALLEL_CONCAT(type)                                      \
+  REGISTER_KERNEL_BUILDER(                                                  \
+      Name("ParallelConcat").Device(DEVICE_CPU).TypeConstraint<type>("T"),  \
+      FailureKernel);
+TF_CALL_POD_STRING_TYPES(REGISTER_PARALLEL_CONCAT);
+#undef REGISTER_PARALLEL_CONCAT
+
 #if GOOGLE_CUDA
 
 typedef Eigen::GpuDevice GPUDevice;
@@ -195,6 +214,13 @@ typedef Eigen::GpuDevice GPUDevice;
 
 TF_CALL_GPU_NUMBER_TYPES(REGISTER_EMPTY)
 #undef REGISTER_EMPTY
 
+#define REGISTER_PARALLEL_CONCAT(type)                                      \
+  REGISTER_KERNEL_BUILDER(                                                  \
+      Name("ParallelConcat").Device(DEVICE_GPU).TypeConstraint<type>("T"),  \
+      FailureKernel);
+TF_CALL_GPU_NUMBER_TYPES(REGISTER_PARALLEL_CONCAT);
+#undef REGISTER_PARALLEL_CONCAT
+
 #define REGISTER(type)                                                    \
   REGISTER_KERNEL_BUILDER(                                                \
       Name("InplaceUpdate").Device(DEVICE_GPU).TypeConstraint<type>("T"), \
diff --git a/tensorflow/core/ops/array_ops.cc b/tensorflow/core/ops/array_ops.cc
index 59c82e937d4..be42b2196f7 100644
--- a/tensorflow/core/ops/array_ops.cc
+++ b/tensorflow/core/ops/array_ops.cc
@@ -164,6 +164,71 @@ Status SetOutputShapeForReshape(InferenceContext* c) {
 
 }  // namespace
 
+REGISTER_OP("ParallelConcat")
+    .Input("values: N * T")
+    .Output("output: T")
+    .Attr("N: int >= 1")
+    .Attr("T: type")
+    .Attr("shape: shape")
+    .SetShapeFn([](InferenceContext* c) {
+      // Validate that the shape attr is correct.
+      TensorShapeProto passed_shape_proto;
+      TF_RETURN_IF_ERROR(c->GetAttr("shape", &passed_shape_proto));
+      ShapeHandle passed_shape;
+      TF_RETURN_IF_ERROR(
+          c->MakeShapeFromShapeProto(passed_shape_proto, &passed_shape));
+      if (!c->FullyDefined(passed_shape)) {
+        return errors::InvalidArgument("shape attr must be fully defined.");
+      }
+      ShapeHandle cur;
+      TF_RETURN_IF_ERROR(c->ReplaceDim(
+          passed_shape, 0, c->MakeDim(shape_inference::DimensionOrConstant(1)),
+          &cur));
+      for (int i = 0; i < c->num_inputs(); ++i) {
+        if (!c->FullyDefined(c->input(i))) {
+          return errors::InvalidArgument(
+              "All input shapes must be fully defined.");
+        }
+        DimensionHandle unused;
+        if (!c->WithValue(c->Dim(c->input(i), 0), 1, &unused).ok()) {
+          return errors::InvalidArgument("Size of first dimension must be 1.");
+        }
+        TF_RETURN_WITH_CONTEXT_IF_ERROR(c->Merge(c->input(i), cur, &cur),
+                                        "From merging shape ", i,
+                                        " with other shapes.");
+      }
+
+      c->set_output(0, passed_shape);
+
+      return Status::OK();
+    })
+    .Doc(R"doc(
+Concatenates a list of `N` tensors along the first dimension.
+
+The input tensors are all required to have size 1 in the first dimension.
+
+For example:
+
+```prettyprint
+# 'x' is [[1, 4]]
+# 'y' is [[2, 5]]
+# 'z' is [[3, 6]]
+parallel_concat([x, y, z]) => [[1, 4], [2, 5], [3, 6]]  # Pack along first dim.
+```
+
+The difference between concat and parallel_concat is that concat requires all
+of the inputs to be computed before the operation will begin, but doesn't
+require that the input shapes be known during graph construction. Parallel
+concat will copy pieces of the input into the output as they become available;
+in some situations this can provide a performance benefit.
+
+values: Tensors to be concatenated. All must have size 1 in the first dimension
+  and the same shape.
+output: The concatenated tensor.
+shape: the final shape of the result; should be equal to the shapes of any input
+  but with the number of input values in the first dimension.
+)doc");
+
 REGISTER_OP("Pack")
     .Input("values: N * T")
     .Output("output: T")
diff --git a/tensorflow/python/ops/array_ops.py b/tensorflow/python/ops/array_ops.py
index 1201c0705b1..1f023d7d83c 100644
--- a/tensorflow/python/ops/array_ops.py
+++ b/tensorflow/python/ops/array_ops.py
@@ -972,16 +972,9 @@ def parallel_stack(values, name="parallel_stack"):
     output_shape = tensor_shape.TensorShape([len(values)])
     output_shape = output_shape.concatenate(value_shape)
-
-    outputs = _empty(output_shape, values[0].dtype)
-    output_ops = []
-    for i in range(len(values)):
-      with ops.colocate_with(outputs):
-        output_op = _alias_inplace_update(outputs, i, values[i])
-        output_ops.append(output_op)
-    with ops.control_dependencies(output_ops):
-      outputs = identity(outputs)
-    return outputs
+    # expand_dims converts concat to stack.
+    return gen_array_ops._parallel_concat(
+        [expand_dims(value, 0) for value in values], shape=output_shape)
 
 
 def stack(values, axis=0, name="stack"):
   """Stacks a list of rank-`R` tensors into one rank-`(R+1)` tensor.
diff --git a/tensorflow/python/ops/hidden_ops.txt b/tensorflow/python/ops/hidden_ops.txt
index abe5c538d0a..4b1b9815ca8 100644
--- a/tensorflow/python/ops/hidden_ops.txt
+++ b/tensorflow/python/ops/hidden_ops.txt
@@ -18,6 +18,7 @@ MirrorPadGrad
 OneHot
 Pack
 Pad
+ParallelConcat
 Placeholder
 RefIdentity
 Reverse
--
GitLab
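
For context, a minimal usage sketch of the `tf.parallel_stack` entry point this patch rewires, assuming a TensorFlow 1.x build that includes the change. The printed result mirrors the example in the op's Doc string, and the comments describe the rewrite performed by `RemoveParallelConcat` above.

```python
import tensorflow as tf

# parallel_stack needs fully defined static shapes: the ParallelConcat shape
# function rejects inputs whose shapes are unknown at graph-construction time.
x = tf.constant([1, 4])
y = tf.constant([2, 5])
z = tf.constant([3, 6])

# Builds a single ParallelConcat node. At optimization time the graph rewrite
# replaces it with an Empty buffer plus one InplaceUpdate per input, so each
# input is copied into the output as soon as it becomes available.
stacked = tf.parallel_stack([x, y, z])

with tf.Session() as sess:
  print(sess.run(stacked))  # [[1 4]
                            #  [2 5]
                            #  [3 6]]
```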