提交 aa20cfed 编写于 作者: 5 54liuyao

feat(stream): watermark and trigger

上级 c1ed5d58
......@@ -56,6 +56,9 @@ typedef struct SScanLogicNode {
int8_t intervalUnit;
int8_t slidingUnit;
SNode* pTagCond;
int8_t triggerType;
int64_t watermark;
int16_t tsColId;
} SScanLogicNode;
typedef struct SJoinLogicNode {
......@@ -216,6 +219,9 @@ typedef struct STableScanPhysiNode {
int64_t sliding;
int8_t intervalUnit;
int8_t slidingUnit;
int8_t triggerType;
int64_t watermark;
int16_t tsColId;
} STableScanPhysiNode;
typedef STableScanPhysiNode STableSeqScanPhysiNode;
......
......@@ -399,7 +399,6 @@ typedef struct SStreamBlockScanInfo {
EStreamScanMode scanMode;
SOperatorInfo* pOperatorDumy;
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here.
SCatchSupporter childAggSup;
SArray* childIds;
SessionWindowSupporter sessionSup;
bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA.
......@@ -441,6 +440,7 @@ typedef struct SAggSupporter {
typedef struct STimeWindowSupp {
int8_t calTrigger;
int64_t waterMark;
TSKEY maxTs;
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
} STimeWindowAggSupp;
......@@ -572,6 +572,7 @@ typedef struct SResultWindowInfo {
SResultRowPosition pos;
STimeWindow win;
bool isOutput;
bool isClosed;
} SResultWindowInfo;
typedef struct SStreamSessionAggOperatorInfo {
......@@ -742,7 +743,9 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle, SArray* pTableIdList, STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle,
SArray* pTableIdList, STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo,
STimeWindowAggSupp* pTwSup, int16_t tsColId);
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
......@@ -799,8 +802,6 @@ int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimary
int32_t startPos, TSKEY ekey, __block_search_fn_t searchFn, STableQueryInfo* item,
int32_t order);
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
int32_t initCacheSupporter(SCatchSupporter* pCatchSup, size_t rowSize, const char* pKey,
const char* pDir);
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey);
SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize);
SResultWindowInfo* getSessionTimeWindow(SArray* pWinInfos, TSKEY ts, int64_t gap,
......
......@@ -233,7 +233,7 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int
void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList) {
if (pGroupResInfo->pRows != NULL) {
taosArrayDestroy(pGroupResInfo->pRows);
taosArrayDestroyP(pGroupResInfo->pRows, taosMemoryFree);
}
pGroupResInfo->pRows = pArrayList;
......
......@@ -4492,7 +4492,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
STimeWindowAggSupp twSup = {.waterMark = pTableScanNode->watermark,
.calTrigger = pTableScanNode->triggerType, .maxTs = INT64_MIN};
tsdbReaderT pDataReader = NULL;
if (pHandle->vnode) {
pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond);
......@@ -4508,7 +4509,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
}
SArray* tableIdList = extractTableIdList(pTableListInfo);
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pDataReader, pHandle, tableIdList, pTableScanNode, pTaskInfo);
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pDataReader, pHandle,
tableIdList, pTableScanNode, pTaskInfo, &twSup, pTableScanNode->tsColId);
taosArrayDestroy(tableIdList);
return pOperator;
......@@ -4608,7 +4610,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
.precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
STimeWindowAggSupp as = {.waterMark = pIntervalPhyNode->window.watermark,
.calTrigger = pIntervalPhyNode->window.triggerType};
.calTrigger = pIntervalPhyNode->window.triggerType,
.maxTs = INT64_MIN};
int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTaskInfo);
......@@ -5169,20 +5172,6 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo
return TSDB_CODE_SUCCESS;
}
int32_t initCacheSupporter(SCatchSupporter* pCatchSup, size_t rowSize, const char* pKey, const char* pDir) {
pCatchSup->keySize = sizeof(int64_t) + sizeof(int64_t) + sizeof(TSKEY);
pCatchSup->pKeyBuf = taosMemoryCalloc(1, pCatchSup->keySize);
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pCatchSup->pWindowHashTable = taosHashInit(10000, hashFn, true, HASH_NO_LOCK);
if (pCatchSup->pKeyBuf == NULL || pCatchSup->pWindowHashTable == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t pageSize = rowSize * 32;
int32_t bufSize = pageSize * 4096;
return createDiskbasedBuf(&pCatchSup->pDataBuf, pageSize, bufSize, pKey, pDir);
}
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey) {
pSup->keySize = sizeof(int64_t) + sizeof(TSKEY);
pSup->pKeyBuf = taosMemoryCalloc(1, pSup->keySize);
......
......@@ -755,91 +755,6 @@ static SSDataBlock* getUpdateDataBlock(SStreamBlockScanInfo* pInfo, bool inverti
return NULL;
}
void static setSupKeyBuf(SCatchSupporter* pSup, int64_t groupId, int64_t childId, TSKEY ts) {
int64_t* pKey = (int64_t*)pSup->pKeyBuf;
pKey[0] = groupId;
pKey[1] = childId;
pKey[2] = ts;
}
static int32_t catchWidonwInfo(SSDataBlock* pDataBlock, SCatchSupporter* pSup, int32_t pageId, int32_t tsIndex,
int64_t childId) {
SColumnInfoData* pColDataInfo = taosArrayGet(pDataBlock->pDataBlock, tsIndex);
TSKEY* tsCols = (int64_t*)pColDataInfo->pData;
for (int32_t i = 0; i < pDataBlock->info.rows; i++) {
setSupKeyBuf(pSup, pDataBlock->info.groupId, childId, tsCols[i]);
SWindowPosition* p1 = (SWindowPosition*)taosHashGet(pSup->pWindowHashTable, pSup->pKeyBuf, pSup->keySize);
if (p1 == NULL) {
SWindowPosition pos = {.pageId = pageId, .rowId = i};
int32_t code = taosHashPut(pSup->pWindowHashTable, pSup->pKeyBuf, pSup->keySize, &pos, sizeof(SWindowPosition));
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} else {
p1->pageId = pageId;
p1->rowId = i;
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t catchDatablock(SSDataBlock* pDataBlock, SCatchSupporter* pSup, int32_t tsIndex, int64_t childId) {
int32_t start = 0;
int32_t stop = 0;
int32_t pageSize = getBufPageSize(pSup->pDataBuf);
while (start < pDataBlock->info.rows) {
blockDataSplitRows(pDataBlock, pDataBlock->info.hasVarCol, start, &stop, pageSize);
SSDataBlock* pDB = blockDataExtractBlock(pDataBlock, start, stop - start + 1);
if (pDB == NULL) {
return terrno;
}
int32_t pageId = -1;
void* pPage = getNewBufPage(pSup->pDataBuf, pDataBlock->info.groupId, &pageId);
if (pPage == NULL) {
blockDataDestroy(pDB);
return terrno;
}
int32_t size = blockDataGetSize(pDB) + sizeof(int32_t) + pDB->info.numOfCols * sizeof(int32_t);
assert(size <= pageSize);
blockDataToBuf(pPage, pDB);
setBufPageDirty(pPage, true);
releaseBufPage(pSup->pDataBuf, pPage);
blockDataDestroy(pDB);
start = stop + 1;
int32_t code = catchWidonwInfo(pDB, pSup, pageId, tsIndex, childId);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
return TSDB_CODE_SUCCESS;
}
static SSDataBlock* getDataFromCatch(SStreamBlockScanInfo* pInfo) {
SSDataBlock* pBlock = pInfo->pUpdateRes;
if (pInfo->updateResIndex < pBlock->info.rows) {
blockDataCleanup(pInfo->pRes);
SCatchSupporter* pCSup = &pInfo->childAggSup;
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, 0);
TSKEY* tsCols = (TSKEY*)pColDataInfo->pData;
int32_t size = taosArrayGetSize(pInfo->childIds);
for (int32_t i = 0; i < size; i++) {
int64_t id = *(int64_t*)taosArrayGet(pInfo->childIds, i);
setSupKeyBuf(pCSup, pBlock->info.groupId, id, tsCols[pInfo->updateResIndex]);
SWindowPosition* pos = (SWindowPosition*)taosHashGet(pCSup->pWindowHashTable, pCSup->pKeyBuf, pCSup->keySize);
void* buf = getBufPage(pCSup->pDataBuf, pos->pageId);
SSDataBlock* pDB = createOneDataBlock(pInfo->pRes, false);
blockDataFromBuf(pDB, buf);
SSDataBlock* pSub = blockDataExtractBlock(pDB, pos->rowId, 1);
blockDataMerge(pInfo->pRes, pSub);
blockDataDestroy(pDB);
blockDataDestroy(pSub);
}
pInfo->updateResIndex++;
return pInfo->pRes;
}
return NULL;
}
static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
// NOTE: this operator does never check if current status is done or not
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
......@@ -977,7 +892,9 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
}
}
SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle, SArray* pTableIdList, STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo) {
SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle,
SArray* pTableIdList, STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo,
STimeWindowAggSupp* pTwSup, int16_t tsColId) {
SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
......@@ -1022,9 +939,9 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
goto _error;
}
pInfo->primaryTsIndex = 0; // TODO(liuyao) get it from physical plan
pInfo->primaryTsIndex = tsColId;
if (pSTInfo->interval.interval > 0) {
pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, 10000); // TODO(liuyao) get watermark from physical plan
pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, pTwSup->waterMark);
} else {
pInfo->pUpdateInfo = NULL;
}
......@@ -1045,9 +962,6 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
pInfo->interval = pSTInfo->interval;
pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1};
// TODO(liuyao) get row size from phy plan
initCacheSupporter(&pInfo->childAggSup, 1024, "StreamFinalInterval", TD_TMP_DIR_PATH);
pOperator->name = "StreamBlockScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
pOperator->blocking = false;
......@@ -1056,8 +970,8 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
pOperator->numOfExprs = pInfo->pRes->info.numOfCols;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL);
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL,
NULL, operatorDummyCloseFn, NULL, NULL, NULL);
return pOperator;
......
......@@ -622,18 +622,103 @@ static void saveDataBlockLastRow(char** pRow, SArray* pDataBlock, int32_t rowInd
}
}
static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
uint64_t tableGroupId) {
typedef int64_t (*__get_value_fn_t)(void* data, int32_t index);
int32_t binarySearch(void* keyList, int num, TSKEY key, int order,
__get_value_fn_t getValuefn) {
int firstPos = 0, lastPos = num - 1, midPos = -1;
int numOfRows = 0;
if (num <= 0) return -1;
if (order == TSDB_ORDER_DESC) {
// find the first position which is smaller or equal than the key
while (1) {
if (key >= getValuefn(keyList, lastPos)) return lastPos;
if (key == getValuefn(keyList, firstPos)) return firstPos;
if (key < getValuefn(keyList, firstPos)) return firstPos - 1;
numOfRows = lastPos - firstPos + 1;
midPos = (numOfRows >> 1) + firstPos;
if (key < getValuefn(keyList, midPos)) {
lastPos = midPos - 1;
} else if (key > getValuefn(keyList, midPos)) {
firstPos = midPos + 1;
} else {
break;
}
}
} else {
// find the first position which is bigger or equal than the key
while (1) {
if (key <= getValuefn(keyList, firstPos)) return firstPos;
if (key == getValuefn(keyList, lastPos)) return lastPos;
if (key > getValuefn(keyList, lastPos)) {
lastPos = lastPos + 1;
if (lastPos >= num)
return -1;
else
return lastPos;
}
numOfRows = lastPos - firstPos + 1;
midPos = (numOfRows >> 1) + firstPos;
if (key < getValuefn(keyList, midPos)) {
lastPos = midPos - 1;
} else if (key > getValuefn(keyList, midPos)) {
firstPos = midPos + 1;
} else {
break;
}
}
}
return midPos;
}
int64_t getReskey(void* data, int32_t index) {
SArray* res = (SArray*) data;
SResKeyPos* pos = taosArrayGetP(res, index);
return *(int64_t*)pos->key;
}
static int32_t saveResult(SResultRow* result, uint64_t groupId, SArray* pUpdated) {
int32_t size = taosArrayGetSize(pUpdated);
int32_t index = binarySearch(pUpdated, size, result->win.skey, TSDB_ORDER_DESC, getReskey);
if (index == -1) {
index = 0;
} else {
TSKEY resTs = getReskey(pUpdated, index);
if (resTs < result->win.skey) {
index++;
} else {
return TSDB_CODE_SUCCESS;
}
}
SResKeyPos* newPos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t));
if (newPos == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
newPos->groupId = groupId;
newPos->pos = (SResultRowPosition){.pageId = result->pageId, .offset = result->offset};
*(int64_t*)newPos->key = result->win.skey;
if (taosArrayInsert(pUpdated, index, &newPos) == NULL ){
return TSDB_CODE_OUT_OF_MEMORY;
}
return TSDB_CODE_SUCCESS;
}
static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
uint64_t tableGroupId, SArray* pUpdated) {
SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
int32_t numOfOutput = pOperatorInfo->numOfExprs;
SArray* pUpdated = NULL;
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
pUpdated = taosArrayInit(4, POINTER_BYTES);
}
int32_t step = 1;
bool ascScan = (pInfo->order == TSDB_ORDER_ASC);
......@@ -663,13 +748,10 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t));
pos->groupId = tableGroupId;
pos->pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
*(int64_t*)pos->key = pResult->win.skey;
taosArrayPush(pUpdated, &pos);
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM &&
(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
pInfo->twAggSup.calTrigger == 0) ) {
saveResult(pResult, tableGroupId, pUpdated);
}
int32_t forwardStep = 0;
......@@ -742,13 +824,10 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t));
pos->groupId = tableGroupId;
pos->pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
*(int64_t*)pos->key = pResult->win.skey;
taosArrayPush(pUpdated, &pos);
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM &&
(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
pInfo->twAggSup.calTrigger == 0) ) {
saveResult(pResult, tableGroupId, pUpdated);
}
ekey = ascScan? nextWin.ekey:nextWin.skey;
......@@ -769,7 +848,6 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
saveDataBlockLastRow(pInfo->pRow, pBlock->pDataBlock, rowIndex, pBlock->info.numOfCols);
}
return pUpdated;
// updateResultRowInfoActiveIndex(pResultRowInfo, &pInfo->win, pRuntimeEnv->current->lastKey, true, false);
}
......@@ -799,7 +877,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
STableQueryInfo* pTableQueryInfo = pInfo->pCurrent;
setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window);
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, pBlock->info.groupId);
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, pBlock->info.groupId, NULL);
#if 0 // test for encode/decode result info
if(pOperator->encodeResultRow){
......@@ -1067,7 +1145,7 @@ void doClearWindow(SAggSupporter* pSup, SOptrBasicInfo* pBinfo, char* pData,
}
static void doClearWindows(SAggSupporter* pSup, SOptrBasicInfo* pBinfo,
SInterval* pIntrerval, int32_t tsIndex, int32_t numOfOutput, SSDataBlock* pBlock,
SInterval* pInterval, int32_t tsIndex, int32_t numOfOutput, SSDataBlock* pBlock,
SArray* pUpWins) {
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, tsIndex);
TSKEY *tsCols = (TSKEY*)pColDataInfo->pData;
......@@ -1075,8 +1153,8 @@ static void doClearWindows(SAggSupporter* pSup, SOptrBasicInfo* pBinfo,
for (int32_t i = 0; i < pBlock->info.rows; i += step) {
SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[i], pIntrerval,
pIntrerval->precision, NULL);
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[i], pInterval,
pInterval->precision, NULL);
step = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, i,
win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
doClearWindow(pSup, pBinfo, (char*)&win.skey, sizeof(TKEY), pBlock->info.groupId, numOfOutput);
......@@ -1086,6 +1164,39 @@ static void doClearWindows(SAggSupporter* pSup, SOptrBasicInfo* pBinfo,
}
}
static int32_t closeIntervalWindow(SHashObj *pHashMap, STimeWindowAggSupp *pSup,
SInterval* pInterval, SArray* closeWins) {
void *pIte = NULL;
size_t keyLen = 0;
while((pIte = taosHashIterate(pHashMap, pIte)) != NULL) {
void* key = taosHashGetKey(pIte, &keyLen);
uint64_t groupId = *(uint64_t*) key;
ASSERT(keyLen == GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY)));
TSKEY ts = *(uint64_t*) ((char*)key + sizeof(uint64_t));
SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, ts, pInterval,
pInterval->precision, NULL);
if (win.ekey < pSup->maxTs - pSup->waterMark) {
char keyBuf[GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))];
SET_RES_WINDOW_KEY(keyBuf, &ts, sizeof(TSKEY), groupId);
taosHashRemove(pHashMap, keyBuf, keyLen);
SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t));
if (pos == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pos->groupId = groupId;
pos->pos = *(SResultRowPosition*) pIte;
*(int64_t*)pos->key = ts;
if (!taosArrayPush(closeWins, &pos)) {
taosMemoryFree(pos);
return TSDB_CODE_OUT_OF_MEMORY;
}
}
}
return TSDB_CODE_SUCCESS;
}
static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
SIntervalAggOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
......@@ -1106,7 +1217,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
SOperatorInfo* downstream = pOperator->pDownstream[0];
SArray* pUpdated = NULL;
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES);
SArray* pClosed = taosArrayInit(4, POINTER_BYTES);
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
......@@ -1128,10 +1241,19 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
continue;
}
pUpdated = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, pBlock->info.groupId);
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, pBlock->info.groupId, pUpdated);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
}
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset);
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup,
&pInfo->interval, pClosed);
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pClosed,
pInfo->binfo.rowCellInfoOffset);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER__WINDOW_CLOSE) {
taosArrayAddAll(pUpdated, pClosed);
}
taosArrayDestroy(pClosed);
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated,
pInfo->binfo.rowCellInfoOffset);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
......@@ -1935,63 +2057,6 @@ _error:
return NULL;
}
typedef int64_t (*__get_value_fn_t)(void* data, int32_t index);
int32_t binarySearch(void* keyList, int num, TSKEY key, int order,
__get_value_fn_t getValuefn) {
int firstPos = 0, lastPos = num - 1, midPos = -1;
int numOfRows = 0;
if (num <= 0) return -1;
if (order == TSDB_ORDER_DESC) {
// find the first position which is smaller than the key
while (1) {
if (key >= getValuefn(keyList, lastPos)) return lastPos;
if (key == getValuefn(keyList, firstPos)) return firstPos;
if (key < getValuefn(keyList, firstPos)) return firstPos - 1;
numOfRows = lastPos - firstPos + 1;
midPos = (numOfRows >> 1) + firstPos;
if (key < getValuefn(keyList, midPos)) {
lastPos = midPos - 1;
} else if (key > getValuefn(keyList, midPos)) {
firstPos = midPos + 1;
} else {
break;
}
}
} else {
// find the first position which is bigger than the key
while (1) {
if (key <= getValuefn(keyList, firstPos)) return firstPos;
if (key == getValuefn(keyList, lastPos)) return lastPos;
if (key > getValuefn(keyList, lastPos)) {
lastPos = lastPos + 1;
if (lastPos >= num)
return -1;
else
return lastPos;
}
numOfRows = lastPos - firstPos + 1;
midPos = (numOfRows >> 1) + firstPos;
if (key < getValuefn(keyList, midPos)) {
lastPos = midPos - 1;
} else if (key > getValuefn(keyList, midPos)) {
firstPos = midPos + 1;
} else {
break;
}
}
}
return midPos;
}
int64_t getSessionWindowEndkey(void* data, int32_t index) {
SArray* pWinInfos = (SArray*) data;
SResultWindowInfo* pWin = taosArrayGet(pWinInfos, index);
......@@ -2223,12 +2288,14 @@ static void doStreamSessionWindowAggImpl(SOperatorInfo* pOperator,
if (winNum > 0) {
compactTimeWindow(pInfo, winIndex, winNum, groupId, numOfOutput, pTaskInfo, pStUpdated, pStDeleted);
}
pCurWin->isClosed = false;
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
code = taosHashPut(pStUpdated, &pCurWin->pos, sizeof(SResultRowPosition), &(pCurWin->win.skey), sizeof(TSKEY));
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pCurWin->isOutput = true;
}
i += winRows;
}
}
......@@ -2325,6 +2392,37 @@ bool isFinalSession(SStreamSessionAggOperatorInfo* pInfo) {
return pInfo->pChildren != NULL;
}
int32_t closeSessionWindow(SArray *pWins, STimeWindowAggSupp *pTwSup, SArray *pClosed,
int8_t calTrigger) {
// Todo(liuyao) save window to tdb
int32_t size = taosArrayGetSize(pWins);
for (int32_t i = 0; i < size; i++) {
SResultWindowInfo *pSeWin = taosArrayGet(pWins, i);
if (pSeWin->win.ekey < pTwSup->maxTs - pTwSup->waterMark) {
if (!pSeWin->isClosed) {
SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t));
if (pos == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pos->groupId = 0;
pos->pos = pSeWin->pos;
*(int64_t*)pos->key = pSeWin->win.ekey;
if (!taosArrayPush(pClosed, &pos)) {
taosMemoryFree(pos);
return TSDB_CODE_OUT_OF_MEMORY;
}
pSeWin->isClosed = true;
if (calTrigger == STREAM_TRIGGER__WINDOW_CLOSE) {
pSeWin->isOutput = true;
}
}
continue;
}
break;
}
return TSDB_CODE_SUCCESS;
}
static SSDataBlock* doStreamSessionWindowAgg(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
......@@ -2377,13 +2475,21 @@ static SSDataBlock* doStreamSessionWindowAgg(SOperatorInfo* pOperator) {
doStreamSessionWindowAggImpl(pOperator, pBlock, NULL, NULL);
}
doStreamSessionWindowAggImpl(pOperator, pBlock, pStUpdated, pInfo->pStDeleted);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
}
// restore the value
pOperator->status = OP_RES_TO_RETURN;
SArray* pClosed = taosArrayInit(16, POINTER_BYTES);
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pClosed,
pInfo->twAggSup.calTrigger);
SArray* pUpdated = taosArrayInit(16, POINTER_BYTES);
copyUpdateResult(pStUpdated, pUpdated, pBInfo->pRes->info.groupId);
taosHashCleanup(pStUpdated);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER__WINDOW_CLOSE) {
taosArrayAddAll(pUpdated, pClosed);
}
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated,
pInfo->binfo.rowCellInfoOffset);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
......
......@@ -1130,6 +1130,9 @@ static const char* jkTableScanPhysiPlanOffset = "Offset";
static const char* jkTableScanPhysiPlanSliding = "Sliding";
static const char* jkTableScanPhysiPlanIntervalUnit = "intervalUnit";
static const char* jkTableScanPhysiPlanSlidingUnit = "slidingUnit";
static const char* jkTableScanPhysiPlanTriggerType = "triggerType";
static const char* jkTableScanPhysiPlanWatermark = "watermark";
static const char* jkTableScanPhysiPlanTsColId = "tsColId";
static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj;
......@@ -1171,6 +1174,15 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanSlidingUnit, pNode->slidingUnit);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanTriggerType, pNode->triggerType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanWatermark, pNode->watermark);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanTsColId, pNode->tsColId);
}
return code;
}
......@@ -1221,6 +1233,15 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanSlidingUnit, pNode->slidingUnit, code);
;
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanTriggerType, pNode->triggerType, code);
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanWatermark, pNode->watermark, code);
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanTsColId, pNode->tsColId, code);
}
return code;
}
......
......@@ -223,6 +223,9 @@ static void setScanWindowInfo(SScanLogicNode* pScan) {
pScan->sliding = ((SWindowLogicNode*)pScan->node.pParent)->sliding;
pScan->intervalUnit = ((SWindowLogicNode*)pScan->node.pParent)->intervalUnit;
pScan->slidingUnit = ((SWindowLogicNode*)pScan->node.pParent)->slidingUnit;
pScan->triggerType = ((SWindowLogicNode*)pScan->node.pParent)->triggerType;
pScan->watermark = ((SWindowLogicNode*)pScan->node.pParent)->watermark;
pScan->tsColId = ((SColumnNode*)((SWindowLogicNode*)pScan->node.pParent)->pTspk)->colId;
}
}
......
......@@ -503,6 +503,9 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp
pTableScan->sliding = pScanLogicNode->sliding;
pTableScan->intervalUnit = pScanLogicNode->intervalUnit;
pTableScan->slidingUnit = pScanLogicNode->slidingUnit;
pTableScan->triggerType = pScanLogicNode->triggerType;
pTableScan->watermark = pScanLogicNode->watermark;
pTableScan->tsColId = pScanLogicNode->tsColId;
return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode);
}
......
......@@ -72,12 +72,14 @@ static int64_t adjustInterval(int64_t interval, int32_t precision) {
return val;
}
static int64_t adjustWatermark(int64_t interval, int64_t watermark) {
if (watermark <= 0 || watermark > MAX_NUM_SCALABLE_BF * interval) {
watermark = MAX_NUM_SCALABLE_BF * interval;
} else if (watermark < MIN_NUM_SCALABLE_BF * interval) {
watermark = MIN_NUM_SCALABLE_BF * interval;
}
static int64_t adjustWatermark(int64_t adjInterval, int64_t originInt, int64_t watermark) {
if (watermark <= 0) {
watermark = TMIN(originInt/adjInterval, 1) * adjInterval;
} else if (watermark > MAX_NUM_SCALABLE_BF * adjInterval) {
watermark = MAX_NUM_SCALABLE_BF * adjInterval;
}/* else if (watermark < MIN_NUM_SCALABLE_BF * adjInterval) {
watermark = MIN_NUM_SCALABLE_BF * adjInterval;
}*/ // Todo(liuyao) save window info to tdb
return watermark;
}
......@@ -94,7 +96,7 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma
pInfo->pTsSBFs = NULL;
pInfo->minTS = -1;
pInfo->interval = adjustInterval(interval, precision);
pInfo->watermark = adjustWatermark(pInfo->interval, watermark);
pInfo->watermark = adjustWatermark(pInfo->interval, interval, watermark);
uint64_t bfSize = (uint64_t)(pInfo->watermark / pInfo->interval);
......@@ -149,13 +151,18 @@ static SScalableBf *getSBf(SUpdateInfo *pInfo, TSKEY ts) {
bool updateInfoIsUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts) {
int32_t res = TSDB_CODE_FAILED;
uint64_t index = ((uint64_t)tableId) % pInfo->numBuckets;
TSKEY maxTs = *(TSKEY *)taosArrayGet(pInfo->pTsBuckets, index);
if (ts < maxTs - pInfo->watermark) {
// this window has been closed.
return true;
}
SScalableBf *pSBf = getSBf(pInfo, ts);
// pSBf may be a null pointer
if (pSBf) {
res = tScalableBfPut(pSBf, &ts, sizeof(TSKEY));
}
TSKEY maxTs = *(TSKEY *)taosArrayGet(pInfo->pTsBuckets, index);
if (maxTs < ts) {
taosArraySet(pInfo->pTsBuckets, index, &ts);
return false;
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print =============== create database
sql create database test vgroups 1
sql show databases
if $rows != 3 then
return -1
endi
print $data00 $data01 $data02
sql use test
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams1 trigger window_close into streamt as select _wstartts, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s);
sql insert into t1 values(1648791213001,1,2,3,1.0);
sleep 300
sql select * from streamt;
if $rows != 0 then
print ======$rows
return -1
endi
sql insert into t1 values(1648791223001,2,2,3,1.1);
sql insert into t1 values(1648791223002,2,2,3,1.1);
sql insert into t1 values(1648791223003,2,2,3,1.1);
sql insert into t1 values(1648791223001,2,2,3,1.1);
sleep 300
sql select * from streamt;
if $rows != 1 then
print ======$rows
return -1
endi
if $data01 != 1 then
print ======$data01
return -1
endi
sql insert into t1 values(1648791233001,2,2,3,1.1);
sleep 300
sql select * from streamt;
if $rows != 2 then
print ======$rows
return -1
endi
if $data01 != 1 then
print ======$data01
return -1
endi
if $data11 != 3 then
print ======$data11
return -1
endi
sql insert into t1 values(1648791223004,2,2,3,1.1);
sql insert into t1 values(1648791223004,2,2,3,1.1);
sql insert into t1 values(1648791223005,2,2,3,1.1);
sleep 300
sql select * from streamt;
if $rows != 2 then
print ======$rows
return -1
endi
if $data01 != 1 then
print ======$data01
return -1
endi
if $data11 != 5 then
print ======$data11
return -1
endi
sql insert into t1 values(1648791233002,3,2,3,2.1);
sql insert into t1 values(1648791213002,4,2,3,3.1)
sql insert into t1 values(1648791213002,4,2,3,4.1);
sleep 300
sql select * from streamt;
if $rows != 2 then
print ======$rows
return -1
endi
if $data01 != 2 then
print ======$data01
return -1
endi
if $data11 != 5 then
print ======$data11
return -1
endi
sql create table t2(ts timestamp, a int, b int , c int, d double);
sql create stream streams2 trigger window_close watermark 20s into streamt2 as select _wstartts, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from t2 interval(10s);
sql insert into t2 values(1648791213000,1,2,3,1.0);
sql insert into t2 values(1648791239999,1,2,3,1.0);
sleep 300
sql select * from streamt2;
if $rows != 0 then
print ======$rows
return -1
endi
sql insert into t2 values(1648791240000,1,2,3,1.0);
sleep 300
sql select * from streamt2;
if $rows != 1 then
print ======$rows
return -1
endi
if $data01 != 1 then
print ======$data01
return -1
endi
sql insert into t2 values(1648791250001,1,2,3,1.0) (1648791250002,1,2,3,1.0) (1648791250003,1,2,3,1.0) (1648791240000,1,2,3,1.0);
sleep 300
sql select * from streamt2;
if $rows != 1 then
print ======$rows
return -1
endi
if $data01 != 1 then
print ======$data01
return -1
endi
sql insert into t2 values(1648791280000,1,2,3,1.0);
sleep 300
sql select * from streamt2;
if $rows != 4 then
print ======$rows
return -1
endi
if $data01 != 1 then
print ======$data01
return -1
endi
if $data11 != 1 then
print ======$data11
return -1
endi
if $data21 != 1 then
print ======$data21
return -1
endi
if $data31 != 3 then
print ======$data31
return -1
endi
sql insert into t2 values(1648791250001,1,2,3,1.0) (1648791250002,1,2,3,1.0) (1648791250003,1,2,3,1.0) (1648791280000,1,2,3,1.0) (1648791280001,1,2,3,1.0) (1648791280002,1,2,3,1.0) (1648791310000,1,2,3,1.0) (1648791280001,1,2,3,1.0);
sleep 300
sql select * from streamt2;
if $rows != 5 then
print ======$rows
return -1
endi
if $data01 != 1 then
print ======$data01
return -1
endi
if $data11 != 1 then
print ======$data11
return -1
endi
if $data21 != 1 then
print ======$data21
return -1
endi
if $data31 != 3 then
print ======$data31
return -1
endi
if $data41 != 3 then
print ======$data31
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print =============== create database
sql create database test vgroups 1
sql show databases
if $rows != 3 then
return -1
endi
print $data00 $data01 $data02
sql use test
sql create table t2(ts timestamp, a int, b int , c int, d double);
sql create stream streams2 trigger window_close into streamt2 as select _wstartts, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from t2 session(ts, 10s);
sql insert into t2 values(1648791213000,1,2,3,1.0);
sql insert into t2 values(1648791222999,1,2,3,1.0);
sql insert into t2 values(1648791223000,1,2,3,1.0);
sql insert into t2 values(1648791223001,1,2,3,1.0);
sql insert into t2 values(1648791233001,1,2,3,1.0);
sleep 300
sql select * from streamt2;
if $rows != 0 then
print ======$rows
return -1
endi
sql insert into t2 values(1648791243002,1,2,3,1.0);
sleep 300
sql select * from streamt2;
if $rows != 1 then
print ======$rows
return -1
endi
if $data01 != 5 then
print ======$data01
return -1
endi
sql insert into t2 values(1648791223001,1,2,3,1.0) (1648791223002,1,2,3,1.0) (1648791222999,1,2,3,1.0);
sleep 300
sql select * from streamt2;
if $rows != 1 then
print ======$rows
return -1
endi
if $data01 != 6 then
print ======$data01
return -1
endi
sql insert into t2 values(1648791233002,1,2,3,1.0);
sleep 300
sql select * from streamt2;
if $rows != 1 then
print ======$rows
return -1
endi
if $data01 != 6 then
print ======$data01
return -1
endi
sql insert into t2 values(1648791253003,1,2,3,1.0);
sleep 300
sql select * from streamt2;
if $rows != 1 then
print ======$rows
return -1
endi
if $data01 != 8 then
print ======$data01
return -1
endi
sql insert into t2 values(1648791243003,1,2,3,1.0) (1648791243002,1,2,3,1.0) (1648791270004,1,2,3,1.0) (1648791280005,1,2,3,1.0) (1648791290006,1,2,3,1.0);
sleep 500
sql select * from streamt2;
if $rows != 3 then
print ======$rows
return -1
endi
if $data01 != 10 then
print ======$data01
return -1
endi
if $data11 != 1 then
print ======$data11
return -1
endi
if $data21 != 1 then
print ======$data21
return -1
endi
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
Subproject commit 4d83d8c62973506f760bcaa3a33f4665ed9046d0
Subproject commit 2f3dfddd4d9a869e706ba3cf98fb6d769404cd7c
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册