aof.c 53.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
/*
 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *   * Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *   * Neither the name of Redis nor the names of its contributors may be used
 *     to endorse or promote products derived from this software without
 *     specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

30
#include "redis.h"
31
#include "bio.h"
A
antirez 已提交
32
#include "rio.h"
33 34 35 36

#include <signal.h>
#include <fcntl.h>
#include <sys/stat.h>
37
#include <sys/types.h>
H
Henry Rawas 已提交
38
#ifndef _WIN32
39 40 41
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/wait.h>
H
Henry Rawas 已提交
42
#endif
43

44 45
/* Forward declaration: loadAppendOnlyFile() below calls this before its
 * definition appears (the definition is not in this section of the file). */
void aofUpdateCurrentSize(void);

46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
/* ----------------------------------------------------------------------------
 * AOF rewrite buffer implementation.
 *
 * The following code implements a simple buffer used to accumulate
 * changes while the background process is rewriting the AOF file.
 *
 * We only need to append, but can't just use realloc with a large block
 * because 'huge' reallocs are not always handled as one could expect
 * (via remapping of pages at OS level) but may involve copying data.
 *
 * For this reason we use a list of blocks, every block is
 * AOF_RW_BUF_BLOCK_SIZE bytes.
 * ------------------------------------------------------------------------- */

/* Capacity of one rewrite-buffer block: 10 MB. */
#define AOF_RW_BUF_BLOCK_SIZE (10*1024*1024)

/* One fixed-size chunk of the AOF rewrite buffer.
 * Bytes are appended at buf+used; 'free' is the room still left, so
 * used + free == AOF_RW_BUF_BLOCK_SIZE for every block. */
typedef struct aofrwblock {
    unsigned long used;                 /* bytes of buf[] already filled */
    unsigned long free;                 /* bytes of buf[] still available */
    char buf[AOF_RW_BUF_BLOCK_SIZE];    /* payload storage */
} aofrwblock;

/* This function free the old AOF rewrite buffer if needed, and initialize
 * a fresh new one. It tests for server.aof_rewrite_buf_blocks equal to NULL
 * so can be used for the first initialization as well. */
void aofRewriteBufferReset(void) {
    if (server.aof_rewrite_buf_blocks)
        listRelease(server.aof_rewrite_buf_blocks);

    server.aof_rewrite_buf_blocks = listCreate();
    listSetFreeMethod(server.aof_rewrite_buf_blocks,zfree);
}

/* Return the current size of the AOF rerwite buffer. */
unsigned long aofRewriteBufferSize(void) {
H
Henry Rawas 已提交
80
    unsigned long size;
81 82 83 84
    listNode *ln = listLast(server.aof_rewrite_buf_blocks);
    aofrwblock *block = ln ? ln->value : NULL;

    if (block == NULL) return 0;
H
Henry Rawas 已提交
85
    size =
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
        (listLength(server.aof_rewrite_buf_blocks)-1) * AOF_RW_BUF_BLOCK_SIZE;
    size += block->used;
    return size;
}

/* Append 'len' bytes starting at 's' to the AOF rewrite buffer.
 *
 * Data first fills whatever room is left in the last block of
 * server.aof_rewrite_buf_blocks. Whenever bytes remain (or no block exists
 * yet), a new empty AOF_RW_BUF_BLOCK_SIZE block is allocated and appended.
 * After each allocation, if the block count is 9, 19, 29, ... the buffer
 * size in MB is logged: as REDIS_WARNING when the count is 99, 199, ...,
 * otherwise as REDIS_NOTICE. */
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
    listNode *tail = listLast(server.aof_rewrite_buf_blocks);
    aofrwblock *cur = tail ? tail->value : NULL;

    while (len > 0) {
        int numblocks;

        /* Use up the room left in the current tail block, if any. */
        if (cur != NULL && cur->free > 0) {
            unsigned long chunk = (len < cur->free) ? len : cur->free;

            memcpy(cur->buf+cur->used, s, chunk);
            cur->used += chunk;
            cur->free -= chunk;
            s += chunk;
            len -= chunk;
            if (len == 0) break;
        }

        /* Bytes still pending: start a new empty block at the tail. */
        cur = zmalloc(sizeof(*cur));
        cur->free = AOF_RW_BUF_BLOCK_SIZE;
        cur->used = 0;
        listAddNodeTail(server.aof_rewrite_buf_blocks,cur);

        numblocks = listLength(server.aof_rewrite_buf_blocks);
        if ((numblocks+1) % 10 != 0) continue;
        redisLog(((numblocks+1) % 100 == 0) ? REDIS_WARNING : REDIS_NOTICE,
            "Background AOF buffer size: %lu MB",
            aofRewriteBufferSize()/(1024*1024));
    }
}

/* Write the buffer (possibly composed of multiple blocks) into the specified
 * fd. If no short write or any other error happens -1 is returned,
 * otherwise the number of bytes written is returned. */
ssize_t aofRewriteBufferWrite(int fd) {
    listNode *ln;
    listIter li;
    ssize_t count = 0;

    listRewind(server.aof_rewrite_buf_blocks,&li);
    while((ln = listNext(&li))) {
        aofrwblock *block = listNodeValue(ln);
        ssize_t nwritten;

        if (block->used) {
145 146 147
#ifdef _WIN32
            nwritten = send(fd,block->buf,block->used,0);
#else
148
            nwritten = write(fd,block->buf,block->used);
149
#endif
150 151 152 153 154 155 156 157 158 159
            if (nwritten != block->used) {
                if (nwritten == 0) errno = EIO;
                return -1;
            }
            count += nwritten;
        }
    }
    return count;
}

160 161 162 163 164 165
/* ----------------------------------------------------------------------------
 * AOF file implementation
 * ------------------------------------------------------------------------- */

/* Starts a background task that performs fsync() against the specified
 * file descriptor (the one of the AOF file) in another thread. */
166
void aof_background_fsync(int fd) {
H
Henry Rawas 已提交
167 168 169
#ifdef _WIN32
    bioCreateBackgroundJob(REDIS_BIO_AOF_FSYNC,(void*)(size_t)fd,NULL,NULL);
#else
A
antirez 已提交
170
    bioCreateBackgroundJob(REDIS_BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL);
H
Henry Rawas 已提交
171
#endif
172 173
}

174 175 176
/* Called when the user switches from "appendonly yes" to "appendonly no"
 * at runtime using the CONFIG command. */
void stopAppendOnly(void) {
177
    redisAssert(server.aof_state != REDIS_AOF_OFF);
178
    flushAppendOnlyFile(1);
A
antirez 已提交
179 180
    aof_fsync(server.aof_fd);
    close(server.aof_fd);
181

A
antirez 已提交
182 183
    server.aof_fd = -1;
    server.aof_selected_db = -1;
184
    server.aof_state = REDIS_AOF_OFF;
185
    /* rewrite operation in progress? kill it, wait child exit */
A
antirez 已提交
186
    if (server.aof_child_pid != -1) {
H
Henry Rawas 已提交
187 188 189 190 191 192 193 194
#ifdef _WIN32
        bkgdsave_termthread();
        server.rdbbkgdfsave.state = BKSAVE_IDLE;
        /* turn off copy on write */
        cowBkgdSaveStop();
        redisLog(REDIS_NOTICE,"Killing running AOF rewrite child: %ld",
            (long) server.aof_child_pid);
#else
195 196
        int statloc;

197 198
        redisLog(REDIS_NOTICE,"Killing running AOF rewrite child: %ld",
            (long) server.aof_child_pid);
199
        if (kill(server.aof_child_pid,SIGUSR1) != -1)
200
            wait3(&statloc,0,NULL);
H
Henry Rawas 已提交
201
#endif
202
        /* reset the buffer accumulating changes while the child saves */
203
        aofRewriteBufferReset();
A
antirez 已提交
204 205
        aofRemoveTempFile(server.aof_child_pid);
        server.aof_child_pid = -1;
206
        server.aof_rewrite_time_start = -1;
207 208 209 210 211 212
    }
}

/* Called when the user switches from "appendonly no" to "appendonly yes"
 * at runtime using the CONFIG command. */
