提交 358cf866 编写于 作者: J Johannes Rieken

have chunked writing (internally) wired up, #41985

上级 d790365c
...@@ -182,8 +182,7 @@ export interface IReadWriteProvider { ...@@ -182,8 +182,7 @@ export interface IReadWriteProvider {
open(resource: URI, options: { mode: string }): TPromise<number>; open(resource: URI, options: { mode: string }): TPromise<number>;
close(fd: number): TPromise<void>; close(fd: number): TPromise<void>;
read(fd: number, pos: number, data: Uint8Array, offset: number, length: number): TPromise<number>; read(fd: number, pos: number, data: Uint8Array, offset: number, length: number): TPromise<number>;
// write(fd: number, pos: number, data: Uint8Array, offset: number, length: number): TPromise<number>; write(fd: number, pos: number, data: Uint8Array, offset: number, length: number): TPromise<number>;
writeFile(resource: URI, content: Uint8Array): TPromise<void>;
} }
export type IFileSystemProvider = (IFileSystemProviderBase & ISimpleReadWriteProvider) | (IFileSystemProviderBase & IReadWriteProvider); export type IFileSystemProvider = (IFileSystemProviderBase & ISimpleReadWriteProvider) | (IFileSystemProviderBase & IReadWriteProvider);
......
...@@ -7,89 +7,143 @@ ...@@ -7,89 +7,143 @@
import { Readable, Writable } from 'stream'; import { Readable, Writable } from 'stream';
import { UTF8 } from 'vs/base/node/encoding'; import { UTF8 } from 'vs/base/node/encoding';
import URI from 'vs/base/common/uri'; import URI from 'vs/base/common/uri';
import { IFileSystemProvider, ITextSnapshot } from 'vs/platform/files/common/files'; import { IFileSystemProvider, ITextSnapshot, ISimpleReadWriteProvider, IReadWriteProvider } from 'vs/platform/files/common/files';
export function createWritableOfProvider(provider: IFileSystemProvider, resource: URI): Writable { export function createWritableOfProvider(provider: IFileSystemProvider, resource: URI): Writable {
return new class extends Writable { switch (provider._type) {
case 'simple': return createSimpleWritable(provider, resource);
case 'chunked': return createWritable(provider, resource);
}
}
function createSimpleWritable(provider: ISimpleReadWriteProvider, resource: URI): Writable {
return new class extends Writable {
_chunks: Buffer[] = []; _chunks: Buffer[] = [];
constructor(opts?) { constructor(opts?) {
super(opts); super(opts);
this.once('finish', () => this._finish());
} }
_write(chunk: Buffer, encoding: string, callback: Function) { _write(chunk: Buffer, encoding: string, callback: Function) {
this._chunks.push(chunk); this._chunks.push(chunk);
callback(null); callback(null);
} }
_finish() { end(chunk?, encoding?, callback?) {
provider.writeFile(resource, Buffer.concat(this._chunks)).then(undefined, err => this.emit('error', err)); super.end(chunk, encoding, err => {
provider.writeFile(resource, Buffer.concat(this._chunks)).then(_ => {
if (callback) {
callback(null);
}
}, err => {
if (callback) {
callback(err);
} else {
this.emit('error', err);
}
});
});
} }
}; };
} }
export function createReadableOfProvider(provider: IFileSystemProvider, resource: URI): Readable { function createWritable(provider: IReadWriteProvider, resource: URI): Writable {
if (provider._type === 'simple') { return new class extends Writable {
return new class extends Readable { _fd: number;
_readOperation: Thenable<any>; _pos: number;
_read(size?: number): void { constructor(opts?) {
if (this._readOperation) { super(opts);
return; }
_write(chunk: Buffer, encoding, callback: Function) {
this._doWrite(chunk).then(() => callback(null), err => callback(err));
}
async _doWrite(chunk: Buffer) {
if (typeof this._fd !== 'number') {
this._fd = await provider.open(resource, { mode: 'w+' });
}
let bytesWritten = await provider.write(this._fd, this._pos, chunk, 0, chunk.length);
this._pos += bytesWritten;
}
end(chunk?, encoding?, callback?) {
provider.close(this._fd).then(_ => {
if (callback) {
callback();
} }
this._readOperation = provider.readFile(resource).then(data => { }, err => {
this.push(data); if (callback) {
this.push(null); callback(err);
}, err => { } else {
this.emit('error', err); this.emit('error', err);
this.push(null); }
}); });
} }
}; };
} else { }
return new class extends Readable {
_fd: number;
_pos: number = 0;
_reading: boolean = false;
constructor(opts?) { export function createReadableOfProvider(provider: IFileSystemProvider, resource: URI): Readable {
super(opts); switch (provider._type) {
this.once('close', _ => this._final()); case 'simple': return createSimpleReadable(provider, resource);
} case 'chunked': return createReadable(provider, resource);
}
}
async _read(size?: number) { function createReadable(provider: IReadWriteProvider, resource: URI): Readable {
if (this._reading) { return new class extends Readable {
return; _fd: number;
_pos: number = 0;
_reading: boolean = false;
constructor(opts?) {
super(opts);
this.once('close', _ => this._final());
}
async _read(size?: number) {
if (this._reading) {
return;
}
this._reading = true;
try {
if (typeof this._fd !== 'number') {
this._fd = await provider.open(resource, { mode: 'r' });
} }
this._reading = true; let buffer = Buffer.allocUnsafe(64 * 1024);
try { while (this._reading) {
if (typeof this._fd !== 'number') { let bytesRead = await provider.read(this._fd, this._pos, buffer, 0, buffer.length);
this._fd = await provider.open(resource, { mode: 'r' }); if (bytesRead === 0) {
this._reading = false;
this.push(null);
} }
let buffer = Buffer.allocUnsafe(64 * 1024); else {
this._reading = this.push(buffer.slice(0, bytesRead));
while (this._reading) { this._pos += bytesRead;
let bytesRead = await provider.read(this._fd, this._pos, buffer, 0, buffer.length);
if (bytesRead === 0) {
this._reading = false;
this.push(null);
} else {
this._reading = this.push(buffer.slice(0, bytesRead));
this._pos += bytesRead;
}
} }
} catch (err) {
//
this.emit('error', err);
} }
} }
catch (err) {
//
this.emit('error', err);
}
}
async _final() {
if (typeof this._fd === 'number') {
await provider.close(this._fd);
}
}
};
}
async _final() { function createSimpleReadable(provider: ISimpleReadWriteProvider, resource: URI): Readable {
if (typeof this._fd === 'number') { return new class extends Readable {
await provider.close(this._fd); _readOperation: Thenable<any>;
} _read(size?: number): void {
if (this._readOperation) {
return;
} }
}; this._readOperation = provider.readFile(resource).then(data => {
} this.push(data);
this.push(null);
}, err => {
this.emit('error', err);
this.push(null);
});
}
};
} }
export function createReadableOfSnapshot(snapshot: ITextSnapshot): Readable { export function createReadableOfSnapshot(snapshot: ITextSnapshot): Readable {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册