diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index c9071f4cb0c14e689583b25c1c7e2939ec4a6fb6..b5ab4412a9b4f41821b9b0fa00e303f467c4e316 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -115,6 +115,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_QUERY_ID, 0, 255, "invalid query i TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_STREAM_ID, 0, 256, "invalid stream id") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_CONNECTION, 0, 257, "invalid connection") TAOS_DEFINE_ERROR(TSDB_CODE_SDB_ERROR, 0, 258, "sdb error") +TAOS_DEFINE_ERROR(TSDB_CODE_TIMESTAMP_OUT_OF_RANGE, 0, 259, "timestamp is out of range") // acct TAOS_DEFINE_ERROR(TSDB_CODE_ACCT_ALREADY_EXIST, 0, 300, "accounts already exist") @@ -172,6 +173,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VALUE, 0, 462, "invalid value") // others TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_FILE_FORMAT, 0, 500, "invalid file format") +// TSDB +TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_CONFIG, 0, 550, "invalid TSDB configuration") + #ifdef TAOS_ERROR_C }; diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 39808ab02fd23373a14aae8027604cf4f37b026d..e056a10bbab38e423bc2e978c0a344987bebc836 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -160,6 +160,7 @@ typedef struct { typedef struct { int64_t index; + int numOfCacheBlocks; SList * memPool; } STsdbCachePool; @@ -227,13 +228,13 @@ typedef struct { int maxFGroups; int numOfFGroups; - SFileGroup fGroup[]; + SFileGroup *fGroup; } STsdbFileH; #define TSDB_MIN_FILE_ID(fh) (fh)->fGroup[0].fileId #define TSDB_MAX_FILE_ID(fh) (fh)->fGroup[(fh)->numOfFGroups - 1].fileId -STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles); +STsdbFileH *tsdbInitFileH(char *dataDir, STsdbCfg *pCfg); void tsdbCloseFileH(STsdbFileH *pFileH); int tsdbCreateFile(char *dataDir, int fileId, const char *suffix, int maxTables, SFile *pFile, int writeHeader, int toClose); @@ -261,11 +262,12 @@ SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter); typedef struct { int32_t len; int32_t offset; + int32_t padding; // For padding purpose int32_t hasLast : 1; int32_t numOfBlocks : 31; - int32_t checksum; + int64_t uid; TSKEY maxKey; -} SCompIdx; /* sizeof(SCompIdx) = 24 */ +} SCompIdx; /* sizeof(SCompIdx) = 28 */ /** * if numOfSubBlocks == 0, then the SCompBlock is a sub-block @@ -485,6 +487,11 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper); int tsdbWriteCompInfo(SRWHelper *pHelper); int tsdbWriteCompIdx(SRWHelper *pHelper); +// --------- Other functions need to further organize +void tsdbFitRetention(STsdbRepo *pRepo); +int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks); +void tsdbAdjustCacheBlocks(STsdbCache *pCache); + #ifdef __cplusplus } #endif diff --git a/src/tsdb/src/tsdbCache.c b/src/tsdb/src/tsdbCache.c index 9351bc602b5f34746381cbc8c712429b02a6561a..84f8a81eeaaca56b605d769545b5f1fd5e30c960 100644 --- a/src/tsdb/src/tsdbCache.c +++ b/src/tsdb/src/tsdbCache.c @@ -20,6 +20,7 @@ static int tsdbAllocBlockFromPool(STsdbCache *pCache); static void tsdbFreeBlockList(SList *list); static void tsdbFreeCacheMem(SCacheMem *mem); +static int tsdbAddCacheBlockToPool(STsdbCache *pCache); STsdbCache *tsdbInitCache(int cacheBlockSize, int totalBlocks, TsdbRepoT *pRepo) { STsdbCache *pCache = (STsdbCache *)calloc(1, sizeof(STsdbCache)); @@ -40,13 +41,7 @@ STsdbCache *tsdbInitCache(int cacheBlockSize, int totalBlocks, TsdbRepoT *pRepo) if (pPool->memPool == NULL) goto _err; for (int i = 0; i < totalBlocks; i++) { - STsdbCacheBlock *pBlock = (STsdbCacheBlock *)malloc(sizeof(STsdbCacheBlock) + cacheBlockSize); - if (pBlock == NULL) { - goto _err; - } - pBlock->offset = 0; - pBlock->remain = cacheBlockSize; - tdListAppend(pPool->memPool, (void *)(&pBlock)); + if (tsdbAddCacheBlockToPool(pCache) < 0) goto _err; } pCache->mem = NULL; @@ -142,4 +137,70 @@ static int tsdbAllocBlockFromPool(STsdbCache *pCache) { tsdbUnLockRepo(pCache->pRepo); return 0; +} + +int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks) { + STsdbCache *pCache = pRepo->tsdbCache; + int oldNumOfBlocks = pCache->totalCacheBlocks; + + tsdbLockRepo((TsdbRepoT *)pRepo); + + ASSERT(pCache->totalCacheBlocks != totalBlocks); + + if (pCache->totalCacheBlocks < totalBlocks) { + ASSERT(pCache->totalCacheBlocks == pCache->pool.numOfCacheBlocks); + int blocksToAdd = pCache->totalCacheBlocks - totalBlocks; + pCache->totalCacheBlocks = totalBlocks; + for (int i = 0; i < blocksToAdd; i++) { + if (tsdbAddCacheBlockToPool(pCache) < 0) { + tsdbUnLockRepo((TsdbRepoT *)pRepo); + tsdbError("tsdbId %d: failed to add cache block to cache pool", pRepo->config.tsdbId); + return -1; + } + } + } else { + pCache->totalCacheBlocks = totalBlocks; + tsdbAdjustCacheBlocks(pCache); + } + + tsdbUnLockRepo((TsdbRepoT *)pRepo); + tsdbTrace("tsdbId %d: tsdb total cache blocks changed from %d to %d", pRepo->config.tsdbId, oldNumOfBlocks, totalBlocks); + return 0; +} + +static int tsdbAddCacheBlockToPool(STsdbCache *pCache) { + STsdbCachePool *pPool = &pCache->pool; + + STsdbCacheBlock *pBlock = malloc(sizeof(STsdbCacheBlock) + pCache->cacheBlockSize); + if (pBlock == NULL) return -1; + + pBlock->offset = 0; + pBlock->remain = pCache->cacheBlockSize; + tdListAppend(pPool->memPool, (void *)(&pBlock)); + pPool->numOfCacheBlocks++; + + return 0; +} + +static int tsdbRemoveCacheBlockFromPool(STsdbCache *pCache) { + STsdbCachePool *pPool = &pCache->pool; + STsdbCacheBlock *pBlock = NULL; + + ASSERT(pCache->totalCacheBlocks >= 0); + + SListNode *node = tdListPopHead(pPool->memPool); + if (node == NULL) return -1; + + tdListNodeGetData(pPool->memPool, node, &pBlock); + free(pBlock); + listNodeFree(node); + pPool->numOfCacheBlocks--; + + return 0; +} + +void tsdbAdjustCacheBlocks(STsdbCache *pCache) { + while (pCache->totalCacheBlocks < pCache->pool.numOfCacheBlocks) { + if (tsdbRemoveCacheBlockFromPool(pCache) < 0) break; + } } \ No newline at end of file diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index ab76f69bedf1d289be57f7b2539ff1b9fbcac466..0c1b9e314e4168b3bd3f9181a061dee007760a7f 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -27,6 +27,7 @@ #include "tchecksum.h" #include "tsdbMain.h" #include "tutil.h" +#include "ttime.h" const char *tsdbFileSuffix[] = { ".head", // TSDB_FILE_TYPE_HEAD @@ -40,13 +41,19 @@ static int tsdbWriteFileHead(SFile *pFile); static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables); static int tsdbOpenFGroup(STsdbFileH *pFileH, char *dataDir, int fid); -STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles) { - STsdbFileH *pFileH = (STsdbFileH *)calloc(1, sizeof(STsdbFileH) + sizeof(SFileGroup) * maxFiles); +STsdbFileH *tsdbInitFileH(char *dataDir, STsdbCfg *pCfg) { + STsdbFileH *pFileH = (STsdbFileH *)calloc(1, sizeof(STsdbFileH)); if (pFileH == NULL) { // TODO: deal with ERROR here return NULL; } - pFileH->maxFGroups = maxFiles; + pFileH->maxFGroups = pCfg->keep / pCfg->daysPerFile + 3; + + pFileH->fGroup = (SFileGroup *)calloc(pFileH->maxFGroups, sizeof(SFileGroup)); + if (pFileH->fGroup == NULL) { + free(pFileH); + return NULL; + } DIR *dir = opendir(dataDir); if (dir == NULL) { @@ -69,7 +76,12 @@ STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles) { return pFileH; } -void tsdbCloseFileH(STsdbFileH *pFileH) { free(pFileH); } +void tsdbCloseFileH(STsdbFileH *pFileH) { + if (pFileH) { + tfree(pFileH->fGroup); + free(pFileH); + } +} static int tsdbInitFile(char *dataDir, int fid, const char *suffix, SFile *pFile) { tsdbGetFileName(dataDir, fid, suffix, pFile->fname); @@ -161,6 +173,18 @@ void tsdbInitFileGroupIter(STsdbFileH *pFileH, SFileGroupIter *pIter, int direct } } +void tsdbFitRetention(STsdbRepo *pRepo) { + STsdbFileH *pFileH = pRepo->tsdbFileH; + SFileGroup *pGroup = pFileH->fGroup; + + int mfid = + tsdbGetKeyFileId(taosGetTimestamp(pRepo->config.precision), pRepo->config.daysPerFile, pRepo->config.precision); + + while (pGroup[0].fileId < mfid) { + tsdbRemoveFileGroup(pFileH, pGroup[0].fileId); + } +} + void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid) { if (pIter->numOfFGroups == 0) { assert(pIter->pFileGroup == NULL); @@ -252,43 +276,6 @@ int tsdbCopyBlockDataInFile(SFile *pOutFile, SFile *pInFile, SCompInfo *pCompInf return 0; } -// int tsdbLoadCompIdx(SFileGroup *pGroup, void *buf, int maxTables) { -// SFile *pFile = &(pGroup->files[TSDB_FILE_TYPE_HEAD]); -// if (lseek(pFile->fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1; - -// if (read(pFile->fd, buf, sizeof(SCompIdx) * maxTables) < 0) return -1; -// // TODO: need to check the correctness -// return 0; -// } - -// int tsdbLoadCompBlocks(SFileGroup *pGroup, SCompIdx *pIdx, void *buf) { -// SFile *pFile = &(pGroup->files[TSDB_FILE_TYPE_HEAD]); - -// if (lseek(pFile->fd, pIdx->offset, SEEK_SET) < 0) return -1; - -// if (read(pFile->fd, buf, pIdx->len) < 0) return -1; - -// // TODO: need to check the correctness - -// return 0; -// } - -// int tsdbLoadCompCols(SFile *pFile, SCompBlock *pBlock, void *buf) { -// // assert(pBlock->numOfSubBlocks == 0 || pBlock->numOfSubBlocks == 1); - -// if (lseek(pFile->fd, pBlock->offset, SEEK_SET) < 0) return -1; -// size_t size = sizeof(SCompData) + sizeof(SCompCol) * pBlock->numOfCols; -// if (read(pFile->fd, buf, size) < 0) return -1; - -// return 0; -// } - -// int tsdbLoadColData(SFile *pFile, SCompCol *pCol, int64_t blockBaseOffset, void *buf) { -// if (lseek(pFile->fd, blockBaseOffset + pCol->offset, SEEK_SET) < 0) return -1; -// if (read(pFile->fd, buf, pCol->len) < 0) return -1; -// return 0; -// } - static int compFGroupKey(const void *key, const void *fgroup) { int fid = *(int *)key; SFileGroup *pFGroup = (SFileGroup *)fgroup; diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index b1ef3d2d9ce38e0c0e0347ac9d1c6b9a61905cb3..3f41a3c5fe71f8e9986321808ccde0a5b3f2bed1 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -6,6 +6,7 @@ #include "tsdbMain.h" #include "tscompression.h" #include "tchecksum.h" +#include "ttime.h" int tsdbDebugFlag = 135; @@ -27,7 +28,7 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg); static int32_t tsdbSetRepoEnv(STsdbRepo *pRepo); static int32_t tsdbDestroyRepoEnv(STsdbRepo *pRepo); // static int tsdbOpenMetaFile(char *tsdbDir); -static int32_t tsdbInsertDataToTable(TsdbRepoT *repo, SSubmitBlk *pBlock); +static int32_t tsdbInsertDataToTable(TsdbRepoT *repo, SSubmitBlk *pBlock, TSKEY now); static int32_t tsdbRestoreCfg(STsdbRepo *pRepo, STsdbCfg *pCfg); static int32_t tsdbGetDataDirName(STsdbRepo *pRepo, char *fname); static void * tsdbCommitData(void *arg); @@ -35,8 +36,9 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **i SDataCols *pDataCols); static TSKEY tsdbNextIterKey(SSkipListIterator *pIter); static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minKey, TSKEY maxKey); -// static int tsdbWriteBlockToFileImpl(SFile *pFile, SDataCols *pCols, int pointsToWrite, int64_t *offset, int32_t *len, -// int64_t uid); +static void tsdbAlterCompression(STsdbRepo *pRepo, int8_t compression); +static void tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep); +static void tsdbAlterMaxTables(STsdbRepo *pRepo, int32_t maxTables); #define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid] #define TSDB_GET_TABLE_BY_NAME(pRepo, name) @@ -214,7 +216,7 @@ TsdbRepoT *tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH) { } tsdbGetDataDirName(pRepo, dataDir); - pRepo->tsdbFileH = tsdbInitFileH(dataDir, pRepo->config.maxTables); + pRepo->tsdbFileH = tsdbInitFileH(dataDir, &(pRepo->config)); if (pRepo->tsdbFileH == NULL) { tsdbFreeCache(pRepo->tsdbCache); tsdbFreeMeta(pRepo->tsdbMeta); @@ -297,10 +299,23 @@ int32_t tsdbCloseRepo(TsdbRepoT *repo) { */ int32_t tsdbConfigRepo(TsdbRepoT *repo, STsdbCfg *pCfg) { STsdbRepo *pRepo = (STsdbRepo *)repo; + STsdbCfg * pRCfg = &pRepo->config; - pRepo->config = *pCfg; - // TODO - return 0; + if (tsdbCheckAndSetDefaultCfg(pCfg) < 0) return TSDB_CODE_INVALID_CONFIG; + + ASSERT(pRCfg->tsdbId == pCfg->tsdbId); + ASSERT(pRCfg->cacheBlockSize == pCfg->cacheBlockSize); + ASSERT(pRCfg->daysPerFile == pCfg->daysPerFile); + ASSERT(pRCfg->minRowsPerFileBlock == pCfg->minRowsPerFileBlock); + ASSERT(pRCfg->maxRowsPerFileBlock == pCfg->maxRowsPerFileBlock); + ASSERT(pRCfg->precision == pCfg->precision); + + if (pRCfg->compression != pCfg->compression) tsdbAlterCompression(pRepo, pCfg->compression); + if (pRCfg->keep != pCfg->keep) tsdbAlterKeep(pRepo, pCfg->keep); + if (pRCfg->totalBlocks != pCfg->totalBlocks) tsdbAlterCacheTotalBlocks(pRepo, pCfg->totalBlocks); + if (pRCfg->maxTables != pCfg->maxTables) tsdbAlterMaxTables(pRepo, pCfg->maxTables); + + return TSDB_CODE_SUCCESS; } int32_t tsdbTriggerCommit(TsdbRepoT *repo) { @@ -394,13 +409,16 @@ STableInfo *tsdbGetTableInfo(TsdbRepoT *pRepo, STableId tableId) { // TODO: need to return the number of data inserted int32_t tsdbInsertData(TsdbRepoT *repo, SSubmitMsg *pMsg) { SSubmitMsgIter msgIter; + STsdbRepo *pRepo = (STsdbRepo *)repo; tsdbInitSubmitMsgIter(pMsg, &msgIter); SSubmitBlk *pBlock = NULL; int32_t code = TSDB_CODE_SUCCESS; - + + TSKEY now = taosGetTimestamp(pRepo->config.precision); + while ((pBlock = tsdbGetSubmitMsgNext(&msgIter)) != NULL) { - if ((code = tsdbInsertDataToTable(repo, pBlock)) != TSDB_CODE_SUCCESS) { + if ((code = tsdbInsertDataToTable(repo, pBlock, now)) != TSDB_CODE_SUCCESS) { return code; } } @@ -787,21 +805,31 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable return 0; } -static int32_t tsdbInsertDataToTable(TsdbRepoT *repo, SSubmitBlk *pBlock) { +static int32_t tsdbInsertDataToTable(TsdbRepoT *repo, SSubmitBlk *pBlock, TSKEY now) { STsdbRepo *pRepo = (STsdbRepo *)repo; STableId tableId = {.uid = pBlock->uid, .tid = pBlock->tid}; STable *pTable = tsdbIsValidTableToInsert(pRepo->tsdbMeta, tableId); if (pTable == NULL) { - uError("failed to get table for insert, uid:%" PRIu64 ", tid:%d", tableId.uid, tableId.tid); + tsdbError("failed to get table for insert, uid:%" PRIu64 ", tid:%d", tableId.uid, tableId.tid); return TSDB_CODE_INVALID_TABLE_ID; } - SSubmitBlkIter blkIter; - SDataRow row; + SSubmitBlkIter blkIter = {0}; + SDataRow row = NULL; + + TSKEY minKey = now - tsMsPerDay[pRepo->config.precision] * pRepo->config.keep; + TSKEY maxKey = now + tsMsPerDay[pRepo->config.precision] * pRepo->config.daysPerFile; tsdbInitSubmitBlkIter(pBlock, &blkIter); while ((row = tsdbGetSubmitBlkNext(&blkIter)) != NULL) { + if (dataRowKey(row) < minKey || dataRowKey(row) > maxKey) { + tsdbError( + "tsdbId: %d, table tid: %d, talbe uid: %ld timestamp is out of range. now: %ld maxKey: %ld, minKey: %ld", + pRepo->config.tsdbId, pTable->tableId.tid, pTable->tableId.uid, now, minKey, maxKey); + return TSDB_CODE_TIMESTAMP_OUT_OF_RANGE; + } + if (tdInsertRowToTable(pRepo, row, pTable) < 0) { return -1; } @@ -903,6 +931,9 @@ static void *tsdbCommitData(void *arg) { } } + // Do retention actions + tsdbFitRetention(pRepo); + _exit: tdFreeDataCols(pDataCols); tsdbDestroyTableIters(iters, pCfg->maxTables); @@ -910,6 +941,7 @@ _exit: tsdbLockRepo(arg); tdListMove(pCache->imem->list, pCache->pool.memPool); + tsdbAdjustCacheBlocks(pCache); tdListFree(pCache->imem->list); free(pCache->imem); pCache->imem = NULL; @@ -1028,4 +1060,27 @@ static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minK if (nextKey > 0 && (nextKey >= minKey && nextKey <= maxKey)) return 1; } return 0; +} + +static void tsdbAlterCompression(STsdbRepo *pRepo, int8_t compression) { + pRepo->config.compression = compression; +} + +static void tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep) { + STsdbCfg *pCfg = &pRepo->config; + + int maxFiles = keep / pCfg->maxTables + 3; + if (pRepo->config.keep > keep) { + pRepo->tsdbFileH->maxFGroups = maxFiles; + } else { + pRepo->tsdbFileH->fGroup = realloc(pRepo->tsdbFileH->fGroup, sizeof(SFileGroup)); + if (pRepo->tsdbFileH->fGroup == NULL) { + // TODO: deal with the error + } + pRepo->tsdbFileH->maxFGroups = maxFiles; + } +} + +static void tsdbAlterMaxTables(STsdbRepo *pRepo, int32_t maxTables) { + // TODO } \ No newline at end of file diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index 653379e03b1154cf95fab271286996f68fce170d..e32a64629678a4d16a54885ccb9184a09a50a15a 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -414,6 +414,7 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) { ASSERT((pIdx->len - sizeof(SCompInfo) - sizeof(TSCKSUM)) % sizeof(SCompBlock) == 0); taosCalcChecksumAppend(0, (uint8_t *)pHelper->pCompInfo, pIdx->len); pIdx->offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END); + pIdx->uid = pHelper->tableInfo.uid; if (pIdx->offset < 0) return -1; ASSERT(pIdx->offset >= tsizeof(pHelper->pCompIdx)); diff --git a/src/util/inc/ttime.h b/src/util/inc/ttime.h index 34c241cbc0f22afc511660cee475c82d08466599..61df65f345afc3085890cdcb8396ddba6a2fe857 100644 --- a/src/util/inc/ttime.h +++ b/src/util/inc/ttime.h @@ -22,22 +22,37 @@ extern "C" { #include #include +#include "tutil.h" //@return timestamp in second int32_t taosGetTimestampSec(); //@return timestamp in millisecond -int64_t taosGetTimestampMs(); +static FORCE_INLINE int64_t taosGetTimestampMs() { + struct timeval systemTime; + gettimeofday(&systemTime, NULL); + return (int64_t)systemTime.tv_sec * 1000L + (uint64_t)systemTime.tv_usec / 1000; +} //@return timestamp in microsecond -int64_t taosGetTimestampUs(); +static FORCE_INLINE int64_t taosGetTimestampUs() { + struct timeval systemTime; + gettimeofday(&systemTime, NULL); + return (int64_t)systemTime.tv_sec * 1000000L + (uint64_t)systemTime.tv_usec; +} /* * @return timestamp decided by global conf variable, tsTimePrecision * if precision == TSDB_TIME_PRECISION_MICRO, it returns timestamp in microsecond. * precision == TSDB_TIME_PRECISION_MILLI, it returns timestamp in millisecond. */ -int64_t taosGetTimestamp(int32_t precision); +static FORCE_INLINE int64_t taosGetTimestamp(int32_t precision) { + if (precision == TSDB_TIME_PRECISION_MICRO) { + return taosGetTimestampUs(); + } else { + return taosGetTimestampMs(); + } +} int32_t getTimestampInUsFromStr(char* token, int32_t tokenlen, int64_t* ts); diff --git a/src/util/src/ttime.c b/src/util/src/ttime.c index 015cb19606ff11b5ddb857a1fc11299f224afd16..d34bf0e6ce922a09eb9941bb8cac9c98141a68cc 100644 --- a/src/util/src/ttime.c +++ b/src/util/src/ttime.c @@ -121,30 +121,6 @@ static int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec); int32_t taosGetTimestampSec() { return (int32_t)time(NULL); } -int64_t taosGetTimestampMs() { - struct timeval systemTime; - gettimeofday(&systemTime, NULL); - return (int64_t)systemTime.tv_sec * 1000L + (uint64_t)systemTime.tv_usec / 1000; -} - -int64_t taosGetTimestampUs() { - struct timeval systemTime; - gettimeofday(&systemTime, NULL); - return (int64_t)systemTime.tv_sec * 1000000L + (uint64_t)systemTime.tv_usec; -} - -/* - * If tsTimePrecision == 1, taosGetTimestamp will return timestamp in microsecond. - * Otherwise, it will return timestamp in millisecond. - */ -int64_t taosGetTimestamp(int32_t precision) { - if (precision == TSDB_TIME_PRECISION_MICRO) { - return taosGetTimestampUs(); - } else { - return taosGetTimestampMs(); - } -} - int32_t taosParseTime(char* timestr, int64_t* time, int32_t len, int32_t timePrec) { /* parse datatime string in with tz */ if (strnchr(timestr, 'T', len, false) != NULL) { diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 2f49280cb6af32fc308a59a7820bcde8993f444f..3541fc15b649c64dbb411c1b03b0763e3d329f67 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -111,11 +111,13 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe int32_t code = 0; dTrace("pVnode:%p vgId:%d, table:%s, start to create", pVnode, pVnode->vgId, pTable->tableId); - int16_t numOfColumns = htons(pTable->numOfColumns); - int16_t numOfTags = htons(pTable->numOfTags); - int32_t sid = htonl(pTable->sid); - uint64_t uid = htobe64(pTable->uid); - SSchema *pSchema = (SSchema *) pTable->data; + int16_t numOfColumns = htons(pTable->numOfColumns); + int16_t numOfTags = htons(pTable->numOfTags); + int32_t sid = htonl(pTable->sid); + uint64_t uid = htobe64(pTable->uid); + SSchema * pSchema = (SSchema *)pTable->data; + STSchema *pDestTagSchema = NULL; + SDataRow dataRow = NULL; int32_t totalCols = numOfColumns + numOfTags; @@ -130,7 +132,7 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe tsdbTableSetName(&tCfg, pTable->tableId, false); if (numOfTags != 0) { - STSchema *pDestTagSchema = tdNewSchema(numOfTags); + pDestTagSchema = tdNewSchema(numOfTags); for (int i = numOfColumns; i < totalCols; i++) { tdSchemaAddCol(pDestTagSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes)); } @@ -140,7 +142,7 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe char *pTagData = pTable->data + totalCols * sizeof(SSchema); int accumBytes = 0; - SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema); + dataRow = tdNewDataRowFromSchema(pDestTagSchema); for (int i = 0; i < numOfTags; i++) { STColumn *pTCol = schemaColAt(pDestTagSchema, i); @@ -151,6 +153,8 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe } code = tsdbCreateTable(pVnode->tsdb, &tCfg); + tdFreeDataRow(dataRow); + tfree(pDestTagSchema); tfree(pDestSchema); dTrace("pVnode:%p vgId:%d, table:%s is created, result:%x", pVnode, pVnode->vgId, pTable->tableId, code);