int startAppendOnly(void) {
213
    server.aof_last_fsync = server.unixtime;
H
Henry Rawas 已提交
214 215 216
#ifdef _WIN32
    server.aof_fd = open(server.aof_filename,O_WRONLY|O_APPEND|O_CREAT|_O_BINARY,_S_IREAD|_S_IWRITE);
#else
A
antirez 已提交
217
    server.aof_fd = open(server.aof_filename,O_WRONLY|O_APPEND|O_CREAT,0644);
H
Henry Rawas 已提交
218
#endif
219
    redisAssert(server.aof_state == REDIS_AOF_OFF);
A
antirez 已提交
220
    if (server.aof_fd == -1) {
221
        redisLog(REDIS_WARNING,"Redis needs to enable the AOF but can't open the append only file: %s",strerror(errno));
222 223 224
        return REDIS_ERR;
    }
    if (rewriteAppendOnlyFileBackground() == REDIS_ERR) {
A
antirez 已提交
225
        close(server.aof_fd);
226
        redisLog(REDIS_WARNING,"Redis needs to enable the AOF but can't trigger a background AOF rewrite operation. Check the above logs for more info about the error.");
227 228
        return REDIS_ERR;
    }
229 230
    /* We correctly switched on AOF, now wait for the rerwite to be complete
     * in order to append data on disk. */
231
    server.aof_state = REDIS_AOF_WAIT_REWRITE;
232 233 234 235 236 237 238 239 240
    return REDIS_OK;
}

/* Write the append only file buffer on disk.
 *
 * Since we are required to write the AOF before replying to the client,
 * and the only way the client socket can get a write is entering when the
 * the event loop, we accumulate all the AOF writes in a memory
 * buffer and write it on disk using this function just before entering
241 242 243 244 245 246 247 248 249 250 251 252 253
 * the event loop again.
 *
 * About the 'force' argument:
 *
 * When the fsync policy is set to 'everysec' we may delay the flush if there
 * is still an fsync() going on in the background thread, since for instance
 * on Linux write(2) will be blocked by the background fsync anyway.
 * When this happens we remember that there is some aof buffer to be
 * flushed ASAP, and will try to do that in the serverCron() function.
 *
 * However if force is set to 1 we'll write regardless of the background
 * fsync. */
void flushAppendOnlyFile(int force) {
254
    ssize_t nwritten;
255
    int sync_in_progress = 0;
256

A
antirez 已提交
257
    if (sdslen(server.aof_buf) == 0) return;
258

259
    if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
260 261
        sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;

262
    if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
263 264 265 266 267 268 269 270 271 272
        /* With this append fsync policy we do background fsyncing.
         * If the fsync is still in progress we can try to delay
         * the write for a couple of seconds. */
        if (sync_in_progress) {
            if (server.aof_flush_postponed_start == 0) {
                /* No previous write postponinig, remember that we are
                 * postponing the flush and return. */
                server.aof_flush_postponed_start = server.unixtime;
                return;
            } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
A
antirez 已提交
273
                /* We were already waiting for fsync to finish, but for less
274 275 276 277 278
                 * than two seconds this is still ok. Postpone again. */
                return;
            }
            /* Otherwise fall trough, and go write since we can't wait
             * over two seconds. */
279
            server.aof_delayed_fsync++;
280
            redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
281 282 283 284 285 286
        }
    }
    /* If you are following this code path, then we are going to write so
     * set reset the postponed flush sentinel to zero. */
    server.aof_flush_postponed_start = 0;

287 288 289 290 291
    /* We want to perform a single write. This should be guaranteed atomic
     * at least if the filesystem we are writing is a real physical one.
     * While this will save us against the server being killed I don't think
     * there is much to do about the whole server stopping for power problems
     * or alike */
H
Henry Rawas 已提交
292
    nwritten = write(server.aof_fd,server.aof_buf,(unsigned int)sdslen(server.aof_buf));
A
antirez 已提交
293
    if (nwritten != (signed)sdslen(server.aof_buf)) {
294 295 296
        /* Ooops, we are in troubles. The best thing to do for now is
         * aborting instead of giving the illusion that everything is
         * working as expected. */
P
Pieter Noordhuis 已提交
297
        if (nwritten == -1) {
298
            redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
P
Pieter Noordhuis 已提交
299
        } else {
A
antirez 已提交
300 301 302 303 304 305
            redisLog(REDIS_WARNING,"Exiting on short write while writing to "
                                   "the append-only file: %s (nwritten=%ld, "
                                   "expected=%ld)",
                                   strerror(errno),
                                   (long)nwritten,
                                   (long)sdslen(server.aof_buf));
306 307 308 309 310 311 312

            if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
                redisLog(REDIS_WARNING, "Could not remove short write "
                         "from the append-only file.  Redis may refuse "
                         "to load the AOF the next time it starts.  "
                         "ftruncate: %s", strerror(errno));
            }
P
Pieter Noordhuis 已提交
313 314
        }
        exit(1);
315
    }
316
    server.aof_current_size += nwritten;
317

318 319
    /* Re-use AOF buffer when it is small enough. The maximum comes from the
     * arena size of 4k minus some overhead (but is otherwise arbitrary). */
A
antirez 已提交
320 321
    if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
        sdsclear(server.aof_buf);
322
    } else {
A
antirez 已提交
323 324
        sdsfree(server.aof_buf);
        server.aof_buf = sdsempty();
325 326
    }

327 328
    /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
     * children doing I/O in the background. */
329
    if (server.aof_no_fsync_on_rewrite &&
A
antirez 已提交
330
        (server.aof_child_pid != -1 || server.rdb_child_pid != -1))
331
            return;
332 333

    /* Perform the fsync if needed. */
334
    if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
335 336
        /* aof_fsync is defined as fdatasync() for Linux in order to avoid
         * flushing metadata. */
A
antirez 已提交
337 338
        aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */
        server.aof_last_fsync = server.unixtime;
339
    } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
A
antirez 已提交
340 341 342
                server.unixtime > server.aof_last_fsync)) {
        if (!sync_in_progress) aof_background_fsync(server.aof_fd);
        server.aof_last_fsync = server.unixtime;
343 344 345
    }
}

346 347 348 349 350 351 352 353 354 355 356
/* Append to 'dst' the serialized form of the command argv[0..argc-1]:
 * "*<argc>\r\n", then for every argument "$<len>\r\n<bytes>\r\n".
 * Each argument goes through getDecodedObject() so its string bytes can
 * be read via o->ptr; the temporary reference is released afterwards.
 * Returns the (possibly reallocated) 'dst'. */
sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) {
    char buf[32];
    int len, j;
    robj *o;

    /* Multi-bulk header: number of arguments. */
    buf[0] = '*';
    len = 1+ll2string(buf+1,sizeof(buf)-1,argc);
    buf[len++] = '\r';
    buf[len++] = '\n';
    dst = sdscatlen(dst,buf,len);

    for (j = 0; j < argc; j++) {
        o = getDecodedObject(argv[j]);
        /* Bulk header: argument length, then the payload and CRLF. */
        buf[0] = '$';
        len = 1+ll2string(buf+1,sizeof(buf)-1,sdslen(o->ptr));
        buf[len++] = '\r';
        buf[len++] = '\n';
        dst = sdscatlen(dst,buf,len);
        dst = sdscatlen(dst,o->ptr,sdslen(o->ptr));
        dst = sdscatlen(dst,"\r\n",2);
        decrRefCount(o);
    }
    return dst;
}

371 372 373 374 375 376 377 378 379
/* Append to 'buf' a PEXPIREAT command for 'key', computed from the expire
 * argument 'seconds' of the original command 'cmd'.
 *
 * This is used in order to translate EXPIRE, PEXPIRE, EXPIREAT and the
 * expire part of SETEX/PSETEX into PEXPIREAT, so that we retain precision
 * in the append only file and the time is always absolute, not relative:
 *   - EXPIRE, SETEX, EXPIREAT take seconds: the value is multiplied by 1000;
 *   - EXPIRE, PEXPIRE, SETEX, PSETEX are relative: mstime() is added.
 * Returns the (possibly reallocated) 'buf'. */
