bufio.ts 17.6 KB
Newer Older
R
Ryan Dahl 已提交
1
// Based on https://github.com/golang/go/blob/891682/src/bufio/bufio.go
R
Ryan Dahl 已提交
2 3 4 5
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

6 7
type Reader = Deno.Reader;
type Writer = Deno.Writer;
8
import { charCode, copyBytes } from "./util.ts";
9
import { assert } from "../testing/asserts.ts";
R
Ryan Dahl 已提交
10 11 12 13

const DEFAULT_BUF_SIZE = 4096;
const MIN_BUF_SIZE = 16;
const MAX_CONSECUTIVE_EMPTY_READS = 100;
R
Ryan Dahl 已提交
14 15
const CR = charCode("\r");
const LF = charCode("\n");
R
Ryan Dahl 已提交
16

17 18 19 20 21 22 23
export class BufferFullError extends Error {
  name = "BufferFullError";
  constructor(public partial: Uint8Array) {
    super("Buffer full");
  }
}

24 25
export class PartialReadError extends Deno.errors.UnexpectedEof {
  name = "PartialReadError";
K
Kitson Kelly 已提交
26
  partial?: Uint8Array;
27
  constructor() {
28
    super("Encountered UnexpectedEof, data only partially read");
29 30 31 32 33 34 35 36
  }
}

/** Result type returned by of BufReader.readLine(). */
export interface ReadLineResult {
  line: Uint8Array;
  more: boolean;
}
R
Ryan Dahl 已提交
37

R
Ryan Dahl 已提交
38 39
/** BufReader implements buffering for a Reader object. */
export class BufReader implements Reader {
40 41
  private buf!: Uint8Array;
  private rd!: Reader; // Reader provided by caller.
R
Ryan Dahl 已提交
42 43
  private r = 0; // buf read position.
  private w = 0; // buf write position.
44 45 46
  private eof = false;
  // private lastByte: number;
  // private lastCharSize: number;
R
Ryan Dahl 已提交
47

48
  /** return new BufReader unless r is BufReader */
49
  static create(r: Reader, size: number = DEFAULT_BUF_SIZE): BufReader {
50 51 52
    return r instanceof BufReader ? r : new BufReader(r, size);
  }

53
  constructor(rd: Reader, size: number = DEFAULT_BUF_SIZE) {
R
Ryan Dahl 已提交
54 55 56
    if (size < MIN_BUF_SIZE) {
      size = MIN_BUF_SIZE;
    }
R
Ryan Dahl 已提交
57
    this._reset(new Uint8Array(size), rd);
R
Ryan Dahl 已提交
58 59 60
  }

  /** Returns the size of the underlying buffer in bytes. */
R
Ryan Dahl 已提交
61
  size(): number {
R
Ryan Dahl 已提交
62 63 64
    return this.buf.byteLength;
  }

R
Ryan Dahl 已提交
65 66 67 68
  buffered(): number {
    return this.w - this.r;
  }

R
Ryan Dahl 已提交
69
  // Reads a new chunk into the buffer.
R
Ryan Dahl 已提交
70
  private async _fill(): Promise<void> {
R
Ryan Dahl 已提交
71 72 73 74 75 76 77 78 79 80 81 82 83
    // Slide existing data to beginning.
    if (this.r > 0) {
      this.buf.copyWithin(0, this.r, this.w);
      this.w -= this.r;
      this.r = 0;
    }

    if (this.w >= this.buf.byteLength) {
      throw Error("bufio: tried to fill full buffer");
    }

    // Read new data: try a limited number of times.
    for (let i = MAX_CONSECUTIVE_EMPTY_READS; i > 0; i--) {
84 85
      const rr = await this.rd.read(this.buf.subarray(this.w));
      if (rr === Deno.EOF) {
86
        this.eof = true;
R
Ryan Dahl 已提交
87
        return;
R
Ryan Dahl 已提交
88
      }
89 90 91
      assert(rr >= 0, "negative read");
      this.w += rr;
      if (rr > 0) {
R
Ryan Dahl 已提交
92
        return;
R
Ryan Dahl 已提交
93 94
      }
    }
95 96 97 98

    throw new Error(
      `No progress after ${MAX_CONSECUTIVE_EMPTY_READS} read() calls`
    );
R
Ryan Dahl 已提交
99 100 101 102 103
  }

