aof.c 63.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
/*
 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *   * Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *   * Neither the name of Redis nor the names of its contributors may be used
 *     to endorse or promote products derived from this software without
 *     specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

30
#include "server.h"
31
#include "bio.h"
A
antirez 已提交
32
#include "rio.h"
33 34 35 36

#include <signal.h>
#include <fcntl.h>
#include <sys/stat.h>
37 38 39 40
#include <sys/types.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/wait.h>
41
#include <sys/param.h>
42

43
void aofUpdateCurrentSize(void);
44
void aofClosePipes(void);
45

46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
/* ----------------------------------------------------------------------------
 * AOF rewrite buffer implementation.
 *
 * The following code implements a simple buffer used to accumulate
 * changes while the background process is rewriting the AOF file.
 *
 * We only need to append, but can't just use realloc with a large block
 * because 'huge' reallocs are not always handled as one could expect
 * (via remapping of pages at OS level) but may involve copying data.
 *
 * For this reason we use a list of blocks, every block is
 * AOF_RW_BUF_BLOCK_SIZE bytes.
 * ------------------------------------------------------------------------- */

/* Capacity, in bytes, of every block of the AOF rewrite buffer list. */
#define AOF_RW_BUF_BLOCK_SIZE (1024*1024*10)    /* 10 MB per block */

/* One node of the AOF rewrite buffer list.
 * 'used' is the number of bytes stored at the start of 'buf'; 'free' is the
 * room left after them. The code that fills blocks (used += n, free -= n)
 * and the code that drains them (data shifted to the front, used -= n,
 * free += n) keep used + free == AOF_RW_BUF_BLOCK_SIZE. */
typedef struct aofrwblock {
    unsigned long used, free;
    char buf[AOF_RW_BUF_BLOCK_SIZE];
} aofrwblock;

/* (Re)initialize the AOF rewrite buffer.
 *
 * Any existing block list is released. It is replaced with a fresh, empty
 * list whose node values are released with zfree(). A NULL
 * server.aof_rewrite_buf_blocks is skipped, so this also serves as the
 * first-time initializer. */
void aofRewriteBufferReset(void) {
    if (server.aof_rewrite_buf_blocks != NULL) {
        listRelease(server.aof_rewrite_buf_blocks);
    }

    server.aof_rewrite_buf_blocks = listCreate();
    listSetFreeMethod(server.aof_rewrite_buf_blocks,zfree);
}

78
/* Return the current size of the AOF rewrite buffer. */
79
unsigned long aofRewriteBufferSize(void) {
80 81 82
    listNode *ln;
    listIter li;
    unsigned long size = 0;
83

84 85 86 87 88
    listRewind(server.aof_rewrite_buf_blocks,&li);
    while((ln = listNext(&li))) {
        aofrwblock *block = listNodeValue(ln);
        size += block->used;
    }
89 90 91
    return size;
}

92 93 94 95
/* Writable-event handler that streams the AOF rewrite buffer to the child
 * doing the AOF rewrite, through server.aof_pipe_write_data_to_child.
 * Sending the differences while the child is still working keeps the final
 * write, done when the child finishes, small.
 *
 * Blocks are consumed from the head of the list. Written bytes are removed
 * from the front of their block, and fully drained blocks are deleted.
 *
 * The handler returns, leaving itself registered, when write() makes no
 * progress. It unregisters itself once the buffer is empty or
 * server.aof_stop_sending_diff is set. All four arguments are unused. */
void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
    UNUSED(el);
    UNUSED(fd);
    UNUSED(privdata);
    UNUSED(mask);

    for (;;) {
        listNode *head = listFirst(server.aof_rewrite_buf_blocks);
        aofrwblock *blk = (head != NULL) ? head->value : NULL;

        /* Nothing left to send, or sending was stopped: unregister. */
        if (blk == NULL || server.aof_stop_sending_diff) {
            aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,
                              AE_WRITABLE);
            return;
        }

        if (blk->used > 0) {
            ssize_t sent = write(server.aof_pipe_write_data_to_child,
                                 blk->buf,blk->used);

            /* No progress: the event stays registered, try again later. */
            if (sent <= 0) return;

            /* Shift the unsent tail back to the start of the block. */
            memmove(blk->buf,blk->buf+sent,blk->used-sent);
            blk->used -= sent;
            blk->free += sent;
        }

        if (blk->used == 0) listDelNode(server.aof_rewrite_buf_blocks,head);
    }
}

124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154
/* Append data to the AOF rewrite buffer, allocating new blocks if needed. */
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
    listNode *ln = listLast(server.aof_rewrite_buf_blocks);
    aofrwblock *block = ln ? ln->value : NULL;

    while(len) {
        /* If we already got at least an allocated block, try appending
         * at least some piece into it. */
        if (block) {
            unsigned long thislen = (block->free < len) ? block->free : len;
            if (thislen) {  /* The current block is not already full. */
                memcpy(block->buf+block->used, s, thislen);
                block->used += thislen;
                block->free -= thislen;
                s += thislen;
                len -= thislen;
            }
        }

        if (len) { /* First block to allocate, or need another block. */
            int numblocks;

            block = zmalloc(sizeof(*block));
            block->free = AOF_RW_BUF_BLOCK_SIZE;
            block->used = 0;
            listAddNodeTail(server.aof_rewrite_buf_blocks,block);

            /* Log every time we cross more 10 or 100 blocks, respectively
             * as a notice or warning. */
            numblocks = listLength(server.aof_rewrite_buf_blocks);
            if (((numblocks+1) % 10) == 0) {
A
antirez 已提交
155 156
                int level = ((numblocks+1) % 100) == 0 ? LL_WARNING :
                                                         LL_NOTICE;
A
antirez 已提交
157
                serverLog(level,"Background AOF buffer size: %lu MB",
158 159 160 161
                    aofRewriteBufferSize()/(1024*1024));
            }
        }
    }
162 163 164 165 166 167 168

    /* Install a file event to send data to the rewrite child if there is
     * not one already. */
    if (aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0) {
        aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child,
            AE_WRITABLE, aofChildWriteDiffData, NULL);
    }
169 170 171
}

/* Write the buffer (possibly composed of multiple blocks) into the specified
 * fd. If a short write or any other error happens -1 is returned,
 * otherwise the number of bytes written is returned.
 *
 * On failure errno describes the problem. When write(2) itself failed, errno
 * is left as write(2) set it. When write(2) reported fewer bytes than
 * requested (including zero), errno is set to EIO. write(2) does not set
 * errno in that case, so a caller printing strerror(errno) would otherwise
 * report an unrelated, stale error. */
ssize_t aofRewriteBufferWrite(int fd) {
    listNode *ln;
    listIter li;
    ssize_t count = 0;

    listRewind(server.aof_rewrite_buf_blocks,&li);
    while((ln = listNext(&li))) {
        aofrwblock *block = listNodeValue(ln);
        ssize_t nwritten;

        if (block->used) {
            nwritten = write(fd,block->buf,block->used);
            if (nwritten != (ssize_t)block->used) {
                /* Short write: errno was not set by write(2). */
                if (nwritten >= 0) errno = EIO;
                return -1;
            }
            count += nwritten;
        }
    }
    return count;
}

196 197 198 199 200 201
/* ----------------------------------------------------------------------------
 * AOF file implementation
 * ------------------------------------------------------------------------- */

/* Starts a background task that performs fsync() against the specified
 * file descriptor (the one of the AOF file) in another thread. */
202
void aof_background_fsync(int fd) {
A
antirez 已提交
203
    bioCreateBackgroundJob(BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL);
204 205
}

206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225
/* Kill the AOF rewrite (AOFRW) child, if one exists, and wait for it to exit.
 *
 * Afterwards it releases what belonged to that rewrite:
 *  - the accumulated rewrite buffer is reset;
 *  - the child's temporary file is removed;
 *  - the IPC pipes are closed.
 * server.aof_child_pid and server.aof_rewrite_time_start are set back to -1.
 * Does nothing when there is no child. */
static void killAppendOnlyChild(void) {
    int statloc;
    pid_t pid = server.aof_child_pid;

    /* No AOFRW child? return. */
    if (pid == -1) return;

    /* Kill AOFRW child, wait for child exit. */
    serverLog(LL_NOTICE,"Killing running AOF rewrite child: %ld",
        (long) pid);
    if (kill(pid,SIGUSR1) != -1) {
        /* Reap exactly this child. The previous wait3() loop had two
         * problems: it could collect (and lose the exit status of) any other
         * child exiting meanwhile, and it spun forever if wait3() failed,
         * e.g. with ECHILD. Only an interrupted call is retried here. */
        while (waitpid(pid,&statloc,0) == -1 && errno == EINTR) {
            /* Interrupted by a signal: wait again. */
        }
    }
    /* Reset the buffer accumulating changes while the child saves. */
    aofRewriteBufferReset();
    aofRemoveTempFile(pid);
    server.aof_child_pid = -1;
    server.aof_rewrite_time_start = -1;
    /* Close pipes used for IPC between the two processes. */
    aofClosePipes();
}

226 227 228
/* Called when the user switches from "appendonly yes" to "appendonly no"
 * at runtime using the CONFIG command. */