sds catAppendOnlyExpireAtCommand(sds buf, struct redisCommand *cmd, robj *key, robj *seconds) {
    long long when;
    robj *argv[3];

    /* Make sure we can use strtoll */
    seconds = getDecodedObject(seconds);
    when = strtoll(seconds->ptr,NULL,10);
    /* Convert argument into milliseconds for EXPIRE, SETEX, EXPIREAT */
    if (cmd->proc == expireCommand || cmd->proc == setexCommand ||
        cmd->proc == expireatCommand)
    {
        when *= 1000;
    }
    /* Convert into absolute time for EXPIRE, PEXPIRE, SETEX, PSETEX */
    if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
        cmd->proc == setexCommand || cmd->proc == psetexCommand)
    {
        when += mstime();
    }
    decrRefCount(seconds);

    argv[0] = createStringObject("PEXPIREAT",9);
    argv[1] = key;
    argv[2] = createStringObjectFromLongLong(when);
    buf = catAppendOnlyGenericCommand(buf, 3, argv);
    decrRefCount(argv[0]);
    decrRefCount(argv[2]);
    return buf;
}

/* Propagate an executed command to the append only file.
 *
 * The command is serialized, preceded by a SELECT when 'dictid' differs
 * from the DB of the last command fed here. EXPIRE/PEXPIRE/EXPIREAT become
 * PEXPIREAT, and SETEX/PSETEX become SET + PEXPIREAT. The result is appended
 * to server.aof_buf when AOF is on, and to the rewrite buffer when a
 * background rewrite is running. */
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
    sds buf = sdsempty();
    robj *tmpargv[3];

    /* The DB this command was targeting is not the same as the last command
     * we appended. To issue a SELECT command is needed. */
    if (dictid != server.aof_selected_db) {
        char seldb[64];

        snprintf(seldb,sizeof(seldb),"%d",dictid);
        buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
            (unsigned long)strlen(seldb),seldb);
        server.aof_selected_db = dictid;
    }

    if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
        cmd->proc == expireatCommand) {
        /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
        /* Translate SETEX/PSETEX to SET and PEXPIREAT.
         * argv is: [0]=command, [1]=key, [2]=expire, [3]=value. */
        tmpargv[0] = createStringObject("SET",3);
        tmpargv[1] = argv[1];
        tmpargv[2] = argv[3];
        buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
        decrRefCount(tmpargv[0]);
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else {
        /* All the other commands don't need translation or need the
         * same translation already operated in the command vector
         * for the replication itself. */
        buf = catAppendOnlyGenericCommand(buf,argc,argv);
    }

    /* Append to the AOF buffer. This will be flushed on disk just before
     * of re-entering the event loop, so before the client will get a
     * positive reply about the operation performed. */
    if (server.aof_state == REDIS_AOF_ON)
        server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));

    /* If a background append only file rewriting is in progress we want to
     * accumulate the differences between the child DB and the current one
     * in a buffer, so that when the child process will do its work we
     * can append the differences to the new append only file. */
    if (server.aof_child_pid != -1)
        aofRewriteBufferAppend((unsigned char*)buf,(unsigned long)sdslen(buf));

    sdsfree(buf);
}

458 459 460 461
/* ----------------------------------------------------------------------------
 * AOF loading
 * ------------------------------------------------------------------------- */

462 463 464 465 466 467 468
/* In Redis commands are always executed in the context of a client, so in
 * order to load the append only file we need to create a fake client. */
struct redisClient *createFakeClient(void) {
    struct redisClient *c = zmalloc(sizeof(*c));

    selectDb(c,0);
    c->fd = -1;
469
    c->name = NULL;
470
    c->querybuf = sdsempty();
471
    c->querybuf_peak = 0;
472 473
    c->argc = 0;
    c->argv = NULL;
474
    c->bufpos = 0;
475 476 477 478 479
    c->flags = 0;
    /* We set the fake client as a slave waiting for the synchronization
     * so that Redis will not try to send replies to this client. */
    c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
    c->reply = listCreate();
480
    c->reply_bytes = 0;
481
    c->obuf_soft_limit_reached_time = 0;
482
    c->watched_keys = listCreate();
483 484 485 486 487 488 489 490 491
    listSetFreeMethod(c->reply,decrRefCount);
    listSetDupMethod(c->reply,dupClientReplyValue);
    initClientMultiState(c);
    return c;
}

/* Release a client created by createFakeClient(): its query buffer, reply
 * and watched-keys lists, MULTI state, and the client structure itself. */
void freeFakeClient(struct redisClient *c) {
    sdsfree(c->querybuf);
    listRelease(c->reply);
    listRelease(c->watched_keys);
    freeClientMultiState(c);
    zfree(c);
}

/* Replay the append log file. On error REDIS_OK is returned. On non fatal
 * error (the append only file is zero-length) REDIS_ERR is returned. On
 * fatal error an error message is logged and the program exists. */
int loadAppendOnlyFile(char *filename) {
    struct redisClient *fakeClient;
H
Henry Rawas 已提交
502 503 504
#ifdef _WIN32
    FILE *fp = fopen(filename,"rb");
#else
505
    FILE *fp = fopen(filename,"r");
H
Henry Rawas 已提交
506
#endif
507
    struct redis_stat sb;
508
    int old_aof_state = server.aof_state;
509
    long loops = 0;
510

A
antirez 已提交
511
    if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
512
        server.aof_current_size = 0;
A
antirez 已提交
513
        fclose(fp);
514
        return REDIS_ERR;
A
antirez 已提交
515
    }
516 517 518 519 520 521 522 523

    if (fp == NULL) {
        redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
        exit(1);
    }

    /* Temporarily disable AOF, to prevent EXEC from feeding a MULTI
     * to the same file we're about to read. */
524
    server.aof_state = REDIS_AOF_OFF;
525 526

    fakeClient = createFakeClient();
527 528
    startLoading(fp);

529 530
    while(1) {
        int argc, j;
H
Henry Rawas 已提交
531 532 533
#ifdef _WIN32
        size_t len;
#else
534
        unsigned long len;
H
Henry Rawas 已提交
535
#endif
536 537 538 539 540
        robj **argv;
        char buf[128];
        sds argsds;
        struct redisCommand *cmd;

541 542
        /* Serve the clients from time to time */
        if (!(loops++ % 1000)) {
H
Henry Rawas 已提交
543
            loadingProgress((off_t)ftello(fp));
544 545 546
            aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);
        }

547 548 549 550 551 552 553 554
        if (fgets(buf,sizeof(buf),fp) == NULL) {
            if (feof(fp))
                break;
            else
                goto readerr;
        }
        if (buf[0] != '*') goto fmterr;
        argc = atoi(buf+1);
555 556
        if (argc < 1) goto fmterr;

557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577
        argv = zmalloc(sizeof(robj*)*argc);
        for (j = 0; j < argc; j++) {
            if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
            if (buf[0] != '$') goto fmterr;
            len = strtol(buf+1,NULL,10);
            argsds = sdsnewlen(NULL,len);
            if (len && fread(argsds,len,1,fp) == 0) goto fmterr;
            argv[j] = createObject(REDIS_STRING,argsds);
            if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
        }

        /* Command lookup */
        cmd = lookupCommand(argv[0]->ptr);
        if (!cmd) {
            redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
            exit(1);
        }
        /* Run the command in the context of a fake client */
        fakeClient->argc = argc;
        fakeClient->argv = argv;
        cmd->proc(fakeClient);
578 579 580

        /* The fake client should not have a reply */
        redisAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply) == 0);
581 582
        /* The fake client should never get blocked */
        redisAssert((fakeClient->flags & REDIS_BLOCKED) == 0);
583

584 585 586 587 588
        /* Clean up. Command code may have changed argv/argc so we use the
         * argv/argc of the client instead of the local variables. */
        for (j = 0; j < fakeClient->argc; j++)
            decrRefCount(fakeClient->argv[j]);
        zfree(fakeClient->argv);
589 590 591 592 593 594 595 596
    }

    /* This point can only be reached when EOF is reached without errors.
     * If the client is in the middle of a MULTI/EXEC, log error and quit. */
    if (fakeClient->flags & REDIS_MULTI) goto readerr;

    fclose(fp);
    freeFakeClient(fakeClient);
597
    server.aof_state = old_aof_state;
598
    stopLoading();
