diff --git a/src/rdb.c b/src/rdb.c index c79bfa8d4d23812636795b17b90a49de4aed2763..acc6ca87915214af2a0739f19d9928e4bfd6b659 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -785,6 +785,12 @@ ssize_t rdbSaveObject(rio *rdb, robj *o) { nwritten += n; } raxStop(&ri); + + /* Save the last entry ID. */ + if ((n = rdbSaveLen(rdb,s->last_id.ms)) == -1) return -1; + nwritten += n; + if ((n = rdbSaveLen(rdb,s->last_id.seq)) == -1) return -1; + nwritten += n; } else if (o->type == OBJ_MODULE) { /* Save a module-specific value. */ RedisModuleIO io; @@ -1431,6 +1437,40 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) { rdbExitReportCorruptRDB("Unknown RDB encoding type %d",rdbtype); break; } + } else if (rdbtype == RDB_TYPE_STREAM_LISTPACKS) { + o = createStreamObject(); + stream *s = o->ptr; + uint64_t listpacks = rdbLoadLen(rdb,NULL); + + while(listpacks--) { + unsigned char *lp = + rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,NULL); + if (lp == NULL) return NULL; + unsigned char *first = lpFirst(lp); + if (first == NULL) { + /* Serialized listpacks should never be free, since on + * deletion we should remove the radix tree key if the + * resulting listpack is emtpy. */ + rdbExitReportCorruptRDB("Empty listpack inside stream"); + } + + /* Get the ID of the first entry: we'll use it as key to add the + * listpack into the radix tree. */ + int64_t e_len; + unsigned char buf[LP_INTBUF_SIZE]; + unsigned char *e = lpGet(first,&e_len,buf); + if (e_len != sizeof(streamID)) { + rdbExitReportCorruptRDB("Listpack first entry is not the " + "size of a stream ID"); + } + int retval = raxInsert(s->rax,e,sizeof(streamID),lp,NULL); + if (!retval) + rdbExitReportCorruptRDB("Listpack re-added with existing key"); + } + + /* Load the last entry ID. */ + s->last_id.ms = rdbLoadLen(rdb,NULL); + s->last_id.seq = rdbLoadLen(rdb,NULL); } else if (rdbtype == RDB_TYPE_MODULE || rdbtype == RDB_TYPE_MODULE_2) { uint64_t moduleid = rdbLoadLen(rdb,NULL); moduleType *mt = moduleTypeLookupModuleByID(moduleid);