提交 0c05436c 编写于 作者: A antirez

Lazyfree: a first implementation of non blocking DEL.

上级 712ea729
...@@ -117,7 +117,7 @@ endif ...@@ -117,7 +117,7 @@ endif
REDIS_SERVER_NAME=redis-server REDIS_SERVER_NAME=redis-server
REDIS_SENTINEL_NAME=redis-sentinel REDIS_SENTINEL_NAME=redis-sentinel
REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o geo.o REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o geo.o lazyfree.o
REDIS_GEOHASH_OBJ=../deps/geohash-int/geohash.o ../deps/geohash-int/geohash_helper.o REDIS_GEOHASH_OBJ=../deps/geohash-int/geohash.o ../deps/geohash-int/geohash_helper.o
REDIS_CLI_NAME=redis-cli REDIS_CLI_NAME=redis-cli
REDIS_CLI_OBJ=anet.o adlist.o redis-cli.o zmalloc.o release.o anet.o ae.o crc64.o REDIS_CLI_OBJ=anet.o adlist.o redis-cli.o zmalloc.o release.o anet.o ae.o crc64.o
......
...@@ -33,10 +33,6 @@ ...@@ -33,10 +33,6 @@
#include <signal.h> #include <signal.h>
#include <ctype.h> #include <ctype.h>
void slotToKeyAdd(robj *key);
void slotToKeyDel(robj *key);
void slotToKeyFlush(void);
/*----------------------------------------------------------------------------- /*-----------------------------------------------------------------------------
* C-level DB API * C-level DB API
*----------------------------------------------------------------------------*/ *----------------------------------------------------------------------------*/
...@@ -184,7 +180,7 @@ robj *dbRandomKey(redisDb *db) { ...@@ -184,7 +180,7 @@ robj *dbRandomKey(redisDb *db) {
} }
/* Delete a key, value, and associated expiration entry if any, from the DB */ /* Delete a key, value, and associated expiration entry if any, from the DB */
int dbDelete(redisDb *db, robj *key) { int dbSyncDelete(redisDb *db, robj *key) {
/* Deleting an entry from the expires dict will not free the sds of /* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */ * the key, because it is shared with the main dictionary. */
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr); if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
...@@ -196,6 +192,14 @@ int dbDelete(redisDb *db, robj *key) { ...@@ -196,6 +192,14 @@ int dbDelete(redisDb *db, robj *key) {
} }
} }
/* This is a wrapper whose behavior depends on the Redis lazy free
* configuration. Deletes the key synchronously or asynchronously. */
int dbDelete(redisDb *db, robj *key) {
int async = 1; /* TODO: Fixme making this a proper option. */
if (async) return dbAsyncDelete(db,key);
else return dbSyncDelete(db,key);
}
/* Prepare the string object stored at 'key' to be modified destructively /* Prepare the string object stored at 'key' to be modified destructively
* to implement commands like SETBIT or APPEND. * to implement commands like SETBIT or APPEND.
* *
...@@ -302,20 +306,31 @@ void flushallCommand(client *c) { ...@@ -302,20 +306,31 @@ void flushallCommand(client *c) {
server.dirty++; server.dirty++;
} }
void delCommand(client *c) { /* This command implements DEL and LAZYDEL. */
int deleted = 0, j; void delGenericCommand(client *c, int lazy) {
int numdel = 0, j;
for (j = 1; j < c->argc; j++) { for (j = 1; j < c->argc; j++) {
expireIfNeeded(c->db,c->argv[j]); expireIfNeeded(c->db,c->argv[j]);
if (dbDelete(c->db,c->argv[j])) { int deleted = lazy ? dbAsyncDelete(c->db,c->argv[j]) :
dbSyncDelete(c->db,c->argv[j]);
if (deleted) {
signalModifiedKey(c->db,c->argv[j]); signalModifiedKey(c->db,c->argv[j]);
notifyKeyspaceEvent(NOTIFY_GENERIC, notifyKeyspaceEvent(NOTIFY_GENERIC,
"del",c->argv[j],c->db->id); "del",c->argv[j],c->db->id);
server.dirty++; server.dirty++;
deleted++; numdel++;
} }
} }
addReplyLongLong(c,deleted); addReplyLongLong(c,numdel);
}
void delCommand(client *c) {
delGenericCommand(c,0);
}
void unlinkCommand(client *c) {
delGenericCommand(c,1);
} }
/* EXISTS key1 key2 ... key_N. /* EXISTS key1 key2 ... key_N.
......
...@@ -855,7 +855,7 @@ unsigned long dictScan(dict *d, ...@@ -855,7 +855,7 @@ unsigned long dictScan(dict *d,
void *privdata) void *privdata)
{ {
dictht *t0, *t1; dictht *t0, *t1;
const dictEntry *de; const dictEntry *de, *next;
unsigned long m0, m1; unsigned long m0, m1;
if (dictSize(d) == 0) return 0; if (dictSize(d) == 0) return 0;
...@@ -867,8 +867,9 @@ unsigned long dictScan(dict *d, ...@@ -867,8 +867,9 @@ unsigned long dictScan(dict *d,
/* Emit entries at cursor */ /* Emit entries at cursor */
de = t0->table[v & m0]; de = t0->table[v & m0];
while (de) { while (de) {
next = de->next;
fn(privdata, de); fn(privdata, de);
de = de->next; de = next;
} }
} else { } else {
...@@ -887,8 +888,9 @@ unsigned long dictScan(dict *d, ...@@ -887,8 +888,9 @@ unsigned long dictScan(dict *d,
/* Emit entries at cursor */ /* Emit entries at cursor */
de = t0->table[v & m0]; de = t0->table[v & m0];
while (de) { while (de) {
next = de->next;
fn(privdata, de); fn(privdata, de);
de = de->next; de = next;
} }
/* Iterate over indices in larger table that are the expansion /* Iterate over indices in larger table that are the expansion
...@@ -897,8 +899,9 @@ unsigned long dictScan(dict *d, ...@@ -897,8 +899,9 @@ unsigned long dictScan(dict *d,
/* Emit entries at cursor */ /* Emit entries at cursor */
de = t1->table[v & m1]; de = t1->table[v & m1];
while (de) { while (de) {
next = de->next;
fn(privdata, de); fn(privdata, de);
de = de->next; de = next;
} }
/* Increment bits not covered by the smaller mask */ /* Increment bits not covered by the smaller mask */
......
...@@ -78,7 +78,7 @@ typedef struct dict { ...@@ -78,7 +78,7 @@ typedef struct dict {
void *privdata; void *privdata;
dictht ht[2]; dictht ht[2];
long rehashidx; /* rehashing not in progress if rehashidx == -1 */ long rehashidx; /* rehashing not in progress if rehashidx == -1 */
int iterators; /* number of iterators currently running */ unsigned long iterators; /* number of iterators currently running */
} dict; } dict;
/* If safe is set to 1 this is a safe iterator, that means, you can call /* If safe is set to 1 this is a safe iterator, that means, you can call
......
#include "server.h"
/* Initialization of the lazy free engine. Must be called only once at server
* startup. */
void initLazyfreeEngine(void) {
server.lazyfree_dbs = listCreate();
server.lazyfree_obj = listCreate();
server.lazyfree_elements = 0;
}
/* Return the amount of work needed in order to free an object.
* The return value is not always the actual number of allocations the
* object is compoesd of, but a number proportional to it.
*
* For strings the function always returns 1.
*
* For aggregated objects represented by hash tables or other data structures
* the function just returns the number of elements the object is composed of.
*
* Objects composed of single allocations are always reported as having a
* single item even if they are actaully logical composed of multiple
* elements.
*
* For lists the funciton returns the number of elements in the quicklist
* representing the list. */
size_t lazyfreeGetFreeEffort(robj *obj) {
if (obj->type == OBJ_LIST) {
quicklist *ql = obj->ptr;
return ql->len;
} else if (obj->type == OBJ_SET && obj->encoding == OBJ_ENCODING_HT) {
dict *ht = obj->ptr;
return dictSize(ht);
} else if (obj->type == OBJ_ZSET && obj->encoding == OBJ_ENCODING_SKIPLIST){
zset *zs = obj->ptr;
return zs->zsl->length;
} else if (obj->type == OBJ_HASH && obj->encoding == OBJ_ENCODING_HT) {
dict *ht = obj->ptr;
return dictSize(ht);
} else {
return 1; /* Everything else is a single allocation. */
}
}
/* This callback is used together with dictScan() in order to free a dict.c
* hash table incrementally. */
void lazyfreeScanCallback(void *privdata, const dictEntry *de) {
dict *ht = privdata;
long saved_iterators = ht->iterators;
ht->iterators = 1; /* Make sure no rehashing happens. */
dictDelete(ht,dictGetKey(de));
ht->iterators = saved_iterators;
}
/* Free some object from the lazy free list. */
#define LAZYFREE_ITER_PER_STEP 100
size_t lazyfreeFastStep(void) {
size_t maxiter = LAZYFREE_ITER_PER_STEP;
size_t workdone = 0;
robj *current = NULL;
while(maxiter--) {
if (current == NULL) {
listNode *ln = listFirst(server.lazyfree_obj);
if (ln == NULL) break; /* Nothing more to free. */
current = ln->value;
}
if ((current->type == OBJ_SET ||
current->type == OBJ_HASH) &&
current->encoding == OBJ_ENCODING_HT)
{
dict *ht = current->ptr;
size_t origsize = dictSize(ht);
ht->iterators = dictScan(ht,ht->iterators,lazyfreeScanCallback,ht);
workdone++; /* We are not sure how many elements we freed, even if
zero, the free list is non empty so we don't return
0 to the caller. */
server.lazyfree_elements -= (origsize - dictSize(ht));
if (dictSize(ht) == 0) {
decrRefCount(current);
listNode *ln = listFirst(server.lazyfree_obj);
listDelNode(server.lazyfree_obj,ln);
current = NULL;
}
} else {
/* Not handled type or encoding. Do a blocking free. */
size_t effort = lazyfreeGetFreeEffort(current);
server.lazyfree_elements -= effort;
workdone += effort;
decrRefCount(current);
listNode *ln = listFirst(server.lazyfree_obj);
listDelNode(server.lazyfree_obj,ln);
current = NULL;
}
}
return workdone;
}
/* Handles slow or fast collection steps. */
size_t lazyfreeStep(int type) {
if (type == LAZYFREE_STEP_FAST) return lazyfreeFastStep();
size_t totalwork = 0;
mstime_t end = mstime()+2;
do {
size_t workdone = lazyfreeFastStep();
if (workdone == 0) break;
totalwork += workdone;
} while(mstime() < end);
return totalwork;
}
/* Delete a key, value, and associated expiration entry if any, from the DB.
* If there are enough allocations to free the value object may be put into
* a lazy free list instead of being freed synchronously. The lazy free list
* will be reclaimed incrementally in a non blocking way. */
#define LAZYFREE_THRESHOLD 64
int dbAsyncDelete(redisDb *db, robj *key) {
/* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
/* If the value is composed of a few allocations, to free in a lazy way
* is actually just slower... So under a certain limit we just free
* the object synchronously. */
dictEntry *de = dictFind(db->dict,key->ptr);
if (de) {
robj *val = dictGetVal(de);
size_t free_effort = lazyfreeGetFreeEffort(val);
/* If releasing the object is too much work, let's put it into the
* lazy free list. */
if (free_effort > LAZYFREE_THRESHOLD) {
listAddNodeTail(server.lazyfree_obj,val);
server.lazyfree_elements += free_effort;
dictSetVal(db->dict,de,NULL);
}
}
/* Release the key-val pair, or just the key if we set the val
* field to NULL in order to lazy free it later. */
if (dictDelete(db->dict,key->ptr) == DICT_OK) {
if (server.cluster_enabled) slotToKeyDel(key);
return 1;
} else {
return 0;
}
}
/* This is the timer handler we use to incrementally perform collection
* into the lazy free lists. We can't use serverCron since we need a
* very high timer frequency when there are many objects to collect, while
* we lower the frequency to just 1HZ when there is nothing to do.
*
* Since a slow lazy free step will take 1.5 milliseconds and we modulate
* the timer frequency from 1 to 333 HZ in an adaptive way, the CPU
* used is between 0% (nothing in the lazy free list) to 50%.
*
* The frequency is obtained as follows: if the lazy free list is empty
* it is set to 1HZ. If the lazy free has elements the call period starts
* at 20 (50HZ) and is decremented (up to 3 ms = 333HZ) each time the server
* used memory raises between calls of this function. */
int lazyfreeCron(struct aeEventLoop *eventLoop, long long id, void *clientData)
{
UNUSED(eventLoop);
UNUSED(id);
UNUSED(clientData);
static size_t prev_mem;
static int timer_period = 1000; /* Defauls to 1HZ */
static double mem_trend = 0;
size_t mem = zmalloc_used_memory();
/* Compute the memory trend, biased towards thinking memory is raising
* for a few calls every time previous and current memory raise. */
if (prev_mem < mem) mem_trend = 1;
mem_trend *= 0.9; /* Make it slowly forget. */
int mem_is_raising = mem_trend > .1;
/* Free a few items. */
size_t workdone = lazyfreeStep(LAZYFREE_STEP_SLOW);
/* Adjust this timer call frequency according to the current state. */
if (workdone) {
if (timer_period == 1000) timer_period = 20;
if (mem_is_raising && timer_period > 3)
timer_period--; /* Raise call frequency. */
else if (!mem_is_raising && timer_period < 20)
timer_period++; /* Lower call frequency. */
} else {
timer_period = 1000; /* 1 HZ */
}
prev_mem = mem;
#if 0
printf("%llu (%d hz) %s (%f)\n",
(unsigned long long)server.lazyfree_elements,
1000/timer_period,
mem_is_raising ? "RAISING" : "lowering",
mem_trend);
#endif
return timer_period;
}
...@@ -48,6 +48,23 @@ robj *createObject(int type, void *ptr) { ...@@ -48,6 +48,23 @@ robj *createObject(int type, void *ptr) {
return o; return o;
} }
/* Set a special refcount in the object to make it "shared":
* incrRefCount and decrRefCount() will test for this special refcount
* and will not touch the object. This way it is free to access shared
* objects such as small integers from different threads without any
* mutex.
*
* A common patter to create shared objects:
*
* robj *myobject = makeObjectShared(createObject(...));
*
*/
robj *makeObjectShared(robj *o) {
serverAssert(o->refcount == 1);
o->refcount = OBJ_SHARED_REFCOUNT;
return o;
}
/* Create a string object with encoding OBJ_ENCODING_RAW, that is a plain /* Create a string object with encoding OBJ_ENCODING_RAW, that is a plain
* string object where o->ptr points to a proper sds string. */ * string object where o->ptr points to a proper sds string. */
robj *createRawStringObject(const char *ptr, size_t len) { robj *createRawStringObject(const char *ptr, size_t len) {
...@@ -295,11 +312,10 @@ void freeHashObject(robj *o) { ...@@ -295,11 +312,10 @@ void freeHashObject(robj *o) {
} }
void incrRefCount(robj *o) { void incrRefCount(robj *o) {
o->refcount++; if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount++;
} }
void decrRefCount(robj *o) { void decrRefCount(robj *o) {
if (o->refcount <= 0) serverPanic("decrRefCount against refcount <= 0");
if (o->refcount == 1) { if (o->refcount == 1) {
switch(o->type) { switch(o->type) {
case OBJ_STRING: freeStringObject(o); break; case OBJ_STRING: freeStringObject(o); break;
...@@ -311,7 +327,8 @@ void decrRefCount(robj *o) { ...@@ -311,7 +327,8 @@ void decrRefCount(robj *o) {
} }
zfree(o); zfree(o);
} else { } else {
o->refcount--; if (o->refcount <= 0) serverPanic("decrRefCount against refcount <= 0");
if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount--;
} }
} }
......
...@@ -131,6 +131,7 @@ struct redisCommand redisCommandTable[] = { ...@@ -131,6 +131,7 @@ struct redisCommand redisCommandTable[] = {
{"append",appendCommand,3,"wm",0,NULL,1,1,1,0,0}, {"append",appendCommand,3,"wm",0,NULL,1,1,1,0,0},
{"strlen",strlenCommand,2,"rF",0,NULL,1,1,1,0,0}, {"strlen",strlenCommand,2,"rF",0,NULL,1,1,1,0,0},
{"del",delCommand,-2,"w",0,NULL,1,-1,1,0,0}, {"del",delCommand,-2,"w",0,NULL,1,-1,1,0,0},
{"unlink",unlinkCommand,-2,"wF",0,NULL,1,-1,1,0,0},
{"exists",existsCommand,-2,"rF",0,NULL,1,-1,1,0,0}, {"exists",existsCommand,-2,"rF",0,NULL,1,-1,1,0,0},
{"setbit",setbitCommand,4,"wm",0,NULL,1,1,1,0,0}, {"setbit",setbitCommand,4,"wm",0,NULL,1,1,1,0,0},
{"getbit",getbitCommand,3,"rF",0,NULL,1,1,1,0,0}, {"getbit",getbitCommand,3,"rF",0,NULL,1,1,1,0,0},
...@@ -458,7 +459,7 @@ void dictObjectDestructor(void *privdata, void *val) ...@@ -458,7 +459,7 @@ void dictObjectDestructor(void *privdata, void *val)
{ {
DICT_NOTUSED(privdata); DICT_NOTUSED(privdata);
if (val == NULL) return; /* Values of swapped out keys as set to NULL */ if (val == NULL) return; /* Lazy freeing will set value to NULL. */
decrRefCount(val); decrRefCount(val);
} }
...@@ -1286,6 +1287,11 @@ void beforeSleep(struct aeEventLoop *eventLoop) { ...@@ -1286,6 +1287,11 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
* later in this function. */ * later in this function. */
if (server.cluster_enabled) clusterBeforeSleep(); if (server.cluster_enabled) clusterBeforeSleep();
/* Lazy free a few objects before to return to the event loop, this way
* if there is activity in the server (that may generate writes) we
* reclaim memory at a faster rate. */
lazyfreeStep(LAZYFREE_STEP_FAST);
/* Run a fast expire cycle (the called function will return /* Run a fast expire cycle (the called function will return
* ASAP if a fast cycle is not needed). */ * ASAP if a fast cycle is not needed). */
if (server.active_expire_enabled && server.masterhost == NULL) if (server.active_expire_enabled && server.masterhost == NULL)
...@@ -1397,7 +1403,8 @@ void createSharedObjects(void) { ...@@ -1397,7 +1403,8 @@ void createSharedObjects(void) {
shared.lpop = createStringObject("LPOP",4); shared.lpop = createStringObject("LPOP",4);
shared.lpush = createStringObject("LPUSH",5); shared.lpush = createStringObject("LPUSH",5);
for (j = 0; j < OBJ_SHARED_INTEGERS; j++) { for (j = 0; j < OBJ_SHARED_INTEGERS; j++) {
shared.integers[j] = createObject(OBJ_STRING,(void*)(long)j); shared.integers[j] =
makeObjectShared(createObject(OBJ_STRING,(void*)(long)j));
shared.integers[j]->encoding = OBJ_ENCODING_INT; shared.integers[j]->encoding = OBJ_ENCODING_INT;
} }
for (j = 0; j < OBJ_SHARED_BULKHDR_LEN; j++) { for (j = 0; j < OBJ_SHARED_BULKHDR_LEN; j++) {
...@@ -1794,6 +1801,7 @@ void initServer(void) { ...@@ -1794,6 +1801,7 @@ void initServer(void) {
server.system_memory_size = zmalloc_get_memory_size(); server.system_memory_size = zmalloc_get_memory_size();
createSharedObjects(); createSharedObjects();
initLazyfreeEngine();
adjustOpenFilesLimit(); adjustOpenFilesLimit();
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR); server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
server.db = zmalloc(sizeof(redisDb)*server.dbnum); server.db = zmalloc(sizeof(redisDb)*server.dbnum);
...@@ -1858,10 +1866,11 @@ void initServer(void) { ...@@ -1858,10 +1866,11 @@ void initServer(void) {
server.repl_good_slaves_count = 0; server.repl_good_slaves_count = 0;
updateCachedTime(); updateCachedTime();
/* Create the serverCron() time event, that's our main way to process /* Create out timers, that's our main way to process background
* background operations. */ * operations. */
if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) { if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR ||
serverPanic("Can't create the serverCron time event."); aeCreateTimeEvent(server.el, 1, lazyfreeCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
exit(1); exit(1);
} }
...@@ -3268,10 +3277,15 @@ int freeMemoryIfNeeded(void) { ...@@ -3268,10 +3277,15 @@ int freeMemoryIfNeeded(void) {
size_t mem_used, mem_tofree, mem_freed; size_t mem_used, mem_tofree, mem_freed;
int slaves = listLength(server.slaves); int slaves = listLength(server.slaves);
mstime_t latency, eviction_latency; mstime_t latency, eviction_latency;
long long delta;
/* Check if we are over the memory usage limit. If we are not, no need
* to subtract the slaves output buffers. We can just return ASAP. */
mem_used = zmalloc_used_memory();
if (mem_used <= server.maxmemory) return C_OK;
/* Remove the size of slaves output buffers and AOF buffer from the /* Remove the size of slaves output buffers and AOF buffer from the
* count of used memory. */ * count of used memory. */
mem_used = zmalloc_used_memory();
if (slaves) { if (slaves) {
listIter li; listIter li;
listNode *ln; listNode *ln;
...@@ -3291,15 +3305,36 @@ int freeMemoryIfNeeded(void) { ...@@ -3291,15 +3305,36 @@ int freeMemoryIfNeeded(void) {
mem_used -= aofRewriteBufferSize(); mem_used -= aofRewriteBufferSize();
} }
/* Check if we are over the memory limit. */ /* Check if we are still over the memory limit. */
if (mem_used <= server.maxmemory) return C_OK; if (mem_used <= server.maxmemory) return C_OK;
if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION)
return C_ERR; /* We need to free memory, but policy forbids. */
/* Compute how much memory we need to free. */ /* Compute how much memory we need to free. */
mem_tofree = mem_used - server.maxmemory; mem_tofree = mem_used - server.maxmemory;
mem_freed = 0; mem_freed = 0;
/* Let's start reclaiming memory from the lazy free list: those
* objects are logically freed so this is the first thing we want
* to get rid of. */
if (listLength(server.lazyfree_dbs) || listLength(server.lazyfree_obj)) {
latencyStartMonitor(eviction_latency);
while (mem_freed < mem_tofree) {
delta = (long long) zmalloc_used_memory();
size_t workdone = lazyfreeStep(LAZYFREE_STEP_FAST);
delta -= (long long) zmalloc_used_memory();
mem_freed += delta;
if (!workdone) break; /* Lazy free list is empty. */
}
latencyEndMonitor(eviction_latency);
latencyAddSampleIfNeeded("eviction-lazyfree",eviction_latency);
}
/* If after lazy freeing we are alraedy back to our limit, no need
* to evict keys. Return to the caller. */
if (mem_freed >= mem_tofree) return C_OK;
if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION)
return C_ERR; /* We need to free memory, but policy forbids. */
latencyStartMonitor(latency); latencyStartMonitor(latency);
while (mem_freed < mem_tofree) { while (mem_freed < mem_tofree) {
int j, k, keys_freed = 0; int j, k, keys_freed = 0;
...@@ -3385,8 +3420,6 @@ int freeMemoryIfNeeded(void) { ...@@ -3385,8 +3420,6 @@ int freeMemoryIfNeeded(void) {
/* Finally remove the selected key. */ /* Finally remove the selected key. */
if (bestkey) { if (bestkey) {
long long delta;
robj *keyobj = createStringObject(bestkey,sdslen(bestkey)); robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
propagateExpire(db,keyobj); propagateExpire(db,keyobj);
/* We compute the amount of memory freed by dbDelete() alone. /* We compute the amount of memory freed by dbDelete() alone.
......
...@@ -185,27 +185,6 @@ typedef long long mstime_t; /* millisecond time type. */ ...@@ -185,27 +185,6 @@ typedef long long mstime_t; /* millisecond time type. */
#define CMD_ASKING 4096 /* "k" flag */ #define CMD_ASKING 4096 /* "k" flag */
#define CMD_FAST 8192 /* "F" flag */ #define CMD_FAST 8192 /* "F" flag */
/* Object types */
#define OBJ_STRING 0
#define OBJ_LIST 1
#define OBJ_SET 2
#define OBJ_ZSET 3
#define OBJ_HASH 4
/* Objects encoding. Some kind of objects like Strings and Hashes can be
* internally represented in multiple ways. The 'encoding' field of the object
* is set to one of this fields for this object. */
#define OBJ_ENCODING_RAW 0 /* Raw representation */
#define OBJ_ENCODING_INT 1 /* Encoded as integer */
#define OBJ_ENCODING_HT 2 /* Encoded as hash table */
#define OBJ_ENCODING_ZIPMAP 3 /* Encoded as zipmap */
#define OBJ_ENCODING_LINKEDLIST 4 /* Encoded as regular linked list */
#define OBJ_ENCODING_ZIPLIST 5 /* Encoded as ziplist */
#define OBJ_ENCODING_INTSET 6 /* Encoded as intset */
#define OBJ_ENCODING_SKIPLIST 7 /* Encoded as skiplist */
#define OBJ_ENCODING_EMBSTR 8 /* Embedded sds string encoding */
#define OBJ_ENCODING_QUICKLIST 9 /* Encoded as linked list of ziplists */
/* Defines related to the dump file format. To store 32 bits lengths for short /* Defines related to the dump file format. To store 32 bits lengths for short
* keys requires a lot of space, so we check the most significant 2 bits of * keys requires a lot of space, so we check the most significant 2 bits of
* the first byte to interpreter the length: * the first byte to interpreter the length:
...@@ -441,9 +420,31 @@ typedef long long mstime_t; /* millisecond time type. */ ...@@ -441,9 +420,31 @@ typedef long long mstime_t; /* millisecond time type. */
/* A redis object, that is a type able to hold a string / list / set */ /* A redis object, that is a type able to hold a string / list / set */
/* The actual Redis Object */ /* The actual Redis Object */
#define OBJ_STRING 0
#define OBJ_LIST 1
#define OBJ_SET 2
#define OBJ_ZSET 3
#define OBJ_HASH 4
/* Objects encoding. Some kind of objects like Strings and Hashes can be
* internally represented in multiple ways. The 'encoding' field of the object
* is set to one of this fields for this object. */
#define OBJ_ENCODING_RAW 0 /* Raw representation */
#define OBJ_ENCODING_INT 1 /* Encoded as integer */
#define OBJ_ENCODING_HT 2 /* Encoded as hash table */
#define OBJ_ENCODING_ZIPMAP 3 /* Encoded as zipmap */
#define OBJ_ENCODING_LINKEDLIST 4 /* Encoded as regular linked list */
#define OBJ_ENCODING_ZIPLIST 5 /* Encoded as ziplist */
#define OBJ_ENCODING_INTSET 6 /* Encoded as intset */
#define OBJ_ENCODING_SKIPLIST 7 /* Encoded as skiplist */
#define OBJ_ENCODING_EMBSTR 8 /* Embedded sds string encoding */
#define OBJ_ENCODING_QUICKLIST 9 /* Encoded as linked list of ziplists */
#define LRU_BITS 24 #define LRU_BITS 24
#define LRU_CLOCK_MAX ((1<<LRU_BITS)-1) /* Max value of obj->lru */ #define LRU_CLOCK_MAX ((1<<LRU_BITS)-1) /* Max value of obj->lru */
#define LRU_CLOCK_RESOLUTION 1000 /* LRU clock resolution in ms */ #define LRU_CLOCK_RESOLUTION 1000 /* LRU clock resolution in ms */
#define OBJ_SHARED_REFCOUNT INT_MAX
typedef struct redisObject { typedef struct redisObject {
unsigned type:4; unsigned type:4;
unsigned encoding:4; unsigned encoding:4;
...@@ -702,6 +703,10 @@ struct redisServer { ...@@ -702,6 +703,10 @@ struct redisServer {
int cronloops; /* Number of times the cron function run */ int cronloops; /* Number of times the cron function run */
char runid[CONFIG_RUN_ID_SIZE+1]; /* ID always different at every exec. */ char runid[CONFIG_RUN_ID_SIZE+1]; /* ID always different at every exec. */
int sentinel_mode; /* True if this instance is a Sentinel. */ int sentinel_mode; /* True if this instance is a Sentinel. */
/* Lazy free */
list *lazyfree_dbs; /* List of DBs to free in background. */
list *lazyfree_obj; /* List of objects to free in background. */
size_t lazyfree_elements; /* Number of logical element in obj list. */
/* Networking */ /* Networking */
int port; /* TCP listening port */ int port; /* TCP listening port */
int tcp_backlog; /* TCP listen() backlog */ int tcp_backlog; /* TCP listen() backlog */
...@@ -1156,6 +1161,7 @@ void flagTransaction(client *c); ...@@ -1156,6 +1161,7 @@ void flagTransaction(client *c);
void decrRefCount(robj *o); void decrRefCount(robj *o);
void decrRefCountVoid(void *o); void decrRefCountVoid(void *o);
void incrRefCount(robj *o); void incrRefCount(robj *o);
robj *makeObjectShared(robj *o);
robj *resetRefCount(robj *obj); robj *resetRefCount(robj *obj);
void freeStringObject(robj *o); void freeStringObject(robj *o);
void freeListObject(robj *o); void freeListObject(robj *o);
...@@ -1376,6 +1382,7 @@ void dbOverwrite(redisDb *db, robj *key, robj *val); ...@@ -1376,6 +1382,7 @@ void dbOverwrite(redisDb *db, robj *key, robj *val);
void setKey(redisDb *db, robj *key, robj *val); void setKey(redisDb *db, robj *key, robj *val);
int dbExists(redisDb *db, robj *key); int dbExists(redisDb *db, robj *key);
robj *dbRandomKey(redisDb *db); robj *dbRandomKey(redisDb *db);
int dbSyncDelete(redisDb *db, robj *key);
int dbDelete(redisDb *db, robj *key); int dbDelete(redisDb *db, robj *key);
robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o); robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o);
long long emptyDb(void(callback)(void*)); long long emptyDb(void(callback)(void*));
...@@ -1388,6 +1395,17 @@ unsigned int delKeysInSlot(unsigned int hashslot); ...@@ -1388,6 +1395,17 @@ unsigned int delKeysInSlot(unsigned int hashslot);
int verifyClusterConfigWithData(void); int verifyClusterConfigWithData(void);
void scanGenericCommand(client *c, robj *o, unsigned long cursor); void scanGenericCommand(client *c, robj *o, unsigned long cursor);
int parseScanCursorOrReply(client *c, robj *o, unsigned long *cursor); int parseScanCursorOrReply(client *c, robj *o, unsigned long *cursor);
void slotToKeyAdd(robj *key);
void slotToKeyDel(robj *key);
void slotToKeyFlush(void);
/* Lazy free */
#define LAZYFREE_STEP_SLOW 0
#define LAZYFREE_STEP_FAST 1
int dbAsyncDelete(redisDb *db, robj *key);
void initLazyfreeEngine(void);
size_t lazyfreeStep(int type);
int lazyfreeCron(struct aeEventLoop *eventLoop, long long id, void *clientData);
/* API to get key arguments from commands */ /* API to get key arguments from commands */
int *getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, int *numkeys); int *getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, int *numkeys);
...@@ -1443,6 +1461,7 @@ void setexCommand(client *c); ...@@ -1443,6 +1461,7 @@ void setexCommand(client *c);
void psetexCommand(client *c); void psetexCommand(client *c);
void getCommand(client *c); void getCommand(client *c);
void delCommand(client *c); void delCommand(client *c);
void unlinkCommand(client *c);
void existsCommand(client *c); void existsCommand(client *c);
void setbitCommand(client *c); void setbitCommand(client *c);
void getbitCommand(client *c); void getbitCommand(client *c);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册