599
    aofUpdateCurrentSize();
600
    server.aof_rewrite_base_size = server.aof_current_size;
601 602 603 604 605 606 607 608 609 610
    return REDIS_OK;

readerr:
    if (feof(fp)) {
        redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
    } else {
        redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
    }
    exit(1);
fmterr:
A
antirez 已提交
611
    redisLog(REDIS_WARNING,"Bad file format reading the append only file: make a backup of your AOF file, then use ./redis-check-aof --fix <filename>");
612 613 614
    exit(1);
}

615 616 617 618
/* ----------------------------------------------------------------------------
 * AOF rewrite
 * ------------------------------------------------------------------------- */

P
Pieter Noordhuis 已提交
619 620 621 622 623 624
/* Delegate writing an object to writing a bulk string or bulk long long.
 * This is not placed in rio.c since that adds the redis.h dependency.
 * Returns the rio helper's result narrowed to int; callers in this file
 * treat 0 as a write error. Panics on an unknown string encoding. */
int rioWriteBulkObject(rio *r, robj *obj) {
    /* Avoid using getDecodedObject to help copy-on-write (we are often
     * in a child process when this function is called). */
    if (obj->encoding == REDIS_ENCODING_INT) {
        /* NOTE(review): 'long' is 32 bits on 64-bit Windows; confirm that
         * INT-encoded values in this port always fit. */
        return (int)rioWriteBulkLongLong(r,(long)obj->ptr);
    } else if (obj->encoding == REDIS_ENCODING_RAW) {
        return (int)rioWriteBulkString(r,obj->ptr,sdslen(obj->ptr));
    } else {
        redisPanic("Unknown string encoding");
    }
}

633 634 635 636 637 638 639 640 641 642 643 644
/* Emit the commands needed to rebuild a list object.
 * Elements are written as a sequence of RPUSH commands, each carrying at
 * most REDIS_AOF_REWRITE_ITEMS_PER_CMD elements.
 * The function returns 0 on error, 1 on success. */
int rewriteListObject(rio *r, robj *key, robj *o) {
    long long count = 0, items = listTypeLength(o);

    if (o->encoding == REDIS_ENCODING_ZIPLIST) {
        unsigned char *zl = o->ptr;
        unsigned char *p = ziplistIndex(zl,0);
        unsigned char *vstr;
        unsigned int vlen;
        long long vlong;

#ifdef _WIN32
        cowUnlock();
#endif
        while(ziplistGet(p,&vstr,&vlen,&vlong)) {
            if (count == 0) {
                /* Start a new RPUSH sized for the remaining items. */
                int cmd_items = (items > REDIS_AOF_REWRITE_ITEMS_PER_CMD) ?
                    REDIS_AOF_REWRITE_ITEMS_PER_CMD : (int)items;

                if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0;
                if (rioWriteBulkString(r,"RPUSH",5) == 0) return 0;
                if (rioWriteBulkObject(r,key) == 0) return 0;
            }
            if (vstr) {
                if (rioWriteBulkString(r,(char*)vstr,vlen) == 0) return 0;
            } else {
                if (rioWriteBulkLongLong(r,vlong) == 0) return 0;
            }
            p = ziplistNext(zl,p);
            if (++count == REDIS_AOF_REWRITE_ITEMS_PER_CMD) count = 0;
            items--;
        }
#ifdef _WIN32
    } else if (o->encoding == REDIS_ENCODING_LINKEDLIST ||
               o->encoding == REDIS_ENCODING_LINKEDLISTARRAY) {
        roListIter li;
        listNode *ln;

        if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
            list *list = o->ptr;
            roListRewind(list, NULL, &li);
        } else {
            cowListArray *ar = (cowListArray *)o->ptr;
            roListRewind(NULL, ar, &li);
        }
        cowUnlock();

        while((ln = roListNext(&li))) {
#else
    } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
        list *list = o->ptr;
        listNode *ln;
        listIter li;

        listRewind(list,&li);
        while((ln = listNext(&li))) {
#endif
            robj *eleobj = listNodeValue(ln);

            if (count == 0) {
                /* Start a new RPUSH sized for the remaining items. */
                int cmd_items = (items > REDIS_AOF_REWRITE_ITEMS_PER_CMD) ?
                    REDIS_AOF_REWRITE_ITEMS_PER_CMD : (int)items;

                if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0;
                if (rioWriteBulkString(r,"RPUSH",5) == 0) return 0;
                if (rioWriteBulkObject(r,key) == 0) return 0;
            }
            if (rioWriteBulkObject(r,eleobj) == 0) return 0;
            if (++count == REDIS_AOF_REWRITE_ITEMS_PER_CMD) count = 0;
            items--;
        }
    } else {
#ifdef _WIN32
        cowUnlock();
#endif
        redisPanic("Unknown list encoding");
    }
    return 1;
}

714 715 716 717 718 719 720 721 722
/* Emit the commands needed to rebuild a set object.
 * Members are written as a sequence of SADD commands, each carrying at
 * most REDIS_AOF_REWRITE_ITEMS_PER_CMD members.
 * The function returns 0 on error, 1 on success. */