  /** Discards any buffered data, resets all state, and switches
   * the buffered reader to read from r.
   */
R
Ryan Dahl 已提交
104
  reset(r: Reader): void {
R
Ryan Dahl 已提交
105 106 107
    this._reset(this.buf, r);
  }

R
Ryan Dahl 已提交
108
  private _reset(buf: Uint8Array, rd: Reader): void {
R
Ryan Dahl 已提交
109 110
    this.buf = buf;
    this.rd = rd;
111 112 113
    this.eof = false;
    // this.lastByte = -1;
    // this.lastCharSize = -1;
R
Ryan Dahl 已提交
114 115
  }

R
Ryan Dahl 已提交
116 117 118 119 120 121
  /** reads data into p.
   * It returns the number of bytes read into p.
   * The bytes are taken from at most one Read on the underlying Reader,
   * hence n may be less than len(p).
   * To read exactly len(p) bytes, use io.ReadFull(b, p).
   */
122 123
  async read(p: Uint8Array): Promise<number | Deno.EOF> {
    let rr: number | Deno.EOF = p.byteLength;
124
    if (p.byteLength === 0) return rr;
R
Ryan Dahl 已提交
125 126 127 128 129

    if (this.r === this.w) {
      if (p.byteLength >= this.buf.byteLength) {
        // Large read, empty buffer.
        // Read directly into p to avoid copy.
130
        const rr = await this.rd.read(p);
131 132
        const nread = rr === Deno.EOF ? 0 : rr;
        assert(nread >= 0, "negative read");
133 134 135 136
        // if (rr.nread > 0) {
        //   this.lastByte = p[rr.nread - 1];
        //   this.lastCharSize = -1;
        // }
R
Ryan Dahl 已提交
137 138
        return rr;
      }
139

R
Ryan Dahl 已提交
140 141 142 143
      // One read.
      // Do not use this.fill, which will loop.
      this.r = 0;
      this.w = 0;
144
      rr = await this.rd.read(this.buf);
145 146 147
      if (rr === 0 || rr === Deno.EOF) return rr;
      assert(rr >= 0, "negative read");
      this.w += rr;
R
Ryan Dahl 已提交
148 149 150
    }

    // copy as much as we can
151 152
    const copied = copyBytes(p, this.buf.subarray(this.r, this.w), 0);
    this.r += copied;
153 154
    // this.lastByte = this.buf[this.r - 1];
    // this.lastCharSize = -1;
155
    return copied;
R
Ryan Dahl 已提交
156 157
  }

158 159 160 161 162 163 164 165 166 167 168 169
  /** reads exactly `p.length` bytes into `p`.
   *
   * If successful, `p` is returned.
   *
   * If the end of the underlying stream has been reached, and there are no more
   * bytes available in the buffer, `readFull()` returns `EOF` instead.
   *
   * An error is thrown if some bytes could be read, but not enough to fill `p`
   * entirely before the underlying stream reported an error or EOF. Any error
   * thrown will have a `partial` property that indicates the slice of the
   * buffer that has been successfully filled with data.
   *
170 171
   * Ported from https://golang.org/pkg/io/#ReadFull
   */
172
  async readFull(p: Uint8Array): Promise<Uint8Array | Deno.EOF> {
173 174 175 176
    let bytesRead = 0;
    while (bytesRead < p.length) {
      try {
        const rr = await this.read(p.subarray(bytesRead));
177
        if (rr === Deno.EOF) {
178
          if (bytesRead === 0) {
179
            return Deno.EOF;
180
          } else {
181
            throw new PartialReadError();
182 183
          }
        }
184
        bytesRead += rr;
185 186 187 188
      } catch (err) {
        err.partial = p.subarray(0, bytesRead);
        throw err;
      }
189
    }
190
    return p;
191 192
  }

193
  /** Returns the next byte [0, 255] or `EOF`. */
194
  async readByte(): Promise<number | Deno.EOF> {
R
Ryan Dahl 已提交
195
    while (this.r === this.w) {
196
      if (this.eof) return Deno.EOF;
R
Ryan Dahl 已提交
197
      await this._fill(); // buffer is empty.
R
Ryan Dahl 已提交
198 199 200
    }
    const c = this.buf[this.r];
    this.r++;
201
    // this.lastByte = c;
R
Ryan Dahl 已提交
202 203
    return c;
  }
R
Ryan Dahl 已提交
204 205 206 207

