提交 d210ef1c 编写于 作者: 5 54liuyao

feat(stream): stream max delay

上级 2dd1c31f
......@@ -50,6 +50,7 @@ typedef enum EStreamType {
STREAM_INVERT,
STREAM_REPROCESS,
STREAM_INVALID,
STREAM_GET_ALL,
} EStreamType;
typedef struct {
......
......@@ -1496,6 +1496,7 @@ typedef struct {
#define STREAM_TRIGGER_AT_ONCE 1
#define STREAM_TRIGGER_WINDOW_CLOSE 2
#define STREAM_TRIGGER_MAX_DELAY 3
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
......
......@@ -167,17 +167,17 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, STrans* pTrans, SStreamObj*
}
}
}
} else {
pTask->dispatchType = TASK_DISPATCH__FIXED;
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
SArray* pArray = taosArrayGetP(pStream->tasks, 0);
// one sink only
ASSERT(taosArrayGetSize(pArray) == 1);
SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0);
pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId;
pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId;
pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet;
}
} else {
pTask->dispatchType = TASK_DISPATCH__FIXED;
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
SArray* pArray = taosArrayGetP(pStream->tasks, 0);
// one sink only
ASSERT(taosArrayGetSize(pArray) == 1);
SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0);
pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId;
pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId;
pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet;
}
return 0;
}
......
......@@ -750,14 +750,15 @@ int64_t getReskey(void* data, int32_t index) {
return *(int64_t*)pos->key;
}
static int32_t saveResult(SResultRow* result, uint64_t groupId, SArray* pUpdated) {
static int32_t saveResult(int64_t ts, int32_t pageId, int32_t offset, uint64_t groupId,
SArray* pUpdated) {
int32_t size = taosArrayGetSize(pUpdated);
int32_t index = binarySearch(pUpdated, size, result->win.skey, TSDB_ORDER_DESC, getReskey);
int32_t index = binarySearch(pUpdated, size, ts, TSDB_ORDER_DESC, getReskey);
if (index == -1) {
index = 0;
} else {
TSKEY resTs = getReskey(pUpdated, index);
if (resTs < result->win.skey) {
if (resTs < ts) {
index++;
} else {
return TSDB_CODE_SUCCESS;
......@@ -769,14 +770,18 @@ static int32_t saveResult(SResultRow* result, uint64_t groupId, SArray* pUpdated
return TSDB_CODE_OUT_OF_MEMORY;
}
newPos->groupId = groupId;
newPos->pos = (SResultRowPosition){.pageId = result->pageId, .offset = result->offset};
*(int64_t*)newPos->key = result->win.skey;
newPos->pos = (SResultRowPosition){.pageId = pageId, .offset = offset};
*(int64_t*)newPos->key = ts;
if (taosArrayInsert(pUpdated, index, &newPos) == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
return TSDB_CODE_SUCCESS;
}
static int32_t saveResultRow(SResultRow* result, uint64_t groupId, SArray* pUpdated) {
return saveResult(result->win.skey, result->pageId, result->offset, groupId, pUpdated);
}
static void removeResult(SArray* pUpdated, TSKEY key) {
int32_t size = taosArrayGetSize(pUpdated);
int32_t index = binarySearch(pUpdated, size, key, TSDB_ORDER_DESC, getReskey);
......@@ -819,7 +824,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
saveResult(pResult, tableGroupId, pUpdated);
saveResultRow(pResult, tableGroupId, pUpdated);
}
}
......@@ -869,7 +874,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
saveResult(pResult, tableGroupId, pUpdated);
saveResultRow(pResult, tableGroupId, pUpdated);
}
}
......@@ -1243,6 +1248,23 @@ static void doClearWindows(SAggSupporter* pSup, SOptrBasicInfo* pBinfo, SInterva
}
}
static int32_t getAllIntervalWindow(SHashObj* pHashMap, SArray* resWins) {
void* pIte = NULL;
size_t keyLen = 0;
while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) {
void* key = taosHashGetKey(pIte, &keyLen);
uint64_t groupId = *(uint64_t*)key;
ASSERT(keyLen == GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY)));
TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t));
SResultRowPosition* pPos = (SResultRowPosition*)pIte;
int32_t code = saveResult(ts, pPos->pageId, pPos->offset, groupId, resWins);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval,
SArray* closeWins) {
void* pIte = NULL;
......@@ -1259,16 +1281,12 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
char keyBuf[GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))];
SET_RES_WINDOW_KEY(keyBuf, &ts, sizeof(TSKEY), groupId);
taosHashRemove(pHashMap, keyBuf, keyLen);
SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t));
if (pos == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pos->groupId = groupId;
pos->pos = *(SResultRowPosition*)pIte;
*(int64_t*)pos->key = ts;
if (pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE && !taosArrayPush(closeWins, &pos)) {
taosMemoryFree(pos);
return TSDB_CODE_OUT_OF_MEMORY;
SResultRowPosition* pPos = (SResultRowPosition*)pIte;
if (pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
int32_t code = saveResult(ts, pPos->pageId, pPos->offset, groupId, closeWins);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
}
}
......@@ -1296,8 +1314,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
SOperatorInfo* downstream = pOperator->pDownstream[0];
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES);
SArray* pClosed = taosArrayInit(4, POINTER_BYTES);
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
......@@ -1316,19 +1332,18 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
doClearWindows(&pInfo->aggSup, &pInfo->binfo, &pInfo->interval, 0, pOperator->numOfExprs, pBlock, NULL);
qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo));
continue;
} else if (pBlock->info.type == STREAM_GET_ALL &&
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_MAX_DELAY) {
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdated);
continue;
}
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdated);
}
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pUpdated);
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pClosed);
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pClosed, pInfo->binfo.rowCellInfoOffset);
taosArrayAddAll(pUpdated, pClosed);
taosArrayDestroy(pClosed);
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
......@@ -1898,7 +1913,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL,
TSDB_ORDER_ASC);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) {
saveResult(pResult, tableGroupId, pUpdated);
saveResultRow(pResult, tableGroupId, pUpdated);
}
// window start(end) key interpolation
// doWindowBorderInterpolation(pInfo, pSDataBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos,
......@@ -1993,7 +2008,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info;
SOperatorInfo* downstream = pOperator->pDownstream[0];
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES);
SArray* pClosed = taosArrayInit(4, POINTER_BYTES);
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
......@@ -2042,7 +2056,12 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
copyUpdateDataBlock(pInfo->pUpdateRes, pBlock, pInfo->primaryTsIndex);
taosArrayDestroy(pUpWins);
break;
} else if (pBlock->info.type == STREAM_GET_ALL && isFinalInterval(pInfo) &&
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_MAX_DELAY) {
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdated);
continue;
}
if (isFinalInterval(pInfo)) {
int32_t chIndex = getChildIndex(pBlock);
int32_t size = taosArrayGetSize(pInfo->pChildren);
......@@ -2064,13 +2083,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
}
if (isFinalInterval(pInfo)) {
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pClosed);
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pClosed, pInfo->binfo.rowCellInfoOffset);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
taosArrayAddAll(pUpdated, pClosed);
}
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pUpdated);
}
taosArrayDestroy(pClosed);
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册