int rewriteSetObject(rio *r, robj *key, robj *o) {
    long long count = 0, items = setTypeSize(o);

    if (o->encoding == REDIS_ENCODING_INTSET) {
        int ii = 0;
        int64_t llval;

#ifdef _WIN32
        cowUnlock();
#endif
        while(intsetGet(o->ptr,ii++,&llval)) {
            if (count == 0) {
                /* Start a new SADD sized for the remaining items. */
                int cmd_items = (items > REDIS_AOF_REWRITE_ITEMS_PER_CMD) ?
                    REDIS_AOF_REWRITE_ITEMS_PER_CMD : (int)items;

                if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0;
                if (rioWriteBulkString(r,"SADD",4) == 0) return 0;
                if (rioWriteBulkObject(r,key) == 0) return 0;
            }
            if (rioWriteBulkLongLong(r,llval) == 0) return 0;
            if (++count == REDIS_AOF_REWRITE_ITEMS_PER_CMD) count = 0;
            items--;
        }
#ifdef _WIN32
    } else if (o->encoding == REDIS_ENCODING_HT ||
               o->encoding == REDIS_ENCODING_HTARRAY) {
        roDictIter *di;
        dictEntry *de;
        if (o->encoding == REDIS_ENCODING_HT) {
            di = roDictGetIterator(o->ptr, NULL);
        } else {
            cowDictArray *ar = (cowDictArray *)o->ptr;
            di = roDictGetIterator(NULL, ar);
        }
        cowUnlock();

        while((de = roDictNext(di)) != NULL) {
#else
    } else if (o->encoding == REDIS_ENCODING_HT) {
        dictIterator *di = dictGetIterator(o->ptr);
        dictEntry *de;

        while((de = dictNext(di)) != NULL) {
#endif
            robj *eleobj = dictGetKey(de);
            if (count == 0) {
                /* Start a new SADD sized for the remaining items. */
                int cmd_items = (items > REDIS_AOF_REWRITE_ITEMS_PER_CMD) ?
                    REDIS_AOF_REWRITE_ITEMS_PER_CMD : (int)items;

                if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0;
                if (rioWriteBulkString(r,"SADD",4) == 0) return 0;
                if (rioWriteBulkObject(r,key) == 0) return 0;
            }
            if (rioWriteBulkObject(r,eleobj) == 0) return 0;
            if (++count == REDIS_AOF_REWRITE_ITEMS_PER_CMD) count = 0;
            items--;
        }
#ifdef _WIN32
        roDictReleaseIterator(di);
#else
        dictReleaseIterator(di);
#endif
    } else {
#ifdef _WIN32
        cowUnlock();
#endif
        redisPanic("Unknown set encoding");
    }
    return 1;
}

787 788 789 790 791 792 793 794 795 796 797 798 799
/* Emit the commands needed to rebuild a sorted set object.
 *
 * Elements are written as ZADD commands carrying at most
 * REDIS_AOF_REWRITE_ITEMS_PER_CMD score/member pairs each (the array length
 * is therefore 2 + 2*pairs).
 *
 * On Windows every branch calls cowUnlock() before iterating (or before
 * panicking). NOTE(review): this presumes the copy-on-write lock is held on
 * entry - confirm against the caller.
 *
 * The function returns 0 on error, 1 on success. Any dictionary iterator
 * acquired here is released on both the success and the error path. */
int rewriteSortedSetObject(rio *r, robj *key, robj *o) {
    long long count = 0, items = zsetLength(o);

    if (o->encoding == REDIS_ENCODING_ZIPLIST) {
        unsigned char *zl = o->ptr;
        unsigned char *eptr, *sptr;
        unsigned char *vstr;
        unsigned int vlen;
        long long vll;
        double score;

#ifdef _WIN32
        cowUnlock();
#endif
        eptr = ziplistIndex(zl,0);
        redisAssert(eptr != NULL);
        sptr = ziplistNext(zl,eptr);
        redisAssert(sptr != NULL);

        while (eptr != NULL) {
            redisAssert(ziplistGet(eptr,&vstr,&vlen,&vll));
            score = zzlGetScore(sptr);

            if (count == 0) {
                int cmd_items = (items > REDIS_AOF_REWRITE_ITEMS_PER_CMD) ?
                    REDIS_AOF_REWRITE_ITEMS_PER_CMD : (int)items;

                if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0;
                if (rioWriteBulkString(r,"ZADD",4) == 0) return 0;
                if (rioWriteBulkObject(r,key) == 0) return 0;
            }
            if (rioWriteBulkDouble(r,score) == 0) return 0;
            if (vstr != NULL) {
                if (rioWriteBulkString(r,(char*)vstr,vlen) == 0) return 0;
            } else {
                if (rioWriteBulkLongLong(r,vll) == 0) return 0;
            }
            zzlNext(zl,&eptr,&sptr);
            if (++count == REDIS_AOF_REWRITE_ITEMS_PER_CMD) count = 0;
            items--;
        }
#ifdef _WIN32
    } else if (o->encoding == REDIS_ENCODING_SKIPLIST ||
               o->encoding == REDIS_ENCODING_HTZARRAY) {
        roZDictIter *di;
        dictEntry *de;
        int ok = 1;

        if (o->encoding == REDIS_ENCODING_SKIPLIST) {
            zset *zs = o->ptr;
            di = roZDictGetIterator(zs->dict, NULL);
        } else {
            cowDictZArray *ar = (cowDictZArray *)o->ptr;
            di = roZDictGetIterator(NULL, ar);
        }

        cowUnlock();
        while((de = roZDictNext(di)) != NULL) {
#else
    } else if (o->encoding == REDIS_ENCODING_SKIPLIST) {
        zset *zs = o->ptr;
        dictIterator *di = dictGetIterator(zs->dict);
        dictEntry *de;
        int ok = 1;

        while((de = dictNext(di)) != NULL) {
#endif
            robj *eleobj = dictGetKey(de);
            double *score = dictGetVal(de);

            if (count == 0) {
                int cmd_items = (items > REDIS_AOF_REWRITE_ITEMS_PER_CMD) ?
                    REDIS_AOF_REWRITE_ITEMS_PER_CMD : (int)items;

                /* On a write error stop iterating, but fall through so the
                 * iterator is still released below. */
                if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0 ||
                    rioWriteBulkString(r,"ZADD",4) == 0 ||
                    rioWriteBulkObject(r,key) == 0) {
                    ok = 0;
                    break;
                }
            }
            if (rioWriteBulkDouble(r,*score) == 0 ||
                rioWriteBulkObject(r,eleobj) == 0) {
                ok = 0;
                break;
            }
            if (++count == REDIS_AOF_REWRITE_ITEMS_PER_CMD) count = 0;
            items--;
        }
#ifdef _WIN32
        roZDictReleaseIterator(di);
#else
        dictReleaseIterator(di);
#endif
        if (!ok) return 0;
    } else {
#ifdef _WIN32
        cowUnlock();
#endif
        redisPanic("Unknown sorted zset encoding");
    }
    return 1;
}

884 885 886 887 888 889
/* Write either the key or the value of the currently selected item of an hash.
 * The 'hi' argument passes a valid Redis hash iterator.
 * The 'what' filed specifies if to write a key or a value and can be
 * either REDIS_HASH_KEY or REDIS_HASH_VALUE.
 *
 * The function returns 0 on error, non-zero on success. */
890 891 892 893 894 895 896 897
static int rioWriteHashIteratorCursor(rio *r, hashTypeIterator *hi, int what) {
    if (hi->encoding == REDIS_ENCODING_ZIPLIST) {
        unsigned char *vstr = NULL;
        unsigned int vlen = UINT_MAX;
        long long vll = LLONG_MAX;

        hashTypeCurrentFromZiplist(hi, what, &vstr, &vlen, &vll);
        if (vstr) {
H
Henry Rawas 已提交
898
            return (int)rioWriteBulkString(r, (char*)vstr, vlen);
899
        } else {
H
Henry Rawas 已提交
900
            return (int)rioWriteBulkLongLong(r, vll);
901 902 903 904 905 906 907 908 909 910 911 912 913
        }

    } else if (hi->encoding == REDIS_ENCODING_HT) {
        robj *value;

        hashTypeCurrentFromHashTable(hi, what, &value);
        return rioWriteBulkObject(r, value);
    }

    redisPanic("Unknown hash encoding");
    return 0;
}

H
Henry Rawas 已提交
914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965
#ifdef _WIN32
/* Read-only-iterator counterpart of rioWriteHashIteratorCursor().
 *
 * When the iterator walks the read-only array form (REDIS_ENCODING_HTARRAY)
 * the current field or value is fetched with roHashGetCurrentFromArray() and
 * written as a bulk object. Any other encoding is delegated to
 * rioWriteHashIteratorCursor() through the wrapped hashTypeIterator.
 *
 * The function returns 0 on error, non-zero on success. */
static int rioWriteRoHashIteratorCursor(rio *r, roHashIter *rohi, int what) {
    hashTypeIterator *hi;
    robj *value;

    if (roHashGetEncoding(rohi) != REDIS_ENCODING_HTARRAY) {
        hi = (hashTypeIterator *)roHashGetHashIter(rohi);
        return rioWriteHashIteratorCursor(r, hi, what);
    }

    roHashGetCurrentFromArray(rohi, what, &value);
    return rioWriteBulkObject(r, value);
}

/* Emit the commands needed to rebuild a hash object.
 *
 * Uses a read-only hash iterator so both the regular hash encodings and the
 * read-only array form (REDIS_ENCODING_HTARRAY) are handled. Field/value
 * pairs are written as HMSET commands carrying at most
 * REDIS_AOF_REWRITE_ITEMS_PER_CMD pairs each.
 *
 * The function returns 0 on error, 1 on success. The iterator is released
 * on both paths. */
int rewriteHashObject(rio *r, robj *key, robj *o) {
    roHashIter *rohi;
    long long count = 0, items = hashTypeLength(o);
    int ok = 1;

    if (o->encoding == REDIS_ENCODING_HTARRAY) {
        cowDictArray *ar = (cowDictArray *)o->ptr;
        rohi = roHashGetIterator(NULL, ar);
    } else {
        rohi = roHashGetIterator(o, NULL);
    }
    while (roHashNext(rohi) != REDIS_ERR) {
        if (count == 0) {
            int cmd_items = (items > REDIS_AOF_REWRITE_ITEMS_PER_CMD) ?
                REDIS_AOF_REWRITE_ITEMS_PER_CMD : (int)items;

            /* On a write error stop, but still release the iterator. */
            if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0 ||
                rioWriteBulkString(r,"HMSET",5) == 0 ||
                rioWriteBulkObject(r,key) == 0) {
                ok = 0;
                break;
            }
        }

        if (rioWriteRoHashIteratorCursor(r, rohi, REDIS_HASH_KEY) == 0 ||
            rioWriteRoHashIteratorCursor(r, rohi, REDIS_HASH_VALUE) == 0) {
            ok = 0;
            break;
        }
        if (++count == REDIS_AOF_REWRITE_ITEMS_PER_CMD) count = 0;
        items--;
    }

    roHashReleaseIterator(rohi);

    return ok;
}
#else
A
antirez 已提交
966 967 968
/* Emit the commands needed to rebuild a hash object.
 *
 * Field/value pairs are written as HMSET commands carrying at most
 * REDIS_AOF_REWRITE_ITEMS_PER_CMD pairs each.
 *
 * The function returns 0 on error, 1 on success. The hash iterator is
 * released on both paths. */
