提交 0f4c0a7b 编写于 作者: B Benjamin Pasero 提交者: Sandeep Somavarapu

files - first cut allow stream for writeFile

上级 0df67647
......@@ -182,6 +182,12 @@ export interface VSBufferReadableStream {
destroy(): void;
}
export function isVSBufferReadableStream(obj: any): obj is VSBufferReadableStream {
const candidate: VSBufferReadableStream = obj;
return candidate && [candidate.on, candidate.pause, candidate.resume, candidate.destroy].every(fn => typeof fn === 'function');
}
/**
* Helper to fully read a VSBuffer readable into a single buffer.
*/
......
......@@ -14,7 +14,7 @@ import { TernarySearchTree } from 'vs/base/common/map';
import { isNonEmptyArray, coalesce } from 'vs/base/common/arrays';
import { getBaseLabel } from 'vs/base/common/labels';
import { ILogService } from 'vs/platform/log/common/log';
import { VSBuffer, VSBufferReadable, readableToBuffer, bufferToReadable, streamToBuffer, bufferToStream, VSBufferReadableStream, writeableBufferStream, VSBufferWriteableStream } from 'vs/base/common/buffer';
import { VSBuffer, VSBufferReadable, readableToBuffer, bufferToReadable, streamToBuffer, bufferToStream, VSBufferReadableStream, writeableBufferStream, VSBufferWriteableStream, isVSBufferReadableStream } from 'vs/base/common/buffer';
import { Queue } from 'vs/base/common/async';
import { CancellationTokenSource, CancellationToken } from 'vs/base/common/cancellation';
import { Schemas } from 'vs/base/common/network';
......@@ -281,7 +281,7 @@ export class FileService extends Disposable implements IFileService {
return fileStat;
}
async writeFile(resource: URI, bufferOrReadable: VSBuffer | VSBufferReadable, options?: IWriteFileOptions): Promise<IFileStatWithMetadata> {
async writeFile(resource: URI, bufferOrReadableOrStream: VSBuffer | VSBufferReadable | VSBufferReadableStream, options?: IWriteFileOptions): Promise<IFileStatWithMetadata> {
const provider = this.throwIfFileSystemIsReadonly(await this.withReadWriteProvider(resource));
try {
......@@ -296,12 +296,12 @@ export class FileService extends Disposable implements IFileService {
// write file: buffered
if (hasOpenReadWriteCloseCapability(provider)) {
await this.doWriteBuffered(provider, resource, bufferOrReadable instanceof VSBuffer ? bufferToReadable(bufferOrReadable) : bufferOrReadable);
await this.doWriteBuffered(provider, resource, bufferOrReadableOrStream instanceof VSBuffer ? bufferToReadable(bufferOrReadableOrStream) : bufferOrReadableOrStream);
}
// write file: unbuffered
else {
await this.doWriteUnbuffered(provider, resource, bufferOrReadable);
await this.doWriteUnbuffered(provider, resource, bufferOrReadableOrStream);
}
} catch (error) {
throw new FileOperationError(localize('err.write', "Unable to write file ({0})", this.ensureError(error).toString()), toFileOperationResult(error), options);
......@@ -857,29 +857,60 @@ export class FileService extends Disposable implements IFileService {
return isPathCaseSensitive ? resource.toString() : resource.toString().toLowerCase();
}
private async doWriteBuffered(provider: IFileSystemProviderWithOpenReadWriteCloseCapability, resource: URI, readable: VSBufferReadable): Promise<void> {
return this.ensureWriteQueue(provider, resource).queue(() => this.doWriteBufferedQueued(provider, resource, readable));
}
private async doWriteBuffered(provider: IFileSystemProviderWithOpenReadWriteCloseCapability, resource: URI, readableOrStream: VSBufferReadable | VSBufferReadableStream): Promise<void> {
return this.ensureWriteQueue(provider, resource).queue(async () => {
private async doWriteBufferedQueued(provider: IFileSystemProviderWithOpenReadWriteCloseCapability, resource: URI, readable: VSBufferReadable): Promise<void> {
// open handle
const handle = await provider.open(resource, { create: true });
// open handle
const handle = await provider.open(resource, { create: true });
// write into handle until all bytes from buffer have been written
try {
if (isVSBufferReadableStream(readableOrStream)) {
await this.doWriteStreamBufferedQueued(provider, handle, readableOrStream);
} else {
await this.doWriteReadableBufferedQueued(provider, handle, readableOrStream);
}
} catch (error) {
throw this.ensureError(error);
} finally {
// write into handle until all bytes from buffer have been written
try {
// close handle always
await provider.close(handle);
}
});
}
private doWriteStreamBufferedQueued(provider: IFileSystemProviderWithOpenReadWriteCloseCapability, handle: number, stream: VSBufferReadableStream): Promise<void> {
return new Promise((resolve, reject) => {
let posInFile = 0;
let chunk: VSBuffer | null;
while (chunk = readable.read()) {
await this.doWriteBuffer(provider, handle, chunk, chunk.byteLength, posInFile, 0);
stream.on('data', async chunk => {
stream.pause();
try {
await this.doWriteBuffer(provider, handle, chunk, chunk.byteLength, posInFile, 0);
} catch (error) {
reject(error);
}
posInFile += chunk.byteLength;
}
} catch (error) {
throw this.ensureError(error);
} finally {
await provider.close(handle);
stream.resume();
});
stream.on('error', error => reject(error));
stream.on('end', () => resolve());
});
}
private async doWriteReadableBufferedQueued(provider: IFileSystemProviderWithOpenReadWriteCloseCapability, handle: number, readable: VSBufferReadable): Promise<void> {
let posInFile = 0;
let chunk: VSBuffer | null;
while (chunk = readable.read()) {
await this.doWriteBuffer(provider, handle, chunk, chunk.byteLength, posInFile, 0);
posInFile += chunk.byteLength;
}
}
......@@ -891,16 +922,18 @@ export class FileService extends Disposable implements IFileService {
}
}
private async doWriteUnbuffered(provider: IFileSystemProviderWithFileReadWriteCapability, resource: URI, bufferOrReadable: VSBuffer | VSBufferReadable): Promise<void> {
return this.ensureWriteQueue(provider, resource).queue(() => this.doWriteUnbufferedQueued(provider, resource, bufferOrReadable));
private async doWriteUnbuffered(provider: IFileSystemProviderWithFileReadWriteCapability, resource: URI, bufferOrReadableOrStream: VSBuffer | VSBufferReadable | VSBufferReadableStream): Promise<void> {
return this.ensureWriteQueue(provider, resource).queue(() => this.doWriteUnbufferedQueued(provider, resource, bufferOrReadableOrStream));
}
private async doWriteUnbufferedQueued(provider: IFileSystemProviderWithFileReadWriteCapability, resource: URI, bufferOrReadable: VSBuffer | VSBufferReadable): Promise<void> {
private async doWriteUnbufferedQueued(provider: IFileSystemProviderWithFileReadWriteCapability, resource: URI, bufferOrReadableOrStream: VSBuffer | VSBufferReadable | VSBufferReadableStream): Promise<void> {
let buffer: VSBuffer;
if (bufferOrReadable instanceof VSBuffer) {
buffer = bufferOrReadable;
if (bufferOrReadableOrStream instanceof VSBuffer) {
buffer = bufferOrReadableOrStream;
} else if (isVSBufferReadableStream(bufferOrReadableOrStream)) {
buffer = await streamToBuffer(bufferOrReadableOrStream);
} else {
buffer = readableToBuffer(bufferOrReadable);
buffer = readableToBuffer(bufferOrReadableOrStream);
}
return provider.writeFile(resource, buffer.buffer, { create: true, overwrite: true });
......
......@@ -20,7 +20,7 @@ import { NullLogService } from 'vs/platform/log/common/log';
import { isLinux, isWindows } from 'vs/base/common/platform';
import { DisposableStore } from 'vs/base/common/lifecycle';
import { isEqual } from 'vs/base/common/resources';
import { VSBuffer, VSBufferReadable } from 'vs/base/common/buffer';
import { VSBuffer, VSBufferReadable, bufferToStream } from 'vs/base/common/buffer';
function getByName(root: IFileStat, name: string): IFileStat | null {
if (root.children === undefined) {
......@@ -1545,6 +1545,48 @@ suite('Disk File Service', () => {
assert.equal(readFileSync(resource.fsPath), newContent);
});
test('writeFile (large file - stream) - buffered', async () => {
setCapabilities(fileProvider, FileSystemProviderCapabilities.FileOpenReadWriteClose);
const resource = URI.file(join(testDir, 'lorem.txt'));
const content = readFileSync(resource.fsPath);
const newContent = content.toString() + content.toString();
const fileStat = await service.writeFile(resource, bufferToStream(VSBuffer.fromString(newContent)));
assert.equal(fileStat.name, 'lorem.txt');
assert.equal(readFileSync(resource.fsPath), newContent);
});
test('writeFile (stream) - unbuffered', async () => {
setCapabilities(fileProvider, FileSystemProviderCapabilities.FileReadWrite);
const resource = URI.file(join(testDir, 'small.txt'));
const content = readFileSync(resource.fsPath);
assert.equal(content, 'Small File');
const newContent = 'Updates to the small file';
await service.writeFile(resource, bufferToStream(VSBuffer.fromString(newContent)));
assert.equal(readFileSync(resource.fsPath), newContent);
});
test('writeFile (large file - stream) - unbuffered', async () => {
setCapabilities(fileProvider, FileSystemProviderCapabilities.FileReadWrite);
const resource = URI.file(join(testDir, 'lorem.txt'));
const content = readFileSync(resource.fsPath);
const newContent = content.toString() + content.toString();
const fileStat = await service.writeFile(resource, bufferToStream(VSBuffer.fromString(newContent)));
assert.equal(fileStat.name, 'lorem.txt');
assert.equal(readFileSync(resource.fsPath), newContent);
});
test('writeFile (file is created including parents)', async () => {
const resource = URI.file(join(testDir, 'other', 'newfile.txt'));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册