void stopAppendOnly(void) {
A
antirez 已提交
229
    serverAssert(server.aof_state != AOF_OFF);
230
    flushAppendOnlyFile(1);
A
antirez 已提交
231 232
    aof_fsync(server.aof_fd);
    close(server.aof_fd);
233

A
antirez 已提交
234 235
    server.aof_fd = -1;
    server.aof_selected_db = -1;
A
antirez 已提交
236
    server.aof_state = AOF_OFF;
237
    killAppendOnlyChild();
238 239 240 241 242
}

/* Called when the user switches from "appendonly no" to "appendonly yes"
 * at runtime using the CONFIG command. */
int startAppendOnly(void) {
243
    char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
244
    int newfd;
245

246
    newfd = open(server.aof_filename,O_WRONLY|O_APPEND|O_CREAT,0644);
A
antirez 已提交
247
    serverAssert(server.aof_state == AOF_OFF);
248
    if (newfd == -1) {
249 250 251 252 253 254 255 256
        char *cwdp = getcwd(cwd,MAXPATHLEN);

        serverLog(LL_WARNING,
            "Redis needs to enable the AOF but can't open the "
            "append only file %s (in server root dir %s): %s",
            server.aof_filename,
            cwdp ? cwdp : "unknown",
            strerror(errno));
257
        return C_ERR;
258
    }
259 260 261
    if (server.rdb_child_pid != -1) {
        server.aof_rewrite_scheduled = 1;
        serverLog(LL_WARNING,"AOF was enabled but there is already a child process saving an RDB file on disk. An AOF background was scheduled to start when possible.");
262
    } else {
263 264 265
        /* If there is a pending AOF rewrite, we need to switch it off and
         * start a new one: the old one cannot be reused becuase it is not
         * accumulating the AOF buffer. */
266 267 268 269 270 271 272 273 274
        if (server.aof_child_pid != -1) {
            serverLog(LL_WARNING,"AOF was enabled but there is already an AOF rewriting in background. Stopping background AOF and starting a rewrite now.");
            killAppendOnlyChild();
        }
        if (rewriteAppendOnlyFileBackground() == C_ERR) {
            close(newfd);
            serverLog(LL_WARNING,"Redis needs to enable the AOF but can't trigger a background AOF rewrite operation. Check the above logs for more info about the error.");
            return C_ERR;
        }
275
    }
276
    /* We correctly switched on AOF, now wait for the rewrite to be complete
277
     * in order to append data on disk. */
A
antirez 已提交
278
    server.aof_state = AOF_WAIT_REWRITE;
279 280
    server.aof_last_fsync = server.unixtime;
    server.aof_fd = newfd;
281
    return C_OK;
282 283
}

284 285 286 287 288 289 290 291
/* This is a wrapper to the write syscall in order to retry on short writes
 * or if the syscall gets interrupted. It could look strange that we retry
 * on short writes given that we are writing to a block device: normally if
 * the first call is short, there is an end-of-space condition, so the next
 * is likely to fail. However apparently in modern systems this is no longer
 * true, and in general it looks just more resilient to retry the write. If
 * there is an actual error condition we'll get it at the next try.
 *
 * Returns the number of bytes written, which may be less than 'len':
 *  - If write(2) fails before anything was written, -1 is returned and errno
 *    is left as write(2) set it.
 *  - If write(2) fails after some bytes were written, the count so far is
 *    returned.
 *  - If write(2) reports zero bytes for a non-empty request, the loop stops
 *    and returns the count so far, rather than retrying forever without
 *    making progress. */
ssize_t aofWrite(int fd, const char *buf, size_t len) {
    ssize_t nwritten, totwritten = 0;

    while (len) {
        nwritten = write(fd, buf, len);

        if (nwritten < 0) {
            if (errno == EINTR) continue;
            return totwritten ? totwritten : -1;
        }

        /* No progress and no error: retrying would spin forever. */
        if (nwritten == 0) break;

        len -= nwritten;
        buf += nwritten;
        totwritten += nwritten;
    }

    return totwritten;
}

312 313 314 315 316 317
/* Write the append only file buffer on disk.
 *
 * Since we are required to write the AOF before replying to the client,
 * and the only way the client socket can get a write is entering when the
 * the event loop, we accumulate all the AOF writes in a memory
 * buffer and write it on disk using this function just before entering
318 319 320 321 322 323 324 325 326 327 328 329
 * the event loop again.
 *
 * About the 'force' argument:
 *
 * When the fsync policy is set to 'everysec' we may delay the flush if there
 * is still an fsync() going on in the background thread, since for instance
 * on Linux write(2) will be blocked by the background fsync anyway.
 * When this happens we remember that there is some aof buffer to be
 * flushed ASAP, and will try to do that in the serverCron() function.
 *
 * However if force is set to 1 we'll write regardless of the background
 * fsync. */
330
#define AOF_WRITE_LOG_ERROR_RATE 30 /* Seconds between errors logging. */
331
void flushAppendOnlyFile(int force) {
332
    ssize_t nwritten;
333
    int sync_in_progress = 0;
334
    mstime_t latency;
335

A
antirez 已提交
336
    if (sdslen(server.aof_buf) == 0) return;
337

338
    if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
A
antirez 已提交
339
        sync_in_progress = bioPendingJobsOfType(BIO_AOF_FSYNC) != 0;
340

341
    if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
342 343 344 345 346
        /* With this append fsync policy we do background fsyncing.
         * If the fsync is still in progress we can try to delay
         * the write for a couple of seconds. */
        if (sync_in_progress) {
            if (server.aof_flush_postponed_start == 0) {
J
Jan-Erik Rediger 已提交
347
                /* No previous write postponing, remember that we are
348 349 350 351
                 * postponing the flush and return. */
                server.aof_flush_postponed_start = server.unixtime;
                return;
            } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
A
antirez 已提交
352
                /* We were already waiting for fsync to finish, but for less
353 354 355 356 357
                 * than two seconds this is still ok. Postpone again. */
                return;
            }
            /* Otherwise fall trough, and go write since we can't wait
             * over two seconds. */
358
            server.aof_delayed_fsync++;
A
antirez 已提交
359
            serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
360 361
        }
    }
362 363 364 365 366
    /* We want to perform a single write. This should be guaranteed atomic
     * at least if the filesystem we are writing is a real physical one.
     * While this will save us against the server being killed I don't think
     * there is much to do about the whole server stopping for power problems
     * or alike */
367

368
    latencyStartMonitor(latency);
369
    nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
370
    latencyEndMonitor(latency);
371 372 373 374 375
    /* We want to capture different events for delayed writes:
     * when the delay happens with a pending fsync, or with a saving child
     * active, and when the above two conditions are missing.
     * We also use an additional event name to save all samples which is
     * useful for graphing / monitoring purposes. */
376
    if (sync_in_progress) {
377 378 379 380 381 382
        latencyAddSampleIfNeeded("aof-write-pending-fsync",latency);
    } else if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) {
        latencyAddSampleIfNeeded("aof-write-active-child",latency);
    } else {
        latencyAddSampleIfNeeded("aof-write-alone",latency);
    }
383 384
    latencyAddSampleIfNeeded("aof-write",latency);

385 386 387
    /* We performed the write so reset the postponed flush sentinel to zero. */
    server.aof_flush_postponed_start = 0;

Z
zhaozhao.zz 已提交
388
    if (nwritten != (ssize_t)sdslen(server.aof_buf)) {
389 390 391 392 393 394 395 396 397
        static time_t last_write_error_log = 0;
        int can_log = 0;

        /* Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds. */
        if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {
            can_log = 1;
            last_write_error_log = server.unixtime;
        }

J
Jan-Erik Rediger 已提交
398
        /* Log the AOF write error and record the error code. */
P
Pieter Noordhuis 已提交
399
        if (nwritten == -1) {
400
            if (can_log) {
A
antirez 已提交
401
                serverLog(LL_WARNING,"Error writing to the AOF file: %s",
402 403 404
                    strerror(errno));
                server.aof_last_write_errno = errno;
            }
P
Pieter Noordhuis 已提交
405
        } else {
406
            if (can_log) {
A
antirez 已提交
407
                serverLog(LL_WARNING,"Short write while writing to "
408 409 410 411 412
                                       "the AOF file: (nwritten=%lld, "
                                       "expected=%lld)",
                                       (long long)nwritten,
                                       (long long)sdslen(server.aof_buf));
            }
413 414

            if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
415
                if (can_log) {
A
antirez 已提交
416
                    serverLog(LL_WARNING, "Could not remove short write "
417 418 419 420 421
                             "from the append-only file.  Redis may refuse "
                             "to load the AOF the next time it starts.  "
                             "ftruncate: %s", strerror(errno));
                }
            } else {
J
Jan-Erik Rediger 已提交
422
                /* If the ftruncate() succeeded we can set nwritten to
423 424
                 * -1 since there is no longer partial data into the AOF. */
                nwritten = -1;
425
            }
426 427 428 429 430 431 432 433
            server.aof_last_write_errno = ENOSPC;
        }

        /* Handle the AOF write error. */
        if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
            /* We can't recover when the fsync policy is ALWAYS since the
             * reply for the client is already in the output buffers, and we
             * have the contract with the user that on acknowledged write data
J
Jan-Erik Rediger 已提交
434
             * is synced on disk. */
