rdb.c 39.0 KB
Newer Older
1 2
#include "redis.h"
#include "lzf.h"    /* LZF compression library */
3
#include "zipmap.h"
4

5
#include <math.h>
6 7 8 9 10
#include <sys/types.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/wait.h>
#include <arpa/inet.h>
11
#include <sys/stat.h>
12

13
static int rdbWriteRaw(rio *rdb, void *p, size_t len) {
14
    if (rdb && rioWrite(rdb,p,len) == 0)
15
        return -1;
16 17 18
    return len;
}

19 20
int rdbSaveType(rio *rdb, unsigned char type) {
    return rdbWriteRaw(rdb,&type,1);
21 22
}

23 24 25 26
int rdbLoadType(rio *rdb) {
    unsigned char type;
    if (rioRead(rdb,&type,1) == 0) return -1;
    return type;
27 28
}

29 30 31 32
time_t rdbLoadTime(rio *rdb) {
    int32_t t32;
    if (rioRead(rdb,&t32,4) == 0) return -1;
    return (time_t)t32;
33 34
}

35
int rdbSaveMillisecondTime(rio *rdb, long long t) {
36 37 38 39 40 41 42 43 44 45
    int64_t t64 = (int64_t) t;
    return rdbWriteRaw(rdb,&t64,8);
}

long long rdbLoadMillisecondTime(rio *rdb) {
    int64_t t64;
    if (rioRead(rdb,&t64,8) == 0) return -1;
    return (long long)t64;
}

46 47 48
/* Saves an encoded length. The first two bits in the first byte are used to
 * hold the encoding type. See the REDIS_RDB_* definitions for more information
 * on the types of encoding. */
49
int rdbSaveLen(rio *rdb, uint32_t len) {
50
    unsigned char buf[2];
51
    size_t nwritten;
52 53 54 55

    if (len < (1<<6)) {
        /* Save a 6 bit len */
        buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
56
        if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
57
        nwritten = 1;
58 59 60 61
    } else if (len < (1<<14)) {
        /* Save a 14 bit len */
        buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
        buf[1] = len&0xFF;
62
        if (rdbWriteRaw(rdb,buf,2) == -1) return -1;
63
        nwritten = 2;
64 65 66
    } else {
        /* Save a 32 bit len */
        buf[0] = (REDIS_RDB_32BITLEN<<6);
67
        if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
68
        len = htonl(len);
69
        if (rdbWriteRaw(rdb,&len,4) == -4) return -1;
70
        nwritten = 1+4;
71
    }
72
    return nwritten;
73 74
}

75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
/* Load an encoded length. The "isencoded" argument is set to 1 if the length
 * is not actually a length but an "encoding type". See the REDIS_RDB_ENC_*
 * definitions in rdb.h for more information. */
uint32_t rdbLoadLen(rio *rdb, int *isencoded) {
    unsigned char buf[2];
    uint32_t len;
    int type;

    if (isencoded) *isencoded = 0;
    if (rioRead(rdb,buf,1) == 0) return REDIS_RDB_LENERR;
    type = (buf[0]&0xC0)>>6;
    if (type == REDIS_RDB_ENCVAL) {
        /* Read a 6 bit encoding type. */
        if (isencoded) *isencoded = 1;
        return buf[0]&0x3F;
    } else if (type == REDIS_RDB_6BITLEN) {
        /* Read a 6 bit len. */
        return buf[0]&0x3F;
    } else if (type == REDIS_RDB_14BITLEN) {
        /* Read a 14 bit len. */
        if (rioRead(rdb,buf+1,1) == 0) return REDIS_RDB_LENERR;
        return ((buf[0]&0x3F)<<8)|buf[1];
    } else {
        /* Read a 32 bit len. */
        if (rioRead(rdb,&len,4) == 0) return REDIS_RDB_LENERR;
        return ntohl(len);
    }
}

/* Encodes the "value" argument as integer when it fits in the supported ranges
 * for encoded types. If the function successfully encodes the integer, the
 * representation is stored in the buffer pointer to by "enc" and the string
 * length is returned. Otherwise 0 is returned. */
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
int rdbEncodeInteger(long long value, unsigned char *enc) {
    if (value >= -(1<<7) && value <= (1<<7)-1) {
        enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
        enc[1] = value&0xFF;
        return 2;
    } else if (value >= -(1<<15) && value <= (1<<15)-1) {
        enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
        enc[1] = value&0xFF;
        enc[2] = (value>>8)&0xFF;
        return 3;
    } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
        enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
        enc[1] = value&0xFF;
        enc[2] = (value>>8)&0xFF;
        enc[3] = (value>>16)&0xFF;
        enc[4] = (value>>24)&0xFF;
        return 5;
    } else {
        return 0;
    }
}

130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159
/* Loads an integer-encoded object with the specified encoding type "enctype".
 * If the "encode" argument is set the function may return an integer-encoded
 * string object, otherwise it always returns a raw string object. */
robj *rdbLoadIntegerObject(rio *rdb, int enctype, int encode) {
    unsigned char enc[4];
    long long val;

    if (enctype == REDIS_RDB_ENC_INT8) {
        if (rioRead(rdb,enc,1) == 0) return NULL;
        val = (signed char)enc[0];
    } else if (enctype == REDIS_RDB_ENC_INT16) {
        uint16_t v;
        if (rioRead(rdb,enc,2) == 0) return NULL;
        v = enc[0]|(enc[1]<<8);
        val = (int16_t)v;
    } else if (enctype == REDIS_RDB_ENC_INT32) {
        uint32_t v;
        if (rioRead(rdb,enc,4) == 0) return NULL;
        v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
        val = (int32_t)v;
    } else {
        val = 0; /* anti-warning */
        redisPanic("Unknown RDB integer encoding type");
    }
    if (encode)
        return createStringObjectFromLongLong(val);
    else
        return createObject(REDIS_STRING,sdsfromlonglong(val));
}

