提交 315a2a6e 编写于 作者: A Alex Dima

Add `ISocket.dispose()`

上级 0c782908
......@@ -17,6 +17,7 @@ export interface ISocket {
onEnd(listener: () => void): IDisposable;
write(buffer: VSBuffer): void;
end(): void;
dispose(): void;
}
let emptyBuffer: VSBuffer | null = null;
......@@ -27,7 +28,7 @@ function getEmptyBuffer(): VSBuffer {
return emptyBuffer;
}
class ChunkStream {
export class ChunkStream {
private _chunks: VSBuffer[];
private _totalLength: number;
......@@ -47,6 +48,15 @@ class ChunkStream {
}
public read(byteCount: number): VSBuffer {
return this._read(byteCount, true);
}
public peek(byteCount: number): VSBuffer {
return this._read(byteCount, false);
}
private _read(byteCount: number, advance: boolean): VSBuffer {
if (byteCount === 0) {
return getEmptyBuffer();
}
......@@ -57,39 +67,53 @@ class ChunkStream {
if (this._chunks[0].byteLength === byteCount) {
// super fast path, precisely first chunk must be returned
const result = this._chunks.shift()!;
const result = this._chunks[0];
if (advance) {
this._chunks.shift();
this._totalLength -= byteCount;
}
return result;
}
if (this._chunks[0].byteLength > byteCount) {
// fast path, the reading is entirely within the first chunk
const result = this._chunks[0].slice(0, byteCount);
if (advance) {
this._chunks[0] = this._chunks[0].slice(byteCount);
this._totalLength -= byteCount;
}
return result;
}
let result = VSBuffer.alloc(byteCount);
let resultOffset = 0;
let chunkIndex = 0;
while (byteCount > 0) {
const chunk = this._chunks[0];
const chunk = this._chunks[chunkIndex];
if (chunk.byteLength > byteCount) {
// this chunk will survive
this._chunks[0] = chunk.slice(byteCount);
const chunkPart = chunk.slice(0, byteCount);
result.set(chunkPart, resultOffset);
resultOffset += byteCount;
if (advance) {
this._chunks[chunkIndex] = chunk.slice(byteCount);
this._totalLength -= byteCount;
}
byteCount -= byteCount;
} else {
// this chunk will be entirely read
this._chunks.shift();
result.set(chunk, resultOffset);
resultOffset += chunk.byteLength;
if (advance) {
this._chunks.shift();
this._totalLength -= chunk.byteLength;
} else {
chunkIndex++;
}
byteCount -= chunk.byteLength;
}
}
......@@ -154,7 +178,7 @@ class ProtocolReader extends Disposable {
private readonly _incomingData: ChunkStream;
public lastReadTime: number;
private readonly _onMessage = new Emitter<ProtocolMessage>();
private readonly _onMessage = this._register(new Emitter<ProtocolMessage>());
public readonly onMessage: Event<ProtocolMessage> = this._onMessage.event;
private readonly _state = {
......
......@@ -20,6 +20,10 @@ export class NodeSocket implements ISocket {
this.socket = socket;
}
public dispose(): void {
this.socket.destroy();
}
public onData(_listener: (e: VSBuffer) => void): IDisposable {
const listener = (buff: Buffer) => _listener(VSBuffer.wrap(buff));
this.socket.on('data', listener);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册