diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 05c3b303773bd2542f9c35e535fb6309f5eb58ce..40e566789389a24e0a5572db5917ce4b4f1b6b45 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -74,14 +74,16 @@ void tsdbEncodeTable(STable *pTable, char *buf, int *contLen) { STable *tsdbDecodeTable(void *cont, int contLen) { STable *pTable = (STable *)calloc(1, sizeof(STable)); if (pTable == NULL) return NULL; - pTable->schema = (STSchema **)malloc(sizeof(STSchema *) * TSDB_MAX_TABLE_SCHEMAS); - if (pTable->schema == NULL) { - free(pTable); - return NULL; - } void *ptr = cont; T_READ_MEMBER(ptr, int8_t, pTable->type); + if (pTable->type != TSDB_CHILD_TABLE) { + pTable->schema = (STSchema **)malloc(sizeof(STSchema *) * TSDB_MAX_TABLE_SCHEMAS); + if (pTable->schema == NULL) { + free(pTable); + return NULL; + } + } int len = *(int *)ptr; ptr = (char *)ptr + sizeof(int); pTable->name = calloc(1, len + VARSTR_HEADER_SIZE + 1); @@ -620,7 +622,10 @@ static int tsdbFreeTable(STable *pTable) { if (pTable->type == TSDB_CHILD_TABLE) { kvRowFree(pTable->tagVal); } else { - for (int i = 0; i < pTable->numOfSchemas; i++) tdFreeSchema(pTable->schema[i]); + if (pTable->schema) { + for (int i = 0; i < pTable->numOfSchemas; i++) tdFreeSchema(pTable->schema[i]); + free(pTable->schema); + } } if (pTable->type == TSDB_STREAM_TABLE) { diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index eab70b591349cacfc382c206e393a4846283a559..add484b5f4136b2776e47e0baa4547fd606f1705 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -764,8 +764,8 @@ static bool tsdbShouldCreateNewLast(SRWHelper *pHelper) { static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite, SCompBlock *pCompBlock, bool isLast, bool isSuperBlock) { - ASSERT(rowsToWrite > 0 && rowsToWrite <= pDataCols->numOfRows && - rowsToWrite <= pHelper->config.maxRowsPerFileBlock); + ASSERT(rowsToWrite > 0 && rowsToWrite <= pDataCols->numOfRows && rowsToWrite <= pHelper->config.maxRowsPerFileBlock); + ASSERT(isLast ? rowsToWrite < pHelper->config.minRowsPerFileBlock : true); SCompData *pCompData = (SCompData *)(pHelper->pBuffer); int64_t offset = 0; @@ -905,14 +905,15 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa rowsWritten = MIN((defaultRowsToWrite - blockAtIdx(pHelper, blkIdx)->numOfRows), pDataCols->numOfRows); if ((blockAtIdx(pHelper, blkIdx)->numOfSubBlocks < TSDB_MAX_SUBBLOCKS) && - (blockAtIdx(pHelper, blkIdx)->numOfRows + rowsWritten < pHelper->config.minRowsPerFileBlock) && (pHelper->files.nLastF.fd) > 0) { + (blockAtIdx(pHelper, blkIdx)->numOfRows + rowsWritten < pHelper->config.minRowsPerFileBlock) && + (pHelper->files.nLastF.fd) < 0) { if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, rowsWritten, &compBlock, true, false) < 0) goto _err; if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err; } else { // Load if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx), NULL) < 0) goto _err; - ASSERT(pHelper->pDataCols[0]->numOfRows == blockAtIdx(pHelper, blkIdx)->numOfRows); + ASSERT(pHelper->pDataCols[0]->numOfRows <= blockAtIdx(pHelper, blkIdx)->numOfRows); // Merge if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, rowsWritten) < 0) goto _err; // Write @@ -936,21 +937,21 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa // Key must overlap with the block ASSERT(keyFirst <= blockAtIdx(pHelper, blkIdx)->keyLast); - TSKEY keyLimit = - (blkIdx == pIdx->numOfBlocks - 1) ? INT64_MAX : pHelper->pCompInfo->blocks[blkIdx + 1].keyFirst - 1; + TSKEY keyLimit = (blkIdx == pIdx->numOfBlocks - 1) ? INT64_MAX : blockAtIdx(pHelper, blkIdx + 1)->keyFirst - 1; // rows1: number of rows must merge in this block int rows1 = tsdbGetRowsInRange(pDataCols, blockAtIdx(pHelper, blkIdx)->keyFirst, blockAtIdx(pHelper, blkIdx)->keyLast); - // rows2: max nuber of rows the block can have more + // rows2: max number of rows the block can have more int rows2 = pHelper->config.maxRowsPerFileBlock - blockAtIdx(pHelper, blkIdx)->numOfRows; // rows3: number of rows between this block and the next block int rows3 = tsdbGetRowsInRange(pDataCols, blockAtIdx(pHelper, blkIdx)->keyFirst, keyLimit); ASSERT(rows3 >= rows1); - if ((rows2 >= rows1) && - (( blockAtIdx(pHelper, blkIdx)->last) || - ((rows1 + blockAtIdx(pHelper, blkIdx)->numOfRows < pHelper->config.minRowsPerFileBlock) && (pHelper->files.nLastF.fd < 0)))) { + if ((rows2 >= rows1) && (blockAtIdx(pHelper, blkIdx)->numOfSubBlocks < TSDB_MAX_SUBBLOCKS) && + ((!blockAtIdx(pHelper, blkIdx)->last) || + ((rows1 + blockAtIdx(pHelper, blkIdx)->numOfRows < pHelper->config.minRowsPerFileBlock) && + (pHelper->files.nLastF.fd < 0)))) { rowsWritten = rows1; bool isLast = false; SFile *pFile = NULL; @@ -964,7 +965,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, rows1, &compBlock, isLast, false) < 0) goto _err; if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err; - } else { // Load-Merge-Write + } else { // Load-Merge-Write // Load if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx), NULL) < 0) goto _err; if (blockAtIdx(pHelper, blkIdx)->last) pHelper->hasOldLastBlock = false; @@ -1106,16 +1107,16 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId for (int i = blkIdx + 1; i < pIdx->numOfBlocks; i++) { SCompBlock *pTCompBlock = pHelper->pCompInfo->blocks + i; if (pTCompBlock->numOfSubBlocks > 1) { - ptr = (void *)((char *)(pHelper->pCompInfo) + pTCompBlock->offset + pTCompBlock->len); + ptr = POINTER_SHIFT(pHelper->pCompInfo, pTCompBlock->offset); break; } } - if (ptr == NULL) ptr = (void *)((char *)(pHelper->pCompInfo) + pIdx->len - sizeof(TSCKSUM)); + if (ptr == NULL) ptr = POINTER_SHIFT(pHelper->pCompInfo, pIdx->len-sizeof(TSCKSUM)); size_t tsize = pIdx->len - ((char *)ptr - (char *)(pHelper->pCompInfo)); if (tsize > 0) { - memmove((void *)((char *)ptr + sizeof(SCompBlock) * 2), ptr, tsize); + memmove(POINTER_SHIFT(ptr, sizeof(SCompBlock) * 2), ptr, tsize); for (int i = blkIdx + 1; i < pIdx->numOfBlocks; i++) { SCompBlock *pTCompBlock = pHelper->pCompInfo->blocks + i; if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += (sizeof(SCompBlock) * 2); diff --git a/src/util/inc/tkvstore.h b/src/util/inc/tkvstore.h new file mode 100644 index 0000000000000000000000000000000000000000..724c94e21dde790cd0fef007d33f029e1e10f1d8 --- /dev/null +++ b/src/util/inc/tkvstore.h @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifndef _TD_KVSTORE_H_ +#define _TD_KVSTORE_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +typedef int (*iterFunc)(void *, void *cont, int contLen); +typedef void (*afterFunc)(void *); + +typedef struct { + int64_t size; + int64_t tombSize; + int64_t nRecords; + int64_t nDels; +} SStoreInfo; + +typedef struct { + char * fname; + int fd; + char * fsnap; + int sfd; + char * fnew; + int nfd; + SHashObj * map; + iterFunc iFunc; + afterFunc aFunc; + void * appH; + SStoreInfo info; +} SKVStore; + +int tdCreateKVStore(char *fname); +int tdDestroyKVStore(); +SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH); +void tdCloseKVStore(SKVStore *pStore); +int tdKVStoreStartCommit(SKVStore *pStore); +int tdUpdateRecordInKVStore(SKVStore *pStore, uint64_t uid, void *cont, int contLen); +int tdKVStoreEndCommit(SKVStore *pStore); + +#ifdef __cplusplus +} +#endif + +#endif \ No newline at end of file diff --git a/src/util/src/tkvstore.c b/src/util/src/tkvstore.c new file mode 100644 index 0000000000000000000000000000000000000000..148d8235a6fb832183e30a346cd4b6898b60f1cb --- /dev/null +++ b/src/util/src/tkvstore.c @@ -0,0 +1,292 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include +#include +#include +#include +#include +#include +#include + +#include "hash.h" +#include "os.h" +#include "taoserror.h" +#include "tchecksum.h" +#include "tcoding.h" +#include "tkvstore.h" +#include "tulog.h" + +#define TD_KVSTORE_HEADER_SIZE 512 +#define TD_KVSTORE_MAJOR_VERSION 1 +#define TD_KVSTORE_MAINOR_VERSION 0 +#define TD_KVSTORE_SNAP_SUFFIX ".snap" +#define TD_KVSTORE_NEW_SUFFIX ".new" + +static int tdInitKVStoreHeader(int fd, char *fname); +static void * tdEncodeStoreInfo(void *buf, SStoreInfo *pInfo); +// static void * tdDecodeStoreInfo(void *buf, SStoreInfo *pInfo); +static SKVStore *tdNewKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH); +static char * tdGetKVStoreSnapshotFname(char *fdata); +static char * tdGetKVStoreNewFname(char *fdata); +static void tdFreeKVStore(SKVStore *pStore); +static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo); + +int tdCreateKVStore(char *fname) { + char *tname = strdup(fname); + if (tname == NULL) return TSDB_CODE_COM_OUT_OF_MEMORY; + + int fd = open(fname, O_RDWR | O_CREAT, 0755); + if (fd < 0) { + uError("failed to open file %s since %s", fname, strerror(errno)); + return TAOS_SYSTEM_ERROR(errno); + } + + int code = tdInitKVStoreHeader(fd, fname); + if (code != TSDB_CODE_SUCCESS) return code; + + if (fsync(fd) < 0) { + uError("failed to fsync file %s since %s", fname, strerror(errno)); + return TAOS_SYSTEM_ERROR(errno); + } + + if (close(fd) < 0) { + uError("failed to close file %s since %s", fname, strerror(errno)); + return TAOS_SYSTEM_ERROR(errno); + } + + return TSDB_CODE_SUCCESS; +} + +SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH) { + SKVStore *pStore = tdNewKVStore(fname, iFunc, aFunc, appH); + if (pStore == NULL) return NULL; + + pStore->fd = open(pStore->fname, O_RDWR); + if (pStore->fd < 0) { + uError("failed to open file %s since %s", pStore->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + if (access(pStore->fsnap, F_OK) == 0) { + uTrace("file %s exists, try to recover the KV store", pStore->fsnap); + pStore->sfd = open(pStore->fsnap, O_RDONLY); + if (pStore->sfd < 0) { + uError("failed to open file %s since %s", pStore->fsnap, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + // TODO: rewind the file + + close(pStore->sfd); + pStore->sfd = -1; + remove(pStore->fsnap); + } + + // TODO: Recover from the file + + return pStore; + +_err: + if (pStore->fd > 0) { + close(pStore->fd); + pStore->fd = -1; + } + if (pStore->sfd > 0) { + close(pStore->sfd); + pStore->sfd = -1; + } + tdFreeKVStore(pStore); + return NULL; +} + +int tdKVStoreStartCommit(SKVStore *pStore) { + pStore->fd = open(pStore->fname, O_RDWR); + if (pStore->fd < 0) { + uError("failed to open file %s since %s", pStore->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + pStore->sfd = open(pStore->fsnap, O_WRONLY | O_CREAT, 0755); + if (pStore->sfd < 0) { + uError("failed to open file %s since %s", pStore->fsnap, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + if (tsendfile(pStore->sfd, pStore->fd, NULL, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) { + uError("failed to send file %d bytes since %s", TD_KVSTORE_HEADER_SIZE, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + if (fsync(pStore->sfd) < 0) { + uError("failed to fsync file %s since %s", pStore->fsnap, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + if (close(pStore->sfd) < 0) { + uError("failed to close file %s since %s", pStore->fsnap, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + pStore->sfd = -1; + + return 0; + +_err: + if (pStore->sfd > 0) { + close(pStore->sfd); + pStore->sfd = -1; + remove(pStore->fsnap); + } + if (pStore->fd > 0) { + close(pStore->fd); + pStore->fd = -1; + } + return -1; +} + +int tdKVStoreEndCommit(SKVStore *pStore) { + ASSERT(pStore->fd > 0); + + terrno = tdUpdateKVStoreHeader(pStore->fd, pStore->fname, &(pStore->info)); + if (terrno != TSDB_CODE_SUCCESS) return -1; + + if (fsync(pStore->fd) < 0) { + uError("failed to fsync file %s since %s", pStore->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + if (close(pStore->fd) < 0) { + uError("failed to close file %s since %s", pStore->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + remove(pStore->fsnap); + return 0; +} + +static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo) { + char buf[TD_KVSTORE_HEADER_SIZE] = "\0"; + + if (lseek(fd, 0, SEEK_SET) < 0) { + uError("failed to lseek file %s since %s", fname, strerror(errno)); + return TAOS_SYSTEM_ERROR(errno); + } + + tdEncodeStoreInfo(buf, pInfo); + taosCalcChecksumAppend(0, (uint8_t *)buf, TD_KVSTORE_HEADER_SIZE); + if (twrite(fd, buf, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) { + uError("failed to write file %s %d bytes since %s", fname, TD_KVSTORE_HEADER_SIZE, strerror(errno)); + return TAOS_SYSTEM_ERROR(errno); + } + + return TSDB_CODE_SUCCESS; +} + +static int tdInitKVStoreHeader(int fd, char *fname) { + SStoreInfo info = {TD_KVSTORE_HEADER_SIZE, 0, 0, 0}; + + return tdUpdateKVStoreHeader(fd, fname, &info); +} + +static void *tdEncodeStoreInfo(void *buf, SStoreInfo *pInfo) { + buf = taosEncodeVariantI64(buf, pInfo->size); + buf = taosEncodeVariantI64(buf, pInfo->tombSize); + buf = taosEncodeVariantI64(buf, pInfo->nRecords); + buf = taosEncodeVariantI64(buf, pInfo->nDels); + + return buf; +} + +// static void *tdDecodeStoreInfo(void *buf, SStoreInfo *pInfo) { +// buf = taosDecodeVariantI64(buf, &(pInfo->size)); +// buf = taosDecodeVariantI64(buf, &(pInfo->tombSize)); +// buf = taosDecodeVariantI64(buf, &(pInfo->nRecords)); +// buf = taosDecodeVariantI64(buf, &(pInfo->nDels)); + +// return buf; +// } + +static SKVStore *tdNewKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH) { + SKVStore *pStore = (SKVStore *)malloc(sizeof(SKVStore)); + if (pStore == NULL) goto _err; + + pStore->fname = strdup(fname); + if (pStore->map == NULL) goto _err; + + pStore->fsnap = tdGetKVStoreSnapshotFname(fname); + if (pStore->fsnap == NULL) goto _err; + + pStore->fnew = tdGetKVStoreNewFname(fname); + if (pStore->fnew == NULL) goto _err; + + pStore->fd = -1; + pStore->sfd = -1; + pStore->nfd = -1; + pStore->iFunc = iFunc; + pStore->aFunc = aFunc; + pStore->appH = appH; + pStore->map = taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false); + if (pStore->map == NULL) { + terrno = TSDB_CODE_COM_OUT_OF_MEMORY; + goto _err; + } + + return pStore; + +_err: + terrno = TSDB_CODE_COM_OUT_OF_MEMORY; + tdFreeKVStore(pStore); + return NULL; +} + +static void tdFreeKVStore(SKVStore *pStore) { + if (pStore) { + tfree(pStore->fname); + tfree(pStore->fsnap); + tfree(pStore->fnew); + taosHashCleanup(pStore->map); + free(pStore); + } +} + +static char *tdGetKVStoreSnapshotFname(char *fdata) { + size_t size = strlen(fdata) + strlen(TD_KVSTORE_SNAP_SUFFIX) + 1; + char * fname = malloc(size); + if (fname == NULL) { + terrno = TSDB_CODE_COM_OUT_OF_MEMORY; + return NULL; + } + sprintf(fname, "%s%s", fdata, TD_KVSTORE_SNAP_SUFFIX); + return fname; +} + +static char *tdGetKVStoreNewFname(char *fdata) { + size_t size = strlen(fdata) + strlen(TD_KVSTORE_NEW_SUFFIX) + 1; + char * fname = malloc(size); + if (fname == NULL) { + terrno = TSDB_CODE_COM_OUT_OF_MEMORY; + return NULL; + } + sprintf(fname, "%s%s", fdata, TD_KVSTORE_NEW_SUFFIX); + return fname; +} \ No newline at end of file