wireProtocol.ts 3.9 KB
Newer Older
I
isidor 已提交
1 2 3 4 5
/*---------------------------------------------------------------------------------------------
 *  Copyright (c) Microsoft Corporation. All rights reserved.
 *  Licensed under the MIT License. See License.txt in the project root for license information.
 *--------------------------------------------------------------------------------------------*/

E
Erich Gamma 已提交
6 7 8 9
'use strict';

import stream = require('stream');

10 11 12 13 14 15
const DefaultSize: number = 8192;
const ContentLength: string = 'Content-Length: ';
const ContentLengthSize: number = Buffer.byteLength(ContentLength, 'utf8');
const Blank: number = new Buffer(' ', 'utf8')[0];
const BackslashR: number = new Buffer('\r', 'utf8')[0];
const BackslashN: number = new Buffer('\n', 'utf8')[0];
E
Erich Gamma 已提交
16 17 18

class ProtocolBuffer {

J
Johannes Rieken 已提交
19 20
	private index: number;
	private buffer: Buffer;
E
Erich Gamma 已提交
21 22 23 24 25 26

	constructor() {
		this.index = 0;
		this.buffer = new Buffer(DefaultSize);
	}

J
Johannes Rieken 已提交
27
	public append(data: string | Buffer): void {
28
		let toAppend: Buffer | null = null;
E
Erich Gamma 已提交
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
		if (Buffer.isBuffer(data)) {
			toAppend = <Buffer>data;
		} else {
			toAppend = new Buffer(<string>data, 'utf8');
		}
		if (this.buffer.length - this.index >= toAppend.length) {
			toAppend.copy(this.buffer, this.index, 0, toAppend.length);
		} else {
			let newSize = (Math.ceil((this.index + toAppend.length) / DefaultSize) + 1) * DefaultSize;
			if (this.index === 0) {
				this.buffer = new Buffer(newSize);
				toAppend.copy(this.buffer, 0, 0, toAppend.length);
			} else {
				this.buffer = Buffer.concat([this.buffer.slice(0, this.index), toAppend], newSize);
			}
		}
J
Johannes Rieken 已提交
45
		this.index += toAppend.length;
E
Erich Gamma 已提交
46 47
	}

J
Johannes Rieken 已提交
48
	public tryReadContentLength(): number {
E
Erich Gamma 已提交
49 50 51 52 53 54 55 56 57 58 59
		let result = -1;
		let current = 0;
		// we are utf8 encoding...
		while (current < this.index && (this.buffer[current] === Blank || this.buffer[current] === BackslashR || this.buffer[current] === BackslashN)) {
			current++;
		}
		if (this.index < current + ContentLengthSize) {
			return result;
		}
		current += ContentLengthSize;
		let start = current;
J
Johannes Rieken 已提交
60
		while (current < this.index && this.buffer[current] !== BackslashR) {
E
Erich Gamma 已提交
61 62 63 64 65 66 67 68 69 70 71 72
			current++;
		}
		if (current + 3 >= this.index || this.buffer[current + 1] !== BackslashN || this.buffer[current + 2] !== BackslashR || this.buffer[current + 3] !== BackslashN) {
			return result;
		}
		let data = this.buffer.toString('utf8', start, current);
		result = parseInt(data);
		this.buffer = this.buffer.slice(current + 4);
		this.index = this.index - (current + 4);
		return result;
	}

73
	public tryReadContent(length: number): string | null {
E
Erich Gamma 已提交
74 75 76 77 78
		if (this.index < length) {
			return null;
		}
		let result = this.buffer.toString('utf8', 0, length);
		let sourceStart = length;
J
Johannes Rieken 已提交
79
		while (sourceStart < this.index && (this.buffer[sourceStart] === BackslashR || this.buffer[sourceStart] === BackslashN)) {
E
Erich Gamma 已提交
80 81 82 83 84 85 86 87 88
			sourceStart++;
		}
		this.buffer.copy(this.buffer, 0, sourceStart);
		this.index = this.index - sourceStart;
		return result;
	}
}

export interface ICallback<T> {
J
Johannes Rieken 已提交
89
	(data: T): void;
E
Erich Gamma 已提交
90 91 92 93
}

export class Reader<T> {

M
Matt Bierner 已提交
94 95 96
	private readonly readable: stream.Readable;
	private readonly callback: ICallback<T>;
	private readonly buffer: ProtocolBuffer;
J
Johannes Rieken 已提交
97
	private nextMessageLength: number;
E
Erich Gamma 已提交
98

M
Matt Bierner 已提交
99 100 101 102 103
	public constructor(
		readable: stream.Readable,
		callback: ICallback<T>,
		private readonly onError: (error: any) => void = () => ({})
	) {
E
Erich Gamma 已提交
104 105 106 107
		this.readable = readable;
		this.buffer = new ProtocolBuffer();
		this.callback = callback;
		this.nextMessageLength = -1;
108 109 110
		this.readable.on('data', (data: Buffer) => {
			this.onLengthData(data);
		});
E
Erich Gamma 已提交
111 112
	}

J
Johannes Rieken 已提交
113
	private onLengthData(data: Buffer): void {
M
Matt Bierner 已提交
114 115 116
		try {
			this.buffer.append(data);
			while (true) {
E
Erich Gamma 已提交
117
				if (this.nextMessageLength === -1) {
M
Matt Bierner 已提交
118 119 120 121 122 123 124
					this.nextMessageLength = this.buffer.tryReadContentLength();
					if (this.nextMessageLength === -1) {
						return;
					}
				}
				const msg = this.buffer.tryReadContent(this.nextMessageLength);
				if (msg === null) {
E
Erich Gamma 已提交
125 126
					return;
				}
M
Matt Bierner 已提交
127 128 129
				this.nextMessageLength = -1;
				const json = JSON.parse(msg);
				this.callback(json);
E
Erich Gamma 已提交
130
			}
M
Matt Bierner 已提交
131 132
		} catch (e) {
			this.onError(e);
E
Erich Gamma 已提交
133 134 135
		}
	}
}