提交 44123ec8 编写于 作者: 5 54liuyao

feat(stream):optimzie stream interval

上级 a6a1cfdd
......@@ -1017,6 +1017,7 @@ bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap);
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs,
TSKEY* pEndTs, int32_t rows, int32_t start, int64_t gap, SHashObj* pStDeleted);
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup);
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup);
bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup);
void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid);
......
......@@ -1277,8 +1277,12 @@ void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo) {
}
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset) {
bool init = false;
for (int32_t i = 0; i < numOfOutput; ++i) {
pCtx[i].resultInfo = getResultEntryInfo(pResult, i, rowEntryInfoOffset);
if (init) {
continue;
}
struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo;
if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) {
......@@ -1295,6 +1299,8 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO
} else {
pResInfo->initialized = true;
}
} else {
init = true;
}
}
}
......
......@@ -1174,10 +1174,15 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
for (int32_t rowId = 0; rowId < pBlock->info.rows; rowId++) {
SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC);
bool isClosed = false;
STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX};
if (isOverdue(tsCol[rowId], &pInfo->twAggSup)) {
win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC);
isClosed = isCloseWindow(&win, &pInfo->twAggSup);
}
// must check update info first.
bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, tsCol[rowId]);
if ((update || (isSignleIntervalWindow(pInfo) && isCloseWindow(&win, &pInfo->twAggSup) &&
if ((update || (isSignleIntervalWindow(pInfo) && isClosed &&
isDeletedWindow(&win, pBlock->info.groupId, pInfo->sessionSup.pIntervalAggSup))) && out) {
appendOneRow(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid);
}
......
......@@ -851,23 +851,34 @@ static int32_t saveResult(int64_t ts, int32_t pageId, int32_t offset, uint64_t g
return TSDB_CODE_SUCCESS;
}
static int32_t saveResultRow(SResultRow* result, uint64_t groupId, SArray* pUpdated) {
return saveResult(result->win.skey, result->pageId, result->offset, groupId, pUpdated);
static int32_t saveWinResult(int64_t ts, int32_t pageId, int32_t offset, uint64_t groupId, SHashObj* pUpdatedMap) {
SResKeyPos* newPos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t));
if (newPos == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
newPos->groupId = groupId;
newPos->pos = (SResultRowPosition){.pageId = pageId, .offset = offset};
*(int64_t*)newPos->key = ts;
SWinRes key = {.ts = ts, .groupId = groupId};
if (taosHashPut(pUpdatedMap, &key, sizeof(SWinRes), &newPos, sizeof(void*)) != TSDB_CODE_SUCCESS) {
taosMemoryFree(newPos);
}
return TSDB_CODE_SUCCESS;
}
static void removeResult(SArray* pUpdated, SWinRes* pKey) {
int32_t size = taosArrayGetSize(pUpdated);
int32_t index = binarySearchCom(pUpdated, size, pKey, TSDB_ORDER_DESC, compareResKey);
if (index >= 0 && 0 == compareResKey(pKey, pUpdated, index)) {
taosArrayRemove(pUpdated, index);
}
static int32_t saveWinResultRow(SResultRow* result, uint64_t groupId, SHashObj* pUpdatedMap) {
return saveWinResult(result->win.skey, result->pageId, result->offset, groupId, pUpdatedMap);;
}
static int32_t saveResultRow(SResultRow* result, uint64_t groupId, SArray* pUpdated) {
return saveResult(result->win.skey, result->pageId, result->offset, groupId, pUpdated);
}
static void removeResults(SArray* pWins, SArray* pUpdated) {
static void removeResults(SArray* pWins, SHashObj* pUpdatedMap) {
int32_t size = taosArrayGetSize(pWins);
for (int32_t i = 0; i < size; i++) {
SWinRes* pW = taosArrayGet(pWins, i);
removeResult(pUpdated, pW);
taosHashRemove(pUpdatedMap, pW, sizeof(SWinRes));
}
}
......@@ -894,11 +905,14 @@ int32_t compareWinRes(void* pKey, void* data, int32_t index) {
return -1;
}
static void removeDeleteResults(SArray* pUpdated, SArray* pDelWins) {
int32_t upSize = taosArrayGetSize(pUpdated);
static void removeDeleteResults(SHashObj* pUpdatedMap, SArray* pDelWins) {
if (!pUpdatedMap || taosHashGetSize(pUpdatedMap) == 0) {
return;
}
int32_t delSize = taosArrayGetSize(pDelWins);
for (int32_t i = 0; i < upSize; i++) {
SResKeyPos* pResKey = taosArrayGetP(pUpdated, i);
void* pIte = NULL;
while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) {
SResKeyPos* pResKey = (SResKeyPos*)pIte;
int32_t index = binarySearchCom(pDelWins, delSize, pResKey, TSDB_ORDER_DESC, compareWinRes);
if (index >= 0 && 0 == compareWinRes(pResKey, pDelWins, index)) {
taosArrayRemove(pDelWins, index);
......@@ -914,7 +928,7 @@ bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup) {
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup) { return isOverdue(pWin->ekey, pSup); }
static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
int32_t scanFlag, SArray* pUpdated) {
int32_t scanFlag, SHashObj* pUpdatedMap) {
SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
......@@ -940,7 +954,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
}
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM && pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
saveResultRow(pResult, tableGroupId, pUpdated);
saveWinResultRow(pResult, tableGroupId, pUpdatedMap);
setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur);
}
}
......@@ -997,7 +1011,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
}
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM && pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
saveResultRow(pResult, tableGroupId, pUpdated);
saveWinResultRow(pResult, tableGroupId, pUpdatedMap);
setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur);
}
......@@ -1437,7 +1451,7 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval*
}
}
static int32_t getAllIntervalWindow(SHashObj* pHashMap, SArray* resWins) {
static int32_t getAllIntervalWindow(SHashObj* pHashMap, SHashObj* resWins) {
void* pIte = NULL;
size_t keyLen = 0;
while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) {
......@@ -1446,7 +1460,7 @@ static int32_t getAllIntervalWindow(SHashObj* pHashMap, SArray* resWins) {
ASSERT(keyLen == GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY)));
TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t));
SResultRowPosition* pPos = (SResultRowPosition*)pIte;
int32_t code = saveResult(ts, pPos->pageId, pPos->offset, groupId, resWins);
int32_t code = saveWinResult(ts, pPos->pageId, pPos->offset, groupId, resWins);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -1455,7 +1469,7 @@ static int32_t getAllIntervalWindow(SHashObj* pHashMap, SArray* resWins) {
}
static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval,
SHashObj* pPullDataMap, SArray* closeWins, SArray* pRecyPages,
SHashObj* pPullDataMap, SHashObj* closeWins, SArray* pRecyPages,
SDiskbasedBuf* pDiscBuf) {
qDebug("===stream===close interval window");
void* pIte = NULL;
......@@ -1487,7 +1501,7 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
}
SResultRowPosition* pPos = (SResultRowPosition*)pIte;
if (pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
int32_t code = saveResult(ts, pPos->pageId, pPos->offset, groupId, closeWins);
int32_t code = saveWinResult(ts, pPos->pageId, pPos->offset, groupId, closeWins);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -1577,11 +1591,14 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
SOperatorInfo* downstream = pOperator->pDownstream[0];
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); // SResKeyPos
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP);
SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
break;
}
// qInfo("===stream===%ld", pBlock->info.version);
printDataBlock(pBlock, "single interval recv");
if (pBlock->info.type == STREAM_CLEAR) {
......@@ -1594,7 +1611,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
doDeleteSpecifyIntervalWindow(&pInfo->aggSup, pBlock, pInfo->pDelWins, &pInfo->interval);
continue;
} else if (pBlock->info.type == STREAM_GET_ALL) {
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdated);
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap);
continue;
}
......@@ -1617,17 +1634,24 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
}
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdated);
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdatedMap);
}
pOperator->status = OP_RES_TO_RETURN;
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdated,
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdatedMap,
pInfo->pRecycledPages, pInfo->aggSup.pResultBuf);
void* pIte = NULL;
while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) {
taosArrayPush(pUpdated, pIte);
}
taosHashCleanup(pUpdatedMap);
taosArraySort(pUpdated, resultrowComparAsc);
finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pSup->rowEntryInfoOffset);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
removeDeleteResults(pUpdated, pInfo->pDelWins);
removeDeleteResults(pUpdatedMap, pInfo->pDelWins);
doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
if (pInfo->pDelRes->info.rows > 0) {
return pInfo->pDelRes;
......@@ -2831,7 +2855,7 @@ STimeWindow getFinalTimeWindow(int64_t ts, SInterval* pInterval) {
}
static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t tableGroupId,
SArray* pUpdated) {
SHashObj* pUpdatedMap) {
SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)pOperatorInfo->info;
SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo);
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
......@@ -2913,8 +2937,8 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey,
NULL, TSDB_ORDER_ASC);
}
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) {
saveResultRow(pResult, tableGroupId, pUpdated);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdatedMap) {
saveWinResultRow(pResult, tableGroupId, pUpdatedMap);
setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur);
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
......@@ -3020,6 +3044,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info;
SOperatorInfo* downstream = pOperator->pDownstream[0];
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES);
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP);
SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
TSKEY maxTs = INT64_MIN;
SExprSupp* pSup = &pOperator->exprSupp;
......@@ -3077,7 +3103,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
clearSpecialDataBlock(pInfo->pUpdateRes);
removeDeleteResults(pUpdated, pInfo->pDelWins);
removeDeleteResults(pUpdatedMap, pInfo->pDelWins);
pOperator->status = OP_RES_TO_RETURN;
qDebug("%s return data", IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
break;
......@@ -3104,7 +3130,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
taosArrayDestroy(pUpWins);
continue;
}
removeResults(pUpWins, pUpdated);
removeResults(pUpWins, pUpdatedMap);
copyDataBlock(pInfo->pUpdateRes, pBlock);
// copyUpdateDataBlock(pInfo->pUpdateRes, pBlock, pInfo->primaryTsIndex);
pInfo->returnUpdate = true;
......@@ -3122,15 +3148,15 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
pOperator->exprSupp.numOfExprs, pOperator->pTaskInfo, pUpdated);
continue;
}
removeResults(pInfo->pDelWins, pUpdated);
removeResults(pInfo->pDelWins, pUpdatedMap);
break;
} else if (pBlock->info.type == STREAM_GET_ALL && IS_FINAL_OP(pInfo)) {
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdated);
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap);
continue;
} else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_OP(pInfo)) {
SArray* pUpWins = taosArrayInit(8, sizeof(SWinRes));
doClearWindows(&pInfo->aggSup, pSup, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, pUpWins);
removeResults(pUpWins, pUpdated);
removeResults(pUpWins, pUpdatedMap);
taosArrayDestroy(pUpWins);
if (taosArrayGetSize(pUpdated) > 0) {
break;
......@@ -3146,7 +3172,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
}
setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->order, MAIN_SCAN, true);
doHashInterval(pOperator, pBlock, pBlock->info.groupId, pUpdated);
doHashInterval(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap);
if (IS_FINAL_OP(pInfo)) {
int32_t chIndex = getChildIndex(pBlock);
int32_t size = taosArrayGetSize(pInfo->pChildren);
......@@ -3171,12 +3197,19 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
if (IS_FINAL_OP(pInfo)) {
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pInfo->pPullDataMap,
pUpdated, pInfo->pRecycledPages, pInfo->aggSup.pResultBuf);
pUpdatedMap, pInfo->pRecycledPages, pInfo->aggSup.pResultBuf);
closeChildIntervalWindow(pInfo->pChildren, pInfo->twAggSup.maxTs);
} else {
pInfo->binfo.pRes->info.watermark = pInfo->twAggSup.maxTs;
}
void* pIte = NULL;
while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) {
taosArrayPush(pUpdated, pIte);
}
taosHashCleanup(pUpdatedMap);
taosArraySort(pUpdated, resultrowComparAsc);
finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pSup->rowEntryInfoOffset);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
......
......@@ -124,7 +124,7 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma
}
pInfo->numBuckets = DEFAULT_BUCKET_SIZE;
pInfo->pCloseWinSBF = NULL;
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT);
pInfo->pMap = taosHashInit(DEFAULT_MAP_CAPACITY, hashFn, true, HASH_NO_LOCK);
pInfo->maxVersion = 0;
pInfo->scanGroupId = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册