/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ module thrift.transport.piped; import thrift.transport.base; import thrift.transport.memory; /** * Pipes data request from one transport to another when readEnd() * or writeEnd() is called. * * A typical use case would be to log requests on e.g. a socket to * disk (i. e. pipe them to a TFileWriterTransport). * * The implementation keeps an internal buffer which expands to * hold the whole amount of data read/written until the corresponding *End() * method is called. * * Contrary to the C++ implementation, this doesn't introduce yet another layer * of input/output buffering, all calls are passed to the underlying source * transport verbatim. */ final class TPipedTransport(Source = TTransport) if ( isTTransport!Source ) : TBaseTransport { /// The default initial buffer size if not explicitly specified, in bytes. enum DEFAULT_INITIAL_BUFFER_SIZE = 512; /** * Constructs a new instance. * * By default, only reads are piped (pipeReads = true, pipeWrites = false). * * Params: * srcTrans = The transport to which all requests are forwarded. * dstTrans = The transport the read/written data is copied to. * initialBufferSize = The default size of the read/write buffers, for * performance tuning. */ this(Source srcTrans, TTransport dstTrans, size_t initialBufferSize = DEFAULT_INITIAL_BUFFER_SIZE ) { srcTrans_ = srcTrans; dstTrans_ = dstTrans; readBuffer_ = new TMemoryBuffer(initialBufferSize); writeBuffer_ = new TMemoryBuffer(initialBufferSize); pipeReads_ = true; pipeWrites_ = false; } bool pipeReads() @property const { return pipeReads_; } void pipeReads(bool value) @property { if (!value) { readBuffer_.reset(); } pipeReads_ = value; } bool pipeWrites() @property const { return pipeWrites_; } void pipeWrites(bool value) @property { if (!value) { writeBuffer_.reset(); } pipeWrites_ = value; } override bool isOpen() { return srcTrans_.isOpen(); } override bool peek() { return srcTrans_.peek(); } override void open() { srcTrans_.open(); } override void close() { srcTrans_.close(); } override size_t read(ubyte[] buf) { auto bytesRead = srcTrans_.read(buf); if (pipeReads_) { readBuffer_.write(buf[0 .. bytesRead]); } return bytesRead; } override size_t readEnd() { if (pipeReads_) { auto data = readBuffer_.getContents(); dstTrans_.write(data); dstTrans_.flush(); readBuffer_.reset(); srcTrans_.readEnd(); // Return data.length instead of the readEnd() result of the source // transports because it might not be available from it. return data.length; } return srcTrans_.readEnd(); } override void write(in ubyte[] buf) { if (pipeWrites_) { writeBuffer_.write(buf); } srcTrans_.write(buf); } override size_t writeEnd() { if (pipeWrites_) { auto data = writeBuffer_.getContents(); dstTrans_.write(data); dstTrans_.flush(); writeBuffer_.reset(); srcTrans_.writeEnd(); // Return data.length instead of the readEnd() result of the source // transports because it might not be available from it. return data.length; } return srcTrans_.writeEnd(); } override void flush() { srcTrans_.flush(); } private: Source srcTrans_; TTransport dstTrans_; TMemoryBuffer readBuffer_; TMemoryBuffer writeBuffer_; bool pipeReads_; bool pipeWrites_; } /** * TPipedTransport construction helper to avoid having to explicitly * specify the transport types, i.e. to allow the constructor being called * using IFTI (see $(DMDBUG 6082, D Bugzilla enhancement request 6082)). */ TPipedTransport!Source tPipedTransport(Source)( Source srcTrans, TTransport dstTrans ) if (isTTransport!Source) { return new typeof(return)(srcTrans, dstTrans); } version (unittest) { // DMD @@BUG@@: UFCS for std.array.empty doesn't work when import is moved // into unittest block. import std.array; import std.exception : enforce; } unittest { auto underlying = new TMemoryBuffer; auto pipeTarget = new TMemoryBuffer; auto trans = tPipedTransport(underlying, pipeTarget); underlying.write(cast(ubyte[])"abcd"); ubyte[4] buffer; trans.readAll(buffer[0 .. 2]); enforce(buffer[0 .. 2] == "ab"); enforce(pipeTarget.getContents().empty); trans.readEnd(); enforce(pipeTarget.getContents() == "ab"); pipeTarget.reset(); underlying.write(cast(ubyte[])"ef"); trans.readAll(buffer[0 .. 2]); enforce(buffer[0 .. 2] == "cd"); enforce(pipeTarget.getContents().empty); trans.readAll(buffer[0 .. 2]); enforce(buffer[0 .. 2] == "ef"); enforce(pipeTarget.getContents().empty); trans.readEnd(); enforce(pipeTarget.getContents() == "cdef"); }