A
antirez 已提交
435
            serverLog(LL_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
436 437 438 439 440
            exit(1);
        } else {
            /* Recover from failed write leaving data into the buffer. However
             * set an error to stop accepting writes as long as the error
             * condition is not cleared. */
441
            server.aof_last_write_status = C_ERR;
442 443 444 445 446 447 448 449 450 451 452 453

            /* Trim the sds buffer if there was a partial write, and there
             * was no way to undo it with ftruncate(2). */
            if (nwritten > 0) {
                server.aof_current_size += nwritten;
                sdsrange(server.aof_buf,nwritten,-1);
            }
            return; /* We'll try again on the next call... */
        }
    } else {
        /* Successful write(2). If AOF was in error state, restore the
         * OK state and log the event. */
454
        if (server.aof_last_write_status == C_ERR) {
A
antirez 已提交
455
            serverLog(LL_WARNING,
456
                "AOF write error looks solved, Redis can write again.");
457
            server.aof_last_write_status = C_OK;
P
Pieter Noordhuis 已提交
458
        }
459
    }
460
    server.aof_current_size += nwritten;
461

462 463
    /* Re-use AOF buffer when it is small enough. The maximum comes from the
     * arena size of 4k minus some overhead (but is otherwise arbitrary). */
A
antirez 已提交
464 465
    if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
        sdsclear(server.aof_buf);
466
    } else {
A
antirez 已提交
467 468
        sdsfree(server.aof_buf);
        server.aof_buf = sdsempty();
469 470
    }

471 472
    /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
     * children doing I/O in the background. */
473
    if (server.aof_no_fsync_on_rewrite &&
A
antirez 已提交
474
        (server.aof_child_pid != -1 || server.rdb_child_pid != -1))
475
            return;
476 477

    /* Perform the fsync if needed. */
478
    if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
479 480
        /* aof_fsync is defined as fdatasync() for Linux in order to avoid
         * flushing metadata. */
481
        latencyStartMonitor(latency);
A
antirez 已提交
482
        aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */
483 484
        latencyEndMonitor(latency);
        latencyAddSampleIfNeeded("aof-fsync-always",latency);
A
antirez 已提交
485
        server.aof_last_fsync = server.unixtime;
486
    } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
A
antirez 已提交
487 488 489
                server.unixtime > server.aof_last_fsync)) {
        if (!sync_in_progress) aof_background_fsync(server.aof_fd);
        server.aof_last_fsync = server.unixtime;
490 491 492
    }
}

493 494 495 496 497 498 499 500 501 502 503
/* Append to 'dst' the multi-bulk encoding of the command in argc/argv:
 * a "*<argc>\r\n" header followed, for every argument, by
 * "$<len>\r\n<bytes>\r\n". Each argument goes through getDecodedObject()
 * before its ->ptr is read as an sds, and the reference obtained is released
 * afterwards. Returns the (possibly reallocated) sds. */
sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) {
    char hdr[32];
    int hdrlen, i;

    /* Multi-bulk header: number of arguments. */
    hdr[0] = '*';
    hdrlen = 1 + ll2string(hdr+1,sizeof(hdr)-1,argc);
    hdr[hdrlen++] = '\r';
    hdr[hdrlen++] = '\n';
    dst = sdscatlen(dst,hdr,hdrlen);

    /* One bulk string per argument. */
    for (i = 0; i < argc; i++) {
        robj *arg = getDecodedObject(argv[i]);
        size_t payload = sdslen(arg->ptr);

        hdr[0] = '$';
        hdrlen = 1 + ll2string(hdr+1,sizeof(hdr)-1,payload);
        hdr[hdrlen++] = '\r';
        hdr[hdrlen++] = '\n';
        dst = sdscatlen(dst,hdr,hdrlen);
        dst = sdscatlen(dst,arg->ptr,payload);
        dst = sdscatlen(dst,"\r\n",2);
        decrRefCount(arg);
    }
    return dst;
}

518 519 520 521 522 523 524 525 526
/* Append to 'buf' a PEXPIREAT command for 'key' equivalent to the TTL that
 * 'cmd' sets. EXPIRE, PEXPIRE, EXPIREAT, SETEX and PSETEX are all translated
 * this way, so the AOF keeps millisecond precision and always stores an
 * absolute time.
 *
 * 'seconds' is the TTL argument as given to 'cmd', parsed with strtoll().
 *  - It is multiplied by 1000 for EXPIRE, SETEX and EXPIREAT (second
 *    resolution).
 *  - mstime() is added for EXPIRE, PEXPIRE, SETEX and PSETEX (relative TTL).
 * Returns the (possibly reallocated) sds. */
sds catAppendOnlyExpireAtCommand(sds buf, struct redisCommand *cmd, robj *key, robj *seconds) {
    int ttl_in_seconds = cmd->proc == expireCommand ||
                         cmd->proc == setexCommand ||
                         cmd->proc == expireatCommand;
    int ttl_is_relative = cmd->proc == expireCommand ||
                          cmd->proc == pexpireCommand ||
                          cmd->proc == setexCommand ||
                          cmd->proc == psetexCommand;
    robj *decoded;
    robj *argv[3];
    long long when;

    /* Decode first so that strtoll() can be used on the string form. */
    decoded = getDecodedObject(seconds);
    when = strtoll(decoded->ptr,NULL,10);
    decrRefCount(decoded);

    if (ttl_in_seconds) when *= 1000;      /* -> milliseconds */
    if (ttl_is_relative) when += mstime(); /* -> absolute unix time in ms */

    argv[0] = createStringObject("PEXPIREAT",9);
    argv[1] = key;
    argv[2] = createStringObjectFromLongLong(when);
    buf = catAppendOnlyGenericCommand(buf,3,argv);
    decrRefCount(argv[0]);
    decrRefCount(argv[2]);
    return buf;
}

/* Propagate a command to the AOF.
 *
 * The command is serialized into a temporary sds. It is prefixed by a SELECT
 * when 'dictid' differs from server.aof_selected_db. Commands carrying a TTL
 * are translated so the AOF only contains absolute millisecond expires:
 *
 *   EXPIRE/PEXPIRE/EXPIREAT   -> PEXPIREAT
 *   SETEX/PSETEX              -> SET key value + PEXPIREAT
 *   SET ... [EX s] [PX ms]    -> SET key value + PEXPIREAT
 *
 * The result is appended to server.aof_buf when server.aof_state is AOF_ON.
 * It is also appended to the rewrite buffer while an AOF rewrite child
 * exists (server.aof_child_pid != -1). */
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
    sds buf = sdsempty();
    robj *tmpargv[3];

    /* The DB this command was targeting is not the same as the last command
     * we appended. To issue a SELECT command is needed. */
    if (dictid != server.aof_selected_db) {
        char seldb[64];

        snprintf(seldb,sizeof(seldb),"%d",dictid);
        buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
            (unsigned long)strlen(seldb),seldb);
        server.aof_selected_db = dictid;
    }

    if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
        cmd->proc == expireatCommand) {
        /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
        /* Translate SETEX/PSETEX to SET and PEXPIREAT */
        tmpargv[0] = createStringObject("SET",3);
        tmpargv[1] = argv[1];
        tmpargv[2] = argv[3];
        buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
        decrRefCount(tmpargv[0]);
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else if (cmd->proc == setCommand && argc > 3) {
        int i;
        robj *exarg = NULL, *pxarg = NULL;
        /* Translate SET [EX seconds][PX milliseconds] to SET and PEXPIREAT */
        buf = catAppendOnlyGenericCommand(buf,3,argv);
        /* Only consider options that are followed by a value. An EX/PX in
         * the last position would otherwise make us read argv[argc], past
         * the argc arguments we were given (possible for commands that were
         * replicated without going through SET's own argument parsing). */
        for (i = 3; i + 1 < argc; i++) {
            if (!strcasecmp(argv[i]->ptr, "ex")) exarg = argv[i+1];
            if (!strcasecmp(argv[i]->ptr, "px")) pxarg = argv[i+1];
        }
        serverAssert(!(exarg && pxarg));
        if (exarg)
            buf = catAppendOnlyExpireAtCommand(buf,server.expireCommand,argv[1],
                                               exarg);
        if (pxarg)
            buf = catAppendOnlyExpireAtCommand(buf,server.pexpireCommand,argv[1],
                                               pxarg);
    } else {
        /* All the other commands don't need translation or need the
         * same translation already operated in the command vector
         * for the replication itself. */
        buf = catAppendOnlyGenericCommand(buf,argc,argv);
    }

    /* Append to the AOF buffer. This will be flushed on disk just before
     * of re-entering the event loop, so before the client will get a
     * positive reply about the operation performed. */
    if (server.aof_state == AOF_ON)
        server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));

    /* If a background append only file rewriting is in progress we want to
     * accumulate the differences between the child DB and the current one
     * in a buffer, so that when the child process will do its work we
     * can append the differences to the new append only file. */
    if (server.aof_child_pid != -1)
        aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));

    sdsfree(buf);
}

621 622 623 624
/* ----------------------------------------------------------------------------
 * AOF loading
 * ------------------------------------------------------------------------- */

