diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index c01141f0d68181713455ab2f58017ec6e112f8b4..77e91acc14f852f9e55d72f13532ea68a26078c5 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -201,10 +201,10 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints) pDataCol->len = 0; if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) { - pDataCol->spaceSize = (sizeof(VarDataLenT) + pDataCol->bytes) * maxPoints; pDataCol->dataOff = (VarDataOffsetT *)(*pBuf); - pDataCol->pData = POINTER_SHIFT(*pBuf, TYPE_BYTES[pDataCol->type] * maxPoints); - *pBuf = POINTER_SHIFT(*pBuf, pDataCol->spaceSize + TYPE_BYTES[pDataCol->type] * maxPoints); + pDataCol->pData = POINTER_SHIFT(*pBuf, sizeof(VarDataOffsetT) * maxPoints); + pDataCol->spaceSize = pDataCol->bytes * maxPoints; + *pBuf = POINTER_SHIFT(*pBuf, pDataCol->spaceSize + sizeof(VarDataOffsetT) * maxPoints); } else { pDataCol->spaceSize = pDataCol->bytes * maxPoints; pDataCol->dataOff = NULL; diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 5379b371ef05150af0fc02c2a9693e3f6111bd5a..ac2af757426cded4dcdab58c7245808d53a5d06e 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -71,6 +71,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_COM_OPS_NOT_SUPPORT, 0, 0x0100, "operations TAOS_DEFINE_ERROR(TSDB_CODE_COM_MEMORY_CORRUPTED, 0, 0x0101, "memory corrupted") TAOS_DEFINE_ERROR(TSDB_CODE_COM_OUT_OF_MEMORY, 0, 0x0102, "out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_COM_INVALID_CFG_MSG, 0, 0x0103, "invalid config message") +TAOS_DEFINE_ERROR(TSDB_CODE_COM_FILE_CORRUPTED, 0, 0x0104, "file is corrupted") //client TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_SQL, 0, 0x0200, "invalid sql") diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 5926f2f2436d123822bb93e597ae66fa04dfb827..f52022d79dbd553e333ef26e8d59b86ea871db94 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -161,7 +161,7 @@ typedef struct { int64_t index; int numOfCacheBlocks; SList * memPool; -} STsdbCachePool; +} STsdbBufferPool; typedef struct { TSKEY keyFirst; @@ -173,7 +173,7 @@ typedef struct { typedef struct { int cacheBlockSize; int totalCacheBlocks; - STsdbCachePool pool; + STsdbBufferPool pool; STsdbCacheBlock *curBlock; SCacheMem * mem; SCacheMem * imem; diff --git a/src/tsdb/src/tsdbCache.c b/src/tsdb/src/tsdbCache.c index edc8472b34661fa616dcd352258249902e711a71..24476d89975bd420f53d7a40ac1cb6dd0c961bf0 100644 --- a/src/tsdb/src/tsdbCache.c +++ b/src/tsdb/src/tsdbCache.c @@ -35,7 +35,7 @@ STsdbCache *tsdbInitCache(int cacheBlockSize, int totalBlocks, TsdbRepoT *pRepo) pCache->totalCacheBlocks = totalBlocks; pCache->pRepo = pRepo; - STsdbCachePool *pPool = &(pCache->pool); + STsdbBufferPool *pPool = &(pCache->pool); pPool->index = 0; pPool->memPool = tdListNew(sizeof(STsdbCacheBlock *)); if (pPool->memPool == NULL) goto _err; @@ -106,7 +106,7 @@ static void tsdbFreeCacheMem(SCacheMem *mem) { } static int tsdbAllocBlockFromPool(STsdbCache *pCache) { - STsdbCachePool *pPool = &(pCache->pool); + STsdbBufferPool *pPool = &(pCache->pool); tsdbLockRepo(pCache->pRepo); if (listNEles(pPool->memPool) == 0) { @@ -170,7 +170,7 @@ int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks) { } static int tsdbAddCacheBlockToPool(STsdbCache *pCache) { - STsdbCachePool *pPool = &pCache->pool; + STsdbBufferPool *pPool = &pCache->pool; STsdbCacheBlock *pBlock = malloc(sizeof(STsdbCacheBlock) + pCache->cacheBlockSize); if (pBlock == NULL) return -1; @@ -184,7 +184,7 @@ static int tsdbAddCacheBlockToPool(STsdbCache *pCache) { } static int tsdbRemoveCacheBlockFromPool(STsdbCache *pCache) { - STsdbCachePool *pPool = &pCache->pool; + STsdbBufferPool *pPool = &pCache->pool; STsdbCacheBlock *pBlock = NULL; ASSERT(pCache->totalCacheBlocks >= 0); diff --git a/src/tsdb/src/tsdbMetaFile.c b/src/tsdb/src/tsdbMetaFile.c index 19fcae94e367816d01aa3cb29dd2f05890c3769d..921db8674aff5dd4463c122e277adb76d09fe9b1 100644 --- a/src/tsdb/src/tsdbMetaFile.c +++ b/src/tsdb/src/tsdbMetaFile.c @@ -105,7 +105,7 @@ int32_t tsdbInsertMetaRecord(SMetaFile *mfh, uint64_t uid, void *cont, int32_t c return -1; } - fsync(mfh->fd); + // fsync(mfh->fd); mfh->tombSize++; @@ -132,7 +132,7 @@ int32_t tsdbDeleteMetaRecord(SMetaFile *mfh, uint64_t uid) { return -1; } - fsync(mfh->fd); + // fsync(mfh->fd); mfh->nDel++; @@ -167,7 +167,7 @@ int32_t tsdbUpdateMetaRecord(SMetaFile *mfh, uint64_t uid, void *cont, int32_t c return -1; } - fsync(mfh->fd); + // fsync(mfh->fd); return 0; } diff --git a/src/util/inc/tkvstore.h b/src/util/inc/tkvstore.h index 724c94e21dde790cd0fef007d33f029e1e10f1d8..a57d0e95cfe65048a90356677a021e698ee2daaa 100644 --- a/src/util/inc/tkvstore.h +++ b/src/util/inc/tkvstore.h @@ -46,11 +46,12 @@ typedef struct { } SKVStore; int tdCreateKVStore(char *fname); -int tdDestroyKVStore(); +int tdDestroyKVStore(char *fname); SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH); void tdCloseKVStore(SKVStore *pStore); int tdKVStoreStartCommit(SKVStore *pStore); -int tdUpdateRecordInKVStore(SKVStore *pStore, uint64_t uid, void *cont, int contLen); +int tdUpdateKVStoreRecord(SKVStore *pStore, uint64_t uid, void *cont, int contLen); +int tdDropKVStoreRecord(SKVStore *pStore, uint64_t uid); int tdKVStoreEndCommit(SKVStore *pStore); #ifdef __cplusplus diff --git a/src/util/src/tkvstore.c b/src/util/src/tkvstore.c index 148d8235a6fb832183e30a346cd4b6898b60f1cb..88cd446349a351da7855d7fcdfcd2291c830381f 100644 --- a/src/util/src/tkvstore.c +++ b/src/util/src/tkvstore.c @@ -34,42 +34,67 @@ #define TD_KVSTORE_SNAP_SUFFIX ".snap" #define TD_KVSTORE_NEW_SUFFIX ".new" +typedef struct { + uint64_t uid; + int64_t offset; + int64_t size; +} SKVRecord; + static int tdInitKVStoreHeader(int fd, char *fname); static void * tdEncodeStoreInfo(void *buf, SStoreInfo *pInfo); -// static void * tdDecodeStoreInfo(void *buf, SStoreInfo *pInfo); +static void * tdDecodeStoreInfo(void *buf, SStoreInfo *pInfo); static SKVStore *tdNewKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH); static char * tdGetKVStoreSnapshotFname(char *fdata); static char * tdGetKVStoreNewFname(char *fdata); static void tdFreeKVStore(SKVStore *pStore); static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo); +static int tdLoadKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo); +static void * tdEncodeKVRecord(void *buf, SKVRecord *pRecord); +static void * tdDecodeKVRecord(void *buf, SKVRecord *pRecord); +static int tdRestoreKVStore(SKVStore *pStore); int tdCreateKVStore(char *fname) { - char *tname = strdup(fname); - if (tname == NULL) return TSDB_CODE_COM_OUT_OF_MEMORY; - int fd = open(fname, O_RDWR | O_CREAT, 0755); if (fd < 0) { uError("failed to open file %s since %s", fname, strerror(errno)); - return TAOS_SYSTEM_ERROR(errno); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; } - int code = tdInitKVStoreHeader(fd, fname); - if (code != TSDB_CODE_SUCCESS) return code; + if (tdInitKVStoreHeader(fd, fname) < 0) { + close(fd); + return -1; + } if (fsync(fd) < 0) { uError("failed to fsync file %s since %s", fname, strerror(errno)); - return TAOS_SYSTEM_ERROR(errno); + terrno = TAOS_SYSTEM_ERROR(errno); + close(fd); + return -1; } if (close(fd) < 0) { uError("failed to close file %s since %s", fname, strerror(errno)); - return TAOS_SYSTEM_ERROR(errno); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; } - return TSDB_CODE_SUCCESS; + return 0; +} + +int tdDestroyKVStore(char *fname) { + if (remove(fname) < 0) { + uError("failed to remove file %s since %s", fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + return 0; } SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH) { + SStoreInfo info = {0}; + SKVStore *pStore = tdNewKVStore(fname, iFunc, aFunc, appH); if (pStore == NULL) return NULL; @@ -89,14 +114,44 @@ SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH goto _err; } - // TODO: rewind the file + if (tdLoadKVStoreHeader(pStore->sfd, pStore->fsnap, &info) < 0) goto _err; + + if (ftruncate(pStore->fd, info.size) < 0) { + uError("failed to truncate %s to " PRId64 " size since %s", pStore->fname, info.size, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + if (tdUpdateKVStoreHeader(pStore->fd, pStore->fname, &info) < 0) goto _err; close(pStore->sfd); pStore->sfd = -1; remove(pStore->fsnap); } - // TODO: Recover from the file + if (tdLoadKVStoreHeader(pStore->fd, pStore->fname, &info) < 0) goto _err; + + struct stat tfstat; + if (fstat(pStore->fd, &tfstat) < 0) { + uError("failed to fstat file %s since %s", pStore->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + ASSERT(info.size == tfstat.st_size); + + if (lseek(pStore->fd, TD_KVSTORE_HEADER_SIZE, SEEK_SET) < 0) { + uError("failed to lseek file %s since %s", pStore->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + pStore->info.size += TD_KVSTORE_HEADER_SIZE; + + if (tdRestoreKVStore(pStore) < 0) goto _err; + + close(pStore->fd); + pStore->fd = -1; return pStore; @@ -113,7 +168,11 @@ _err: return NULL; } +void tdCloseKVStore(SKVStore *pStore) { tdFreeKVStore(pStore); } + int tdKVStoreStartCommit(SKVStore *pStore) { + ASSERT(pStore->fd < 0); + pStore->fd = open(pStore->fname, O_RDWR); if (pStore->fd < 0) { uError("failed to open file %s since %s", pStore->fname, strerror(errno)); @@ -147,6 +206,14 @@ int tdKVStoreStartCommit(SKVStore *pStore) { } pStore->sfd = -1; + if (lseek(pStore->fd, 0, SEEK_END) < 0) { + uError("failed to lseek file %s since %s", pStore->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + ASSERT(pStore->info.size == lseek(pStore->fd, 0, SEEK_CUR)); + return 0; _err: @@ -162,11 +229,50 @@ _err: return -1; } +int tdUpdateKVStoreRecord(SKVStore *pStore, uint64_t uid, void *cont, int contLen) { + SKVRecord *pRecord = taosHashGet(pStore->map, (void *)&uid, sizeof(uid)); + if (pRecord != NULL) { + pStore->info.tombSize += (pRecord->size + sizeof(SKVRecord)); + } + + // TODO + return 0; +} + +int tdDropKVStoreRecord(SKVStore *pStore, uint64_t uid) { + SKVRecord rInfo = {0}; + char buf[128] = "\0"; + + SKVRecord *pRecord = taosHashGet(pStore->map, &uid, sizeof(uid)); + if (pRecord == NULL) { + uError("failed to drop KV store record with key " PRIu64 " since not find", uid); + return -1; + } + + rInfo.offset = -pRecord->offset; + rInfo.uid = pRecord->uid; + rInfo.size = pRecord->size; + + void *pBuf = tdEncodeKVRecord(buf, &rInfo); + + if (twrite(pStore->fd, buf, POINTER_DISTANCE(pBuf, buf)) < POINTER_DISTANCE(pBuf, buf)) { + uError("failed to write %d bytes to file %s since %s", POINTER_DISTANCE(pBuf, buf), pStore->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + pStore->info.size += POINTER_DISTANCE(pBuf, buf); + pStore->info.nDels++; + pStore->info.nRecords--; + pStore->info.tombSize += (rInfo.size + sizeof(SKVRecord) * 2); + + return 0; +} + int tdKVStoreEndCommit(SKVStore *pStore) { ASSERT(pStore->fd > 0); - terrno = tdUpdateKVStoreHeader(pStore->fd, pStore->fname, &(pStore->info)); - if (terrno != TSDB_CODE_SUCCESS) return -1; + if (tdUpdateKVStoreHeader(pStore->fd, pStore->fname, &(pStore->info)) < 0) return -1; if (fsync(pStore->fd) < 0) { uError("failed to fsync file %s since %s", pStore->fname, strerror(errno)); @@ -179,27 +285,56 @@ int tdKVStoreEndCommit(SKVStore *pStore) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; } + pStore->fd = -1; remove(pStore->fsnap); return 0; } +static int tdLoadKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo) { + char buf[TD_KVSTORE_HEADER_SIZE] = "\0"; + + if (lseek(fd, 0, SEEK_SET) < 0) { + uError("failed to lseek file %s since %s", fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + if (tread(fd, buf, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) { + uError("failed to read %d bytes from file %s since %s", TD_KVSTORE_HEADER_SIZE, fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + if (!taosCheckChecksumWhole((uint8_t *)buf, TD_KVSTORE_HEADER_SIZE)) { + uError("file %s is broken", fname); + terrno = TSDB_CODE_COM_FILE_CORRUPTED; + return -1; + } + + tdDecodeStoreInfo(buf, pInfo); + + return 0; +} + static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo) { char buf[TD_KVSTORE_HEADER_SIZE] = "\0"; if (lseek(fd, 0, SEEK_SET) < 0) { uError("failed to lseek file %s since %s", fname, strerror(errno)); - return TAOS_SYSTEM_ERROR(errno); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; } tdEncodeStoreInfo(buf, pInfo); taosCalcChecksumAppend(0, (uint8_t *)buf, TD_KVSTORE_HEADER_SIZE); if (twrite(fd, buf, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) { - uError("failed to write file %s %d bytes since %s", fname, TD_KVSTORE_HEADER_SIZE, strerror(errno)); - return TAOS_SYSTEM_ERROR(errno); + uError("failed to write %d bytes to file %s since %s", TD_KVSTORE_HEADER_SIZE, fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; } - return TSDB_CODE_SUCCESS; + return 0; } static int tdInitKVStoreHeader(int fd, char *fname) { @@ -217,21 +352,24 @@ static void *tdEncodeStoreInfo(void *buf, SStoreInfo *pInfo) { return buf; } -// static void *tdDecodeStoreInfo(void *buf, SStoreInfo *pInfo) { -// buf = taosDecodeVariantI64(buf, &(pInfo->size)); -// buf = taosDecodeVariantI64(buf, &(pInfo->tombSize)); -// buf = taosDecodeVariantI64(buf, &(pInfo->nRecords)); -// buf = taosDecodeVariantI64(buf, &(pInfo->nDels)); +static void *tdDecodeStoreInfo(void *buf, SStoreInfo *pInfo) { + buf = taosDecodeVariantI64(buf, &(pInfo->size)); + buf = taosDecodeVariantI64(buf, &(pInfo->tombSize)); + buf = taosDecodeVariantI64(buf, &(pInfo->nRecords)); + buf = taosDecodeVariantI64(buf, &(pInfo->nDels)); -// return buf; -// } + return buf; +} static SKVStore *tdNewKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH) { SKVStore *pStore = (SKVStore *)malloc(sizeof(SKVStore)); if (pStore == NULL) goto _err; pStore->fname = strdup(fname); - if (pStore->map == NULL) goto _err; + if (pStore->map == NULL) { + terrno = TSDB_CODE_COM_OUT_OF_MEMORY; + goto _err; + } pStore->fsnap = tdGetKVStoreSnapshotFname(fname); if (pStore->fsnap == NULL) goto _err; @@ -254,7 +392,6 @@ static SKVStore *tdNewKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void return pStore; _err: - terrno = TSDB_CODE_COM_OUT_OF_MEMORY; tdFreeKVStore(pStore); return NULL; } @@ -289,4 +426,116 @@ static char *tdGetKVStoreNewFname(char *fdata) { } sprintf(fname, "%s%s", fdata, TD_KVSTORE_NEW_SUFFIX); return fname; +} + +static void *tdEncodeKVRecord(void *buf, SKVRecord *pRecord) { + buf = taosEncodeFixedU64(buf, pRecord->uid); + buf = taosEncodeFixedI64(buf, pRecord->offset); + buf = taosEncodeFixedI64(buf, pRecord->size); + + return buf; +} + +static void *tdDecodeKVRecord(void *buf, SKVRecord *pRecord) { + buf = taosDecodeFixedU64(buf, &(pRecord->uid)); + buf = taosDecodeFixedI64(buf, &(pRecord->offset)); + buf = taosDecodeFixedI64(buf, &(pRecord->size)); + + return buf; +} + +static int tdRestoreKVStore(SKVStore *pStore) { + char tbuf[128] = "\0"; + void * buf = NULL; + int maxBufSize = 0; + SKVRecord rInfo = {0}; + + ASSERT(TD_KVSTORE_HEADER_SIZE == lseek(pStore->fd, 0, SEEK_CUR)); + + while (true) { + ssize_t tsize = tread(pStore->fd, tbuf, sizeof(SKVRecord)); + if (tsize == 0) break; + if (tsize < sizeof(SKVRecord)) { + uError("failed to read %d bytes from file %s since %s", sizeof(SKVRecord), pStore->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + char *pBuf = tdDecodeKVRecord(tbuf, &rInfo); + ASSERT(POINTER_DISTANCE(pBuf, tbuf) == sizeof(SKVRecord)); + ASSERT(rInfo.offset > 0 ? pStore->info.size == rInfo.offset : true); + + if (rInfo.offset < 0) { + taosHashRemove(pStore->map, (void *)(&rInfo.uid), sizeof(rInfo.uid)); + pStore->info.size += sizeof(SKVRecord); + pStore->info.nRecords--; + pStore->info.nDels++; + pStore->info.tombSize += (rInfo.size + sizeof(SKVRecord) + sizeof(SKVRecord)); + } else { + // TODO: add statistics + ASSERT(rInfo.offset > 0 && rInfo.size > 0); + if (taosHashPut(pStore->map, (void *)(&rInfo.uid), sizeof(rInfo.uid), &rInfo, sizeof(rInfo)) < 0) { + uError("failed to put record in KV store %s", pStore->fname); + terrno = TSDB_CODE_COM_OUT_OF_MEMORY; + goto _err; + } + + maxBufSize = MAX(maxBufSize, rInfo.size); + + if (lseek(pStore->fd, rInfo.size, SEEK_CUR) < 0) { + uError("failed to lseek file %s since %s", pStore->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + } + } + + buf = malloc(maxBufSize); + if (buf == NULL) { + uError("failed to allocate %d bytes in KV store %s", maxBufSize, pStore->fname); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + SHashMutableIterator *pIter = taosHashCreateIter(pStore->map); + if (pIter == NULL) { + uError("failed to create hash iter while opening KV store %s", pStore->fname); + terrno = TSDB_CODE_COM_OUT_OF_MEMORY; + goto _err; + } + + while (taosHashIterNext(pIter)) { + SKVRecord *pRecord = taosHashIterGet(pIter); + + if (lseek(pStore->fd, pRecord->offset, SEEK_SET) < 0) { + uError("failed to lseek file %s since %s", pStore->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + if (tread(pStore->fd, buf, pRecord->size) < pRecord->size) { + uError("failed to read %d bytes from file %s since %s", pRecord->size, pStore->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + if (!taosCheckChecksumWhole((uint8_t *)buf, pRecord->size)) { + uError("file %s has checksum error, offset " PRId64 " size %d", pStore->fname, pRecord->offset, pRecord->size); + terrno = TSDB_CODE_COM_FILE_CORRUPTED; + goto _err; + } + + if (pStore->iFunc) (*pStore->iFunc)(pStore->appH, buf, pRecord->size); + } + + taosHashDestroyIter(pIter); + + if (pStore->aFunc) (*pStore->aFunc)(pStore->appH); + + tfree(buf); + return 0; + +_err: + tfree(buf); + return -1; } \ No newline at end of file