提交 dd34f703 编写于 作者: G Guy Benoish 提交者: antirez

Diskless-load emptyDb-related fixes

1. Call emptyDb even in case of diskless-load: We want modules
   to get the same FLUSHDB event as disk-based replication.
2. Do not fire any module events when flushing the backups array.
3. Delete redundant call to signalFlushedDb (Called from emptyDb).
上级 5e042dbc
...@@ -347,7 +347,10 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) { ...@@ -347,7 +347,10 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) {
* DB number if we want to flush only a single Redis database number. * DB number if we want to flush only a single Redis database number.
* *
* Flags are be EMPTYDB_NO_FLAGS if no special flags are specified or * Flags are be EMPTYDB_NO_FLAGS if no special flags are specified or
* EMPTYDB_ASYNC if we want the memory to be freed in a different thread * 1. EMPTYDB_ASYNC if we want the memory to be freed in a different thread.
* 2. EMPTYDB_BACKUP if we want to empty the backup dictionaries created by
* disklessLoadMakeBackups. In that case we only free memory and avoid
* firing module events.
* and the function to return ASAP. * and the function to return ASAP.
* *
* On success the fuction returns the number of keys removed from the * On success the fuction returns the number of keys removed from the
...@@ -355,6 +358,8 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) { ...@@ -355,6 +358,8 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) {
* DB number is out of range, and errno is set to EINVAL. */ * DB number is out of range, and errno is set to EINVAL. */
long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)(void*)) { long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)(void*)) {
int async = (flags & EMPTYDB_ASYNC); int async = (flags & EMPTYDB_ASYNC);
int backup = (flags & EMPTYDB_BACKUP); /* Just free the memory, nothing else */
RedisModuleFlushInfoV1 fi = {REDISMODULE_FLUSHINFO_VERSION,!async,dbnum};
long long removed = 0; long long removed = 0;
if (dbnum < -1 || dbnum >= server.dbnum) { if (dbnum < -1 || dbnum >= server.dbnum) {
...@@ -362,16 +367,18 @@ long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)( ...@@ -362,16 +367,18 @@ long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)(
return -1; return -1;
} }
/* Fire the flushdb modules event. */ /* Pre-flush actions */
RedisModuleFlushInfoV1 fi = {REDISMODULE_FLUSHINFO_VERSION,!async,dbnum}; if (!backup) {
moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB, /* Fire the flushdb modules event. */
REDISMODULE_SUBEVENT_FLUSHDB_START, moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB,
&fi); REDISMODULE_SUBEVENT_FLUSHDB_START,
&fi);
/* Make sure the WATCHed keys are affected by the FLUSH* commands.
* Note that we need to call the function while the keys are still /* Make sure the WATCHed keys are affected by the FLUSH* commands.
* there. */ * Note that we need to call the function while the keys are still
signalFlushedDb(dbnum); * there. */
signalFlushedDb(dbnum);
}
int startdb, enddb; int startdb, enddb;
if (dbnum == -1) { if (dbnum == -1) {
...@@ -390,20 +397,24 @@ long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)( ...@@ -390,20 +397,24 @@ long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)(
dictEmpty(dbarray[j].expires,callback); dictEmpty(dbarray[j].expires,callback);
} }
} }
if (server.cluster_enabled) {
if (async) { /* Post-flush actions */
slotToKeyFlushAsync(); if (!backup) {
} else { if (server.cluster_enabled) {
slotToKeyFlush(); if (async) {
slotToKeyFlushAsync();
} else {
slotToKeyFlush();
}
} }
} if (dbnum == -1) flushSlaveKeysWithExpireList();
if (dbnum == -1) flushSlaveKeysWithExpireList();
/* Also fire the end event. Note that this event will fire almost /* Also fire the end event. Note that this event will fire almost
* immediately after the start event if the flush is asynchronous. */ * immediately after the start event if the flush is asynchronous. */
moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB, moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB,
REDISMODULE_SUBEVENT_FLUSHDB_END, REDISMODULE_SUBEVENT_FLUSHDB_END,
&fi); &fi);
}
return removed; return removed;
} }
......
...@@ -1339,8 +1339,8 @@ void disklessLoadRestoreBackups(redisDb *backup, int restore, int empty_db_flags ...@@ -1339,8 +1339,8 @@ void disklessLoadRestoreBackups(redisDb *backup, int restore, int empty_db_flags
server.db[i] = backup[i]; server.db[i] = backup[i];
} }
} else { } else {
/* Delete. */ /* Delete (Pass EMPTYDB_BACKUP in order to avoid firing module events) . */
emptyDbGeneric(backup,-1,empty_db_flags,replicationEmptyDbCallback); emptyDbGeneric(backup,-1,empty_db_flags|EMPTYDB_BACKUP,replicationEmptyDbCallback);
for (int i=0; i<server.dbnum; i++) { for (int i=0; i<server.dbnum; i++) {
dictRelease(backup[i].dict); dictRelease(backup[i].dict);
dictRelease(backup[i].expires); dictRelease(backup[i].expires);
...@@ -1524,7 +1524,6 @@ void readSyncBulkPayload(connection *conn) { ...@@ -1524,7 +1524,6 @@ void readSyncBulkPayload(connection *conn) {
/* We need to stop any AOF rewriting child before flusing and parsing /* We need to stop any AOF rewriting child before flusing and parsing
* the RDB, otherwise we'll create a copy-on-write disaster. */ * the RDB, otherwise we'll create a copy-on-write disaster. */
if (server.aof_state != AOF_OFF) stopAppendOnly(); if (server.aof_state != AOF_OFF) stopAppendOnly();
signalFlushedDb(-1);
/* When diskless RDB loading is used by replicas, it may be configured /* When diskless RDB loading is used by replicas, it may be configured
* in order to save the current DB instead of throwing it away, * in order to save the current DB instead of throwing it away,
...@@ -1532,10 +1531,15 @@ void readSyncBulkPayload(connection *conn) { ...@@ -1532,10 +1531,15 @@ void readSyncBulkPayload(connection *conn) {
if (use_diskless_load && if (use_diskless_load &&
server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB)
{ {
/* Create a backup of server.db[] and initialize to empty
* dictionaries */
diskless_load_backup = disklessLoadMakeBackups(); diskless_load_backup = disklessLoadMakeBackups();
} else {
emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);
} }
/* We call to emptyDb even in case of REPL_DISKLESS_LOAD_SWAPDB
* (Where disklessLoadMakeBackups left server.db empty) because we
* want to execute all the auxiliary logic of emptyDb (Namely,
* fire module events) */
emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);
/* Before loading the DB into memory we need to delete the readable /* Before loading the DB into memory we need to delete the readable
* handler, otherwise it will get called recursively since * handler, otherwise it will get called recursively since
......
...@@ -2045,6 +2045,7 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o); ...@@ -2045,6 +2045,7 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o);
#define EMPTYDB_NO_FLAGS 0 /* No flags. */ #define EMPTYDB_NO_FLAGS 0 /* No flags. */
#define EMPTYDB_ASYNC (1<<0) /* Reclaim memory in another thread. */ #define EMPTYDB_ASYNC (1<<0) /* Reclaim memory in another thread. */
#define EMPTYDB_BACKUP (1<<2) /* DB array is a backup for REPL_DISKLESS_LOAD_SWAPDB. */
long long emptyDb(int dbnum, int flags, void(callback)(void*)); long long emptyDb(int dbnum, int flags, void(callback)(void*));
long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)(void*)); long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)(void*));
void flushAllDataAndResetRDB(int flags); void flushAllDataAndResetRDB(int flags);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册