  /** readString() reads until the first occurrence of delim in the input,
   * returning a string containing the data up to and including the delimiter.
   * If ReadString encounters an error before finding a delimiter,
208 209 210
   * it returns the data read before the error and the error itself
   * (often io.EOF).
   * ReadString returns err != nil if and only if the returned data does not end
211
   * in delim.
R
Ryan Dahl 已提交
212 213
   * For simple uses, a Scanner may be more convenient.
   */
214 215 216 217
  async readString(delim: string): Promise<string | Deno.EOF> {
    if (delim.length !== 1)
      throw new Error("Delimiter should be a single character");
    const buffer = await this.readSlice(delim.charCodeAt(0));
218 219
    if (buffer == Deno.EOF) return Deno.EOF;
    return new TextDecoder().decode(buffer);
R
Ryan Dahl 已提交
220
  }
R
Ryan Dahl 已提交
221

222 223
  /** `readLine()` is a low-level line-reading primitive. Most callers should
   * use `readString('\n')` instead or use a Scanner.
R
Ryan Dahl 已提交
224
   *
225 226
   * `readLine()` tries to return a single line, not including the end-of-line
   * bytes. If the line was too long for the buffer then `more` is set and the
R
Ryan Dahl 已提交
227
   * beginning of the line is returned. The rest of the line will be returned
228
   * from future calls. `more` will be false when returning the last fragment
R
Ryan Dahl 已提交
229
   * of the line. The returned buffer is only valid until the next call to
230
   * `readLine()`.
R
Ryan Dahl 已提交
231
   *
232 233 234 235 236 237 238 239 240 241 242
   * The text returned from ReadLine does not include the line end ("\r\n" or
   * "\n").
   *
   * When the end of the underlying stream is reached, the final bytes in the
   * stream are returned. No indication or error is given if the input ends
   * without a final line end. When there are no more trailing bytes to read,
   * `readLine()` returns the `EOF` symbol.
   *
   * Calling `unreadByte()` after `readLine()` will always unread the last byte
   * read (possibly a character belonging to the line end) even if that byte is
   * not part of the line returned by `readLine()`.
R
Ryan Dahl 已提交
243
   */
244 245
  async readLine(): Promise<ReadLineResult | Deno.EOF> {
    let line: Uint8Array | Deno.EOF;
246 247 248 249 250 251 252 253 254 255 256 257 258 259 260

    try {
      line = await this.readSlice(LF);
    } catch (err) {
      let { partial } = err;
      assert(
        partial instanceof Uint8Array,
        "bufio: caught error from `readSlice()` without `partial` property"
      );

      // Don't throw if `readSlice()` failed with `BufferFullError`, instead we
      // just return whatever is available and set the `more` flag.
      if (!(err instanceof BufferFullError)) {
        throw err;
      }
R
Ryan Dahl 已提交
261 262

      // Handle the case where "\r\n" straddles the buffer.
263 264 265 266 267
      if (
        !this.eof &&
        partial.byteLength > 0 &&
        partial[partial.byteLength - 1] === CR
      ) {
R
Ryan Dahl 已提交
268 269 270 271
        // Put the '\r' back on buf and drop it from line.
        // Let the next call to ReadLine check for "\r\n".
        assert(this.r > 0, "bufio: tried to rewind past start of buffer");
        this.r--;
272
        partial = partial.subarray(0, partial.byteLength - 1);
R
Ryan Dahl 已提交
273
      }
274 275 276 277

      return { line: partial, more: !this.eof };
    }

278 279
    if (line === Deno.EOF) {
      return Deno.EOF;
R
Ryan Dahl 已提交
280 281 282
    }

    if (line.byteLength === 0) {
283
      return { line, more: false };
R
Ryan Dahl 已提交
284 285 286 287 288 289 290 291 292
    }

    if (line[line.byteLength - 1] == LF) {
      let drop = 1;
      if (line.byteLength > 1 && line[line.byteLength - 2] === CR) {
        drop = 2;
      }
      line = line.subarray(0, line.byteLength - drop);
    }
293
    return { line, more: false };
R
Ryan Dahl 已提交
294 295
  }

296
  /** `readSlice()` reads until the first occurrence of `delim` in the input,
R
Ryan Dahl 已提交
297
   * returning a slice pointing at the bytes in the buffer. The bytes stop
298 299 300 301 302 303 304 305 306 307 308 309 310
   * being valid at the next read.
   *
   * If `readSlice()` encounters an error before finding a delimiter, or the
   * buffer fills without finding a delimiter, it throws an error with a
   * `partial` property that contains the entire buffer.
   *
   * If `readSlice()` encounters the end of the underlying stream and there are
   * any bytes left in the buffer, the rest of the buffer is returned. In other
   * words, EOF is always treated as a delimiter. Once the buffer is empty,
   * it returns `EOF`.
   *
   * Because the data returned from `readSlice()` will be overwritten by the
   * next I/O operation, most clients should use `readString()` instead.
R
Ryan Dahl 已提交
311
   */
312
  async readSlice(delim: number): Promise<Uint8Array | Deno.EOF> {
R
Ryan Dahl 已提交
313
    let s = 0; // search start index
314
    let slice: Uint8Array | undefined;
315

R
Ryan Dahl 已提交
316 317 318 319 320
    while (true) {
      // Search buffer.
      let i = this.buf.subarray(this.r + s, this.w).indexOf(delim);
      if (i >= 0) {
        i += s;
321
        slice = this.buf.subarray(this.r, this.r + i + 1);
R
Ryan Dahl 已提交
322 323 324 325
        this.r += i + 1;
        break;
      }

326 327 328
      // EOF?
      if (this.eof) {
        if (this.r === this.w) {
329
          return Deno.EOF;
330 331
        }
        slice = this.buf.subarray(this.r, this.w);
R
Ryan Dahl 已提交
332 333 334 335 336 337 338
        this.r = this.w;
        break;
      }

      // Buffer full?
      if (this.buffered() >= this.buf.byteLength) {
        this.r = this.w;
339
        throw new BufferFullError(this.buf);
R
Ryan Dahl 已提交
340 341 342 343
      }

      s = this.w - this.r; // do not rescan area we scanned before

344 345 346 347
      // Buffer is not full.
      try {
        await this._fill();
      } catch (err) {
348
        err.partial = slice;
349 350
        throw err;
      }
R
Ryan Dahl 已提交
351 352 353
    }

    // Handle last byte, if any.
354 355 356 357 358
    // const i = slice.byteLength - 1;
    // if (i >= 0) {
    //   this.lastByte = slice[i];
    //   this.lastCharSize = -1
    // }
R
Ryan Dahl 已提交
359

360
    return slice;
R
Ryan Dahl 已提交
361
  }
R
Ryan Dahl 已提交
362

363 364 365 366 367 368 369 370 371 372
  /** `peek()` returns the next `n` bytes without advancing the reader. The
   * bytes stop being valid at the next read call.
   *
   * When the end of the underlying stream is reached, but there are unread
   * bytes left in the buffer, those bytes are returned. If there are no bytes
   * left in the buffer, it returns `EOF`.
   *
   * If an error is encountered before `n` bytes are available, `peek()` throws
   * an error with the `partial` property set to a slice of the buffer that
   * contains the bytes that were available before the error occurred.
R
Ryan Dahl 已提交
373
   */
374
  async peek(n: number): Promise<Uint8Array | Deno.EOF> {
R
Ryan Dahl 已提交
375 376 377 378
    if (n < 0) {
      throw Error("negative count");
    }

379 380 381 382 383 384 385 386 387
    let avail = this.w - this.r;
    while (avail < n && avail < this.buf.byteLength && !this.eof) {
      try {
        await this._fill();
      } catch (err) {
        err.partial = this.buf.subarray(this.r, this.w);
        throw err;
      }
      avail = this.w - this.r;
R
Ryan Dahl 已提交
388 389
    }

390
    if (avail === 0 && this.eof) {
391
      return Deno.EOF;
392 393 394 395
    } else if (avail < n && this.eof) {
      return this.buf.subarray(this.r, this.r + avail);
    } else if (avail < n) {
      throw new BufferFullError(this.buf.subarray(this.r, this.w));
R
Ryan Dahl 已提交
396 397
    }

398
    return this.buf.subarray(this.r, this.r + n);
R
Ryan Dahl 已提交
399
  }
R
Ryan Dahl 已提交
400
}
R
wip  
Ryan Dahl 已提交
401 402 403 404 405 406 407 408 409

