Commit b618e4a2 authored by Benjamin Pasero

introduce writeFileStreamAndFlush

Parent 81d1942e
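
The change teaches extfs.writeFileAndFlush to accept a Readable stream in addition to a string or NodeBuffer. A minimal usage sketch of the new overload (file paths and log messages are illustrative, not part of the commit):

import * as fs from 'fs';
import * as extfs from 'vs/base/node/extfs';

// Stream the contents of a source file into a target file; the callback only
// fires after the data has been flushed to disk (fdatasync) or on error.
const reader = fs.createReadStream('/tmp/source.txt'); // hypothetical path
extfs.writeFileAndFlush('/tmp/target.txt', reader, null, error => {
	if (error) {
		console.error('write failed', error);
	} else {
		console.log('target written and flushed');
	}
});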
@@ -14,6 +14,7 @@ import * as fs from 'fs';
import * as paths from 'path';
import { TPromise } from 'vs/base/common/winjs.base';
import { nfcall } from 'vs/base/common/async';
import { Readable } from 'stream';
const loop = flow.loop;
@@ -54,7 +55,7 @@ export function copy(source: string, target: string, callback: (error: Error) =>
}
if (!stat.isDirectory()) {
return pipeFs(source, target, stat.mode & 511, callback);
return doCopyFile(source, target, stat.mode & 511, callback);
}
if (copiedSources[source]) {
@@ -75,6 +76,38 @@ export function copy(source: string, target: string, callback: (error: Error) =>
});
}
function doCopyFile(source: string, target: string, mode: number, callback: (error: Error) => void): void {
const reader = fs.createReadStream(source);
const writer = fs.createWriteStream(target, { mode });
let finished = false;
const finish = (error?: Error) => {
if (!finished) {
finished = true;
// in error cases, pass to callback
if (error) {
callback(error);
}
// we need to explicitly chmod because of https://github.com/nodejs/node/issues/1104
else {
fs.chmod(target, mode, callback);
}
}
};
// handle errors properly
reader.once('error', error => finish(error));
writer.once('error', error => finish(error));
// we are done (underlying fd has been closed)
writer.once('close', () => finish());
// start piping
reader.pipe(writer);
}
export function mkdirp(path: string, mode?: number): TPromise<boolean> {
const mkdir = () => nfcall(fs.mkdir, path, mode)
.then(null, (err: NodeJS.ErrnoException) => {
@@ -88,11 +121,12 @@ export function mkdirp(path: string, mode?: number): TPromise<boolean> {
return TPromise.wrapError<boolean>(err);
});
// is root?
// stop at root
if (path === paths.dirname(path)) {
return TPromise.as(true);
}
// recursively mkdir
return mkdir().then(null, (err: NodeJS.ErrnoException) => {
if (err.code === 'ENOENT') {
return mkdirp(paths.dirname(path), mode).then(mkdir);
@@ -102,40 +136,6 @@ export function mkdirp(path: string, mode?: number): TPromise<boolean> {
});
}
function pipeFs(source: string, target: string, mode: number, callback: (error: Error) => void): void {
let callbackHandled = false;
const readStream = fs.createReadStream(source);
const writeStream = fs.createWriteStream(target, { mode: mode });
const onError = (error: Error) => {
if (!callbackHandled) {
callbackHandled = true;
callback(error);
}
};
readStream.on('error', onError);
writeStream.on('error', onError);
readStream.on('end', () => {
(<any>writeStream).end(() => { // In this case the write stream is known to have an end signature with callback
if (!callbackHandled) {
callbackHandled = true;
fs.chmod(target, mode, callback); // we need to explicitly chmod because of https://github.com/nodejs/node/issues/1104
}
});
});
// In node 0.8 there is no easy way to find out when the pipe operation has finished. As such, we use the end property = false
// so that we are in charge of calling end() on the write stream and we will be notified when the write stream is really done.
// We can do this because file streams have an end() method that allows to pass in a callback.
// In node 0.10 there is an event 'finish' emitted from the write stream that can be used. See
// https://groups.google.com/forum/?fromgroups=#!topic/nodejs/YWQ1sRoXOdI
readStream.pipe(writeStream, { end: false });
}
// Deletes the given path by first moving it out of the workspace. This has two benefits. For one, the operation can return fast because
// after the rename, the contents are out of the workspace although not yet deleted. The greater benefit however is that this operation
// will fail in case any file is used by another process. fs.unlink() in node will not bail if a file unlinked is used by another process.
@@ -320,15 +320,95 @@ export function mv(source: string, target: string, callback: (error: Error) => void
});
}
let canFlush = true;
export function writeFileAndFlush(path: string, data: string | NodeBuffer | Readable, options: { mode?: number; flag?: string; }, callback: (error?: Error) => void): void {
options = ensureOptions(options);
if (data instanceof Readable) {
doWriteFileStreamAndFlush(path, data, options, callback);
} else {
doWriteFileAndFlush(path, data, options, callback);
}
}
function doWriteFileStreamAndFlush(path: string, reader: Readable, options: { mode?: number; flag?: string; }, callback: (error?: Error) => void): void {
// finish only once
let finished = false;
const finish = (error?: Error) => {
if (!finished) {
finished = true;
// in error cases we need to manually close streams
// if the write stream was successfully opened
if (error) {
if (isOpen) {
writer.once('close', () => callback(error));
writer.close();
} else {
callback(error);
}
}
// otherwise just return without error
else {
callback();
}
}
};
// create writer to target
const writer = fs.createWriteStream(path, options);
// handle errors properly
reader.once('error', error => finish(error));
writer.once('error', error => finish(error));
// save the fd for later use
let fd: number;
let isOpen: boolean;
writer.once('open', descriptor => {
fd = descriptor;
isOpen = true;
});
// we are done (underlying fd has been closed)
writer.once('close', () => finish());
// handle end event because we are in charge
reader.once('end', () => {
// flush to disk
if (canFlush && isOpen) {
fs.fdatasync(fd, (syncError: Error) => {
// In some exotic setups it is possible that node fails to sync
// In that case we disable flushing and warn to the console
if (syncError) {
console.warn('[node.js fs] fdatasync is now disabled for this session because it failed: ', syncError);
canFlush = false;
}
writer.end();
});
}
// do not flush
else {
writer.end();
}
});
// end: false means we are in charge of ending the streams properly
reader.pipe(writer, { end: false });
}
// Calls fs.writeFile() followed by a fs.sync() call to flush the changes to disk
// We do this in cases where we want to make sure the data is really on disk and
// not in some cache.
//
// See https://github.com/nodejs/node/blob/v5.10.0/lib/fs.js#L1194
let canFlush = true;
export function writeFileAndFlush(path: string, data: string | NodeBuffer, options: { mode?: number; flag?: string; }, callback: (error: Error) => void): void {
options = ensureOptions(options);
function doWriteFileAndFlush(path: string, data: string | NodeBuffer, options: { mode?: number; flag?: string; }, callback: (error?: Error) => void): void {
if (!canFlush) {
return fs.writeFile(path, data, options, callback);
}
......
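
The core of doWriteFileStreamAndFlush above is the pipe-with-end:false pattern: the reader is piped without ending the writer, so on the reader's 'end' event fdatasync can run against the still-open file descriptor before writer.end() is called, and the writer's 'close' event then signals completion. A condensed sketch of just that pattern, with the fdatasync fallback and the isOpen check left out and a made-up helper name:

import * as fs from 'fs';
import { Readable } from 'stream';

// Minimal sketch: end the write stream ourselves so that fdatasync can run
// while the file descriptor is still open. Error handling is reduced to a
// single guarded callback; the real implementation also copes with fdatasync
// failures and with the descriptor not being open yet.
function writeStreamAndFlush(target: string, reader: Readable, done: (error?: Error) => void): void {
	const writer = fs.createWriteStream(target);

	let fd: number;
	let finished = false;
	const finish = (error?: Error) => {
		if (!finished) {
			finished = true;
			done(error);
		}
	};

	writer.once('open', descriptor => fd = descriptor);
	writer.once('close', () => finish());          // underlying fd has been closed
	writer.once('error', error => finish(error));
	reader.once('error', error => finish(error));

	reader.once('end', () => {
		fs.fdatasync(fd, () => writer.end());      // flush to disk, then end the writer
	});

	reader.pipe(writer, { end: false });           // we call end() ourselves
}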
@@ -13,6 +13,7 @@ import * as fs from 'fs';
import * as os from 'os';
import * as platform from 'vs/base/common/platform';
import { once } from 'vs/base/common/event';
import { Readable } from 'stream';
export function readdir(path: string): TPromise<string[]> {
return nfcall(extfs.readdir, path);
@@ -101,6 +102,7 @@ const writeFilePathQueue: { [path: string]: Queue<void> } = Object.create(null);
export function writeFile(path: string, data: string, options?: { mode?: number; flag?: string; }): TPromise<void>;
export function writeFile(path: string, data: NodeBuffer, options?: { mode?: number; flag?: string; }): TPromise<void>;
export function writeFile(path: string, data: Readable, options?: { mode?: number; flag?: string; }): TPromise<void>;
export function writeFile(path: string, data: any, options?: { mode?: number; flag?: string; }): TPromise<void> {
let queueKey = toQueueKey(path);
......
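
The promise-based writeFile wrapper (presumably vs/base/node/pfs.ts; the file name is not visible in this hunk) gains a matching Readable overload, so callers get a TPromise instead of passing a callback. An illustrative call with placeholder paths:

import * as fs from 'fs';
import { writeFile } from 'vs/base/node/pfs'; // assumed module path

// Queue a flushed write of a file stream and react to the outcome via the promise.
writeFile('/tmp/copy.html', fs.createReadStream('/tmp/original.html')).then(
	() => console.log('copied and flushed'),
	error => console.error(error)
);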
@@ -15,6 +15,7 @@ import uuid = require('vs/base/common/uuid');
import strings = require('vs/base/common/strings');
import extfs = require('vs/base/node/extfs');
import { onError } from 'vs/base/test/common/utils';
import { Readable } from 'stream';
const ignore = () => { };
@@ -22,6 +23,37 @@ const mkdirp = (path: string, mode: number, callback: (error) => void) => {
extfs.mkdirp(path, mode).done(() => callback(null), error => callback(error));
};
const chunkSize = 64 * 1024;
const readError = 'Error while reading';
function toReadable(value: string, throwError?: boolean): Readable {
const totalChunks = Math.ceil(value.length / chunkSize);
const stringChunks: string[] = [];
for (let i = 0, j = 0; i < totalChunks; ++i, j += chunkSize) {
stringChunks[i] = value.substr(j, chunkSize);
}
let counter = 0;
return new Readable({
read: function () {
if (throwError) {
this.emit('error', new Error(readError));
}
let res: string;
let canPush = true;
while (canPush && (res = stringChunks[counter++])) {
canPush = this.push(res);
}
// EOS
if (!res) {
this.push(null);
}
}
});
}
suite('Extfs', () => {
test('mkdirp', function (done: () => void) {
@@ -174,7 +206,7 @@ suite('Extfs', () => {
}
});
test('writeFileAndFlush', function (done: () => void) {
test('writeFileAndFlush (string)', function (done: () => void) {
const id = uuid.generateUuid();
const parentDir = path.join(os.tmpdir(), 'vsctests', id);
const newDir = path.join(parentDir, 'extfs', id);
@@ -209,6 +241,192 @@ });
});
});
test('writeFileAndFlush (stream)', function (done: () => void) {
const id = uuid.generateUuid();
const parentDir = path.join(os.tmpdir(), 'vsctests', id);
const newDir = path.join(parentDir, 'extfs', id);
const testFile = path.join(newDir, 'flushed.txt');
mkdirp(newDir, 493, error => {
if (error) {
return onError(error, done);
}
assert.ok(fs.existsSync(newDir));
extfs.writeFileAndFlush(testFile, toReadable('Hello World'), null, error => {
if (error) {
return onError(error, done);
}
assert.equal(fs.readFileSync(testFile), 'Hello World');
const largeString = (new Array(100 * 1024)).join('Large String\n');
extfs.writeFileAndFlush(testFile, toReadable(largeString), null, error => {
if (error) {
return onError(error, done);
}
assert.equal(fs.readFileSync(testFile), largeString);
extfs.del(parentDir, os.tmpdir(), done, ignore);
});
});
});
});
test('writeFileAndFlush (file stream)', function (done: () => void) {
const id = uuid.generateUuid();
const parentDir = path.join(os.tmpdir(), 'vsctests', id);
const sourceFile = require.toUrl('./fixtures/index.html');
const newDir = path.join(parentDir, 'extfs', id);
const testFile = path.join(newDir, 'flushed.txt');
mkdirp(newDir, 493, error => {
if (error) {
return onError(error, done);
}
assert.ok(fs.existsSync(newDir));
extfs.writeFileAndFlush(testFile, fs.createReadStream(sourceFile), null, error => {
if (error) {
return onError(error, done);
}
assert.equal(fs.readFileSync(testFile).toString(), fs.readFileSync(sourceFile).toString());
extfs.del(parentDir, os.tmpdir(), done, ignore);
});
});
});
test('writeFileAndFlush (string, error handling)', function (done: () => void) {
const id = uuid.generateUuid();
const parentDir = path.join(os.tmpdir(), 'vsctests', id);
const newDir = path.join(parentDir, 'extfs', id);
const testFile = path.join(newDir, 'flushed.txt');
mkdirp(newDir, 493, error => {
if (error) {
return onError(error, done);
}
assert.ok(fs.existsSync(newDir));
fs.mkdirSync(testFile); // this will trigger an error because testFile is now a directory!
extfs.writeFileAndFlush(testFile, 'Hello World', null, error => {
if (!error) {
return onError(new Error('Expected error for writing to a folder'), done);
}
extfs.del(parentDir, os.tmpdir(), done, ignore);
});
});
});
test('writeFileAndFlush (stream, error handling EISDIR)', function (done: () => void) {
const id = uuid.generateUuid();
const parentDir = path.join(os.tmpdir(), 'vsctests', id);
const newDir = path.join(parentDir, 'extfs', id);
const testFile = path.join(newDir, 'flushed.txt');
mkdirp(newDir, 493, error => {
if (error) {
return onError(error, done);
}
assert.ok(fs.existsSync(newDir));
fs.mkdirSync(testFile); // this will trigger an error because testFile is now a directory!
extfs.writeFileAndFlush(testFile, toReadable('Hello World'), null, error => {
if (!error || (<any>error).code !== 'EISDIR') {
return onError(new Error('Expected EISDIR error for writing to folder but got: ' + (error ? (<any>error).code : 'no error')), done);
}
extfs.del(parentDir, os.tmpdir(), done, ignore);
});
});
});
test('writeFileAndFlush (stream, error handling READERROR)', function (done: () => void) {
const id = uuid.generateUuid();
const parentDir = path.join(os.tmpdir(), 'vsctests', id);
const newDir = path.join(parentDir, 'extfs', id);
const testFile = path.join(newDir, 'flushed.txt');
mkdirp(newDir, 493, error => {
if (error) {
return onError(error, done);
}
assert.ok(fs.existsSync(newDir));
extfs.writeFileAndFlush(testFile, toReadable('Hello World', true /* throw error */), null, error => {
if (!error || error.message !== readError) {
return onError(new Error('Expected read error from the stream'), done);
}
extfs.del(parentDir, os.tmpdir(), done, ignore);
});
});
});
test('writeFileAndFlush (stream, error handling EACCES)', function (done: () => void) {
const id = uuid.generateUuid();
const parentDir = path.join(os.tmpdir(), 'vsctests', id);
const newDir = path.join(parentDir, 'extfs', id);
const testFile = path.join(newDir, 'flushed.txt');
mkdirp(newDir, 493, error => {
if (error) {
return onError(error, done);
}
assert.ok(fs.existsSync(newDir));
fs.writeFileSync(testFile, '');
fs.chmodSync(testFile, 33060); // make readonly
extfs.writeFileAndFlush(testFile, toReadable('Hello World'), null, error => {
if (!error || ((<any>error).code !== 'EACCES' && (<any>error).code !== 'EPERM')) {
return onError(new Error('Expected EACCES/EPERM error for writing to a readonly file but got: ' + (error ? (<any>error).code : 'no error')), done);
}
extfs.del(parentDir, os.tmpdir(), done, ignore);
});
});
});
test('writeFileAndFlush (file stream, error handling)', function (done: () => void) {
const id = uuid.generateUuid();
const parentDir = path.join(os.tmpdir(), 'vsctests', id);
const sourceFile = require.toUrl('./fixtures/index.html');
const newDir = path.join(parentDir, 'extfs', id);
const testFile = path.join(newDir, 'flushed.txt');
mkdirp(newDir, 493, error => {
if (error) {
return onError(error, done);
}
assert.ok(fs.existsSync(newDir));
fs.mkdirSync(testFile); // this will trigger an error because testFile is now a directory!
extfs.writeFileAndFlush(testFile, fs.createReadStream(sourceFile), null, error => {
if (!error) {
return onError(new Error('Expected error for writing to folder'), done);
}
extfs.del(parentDir, os.tmpdir(), done, ignore);
});
});
});
test('writeFileAndFlushSync', function (done: () => void) {
const id = uuid.generateUuid();
const parentDir = path.join(os.tmpdir(), 'vsctests', id);
......