提交 47cb043f 编写于 作者: 5 54liuyao

feat(stream):optimize update scan range

上级 164dea54
...@@ -52,11 +52,12 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int ...@@ -52,11 +52,12 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int
#define NEEDTO_COMPRESS_QUERY(size) ((size) > tsCompressColData ? 1 : 0) #define NEEDTO_COMPRESS_QUERY(size) ((size) > tsCompressColData ? 1 : 0)
#define START_TS_COLUMN_INDEX 0 #define START_TS_COLUMN_INDEX 0
#define END_TS_COLUMN_INDEX 1 #define END_TS_COLUMN_INDEX 1
#define UID_COLUMN_INDEX 2 #define UID_COLUMN_INDEX 2
#define GROUPID_COLUMN_INDEX UID_COLUMN_INDEX #define GROUPID_COLUMN_INDEX 3
#define DELETE_GROUPID_COLUMN_INDEX 2 #define CALCULATE_START_TS_COLUMN_INDEX 4
#define CALCULATE_END_TS_COLUMN_INDEX 5
enum { enum {
// when this task starts to execute, this status will set // when this task starts to execute, this status will set
...@@ -346,7 +347,6 @@ typedef enum EStreamScanMode { ...@@ -346,7 +347,6 @@ typedef enum EStreamScanMode {
STREAM_SCAN_FROM_READERHANDLE = 1, STREAM_SCAN_FROM_READERHANDLE = 1,
STREAM_SCAN_FROM_RES, STREAM_SCAN_FROM_RES,
STREAM_SCAN_FROM_UPDATERES, STREAM_SCAN_FROM_UPDATERES,
STREAM_SCAN_FROM_DATAREADER, // todo(liuyao) delete it
STREAM_SCAN_FROM_DATAREADER_RETRIEVE, STREAM_SCAN_FROM_DATAREADER_RETRIEVE,
STREAM_SCAN_FROM_DATAREADER_RANGE, STREAM_SCAN_FROM_DATAREADER_RANGE,
} EStreamScanMode; } EStreamScanMode;
...@@ -366,7 +366,7 @@ typedef struct SStreamAggSupporter { ...@@ -366,7 +366,7 @@ typedef struct SStreamAggSupporter {
char* pKeyBuf; // window key buffer char* pKeyBuf; // window key buffer
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
SArray* pScanWindow; SSDataBlock* pScanBlock;
} SStreamAggSupporter; } SStreamAggSupporter;
typedef struct SessionWindowSupporter { typedef struct SessionWindowSupporter {
...@@ -419,7 +419,7 @@ typedef struct SStreamScanInfo { ...@@ -419,7 +419,7 @@ typedef struct SStreamScanInfo {
int32_t deleteDataIndex; int32_t deleteDataIndex;
STimeWindow updateWin; STimeWindow updateWin;
STimeWindowAggSupp twAggSup; STimeWindowAggSupp twAggSup;
SSDataBlock* pUpdateDataRes;
// status for tmq // status for tmq
// SSchemaWrapper schema; // SSchemaWrapper schema;
STqOffset offset; STqOffset offset;
...@@ -712,7 +712,6 @@ typedef struct SStreamStateAggOperatorInfo { ...@@ -712,7 +712,6 @@ typedef struct SStreamStateAggOperatorInfo {
SSDataBlock* pDelRes; SSDataBlock* pDelRes;
SHashObj* pSeDeleted; SHashObj* pSeDeleted;
void* pDelIterator; void* pDelIterator;
SArray* pScanWindow;
SArray* pChildren; // cache for children's result; SArray* pChildren; // cache for children's result;
bool ignoreExpiredData; bool ignoreExpiredData;
} SStreamStateAggOperatorInfo; } SStreamStateAggOperatorInfo;
...@@ -954,6 +953,7 @@ int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, ...@@ -954,6 +953,7 @@ int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs,
TSKEY* pEndTs, int32_t rows, int32_t start, int64_t gap, SHashObj* pStDeleted); TSKEY* pEndTs, int32_t rows, int32_t start, int64_t gap, SHashObj* pStDeleted);
bool functionNeedToExecute(SqlFunctionCtx* pCtx); bool functionNeedToExecute(SqlFunctionCtx* pCtx);
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup); bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup);
void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid);
int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset,
...@@ -970,7 +970,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN ...@@ -970,7 +970,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex); void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex);
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey); int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey);
SSDataBlock* createPullDataBlock(); SSDataBlock* createSpecialDataBlock(EStreamType type);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -5136,8 +5136,7 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlF ...@@ -5136,8 +5136,7 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlF
} }
pSup->valueSize = size; pSup->valueSize = size;
pSup->pScanWindow = taosArrayInit(4, sizeof(STimeWindow)); pSup->pScanBlock = createSpecialDataBlock(STREAM_CLEAR);
int32_t pageSize = 4096; int32_t pageSize = 4096;
while (pageSize < pSup->resultRowSize * 4) { while (pageSize < pSup->resultRowSize * 4) {
pageSize <<= 1u; pageSize <<= 1u;
......
...@@ -1373,8 +1373,10 @@ void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock, ...@@ -1373,8 +1373,10 @@ void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock,
static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval* pInterval, int32_t numOfOutput, static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval* pInterval, int32_t numOfOutput,
SSDataBlock* pBlock, SArray* pUpWins) { SSDataBlock* pBlock, SArray* pUpWins) {
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
TSKEY* tsCols = (TSKEY*)pTsCol->pData; TSKEY* startTsCols = (TSKEY*)pStartTsCol->pData;
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
TSKEY* endTsCols = (TSKEY*)pEndTsCol->pData;
uint64_t* pGpDatas = NULL; uint64_t* pGpDatas = NULL;
if (pBlock->info.type == STREAM_RETRIEVE) { if (pBlock->info.type == STREAM_RETRIEVE) {
SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
...@@ -1382,22 +1384,18 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval* ...@@ -1382,22 +1384,18 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval*
} }
int32_t step = 0; int32_t step = 0;
int32_t startPos = 0; int32_t startPos = 0;
SResultRowInfo dumyInfo; for (int32_t i = 0; i < pBlock->info.rows; i++) {
dumyInfo.cur.pageId = -1; SResultRowInfo dumyInfo;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[0], pInterval, TSDB_ORDER_ASC); dumyInfo.cur.pageId = -1;
while (1) { STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCols[i], pInterval, TSDB_ORDER_ASC);
step = while (win.ekey <= endTsCols[i]) {
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); uint64_t winGpId = pGpDatas ? pGpDatas[startPos] : pBlock->info.groupId;
uint64_t winGpId = pGpDatas ? pGpDatas[startPos] : pBlock->info.groupId; bool res = doClearWindow(pAggSup, pSup1, (char*)&win.skey, sizeof(TSKEY), winGpId, numOfOutput);
bool res = doClearWindow(pAggSup, pSup1, (char*)&win.skey, sizeof(TSKEY), winGpId, numOfOutput); if (pUpWins && res) {
if (pUpWins && res) { SWinRes winRes = {.ts = win.skey, .groupId = winGpId};
SWinRes winRes = {.ts = win.skey, .groupId = winGpId}; taosArrayPush(pUpWins, &winRes);
taosArrayPush(pUpWins, &winRes); }
} getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win);
int32_t prevEndPos = step - 1 + startPos;
startPos = getNextQualifiedWindow(pInterval, &win, &pBlock->info, tsCols, prevEndPos, TSDB_ORDER_ASC);
if (startPos < 0) {
break;
} }
} }
} }
...@@ -1501,7 +1499,7 @@ static void doBuildDeleteResult(SArray* pWins, int32_t* index, SSDataBlock* pBlo ...@@ -1501,7 +1499,7 @@ static void doBuildDeleteResult(SArray* pWins, int32_t* index, SSDataBlock* pBlo
} }
blockDataEnsureCapacity(pBlock, size - *index); blockDataEnsureCapacity(pBlock, size - *index);
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, DELETE_GROUPID_COLUMN_INDEX); SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
for (int32_t i = *index; i < size; i++) { for (int32_t i = *index; i < size; i++) {
SWinRes* pWin = taosArrayGet(pWins, i); SWinRes* pWin = taosArrayGet(pWins, i);
colDataAppend(pTsCol, pBlock->info.rows, (const char*)&pWin->ts, false); colDataAppend(pTsCol, pBlock->info.rows, (const char*)&pWin->ts, false);
...@@ -1793,10 +1791,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -1793,10 +1791,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pInfo->pRecycledPages = taosArrayInit(4, sizeof(int32_t)); pInfo->pRecycledPages = taosArrayInit(4, sizeof(int32_t));
pInfo->pDelWins = taosArrayInit(4, sizeof(SWinRes)); pInfo->pDelWins = taosArrayInit(4, sizeof(SWinRes));
pInfo->delIndex = 0; pInfo->delIndex = 0;
// pInfo->pDelRes = createPullDataBlock(); todo(liuyao) for delete pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
pInfo->pDelRes = createOneDataBlock(pInfo->binfo.pRes, false); // todo(liuyao) for delete
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT; // todo(liuyao) for delete
initResultRowInfo(&pInfo->binfo.resultRowInfo); initResultRowInfo(&pInfo->binfo.resultRowInfo);
pOperator->name = "TimeIntervalAggOperator"; pOperator->name = "TimeIntervalAggOperator";
...@@ -2598,14 +2593,6 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc ...@@ -2598,14 +2593,6 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
chId = getChildIndex(pSDataBlock); chId = getChildIndex(pSDataBlock);
index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ); index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ);
} }
// if (index != -1 && pSDataBlock->info.type == STREAM_PULL_DATA) {
// qDebug("===stream===delete child id %d", chId);
// taosArrayRemove(chArray, index);
// if (taosArrayGetSize(chArray) == 0) {
// // pull data is over
// taosHashRemove(pInfo->pPullDataMap, &winRes, sizeof(SWinRes));
// }
// }
if (index == -1 || pSDataBlock->info.type == STREAM_PULL_DATA) { if (index == -1 || pSDataBlock->info.type == STREAM_PULL_DATA) {
ignore = false; ignore = false;
} }
...@@ -2697,16 +2684,18 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB ...@@ -2697,16 +2684,18 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB
} }
blockDataEnsureCapacity(pBlock, size - (*pIndex)); blockDataEnsureCapacity(pBlock, size - (*pIndex));
ASSERT(3 <= taosArrayGetSize(pBlock->pDataBlock)); ASSERT(3 <= taosArrayGetSize(pBlock->pDataBlock));
SColumnInfoData* pStartTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pEndTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
SColumnInfoData* pGroupId = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
SColumnInfoData* pCalStartTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
SColumnInfoData* pCalEndTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
for (; (*pIndex) < size; (*pIndex)++) { for (; (*pIndex) < size; (*pIndex)++) {
SPullWindowInfo* pWin = taosArrayGet(array, (*pIndex)); SPullWindowInfo* pWin = taosArrayGet(array, (*pIndex));
SColumnInfoData* pStartTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
colDataAppend(pStartTs, pBlock->info.rows, (const char*)&pWin->window.skey, false); colDataAppend(pStartTs, pBlock->info.rows, (const char*)&pWin->window.skey, false);
SColumnInfoData* pEndTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
colDataAppend(pEndTs, pBlock->info.rows, (const char*)&pWin->window.ekey, false); colDataAppend(pEndTs, pBlock->info.rows, (const char*)&pWin->window.ekey, false);
SColumnInfoData* pGroupId = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
colDataAppend(pGroupId, pBlock->info.rows, (const char*)&pWin->groupId, false); colDataAppend(pGroupId, pBlock->info.rows, (const char*)&pWin->groupId, false);
colDataAppend(pCalStartTs, pBlock->info.rows, (const char*)&pWin->window.skey, false);
colDataAppend(pCalEndTs, pBlock->info.rows, (const char*)&pWin->window.ekey, false);
pBlock->info.rows++; pBlock->info.rows++;
} }
if ((*pIndex) == size) { if ((*pIndex) == size) {
...@@ -2825,7 +2814,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2825,7 +2814,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
continue; continue;
} }
removeResults(pUpWins, pUpdated); removeResults(pUpWins, pUpdated);
copyUpdateDataBlock(pInfo->pUpdateRes, pBlock, pInfo->primaryTsIndex); copyDataBlock(pInfo->pUpdateRes, pBlock);
// copyUpdateDataBlock(pInfo->pUpdateRes, pBlock, pInfo->primaryTsIndex);
pInfo->returnUpdate = true; pInfo->returnUpdate = true;
taosArrayDestroy(pUpWins); taosArrayDestroy(pUpWins);
break; break;
...@@ -2933,12 +2923,12 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2933,12 +2923,12 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
return NULL; return NULL;
} }
SSDataBlock* createPullDataBlock() { SSDataBlock* createSpecialDataBlock(EStreamType type) {
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
pBlock->info.hasVarCol = false; pBlock->info.hasVarCol = false;
pBlock->info.groupId = 0; pBlock->info.groupId = 0;
pBlock->info.rows = 0; pBlock->info.rows = 0;
pBlock->info.type = STREAM_RETRIEVE; pBlock->info.type = type;
pBlock->info.rowSize = sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t); pBlock->info.rowSize = sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t);
pBlock->pDataBlock = taosArrayInit(3, sizeof(SColumnInfoData)); pBlock->pDataBlock = taosArrayInit(3, sizeof(SColumnInfoData));
...@@ -2952,6 +2942,14 @@ SSDataBlock* createPullDataBlock() { ...@@ -2952,6 +2942,14 @@ SSDataBlock* createPullDataBlock() {
infoData.info.type = TSDB_DATA_TYPE_UBIGINT; infoData.info.type = TSDB_DATA_TYPE_UBIGINT;
infoData.info.bytes = sizeof(uint64_t); infoData.info.bytes = sizeof(uint64_t);
// uid
taosArrayPush(pBlock->pDataBlock, &infoData);
// group id
taosArrayPush(pBlock->pDataBlock, &infoData);
// calculate start ts
taosArrayPush(pBlock->pDataBlock, &infoData);
// calculate end ts
taosArrayPush(pBlock->pDataBlock, &infoData); taosArrayPush(pBlock->pDataBlock, &infoData);
return pBlock; return pBlock;
...@@ -3019,8 +3017,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -3019,8 +3017,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
goto _error; goto _error;
} }
} }
pInfo->pUpdateRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc); pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
pInfo->pUpdateRes->info.type = STREAM_CLEAR;
blockDataEnsureCapacity(pInfo->pUpdateRes, 128); blockDataEnsureCapacity(pInfo->pUpdateRes, 128);
pInfo->returnUpdate = false; pInfo->returnUpdate = false;
...@@ -3042,11 +3039,9 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -3042,11 +3039,9 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo->pullIndex = 0; pInfo->pullIndex = 0;
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pPullDataMap = taosHashInit(64, hashFn, false, HASH_NO_LOCK); pInfo->pPullDataMap = taosHashInit(64, hashFn, false, HASH_NO_LOCK);
pInfo->pPullDataRes = createPullDataBlock(); pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE);
pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired; pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired;
// pInfo->pDelRes = createPullDataBlock(); // todo(liuyao) for delete pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
pInfo->pDelRes = createOneDataBlock(pInfo->binfo.pRes, false); // todo(liuyao) for delete
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT; // todo(liuyao) for delete
pInfo->delIndex = 0; pInfo->delIndex = 0;
pInfo->pDelWins = taosArrayInit(4, sizeof(SWinRes)); pInfo->pDelWins = taosArrayInit(4, sizeof(SWinRes));
...@@ -3061,7 +3056,9 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -3061,7 +3056,9 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, NULL, destroyStreamFinalIntervalOperatorInfo, createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, NULL, destroyStreamFinalIntervalOperatorInfo,
aggEncodeResultRow, aggDecodeResultRow, NULL); aggEncodeResultRow, aggDecodeResultRow, NULL);
if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
initIntervalDownStream(downstream, pPhyNode->type);
}
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
...@@ -3086,6 +3083,7 @@ void destroyStreamAggSupporter(SStreamAggSupporter* pSup) { ...@@ -3086,6 +3083,7 @@ void destroyStreamAggSupporter(SStreamAggSupporter* pSup) {
} }
taosHashCleanup(pSup->pResultRows); taosHashCleanup(pSup->pResultRows);
destroyDiskbasedBuf(pSup->pResultBuf); destroyDiskbasedBuf(pSup->pResultBuf);
blockDataDestroy(pSup->pScanBlock);
} }
void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) { void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) {
...@@ -3200,7 +3198,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh ...@@ -3200,7 +3198,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pStDeleted = taosHashInit(64, hashFn, true, HASH_NO_LOCK); pInfo->pStDeleted = taosHashInit(64, hashFn, true, HASH_NO_LOCK);
pInfo->pDelIterator = NULL; pInfo->pDelIterator = NULL;
// pInfo->pDelRes = createPullDataBlock(); // pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
pInfo->pDelRes = createOneDataBlock(pInfo->binfo.pRes, false); // todo(liuyao) for delete pInfo->pDelRes = createOneDataBlock(pInfo->binfo.pRes, false); // todo(liuyao) for delete
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT; // todo(liuyao) for delete pInfo->pDelRes->info.type = STREAM_DELETE_RESULT; // todo(liuyao) for delete
pInfo->pChildren = NULL; pInfo->pChildren = NULL;
...@@ -3559,7 +3557,7 @@ static void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBloc ...@@ -3559,7 +3557,7 @@ static void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBloc
TSKEY* startDatas = (TSKEY*)pStartTsCol->pData; TSKEY* startDatas = (TSKEY*)pStartTsCol->pData;
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
TSKEY* endDatas = (TSKEY*)pEndTsCol->pData; TSKEY* endDatas = (TSKEY*)pEndTsCol->pData;
SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX); SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
uint64_t* gpDatas = (uint64_t*)pGroupCol->pData; uint64_t* gpDatas = (uint64_t*)pGroupCol->pData;
for (int32_t i = 0; i < pBlock->info.rows; i++) { for (int32_t i = 0; i < pBlock->info.rows; i++) {
int32_t winIndex = 0; int32_t winIndex = 0;
...@@ -4255,7 +4253,6 @@ static void doClearStateWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBloc ...@@ -4255,7 +4253,6 @@ static void doClearStateWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBloc
step = updateStateWindowInfo(pAggSup->pCurWins, winIndex, tsCol, pKeyColInfo, pBlock->info.rows, i, &allEqual, step = updateStateWindowInfo(pAggSup->pCurWins, winIndex, tsCol, pKeyColInfo, pBlock->info.rows, i, &allEqual,
pSeDeleted); pSeDeleted);
ASSERT(isTsInWindow(pCurWin, tsCol[i]) || isEqualStateKey(pCurWin, pKeyData)); ASSERT(isTsInWindow(pCurWin, tsCol[i]) || isEqualStateKey(pCurWin, pKeyData));
taosArrayPush(pAggSup->pScanWindow, &pCurWin->winInfo.win);
taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition)); taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition));
deleteWindow(pAggSup->pCurWins, winIndex); deleteWindow(pAggSup->pCurWins, winIndex);
} }
...@@ -4280,8 +4277,9 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl ...@@ -4280,8 +4277,9 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
} else { } else {
return; return;
} }
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
blockDataEnsureCapacity(pAggSup->pScanBlock, pSDataBlock->info.rows);
SColumnInfoData* pKeyColInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->stateCol.slotId); SColumnInfoData* pKeyColInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->stateCol.slotId);
for (int32_t i = 0; i < pSDataBlock->info.rows; i += winRows) { for (int32_t i = 0; i < pSDataBlock->info.rows; i += winRows) {
if (pInfo->ignoreExpiredData && isOverdue(tsCols[i], &pInfo->twAggSup)) { if (pInfo->ignoreExpiredData && isOverdue(tsCols[i], &pInfo->twAggSup)) {
...@@ -4296,7 +4294,8 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl ...@@ -4296,7 +4294,8 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
winRows = updateStateWindowInfo(pAggSup->pCurWins, winIndex, tsCols, pKeyColInfo, pSDataBlock->info.rows, i, winRows = updateStateWindowInfo(pAggSup->pCurWins, winIndex, tsCols, pKeyColInfo, pSDataBlock->info.rows, i,
&allEqual, pInfo->pSeDeleted); &allEqual, pInfo->pSeDeleted);
if (!allEqual) { if (!allEqual) {
taosArrayPush(pAggSup->pScanWindow, &pCurWin->winInfo.win); appendOneRow(pAggSup->pScanBlock, &pCurWin->winInfo.win.skey, &pCurWin->winInfo.win.ekey,
&pSDataBlock->info.groupId);
taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition)); taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition));
deleteWindow(pAggSup->pCurWins, winIndex); deleteWindow(pAggSup->pCurWins, winIndex);
continue; continue;
...@@ -4460,7 +4459,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -4460,7 +4459,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pSeDeleted = taosHashInit(64, hashFn, true, HASH_NO_LOCK); pInfo->pSeDeleted = taosHashInit(64, hashFn, true, HASH_NO_LOCK);
pInfo->pDelIterator = NULL; pInfo->pDelIterator = NULL;
// pInfo->pDelRes = createPullDataBlock(); // todo(liuyao) for delete // pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
pInfo->pDelRes = createOneDataBlock(pInfo->binfo.pRes, false); // todo(liuyao) for delete pInfo->pDelRes = createOneDataBlock(pInfo->binfo.pRes, false); // todo(liuyao) for delete
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT; // todo(liuyao) for delete pInfo->pDelRes->info.type = STREAM_DELETE_RESULT; // todo(liuyao) for delete
pInfo->pChildren = NULL; pInfo->pChildren = NULL;
......
...@@ -476,16 +476,16 @@ int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { ...@@ -476,16 +476,16 @@ int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx); SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
char* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo); SFirstLastRes* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
int32_t type = pDestCtx->input.pData[0]->info.type; int32_t type = pDestCtx->input.pData[0]->info.type;
int32_t bytes = pDestCtx->input.pData[0]->info.bytes; int32_t bytes = pDestCtx->input.pData[0]->info.bytes;
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx); SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
char* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo); SFirstLastRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
if (pSResInfo->numOfRes != 0 && (pDResInfo->numOfRes == 0 || *(TSKEY*)(pDBuf + bytes) > *(TSKEY*)(pSBuf + bytes))) { if (pSResInfo->numOfRes != 0 && (pDResInfo->numOfRes == 0 || pDBuf->ts > pSBuf->ts)) {
memcpy(pDBuf, pSBuf, bytes); memcpy(pDBuf->buf, pSBuf->buf, bytes);
*(TSKEY*)(pDBuf + bytes) = *(TSKEY*)(pSBuf + bytes); pDBuf->ts = pSBuf->ts;
pDResInfo->numOfRes = 1; pDResInfo->numOfRes = 1;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -2994,16 +2994,16 @@ int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { ...@@ -2994,16 +2994,16 @@ int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
// todo rewrite: // todo rewrite:
int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx); SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
char* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo); SFirstLastRes* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
int32_t type = pDestCtx->input.pData[0]->info.type; int32_t type = pDestCtx->input.pData[0]->info.type;
int32_t bytes = pDestCtx->input.pData[0]->info.bytes; int32_t bytes = pDestCtx->input.pData[0]->info.bytes;
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx); SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
char* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo); SFirstLastRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
if (pSResInfo->numOfRes != 0 && (pDResInfo->numOfRes == 0 || *(TSKEY*)(pDBuf + bytes) < *(TSKEY*)(pSBuf + bytes))) { if (pSResInfo->numOfRes != 0 && (pDResInfo->numOfRes == 0 || pDBuf->ts < pSBuf->ts)) {
memcpy(pDBuf, pSBuf, bytes); memcpy(pDBuf->buf, pSBuf->buf, bytes);
*(TSKEY*)(pDBuf + bytes) = *(TSKEY*)(pSBuf + bytes); pDBuf->ts = pSBuf->ts;
pDResInfo->numOfRes = 1; pDResInfo->numOfRes = 1;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -367,7 +367,7 @@ if $data32 != 8 then ...@@ -367,7 +367,7 @@ if $data32 != 8 then
endi endi
#$loop_all = 0 #$loop_all = 0
#looptest: #=looptest:
sql drop database IF EXISTS test2; sql drop database IF EXISTS test2;
sql drop stream IF EXISTS streams21; sql drop stream IF EXISTS streams21;
...@@ -511,6 +511,6 @@ endi ...@@ -511,6 +511,6 @@ endi
$loop_all = $loop_all + 1 $loop_all = $loop_all + 1
print ============loop_all=$loop_all print ============loop_all=$loop_all
#goto looptest #=goto looptest
system sh/stop_dnodes.sh system sh/stop_dnodes.sh
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册