160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178
/* String objects in the form "2391" "-100" without any space and with a
 * range of values that can fit in an 8, 16 or 32 bit signed value can be
 * encoded as integers to save space */
int rdbTryIntegerEncoding(char *s, size_t len, unsigned char *enc) {
    long long value;
    char *endptr, buf[32];

    /* Check if it's possible to encode this value as a number */
    value = strtoll(s, &endptr, 10);
    if (endptr[0] != '\0') return 0;
    ll2string(buf,32,value);

    /* If the number converted back into a string is not identical
     * then it's not possible to encode the string as integer */
    if (strlen(buf) != len || memcmp(buf,s,len)) return 0;

    return rdbEncodeInteger(value,enc);
}

179
int rdbSaveLzfStringObject(rio *rdb, unsigned char *s, size_t len) {
180 181
    size_t comprlen, outlen;
    unsigned char byte;
182
    int n, nwritten = 0;
183 184 185 186 187 188 189 190 191 192 193 194 195
    void *out;

    /* We require at least four bytes compression for this to be worth it */
    if (len <= 4) return 0;
    outlen = len-4;
    if ((out = zmalloc(outlen+1)) == NULL) return 0;
    comprlen = lzf_compress(s, len, out, outlen);
    if (comprlen == 0) {
        zfree(out);
        return 0;
    }
    /* Data compressed! Let's save it on disk */
    byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
196
    if ((n = rdbWriteRaw(rdb,&byte,1)) == -1) goto writeerr;
197
    nwritten += n;
198

199
    if ((n = rdbSaveLen(rdb,comprlen)) == -1) goto writeerr;
200 201
    nwritten += n;

202
    if ((n = rdbSaveLen(rdb,len)) == -1) goto writeerr;
203 204
    nwritten += n;

205
    if ((n = rdbWriteRaw(rdb,out,comprlen)) == -1) goto writeerr;
206
    nwritten += n;
207

208
    zfree(out);
209
    return nwritten;
210 211 212 213 214 215

writeerr:
    zfree(out);
    return -1;
}

216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234
robj *rdbLoadLzfStringObject(rio *rdb) {
    unsigned int len, clen;
    unsigned char *c = NULL;
    sds val = NULL;

    if ((clen = rdbLoadLen(rdb,NULL)) == REDIS_RDB_LENERR) return NULL;
    if ((len = rdbLoadLen(rdb,NULL)) == REDIS_RDB_LENERR) return NULL;
    if ((c = zmalloc(clen)) == NULL) goto err;
    if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
    if (rioRead(rdb,c,clen) == 0) goto err;
    if (lzf_decompress(c,clen,val,len) == 0) goto err;
    zfree(c);
    return createObject(REDIS_STRING,val);
err:
    zfree(c);
    sdsfree(val);
    return NULL;
}

235
/* Save a string objet as [len][data] on disk. If the object is a string
236
 * representation of an integer value we try to save it in a special form */
237
int rdbSaveRawString(rio *rdb, unsigned char *s, size_t len) {
238
    int enclen;
239
    int n, nwritten = 0;
240 241 242 243 244

    /* Try integer encoding */
    if (len <= 11) {
        unsigned char buf[5];
        if ((enclen = rdbTryIntegerEncoding((char*)s,len,buf)) > 0) {
245
            if (rdbWriteRaw(rdb,buf,enclen) == -1) return -1;
246
            return enclen;
247 248 249 250 251
        }
    }

    /* Try LZF compression - under 20 bytes it's unable to compress even
     * aaaaaaaaaaaaaaaaaa so skip it */
A
antirez 已提交
252
    if (server.rdb_compression && len > 20) {
253
        n = rdbSaveLzfStringObject(rdb,s,len);
254 255 256
        if (n == -1) return -1;
        if (n > 0) return n;
        /* Return value of 0 means data can't be compressed, save the old way */
257 258 259
    }

    /* Store verbatim */
260
    if ((n = rdbSaveLen(rdb,len)) == -1) return -1;
261 262
    nwritten += n;
    if (len > 0) {
263
        if (rdbWriteRaw(rdb,s,len) == -1) return -1;
264 265 266
        nwritten += len;
    }
    return nwritten;
267 268 269
}

/* Save a long long value as either an encoded string or a string. */
270
int rdbSaveLongLongAsStringObject(rio *rdb, long long value) {
271
    unsigned char buf[32];
272
    int n, nwritten = 0;
273 274
    int enclen = rdbEncodeInteger(value,buf);
    if (enclen > 0) {
275
        return rdbWriteRaw(rdb,buf,enclen);
276 277 278 279
    } else {
        /* Encode as string */
        enclen = ll2string((char*)buf,32,value);
        redisAssert(enclen < 32);
280
        if ((n = rdbSaveLen(rdb,enclen)) == -1) return -1;
281
        nwritten += n;
282
        if ((n = rdbWriteRaw(rdb,buf,enclen)) == -1) return -1;
283
        nwritten += n;
284
    }
285
    return nwritten;
286 287 288
}

/* Like rdbSaveStringObjectRaw() but handle encoded objects */
289
int rdbSaveStringObject(rio *rdb, robj *obj) {
290 291 292
    /* Avoid to decode the object, then encode it again, if the
     * object is alrady integer encoded. */
    if (obj->encoding == REDIS_ENCODING_INT) {
293
        return rdbSaveLongLongAsStringObject(rdb,(long)obj->ptr);
294
    } else {
295
        redisAssertWithInfo(NULL,obj,obj->encoding == REDIS_ENCODING_RAW);
296
        return rdbSaveRawString(rdb,obj->ptr,sdslen(obj->ptr));
297 298 299
    }
}

