提交 fa43fc45 编写于 作者: 5 54liuyao

refactor stream interval build result data

上级 fa3356d3
...@@ -570,7 +570,7 @@ typedef struct SStreamIntervalOperatorInfo { ...@@ -570,7 +570,7 @@ typedef struct SStreamIntervalOperatorInfo {
SWinKey delKey; SWinKey delKey;
uint64_t numOfDatapack; uint64_t numOfDatapack;
SArray* pUpdated; SArray* pUpdated;
SHashObj* pUpdatedMap; SSHashObj* pUpdatedMap;
} SStreamIntervalOperatorInfo; } SStreamIntervalOperatorInfo;
typedef struct SDataGroupInfo { typedef struct SDataGroupInfo {
......
...@@ -155,7 +155,7 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, in ...@@ -155,7 +155,7 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, in
void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList) { void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList) {
if (pGroupResInfo->pRows != NULL) { if (pGroupResInfo->pRows != NULL) {
taosArrayDestroyP(pGroupResInfo->pRows, taosMemoryFree); taosArrayDestroy(pGroupResInfo->pRows);
} }
pGroupResInfo->pRows = pArrayList; pGroupResInfo->pRows = pArrayList;
......
...@@ -2589,26 +2589,22 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat ...@@ -2589,26 +2589,22 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) { for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i); SWinKey* pKey = taosArrayGet(pGroupResInfo->pRows, i);
int32_t size = 0; int32_t size = 0;
void* pVal = NULL; void* pVal = NULL;
SWinKey key = { int32_t code = streamStateGet(pState, pKey, &pVal, &size);
.ts = *(TSKEY*)pPos->key,
.groupId = pPos->groupId,
};
int32_t code = streamStateGet(pState, &key, &pVal, &size);
ASSERT(code == 0); ASSERT(code == 0);
SResultRow* pRow = (SResultRow*)pVal; SResultRow* pRow = (SResultRow*)pVal;
doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset); doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
// no results, continue to check the next one // no results, continue to check the next one
if (pRow->numOfRows == 0) { if (pRow->numOfRows == 0) {
pGroupResInfo->index += 1; pGroupResInfo->index += 1;
releaseOutputBuf(pState, &key, pRow); releaseOutputBuf(pState, pKey, pRow);
continue; continue;
} }
if (pBlock->info.id.groupId == 0) { if (pBlock->info.id.groupId == 0) {
pBlock->info.id.groupId = pPos->groupId; pBlock->info.id.groupId = pKey->groupId;
void* tbname = NULL; void* tbname = NULL;
if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) { if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) {
pBlock->info.parTbName[0] = 0; pBlock->info.parTbName[0] = 0;
...@@ -2618,15 +2614,15 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat ...@@ -2618,15 +2614,15 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat
tdbFree(tbname); tdbFree(tbname);
} else { } else {
// current value belongs to different group, it can't be packed into one datablock // current value belongs to different group, it can't be packed into one datablock
if (pBlock->info.id.groupId != pPos->groupId) { if (pBlock->info.id.groupId != pKey->groupId) {
releaseOutputBuf(pState, &key, pRow); releaseOutputBuf(pState, pKey, pRow);
break; break;
} }
} }
if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) { if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
ASSERT(pBlock->info.rows > 0); ASSERT(pBlock->info.rows > 0);
releaseOutputBuf(pState, &key, pRow); releaseOutputBuf(pState, pKey, pRow);
break; break;
} }
...@@ -2656,7 +2652,7 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat ...@@ -2656,7 +2652,7 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat
} }
pBlock->info.rows += pRow->numOfRows; pBlock->info.rows += pRow->numOfRows;
releaseOutputBuf(pState, &key, pRow); releaseOutputBuf(pState, pKey, pRow);
} }
pBlock->info.dataLoad = 1; pBlock->info.dataLoad = 1;
blockDataUpdateTsWindow(pBlock, 0); blockDataUpdateTsWindow(pBlock, 0);
......
...@@ -842,68 +842,61 @@ static int32_t saveResult(SResultWindowInfo winInfo, SSHashObj* pStUpdated) { ...@@ -842,68 +842,61 @@ static int32_t saveResult(SResultWindowInfo winInfo, SSHashObj* pStUpdated) {
return tSimpleHashPut(pStUpdated, &winInfo.sessionWin, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo)); return tSimpleHashPut(pStUpdated, &winInfo.sessionWin, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo));
} }
static int32_t saveWinResult(int64_t ts, int32_t pageId, int32_t offset, uint64_t groupId, SHashObj* pUpdatedMap) { static int32_t saveWinResult(int64_t ts, uint64_t groupId, SSHashObj* pUpdatedMap) {
SResKeyPos* newPos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t));
if (newPos == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
newPos->groupId = groupId;
newPos->pos = (SResultRowPosition){.pageId = pageId, .offset = offset};
*(int64_t*)newPos->key = ts;
SWinKey key = {.ts = ts, .groupId = groupId}; SWinKey key = {.ts = ts, .groupId = groupId};
if (taosHashPut(pUpdatedMap, &key, sizeof(SWinKey), &newPos, sizeof(void*)) != TSDB_CODE_SUCCESS) { tSimpleHashPut(pUpdatedMap, &key, sizeof(SWinKey), NULL, 0);
taosMemoryFree(newPos);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t saveWinResultInfo(TSKEY ts, uint64_t groupId, SHashObj* pUpdatedMap) { static int32_t saveWinResultInfo(TSKEY ts, uint64_t groupId, SSHashObj* pUpdatedMap) {
return saveWinResult(ts, -1, -1, groupId, pUpdatedMap); return saveWinResult(ts, groupId, pUpdatedMap);
} }
static void removeResults(SArray* pWins, SHashObj* pUpdatedMap) { static void removeResults(SArray* pWins, SSHashObj* pUpdatedMap) {
int32_t size = taosArrayGetSize(pWins); int32_t size = taosArrayGetSize(pWins);
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
SWinKey* pW = taosArrayGet(pWins, i); SWinKey* pW = taosArrayGet(pWins, i);
void* tmp = taosHashGet(pUpdatedMap, pW, sizeof(SWinKey)); void* tmp = tSimpleHashGet(pUpdatedMap, pW, sizeof(SWinKey));
if (tmp) { if (tmp) {
void* value = *(void**)tmp; void* value = *(void**)tmp;
taosMemoryFree(value); taosMemoryFree(value);
taosHashRemove(pUpdatedMap, pW, sizeof(SWinKey)); tSimpleHashRemove(pUpdatedMap, pW, sizeof(SWinKey));
} }
} }
} }
int32_t compareWinRes(void* pKey, void* data, int32_t index) { int32_t compareWinKey(void* pKey, void* data, int32_t index) {
SArray* res = (SArray*)data; SArray* res = (SArray*)data;
SWinKey* pDataPos = taosArrayGet(res, index); SWinKey* pDataPos = taosArrayGet(res, index);
SResKeyPos* pRKey = (SResKeyPos*)pKey; SWinKey* pWKey = (SWinKey*)pKey;
if (pRKey->groupId > pDataPos->groupId) {
if (pWKey->groupId > pDataPos->groupId) {
return 1; return 1;
} else if (pRKey->groupId < pDataPos->groupId) { } else if (pWKey->groupId < pDataPos->groupId) {
return -1; return -1;
} }
if (*(int64_t*)pRKey->key > pDataPos->ts) { if (pWKey->ts > pDataPos->ts) {
return 1; return 1;
} else if (*(int64_t*)pRKey->key < pDataPos->ts) { } else if (pWKey->ts < pDataPos->ts) {
return -1; return -1;
} }
return 0; return 0;
} }
static void removeDeleteResults(SHashObj* pUpdatedMap, SArray* pDelWins) { static void removeDeleteResults(SSHashObj* pUpdatedMap, SArray* pDelWins) {
taosArraySort(pDelWins, winKeyCmprImpl); taosArraySort(pDelWins, winKeyCmprImpl);
taosArrayRemoveDuplicate(pDelWins, winKeyCmprImpl, NULL); taosArrayRemoveDuplicate(pDelWins, winKeyCmprImpl, NULL);
int32_t delSize = taosArrayGetSize(pDelWins); int32_t delSize = taosArrayGetSize(pDelWins);
if (taosHashGetSize(pUpdatedMap) == 0 || delSize == 0) { if (tSimpleHashGetSize(pUpdatedMap) == 0 || delSize == 0) {
return; return;
} }
void* pIte = NULL; void* pIte = NULL;
while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) { int32_t iter = 0;
SResKeyPos* pResKey = *(SResKeyPos**)pIte; while ((pIte = tSimpleHashIterate(pUpdatedMap, pIte, &iter)) != NULL) {
int32_t index = binarySearchCom(pDelWins, delSize, pResKey, TSDB_ORDER_DESC, compareWinRes); SWinKey* pResKey = tSimpleHashGetKey(pIte, NULL);
if (index >= 0 && 0 == compareWinRes(pResKey, pDelWins, index)) { int32_t index = binarySearchCom(pDelWins, delSize, pResKey, TSDB_ORDER_DESC, compareWinKey);
if (index >= 0 && 0 == compareWinKey(pResKey, pDelWins, index)) {
taosArrayRemove(pDelWins, index); taosArrayRemove(pDelWins, index);
delSize = taosArrayGetSize(pDelWins); delSize = taosArrayGetSize(pDelWins);
} }
...@@ -1352,7 +1345,7 @@ static bool doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId) ...@@ -1352,7 +1345,7 @@ static bool doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId)
} }
static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDataBlock* pBlock, SArray* pUpWins, static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDataBlock* pBlock, SArray* pUpWins,
SHashObj* pUpdatedMap) { SSHashObj* pUpdatedMap) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info; SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
TSKEY* startTsCols = (TSKEY*)pStartTsCol->pData; TSKEY* startTsCols = (TSKEY*)pStartTsCol->pData;
...@@ -1388,28 +1381,21 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDa ...@@ -1388,28 +1381,21 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDa
taosArrayPush(pUpWins, &winRes); taosArrayPush(pUpWins, &winRes);
} }
if (pUpdatedMap) { if (pUpdatedMap) {
void* tmp = taosHashGet(pUpdatedMap, &winRes, sizeof(SWinKey)); tSimpleHashRemove(pUpdatedMap, &winRes, sizeof(SWinKey));
if (tmp) {
void* value = *(void**)tmp;
taosMemoryFree(value);
taosHashRemove(pUpdatedMap, &winRes, sizeof(SWinKey));
}
} }
getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win); getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win);
} while (win.ekey <= endTsCols[i]); } while (win.ekey <= endTsCols[i]);
} }
} }
static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SHashObj* resWins) { static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SSHashObj* resWins) {
void* pIte = NULL; void* pIte = NULL;
size_t keyLen = 0;
int32_t iter = 0; int32_t iter = 0;
while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) { while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
void* key = tSimpleHashGetKey(pIte, &keyLen); SWinKey* pKey = tSimpleHashGetKey(pIte, NULL);
uint64_t groupId = *(uint64_t*)key; uint64_t groupId = pKey->groupId;
TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t)); TSKEY ts = pKey->ts;
SResultRowPosition* pPos = (SResultRowPosition*)pIte; int32_t code = saveWinResult(ts, groupId, resWins);
int32_t code = saveWinResult(ts, pPos->pageId, pPos->offset, groupId, resWins);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -1417,36 +1403,16 @@ static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SHashObj* resWins) { ...@@ -1417,36 +1403,16 @@ static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SHashObj* resWins) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t compareWinKey(void* pKey, void* data, int32_t index) {
SArray* res = (SArray*)data;
SWinKey* pDataPos = taosArrayGet(res, index);
SWinKey* pWKey = (SWinKey*)pKey;
if (pWKey->groupId > pDataPos->groupId) {
return 1;
} else if (pWKey->groupId < pDataPos->groupId) {
return -1;
}
if (pWKey->ts > pDataPos->ts) {
return 1;
} else if (pWKey->ts < pDataPos->ts) {
return -1;
}
return 0;
}
static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SInterval* pInterval, static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SInterval* pInterval,
SHashObj* pPullDataMap, SHashObj* closeWins, SArray* pDelWins, SHashObj* pPullDataMap, SSHashObj* closeWins, SArray* pDelWins,
SOperatorInfo* pOperator) { SOperatorInfo* pOperator) {
qDebug("===stream===close interval window"); qDebug("===stream===close interval window");
void* pIte = NULL; void* pIte = NULL;
size_t keyLen = 0;
int32_t iter = 0; int32_t iter = 0;
SStreamIntervalOperatorInfo* pInfo = pOperator->info; SStreamIntervalOperatorInfo* pInfo = pOperator->info;
int32_t delSize = taosArrayGetSize(pDelWins); int32_t delSize = taosArrayGetSize(pDelWins);
while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) { while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
void* key = tSimpleHashGetKey(pIte, &keyLen); void* key = tSimpleHashGetKey(pIte, NULL);
SWinKey* pWinKey = (SWinKey*)key; SWinKey* pWinKey = (SWinKey*)key;
if (delSize > 0) { if (delSize > 0) {
int32_t index = binarySearchCom(pDelWins, delSize, pWinKey, TSDB_ORDER_DESC, compareWinKey); int32_t index = binarySearchCom(pDelWins, delSize, pWinKey, TSDB_ORDER_DESC, compareWinKey);
...@@ -2157,7 +2123,7 @@ bool hasIntervalWindow(SStreamState* pState, SWinKey* pKey) { ...@@ -2157,7 +2123,7 @@ bool hasIntervalWindow(SStreamState* pState, SWinKey* pKey) {
return TSDB_CODE_SUCCESS == streamStateGet(pState, pKey, NULL, 0); return TSDB_CODE_SUCCESS == streamStateGet(pState, pKey, NULL, 0);
} }
static void rebuildIntervalWindow(SOperatorInfo* pOperator, SArray* pWinArray, SHashObj* pUpdatedMap) { static void rebuildIntervalWindow(SOperatorInfo* pOperator, SArray* pWinArray, SSHashObj* pUpdatedMap) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info; SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int32_t size = taosArrayGetSize(pWinArray); int32_t size = taosArrayGetSize(pWinArray);
...@@ -2343,7 +2309,8 @@ static void clearFunctionContext(SExprSupp* pSup) { ...@@ -2343,7 +2309,8 @@ static void clearFunctionContext(SExprSupp* pSup) {
} }
} }
void doBuildResult(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo) { void doBuildStreamIntervalResult(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock,
SGroupResInfo* pGroupResInfo) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
// set output datablock version // set output datablock version
pBlock->info.version = pTaskInfo->version; pBlock->info.version = pTaskInfo->version;
...@@ -2370,7 +2337,7 @@ static int32_t getNextQualifiedFinalWindow(SInterval* pInterval, STimeWindow* pN ...@@ -2370,7 +2337,7 @@ static int32_t getNextQualifiedFinalWindow(SInterval* pInterval, STimeWindow* pN
} }
static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t groupId, static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t groupId,
SHashObj* pUpdatedMap) { SSHashObj* pUpdatedMap) {
SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperatorInfo->info; SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperatorInfo->info;
SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo); SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo);
...@@ -2516,7 +2483,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2516,7 +2483,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
return pInfo->pDelRes; return pInfo->pDelRes;
} }
doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo); doBuildStreamIntervalResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
if (pInfo->binfo.pRes->info.rows != 0) { if (pInfo->binfo.pRes->info.rows != 0) {
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
return pInfo->binfo.pRes; return pInfo->binfo.pRes;
...@@ -2543,7 +2510,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2543,7 +2510,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
return pInfo->pDelRes; return pInfo->pDelRes;
} }
doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo); doBuildStreamIntervalResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
if (pInfo->binfo.pRes->info.rows != 0) { if (pInfo->binfo.pRes->info.rows != 0) {
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
return pInfo->binfo.pRes; return pInfo->binfo.pRes;
...@@ -2552,11 +2519,11 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2552,11 +2519,11 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
} }
if (!pInfo->pUpdated) { if (!pInfo->pUpdated) {
pInfo->pUpdated = taosArrayInit(4, POINTER_BYTES); pInfo->pUpdated = taosArrayInit(4, sizeof(SWinKey));
} }
if (!pInfo->pUpdatedMap) { if (!pInfo->pUpdatedMap) {
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); pInfo->pUpdatedMap = tSimpleHashInit(1024, hashFn);
} }
while (1) { while (1) {
...@@ -2650,12 +2617,13 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2650,12 +2617,13 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
pInfo->binfo.pRes->info.watermark = pInfo->twAggSup.maxTs; pInfo->binfo.pRes->info.watermark = pInfo->twAggSup.maxTs;
void* pIte = NULL; void* pIte = NULL;
while ((pIte = taosHashIterate(pInfo->pUpdatedMap, pIte)) != NULL) { int32_t iter = 0;
while ((pIte = tSimpleHashIterate(pInfo->pUpdatedMap, pIte, &iter)) != NULL) {
taosArrayPush(pInfo->pUpdated, pIte); taosArrayPush(pInfo->pUpdated, pIte);
} }
taosHashCleanup(pInfo->pUpdatedMap); tSimpleHashCleanup(pInfo->pUpdatedMap);
pInfo->pUpdatedMap = NULL; pInfo->pUpdatedMap = NULL;
taosArraySort(pInfo->pUpdated, resultrowComparAsc); taosArraySort(pInfo->pUpdated, winKeyCmprImpl);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
pInfo->pUpdated = NULL; pInfo->pUpdated = NULL;
...@@ -2675,7 +2643,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2675,7 +2643,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
return pInfo->pDelRes; return pInfo->pDelRes;
} }
doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo); doBuildStreamIntervalResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
if (pInfo->binfo.pRes->info.rows != 0) { if (pInfo->binfo.pRes->info.rows != 0) {
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
return pInfo->binfo.pRes; return pInfo->binfo.pRes;
...@@ -3239,10 +3207,9 @@ static inline int32_t sessionKeyCompareAsc(const void* pKey1, const void* pKey2) ...@@ -3239,10 +3207,9 @@ static inline int32_t sessionKeyCompareAsc(const void* pKey1, const void* pKey2)
static int32_t copyUpdateResult(SSHashObj* pStUpdated, SArray* pUpdated) { static int32_t copyUpdateResult(SSHashObj* pStUpdated, SArray* pUpdated) {
void* pIte = NULL; void* pIte = NULL;
size_t keyLen = 0;
int32_t iter = 0; int32_t iter = 0;
while ((pIte = tSimpleHashIterate(pStUpdated, pIte, &iter)) != NULL) { while ((pIte = tSimpleHashIterate(pStUpdated, pIte, &iter)) != NULL) {
void* key = tSimpleHashGetKey(pIte, &keyLen); void* key = tSimpleHashGetKey(pIte, NULL);
taosArrayPush(pUpdated, key); taosArrayPush(pUpdated, key);
} }
taosArraySort(pUpdated, sessionKeyCompareAsc); taosArraySort(pUpdated, sessionKeyCompareAsc);
...@@ -3256,13 +3223,12 @@ void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlo ...@@ -3256,13 +3223,12 @@ void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlo
return; return;
} }
blockDataEnsureCapacity(pBlock, size); blockDataEnsureCapacity(pBlock, size);
size_t keyLen = 0;
int32_t iter = 0; int32_t iter = 0;
while (((*Ite) = tSimpleHashIterate(pStDeleted, *Ite, &iter)) != NULL) { while (((*Ite) = tSimpleHashIterate(pStDeleted, *Ite, &iter)) != NULL) {
if (pBlock->info.rows + 1 > pBlock->info.capacity) { if (pBlock->info.rows + 1 > pBlock->info.capacity) {
break; break;
} }
SSessionKey* res = tSimpleHashGetKey(*Ite, &keyLen); SSessionKey* res = tSimpleHashGetKey(*Ite, NULL);
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
colDataAppend(pStartTsCol, pBlock->info.rows, (const char*)&res->win.skey, false); colDataAppend(pStartTsCol, pBlock->info.rows, (const char*)&res->win.skey, false);
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
...@@ -3351,7 +3317,6 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS ...@@ -3351,7 +3317,6 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS
int32_t closeSessionWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SSHashObj* pClosed) { int32_t closeSessionWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SSHashObj* pClosed) {
void* pIte = NULL; void* pIte = NULL;
size_t keyLen = 0;
int32_t iter = 0; int32_t iter = 0;
while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) { while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
SResultWindowInfo* pWinInfo = pIte; SResultWindowInfo* pWinInfo = pIte;
...@@ -3362,7 +3327,7 @@ int32_t closeSessionWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SSHa ...@@ -3362,7 +3327,7 @@ int32_t closeSessionWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SSHa
return code; return code;
} }
} }
SSessionKey* pKey = tSimpleHashGetKey(pIte, &keyLen); SSessionKey* pKey = tSimpleHashGetKey(pIte, NULL);
tSimpleHashIterateRemove(pHashMap, pKey, sizeof(SSessionKey), &pIte, &iter); tSimpleHashIterateRemove(pHashMap, pKey, sizeof(SSessionKey), &pIte, &iter);
} }
} }
...@@ -4761,7 +4726,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -4761,7 +4726,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
return pInfo->pDelRes; return pInfo->pDelRes;
} }
doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo); doBuildStreamIntervalResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
if (pInfo->binfo.pRes->info.rows > 0) { if (pInfo->binfo.pRes->info.rows > 0) {
printDataBlock(pInfo->binfo.pRes, "single interval"); printDataBlock(pInfo->binfo.pRes, "single interval");
return pInfo->binfo.pRes; return pInfo->binfo.pRes;
...@@ -4776,14 +4741,13 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -4776,14 +4741,13 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
if (!pInfo->pUpdated) { if (!pInfo->pUpdated) {
pInfo->pUpdated = taosArrayInit(4, POINTER_BYTES); pInfo->pUpdated = taosArrayInit(4, sizeof(SWinKey));
} }
if (!pInfo->pUpdatedMap) { if (!pInfo->pUpdatedMap) {
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); pInfo->pUpdatedMap = tSimpleHashInit(1024, hashFn);
} }
while (1) { while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) { if (pBlock == NULL) {
...@@ -4832,19 +4796,21 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -4832,19 +4796,21 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
} }
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
removeDeleteResults(pInfo->pUpdatedMap, pInfo->pDelWins); removeDeleteResults(pInfo->pUpdatedMap, pInfo->pDelWins);
closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pInfo->pUpdatedMap, closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL,
pInfo->pDelWins, pOperator); pInfo->pUpdatedMap, pInfo->pDelWins, pOperator);
void* pIte = NULL; void* pIte = NULL;
while ((pIte = taosHashIterate(pInfo->pUpdatedMap, pIte)) != NULL) { int32_t iter = 0;
taosArrayPush(pInfo->pUpdated, pIte); while ((pIte = tSimpleHashIterate(pInfo->pUpdatedMap, pIte, &iter)) != NULL) {
SWinKey* pKey = tSimpleHashGetKey(pIte, NULL);
taosArrayPush(pInfo->pUpdated, pKey);
} }
taosArraySort(pInfo->pUpdated, resultrowComparAsc); taosArraySort(pInfo->pUpdated, winKeyCmprImpl);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
pInfo->pUpdated = NULL; pInfo->pUpdated = NULL;
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
taosHashCleanup(pInfo->pUpdatedMap); tSimpleHashCleanup(pInfo->pUpdatedMap);
pInfo->pUpdatedMap = NULL; pInfo->pUpdatedMap = NULL;
doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
...@@ -4853,7 +4819,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -4853,7 +4819,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
return pInfo->pDelRes; return pInfo->pDelRes;
} }
doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo); doBuildStreamIntervalResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
if (pInfo->binfo.pRes->info.rows > 0) { if (pInfo->binfo.pRes->info.rows > 0) {
printDataBlock(pInfo->binfo.pRes, "single interval"); printDataBlock(pInfo->binfo.pRes, "single interval");
return pInfo->binfo.pRes; return pInfo->binfo.pRes;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册