diff --git a/src/util/src/tkvstore.c b/src/util/src/tkvstore.c index 148d8235a6fb832183e30a346cd4b6898b60f1cb..03bf4b1b0023de0bf5c2f23bf6dd9181a7e7345e 100644 --- a/src/util/src/tkvstore.c +++ b/src/util/src/tkvstore.c @@ -34,14 +34,21 @@ #define TD_KVSTORE_SNAP_SUFFIX ".snap" #define TD_KVSTORE_NEW_SUFFIX ".new" +typedef struct { + uint64_t uid; + int64_t offset; + int32_t size; +} SKVRecord; + static int tdInitKVStoreHeader(int fd, char *fname); static void * tdEncodeStoreInfo(void *buf, SStoreInfo *pInfo); -// static void * tdDecodeStoreInfo(void *buf, SStoreInfo *pInfo); +static void * tdDecodeStoreInfo(void *buf, SStoreInfo *pInfo); static SKVStore *tdNewKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH); static char * tdGetKVStoreSnapshotFname(char *fdata); static char * tdGetKVStoreNewFname(char *fdata); static void tdFreeKVStore(SKVStore *pStore); static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo); +static int tdLoadKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo); int tdCreateKVStore(char *fname) { char *tname = strdup(fname); @@ -70,6 +77,7 @@ int tdCreateKVStore(char *fname) { } SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH) { + SStoreInfo info = {0}; SKVStore *pStore = tdNewKVStore(fname, iFunc, aFunc, appH); if (pStore == NULL) return NULL; @@ -90,6 +98,13 @@ SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH } // TODO: rewind the file + if (tdLoadKVStoreHeader(pStore->sfd, pStore->fsnap, &info) < 0) goto _err; + + if (ftruncate(pStore->fd, info.size) < 0) { + uError("failed to truncate %s since %s", pStore->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } close(pStore->sfd); pStore->sfd = -1; @@ -97,6 +112,18 @@ SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH } // TODO: Recover from the file + terrno = tdUpdateKVStoreHeader(pStore->fd, pStore->fname, &info); + if (terrno != TSDB_CODE_SUCCESS) goto _err; + + if (lseek(pStore->fd, TD_KVSTORE_HEADER_SIZE, SEEK_SET) < 0) { + uError("failed to lseek file %s since %s", pStore->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + while (true) { + + } return pStore; @@ -184,6 +211,25 @@ int tdKVStoreEndCommit(SKVStore *pStore) { return 0; } +static int tdLoadKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo) { + char buf[TD_KVSTORE_HEADER_SIZE] = "\0"; + + if (tread(fd, buf, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) { + uError("failed to read file %s since %s", fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + if (!taosCheckChecksumWhole((uint8_t *)buf, TD_KVSTORE_HEADER_SIZE)) { + uError("file %s is broken", fname); + return -1; + } + + tdDecodeStoreInfo(buf, pInfo); + + return 0; +} + static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo) { char buf[TD_KVSTORE_HEADER_SIZE] = "\0"; @@ -217,14 +263,14 @@ static void *tdEncodeStoreInfo(void *buf, SStoreInfo *pInfo) { return buf; } -// static void *tdDecodeStoreInfo(void *buf, SStoreInfo *pInfo) { -// buf = taosDecodeVariantI64(buf, &(pInfo->size)); -// buf = taosDecodeVariantI64(buf, &(pInfo->tombSize)); -// buf = taosDecodeVariantI64(buf, &(pInfo->nRecords)); -// buf = taosDecodeVariantI64(buf, &(pInfo->nDels)); +static void *tdDecodeStoreInfo(void *buf, SStoreInfo *pInfo) { + buf = taosDecodeVariantI64(buf, &(pInfo->size)); + buf = taosDecodeVariantI64(buf, &(pInfo->tombSize)); + buf = taosDecodeVariantI64(buf, &(pInfo->nRecords)); + buf = taosDecodeVariantI64(buf, &(pInfo->nDels)); -// return buf; -// } + return buf; +} static SKVStore *tdNewKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH) { SKVStore *pStore = (SKVStore *)malloc(sizeof(SKVStore));