提交 6f823003 编写于 作者: B Benjamin Pasero

files - speed up reads and writes

For reading, introduce the readFileStream capability that reduces
the overhead of low level file reading by switching to an event based
model.

For writing, try to consume up to N chunks of data from the underlying
source and directly call writeFile if the source is consumed. This will avoid
the overhead of low level file writing for small to medium files.
上级 b535c81d
......@@ -164,8 +164,8 @@ export function bufferToStream(buffer: VSBuffer): streams.ReadableStream<VSBuffe
return streams.toStream<VSBuffer>(buffer, chunks => VSBuffer.concat(chunks));
}
export function streamToBufferReadableStream(stream: streams.ReadableStream<Uint8Array | string>): streams.ReadableStream<VSBuffer> {
return streams.transform<Uint8Array | string, VSBuffer>(stream, data => typeof data === 'string' ? VSBuffer.fromString(data) : VSBuffer.wrap(data), chunks => VSBuffer.concat(chunks));
export function streamToBufferReadableStream(stream: streams.ReadableStreamEvents<Uint8Array | string>): streams.ReadableStream<VSBuffer> {
return streams.transform<Uint8Array | string, VSBuffer>(stream, { data: data => typeof data === 'string' ? VSBuffer.fromString(data) : VSBuffer.wrap(data) }, chunks => VSBuffer.concat(chunks));
}
export function newWriteableBufferStream(): streams.WriteableStream<VSBuffer> {
......
......@@ -4,16 +4,17 @@
*--------------------------------------------------------------------------------------------*/
/**
* A interface that emulates the API shape of a node.js readable
* stream for use in desktop and web environments.
* The payload that flows in readable stream events.
*/
export interface ReadableStream<T> {
export type ReadableStreamEventPayload<T> = T | Error | 'end';
export interface ReadableStreamEvents<T> {
/**
* The 'data' event is emitted whenever the stream is
* relinquishing ownership of a chunk of data to a consumer.
*/
on(event: 'data', callback: (chunk: T) => void): void;
on(event: 'data', callback: (data: T) => void): void;
/**
* Emitted when any error occurs.
......@@ -26,6 +27,13 @@ export interface ReadableStream<T> {
* not be emitted unless the data is completely consumed.
*/
on(event: 'end', callback: () => void): void;
}
/**
 * An interface that emulates the API shape of a node.js readable
* stream for use in desktop and web environments.
*/
export interface ReadableStream<T> extends ReadableStreamEvents<T> {
/**
* Stops emitting any events until resume() is called.
......@@ -97,11 +105,20 @@ export interface IReducer<T> {
(data: T[]): T;
}
export interface ITransformer<S, T> {
(source: S): T;
export interface IDataTransformer<Original, Transformed> {
(data: Original): Transformed;
}
export interface IErrorTransformer {
(error: Error): Error;
}
export function newWriteableStream<T>(reducer: IReducer<T>) {
export interface ITransformer<Original, Transformed> {
data: IDataTransformer<Original, Transformed>;
error?: IErrorTransformer;
}
export function newWriteableStream<T>(reducer: IReducer<T>): WriteableStream<T> {
return new WriteableStreamImpl<T>(reducer);
}
......@@ -119,7 +136,7 @@ class WriteableStreamImpl<T> implements WriteableStream<T> {
};
private readonly listeners = {
data: [] as { (chunk: T): void }[],
data: [] as { (data: T): void }[],
error: [] as { (error: Error): void }[],
end: [] as { (): void }[]
};
......@@ -302,13 +319,61 @@ export function consumeReadable<T>(readable: Readable<T>, reducer: IReducer<T>):
const chunks: T[] = [];
let chunk: T | null;
while (chunk = readable.read()) {
while ((chunk = readable.read()) !== null) {
chunks.push(chunk);
}
return reducer(chunks);
}
/**
 * Helper to read a T readable up to a maximum of chunks. If the limit is
 * reached, will return a readable instead to ensure all data can still
 * be read.
 */
export function consumeReadableWithLimit<T>(readable: Readable<T>, reducer: IReducer<T>, maxChunks: number): T | Readable<T> {
	const buffered: T[] = [];

	// Holds the most recent result of readable.read(): undefined means
	// "nothing pending", null means the source reported its end.
	let pending: T | null | undefined = undefined;
	for (;;) {
		pending = readable.read();
		if (pending === null || buffered.length >= maxChunks) {
			break;
		}

		buffered.push(pending);
	}

	// Source ended within the limit: reduce everything into a single T
	if (pending === null && buffered.length > 0) {
		return reducer(buffered);
	}

	// Limit reached: hand back a readable that first replays the chunks
	// gathered so far, then the pending chunk, then defers to the source.
	return {
		read: () => {

			// Replay the buffered chunks first
			if (buffered.length > 0) {
				return buffered.shift()!;
			}

			// Then hand out the chunk that was read last (note: this may
			// legitimately be null when the source ended with no data)
			if (typeof pending !== 'undefined') {
				const replayed = pending;
				pending = undefined; // mark as consumed
				return replayed;
			}

			// Finally delegate back to the underlying readable
			return readable.read();
		}
	};
}
/**
* Helper to fully read a T stream into a T.
*/
......@@ -316,12 +381,68 @@ export function consumeStream<T>(stream: ReadableStream<T>, reducer: IReducer<T>
return new Promise((resolve, reject) => {
const chunks: T[] = [];
stream.on('data', chunk => chunks.push(chunk));
stream.on('data', data => chunks.push(data));
stream.on('error', error => reject(error));
stream.on('end', () => resolve(reducer(chunks)));
});
}
/**
 * Helper to read a T stream up to a maximum of chunks. If the limit is
 * reached, will return a stream instead to ensure all data can still
 * be read.
 *
 * Resolves either with the reduced T (when the source ends within the
 * limit) or with a wrapper stream that replays what was buffered and
 * then forwards the rest of the source's data/error/end events.
 */
export function consumeStreamWithLimit<T>(stream: ReadableStream<T>, reducer: IReducer<T>, maxChunks: number): Promise<T | ReadableStream<T>> {
	return new Promise((resolve, reject) => {
		// Chunks received while still under the limit
		const chunks: T[] = [];
		// Created lazily once more than maxChunks chunks arrive; from then
		// on all subsequent events are forwarded into it
		let wrapperStream: WriteableStream<T> | undefined = undefined;
		stream.on('data', data => {
			// If we reach maxChunks, we start to return a stream
			// and make sure that any data we have already read
			// is in it as well
			if (!wrapperStream && chunks.length === maxChunks) {
				wrapperStream = newWriteableStream(reducer);
				while (chunks.length) {
					wrapperStream.write(chunks.shift()!);
				}
				wrapperStream.write(data);
				// NOTE(review): resolving before the source has ended relies on
				// the wrapper stream buffering writes until the consumer attaches
				// listeners — TODO confirm WriteableStreamImpl guarantees this
				return resolve(wrapperStream);
			}
			if (wrapperStream) {
				// Already switched over: forward data directly
				wrapperStream.write(data);
			} else {
				// Still under the limit: keep buffering
				chunks.push(data);
			}
		});
		stream.on('error', error => {
			if (wrapperStream) {
				wrapperStream.error(error);
			} else {
				return reject(error);
			}
		});
		stream.on('end', () => {
			if (wrapperStream) {
				// Drain any leftover buffered chunks (normally empty, since the
				// buffer is emptied when the wrapper is created) and finish
				while (chunks.length) {
					wrapperStream.write(chunks.shift()!);
				}
				wrapperStream.end();
			} else {
				// Source ended within the limit: reduce all buffered chunks
				return resolve(reducer(chunks));
			}
		});
	});
}
/**
* Helper to create a readable stream from an existing T.
*/
......@@ -352,12 +473,15 @@ export function toReadable<T>(t: T): Readable<T> {
};
}
export function transform<S, T>(stream: ReadableStream<S>, transformer: ITransformer<S, T>, reducer: IReducer<T>): ReadableStream<T> {
const target = newWriteableStream<T>(reducer);
/**
* Helper to transform a readable stream into another stream.
*/
export function transform<Original, Transformed>(stream: ReadableStreamEvents<Original>, transformer: ITransformer<Original, Transformed>, reducer: IReducer<Transformed>): ReadableStream<Transformed> {
const target = newWriteableStream<Transformed>(reducer);
stream.on('data', data => target.write(transformer(data)));
stream.on('data', data => target.write(transformer.data(data)));
stream.on('end', () => target.end());
stream.on('error', error => target.error(error));
stream.on('error', error => target.error(transformer.error ? transformer.error(error) : error));
return target;
}
/*---------------------------------------------------------------------------------------------
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
import * as assert from 'assert';
import { isReadableStream, newWriteableStream, Readable, consumeReadable, consumeReadableWithLimit, consumeStream, ReadableStream, toStream, toReadable, transform, consumeStreamWithLimit } from 'vs/base/common/stream';
suite('Stream', () => {
	test('isReadableStream', () => {
		// Only objects exposing the expected event API qualify
		assert.ok(!isReadableStream(Object.create(null)));
		assert.ok(isReadableStream(newWriteableStream(d => d)));
	});
	test('WriteableStream', () => {
		const stream = newWriteableStream<string>(strings => strings.join());
		let error = false;
		stream.on('error', e => {
			error = true;
		});
		let end = false;
		stream.on('end', () => {
			end = true;
		});
		// Written before a 'data' listener exists: delivered once one is
		// registered (verified by the assert below)
		stream.write('Hello');
		const chunks: string[] = [];
		stream.on('data', data => {
			chunks.push(data);
		});
		assert.equal(chunks[0], 'Hello');
		// With a listener attached, writes flow through directly
		stream.write('World');
		assert.equal(chunks[1], 'World');
		assert.equal(error, false);
		assert.equal(end, false);
		// While paused, writes are held back...
		stream.pause();
		stream.write('1');
		stream.write('2');
		stream.write('3');
		assert.equal(chunks.length, 2);
		// ...and delivered as one reduced chunk on resume
		stream.resume();
		assert.equal(chunks.length, 3);
		assert.equal(chunks[2], '1,2,3');
		stream.error(new Error());
		assert.equal(error, true);
		// end() with an argument emits a final chunk before ending
		stream.end('Final Bit');
		assert.equal(chunks.length, 4);
		assert.equal(chunks[3], 'Final Bit');
		// After destroy(), further writes are dropped
		stream.destroy();
		stream.write('Unexpected');
		assert.equal(chunks.length, 4);
	});
	test('consumeReadable', () => {
		const readable = arrayToReadable(['1', '2', '3', '4', '5']);
		const consumed = consumeReadable(readable, strings => strings.join());
		assert.equal(consumed, '1,2,3,4,5');
	});
	test('consumeReadableWithLimit', () => {
		// Limits below the number of chunks return a readable that still
		// yields all of the data
		for (let i = 0; i < 5; i++) {
			const readable = arrayToReadable(['1', '2', '3', '4', '5']);
			const consumedOrReadable = consumeReadableWithLimit(readable, strings => strings.join(), i);
			if (typeof consumedOrReadable === 'string') {
				assert.fail('Unexpected result');
			} else {
				const consumed = consumeReadable(consumedOrReadable, strings => strings.join());
				assert.equal(consumed, '1,2,3,4,5');
			}
		}
		// Limits at or above the number of chunks reduce to a single value
		let readable = arrayToReadable(['1', '2', '3', '4', '5']);
		let consumedOrReadable = consumeReadableWithLimit(readable, strings => strings.join(), 5);
		assert.equal(consumedOrReadable, '1,2,3,4,5');
		readable = arrayToReadable(['1', '2', '3', '4', '5']);
		consumedOrReadable = consumeReadableWithLimit(readable, strings => strings.join(), 6);
		assert.equal(consumedOrReadable, '1,2,3,4,5');
	});
	// Wraps an array as a Readable that shifts off one element per read()
	function arrayToReadable<T>(array: T[]): Readable<T> {
		return {
			read: () => array.shift() || null
		};
	}
	// Pumps a readable into a writeable stream on the next tick to
	// simulate data arriving asynchronously
	function readableToStream(readable: Readable<string>): ReadableStream<string> {
		const stream = newWriteableStream<string>(strings => strings.join());
		// Simulate async behavior
		setTimeout(() => {
			let chunk: string | null = null;
			while ((chunk = readable.read()) !== null) {
				stream.write(chunk);
			}
			stream.end();
		}, 0);
		return stream;
	}
	test('consumeStream', async () => {
		const stream = readableToStream(arrayToReadable(['1', '2', '3', '4', '5']));
		const consumed = await consumeStream(stream, strings => strings.join());
		assert.equal(consumed, '1,2,3,4,5');
	});
	test('consumeStreamWithLimit', async () => {
		// Limits below the number of chunks resolve with a stream that
		// still yields all of the data
		for (let i = 0; i < 5; i++) {
			const readable = readableToStream(arrayToReadable(['1', '2', '3', '4', '5']));
			const consumedOrStream = await consumeStreamWithLimit(readable, strings => strings.join(), i);
			if (typeof consumedOrStream === 'string') {
				assert.fail('Unexpected result');
			} else {
				const consumed = await consumeStream(consumedOrStream, strings => strings.join());
				assert.equal(consumed, '1,2,3,4,5');
			}
		}
		// Limits at or above the number of chunks resolve with the reduced value
		let stream = readableToStream(arrayToReadable(['1', '2', '3', '4', '5']));
		let consumedOrStream = await consumeStreamWithLimit(stream, strings => strings.join(), 5);
		assert.equal(consumedOrStream, '1,2,3,4,5');
		stream = readableToStream(arrayToReadable(['1', '2', '3', '4', '5']));
		consumedOrStream = await consumeStreamWithLimit(stream, strings => strings.join(), 6);
		assert.equal(consumedOrStream, '1,2,3,4,5');
	});
	test('toStream', async () => {
		const stream = toStream('1,2,3,4,5', strings => strings.join());
		const consumed = await consumeStream(stream, strings => strings.join());
		assert.equal(consumed, '1,2,3,4,5');
	});
	test('toReadable', async () => {
		const readable = toReadable('1,2,3,4,5');
		const consumed = await consumeReadable(readable, strings => strings.join());
		assert.equal(consumed, '1,2,3,4,5');
	});
	test('transform', async () => {
		const source = newWriteableStream<string>(strings => strings.join());
		// Doubles every chunk ('1' -> '11') on its way through
		const result = transform(source, { data: string => string + string }, strings => strings.join());
		// Simulate async behavior
		setTimeout(() => {
			source.write('1');
			source.write('2');
			source.write('3');
			source.write('4');
			source.end('5');
		}, 0);
		const consumed = await consumeStream(result, strings => strings.join());
		assert.equal(consumed, '11,22,33,44,55');
	});
});
......@@ -3,6 +3,7 @@
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
import { localize } from 'vs/nls';
import { sep } from 'vs/base/common/path';
import { URI } from 'vs/base/common/uri';
import * as glob from 'vs/base/common/glob';
......@@ -13,6 +14,8 @@ import { IDisposable } from 'vs/base/common/lifecycle';
import { isEqualOrParent, isEqual } from 'vs/base/common/resources';
import { isUndefinedOrNull } from 'vs/base/common/types';
import { VSBuffer, VSBufferReadable, VSBufferReadableStream } from 'vs/base/common/buffer';
import { ReadableStreamEvents } from 'vs/base/common/stream';
import { CancellationToken } from 'vs/base/common/cancellation';
export const IFileService = createDecorator<IFileService>('fileService');
......@@ -158,6 +161,29 @@ export interface FileOverwriteOptions {
overwrite: boolean;
}
export interface FileReadStreamOptions {
/**
* Is an integer specifying where to begin reading from in the file. If position is undefined,
* data will be read from the current file position.
*/
readonly position?: number;
/**
* Is an integer specifying how many bytes to read from the file. By default, all bytes
* will be read.
*/
readonly length?: number;
/**
* If provided, the size of the file will be checked against the limits.
*/
limits?: {
readonly size?: number;
readonly memory?: number;
};
}
export interface FileWriteOptions {
overwrite: boolean;
create: boolean;
......@@ -194,6 +220,8 @@ export interface IWatchOptions {
export const enum FileSystemProviderCapabilities {
FileReadWrite = 1 << 1,
FileOpenReadWriteClose = 1 << 2,
FileReadStream = 1 << 4,
FileFolderCopy = 1 << 3,
PathCaseSensitive = 1 << 10,
......@@ -223,6 +251,8 @@ export interface IFileSystemProvider {
readFile?(resource: URI): Promise<Uint8Array>;
writeFile?(resource: URI, content: Uint8Array, opts: FileWriteOptions): Promise<void>;
readFileStream?(resource: URI, opts: FileReadStreamOptions, token?: CancellationToken): ReadableStreamEvents<Uint8Array>;
open?(resource: URI, opts: FileOpenOptions): Promise<number>;
close?(fd: number): Promise<void>;
read?(fd: number, pos: number, data: Uint8Array, offset: number, length: number): Promise<number>;
......@@ -257,11 +287,21 @@ export function hasOpenReadWriteCloseCapability(provider: IFileSystemProvider):
return !!(provider.capabilities & FileSystemProviderCapabilities.FileOpenReadWriteClose);
}
export interface IFileSystemProviderWithFileReadStreamCapability extends IFileSystemProvider {
readFileStream(resource: URI, opts: FileReadStreamOptions, token?: CancellationToken): ReadableStreamEvents<Uint8Array>;
}
/**
 * Type guard that checks whether the provider advertises the
 * FileReadStream capability bit.
 */
export function hasFileReadStreamCapability(provider: IFileSystemProvider): provider is IFileSystemProviderWithFileReadStreamCapability {
	return (provider.capabilities & FileSystemProviderCapabilities.FileReadStream) !== 0;
}
export enum FileSystemProviderErrorCode {
FileExists = 'EntryExists',
FileNotFound = 'EntryNotFound',
FileNotADirectory = 'EntryNotADirectory',
FileIsADirectory = 'EntryIsADirectory',
FileExceedsMemoryLimit = 'EntryExceedsMemoryLimit',
FileTooLarge = 'EntryTooLarge',
NoPermissions = 'NoPermissions',
Unavailable = 'Unavailable',
Unknown = 'Unknown'
......@@ -274,13 +314,21 @@ export class FileSystemProviderError extends Error {
}
}
export function createFileSystemProviderError(error: Error, code: FileSystemProviderErrorCode): FileSystemProviderError {
/**
 * Creates a FileSystemProviderError from an Error or a plain message
 * string and tags it with the given error code.
 */
export function createFileSystemProviderError(error: Error | string, code: FileSystemProviderErrorCode): FileSystemProviderError {
	// Error#toString() yields "name: message"; plain strings pass through unchanged
	const providerError = new FileSystemProviderError(error.toString(), code);
	markAsFileSystemProviderError(providerError, code);
	return providerError;
}
/**
 * Returns the given error when defined, otherwise a generic
 * "Unknown Error" FileSystemProviderError.
 */
export function ensureFileSystemProviderError(error?: Error): Error {
	if (error) {
		return error;
	}

	// see https://github.com/Microsoft/vscode/issues/72798
	return createFileSystemProviderError(localize('unknownError', "Unknown Error"), FileSystemProviderErrorCode.Unknown);
}
export function markAsFileSystemProviderError(error: Error, code: FileSystemProviderErrorCode): Error {
error.name = code ? `${code} (FileSystemError)` : `FileSystemError`;
......@@ -311,6 +359,8 @@ export function toFileSystemProviderErrorCode(error: Error | undefined | null):
case FileSystemProviderErrorCode.FileIsADirectory: return FileSystemProviderErrorCode.FileIsADirectory;
case FileSystemProviderErrorCode.FileNotADirectory: return FileSystemProviderErrorCode.FileNotADirectory;
case FileSystemProviderErrorCode.FileNotFound: return FileSystemProviderErrorCode.FileNotFound;
case FileSystemProviderErrorCode.FileExceedsMemoryLimit: return FileSystemProviderErrorCode.FileExceedsMemoryLimit;
case FileSystemProviderErrorCode.FileTooLarge: return FileSystemProviderErrorCode.FileTooLarge;
case FileSystemProviderErrorCode.NoPermissions: return FileSystemProviderErrorCode.NoPermissions;
case FileSystemProviderErrorCode.Unavailable: return FileSystemProviderErrorCode.Unavailable;
}
......@@ -335,7 +385,10 @@ export function toFileOperationResult(error: Error): FileOperationResult {
return FileOperationResult.FILE_PERMISSION_DENIED;
case FileSystemProviderErrorCode.FileExists:
return FileOperationResult.FILE_MOVE_CONFLICT;
case FileSystemProviderErrorCode.FileNotADirectory:
case FileSystemProviderErrorCode.FileExceedsMemoryLimit:
return FileOperationResult.FILE_EXCEEDS_MEMORY_LIMIT;
case FileSystemProviderErrorCode.FileTooLarge:
return FileOperationResult.FILE_TOO_LARGE;
default:
return FileOperationResult.FILE_OTHER_ERROR;
}
......@@ -612,7 +665,7 @@ export interface IFileStreamContent extends IBaseStatWithMetadata {
value: VSBufferReadableStream;
}
export interface IReadFileOptions {
export interface IReadFileOptions extends FileReadStreamOptions {
/**
* The optional etag parameter allows to return early from resolving the resource if
......@@ -621,26 +674,6 @@ export interface IReadFileOptions {
* It is the task of the caller to makes sure to handle this error case from the promise.
*/
readonly etag?: string;
/**
* Is an integer specifying where to begin reading from in the file. If position is null,
* data will be read from the current file position.
*/
readonly position?: number;
/**
* Is an integer specifying how many bytes to read from the file. By default, all bytes
* will be read.
*/
readonly length?: number;
/**
* If provided, the size of the file will be checked against the limits.
*/
limits?: {
readonly size?: number;
readonly memory?: number;
};
}
export interface IWriteFileOptions {
......
/*---------------------------------------------------------------------------------------------
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
import { localize } from 'vs/nls';
import { URI } from 'vs/base/common/uri';
import { VSBuffer, VSBufferWriteableStream, newWriteableBufferStream, VSBufferReadableStream } from 'vs/base/common/buffer';
import { CancellationToken } from 'vs/base/common/cancellation';
import { IFileSystemProviderWithOpenReadWriteCloseCapability, FileReadStreamOptions, createFileSystemProviderError, FileSystemProviderErrorCode, ensureFileSystemProviderError } from 'vs/platform/files/common/files';
import { canceled } from 'vs/base/common/errors';
export interface ICreateReadStreamOptions extends FileReadStreamOptions {
	/**
	 * The size of the buffer (in bytes) to fill before sending a chunk
	 * to the stream.
	 */
	bufferSize: number;
}
/**
 * Creates a stream over the file at `resource`, read chunk by chunk
 * through the provider's open/read/close capability.
 *
 * The stream is returned immediately; reading happens in the background
 * and the stream is ended once reading finishes, carrying the error if any.
 */
export function createReadStream(provider: IFileSystemProviderWithOpenReadWriteCloseCapability, resource: URI, options: ICreateReadStreamOptions, token?: CancellationToken): VSBufferReadableStream {
	const stream = newWriteableBufferStream();
	// do not await reading but simply return the stream directly since it operates
	// via events. finally end the stream and send through the possible error
	let error: Error | undefined = undefined;
	doReadFileIntoStream(provider, resource, stream, options, token).then(undefined, err => error = err).finally(() => stream.end(error));
	return stream;
}
/**
 * Reads the file at `resource` through the provider's open/read/close
 * capability and writes it chunk-wise into the given stream, honoring
 * the position/length/limits from the options.
 *
 * Throws on cancellation or when a configured size limit is exceeded;
 * the file handle is closed in all cases.
 */
async function doReadFileIntoStream(provider: IFileSystemProviderWithOpenReadWriteCloseCapability, resource: URI, stream: VSBufferWriteableStream, options: ICreateReadStreamOptions, token?: CancellationToken): Promise<void> {
	// Check for cancellation
	throwIfCancelled(token);
	// open handle through provider
	const handle = await provider.open(resource, { create: false });
	// Check for cancellation
	throwIfCancelled(token);
	try {
		let totalBytesRead = 0;
		let bytesRead = 0;
		// Remaining bytes we are still allowed to read when options.length is set
		let allowedRemainingBytes = (options && typeof options.length === 'number') ? options.length : undefined;
		// Buffer never exceeds the allowed remaining bytes, so a full buffer
		// can always be emitted as-is
		let buffer = VSBuffer.alloc(Math.min(options.bufferSize, typeof allowedRemainingBytes === 'number' ? allowedRemainingBytes : options.bufferSize));
		let posInFile = options && typeof options.position === 'number' ? options.position : 0;
		let posInBuffer = 0;
		do {
			// read from source (handle) at current position (pos) into buffer (buffer) at
			// buffer position (posInBuffer) up to the size of the buffer (buffer.byteLength).
			bytesRead = await provider.read(handle, posInFile, buffer.buffer, posInBuffer, buffer.byteLength - posInBuffer);
			posInFile += bytesRead;
			posInBuffer += bytesRead;
			totalBytesRead += bytesRead;
			if (typeof allowedRemainingBytes === 'number') {
				allowedRemainingBytes -= bytesRead;
			}
			// when buffer full, create a new one and emit it through stream
			if (posInBuffer === buffer.byteLength) {
				stream.write(buffer);
				buffer = VSBuffer.alloc(Math.min(options.bufferSize, typeof allowedRemainingBytes === 'number' ? allowedRemainingBytes : options.bufferSize));
				posInBuffer = 0;
			}
			// keep reading until EOF (bytesRead === 0), the length limit is
			// exhausted, cancellation is requested, or a size limit trips
			// (the last two helpers throw rather than returning false)
		} while (bytesRead > 0 && (typeof allowedRemainingBytes !== 'number' || allowedRemainingBytes > 0) && throwIfCancelled(token) && throwIfTooLarge(totalBytesRead, options));
		// wrap up with last buffer (also respect maxBytes if provided)
		if (posInBuffer > 0) {
			let lastChunkLength = posInBuffer;
			if (typeof allowedRemainingBytes === 'number') {
				lastChunkLength = Math.min(posInBuffer, allowedRemainingBytes);
			}
			stream.write(buffer.slice(0, lastChunkLength));
		}
	} catch (error) {
		// Normalize into a FileSystemProviderError before rethrowing
		throw ensureFileSystemProviderError(error);
	} finally {
		// Always release the file handle, even on error/cancellation
		await provider.close(handle);
	}
}
/**
 * Throws a cancellation error when the given token has been cancelled;
 * otherwise returns true so it can be chained inside loop conditions.
 */
function throwIfCancelled(token?: CancellationToken): boolean {
	if (token?.isCancellationRequested) {
		throw canceled();
	}

	return true;
}
/**
 * Throws when the number of bytes read so far exceeds a configured
 * memory or size limit; otherwise returns true so it can be chained
 * inside loop conditions.
 */
function throwIfTooLarge(totalBytesRead: number, options: ICreateReadStreamOptions): boolean {
	const limits = options?.limits;
	if (limits) {
		// Exceeding the memory limit suggests restarting with more memory
		if (typeof limits.memory === 'number' && totalBytesRead > limits.memory) {
			throw createFileSystemProviderError(localize('fileTooLargeForHeapError', "To open a file of this size, you need to restart and allow it to use more memory"), FileSystemProviderErrorCode.FileExceedsMemoryLimit);
		}

		// Exceeding the size limit is a hard stop
		if (typeof limits.size === 'number' && totalBytesRead > limits.size) {
			throw createFileSystemProviderError(localize('fileTooLargeError', "File is too large to open"), FileSystemProviderErrorCode.FileTooLarge);
		}
	}

	return true;
}
......@@ -6,7 +6,7 @@
import { mkdir, open, close, read, write, fdatasync, Dirent, Stats } from 'fs';
import { promisify } from 'util';
import { IDisposable, Disposable, toDisposable, dispose, combinedDisposable } from 'vs/base/common/lifecycle';
import { IFileSystemProvider, FileSystemProviderCapabilities, IFileChange, IWatchOptions, IStat, FileType, FileDeleteOptions, FileOverwriteOptions, FileWriteOptions, FileOpenOptions, FileSystemProviderErrorCode, createFileSystemProviderError, FileSystemProviderError } from 'vs/platform/files/common/files';
import { FileSystemProviderCapabilities, IFileChange, IWatchOptions, IStat, FileType, FileDeleteOptions, FileOverwriteOptions, FileWriteOptions, FileOpenOptions, FileSystemProviderErrorCode, createFileSystemProviderError, FileSystemProviderError, IFileSystemProviderWithFileReadWriteCapability, IFileSystemProviderWithFileReadStreamCapability, IFileSystemProviderWithOpenReadWriteCloseCapability, FileReadStreamOptions, IFileSystemProviderWithFileFolderCopyCapability } from 'vs/platform/files/common/files';
import { URI } from 'vs/base/common/uri';
import { Event, Emitter } from 'vs/base/common/event';
import { isLinux, isWindows } from 'vs/base/common/platform';
......@@ -22,13 +22,23 @@ import { FileWatcher as UnixWatcherService } from 'vs/platform/files/node/watche
import { FileWatcher as WindowsWatcherService } from 'vs/platform/files/node/watcher/win32/watcherService';
import { FileWatcher as NsfwWatcherService } from 'vs/platform/files/node/watcher/nsfw/watcherService';
import { FileWatcher as NodeJSWatcherService } from 'vs/platform/files/node/watcher/nodejs/watcherService';
import { VSBuffer } from 'vs/base/common/buffer';
import { CancellationToken } from 'vs/base/common/cancellation';
import { ReadableStreamEvents, transform } from 'vs/base/common/stream';
import { createReadStream } from 'vs/platform/files/common/io';
export interface IWatcherOptions {
pollingInterval?: number;
usePolling: boolean;
}
export class DiskFileSystemProvider extends Disposable implements IFileSystemProvider {
export class DiskFileSystemProvider extends Disposable implements
IFileSystemProviderWithFileReadWriteCapability,
IFileSystemProviderWithOpenReadWriteCloseCapability,
IFileSystemProviderWithFileReadStreamCapability,
IFileSystemProviderWithFileFolderCopyCapability {
private readonly BUFFER_SIZE = 64 * 1024;
constructor(private logService: ILogService, private watcherOptions?: IWatcherOptions) {
super();
......@@ -44,6 +54,7 @@ export class DiskFileSystemProvider extends Disposable implements IFileSystemPro
this._capabilities =
FileSystemProviderCapabilities.FileReadWrite |
FileSystemProviderCapabilities.FileOpenReadWriteClose |
FileSystemProviderCapabilities.FileReadStream |
FileSystemProviderCapabilities.FileFolderCopy;
if (isLinux) {
......@@ -121,6 +132,15 @@ export class DiskFileSystemProvider extends Disposable implements IFileSystemPro
}
}
	/**
	 * Streams the file contents in chunks of BUFFER_SIZE bytes using the
	 * open/read/close capability of this provider.
	 */
	readFileStream(resource: URI, opts: FileReadStreamOptions, token?: CancellationToken): ReadableStreamEvents<Uint8Array> {
		const fileStream = createReadStream(this, resource, {
			...opts,
			bufferSize: this.BUFFER_SIZE
		}, token);
		// Unwrap each VSBuffer chunk into its raw Uint8Array for the provider
		// API; the reducer concatenates all chunks back into one Uint8Array
		return transform(fileStream, { data: data => data.buffer }, data => VSBuffer.concat(data.map(data => VSBuffer.wrap(data))).buffer);
	}
async writeFile(resource: URI, content: Uint8Array, opts: FileWriteOptions): Promise<void> {
let handle: number | undefined = undefined;
try {
......@@ -131,11 +151,11 @@ export class DiskFileSystemProvider extends Disposable implements IFileSystemPro
const fileExists = await exists(filePath);
if (fileExists) {
if (!opts.overwrite) {
throw createFileSystemProviderError(new Error(localize('fileExists', "File already exists")), FileSystemProviderErrorCode.FileExists);
throw createFileSystemProviderError(localize('fileExists', "File already exists"), FileSystemProviderErrorCode.FileExists);
}
} else {
if (!opts.create) {
throw createFileSystemProviderError(new Error(localize('fileNotExists', "File does not exist")), FileSystemProviderErrorCode.FileNotFound);
throw createFileSystemProviderError(localize('fileNotExists', "File does not exist"), FileSystemProviderErrorCode.FileNotFound);
}
}
}
......@@ -441,13 +461,13 @@ export class DiskFileSystemProvider extends Disposable implements IFileSystemPro
}
if (isSameResourceWithDifferentPathCase && mode === 'copy') {
throw createFileSystemProviderError(new Error('File cannot be copied to same path with different path case'), FileSystemProviderErrorCode.FileExists);
throw createFileSystemProviderError(localize('fileCopyErrorPathCase', "'File cannot be copied to same path with different path case"), FileSystemProviderErrorCode.FileExists);
}
// handle existing target (unless this is a case change)
if (!isSameResourceWithDifferentPathCase && await exists(toFilePath)) {
if (!overwrite) {
throw createFileSystemProviderError(new Error('File at target already exists'), FileSystemProviderErrorCode.FileExists);
throw createFileSystemProviderError(localize('fileCopyErrorExists', "File at target already exists"), FileSystemProviderErrorCode.FileExists);
}
// Delete target
......
......@@ -5,14 +5,19 @@
import { Event, Emitter } from 'vs/base/common/event';
import { Disposable, IDisposable } from 'vs/base/common/lifecycle';
import { IFileSystemProviderWithFileReadWriteCapability, IFileChange, IWatchOptions, IStat, FileOverwriteOptions, FileType, FileWriteOptions, FileDeleteOptions, FileSystemProviderCapabilities, IFileSystemProviderWithOpenReadWriteCloseCapability, FileOpenOptions, hasReadWriteCapability, hasOpenReadWriteCloseCapability } from 'vs/platform/files/common/files';
import { IFileSystemProviderWithFileReadWriteCapability, IFileChange, IWatchOptions, IStat, FileOverwriteOptions, FileType, FileWriteOptions, FileDeleteOptions, FileSystemProviderCapabilities, IFileSystemProviderWithOpenReadWriteCloseCapability, FileOpenOptions, hasReadWriteCapability, hasOpenReadWriteCloseCapability, IFileSystemProviderWithFileReadStreamCapability, FileReadStreamOptions, hasFileReadStreamCapability } from 'vs/platform/files/common/files';
import { URI } from 'vs/base/common/uri';
import * as resources from 'vs/base/common/resources';
import { startsWith } from 'vs/base/common/strings';
import { BACKUPS } from 'vs/platform/environment/common/environment';
import { IWorkbenchEnvironmentService } from 'vs/workbench/services/environment/common/environmentService';
import { CancellationToken } from 'vs/base/common/cancellation';
import { ReadableStreamEvents } from 'vs/base/common/stream';
export class FileUserDataProvider extends Disposable implements IFileSystemProviderWithFileReadWriteCapability, IFileSystemProviderWithOpenReadWriteCloseCapability {
export class FileUserDataProvider extends Disposable implements
IFileSystemProviderWithFileReadWriteCapability,
IFileSystemProviderWithOpenReadWriteCloseCapability,
IFileSystemProviderWithFileReadStreamCapability {
readonly capabilities: FileSystemProviderCapabilities = this.fileSystemProvider.capabilities;
readonly onDidChangeCapabilities: Event<void> = Event.None;
......@@ -60,6 +65,13 @@ export class FileUserDataProvider extends Disposable implements IFileSystemProvi
throw new Error('not supported');
}
readFileStream(resource: URI, opts: FileReadStreamOptions, token?: CancellationToken): ReadableStreamEvents<Uint8Array> {
if (hasFileReadStreamCapability(this.fileSystemProvider)) {
return this.fileSystemProvider.readFileStream(this.toFileSystemResource(resource), opts, token);
}
throw new Error('not supported');
}
readdir(resource: URI): Promise<[string, FileType][]> {
return this.fileSystemProvider.readdir(this.toFileSystemResource(resource));
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册