300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335
robj *rdbGenericLoadStringObject(rio *rdb, int encode) {
    int isencoded;
    uint32_t len;
    sds val;

    len = rdbLoadLen(rdb,&isencoded);
    if (isencoded) {
        switch(len) {
        case REDIS_RDB_ENC_INT8:
        case REDIS_RDB_ENC_INT16:
        case REDIS_RDB_ENC_INT32:
            return rdbLoadIntegerObject(rdb,len,encode);
        case REDIS_RDB_ENC_LZF:
            return rdbLoadLzfStringObject(rdb);
        default:
            redisPanic("Unknown RDB encoding type");
        }
    }

    if (len == REDIS_RDB_LENERR) return NULL;
    val = sdsnewlen(NULL,len);
    if (len && rioRead(rdb,val,len) == 0) {
        sdsfree(val);
        return NULL;
    }
    return createObject(REDIS_STRING,val);
}

robj *rdbLoadStringObject(rio *rdb) {
    return rdbGenericLoadStringObject(rdb,0);
}

robj *rdbLoadEncodedStringObject(rio *rdb) {
    return rdbGenericLoadStringObject(rdb,1);
}

336 337 338 339 340 341 342 343
/* Save a double value. Doubles are saved as strings prefixed by an unsigned
 * 8 bit integer specifing the length of the representation.
 * This 8 bit integer has special values in order to specify the following
 * conditions:
 * 253: not a number
 * 254: + inf
 * 255: - inf
 */
344
int rdbSaveDoubleValue(rio *rdb, double val) {
345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374
    unsigned char buf[128];
    int len;

    if (isnan(val)) {
        buf[0] = 253;
        len = 1;
    } else if (!isfinite(val)) {
        len = 1;
        buf[0] = (val < 0) ? 255 : 254;
    } else {
#if (DBL_MANT_DIG >= 52) && (LLONG_MAX == 0x7fffffffffffffffLL)
        /* Check if the float is in a safe range to be casted into a
         * long long. We are assuming that long long is 64 bit here.
         * Also we are assuming that there are no implementations around where
         * double has precision < 52 bit.
         *
         * Under this assumptions we test if a double is inside an interval
         * where casting to long long is safe. Then using two castings we
         * make sure the decimal part is zero. If all this is true we use
         * integer printing function that is much faster. */
        double min = -4503599627370495; /* (2^52)-1 */
        double max = 4503599627370496; /* -(2^52) */
        if (val > min && val < max && val == ((double)((long long)val)))
            ll2string((char*)buf+1,sizeof(buf),(long long)val);
        else
#endif
            snprintf((char*)buf+1,sizeof(buf)-1,"%.17g",val);
        buf[0] = strlen((char*)buf+1);
        len = buf[0]+1;
    }
375
    return rdbWriteRaw(rdb,buf,len);
376 377
}

378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422
/* For information about double serialization check rdbSaveDoubleValue() */
int rdbLoadDoubleValue(rio *rdb, double *val) {
    char buf[128];
    unsigned char len;

    if (rioRead(rdb,&len,1) == 0) return -1;
    switch(len) {
    case 255: *val = R_NegInf; return 0;
    case 254: *val = R_PosInf; return 0;
    case 253: *val = R_Nan; return 0;
    default:
        if (rioRead(rdb,buf,len) == 0) return -1;
        buf[len] = '\0';
        sscanf(buf, "%lg", val);
        return 0;
    }
}

/* Save the object type of object "o". */
int rdbSaveObjectType(rio *rdb, robj *o) {
    switch (o->type) {
    case REDIS_STRING:
        return rdbSaveType(rdb,REDIS_RDB_TYPE_STRING);
    case REDIS_LIST:
        if (o->encoding == REDIS_ENCODING_ZIPLIST)
            return rdbSaveType(rdb,REDIS_RDB_TYPE_LIST_ZIPLIST);
        else if (o->encoding == REDIS_ENCODING_LINKEDLIST)
            return rdbSaveType(rdb,REDIS_RDB_TYPE_LIST);
        else
            redisPanic("Unknown list encoding");
    case REDIS_SET:
        if (o->encoding == REDIS_ENCODING_INTSET)
            return rdbSaveType(rdb,REDIS_RDB_TYPE_SET_INTSET);
        else if (o->encoding == REDIS_ENCODING_HT)
            return rdbSaveType(rdb,REDIS_RDB_TYPE_SET);
        else
            redisPanic("Unknown set encoding");
    case REDIS_ZSET:
        if (o->encoding == REDIS_ENCODING_ZIPLIST)
            return rdbSaveType(rdb,REDIS_RDB_TYPE_ZSET_ZIPLIST);
        else if (o->encoding == REDIS_ENCODING_SKIPLIST)
            return rdbSaveType(rdb,REDIS_RDB_TYPE_ZSET);
        else
            redisPanic("Unknown sorted set encoding");
    case REDIS_HASH:
423 424
        if (o->encoding == REDIS_ENCODING_ZIPLIST)
            return rdbSaveType(rdb,REDIS_RDB_TYPE_HASH_ZIPLIST);
425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440
        else if (o->encoding == REDIS_ENCODING_HT)
            return rdbSaveType(rdb,REDIS_RDB_TYPE_HASH);
        else
            redisPanic("Unknown hash encoding");
    default:
        redisPanic("Unknown object type");
    }
    return -1; /* avoid warning */
}

/* Load object type. Return -1 when the byte doesn't contain an object type. */
int rdbLoadObjectType(rio *rdb) {
    int type;
    if ((type = rdbLoadType(rdb)) == -1) return -1;
    if (!rdbIsObjectType(type)) return -1;
    return type;
441 442
}

