diff --git a/src/util/inc/tkvstore.h b/src/util/inc/tkvstore.h index a57d0e95cfe65048a90356677a021e698ee2daaa..346e567c41e0a209b7dde96de55faa74a704f9b3 100644 --- a/src/util/inc/tkvstore.h +++ b/src/util/inc/tkvstore.h @@ -25,7 +25,7 @@ typedef int (*iterFunc)(void *, void *cont, int contLen); typedef void (*afterFunc)(void *); typedef struct { - int64_t size; + int64_t size; // including 512 bytes of header size int64_t tombSize; int64_t nRecords; int64_t nDels; diff --git a/src/util/src/tkvstore.c b/src/util/src/tkvstore.c index 88cd446349a351da7855d7fcdfcd2291c830381f..abd9d039a9f18f8ccb525f35a97f5b88aff873b0 100644 --- a/src/util/src/tkvstore.c +++ b/src/util/src/tkvstore.c @@ -58,7 +58,7 @@ int tdCreateKVStore(char *fname) { if (fd < 0) { uError("failed to open file %s since %s", fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); - return -1; + goto _err; } if (tdInitKVStoreHeader(fd, fname) < 0) { @@ -80,6 +80,11 @@ int tdCreateKVStore(char *fname) { } return 0; + +_err: + if (fd > 0) close(fd); + remove(fname); + return -1; } int tdDestroyKVStore(char *fname) { @@ -105,7 +110,7 @@ SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH goto _err; } - if (access(pStore->fsnap, F_OK) == 0) { + if (access(pStore->fsnap, F_OK) == 0) { // .snap file exists uTrace("file %s exists, try to recover the KV store", pStore->fsnap); pStore->sfd = open(pStore->fsnap, O_RDONLY); if (pStore->sfd < 0) { @@ -114,16 +119,22 @@ SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH goto _err; } - if (tdLoadKVStoreHeader(pStore->sfd, pStore->fsnap, &info) < 0) goto _err; + if (tdLoadKVStoreHeader(pStore->sfd, pStore->fsnap, &info) < 0) { + if (terrno != TSDB_CODE_COM_FILE_CORRUPTED) goto _err; + } else { + if (ftruncate(pStore->fd, info.size) < 0) { + uError("failed to truncate %s to " PRId64 " size since %s", pStore->fname, info.size, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } - if (ftruncate(pStore->fd, info.size) < 0) { - uError("failed to truncate %s to " PRId64 " size since %s", pStore->fname, info.size, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - goto _err; + if (tdUpdateKVStoreHeader(pStore->fd, pStore->fname, &info) < 0) goto _err; + if (fsync(pStore->fd) < 0) { + uError("failed to fsync file %s since %s", pStore->fname, strerror(errno)); + goto _err; + } } - if (tdUpdateKVStoreHeader(pStore->fd, pStore->fname, &info) < 0) goto _err; - close(pStore->sfd); pStore->sfd = -1; remove(pStore->fsnap); @@ -131,22 +142,7 @@ SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH if (tdLoadKVStoreHeader(pStore->fd, pStore->fname, &info) < 0) goto _err; - struct stat tfstat; - if (fstat(pStore->fd, &tfstat) < 0) { - uError("failed to fstat file %s since %s", pStore->fname, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - ASSERT(info.size == tfstat.st_size); - - if (lseek(pStore->fd, TD_KVSTORE_HEADER_SIZE, SEEK_SET) < 0) { - uError("failed to lseek file %s since %s", pStore->fname, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - pStore->info.size += TD_KVSTORE_HEADER_SIZE; + pStore->info.size = TD_KVSTORE_HEADER_SIZE; if (tdRestoreKVStore(pStore) < 0) goto _err; @@ -326,7 +322,9 @@ static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo) { return -1; } - tdEncodeStoreInfo(buf, pInfo); + void *pBuf = tdEncodeStoreInfo(buf, pInfo); + ASSERT(POINTER_DISTANCE(pBuf, buf) + sizeof(TSCKSUM) <= TD_KVSTORE_HEADER_SIZE); + taosCalcChecksumAppend(0, (uint8_t *)buf, TD_KVSTORE_HEADER_SIZE); if (twrite(fd, buf, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) { uError("failed to write %d bytes to file %s since %s", TD_KVSTORE_HEADER_SIZE, fname, strerror(errno)); @@ -445,34 +443,36 @@ static void *tdDecodeKVRecord(void *buf, SKVRecord *pRecord) { } static int tdRestoreKVStore(SKVStore *pStore) { - char tbuf[128] = "\0"; - void * buf = NULL; - int maxBufSize = 0; - SKVRecord rInfo = {0}; + char tbuf[128] = "\0"; + void * buf = NULL; + int maxBufSize = 0; + SKVRecord rInfo = {0}; + SHashMutableIterator *pIter = NULL; ASSERT(TD_KVSTORE_HEADER_SIZE == lseek(pStore->fd, 0, SEEK_CUR)); + ASSERT(pStore->info.size == TD_KVSTORE_HEADER_SIZE); while (true) { ssize_t tsize = tread(pStore->fd, tbuf, sizeof(SKVRecord)); if (tsize == 0) break; if (tsize < sizeof(SKVRecord)) { - uError("failed to read %d bytes from file %s since %s", sizeof(SKVRecord), pStore->fname, strerror(errno)); + uError("failed to read %d bytes from file %s at offset %" PRId64 "since %s", sizeof(SKVRecord), pStore->fname, + pStore->info.size, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); goto _err; } char *pBuf = tdDecodeKVRecord(tbuf, &rInfo); ASSERT(POINTER_DISTANCE(pBuf, tbuf) == sizeof(SKVRecord)); - ASSERT(rInfo.offset > 0 ? pStore->info.size == rInfo.offset : true); + ASSERT(pStore->info.size == rInfo.offset); if (rInfo.offset < 0) { taosHashRemove(pStore->map, (void *)(&rInfo.uid), sizeof(rInfo.uid)); pStore->info.size += sizeof(SKVRecord); pStore->info.nRecords--; pStore->info.nDels++; - pStore->info.tombSize += (rInfo.size + sizeof(SKVRecord) + sizeof(SKVRecord)); + pStore->info.tombSize += (rInfo.size + sizeof(SKVRecord) * 2); } else { - // TODO: add statistics ASSERT(rInfo.offset > 0 && rInfo.size > 0); if (taosHashPut(pStore->map, (void *)(&rInfo.uid), sizeof(rInfo.uid), &rInfo, sizeof(rInfo)) < 0) { uError("failed to put record in KV store %s", pStore->fname); @@ -487,6 +487,9 @@ static int tdRestoreKVStore(SKVStore *pStore) { terrno = TAOS_SYSTEM_ERROR(errno); goto _err; } + + pStore->info.size += (sizeof(SKVRecord) + rInfo.size); + pStore->info.nRecords++; } } @@ -497,7 +500,7 @@ static int tdRestoreKVStore(SKVStore *pStore) { goto _err; } - SHashMutableIterator *pIter = taosHashCreateIter(pStore->map); + pIter = taosHashCreateIter(pStore->map); if (pIter == NULL) { uError("failed to create hash iter while opening KV store %s", pStore->fname); terrno = TSDB_CODE_COM_OUT_OF_MEMORY; @@ -508,13 +511,14 @@ static int tdRestoreKVStore(SKVStore *pStore) { SKVRecord *pRecord = taosHashIterGet(pIter); if (lseek(pStore->fd, pRecord->offset, SEEK_SET) < 0) { - uError("failed to lseek file %s since %s", pStore->fname, strerror(errno)); + uError("failed to lseek file %s since %s, offset %" PRId64, pStore->fname, strerror(errno), pRecord->offset); terrno = TAOS_SYSTEM_ERROR(errno); goto _err; } if (tread(pStore->fd, buf, pRecord->size) < pRecord->size) { - uError("failed to read %d bytes from file %s since %s", pRecord->size, pStore->fname, strerror(errno)); + uError("failed to read %" PRId64 " bytes from file %s since %s, offset %" PRId64, pRecord->size, pStore->fname, + strerror(errno), pRecord->offset); terrno = TAOS_SYSTEM_ERROR(errno); goto _err; } @@ -528,14 +532,14 @@ static int tdRestoreKVStore(SKVStore *pStore) { if (pStore->iFunc) (*pStore->iFunc)(pStore->appH, buf, pRecord->size); } - taosHashDestroyIter(pIter); - if (pStore->aFunc) (*pStore->aFunc)(pStore->appH); + taosHashDestroyIter(pIter); tfree(buf); return 0; _err: + taosHashDestroyIter(pIter); tfree(buf); return -1; } \ No newline at end of file