From 36708c0f5dabfe8a7f3724833d52b466b11cea8d Mon Sep 17 00:00:00 2001
From: Johannes Rieken
Date: Tue, 10 Apr 2018 16:13:44 +0200
Subject: [PATCH] add `toDecodeStream` to enable seamless encoding handling, #41985

---
 src/vs/base/node/encoding.ts                 | 109 ++++++++++++++++-
 .../base/test/node/encoding/encoding.test.ts | 114 +++++++++++++++++-
 2 files changed, 221 insertions(+), 2 deletions(-)

diff --git a/src/vs/base/node/encoding.ts b/src/vs/base/node/encoding.ts
index e861c4048cb..6318f2cbc5f 100644
--- a/src/vs/base/node/encoding.ts
+++ b/src/vs/base/node/encoding.ts
@@ -10,12 +10,119 @@ import * as iconv from 'iconv-lite';
 import { TPromise } from 'vs/base/common/winjs.base';
 import { isLinux, isMacintosh } from 'vs/base/common/platform';
 import { exec } from 'child_process';
+import { Readable, Writable, WritableOptions } from 'stream';
 
 export const UTF8 = 'utf8';
 export const UTF8_with_bom = 'utf8bom';
 export const UTF16be = 'utf16be';
 export const UTF16le = 'utf16le';
 
+/**
+ * Options that control how `toDecodeStream` detects and picks an encoding.
+ */
+export interface IDecodeStreamOptions {
+	/** Bytes to buffer before detection starts (it also starts when the source ends earlier). */
+	minBytesRequiredForDetection: number;
+	/** Forwarded as second argument to `detectEncodingFromBuffer`. */
+	guessEncoding: boolean;
+	/** Maps the detected encoding to the encoding that is used for decoding. */
+	overwriteEncoding(detected: string): string;
+}
+
+/**
+ * Pipes `readable` into a writable that buffers the first bytes, detects the
+ * encoding from them and then forwards all data into a decode stream.
+ *
+ * Resolves with the detection result and the decode stream once detection
+ * has run. Rejects when the source stream errors or when detection or the
+ * creation of the decode stream fails.
+ */
+export function toDecodeStream(readable: Readable, opts: IDecodeStreamOptions): TPromise<{ detected: IDetectedEncodingResult, stream: NodeJS.ReadableStream }> {
+	return new TPromise<{ detected: IDetectedEncodingResult, stream: NodeJS.ReadableStream }>((resolve, reject) => {
+
+		// `pipe` does not forward errors of the source, surface them to the caller
+		readable.on('error', reject);
+
+		readable.pipe(new class extends Writable {
+
+			private _decodeStream: NodeJS.ReadWriteStream;
+			private _decodeStreamConstruction: Thenable<any>;
+			private _buffer: Buffer[] = [];
+			private _bytesBuffered = 0;
+
+			constructor(opts?: WritableOptions) {
+				super(opts);
+				this.once('finish', () => this._finish());
+			}
+
+			_write(chunk: any, encoding: string, callback: Function): void {
+				if (!Buffer.isBuffer(chunk)) {
+					// bail out, the callback must be invoked exactly once per chunk
+					callback(new Error('data must be a buffer'));
+					return;
+				}
+
+				if (this._decodeStream) {
+					// just a forwarder now
+					this._decodeStream.write(chunk, callback);
+					return;
+				}
+
+				this._buffer.push(chunk);
+				this._bytesBuffered += chunk.length;
+
+				if (this._decodeStreamConstruction) {
+					// waiting for the decoder to be ready
+					this._decodeStreamConstruction.then(_ => callback(), err => callback(err));
+
+				} else if (this._bytesBuffered >= opts.minBytesRequiredForDetection) {
+					// buffered enough data, create stream and forward data
+					this._startDecodeStream(callback);
+
+				} else {
+					// only buffering
+					callback();
+				}
+			}
+
+			_startDecodeStream(callback: Function): void {
+
+				this._decodeStreamConstruction = TPromise.as(detectEncodingFromBuffer({
+					buffer: Buffer.concat(this._buffer), bytesRead: this._bytesBuffered
+				}, opts.guessEncoding)).then(detected => {
+					detected.encoding = opts.overwriteEncoding(detected.encoding); // default encoding
+					this._decodeStream = decodeStream(detected.encoding);
+					for (const buffer of this._buffer) {
+						this._decodeStream.write(buffer);
+					}
+					callback();
+					resolve({ detected, stream: this._decodeStream });
+
+				}, err => {
+					// settle the outer promise too, otherwise callers wait forever
+					reject(err);
+					callback(err);
+				});
+			}
+
+			_finish(): void {
+				if (this._decodeStream) {
+					// normal finish
+					this._decodeStream.end();
+				} else {
+					// we were still waiting for data...
+					this._startDecodeStream(() => {
+						// no decode stream exists when detection failed
+						if (this._decodeStream) {
+							this._decodeStream.end();
+						}
+					});
+				}
+			}
+		});
+	});
+}
+
 export function bomLength(encoding: string): number {
 	switch (encoding) {
 		case UTF8:
@@ -350,4 +457,4 @@ export function resolveTerminalEncoding(verbose?: boolean): TPromise {
 
 		return UTF8;
 	});
-}
\ No newline at end of file
+}
diff --git a/src/vs/base/test/node/encoding/encoding.test.ts b/src/vs/base/test/node/encoding/encoding.test.ts
index 7ade8bf5873..063912953f0 100644
--- a/src/vs/base/test/node/encoding/encoding.test.ts
+++ b/src/vs/base/test/node/encoding/encoding.test.ts
@@ -6,9 +6,10 @@
 'use strict';
 
 import * as assert from 'assert';
