提交 c69c6c80 编写于 作者: A antirez

Lazyfree: ability to free whole DBs in background.

上级 b08c36c5
......@@ -85,6 +85,8 @@ struct bio_job {
void *bioProcessBackgroundJobs(void *arg);
void lazyfreeFreeObjectFromBioThread(robj *o);
void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2);
void lazyfreeFreeSlotsMapFromBioThread(zskiplist *sl);
/* Make sure we have enough stack to perform all the things we do in the
* main thread. */
......@@ -187,7 +189,16 @@ void *bioProcessBackgroundJobs(void *arg) {
} else if (type == BIO_AOF_FSYNC) {
aof_fsync((long)job->arg1);
} else if (type == BIO_LAZY_FREE) {
lazyfreeFreeObjectFromBioThread(job->arg1);
/* What we free changes depending on what arguments are set:
* arg1 -> free the object at pointer.
* arg2 & arg3 -> free two dictionaries (a Redis DB).
* only arg3 -> free the skiplist. */
if (job->arg1)
lazyfreeFreeObjectFromBioThread(job->arg1);
else if (job->arg2 && job->arg3)
lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3);
else if (job->arg3)
lazyfreeFreeSlotsMapFromBioThread(job->arg3);
} else {
serverPanic("Wrong job type in bioProcessBackgroundJobs().");
}
......
......@@ -495,7 +495,7 @@ void clusterReset(int hard) {
if (nodeIsSlave(myself)) {
clusterSetNodeAsMaster(myself);
replicationUnsetMaster();
emptyDb(NULL);
emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
}
/* Close slots, reset manual failover state. */
......
......@@ -29,6 +29,7 @@
#include "server.h"
#include "cluster.h"
#include "atomicvar.h"
#include <signal.h>
#include <ctype.h>
......@@ -238,16 +239,46 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) {
return o;
}
long long emptyDb(void(callback)(void*)) {
int j;
/* Remove all keys from all the databases in a Redis server.
* If callback is given the function is called from time to time to
* signal that work is in progress.
*
* The dbnum can be -1 if all teh DBs should be flushed, or the specified
* DB number if we want to flush only a single Redis database number.
*
* Flags are be EMPTYDB_NO_FLAGS if no special flags are specified or
* EMPTYDB_ASYCN if we want the memory to be freed in a different thread
* and the function to return ASAP.
*
* On success the fuction returns the number of keys removed from the
* database(s). Otherwise -1 is returned in the specific case the
* DB number is out of range, and errno is set to EINVAL. */
long long emptyDb(int dbnum, int flags, void(callback)(void*)) {
int j, async = (flags & EMPTYDB_ASYNC);
long long removed = 0;
if (dbnum < -1 || dbnum >= server.dbnum) {
errno = EINVAL;
return -1;
}
for (j = 0; j < server.dbnum; j++) {
if (dbnum != 1 && dbnum != j) continue;
removed += dictSize(server.db[j].dict);
dictEmpty(server.db[j].dict,callback);
dictEmpty(server.db[j].expires,callback);
if (async) {
emptyDbAsync(&server.db[j]);
} else {
dictEmpty(server.db[j].dict,callback);
dictEmpty(server.db[j].expires,callback);
}
}
if (server.cluster_enabled) {
if (async) {
slotToKeyFlushAsync();
} else {
slotToKeyFlush();
}
}
if (server.cluster_enabled) slotToKeyFlush();
return removed;
}
......@@ -290,7 +321,7 @@ void flushdbCommand(client *c) {
void flushallCommand(client *c) {
signalFlushedDb(-1);
server.dirty += emptyDb(NULL);
server.dirty += emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
addReply(c,shared.ok);
if (server.rdb_child_pid != -1) {
kill(server.rdb_child_pid,SIGUSR1);
......
......@@ -271,7 +271,7 @@ void debugCommand(client *c) {
addReply(c,shared.err);
return;
}
emptyDb(NULL);
emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
if (rdbLoad(server.rdb_filename) != C_OK) {
addReplyError(c,"Error trying to load the RDB dump");
return;
......@@ -279,7 +279,7 @@ void debugCommand(client *c) {
serverLog(LL_WARNING,"DB reloaded by DEBUG RELOAD");
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"loadaof")) {
emptyDb(NULL);
emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
if (loadAppendOnlyFile(server.aof_filename) != C_OK) {
addReply(c,shared.err);
return;
......
#include "server.h"
#include "bio.h"
#include "atomicvar.h"
#include "cluster.h"
static size_t lazyfree_objects = 0;
pthread_mutex_t lazyfree_objects_mutex = PTHREAD_MUTEX_INITIALIZER;
......@@ -75,9 +76,51 @@ int dbAsyncDelete(redisDb *db, robj *key) {
}
}
/* Implementation of function to release a single object called from the
* lazyfree thread from bio.c. */
/* Empty a Redis DB asynchronously. What the function does actually is to
* create a new empty set of hash tables and scheduling the old ones for
* lazy freeing. */
void emptyDbAsync(redisDb *db) {
dict *oldht1 = db->dict, *oldht2 = db->expires;
db->dict = dictCreate(&dbDictType,NULL);
db->expires = dictCreate(&keyptrDictType,NULL);
atomicIncr(lazyfree_objects,dictSize(oldht1),
&lazyfree_objects_mutex);
bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,oldht1,oldht2);
}
/* Empty the slots-keys map of Redis CLuster by creating a new empty one
* and scheduiling the old for lazy freeing. */
void slotToKeyFlushAsync(void) {
zskiplist *oldsl = server.cluster->slots_to_keys;
server.cluster->slots_to_keys = zslCreate();
atomicIncr(lazyfree_objects,oldsl->length,
&lazyfree_objects_mutex);
bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,NULL,oldsl);
}
/* Release objects from the lazyfree thread. It's just decrRefCount()
* updating the count of objects to release. */
void lazyfreeFreeObjectFromBioThread(robj *o) {
decrRefCount(o);
atomicDecr(lazyfree_objects,1,&lazyfree_objects_mutex);
}
/* Release a database from the lazyfree thread. The 'db' pointer is the
* database which was substitutied with a fresh one in the main thread
* when the database was logically deleted. 'sl' is a skiplist used by
* Redis Cluster in order to take the hash slots -> keys mapping. This
* may be NULL if Redis Cluster is disabled. */
void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2) {
size_t numkeys = dictSize(ht1);
dictRelease(ht1);
dictRelease(ht2);
atomicDecr(lazyfree_objects,numkeys,&lazyfree_objects_mutex);
}
/* Release the skiplist mapping Redis Cluster keys to slots in the
* lazyfree thread. */
void lazyfreeFreeSlotsMapFromBioThread(zskiplist *sl) {
size_t len = sl->length;
zslFree(sl);
atomicDecr(lazyfree_objects,len,&lazyfree_objects_mutex);
}
......@@ -1111,7 +1111,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
}
serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Flushing old data");
signalFlushedDb(-1);
emptyDb(replicationEmptyDbCallback);
emptyDb(-1,EMPTYDB_NO_FLAGS,replicationEmptyDbCallback);
/* Before loading the DB into memory we need to delete the readable
* handler, otherwise it will get called recursively since
* rdbLoad() will call the event loop to process events from time to
......
......@@ -1050,6 +1050,7 @@ extern dictType shaScriptObjectDictType;
extern double R_Zero, R_PosInf, R_NegInf, R_Nan;
extern dictType hashDictType;
extern dictType replScriptCacheDictType;
extern dictType keyptrDictType;
/*-----------------------------------------------------------------------------
* Functions prototypes
......@@ -1384,7 +1385,11 @@ robj *dbRandomKey(redisDb *db);
int dbSyncDelete(redisDb *db, robj *key);
int dbDelete(redisDb *db, robj *key);
robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o);
long long emptyDb(void(callback)(void*));
#define EMPTYDB_NO_FLAGS 0 /* No flags. */
#define EMPTYDB_ASYNC (1<<0) /* Reclaim memory in another thread. */
long long emptyDb(int dbnum, int flags, void(callback)(void*));
int selectDb(client *c, int id);
void signalModifiedKey(redisDb *db, robj *key);
void signalFlushedDb(int dbid);
......@@ -1407,6 +1412,8 @@ void slotToKeyFlush(void);
#define LAZYFREE_STEP_OOM 2 /* Free a few elements at any cost if there
is something to free: we are out of memory */
int dbAsyncDelete(redisDb *db, robj *key);
void emptyDbAsync(redisDb *db);
void slotToKeyFlushAsync(void);
/* API to get key arguments from commands */
int *getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, int *numkeys);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册