/** BufWriter implements buffering for an deno.Writer object.
 * If an error occurs writing to a Writer, no more data will be
 * accepted and all subsequent writes, and flush(), will return the error.
 * After all data has been written, the client should call the
 * flush() method to guarantee all data has been forwarded to
 * the underlying deno.Writer.
 */
export class BufWriter implements Writer {
R
Ryan Dahl 已提交
410
  buf: Uint8Array;
411
  n = 0;
412
  err: Error | null = null;
R
Ryan Dahl 已提交
413

414
  /** return new BufWriter unless w is BufWriter */
415
  static create(w: Writer, size: number = DEFAULT_BUF_SIZE): BufWriter {
416 417 418
    return w instanceof BufWriter ? w : new BufWriter(w, size);
  }

419
  constructor(private wr: Writer, size: number = DEFAULT_BUF_SIZE) {
R
Ryan Dahl 已提交
420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440
    if (size <= 0) {
      size = DEFAULT_BUF_SIZE;
    }
    this.buf = new Uint8Array(size);
  }

  /** Size returns the size of the underlying buffer in bytes. */
  size(): number {
    return this.buf.byteLength;
  }

  /** Discards any unflushed buffered data, clears any error, and
   * resets b to write its output to w.
   */
  reset(w: Writer): void {
    this.err = null;
    this.n = 0;
    this.wr = w;
  }

