提交 a41ac457 编写于 作者: I isidor

queue.cancel

上级 78d7d64c
......@@ -5,6 +5,7 @@
'use strict';
import * as nls from 'vs/nls';
import * as errors from 'vs/base/common/errors';
import { TPromise, ValueCallback, ErrorCallback, ProgressCallback } from 'vs/base/common/winjs.base';
import { CancellationToken, CancellationTokenSource } from 'vs/base/common/cancellation';
......@@ -486,7 +487,7 @@ interface ILimitedTaskFactory {
* ensures that at any time no more than M promises are running at the same time.
*/
export class Limiter<T> {
private runningPromises: number;
private runningPromises: TPromise<T>[];
private maxDegreeOfParalellism: number;
private outstandingPromises: ILimitedTaskFactory[];
private readonly _onFinished: Emitter<void>;
......@@ -494,7 +495,7 @@ export class Limiter<T> {
constructor(maxDegreeOfParalellism: number) {
this.maxDegreeOfParalellism = maxDegreeOfParalellism;
this.outstandingPromises = [];
this.runningPromises = 0;
this.runningPromises = [];
this._onFinished = new Emitter<void>();
}
......@@ -503,7 +504,7 @@ export class Limiter<T> {
}
public get size(): number {
return this.runningPromises + this.outstandingPromises.length;
return this.runningPromises.length + this.outstandingPromises.length;
}
queue(promiseFactory: ITask<TPromise>): TPromise;
......@@ -520,19 +521,29 @@ export class Limiter<T> {
});
}
cancel(): void {
const outstanding = this.outstandingPromises;
const running = this.runningPromises;
this.outstandingPromises = [];
this.runningPromises = [];
outstanding.forEach(o => o.e(new Error(nls.localize('canceled', "Canceled"))));
running.forEach(p => p.cancel());
}
private consume(): void {
while (this.outstandingPromises.length && this.runningPromises < this.maxDegreeOfParalellism) {
while (this.outstandingPromises.length && this.runningPromises.length < this.maxDegreeOfParalellism) {
const iLimitedTask = this.outstandingPromises.shift();
this.runningPromises++;
const promise = iLimitedTask.factory();
this.runningPromises.push(promise);
promise.done(iLimitedTask.c, iLimitedTask.e, iLimitedTask.p);
promise.done(() => this.consumed(), () => this.consumed());
promise.done(() => this.consumed(promise), () => this.consumed(promise));
}
}
private consumed(): void {
this.runningPromises--;
private consumed(promise: TPromise<T>): void {
this.runningPromises = this.runningPromises.filter(p => p !== promise);
if (this.outstandingPromises.length > 0) {
this.consume();
......
......@@ -507,6 +507,31 @@ suite('Async', () => {
});
});
test('Queue - cancel', function () {
let queue = new Async.Queue();
let res: number[] = [];
let f1 = () => TPromise.timeout(0).then(() => res.push(1));
let f2 = () => TPromise.timeout(10).then(() => res.push(2));
let f3 = () => TPromise.as(true).then(() => res.push(3));
let f4 = () => TPromise.timeout(20).then(() => res.push(4));
let f5 = () => TPromise.timeout(0).then(() => res.push(5));
queue.queue(f1).then(() => assert.fail());
queue.queue(f2).then(() => assert.fail());
queue.queue(f3).then(() => assert.fail());
assert.equal(queue.size, 3);
queue.cancel();
assert.equal(queue.size, 0);
queue.queue(f4).then(undefined, () => assert.fail());
return queue.queue(f5).then(() => {
assert.equal(res.length, 2);
assert.equal(res[0], 4);
assert.equal(res[1], 5);
});
});
test('Queue - errors bubble individually but not cause stop', function () {
let queue = new Async.Queue();
......@@ -547,6 +572,8 @@ suite('Async', () => {
return queue.queue(f2).then(() => {
return queue.queue(f3).then(() => {
return queue.queue(f4).then(() => {
// Queue is empty, cancel should be a noop
queue.cancel();
return queue.queue(f5).then(() => {
assert.equal(res[0], 1);
assert.equal(res[1], 2);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册