未验证 提交 028cc528 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #19116 from taosdata/feature/3_liaohj

refactor: do some internal refactor.
......@@ -185,7 +185,7 @@ void *tsdbGetIvtIdx(SMeta *pMeta);
uint64_t getReaderMaxVersion(STsdbReader *pReader);
int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols,
uint64_t suid, void **pReader);
uint64_t suid, void **pReader, const char* idstr);
int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids);
void *tsdbCacherowsReaderClose(void *pReader);
int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid);
......
......@@ -715,21 +715,21 @@ void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
// tsdbCache ==============================================================================================
typedef struct SCacheRowsReader {
SVnode *pVnode;
STSchema *pSchema;
uint64_t uid;
uint64_t suid;
char **transferBuf; // todo remove it soon
int32_t numOfCols;
int32_t type;
int32_t tableIndex; // currently returned result tables
SVnode *pVnode;
STSchema *pSchema;
uint64_t uid;
uint64_t suid;
char **transferBuf; // todo remove it soon
int32_t numOfCols;
int32_t type;
int32_t tableIndex; // currently returned result tables
STableKeyInfo *pTableList; // table id list
int32_t numOfTables;
SSttBlockLoadInfo *pLoadInfo;
STsdbReadSnap *pReadSnap;
SDataFReader *pDataFReader;
SDataFReader *pDataFReaderLast;
const char *idstr;
} SCacheRowsReader;
typedef struct {
......@@ -752,8 +752,6 @@ int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity);
size_t tsdbCacheGetCapacity(SVnode *pVnode);
int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pSchema);
// ========== inline functions ==========
static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) {
TSDBKEY *pKey1 = (TSDBKEY *)p1;
......
......@@ -1406,30 +1406,6 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *
return code;
}
int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pTSchema) {
int32_t code = 0;
int16_t nCol = taosArrayGetSize(pLastArray);
SArray *pColArray = taosArrayInit(nCol, sizeof(SColVal));
for (int16_t iCol = 0; iCol < nCol; ++iCol) {
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLastArray, iCol);
SColVal *tColVal = &tTsVal->colVal;
taosArrayPush(pColArray, tColVal);
}
code = tdSTSRowNew(pColArray, pTSchema, ppRow);
if (code) goto _err;
taosArrayDestroy(pColArray);
return code;
_err:
taosArrayDestroy(pColArray);
return code;
}
int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **handle) {
int32_t code = 0;
char key[32] = {0};
......
......@@ -20,9 +20,8 @@
#define HASTYPE(_type, _t) (((_type) & (_t)) == (_t))
static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds,
void** pRes) {
ASSERT(pReader->numOfCols <= taosArrayGetSize(pBlock->pDataBlock));
static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds,
void** pRes, const char* idStr) {
int32_t numOfRows = pBlock->info.rows;
if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) {
......@@ -65,9 +64,7 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea
}
pBlock->info.rows += allNullRow ? 0 : 1;
} else {
ASSERT(HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST_ROW));
} else if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST_ROW)) {
for (int32_t i = 0; i < pReader->numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
......@@ -94,11 +91,16 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea
}
pBlock->info.rows += 1;
} else {
tsdbError("invalid retrieve type:%d, %s", pReader->type, idStr);
return TSDB_CODE_INVALID_PARA;
}
return TSDB_CODE_SUCCESS;
}
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols,
uint64_t suid, void** pReader) {
uint64_t suid, void** pReader, const char* idstr) {
*pReader = NULL;
SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader));
if (p == NULL) {
......@@ -142,6 +144,8 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
return TSDB_CODE_OUT_OF_MEMORY;
}
p->idstr = taosMemoryStrDup(idstr);
*pReader = p;
return TSDB_CODE_SUCCESS;
}
......@@ -160,6 +164,7 @@ void* tsdbCacherowsReaderClose(void* pReader) {
destroyLastBlockLoadInfo(p->pLoadInfo);
taosMemoryFree((void*) p->idstr);
taosMemoryFree(pReader);
return NULL;
}
......@@ -308,7 +313,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
}
if (hasRes) {
saveOneRow(pLastCols, pResBlock, pr, slotIds, pRes);
saveOneRow(pLastCols, pResBlock, pr, slotIds, pRes, pr->idstr);
}
} else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) {
......@@ -323,7 +328,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
continue;
}
saveOneRow(pRow, pResBlock, pr, slotIds, pRes);
saveOneRow(pRow, pResBlock, pr, slotIds, pRes, pr->idstr);
// TODO reset the pRes
taosArrayPush(pTableUidList, &pKeyInfo->uid);
......
......@@ -396,7 +396,6 @@ static void destroyAllBlockScanInfo(SHashObj* pTableMap) {
}
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) {
ASSERT(pWindow != NULL);
return pWindow->skey > pWindow->ekey;
}
......@@ -1447,7 +1446,6 @@ static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlock
index += step;
}
ASSERT(0);
return -1;
}
......@@ -2421,6 +2419,46 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
}
}
static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pBlockScanInfo,
STsdbReader* pReader, bool* loadNeighbor) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
int32_t nextIndex = -1;
SBlockIndex nxtBIndex = {0};
*loadNeighbor = false;
SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
bool hasNeighbor = getNeighborBlockOfSameTable(pBlockInfo, pBlockScanInfo, &nextIndex, pReader->order, &nxtBIndex);
if (!hasNeighbor) { // do nothing
return code;
}
if (overlapWithNeighborBlock(pBlock, &nxtBIndex, pReader->order)) { // load next block
SReaderStatus* pStatus = &pReader->status;
SDataBlockIter* pBlockIter = &pStatus->blockIter;
// 1. find the next neighbor block in the scan block list
SFileDataBlockInfo fb = {.uid = pBlockInfo->uid, .tbBlockIdx = nextIndex};
int32_t neighborIndex = findFileBlockInfoIndex(pBlockIter, &fb);
// 2. remove it from the scan block list
setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step);
// 3. load the neighbor block, and set it to be the currently accessed file data block
code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pBlockInfo->uid);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// 4. check the data values
initBlockDumpInfo(pReader, pBlockIter);
*loadNeighbor = true;
}
return code;
}
static int32_t buildComposedDataBlock(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
......@@ -2479,38 +2517,13 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
int32_t nextIndex = -1;
SBlockIndex bIndex = {0};
pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); // NOTE: get the new block info
bool hasNeighbor = getNeighborBlockOfSameTable(pBlockInfo, pBlockScanInfo, &nextIndex, pReader->order, &bIndex);
if (!hasNeighbor) { // do nothing
setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
break;
}
if (overlapWithNeighborBlock(pBlock, &bIndex, pReader->order)) { // load next block
SReaderStatus* pStatus = &pReader->status;
SDataBlockIter* pBlockIter = &pStatus->blockIter;
// 1. find the next neighbor block in the scan block list
SFileDataBlockInfo fb = {.uid = pBlockInfo->uid, .tbBlockIdx = nextIndex};
int32_t neighborIndex = findFileBlockInfoIndex(pBlockIter, &fb);
// 2. remove it from the scan block list
setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step);
// 3. load the neighbor block, and set it to be the currently accessed file data block
code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pBlockInfo->uid);
if (code != TSDB_CODE_SUCCESS) {
setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
break;
}
// 4. check the data values
initBlockDumpInfo(pReader, pBlockIter);
} else {
// continue check for the next file block if the last ts in the current block
// is overlapped with the next neighbor block
bool loadNeighbor = false;
code = loadNeighborIfOverlap(pBlockInfo, pBlockScanInfo, pReader, &loadNeighbor);
if ((!loadNeighbor) || (code != 0)) {
setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
break;
}
......@@ -2777,7 +2790,10 @@ static bool moveToNextTable(SUidOrderCheckInfo* pOrderedCheckInfo, SReaderStatus
uint64_t uid = pOrderedCheckInfo->tableUidList[pOrderedCheckInfo->currentIndex];
pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
ASSERT(pStatus->pTableIter != NULL);
if (pStatus->pTableIter == NULL) {
return false;
}
return true;
}
......@@ -3117,10 +3133,10 @@ SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_
}
bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, SVersionRange* pVerRange) {
ASSERT(pKey != NULL);
if (pDelList == NULL) {
return false;
}
size_t num = taosArrayGetSize(pDelList);
bool asc = ASCENDING_TRAVERSE(order);
int32_t step = asc ? 1 : -1;
......@@ -3318,35 +3334,10 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn
*state = CHECK_FILEBLOCK_QUIT;
int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
int32_t nextIndex = -1;
SBlockIndex bIndex = {0};
bool hasNeighbor = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &nextIndex, pReader->order, &bIndex);
if (!hasNeighbor) { // do nothing
return 0;
}
bool overlap = overlapWithNeighborBlock(pBlock, &bIndex, pReader->order);
if (overlap) { // load next block
SReaderStatus* pStatus = &pReader->status;
SDataBlockIter* pBlockIter = &pStatus->blockIter;
// 1. find the next neighbor block in the scan block list
SFileDataBlockInfo fb = {.uid = pFBlock->uid, .tbBlockIdx = nextIndex};
int32_t neighborIndex = findFileBlockInfoIndex(pBlockIter, &fb);
// 2. remove it from the scan block list
setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step);
// 3. load the neighbor block, and set it to be the currently accessed file data block
int32_t code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pFBlock->uid);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// 4. check the data values
initBlockDumpInfo(pReader, pBlockIter);
bool loadNeighbor = true;
int32_t code = loadNeighborIfOverlap(pFBlock, pScanInfo, pReader, &loadNeighbor);
if (loadNeighbor && (code == TSDB_CODE_SUCCESS)) {
pDumpInfo->rowIndex =
doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
if (pDumpInfo->rowIndex >= pDumpInfo->totalRows) {
......@@ -3354,7 +3345,7 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn
}
}
return TSDB_CODE_SUCCESS;
return code;
}
int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
......@@ -3709,13 +3700,11 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
}
} while (1);
ASSERT(pBlock->info.rows <= capacity);
return TSDB_CODE_SUCCESS;
}
// TODO refactor: with createDataBlockScanInfo
int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t num) {
ASSERT(pReader != NULL);
int32_t size = taosHashGetSize(pReader->status.pTableMap);
STableBlockScanInfo** p = NULL;
......@@ -4079,7 +4068,6 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
}
static void setBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64_t* uid, STimeWindow* pWindow) {
ASSERT(pReader != NULL);
*rows = pReader->pResBlock->info.rows;
*uid = pReader->pResBlock->info.id.uid;
*pWindow = pReader->pResBlock->info.window;
......
......@@ -354,7 +354,6 @@ typedef struct STableMergeScanInfo {
SLimitInfo limitInfo;
int64_t numOfRows;
SScanInfo scanInfo;
int32_t scanTimes;
SSDataBlock* pResBlock;
SSampleExecInfo sample; // sample execution info
SSortExecInfo sortExecInfo;
......
......@@ -90,7 +90,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
uint64_t suid = tableListGetSuid(pTableList);
code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables,
taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader);
taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
......@@ -216,7 +216,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
}
tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num,
taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader);
taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader, pTaskInfo->id.str);
taosArrayClear(pInfo->pUidList);
code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList);
......
......@@ -99,6 +99,8 @@ static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SC
int32_t status);
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
bool createDummyCol);
static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
SGroupResInfo* pGroupResInfo);
void setOperatorCompleted(SOperatorInfo* pOperator) {
pOperator->status = OP_EXEC_DONE;
......@@ -139,9 +141,6 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn,
return fpSet;
}
static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
SGroupResInfo* pGroupResInfo);
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize) {
SFilePage* pData = NULL;
......@@ -245,7 +244,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
}
// a new buffer page for each table. Needs to opt this design
static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, int32_t tid, uint32_t size) {
static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size) {
if (pWindowRes->pageId != -1) {
return 0;
}
......@@ -916,8 +915,7 @@ void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uin
* all group belong to one result set, and each group result has different group id so set the id to be one
*/
if (pResultRow->pageId == -1) {
int32_t ret =
addNewWindowResultBuf(pResultRow, pAggInfo->aggSup.pResultBuf, groupId, pAggInfo->binfo.pRes->info.rowSize);
int32_t ret = addNewWindowResultBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize);
if (ret != TSDB_CODE_SUCCESS) {
return;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册