提交 d393578d 编写于 作者: B Benjamin Pasero

storage - switch to throttled delayer

上级 3fb99698
......@@ -242,8 +242,10 @@ export class Delayer<T> implements IDisposable {
* A helper to delay execution of a task that is being requested often, while
* preventing accumulation of consecutive executions, while the task runs.
*
* Simply combine the two mail men's strategies from the Throttler and Delayer
* helpers, for an analogy.
* The mail man is clever and waits for a certain amount of time, before going
* out to deliver letters. While the mail man is going out, more letters arrive
* and can only be delivered once he is back. Once he is back the mail man will
* do one more trip to deliver the letters that have accumulated while he was out.
*/
export class ThrottledDelayer<T> extends Delayer<TPromise<T>> {
......
......@@ -6,7 +6,7 @@
import { Database, Statement, OPEN_READWRITE, OPEN_CREATE } from 'vscode-sqlite3';
import { Disposable, IDisposable } from 'vs/base/common/lifecycle';
import { Emitter, Event } from 'vs/base/common/event';
import { RunOnceScheduler, Queue } from 'vs/base/common/async';
import { ThrottledDelayer } from 'vs/base/common/async';
import { isUndefinedOrNull } from 'vs/base/common/types';
import { mapToString, setToString } from 'vs/base/common/map';
import { basename } from 'path';
......@@ -50,8 +50,8 @@ export interface IStorage extends IDisposable {
getInteger(key: string, fallbackValue: number): number;
getInteger(key: string, fallbackValue?: number): number | undefined;
set(key: string, value: any): Promise<void>;
delete(key: string): Promise<void>;
set(key: string, value: any): Thenable<void>;
delete(key: string): Thenable<void>;
close(): Thenable<void>;
......@@ -72,19 +72,17 @@ export class Storage extends Disposable implements IStorage {
private storage: SQLiteStorageImpl;
private cache: Map<string, string> = new Map<string, string>();
private pendingQueue: Queue<void>;
private pendingScheduler: RunOnceScheduler;
private flushDelayer: ThrottledDelayer<void>;
private pendingDeletes: Set<string> = new Set<string>();
private pendingInserts: Map<string, string> = new Map();
private pendingPromises: { resolve: Function, reject: Function }[] = [];
constructor(options: IStorageOptions) {
super();
this.storage = new SQLiteStorageImpl(options);
this.pendingQueue = this._register(new Queue());
this.pendingScheduler = this._register(new RunOnceScheduler(() => this.flushPending(), Storage.FLUSH_DELAY));
this.flushDelayer = this._register(new ThrottledDelayer(Storage.FLUSH_DELAY));
}
get size(): number {
......@@ -139,7 +137,7 @@ export class Storage extends Disposable implements IStorage {
return parseInt(value, 10);
}
set(key: string, value: any): Promise<void> {
set(key: string, value: any): Thenable<void> {
if (this.state === StorageState.Closed) {
return Promise.resolve(); // Return early if we are already closed
}
......@@ -166,10 +164,11 @@ export class Storage extends Disposable implements IStorage {
// Event
this._onDidChangeStorage.fire(key);
return this.update();
// Accumulate work by scheduling after timeout
return this.flushDelayer.trigger(() => this.flushPending());
}
delete(key: string): Promise<void> {
delete(key: string): Thenable<void> {
if (this.state === StorageState.Closed) {
return Promise.resolve(); // Return early if we are already closed
}
......@@ -189,17 +188,8 @@ export class Storage extends Disposable implements IStorage {
// Event
this._onDidChangeStorage.fire(key);
return this.update();
}
private update(): Promise<void> {
// Schedule
if (!this.pendingScheduler.isScheduled()) {
this.pendingScheduler.schedule();
}
return new Promise((resolve, reject) => this.pendingPromises.push({ resolve, reject }));
// Accumulate work by scheduling after timeout
return this.flushDelayer.trigger(() => this.flushPending());
}
close(): Thenable<void> {
......@@ -210,44 +200,23 @@ export class Storage extends Disposable implements IStorage {
// Update state
this.state = StorageState.Closed;
// Dispose scheduler (no more scheduling possible)
this.pendingScheduler.dispose();
// Flush & close
return this.flushPending().then(() => {
return this.storage.close();
});
// Trigger new flush to ensure data is persisted and then close
// even if there is an error flushing. We must always ensure
// the DB is closed to avoid corruption.
const onDone = () => this.storage.close();
return this.flushDelayer.trigger(() => this.flushPending()).then(onDone, onDone);
}
private flushPending(): Thenable<void> {
// We use a Queue to ensure that:
// - there is only ever one call to storage.updateItems() at the same time
// - upon close() we are certain that all calls to storage.updateItems()
// have finished. Otherwise there is a risk that we close() the DB while
// a transaction is active.
return this.pendingQueue.queue(() => {
// Get pending data
const pendingPromises = this.pendingPromises;
const pendingDeletes = this.pendingDeletes;
const pendingInserts = this.pendingInserts;
// Reset pending data for next run
this.pendingPromises = [];
this.pendingDeletes = new Set<string>();
this.pendingInserts = new Map<string, string>();
// Get pending data
const updateRequest: IUpdateRequest = { insert: this.pendingInserts, delete: this.pendingDeletes };
return this.storage.updateItems({ insert: pendingInserts, delete: pendingDeletes }).then(() => {
// Reset pending data for next run
this.pendingDeletes = new Set<string>();
this.pendingInserts = new Map<string, string>();
// Resolve pending
pendingPromises.forEach(promise => promise.resolve());
}, error => {
// Forward error to pending
pendingPromises.forEach(promise => promise.reject(error));
});
});
return this.storage.updateItems(updateRequest);
}
getItems(): Promise<Map<string, string>> {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册