提交 f77d6768 编写于 作者: B Benjamin Pasero

files - 💄

上级 bef1b60d
......@@ -13,12 +13,12 @@ import { TernarySearchTree } from 'vs/base/common/map';
import { isNonEmptyArray, coalesce } from 'vs/base/common/arrays';
import { getBaseLabel } from 'vs/base/common/labels';
import { ILogService } from 'vs/platform/log/common/log';
import { VSBuffer, VSBufferReadable, readableToBuffer, bufferToReadable, streamToBuffer, bufferToStream, VSBufferReadableStream, VSBufferReadableBufferedStream, bufferedStreamToBuffer } from 'vs/base/common/buffer';
import { VSBuffer, VSBufferReadable, readableToBuffer, bufferToReadable, streamToBuffer, bufferToStream, VSBufferReadableStream, VSBufferReadableBufferedStream, bufferedStreamToBuffer, newWriteableBufferStream } from 'vs/base/common/buffer';
import { isReadableStream, transform, peekReadable, peekStream, isReadableBufferedStream } from 'vs/base/common/stream';
import { Queue } from 'vs/base/common/async';
import { CancellationTokenSource, CancellationToken } from 'vs/base/common/cancellation';
import { Schemas } from 'vs/base/common/network';
import { createVSBufferReadStream } from 'vs/platform/files/common/io';
import { readFileIntoStream } from 'vs/platform/files/common/io';
export class FileService extends Disposable implements IFileService {
......@@ -476,11 +476,15 @@ export class FileService extends Disposable implements IFileService {
}
private readFileBuffered(provider: IFileSystemProviderWithOpenReadWriteCloseCapability, resource: URI, token: CancellationToken, options: IReadFileOptions = Object.create(null)): VSBufferReadableStream {
return createVSBufferReadStream(provider, resource, {
const stream = newWriteableBufferStream();
readFileIntoStream(provider, resource, stream, data => data, {
...options,
bufferSize: this.BUFFER_SIZE,
errorTransformer: error => new FileOperationError(localize('err.read', "Unable to read file '{0}' ({1})", this.resourceForError(resource), ensureFileSystemProviderError(error).toString()), toFileOperationResult(error), options)
}, token);
return stream;
}
private async readFileUnbuffered(provider: IFileSystemProviderWithFileReadWriteCapability, resource: URI, options?: IReadFileOptions): Promise<VSBufferReadableStream> {
......
......@@ -283,7 +283,7 @@ export interface IFileSystemProvider {
readFile?(resource: URI): Promise<Uint8Array>;
writeFile?(resource: URI, content: Uint8Array, opts: FileWriteOptions): Promise<void>;
readFileStream?(resource: URI, opts: FileReadStreamOptions, token?: CancellationToken): ReadableStreamEvents<Uint8Array>;
readFileStream?(resource: URI, opts: FileReadStreamOptions, token: CancellationToken): ReadableStreamEvents<Uint8Array>;
open?(resource: URI, opts: FileOpenOptions): Promise<number>;
close?(fd: number): Promise<void>;
......@@ -320,7 +320,7 @@ export function hasOpenReadWriteCloseCapability(provider: IFileSystemProvider):
}
export interface IFileSystemProviderWithFileReadStreamCapability extends IFileSystemProvider {
readFileStream(resource: URI, opts: FileReadStreamOptions, token?: CancellationToken): ReadableStreamEvents<Uint8Array>;
readFileStream(resource: URI, opts: FileReadStreamOptions, token: CancellationToken): ReadableStreamEvents<Uint8Array>;
}
export function hasFileReadStreamCapability(provider: IFileSystemProvider): provider is IFileSystemProviderWithFileReadStreamCapability {
......
......@@ -5,11 +5,11 @@
import { localize } from 'vs/nls';
import { URI } from 'vs/base/common/uri';
import { VSBuffer, newWriteableBufferStream, VSBufferReadableStream } from 'vs/base/common/buffer';
import { VSBuffer } from 'vs/base/common/buffer';
import { CancellationToken } from 'vs/base/common/cancellation';
import { IFileSystemProviderWithOpenReadWriteCloseCapability, FileReadStreamOptions, createFileSystemProviderError, FileSystemProviderErrorCode, ensureFileSystemProviderError } from 'vs/platform/files/common/files';
import { canceled } from 'vs/base/common/errors';
import { IErrorTransformer, IDataTransformer, ReadableStream, WriteableStream, newWriteableStream } from 'vs/base/common/stream';
import { IErrorTransformer, IDataTransformer, WriteableStream } from 'vs/base/common/stream';
export interface ICreateReadStreamOptions extends FileReadStreamOptions {
......@@ -24,27 +24,21 @@ export interface ICreateReadStreamOptions extends FileReadStreamOptions {
errorTransformer?: IErrorTransformer;
}
export function createVSBufferReadStream(provider: IFileSystemProviderWithOpenReadWriteCloseCapability, resource: URI, options: ICreateReadStreamOptions, token?: CancellationToken): VSBufferReadableStream {
const stream = newWriteableBufferStream();
readFileIntoStream(provider, resource, stream, data => data, options, token);
return stream;
}
export function createUint8ArrayReadStream(provider: IFileSystemProviderWithOpenReadWriteCloseCapability, resource: URI, options: ICreateReadStreamOptions, token?: CancellationToken): ReadableStream<Uint8Array> {
const stream = newWriteableStream<Uint8Array>(data => VSBuffer.concat(data.map(data => VSBuffer.wrap(data))).buffer);
readFileIntoStream(provider, resource, stream, data => data.buffer, options, token);
return stream;
}
async function readFileIntoStream<T>(provider: IFileSystemProviderWithOpenReadWriteCloseCapability, resource: URI, stream: WriteableStream<T>, transformer: IDataTransformer<VSBuffer, T>, options: ICreateReadStreamOptions, token?: CancellationToken): Promise<void> {
/**
* A helper to read a file from a provider with open/read/close capability into a stream.
*/
export async function readFileIntoStream<T>(
provider: IFileSystemProviderWithOpenReadWriteCloseCapability,
resource: URI,
target: WriteableStream<T>,
transformer: IDataTransformer<VSBuffer, T>,
options: ICreateReadStreamOptions,
token: CancellationToken
): Promise<void> {
let error: Error | undefined = undefined;
try {
await doReadFileIntoStream(provider, resource, stream, transformer, options, token);
await doReadFileIntoStream(provider, resource, target, transformer, options, token);
} catch (err) {
error = err;
} finally {
......@@ -52,11 +46,11 @@ async function readFileIntoStream<T>(provider: IFileSystemProviderWithOpenReadWr
error = options.errorTransformer(error);
}
stream.end(error);
target.end(error);
}
}
async function doReadFileIntoStream<T>(provider: IFileSystemProviderWithOpenReadWriteCloseCapability, resource: URI, stream: WriteableStream<T>, transformer: IDataTransformer<VSBuffer, T>, options: ICreateReadStreamOptions, token?: CancellationToken): Promise<void> {
async function doReadFileIntoStream<T>(provider: IFileSystemProviderWithOpenReadWriteCloseCapability, resource: URI, target: WriteableStream<T>, transformer: IDataTransformer<VSBuffer, T>, options: ICreateReadStreamOptions, token: CancellationToken): Promise<void> {
// Check for cancellation
throwIfCancelled(token);
......@@ -91,7 +85,7 @@ async function doReadFileIntoStream<T>(provider: IFileSystemProviderWithOpenRead
// when buffer full, create a new one and emit it through stream
if (posInBuffer === buffer.byteLength) {
stream.write(transformer(buffer));
target.write(transformer(buffer));
buffer = VSBuffer.alloc(Math.min(options.bufferSize, typeof allowedRemainingBytes === 'number' ? allowedRemainingBytes : options.bufferSize));
......@@ -106,7 +100,7 @@ async function doReadFileIntoStream<T>(provider: IFileSystemProviderWithOpenRead
lastChunkLength = Math.min(posInBuffer, allowedRemainingBytes);
}
stream.write(transformer(buffer.slice(0, lastChunkLength)));
target.write(transformer(buffer.slice(0, lastChunkLength)));
}
} catch (error) {
throw ensureFileSystemProviderError(error);
......@@ -115,8 +109,8 @@ async function doReadFileIntoStream<T>(provider: IFileSystemProviderWithOpenRead
}
}
function throwIfCancelled(token?: CancellationToken): boolean {
if (token && token.isCancellationRequested) {
function throwIfCancelled(token: CancellationToken): boolean {
if (token.isCancellationRequested) {
throw canceled();
}
......
......@@ -23,9 +23,10 @@ import { FileWatcher as WindowsWatcherService } from 'vs/platform/files/node/wat
import { FileWatcher as NsfwWatcherService } from 'vs/platform/files/node/watcher/nsfw/watcherService';
import { FileWatcher as NodeJSWatcherService } from 'vs/platform/files/node/watcher/nodejs/watcherService';
import { CancellationToken } from 'vs/base/common/cancellation';
import { ReadableStreamEvents } from 'vs/base/common/stream';
import { createUint8ArrayReadStream } from 'vs/platform/files/common/io';
import { ReadableStreamEvents, newWriteableStream } from 'vs/base/common/stream';
import { readFileIntoStream } from 'vs/platform/files/common/io';
import { insert } from 'vs/base/common/arrays';
import { VSBuffer } from 'vs/base/common/buffer';
export interface IWatcherOptions {
pollingInterval?: number;
......@@ -153,11 +154,15 @@ export class DiskFileSystemProvider extends Disposable implements
}
}
readFileStream(resource: URI, opts: FileReadStreamOptions, token?: CancellationToken): ReadableStreamEvents<Uint8Array> {
return createUint8ArrayReadStream(this, resource, {
readFileStream(resource: URI, opts: FileReadStreamOptions, token: CancellationToken): ReadableStreamEvents<Uint8Array> {
const stream = newWriteableStream<Uint8Array>(data => VSBuffer.concat(data.map(data => VSBuffer.wrap(data))).buffer);
readFileIntoStream(this, resource, stream, data => data.buffer, {
...opts,
bufferSize: this.BUFFER_SIZE
}, token);
return stream;
}
async writeFile(resource: URI, content: Uint8Array, opts: FileWriteOptions): Promise<void> {
......
......@@ -121,7 +121,7 @@ export class RemoteFileSystemProvider extends Disposable implements
return buff.buffer;
}
readFileStream(resource: URI, opts: FileReadStreamOptions, token?: CancellationToken): ReadableStreamEvents<Uint8Array> {
readFileStream(resource: URI, opts: FileReadStreamOptions, token: CancellationToken): ReadableStreamEvents<Uint8Array> {
const stream = newWriteableStream<Uint8Array>(data => VSBuffer.concat(data.map(data => VSBuffer.wrap(data))).buffer);
// Reading as file stream goes through an event to the remote side
......@@ -156,19 +156,17 @@ export class RemoteFileSystemProvider extends Disposable implements
});
// Support cancellation
if (token) {
token.onCancellationRequested(() => {
token.onCancellationRequested(() => {
// Ensure to end the stream properly with an error
// to indicate the cancellation.
stream.end(canceled());
// Ensure to end the stream properly with an error
// to indicate the cancellation.
stream.end(canceled());
// Ensure to dispose the listener upon cancellation. This will
// bubble through the remote side as event and allows to stop
// reading the file.
listener.dispose();
});
}
// Ensure to dispose the listener upon cancellation. This will
// bubble through the remote side as event and allows to stop
// reading the file.
listener.dispose();
});
return stream;
}
......
......@@ -67,7 +67,7 @@ export class FileUserDataProvider extends Disposable implements
throw new Error('not supported');
}
readFileStream(resource: URI, opts: FileReadStreamOptions, token?: CancellationToken): ReadableStreamEvents<Uint8Array> {
readFileStream(resource: URI, opts: FileReadStreamOptions, token: CancellationToken): ReadableStreamEvents<Uint8Array> {
if (hasFileReadStreamCapability(this.fileSystemProvider)) {
return this.fileSystemProvider.readFileStream(this.toFileSystemResource(resource), opts, token);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册