diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 1b2b6a677a7fe54f65b1f958797326d3d693eb8d..75f8537327384b710ed92465fc3df6943d5bd3ed 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -181,6 +181,21 @@ typedef struct SSubmitMsg { char blocks[]; } SSubmitMsg; +typedef struct { + int32_t totalLen; + int32_t len; + SMemRow row; +} SSubmitBlkIter; + +typedef struct { + int32_t totalLen; + int32_t len; + void* pMsg; +} SSubmitMsgIter; + +int tsdbInitSubmitMsgIter(SSubmitMsg* pMsg, SSubmitMsgIter* pIter); +int tsdbGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock); + typedef struct { int32_t index; // index of failed block in submit blocks int32_t vnode; // vnode index of failed block diff --git a/include/dnode/vnode/tsdb/tsdb.h b/include/dnode/vnode/tsdb/tsdb.h index e5522ddbd3d234515d7c9be6a444c725015347cb..7ca050929e44342fc1e9bbc1cabc4ab9aa758b73 100644 --- a/include/dnode/vnode/tsdb/tsdb.h +++ b/include/dnode/vnode/tsdb/tsdb.h @@ -26,10 +26,12 @@ extern "C" { typedef struct STsdb STsdb; typedef struct STsdbCfg { + int8_t precision; uint64_t lruCacheSize; - uint32_t keep0; + uint32_t keep; uint32_t keep1; uint32_t keep2; + int32_t daysPerFile; } STsdbCfg; // STsdb diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index c1048f8482ae10cc1eb9cfc77327e3e143c6f683..50aa3267cda4a342c31ea86a74285bb6b50a2d23 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -27,6 +27,42 @@ #undef TD_MSG_SEG_CODE_ #include "tmsgdef.h" +int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) { + if (pMsg == NULL) { + terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; + return -1; + } + + pIter->totalLen = pMsg->length; + pIter->len = 0; + pIter->pMsg = pMsg; + if (pMsg->length <= sizeof(SSubmitMsg)) { + terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; + return -1; + } + + return 0; +} + +int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) { + if (pIter->len == 0) { + pIter->len += sizeof(SSubmitMsg); + } else { + SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len); + pIter->len += (sizeof(SSubmitBlk) + pSubmitBlk->dataLen + pSubmitBlk->schemaLen); + } + + if (pIter->len > pIter->totalLen) { + terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; + *pPBlock = NULL; + return -1; + } + + *pPBlock = (pIter->len == pIter->totalLen) ? NULL : (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len); + + return 0; +} + int tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { int tlen = 0; diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index 8835e0ba65cbb4a2f8a8721391a606e6e47bdac1..06d90dd4959e3966f96ce9cf73d7b7f3d9594239 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -554,7 +554,7 @@ static void dndGenerateVnodeCfg(SCreateVnodeMsg *pCreate, SVnodeCfg *pCfg) { pCfg->ttl = 4; pCfg->keep = pCreate->daysToKeep0; pCfg->isWeak = true; - pCfg->tsdbCfg.keep0 = pCreate->daysToKeep0; + pCfg->tsdbCfg.keep = pCreate->daysToKeep0; pCfg->tsdbCfg.keep1 = pCreate->daysToKeep2; pCfg->tsdbCfg.keep2 = pCreate->daysToKeep0; pCfg->tsdbCfg.lruCacheSize = pCreate->cacheBlockSize; diff --git a/source/dnode/vnode/tsdb/inc/tsdbDef.h b/source/dnode/vnode/tsdb/inc/tsdbDef.h index ded28727e420bfaec50a5128fcf6815f508a9e74..cd3386c5032efc9abcad1df4e739db8f588cc0ce 100644 --- a/source/dnode/vnode/tsdb/inc/tsdbDef.h +++ b/source/dnode/vnode/tsdb/inc/tsdbDef.h @@ -17,12 +17,15 @@ #define _TD_TSDB_DEF_H_ #include "mallocator.h" -#include "tmsg.h" -#include "tlist.h" +#include "tglobal.h" #include "thash.h" +#include "tlist.h" +#include "tmsg.h" #include "tskiplist.h" +#include "ttime.h" #include "tsdb.h" +#include "tsdbLog.h" #include "tsdbMemTable.h" #include "tsdbOptions.h" @@ -31,13 +34,16 @@ extern "C" { #endif struct STsdb { + int32_t vgId; char * path; - STsdbCfg options; + STsdbCfg config; STsdbMemTable * mem; STsdbMemTable * imem; SMemAllocatorFactory *pmaf; }; +#define REPO_ID(r) (r)->vgId + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/tsdb/inc/tsdbLog.h b/source/dnode/vnode/tsdb/inc/tsdbLog.h new file mode 100644 index 0000000000000000000000000000000000000000..bde9b338a2b1e6cfd0a3408911ee4d84778ecf84 --- /dev/null +++ b/source/dnode/vnode/tsdb/inc/tsdbLog.h @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_TSDB_LOG_H_ +#define _TD_TSDB_LOG_H_ + +#include "tlog.h" + +extern int32_t tsdbDebugFlag; + +#define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TDB FATAL ", 255, __VA_ARGS__); }} while(0) +#define tsdbError(...) do { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TDB ERROR ", 255, __VA_ARGS__); }} while(0) +#define tsdbWarn(...) do { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TDB WARN ", 255, __VA_ARGS__); }} while(0) +#define tsdbInfo(...) do { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TDB ", 255, __VA_ARGS__); }} while(0) +#define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }} while(0) +#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }} while(0) + +#endif /* _TD_TSDB_LOG_H_ */ \ No newline at end of file diff --git a/source/dnode/vnode/tsdb/inc/tsdbMemTable.h b/source/dnode/vnode/tsdb/inc/tsdbMemTable.h index e7787af7cff24820b23ada5a4b7f2ea294fd3763..0bb9fb75f822eff85d45d709112d5aabc75205f6 100644 --- a/source/dnode/vnode/tsdb/inc/tsdbMemTable.h +++ b/source/dnode/vnode/tsdb/inc/tsdbMemTable.h @@ -24,9 +24,9 @@ extern "C" { typedef struct STsdbMemTable STsdbMemTable; -STsdbMemTable *tsdbNewMemTable(SMemAllocatorFactory *pMAF); -void tsdbFreeMemTable(SMemAllocatorFactory *pMAF, STsdbMemTable *pMemTable); -int tsdbInsertDataToMemTable(STsdbMemTable *pMemTable, SSubmitMsg *pMsg); +STsdbMemTable *tsdbNewMemTable(STsdb *pTsdb); +void tsdbFreeMemTable(STsdb *pTsdb, STsdbMemTable *pMemTable); +int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp); #ifdef __cplusplus } diff --git a/source/dnode/vnode/tsdb/src/tsdbMain.c b/source/dnode/vnode/tsdb/src/tsdbMain.c index c8bcfc69062cdac8e140edd6143a5c9c39568ab7..4cb2eab644509a2c4c3438e7e4ace2d17323ef54 100644 --- a/source/dnode/vnode/tsdb/src/tsdbMain.c +++ b/source/dnode/vnode/tsdb/src/tsdbMain.c @@ -72,7 +72,7 @@ static STsdb *tsdbNew(const char *path, const STsdbCfg *pTsdbCfg, SMemAllocatorF } pTsdb->path = strdup(path); - tsdbOptionsCopy(&(pTsdb->options), pTsdbCfg); + tsdbOptionsCopy(&(pTsdb->config), pTsdbCfg); pTsdb->pmaf = pMAF; return pTsdb; diff --git a/source/dnode/vnode/tsdb/src/tsdbMemTable.c b/source/dnode/vnode/tsdb/src/tsdbMemTable.c index 2f5ca45c75001d7e58eb64bffa02aeec0e99e2e3..a342a7dd04cead6898c608544331c9ffeb153220 100644 --- a/source/dnode/vnode/tsdb/src/tsdbMemTable.c +++ b/source/dnode/vnode/tsdb/src/tsdbMemTable.c @@ -15,22 +15,9 @@ #include "tsdbDef.h" -#if 1 -typedef struct STbData { - TD_SLIST_NODE(STbData); - SSubmitMsg *pMsg; -} STbData; -#else -typedef struct STbData { - TD_SLIST_NODE(STbData); - uint64_t uid; // TODO: change here as tb_uid_t - TSKEY keyMin; - TSKEY keyMax; - uint64_t nRows; - SSkipList *pData; // Here need a container, may not use the SL - T_REF_DECLARE() -} STbData; -#endif +struct STbData { + tb_uid_t uid; +}; struct STsdbMemTable { T_REF_DECLARE() @@ -40,45 +27,169 @@ struct STsdbMemTable { uint64_t nRow; SMemAllocator *pMA; // Container +#if 1 + SSkipList *pData; // SSkiplist + SHashObj * pHashIdx; +#else TD_SLIST(STbData) list; +#endif }; -STsdbMemTable *tsdbNewMemTable(SMemAllocatorFactory *pMAF) { - STsdbMemTable *pMemTable; - SMemAllocator *pMA; - - pMA = (*pMAF->create)(pMAF); - ASSERT(pMA != NULL); +static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitMsg *pMsg); - pMemTable = (STsdbMemTable *)TD_MA_MALLOC(pMA, sizeof(*pMemTable)); +STsdbMemTable *tsdbNewMemTable(STsdb *pTsdb) { + STsdbMemTable *pMemTable = (STsdbMemTable *)calloc(1, sizeof(*pMemTable)); if (pMemTable == NULL) { - (*pMAF->destroy)(pMAF, pMA); + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } T_REF_INIT_VAL(pMemTable, 1); taosInitRWLatch(&(pMemTable->latch)); - pMemTable->keyMin = TSKEY_MAX; pMemTable->keyMax = TSKEY_MIN; + pMemTable->keyMin = TSKEY_MAX; pMemTable->nRow = 0; - pMemTable->pMA = pMA; - TD_SLIST_INIT(&(pMemTable->list)); + pMemTable->pMA = pTsdb->pmaf->create(pTsdb->pmaf); + if (pMemTable->pMA == NULL) { + free(pMemTable); + return NULL; + } + + // Initialize the container + pMemTable->pData = + tSkipListCreate(5, TSDB_DATA_TYPE_BIGINT, sizeof(tb_uid_t), NULL /*TODO*/, SL_DISCARD_DUP_KEY, NULL /* TODO */); + if (pMemTable->pData == NULL) { + pTsdb->pmaf->destroy(pTsdb->pmaf, pMemTable->pMA); + free(pMemTable); + return NULL; + } + + pMemTable->pHashIdx = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + if (pMemTable->pHashIdx == NULL) { + pTsdb->pmaf->destroy(pTsdb->pmaf, pMemTable->pMA); + tSkipListDestroy(pMemTable->pData); + free(pMemTable); + return NULL; + } - // TODO return pMemTable; } -void tsdbFreeMemTable(SMemAllocatorFactory *pMAF, STsdbMemTable *pMemTable) { - SMemAllocator *pMA = pMemTable->pMA; +void tsdbFreeMemTable(STsdb *pTsdb, STsdbMemTable *pMemTable) { + if (pMemTable) { + taosHashCleanup(pMemTable->pHashIdx); + tSkipListDestroy(pMemTable->pData); + if (pMemTable->pMA) { + pTsdb->pmaf->destroy(pTsdb->pmaf, pMemTable->pMA); + } + free(pMemTable); + } +} - if (TD_MA_FREE_FUNC(pMA) != NULL) { - // TODO - ASSERT(0); +int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp) { + SSubmitBlk * pBlock = NULL; + SSubmitMsgIter msgIter = {0}; + int32_t affectedrows = 0, numOfRows = 0; + + if (tsdbScanAndConvertSubmitMsg(pTsdb, pMsg) < 0) { + if (terrno != TSDB_CODE_TDB_TABLE_RECONFIGURE) { + tsdbError("vgId:%d failed to insert data since %s", REPO_ID(pTsdb), tstrerror(terrno)); + } + return -1; + } + + tsdbInitSubmitMsgIter(pMsg, &msgIter); + while (true) { + tsdbGetSubmitMsgNext(&msgIter, &pBlock); + if (pBlock == NULL) break; +#if 0 + if (tsdbInsertDataToTable(pTsdb, pBlock, &affectedrows) < 0) { + return -1; + } +#endif + numOfRows += pBlock->numOfRows; + } + + if (pRsp != NULL) { + pRsp->affectedRows = htonl(affectedrows); + pRsp->numOfRows = htonl(numOfRows); } - (*pMAF->destroy)(pMAF, pMA); + return 0; } +static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitMsg *pMsg) { + ASSERT(pMsg != NULL); + // STsdbMeta * pMeta = pTsdb->tsdbMeta; + SSubmitMsgIter msgIter = {0}; + SSubmitBlk * pBlock = NULL; + SSubmitBlkIter blkIter = {0}; + SMemRow row = NULL; + TSKEY now = taosGetTimestamp(pTsdb->config.precision); + TSKEY minKey = now - tsTickPerDay[pTsdb->config.precision] * pTsdb->config.keep; + TSKEY maxKey = now + tsTickPerDay[pTsdb->config.precision] * pTsdb->config.daysPerFile; + + terrno = TSDB_CODE_SUCCESS; + pMsg->length = htonl(pMsg->length); + pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); + + if (tsdbInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1; + while (true) { + if (tsdbGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1; + if (pBlock == NULL) break; + + pBlock->uid = htobe64(pBlock->uid); + pBlock->tid = htonl(pBlock->tid); + pBlock->sversion = htonl(pBlock->sversion); + pBlock->dataLen = htonl(pBlock->dataLen); + pBlock->schemaLen = htonl(pBlock->schemaLen); + pBlock->numOfRows = htons(pBlock->numOfRows); + +#if 0 + if (pBlock->tid <= 0 || pBlock->tid >= pMeta->maxTables) { + tsdbError("vgId:%d failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pTsdb), pBlock->uid, + pBlock->tid); + terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; + return -1; + } + + STable *pTable = pMeta->tables[pBlock->tid]; + if (pTable == NULL || TABLE_UID(pTable) != pBlock->uid) { + tsdbError("vgId:%d failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pTsdb), pBlock->uid, + pBlock->tid); + terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; + return -1; + } + + if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) { + tsdbError("vgId:%d invalid action trying to insert a super table %s", REPO_ID(pTsdb), TABLE_CHAR_NAME(pTable)); + terrno = TSDB_CODE_TDB_INVALID_ACTION; + return -1; + } + + // Check schema version and update schema if needed + if (tsdbCheckTableSchema(pTsdb, pBlock, pTable) < 0) { + if (terrno == TSDB_CODE_TDB_TABLE_RECONFIGURE) { + continue; + } else { + return -1; + } + } + + tsdbInitSubmitBlkIter(pBlock, &blkIter); + while ((row = tsdbGetSubmitBlkNext(&blkIter)) != NULL) { + if (tsdbCheckRowRange(pTsdb, pTable, row, minKey, maxKey, now) < 0) { + return -1; + } + } +#endif + } + + if (terrno != TSDB_CODE_SUCCESS) return -1; + return 0; +} + +#if 0 int tsdbInsertDataToMemTable(STsdbMemTable *pMemTable, SSubmitMsg *pMsg) { SMemAllocator *pMA = pMemTable->pMA; STbData * pTbData = (STbData *)TD_MA_MALLOC(pMA, sizeof(*pTbData)); @@ -91,29 +202,11 @@ int tsdbInsertDataToMemTable(STsdbMemTable *pMemTable, SSubmitMsg *pMsg) { return 0; } -/* ------------------------ STATIC METHODS ------------------------ */ - -#if 0 -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - #include "tdataformat.h" #include "tfunctional.h" +#include "tsdbRowMergeBuf.h" #include "tsdbint.h" #include "tskiplist.h" -#include "tsdbRowMergeBuf.h" #define TSDB_DATA_SKIPLIST_LEVEL 5 #define TSDB_MAX_INSERT_BATCH 512 @@ -149,37 +242,6 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SMemRow row, TSKEY minKey, TSKEY maxKey, TSKEY now); -int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp) { - STsdbRepo * pRepo = repo; - SSubmitMsgIter msgIter = {0}; - SSubmitBlk * pBlock = NULL; - int32_t affectedrows = 0, numOfRows = 0; - - if (tsdbScanAndConvertSubmitMsg(pRepo, pMsg) < 0) { - if (terrno != TSDB_CODE_TDB_TABLE_RECONFIGURE) { - tsdbError("vgId:%d failed to insert data since %s", REPO_ID(pRepo), tstrerror(terrno)); - } - return -1; - } - - tsdbInitSubmitMsgIter(pMsg, &msgIter); - while (true) { - tsdbGetSubmitMsgNext(&msgIter, &pBlock); - if (pBlock == NULL) break; - if (tsdbInsertDataToTable(pRepo, pBlock, &affectedrows) < 0) { - return -1; - } - numOfRows += pBlock->numOfRows; - } - - if (pRsp != NULL) { - pRsp->affectedRows = htonl(affectedrows); - pRsp->numOfRows = htonl(numOfRows); - } - - if (tsdbCheckCommit(pRepo) < 0) return -1; - return 0; -} // ---------------- INTERNAL FUNCTIONS ---------------- int tsdbRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) { @@ -564,59 +626,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey } // ---------------- LOCAL FUNCTIONS ---------------- -static SMemTable* tsdbNewMemTable(STsdbRepo *pRepo) { - STsdbMeta *pMeta = pRepo->tsdbMeta; - - SMemTable *pMemTable = (SMemTable *)calloc(1, sizeof(*pMemTable)); - if (pMemTable == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - goto _err; - } - - pMemTable->keyFirst = INT64_MAX; - pMemTable->keyLast = 0; - pMemTable->numOfRows = 0; - - pMemTable->maxTables = pMeta->maxTables; - pMemTable->tData = (STableData **)calloc(pMemTable->maxTables, sizeof(STableData *)); - if (pMemTable->tData == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - goto _err; - } - - pMemTable->actList = tdListNew(0); - if (pMemTable->actList == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - goto _err; - } - - pMemTable->bufBlockList = tdListNew(sizeof(STsdbBufBlock*)); - if (pMemTable->bufBlockList == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - goto _err; - } - - T_REF_INC(pMemTable); - - return pMemTable; - -_err: - tsdbFreeMemTable(pMemTable); - return NULL; -} - -static void tsdbFreeMemTable(SMemTable* pMemTable) { - if (pMemTable) { - ASSERT((pMemTable->bufBlockList == NULL) ? true : (listNEles(pMemTable->bufBlockList) == 0)); - ASSERT((pMemTable->actList == NULL) ? true : (listNEles(pMemTable->actList) == 0)); - tdListFree(pMemTable->extraBuffList); - tdListFree(pMemTable->bufBlockList); - tdListFree(pMemTable->actList); - tfree(pMemTable->tData); - free(pMemTable); - } -} static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) { STableData *pTableData = (STableData *)calloc(1, sizeof(*pTableData)); @@ -737,74 +747,6 @@ static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SMem return 0; } -static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg) { - ASSERT(pMsg != NULL); - STsdbMeta * pMeta = pRepo->tsdbMeta; - SSubmitMsgIter msgIter = {0}; - SSubmitBlk * pBlock = NULL; - SSubmitBlkIter blkIter = {0}; - SMemRow row = NULL; - TSKEY now = taosGetTimestamp(pRepo->config.precision); - TSKEY minKey = now - tsTickPerDay[pRepo->config.precision] * pRepo->config.keep; - TSKEY maxKey = now + tsTickPerDay[pRepo->config.precision] * pRepo->config.daysPerFile; - - terrno = TSDB_CODE_SUCCESS; - pMsg->length = htonl(pMsg->length); - pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); - - if (tsdbInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1; - while (true) { - if (tsdbGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1; - if (pBlock == NULL) break; - - pBlock->uid = htobe64(pBlock->uid); - pBlock->tid = htonl(pBlock->tid); - pBlock->sversion = htonl(pBlock->sversion); - pBlock->dataLen = htonl(pBlock->dataLen); - pBlock->schemaLen = htonl(pBlock->schemaLen); - pBlock->numOfRows = htons(pBlock->numOfRows); - - if (pBlock->tid <= 0 || pBlock->tid >= pMeta->maxTables) { - tsdbError("vgId:%d failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pRepo), pBlock->uid, - pBlock->tid); - terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; - return -1; - } - - STable *pTable = pMeta->tables[pBlock->tid]; - if (pTable == NULL || TABLE_UID(pTable) != pBlock->uid) { - tsdbError("vgId:%d failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pRepo), pBlock->uid, - pBlock->tid); - terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; - return -1; - } - - if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) { - tsdbError("vgId:%d invalid action trying to insert a super table %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable)); - terrno = TSDB_CODE_TDB_INVALID_ACTION; - return -1; - } - - // Check schema version and update schema if needed - if (tsdbCheckTableSchema(pRepo, pBlock, pTable) < 0) { - if (terrno == TSDB_CODE_TDB_TABLE_RECONFIGURE) { - continue; - } else { - return -1; - } - } - - tsdbInitSubmitBlkIter(pBlock, &blkIter); - while ((row = tsdbGetSubmitBlkNext(&blkIter)) != NULL) { - if (tsdbCheckRowRange(pRepo, pTable, row, minKey, maxKey, now) < 0) { - return -1; - } - } - } - - if (terrno != TSDB_CODE_SUCCESS) return -1; - return 0; -} //row1 has higher priority static SMemRow tsdbInsertDupKeyMerge(SMemRow row1, SMemRow row2, STsdbRepo* pRepo, diff --git a/source/dnode/vnode/tsdb/src/tsdbWrite.c b/source/dnode/vnode/tsdb/src/tsdbWrite.c index f9441cbe44408ee4086ec86ef60e7b249723340c..599b58e0e2ef1a002e5f8e3c1a0ff33679eb1bd0 100644 --- a/source/dnode/vnode/tsdb/src/tsdbWrite.c +++ b/source/dnode/vnode/tsdb/src/tsdbWrite.c @@ -17,9 +17,9 @@ int tsdbInsertData(STsdb *pTsdb, SSubmitMsg *pMsg) { // Check if mem is there. If not, create one. - pTsdb->mem = tsdbNewMemTable(pTsdb->pmaf); + pTsdb->mem = tsdbNewMemTable(pTsdb); if (pTsdb->mem == NULL) { return -1; } - return tsdbInsertDataToMemTable(pTsdb->mem, pMsg); + return tsdbMemTableInsert(pTsdb, pTsdb->mem, pMsg, NULL); } \ No newline at end of file