未验证 提交 e0ac1027 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #16376 from taosdata/feature/stream

refactor(stream): refine stream backend interface
...@@ -96,7 +96,7 @@ int32_t create_stream() { ...@@ -96,7 +96,7 @@ int32_t create_stream() {
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, pRes = taos_query(pConn,
"create stream stream1 trigger at_once watermark 10s into outstb as select _wstart start, k from st1 partition by tbname state_window(k)"); "create stream stream1 trigger at_once watermark 10s into outstb as select _wstart start, avg(k) from st1 partition by tbname interval(10s)");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes)); printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
......
...@@ -44,6 +44,30 @@ enum { ...@@ -44,6 +44,30 @@ enum {
) )
// clang-format on // clang-format on
typedef struct {
TSKEY ts;
uint64_t groupId;
} SWinKey;
static inline int SWinKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
SWinKey* pWin1 = (SWinKey*)pKey1;
SWinKey* pWin2 = (SWinKey*)pKey2;
if (pWin1->groupId > pWin2->groupId) {
return 1;
} else if (pWin1->groupId < pWin2->groupId) {
return -1;
}
if (pWin1->ts > pWin2->ts) {
return 1;
} else if (pWin1->ts < pWin2->ts) {
return -1;
}
return 0;
}
enum { enum {
TMQ_MSG_TYPE__DUMMY = 0, TMQ_MSG_TYPE__DUMMY = 0,
TMQ_MSG_TYPE__POLL_RSP, TMQ_MSG_TYPE__POLL_RSP,
......
...@@ -551,16 +551,17 @@ typedef struct { ...@@ -551,16 +551,17 @@ typedef struct {
} SStreamStateCur; } SStreamStateCur;
#if 1 #if 1
int32_t streamStatePut(SStreamState* pState, const void* key, int32_t kLen, const void* value, int32_t vLen); int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
int32_t streamStateGet(SStreamState* pState, const void* key, int32_t kLen, void** pVal, int32_t* pVLen); int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateDel(SStreamState* pState, const void* key, int32_t kLen); int32_t streamStateDel(SStreamState* pState, const SWinKey* key);
void streamFreeVal(void* val);
SStreamStateCur* streamStateGetCur(SStreamState* pState, const void* key, int32_t kLen);
SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const void* key, int32_t kLen); SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key);
SStreamStateCur* streamStateSeekKeyPrev(SStreamState* pState, const void* key, int32_t kLen); SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key);
SStreamStateCur* streamStateSeekKeyPrev(SStreamState* pState, const SWinKey* key);
void streamStateFreeCur(SStreamStateCur* pCur); void streamStateFreeCur(SStreamStateCur* pCur);
int32_t streamGetKVByCur(SStreamStateCur* pCur, void** pKey, int32_t* pKLen, void** pVal, int32_t* pVLen); int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur); int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur); int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur);
......
...@@ -652,27 +652,33 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) { ...@@ -652,27 +652,33 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
// expand executor // expand executor
if (pTask->taskLevel == TASK_LEVEL__SOURCE) { if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask);
if (pTask->pState == NULL) {
return -1;
}
SReadHandle handle = { SReadHandle handle = {
.meta = pTq->pVnode->pMeta, .meta = pTq->pVnode->pMeta,
.vnode = pTq->pVnode, .vnode = pTq->pVnode,
.initTqReader = 1, .initTqReader = 1,
.pStateBackend = pTask->pState,
}; };
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
ASSERT(pTask->exec.executor); ASSERT(pTask->exec.executor);
} else if (pTask->taskLevel == TASK_LEVEL__AGG) { } else if (pTask->taskLevel == TASK_LEVEL__AGG) {
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask);
if (pTask->pState == NULL) {
return -1;
}
SReadHandle mgHandle = { SReadHandle mgHandle = {
.vnode = NULL, .vnode = NULL,
.numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo), .numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo),
.pStateBackend = pTask->pState,
}; };
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle); pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle);
ASSERT(pTask->exec.executor); ASSERT(pTask->exec.executor);
} }
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask);
if (pTask->pState == NULL) {
return -1;
}
// sink // sink
/*pTask->ahandle = pTq->pVnode;*/ /*pTask->ahandle = pTq->pVnode;*/
if (pTask->outputType == TASK_OUTPUT__SMA) { if (pTask->outputType == TASK_OUTPUT__SMA) {
......
...@@ -1281,6 +1281,42 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1281,6 +1281,42 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamScanInfo* pInfo = pOperator->info; SStreamScanInfo* pInfo = pOperator->info;
#if 0
SStreamState* pState = pTaskInfo->streamInfo.pState;
if (pState) {
printf(">>>>>>>> stream write backend\n");
SWinKey key = {
.ts = 1,
.groupId = 2,
};
char tmp[100] = "abcdefg1";
if (streamStatePut(pState, &key, &tmp, strlen(tmp) + 1) < 0) {
ASSERT(0);
}
key.ts = 2;
char tmp2[100] = "abcdefg2";
if (streamStatePut(pState, &key, &tmp2, strlen(tmp2) + 1) < 0) {
ASSERT(0);
}
key.groupId = 5;
key.ts = 1;
char tmp3[100] = "abcdefg3";
if (streamStatePut(pState, &key, &tmp3, strlen(tmp3) + 1) < 0) {
ASSERT(0);
}
char* val2 = NULL;
int32_t sz;
if (streamStateGet(pState, &key, (void**)&val2, &sz) < 0) {
ASSERT(0);
}
printf("stream read %s %d\n", val2, sz);
streamFreeVal(val2);
}
#endif
qDebug("stream scan called"); qDebug("stream scan called");
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) { if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) {
while (1) { while (1) {
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#include "executorimpl.h" #include "executorimpl.h"
#include "function.h" #include "function.h"
#include "functionMgt.h" #include "functionMgt.h"
#include "tcommon.h"
#include "tcompare.h" #include "tcompare.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "tfill.h" #include "tfill.h"
...@@ -27,11 +28,6 @@ typedef enum SResultTsInterpType { ...@@ -27,11 +28,6 @@ typedef enum SResultTsInterpType {
#define IS_FINAL_OP(op) ((op)->isFinal) #define IS_FINAL_OP(op) ((op)->isFinal)
typedef struct SWinRes {
TSKEY ts;
uint64_t groupId;
} SWinRes;
typedef struct SPullWindowInfo { typedef struct SPullWindowInfo {
STimeWindow window; STimeWindow window;
uint64_t groupId; uint64_t groupId;
...@@ -641,7 +637,8 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num ...@@ -641,7 +637,8 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP); setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP);
doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0, pBlock->info.rows, numOfExprs); doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0, pBlock->info.rows,
numOfExprs);
if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) { if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
closeResultRow(pr); closeResultRow(pr);
...@@ -812,7 +809,7 @@ static int32_t savePullWindow(SPullWindowInfo* pPullInfo, SArray* pPullWins) { ...@@ -812,7 +809,7 @@ static int32_t savePullWindow(SPullWindowInfo* pPullInfo, SArray* pPullWins) {
int32_t compareResKey(void* pKey, void* data, int32_t index) { int32_t compareResKey(void* pKey, void* data, int32_t index) {
SArray* res = (SArray*)data; SArray* res = (SArray*)data;
SResKeyPos* pos = taosArrayGetP(res, index); SResKeyPos* pos = taosArrayGetP(res, index);
SWinRes* pData = (SWinRes*)pKey; SWinKey* pData = (SWinKey*)pKey;
if (pData->ts == *(int64_t*)pos->key) { if (pData->ts == *(int64_t*)pos->key) {
if (pData->groupId > pos->groupId) { if (pData->groupId > pos->groupId) {
return 1; return 1;
...@@ -828,7 +825,7 @@ int32_t compareResKey(void* pKey, void* data, int32_t index) { ...@@ -828,7 +825,7 @@ int32_t compareResKey(void* pKey, void* data, int32_t index) {
static int32_t saveResult(int64_t ts, int32_t pageId, int32_t offset, uint64_t groupId, SArray* pUpdated) { static int32_t saveResult(int64_t ts, int32_t pageId, int32_t offset, uint64_t groupId, SArray* pUpdated) {
int32_t size = taosArrayGetSize(pUpdated); int32_t size = taosArrayGetSize(pUpdated);
SWinRes data = {.ts = ts, .groupId = groupId}; SWinKey data = {.ts = ts, .groupId = groupId};
int32_t index = binarySearchCom(pUpdated, size, &data, TSDB_ORDER_DESC, compareResKey); int32_t index = binarySearchCom(pUpdated, size, &data, TSDB_ORDER_DESC, compareResKey);
if (index == -1) { if (index == -1) {
index = 0; index = 0;
...@@ -861,8 +858,8 @@ static int32_t saveWinResult(int64_t ts, int32_t pageId, int32_t offset, uint64_ ...@@ -861,8 +858,8 @@ static int32_t saveWinResult(int64_t ts, int32_t pageId, int32_t offset, uint64_
newPos->groupId = groupId; newPos->groupId = groupId;
newPos->pos = (SResultRowPosition){.pageId = pageId, .offset = offset}; newPos->pos = (SResultRowPosition){.pageId = pageId, .offset = offset};
*(int64_t*)newPos->key = ts; *(int64_t*)newPos->key = ts;
SWinRes key = {.ts = ts, .groupId = groupId}; SWinKey key = {.ts = ts, .groupId = groupId};
if (taosHashPut(pUpdatedMap, &key, sizeof(SWinRes), &newPos, sizeof(void*)) != TSDB_CODE_SUCCESS) { if (taosHashPut(pUpdatedMap, &key, sizeof(SWinKey), &newPos, sizeof(void*)) != TSDB_CODE_SUCCESS) {
taosMemoryFree(newPos); taosMemoryFree(newPos);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -879,20 +876,20 @@ static int32_t saveResultRow(SResultRow* result, uint64_t groupId, SArray* pUpda ...@@ -879,20 +876,20 @@ static int32_t saveResultRow(SResultRow* result, uint64_t groupId, SArray* pUpda
static void removeResults(SArray* pWins, SHashObj* pUpdatedMap) { static void removeResults(SArray* pWins, SHashObj* pUpdatedMap) {
int32_t size = taosArrayGetSize(pWins); int32_t size = taosArrayGetSize(pWins);
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
SWinRes* pW = taosArrayGet(pWins, i); SWinKey* pW = taosArrayGet(pWins, i);
taosHashRemove(pUpdatedMap, pW, sizeof(SWinRes)); taosHashRemove(pUpdatedMap, pW, sizeof(SWinKey));
} }
} }
int64_t getWinReskey(void* data, int32_t index) { int64_t getWinReskey(void* data, int32_t index) {
SArray* res = (SArray*)data; SArray* res = (SArray*)data;
SWinRes* pos = taosArrayGet(res, index); SWinKey* pos = taosArrayGet(res, index);
return pos->ts; return pos->ts;
} }
int32_t compareWinRes(void* pKey, void* data, int32_t index) { int32_t compareWinRes(void* pKey, void* data, int32_t index) {
SArray* res = (SArray*)data; SArray* res = (SArray*)data;
SWinRes* pos = taosArrayGetP(res, index); SWinKey* pos = taosArrayGetP(res, index);
SResKeyPos* pData = (SResKeyPos*)pKey; SResKeyPos* pData = (SResKeyPos*)pKey;
if (*(int64_t*)pData->key == pos->ts) { if (*(int64_t*)pData->key == pos->ts) {
if (pData->groupId > pos->groupId) { if (pData->groupId > pos->groupId) {
...@@ -985,8 +982,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -985,8 +982,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
if ((!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) && if ((!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) &&
inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) { inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) {
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows,
pBlock->info.rows, numOfOutput); numOfOutput);
} }
doCloseWindow(pResultRowInfo, pInfo, pResult); doCloseWindow(pResultRowInfo, pInfo, pResult);
...@@ -1025,8 +1022,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -1025,8 +1022,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup); doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows,
pBlock->info.rows, numOfOutput); numOfOutput);
doCloseWindow(pResultRowInfo, pInfo, pResult); doCloseWindow(pResultRowInfo, pInfo, pResult);
} }
...@@ -1214,8 +1211,8 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI ...@@ -1214,8 +1211,8 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
} }
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false);
doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows,
pRowSup->numOfRows, pBlock->info.rows, numOfOutput); pBlock->info.rows, numOfOutput);
} }
static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
...@@ -1418,7 +1415,7 @@ void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock, ...@@ -1418,7 +1415,7 @@ void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock,
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsStarts[i], pInterval, TSDB_ORDER_ASC); STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsStarts[i], pInterval, TSDB_ORDER_ASC);
doDeleteIntervalWindow(pAggSup, win.skey, groupIds[i]); doDeleteIntervalWindow(pAggSup, win.skey, groupIds[i]);
if (pUpWins) { if (pUpWins) {
SWinRes winRes = {.ts = win.skey, .groupId = groupIds[i]}; SWinKey winRes = {.ts = win.skey, .groupId = groupIds[i]};
taosArrayPush(pUpWins, &winRes); taosArrayPush(pUpWins, &winRes);
} }
} }
...@@ -1445,7 +1442,7 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval* ...@@ -1445,7 +1442,7 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval*
uint64_t winGpId = pGpDatas ? pGpDatas[startPos] : pBlock->info.groupId; uint64_t winGpId = pGpDatas ? pGpDatas[startPos] : pBlock->info.groupId;
bool res = doClearWindow(pAggSup, pSup1, (char*)&win.skey, sizeof(TSKEY), winGpId, numOfOutput); bool res = doClearWindow(pAggSup, pSup1, (char*)&win.skey, sizeof(TSKEY), winGpId, numOfOutput);
if (pUpWins && res) { if (pUpWins && res) {
SWinRes winRes = {.ts = win.skey, .groupId = winGpId}; SWinKey winRes = {.ts = win.skey, .groupId = winGpId};
taosArrayPush(pUpWins, &winRes); taosArrayPush(pUpWins, &winRes);
} }
getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win); getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win);
...@@ -1484,11 +1481,11 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, ...@@ -1484,11 +1481,11 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
STimeWindow win; STimeWindow win;
win.skey = ts; win.skey = ts;
win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1; win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
SWinRes winRe = { SWinKey winRe = {
.ts = win.skey, .ts = win.skey,
.groupId = groupId, .groupId = groupId,
}; };
void* chIds = taosHashGet(pPullDataMap, &winRe, sizeof(SWinRes)); void* chIds = taosHashGet(pPullDataMap, &winRe, sizeof(SWinKey));
if (isCloseWindow(&win, pSup)) { if (isCloseWindow(&win, pSup)) {
if (chIds && pPullDataMap) { if (chIds && pPullDataMap) {
SArray* chAy = *(SArray**)chIds; SArray* chAy = *(SArray**)chIds;
...@@ -1555,7 +1552,7 @@ static void doBuildDeleteResult(SArray* pWins, int32_t* index, SSDataBlock* pBlo ...@@ -1555,7 +1552,7 @@ static void doBuildDeleteResult(SArray* pWins, int32_t* index, SSDataBlock* pBlo
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
for (int32_t i = *index; i < size; i++) { for (int32_t i = *index; i < size; i++) {
SWinRes* pWin = taosArrayGet(pWins, i); SWinKey* pWin = taosArrayGet(pWins, i);
colDataAppend(pTsCol, pBlock->info.rows, (const char*)&pWin->ts, false); colDataAppend(pTsCol, pBlock->info.rows, (const char*)&pWin->ts, false);
colDataAppend(pGroupCol, pBlock->info.rows, (const char*)&pWin->groupId, false); colDataAppend(pGroupCol, pBlock->info.rows, (const char*)&pWin->groupId, false);
pBlock->info.rows++; pBlock->info.rows++;
...@@ -1595,6 +1592,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -1595,6 +1592,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); // SResKeyPos SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); // SResKeyPos
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
SStreamState* pState = pTaskInfo->streamInfo.pState;
while (1) { while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) { if (pBlock == NULL) {
...@@ -1639,6 +1639,35 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -1639,6 +1639,35 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdatedMap); hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdatedMap);
} }
#if 0
if (pState) {
printf(">>>>>>>> stream read backend\n");
SWinKey key = {
.ts = 1,
.groupId = 2,
};
char* val = NULL;
int32_t sz;
if (streamStateGet(pState, &key, (void**)&val, &sz) < 0) {
ASSERT(0);
}
printf("stream read %s %d\n", val, sz);
streamFreeVal(val);
SStreamStateCur* pCur = streamStateGetCur(pState, &key);
ASSERT(pCur);
while (streamStateCurNext(pState, pCur) == 0) {
SWinKey key1;
const void* val1;
if (streamStateGetKVByCur(pCur, &key1, &val1, &sz) < 0) {
break;
}
printf("stream iter key groupId:%d ts:%d, value %s %d\n", key1.groupId, key1.ts, val1, sz);
}
streamStateFreeCur(pCur);
}
#endif
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdatedMap, closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdatedMap,
pInfo->pRecycledPages, pInfo->aggSup.pResultBuf); pInfo->pRecycledPages, pInfo->aggSup.pResultBuf);
...@@ -1857,7 +1886,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -1857,7 +1886,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
} }
} }
pInfo->pRecycledPages = taosArrayInit(4, sizeof(int32_t)); pInfo->pRecycledPages = taosArrayInit(4, sizeof(int32_t));
pInfo->pDelWins = taosArrayInit(4, sizeof(SWinRes)); pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey));
pInfo->delIndex = 0; pInfo->delIndex = 0;
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
initResultRowInfo(&pInfo->binfo.resultRowInfo); initResultRowInfo(&pInfo->binfo.resultRowInfo);
...@@ -1958,8 +1987,8 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator ...@@ -1958,8 +1987,8 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
} }
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false);
doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows,
pRowSup->numOfRows, pBlock->info.rows, numOfOutput); pBlock->info.rows, numOfOutput);
} }
static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
...@@ -2811,7 +2840,7 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExpr ...@@ -2811,7 +2840,7 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExpr
return; return;
} }
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
SWinRes* pWinRes = taosArrayGet(pWinArray, i); SWinKey* pWinRes = taosArrayGet(pWinArray, i);
SResultRow* pCurResult = NULL; SResultRow* pCurResult = NULL;
STimeWindow ParentWin = {.skey = pWinRes->ts, .ekey = pWinRes->ts + 1}; STimeWindow ParentWin = {.skey = pWinRes->ts, .ekey = pWinRes->ts + 1};
setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &ParentWin, true, &pCurResult, pWinRes->groupId, pSup->pCtx, setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &ParentWin, true, &pCurResult, pWinRes->groupId, pSup->pCtx,
...@@ -2854,12 +2883,12 @@ int32_t getNexWindowPos(SInterval* pInterval, SDataBlockInfo* pBlockInfo, TSKEY* ...@@ -2854,12 +2883,12 @@ int32_t getNexWindowPos(SInterval* pInterval, SDataBlockInfo* pBlockInfo, TSKEY*
return getNextQualifiedWindow(pInterval, pNextWin, pBlockInfo, tsCols, prevEndPos, TSDB_ORDER_ASC); return getNextQualifiedWindow(pInterval, pNextWin, pBlockInfo, tsCols, prevEndPos, TSDB_ORDER_ASC);
} }
void addPullWindow(SHashObj* pMap, SWinRes* pWinRes, int32_t size) { void addPullWindow(SHashObj* pMap, SWinKey* pWinRes, int32_t size) {
SArray* childIds = taosArrayInit(8, sizeof(int32_t)); SArray* childIds = taosArrayInit(8, sizeof(int32_t));
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
taosArrayPush(childIds, &i); taosArrayPush(childIds, &i);
} }
taosHashPut(pMap, pWinRes, sizeof(SWinRes), &childIds, sizeof(void*)); taosHashPut(pMap, pWinRes, sizeof(SWinKey), &childIds, sizeof(void*));
} }
static int32_t getChildIndex(SSDataBlock* pBlock) { return pBlock->info.childId; } static int32_t getChildIndex(SSDataBlock* pBlock) { return pBlock->info.childId; }
...@@ -2906,11 +2935,11 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc ...@@ -2906,11 +2935,11 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
} }
if (IS_FINAL_OP(pInfo) && isClosed && pInfo->pChildren) { if (IS_FINAL_OP(pInfo) && isClosed && pInfo->pChildren) {
bool ignore = true; bool ignore = true;
SWinRes winRes = { SWinKey winRes = {
.ts = nextWin.skey, .ts = nextWin.skey,
.groupId = tableGroupId, .groupId = tableGroupId,
}; };
void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinRes)); void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey));
if (isDeletedWindow(&nextWin, tableGroupId, &pInfo->aggSup) && !chIds) { if (isDeletedWindow(&nextWin, tableGroupId, &pInfo->aggSup) && !chIds) {
SPullWindowInfo pull = {.window = nextWin, .groupId = tableGroupId}; SPullWindowInfo pull = {.window = nextWin, .groupId = tableGroupId};
// add pull data request // add pull data request
...@@ -3038,8 +3067,8 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap) { ...@@ -3038,8 +3067,8 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap) {
uint64_t* groupIdData = (uint64_t*)pGroupCol->pData; uint64_t* groupIdData = (uint64_t*)pGroupCol->pData;
int32_t chId = getChildIndex(pBlock); int32_t chId = getChildIndex(pBlock);
for (int32_t i = 0; i < pBlock->info.rows; i++) { for (int32_t i = 0; i < pBlock->info.rows; i++) {
SWinRes winRes = {.ts = tsData[i], .groupId = groupIdData[i]}; SWinKey winRes = {.ts = tsData[i], .groupId = groupIdData[i]};
void* chIds = taosHashGet(pMap, &winRes, sizeof(SWinRes)); void* chIds = taosHashGet(pMap, &winRes, sizeof(SWinKey));
if (chIds) { if (chIds) {
SArray* chArray = *(SArray**)chIds; SArray* chArray = *(SArray**)chIds;
int32_t index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ); int32_t index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ);
...@@ -3048,7 +3077,7 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap) { ...@@ -3048,7 +3077,7 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap) {
taosArrayRemove(chArray, index); taosArrayRemove(chArray, index);
if (taosArrayGetSize(chArray) == 0) { if (taosArrayGetSize(chArray) == 0) {
// pull data is over // pull data is over
taosHashRemove(pMap, &winRes, sizeof(SWinRes)); taosHashRemove(pMap, &winRes, sizeof(SWinKey));
} }
} }
} }
...@@ -3132,7 +3161,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -3132,7 +3161,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) { if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) {
pInfo->binfo.pRes->info.type = pBlock->info.type; pInfo->binfo.pRes->info.type = pBlock->info.type;
} else if (pBlock->info.type == STREAM_CLEAR) { } else if (pBlock->info.type == STREAM_CLEAR) {
SArray* pUpWins = taosArrayInit(8, sizeof(SWinRes)); SArray* pUpWins = taosArrayInit(8, sizeof(SWinKey));
doClearWindows(&pInfo->aggSup, pSup, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, pUpWins); doClearWindows(&pInfo->aggSup, pSup, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, pUpWins);
if (IS_FINAL_OP(pInfo)) { if (IS_FINAL_OP(pInfo)) {
int32_t childIndex = getChildIndex(pBlock); int32_t childIndex = getChildIndex(pBlock);
...@@ -3170,7 +3199,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -3170,7 +3199,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap); getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap);
continue; continue;
} else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_OP(pInfo)) { } else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_OP(pInfo)) {
SArray* pUpWins = taosArrayInit(8, sizeof(SWinRes)); SArray* pUpWins = taosArrayInit(8, sizeof(SWinKey));
doClearWindows(&pInfo->aggSup, pSup, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, pUpWins); doClearWindows(&pInfo->aggSup, pSup, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, pUpWins);
removeResults(pUpWins, pUpdatedMap); removeResults(pUpWins, pUpdatedMap);
taosArrayDestroy(pUpWins); taosArrayDestroy(pUpWins);
...@@ -3385,7 +3414,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -3385,7 +3414,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired; pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired;
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
pInfo->delIndex = 0; pInfo->delIndex = 0;
pInfo->pDelWins = taosArrayInit(4, sizeof(SWinRes)); pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey));
pInfo->pRecycledPages = taosArrayInit(4, sizeof(int32_t)); pInfo->pRecycledPages = taosArrayInit(4, sizeof(int32_t));
pOperator->operatorType = pPhyNode->type; pOperator->operatorType = pPhyNode->type;
...@@ -3720,8 +3749,8 @@ int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TS ...@@ -3720,8 +3749,8 @@ int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TS
} }
if (pWinInfo->win.skey > pStartTs[i]) { if (pWinInfo->win.skey > pStartTs[i]) {
if (pStDeleted && pWinInfo->isOutput) { if (pStDeleted && pWinInfo->isOutput) {
SWinRes res = {.ts = pWinInfo->win.skey, .groupId = groupId}; SWinKey res = {.ts = pWinInfo->win.skey, .groupId = groupId};
taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &res, sizeof(SWinRes)); taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &res, sizeof(SWinKey));
pWinInfo->isOutput = false; pWinInfo->isOutput = false;
} }
pWinInfo->win.skey = pStartTs[i]; pWinInfo->win.skey = pStartTs[i];
...@@ -3839,8 +3868,8 @@ void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex, ...@@ -3839,8 +3868,8 @@ void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex,
compactFunctions(pSup->pCtx, pInfo->pDummyCtx, numOfOutput, pTaskInfo); compactFunctions(pSup->pCtx, pInfo->pDummyCtx, numOfOutput, pTaskInfo);
taosHashRemove(pStUpdated, &pWinInfo->pos, sizeof(SResultRowPosition)); taosHashRemove(pStUpdated, &pWinInfo->pos, sizeof(SResultRowPosition));
if (pWinInfo->isOutput) { if (pWinInfo->isOutput) {
SWinRes res = {.ts = pWinInfo->win.skey, .groupId = groupId}; SWinKey res = {.ts = pWinInfo->win.skey, .groupId = groupId};
taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &res, sizeof(SWinRes)); taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &res, sizeof(SWinKey));
pWinInfo->isOutput = false; pWinInfo->isOutput = false;
} }
taosArrayRemove(pInfo->streamAggSup.pCurWins, i); taosArrayRemove(pInfo->streamAggSup.pCurWins, i);
...@@ -3902,8 +3931,8 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData ...@@ -3902,8 +3931,8 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
} }
pCurWin->isClosed = false; pCurWin->isClosed = false;
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pStUpdated) { if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pStUpdated) {
SWinRes value = {.ts = pCurWin->win.skey, .groupId = groupId}; SWinKey value = {.ts = pCurWin->win.skey, .groupId = groupId};
code = taosHashPut(pStUpdated, &pCurWin->pos, sizeof(SResultRowPosition), &value, sizeof(SWinRes)); code = taosHashPut(pStUpdated, &pCurWin->pos, sizeof(SResultRowPosition), &value, sizeof(SWinKey));
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
...@@ -3955,8 +3984,7 @@ static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SExprSupp* pSup, ...@@ -3955,8 +3984,7 @@ static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SExprSupp* pSup,
int32_t step = 0; int32_t step = 0;
for (int32_t i = 0; i < pBlock->info.rows; i += step) { for (int32_t i = 0; i < pBlock->info.rows; i += step) {
int32_t winIndex = 0; int32_t winIndex = 0;
SResultWindowInfo* pCurWin = SResultWindowInfo* pCurWin = getCurSessionWindow(pAggSup, tsCols[i], INT64_MIN, gpCols[i], gap, &winIndex);
getCurSessionWindow(pAggSup, tsCols[i], INT64_MIN, gpCols[i], gap, &winIndex);
if (!pCurWin || pCurWin->pos.pageId == -1) { if (!pCurWin || pCurWin->pos.pageId == -1) {
// window has been closed. // window has been closed.
step = 1; step = 1;
...@@ -3981,9 +4009,9 @@ static int32_t copyUpdateResult(SHashObj* pStUpdated, SArray* pUpdated) { ...@@ -3981,9 +4009,9 @@ static int32_t copyUpdateResult(SHashObj* pStUpdated, SArray* pUpdated) {
if (pos == NULL) { if (pos == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
pos->groupId = ((SWinRes*)pData)->groupId; pos->groupId = ((SWinKey*)pData)->groupId;
pos->pos = *(SResultRowPosition*)key; pos->pos = *(SResultRowPosition*)key;
*(int64_t*)pos->key = ((SWinRes*)pData)->ts; *(int64_t*)pos->key = ((SWinKey*)pData)->ts;
taosArrayPush(pUpdated, &pos); taosArrayPush(pUpdated, &pos);
} }
taosArraySort(pUpdated, resultrowComparAsc); taosArraySort(pUpdated, resultrowComparAsc);
...@@ -3999,7 +4027,7 @@ void doBuildDeleteDataBlock(SHashObj* pStDeleted, SSDataBlock* pBlock, void** It ...@@ -3999,7 +4027,7 @@ void doBuildDeleteDataBlock(SHashObj* pStDeleted, SSDataBlock* pBlock, void** It
blockDataEnsureCapacity(pBlock, size); blockDataEnsureCapacity(pBlock, size);
size_t keyLen = 0; size_t keyLen = 0;
while (((*Ite) = taosHashIterate(pStDeleted, *Ite)) != NULL) { while (((*Ite) = taosHashIterate(pStDeleted, *Ite)) != NULL) {
SWinRes* res = *Ite; SWinKey* res = *Ite;
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
colDataAppend(pTsCol, pBlock->info.rows, (const char*)&res->ts, false); colDataAppend(pTsCol, pBlock->info.rows, (const char*)&res->ts, false);
SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
...@@ -4130,8 +4158,8 @@ static void copyDeleteWindowInfo(SArray* pResWins, SHashObj* pStDeleted) { ...@@ -4130,8 +4158,8 @@ static void copyDeleteWindowInfo(SArray* pResWins, SHashObj* pStDeleted) {
int32_t size = taosArrayGetSize(pResWins); int32_t size = taosArrayGetSize(pResWins);
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
SResultWindowInfo* pWinInfo = taosArrayGet(pResWins, i); SResultWindowInfo* pWinInfo = taosArrayGet(pResWins, i);
SWinRes res = {.ts = pWinInfo->win.skey, .groupId = pWinInfo->groupId}; SWinKey res = {.ts = pWinInfo->win.skey, .groupId = pWinInfo->groupId};
taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &res, sizeof(SWinRes)); taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &res, sizeof(SWinKey));
} }
} }
...@@ -4169,14 +4197,14 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { ...@@ -4169,14 +4197,14 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
if (pBlock->info.type == STREAM_CLEAR) { if (pBlock->info.type == STREAM_CLEAR) {
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo)); SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
doClearSessionWindows(&pInfo->streamAggSup, &pOperator->exprSupp, pBlock, START_TS_COLUMN_INDEX, pOperator->exprSupp.numOfExprs, 0, doClearSessionWindows(&pInfo->streamAggSup, &pOperator->exprSupp, pBlock, START_TS_COLUMN_INDEX,
pWins); pOperator->exprSupp.numOfExprs, 0, pWins);
if (IS_FINAL_OP(pInfo)) { if (IS_FINAL_OP(pInfo)) {
int32_t childIndex = getChildIndex(pBlock); int32_t childIndex = getChildIndex(pBlock);
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info; SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info;
doClearSessionWindows(&pChildInfo->streamAggSup, &pChildOp->exprSupp, pBlock, START_TS_COLUMN_INDEX, pChildOp->exprSupp.numOfExprs, doClearSessionWindows(&pChildInfo->streamAggSup, &pChildOp->exprSupp, pBlock, START_TS_COLUMN_INDEX,
0, NULL); pChildOp->exprSupp.numOfExprs, 0, NULL);
rebuildTimeWindow(pInfo, pWins, pBlock->info.groupId, pOperator->exprSupp.numOfExprs, pOperator); rebuildTimeWindow(pInfo, pWins, pBlock->info.groupId, pOperator->exprSupp.numOfExprs, pOperator);
} }
taosArrayDestroy(pWins); taosArrayDestroy(pWins);
...@@ -4578,7 +4606,8 @@ SStateWindowInfo* getStateWindow(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_ ...@@ -4578,7 +4606,8 @@ SStateWindowInfo* getStateWindow(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_
} }
int32_t updateStateWindowInfo(SArray* pWinInfos, int32_t winIndex, TSKEY* pTs, uint64_t groupId, int32_t updateStateWindowInfo(SArray* pWinInfos, int32_t winIndex, TSKEY* pTs, uint64_t groupId,
SColumnInfoData* pKeyCol, int32_t rows, int32_t start, bool* allEqual, SHashObj* pSeDeleted) { SColumnInfoData* pKeyCol, int32_t rows, int32_t start, bool* allEqual,
SHashObj* pSeDeleted) {
*allEqual = true; *allEqual = true;
SStateWindowInfo* pWinInfo = taosArrayGet(pWinInfos, winIndex); SStateWindowInfo* pWinInfo = taosArrayGet(pWinInfos, winIndex);
for (int32_t i = start; i < rows; ++i) { for (int32_t i = start; i < rows; ++i) {
...@@ -4599,9 +4628,8 @@ int32_t updateStateWindowInfo(SArray* pWinInfos, int32_t winIndex, TSKEY* pTs, u ...@@ -4599,9 +4628,8 @@ int32_t updateStateWindowInfo(SArray* pWinInfos, int32_t winIndex, TSKEY* pTs, u
} }
if (pWinInfo->winInfo.win.skey > pTs[i]) { if (pWinInfo->winInfo.win.skey > pTs[i]) {
if (pSeDeleted && pWinInfo->winInfo.isOutput) { if (pSeDeleted && pWinInfo->winInfo.isOutput) {
SWinRes res = {.ts = pWinInfo->winInfo.win.skey, .groupId = groupId}; SWinKey res = {.ts = pWinInfo->winInfo.win.skey, .groupId = groupId};
taosHashPut(pSeDeleted, &pWinInfo->winInfo.pos, sizeof(SResultRowPosition), &res, taosHashPut(pSeDeleted, &pWinInfo->winInfo.pos, sizeof(SResultRowPosition), &res, sizeof(SWinKey));
sizeof(SWinRes));
pWinInfo->winInfo.isOutput = false; pWinInfo->winInfo.isOutput = false;
} }
pWinInfo->winInfo.win.skey = pTs[i]; pWinInfo->winInfo.win.skey = pTs[i];
...@@ -4614,14 +4642,14 @@ int32_t updateStateWindowInfo(SArray* pWinInfos, int32_t winIndex, TSKEY* pTs, u ...@@ -4614,14 +4642,14 @@ int32_t updateStateWindowInfo(SArray* pWinInfos, int32_t winIndex, TSKEY* pTs, u
return rows - start; return rows - start;
} }
static void doClearStateWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, static void doClearStateWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SHashObj* pSeUpdated,
SHashObj* pSeUpdated, SHashObj* pSeDeleted) { SHashObj* pSeDeleted) {
SColumnInfoData* pTsColInfo = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pTsColInfo = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pGroupColInfo = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pGroupColInfo = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
TSKEY* tsCol = (TSKEY*)pTsColInfo->pData; TSKEY* tsCol = (TSKEY*)pTsColInfo->pData;
bool allEqual = false; bool allEqual = false;
int32_t step = 1; int32_t step = 1;
uint64_t* gpCol = (uint64_t*) pGroupColInfo->pData; uint64_t* gpCol = (uint64_t*)pGroupColInfo->pData;
for (int32_t i = 0; i < pBlock->info.rows; i += step) { for (int32_t i = 0; i < pBlock->info.rows; i += step) {
int32_t winIndex = 0; int32_t winIndex = 0;
SStateWindowInfo* pCurWin = getStateWindowByTs(pAggSup, tsCol[i], gpCol[i], &winIndex); SStateWindowInfo* pCurWin = getStateWindowByTs(pAggSup, tsCol[i], gpCol[i], &winIndex);
...@@ -4665,13 +4693,12 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl ...@@ -4665,13 +4693,12 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
char* pKeyData = colDataGetData(pKeyColInfo, i); char* pKeyData = colDataGetData(pKeyColInfo, i);
int32_t winIndex = 0; int32_t winIndex = 0;
bool allEqual = true; bool allEqual = true;
SStateWindowInfo* pCurWin = SStateWindowInfo* pCurWin = getStateWindow(pAggSup, tsCols[i], groupId, pKeyData, &pInfo->stateCol, &winIndex);
getStateWindow(pAggSup, tsCols[i], groupId, pKeyData, &pInfo->stateCol, &winIndex); winRows = updateStateWindowInfo(pAggSup->pCurWins, winIndex, tsCols, groupId, pKeyColInfo, pSDataBlock->info.rows,
winRows = updateStateWindowInfo(pAggSup->pCurWins, winIndex, tsCols, groupId, pKeyColInfo, i, &allEqual, pStDeleted);
pSDataBlock->info.rows, i, &allEqual, pStDeleted);
if (!allEqual) { if (!allEqual) {
appendOneRow(pAggSup->pScanBlock, &pCurWin->winInfo.win.skey, &pCurWin->winInfo.win.ekey, appendOneRow(pAggSup->pScanBlock, &pCurWin->winInfo.win.skey, &pCurWin->winInfo.win.ekey, GROUPID_COLUMN_INDEX,
GROUPID_COLUMN_INDEX, &groupId); &groupId);
taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition)); taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition));
deleteWindow(pAggSup->pCurWins, winIndex, destroyStateWinInfo); deleteWindow(pAggSup->pCurWins, winIndex, destroyStateWinInfo);
continue; continue;
...@@ -4682,8 +4709,8 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl ...@@ -4682,8 +4709,8 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
} }
pCurWin->winInfo.isClosed = false; pCurWin->winInfo.isClosed = false;
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
SWinRes value = {.ts = pCurWin->winInfo.win.skey, .groupId = groupId}; SWinKey value = {.ts = pCurWin->winInfo.win.skey, .groupId = groupId};
code = taosHashPut(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition), &value, sizeof(SWinRes)); code = taosHashPut(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition), &value, sizeof(SWinKey));
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
......
...@@ -358,7 +358,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat ...@@ -358,7 +358,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
FAIL_SHUFFLE_DISPATCH: FAIL_SHUFFLE_DISPATCH:
if (pReqs) { if (pReqs) {
for (int32_t i = 0; i < vgSz; i++) { for (int32_t i = 0; i < vgSz; i++) {
taosArrayDestroy(pReqs[i].data); taosArrayDestroyP(pReqs[i].data, taosMemoryFree);
taosArrayDestroy(pReqs[i].dataLen); taosArrayDestroy(pReqs[i].dataLen);
} }
taosMemoryFree(pReqs); taosMemoryFree(pReqs);
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#include "executor.h" #include "executor.h"
#include "streamInc.h" #include "streamInc.h"
#include "tcommon.h"
#include "ttimer.h" #include "ttimer.h"
SStreamState* streamStateOpen(char* path, SStreamTask* pTask) { SStreamState* streamStateOpen(char* path, SStreamTask* pTask) {
...@@ -23,14 +24,18 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask) { ...@@ -23,14 +24,18 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
char statePath[200]; char statePath[300];
sprintf(statePath, "%s/%d", path, pTask->taskId); sprintf(statePath, "%s/%d", path, pTask->taskId);
if (tdbOpen(statePath, 16 * 1024, 1, &pState->db) < 0) { if (tdbOpen(statePath, 4096, 256, &pState->db) < 0) {
goto _err; goto _err;
} }
// open state storage backend // open state storage backend
if (tdbTbOpen("state.db", sizeof(int32_t), -1, NULL, pState->db, &pState->pStateDb) < 0) { if (tdbTbOpen("state.db", sizeof(SWinKey), -1, SWinKeyCmpr, pState->db, &pState->pStateDb) < 0) {
goto _err;
}
if (streamStateBegin(pState) < 0) {
goto _err; goto _err;
} }
...@@ -60,6 +65,7 @@ int32_t streamStateBegin(SStreamState* pState) { ...@@ -60,6 +65,7 @@ int32_t streamStateBegin(SStreamState* pState) {
} }
if (tdbBegin(pState->db, &pState->txn) < 0) { if (tdbBegin(pState->db, &pState->txn) < 0) {
tdbTxnClose(&pState->txn);
return -1; return -1;
} }
return 0; return 0;
...@@ -95,33 +101,39 @@ int32_t streamStateAbort(SStreamState* pState) { ...@@ -95,33 +101,39 @@ int32_t streamStateAbort(SStreamState* pState) {
return 0; return 0;
} }
int32_t streamStatePut(SStreamState* pState, const void* key, int32_t kLen, const void* value, int32_t vLen) { int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
return tdbTbUpsert(pState->pStateDb, key, kLen, value, vLen, &pState->txn); return tdbTbUpsert(pState->pStateDb, key, sizeof(SWinKey), value, vLen, &pState->txn);
} }
int32_t streamStateGet(SStreamState* pState, const void* key, int32_t kLen, void** pVal, int32_t* pVLen) { int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
return tdbTbGet(pState->pStateDb, key, kLen, pVal, pVLen); return tdbTbGet(pState->pStateDb, key, sizeof(SWinKey), pVal, pVLen);
} }
int32_t streamStateDel(SStreamState* pState, const void* key, int32_t kLen) { int32_t streamStateDel(SStreamState* pState, const SWinKey* key) {
return tdbTbDelete(pState->pStateDb, key, kLen, &pState->txn); return tdbTbDelete(pState->pStateDb, key, sizeof(SWinKey), &pState->txn);
} }
SStreamStateCur* streamStateGetCur(SStreamState* pState, const void* key, int32_t kLen) { SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) return NULL; if (pCur == NULL) return NULL;
tdbTbcOpen(pState->pStateDb, &pCur->pCur, NULL); tdbTbcOpen(pState->pStateDb, &pCur->pCur, NULL);
int32_t c; int32_t c;
tdbTbcMoveTo(pCur->pCur, key, kLen, &c); tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c);
if (c != 0) { if (c != 0) {
taosMemoryFree(pCur); taosMemoryFree(pCur);
return NULL; return NULL;
} }
return 0; return pCur;
} }
int32_t streamGetKVByCur(SStreamStateCur* pCur, void** pKey, int32_t* pKLen, void** pVal, int32_t* pVLen) { int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
return tdbTbcGet(pCur->pCur, (const void**)pKey, pKLen, (const void**)pVal, pVLen); const SWinKey* pKTmp = NULL;
int32_t kLen;
if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) {
return -1;
}
*pKey = *pKTmp;
return 0;
} }
int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur) { int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur) {
...@@ -134,14 +146,14 @@ int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur) { ...@@ -134,14 +146,14 @@ int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur) {
return tdbTbcMoveToLast(pCur->pCur); return tdbTbcMoveToLast(pCur->pCur);
} }
SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const void* key, int32_t kLen) { SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) { if (pCur == NULL) {
return NULL; return NULL;
} }
int32_t c; int32_t c;
if (tdbTbcMoveTo(pCur->pCur, key, kLen, &c) < 0) { if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) {
taosMemoryFree(pCur); taosMemoryFree(pCur);
return NULL; return NULL;
} }
...@@ -155,14 +167,14 @@ SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const void* key, i ...@@ -155,14 +167,14 @@ SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const void* key, i
return pCur; return pCur;
} }
SStreamStateCur* streamStateSeekKeyPrev(SStreamState* pState, const void* key, int32_t kLen) { SStreamStateCur* streamStateSeekKeyPrev(SStreamState* pState, const SWinKey* key) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) { if (pCur == NULL) {
return NULL; return NULL;
} }
int32_t c; int32_t c;
if (tdbTbcMoveTo(pCur->pCur, key, kLen, &c) < 0) { if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) {
taosMemoryFree(pCur); taosMemoryFree(pCur);
return NULL; return NULL;
} }
...@@ -185,3 +197,9 @@ int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) { ...@@ -185,3 +197,9 @@ int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) {
// //
return tdbTbcMoveToPrev(pCur->pCur); return tdbTbcMoveToPrev(pCur->pCur);
} }
void streamStateFreeCur(SStreamStateCur* pCur) {
tdbTbcClose(pCur->pCur);
taosMemoryFree(pCur);
}
void streamFreeVal(void* val) { tdbFree(val); }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册