A
antirez 已提交
443
/* Save a Redis object. Returns -1 on error, 0 on success. */
444
int rdbSaveObject(rio *rdb, robj *o) {
445 446
    int n, nwritten = 0;

447 448
    if (o->type == REDIS_STRING) {
        /* Save a string value */
449
        if ((n = rdbSaveStringObject(rdb,o)) == -1) return -1;
450
        nwritten += n;
451 452 453
    } else if (o->type == REDIS_LIST) {
        /* Save a list value */
        if (o->encoding == REDIS_ENCODING_ZIPLIST) {
454
            size_t l = ziplistBlobLen((unsigned char*)o->ptr);
455

456
            if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
457
            nwritten += n;
458 459 460 461 462
        } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
            list *list = o->ptr;
            listIter li;
            listNode *ln;

463
            if ((n = rdbSaveLen(rdb,listLength(list))) == -1) return -1;
464 465
            nwritten += n;

466 467 468
            listRewind(list,&li);
            while((ln = listNext(&li))) {
                robj *eleobj = listNodeValue(ln);
469
                if ((n = rdbSaveStringObject(rdb,eleobj)) == -1) return -1;
470
                nwritten += n;
471 472 473 474 475 476
            }
        } else {
            redisPanic("Unknown list encoding");
        }
    } else if (o->type == REDIS_SET) {
        /* Save a set value */
477 478 479 480
        if (o->encoding == REDIS_ENCODING_HT) {
            dict *set = o->ptr;
            dictIterator *di = dictGetIterator(set);
            dictEntry *de;
481

482
            if ((n = rdbSaveLen(rdb,dictSize(set))) == -1) return -1;
483 484
            nwritten += n;

485
            while((de = dictNext(di)) != NULL) {
486
                robj *eleobj = dictGetKey(de);
487
                if ((n = rdbSaveStringObject(rdb,eleobj)) == -1) return -1;
488
                nwritten += n;
489 490 491
            }
            dictReleaseIterator(di);
        } else if (o->encoding == REDIS_ENCODING_INTSET) {
492
            size_t l = intsetBlobLen((intset*)o->ptr);
493

494
            if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
495
            nwritten += n;
496 497
        } else {
            redisPanic("Unknown set encoding");
498 499
        }
    } else if (o->type == REDIS_ZSET) {
500 501 502
        /* Save a sorted set value */
        if (o->encoding == REDIS_ENCODING_ZIPLIST) {
            size_t l = ziplistBlobLen((unsigned char*)o->ptr);
503

504
            if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
505
            nwritten += n;
506
        } else if (o->encoding == REDIS_ENCODING_SKIPLIST) {
507 508 509 510
            zset *zs = o->ptr;
            dictIterator *di = dictGetIterator(zs->dict);
            dictEntry *de;

511
            if ((n = rdbSaveLen(rdb,dictSize(zs->dict))) == -1) return -1;
512
            nwritten += n;
513 514

            while((de = dictNext(di)) != NULL) {
515 516
                robj *eleobj = dictGetKey(de);
                double *score = dictGetVal(de);
517

518
                if ((n = rdbSaveStringObject(rdb,eleobj)) == -1) return -1;
519
                nwritten += n;
520
                if ((n = rdbSaveDoubleValue(rdb,*score)) == -1) return -1;
521 522 523 524
                nwritten += n;
            }
            dictReleaseIterator(di);
        } else {
P
Typo  
Pieter Noordhuis 已提交
525
            redisPanic("Unknown sorted set encoding");
526 527 528
        }
    } else if (o->type == REDIS_HASH) {
        /* Save a hash value */
529 530
        if (o->encoding == REDIS_ENCODING_ZIPLIST) {
            size_t l = ziplistBlobLen((unsigned char*)o->ptr);
531

532
            if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
533
            nwritten += n;
534 535

        } else if (o->encoding == REDIS_ENCODING_HT) {
536 537 538
            dictIterator *di = dictGetIterator(o->ptr);
            dictEntry *de;

539
            if ((n = rdbSaveLen(rdb,dictSize((dict*)o->ptr))) == -1) return -1;
540 541
            nwritten += n;

542
            while((de = dictNext(di)) != NULL) {
543 544
                robj *key = dictGetKey(de);
                robj *val = dictGetVal(de);
545

546
                if ((n = rdbSaveStringObject(rdb,key)) == -1) return -1;
547
                nwritten += n;
548
                if ((n = rdbSaveStringObject(rdb,val)) == -1) return -1;
549
                nwritten += n;
550 551
            }
            dictReleaseIterator(di);
552 553 554

        } else {
            redisPanic("Unknown hash encoding");
555
        }
556

557 558 559
    } else {
        redisPanic("Unknown object type");
    }
560
    return nwritten;
561 562 563 564 565 566
}

/* Return the length the object will have on disk if saved with
 * the rdbSaveObject() function. Currently we use a trick to get
 * this length with very little changes to the code. In the future
 * we could switch to a faster solution. */
567 568
off_t rdbSavedObjectLen(robj *o) {
    int len = rdbSaveObject(NULL,o);
569
    redisAssertWithInfo(NULL,o,len != -1);
570
    return len;
571 572
}

573 574 575 576
/* Save a key-value pair, with expire time, type, key, value.
 * On error -1 is returned.
 * On success if the key was actaully saved 1 is returned, otherwise 0
 * is returned (the key was already expired). */
577
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val,
578
                        long long expiretime, long long now)
