提交 d790365c 编写于 作者: J Johannes Rieken

move stream implementation into their own files, chunked reading, towards chunked writing, #41985

上级 f8938fb2
......@@ -162,17 +162,32 @@ export interface IStat {
type: FileType2;
}
export interface IFileSystemProvider {
export interface IFileSystemProviderBase {
onDidChange: Event<IFileChange[]>;
stat(resource: URI): TPromise<IStat>;
readFile(resource: URI): TPromise<Uint8Array>;
writeFile(resource: URI, content: Uint8Array): TPromise<void>;
rename(from: URI, to: URI): TPromise<IStat>;
mkdir(resource: URI): TPromise<IStat>;
readdir(resource: URI): TPromise<[string, IStat][]>;
delete(resource: URI): TPromise<void>;
}
export interface ISimpleReadWriteProvider {
_type: 'simple';
readFile(resource: URI): TPromise<Uint8Array>;
writeFile(resource: URI, content: Uint8Array): TPromise<void>;
}
export interface IReadWriteProvider {
_type: 'chunked';
open(resource: URI, options: { mode: string }): TPromise<number>;
close(fd: number): TPromise<void>;
read(fd: number, pos: number, data: Uint8Array, offset: number, length: number): TPromise<number>;
// write(fd: number, pos: number, data: Uint8Array, offset: number, length: number): TPromise<number>;
writeFile(resource: URI, content: Uint8Array): TPromise<void>;
}
export type IFileSystemProvider = (IFileSystemProviderBase & ISimpleReadWriteProvider) | (IFileSystemProviderBase & IReadWriteProvider);
export enum FileOperation {
CREATE,
DELETE,
......@@ -433,6 +448,14 @@ export interface ITextSnapshot {
read(): string;
}
export class StringSnapshot implements ITextSnapshot {
constructor(private _value: string) { }
read(): string {
let ret = this._value;
this._value = null;
return ret;
}
}
/**
* Helper method to convert a snapshot into its full string form.
*/
......
......@@ -7,7 +7,7 @@
import URI, { UriComponents } from 'vs/base/common/uri';
import { TPromise, PPromise } from 'vs/base/common/winjs.base';
import { ExtHostContext, MainContext, IExtHostContext, MainThreadFileSystemShape, ExtHostFileSystemShape, IFileChangeDto } from '../node/extHost.protocol';
import { IFileService, IFileSystemProvider, IStat, IFileChange } from 'vs/platform/files/common/files';
import { IFileService, IStat, IFileChange, ISimpleReadWriteProvider, IFileSystemProviderBase } from 'vs/platform/files/common/files';
import { IDisposable, dispose } from 'vs/base/common/lifecycle';
import { Event, Emitter } from 'vs/base/common/event';
import { extHostNamedCustomer } from 'vs/workbench/api/electron-browser/extHostCustomers';
......@@ -61,14 +61,15 @@ export class MainThreadFileSystem implements MainThreadFileSystemShape {
}
}
class RemoteFileSystemProvider implements IFileSystemProvider {
class RemoteFileSystemProvider implements ISimpleReadWriteProvider, IFileSystemProviderBase {
_type: 'simple' = 'simple';
private readonly _onDidChange = new Emitter<IFileChange[]>();
private readonly _registrations: IDisposable[];
readonly onDidChange: Event<IFileChange[]> = this._onDidChange.event;
constructor(
fileService: IFileService,
scheme: string,
......
......@@ -38,7 +38,6 @@ import { IEnvironmentService } from 'vs/platform/environment/common/environment'
import { CancellationToken, CancellationTokenSource } from 'vs/base/common/cancellation';
import { ILifecycleService, LifecyclePhase } from 'vs/platform/lifecycle/common/lifecycle';
import { getBaseLabel } from 'vs/base/common/labels';
import { Readable } from 'stream';
import { Schemas } from 'vs/base/common/network';
import { IStorageService, StorageScope } from 'vs/platform/storage/common/storage';
import { INotificationService, Severity } from 'vs/platform/notification/common/notification';
......@@ -46,6 +45,7 @@ import { onUnexpectedError } from 'vs/base/common/errors';
import product from 'vs/platform/node/product';
import { shell } from 'electron';
import { IEncodingOverride, ResourceEncodings } from 'vs/workbench/services/files/electron-browser/encoding';
import { createReadableOfSnapshot } from 'vs/workbench/services/files/electron-browser/streams';
class BufferPool {
......@@ -633,7 +633,7 @@ export class FileService implements IFileService {
if (typeof value === 'string') {
writeFilePromise = pfs.writeFile(absolutePath, value, writeFileOptions);
} else {
writeFilePromise = pfs.writeFile(absolutePath, this.snapshotToReadableStream(value), writeFileOptions);
writeFilePromise = pfs.writeFile(absolutePath, createReadableOfSnapshot(value), writeFileOptions);
}
// set contents
......@@ -644,31 +644,6 @@ export class FileService implements IFileService {
});
}
private snapshotToReadableStream(snapshot: ITextSnapshot): NodeJS.ReadableStream {
return new Readable({
read: function () {
try {
let chunk: string;
let canPush = true;
// Push all chunks as long as we can push and as long as
// the underlying snapshot returns strings to us
while (canPush && typeof (chunk = snapshot.read()) === 'string') {
canPush = this.push(chunk);
}
// Signal EOS by pushing NULL
if (typeof chunk !== 'string') {
this.push(null);
}
} catch (error) {
this.emit('error', error);
}
},
encoding: encoding.UTF8 // very important, so that strings are passed around and not buffers!
});
}
private doUpdateContentElevated(resource: uri, value: string | ITextSnapshot, options: IUpdateContentOptions = Object.create(null)): TPromise<IFileStat> {
const absolutePath = this.toAbsolutePath(resource);
......
......@@ -6,13 +6,13 @@
import URI from 'vs/base/common/uri';
import { FileService } from 'vs/workbench/services/files/electron-browser/fileService';
import { IContent, IStreamContent, IFileStat, IResolveContentOptions, IUpdateContentOptions, IResolveFileOptions, IResolveFileResult, FileOperationEvent, FileOperation, IFileSystemProvider, IStat, FileType2, FileChangesEvent, ICreateFileOptions, FileOperationError, FileOperationResult, ITextSnapshot, snapshotToString } from 'vs/platform/files/common/files';
import { IContent, IStreamContent, IFileStat, IResolveContentOptions, IUpdateContentOptions, IResolveFileOptions, IResolveFileResult, FileOperationEvent, FileOperation, IFileSystemProvider, IStat, FileType2, FileChangesEvent, ICreateFileOptions, FileOperationError, FileOperationResult, ITextSnapshot, StringSnapshot } from 'vs/platform/files/common/files';
import { TPromise } from 'vs/base/common/winjs.base';
import { posix } from 'path';
import { IDisposable } from 'vs/base/common/lifecycle';
import { isFalsyOrEmpty, distinct } from 'vs/base/common/arrays';
import { Schemas } from 'vs/base/common/network';
import { encode, toDecodeStream, IDecodeStreamOptions } from 'vs/base/node/encoding';
import { toDecodeStream, IDecodeStreamOptions, decodeStream } from 'vs/base/node/encoding';
import { TernarySearchTree } from 'vs/base/common/map';
import { IConfigurationService } from 'vs/platform/configuration/common/configuration';
import { IWorkspaceContextService } from 'vs/platform/workspace/common/workspace';
......@@ -23,7 +23,7 @@ import { ITextResourceConfigurationService } from 'vs/editor/common/services/res
import { IExtensionService } from 'vs/workbench/services/extensions/common/extensions';
import { localize } from 'vs/nls';
import { INotificationService } from 'vs/platform/notification/common/notification';
import { Readable } from 'stream';
import { createReadableOfProvider, createReadableOfSnapshot, createWritableOfProvider } from 'vs/workbench/services/files/electron-browser/streams';
function toIFileStat(provider: IFileSystemProvider, tuple: [URI, IStat], recurse?: (tuple: [URI, IStat]) => boolean): TPromise<IFileStat> {
const [resource, stat] = tuple;
......@@ -225,24 +225,6 @@ export class RemoteFileService extends FileService {
}
}
private _createReadStream(provider: IFileSystemProvider, resource: URI): Readable {
return new class extends Readable {
_readOperation: Thenable<any>;
_read(size?: number): void {
if (this._readOperation) {
return;
}
this._readOperation = provider.readFile(resource).then(data => {
this.push(data);
this.push(null);
}, err => {
this.emit('error', err);
this.push(null);
});
}
};
}
private _readFile(resource: URI, options: IResolveContentOptions = Object.create(null)): TPromise<IStreamContent> {
return this._withProvider(resource).then(provider => {
......@@ -272,7 +254,7 @@ export class RemoteFileService extends FileService {
}
};
return toDecodeStream(this._createReadStream(provider, resource), decodeStreamOpts).then(data => {
return toDecodeStream(createReadableOfProvider(provider, resource), decodeStreamOpts).then(data => {
if (options.acceptTextOnly && data.detected.seemsBinary) {
return TPromise.wrapError<IStreamContent>(new FileOperationError(
......@@ -331,9 +313,20 @@ export class RemoteFileService extends FileService {
}
private _writeFile(provider: IFileSystemProvider, resource: URI, content: string | ITextSnapshot, options: IUpdateContentOptions): TPromise<IFileStat> {
const snapshot = typeof content === 'string' ? new StringSnapshot(content) : content;
const readable = createReadableOfSnapshot(snapshot);
const encoding = this.encoding.getWriteEncoding(resource, options.encoding);
// TODO@Joh support streaming API for remote file system writes
return provider.writeFile(resource, encode(typeof content === 'string' ? content : snapshotToString(content), encoding)).then(() => {
const decoder = decodeStream(encoding);
const target = createWritableOfProvider(provider, resource);
return new TPromise<IFileStat>((resolve, reject) => {
let stream = readable.pipe(decoder).pipe(target);
stream.on('error', err => reject(err));
stream.on('finish', _ => resolve(void 0));
}).then(_ => {
return this.resolveFile(resource);
});
}
......
/*---------------------------------------------------------------------------------------------
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
'use strict';
import { Readable, Writable } from 'stream';
import { UTF8 } from 'vs/base/node/encoding';
import URI from 'vs/base/common/uri';
import { IFileSystemProvider, ITextSnapshot } from 'vs/platform/files/common/files';
export function createWritableOfProvider(provider: IFileSystemProvider, resource: URI): Writable {
return new class extends Writable {
_chunks: Buffer[] = [];
constructor(opts?) {
super(opts);
this.once('finish', () => this._finish());
}
_write(chunk: Buffer, encoding: string, callback: Function) {
this._chunks.push(chunk);
callback(null);
}
_finish() {
provider.writeFile(resource, Buffer.concat(this._chunks)).then(undefined, err => this.emit('error', err));
}
};
}
export function createReadableOfProvider(provider: IFileSystemProvider, resource: URI): Readable {
if (provider._type === 'simple') {
return new class extends Readable {
_readOperation: Thenable<any>;
_read(size?: number): void {
if (this._readOperation) {
return;
}
this._readOperation = provider.readFile(resource).then(data => {
this.push(data);
this.push(null);
}, err => {
this.emit('error', err);
this.push(null);
});
}
};
} else {
return new class extends Readable {
_fd: number;
_pos: number = 0;
_reading: boolean = false;
constructor(opts?) {
super(opts);
this.once('close', _ => this._final());
}
async _read(size?: number) {
if (this._reading) {
return;
}
this._reading = true;
try {
if (typeof this._fd !== 'number') {
this._fd = await provider.open(resource, { mode: 'r' });
}
let buffer = Buffer.allocUnsafe(64 * 1024);
while (this._reading) {
let bytesRead = await provider.read(this._fd, this._pos, buffer, 0, buffer.length);
if (bytesRead === 0) {
this._reading = false;
this.push(null);
} else {
this._reading = this.push(buffer.slice(0, bytesRead));
this._pos += bytesRead;
}
}
} catch (err) {
//
this.emit('error', err);
}
}
async _final() {
if (typeof this._fd === 'number') {
await provider.close(this._fd);
}
}
};
}
}
export function createReadableOfSnapshot(snapshot: ITextSnapshot): Readable {
return new Readable({
read: function () {
try {
let chunk: string;
let canPush = true;
// Push all chunks as long as we can push and as long as
// the underlying snapshot returns strings to us
while (canPush && typeof (chunk = snapshot.read()) === 'string') {
canPush = this.push(chunk);
}
// Signal EOS by pushing NULL
if (typeof chunk !== 'string') {
this.push(null);
}
} catch (error) {
this.emit('error', error);
}
},
encoding: UTF8 // very important, so that strings are passed around and not buffers!
});
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册