From 64554461f6f5dcbb93b1f304a25abbcd9ddec691 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 17 Jun 2022 12:22:45 +0000 Subject: [PATCH] more work --- source/dnode/vnode/src/tsdb/tsdbCommit.c | 96 +++++++++-- source/dnode/vnode/src/tsdb/tsdbRead.c | 199 ++++++++++++----------- 2 files changed, 178 insertions(+), 117 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index deaa171748..f9e3164826 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -398,26 +398,81 @@ _err: return code; } -static int32_t tsdbMergeCommitImpl(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STbDataIter *pIter, SBlock *pBlock) { - int32_t code = 0; - int32_t nRow = 0; - SBlock block = BLOCK_INIT_VAL; +static int32_t tsdbGetOverlapRowNumber(STbDataIter *pIter, SBlock *pBlock) { + int32_t nRow = 0; + TSDBROW *pRow; + TSDBKEY key; + int32_t c = 0; + STbDataIter iter = *pIter; + + iter.pRow = NULL; + while (true) { + pRow = tsdbTbDataIterGet(pIter); - if (pBlock->last) { - // load last and merge until {pCommitter->maxKey, INT64_MAX} - } else { - // scan pIter, check how many rows in the block range - if (pBlock->nRow + nRow <= pCommitter->maxRow) { - if (pBlock->nSubBlock < TSDB_MAX_SUBBLOCKS) { - // add as a subblock + if (pRow == NULL) break; + key = tsdbRowKey(pRow); + + c = tBlockCmprFn(&(SBlock){.info.maxKey = key, .info.minKey = key}, pBlock); + if (c == 0) { + nRow++; + } else if (c > 0) { + break; + } else { + ASSERT(0); + } + } + + return nRow; +} + +static int32_t tsdbMergeCommitImpl(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STbDataIter *pIter, SBlock *pBlock, + int8_t toDataOnly) { + int32_t code = 0; + int32_t iRow = 0; + int32_t nRow = 0; + int32_t c; + TSDBROW *pRow; + SBlock block = BLOCK_INIT_VAL; + TSDBKEY key1; + TSDBKEY key2; + + tsdbBlockDataClear(&pCommitter->bDataN); + + // load last and merge until {pCommitter->maxKey, INT64_MAX} + code = tsdbReadBlockData(pCommitter->pReader, pBlockIdx, pBlock, &pCommitter->bDataO, NULL, 0, NULL, NULL); + if (code) goto _err; + + iRow = 0; + nRow = pCommitter->bDataO.nRow; + pRow = tsdbTbDataIterGet(pIter); + + while (true) { + if ((pRow == NULL || pRow->pTSRow->ts > pCommitter->maxKey) && (iRow >= nRow)) { + if (pCommitter->bDataN.nRow > 0) { + goto _write_block_data; } else { - // load the block, merge until pBlock->maxKey + break; } - } else { - // load the block, merge until pBlock->maxKey } + + // TODO + + _write_block_data: + block.last = pCommitter->bDataN.nRow < pCommitter->minRow ? 1 : 0; + code = tsdbWriteBlockData(pCommitter->pWriter, &pCommitter->bDataN, NULL, pBlockIdx, &block); + if (code) goto _err; + + code = tMapDataPutItem(&pCommitter->nBlock, &block, tPutBlock); + if (code) goto _err; } + block = BLOCK_INIT_VAL; + tsdbBlockDataClear(&pCommitter->bDataN); + + return code; + +_err: + tsdbError("vgId:%d merge commit impl failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); return code; } @@ -436,7 +491,7 @@ static int32_t tsdbMergeCommit(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STb if (code) goto _err; } else if (pBlock->last) { // merge - code = tsdbMergeCommitImpl(pCommitter, pBlockIdx, pIter, pBlock); + code = tsdbMergeCommitImpl(pCommitter, pBlockIdx, pIter, pBlock, 0); if (code) goto _err; } else { // memory @@ -456,9 +511,14 @@ static int32_t tsdbMergeCommit(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STb code = tMapDataPutItem(&pCommitter->nBlock, pBlock, tPutBlock); if (code) goto _err; } else if (c == 0) { - // merge - code = tsdbMergeCommitImpl(pCommitter, pBlockIdx, pIter, pBlock); - if (code) goto _err; + int32_t nOverlap = tsdbGetOverlapRowNumber(pIter, pBlock); + + if (pBlock->nRow + nOverlap > pCommitter->maxRow || pBlock->nSubBlock == TSDB_MAX_SUBBLOCKS) { + code = tsdbMergeCommitImpl(pCommitter, pBlockIdx, pIter, pBlock, 1); + if (code) goto _err; + } else { + // add as a subblock + } } else { ASSERT(0); } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 77a2df0d86..a43e4a8673 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -113,7 +113,7 @@ struct STsdbReader { // SColumnDataAgg* statis; // query level statistics, only one table block statistics info exists at any time // SColumnDataAgg** pstatis;// the ptr array list to return to caller int32_t numOfBlocks; - SArray* pColumns; // column list, SColumnInfoData array list + SArray* pColumns; // SArray bool locateStart; int32_t outputCapacity; int32_t realNumOfRows; @@ -180,42 +180,43 @@ struct STsdbReader { // return pLocalIdList; // } -// static SArray* createCheckInfoFromTableGroup(STsdbReader* pTsdbReadHandle, STableListInfo* pTableList) { -// size_t tableSize = taosArrayGetSize(pTableList->pTableList); -// assert(tableSize >= 1); +static SArray* createCheckInfoFromTableGroup(STsdbReader* pTsdbReadHandle, STableListInfo* pTableList) { + // size_t tableSize = taosArrayGetSize(pTableList->pTableList); + // assert(tableSize >= 1); -// // allocate buffer in order to load data blocks from file -// SArray* pTableCheckInfo = taosArrayInit(tableSize, sizeof(STableCheckInfo)); -// if (pTableCheckInfo == NULL) { -// return NULL; -// } + // // allocate buffer in order to load data blocks from file + // SArray* pTableCheckInfo = taosArrayInit(tableSize, sizeof(STableCheckInfo)); + // if (pTableCheckInfo == NULL) { + // return NULL; + // } -// // todo apply the lastkey of table check to avoid to load header file -// for (int32_t j = 0; j < tableSize; ++j) { -// STableKeyInfo* pKeyInfo = (STableKeyInfo*)taosArrayGet(pTableList->pTableList, j); + // // todo apply the lastkey of table check to avoid to load header file + // for (int32_t j = 0; j < tableSize; ++j) { + // STableKeyInfo* pKeyInfo = (STableKeyInfo*)taosArrayGet(pTableList->pTableList, j); -// STableCheckInfo info = {.lastKey = pKeyInfo->lastKey, .tableId = pKeyInfo->uid}; -// info.suid = pTsdbReadHandle->suid; -// if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) { -// if (info.lastKey == INT64_MIN || info.lastKey < pTsdbReadHandle->window.skey) { -// info.lastKey = pTsdbReadHandle->window.skey; -// } + // STableCheckInfo info = {.lastKey = pKeyInfo->lastKey, .tableId = pKeyInfo->uid}; + // info.suid = pTsdbReadHandle->suid; + // if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) { + // if (info.lastKey == INT64_MIN || info.lastKey < pTsdbReadHandle->window.skey) { + // info.lastKey = pTsdbReadHandle->window.skey; + // } -// assert(info.lastKey >= pTsdbReadHandle->window.skey && info.lastKey <= pTsdbReadHandle->window.ekey); -// } else { -// info.lastKey = pTsdbReadHandle->window.skey; -// } + // assert(info.lastKey >= pTsdbReadHandle->window.skey && info.lastKey <= pTsdbReadHandle->window.ekey); + // } else { + // info.lastKey = pTsdbReadHandle->window.skey; + // } -// taosArrayPush(pTableCheckInfo, &info); -// tsdbDebug("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReadHandle, info.tableId, -// info.lastKey, -// pTsdbReadHandle->idStr); -// } + // taosArrayPush(pTableCheckInfo, &info); + // tsdbDebug("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReadHandle, info.tableId, + // info.lastKey, + // pTsdbReadHandle->idStr); + // } -// // TODO group table according to the tag value. -// taosArraySort(pTableCheckInfo, tsdbCheckInfoCompar); -// return pTableCheckInfo; -// } + // // TODO group table according to the tag value. + // taosArraySort(pTableCheckInfo, tsdbCheckInfoCompar); + // return pTableCheckInfo; + return NULL; +} // static void resetCheckInfo(STsdbReader* pTsdbReadHandle) { // size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo); @@ -266,31 +267,31 @@ struct STsdbReader { // return now - (tsTickPerMin[pCfg->precision] * pCfg->keep2) + 1; // needs to add one tick // } -// static void setQueryTimewindow(STsdbReader* pTsdbReadHandle, SQueryTableDataCond* pCond, int32_t tWinIdx) { -// pTsdbReadHandle->window = pCond->twindows[tWinIdx]; +static void setQueryTimewindow(STsdbReader* pReader, SQueryTableDataCond* pCond, int32_t tWinIdx) { + // pReader->window = pCond->twindows[tWinIdx]; -// bool updateTs = false; -// int64_t startTs = getEarliestValidTimestamp(pTsdbReadHandle->pTsdb); -// if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) { -// if (startTs > pTsdbReadHandle->window.skey) { -// pTsdbReadHandle->window.skey = startTs; -// pCond->twindows[tWinIdx].skey = startTs; -// updateTs = true; -// } -// } else { -// if (startTs > pTsdbReadHandle->window.ekey) { -// pTsdbReadHandle->window.ekey = startTs; -// pCond->twindows[tWinIdx].ekey = startTs; -// updateTs = true; -// } -// } + // bool updateTs = false; + // int64_t startTs = getEarliestValidTimestamp(pReader->pTsdb); + // if (ASCENDING_TRAVERSE(pReader->order)) { + // if (startTs > pReader->window.skey) { + // pReader->window.skey = startTs; + // pCond->twindows[tWinIdx].skey = startTs; + // updateTs = true; + // } + // } else { + // if (startTs > pReader->window.ekey) { + // pReader->window.ekey = startTs; + // pCond->twindows[tWinIdx].ekey = startTs; + // updateTs = true; + // } + // } -// if (updateTs) { -// tsdbDebug("%p update the query time window, old:%" PRId64 " - %" PRId64 ", new:%" PRId64 " - %" PRId64 ", %s", -// pTsdbReadHandle, pCond->twindows[tWinIdx].skey, pCond->twindows[tWinIdx].ekey, -// pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr); -// } -// } + // if (updateTs) { + // tsdbDebug("%p update the query time window, old:%" PRId64 " - %" PRId64 ", new:%" PRId64 " - %" PRId64 ", %s", + // pReader, pCond->twindows[tWinIdx].skey, pCond->twindows[tWinIdx].ekey, pReader->window.skey, + // pReader->window.ekey, pReader->idStr); + // } +} static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, uint64_t qId, uint64_t taskId, STsdbReader** ppReader) { @@ -326,50 +327,50 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, uint // // goto _end; // // } - // setQueryTimewindow(pReadHandle, pCond, 0); - - // if (pCond->numOfCols > 0) { - // int32_t rowLen = 0; - // for (int32_t i = 0; i < pCond->numOfCols; ++i) { - // rowLen += pCond->colList[i].bytes; - // } - - // // make sure the output SSDataBlock size be less than 2MB. - // int32_t TWOMB = 2 * 1024 * 1024; - // if (pReadHandle->outputCapacity * rowLen > TWOMB) { - // pReadHandle->outputCapacity = TWOMB / rowLen; - // } - - // // allocate buffer in order to load data blocks from file - // pReadHandle->suppInfo.pstatis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg)); - // if (pReadHandle->suppInfo.pstatis == NULL) { - // goto _end; - // } - - // // todo: use list instead of array? - // pReadHandle->pColumns = taosArrayInit(pCond->numOfCols, sizeof(SColumnInfoData)); - // if (pReadHandle->pColumns == NULL) { - // goto _end; - // } - - // for (int32_t i = 0; i < pCond->numOfCols; ++i) { - // SColumnInfoData colInfo = {{0}, 0}; - // colInfo.info = pCond->colList[i]; - - // int32_t code = colInfoDataEnsureCapacity(&colInfo, 0, pReadHandle->outputCapacity); - // if (code != TSDB_CODE_SUCCESS) { - // goto _end; - // } - - // taosArrayPush(pReadHandle->pColumns, &colInfo); - // } - - // pReadHandle->suppInfo.defaultLoadColumn = getDefaultLoadColumns(pReadHandle, true); - - // size_t size = taosArrayGetSize(pReadHandle->suppInfo.defaultLoadColumn); - // pReadHandle->suppInfo.slotIds = taosMemoryCalloc(size, sizeof(int32_t)); - // pReadHandle->suppInfo.plist = taosMemoryCalloc(size, POINTER_BYTES); - // } + setQueryTimewindow(pReader, pCond, 0); + + if (pCond->numOfCols > 0) { + // int32_t rowLen = 0; + // for (int32_t i = 0; i < pCond->numOfCols; ++i) { + // rowLen += pCond->colList[i].bytes; + // } + + // // make sure the output SSDataBlock size be less than 2MB. + // int32_t TWOMB = 2 * 1024 * 1024; + // if (pReadHandle->outputCapacity * rowLen > TWOMB) { + // pReadHandle->outputCapacity = TWOMB / rowLen; + // } + + // // allocate buffer in order to load data blocks from file + // pReadHandle->suppInfo.pstatis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg)); + // if (pReadHandle->suppInfo.pstatis == NULL) { + // goto _end; + // } + + // // todo: use list instead of array? + // pReadHandle->pColumns = taosArrayInit(pCond->numOfCols, sizeof(SColumnInfoData)); + // if (pReadHandle->pColumns == NULL) { + // goto _end; + // } + + // for (int32_t i = 0; i < pCond->numOfCols; ++i) { + // SColumnInfoData colInfo = {{0}, 0}; + // colInfo.info = pCond->colList[i]; + + // int32_t code = colInfoDataEnsureCapacity(&colInfo, 0, pReadHandle->outputCapacity); + // if (code != TSDB_CODE_SUCCESS) { + // goto _end; + // } + + // taosArrayPush(pReadHandle->pColumns, &colInfo); + // } + + // pReadHandle->suppInfo.defaultLoadColumn = getDefaultLoadColumns(pReadHandle, true); + + // size_t size = taosArrayGetSize(pReadHandle->suppInfo.defaultLoadColumn); + // pReadHandle->suppInfo.slotIds = taosMemoryCalloc(size, sizeof(int32_t)); + // pReadHandle->suppInfo.plist = taosMemoryCalloc(size, POINTER_BYTES); + } // pReadHandle->pDataCols = tdNewDataCols(1000, pVnode->config.tsdbCfg.maxRows); // if (pReadHandle->pDataCols == NULL) { @@ -378,7 +379,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, uint // goto _end; // } - // tsdbInitDataBlockLoadInfo(&pReadHandle->dataBlockLoadInfo); + // tsdbInitDataBlockLoadInfo(&pReadHandle->dataBlockLoadInfo); // tsdbInitCompBlockLoadInfo(&pReadHandle->compBlockLoadInfo); // return (STsdbReader*)pReadHandle; -- GitLab