int rewriteHashObject(rio *r, robj *key, robj *o) {
    hashTypeIterator *hi;
    long long count = 0, items = hashTypeLength(o);
    int ok = 1;

    hi = hashTypeInitIterator(o);
    while (hashTypeNext(hi) != REDIS_ERR) {
        if (count == 0) {
            int cmd_items = (items > REDIS_AOF_REWRITE_ITEMS_PER_CMD) ?
                REDIS_AOF_REWRITE_ITEMS_PER_CMD : (int)items;

            /* On a write error stop, but still release the iterator. */
            if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0 ||
                rioWriteBulkString(r,"HMSET",5) == 0 ||
                rioWriteBulkObject(r,key) == 0) {
                ok = 0;
                break;
            }
        }

        if (rioWriteHashIteratorCursor(r, hi, REDIS_HASH_KEY) == 0 ||
            rioWriteHashIteratorCursor(r, hi, REDIS_HASH_VALUE) == 0) {
            ok = 0;
            break;
        }
        if (++count == REDIS_AOF_REWRITE_ITEMS_PER_CMD) count = 0;
        items--;
    }

    hashTypeReleaseIterator(hi);

    return ok;
}
H
Henry Rawas 已提交
993
#endif
A
antirez 已提交
994

995
/* Write a sequence of commands able to fully rebuild the dataset into
996 997 998 999
 * "filename". Used both by REWRITEAOF and BGREWRITEAOF.
 *
 * In order to minimize the number of commands needed in the rewritten
 * log Redis uses variadic commands when possible, such as RPUSH, SADD
1000
 * and ZADD. However at max REDIS_AOF_REWRITE_ITEMS_PER_CMD items per time
1001
 * are inserted using a single command. */
1002
int rewriteAppendOnlyFile(char *filename) {
H
Henry Rawas 已提交
1003 1004 1005
#ifdef _WIN32
    roDictIter *di = NULL;
#else
1006
    dictIterator *di = NULL;
H
Henry Rawas 已提交
1007
#endif
1008
    dictEntry *de;
P
Pieter Noordhuis 已提交
1009
    rio aof;
1010 1011 1012
    FILE *fp;
    char tmpfile[256];
    int j;
1013
    long long now = mstime();
H
Henry Rawas 已提交
1014
    dictIterator * expIter = NULL;
1015 1016 1017 1018

    /* Note that we have to use a different temp name here compared to the
     * one used by rewriteAppendOnlyFileBackground() function. */
    snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
H
Henry Rawas 已提交
1019 1020 1021
#ifdef _WIN32
    fp = fopen(tmpfile,"wb");
#else
1022
    fp = fopen(tmpfile,"w");
H
Henry Rawas 已提交
1023
#endif
1024
    if (!fp) {
A
antirez 已提交
1025
        redisLog(REDIS_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
1026 1027
        return REDIS_ERR;
    }
P
Pieter Noordhuis 已提交
1028

1029
    rioInitWithFile(&aof,fp);
1030 1031
    if (server.aof_rewrite_incremental_fsync)
        rioSetAutoSync(&aof,REDIS_AOF_AUTOSYNC_BYTES);
1032 1033 1034
    for (j = 0; j < server.dbnum; j++) {
        char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
        redisDb *db = server.db+j;
H
Henry Rawas 已提交
1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053
        dict *d;

#ifdef _WIN32
        cowLock();
        if (server.isBackgroundSaving == 1) {
            /* use background DB copy */
            db = server.cowSaveDb+j;
        }
        d = db->dict;
        if (dictSize(d) == 0) {
            cowUnlock();
            continue;
        }
        di = roDBGetIterator(j);
        /* to prevent rehash of expires from background thread, get safe iterator */
        expIter = dictGetSafeIterator(db->expires);
        cowUnlock();
#else
        d = db->dict;
1054
        if (dictSize(d) == 0) continue;
1055
        di = dictGetSafeIterator(d);
H
Henry Rawas 已提交
1056
#endif
1057 1058 1059 1060 1061 1062
        if (!di) {
            fclose(fp);
            return REDIS_ERR;
        }

        /* SELECT the new DB */
P
Pieter Noordhuis 已提交
1063 1064
        if (rioWrite(&aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
        if (rioWriteBulkLongLong(&aof,j) == 0) goto werr;
1065 1066

        /* Iterate this DB writing every entry */
H
Henry Rawas 已提交
1067 1068 1069
#ifdef _WIN32
        while((de = roDictNext(di)) != NULL) {
#else
1070
        while((de = dictNext(di)) != NULL) {
H
Henry Rawas 已提交
1071
#endif
A
antirez 已提交
1072
            sds keystr;
1073
            robj key, *o;
1074
            long long expiretime;
1075

1076 1077
            keystr = dictGetKey(de);
            o = dictGetVal(de);
1078
            initStaticStringObject(key,keystr);
H
Henry Rawas 已提交
1079 1080 1081
#ifdef _WIN32
            expiretime = getExpireForSave(db,&key);
#else
1082
            expiretime = getExpire(db,&key);
H
Henry Rawas 已提交
1083
#endif
1084

1085 1086 1087
            /* If this key is already expired skip it */
            if (expiretime != -1 && expiretime < now) continue;

H
Henry Rawas 已提交
1088 1089 1090 1091 1092 1093 1094 1095 1096 1097
#ifdef _WIN32
            cowLock();
            if (o->type == REDIS_LIST ||
                o->type == REDIS_SET ||
                o->type == REDIS_ZSET ||
                o->type == REDIS_HASH) {
                    o = (robj *)getRoConvertedObj(keystr, o);
            }
            cowUnlock();
#endif
1098 1099 1100 1101
            /* Save the key and associated value */
            if (o->type == REDIS_STRING) {
                /* Emit a SET command */
                char cmd[]="*3\r\n$3\r\nSET\r\n";
P
Pieter Noordhuis 已提交
1102
                if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
1103
                /* Key and value */
P
Pieter Noordhuis 已提交
1104 1105
                if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
                if (rioWriteBulkObject(&aof,o) == 0) goto werr;
1106
            } else if (o->type == REDIS_LIST) {
1107
                if (rewriteListObject(&aof,&key,o) == 0) goto werr;
1108
            } else if (o->type == REDIS_SET) {
1109
                if (rewriteSetObject(&aof,&key,o) == 0) goto werr;
1110
            } else if (o->type == REDIS_ZSET) {
1111
                if (rewriteSortedSetObject(&aof,&key,o) == 0) goto werr;
1112
            } else if (o->type == REDIS_HASH) {
A
antirez 已提交
1113
                if (rewriteHashObject(&aof,&key,o) == 0) goto werr;
1114 1115 1116 1117 1118
            } else {
                redisPanic("Unknown object type");
            }
            /* Save the expire time */
            if (expiretime != -1) {
1119
                char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";
P
Pieter Noordhuis 已提交
1120 1121
                if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
                if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
1122
                if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr;
1123 1124
            }
        }
H
Henry Rawas 已提交
1125 1126 1127 1128
#ifdef _WIN32
        if (expIter) dictReleaseIterator(expIter);
        roDictReleaseIterator(di);
#else
1129
        dictReleaseIterator(di);
H
Henry Rawas 已提交
1130
#endif
1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151
    }

    /* Make sure data will not remain on the OS's output buffers */
    fflush(fp);
    aof_fsync(fileno(fp));
    fclose(fp);

    /* Use RENAME to make sure the DB file is changed atomically only
     * if the generate DB file is ok. */
    if (rename(tmpfile,filename) == -1) {
        redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
        unlink(tmpfile);
        return REDIS_ERR;
    }
    redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
    return REDIS_OK;

werr:
    fclose(fp);
    unlink(tmpfile);
    redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
H
Henry Rawas 已提交
1152 1153 1154 1155
#ifdef _WIN32
    if (expIter) dictReleaseIterator(expIter);
    if (di) roDictReleaseIterator(di);
#else
1156
    if (di) dictReleaseIterator(di);
H
Henry Rawas 已提交
1157
#endif
1158 1159 1160 1161 1162 1163 1164 1165
    return REDIS_ERR;
}

/* This is how rewriting of the append only file in background works:
 *
 * 1) The user calls BGREWRITEAOF
 * 2) Redis calls this function, that forks():
 *    2a) the child rewrites the append only file in a temp file.
A
antirez 已提交
1166
 *    2b) the parent accumulates differences in server.aof_rewrite_buf.
1167 1168
 * 3) When the child has finished '2a' it exits.
 * 4) The parent will trap the exit code, if it's OK, will append the
A
antirez 已提交
1169
 *    data accumulated into server.aof_rewrite_buf into the temp file, and
1170 1171 1172
 *    finally will rename(2) the temp file in the actual file name.
 *    Then the new file is reopened as the new append only file. Profit!
 */
H
Henry Rawas 已提交
1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207
#ifdef _WIN32
int rewriteAppendOnlyFileBackground(void) {
    pid_t childpid;
    char tmpfile[256];

    if (server.aof_child_pid != -1) return REDIS_ERR;
    if (server.rdb_child_pid != -1) return REDIS_ERR;

    /* No fork() here. The current pid names the temp file and is stored as
     * aof_child_pid, so backgroundRewriteDoneHandler() and
     * aofRemoveTempFile() derive the same "temp-rewriteaof-bg-<pid>.aof". */
    childpid = getpid();
    snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) childpid);
    server.aof_rewrite_scheduled = 0;
    /* backgroundRewriteDoneHandler() computes aof_rewrite_time_last from
     * this. Without it the elapsed time would be measured from -1. */
    server.aof_rewrite_time_start = time(NULL);
    server.aof_child_pid = childpid;
    updateDictResizePolicy();
    /* Force the next feedAppendOnlyFile() to emit a SELECT. */
    server.aof_selected_db = -1;

    if (bkgdsave_start(tmpfile, rewriteAppendOnlyFile) == -1) {
        /* The background job could not be started: rewrite synchronously
         * and run the completion handler directly. */
        server.rdbbkgdfsave.background = 0;
        redisLog(REDIS_NOTICE,
            "Foreground append only file rewriting started by pid %d", childpid);

        if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
            backgroundRewriteDoneHandler(0, 0);
            return REDIS_OK;
        } else {
            /* Capture errno before the done handler (which unlinks the
             * temp file) gets a chance to overwrite it. */
            int saved_errno = errno;

            /* Report a plain failure (non-zero exit code) rather than a
             * termination by signal. */
            backgroundRewriteDoneHandler(1, 0);
            redisLog(REDIS_WARNING,
                "Can't rewrite append only file in background: spoon: %s",
                strerror(saved_errno));
            return REDIS_ERR;
        }
    }
    /* The rewrite was handed off to bkgdsave_start(). */
    return REDIS_OK;
}

#else
1208 1209
int rewriteAppendOnlyFileBackground(void) {
    pid_t childpid;
1210
    long long start;
1211

A
antirez 已提交
1212
    if (server.aof_child_pid != -1) return REDIS_ERR;
1213
    start = ustime();
1214 1215 1216
    if ((childpid = fork()) == 0) {
        char tmpfile[256];

1217
        /* Child */
1218 1219
        if (server.ipfd > 0) close(server.ipfd);
        if (server.sofd > 0) close(server.sofd);
1220 1221
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
        if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
1222 1223 1224 1225 1226 1227 1228
            size_t private_dirty = zmalloc_get_private_dirty();

            if (private_dirty) {
                redisLog(REDIS_NOTICE,
                    "AOF rewrite: %lu MB of memory used by copy-on-write",
                    private_dirty/(1024*1024));
            }
1229
            exitFromChild(0);
1230
        } else {
1231
            exitFromChild(1);
1232 1233 1234
        }
    } else {
        /* Parent */
1235
        server.stat_fork_time = ustime()-start;
1236 1237 1238 1239 1240 1241 1242 1243
        if (childpid == -1) {
            redisLog(REDIS_WARNING,
                "Can't rewrite append only file in background: fork: %s",
                strerror(errno));
            return REDIS_ERR;
        }
        redisLog(REDIS_NOTICE,
            "Background append only file rewriting started by pid %d",childpid);
1244
        server.aof_rewrite_scheduled = 0;
1245
        server.aof_rewrite_time_start = time(NULL);
A
antirez 已提交
1246
        server.aof_child_pid = childpid;
1247 1248 1249
        updateDictResizePolicy();
        /* We set appendseldb to -1 in order to force the next call to the
         * feedAppendOnlyFile() to issue a SELECT command, so the differences
A
antirez 已提交
1250
         * accumulated by the parent into server.aof_rewrite_buf will start
1251
         * with a SELECT statement and it will be safe to merge. */
A
antirez 已提交
1252
        server.aof_selected_db = -1;
1253 1254 1255 1256
        return REDIS_OK;
    }
    return REDIS_OK; /* unreached */
}
H
Henry Rawas 已提交
1257
#endif
1258 1259

