提交 ed0057fd 编写于 作者: B Benjamin Pasero

storage - have a connection object

上级 b6792745
......@@ -298,9 +298,13 @@ export class Storage extends Disposable implements IStorage {
}
}
interface IOpenDatabaseResult {
interface IDatabaseConnection {
db: Database;
path: string;
isInMemory: boolean;
isCorrupted?: boolean;
isErroneous?: boolean;
}
export interface ISQLiteStorageDatabaseOptions {
......@@ -322,25 +326,27 @@ export class SQLiteStorageDatabase implements IStorageDatabase {
private static BUSY_OPEN_TIMEOUT = 2000; // timeout in ms to retry when opening DB fails with SQLITE_BUSY
private path: string;
private name: string;
private logger: SQLiteStorageDatabaseLogger;
private isCorrupt: boolean;
private logger: SQLiteStorageDatabaseLogger;
private whenOpened: Promise<IOpenDatabaseResult>;
private whenConnected: Promise<IDatabaseConnection>;
constructor(path: string, options: ISQLiteStorageDatabaseOptions = Object.create(null)) {
this.path = path;
this.name = basename(path);
this.logger = new SQLiteStorageDatabaseLogger(options.logging);
this.whenOpened = this.open(path);
this.whenConnected = this.connect(path);
}
getItems(): Promise<Map<string, string>> {
return this.whenOpened.then(({ db }) => {
return this.whenConnected.then(connection => {
const items = new Map<string, string>();
return this.all(db, 'SELECT * FROM ItemTable').then(rows => {
return this.all(connection, 'SELECT * FROM ItemTable').then(rows => {
rows.forEach(row => items.set(row.key, row.value));
if (this.logger.isTracing) {
......@@ -369,10 +375,10 @@ export class SQLiteStorageDatabase implements IStorageDatabase {
this.logger.trace(`[storage ${this.name}] updateItems(): insert(${request.insert ? mapToString(request.insert) : '0'}), delete(${request.delete ? setToString(request.delete) : '0'})`);
}
return this.whenOpened.then(({ db }) => {
return this.transaction(db, () => {
return this.whenConnected.then(connection => {
return this.transaction(connection, () => {
if (request.insert && request.insert.size > 0) {
this.prepare(db, 'INSERT INTO ItemTable VALUES (?,?)', stmt => {
this.prepare(connection, 'INSERT INTO ItemTable VALUES (?,?)', stmt => {
request.insert!.forEach((value, key) => {
stmt.run([key, value]);
});
......@@ -389,7 +395,7 @@ export class SQLiteStorageDatabase implements IStorageDatabase {
}
if (request.delete && request.delete.size) {
this.prepare(db, 'DELETE FROM ItemTable WHERE key=?', stmt => {
this.prepare(connection, 'DELETE FROM ItemTable WHERE key=?', stmt => {
request.delete!.forEach(key => {
stmt.run(key);
});
......@@ -409,25 +415,25 @@ export class SQLiteStorageDatabase implements IStorageDatabase {
close(): Promise<void> {
this.logger.trace(`[storage ${this.name}] close()`);
return this.whenOpened.then(result => {
return this.whenConnected.then(connection => {
return new Promise((resolve, reject) => {
result.db.close(closeError => {
connection.db.close(closeError => {
if (closeError) {
this.handleSQLiteError(closeError, `[storage ${this.name}] close(): ${closeError}`);
this.handleSQLiteError(connection, closeError, `[storage ${this.name}] close(): ${closeError}`);
}
if (result.path === SQLiteStorageDatabase.IN_MEMORY_PATH) {
if (connection.isInMemory) {
return resolve(); // return early for in-memory DBs
}
if (this.isCorrupt) {
// If the DB is corrupt, make sure to rename the file so that we can start
// from a fresh DB or a previous backup on the next startup and not be stuck
// with a corrupt DB for ever.
if (connection.isCorrupted) {
this.logger.error(`[storage ${this.name}] close(): removing corrupt DB and trying to restore backup`);
return always(rename(result.path, this.toCorruptPath(result.path))
.then(() => rename(this.toBackupPath(result.path), result.path)), () => closeError ? reject(closeError) : resolve());
return always(rename(this.path, this.toCorruptPath(this.path))
.then(() => rename(this.toBackupPath(this.path), this.path)), () => closeError ? reject(closeError) : resolve());
}
if (closeError) {
......@@ -438,7 +444,7 @@ export class SQLiteStorageDatabase implements IStorageDatabase {
// and the DB did not get corrupted during runtime, make a backup
// of the DB so that we can use it as fallback in case the actual
// DB becomes corrupt.
return this.backup(result).then(resolve, error => {
return this.backup(connection).then(resolve, error => {
this.logger.error(`[storage ${this.name}] backup(): ${error}`);
return resolve(); // ignore failing backup
......@@ -448,14 +454,14 @@ export class SQLiteStorageDatabase implements IStorageDatabase {
});
}
private backup(db: IOpenDatabaseResult): Promise<void> {
if (db.path === SQLiteStorageDatabase.IN_MEMORY_PATH) {
private backup(db: IDatabaseConnection): Promise<void> {
if (db.isInMemory) {
return Promise.resolve(); // no backups when running in-memory
}
const backupPath = this.toBackupPath(db.path);
const backupPath = this.toBackupPath(this.path);
return copy(db.path, backupPath);
return copy(this.path, backupPath);
}
private toBackupPath(path: string): string {
......@@ -465,26 +471,26 @@ export class SQLiteStorageDatabase implements IStorageDatabase {
checkIntegrity(full: boolean): Promise<string> {
this.logger.trace(`[storage ${this.name}] checkIntegrity(full: ${full})`);
return this.whenOpened.then(({ db }) => {
return this.get(db, full ? 'PRAGMA integrity_check' : 'PRAGMA quick_check').then(row => {
return this.whenConnected.then(connection => {
return this.get(connection, full ? 'PRAGMA integrity_check' : 'PRAGMA quick_check').then(row => {
return full ? row['integrity_check'] : row['quick_check'];
});
});
}
private open(path: string): Promise<IOpenDatabaseResult> {
private connect(path: string): Promise<IDatabaseConnection> {
this.logger.trace(`[storage ${this.name}] open()`);
return new Promise((resolve, reject) => {
const fallbackToInMemoryDatabase = (error: Error) => {
this.handleSQLiteError(error, `[storage ${this.name}] open(): Error (open DB): ${error}. Falling back to in-memory DB`);
this.logger.error(`[storage ${this.name}] open(): Error (open DB): ${error}. Falling back to in-memory DB`);
// In case of any error to open the DB, use an in-memory
// DB so that we always have a valid DB to talk to.
this.doOpen(SQLiteStorageDatabase.IN_MEMORY_PATH).then(resolve, reject);
this.doConnect(SQLiteStorageDatabase.IN_MEMORY_PATH).then(resolve, reject);
};
this.doOpen(path).then(resolve, error => {
return this.doConnect(path).then(resolve, error => {
// This error code should only arise if another process is locking the same DB we
// want to open at that time. This typically never happens because a DB connection
......@@ -492,13 +498,23 @@ export class SQLiteStorageDatabase implements IStorageDatabase {
// that the previous connection was not properly closed while the new connection is
// already established.
if (error.code === 'SQLITE_BUSY') {
return this.handleSQLiteBusy(path, error).then(resolve, fallbackToInMemoryDatabase);
this.logger.error(`[storage ${this.name}] open(): Retrying after ${SQLiteStorageDatabase.BUSY_OPEN_TIMEOUT}ms due to SQLITE_BUSY`);
// Retry after some time if the DB is busy
return timeout(SQLiteStorageDatabase.BUSY_OPEN_TIMEOUT).then(() => this.doConnect(path)).then(resolve, fallbackToInMemoryDatabase);
}
// This error code indicates that even though the DB file exists,
// SQLite cannot open it and signals it is corrupt or not a DB.
if (error.code === 'SQLITE_CORRUPT' || error.code === 'SQLITE_NOTADB') {
return this.handleSQLiteCorrupt(path, error).then(resolve, fallbackToInMemoryDatabase);
this.logger.error(`[storage ${this.name}] open(): Unable to open DB due to ${error.code}`);
// Move corrupt DB to a different filename and try to load from backup
// If that fails, a new empty DB is being created automatically
return rename(path, this.toCorruptPath(path))
.then(() => renameIgnoreError(this.toBackupPath(path), path))
.then(() => this.doConnect(path))
.then(resolve, fallbackToInMemoryDatabase);
}
// Otherwise give up and fallback to in-memory DB
......@@ -507,26 +523,11 @@ export class SQLiteStorageDatabase implements IStorageDatabase {
});
}
private handleSQLiteBusy(path: string, error: Error & { code?: string }): Promise<IOpenDatabaseResult> {
this.handleSQLiteError(error, `[storage ${this.name}] open(): Retrying after ${SQLiteStorageDatabase.BUSY_OPEN_TIMEOUT}ms due to SQLITE_BUSY`);
private handleSQLiteError(connection: IDatabaseConnection, error: Error & { code?: string }, msg: string): void {
connection.isErroneous = true;
// Retry after some time if the DB is busy
return timeout(SQLiteStorageDatabase.BUSY_OPEN_TIMEOUT).then(() => this.doOpen(path));
}
private handleSQLiteCorrupt(path: string, error: Error & { code?: string }): Promise<IOpenDatabaseResult> {
this.handleSQLiteError(error, `[storage ${this.name}] open(): Unable to open DB due to ${error.code}`);
// Move corrupt DB to a different filename and try to load from backup
// If that fails, a new empty DB is being created automatically
return rename(path, this.toCorruptPath(path))
.then(() => renameIgnoreError(this.toBackupPath(path), path))
.then(() => this.doOpen(path));
}
private handleSQLiteError(error: Error & { code?: string }, msg: string): void {
if (error.code === 'SQLITE_CORRUPT' || error.code === 'SQLITE_NOTADB') {
this.isCorrupt = true;
connection.isCorrupted = true;
}
this.logger.error(msg);
......@@ -538,10 +539,7 @@ export class SQLiteStorageDatabase implements IStorageDatabase {
return `${path}.${randomSuffix}.corrupt`;
}
private doOpen(path: string): Promise<IOpenDatabaseResult> {
// Reset flags when we open a DB
this.isCorrupt = false;
private doConnect(path: string): Promise<IDatabaseConnection> {
// TODO@Ben clean up performance markers
return new Promise((resolve, reject) => {
......@@ -557,45 +555,48 @@ export class SQLiteStorageDatabase implements IStorageDatabase {
mark('didRequireSQLite');
}
const db: Database = new (this.logger.isTracing ? sqlite3.verbose().Database : sqlite3.Database)(path, error => {
const connection: IDatabaseConnection = {
db: new (this.logger.isTracing ? sqlite3.verbose().Database : sqlite3.Database)(path, error => {
if (error) {
return db ? db.close(() => reject(error)) : reject(error);
return connection.db ? connection.db.close(() => reject(error)) : reject(error);
}
// The following exec() statement serves two purposes:
// - create the DB if it does not exist yet
// - validate that the DB is not corrupt (the open() call does not throw otherwise)
mark('willSetupSQLiteSchema');
this.exec(db, [
this.exec(connection, [
'PRAGMA user_version = 1;',
'CREATE TABLE IF NOT EXISTS ItemTable (key TEXT UNIQUE ON CONFLICT REPLACE, value BLOB)'
].join('')).then(() => {
mark('didSetupSQLiteSchema');
return resolve({ path, db });
return resolve(connection);
}, error => {
mark('didSetupSQLiteSchema');
return db.close(() => reject(error));
});
return connection.db.close(() => reject(error));
});
}),
isInMemory: path === SQLiteStorageDatabase.IN_MEMORY_PATH
};
// Errors
db.on('error', error => this.handleSQLiteError(error, `[storage ${this.name}] Error (event): ${error}`));
connection.db.on('error', error => this.handleSQLiteError(connection, error, `[storage ${this.name}] Error (event): ${error}`));
// Tracing
if (this.logger.isTracing) {
db.on('trace', sql => this.logger.trace(`[storage ${this.name}] Trace (event): ${sql}`));
connection.db.on('trace', sql => this.logger.trace(`[storage ${this.name}] Trace (event): ${sql}`));
}
});
});
}
private exec(db: Database, sql: string): Promise<void> {
private exec(connection: IDatabaseConnection, sql: string): Promise<void> {
return new Promise((resolve, reject) => {
db.exec(sql, error => {
connection.db.exec(sql, error => {
if (error) {
this.handleSQLiteError(error, `[storage ${this.name}] exec(): ${error}`);
this.handleSQLiteError(connection, error, `[storage ${this.name}] exec(): ${error}`);
return reject(error);
}
......@@ -605,11 +606,11 @@ export class SQLiteStorageDatabase implements IStorageDatabase {
});
}
private get(db: Database, sql: string): Promise<object> {
private get(connection: IDatabaseConnection, sql: string): Promise<object> {
return new Promise((resolve, reject) => {
db.get(sql, (error, row) => {
connection.db.get(sql, (error, row) => {
if (error) {
this.handleSQLiteError(error, `[storage ${this.name}] get(): ${error}`);
this.handleSQLiteError(connection, error, `[storage ${this.name}] get(): ${error}`);
return reject(error);
}
......@@ -619,11 +620,11 @@ export class SQLiteStorageDatabase implements IStorageDatabase {
});
}
private all(db: Database, sql: string): Promise<{ key: string, value: string }[]> {
private all(connection: IDatabaseConnection, sql: string): Promise<{ key: string, value: string }[]> {
return new Promise((resolve, reject) => {
db.all(sql, (error, rows) => {
connection.db.all(sql, (error, rows) => {
if (error) {
this.handleSQLiteError(error, `[storage ${this.name}] all(): ${error}`);
this.handleSQLiteError(connection, error, `[storage ${this.name}] all(): ${error}`);
return reject(error);
}
......@@ -633,16 +634,16 @@ export class SQLiteStorageDatabase implements IStorageDatabase {
});
}
private transaction(db: Database, transactions: () => void): Promise<void> {
private transaction(connection: IDatabaseConnection, transactions: () => void): Promise<void> {
return new Promise((resolve, reject) => {
db.serialize(() => {
db.run('BEGIN TRANSACTION');
connection.db.serialize(() => {
connection.db.run('BEGIN TRANSACTION');
transactions();
db.run('END TRANSACTION', error => {
connection.db.run('END TRANSACTION', error => {
if (error) {
this.handleSQLiteError(error, `[storage ${this.name}] transaction(): ${error}`);
this.handleSQLiteError(connection, error, `[storage ${this.name}] transaction(): ${error}`);
return reject(error);
}
......@@ -653,11 +654,11 @@ export class SQLiteStorageDatabase implements IStorageDatabase {
});
}
private prepare(db: Database, sql: string, runCallback: (stmt: Statement) => void, errorDetails: () => string): void {
const stmt = db.prepare(sql);
private prepare(connection: IDatabaseConnection, sql: string, runCallback: (stmt: Statement) => void, errorDetails: () => string): void {
const stmt = connection.db.prepare(sql);
const statementErrorListener = error => {
this.handleSQLiteError(error, `[storage ${this.name}] prepare(): ${error} (${sql}). Details: ${errorDetails()}`);
this.handleSQLiteError(connection, error, `[storage ${this.name}] prepare(): ${error} (${sql}). Details: ${errorDetails()}`);
};
stmt.on('error', statementErrorListener);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册