From 0f8016995c8f3865906b371e518c8a40bb00a5d1 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 28 Nov 2022 13:41:06 +0800 Subject: [PATCH] more code --- source/common/src/tdataformat.c | 19 ++++-- source/common/src/tmsg.c | 17 +++--- source/dnode/vnode/src/inc/tsdb.h | 12 ++-- source/dnode/vnode/src/inc/vnodeInt.h | 3 +- source/dnode/vnode/src/tsdb/tsdbCache.c | 42 ++++++------- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 57 +++++++++-------- source/dnode/vnode/src/tsdb/tsdbRead.c | 58 +++++++++--------- source/dnode/vnode/src/tsdb/tsdbUtil.c | 31 +++++----- source/dnode/vnode/src/vnd/vnodeSvr.c | 71 ++++++++++++---------- 9 files changed, 163 insertions(+), 147 deletions(-) diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index c9b818884f..2fea276835 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -103,6 +103,7 @@ int32_t tRowBuild(SArray *aColVal, STSchema *pTSchema, SRow **ppRow) { ASSERT(((SColVal *)aColVal->pData)[0].type == TSDB_DATA_TYPE_TIMESTAMP); // scan --------------- + SRow *pRow = NULL; uint8_t flag = 0; int32_t iColVal = 1; const int32_t nColVal = taosArrayGetSize(aColVal); @@ -196,9 +197,11 @@ int32_t tRowBuild(SArray *aColVal, STSchema *pTSchema, SRow **ppRow) { } // alloc -------------- - code = tRealloc((uint8_t **)ppRow, nRow); - if (code) return code; - SRow *pRow = *ppRow; + pRow = taosMemoryMalloc(nRow); + if (NULL == pRow) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } // build -------------- pColVal = (SColVal *)taosArrayGet(aColVal, 0); @@ -349,6 +352,12 @@ int32_t tRowBuild(SArray *aColVal, STSchema *pTSchema, SRow **ppRow) { } _exit: + if (code) { + *ppRow = NULL; + tRowDestroy(pRow); + } else { + *ppRow = pRow; + } return code; } @@ -490,7 +499,9 @@ void tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) { } } -void tRowDestroy(SRow *pRow) { tFree((uint8_t *)pRow); } +void tRowDestroy(SRow *pRow) { + if (pRow) taosMemoryFree(pRow); +} static int32_t tRowPCmprFn(const void *p1, const void *p2) { if ((*(SRow **)p1)->ts < (*(SRow **)p2)->ts) { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index b6c12af04f..233d62b182 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -6665,7 +6665,7 @@ static int32_t tEncodeSSubmitTbData(SEncoder *pCoder, const SSubmitTbData *pSubm if (tEncodeI64v(pCoder, taosArrayGetSize(pSubmitTbData->aRowP)) < 0) return -1; for (int32_t i = 0; i < taosArrayGetSize(pSubmitTbData->aRowP); i++) { SRow *pRow = taosArrayGetP(pSubmitTbData->aRowP, i); - memcpy(pCoder->data + pCoder->pos, pRow, pRow->len); + if (pCoder->data) memcpy(pCoder->data + pCoder->pos, pRow, pRow->len); pCoder->pos += pRow->len; } } @@ -6713,10 +6713,8 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa for (int32_t i = 0; i < nRows; i++) { SRow **ppRow = taosArrayReserve(pSubmitTbData->aRowP, 1); - if (tDecodeBinary(pCoder, (uint8_t **)ppRow, NULL) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto _exit; - } + *ppRow = (SRow *)(pCoder->data + pCoder->pos); + pCoder->pos += (*ppRow)->len; } } @@ -6822,13 +6820,18 @@ _exit: if (code) { *ppReq = NULL; if (pReq) { - // todo: do other clear + if (pReq->aCreateTbReq) { + taosArrayDestroy(pReq->aCreateTbReq); + } + if (pReq->aSubmitTbData) { + taosArrayDestroy(pReq->aSubmitTbData); + } taosMemoryFree(pReq); } } else { *ppReq = pReq; } - return 0; + return code; } void tDestroySSubmitTbData(SSubmitTbData *pTbData) { diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 7fb962e3a7..c58cf8f630 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -121,7 +121,7 @@ int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema); int32_t tRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema); void tRowMergerClear(SRowMerger *pMerger); int32_t tRowMerge(SRowMerger *pMerger, TSDBROW *pRow); -int32_t tRowMergerGetRow(SRowMerger *pMerger, STSRow **ppRow); +int32_t tRowMergerGetRow(SRowMerger *pMerger, SRow **ppRow); // TABLEID int32_t tTABLEIDCmprFn(const void *p1, const void *p2); // TSDBKEY @@ -336,7 +336,7 @@ typedef struct SMemSkipListNode SMemSkipListNode; struct SMemSkipListNode { int8_t level; int64_t version; - STSRow *pTSRow; + SRow *pTSRow; SMemSkipListNode *forwards[0]; }; typedef struct SMemSkipList { @@ -380,7 +380,7 @@ struct TSDBROW { union { struct { int64_t version; - STSRow *pTSRow; + SRow *pTSRow; }; struct { SBlockData *pBlockData; @@ -739,8 +739,8 @@ typedef struct { int32_t tsdbOpenCache(STsdb *pTsdb); void tsdbCloseCache(STsdb *pTsdb); -int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row, STsdb *pTsdb); -int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, STSRow *row, bool dup); +int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, SRow *row, STsdb *pTsdb); +int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, SRow *row, bool dup); int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **h); int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **h); int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h); @@ -752,7 +752,7 @@ int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey); void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity); size_t tsdbCacheGetCapacity(SVnode *pVnode); -int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pSchema); +// int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pSchema); // ========== inline functions ========== static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) { diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index f229b3b127..49a1f5fdad 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -160,8 +160,7 @@ int32_t tsdbRollbackCommit(STsdb* pTsdb); int32_t tsdbDoRetention(STsdb* pTsdb, int64_t now); int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg); int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp); -int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, - SSubmitBlkRsp* pRsp); +int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitTbData* pSubmitTbData, SSubmitBlkRsp* pRsp); int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index d71eb33951..fda7618dd1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -190,7 +190,7 @@ int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) { return code; } -int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, STSRow *row, bool dup) { +int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, SRow *row, bool dup) { int32_t code = 0; STSRow *cacheRow = NULL; char key[32] = {0}; @@ -222,7 +222,7 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, ST SColVal *tColVal = &tTsVal1->colVal; SColVal colVal = {0}; - tTSRowGetVal(row, pTSchema, iCol, &colVal); + tRowGet(row, pTSchema, iCol, &colVal); if (!COL_VAL_IS_NONE(&colVal)) { if (keyTs == tTsVal1->ts && !COL_VAL_IS_NONE(tColVal)) { invalidate = true; @@ -316,7 +316,7 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, ST return code; } -int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row, STsdb *pTsdb) { +int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, SRow *row, STsdb *pTsdb) { int32_t code = 0; STSRow *cacheRow = NULL; char key[32] = {0}; @@ -348,7 +348,7 @@ int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row, STsdb SColVal *tColVal = &tTsVal1->colVal; SColVal colVal = {0}; - tTSRowGetVal(row, pTSchema, iCol, &colVal); + tRowGet(row, pTSchema, iCol, &colVal); if (!COL_VAL_IS_VALUE(&colVal)) { if (keyTs == tTsVal1->ts && COL_VAL_IS_VALUE(tColVal)) { invalidate = true; @@ -1456,29 +1456,29 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader * return code; } -int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pTSchema) { - int32_t code = 0; - int16_t nCol = taosArrayGetSize(pLastArray); - SArray *pColArray = taosArrayInit(nCol, sizeof(SColVal)); +// int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pTSchema) { +// int32_t code = 0; +// int16_t nCol = taosArrayGetSize(pLastArray); +// SArray *pColArray = taosArrayInit(nCol, sizeof(SColVal)); - for (int16_t iCol = 0; iCol < nCol; ++iCol) { - SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLastArray, iCol); - SColVal *tColVal = &tTsVal->colVal; - taosArrayPush(pColArray, tColVal); - } +// for (int16_t iCol = 0; iCol < nCol; ++iCol) { +// SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLastArray, iCol); +// SColVal *tColVal = &tTsVal->colVal; +// taosArrayPush(pColArray, tColVal); +// } - code = tdSTSRowNew(pColArray, pTSchema, ppRow); - if (code) goto _err; +// code = tdSTSRowNew(pColArray, pTSchema, ppRow); +// if (code) goto _err; - taosArrayDestroy(pColArray); +// taosArrayDestroy(pColArray); - return code; +// return code; -_err: - taosArrayDestroy(pColArray); +// _err: +// taosArrayDestroy(pColArray); - return code; -} +// return code; +// } int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **handle) { int32_t code = 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 0a7f59e429..b494413f79 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -29,7 +29,7 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *pKey, int32_t flags); static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData); static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, int64_t version, - SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, SSubmitBlkRsp *pRsp); + SSubmitTbData *pSubmitTbData, SSubmitBlkRsp *pRsp); int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) { int32_t code = 0; @@ -95,13 +95,12 @@ STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t return pTbData; } -int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, - SSubmitBlkRsp *pRsp) { +int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitTbData *pSubmitTbData, SSubmitBlkRsp *pRsp) { int32_t code = 0; SMemTable *pMemTable = pTsdb->mem; STbData *pTbData = NULL; - tb_uid_t suid = pMsgIter->suid; - tb_uid_t uid = pMsgIter->uid; + tb_uid_t suid = pSubmitTbData->suid; + tb_uid_t uid = pSubmitTbData->uid; SMetaInfo info; code = metaGetInfo(pTsdb->pVnode->pMeta, uid, &info, NULL); @@ -116,14 +115,14 @@ int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitMsgIter *pMsgI if (info.suid) { metaGetInfo(pTsdb->pVnode->pMeta, info.suid, &info, NULL); } - if (pMsgIter->sversion != info.skmVer) { + if (pSubmitTbData->sver != info.skmVer) { tsdbError("vgId:%d, req sver:%d, skmVer:%d suid:%" PRId64 " uid:%" PRId64, TD_VID(pTsdb->pVnode), - pMsgIter->sversion, info.skmVer, suid, uid); + pSubmitTbData->sver, info.skmVer, suid, uid); code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER; goto _err; } - pRsp->sver = info.skmVer; + if (pRsp) pRsp->sver = info.skmVer; // create/get STbData to op code = tsdbGetOrCreateTbData(pMemTable, suid, uid, &pTbData); @@ -132,7 +131,7 @@ int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitMsgIter *pMsgI } // do insert impl - code = tsdbInsertTableDataImpl(pMemTable, pTbData, version, pMsgIter, pBlock, pRsp); + code = tsdbInsertTableDataImpl(pMemTable, pTbData, version, pSubmitTbData, pRsp); if (code) { goto _err; } @@ -468,8 +467,8 @@ static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) { return level; } -static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListNode **pos, int64_t version, - STSRow *pRow, int8_t forward) { +static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListNode **pos, int64_t version, SRow *pRow, + int8_t forward) { int32_t code = 0; int8_t level; SMemSkipListNode *pNode; @@ -538,23 +537,21 @@ _exit: } static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, int64_t version, - SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, SSubmitBlkRsp *pRsp) { - int32_t code = 0; - SSubmitBlkIter blkIter = {0}; + SSubmitTbData *pSubmitTbData, SSubmitBlkRsp *pRsp) { + int32_t code = 0; + // SSubmitBlkIter blkIter = {0}; TSDBKEY key = {.version = version}; SMemSkipListNode *pos[SL_MAX_LEVEL]; TSDBROW row = tsdbRowFromTSRow(version, NULL); - int32_t nRow = 0; - STSRow *pLastRow = NULL; - - tInitSubmitBlkIter(pMsgIter, pBlock, &blkIter); + int32_t nRow = taosArrayGetSize(pSubmitTbData->aRowP); + int32_t iRow = 0; + SRow *pLastRow = NULL; // backward put first data - row.pTSRow = tGetSubmitBlkNext(&blkIter); - if (row.pTSRow == NULL) return code; + row.pTSRow = taosArrayGetP(pSubmitTbData->aRowP, iRow); key.ts = row.pTSRow->ts; - nRow++; + iRow++; tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD); code = tbDataDoPut(pMemTable, pTbData, pos, version, row.pTSRow, 0); if (code) { @@ -566,17 +563,19 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i pLastRow = row.pTSRow; // forward put rest data - row.pTSRow = tGetSubmitBlkNext(&blkIter); - if (row.pTSRow) { + if (iRow < nRow) { for (int8_t iLevel = pos[0]->level; iLevel < pTbData->sl.maxLevel; iLevel++) { pos[iLevel] = SL_NODE_BACKWARD(pos[iLevel], iLevel); } - do { + + while (iRow < nRow) { + row.pTSRow = taosArrayGetP(pSubmitTbData->aRowP, iRow); key.ts = row.pTSRow->ts; - nRow++; + if (SL_NODE_FORWARD(pos[0], 0) != pTbData->sl.pTail) { tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_FROM_POS); } + code = tbDataDoPut(pMemTable, pTbData, pos, version, row.pTSRow, 1); if (code) { goto _err; @@ -584,8 +583,8 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i pLastRow = row.pTSRow; - row.pTSRow = tGetSubmitBlkNext(&blkIter); - } while (row.pTSRow); + iRow++; + } } if (key.ts >= pTbData->maxKey) { @@ -607,8 +606,8 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i pMemTable->maxKey = TMAX(pMemTable->maxKey, pTbData->maxKey); pMemTable->nRow += nRow; - pRsp->numOfRows = nRow; - pRsp->affectedRows = nRow; + if (pRsp) pRsp->numOfRows = nRow; + if (pRsp) pRsp->affectedRows = nRow; return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 96cfa1752d..b97f82f3d6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -186,7 +186,7 @@ static int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STabl SRowMerger* pMerger, SVersionRange* pVerRange); static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger, STsdbReader* pReader); -static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, +static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow, STableBlockScanInfo* pInfo); static int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData, int32_t rowIndex); @@ -194,10 +194,10 @@ static void setComposedBlockFlag(STsdbReader* pReader, bool composed); static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, SVersionRange* pVerRange); -static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, - STSRow** pTSRow, STsdbReader* pReader, bool* freeTSRow); +static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, SRow** pTSRow, + STsdbReader* pReader, bool* freeTSRow); static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, - STsdbReader* pReader, STSRow** pTSRow); + STsdbReader* pReader, SRow** pTSRow); static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key, STsdbReader* pReader); @@ -242,7 +242,7 @@ static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SSDataBlock* pB static void updateBlockSMAInfo(STSchema* pSchema, SBlockLoadSuppInfo* pSupInfo) { int32_t i = 0, j = 0; - while(i < pSchema->numOfCols && j < pSupInfo->numOfCols) { + while (i < pSchema->numOfCols && j < pSupInfo->numOfCols) { STColumn* pTCol = &pSchema->columns[i]; if (pTCol->colId == pSupInfo->colIds[j]) { if (!IS_BSMA_ON(pTCol)) { @@ -305,7 +305,8 @@ static void* getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index) { } // NOTE: speedup the whole processing by preparing the buffer for STableBlockScanInfo in batch model -static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList, int32_t numOfTables) { +static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList, + int32_t numOfTables) { // allocate buffer in order to load data blocks from file // todo use simple hash instead, optimize the memory consumption SHashObj* pTableMap = @@ -754,7 +755,6 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastFiles, sizeInDisk / 1000.0, el, pReader->idStr); - pReader->cost.numOfBlocks += total; pReader->cost.headFileLoadTime += el; @@ -951,7 +951,7 @@ static void copyPrimaryTsCol(const SBlockData* pBlockData, SFileBlockDumpInfo* p // a faster version of copy procedure. static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo, SColumnInfoData* pColData, - int32_t dumpedRows, bool asc) { + int32_t dumpedRows, bool asc) { uint8_t* p = NULL; if (asc) { p = pData->pData + tDataTypes[pData->type].bytes * pDumpInfo->rowIndex; @@ -960,7 +960,7 @@ static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo p = pData->pData + tDataTypes[pData->type].bytes * startIndex; } - int32_t step = asc? 1:-1; + int32_t step = asc ? 1 : -1; // make sure it is aligned to 8bit ASSERT((((uint64_t)pColData->pData) & (0x8 - 1)) == 0); @@ -970,12 +970,11 @@ static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo // 2. reverse the array list in case of descending order scan data block if (!asc) { - switch(pColData->info.type) { + switch (pColData->info.type) { case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_DOUBLE: case TSDB_DATA_TYPE_BIGINT: - case TSDB_DATA_TYPE_UBIGINT: - { + case TSDB_DATA_TYPE_UBIGINT: { int32_t mid = dumpedRows >> 1u; int64_t* pts = (int64_t*)pColData->pData; for (int32_t j = 0; j < mid; ++j) { @@ -989,7 +988,7 @@ static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo case TSDB_DATA_TYPE_BOOL: case TSDB_DATA_TYPE_TINYINT: case TSDB_DATA_TYPE_UTINYINT: { - int32_t mid = dumpedRows >> 1u; + int32_t mid = dumpedRows >> 1u; int8_t* pts = (int8_t*)pColData->pData; for (int32_t j = 0; j < mid; ++j) { int8_t t = pts[j]; @@ -1730,7 +1729,7 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow, SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) { SRowMerger merge = {0}; - STSRow* pTSRow = NULL; + SRow* pTSRow = NULL; SBlockData* pBlockData = &pReader->status.fileBlockData; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; @@ -1876,7 +1875,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader); - STSRow* pTSRow = NULL; + SRow* pTSRow = NULL; SRowMerger merge = {0}; TSDBROW fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree); tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", %s", fRow.pBlockData, fRow.iRow, pLastBlockReader->uid, pReader->idStr); @@ -1953,7 +1952,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader if (key < ts) { // imem, mem are all empty, file blocks (data blocks and last block) exist return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); } else if (key == ts) { - STSRow* pTSRow = NULL; + SRow* pTSRow = NULL; SRowMerger merge = {0}; int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema); @@ -1993,7 +1992,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) { SRowMerger merge = {0}; - STSRow* pTSRow = NULL; + SRow* pTSRow = NULL; int32_t code = TSDB_CODE_SUCCESS; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SArray* pDelList = pBlockScanInfo->delSkyline; @@ -2354,7 +2353,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc } else { TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); - STSRow* pTSRow = NULL; + SRow* pTSRow = NULL; SRowMerger merge = {0}; int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema); @@ -2436,7 +2435,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { TSDBKEY keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader); // it is a clean block, load it directly - if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader) && + if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader) && pBlock->nRow <= pReader->capacity) { if (asc || ((!asc) && (!hasDataInLastBlock(pLastBlockReader)))) { copyBlockDataToSDataBlock(pReader, pBlockScanInfo); @@ -3354,7 +3353,7 @@ int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockSc return TSDB_CODE_SUCCESS; } -int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow, +int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, SRow** pTSRow, STsdbReader* pReader, bool* freeTSRow) { TSDBROW* pNextRow = NULL; TSDBROW current = *pRow; @@ -3423,7 +3422,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, } int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, - STSRow** pTSRow) { + SRow** pTSRow) { SRowMerger merge = {0}; TSDBKEY k = TSDBROW_KEY(pRow); @@ -3476,7 +3475,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p return code; } -int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow, int64_t endKey, +int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, SRow** pTSRow, int64_t endKey, bool* freeTSRow) { TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader); TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader); @@ -3533,8 +3532,7 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR return TSDB_CODE_SUCCESS; } -int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, - STableBlockScanInfo* pScanInfo) { +int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow, STableBlockScanInfo* pScanInfo) { int32_t numOfRows = pBlock->info.rows; int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock); int64_t uid = pScanInfo->uid; @@ -3556,7 +3554,7 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* col_id_t colId = pColInfoData->info.colId; if (colId == pSchema->columns[j].colId) { - tTSRowGetVal(pTSRow, pSchema, j, &colVal); + tRowGet(pTSRow, pSchema, j, &colVal); doCopyColVal(pColInfoData, numOfRows, i, &colVal, pSupInfo); i += 1; j += 1; @@ -3633,8 +3631,8 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e SSDataBlock* pBlock = pReader->pResBlock; do { - STSRow* pTSRow = NULL; - bool freeTSRow = false; + SRow* pTSRow = NULL; + bool freeTSRow = false; tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pTSRow, endKey, &freeTSRow); if (pTSRow == NULL) { break; @@ -3789,7 +3787,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL updateBlockSMAInfo(pReader->pSchema, &pReader->suppInfo); } - STsdbReader* p = (pReader->innerReader[0] != NULL)? pReader->innerReader[0]:pReader; + STsdbReader* p = (pReader->innerReader[0] != NULL) ? pReader->innerReader[0] : pReader; pReader->status.pTableMap = createDataBlockScanInfo(p, &pReader->blockInfoBuf, pTableList, numOfTables); if (pReader->status.pTableMap == NULL) { tsdbReaderClose(p); @@ -4123,7 +4121,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS if (pSup->colIds[j] == PRIMARYKEY_TIMESTAMP_COL_ID) { taosArrayPush(pNewAggList, &pSup->tsColAgg); } else { - // all date in this block are null + // all date in this block are null SColumnDataAgg nullColAgg = {.colId = pSup->colIds[j], .numOfNull = pBlock->nRow}; taosArrayPush(pNewAggList, &nullColAgg); } @@ -4135,7 +4133,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS taosArrayAddAll(pSup->pColAgg, pNewAggList); size_t num = taosArrayGetSize(pSup->pColAgg); - for(int32_t k = 0; k < num; ++k) { + for (int32_t k = 0; k < num; ++k) { pSup->plist[k] = taosArrayGet(pSup->pColAgg, k); } diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index db86a9429d..1f02a9a3ef 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -573,7 +573,7 @@ void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal * ASSERT(iCol > 0); if (pRow->type == 0) { - tTSRowGetVal(pRow->pTSRow, pTSchema, iCol, pColVal); + tRowGet(pRow->pTSRow, pTSchema, iCol, pColVal); } else if (pRow->type == 1) { SColData *pColData; @@ -621,7 +621,7 @@ void tsdbRowIterInit(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema) { SColVal *tsdbRowIterNext(STSDBRowIter *pIter) { if (pIter->pRow->type == 0) { if (pIter->i < pIter->pTSchema->numOfCols) { - tTSRowGetVal(pIter->pRow->pTSRow, pIter->pTSchema, pIter->i, &pIter->colVal); + tRowGet(pIter->pRow->pTSRow, pIter->pTSchema, pIter->i, &pIter->colVal); pIter->i++; return &pIter->colVal; @@ -807,12 +807,8 @@ _exit: return code; } -int32_t tRowMergerGetRow(SRowMerger *pMerger, STSRow **ppRow) { - int32_t code = 0; - - code = tdSTSRowNew(pMerger->pArray, pMerger->pTSchema, ppRow); - - return code; +int32_t tRowMergerGetRow(SRowMerger *pMerger, SRow **ppRow) { + return tRowBuild(pMerger->pArray, pMerger->pTSchema, ppRow); } // delete skyline ====================================================== @@ -1247,15 +1243,16 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS SColVal cv = {0}; if (pRow->type == 0) { - if (TD_IS_TP_ROW(pRow->pTSRow)) { - code = tBlockDataAppendTPRow(pBlockData, pRow->pTSRow, pTSchema); - if (code) goto _err; - } else if (TD_IS_KV_ROW(pRow->pTSRow)) { - code = tBlockDataAppendKVRow(pBlockData, pRow->pTSRow, pTSchema); - if (code) goto _err; - } else { - ASSERT(0); - } + ASSERT(0); + // if (TD_IS_TP_ROW(pRow->pTSRow)) { + // code = tBlockDataAppendTPRow(pBlockData, pRow->pTSRow, pTSchema); + // if (code) goto _err; + // } else if (TD_IS_KV_ROW(pRow->pTSRow)) { + // code = tBlockDataAppendKVRow(pBlockData, pRow->pTSRow, pTSchema); + // if (code) goto _err; + // } else { + // ASSERT(0); + // } } else { code = tBlockDataAppendBlockRow(pBlockData, pRow->pBlockData, pRow->iRow); if (code) goto _err; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index c75d1ffded..69f041d322 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -77,52 +77,37 @@ int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) { tDecoderClear(&dc); } break; case TDMT_VND_SUBMIT: { - SSubmitMsgIter msgIter = {0}; - SSubmitReq *pSubmitReq = (SSubmitReq *)pMsg->pCont; - SSubmitBlk *pBlock = NULL; - int64_t ctime = taosGetTimestampMs(); - tb_uid_t uid; - - if (tInitSubmitMsgIter(pSubmitReq, &msgIter) < 0) { - code = terrno; - goto _err; - } + tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead)); + tStartDecode(&dc); - for (;;) { - tGetSubmitMsgNext(&msgIter, &pBlock); - if (pBlock == NULL) break; + int32_t flag; + tDecodeI32v(&dc, &flag); - if (msgIter.schemaLen > 0) { + if (flag & SUBMIT_REQ_AUTO_CREATE_TABLE) { + int64_t ctime = taosGetTimestampMs(); + int64_t nReq; + int64_t uid; + + tDecodeI64v(&dc, &nReq); + for (int64_t iReq; iReq < nReq; iReq++) { char *name = NULL; - tDecoderInit(&dc, pBlock->data, msgIter.schemaLen); - if (tStartDecode(&dc) < 0) { - code = TSDB_CODE_INVALID_MSG; - return code; - } - - if (tDecodeI32v(&dc, NULL) < 0) { - code = TSDB_CODE_INVALID_MSG; - return code; - } - if (tDecodeCStr(&dc, &name) < 0) { - code = TSDB_CODE_INVALID_MSG; - return code; - } + tStartDecode(&dc); + tDecodeI32v(&dc, NULL); + tDecodeCStr(&dc, &name); uid = metaGetTableEntryUidByName(pVnode->pMeta, name); if (uid == 0) { uid = tGenIdPI64(); } + *(int64_t *)(dc.data + dc.pos) = uid; *(int64_t *)(dc.data + dc.pos + 8) = ctime; - pBlock->uid = htobe64(uid); - tEndDecode(&dc); - tDecoderClear(&dc); } } + tEndDecode(&dc); } break; case TDMT_VND_DELETE: { int32_t size; @@ -855,6 +840,28 @@ static int32_t vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const } static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { +#if 1 + SDecoder dc = {0}; + SSubmitReq2 *pSubmitReq = NULL; + + tDecoderInit(&dc, (char *)pReq + sizeof(SMsgHead), len - sizeof(SMsgHead)); + tDecodeSSubmitReq2(&dc, &pSubmitReq); + + if (pSubmitReq->flag & SUBMIT_REQ_AUTO_CREATE_TABLE) { + // todo + ASSERT(0); + } + + for (int32_t iSubmitTbData = 0; iSubmitTbData < taosArrayGetSize(pSubmitReq->aSubmitTbData); iSubmitTbData++) { + SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, iSubmitTbData); + + int32_t code = tsdbInsertTableData(pVnode->pTsdb, version, pSubmitTbData, NULL /*todo*/); + if (code) { + // todo + } + } + +#else SSubmitReq *pSubmitReq = (SSubmitReq *)pReq; SSubmitRsp submitRsp = {0}; SSubmitMsgIter msgIter = {0}; @@ -1007,6 +1014,8 @@ _exit: vDebug("vgId:%d, submit success, index:%" PRId64, pVnode->config.vgId, version); return 0; +#endif + return 0; } static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { -- GitLab