625 626
/* In Redis commands are always executed in the context of a client, so in
 * order to load the append only file we need to create a fake client. */
627 628
struct client *createFakeClient(void) {
    struct client *c = zmalloc(sizeof(*c));
629 630 631

    selectDb(c,0);
    c->fd = -1;
632
    c->name = NULL;
633
    c->querybuf = sdsempty();
634
    c->querybuf_peak = 0;
635 636
    c->argc = 0;
    c->argv = NULL;
637
    c->bufpos = 0;
638
    c->flags = 0;
A
antirez 已提交
639
    c->btype = BLOCKED_NONE;
640 641
    /* We set the fake client as a slave waiting for the synchronization
     * so that Redis will not try to send replies to this client. */
A
antirez 已提交
642
    c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
643
    c->reply = listCreate();
644
    c->reply_bytes = 0;
645
    c->obuf_soft_limit_reached_time = 0;
646
    c->watched_keys = listCreate();
647
    c->peerid = NULL;
648
    listSetFreeMethod(c->reply,decrRefCountVoid);
649 650 651 652 653
    listSetDupMethod(c->reply,dupClientReplyValue);
    initClientMultiState(c);
    return c;
}

654
/* Release the fake client's argument vector: drop the reference held on
 * each of the c->argc argument objects, in order, then free the array. */
void freeFakeClientArgv(struct client *c) {
    int idx = 0;

    while (idx < c->argc) {
        decrRefCount(c->argv[idx]);
        idx++;
    }
    zfree(c->argv);
}

662
/* Destroy a client obtained from createFakeClient().
 *
 * Frees its query buffer, its reply and watched-keys lists and its MULTI
 * state, then the structure itself. The argument vector is not touched here;
 * freeFakeClientArgv() handles that. */
void freeFakeClient(struct client *c) {
    /* Owned buffers and lists first... */
    sdsfree(c->querybuf);
    listRelease(c->reply);
    listRelease(c->watched_keys);
    freeClientMultiState(c);

    /* ...then the client itself. */
    zfree(c);
}

670 671
/* Replay the append log file. On success C_OK is returned. On non fatal
 * error (the append only file is zero-length) C_ERR is returned. On
672 673
 * fatal error an error message is logged and the program exits. */
int loadAppendOnlyFile(char *filename) {
674
    struct client *fakeClient;
675 676
    FILE *fp = fopen(filename,"r");
    struct redis_stat sb;
677
    int old_aof_state = server.aof_state;
678
    long loops = 0;
679
    off_t valid_up_to = 0; /* Offset of latest well-formed command loaded. */
680
    off_t valid_before_multi = 0; /* Offset before MULTI command loaded. */
681

682 683 684 685 686 687 688 689 690
    if (fp == NULL) {
        serverLog(LL_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
        exit(1);
    }

    /* Handle a zero-length AOF file as a special case. An emtpy AOF file
     * is a valid AOF because an empty server with AOF enabled will create
     * a zero length file at startup, that will remain like that if no write
     * operation is received. */
A
antirez 已提交
691
    if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
692
        server.aof_current_size = 0;
A
antirez 已提交
693
        fclose(fp);
694
        return C_ERR;
A
antirez 已提交
695
    }
696 697 698

    /* Temporarily disable AOF, to prevent EXEC from feeding a MULTI
     * to the same file we're about to read. */
A
antirez 已提交
699
    server.aof_state = AOF_OFF;
700 701

    fakeClient = createFakeClient();
702 703
    startLoading(fp);

704 705 706 707 708 709 710 711 712 713 714 715 716
    /* Check if this AOF file has an RDB preamble. In that case we need to
     * load the RDB file and later continue loading the AOF tail. */
    char sig[5]; /* "REDIS" */
    if (fread(sig,1,5,fp) != 5 || memcmp(sig,"REDIS",5) != 0) {
        /* No RDB preamble, seek back at 0 offset. */
        if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
    } else {
        /* RDB preamble. Pass loading the RDB functions. */
        rio rdb;

        serverLog(LL_NOTICE,"Reading RDB preamble from AOF file...");
        if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
        rioInitWithFile(&rdb,fp);
717
        if (rdbLoadRio(&rdb,NULL,1) != C_OK) {
718 719 720 721 722 723 724 725
            serverLog(LL_WARNING,"Error reading the RDB preamble of the AOF file, AOF loading aborted");
            goto readerr;
        } else {
            serverLog(LL_NOTICE,"Reading the remaining AOF tail...");
        }
    }

    /* Read the actual AOF file, in REPL format, command by command. */
726 727 728 729 730 731 732 733
    while(1) {
        int argc, j;
        unsigned long len;
        robj **argv;
        char buf[128];
        sds argsds;
        struct redisCommand *cmd;

734 735 736
        /* Serve the clients from time to time */
        if (!(loops++ % 1000)) {
            loadingProgress(ftello(fp));
737
            processEventsWhileBlocked();
738 739
        }

740 741 742 743 744 745 746
        if (fgets(buf,sizeof(buf),fp) == NULL) {
            if (feof(fp))
                break;
            else
                goto readerr;
        }
        if (buf[0] != '*') goto fmterr;
A
antirez 已提交
747
        if (buf[1] == '\0') goto readerr;
748
        argc = atoi(buf+1);
749 750
        if (argc < 1) goto fmterr;

751
        argv = zmalloc(sizeof(robj*)*argc);
752 753 754
        fakeClient->argc = argc;
        fakeClient->argv = argv;

755
        for (j = 0; j < argc; j++) {
756 757 758 759 760
            if (fgets(buf,sizeof(buf),fp) == NULL) {
                fakeClient->argc = j; /* Free up to j-1. */
                freeFakeClientArgv(fakeClient);
                goto readerr;
            }
761 762 763
            if (buf[0] != '$') goto fmterr;
            len = strtol(buf+1,NULL,10);
            argsds = sdsnewlen(NULL,len);
764 765 766 767 768 769
            if (len && fread(argsds,len,1,fp) == 0) {
                sdsfree(argsds);
                fakeClient->argc = j; /* Free up to j-1. */
                freeFakeClientArgv(fakeClient);
                goto readerr;
            }
770
            argv[j] = createObject(OBJ_STRING,argsds);
771 772 773 774 775
            if (fread(buf,2,1,fp) == 0) {
                fakeClient->argc = j+1; /* Free up to j. */
                freeFakeClientArgv(fakeClient);
                goto readerr; /* discard CRLF */
            }
776 777 778 779 780
        }

        /* Command lookup */
        cmd = lookupCommand(argv[0]->ptr);
        if (!cmd) {
A
antirez 已提交
781
            serverLog(LL_WARNING,"Unknown command '%s' reading the append only file", (char*)argv[0]->ptr);
782 783
            exit(1);
        }
784

785 786
        if (cmd == server.multiCommand) valid_before_multi = valid_up_to;

787
        /* Run the command in the context of a fake client */
788
        fakeClient->cmd = cmd;
789 790 791 792 793
        if (fakeClient->flags & CLIENT_MULTI && fakeClient->cmd->proc != execCommand) {
            queueMultiCommand(fakeClient);
        } else {
            cmd->proc(fakeClient);
        }
794 795

        /* The fake client should not have a reply */
A
antirez 已提交
796
        serverAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply) == 0);
797
        /* The fake client should never get blocked */
A
antirez 已提交
798
        serverAssert((fakeClient->flags & CLIENT_BLOCKED) == 0);
799

800 801
        /* Clean up. Command code may have changed argv/argc so we use the
         * argv/argc of the client instead of the local variables. */
802
        freeFakeClientArgv(fakeClient);
803
        fakeClient->cmd = NULL;
804
        if (server.aof_load_truncated) valid_up_to = ftello(fp);
805 806 807
    }

    /* This point can only be reached when EOF is reached without errors.
808 809 810
     * If the client is in the middle of a MULTI/EXEC, handle it as it was
     * a short read, even if technically the protocol is correct: we want
     * to remove the unprocessed tail and continue. */
811 812 813 814 815
    if (fakeClient->flags & CLIENT_MULTI) {
        serverLog(LL_WARNING,"!!! Warning: we lost EXEC in the middle of transaction, discard !!!");
        valid_up_to = valid_before_multi;
        goto uxeof;
    }
816

817
loaded_ok: /* DB loaded, cleanup and return C_OK to the caller. */
818 819
    fclose(fp);
    freeFakeClient(fakeClient);
820
    server.aof_state = old_aof_state;
821
    stopLoading();
822
    aofUpdateCurrentSize();
823
    server.aof_rewrite_base_size = server.aof_current_size;
824
    return C_OK;
825

