未验证 提交 957050cd 编写于 作者: C Chris Knight 提交者: GitHub

feature: synchronous buffered writer (#4693)

上级 41f836dc
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
type Reader = Deno.Reader; type Reader = Deno.Reader;
type Writer = Deno.Writer; type Writer = Deno.Writer;
type SyncWriter = Deno.SyncWriter;
import { charCode, copyBytes } from "./util.ts"; import { charCode, copyBytes } from "./util.ts";
import { assert } from "../testing/asserts.ts"; import { assert } from "../testing/asserts.ts";
...@@ -400,6 +401,40 @@ export class BufReader implements Reader { ...@@ -400,6 +401,40 @@ export class BufReader implements Reader {
} }
} }
abstract class AbstractBufBase {
buf!: Uint8Array;
usedBufferBytes = 0;
err: Error | null = null;
/** Size returns the size of the underlying buffer in bytes. */
size(): number {
return this.buf.byteLength;
}
/** Returns how many bytes are unused in the buffer. */
available(): number {
return this.buf.byteLength - this.usedBufferBytes;
}
/** buffered returns the number of bytes that have been written into the
* current buffer.
*/
buffered(): number {
return this.usedBufferBytes;
}
checkBytesWritten(numBytesWritten: number): void {
if (numBytesWritten < this.usedBufferBytes) {
if (numBytesWritten > 0) {
this.buf.copyWithin(0, numBytesWritten, this.usedBufferBytes);
this.usedBufferBytes -= numBytesWritten;
}
this.err = new Error("Short write");
throw this.err;
}
}
}
/** BufWriter implements buffering for an deno.Writer object. /** BufWriter implements buffering for an deno.Writer object.
* If an error occurs writing to a Writer, no more data will be * If an error occurs writing to a Writer, no more data will be
* accepted and all subsequent writes, and flush(), will return the error. * accepted and all subsequent writes, and flush(), will return the error.
...@@ -407,106 +442,179 @@ export class BufReader implements Reader { ...@@ -407,106 +442,179 @@ export class BufReader implements Reader {
* flush() method to guarantee all data has been forwarded to * flush() method to guarantee all data has been forwarded to
* the underlying deno.Writer. * the underlying deno.Writer.
*/ */
export class BufWriter implements Writer { export class BufWriter extends AbstractBufBase implements Writer {
buf: Uint8Array; /** return new BufWriter unless writer is BufWriter */
n = 0; static create(writer: Writer, size: number = DEFAULT_BUF_SIZE): BufWriter {
err: Error | null = null; return writer instanceof BufWriter ? writer : new BufWriter(writer, size);
/** return new BufWriter unless w is BufWriter */
static create(w: Writer, size: number = DEFAULT_BUF_SIZE): BufWriter {
return w instanceof BufWriter ? w : new BufWriter(w, size);
} }
constructor(private wr: Writer, size: number = DEFAULT_BUF_SIZE) { constructor(private writer: Writer, size: number = DEFAULT_BUF_SIZE) {
super();
if (size <= 0) { if (size <= 0) {
size = DEFAULT_BUF_SIZE; size = DEFAULT_BUF_SIZE;
} }
this.buf = new Uint8Array(size); this.buf = new Uint8Array(size);
} }
/** Size returns the size of the underlying buffer in bytes. */
size(): number {
return this.buf.byteLength;
}
/** Discards any unflushed buffered data, clears any error, and /** Discards any unflushed buffered data, clears any error, and
* resets b to write its output to w. * resets buffer to write its output to w.
*/ */
reset(w: Writer): void { reset(w: Writer): void {
this.err = null; this.err = null;
this.n = 0; this.usedBufferBytes = 0;
this.wr = w; this.writer = w;
} }
/** Flush writes any buffered data to the underlying io.Writer. */ /** Flush writes any buffered data to the underlying io.Writer. */
async flush(): Promise<void> { async flush(): Promise<void> {
if (this.err !== null) throw this.err; if (this.err !== null) throw this.err;
if (this.n === 0) return; if (this.usedBufferBytes === 0) return;
let n = 0; let numBytesWritten = 0;
try { try {
n = await this.wr.write(this.buf.subarray(0, this.n)); numBytesWritten = await this.writer.write(
this.buf.subarray(0, this.usedBufferBytes)
);
} catch (e) { } catch (e) {
this.err = e; this.err = e;
throw e; throw e;
} }
if (n < this.n) { this.checkBytesWritten(numBytesWritten);
if (n > 0) {
this.buf.copyWithin(0, n, this.n); this.usedBufferBytes = 0;
this.n -= n; }
/** Writes the contents of `data` into the buffer. If the contents won't fully
* fit into the buffer, those bytes that can are copied into the buffer, the
* buffer is the flushed to the writer and the remaining bytes are copied into
* the now empty buffer.
*
* @return the number of bytes written to the buffer.
*/
async write(data: Uint8Array): Promise<number> {
if (this.err !== null) throw this.err;
if (data.length === 0) return 0;
let totalBytesWritten = 0;
let numBytesWritten = 0;
while (data.byteLength > this.available()) {
if (this.buffered() === 0) {
// Large write, empty buffer.
// Write directly from data to avoid copy.
try {
numBytesWritten = await this.writer.write(data);
} catch (e) {
this.err = e;
throw e;
}
} else {
numBytesWritten = copyBytes(this.buf, data, this.usedBufferBytes);
this.usedBufferBytes += numBytesWritten;
await this.flush();
} }
this.err = new Error("Short write"); totalBytesWritten += numBytesWritten;
throw this.err; data = data.subarray(numBytesWritten);
} }
this.n = 0; numBytesWritten = copyBytes(this.buf, data, this.usedBufferBytes);
this.usedBufferBytes += numBytesWritten;
totalBytesWritten += numBytesWritten;
return totalBytesWritten;
} }
}
/** Returns how many bytes are unused in the buffer. */ /** BufWriterSync implements buffering for a deno.SyncWriter object.
available(): number { * If an error occurs writing to a SyncWriter, no more data will be
return this.buf.byteLength - this.n; * accepted and all subsequent writes, and flush(), will return the error.
* After all data has been written, the client should call the
* flush() method to guarantee all data has been forwarded to
* the underlying deno.SyncWriter.
*/
export class BufWriterSync extends AbstractBufBase implements SyncWriter {
/** return new BufWriterSync unless writer is BufWriterSync */
static create(
writer: SyncWriter,
size: number = DEFAULT_BUF_SIZE
): BufWriterSync {
return writer instanceof BufWriterSync
? writer
: new BufWriterSync(writer, size);
} }
/** buffered returns the number of bytes that have been written into the constructor(private writer: SyncWriter, size: number = DEFAULT_BUF_SIZE) {
* current buffer. super();
if (size <= 0) {
size = DEFAULT_BUF_SIZE;
}
this.buf = new Uint8Array(size);
}
/** Discards any unflushed buffered data, clears any error, and
* resets buffer to write its output to w.
*/ */
buffered(): number { reset(w: SyncWriter): void {
return this.n; this.err = null;
this.usedBufferBytes = 0;
this.writer = w;
}
/** Flush writes any buffered data to the underlying io.SyncWriter. */
flush(): void {
if (this.err !== null) throw this.err;
if (this.usedBufferBytes === 0) return;
let numBytesWritten = 0;
try {
numBytesWritten = this.writer.writeSync(
this.buf.subarray(0, this.usedBufferBytes)
);
} catch (e) {
this.err = e;
throw e;
}
this.checkBytesWritten(numBytesWritten);
this.usedBufferBytes = 0;
} }
/** Writes the contents of p into the buffer. /** Writes the contents of `data` into the buffer. If the contents won't fully
* Returns the number of bytes written. * fit into the buffer, those bytes that can are copied into the buffer, the
* buffer is the flushed to the writer and the remaining bytes are copied into
* the now empty buffer.
*
* @return the number of bytes written to the buffer.
*/ */
async write(p: Uint8Array): Promise<number> { writeSync(data: Uint8Array): number {
if (this.err !== null) throw this.err; if (this.err !== null) throw this.err;
if (p.length === 0) return 0; if (data.length === 0) return 0;
let nn = 0; let totalBytesWritten = 0;
let n = 0; let numBytesWritten = 0;
while (p.byteLength > this.available()) { while (data.byteLength > this.available()) {
if (this.buffered() === 0) { if (this.buffered() === 0) {
// Large write, empty buffer. // Large write, empty buffer.
// Write directly from p to avoid copy. // Write directly from data to avoid copy.
try { try {
n = await this.wr.write(p); numBytesWritten = this.writer.writeSync(data);
} catch (e) { } catch (e) {
this.err = e; this.err = e;
throw e; throw e;
} }
} else { } else {
n = copyBytes(this.buf, p, this.n); numBytesWritten = copyBytes(this.buf, data, this.usedBufferBytes);
this.n += n; this.usedBufferBytes += numBytesWritten;
await this.flush(); this.flush();
} }
nn += n; totalBytesWritten += numBytesWritten;
p = p.subarray(n); data = data.subarray(numBytesWritten);
} }
n = copyBytes(this.buf, p, this.n); numBytesWritten = copyBytes(this.buf, data, this.usedBufferBytes);
this.n += n; this.usedBufferBytes += numBytesWritten;
nn += n; totalBytesWritten += numBytesWritten;
return nn; return totalBytesWritten;
} }
} }
......
...@@ -14,6 +14,7 @@ import { ...@@ -14,6 +14,7 @@ import {
import { import {
BufReader, BufReader,
BufWriter, BufWriter,
BufWriterSync,
BufferFullError, BufferFullError,
PartialReadError, PartialReadError,
readStringDelim, readStringDelim,
...@@ -353,6 +354,40 @@ Deno.test(async function bufioWriter(): Promise<void> { ...@@ -353,6 +354,40 @@ Deno.test(async function bufioWriter(): Promise<void> {
} }
}); });
Deno.test(function bufioWriterSync(): void {
const data = new Uint8Array(8192);
for (let i = 0; i < data.byteLength; i++) {
// eslint-disable-next-line @typescript-eslint/restrict-plus-operands
data[i] = charCode(" ") + (i % (charCode("~") - charCode(" ")));
}
const w = new Buffer();
for (const nwrite of bufsizes) {
for (const bs of bufsizes) {
// Write nwrite bytes using buffer size bs.
// Check that the right amount makes it out
// and that the data is correct.
w.reset();
const buf = new BufWriterSync(w, bs);
const context = `nwrite=${nwrite} bufsize=${bs}`;
const n = buf.writeSync(data.subarray(0, nwrite));
assertEquals(n, nwrite, context);
buf.flush();
const written = w.bytes();
assertEquals(written.byteLength, nwrite);
for (let l = 0; l < written.byteLength; l++) {
assertEquals(written[l], data[l]);
}
}
}
});
Deno.test(async function bufReaderReadFull(): Promise<void> { Deno.test(async function bufReaderReadFull(): Promise<void> {
const enc = new TextEncoder(); const enc = new TextEncoder();
const dec = new TextDecoder(); const dec = new TextDecoder();
......
...@@ -5,14 +5,20 @@ type Reader = Deno.Reader; ...@@ -5,14 +5,20 @@ type Reader = Deno.Reader;
import * as path from "../path/mod.ts"; import * as path from "../path/mod.ts";
import { encode } from "../encoding/utf8.ts"; import { encode } from "../encoding/utf8.ts";
// `off` is the offset into `dst` where it will at which to begin writing values /**
// from `src`. * Copy bytes from one Uint8Array to another. Bytes from `src` which don't fit
// Returns the number of bytes copied. * into `dst` will not be copied.
*
* @param dst Destination byte array
* @param src Source byte array
* @param off Offset into `dst` at which to begin writing values from `src`.
* @return number of bytes copied
*/
export function copyBytes(dst: Uint8Array, src: Uint8Array, off = 0): number { export function copyBytes(dst: Uint8Array, src: Uint8Array, off = 0): number {
off = Math.max(0, Math.min(off, dst.byteLength)); off = Math.max(0, Math.min(off, dst.byteLength));
const r = dst.byteLength - off; const dstBytesAvailable = dst.byteLength - off;
if (src.byteLength > r) { if (src.byteLength > dstBytesAvailable) {
src = src.subarray(0, r); src = src.subarray(0, dstBytesAvailable);
} }
dst.set(src, off); dst.set(src, off);
return src.byteLength; return src.byteLength;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册