/* BGREWRITEAOF command implementation.
 *
 * - An AOF rewrite child already exists: reply with an error.
 * - An RDB save child is running: only flag the rewrite as scheduled
 *   (server.aof_rewrite_scheduled = 1) and say so.
 * - Otherwise start the rewrite now and report whether that succeeded. */
void bgrewriteaofCommand(redisClient *c) {
    if (server.aof_child_pid != -1) {
        addReplyError(c,"Background append only file rewriting already in progress");
        return;
    }

    if (server.rdb_child_pid != -1) {
        server.aof_rewrite_scheduled = 1;
        addReplyStatus(c,"Background append only file rewriting scheduled");
        return;
    }

    if (rewriteAppendOnlyFileBackground() != REDIS_OK) {
        addReply(c,shared.err);
        return;
    }
    addReplyStatus(c,"Background append only file rewriting started");
}

/* Delete the temp file written by the background AOF rewrite whose pid is
 * 'childpid' ("temp-rewriteaof-bg-<childpid>.aof"). The result of unlink()
 * is ignored, so a missing file is not an error. */
void aofRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-rewriteaof-bg-%d.aof",(int)childpid);
    unlink(path);
}

1279
/* Update the server.aof_current_size filed explicitly using stat(2)
1280 1281
 * to check the size of the file. This is useful after a rewrite or after
 * a restart, normally the size is updated just adding the write length
A
antirez 已提交
1282
 * to the current length, that is much faster. */
1283 1284 1285
void aofUpdateCurrentSize(void) {
    struct redis_stat sb;

H
Henry Rawas 已提交
1286 1287 1288 1289 1290 1291
#ifdef _WIN32
    if (server.aof_fd == -1) {
        redisLog(REDIS_NOTICE,"Unable to check the AOF length: %s", "appendfd is -1");
        return;
    }
#endif
A
antirez 已提交
1292
    if (redis_fstat(server.aof_fd,&sb) == -1) {
A
antirez 已提交
1293
        redisLog(REDIS_WARNING,"Unable to obtain the AOF file length. stat: %s",
1294 1295
            strerror(errno));
    } else {
1296
        server.aof_current_size = sb.st_size;
1297 1298 1299
    }
}

1300 1301
/* A background append only file rewriting (BGREWRITEAOF) terminated its work.
 * Handle this. */
1302
void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
1303
    if (!bysignal && exitcode == 0) {
1304
        int newfd, oldfd;
1305
        char tmpfile[256];
1306
        long long now = ustime();
H
Henry Rawas 已提交
1307 1308 1309
#ifdef _WIN32
        char tmpfile_old[256];
#endif
1310 1311

        redisLog(REDIS_NOTICE,
1312 1313
            "Background AOF rewrite terminated with success");

1314 1315 1316
        /* Flush the differences accumulated by the parent to the
         * rewritten AOF. */
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof",
A
antirez 已提交
1317
            (int)server.aof_child_pid);
H
Henry Rawas 已提交
1318 1319 1320
#ifdef _WIN32
        newfd = open(tmpfile,O_WRONLY|O_APPEND|O_CREAT|_O_BINARY,_S_IREAD|_S_IWRITE);
#else
1321
        newfd = open(tmpfile,O_WRONLY|O_APPEND);