826 827
readerr: /* Read error. If feof(fp) is true, fall through to unexpected EOF. */
    if (!feof(fp)) {
O
Oran Agra 已提交
828
        if (fakeClient) freeFakeClient(fakeClient); /* avoid valgrind warning */
A
antirez 已提交
829
        serverLog(LL_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
830
        exit(1);
831
    }
832 833

uxeof: /* Unexpected AOF end of file. */
A
antirez 已提交
834
    if (server.aof_load_truncated) {
A
antirez 已提交
835 836
        serverLog(LL_WARNING,"!!! Warning: short read while loading the AOF file !!!");
        serverLog(LL_WARNING,"!!! Truncating the AOF at offset %llu !!!",
837 838 839
            (unsigned long long) valid_up_to);
        if (valid_up_to == -1 || truncate(filename,valid_up_to) == -1) {
            if (valid_up_to == -1) {
A
antirez 已提交
840
                serverLog(LL_WARNING,"Last valid command offset is invalid");
841
            } else {
A
antirez 已提交
842
                serverLog(LL_WARNING,"Error truncating the AOF file: %s",
843 844 845
                    strerror(errno));
            }
        } else {
846 847 848
            /* Make sure the AOF file descriptor points to the end of the
             * file after the truncate call. */
            if (server.aof_fd != -1 && lseek(server.aof_fd,0,SEEK_END) == -1) {
A
antirez 已提交
849
                serverLog(LL_WARNING,"Can't seek the end of the AOF file: %s",
850 851
                    strerror(errno));
            } else {
A
antirez 已提交
852
                serverLog(LL_WARNING,
853 854 855
                    "AOF loaded anyway because aof-load-truncated is enabled");
                goto loaded_ok;
            }
856
        }
A
antirez 已提交
857
    }
O
Oran Agra 已提交
858
    if (fakeClient) freeFakeClient(fakeClient); /* avoid valgrind warning */
A
antirez 已提交
859
    serverLog(LL_WARNING,"Unexpected end of file reading the append only file. You can: 1) Make a backup of your AOF file, then use ./redis-check-aof --fix <filename>. 2) Alternatively you can set the 'aof-load-truncated' configuration option to yes and restart the server.");
860
    exit(1);
861 862

fmterr: /* Format error. */
O
Oran Agra 已提交
863
    if (fakeClient) freeFakeClient(fakeClient); /* avoid valgrind warning */
A
antirez 已提交
864
    serverLog(LL_WARNING,"Bad file format reading the append only file: make a backup of your AOF file, then use ./redis-check-aof --fix <filename>");
865 866 867
    exit(1);
}

868 869 870 871
/* ----------------------------------------------------------------------------
 * AOF rewrite
 * ------------------------------------------------------------------------- */

P
Pieter Noordhuis 已提交
872
/* Delegate writing an object to writing a bulk string or bulk long long.
O
Oran Agra 已提交
873
 * This is not placed in rio.c since that adds the server.h dependency. */
P
Pieter Noordhuis 已提交
874 875 876
int rioWriteBulkObject(rio *r, robj *obj) {
    /* Avoid using getDecodedObject to help copy-on-write (we are often
     * in a child process when this function is called). */
877
    if (obj->encoding == OBJ_ENCODING_INT) {
P
Pieter Noordhuis 已提交
878
        return rioWriteBulkLongLong(r,(long)obj->ptr);
879
    } else if (sdsEncodedObject(obj)) {
P
Pieter Noordhuis 已提交
880 881
        return rioWriteBulkString(r,obj->ptr,sdslen(obj->ptr));
    } else {
A
antirez 已提交
882
        serverPanic("Unknown string encoding");
P
Pieter Noordhuis 已提交
883 884 885
    }
}

886 887 888 889 890
/* Emit the commands needed to rebuild a list object: a sequence of RPUSH
 * commands, each carrying at most AOF_REWRITE_ITEMS_PER_CMD elements.
 *
 * The function returns 0 on error, 1 on success. The list iterator is
 * released on every path, including write errors. */
int rewriteListObject(rio *r, robj *key, robj *o) {
    long long count = 0, items = listTypeLength(o);
    quicklistIter *li;
    quicklistEntry entry;

    if (o->encoding != OBJ_ENCODING_QUICKLIST)
        serverPanic("Unknown list encoding");

    li = quicklistGetIterator(o->ptr, AL_START_HEAD);
    while (quicklistNext(li,&entry)) {
        if (count == 0) {
            /* Start a new RPUSH whose argument count matches the number
             * of elements it will actually carry. */
            int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ?
                AOF_REWRITE_ITEMS_PER_CMD : items;
            if (rioWriteBulkCount(r,'*',2+cmd_items) == 0 ||
                rioWriteBulkString(r,"RPUSH",5) == 0 ||
                rioWriteBulkObject(r,key) == 0) goto werr;
        }

        /* Quicklist entries are either a string or an integer. */
        if (entry.value) {
            if (rioWriteBulkString(r,(char*)entry.value,entry.sz) == 0) goto werr;
        } else {
            if (rioWriteBulkLongLong(r,entry.longval) == 0) goto werr;
        }
        if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
        items--;
    }
    quicklistReleaseIterator(li);
    return 1;

werr:
    quicklistReleaseIterator(li);
    return 0;
}

920 921 922 923 924
/* Emit the commands needed to rebuild a set object: a sequence of SADD
 * commands, each carrying at most AOF_REWRITE_ITEMS_PER_CMD members.
 *
 * The function returns 0 on error, 1 on success. For hash-table encoded
 * sets the dict iterator is released on every path, including errors. */
int rewriteSetObject(rio *r, robj *key, robj *o) {
    long long count = 0, items = setTypeSize(o);

    if (o->encoding == OBJ_ENCODING_INTSET) {
        int ii = 0;
        int64_t llval;

        while(intsetGet(o->ptr,ii++,&llval)) {
            if (count == 0) {
                int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ?
                    AOF_REWRITE_ITEMS_PER_CMD : items;

                if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0;
                if (rioWriteBulkString(r,"SADD",4) == 0) return 0;
                if (rioWriteBulkObject(r,key) == 0) return 0;
            }
            if (rioWriteBulkLongLong(r,llval) == 0) return 0;
            if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
            items--;
        }
    } else if (o->encoding == OBJ_ENCODING_HT) {
        dictIterator *di = dictGetIterator(o->ptr);
        dictEntry *de;

        while((de = dictNext(di)) != NULL) {
            sds ele = dictGetKey(de);
            if (count == 0) {
                int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ?
                    AOF_REWRITE_ITEMS_PER_CMD : items;

                if (rioWriteBulkCount(r,'*',2+cmd_items) == 0 ||
                    rioWriteBulkString(r,"SADD",4) == 0 ||
                    rioWriteBulkObject(r,key) == 0)
                {
                    dictReleaseIterator(di);
                    return 0;
                }
            }
            if (rioWriteBulkString(r,ele,sdslen(ele)) == 0) {
                dictReleaseIterator(di);
                return 0;
            }
            if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
            items--;
        }
        dictReleaseIterator(di);
    } else {
        serverPanic("Unknown set encoding");
    }
    return 1;
}

967 968 969 970 971
/* Emit the commands needed to rebuild a sorted set object: a sequence of
 * ZADD commands, each carrying at most AOF_REWRITE_ITEMS_PER_CMD
 * score/member pairs.
 *
 * The function returns 0 on error, 1 on success. For skiplist encoded sets
 * the dict iterator is released on every path, including errors. */
int rewriteSortedSetObject(rio *r, robj *key, robj *o) {
    long long count = 0, items = zsetLength(o);

    if (o->encoding == OBJ_ENCODING_ZIPLIST) {
        unsigned char *zl = o->ptr;
        unsigned char *eptr, *sptr;
        unsigned char *vstr;
        unsigned int vlen;
        long long vll;
        double score;

        eptr = ziplistIndex(zl,0);
        serverAssert(eptr != NULL);
        sptr = ziplistNext(zl,eptr);
        serverAssert(sptr != NULL);

        while (eptr != NULL) {
            serverAssert(ziplistGet(eptr,&vstr,&vlen,&vll));
            score = zzlGetScore(sptr);

            if (count == 0) {
                int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ?
                    AOF_REWRITE_ITEMS_PER_CMD : items;

                if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0;
                if (rioWriteBulkString(r,"ZADD",4) == 0) return 0;
                if (rioWriteBulkObject(r,key) == 0) return 0;
            }
            if (rioWriteBulkDouble(r,score) == 0) return 0;
            if (vstr != NULL) {
                if (rioWriteBulkString(r,(char*)vstr,vlen) == 0) return 0;
            } else {
                if (rioWriteBulkLongLong(r,vll) == 0) return 0;
            }
            zzlNext(zl,&eptr,&sptr);
            if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
            items--;
        }
    } else if (o->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = o->ptr;
        dictIterator *di = dictGetIterator(zs->dict);
        dictEntry *de;

        while((de = dictNext(di)) != NULL) {
            sds ele = dictGetKey(de);
            double *score = dictGetVal(de);

            if (count == 0) {
                int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ?
                    AOF_REWRITE_ITEMS_PER_CMD : items;

                if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0 ||
                    rioWriteBulkString(r,"ZADD",4) == 0 ||
                    rioWriteBulkObject(r,key) == 0)
                {
                    dictReleaseIterator(di);
                    return 0;
                }
            }
            if (rioWriteBulkDouble(r,*score) == 0 ||
                rioWriteBulkString(r,ele,sdslen(ele)) == 0)
            {
                dictReleaseIterator(di);
                return 0;
            }
            if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
            items--;
        }
        dictReleaseIterator(di);
    } else {
        serverPanic("Unknown sorted zset encoding");
    }
    return 1;
}

