提交 97e7f8ae 编写于 作者: A antirez

non blocking loading of the DB / AOF with informations and ETA in INFO output

上级 57c9babd
...@@ -218,6 +218,7 @@ int loadAppendOnlyFile(char *filename) { ...@@ -218,6 +218,7 @@ int loadAppendOnlyFile(char *filename) {
FILE *fp = fopen(filename,"r"); FILE *fp = fopen(filename,"r");
struct redis_stat sb; struct redis_stat sb;
int appendonly = server.appendonly; int appendonly = server.appendonly;
long loops = 0;
if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
return REDIS_ERR; return REDIS_ERR;
...@@ -232,6 +233,8 @@ int loadAppendOnlyFile(char *filename) { ...@@ -232,6 +233,8 @@ int loadAppendOnlyFile(char *filename) {
server.appendonly = 0; server.appendonly = 0;
fakeClient = createFakeClient(); fakeClient = createFakeClient();
startLoading(fp);
while(1) { while(1) {
int argc, j; int argc, j;
unsigned long len; unsigned long len;
...@@ -241,6 +244,12 @@ int loadAppendOnlyFile(char *filename) { ...@@ -241,6 +244,12 @@ int loadAppendOnlyFile(char *filename) {
struct redisCommand *cmd; struct redisCommand *cmd;
int force_swapout; int force_swapout;
/* Serve the clients from time to time */
if (!(loops++ % 1000)) {
loadingProgress(ftello(fp));
aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);
}
if (fgets(buf,sizeof(buf),fp) == NULL) { if (fgets(buf,sizeof(buf),fp) == NULL) {
if (feof(fp)) if (feof(fp))
break; break;
...@@ -297,6 +306,7 @@ int loadAppendOnlyFile(char *filename) { ...@@ -297,6 +306,7 @@ int loadAppendOnlyFile(char *filename) {
fclose(fp); fclose(fp);
freeFakeClient(fakeClient); freeFakeClient(fakeClient);
server.appendonly = appendonly; server.appendonly = appendonly;
stopLoading();
return REDIS_OK; return REDIS_OK;
readerr: readerr:
......
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
#include <sys/resource.h> #include <sys/resource.h>
#include <sys/wait.h> #include <sys/wait.h>
#include <arpa/inet.h> #include <arpa/inet.h>
#include <sys/stat.h>
int rdbSaveType(FILE *fp, unsigned char type) { int rdbSaveType(FILE *fp, unsigned char type) {
if (fwrite(&type,1,1,fp) == 0) return -1; if (fwrite(&type,1,1,fp) == 0) return -1;
...@@ -793,6 +794,31 @@ robj *rdbLoadObject(int type, FILE *fp) { ...@@ -793,6 +794,31 @@ robj *rdbLoadObject(int type, FILE *fp) {
return o; return o;
} }
/* Mark that we are loading in the global state and setup the fields
* needed to provide loading stats. */
void startLoading(FILE *fp) {
struct stat sb;
/* Load the DB */
server.loading = 1;
server.loading_start_time = time(NULL);
if (fstat(fileno(fp), &sb) == -1) {
server.loading_total_bytes = 1; /* just to avoid division by zero */
} else {
server.loading_total_bytes = sb.st_size;
}
}
/* Refresh the loading progress info */
void loadingProgress(off_t pos) {
server.loading_loaded_bytes = pos;
}
/* Loading finished */
void stopLoading(void) {
server.loading = 0;
}
int rdbLoad(char *filename) { int rdbLoad(char *filename) {
FILE *fp; FILE *fp;
uint32_t dbid; uint32_t dbid;
...@@ -801,6 +827,7 @@ int rdbLoad(char *filename) { ...@@ -801,6 +827,7 @@ int rdbLoad(char *filename) {
redisDb *db = server.db+0; redisDb *db = server.db+0;
char buf[1024]; char buf[1024];
time_t expiretime, now = time(NULL); time_t expiretime, now = time(NULL);
long loops = 0;
fp = fopen(filename,"r"); fp = fopen(filename,"r");
if (!fp) return REDIS_ERR; if (!fp) return REDIS_ERR;
...@@ -817,11 +844,20 @@ int rdbLoad(char *filename) { ...@@ -817,11 +844,20 @@ int rdbLoad(char *filename) {
redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver); redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
return REDIS_ERR; return REDIS_ERR;
} }
startLoading(fp);
while(1) { while(1) {
robj *key, *val; robj *key, *val;
int force_swapout; int force_swapout;
expiretime = -1; expiretime = -1;
/* Serve the clients from time to time */
if (!(loops++ % 1000)) {
loadingProgress(ftello(fp));
aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);
}
/* Read type. */ /* Read type. */
if ((type = rdbLoadType(fp)) == -1) goto eoferr; if ((type = rdbLoadType(fp)) == -1) goto eoferr;
if (type == REDIS_EXPIRETIME) { if (type == REDIS_EXPIRETIME) {
...@@ -900,6 +936,7 @@ int rdbLoad(char *filename) { ...@@ -900,6 +936,7 @@ int rdbLoad(char *filename) {
} }
} }
fclose(fp); fclose(fp);
stopLoading();
return REDIS_OK; return REDIS_OK;
eoferr: /* unexpected end of file is handled here with a fatal exit */ eoferr: /* unexpected end of file is handled here with a fatal exit */
......
...@@ -702,6 +702,8 @@ void createSharedObjects(void) { ...@@ -702,6 +702,8 @@ void createSharedObjects(void) {
"-ERR source and destination objects are the same\r\n")); "-ERR source and destination objects are the same\r\n"));
shared.outofrangeerr = createObject(REDIS_STRING,sdsnew( shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
"-ERR index out of range\r\n")); "-ERR index out of range\r\n"));
shared.loadingerr = createObject(REDIS_STRING,sdsnew(
"-LOADING Redis is loading the dataset in memory\r\n"));
shared.space = createObject(REDIS_STRING,sdsnew(" ")); shared.space = createObject(REDIS_STRING,sdsnew(" "));
shared.colon = createObject(REDIS_STRING,sdsnew(":")); shared.colon = createObject(REDIS_STRING,sdsnew(":"));
shared.plus = createObject(REDIS_STRING,sdsnew("+")); shared.plus = createObject(REDIS_STRING,sdsnew("+"));
...@@ -739,6 +741,7 @@ void initServerConfig() { ...@@ -739,6 +741,7 @@ void initServerConfig() {
server.verbosity = REDIS_VERBOSE; server.verbosity = REDIS_VERBOSE;
server.maxidletime = REDIS_MAXIDLETIME; server.maxidletime = REDIS_MAXIDLETIME;
server.saveparams = NULL; server.saveparams = NULL;
server.loading = 0;
server.logfile = NULL; /* NULL = log on standard output */ server.logfile = NULL; /* NULL = log on standard output */
server.glueoutputbuf = 1; server.glueoutputbuf = 1;
server.daemonize = 0; server.daemonize = 0;
...@@ -1006,6 +1009,12 @@ int processCommand(redisClient *c) { ...@@ -1006,6 +1009,12 @@ int processCommand(redisClient *c) {
return REDIS_OK; return REDIS_OK;
} }
/* Loading DB? Return an error if the command is not INFO */
if (server.loading && cmd->proc != infoCommand) {
addReply(c, shared.loadingerr);
return REDIS_OK;
}
/* Exec the command */ /* Exec the command */
if (c->flags & REDIS_MULTI && if (c->flags & REDIS_MULTI &&
cmd->proc != execCommand && cmd->proc != discardCommand && cmd->proc != execCommand && cmd->proc != discardCommand &&
...@@ -1133,6 +1142,8 @@ sds genRedisInfoString(void) { ...@@ -1133,6 +1142,8 @@ sds genRedisInfoString(void) {
"used_memory_rss:%zu\r\n" "used_memory_rss:%zu\r\n"
"mem_fragmentation_ratio:%.2f\r\n" "mem_fragmentation_ratio:%.2f\r\n"
"use_tcmalloc:%d\r\n" "use_tcmalloc:%d\r\n"
"loading:%d\r\n"
"aof_enabled:%d\r\n"
"changes_since_last_save:%lld\r\n" "changes_since_last_save:%lld\r\n"
"bgsave_in_progress:%d\r\n" "bgsave_in_progress:%d\r\n"
"last_save_time:%ld\r\n" "last_save_time:%ld\r\n"
...@@ -1173,6 +1184,8 @@ sds genRedisInfoString(void) { ...@@ -1173,6 +1184,8 @@ sds genRedisInfoString(void) {
#else #else
0, 0,
#endif #endif
server.loading,
server.appendonly,
server.dirty, server.dirty,
server.bgsavechildpid != -1, server.bgsavechildpid != -1,
server.lastsave, server.lastsave,
...@@ -1243,6 +1256,35 @@ sds genRedisInfoString(void) { ...@@ -1243,6 +1256,35 @@ sds genRedisInfoString(void) {
); );
unlockThreadedIO(); unlockThreadedIO();
} }
if (server.loading) {
double perc;
time_t eta, elapsed;
off_t remaining_bytes = server.loading_total_bytes-
server.loading_loaded_bytes;
perc = ((double)server.loading_loaded_bytes /
server.loading_total_bytes) * 100;
elapsed = time(NULL)-server.loading_start_time;
if (elapsed == 0) {
eta = 1; /* A fake 1 second figure if we don't have enough info */
} else {
eta = (elapsed*remaining_bytes)/server.loading_loaded_bytes;
}
info = sdscatprintf(info,
"loading_start_time:%ld\r\n"
"loading_total_bytes:%llu\r\n"
"loading_loaded_bytes:%llu\r\n"
"loading_loaded_perc:%.2f\r\n"
"loading_eta_seconds:%ld\r\n"
,(unsigned long) server.loading_start_time,
(unsigned long long) server.loading_total_bytes,
(unsigned long long) server.loading_loaded_bytes,
perc,
eta
);
}
for (j = 0; j < server.dbnum; j++) { for (j = 0; j < server.dbnum; j++) {
long long keys, vkeys; long long keys, vkeys;
......
...@@ -340,7 +340,7 @@ struct sharedObjectsStruct { ...@@ -340,7 +340,7 @@ struct sharedObjectsStruct {
robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *cnegone, *pong, *space, robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *cnegone, *pong, *space,
*colon, *nullbulk, *nullmultibulk, *queued, *colon, *nullbulk, *nullmultibulk, *queued,
*emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr, *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
*outofrangeerr, *plus, *outofrangeerr, *loadingerr, *plus,
*select0, *select1, *select2, *select3, *select4, *select0, *select1, *select2, *select3, *select4,
*select5, *select6, *select7, *select8, *select9, *select5, *select6, *select7, *select8, *select9,
*messagebulk, *pmessagebulk, *subscribebulk, *unsubscribebulk, *mbulk3, *messagebulk, *pmessagebulk, *subscribebulk, *unsubscribebulk, *mbulk3,
...@@ -361,6 +361,11 @@ struct redisServer { ...@@ -361,6 +361,11 @@ struct redisServer {
long long dirty_before_bgsave; /* used to restore dirty on failed BGSAVE */ long long dirty_before_bgsave; /* used to restore dirty on failed BGSAVE */
list *clients; list *clients;
dict *commands; /* Command table hahs table */ dict *commands; /* Command table hahs table */
/* RDB / AOF loading information */
int loading;
off_t loading_total_bytes;
off_t loading_loaded_bytes;
time_t loading_start_time;
/* Fast pointers to often looked up command */ /* Fast pointers to often looked up command */
struct redisCommand *delCommand, *multiCommand; struct redisCommand *delCommand, *multiCommand;
list *slaves, *monitors; list *slaves, *monitors;
...@@ -726,6 +731,11 @@ int syncWithMaster(void); ...@@ -726,6 +731,11 @@ int syncWithMaster(void);
void updateSlavesWaitingBgsave(int bgsaveerr); void updateSlavesWaitingBgsave(int bgsaveerr);
void replicationCron(void); void replicationCron(void);
/* Generic persistence functions */
void startLoading(FILE *fp);
void loadingProgress(off_t pos);
void stopLoading(void);
/* RDB persistence */ /* RDB persistence */
int rdbLoad(char *filename); int rdbLoad(char *filename);
int rdbSaveBackground(char *filename); int rdbSaveBackground(char *filename);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册