diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 8ce40ac29320bc7793015d82acb65040865f16ad..86cd4f6a6b4f9f5d53212648a9ca8f9cdbf06e8f 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -123,15 +123,13 @@ int32_t tsdbRowCmprFn(const void *p1, const void *p2); int32_t tsdbRowIterOpen(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema); void tsdbRowClose(STSDBRowIter *pIter); SColVal *tsdbRowIterNext(STSDBRowIter *pIter); + // SRowMerger -int32_t tsdbRowMergerInit(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema); +int32_t tsdbRowMergerInit(SRowMerger *pMerger, STSchema *pSchema); int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema); -void tsdbRowMergerClear(SRowMerger *pMerger); int32_t tsdbRowMergerGetRow(SRowMerger *pMerger, SRow **ppRow); - -int32_t tsdbRowMergerInit_rv(SRowMerger* pMerger, STSchema *pSchema); -void tsdbRowMergerClear_rv(SRowMerger* pMerger); -void tsdbRowMergerCleanup_rv(SRowMerger* pMerger); +void tsdbRowMergerClear(SRowMerger *pMerger); +void tsdbRowMergerCleanup(SRowMerger *pMerger); // TABLEID int32_t tTABLEIDCmprFn(const void *p1, const void *p2); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index e643e8f1a493c5409b5f2cb6bb92adb44a6ea703..79dc9543ba6645265bce1c15dae837e6af961960 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1464,7 +1464,6 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte pBlockIter->pTableMap = pReader->status.pTableMap; // access data blocks according to the offset of each block in asc/desc order. -// int32_t numOfTables = (int32_t)tSimpleHashGetSize(pReader->status.pTableMap); int32_t numOfTables = taosArrayGetSize(pTableList); int64_t st = taosGetTimestampUs(); @@ -1474,21 +1473,10 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte } int32_t cnt = 0; -// void* ptr = NULL; -// int32_t iter = 0; -// while (1) { -// ptr = tSimpleHashIterate(pReader->status.pTableMap, ptr, &iter); -// if (ptr == NULL) { -// break; -// } - - for(int32_t i = 0; i < numOfTables; ++i) { + for (int32_t i = 0; i < numOfTables; ++i) { STableBlockScanInfo* pTableScanInfo = taosArrayGetP(pTableList, i); ASSERT(pTableScanInfo->pBlockList != NULL && taosArrayGetSize(pTableScanInfo->pBlockList) > 0); -// if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) { -// continue; -// } size_t num = taosArrayGetSize(pTableScanInfo->pBlockList); sup.numOfBlocksPerTable[sup.numOfTables] = num; @@ -1876,8 +1864,6 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc pScanInfo->lastKeyInStt = key; if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->lastBlockDelIndex, key, ver, pLastBlockReader->order, pVerRange)) { - // the qualifed ts may equal to k.ts, only a greater version one. - // here we need to fallback one step. return true; } } @@ -1939,7 +1925,7 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* return NULL; } - code = tsdbRowMergerInit_rv(&pReader->status.merger, pReader->pSchema); + code = tsdbRowMergerInit(&pReader->status.merger, pReader->pSchema); if (code != 0) { terrno = code; return NULL; @@ -2021,7 +2007,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* if (pReader->order == TSDB_ORDER_ASC) { if (minKey == key) { init = true; // todo check if pReader->pSchema is null or not - int32_t code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema); + int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2031,15 +2017,15 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* if (minKey == tsLast) { TSDBROW* fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); if (init) { - tsdbRowMergerAdd(&pReader->status.merger, fRow1, NULL); + tsdbRowMergerAdd(pMerger, fRow1, NULL); } else { init = true; - int32_t code = tsdbRowMergerAdd(&pReader->status.merger, fRow1, pReader->pSchema); + int32_t code = tsdbRowMergerAdd(pMerger, fRow1, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } } - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &pReader->status.merger, &pReader->verRange, pReader->idStr); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->verRange, pReader->idStr); } if (minKey == k.ts) { @@ -2051,7 +2037,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* tsdbRowMergerAdd(pMerger, pRow, pSchema); } else { init = true; - int32_t code = tsdbRowMergerAdd(&pReader->status.merger, pRow, pSchema); + int32_t code = tsdbRowMergerAdd(pMerger, pRow, pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2069,7 +2055,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* return terrno; } - int32_t code = tsdbRowMergerAdd(&pReader->status.merger, pRow, pSchema); + int32_t code = tsdbRowMergerAdd(pMerger, pRow, pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2086,7 +2072,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* tsdbRowMergerAdd(pMerger, fRow1, NULL); } else { init = true; - int32_t code = tsdbRowMergerAdd(&pReader->status.merger, fRow1, pReader->pSchema); + int32_t code = tsdbRowMergerAdd(pMerger, fRow1, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2099,7 +2085,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* tsdbRowMergerAdd(pMerger, &fRow, NULL); } else { init = true; - int32_t code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema); + int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2116,7 +2102,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); taosMemoryFree(pTSRow); - tsdbRowMergerClear_rv(pMerger); + tsdbRowMergerClear(pMerger); return code; } @@ -2149,7 +2135,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, pBlockScanInfo->lastKey = tsLastBlock; return TSDB_CODE_SUCCESS; } else { - code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema); + code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2166,14 +2152,14 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); taosMemoryFree(pTSRow); - tsdbRowMergerClear_rv(pMerger); + tsdbRowMergerClear(pMerger); if (code != TSDB_CODE_SUCCESS) { return code; } } } else { // not merge block data - code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema); + code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2193,7 +2179,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); taosMemoryFree(pTSRow); - tsdbRowMergerClear_rv(pMerger); + tsdbRowMergerClear(pMerger); if (code != TSDB_CODE_SUCCESS) { return code; @@ -2223,7 +2209,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); } else if (key == ts) { SRow* pTSRow = NULL; - SRowMerger* pMerger = &pReader->status.merger; + SRowMerger* pMerger = pMerger; int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { @@ -2245,7 +2231,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); taosMemoryFree(pTSRow); - tsdbRowMergerClear_rv(pMerger); + tsdbRowMergerClear(pMerger); return code; } else { return TSDB_CODE_SUCCESS; @@ -2454,7 +2440,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); taosMemoryFree(pTSRow); - tsdbRowMergerClear_rv(pMerger); + tsdbRowMergerClear(pMerger); return code; } @@ -2627,7 +2613,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); taosMemoryFree(pTSRow); - tsdbRowMergerClear_rv(&pReader->status.merger); + tsdbRowMergerClear(&pReader->status.merger); return code; } } @@ -3913,7 +3899,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, } pResRow->type = TSDBROW_ROW_FMT; - tsdbRowMergerClear_rv(&pReader->status.merger); + tsdbRowMergerClear(&pReader->status.merger); *freeTSRow = true; return TSDB_CODE_SUCCESS; @@ -3972,7 +3958,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p } int32_t code = tsdbRowMergerGetRow(pMerger, pTSRow); - tsdbRowMergerClear_rv(pMerger); + tsdbRowMergerClear(pMerger); return code; } @@ -4097,12 +4083,8 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S int32_t code = TSDB_CODE_SUCCESS; SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; -// ASSERT (pReader->suppInfo.colId[i] == PRIMARYKEY_TIMESTAMP_COL_ID);// { -// SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[i]); -// ((int64_t*)pColData->pData)[outputRowIndex] = pBlockData->aTSKEY[rowIndex]; ((int64_t*)pReader->status.pPrimaryTsCol->pData)[outputRowIndex] = pBlockData->aTSKEY[rowIndex]; - i += 1; -// } + i += 1; SColVal cv = {0}; int32_t numOfInputCols = pBlockData->nColData; @@ -4355,7 +4337,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL } if (pReader->pSchema != NULL) { - tsdbRowMergerInit_rv(&pReader->status.merger, pReader->pSchema); + tsdbRowMergerInit(&pReader->status.merger, pReader->pSchema); } pReader->pSchemaMap = tSimpleHashInit(8, taosFastHash); @@ -4515,7 +4497,7 @@ void tsdbReaderClose(STsdbReader* pReader) { taosMemoryFree(pReader->idStr); - tsdbRowMergerCleanup_rv(&pReader->status.merger); + tsdbRowMergerCleanup(&pReader->status.merger); taosMemoryFree(pReader->pSchema); tSimpleHashCleanup(pReader->pSchemaMap); @@ -4562,7 +4544,6 @@ int32_t tsdbReaderSuspend(STsdbReader* pReader) { } pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline); - // pInfo->lastKey = ts; } } else { // resetDataBlockScanInfo excluding lastKey @@ -4585,7 +4566,6 @@ int32_t tsdbReaderSuspend(STsdbReader* pReader) { } pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline); - // pInfo->lastKey = ts; } pBlockScanInfo = pStatus->pTableIter == NULL ? NULL : *pStatus->pTableIter; @@ -5134,8 +5114,6 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa const int32_t numOfBuckets = 20.0; - // find the start data block in file - // find the start data block in file tsdbAcquireReader(pReader); if (pReader->suspended) { diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 0e2fb4b6aa031868b3c2859c2efc0a81372eb552..d8b0fc7c4555daf2687fb347b60f0a522bd0f377 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -637,78 +637,6 @@ SColVal *tsdbRowIterNext(STSDBRowIter *pIter) { } // SRowMerger ====================================================== - -int32_t tsdbRowMergerInit(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema) { - int32_t code = 0; - TSDBKEY key = TSDBROW_KEY(pRow); - SColVal *pColVal = &(SColVal){0}; - STColumn *pTColumn; - int32_t iCol, jCol = 0; - - if (NULL == pResTSchema) { - pResTSchema = pTSchema; - } - - pMerger->pTSchema = pResTSchema; - pMerger->version = key.version; - - pMerger->pArray = taosArrayInit(pResTSchema->numOfCols, sizeof(SColVal)); - if (pMerger->pArray == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - - // ts - pTColumn = &pTSchema->columns[jCol++]; - - ASSERT(pTColumn->type == TSDB_DATA_TYPE_TIMESTAMP); - - *pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = key.ts}); - if (taosArrayPush(pMerger->pArray, pColVal) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - - // other - for (iCol = 1; jCol < pTSchema->numOfCols && iCol < pResTSchema->numOfCols; ++iCol) { - pTColumn = &pResTSchema->columns[iCol]; - if (pTSchema->columns[jCol].colId < pTColumn->colId) { - ++jCol; - --iCol; - continue; - } else if (pTSchema->columns[jCol].colId > pTColumn->colId) { - taosArrayPush(pMerger->pArray, &COL_VAL_NONE(pTColumn->colId, pTColumn->type)); - continue; - } - - tsdbRowGetColVal(pRow, pTSchema, jCol++, pColVal); - if ((!COL_VAL_IS_NONE(pColVal)) && (!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->type)) { - uint8_t *pVal = pColVal->value.pData; - - pColVal->value.pData = NULL; - code = tRealloc(&pColVal->value.pData, pColVal->value.nData); - if (code) goto _exit; - - if (pColVal->value.nData) { - memcpy(pColVal->value.pData, pVal, pColVal->value.nData); - } - } - - if (taosArrayPush(pMerger->pArray, pColVal) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - } - - for (; iCol < pResTSchema->numOfCols; ++iCol) { - pTColumn = &pResTSchema->columns[iCol]; - taosArrayPush(pMerger->pArray, &COL_VAL_NONE(pTColumn->colId, pTColumn->type)); - } - -_exit: - return code; -} - int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) { int32_t code = 0; TSDBKEY key = TSDBROW_KEY(pRow); @@ -836,7 +764,7 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) } } -int32_t tsdbRowMergerInit_rv(SRowMerger* pMerger, STSchema *pSchema) { +int32_t tsdbRowMergerInit(SRowMerger* pMerger, STSchema *pSchema) { pMerger->pTSchema = pSchema; pMerger->pArray = taosArrayInit(pSchema->numOfCols, sizeof(SColVal)); if (pMerger->pArray == NULL) { @@ -846,7 +774,7 @@ int32_t tsdbRowMergerInit_rv(SRowMerger* pMerger, STSchema *pSchema) { } } -void tsdbRowMergerClear_rv(SRowMerger* pMerger) { +void tsdbRowMergerClear(SRowMerger* pMerger) { for (int32_t iCol = 1; iCol < pMerger->pTSchema->numOfCols; iCol++) { SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol); if (IS_VAR_DATA_TYPE(pTColVal->type)) { @@ -857,7 +785,7 @@ void tsdbRowMergerClear_rv(SRowMerger* pMerger) { taosArrayClear(pMerger->pArray); } -void tsdbRowMergerCleanup_rv(SRowMerger* pMerger) { +void tsdbRowMergerCleanup(SRowMerger* pMerger) { int32_t numOfCols = taosArrayGetSize(pMerger->pArray); for (int32_t iCol = 1; iCol < numOfCols; iCol++) { SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol); @@ -869,82 +797,6 @@ void tsdbRowMergerCleanup_rv(SRowMerger* pMerger) { taosArrayDestroy(pMerger->pArray); } -void tsdbRowMergerClear(SRowMerger *pMerger) { - for (int32_t iCol = 1; iCol < pMerger->pTSchema->numOfCols; iCol++) { - SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol); - if (IS_VAR_DATA_TYPE(pTColVal->type)) { - tFree(pTColVal->value.pData); - } - } - - taosArrayDestroy(pMerger->pArray); -} -/* -int32_t tsdbRowMerge(SRowMerger *pMerger, TSDBROW *pRow) { - int32_t code = 0; - TSDBKEY key = TSDBROW_KEY(pRow); - SColVal *pColVal = &(SColVal){0}; - - ASSERT(((SColVal *)pMerger->pArray->pData)->value.val == key.ts); - - for (int32_t iCol = 1; iCol < pMerger->pTSchema->numOfCols; iCol++) { - tsdbRowGetColVal(pRow, pMerger->pTSchema, iCol, pColVal); - - if (key.version > pMerger->version) { - if (!COL_VAL_IS_NONE(pColVal)) { - if (IS_VAR_DATA_TYPE(pColVal->type)) { - SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol); - if (!COL_VAL_IS_NULL(pColVal)) { - code = tRealloc(&pTColVal->value.pData, pColVal->value.nData); - if (code) goto _exit; - - pTColVal->value.nData = pColVal->value.nData; - if (pTColVal->value.nData) { - memcpy(pTColVal->value.pData, pColVal->value.pData, pTColVal->value.nData); - } - pTColVal->flag = 0; - } else { - tFree(pTColVal->value.pData); - pTColVal->value.pData = NULL; - taosArraySet(pMerger->pArray, iCol, pColVal); - } - } else { - taosArraySet(pMerger->pArray, iCol, pColVal); - } - } - } else if (key.version < pMerger->version) { - SColVal *tColVal = (SColVal *)taosArrayGet(pMerger->pArray, iCol); - if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) { - if (IS_VAR_DATA_TYPE(pColVal->type)) { - if (!COL_VAL_IS_NULL(pColVal)) { - code = tRealloc(&tColVal->value.pData, pColVal->value.nData); - if (code) goto _exit; - - tColVal->value.nData = pColVal->value.nData; - if (tColVal->value.nData) { - memcpy(tColVal->value.pData, pColVal->value.pData, tColVal->value.nData); - } - tColVal->flag = 0; - } else { - tFree(tColVal->value.pData); - tColVal->value.pData = NULL; - taosArraySet(pMerger->pArray, iCol, pColVal); - } - } else { - taosArraySet(pMerger->pArray, iCol, pColVal); - } - } - } else { - ASSERT(0); - } - } - - pMerger->version = key.version; - -_exit: - return code; -} -*/ int32_t tsdbRowMergerGetRow(SRowMerger *pMerger, SRow **ppRow) { return tRowBuild(pMerger->pArray, pMerger->pTSchema, ppRow); }