1036
/* Write either the key or the value of the currently selected item of a hash.
1037 1038
 * The 'hi' argument passes a valid Redis hash iterator.
 * The 'what' filed specifies if to write a key or a value and can be
1039
 * either OBJ_HASH_KEY or OBJ_HASH_VALUE.
1040 1041
 *
 * The function returns 0 on error, non-zero on success. */
1042
static int rioWriteHashIteratorCursor(rio *r, hashTypeIterator *hi, int what) {
1043
    if (hi->encoding == OBJ_ENCODING_ZIPLIST) {
1044 1045 1046 1047 1048
        unsigned char *vstr = NULL;
        unsigned int vlen = UINT_MAX;
        long long vll = LLONG_MAX;

        hashTypeCurrentFromZiplist(hi, what, &vstr, &vlen, &vll);
1049
        if (vstr)
1050
            return rioWriteBulkString(r, (char*)vstr, vlen);
1051
        else
1052
            return rioWriteBulkLongLong(r, vll);
1053
    } else if (hi->encoding == OBJ_ENCODING_HT) {
1054 1055
        sds value = hashTypeCurrentFromHashTable(hi, what);
        return rioWriteBulkString(r, value, sdslen(value));
1056 1057
    }

A
antirez 已提交
1058
    serverPanic("Unknown hash encoding");
1059 1060 1061
    return 0;
}

A
antirez 已提交
1062 1063 1064
/* Emit the commands needed to rebuild a hash object: a sequence of HMSET
 * commands, each carrying at most AOF_REWRITE_ITEMS_PER_CMD field/value
 * pairs.
 *
 * The function returns 0 on error, 1 on success. The hash iterator is
 * released on every path, including write errors. */
int rewriteHashObject(rio *r, robj *key, robj *o) {
    hashTypeIterator *hi;
    long long count = 0, items = hashTypeLength(o);

    hi = hashTypeInitIterator(o);
    while (hashTypeNext(hi) != C_ERR) {
        if (count == 0) {
            int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ?
                AOF_REWRITE_ITEMS_PER_CMD : items;

            if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0 ||
                rioWriteBulkString(r,"HMSET",5) == 0 ||
                rioWriteBulkObject(r,key) == 0) goto werr;
        }

        if (rioWriteHashIteratorCursor(r, hi, OBJ_HASH_KEY) == 0 ||
            rioWriteHashIteratorCursor(r, hi, OBJ_HASH_VALUE) == 0) goto werr;
        if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
        items--;
    }

    hashTypeReleaseIterator(hi);
    return 1;

werr:
    hashTypeReleaseIterator(hi);
    return 0;
}

1090
/* Rewrite a value whose data type is exported by a module rather than
 * handled by Redis itself, by invoking the module type's aof_rewrite
 * callback with an I/O context bound to 'r'.
 *
 * If the callback left a module context attached to the I/O object, it is
 * freed here. Returns 0 if the callback flagged an I/O error, 1 otherwise. */
int rewriteModuleObject(rio *r, robj *key, robj *o) {
    RedisModuleIO io;
    moduleValue *modval = o->ptr;
    moduleType *modtype = modval->type;

    moduleInitIOContext(io,modtype,r);
    modtype->aof_rewrite(&io,key,modval->value);

    if (io.ctx) {
        moduleFreeContext(io.ctx);
        zfree(io.ctx);
    }
    return !io.error;
}

1106 1107 1108
/* This function is called by the child rewriting the AOF file to read
 * the difference accumulated from the parent into a buffer, that is
 * concatenated at the end of the rewrite. */
1109
ssize_t aofReadDiffFromParent(void) {
J
Jan-Erik Rediger 已提交
1110
    char buf[65536]; /* Default pipe buffer size on most Linux systems. */
1111
    ssize_t nread, total = 0;
1112 1113 1114 1115

    while ((nread =
            read(server.aof_pipe_read_data_from_parent,buf,sizeof(buf))) > 0) {
        server.aof_child_diff = sdscatlen(server.aof_child_diff,buf,nread);
1116
        total += nread;
1117
    }
1118
    return total;
1119 1120
}

A
antirez 已提交
1121
/* Write to 'aof' the commands needed to rebuild the whole dataset.
 *
 * For every non-empty DB a SELECT is emitted, then for each key that is not
 * already expired a command rebuilding its value, followed by PEXPIREAT when
 * the key has an expire. Every AOF_READ_DIFF_INTERVAL_BYTES written bytes
 * the pending diff from the parent is drained.
 *
 * Returns C_OK on success, C_ERR on write error. */