  /** Flush writes any buffered data to the underlying io.Writer. */
441 442 443
  async flush(): Promise<void> {
    if (this.err !== null) throw this.err;
    if (this.n === 0) return;
R
Ryan Dahl 已提交
444

445
    let n = 0;
R
Ryan Dahl 已提交
446 447 448
    try {
      n = await this.wr.write(this.buf.subarray(0, this.n));
    } catch (e) {
449 450
      this.err = e;
      throw e;
R
Ryan Dahl 已提交
451 452
    }

453 454
    if (n < this.n) {
      if (n > 0) {
R
Ryan Dahl 已提交
455
        this.buf.copyWithin(0, n, this.n);
456
        this.n -= n;
R
Ryan Dahl 已提交
457
      }
458 459
      this.err = new Error("Short write");
      throw this.err;
R
Ryan Dahl 已提交
460
    }
461

R
Ryan Dahl 已提交
462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480
    this.n = 0;
  }

  /** Returns how many bytes are unused in the buffer. */
  available(): number {
    return this.buf.byteLength - this.n;
  }

  /** buffered returns the number of bytes that have been written into the
   * current buffer.
   */
  buffered(): number {
    return this.n;
  }

  /** Writes the contents of p into the buffer.
   * Returns the number of bytes written.
   */
  async write(p: Uint8Array): Promise<number> {
481 482 483
    if (this.err !== null) throw this.err;
    if (p.length === 0) return 0;

R
Ryan Dahl 已提交
484
    let nn = 0;
485
    let n = 0;
486 487
    while (p.byteLength > this.available()) {
      if (this.buffered() === 0) {
R
Ryan Dahl 已提交
488 489 490 491 492 493
        // Large write, empty buffer.
        // Write directly from p to avoid copy.
        try {
          n = await this.wr.write(p);
        } catch (e) {
          this.err = e;
494
          throw e;
R
Ryan Dahl 已提交
495 496 497 498
        }
      } else {
        n = copyBytes(this.buf, p, this.n);
        this.n += n;
499
        await this.flush();
R
Ryan Dahl 已提交
500 501 502 503
      }
      nn += n;
      p = p.subarray(n);
    }
504

R
Ryan Dahl 已提交
505 506 507 508 509
    n = copyBytes(this.buf, p, this.n);
    this.n += n;
    nn += n;
    return nn;
  }
R
wip  
Ryan Dahl 已提交
510
}
511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604