579 580 581 582 583
{
    /* Save the expire time */
    if (expiretime != -1) {
        /* If this key is already expired skip it */
        if (expiretime < now) return 0;
584 585
        if (rdbSaveType(rdb,REDIS_RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
        if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
586
    }
587

588
    /* Save type, key, value */
589
    if (rdbSaveObjectType(rdb,val) == -1) return -1;
590 591
    if (rdbSaveStringObject(rdb,key) == -1) return -1;
    if (rdbSaveObject(rdb,val) == -1) return -1;
592 593 594
    return 1;
}

595 596 597 598 599
/* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
int rdbSave(char *filename) {
    dictIterator *di = NULL;
    dictEntry *de;
    char tmpfile[256];
600
    char magic[10];
601
    int j;
602
    long long now = mstime();
603 604
    FILE *fp;
    rio rdb;
605 606 607 608

    snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
    fp = fopen(tmpfile,"w");
    if (!fp) {
A
antirez 已提交
609 610
        redisLog(REDIS_WARNING, "Failed opening .rdb for saving: %s",
            strerror(errno));
611 612
        return REDIS_ERR;
    }
613

614
    rioInitWithFile(&rdb,fp);
615 616
    snprintf(magic,sizeof(magic),"REDIS%04d",REDIS_RDB_VERSION);
    if (rdbWriteRaw(&rdb,magic,9) == -1) goto werr;
617

618 619 620 621
    for (j = 0; j < server.dbnum; j++) {
        redisDb *db = server.db+j;
        dict *d = db->dict;
        if (dictSize(d) == 0) continue;
622
        di = dictGetSafeIterator(d);
623 624 625 626 627 628
        if (!di) {
            fclose(fp);
            return REDIS_ERR;
        }

        /* Write the SELECT DB opcode */
629
        if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_SELECTDB) == -1) goto werr;
630
        if (rdbSaveLen(&rdb,j) == -1) goto werr;
631 632 633

        /* Iterate this DB writing every entry */
        while((de = dictNext(di)) != NULL) {
634 635
            sds keystr = dictGetKey(de);
            robj key, *o = dictGetVal(de);
636
            long long expire;
637 638
            
            initStaticStringObject(key,keystr);
639
            expire = getExpire(db,&key);
640
            if (rdbSaveKeyValuePair(&rdb,&key,o,expire,now) == -1) goto werr;
641 642 643 644
        }
        dictReleaseIterator(di);
    }
    /* EOF opcode */
645
    if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_EOF) == -1) goto werr;
646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661

    /* Make sure data will not remain on the OS's output buffers */
    fflush(fp);
    fsync(fileno(fp));
    fclose(fp);

    /* Use RENAME to make sure the DB file is changed atomically only
     * if the generate DB file is ok. */
    if (rename(tmpfile,filename) == -1) {
        redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
        unlink(tmpfile);
        return REDIS_ERR;
    }
    redisLog(REDIS_NOTICE,"DB saved on disk");
    server.dirty = 0;
    server.lastsave = time(NULL);
662
    server.lastbgsave_status = REDIS_OK;
663 664 665 666 667 668 669 670 671 672 673 674
    return REDIS_OK;

werr:
    fclose(fp);
    unlink(tmpfile);
    redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
    if (di) dictReleaseIterator(di);
    return REDIS_ERR;
}

int rdbSaveBackground(char *filename) {
    pid_t childpid;
675
    long long start;
676

A
antirez 已提交
677
    if (server.rdb_child_pid != -1) return REDIS_ERR;
A
antirez 已提交
678

679
    server.dirty_before_bgsave = server.dirty;
A
antirez 已提交
680

681
    start = ustime();
682
    if ((childpid = fork()) == 0) {
A
antirez 已提交
683 684
        int retval;

685
        /* Child */
686 687
        if (server.ipfd > 0) close(server.ipfd);
        if (server.sofd > 0) close(server.sofd);
688
        retval = rdbSave(filename);
A
antirez 已提交
689
        _exit((retval == REDIS_OK) ? 0 : 1);
690 691
    } else {
        /* Parent */
692
        server.stat_fork_time = ustime()-start;
693 694 695 696 697 698
        if (childpid == -1) {
            redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
                strerror(errno));
            return REDIS_ERR;
        }
        redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
A
antirez 已提交
699
        server.rdb_child_pid = childpid;
700 701 702 703 704 705 706 707 708 709 710 711 712 713 714
        updateDictResizePolicy();
        return REDIS_OK;
    }
    return REDIS_OK; /* unreached */
}

void rdbRemoveTempFile(pid_t childpid) {
    char tmpfile[256];

    snprintf(tmpfile,256,"temp-%d.rdb", (int) childpid);
    unlink(tmpfile);
}

/* Load a Redis object of the specified type from the specified file.
 * On success a newly allocated object is returned, otherwise NULL. */