int rewriteAppendOnlyFileRio(rio *aof) {
    dictIterator *di = NULL;
    dictEntry *de;
    size_t processed = 0;
    long long now = mstime();
    int dbid;

    for (dbid = 0; dbid < server.dbnum; dbid++) {
        char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
        redisDb *db = server.db + dbid;
        dict *d = db->dict;

        if (dictSize(d) == 0) continue;
        di = dictGetSafeIterator(d);

        /* SELECT the new DB */
        if (rioWrite(aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
        if (rioWriteBulkLongLong(aof,dbid) == 0) goto werr;

        while ((de = dictNext(di)) != NULL) {
            sds keystr = dictGetKey(de);
            robj *o = dictGetVal(de);
            robj key;
            long long expiretime;

            initStaticStringObject(key,keystr);
            expiretime = getExpire(db,&key);

            /* Keys whose expire time is already in the past are skipped. */
            if (expiretime != -1 && expiretime < now) continue;

            /* Emit the command(s) rebuilding the value. */
            switch (o->type) {
            case OBJ_STRING: {
                char cmd[]="*3\r\n$3\r\nSET\r\n";
                if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
                if (rioWriteBulkObject(aof,&key) == 0) goto werr;
                if (rioWriteBulkObject(aof,o) == 0) goto werr;
                break;
            }
            case OBJ_LIST:
                if (rewriteListObject(aof,&key,o) == 0) goto werr;
                break;
            case OBJ_SET:
                if (rewriteSetObject(aof,&key,o) == 0) goto werr;
                break;
            case OBJ_ZSET:
                if (rewriteSortedSetObject(aof,&key,o) == 0) goto werr;
                break;
            case OBJ_HASH:
                if (rewriteHashObject(aof,&key,o) == 0) goto werr;
                break;
            case OBJ_MODULE:
                if (rewriteModuleObject(aof,&key,o) == 0) goto werr;
                break;
            default:
                serverPanic("Unknown object type");
            }

            /* Save the expire time as an absolute millisecond timestamp. */
            if (expiretime != -1) {
                char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";
                if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
                if (rioWriteBulkObject(aof,&key) == 0) goto werr;
                if (rioWriteBulkLongLong(aof,expiretime) == 0) goto werr;
            }

            /* Read some diff from the parent process from time to time. */
            if (aof->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) {
                processed = aof->processed_bytes;
                aofReadDiffFromParent();
            }
        }
        dictReleaseIterator(di);
        di = NULL;
    }
    return C_OK;

werr:
    if (di) dictReleaseIterator(di);
    return C_ERR;
}

/* Write a sequence of commands able to fully rebuild the dataset into
 * "filename". Used both by REWRITEAOF and BGREWRITEAOF.
 *
 * In order to minimize the number of commands needed in the rewritten
 * log Redis uses variadic commands when possible, such as RPUSH, SADD
 * and ZADD. However at max AOF_REWRITE_ITEMS_PER_CMD items per time
 * are inserted using a single command. */
int rewriteAppendOnlyFile(char *filename) {
    rio aof;
    FILE *fp;
    char tmpfile[256];
    char byte;

    /* Note that we have to use a different temp name here compared to the
     * one used by rewriteAppendOnlyFileBackground() function. */
    snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
    fp = fopen(tmpfile,"w");
    if (!fp) {
        serverLog(LL_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
        return C_ERR;
    }

    server.aof_child_diff = sdsempty();
    rioInitWithFile(&aof,fp);

    if (server.aof_rewrite_incremental_fsync)
        rioSetAutoSync(&aof,AOF_AUTOSYNC_BYTES);

A
antirez 已提交
1226
    if (server.aof_use_rdb_preamble) {
A
antirez 已提交
1227
        int error;
1228
        if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE,NULL) == C_ERR) {
A
antirez 已提交
1229 1230 1231 1232
            errno = error;
            goto werr;
        }
    } else {
A
antirez 已提交
1233
        if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
A
antirez 已提交
1234
    }
1235

1236 1237 1238 1239 1240 1241 1242
    /* Do an initial slow fsync here while the parent is still sending
     * data, in order to make the next final fsync faster. */
    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;

    /* Read again a few times to get more data from the parent.
     * We can't read forever (the server may receive data from clients
J
Jan-Erik Rediger 已提交
1243
     * faster than it is able to send data to the child), so we try to read
1244 1245 1246
     * some more data in a loop as soon as there is a good chance more data
     * will come. If it looks like we are wasting time, we abort (this
     * happens after 20 ms without new data). */
1247
    int nodata = 0;
1248 1249 1250 1251
    mstime_t start = mstime();
    while(mstime()-start < 1000 && nodata < 20) {
        if (aeWait(server.aof_pipe_read_data_from_parent, AE_READABLE, 1) <= 0)
        {
1252
            nodata++;
1253
            continue;
1254
        }
1255 1256 1257
        nodata = 0; /* Start counting from zero, we stop on N *contiguous*
                       timeouts. */
        aofReadDiffFromParent();
1258 1259
    }

1260
    /* Ask the master to stop sending diffs. */
1261 1262 1263 1264 1265 1266 1267
    if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;
    if (anetNonBlock(NULL,server.aof_pipe_read_ack_from_parent) != ANET_OK)
        goto werr;
    /* We read the ACK from the server using a 10 seconds timeout. Normally
     * it should reply ASAP, but just in case we lose its reply, we are sure
     * the child will eventually get terminated. */
    if (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1 ||
1268
        byte != '!') goto werr;
A
antirez 已提交
1269
    serverLog(LL_NOTICE,"Parent agreed to stop sending diffs. Finalizing AOF...");
1270 1271 1272 1273 1274

    /* Read the final diff if any. */
    aofReadDiffFromParent();

    /* Write the received diff to the file. */
A
antirez 已提交
1275
    serverLog(LL_NOTICE,
1276 1277
        "Concatenating %.2f MB of AOF diff received from parent.",
        (double) sdslen(server.aof_child_diff) / (1024*1024));
1278 1279 1280
    if (rioWrite(&aof,server.aof_child_diff,sdslen(server.aof_child_diff)) == 0)
        goto werr;

1281
    /* Make sure data will not remain on the OS's output buffers */
1282
    if (fflush(fp) == EOF) goto werr;
1283
    if (fsync(fileno(fp)) == -1) goto werr;
1284
    if (fclose(fp) == EOF) goto werr;
1285 1286 1287 1288

    /* Use RENAME to make sure the DB file is changed atomically only
     * if the generate DB file is ok. */
    if (rename(tmpfile,filename) == -1) {
A
antirez 已提交
1289
        serverLog(LL_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
1290
        unlink(tmpfile);
1291
        return C_ERR;
1292
    }
A
antirez 已提交
1293
    serverLog(LL_NOTICE,"SYNC append only file rewrite performed");
1294
    return C_OK;
1295 1296

werr:
A
antirez 已提交
1297
    serverLog(LL_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
1298 1299
    fclose(fp);
    unlink(tmpfile);
1300
    return C_ERR;
1301 1302
}

1303 1304 1305 1306 1307 1308 1309 1310 1311
/* ----------------------------------------------------------------------------
 * AOF rewrite pipes for IPC
 * -------------------------------------------------------------------------- */

/* Readable handler for the child -> parent ack pipe.
 *
 * When the AOF rewriting child sends a single '!' it is asking the parent to
 * stop sending buffer diffs: the parent sets server.aof_stop_sending_diff
 * and replies with its own '!'. The handler always unregisters itself,
 * since this can happen only once per rewrite. */
void aofChildPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask) {
    char byte;
    int stop_requested;
    UNUSED(el);
    UNUSED(privdata);
    UNUSED(mask);

    stop_requested = (read(fd,&byte,1) == 1 && byte == '!');
    if (stop_requested) {
        serverLog(LL_NOTICE,"AOF rewrite child asks to stop sending diffs.");
        server.aof_stop_sending_diff = 1;

        /* A failed ack is only logged, never retried: the child uses a
         * timeout when waiting for it, or may already be gone. */
        if (write(server.aof_pipe_write_ack_to_child,"!",1) != 1) {
            serverLog(LL_WARNING,"Can't send ACK to AOF child: %s",
                strerror(errno));
        }
    }

    aeDeleteFileEvent(server.el,server.aof_pipe_read_ack_from_child,AE_READABLE);
}

/* Create the pipes used for parent - child process IPC during rewrite.
 * We have a data pipe used to send AOF incremental diffs to the child,
 * and two other pipes used by the children to signal it finished with
 * the rewrite so no more data should be written, and another for the
 * parent to acknowledge it understood this new condition. */
int aofCreatePipes(void) {
    int fds[6] = {-1, -1, -1, -1, -1, -1};
    int j;

    if (pipe(fds) == -1) goto error; /* parent -> children data. */
    if (pipe(fds+2) == -1) goto error; /* children -> parent ack. */
Z
zhaozhao.zz 已提交
1344
    if (pipe(fds+4) == -1) goto error; /* parent -> children ack. */
1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356
    /* Parent -> children data is non blocking. */
    if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;
    if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;
    if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;

    server.aof_pipe_write_data_to_child = fds[1];
    server.aof_pipe_read_data_from_parent = fds[0];
    server.aof_pipe_write_ack_to_parent = fds[3];
    server.aof_pipe_read_ack_from_child = fds[2];
    server.aof_pipe_write_ack_to_child = fds[5];
    server.aof_pipe_read_ack_from_parent = fds[4];
    server.aof_stop_sending_diff = 0;
1357
    return C_OK;
1358 1359

error:
A
antirez 已提交
1360
    serverLog(LL_WARNING,"Error opening /setting AOF rewrite IPC pipes: %s",
1361 1362
        strerror(errno));
    for (j = 0; j < 6; j++) if(fds[j] != -1) close(fds[j]);
1363
    return C_ERR;
1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377
}

/* Release the rewrite IPC resources set up by aofCreatePipes(): unregister
 * the readable handler on the child ack pipe and the writable handler on the
 * data pipe to the child, then close all six pipe descriptors. */
void aofClosePipes(void) {
    int fds[] = {
        server.aof_pipe_write_data_to_child,
        server.aof_pipe_read_data_from_parent,
        server.aof_pipe_write_ack_to_parent,
        server.aof_pipe_read_ack_from_child,
        server.aof_pipe_write_ack_to_child,
        server.aof_pipe_read_ack_from_parent
    };
    size_t j;

    aeDeleteFileEvent(server.el,server.aof_pipe_read_ack_from_child,AE_READABLE);
    aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,AE_WRITABLE);
    for (j = 0; j < sizeof(fds)/sizeof(fds[0]); j++) close(fds[j]);
}

/* ----------------------------------------------------------------------------
 * AOF background rewrite
 * ------------------------------------------------------------------------- */

/* This is how rewriting of the append only file in background works:
 *
 * 1) The user calls BGREWRITEAOF
 * 2) Redis calls this function, that forks():
 *    2a) the child rewrite the append only file in a temp file.
A
antirez 已提交
1386
 *    2b) the parent accumulates differences in server.aof_rewrite_buf.
1387 1388
 * 3) When the child finished '2a' exists.
 * 4) The parent will trap the exit code, if it's OK, will append the
A
antirez 已提交
1389
 *    data accumulated into server.aof_rewrite_buf into the temp file, and
1390 1391 1392 1393 1394
 *    finally will rename(2) the temp file in the actual file name.
 *    The the new file is reopened as the new append only file. Profit!
 */
int rewriteAppendOnlyFileBackground(void) {
    pid_t childpid;
1395
    long long start;
1396

1397
    if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
1398
    if (aofCreatePipes() != C_OK) return C_ERR;
1399
    openChildInfoPipe();
1400
    start = ustime();
1401 1402 1403
    if ((childpid = fork()) == 0) {
        char tmpfile[256];

1404
        /* Child */
1405
        closeListeningSockets(0);
1406
        redisSetProcTitle("redis-aof-rewrite");
1407
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
1408
        if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
1409
            size_t private_dirty = zmalloc_get_private_dirty(-1);
1410 1411

            if (private_dirty) {
A
antirez 已提交
1412
                serverLog(LL_NOTICE,
1413
                    "AOF rewrite: %zu MB of memory used by copy-on-write",
1414 1415
                    private_dirty/(1024*1024));
            }
1416 1417 1418

            server.child_info_data.cow_size = private_dirty;
            sendChildInfo(CHILD_INFO_TYPE_AOF);
1419
            exitFromChild(0);
1420
        } else {
1421
            exitFromChild(1);
1422 1423 1424
        }
    } else {
        /* Parent */
1425
        server.stat_fork_time = ustime()-start;
1426
        server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
1427
        latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
1428
        if (childpid == -1) {
1429
            closeChildInfoPipe();
A
antirez 已提交
1430
            serverLog(LL_WARNING,
1431 1432
                "Can't rewrite append only file in background: fork: %s",
                strerror(errno));
1433
            aofClosePipes();
1434
            return C_ERR;
1435
        }
A
antirez 已提交
1436
        serverLog(LL_NOTICE,
1437
            "Background append only file rewriting started by pid %d",childpid);
1438
        server.aof_rewrite_scheduled = 0;
1439
        server.aof_rewrite_time_start = time(NULL);
A
antirez 已提交
1440
        server.aof_child_pid = childpid;
1441 1442 1443
        updateDictResizePolicy();
        /* We set appendseldb to -1 in order to force the next call to the
         * feedAppendOnlyFile() to issue a SELECT command, so the differences
A
antirez 已提交
1444
         * accumulated by the parent into server.aof_rewrite_buf will start
1445
         * with a SELECT statement and it will be safe to merge. */
A
antirez 已提交
1446
        server.aof_selected_db = -1;
1447
        replicationScriptCacheFlush();
1448
        return C_OK;
1449
    }
1450
    return C_OK; /* unreached */
1451 1452
}

