提交 632a26d0 编写于 作者: H Haojun Liao

refactor(query): do some internal refactor.

上级 06926eda
......@@ -584,11 +584,12 @@ typedef struct SIntervalAggOperatorInfo {
typedef struct SMergeAlignedIntervalAggOperatorInfo {
SIntervalAggOperatorInfo* intervalAggOperatorInfo;
bool hasGroupId;
// bool hasGroupId;
uint64_t groupId; // current groupId
int64_t curTs; // current ts
SSDataBlock* prefetchedBlock;
SNode* pCondition;
SResultRow* pResultRow;
} SMergeAlignedIntervalAggOperatorInfo;
typedef struct SStreamIntervalOperatorInfo {
......@@ -648,7 +649,6 @@ typedef struct SAggOperatorInfo {
} SAggOperatorInfo;
typedef struct SProjectOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SNode* pFilterNode; // filter info, which is push down by optimizer
......@@ -690,7 +690,6 @@ typedef struct SFillOperatorInfo {
} SFillOperatorInfo;
typedef struct SGroupbyOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo;
SAggSupporter aggSup;
......@@ -737,7 +736,6 @@ typedef struct SWindowRowsSup {
} SWindowRowsSup;
typedef struct SSessionAggOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo;
SAggSupporter aggSup;
......@@ -825,7 +823,6 @@ typedef struct SStateWindowOperatorInfo {
SStateKeys stateKey;
int32_t tsSlotId; // primary timestamp column slot id
STimeWindowAggSupp twAggSup;
// bool reptScan;
const SNode* pCondition;
} SStateWindowOperatorInfo;
......@@ -846,24 +843,6 @@ typedef struct SStreamStateAggOperatorInfo {
bool ignoreExpiredData;
} SStreamStateAggOperatorInfo;
typedef struct SSortedMergeOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SArray* pSortInfo;
int32_t numOfSources;
SSortHandle* pSortHandle;
int32_t bufPageSize;
uint32_t sortBufSize; // max buffer size for in-memory sort
int32_t resultRowFactor;
bool hasGroupVal;
SDiskbasedBuf* pTupleStore; // keep the final results
int32_t numOfResPerPage;
char** groupVal;
SArray* groupInfo;
} SSortedMergeOperatorInfo;
typedef struct SSortOperatorInfo {
SOptrBasicInfo binfo;
uint32_t sortBufSize; // max buffer size for in-memory sort
......@@ -871,7 +850,6 @@ typedef struct SSortOperatorInfo {
SSortHandle* pSortHandle;
SArray* pColMatchInfo; // for index map from table scan output
int32_t bufPageSize;
int64_t startTs; // sort start time
uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included.
SLimitInfo limitInfo;
......@@ -907,7 +885,6 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn,
__optr_decode_fn_t decode, __optr_explain_fn_t explain);
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator);
void operatorDummyCloseFn(void* param, int32_t numOfCols);
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num);
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock);
......@@ -942,7 +919,6 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int
SSDataBlock* pBlock, const char* idStr);
void cleanupAggSup(SAggSupporter* pAggSup);
void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId);
......@@ -1089,10 +1065,8 @@ void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEn
void printDataBlock(SSDataBlock* pBlock, const char* flag);
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId);
int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs,
const int32_t* rowCellOffset, SSDataBlock* pBlock,
SExecTaskInfo* pTaskInfo);
int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
SExprSupp* pSup, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
......
......@@ -132,8 +132,6 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn,
return fpSet;
}
void operatorDummyCloseFn(void* param, int32_t numOfCols) {}
static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
SGroupResInfo* pGroupResInfo);
......@@ -1269,33 +1267,12 @@ static void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t nu
}
}
// todo extract method with copytoSSDataBlock
int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs,
const int32_t* rowCellOffset, SSDataBlock* pBlock,
SExecTaskInfo* pTaskInfo) {
SFilePage* page = getBufPage(pBuf, resultRowPosition->pageId);
SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset);
doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowCellOffset);
if (pRow->numOfRows == 0) {
releaseBufPage(pBuf, page);
return 0;
}
while (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
int32_t code = blockDataEnsureCapacity(pBlock, pBlock->info.capacity * 1.25);
if (TAOS_FAILED(code)) {
releaseBufPage(pBuf, page);
qError("%s ensure result data capacity failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
T_LONG_JMP(pTaskInfo->env, code);
}
}
static void doCopyResultToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx,
SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo) {
for (int32_t j = 0; j < numOfExprs; ++j) {
int32_t slotId = pExprInfo[j].base.resSchema.slotId;
pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowCellOffset);
pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
if (pCtx[j].fpSet.finalize) {
int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
if (TAOS_FAILED(code)) {
......@@ -1303,7 +1280,7 @@ int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosi
T_LONG_JMP(pTaskInfo->env, code);
}
} else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
// do nothing, todo refactor
// do nothing
} else {
// expand the result into multiple rows. E.g., _wstart, top(k, 20)
// the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
......@@ -1314,10 +1291,39 @@ int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosi
}
}
}
}
int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
SExprSupp* pSup, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
SFilePage* page = getBufPage(pBuf, resultRowPosition->pageId);
SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset);
SqlFunctionCtx* pCtx = pSup->pCtx;
SExprInfo* pExprInfo = pSup->pExprInfo;
const int32_t* rowEntryOffset = pSup->rowEntryInfoOffset;
doUpdateNumOfRows(pCtx, pRow, pSup->numOfExprs, rowEntryOffset);
if (pRow->numOfRows == 0) {
releaseBufPage(pBuf, page);
return 0;
}
int32_t size = pBlock->info.capacity;
while (pBlock->info.rows + pRow->numOfRows > size) {
size = size * 1.25;
}
int32_t code = blockDataEnsureCapacity(pBlock, size);
if (TAOS_FAILED(code)) {
releaseBufPage(pBuf, page);
qError("%s ensure result data capacity failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
T_LONG_JMP(pTaskInfo->env, code);
}
doCopyResultToDataBlock(pExprInfo, pSup->numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);
releaseBufPage(pBuf, page);
pBlock->info.rows += pRow->numOfRows;
return 0;
}
......@@ -1362,32 +1368,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS
}
pGroupResInfo->index += 1;
for (int32_t j = 0; j < numOfExprs; ++j) {
int32_t slotId = pExprInfo[j].base.resSchema.slotId;
pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
if (pCtx[j].fpSet.finalize) {
#ifdef BUF_PAGE_DEBUG
qDebug("\npage_finalize %d", numOfExprs);
#endif
int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
if (TAOS_FAILED(code)) {
qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
T_LONG_JMP(pTaskInfo->env, code);
}
} else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
// do nothing, todo refactor
} else {
// expand the result into multiple rows. E.g., _wstart, top(k, 20)
// the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
for (int32_t k = 0; k < pRow->numOfRows; ++k) {
colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
}
}
}
doCopyResultToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);
releaseBufPage(pBuf, page);
pBlock->info.rows += pRow->numOfRows;
......@@ -2307,21 +2288,6 @@ _error:
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
const char* pKey);
static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) {
SSortedMergeOperatorInfo* pInfo = (SSortedMergeOperatorInfo*)param;
taosArrayDestroy(pInfo->pSortInfo);
taosArrayDestroy(pInfo->groupInfo);
if (pInfo->pSortHandle != NULL) {
tsortDestroySortHandle(pInfo->pSortHandle);
}
blockDataDestroy(pInfo->binfo.pRes);
cleanupAggSup(&pInfo->aggSup);
taosMemoryFreeClear(param);
}
static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int32_t rowIndex) {
size_t size = taosArrayGetSize(groupInfo);
if (size == 0) {
......@@ -2357,41 +2323,6 @@ static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int3
return 0;
}
static void doMergeResultImpl(SSortedMergeOperatorInfo* pInfo, SqlFunctionCtx* pCtx, int32_t numOfExpr,
int32_t rowIndex) {
for (int32_t j = 0; j < numOfExpr; ++j) { // TODO set row index
// pCtx[j].startRow = rowIndex;
}
for (int32_t j = 0; j < numOfExpr; ++j) {
int32_t functionId = pCtx[j].functionId;
// pCtx[j].fpSet->addInput(&pCtx[j]);
// if (functionId < 0) {
// SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
// doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE);
// } else {
// assert(!TSDB_FUNC_IS_SCALAR(functionId));
// aAggs[functionId].mergeFunc(&pCtx[j]);
// }
}
}
static void doFinalizeResultImpl(SqlFunctionCtx* pCtx, int32_t numOfExpr) {
for (int32_t j = 0; j < numOfExpr; ++j) {
int32_t functionId = pCtx[j].functionId;
// if (functionId == FUNC_TAG_DUMMY || functionId == FUNC_TS_DUMMY) {
// continue;
// }
// if (functionId < 0) {
// SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
// doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE);
// } else {
// pCtx[j].fpSet.finalize(&pCtx[j]);
}
}
static bool saveCurrentTuple(char** rowColData, SArray* pColumnList, SSDataBlock* pBlock, int32_t rowIndex) {
int32_t size = (int32_t)taosArrayGetSize(pColumnList);
......@@ -2406,210 +2337,6 @@ static bool saveCurrentTuple(char** rowColData, SArray* pColumnList, SSDataBlock
return true;
}
static void doMergeImpl(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock* pBlock) {
SSortedMergeOperatorInfo* pInfo = pOperator->info;
SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
if (!pInfo->hasGroupVal) {
ASSERT(i == 0);
doMergeResultImpl(pInfo, pCtx, numOfExpr, i);
pInfo->hasGroupVal = saveCurrentTuple(pInfo->groupVal, pInfo->groupInfo, pBlock, i);
} else {
if (needToMerge(pBlock, pInfo->groupInfo, pInfo->groupVal, i)) {
doMergeResultImpl(pInfo, pCtx, numOfExpr, i);
} else {
doFinalizeResultImpl(pCtx, numOfExpr);
int32_t numOfRows = getNumOfResult(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs, NULL);
// setTagValueForMultipleRows(pCtx, pOperator->exprSupp.numOfExprs, numOfRows);
// TODO check for available buffer;
// next group info data
pInfo->binfo.pRes->info.rows += numOfRows;
for (int32_t j = 0; j < numOfExpr; ++j) {
if (pCtx[j].functionId < 0) {
continue;
}
pCtx[j].fpSet.process(&pCtx[j]);
}
doMergeResultImpl(pInfo, pCtx, numOfExpr, i);
pInfo->hasGroupVal = saveCurrentTuple(pInfo->groupVal, pInfo->groupInfo, pBlock, i);
}
}
}
}
static SSDataBlock* doMerge(SOperatorInfo* pOperator) {
SSortedMergeOperatorInfo* pInfo = pOperator->info;
SSortHandle* pHandle = pInfo->pSortHandle;
SSDataBlock* pDataBlock = createOneDataBlock(pInfo->binfo.pRes, false);
blockDataEnsureCapacity(pDataBlock, pOperator->resultInfo.capacity);
while (1) {
blockDataCleanup(pDataBlock);
while (1) {
STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
if (pTupleHandle == NULL) {
break;
}
// build datablock for merge for one group
appendOneRowToDataBlock(pDataBlock, pTupleHandle);
if (pDataBlock->info.rows >= pOperator->resultInfo.capacity) {
break;
}
}
if (pDataBlock->info.rows == 0) {
break;
}
setInputDataBlock(pOperator, pOperator->exprSupp.pCtx, pDataBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
// updateOutputBuf(&pInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor,
// pOperator->pRuntimeEnv, true);
doMergeImpl(pOperator, pOperator->exprSupp.numOfExprs, pDataBlock);
// flush to tuple store, and after all data have been handled, return to upstream node or sink node
}
doFinalizeResultImpl(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs);
int32_t numOfRows = getNumOfResult(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs, NULL);
// setTagValueForMultipleRows(pCtx, pOperator->exprSupp.numOfExprs, numOfRows);
// TODO check for available buffer;
// next group info data
pInfo->binfo.pRes->info.rows += numOfRows;
return (pInfo->binfo.pRes->info.rows > 0) ? pInfo->binfo.pRes : NULL;
}
SSDataBlock* getSortedMergeBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity,
SArray* pColMatchInfo, SSortedMergeOperatorInfo* pInfo) {
blockDataCleanup(pDataBlock);
SSDataBlock* p = tsortGetSortedDataBlock(pHandle);
if (p == NULL) {
return NULL;
}
blockDataEnsureCapacity(p, capacity);
while (1) {
STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
if (pTupleHandle == NULL) {
break;
}
appendOneRowToDataBlock(p, pTupleHandle);
if (p->info.rows >= capacity) {
break;
}
}
if (p->info.rows > 0) {
int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
for (int32_t i = 0; i < numOfCols; ++i) {
SColMatchInfo* pmInfo = taosArrayGet(pColMatchInfo, i);
ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID);
SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId);
SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->targetSlotId);
colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info);
}
pDataBlock->info.rows = p->info.rows;
pDataBlock->info.capacity = p->info.rows;
}
blockDataDestroy(p);
return (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
}
static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSortedMergeOperatorInfo* pInfo = pOperator->info;
if (pOperator->status == OP_RES_TO_RETURN) {
return getSortedMergeBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, NULL, pInfo);
}
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
pInfo->binfo.pRes, "GET_TASKID(pTaskInfo)");
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL);
for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
ps->param = pOperator->pDownstream[i];
tsortAddSource(pInfo->pSortHandle, ps);
}
int32_t code = tsortOpen(pInfo->pSortHandle);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, terrno);
}
pOperator->status = OP_RES_TO_RETURN;
return doMerge(pOperator);
}
static int32_t initGroupCol(SExprInfo* pExprInfo, int32_t numOfCols, SArray* pGroupInfo,
SSortedMergeOperatorInfo* pInfo) {
if (pGroupInfo == NULL || taosArrayGetSize(pGroupInfo) == 0) {
return 0;
}
int32_t len = 0;
SArray* plist = taosArrayInit(3, sizeof(SColumn));
pInfo->groupInfo = taosArrayInit(3, sizeof(int32_t));
if (plist == NULL || pInfo->groupInfo == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
size_t numOfGroupCol = taosArrayGetSize(pInfo->groupInfo);
for (int32_t i = 0; i < numOfGroupCol; ++i) {
SColumn* pCol = taosArrayGet(pGroupInfo, i);
for (int32_t j = 0; j < numOfCols; ++j) {
SExprInfo* pe = &pExprInfo[j];
if (pe->base.resSchema.slotId == pCol->colId) {
taosArrayPush(plist, pCol);
taosArrayPush(pInfo->groupInfo, &j);
len += pCol->bytes;
break;
}
}
}
ASSERT(taosArrayGetSize(pGroupInfo) == taosArrayGetSize(plist));
pInfo->groupVal = taosMemoryCalloc(1, (POINTER_BYTES * numOfGroupCol + len));
if (pInfo->groupVal == NULL) {
taosArrayDestroy(plist);
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t offset = 0;
char* start = (char*)(pInfo->groupVal + (POINTER_BYTES * numOfGroupCol));
for (int32_t i = 0; i < numOfGroupCol; ++i) {
pInfo->groupVal[i] = start + offset;
SColumn* pCol = taosArrayGet(plist, i);
offset += pCol->bytes;
}
taosArrayDestroy(plist);
return TSDB_CODE_SUCCESS;
}
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag) {
// todo add more information about exchange operation
int32_t type = pOperator->operatorType;
......@@ -3342,13 +3069,6 @@ void cleanupBasicInfo(SOptrBasicInfo* pInfo) {
pInfo->pRes = blockDataDestroy(pInfo->pRes);
}
void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) {
SOptrBasicInfo* pInfo = (SOptrBasicInfo*)param;
cleanupBasicInfo(pInfo);
taosMemoryFreeClear(param);
}
static void freeItem(void* pItem) {
void** p = pItem;
if (*p != NULL) {
......
......@@ -46,19 +46,6 @@ static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, co
uint64_t groupId);
static void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult);
///*
// * There are two cases to handle:
// *
// * 1. Query range is not set yet (queryRangeSet = 0). we need to set the query range info, including
// * pQueryAttr->lastKey, pQueryAttr->window.skey, and pQueryAttr->eKey.
// * 2. Query range is set and query is in progress. There may be another result with the same query ranges to be
// * merged during merge stage. In this case, we need the pTableQueryInfo->lastResRows to decide if there
// * is a previous result generated or not.
// */
// static void setIntervalQueryRange(STableQueryInfo* pTableQueryInfo, TSKEY key, STimeWindow* pQRange) {
// // do nothing
//}
static TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols) { return tsCols == NULL ? win->skey : tsCols[0]; }
static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, bool masterscan,
......@@ -3004,9 +2991,9 @@ static void addRetriveWindow(SArray* wins, SStreamFinalIntervalOperatorInfo* pIn
SPullWindowInfo pull = {.window = nextWin, .groupId = winKey->groupId};
// add pull data request
savePullWindow(&pull, pInfo->pPullWins);
int32_t size = taosArrayGetSize(pInfo->pChildren);
addPullWindow(pInfo->pPullDataMap, winKey, size);
qDebug("===stream===prepare retrive for delete %" PRId64 ", size:%d", winKey->ts, size);
int32_t size1 = taosArrayGetSize(pInfo->pChildren);
addPullWindow(pInfo->pPullDataMap, winKey, size1);
qDebug("===stream===prepare retrive for delete %" PRId64 ", size:%d", winKey->ts, size1);
}
}
}
......@@ -4884,66 +4871,64 @@ void destroyMergeAlignedIntervalOperatorInfo(void* param) {
taosMemoryFreeClear(param);
}
static int32_t outputMergeAlignedIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId,
SSDataBlock* pResultBlock, TSKEY wstartTs) {
SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
SExprSupp* pSup = &pOperatorInfo->exprSupp;
static SResultRow* doSetSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, SAggSupporter* pSup) {
SResultRow* pResult = getNewResultRow(pSup->pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
return pResult;
}
SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &wstartTs, TSDB_KEYSIZE, tableGroupId);
SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(
iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
ASSERT(p1 != NULL);
static int32_t setSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, SResultRow** pResult,
SExprSupp* pExprSup, SAggSupporter* pAggSup) {
if (*pResult == NULL) {
*pResult = doSetSingleOutputTupleBuf(pResultRowInfo, pAggSup);
if (*pResult == NULL) {
return terrno;
}
}
finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, pSup->pCtx, pSup->pExprInfo, pSup->numOfExprs,
pSup->rowEntryInfoOffset, pResultBlock, pTaskInfo);
tSimpleHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
ASSERT(tSimpleHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 0);
// set time window for current result ,todo extract method
(*pResult)->win = (*win);
(*pResult)->numOfRows = 0;
(*pResult)->closed = false;
(*pResult)->endInterp = false;
(*pResult)->startInterp = false;
memset((*pResult)->pEntryInfo, 0, pAggSup->resultRowSize - sizeof(SResultRow));
setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
return TSDB_CODE_SUCCESS;
}
static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo,
SSDataBlock* pBlock, int32_t scanFlag, SSDataBlock* pResultBlock) {
SSDataBlock* pBlock, SSDataBlock* pResultBlock) {
SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
SExprSupp* pSup = &pOperatorInfo->exprSupp;
SInterval* pInterval = &iaInfo->interval;
int32_t startPos = 0;
int32_t numOfOutput = pSup->numOfExprs;
int64_t* tsCols = extractTsCol(pBlock, iaInfo);
uint64_t tableGroupId = pBlock->info.groupId;
SResultRow* pResult = NULL;
TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);
// there is an result exists
if (miaInfo->curTs != INT64_MIN) {
ASSERT(tSimpleHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 1);
if (ts != miaInfo->curTs) {
outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, miaInfo->curTs);
finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
miaInfo->curTs = ts;
}
} else {
miaInfo->curTs = ts;
ASSERT(tSimpleHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 0);
}
STimeWindow win = {0};
win.skey = miaInfo->curTs;
win.ekey =
taosTimeAdd(win.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit, iaInfo->interval.precision) - 1;
win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
// TODO: remove the hash table (groupid + winkey => result row position)
int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
int32_t ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
T_LONG_JMP(pTaskInfo->env, ret);
}
int32_t currPos = startPos;
......@@ -4956,21 +4941,18 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos,
pBlock->info.rows, numOfOutput);
pBlock->info.rows, pSup->numOfExprs);
outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, miaInfo->curTs);
finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
miaInfo->curTs = tsCols[currPos];
currWin.skey = miaInfo->curTs;
currWin.ekey = taosTimeAdd(currWin.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit,
iaInfo->interval.precision) -
1;
currWin.ekey = taosTimeAdd(currWin.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
startPos = currPos;
ret = setTimeWindowOutputBuf(pResultRowInfo, &currWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
numOfOutput, pSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
T_LONG_JMP(pTaskInfo->env, ret);
}
miaInfo->curTs = currWin.skey;
......@@ -4978,68 +4960,66 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos,
pBlock->info.rows, numOfOutput);
pBlock->info.rows, pSup->numOfExprs);
}
static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperator->info;
SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
SIntervalAggOperatorInfo* pIaInfo = pMiaInfo->intervalAggOperatorInfo;
SExprSupp* pSup = &pOperator->exprSupp;
SSDataBlock* pRes = iaInfo->binfo.pRes;
SSDataBlock* pRes = pIaInfo->binfo.pRes;
SResultRowInfo* pResultRowInfo = &pIaInfo->binfo.resultRowInfo;
SOperatorInfo* downstream = pOperator->pDownstream[0];
int32_t scanFlag = MAIN_SCAN;
while (1) {
SSDataBlock* pBlock = NULL;
if (miaInfo->prefetchedBlock == NULL) {
if (pMiaInfo->prefetchedBlock == NULL) {
pBlock = downstream->fpSet.getNextFn(downstream);
} else {
pBlock = miaInfo->prefetchedBlock;
miaInfo->prefetchedBlock = NULL;
pBlock = pMiaInfo->prefetchedBlock;
pMiaInfo->prefetchedBlock = NULL;
miaInfo->groupId = pBlock->info.groupId;
pMiaInfo->groupId = pBlock->info.groupId;
}
// no data exists, all query processing is done
if (pBlock == NULL) {
// close last unfinalized time window
if (miaInfo->curTs != INT64_MIN) {
ASSERT(tSimpleHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 1);
outputMergeAlignedIntervalResult(pOperator, miaInfo->groupId, pRes, miaInfo->curTs);
miaInfo->curTs = INT64_MIN;
// close last unclosed time window
if (pMiaInfo->curTs != INT64_MIN) {
finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
pMiaInfo->curTs = INT64_MIN;
}
doSetOperatorCompleted(pOperator);
break;
}
if (!miaInfo->hasGroupId) {
miaInfo->hasGroupId = true;
miaInfo->groupId = pBlock->info.groupId;
} else if (miaInfo->groupId != pBlock->info.groupId) {
if (pMiaInfo->groupId != pBlock->info.groupId && pMiaInfo->groupId != 0) {
// if there are unclosed time window, close it firstly.
ASSERT(miaInfo->curTs != INT64_MIN);
outputMergeAlignedIntervalResult(pOperator, miaInfo->groupId, pRes, miaInfo->curTs);
miaInfo->prefetchedBlock = pBlock;
miaInfo->curTs = INT64_MIN;
ASSERT(pMiaInfo->curTs != INT64_MIN);
finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
pMiaInfo->prefetchedBlock = pBlock;
pMiaInfo->curTs = INT64_MIN;
pMiaInfo->groupId = 0;
break;
}
getTableScanInfo(pOperator, &iaInfo->inputOrder, &scanFlag);
setInputDataBlock(pOperator, pSup->pCtx, pBlock, iaInfo->inputOrder, scanFlag, true);
doMergeAlignedIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes);
getTableScanInfo(pOperator, &pIaInfo->inputOrder, &scanFlag);
setInputDataBlock(pOperator, pSup->pCtx, pBlock, pIaInfo->inputOrder, scanFlag, true);
doMergeAlignedIntervalAggImpl(pOperator, &pIaInfo->binfo.resultRowInfo, pBlock, pRes);
doFilter(miaInfo->pCondition, pRes, NULL);
doFilter(pMiaInfo->pCondition, pRes, NULL);
if (pRes->info.rows >= pOperator->resultInfo.capacity) {
break;
}
}
pRes->info.groupId = miaInfo->groupId;
miaInfo->hasGroupId = false;
pRes->info.groupId = pMiaInfo->groupId;
}
static SSDataBlock* mergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
......@@ -5191,8 +5171,7 @@ static int32_t finalizeWindowResult(SOperatorInfo* pOperatorInfo, uint64_t table
SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(
iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
ASSERT(p1 != NULL);
finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, pExprSup->pCtx, pExprSup->pExprInfo,
pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset, pResultBlock, pTaskInfo);
// finalizeResultRows(iaInfo->aggSup.pResultBuf, p1, pResultBlock, pTaskInfo);
tSimpleHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
return TSDB_CODE_SUCCESS;
}
......@@ -5201,9 +5180,7 @@ static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t t
STimeWindow* newWin) {
SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo;
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
bool ascScan = (iaInfo->inputOrder == TSDB_ORDER_ASC);
SExprSupp* pExprSup = &pOperatorInfo->exprSupp;
SGroupTimeWindow groupTimeWindow = {.groupId = tableGroupId, .window = *newWin};
tdListAppend(miaInfo->groupIntervals, &groupTimeWindow);
......@@ -5216,9 +5193,10 @@ static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t t
if (prevGrpWin->groupId != tableGroupId) {
continue;
}
STimeWindow* prevWin = &prevGrpWin->window;
if ((ascScan && newWin->skey > prevWin->ekey) || ((!ascScan) && newWin->skey < prevWin->ekey)) {
finalizeWindowResult(pOperatorInfo, tableGroupId, prevWin, pResultBlock);
// finalizeWindowResult(pOperatorInfo, tableGroupId, prevWin, pResultBlock);
tdListPopNode(miaInfo->groupIntervals, listNode);
}
}
......@@ -5378,7 +5356,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
if (listNode != NULL) {
SGroupTimeWindow* grpWin = (SGroupTimeWindow*)(listNode->data);
finalizeWindowResult(pOperator, grpWin->groupId, &grpWin->window, pRes);
// finalizeWindowResult(pOperator, grpWin->groupId, &grpWin->window, pRes);
pRes->info.groupId = grpWin->groupId;
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册