diff --git a/src/bio.c b/src/bio.c index fa10a503cfb58a58729dd3221308be00d80af250..da11f7b868bb4f82136f2be05dd1163047f0a805 100644 --- a/src/bio.c +++ b/src/bio.c @@ -85,6 +85,8 @@ struct bio_job { void *bioProcessBackgroundJobs(void *arg); void lazyfreeFreeObjectFromBioThread(robj *o); +void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2); +void lazyfreeFreeSlotsMapFromBioThread(zskiplist *sl); /* Make sure we have enough stack to perform all the things we do in the * main thread. */ @@ -187,7 +189,16 @@ void *bioProcessBackgroundJobs(void *arg) { } else if (type == BIO_AOF_FSYNC) { aof_fsync((long)job->arg1); } else if (type == BIO_LAZY_FREE) { - lazyfreeFreeObjectFromBioThread(job->arg1); + /* What we free changes depending on what arguments are set: + * arg1 -> free the object at pointer. + * arg2 & arg3 -> free two dictionaries (a Redis DB). + * only arg3 -> free the skiplist. */ + if (job->arg1) + lazyfreeFreeObjectFromBioThread(job->arg1); + else if (job->arg2 && job->arg3) + lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3); + else if (job->arg3) + lazyfreeFreeSlotsMapFromBioThread(job->arg3); } else { serverPanic("Wrong job type in bioProcessBackgroundJobs()."); } diff --git a/src/cluster.c b/src/cluster.c index 6fdc221755f08b8e116a0eb02df94f13d4773b7c..17e86a6c0035cea6488f56c09fa52bda17210b7e 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -495,7 +495,7 @@ void clusterReset(int hard) { if (nodeIsSlave(myself)) { clusterSetNodeAsMaster(myself); replicationUnsetMaster(); - emptyDb(NULL); + emptyDb(-1,EMPTYDB_NO_FLAGS,NULL); } /* Close slots, reset manual failover state. */ diff --git a/src/db.c b/src/db.c index d4b215e5328a8213b2040a5dedc6c49dccd609b0..0bfa146959f27dd76b97944d7e3910f01fc73041 100644 --- a/src/db.c +++ b/src/db.c @@ -29,6 +29,7 @@ #include "server.h" #include "cluster.h" +#include "atomicvar.h" #include #include @@ -238,16 +239,46 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) { return o; } -long long emptyDb(void(callback)(void*)) { - int j; +/* Remove all keys from all the databases in a Redis server. + * If callback is given the function is called from time to time to + * signal that work is in progress. + * + * The dbnum can be -1 if all teh DBs should be flushed, or the specified + * DB number if we want to flush only a single Redis database number. + * + * Flags are be EMPTYDB_NO_FLAGS if no special flags are specified or + * EMPTYDB_ASYCN if we want the memory to be freed in a different thread + * and the function to return ASAP. + * + * On success the fuction returns the number of keys removed from the + * database(s). Otherwise -1 is returned in the specific case the + * DB number is out of range, and errno is set to EINVAL. */ +long long emptyDb(int dbnum, int flags, void(callback)(void*)) { + int j, async = (flags & EMPTYDB_ASYNC); long long removed = 0; + if (dbnum < -1 || dbnum >= server.dbnum) { + errno = EINVAL; + return -1; + } + for (j = 0; j < server.dbnum; j++) { + if (dbnum != 1 && dbnum != j) continue; removed += dictSize(server.db[j].dict); - dictEmpty(server.db[j].dict,callback); - dictEmpty(server.db[j].expires,callback); + if (async) { + emptyDbAsync(&server.db[j]); + } else { + dictEmpty(server.db[j].dict,callback); + dictEmpty(server.db[j].expires,callback); + } + } + if (server.cluster_enabled) { + if (async) { + slotToKeyFlushAsync(); + } else { + slotToKeyFlush(); + } } - if (server.cluster_enabled) slotToKeyFlush(); return removed; } @@ -290,7 +321,7 @@ void flushdbCommand(client *c) { void flushallCommand(client *c) { signalFlushedDb(-1); - server.dirty += emptyDb(NULL); + server.dirty += emptyDb(-1,EMPTYDB_NO_FLAGS,NULL); addReply(c,shared.ok); if (server.rdb_child_pid != -1) { kill(server.rdb_child_pid,SIGUSR1); diff --git a/src/debug.c b/src/debug.c index 463b7f374d2c9af382376499ca22335b8d013f6e..48a1b513771a8b5d274df639f3ceb2fb0589e07a 100644 --- a/src/debug.c +++ b/src/debug.c @@ -271,7 +271,7 @@ void debugCommand(client *c) { addReply(c,shared.err); return; } - emptyDb(NULL); + emptyDb(-1,EMPTYDB_NO_FLAGS,NULL); if (rdbLoad(server.rdb_filename) != C_OK) { addReplyError(c,"Error trying to load the RDB dump"); return; @@ -279,7 +279,7 @@ void debugCommand(client *c) { serverLog(LL_WARNING,"DB reloaded by DEBUG RELOAD"); addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"loadaof")) { - emptyDb(NULL); + emptyDb(-1,EMPTYDB_NO_FLAGS,NULL); if (loadAppendOnlyFile(server.aof_filename) != C_OK) { addReply(c,shared.err); return; diff --git a/src/lazyfree.c b/src/lazyfree.c index f94edfd0af2612ead3ca9f56e653350ea10550fa..321bd5411f20e9bd6e5ca110827802a5320c0acd 100644 --- a/src/lazyfree.c +++ b/src/lazyfree.c @@ -1,6 +1,7 @@ #include "server.h" #include "bio.h" #include "atomicvar.h" +#include "cluster.h" static size_t lazyfree_objects = 0; pthread_mutex_t lazyfree_objects_mutex = PTHREAD_MUTEX_INITIALIZER; @@ -75,9 +76,51 @@ int dbAsyncDelete(redisDb *db, robj *key) { } } -/* Implementation of function to release a single object called from the - * lazyfree thread from bio.c. */ +/* Empty a Redis DB asynchronously. What the function does actually is to + * create a new empty set of hash tables and scheduling the old ones for + * lazy freeing. */ +void emptyDbAsync(redisDb *db) { + dict *oldht1 = db->dict, *oldht2 = db->expires; + db->dict = dictCreate(&dbDictType,NULL); + db->expires = dictCreate(&keyptrDictType,NULL); + atomicIncr(lazyfree_objects,dictSize(oldht1), + &lazyfree_objects_mutex); + bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,oldht1,oldht2); +} + +/* Empty the slots-keys map of Redis CLuster by creating a new empty one + * and scheduiling the old for lazy freeing. */ +void slotToKeyFlushAsync(void) { + zskiplist *oldsl = server.cluster->slots_to_keys; + server.cluster->slots_to_keys = zslCreate(); + atomicIncr(lazyfree_objects,oldsl->length, + &lazyfree_objects_mutex); + bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,NULL,oldsl); +} + +/* Release objects from the lazyfree thread. It's just decrRefCount() + * updating the count of objects to release. */ void lazyfreeFreeObjectFromBioThread(robj *o) { decrRefCount(o); atomicDecr(lazyfree_objects,1,&lazyfree_objects_mutex); } + +/* Release a database from the lazyfree thread. The 'db' pointer is the + * database which was substitutied with a fresh one in the main thread + * when the database was logically deleted. 'sl' is a skiplist used by + * Redis Cluster in order to take the hash slots -> keys mapping. This + * may be NULL if Redis Cluster is disabled. */ +void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2) { + size_t numkeys = dictSize(ht1); + dictRelease(ht1); + dictRelease(ht2); + atomicDecr(lazyfree_objects,numkeys,&lazyfree_objects_mutex); +} + +/* Release the skiplist mapping Redis Cluster keys to slots in the + * lazyfree thread. */ +void lazyfreeFreeSlotsMapFromBioThread(zskiplist *sl) { + size_t len = sl->length; + zslFree(sl); + atomicDecr(lazyfree_objects,len,&lazyfree_objects_mutex); +} diff --git a/src/replication.c b/src/replication.c index 7ab646362a6bce1dd10740bf1493ed4480646a3a..5a61f4d995a2615242acd2e8b5e4824510d2728b 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1111,7 +1111,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { } serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Flushing old data"); signalFlushedDb(-1); - emptyDb(replicationEmptyDbCallback); + emptyDb(-1,EMPTYDB_NO_FLAGS,replicationEmptyDbCallback); /* Before loading the DB into memory we need to delete the readable * handler, otherwise it will get called recursively since * rdbLoad() will call the event loop to process events from time to diff --git a/src/server.h b/src/server.h index 5b5bc9887fdfbaccbb68755ab5acb546c8f60b9b..7ba9a4325cc9eac38b576236891ee2a9d3477875 100644 --- a/src/server.h +++ b/src/server.h @@ -1050,6 +1050,7 @@ extern dictType shaScriptObjectDictType; extern double R_Zero, R_PosInf, R_NegInf, R_Nan; extern dictType hashDictType; extern dictType replScriptCacheDictType; +extern dictType keyptrDictType; /*----------------------------------------------------------------------------- * Functions prototypes @@ -1384,7 +1385,11 @@ robj *dbRandomKey(redisDb *db); int dbSyncDelete(redisDb *db, robj *key); int dbDelete(redisDb *db, robj *key); robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o); -long long emptyDb(void(callback)(void*)); + +#define EMPTYDB_NO_FLAGS 0 /* No flags. */ +#define EMPTYDB_ASYNC (1<<0) /* Reclaim memory in another thread. */ +long long emptyDb(int dbnum, int flags, void(callback)(void*)); + int selectDb(client *c, int id); void signalModifiedKey(redisDb *db, robj *key); void signalFlushedDb(int dbid); @@ -1407,6 +1412,8 @@ void slotToKeyFlush(void); #define LAZYFREE_STEP_OOM 2 /* Free a few elements at any cost if there is something to free: we are out of memory */ int dbAsyncDelete(redisDb *db, robj *key); +void emptyDbAsync(redisDb *db); +void slotToKeyFlushAsync(void); /* API to get key arguments from commands */ int *getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, int *numkeys);