1453
void bgrewriteaofCommand(client *c) {
A
antirez 已提交
1454
    if (server.aof_child_pid != -1) {
1455
        addReplyError(c,"Background append only file rewriting already in progress");
A
antirez 已提交
1456
    } else if (server.rdb_child_pid != -1) {
1457
        server.aof_rewrite_scheduled = 1;
1458
        addReplyStatus(c,"Background append only file rewriting scheduled");
1459
    } else if (rewriteAppendOnlyFileBackground() == C_OK) {
1460
        addReplyStatus(c,"Background append only file rewriting started");
1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472
    } else {
        addReply(c,shared.err);
    }
}

/* Delete the temporary file written by the rewrite child with the given pid,
 * named "temp-rewriteaof-bg-<pid>.aof" in the current directory. The result
 * of unlink(2) is ignored, so a missing file is not an error. */
void aofRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-rewriteaof-bg-%d.aof",(int)childpid);
    unlink(path);
}

A
antirez 已提交
1473
/* Update the server.aof_current_size field explicitly using stat(2)
1474 1475
 * to check the size of the file. This is useful after a rewrite or after
 * a restart, normally the size is updated just adding the write length
A
antirez 已提交
1476
 * to the current length, that is much faster. */
1477 1478
void aofUpdateCurrentSize(void) {
    struct redis_stat sb;
1479
    mstime_t latency;
1480

1481
    latencyStartMonitor(latency);
A
antirez 已提交
1482
    if (redis_fstat(server.aof_fd,&sb) == -1) {
A
antirez 已提交
1483
        serverLog(LL_WARNING,"Unable to obtain the AOF file length. stat: %s",
1484 1485
            strerror(errno));
    } else {
1486
        server.aof_current_size = sb.st_size;
1487
    }
1488 1489
    latencyEndMonitor(latency);
    latencyAddSampleIfNeeded("aof-fstat",latency);
1490 1491
}

1492 1493
/* A background append only file rewriting (BGREWRITEAOF) terminated its work.
 * Handle this. */
1494
void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
1495
    if (!bysignal && exitcode == 0) {
1496
        int newfd, oldfd;
1497
        char tmpfile[256];
1498
        long long now = ustime();
1499
        mstime_t latency;
1500

A
antirez 已提交
1501
        serverLog(LL_NOTICE,
1502 1503
            "Background AOF rewrite terminated with success");

1504 1505
        /* Flush the differences accumulated by the parent to the
         * rewritten AOF. */
1506
        latencyStartMonitor(latency);
1507
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof",
A
antirez 已提交
1508
            (int)server.aof_child_pid);
1509 1510
        newfd = open(tmpfile,O_WRONLY|O_APPEND);
        if (newfd == -1) {
A
antirez 已提交
1511
            serverLog(LL_WARNING,
1512
                "Unable to open the temporary AOF produced by the child: %s", strerror(errno));
1513 1514
            goto cleanup;
        }
1515

1516
        if (aofRewriteBufferWrite(newfd) == -1) {
A
antirez 已提交
1517
            serverLog(LL_WARNING,
1518
                "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
1519
            close(newfd);
1520 1521
            goto cleanup;
        }
1522 1523
        latencyEndMonitor(latency);
        latencyAddSampleIfNeeded("aof-rewrite-diff-write",latency);
1524

A
antirez 已提交
1525
        serverLog(LL_NOTICE,
J
Jan-Erik Rediger 已提交
1526
            "Residual parent diff successfully flushed to the rewritten AOF (%.2f MB)", (double) aofRewriteBufferSize() / (1024*1024));
1527 1528 1529

        /* The only remaining thing to do is to rename the temporary file to
         * the configured file and switch the file descriptor used to do AOF
1530 1531 1532 1533
         * writes. We don't want close(2) or rename(2) calls to block the
         * server on old file deletion.
         *
         * There are two possible scenarios:
1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547
         *
         * 1) AOF is DISABLED and this was a one time rewrite. The temporary
         * file will be renamed to the configured file. When this file already
         * exists, it will be unlinked, which may block the server.
         *
         * 2) AOF is ENABLED and the rewritten AOF will immediately start
         * receiving writes. After the temporary file is renamed to the
         * configured file, the original AOF file descriptor will be closed.
         * Since this will be the last reference to that file, closing it
         * causes the underlying file to be unlinked, which may block the
         * server.
         *
         * To mitigate the blocking effect of the unlink operation (either
         * caused by rename(2) in scenario 1, or by close(2) in scenario 2), we
1548
         * use a background thread to take care of this. First, we
1549 1550 1551 1552 1553 1554
         * make scenario 1 identical to scenario 2 by opening the target file
         * when it exists. The unlink operation after the rename(2) will then
         * be executed upon calling close(2) for its descriptor. Everything to
         * guarantee atomicity for this switch has already happened by then, so
         * we don't care what the outcome or duration of that close operation
         * is, as long as the file descriptor is released again. */
A
antirez 已提交
1555
        if (server.aof_fd == -1) {
1556 1557
            /* AOF disabled */

Z
zhaozhao.zz 已提交
1558 1559 1560 1561
            /* Don't care if this fails: oldfd will be -1 and we handle that.
             * One notable case of -1 return is if the old file does
             * not exist. */
            oldfd = open(server.aof_filename,O_RDONLY|O_NONBLOCK);
1562 1563
        } else {
            /* AOF enabled */
1564
            oldfd = -1; /* We'll set this to the current AOF filedes later. */
1565 1566 1567 1568
        }

        /* Rename the temporary file. This will not unlink the target file if
         * it exists, because we reference it with "oldfd". */
1569
        latencyStartMonitor(latency);
1570
        if (rename(tmpfile,server.aof_filename) == -1) {
A
antirez 已提交
1571
            serverLog(LL_WARNING,
1572 1573 1574 1575
                "Error trying to rename the temporary AOF file %s into %s: %s",
                tmpfile,
                server.aof_filename,
                strerror(errno));
1576
            close(newfd);
1577
            if (oldfd != -1) close(oldfd);
1578 1579
            goto cleanup;
        }
1580 1581
        latencyEndMonitor(latency);
        latencyAddSampleIfNeeded("aof-rename",latency);
1582

A
antirez 已提交
1583
        if (server.aof_fd == -1) {
1584 1585
            /* AOF disabled, we don't need to set the AOF file descriptor
             * to this new file, so we can close it. */
1586 1587
            close(newfd);
        } else {
1588
            /* AOF enabled, replace the old fd with the new one. */
A
antirez 已提交
1589 1590
            oldfd = server.aof_fd;
            server.aof_fd = newfd;
1591
            if (server.aof_fsync == AOF_FSYNC_ALWAYS)
1592
                aof_fsync(newfd);
1593
            else if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
1594
                aof_background_fsync(newfd);
A
antirez 已提交
1595
            server.aof_selected_db = -1; /* Make sure SELECT is re-issued */
1596
            aofUpdateCurrentSize();
1597
            server.aof_rewrite_base_size = server.aof_current_size;
1598 1599 1600

            /* Clear regular AOF buffer since its contents was just written to
             * the new AOF from the background rewrite buffer. */
A
antirez 已提交
1601 1602
            sdsfree(server.aof_buf);
            server.aof_buf = sdsempty();
1603
        }
1604

1605
        server.aof_lastbgrewrite_status = C_OK;
1606

A
antirez 已提交
1607
        serverLog(LL_NOTICE, "Background AOF rewrite finished successfully");
1608
        /* Change state from WAIT_REWRITE to ON if needed */
A
antirez 已提交
1609 1610
        if (server.aof_state == AOF_WAIT_REWRITE)
            server.aof_state = AOF_ON;
1611 1612

        /* Asynchronously close the overwritten AOF. */
A
antirez 已提交
1613
        if (oldfd != -1) bioCreateBackgroundJob(BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,NULL);
1614

A
antirez 已提交
1615
        serverLog(LL_VERBOSE,
1616
            "Background AOF rewrite signal handler took %lldus", ustime()-now);
1617
    } else if (!bysignal && exitcode != 0) {
1618 1619 1620 1621
        /* SIGUSR1 is whitelisted, so we have a way to kill a child without
         * tirggering an error conditon. */
        if (bysignal != SIGUSR1)
            server.aof_lastbgrewrite_status = C_ERR;
A
antirez 已提交
1622
        serverLog(LL_WARNING,
1623
            "Background AOF rewrite terminated with error");
1624
    } else {
1625
        server.aof_lastbgrewrite_status = C_ERR;
1626

A
antirez 已提交
1627
        serverLog(LL_WARNING,
1628
            "Background AOF rewrite terminated by signal %d", bysignal);
1629
    }
1630

1631
cleanup:
1632
    aofClosePipes();
1633
    aofRewriteBufferReset();
A
antirez 已提交
1634 1635
    aofRemoveTempFile(server.aof_child_pid);
    server.aof_child_pid = -1;
1636 1637
    server.aof_rewrite_time_last = time(NULL)-server.aof_rewrite_time_start;
    server.aof_rewrite_time_start = -1;
1638
    /* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */
A
antirez 已提交
1639
    if (server.aof_state == AOF_WAIT_REWRITE)
1640
        server.aof_rewrite_scheduled = 1;
1641
}