提交 500155b9 编写于 作者: A antirez

Draft #1 of a new expired keys collection algorithm.

The main idea here is that when we are no longer to expire keys at the
rate the are created, we can't block more in the normal expire cycle as
this would result in too big latency spikes.

For this reason the commit introduces a "fast" expire cycle that does
not run for more than 1 millisecond but is called in the beforeSleep()
hook of the event loop, so much more often, and with a frequency bound
to the frequency of executed commnads.

The fast expire cycle is only called when the standard expiration
algorithm runs out of time, that is, consumed more than
REDIS_EXPIRELOOKUPS_TIME_PERC of CPU in a given cycle without being able
to take the number of already expired keys that are yet not collected
to a number smaller than 25% of the number of keys.

You can test this commit with different loads, but a simple way is to
use the following:

Extreme load with pipelining:

redis-benchmark -r 100000000 -n 100000000  \
        -P 32 set ele:rand:000000000000 foo ex 2

Remove the -P32 in order to avoid the pipelining for a more real-world
load.

In another terminal tab you can monitor the Redis behavior with:

redis-cli -i 0.1 -r -1 info keyspace

and

redis-cli --latency-history

Note: this commit will make Redis printing a lot of debug messages, it
is not a good idea to use it in production.
上级 cc946972
...@@ -628,14 +628,51 @@ void updateDictResizePolicy(void) { ...@@ -628,14 +628,51 @@ void updateDictResizePolicy(void) {
/* ======================= Cron: called every 100 ms ======================== */ /* ======================= Cron: called every 100 ms ======================== */
/* Helper function for the activeExpireCycle() function.
* This function will try to expire the key that is stored in the hash table
* entry 'de' of the 'expires' hash table of a Redis database.
*
* If the key is found to be expired, it is removed from the database and
* 1 is returned. Otherwise no operation is performed and 0 is returned.
*
* When a key is expired, server.stat_expiredkeys is incremented.
*
* The parameter 'now' is the current time in milliseconds as is passed
* to the function to avoid too many gettimeofday() syscalls. */
int activeExpireCycleTryExpire(redisDb *db, struct dictEntry *de, long long now) {
long long t = dictGetSignedIntegerVal(de);
if (now > t) {
sds key = dictGetKey(de);
robj *keyobj = createStringObject(key,sdslen(key));
propagateExpire(db,keyobj);
dbDelete(db,keyobj);
notifyKeyspaceEvent(REDIS_NOTIFY_EXPIRED,
"expired",keyobj,db->id);
decrRefCount(keyobj);
server.stat_expiredkeys++;
return 1;
} else {
return 0;
}
}
/* Try to expire a few timed out keys. The algorithm used is adaptive and /* Try to expire a few timed out keys. The algorithm used is adaptive and
* will use few CPU cycles if there are few expiring keys, otherwise * will use few CPU cycles if there are few expiring keys, otherwise
* it will get more aggressive to avoid that too much memory is used by * it will get more aggressive to avoid that too much memory is used by
* keys that can be removed from the keyspace. * keys that can be removed from the keyspace.
* *
* No more than REDIS_DBCRON_DBS_PER_CALL databases are tested at every * No more than REDIS_DBCRON_DBS_PER_CALL databases are tested at every
* iteration. */ * iteration.
void activeExpireCycle(void) { *
* If fast is non-zero the function will try to expire just one key ASAP
* from the current DB and return. This kind of call is used when Redis detects
* that timelimit_exit is true, so there is more work to do, and we do it
* more incrementally from the beforeSleep() function of the event loop. */
#define EXPIRED_HISTORY_LEN 10
void activeExpireCycle(int fast) {
/* This function has some global state in order to continue the work /* This function has some global state in order to continue the work
* incrementally across calls. */ * incrementally across calls. */
static unsigned int current_db = 0; /* Last DB tested. */ static unsigned int current_db = 0; /* Last DB tested. */
...@@ -645,6 +682,31 @@ void activeExpireCycle(void) { ...@@ -645,6 +682,31 @@ void activeExpireCycle(void) {
unsigned int dbs_per_call = REDIS_DBCRON_DBS_PER_CALL; unsigned int dbs_per_call = REDIS_DBCRON_DBS_PER_CALL;
long long start = ustime(), timelimit; long long start = ustime(), timelimit;
#if 0
static int expired_history[EXPIRED_HISTORY_LEN];
static int expired_history_id = 0;
static int expired_perc_avg = 0;
#endif
if (fast && !timelimit_exit) return;
#if 0
if (fast) {
if (!timelimit_exit) return;
/* Let's try to expire a single key from the previous DB, the one that
* had enough keys expiring to reach the time limit. */
redisDb *db = server.db+((current_db+server.dbnum-1) % server.dbnum);
dictEntry *de;
for (j = 0; j < 100; j++) {
if ((de = dictGetRandomKey(db->expires)) == NULL) break;
activeExpireCycleTryExpire(db,de,server.mstime);
}
return;
}
#endif
/* We usually should test REDIS_DBCRON_DBS_PER_CALL per iteration, with /* We usually should test REDIS_DBCRON_DBS_PER_CALL per iteration, with
* two exceptions: * two exceptions:
* *
...@@ -663,6 +725,8 @@ void activeExpireCycle(void) { ...@@ -663,6 +725,8 @@ void activeExpireCycle(void) {
timelimit_exit = 0; timelimit_exit = 0;
if (timelimit <= 0) timelimit = 1; if (timelimit <= 0) timelimit = 1;
if (fast) timelimit = 1000; /* 1 millisecond. */
for (j = 0; j < dbs_per_call; j++) { for (j = 0; j < dbs_per_call; j++) {
int expired; int expired;
redisDb *db = server.db+(current_db % server.dbnum); redisDb *db = server.db+(current_db % server.dbnum);
...@@ -696,22 +760,9 @@ void activeExpireCycle(void) { ...@@ -696,22 +760,9 @@ void activeExpireCycle(void) {
num = REDIS_EXPIRELOOKUPS_PER_CRON; num = REDIS_EXPIRELOOKUPS_PER_CRON;
while (num--) { while (num--) {
dictEntry *de; dictEntry *de;
long long t;
if ((de = dictGetRandomKey(db->expires)) == NULL) break; if ((de = dictGetRandomKey(db->expires)) == NULL) break;
t = dictGetSignedIntegerVal(de); if (activeExpireCycleTryExpire(db,de,now)) expired++;
if (now > t) {
sds key = dictGetKey(de);
robj *keyobj = createStringObject(key,sdslen(key));
propagateExpire(db,keyobj);
dbDelete(db,keyobj);
notifyKeyspaceEvent(REDIS_NOTIFY_EXPIRED,
"expired",keyobj,db->id);
decrRefCount(keyobj);
expired++;
server.stat_expiredkeys++;
}
} }
/* We can't block forever here even if there are many keys to /* We can't block forever here even if there are many keys to
* expire. So after a given amount of milliseconds return to the * expire. So after a given amount of milliseconds return to the
...@@ -721,8 +772,21 @@ void activeExpireCycle(void) { ...@@ -721,8 +772,21 @@ void activeExpireCycle(void) {
(ustime()-start) > timelimit) (ustime()-start) > timelimit)
{ {
timelimit_exit = 1; timelimit_exit = 1;
return;
} }
#if 0
expired_history_id = (expired_history_id+1) % EXPIRED_HISTORY_LEN;
expired_history[expired_history_id] = expired;
{
int i;
expired_perc_avg = 0;
for (i = 0; i < EXPIRED_HISTORY_LEN; i++) {
expired_perc_avg += expired_history[i];
}
expired_perc_avg = (expired_perc_avg * 100) / (REDIS_EXPIRELOOKUPS_PER_CRON*EXPIRED_HISTORY_LEN);
// printf("Expired AVG: %d\n", expired_perc_avg);
}
#endif
if (timelimit_exit) return;
} while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4); } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
} }
} }
...@@ -842,8 +906,12 @@ void clientsCron(void) { ...@@ -842,8 +906,12 @@ void clientsCron(void) {
void databasesCron(void) { void databasesCron(void) {
/* Expire keys by random sampling. Not required for slaves /* Expire keys by random sampling. Not required for slaves
* as master will synthesize DELs for us. */ * as master will synthesize DELs for us. */
if (server.active_expire_enabled && server.masterhost == NULL) if (server.active_expire_enabled && server.masterhost == NULL) {
activeExpireCycle(); long long totalex = server.stat_expiredkeys;
activeExpireCycle(0);
if (server.stat_expiredkeys - totalex)
printf("EXPIRED SLOW: %lld\n", server.stat_expiredkeys - totalex);
}
/* Perform hash tables rehashing if needed, but only if there are no /* Perform hash tables rehashing if needed, but only if there are no
* other processes saving the DB on disk. Otherwise rehashing is bad * other processes saving the DB on disk. Otherwise rehashing is bad
...@@ -915,6 +983,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { ...@@ -915,6 +983,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
* in objects at every object access, and accuracy is not needed. * in objects at every object access, and accuracy is not needed.
* To access a global var is faster than calling time(NULL) */ * To access a global var is faster than calling time(NULL) */
server.unixtime = time(NULL); server.unixtime = time(NULL);
server.mstime = mstime();
run_with_period(100) trackOperationsPerSecond(); run_with_period(100) trackOperationsPerSecond();
...@@ -1074,6 +1143,14 @@ void beforeSleep(struct aeEventLoop *eventLoop) { ...@@ -1074,6 +1143,14 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
listNode *ln; listNode *ln;
redisClient *c; redisClient *c;
/* Run a fast expire cycle. */
{
long long totalex = server.stat_expiredkeys;
activeExpireCycle(1);
if (server.stat_expiredkeys - totalex)
printf("EXPIRED FAST: %lld\n", server.stat_expiredkeys - totalex);
}
/* Try to process pending commands for clients that were just unblocked. */ /* Try to process pending commands for clients that were just unblocked. */
while (listLength(server.unblocked_clients)) { while (listLength(server.unblocked_clients)) {
ln = listFirst(server.unblocked_clients); ln = listFirst(server.unblocked_clients);
...@@ -1473,6 +1550,7 @@ void initServer() { ...@@ -1473,6 +1550,7 @@ void initServer() {
server.ops_sec_last_sample_time = mstime(); server.ops_sec_last_sample_time = mstime();
server.ops_sec_last_sample_ops = 0; server.ops_sec_last_sample_ops = 0;
server.unixtime = time(NULL); server.unixtime = time(NULL);
server.mstime = mstime();
server.lastbgsave_status = REDIS_OK; server.lastbgsave_status = REDIS_OK;
server.repl_good_slaves_count = 0; server.repl_good_slaves_count = 0;
......
...@@ -74,7 +74,7 @@ ...@@ -74,7 +74,7 @@
#define REDIS_MAXIDLETIME 0 /* default client timeout: infinite */ #define REDIS_MAXIDLETIME 0 /* default client timeout: infinite */
#define REDIS_DEFAULT_DBNUM 16 #define REDIS_DEFAULT_DBNUM 16
#define REDIS_CONFIGLINE_MAX 1024 #define REDIS_CONFIGLINE_MAX 1024
#define REDIS_EXPIRELOOKUPS_PER_CRON 10 /* lookup 10 expires per loop */ #define REDIS_EXPIRELOOKUPS_PER_CRON 20 /* lookup 20 expires per loop */
#define REDIS_EXPIRELOOKUPS_TIME_PERC 25 /* CPU max % for keys collection */ #define REDIS_EXPIRELOOKUPS_TIME_PERC 25 /* CPU max % for keys collection */
#define REDIS_DBCRON_DBS_PER_CALL 16 #define REDIS_DBCRON_DBS_PER_CALL 16
#define REDIS_MAX_WRITE_PER_EVENT (1024*64) #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
...@@ -744,7 +744,8 @@ struct redisServer { ...@@ -744,7 +744,8 @@ struct redisServer {
size_t set_max_intset_entries; size_t set_max_intset_entries;
size_t zset_max_ziplist_entries; size_t zset_max_ziplist_entries;
size_t zset_max_ziplist_value; size_t zset_max_ziplist_value;
time_t unixtime; /* Unix time sampled every second. */ time_t unixtime; /* Unix time sampled every cron cycle. */
long long mstime; /* Like 'unixtime' but with milliseconds resolution. */
/* Pubsub */ /* Pubsub */
dict *pubsub_channels; /* Map channels to list of subscribed clients */ dict *pubsub_channels; /* Map channels to list of subscribed clients */
list *pubsub_patterns; /* A list of pubsub_patterns */ list *pubsub_patterns; /* A list of pubsub_patterns */
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册