提交 daa50ede 编写于 作者: B Benjamin Pasero

files2 - more use of streams

上级 bfe321b7
...@@ -226,7 +226,7 @@ export function writeableBufferStream(): VSBufferWriteableStream { ...@@ -226,7 +226,7 @@ export function writeableBufferStream(): VSBufferWriteableStream {
} }
export interface VSBufferWriteableStream extends VSBufferReadableStream { export interface VSBufferWriteableStream extends VSBufferReadableStream {
data(chunk: VSBuffer): void; write(chunk: VSBuffer): void;
error(error: Error): void; error(error: Error): void;
end(result?: VSBuffer | Error): void; end(result?: VSBuffer | Error): void;
} }
...@@ -250,7 +250,7 @@ class VSBufferWriteableStreamImpl implements VSBufferWriteableStream { ...@@ -250,7 +250,7 @@ class VSBufferWriteableStreamImpl implements VSBufferWriteableStream {
end: [] as { (): void }[] end: [] as { (): void }[]
}; };
data(chunk: VSBuffer): void { write(chunk: VSBuffer): void {
if (this.state.finished) { if (this.state.finished) {
return; return;
} }
...@@ -291,7 +291,7 @@ class VSBufferWriteableStreamImpl implements VSBufferWriteableStream { ...@@ -291,7 +291,7 @@ class VSBufferWriteableStreamImpl implements VSBufferWriteableStream {
if (result instanceof Error) { if (result instanceof Error) {
this.error(result); this.error(result);
} else if (result) { } else if (result) {
this.data(result); this.write(result);
} }
// flowing: send end event to listeners // flowing: send end event to listeners
......
...@@ -48,7 +48,7 @@ suite('Buffer', () => { ...@@ -48,7 +48,7 @@ suite('Buffer', () => {
}); });
await timeout(0); await timeout(0);
stream.data(VSBuffer.fromString('Hello')); stream.write(VSBuffer.fromString('Hello'));
await timeout(0); await timeout(0);
stream.end(VSBuffer.fromString('World')); stream.end(VSBuffer.fromString('World'));
...@@ -78,7 +78,7 @@ suite('Buffer', () => { ...@@ -78,7 +78,7 @@ suite('Buffer', () => {
}); });
await timeout(0); await timeout(0);
stream.data(VSBuffer.fromString('Hello')); stream.write(VSBuffer.fromString('Hello'));
await timeout(0); await timeout(0);
stream.end(new Error()); stream.end(new Error());
...@@ -92,7 +92,7 @@ suite('Buffer', () => { ...@@ -92,7 +92,7 @@ suite('Buffer', () => {
const stream = writeableBufferStream(); const stream = writeableBufferStream();
await timeout(0); await timeout(0);
stream.data(VSBuffer.fromString('Hello')); stream.write(VSBuffer.fromString('Hello'));
await timeout(0); await timeout(0);
stream.end(VSBuffer.fromString('World')); stream.end(VSBuffer.fromString('World'));
...@@ -121,7 +121,7 @@ suite('Buffer', () => { ...@@ -121,7 +121,7 @@ suite('Buffer', () => {
const stream = writeableBufferStream(); const stream = writeableBufferStream();
await timeout(0); await timeout(0);
stream.data(VSBuffer.fromString('Hello')); stream.write(VSBuffer.fromString('Hello'));
await timeout(0); await timeout(0);
stream.error(new Error()); stream.error(new Error());
...@@ -152,7 +152,7 @@ suite('Buffer', () => { ...@@ -152,7 +152,7 @@ suite('Buffer', () => {
const stream = writeableBufferStream(); const stream = writeableBufferStream();
await timeout(0); await timeout(0);
stream.data(VSBuffer.fromString('Hello')); stream.write(VSBuffer.fromString('Hello'));
await timeout(0); await timeout(0);
stream.end(VSBuffer.fromString('World')); stream.end(VSBuffer.fromString('World'));
...@@ -186,7 +186,7 @@ suite('Buffer', () => { ...@@ -186,7 +186,7 @@ suite('Buffer', () => {
}); });
await timeout(0); await timeout(0);
stream.data(VSBuffer.fromString('Hello')); stream.write(VSBuffer.fromString('Hello'));
await timeout(0); await timeout(0);
stream.end(VSBuffer.fromString('World')); stream.end(VSBuffer.fromString('World'));
...@@ -206,7 +206,7 @@ suite('Buffer', () => { ...@@ -206,7 +206,7 @@ suite('Buffer', () => {
}); });
await timeout(0); await timeout(0);
stream.data(VSBuffer.fromString('Hello')); stream.write(VSBuffer.fromString('Hello'));
await timeout(0); await timeout(0);
stream.error(new Error()); stream.error(new Error());
await timeout(0); await timeout(0);
......
...@@ -44,7 +44,7 @@ export class FileService2 extends Disposable implements IFileService { ...@@ -44,7 +44,7 @@ export class FileService2 extends Disposable implements IFileService {
_serviceBrand: ServiceIdentifier<any>; _serviceBrand: ServiceIdentifier<any>;
private readonly BUFFER_SIZE = 16 * 1024; private readonly BUFFER_SIZE = 64 * 1024;
constructor(@ILogService private logService: ILogService) { constructor(@ILogService private logService: ILogService) {
super(); super();
...@@ -471,7 +471,7 @@ export class FileService2 extends Disposable implements IFileService { ...@@ -471,7 +471,7 @@ export class FileService2 extends Disposable implements IFileService {
// when buffer full, create a new one and emit it through stream // when buffer full, create a new one and emit it through stream
if (posInBuffer === buffer.byteLength) { if (posInBuffer === buffer.byteLength) {
stream.data(buffer); stream.write(buffer);
buffer = VSBuffer.alloc(this.BUFFER_SIZE); buffer = VSBuffer.alloc(this.BUFFER_SIZE);
...@@ -481,7 +481,7 @@ export class FileService2 extends Disposable implements IFileService { ...@@ -481,7 +481,7 @@ export class FileService2 extends Disposable implements IFileService {
// wrap up with last buffer // wrap up with last buffer
if (posInBuffer > 0) { if (posInBuffer > 0) {
stream.data(buffer.slice(0, posInBuffer)); stream.write(buffer.slice(0, posInBuffer));
} }
} catch (error) { } catch (error) {
throw error; throw error;
...@@ -985,43 +985,11 @@ export class FileService2 extends Disposable implements IFileService { ...@@ -985,43 +985,11 @@ export class FileService2 extends Disposable implements IFileService {
private async doPipeBufferedToUnbuffered(sourceProvider: IFileSystemProviderWithOpenReadWriteCloseCapability, source: URI, targetProvider: IFileSystemProviderWithFileReadWriteCapability, target: URI): Promise<void> { private async doPipeBufferedToUnbuffered(sourceProvider: IFileSystemProviderWithOpenReadWriteCloseCapability, source: URI, targetProvider: IFileSystemProviderWithFileReadWriteCapability, target: URI): Promise<void> {
// Open handle // Read buffer via stream buffered
const sourceHandle = await sourceProvider.open(source, { create: false }); const buffer = await streamToBuffer(this.readFileBuffered(sourceProvider, source, CancellationToken.None));
try {
const buffers: VSBuffer[] = [];
let buffer = VSBuffer.alloc(this.BUFFER_SIZE);
let posInFile = 0;
let totalBytesRead = 0;
let bytesRead = 0;
let posInBuffer = 0;
do {
// read from source (sourceHandle) at current position (pos) into buffer (buffer) at
// buffer position (posInBuffer) up to the size of the buffer (buffer.byteLength).
bytesRead = await sourceProvider.read(sourceHandle, posInFile, buffer.buffer, posInBuffer, buffer.byteLength - posInBuffer);
posInFile += bytesRead;
posInBuffer += bytesRead;
totalBytesRead += bytesRead;
// when buffer full, create a new one
if (posInBuffer === buffer.byteLength) {
buffers.push(buffer);
buffer = VSBuffer.alloc(this.BUFFER_SIZE);
posInBuffer = 0;
}
} while (bytesRead > 0);
// Write buffer into target at once // Write buffer into target at once
await this.doWriteUnbuffered(targetProvider, target, VSBuffer.concat([...buffers, buffer.slice(0, posInBuffer)], totalBytesRead)); await this.doWriteUnbuffered(targetProvider, target, buffer);
} catch (error) {
throw error;
} finally {
await sourceProvider.close(sourceHandle);
}
} }
protected throwIfFileSystemIsReadonly<T extends IFileSystemProvider>(provider: T): T { protected throwIfFileSystemIsReadonly<T extends IFileSystemProvider>(provider: T): T {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册