diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 6ae52f1fd7f86f5e1c060712bb5650ff258e7707..bfdc71bceb405da36c9c072e9082194e6fff6c30 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -155,15 +155,13 @@ int32_t tCmprBlockL(void const *lhs, void const *rhs); int32_t tBlockDataCreate(SBlockData *pBlockData); void tBlockDataDestroy(SBlockData *pBlockData, int8_t deepClear); int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchema, int16_t *aCid, int32_t nCid); -int32_t tBlockDataInitEx(SBlockData *pBlockData, SBlockData *pBlockDataFrom); void tBlockDataReset(SBlockData *pBlockData); int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid); void tBlockDataClear(SBlockData *pBlockData); SColData *tBlockDataGetColDataByIdx(SBlockData *pBlockData, int32_t idx); void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData); -int32_t tBlockDataCopy(SBlockData *pBlockDataSrc, SBlockData *pBlockDataDest); int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData); -int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData); +int32_t tBlockDataAddColData(SBlockData *pBlockData, SColData **ppColData); int32_t tCmprBlockData(SBlockData *pBlockData, int8_t cmprAlg, uint8_t **ppOut, int32_t *szOut, uint8_t *aBuf[], int32_t aBufN[]); int32_t tDecmprBlockData(uint8_t *pIn, int32_t szIn, SBlockData *pBlockData, uint8_t *aBuf[]); @@ -473,7 +471,7 @@ struct SBlockData { int64_t *aUid; // uids of each row, only exist in block data in .last file (uid == 0) int64_t *aVersion; // versions of each row TSKEY *aTSKEY; // timestamp of each row - SArray *aIdx; // SArray + int32_t nColData; SArray *aColData; // SArray }; @@ -716,14 +714,14 @@ void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); // tsdbCache ============================================================================================== typedef struct SCacheRowsReader { - SVnode *pVnode; - STSchema *pSchema; - uint64_t uid; - uint64_t suid; - char **transferBuf; // todo remove it soon - int32_t numOfCols; - int32_t type; - int32_t tableIndex; // currently returned result tables + SVnode *pVnode; + STSchema *pSchema; + uint64_t uid; + uint64_t suid; + char **transferBuf; // todo remove it soon + int32_t numOfCols; + int32_t type; + int32_t tableIndex; // currently returned result tables STableKeyInfo *pTableList; // table id list int32_t numOfTables; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index bfde5b30765b06b61ddba0c05f07a0143c8a53db..631cbd415ffea512f1447a69f4e362d0b023c7b9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -135,22 +135,22 @@ typedef struct SUidOrderCheckInfo { } SUidOrderCheckInfo; typedef struct SReaderStatus { - bool loadFromFile; // check file stage - bool composedDataBlock; // the returned data block is a composed block or not - SHashObj* pTableMap; // SHash + bool loadFromFile; // check file stage + bool composedDataBlock; // the returned data block is a composed block or not + SHashObj* pTableMap; // SHash STableBlockScanInfo** pTableIter; // table iterator used in building in-memory buffer data blocks. - SUidOrderCheckInfo uidCheckInfo; // check all table in uid order - SFileBlockDumpInfo fBlockDumpInfo; - SDFileSet* pCurrentFileset; // current opened file set - SBlockData fileBlockData; - SFilesetIter fileIter; - SDataBlockIter blockIter; + SUidOrderCheckInfo uidCheckInfo; // check all table in uid order + SFileBlockDumpInfo fBlockDumpInfo; + SDFileSet* pCurrentFileset; // current opened file set + SBlockData fileBlockData; + SFilesetIter fileIter; + SDataBlockIter blockIter; } SReaderStatus; typedef struct SBlockInfoBuf { - int32_t currentIndex; - SArray* pData; - int32_t numPerBucket; + int32_t currentIndex; + SArray* pData; + int32_t numPerBucket; } SBlockInfoBuf; struct STsdbReader { @@ -185,11 +185,13 @@ static int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STabl SRowMerger* pMerger, SVersionRange* pVerRange); static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger, STsdbReader* pReader); -static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, STableBlockScanInfo* pInfo); +static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, + STableBlockScanInfo* pInfo); static int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData, int32_t rowIndex); static void setComposedBlockFlag(STsdbReader* pReader, bool composed); -static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, SVersionRange* pVerRange); +static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, + SVersionRange* pVerRange); static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow, STsdbReader* pReader, bool* freeTSRow); @@ -238,13 +240,13 @@ static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) { } static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) { - int32_t num = numOfTables / pBuf->numPerBucket; + int32_t num = numOfTables / pBuf->numPerBucket; int32_t remainder = numOfTables % pBuf->numPerBucket; if (pBuf->pData == NULL) { pBuf->pData = taosArrayInit(num + 1, POINTER_BYTES); } - for(int32_t i = 0; i < num; ++i) { + for (int32_t i = 0; i < num; ++i) { char* p = taosMemoryCalloc(pBuf->numPerBucket, sizeof(STableBlockScanInfo)); if (p == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -266,7 +268,7 @@ static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) { static void clearBlockScanInfoBuf(SBlockInfoBuf* pBuf) { size_t num = taosArrayGetSize(pBuf->pData); - for(int32_t i = 0; i < num; ++i) { + for (int32_t i = 0; i < num; ++i) { char** p = taosArrayGet(pBuf->pData, i); taosMemoryFree(*p); } @@ -276,7 +278,7 @@ static void clearBlockScanInfoBuf(SBlockInfoBuf* pBuf) { static void* getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index) { int32_t bucketIndex = index / pBuf->numPerBucket; - char** pBucket = taosArrayGet(pBuf->pData, bucketIndex); + char** pBucket = taosArrayGet(pBuf->pData, bucketIndex); return (*pBucket) + (index % pBuf->numPerBucket) * sizeof(STableBlockScanInfo); } @@ -319,8 +321,8 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK taosHashPut(pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info)); #endif - tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid, pScanInfo->lastKey, - pTsdbReader->idStr); + tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid, + pScanInfo->lastKey, pTsdbReader->idStr); } pTsdbReader->cost.createScanInfoList = (taosGetTimestampUs() - st) / 1000.0; @@ -334,7 +336,7 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK static void resetAllDataBlockScanInfo(SHashObj* pTableMap, int64_t ts) { STableBlockScanInfo** p = NULL; while ((p = taosHashIterate(pTableMap, p)) != NULL) { - STableBlockScanInfo* pInfo = *(STableBlockScanInfo**) p; + STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p; pInfo->iterInit = false; pInfo->iiter.hasVal = false; @@ -571,7 +573,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd pReader->verRange = getQueryVerRange(pVnode, pCond, level); pReader->type = pCond->type; pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows); - pReader->blockInfoBuf.numPerBucket = 1000; // 1000 tables per bucket + pReader->blockInfoBuf.numPerBucket = 1000; // 1000 tables per bucket ASSERT(pCond->numOfCols > 0); limitOutputBufferSize(pCond, &pReader->capacity); @@ -708,7 +710,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN } SBlockIndex bIndex = {.ordinalIndex = j, .inFileOffset = block.aSubBlock->offset}; - bIndex.window = (STimeWindow) {.skey = block.minKey.ts, .ekey = block.maxKey.ts}; + bIndex.window = (STimeWindow){.skey = block.minKey.ts, .ekey = block.maxKey.ts}; void* p = taosArrayPush(pScanInfo->pBlockList, &bIndex); if (p == NULL) { @@ -969,7 +971,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn } int32_t colIndex = 0; - int32_t num = taosArrayGetSize(pBlockData->aIdx); + int32_t num = pBlockData->nColData; while (i < numOfOutputCols && colIndex < num) { rowIndex = 0; pColData = taosArrayGet(pResBlock->pDataBlock, i); @@ -1036,7 +1038,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1; tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 - ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s", + ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s", pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain, unDumpedRows, pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr); @@ -1063,7 +1065,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData); if (code != TSDB_CODE_SUCCESS) { tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 - ", rows:%d, code:%s %s", + ", rows:%d, code:%s %s", pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow, tstrerror(code), pReader->idStr); return code; @@ -1072,7 +1074,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI double elapsedTime = (taosGetTimestampUs() - st) / 1000.0; tsdbDebug("%p load file block into buffer, global index:%d, index in table block list:%d, brange:%" PRId64 "-%" PRId64 - ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s", + ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s", pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow, pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr); @@ -1300,8 +1302,8 @@ static bool getNeighborBlockOfSameTable(SFileDataBlockInfo* pBlockInfo, STableBl int32_t step = asc ? 1 : -1; *nextIndex = pBlockInfo->tbBlockIdx + step; - *pBlockIndex = *(SBlockIndex*) taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex); -// tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, pIndex->ordinalIndex, pBlock, tGetDataBlk); + *pBlockIndex = *(SBlockIndex*)taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex); + // tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, pIndex->ordinalIndex, pBlock, tGetDataBlk); return true; } @@ -1365,7 +1367,8 @@ static bool keyOverlapFileBlock(TSDBKEY key, SDataBlk* pBlock, SVersionRange* pV (pBlock->minVer <= pVerRange->maxVer); } -static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock, int32_t startIndex) { +static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock, + int32_t startIndex) { size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline); for (int32_t i = startIndex; i < num; i += 1) { @@ -1514,7 +1517,7 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* double elapsedTime = (taosGetTimestampUs() - st) / 1000.0; tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange:%" PRId64 - " - %" PRId64 " %s", + " - %" PRId64 " %s", pReader, elapsedTime, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey, pReader->idStr); @@ -2313,7 +2316,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { goto _end; } - pBlockScanInfo = *(STableBlockScanInfo**) p; + pBlockScanInfo = *(STableBlockScanInfo**)p; SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter); TSDBKEY keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader); @@ -2324,7 +2327,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { copyBlockDataToSDataBlock(pReader, pBlockScanInfo); // record the last key value - pBlockScanInfo->lastKey = asc? pBlock->maxKey.ts:pBlock->minKey.ts; + pBlockScanInfo->lastKey = asc ? pBlock->maxKey.ts : pBlock->minKey.ts; goto _end; } } @@ -2387,7 +2390,7 @@ _end: if (pResBlock->info.rows > 0) { tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 - " rows:%d, elapsed time:%.2f ms %s", + " rows:%d, elapsed time:%.2f ms %s", pReader, pResBlock->info.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, pResBlock->info.rows, el, pReader->idStr); } @@ -2553,7 +2556,7 @@ static void extractOrderedTableUidList(SUidOrderCheckInfo* pOrderCheckInfo, SRea void* p = taosHashIterate(pStatus->pTableMap, NULL); while (p != NULL) { - STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**) p; + STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)p; pOrderCheckInfo->tableUidList[index++] = pScanInfo->uid; p = taosHashIterate(pStatus->pTableMap, p); } @@ -2627,7 +2630,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { while (1) { // load the last data block of current table - STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**) pStatus->pTableIter; + STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter; bool hasVal = initLastBlockReader(pLastBlockReader, pScanInfo, pReader); if (!hasVal) { bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus); @@ -2665,7 +2668,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; if (pBlockInfo != NULL) { - pScanInfo = *(STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); + pScanInfo = + *(STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); } else { pScanInfo = *pReader->status.pTableIter; } @@ -2717,7 +2721,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlock->maxKey.ts, pReader->order); // update the last key for the corresponding table - pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order)? pInfo->window.ekey:pInfo->window.skey; + pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order) ? pInfo->window.ekey : pInfo->window.skey; } } @@ -2897,8 +2901,8 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* ret int8_t precision = pVnode->config.tsdbCfg.precision; int64_t now = taosGetTimestamp(precision); int64_t offset = tsQueryRsmaTolerance * ((precision == TSDB_TIME_PRECISION_MILLI) ? 1L - : (precision == TSDB_TIME_PRECISION_MICRO) ? 1000L - : 1000000L); + : (precision == TSDB_TIME_PRECISION_MICRO) ? 1000L + : 1000000L); for (int8_t i = 0; i < TSDB_RETENTION_MAX; ++i) { SRetention* pRetention = retentions + level; @@ -3414,7 +3418,8 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR return TSDB_CODE_SUCCESS; } -int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, STableBlockScanInfo* pScanInfo) { +int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, + STableBlockScanInfo* pScanInfo) { int32_t numOfRows = pBlock->info.rows; int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock); int64_t uid = pScanInfo->uid; @@ -3474,7 +3479,7 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S } SColVal cv = {0}; - int32_t numOfInputCols = pBlockData->aIdx->size; + int32_t numOfInputCols = pBlockData->nColData; int32_t numOfOutputCols = pResBlock->pDataBlock->size; while (i < numOfOutputCols && j < numOfInputCols) { @@ -3555,8 +3560,8 @@ int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t n taosHashClear(pReader->status.pTableMap); - STableKeyInfo* pList = (STableKeyInfo*) pTableList; - for(int32_t i = 0; i < num; ++i) { + STableKeyInfo* pList = (STableKeyInfo*)pTableList; + for (int32_t i = 0; i < num; ++i) { STableBlockScanInfo* pInfo = getPosInBlockInfoBuf(&pReader->blockInfoBuf, i); pInfo->uid = pList[i].uid; taosHashPut(pReader->status.pTableMap, &pInfo->uid, sizeof(uint64_t), &pInfo, POINTER_BYTES); @@ -3714,7 +3719,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr); return code; - _err: +_err: tsdbError("failed to create data reader, code:%s %s", tstrerror(code), idstr); return code; } @@ -3882,7 +3887,8 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { } bool tsdbTableNextDataBlock(STsdbReader* pReader, uint64_t uid) { - STableBlockScanInfo* pBlockScanInfo = *(STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &uid, sizeof(uid)); + STableBlockScanInfo* pBlockScanInfo = + *(STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &uid, sizeof(uid)); if (pBlockScanInfo == NULL) { // no data block for the table of given uid return false; } @@ -3929,7 +3935,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter); SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter); -// int64_t stime = taosGetTimestampUs(); + // int64_t stime = taosGetTimestampUs(); SBlockLoadSuppInfo* pSup = &pReader->suppInfo; @@ -3960,7 +3966,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS size_t numOfCols = blockDataGetNumOfCols(pReader->pResBlock); int32_t i = 0, j = 0; - size_t size = taosArrayGetSize(pSup->pColAgg); + size_t size = taosArrayGetSize(pSup->pColAgg); while (j < numOfCols && i < size) { SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i); @@ -3995,7 +4001,8 @@ static SArray* doRetrieveDataBlock(STsdbReader* pReader) { } SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter); - STableBlockScanInfo* pBlockScanInfo = *(STableBlockScanInfo**)taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); + STableBlockScanInfo* pBlockScanInfo = + *(STableBlockScanInfo**)taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); if (pBlockScanInfo == NULL) { terrno = TSDB_CODE_INVALID_PARA; tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid, @@ -4069,7 +4076,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { } tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, skey:%" PRId64 ", query range:%" PRId64 " - %" PRId64 - " in query %s", + " in query %s", pReader, pReader->suid, numOfTables, pCond->twindows.skey, pReader->window.skey, pReader->window.ekey, pReader->idStr); @@ -4260,7 +4267,7 @@ int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap, const char* idStr } tsdbTrace("vgId:%d, take read snapshot, %s", TD_VID(pTsdb->pVnode), idStr); - _exit: +_exit: return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 27beb22165cbfb619f6cddd5d7eccfc55d80a43e..d7e5fba5cd3e8fe7f3235223d1ab0bf4a947a04e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -514,7 +514,7 @@ static int32_t tsdbWriteBlockSma(SDataFWriter *pWriter, SBlockData *pBlockData, pSmaInfo->size = 0; // encode - for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) { + for (int32_t iColData = 0; iColData < pBlockData->nColData; iColData++) { SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData); if ((!pColData->smaOn) || IS_VAR_DATA_TYPE(pColData->type)) continue; @@ -1112,7 +1112,7 @@ static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo ASSERT(p - pReader->aBuf[0] == pBlkInfo->szKey); // read and decode columns - if (taosArrayGetSize(pBlockData->aIdx) == 0) goto _exit; + if (pBlockData->nColData == 0) goto _exit; if (hdr.szBlkCol > 0) { int64_t offset = pBlkInfo->offset + pBlkInfo->szKey; @@ -1128,7 +1128,7 @@ static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo SBlockCol *pBlockCol = &blockCol; int32_t n = 0; - for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) { + for (int32_t iColData = 0; iColData < pBlockData->nColData; iColData++) { SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData); while (pBlockCol && pBlockCol->cid < pColData->cid) { @@ -1212,49 +1212,6 @@ int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData ASSERT(pDataBlk->nSubBlock == 1); -#if 0 - if (pDataBlk->nSubBlock > 1) { - SBlockData bData1; - SBlockData bData2; - - // create - code = tBlockDataCreate(&bData1); - if (code) goto _err; - code = tBlockDataCreate(&bData2); - if (code) goto _err; - - // init - tBlockDataInitEx(&bData1, pBlockData); - tBlockDataInitEx(&bData2, pBlockData); - - for (int32_t iSubBlock = 1; iSubBlock < pDataBlk->nSubBlock; iSubBlock++) { - code = tsdbReadBlockDataImpl(pReader, &pDataBlk->aSubBlock[iSubBlock], &bData1); - if (code) { - tBlockDataDestroy(&bData1, 1); - tBlockDataDestroy(&bData2, 1); - goto _err; - } - - code = tBlockDataCopy(pBlockData, &bData2); - if (code) { - tBlockDataDestroy(&bData1, 1); - tBlockDataDestroy(&bData2, 1); - goto _err; - } - - code = tBlockDataMerge(&bData1, &bData2, pBlockData); - if (code) { - tBlockDataDestroy(&bData1, 1); - tBlockDataDestroy(&bData2, 1); - goto _err; - } - } - - tBlockDataDestroy(&bData1, 1); - tBlockDataDestroy(&bData2, 1); - } -#endif - return code; _err: diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 4e02a28cdf49affbc722ae69a60cbf067ff98139..e34e305570e8660c02d2c4ecb6c179868d11c9d0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -613,7 +613,7 @@ SColVal *tRowIterNext(SRowIter *pIter) { return &pIter->colVal; } } else { - if (pIter->i < taosArrayGetSize(pIter->pRow->pBlockData->aIdx)) { + if (pIter->i < pIter->pRow->pBlockData->nColData) { SColData *pColData = tBlockDataGetColDataByIdx(pIter->pRow->pBlockData, pIter->i); tColDataGetValue(pColData, pIter->pRow->iRow, &pIter->colVal); @@ -917,14 +917,9 @@ int32_t tBlockDataCreate(SBlockData *pBlockData) { pBlockData->aUid = NULL; pBlockData->aVersion = NULL; pBlockData->aTSKEY = NULL; - pBlockData->aIdx = taosArrayInit(0, sizeof(int32_t)); - if (pBlockData->aIdx == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } + pBlockData->nColData = 0; pBlockData->aColData = taosArrayInit(0, sizeof(SColData)); if (pBlockData->aColData == NULL) { - taosArrayDestroy(pBlockData->aIdx); code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } @@ -937,12 +932,10 @@ void tBlockDataDestroy(SBlockData *pBlockData, int8_t deepClear) { tFree((uint8_t *)pBlockData->aUid); tFree((uint8_t *)pBlockData->aVersion); tFree((uint8_t *)pBlockData->aTSKEY); - taosArrayDestroy(pBlockData->aIdx); taosArrayDestroyEx(pBlockData->aColData, deepClear ? tColDataDestroy : NULL); pBlockData->aUid = NULL; pBlockData->aVersion = NULL; pBlockData->aTSKEY = NULL; - pBlockData->aIdx = NULL; pBlockData->aColData = NULL; } @@ -955,7 +948,7 @@ int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchema, pBlockData->uid = pId->uid; pBlockData->nRow = 0; - taosArrayClear(pBlockData->aIdx); + pBlockData->nColData = 0; if (aCid) { int32_t iColumn = 1; STColumn *pTColumn = &pTSchema->columns[iColumn]; @@ -969,7 +962,7 @@ int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchema, break; } else if (pTColumn->colId == aCid[iCid]) { SColData *pColData; - code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aIdx), &pColData); + code = tBlockDataAddColData(pBlockData, &pColData); if (code) goto _exit; tColDataInit(pColData, pTColumn->colId, pTColumn->type, (pTColumn->flags & COL_SMA_ON) ? 1 : 0); @@ -982,7 +975,7 @@ int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchema, STColumn *pTColumn = &pTSchema->columns[iColumn]; SColData *pColData; - code = tBlockDataAddColData(pBlockData, iColumn - 1, &pColData); + code = tBlockDataAddColData(pBlockData, &pColData); if (code) goto _exit; tColDataInit(pColData, pTColumn->colId, pTColumn->type, (pTColumn->flags & COL_SMA_ON) ? 1 : 0); @@ -993,64 +986,36 @@ _exit: return code; } -int32_t tBlockDataInitEx(SBlockData *pBlockData, SBlockData *pBlockDataFrom) { - int32_t code = 0; - - ASSERT(pBlockDataFrom->suid || pBlockDataFrom->uid); - - pBlockData->suid = pBlockDataFrom->suid; - pBlockData->uid = pBlockDataFrom->uid; - pBlockData->nRow = 0; - - taosArrayClear(pBlockData->aIdx); - for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockDataFrom->aIdx); iColData++) { - SColData *pColDataFrom = tBlockDataGetColDataByIdx(pBlockDataFrom, iColData); - - SColData *pColData; - code = tBlockDataAddColData(pBlockData, iColData, &pColData); - if (code) goto _exit; - - tColDataInit(pColData, pColDataFrom->cid, pColDataFrom->type, pColDataFrom->smaOn); - } - -_exit: - return code; -} - void tBlockDataReset(SBlockData *pBlockData) { pBlockData->suid = 0; pBlockData->uid = 0; pBlockData->nRow = 0; - taosArrayClear(pBlockData->aIdx); + pBlockData->nColData = 0; } void tBlockDataClear(SBlockData *pBlockData) { ASSERT(pBlockData->suid || pBlockData->uid); pBlockData->nRow = 0; - for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) { + for (int32_t iColData = 0; iColData < pBlockData->nColData; iColData++) { SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData); tColDataClear(pColData); } } -int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData) { +int32_t tBlockDataAddColData(SBlockData *pBlockData, SColData **ppColData) { int32_t code = 0; SColData *pColData = NULL; - int32_t idx = taosArrayGetSize(pBlockData->aIdx); - if (idx >= taosArrayGetSize(pBlockData->aColData)) { + if (pBlockData->nColData >= taosArrayGetSize(pBlockData->aColData)) { if (taosArrayPush(pBlockData->aColData, &((SColData){0})) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } } - pColData = (SColData *)taosArrayGet(pBlockData->aColData, idx); + pColData = (SColData *)taosArrayGet(pBlockData->aColData, pBlockData->nColData); - if (taosArrayInsert(pBlockData->aIdx, iColData, &idx) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } + pBlockData->nColData++; *ppColData = pColData; return code; @@ -1087,7 +1052,7 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS tRowIterInit(&rIter, pRow, pTSchema); pColVal = tRowIterNext(&rIter); - for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) { + for (int32_t iColData = 0; iColData < pBlockData->nColData; iColData++) { SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData); while (pColVal && pColVal->cid < pColData->cid) { @@ -1115,19 +1080,19 @@ int32_t tBlockDataCorrectSchema(SBlockData *pBlockData, SBlockData *pBlockDataFr int32_t code = 0; int32_t iColData = 0; - for (int32_t iColDataFrom = 0; iColDataFrom < taosArrayGetSize(pBlockDataFrom->aIdx); iColDataFrom++) { + for (int32_t iColDataFrom = 0; iColDataFrom < pBlockDataFrom->nColData; iColDataFrom++) { SColData *pColDataFrom = tBlockDataGetColDataByIdx(pBlockDataFrom, iColDataFrom); while (true) { SColData *pColData; - if (iColData < taosArrayGetSize(pBlockData->aIdx)) { + if (iColData < pBlockData->nColData) { pColData = tBlockDataGetColDataByIdx(pBlockData, iColData); } else { pColData = NULL; } if (pColData == NULL || pColData->cid > pColDataFrom->cid) { - code = tBlockDataAddColData(pBlockData, iColData, &pColData); + code = tBlockDataAddColData(pBlockData, &pColData); if (code) goto _exit; tColDataInit(pColData, pColDataFrom->cid, pColDataFrom->type, pColDataFrom->smaOn); @@ -1226,55 +1191,15 @@ _exit: return code; } -int32_t tBlockDataCopy(SBlockData *pSrc, SBlockData *pDest) { - int32_t code = 0; - - tBlockDataClear(pDest); - - ASSERT(pDest->suid == pSrc->suid); - ASSERT(pDest->uid == pSrc->uid); - ASSERT(taosArrayGetSize(pSrc->aIdx) == taosArrayGetSize(pDest->aIdx)); - - pDest->nRow = pSrc->nRow; - - if (pSrc->uid == 0) { - code = tRealloc((uint8_t **)&pDest->aUid, sizeof(int64_t) * pDest->nRow); - if (code) goto _exit; - memcpy(pDest->aUid, pSrc->aUid, sizeof(int64_t) * pDest->nRow); - } - - code = tRealloc((uint8_t **)&pDest->aVersion, sizeof(int64_t) * pDest->nRow); - if (code) goto _exit; - memcpy(pDest->aVersion, pSrc->aVersion, sizeof(int64_t) * pDest->nRow); - - code = tRealloc((uint8_t **)&pDest->aTSKEY, sizeof(TSKEY) * pDest->nRow); - if (code) goto _exit; - memcpy(pDest->aTSKEY, pSrc->aTSKEY, sizeof(TSKEY) * pDest->nRow); - - for (int32_t iColData = 0; iColData < taosArrayGetSize(pSrc->aIdx); iColData++) { - SColData *pColSrc = tBlockDataGetColDataByIdx(pSrc, iColData); - SColData *pColDest = tBlockDataGetColDataByIdx(pDest, iColData); - - ASSERT(pColSrc->cid == pColDest->cid); - ASSERT(pColSrc->type == pColDest->type); - - code = tColDataCopy(pColSrc, pColDest); - if (code) goto _exit; - } - -_exit: - return code; -} - SColData *tBlockDataGetColDataByIdx(SBlockData *pBlockData, int32_t idx) { - ASSERT(idx >= 0 && idx < taosArrayGetSize(pBlockData->aIdx)); - return (SColData *)taosArrayGet(pBlockData->aColData, *(int32_t *)taosArrayGet(pBlockData->aIdx, idx)); + ASSERT(idx >= 0 && idx < pBlockData->nColData); + return (SColData *)taosArrayGet(pBlockData->aColData, idx); } void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData) { ASSERT(cid != PRIMARYKEY_TIMESTAMP_COL_ID); int32_t lidx = 0; - int32_t ridx = taosArrayGetSize(pBlockData->aIdx) - 1; + int32_t ridx = pBlockData->nColData - 1; while (lidx <= ridx) { int32_t midx = (lidx + ridx) / 2; @@ -1308,7 +1233,7 @@ int32_t tCmprBlockData(SBlockData *pBlockData, int8_t cmprAlg, uint8_t **ppOut, // encode ================= // columns AND SBlockCol aBufN[0] = 0; - for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) { + for (int32_t iColData = 0; iColData < pBlockData->nColData; iColData++) { SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData); ASSERT(pColData->flag); @@ -1431,7 +1356,7 @@ int32_t tDecmprBlockData(uint8_t *pIn, int32_t szIn, SBlockData *pBlockData, uin ASSERT(nt <= hdr.szBlkCol); SColData *pColData; - code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aIdx), &pColData); + code = tBlockDataAddColData(pBlockData, &pColData); if (code) goto _exit; tColDataInit(pColData, blockCol.cid, blockCol.type, blockCol.smaOn); diff --git a/source/libs/sync/src/syncEnv.c b/source/libs/sync/src/syncEnv.c index cf0d4d25e85535671aea1d3e2e90303e2e420c7d..3f3b794f46cb5c3b82b7d5476f400b71aa066c8e 100644 --- a/source/libs/sync/src/syncEnv.c +++ b/source/libs/sync/src/syncEnv.c @@ -79,7 +79,7 @@ void syncNodeRemove(int64_t rid) { taosRemoveRef(gNodeRefId, rid); } SSyncNode *syncNodeAcquire(int64_t rid) { SSyncNode *pNode = taosAcquireRef(gNodeRefId, rid); if (pNode == NULL) { - sTrace("failed to acquire node from refId:%" PRId64, rid); + sError("failed to acquire node from refId:%" PRId64, rid); terrno = TSDB_CODE_SYN_INTERNAL_ERROR; } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 3d68f294e712afdf2b98d3f5736a235ff37e6fef..d07b4d8b27f0a7c06e5a2749c50f57c75f6ed098 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -221,8 +221,12 @@ SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) { int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { SSyncNode* pSyncNode = syncNodeAcquire(rid); - if (pSyncNode == NULL) return -1; - + if (pSyncNode == NULL) { + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + sError("sync begin snapshot error"); + return -1; + } + int32_t code = 0; if (syncNodeIsMnode(pSyncNode)) { @@ -330,7 +334,10 @@ _DEL_WAL: int32_t syncEndSnapshot(int64_t rid) { SSyncNode* pSyncNode = syncNodeAcquire(rid); - if (pSyncNode == NULL) return -1; + if (pSyncNode == NULL) { + sError("sync end snapshot error"); + return -1; + } int32_t code = 0; if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) { @@ -352,7 +359,10 @@ int32_t syncEndSnapshot(int64_t rid) { int32_t syncStepDown(int64_t rid, SyncTerm newTerm) { SSyncNode* pSyncNode = syncNodeAcquire(rid); - if (pSyncNode == NULL) return -1; + if (pSyncNode == NULL) { + sError("sync step down error"); + return -1; + } syncNodeStepDown(pSyncNode, newTerm); syncNodeRelease(pSyncNode); @@ -361,7 +371,10 @@ int32_t syncStepDown(int64_t rid, SyncTerm newTerm) { bool syncIsReadyForRead(int64_t rid) { SSyncNode* pSyncNode = syncNodeAcquire(rid); - if (pSyncNode == NULL) return -1; + if (pSyncNode == NULL) { + sError("sync ready for read error"); + return false; + } if (pSyncNode->state == TAOS_SYNC_STATE_LEADER && pSyncNode->restoreFinish) { syncNodeRelease(pSyncNode); @@ -651,7 +664,10 @@ static void syncGetAndDelRespRpc(SSyncNode* pSyncNode, uint64_t index, SRpcHandl int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) { SSyncNode* pSyncNode = syncNodeAcquire(rid); - if (pSyncNode == NULL) return -1; + if (pSyncNode == NULL) { + sError("sync propose error"); + return -1; + } int32_t ret = syncNodePropose(pSyncNode, pMsg, isWeak); syncNodeRelease(pSyncNode); @@ -2528,9 +2544,19 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SyncClientRequest* pMsg, SyncInd // append entry code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry); if (code != 0) { - // del resp mgr, call FpCommitCb - ASSERT(0); - return -1; + if (ths->replicaNum == 1) { + if (h) { + taosLRUCacheRelease(ths->pLogStore->pCache, h, false); + } else { + syncEntryDestory(pEntry); + } + return -1; + + } else { + // del resp mgr, call FpCommitCb + ASSERT(0); + return -1; + } } // if mulit replica, start replicate right now diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 9544106937f9f9c8f8cea70ccb26e6724d2776ee..d3d69b288b1baa9070fb6605602ca37598e4d226 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -236,6 +236,8 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SWalReader* pWalHandle = pData->pWalHandle; if (pWalHandle == NULL) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + sError("vgId:%d, wal handle is NULL", pData->pSyncNode->vgId); + return -1; }