-
+import * as fs from 'fs';
 import * as encoding from 'vs/base/node/encoding';
 import { readExactlyByFile } from 'vs/base/node/stream';
+import { Readable } from 'stream';
 
 suite('Encoding', () => {
 	test('detectBOM UTF-8', () => {
@@ -150,4 +151,115 @@ suite('Encoding', () => {
 			});
 		});
 	});
+
+	async function readAndDecodeFromDisk(path, _encoding) {
+		return new Promise((resolve, reject) => {
+			fs.readFile(path, (err, data) => {
+				if (err) {
+					reject(err);
+				} else {
+					resolve(encoding.decode(data, _encoding));
+				}
+			});
+		});
+	}
+
+	async function readAllAsString(stream: NodeJS.ReadableStream) {
+		return new Promise((resolve, reject) => {
+			let all = '';
+			stream.on('data', chunk => {
+				all += chunk;
+				assert.equal(typeof chunk, 'string');
+			});
+			stream.on('end', () => {
+				resolve(all);
+			});
+			stream.on('error', reject);
+		});
+	}
+
+	test('toDecodeStream - some stream', async function () {
+
+		let source = new Readable({
+			read(size) {
+				this.push(Buffer.from([65, 66, 67]));
+				this.push(Buffer.from([65, 66, 67]));
+				this.push(Buffer.from([65, 66, 67]));
+				this.push(null);
+			}
+		});
+
+		let { detected, stream } = await encoding.toDecodeStream(source, { minBytesRequiredForDetection: 4, guessEncoding: true, overwriteEncoding() { return encoding.UTF8; } });
+
+		assert.ok(detected);
+		assert.ok(stream);
+
+		const content = await readAllAsString(stream);
+		assert.equal(content, 'ABCABCABC');
+	});
+
+	test('toDecodeStream - some stream, expect too much data', async function () {
+
+		let source = new Readable({
+			read(size) {
+				this.push(Buffer.from([65, 66, 67]));
+				this.push(Buffer.from([65, 66, 67]));
+				this.push(Buffer.from([65, 66, 67]));
+				this.push(null);
+			}
+		});
+
+		let { detected, stream } = await encoding.toDecodeStream(source, { minBytesRequiredForDetection: 64, guessEncoding: true, overwriteEncoding() { return encoding.UTF8; } });
+
+		assert.ok(detected);
+		assert.ok(stream);
+
+		const content = await readAllAsString(stream);
+		assert.equal(content, 'ABCABCABC');
+	});
+
+	test('toDecodeStream - some stream, no data', async function () {
+
+		let source = new Readable({
+			read(size) {
+				this.push(null); // empty
+			}
+		});
+
+		let { detected, stream } = await encoding.toDecodeStream(source, { minBytesRequiredForDetection: 512, guessEncoding: true, overwriteEncoding() { return encoding.UTF8; } });
+
+		assert.ok(detected);
+		assert.ok(stream);
+
+		const content = await readAllAsString(stream);
+		assert.equal(content, '');
+	});
+
+
+	test('toDecodeStream - encoding, utf16be', async function () {
+
+		let path = require.toUrl('./fixtures/some_utf16be.css');
+		let source = fs.createReadStream(path);
+
+		let { detected, stream } = await encoding.toDecodeStream(source, { minBytesRequiredForDetection: 64, guessEncoding: true, overwriteEncoding(detected) { return detected; } });
+
+		assert.equal(detected.encoding, 'utf16be');
+		assert.equal(detected.seemsBinary, false);
+
+		let expected = await readAndDecodeFromDisk(path, detected.encoding);
+		let actual = await readAllAsString(stream);
+		assert.equal(actual, expected);
+	});
+
+
+	test('toDecodeStream - empty file', async function () {
+
+		let path = require.toUrl('./fixtures/empty.txt');
+		let source = fs.createReadStream(path);
+		let { detected, stream } = await encoding.toDecodeStream(source, { minBytesRequiredForDetection: 64, guessEncoding: true, overwriteEncoding() { return encoding.UTF8; } });
+
+		let expected = await readAndDecodeFromDisk(path, detected.encoding);
+		let actual = await readAllAsString(stream);
+		assert.equal(actual, expected);
+	});
 });
-- 
GitLab