/** Generate longest proper prefix which is also suffix array. */
function createLPS(pat: Uint8Array): Uint8Array {
  const lps = new Uint8Array(pat.length);
  lps[0] = 0;
  let prefixEnd = 0;
  let i = 1;
  while (i < lps.length) {
    if (pat[i] == pat[prefixEnd]) {
      prefixEnd++;
      lps[i] = prefixEnd;
      i++;
    } else if (prefixEnd === 0) {
      lps[i] = 0;
      i++;
    } else {
      prefixEnd = pat[prefixEnd - 1];
    }
  }
  return lps;
}

/** Read delimited bytes from a Reader. */
export async function* readDelim(
  reader: Reader,
  delim: Uint8Array
): AsyncIterableIterator<Uint8Array> {
  // Avoid unicode problems
  const delimLen = delim.length;
  const delimLPS = createLPS(delim);

  let inputBuffer = new Deno.Buffer();
  const inspectArr = new Uint8Array(Math.max(1024, delimLen + 1));

  // Modified KMP
  let inspectIndex = 0;
  let matchIndex = 0;
  while (true) {
    const result = await reader.read(inspectArr);
    if (result === Deno.EOF) {
      // Yield last chunk.
      yield inputBuffer.bytes();
      return;
    }
    if ((result as number) < 0) {
      // Discard all remaining and silently fail.
      return;
    }
    const sliceRead = inspectArr.subarray(0, result as number);
    await Deno.writeAll(inputBuffer, sliceRead);

    let sliceToProcess = inputBuffer.bytes();
    while (inspectIndex < sliceToProcess.length) {
      if (sliceToProcess[inspectIndex] === delim[matchIndex]) {
        inspectIndex++;
        matchIndex++;
        if (matchIndex === delimLen) {
          // Full match
          const matchEnd = inspectIndex - delimLen;
          const readyBytes = sliceToProcess.subarray(0, matchEnd);
          // Copy
          const pendingBytes = sliceToProcess.slice(inspectIndex);
          yield readyBytes;
          // Reset match, different from KMP.
          sliceToProcess = pendingBytes;
          inspectIndex = 0;
          matchIndex = 0;
        }
      } else {
        if (matchIndex === 0) {
          inspectIndex++;
        } else {
          matchIndex = delimLPS[matchIndex - 1];
        }
      }
    }
    // Keep inspectIndex and matchIndex.
    inputBuffer = new Deno.Buffer(sliceToProcess);
  }
}

/** Read delimited strings from a Reader. */
export async function* readStringDelim(
  reader: Reader,
  delim: string
): AsyncIterableIterator<string> {
  const encoder = new TextEncoder();
  const decoder = new TextDecoder();
  for await (const chunk of readDelim(reader, encoder.encode(delim))) {
    yield decoder.decode(chunk);
  }
}

/** Read strings line-by-line from a Reader. */
605
// eslint-disable-next-line require-await
606 607 608 609 610
export async function* readLines(
  reader: Reader
): AsyncIterableIterator<string> {
  yield* readStringDelim(reader, "\n");
}