提交 839719d4 编写于 作者: J Johannes Rieken

simplify AsyncEmitter usage

上级 3f69d88a
......@@ -654,27 +654,39 @@ export interface IWaitUntil {
export class AsyncEmitter<T extends IWaitUntil> extends Emitter<T> {
private _asyncDeliveryQueue?: [Listener<T>, T, Promise<any>[]][];
private _asyncDeliveryQueue?: LinkedList<[Listener<T>, Omit<T, 'waitUntil'>]>;
async fireAsync(eventFn: (thenables: Promise<any>[], listener: Function) => T, token: CancellationToken): Promise<void> {
async fireAsync(data: Omit<T, 'waitUntil'>, token: CancellationToken, promiseJoin?: (p: Promise<any>) => Promise<any>): Promise<void> {
if (!this._listeners) {
return;
}
// put all [listener,event]-pairs into delivery queue
// then emit all event. an inner/nested event might be
// the driver of this
if (!this._asyncDeliveryQueue) {
this._asyncDeliveryQueue = [];
this._asyncDeliveryQueue = new LinkedList();
}
for (let iter = this._listeners.iterator(), e = iter.next(); !e.done; e = iter.next()) {
const thenables: Promise<void>[] = [];
this._asyncDeliveryQueue.push([e.value, eventFn(thenables, typeof e.value === 'function' ? e.value : e.value[0]), thenables]);
this._asyncDeliveryQueue.push([e.value, data]);
}
while (this._asyncDeliveryQueue.length > 0 && !token.isCancellationRequested) {
const [listener, event, thenables] = this._asyncDeliveryQueue.shift()!;
while (this._asyncDeliveryQueue.size > 0 && !token.isCancellationRequested) {
const [listener, data] = this._asyncDeliveryQueue.shift()!;
const thenables: Promise<any>[] = [];
const event = <T>{
...data,
waitUntil: (p: Promise<any>): void => {
if (Object.isFrozen(thenables)) {
throw new Error('waitUntil can NOT be called asynchronous');
}
if (promiseJoin) {
p = promiseJoin(p);
}
thenables.push(p);
}
};
try {
if (typeof listener === 'function') {
listener.call(undefined, event);
......
......@@ -3,7 +3,7 @@
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
import * as assert from 'assert';
import { Event, Emitter, EventBufferer, EventMultiplexer, AsyncEmitter, IWaitUntil, PauseableEmitter } from 'vs/base/common/event';
import { Event, Emitter, EventBufferer, EventMultiplexer, IWaitUntil, PauseableEmitter, AsyncEmitter } from 'vs/base/common/event';
import { IDisposable, DisposableStore } from 'vs/base/common/lifecycle';
import * as Errors from 'vs/base/common/errors';
import { timeout } from 'vs/base/common/async';
......@@ -273,11 +273,7 @@ suite('AsyncEmitter', function () {
assert.equal(typeof e.waitUntil, 'function');
});
emitter.fireAsync(thenables => ({
foo: true,
bar: 1,
waitUntil(t: Promise<void>) { thenables.push(t); }
}), CancellationToken.None);
emitter.fireAsync({ foo: true, bar: 1, }, CancellationToken.None);
emitter.dispose();
});
......@@ -304,12 +300,7 @@ suite('AsyncEmitter', function () {
}));
});
await emitter.fireAsync(thenables => ({
foo: true,
waitUntil(t) {
thenables.push(t);
}
}), CancellationToken.None);
await emitter.fireAsync({ foo: true }, CancellationToken.None);
assert.equal(globalState, 2);
});
......@@ -325,12 +316,7 @@ suite('AsyncEmitter', function () {
emitter.event(e => {
e.waitUntil(timeout(10).then(async _ => {
if (e.foo === 1) {
await emitter.fireAsync(thenables => ({
foo: 2,
waitUntil(t) {
thenables.push(t);
}
}), CancellationToken.None);
await emitter.fireAsync({ foo: 2 }, CancellationToken.None);
assert.deepEqual(events, [1, 2]);
done = true;
}
......@@ -343,12 +329,7 @@ suite('AsyncEmitter', function () {
e.waitUntil(timeout(7));
});
await emitter.fireAsync(thenables => ({
foo: 1,
waitUntil(t) {
thenables.push(t);
}
}), CancellationToken.None);
await emitter.fireAsync({ foo: 1 }, CancellationToken.None);
assert.ok(done);
});
......@@ -373,12 +354,7 @@ suite('AsyncEmitter', function () {
e.waitUntil(timeout(10));
});
await emitter.fireAsync(thenables => ({
foo: true,
waitUntil(t) {
thenables.push(t);
}
}), CancellationToken.None).then(() => {
await emitter.fireAsync({ foo: true }, CancellationToken.None).then(() => {
assert.equal(globalState, 2);
}).catch(e => {
console.log(e);
......
......@@ -196,26 +196,14 @@ export class ExtHostFileSystemEventService implements ExtHostFileSystemEventServ
private async _fireWillEvent<E extends IWaitUntil>(emitter: AsyncEmitter<E>, data: Omit<E, 'waitUntil'>, token: CancellationToken): Promise<any> {
const edits: WorkspaceEdit[] = [];
await Promise.resolve(emitter.fireAsync(bucket => {
return <E>{
...data,
...{
waitUntil: (thenable: Promise<vscode.WorkspaceEdit>): void => {
if (Object.isFrozen(bucket)) {
throw new TypeError('waitUntil cannot be called async');
}
const promise = Promise.resolve(thenable).then(result => {
// ignore all results except for WorkspaceEdits. Those
// are stored in a spare array
if (result instanceof WorkspaceEdit) {
edits.push(result);
}
});
bucket.push(promise);
}
}
};
}, token));
await emitter.fireAsync(data, token, async p => {
// ignore all results except for WorkspaceEdits. Those are stored in an array.
const result = await Promise.resolve(p);
if (result instanceof WorkspaceEdit) {
edits.push(result);
}
});
if (token.isCancellationRequested) {
return;
......
......@@ -345,7 +345,7 @@ export abstract class AbstractTextFileService extends Disposable implements ITex
async create(resource: URI, value?: string | ITextSnapshot, options?: ICreateFileOptions): Promise<IFileStatWithMetadata> {
// before event
await this._onWillRunOperation.fireAsync(promises => new FileOperationWillRunEvent(promises, FileOperation.CREATE, resource), CancellationToken.None);
await this._onWillRunOperation.fireAsync({ operation: FileOperation.CREATE, target: resource }, CancellationToken.None);
const stat = await this.doCreate(resource, value, options);
......@@ -375,7 +375,7 @@ export abstract class AbstractTextFileService extends Disposable implements ITex
async delete(resource: URI, options?: { useTrash?: boolean, recursive?: boolean }): Promise<void> {
// before event
await this._onWillRunOperation.fireAsync(promises => new FileOperationWillRunEvent(promises, FileOperation.DELETE, resource), CancellationToken.None);
await this._onWillRunOperation.fireAsync({ operation: FileOperation.DELETE, target: resource }, CancellationToken.None);
const dirtyFiles = this.getDirty().filter(dirty => isEqualOrParent(dirty, resource));
await this.revertAll(dirtyFiles, { soft: true });
......@@ -389,7 +389,7 @@ export abstract class AbstractTextFileService extends Disposable implements ITex
async move(source: URI, target: URI, overwrite?: boolean): Promise<IFileStatWithMetadata> {
// before event
await this._onWillRunOperation.fireAsync(promises => new FileOperationWillRunEvent(promises, FileOperation.MOVE, target, source), CancellationToken.None);
await this._onWillRunOperation.fireAsync({ operation: FileOperation.MOVE, target, source }, CancellationToken.None);
// find all models that related to either source or target (can be many if resource is a folder)
const sourceModels: ITextFileEditorModel[] = [];
......
......@@ -131,21 +131,10 @@ export interface ITextFileService extends IDisposable {
move(source: URI, target: URI, overwrite?: boolean): Promise<IFileStatWithMetadata>;
}
export class FileOperationWillRunEvent implements IWaitUntil {
constructor(
private _thenables: Promise<any>[],
readonly operation: FileOperation,
readonly target: URI,
readonly source?: URI | undefined
) { }
waitUntil(thenable: Promise<any>): void {
if (Object.isFrozen(this._thenables)) {
throw new Error('waitUntil cannot be used aync');
}
this._thenables.push(thenable);
}
export interface FileOperationWillRunEvent extends IWaitUntil {
operation: FileOperation;
target: URI;
source?: URI;
}
export class FileOperationDidRunEvent {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册