715
robj *rdbLoadObject(int rdbtype, rio *rdb) {
716 717
    robj *o, *ele, *dec;
    size_t len;
718
    unsigned int i;
719

720 721
    redisLog(REDIS_DEBUG,"LOADING OBJECT %d (at %d)\n",rdbtype,rdb->tell(rdb));
    if (rdbtype == REDIS_RDB_TYPE_STRING) {
722
        /* Read string value */
723
        if ((o = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL;
724
        o = tryObjectEncoding(o);
725
    } else if (rdbtype == REDIS_RDB_TYPE_LIST) {
726
        /* Read list value */
727
        if ((len = rdbLoadLen(rdb,NULL)) == REDIS_RDB_LENERR) return NULL;
728 729 730 731 732 733 734 735 736 737

        /* Use a real list when there are too many entries */
        if (len > server.list_max_ziplist_entries) {
            o = createListObject();
        } else {
            o = createZiplistObject();
        }

        /* Load every single element of the list */
        while(len--) {
738
            if ((ele = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL;
739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756

            /* If we are using a ziplist and the value is too big, convert
             * the object to a real list. */
            if (o->encoding == REDIS_ENCODING_ZIPLIST &&
                ele->encoding == REDIS_ENCODING_RAW &&
                sdslen(ele->ptr) > server.list_max_ziplist_value)
                    listTypeConvert(o,REDIS_ENCODING_LINKEDLIST);

            if (o->encoding == REDIS_ENCODING_ZIPLIST) {
                dec = getDecodedObject(ele);
                o->ptr = ziplistPush(o->ptr,dec->ptr,sdslen(dec->ptr),REDIS_TAIL);
                decrRefCount(dec);
                decrRefCount(ele);
            } else {
                ele = tryObjectEncoding(ele);
                listAddNodeTail(o->ptr,ele);
            }
        }
757
    } else if (rdbtype == REDIS_RDB_TYPE_SET) {
758
        /* Read list/set value */
759
        if ((len = rdbLoadLen(rdb,NULL)) == REDIS_RDB_LENERR) return NULL;
760 761 762 763 764 765 766 767 768 769 770 771

        /* Use a regular set when there are too many entries. */
        if (len > server.set_max_intset_entries) {
            o = createSetObject();
            /* It's faster to expand the dict to the right size asap in order
             * to avoid rehashing */
            if (len > DICT_HT_INITIAL_SIZE)
                dictExpand(o->ptr,len);
        } else {
            o = createIntsetObject();
        }

772
        /* Load every single element of the list/set */
773 774
        for (i = 0; i < len; i++) {
            long long llval;
775
            if ((ele = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL;
776
            ele = tryObjectEncoding(ele);
777 778 779

            if (o->encoding == REDIS_ENCODING_INTSET) {
                /* Fetch integer value from element */
A
antirez 已提交
780
                if (isObjectRepresentableAsLongLong(ele,&llval) == REDIS_OK) {
781 782 783 784 785 786 787 788 789 790 791
                    o->ptr = intsetAdd(o->ptr,llval,NULL);
                } else {
                    setTypeConvert(o,REDIS_ENCODING_HT);
                    dictExpand(o->ptr,len);
                }
            }

            /* This will also be called when the set was just converted
             * to regular hashtable encoded set */
            if (o->encoding == REDIS_ENCODING_HT) {
                dictAdd((dict*)o->ptr,ele,NULL);
792 793
            } else {
                decrRefCount(ele);
794
            }
795
        }
796
    } else if (rdbtype == REDIS_RDB_TYPE_ZSET) {
797 798
        /* Read list/set value */
        size_t zsetlen;
799
        size_t maxelelen = 0;
800 801
        zset *zs;

802
        if ((zsetlen = rdbLoadLen(rdb,NULL)) == REDIS_RDB_LENERR) return NULL;
803 804
        o = createZsetObject();
        zs = o->ptr;
805

806 807 808
        /* Load every single element of the list/set */
        while(zsetlen--) {
            robj *ele;
809 810
            double score;
            zskiplistNode *znode;
811

812
            if ((ele = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL;
813
            ele = tryObjectEncoding(ele);
814
            if (rdbLoadDoubleValue(rdb,&score) == -1) return NULL;
815 816 817 818 819 820

            /* Don't care about integer-encoded strings. */
            if (ele->encoding == REDIS_ENCODING_RAW &&
                sdslen(ele->ptr) > maxelelen)
                    maxelelen = sdslen(ele->ptr);

821 822
            znode = zslInsert(zs->zsl,score,ele);
            dictAdd(zs->dict,ele,&znode->score);
823 824
            incrRefCount(ele); /* added to skiplist */
        }
825 826 827 828 829

        /* Convert *after* loading, since sorted sets are not stored ordered. */
        if (zsetLength(o) <= server.zset_max_ziplist_entries &&
            maxelelen <= server.zset_max_ziplist_value)
                zsetConvert(o,REDIS_ENCODING_ZIPLIST);
830
    } else if (rdbtype == REDIS_RDB_TYPE_HASH) {
831 832 833 834 835
        size_t len;
        int ret;

        len = rdbLoadLen(rdb, NULL);
        if (len == REDIS_RDB_LENERR) return NULL;
836 837

        o = createHashObject();
838

839
        /* Too many entries? Use an hash table. */
840 841 842 843
        if (len > server.hash_max_ziplist_entries)
            hashTypeConvert(o, REDIS_ENCODING_HT);

        /* Load every field and value into the ziplist */
844
        while (o->encoding == REDIS_ENCODING_ZIPLIST && len > 0) {
845 846
            robj *field, *value;

847
            len--;
848 849 850 851 852 853 854 855
            /* Load raw strings */
            field = rdbLoadStringObject(rdb);
            if (field == NULL) return NULL;
            redisAssert(field->encoding == REDIS_ENCODING_RAW);
            value = rdbLoadStringObject(rdb);
            if (value == NULL) return NULL;
            redisAssert(field->encoding == REDIS_ENCODING_RAW);

856 857 858
            /* Add pair to ziplist */
            o->ptr = ziplistPush(o->ptr, field->ptr, sdslen(field->ptr), ZIPLIST_TAIL);
            o->ptr = ziplistPush(o->ptr, value->ptr, sdslen(value->ptr), ZIPLIST_TAIL);
859 860 861
            /* Convert to hash table if size threshold is exceeded */
            if (sdslen(field->ptr) > server.hash_max_ziplist_value ||
                sdslen(value->ptr) > server.hash_max_ziplist_value)
862
            {
A
antirez 已提交
863 864
                decrRefCount(field);
                decrRefCount(value);
865 866
                hashTypeConvert(o, REDIS_ENCODING_HT);
                break;
867
            }
A
antirez 已提交
868 869
            decrRefCount(field);
            decrRefCount(value);
870
        }
871 872

        /* Load remaining fields and values into the hash table */
873
        while (o->encoding == REDIS_ENCODING_HT && len > 0) {
874 875
            robj *field, *value;

876
            len--;
877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893
            /* Load encoded strings */
            field = rdbLoadEncodedStringObject(rdb);
            if (field == NULL) return NULL;
            value = rdbLoadEncodedStringObject(rdb);
            if (value == NULL) return NULL;

            field = tryObjectEncoding(field);
            value = tryObjectEncoding(value);

            /* Add pair to hash table */
            ret = dictAdd((dict*)o->ptr, field, value);
            redisAssert(ret == REDIS_OK);
        }

        /* All pairs should be read by now */
        redisAssert(len == 0);

894 895 896
    } else if (rdbtype == REDIS_RDB_TYPE_HASH_ZIPMAP  ||
               rdbtype == REDIS_RDB_TYPE_LIST_ZIPLIST ||
               rdbtype == REDIS_RDB_TYPE_SET_INTSET   ||
897 898
               rdbtype == REDIS_RDB_TYPE_ZSET_ZIPLIST ||
               rdbtype == REDIS_RDB_TYPE_HASH_ZIPLIST)
899
    {
900
        robj *aux = rdbLoadStringObject(rdb);
901 902

        if (aux == NULL) return NULL;
903
        o = createObject(REDIS_STRING,NULL); /* string is just placeholder */
904 905 906
        o->ptr = zmalloc(sdslen(aux->ptr));
        memcpy(o->ptr,aux->ptr,sdslen(aux->ptr));
        decrRefCount(aux);
907 908 909 910 911 912 913

        /* Fix the object encoding, and make sure to convert the encoded
         * data type into the base type if accordingly to the current
         * configuration there are too many elements in the encoded data
         * type. Note that we only check the length and not max element
         * size as this is an O(N) scan. Eventually everything will get
         * converted. */
914 915
        switch(rdbtype) {
            case REDIS_RDB_TYPE_HASH_ZIPMAP:
916 917 918 919 920
                /* Convert to ziplist encoded hash. This must be deprecated
                 * when loading dumps created by Redis 2.4 gets deprecated. */
                {
                    unsigned char *zl = ziplistNew();
                    unsigned char *zi = zipmapRewind(o->ptr);
921 922 923
                    unsigned char *fstr, *vstr;
                    unsigned int flen, vlen;
                    unsigned int maxlen = 0;
924

925 926 927
                    while ((zi = zipmapNext(zi, &fstr, &flen, &vstr, &vlen)) != NULL) {
                        if (flen > maxlen) maxlen = flen;
                        if (vlen > maxlen) maxlen = vlen;
928 929 930 931 932 933 934 935 936
                        zl = ziplistPush(zl, fstr, flen, ZIPLIST_TAIL);
                        zl = ziplistPush(zl, vstr, vlen, ZIPLIST_TAIL);
                    }

                    zfree(o->ptr);
                    o->ptr = zl;
                    o->type = REDIS_HASH;
                    o->encoding = REDIS_ENCODING_ZIPLIST;

937 938 939
                    if (hashTypeLength(o) > server.hash_max_ziplist_entries ||
                        maxlen > server.hash_max_ziplist_value)
                    {
940
                        hashTypeConvert(o, REDIS_ENCODING_HT);
941
                    }
942
                }
943
                break;
944
            case REDIS_RDB_TYPE_LIST_ZIPLIST:
945 946 947 948 949
                o->type = REDIS_LIST;
                o->encoding = REDIS_ENCODING_ZIPLIST;
                if (ziplistLen(o->ptr) > server.list_max_ziplist_entries)
                    listTypeConvert(o,REDIS_ENCODING_LINKEDLIST);
                break;
950
            case REDIS_RDB_TYPE_SET_INTSET:
951 952 953 954 955
                o->type = REDIS_SET;
                o->encoding = REDIS_ENCODING_INTSET;
                if (intsetLen(o->ptr) > server.set_max_intset_entries)
                    setTypeConvert(o,REDIS_ENCODING_HT);
                break;
956
            case REDIS_RDB_TYPE_ZSET_ZIPLIST:
957 958
                o->type = REDIS_ZSET;
                o->encoding = REDIS_ENCODING_ZIPLIST;
959
                if (zsetLength(o) > server.zset_max_ziplist_entries)
960
                    zsetConvert(o,REDIS_ENCODING_SKIPLIST);
961
                break;
962 963 964 965 966 967
            case REDIS_RDB_TYPE_HASH_ZIPLIST:
                o->type = REDIS_HASH;
                o->encoding = REDIS_ENCODING_ZIPLIST;
                if (hashTypeLength(o) > server.hash_max_ziplist_entries)
                    hashTypeConvert(o, REDIS_ENCODING_HT);
                break;
968
            default:
969
                redisPanic("Unknown encoding");
970
                break;
971
        }
972 973 974 975 976 977
    } else {
        redisPanic("Unknown object type");
    }
    return o;
}

978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002
/* Mark that we are loading in the global state and setup the fields
 * needed to provide loading stats. */
void startLoading(FILE *fp) {
    struct stat sb;

    /* Load the DB */
    server.loading = 1;
    server.loading_start_time = time(NULL);
    if (fstat(fileno(fp), &sb) == -1) {
        server.loading_total_bytes = 1; /* just to avoid division by zero */
    } else {
        server.loading_total_bytes = sb.st_size;
    }
}

/* Refresh the loading progress info */
void loadingProgress(off_t pos) {
    server.loading_loaded_bytes = pos;
}

/* Loading finished */
void stopLoading(void) {
    server.loading = 0;
}

1003 1004
int rdbLoad(char *filename) {
    uint32_t dbid;
1005
    int type, rdbver;
1006 1007
    redisDb *db = server.db+0;
    char buf[1024];
1008
    long long expiretime, now = mstime();
1009
    long loops = 0;
1010 1011
    FILE *fp;
    rio rdb;
1012 1013

    fp = fopen(filename,"r");
1014 1015 1016 1017
    if (!fp) {
        errno = ENOENT;
        return REDIS_ERR;
    }
1018
    rioInitWithFile(&rdb,fp);
P
Pieter Noordhuis 已提交
1019
    if (rioRead(&rdb,buf,9) == 0) goto eoferr;
1020 1021 1022 1023
    buf[9] = '\0';
    if (memcmp(buf,"REDIS",5) != 0) {
        fclose(fp);
        redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
1024
        errno = EINVAL;
1025 1026 1027
        return REDIS_ERR;
    }
    rdbver = atoi(buf+5);
1028
    if (rdbver < 1 || rdbver > 4) {
1029 1030
        fclose(fp);
        redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
1031
        errno = EINVAL;
1032 1033
        return REDIS_ERR;
    }
1034 1035

    startLoading(fp);
1036 1037 1038
    while(1) {
        robj *key, *val;
        expiretime = -1;
1039 1040 1041

        /* Serve the clients from time to time */
        if (!(loops++ % 1000)) {
1042
            loadingProgress(rdb.tell(&rdb));
1043 1044 1045
            aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);
        }

1046
        /* Read type. */
1047
        if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
1048
        if (type == REDIS_RDB_OPCODE_EXPIRETIME) {
1049
            if ((expiretime = rdbLoadTime(&rdb)) == -1) goto eoferr;
1050
            /* We read the time so we need to read the object type again. */
1051
            if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
A
antirez 已提交
1052
            /* the EXPIRETIME opcode specifies time in seconds, so convert
1053 1054 1055 1056 1057 1058 1059 1060
             * into milliesconds. */
            expiretime *= 1000;
        } else if (type == REDIS_RDB_OPCODE_EXPIRETIME_MS) {
            /* Milliseconds precision expire times introduced with RDB
             * version 3. */
            if ((expiretime = rdbLoadMillisecondTime(&rdb)) == -1) goto eoferr;
            /* We read the time so we need to read the object type again. */
            if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
1061
        }
1062 1063 1064 1065

        if (type == REDIS_RDB_OPCODE_EOF)
            break;

1066
        /* Handle SELECT DB opcode as a special case */
1067
        if (type == REDIS_RDB_OPCODE_SELECTDB) {
1068
            if ((dbid = rdbLoadLen(&rdb,NULL)) == REDIS_RDB_LENERR)
1069 1070 1071 1072 1073 1074 1075 1076 1077
                goto eoferr;
            if (dbid >= (unsigned)server.dbnum) {
                redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
                exit(1);
            }
            db = server.db+dbid;
            continue;
        }
        /* Read key */
1078
        if ((key = rdbLoadStringObject(&rdb)) == NULL) goto eoferr;
1079
        /* Read value */
1080
        if ((val = rdbLoadObject(type,&rdb)) == NULL) goto eoferr;
1081 1082 1083 1084 1085 1086
        /* Check if the key already expired. This function is used when loading
         * an RDB file from disk, either at startup, or when an RDB was
         * received from the master. In the latter case, the master is
         * responsible for key expiry. If we would expire keys here, the
         * snapshot taken by the master may not be reflected on the slave. */
        if (server.masterhost == NULL && expiretime != -1 && expiretime < now) {
1087 1088 1089 1090 1091
            decrRefCount(key);
            decrRefCount(val);
            continue;
        }
        /* Add the new object in the hash table */
1092 1093
        dbAdd(db,key,val);

1094 1095 1096 1097 1098 1099
        /* Set the expire time if needed */
        if (expiretime != -1) setExpire(db,key,expiretime);

        decrRefCount(key);
    }
    fclose(fp);
1100
    stopLoading();
1101 1102 1103 1104 1105 1106 1107 1108 1109
    return REDIS_OK;

eoferr: /* unexpected end of file is handled here with a fatal exit */
    redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
    exit(1);
    return REDIS_ERR; /* Just to avoid warning */
}

/* A background saving child (BGSAVE) terminated its work. Handle this. */
1110
void backgroundSaveDoneHandler(int exitcode, int bysignal) {
1111 1112 1113
    if (!bysignal && exitcode == 0) {
        redisLog(REDIS_NOTICE,
            "Background saving terminated with success");
1114
        server.dirty = server.dirty - server.dirty_before_bgsave;
1115
        server.lastsave = time(NULL);
1116
        server.lastbgsave_status = REDIS_OK;
1117 1118
    } else if (!bysignal && exitcode != 0) {
        redisLog(REDIS_WARNING, "Background saving error");
1119
        server.lastbgsave_status = REDIS_ERR;
1120 1121
    } else {
        redisLog(REDIS_WARNING,
1122
            "Background saving terminated by signal %d", bysignal);
A
antirez 已提交
1123
        rdbRemoveTempFile(server.rdb_child_pid);
1124
        server.lastbgsave_status = REDIS_ERR;
1125
    }
A
antirez 已提交
1126
    server.rdb_child_pid = -1;
1127 1128 1129 1130
    /* Possibly there are slaves waiting for a BGSAVE in order to be served
     * (the first stage of SYNC is a bulk transfer of dump.rdb) */
    updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
}
1131 1132

void saveCommand(redisClient *c) {
A
antirez 已提交
1133
    if (server.rdb_child_pid != -1) {
1134 1135 1136
        addReplyError(c,"Background save already in progress");
        return;
    }
A
antirez 已提交
1137
    if (rdbSave(server.rdb_filename) == REDIS_OK) {
1138 1139 1140 1141 1142 1143 1144
        addReply(c,shared.ok);
    } else {
        addReply(c,shared.err);
    }
}

void bgsaveCommand(redisClient *c) {
A
antirez 已提交
1145
    if (server.rdb_child_pid != -1) {
1146
        addReplyError(c,"Background save already in progress");
A
antirez 已提交
1147
    } else if (server.aof_child_pid != -1) {
1148
        addReplyError(c,"Can't BGSAVE while AOF log rewriting is in progress");
A
antirez 已提交
1149
    } else if (rdbSaveBackground(server.rdb_filename) == REDIS_OK) {
1150 1151 1152 1153 1154
        addReplyStatus(c,"Background saving started");
    } else {
        addReply(c,shared.err);
    }
}