H
Henry Rawas 已提交
1322
#endif
1323 1324 1325
        if (newfd == -1) {
            redisLog(REDIS_WARNING,
                "Unable to open the temporary AOF produced by the child: %s", strerror(errno));
1326 1327
            goto cleanup;
        }
1328

1329 1330 1331
        if (aofRewriteBufferWrite(newfd) == -1) {
            redisLog(REDIS_WARNING,
                "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
1332
            close(newfd);
1333 1334
            goto cleanup;
        }
1335 1336

        redisLog(REDIS_NOTICE,
1337
            "Parent diff successfully flushed to the rewritten AOF (%lu bytes)", aofRewriteBufferSize());
1338 1339 1340

        /* The only remaining thing to do is to rename the temporary file to
         * the configured file and switch the file descriptor used to do AOF
1341 1342 1343 1344
         * writes. We don't want close(2) or rename(2) calls to block the
         * server on old file deletion.
         *
         * There are two possible scenarios:
1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358
         *
         * 1) AOF is DISABLED and this was a one time rewrite. The temporary
         * file will be renamed to the configured file. When this file already
         * exists, it will be unlinked, which may block the server.
         *
         * 2) AOF is ENABLED and the rewritten AOF will immediately start
         * receiving writes. After the temporary file is renamed to the
         * configured file, the original AOF file descriptor will be closed.
         * Since this will be the last reference to that file, closing it
         * causes the underlying file to be unlinked, which may block the
         * server.
         *
         * To mitigate the blocking effect of the unlink operation (either
         * caused by rename(2) in scenario 1, or by close(2) in scenario 2), we
1359
         * use a background thread to take care of this. First, we
1360 1361 1362 1363 1364 1365
         * make scenario 1 identical to scenario 2 by opening the target file
         * when it exists. The unlink operation after the rename(2) will then
         * be executed upon calling close(2) for its descriptor. Everything to
         * guarantee atomicity for this switch has already happened by then, so
         * we don't care what the outcome or duration of that close operation
         * is, as long as the file descriptor is released again. */
H
Henry Rawas 已提交
1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403
#ifdef _WIN32
        oldfd = -1; /* We'll set this to the current AOF filedes later. */

        /* Close files before renaming */
        close(newfd);
        if (server.aof_fd != -1) close(server.aof_fd);
        /* now rename the existing file to allow new file to be renamed */
        snprintf(tmpfile_old,256,"temp-rewriteaof-old-%d.aof",
            (int)server.aof_child_pid);
        if (server.aof_fd != -1) {
            if (rename(server.aof_filename, tmpfile_old) == -1) {
                redisLog(REDIS_WARNING,
                    "Error trying to rename the existing AOF to old tempfile: %s", strerror(errno));
            }
        }
        if (rename(tmpfile,server.aof_filename) == -1) {
            redisLog(REDIS_WARNING,
                "Error trying to rename the temporary AOF: %s", strerror(errno));
            if (server.aof_fd != -1) {
                if (rename(tmpfile_old, server.aof_filename) == -1) {
                    redisLog(REDIS_WARNING,
                        "Error trying to rename the existing AOF from old tempfile: %s", strerror(errno));
                }
            }
            if (oldfd != -1) close(oldfd);
            goto cleanup;
        }
        /* now open the files again with new names */
        newfd = open(server.aof_filename, O_WRONLY|O_APPEND|_O_BINARY);
        if (newfd == -1) {
            /* Windows fix: More info */
            redisLog(REDIS_WARNING, "Not able to reopen the temporary AOF file after rename");
            goto cleanup;
        }
        if (server.aof_fd != -1) {
            server.aof_fd = open(tmpfile_old, O_WRONLY|O_APPEND|O_CREAT|_O_BINARY,0644);
        }
#else
A
antirez 已提交
1404
        if (server.aof_fd == -1) {
1405 1406
            /* AOF disabled */

1407 1408 1409
             /* Don't care if this fails: oldfd will be -1 and we handle that.
              * One notable case of -1 return is if the old file does
              * not exist. */
1410
             oldfd = open(server.aof_filename,O_RDONLY|O_NONBLOCK);
1411 1412
        } else {
            /* AOF enabled */
1413
            oldfd = -1; /* We'll set this to the current AOF filedes later. */
1414 1415 1416 1417
        }

        /* Rename the temporary file. This will not unlink the target file if
         * it exists, because we reference it with "oldfd". */
1418
        if (rename(tmpfile,server.aof_filename) == -1) {
1419
            redisLog(REDIS_WARNING,
A
antirez 已提交
1420
                "Error trying to rename the temporary AOF file: %s", strerror(errno));
1421
            close(newfd);
1422
            if (oldfd != -1) close(oldfd);
1423 1424
            goto cleanup;
        }
H
Henry Rawas 已提交
1425
#endif
1426

A
antirez 已提交
1427
        if (server.aof_fd == -1) {
1428 1429
            /* AOF disabled, we don't need to set the AOF file descriptor
             * to this new file, so we can close it. */
1430 1431
            close(newfd);
        } else {
1432
            /* AOF enabled, replace the old fd with the new one. */
A
antirez 已提交
1433 1434
            oldfd = server.aof_fd;
            server.aof_fd = newfd;
1435
            if (server.aof_fsync == AOF_FSYNC_ALWAYS)
1436
                aof_fsync(newfd);
1437
            else if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
1438
                aof_background_fsync(newfd);
A
antirez 已提交
1439
            server.aof_selected_db = -1; /* Make sure SELECT is re-issued */
1440
            aofUpdateCurrentSize();
1441
            server.aof_rewrite_base_size = server.aof_current_size;
1442 1443 1444

            /* Clear regular AOF buffer since its contents was just written to
             * the new AOF from the background rewrite buffer. */
A
antirez 已提交
1445 1446
            sdsfree(server.aof_buf);
            server.aof_buf = sdsempty();
1447
        }
1448

1449 1450
        server.aof_lastbgrewrite_status = REDIS_OK;

A
antirez 已提交
1451
        redisLog(REDIS_NOTICE, "Background AOF rewrite finished successfully");
1452 1453 1454
        /* Change state from WAIT_REWRITE to ON if needed */
        if (server.aof_state == REDIS_AOF_WAIT_REWRITE)
            server.aof_state = REDIS_AOF_ON;
1455 1456

        /* Asynchronously close the overwritten AOF. */
H
Henry Rawas 已提交
1457 1458 1459
#ifdef _WIN32
        if (oldfd != -1) bioCreateBackgroundJob(REDIS_BIO_CLOSE_FILE,(void*)(size_t)oldfd,NULL,NULL);
#else
1460
        if (oldfd != -1) bioCreateBackgroundJob(REDIS_BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,NULL);
H
Henry Rawas 已提交
1461
#endif
1462 1463 1464

        redisLog(REDIS_VERBOSE,
            "Background AOF rewrite signal handler took %lldus", ustime()-now);
1465
    } else if (!bysignal && exitcode != 0) {
1466 1467
        server.aof_lastbgrewrite_status = REDIS_ERR;

1468 1469
        redisLog(REDIS_WARNING,
            "Background AOF rewrite terminated with error");
1470
    } else {
1471 1472
        server.aof_lastbgrewrite_status = REDIS_ERR;

1473
        redisLog(REDIS_WARNING,
1474
            "Background AOF rewrite terminated by signal %d", bysignal);
1475
    }
1476

1477
cleanup:
1478
    aofRewriteBufferReset();
A
antirez 已提交
1479
    aofRemoveTempFile(server.aof_child_pid);
H
Henry Rawas 已提交
1480 1481 1482 1483 1484
#ifdef _WIN32
    server.rdbbkgdfsave.state = BKSAVE_IDLE;
    /* turn off copy on write */
    cowBkgdSaveStop();
#endif
A
antirez 已提交
1485
    server.aof_child_pid = -1;
1486 1487
    server.aof_rewrite_time_last = time(NULL)-server.aof_rewrite_time_start;
    server.aof_rewrite_time_start = -1;
1488 1489
    /* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */
    if (server.aof_state == REDIS_AOF_WAIT_REWRITE)
1490
        server.aof_rewrite_scheduled = 1;
1491
}