未验证 提交 76596b0c 编写于 作者: L liuyao 提交者: GitHub

Merge pull request #14243 from taosdata/feature/TD-16728

feat(stream): auto pull data
......@@ -42,12 +42,13 @@ enum {
typedef enum EStreamType {
STREAM_NORMAL = 1,
STREAM_INVERT,
STREAM_REPROCESS,
STREAM_CLEAR,
STREAM_INVALID,
STREAM_GET_ALL,
STREAM_DELETE,
STREAM_RETRIEVE,
STREAM_PUSH_DATA,
STREAM_PUSH_EMPTY,
} EStreamType;
typedef struct {
......
......@@ -224,6 +224,7 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize);
int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n);
int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src);
int32_t copyDataBlock(SSDataBlock* dst, const SSDataBlock* src);
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData);
SSDataBlock* createDataBlock();
int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData);
......@@ -236,6 +237,8 @@ void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t*
const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData);
void blockDebugShowData(const SArray* dataBlocks, const char* flag);
// for debug
char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf);
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
tb_uid_t suid);
......
......@@ -319,7 +319,7 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem
return -1;
}
taosWriteQitem(pTask->inputQueue->queue, pSubmitClone);
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK) {
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
taosWriteQitem(pTask->inputQueue->queue, pItem);
} else if (pItem->type == STREAM_INPUT__CHECKPOINT) {
taosWriteQitem(pTask->inputQueue->queue, pItem);
......
......@@ -32,12 +32,15 @@ typedef struct SUpdateInfo {
int64_t interval;
int64_t watermark;
TSKEY minTS;
SScalableBf* pCloseWinSBF;
} SUpdateInfo;
SUpdateInfo *updateInfoInitP(SInterval* pInterval, int64_t watermark);
SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark);
bool updateInfoIsUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts);
void updateInfoDestroy(SUpdateInfo *pInfo);
void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo);
void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo);
#ifdef __cplusplus
}
......
......@@ -1164,7 +1164,7 @@ int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows)
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) {
int32_t code = 0;
ASSERT(numOfRows > 0);
//ASSERT(numOfRows > 0);
if (numOfRows == 0) {
return TSDB_CODE_SUCCESS;
......@@ -1230,6 +1230,32 @@ int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src) {
return 0;
}
int32_t copyDataBlock(SSDataBlock* dst, const SSDataBlock* src) {
ASSERT(src != NULL && dst != NULL);
blockDataCleanup(dst);
int32_t code = blockDataEnsureCapacity(dst, src->info.rows);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return code;
}
size_t numOfCols = taosArrayGetSize(src->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pDst = taosArrayGet(dst->pDataBlock, i);
SColumnInfoData* pSrc = taosArrayGet(src->pDataBlock, i);
if (pSrc->pData == NULL) {
continue;
}
colDataAssign(pDst, pSrc, src->info.rows, &src->info);
}
dst->info.rows = src->info.rows;
dst->info.window = src->info.window;
return TSDB_CODE_SUCCESS;
}
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
if (pDataBlock == NULL) {
return NULL;
......@@ -1627,6 +1653,56 @@ void blockDebugShowData(const SArray* dataBlocks, const char* flag) {
}
}
// for debug
char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) {
int32_t size = 2048;
*pDataBuf = taosMemoryCalloc(size, 1);
char* dumpBuf = *pDataBuf;
char pBuf[128] = {0};
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
int32_t rows = pDataBlock->info.rows;
int32_t len = 0;
len += snprintf(dumpBuf + len, size - len, "\n%s |block type %d |child id %d|\n", flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId);
for (int32_t j = 0; j < rows; j++) {
len += snprintf(dumpBuf + len, size - len, "%s |", flag);
for (int32_t k = 0; k < colNum; k++) {
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
if (colDataIsNull(pColInfoData, rows, j, NULL)) {
len += snprintf(dumpBuf + len, size - len, " %15s |", "NULL");
continue;
}
switch (pColInfoData->info.type) {
case TSDB_DATA_TYPE_TIMESTAMP:
formatTimestamp(pBuf, *(uint64_t*)var, TSDB_TIME_PRECISION_MILLI);
len += snprintf(dumpBuf + len, size - len, " %25s |", pBuf);
break;
case TSDB_DATA_TYPE_INT:
len += snprintf(dumpBuf + len, size - len, " %15d |", *(int32_t*)var);
break;
case TSDB_DATA_TYPE_UINT:
len += snprintf(dumpBuf + len, size - len, " %15u |", *(uint32_t*)var);
break;
case TSDB_DATA_TYPE_BIGINT:
len += snprintf(dumpBuf + len, size - len, " %15ld |", *(int64_t*)var);
break;
case TSDB_DATA_TYPE_UBIGINT:
len += snprintf(dumpBuf + len, size - len, " %15lu |", *(uint64_t*)var);
break;
case TSDB_DATA_TYPE_FLOAT:
len += snprintf(dumpBuf + len, size - len, " %15f |", *(float*)var);
break;
case TSDB_DATA_TYPE_DOUBLE:
len += snprintf(dumpBuf + len, size - len, " %15lf |", *(double*)var);
break;
}
}
len += snprintf(dumpBuf + len, size - len, "\n");
}
len += snprintf(dumpBuf + len, size - len, "%s |end\n", flag);
return dumpBuf;
}
/**
* @brief TODO: Assume that the final generated result it less than 3M
*
......
......@@ -257,7 +257,7 @@ int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) {
case TDMT_STREAM_TASK_RECOVER_RSP:
return sndProcessTaskRecoverRsp(pSnode, pMsg);
case TDMT_STREAM_RETRIEVE_RSP:
return sndProcessTaskRecoverRsp(pSnode, pMsg);
return sndProcessTaskRetrieveRsp(pSnode, pMsg);
default:
ASSERT(0);
}
......
......@@ -109,11 +109,15 @@ int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t
}
bool tqNextDataBlock(STqReadHandle* pHandle) {
if (pHandle->pMsg == NULL) return false;
while (1) {
if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
return false;
}
if (pHandle->pBlock == NULL) return false;
if (pHandle->pBlock == NULL) {
pHandle->pMsg = NULL;
return false;
}
if (pHandle->tbIdHash == NULL) {
return true;
......
......@@ -293,6 +293,7 @@ typedef enum EStreamScanMode {
STREAM_SCAN_FROM_RES,
STREAM_SCAN_FROM_UPDATERES,
STREAM_SCAN_FROM_DATAREADER,
STREAM_SCAN_FROM_DATAREADER_RETRIEVE,
} EStreamScanMode;
typedef struct SCatchSupporter {
......@@ -348,7 +349,9 @@ typedef struct SStreamBlockScanInfo {
SArray* childIds;
SessionWindowSupporter sessionSup;
bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA.
int32_t scanWinIndex;
int32_t scanWinIndex; // for state operator
int32_t pullDataResIndex;
SSDataBlock* pPullDataRes; // pull data SSDataBlock
} SStreamBlockScanInfo;
typedef struct SSysTableScanInfo {
......@@ -427,8 +430,13 @@ typedef struct SStreamFinalIntervalOperatorInfo {
STimeWindowAggSupp twAggSup;
SArray* pChildren;
SSDataBlock* pUpdateRes;
bool returnUpdate;
SPhysiNode* pPhyNode; // create new child
bool isFinal;
SHashObj* pPullDataMap;
SArray* pPullWins; // SPullWindowInfo
int32_t pullIndex;
SSDataBlock* pPullDataRes;
} SStreamFinalIntervalOperatorInfo;
typedef struct SAggOperatorInfo {
......@@ -851,6 +859,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex);
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey);
SSDataBlock* createPullDataBlock();
#ifdef __cplusplus
}
......
......@@ -776,15 +776,14 @@ static bool isStateWindow(SStreamBlockScanInfo* pInfo) {
return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE;
}
static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
SSDataBlock* pSDB = pInfo->pUpdateRes;
static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
STimeWindow win = {
.skey = INT64_MIN,
.ekey = INT64_MAX,
};
bool needRead = false;
if (!isStateWindow(pInfo) && pInfo->updateResIndex < pSDB->info.rows) {
SColumnInfoData* pColDataInfo = taosArrayGet(pSDB->pDataBlock, pInfo->primaryTsIndex);
if (!isStateWindow(pInfo) && (*pRowIndex) < pSDB->info.rows) {
SColumnInfoData* pColDataInfo = taosArrayGet(pSDB->pDataBlock, tsColIndex);
TSKEY* tsCols = (TSKEY*)pColDataInfo->pData;
SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1;
......@@ -793,14 +792,14 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
int64_t gap = pInfo->sessionSup.gap;
int32_t winIndex = 0;
SResultWindowInfo* pCurWin =
getSessionTimeWindow(pAggSup, tsCols[pInfo->updateResIndex], INT64_MIN, pSDB->info.groupId, gap, &winIndex);
getSessionTimeWindow(pAggSup, tsCols[(*pRowIndex)], INT64_MIN, pSDB->info.groupId, gap, &winIndex);
win = pCurWin->win;
pInfo->updateResIndex +=
updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, pInfo->updateResIndex, gap, NULL);
(*pRowIndex) +=
updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, (*pRowIndex), gap, NULL);
} else {
win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[pInfo->updateResIndex], &pInfo->interval,
win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[(*pRowIndex)], &pInfo->interval,
pInfo->interval.precision, NULL);
pInfo->updateResIndex += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, pInfo->updateResIndex, win.ekey,
(*pRowIndex) += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, (*pRowIndex), win.ekey,
binarySearchForKey, NULL, TSDB_ORDER_ASC);
}
needRead = true;
......@@ -823,6 +822,9 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
pTableScanInfo->cond.twindows[0] = win;
pTableScanInfo->curTWinIdx = 0;
// tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
// if (!pTableScanInfo->dataReader) {
// return false;
// }
pTableScanInfo->scanTimes = 0;
pTableScanInfo->currentGroupId = -1;
return true;
......@@ -862,12 +864,12 @@ static uint64_t getGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_
*/
}
static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo) {
static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
while (1) {
SSDataBlock* pResult = NULL;
pResult = doTableScan(pInfo->pSnapshotReadOp);
if (pResult == NULL) {
if (prepareDataScan(pInfo)) {
if (prepareDataScan(pInfo, pSDB, tsColIndex, pRowIndex)) {
// scan next window data
pResult = doTableScan(pInfo->pSnapshotReadOp);
}
......@@ -916,7 +918,7 @@ static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDa
pUpdateBlock->info.rows = i;
pInfo->tsArrayIndex += i;
pUpdateBlock->info.groupId = pInfo->groupId;
pUpdateBlock->info.type = STREAM_REPROCESS;
pUpdateBlock->info.type = STREAM_CLEAR;
blockDataUpdateTsWindow(pUpdateBlock, 0);
}
// all rows have same group id
......@@ -970,6 +972,14 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
int32_t current = pInfo->validBlockIndex++;
SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current);
blockDataUpdateTsWindow(pBlock, 0);
if (pBlock->info.type == STREAM_RETRIEVE) {
pInfo->blockType = STREAM_DATA_TYPE_SUBMIT_BLOCK;
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE;
copyDataBlock(pInfo->pPullDataRes, pBlock);
pInfo->pullDataResIndex = 0;
prepareDataScan(pInfo, pInfo->pPullDataRes, 0, &pInfo->pullDataResIndex);
updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo);
}
return pBlock;
} else if (pInfo->blockType == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
if (pInfo->scanMode == STREAM_SCAN_FROM_RES) {
......@@ -979,28 +989,39 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
} else if (pInfo->scanMode == STREAM_SCAN_FROM_UPDATERES) {
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
if (!isStateWindow(pInfo)) {
prepareDataScan(pInfo);
prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
}
return pInfo->pUpdateRes;
} else if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RETRIEVE) {
SSDataBlock* pSDB = doDataScan(pInfo, pInfo->pPullDataRes, 0, &pInfo->pullDataResIndex);
if (pSDB != NULL) {
getUpdateDataBlock(pInfo, true, pSDB, NULL);
pSDB->info.type = STREAM_PUSH_DATA;
return pSDB;
}
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
} else {
if (isStateWindow(pInfo) && taosArrayGetSize(pInfo->sessionSup.pStreamAggSup->pScanWindow) > 0) {
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
pInfo->updateResIndex = pInfo->pUpdateRes->info.rows;
prepareDataScan(pInfo);
prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
}
if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER) {
SSDataBlock* pSDB = doDataScan(pInfo);
SSDataBlock* pSDB = doDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
if (pSDB == NULL) {
setUpdateData(pInfo, pInfo->pRes, pInfo->pUpdateRes);
if (pInfo->pUpdateRes->info.rows > 0) {
if (!isStateWindow(pInfo)) {
prepareDataScan(pInfo);
// Todo(liuyao) mybe can delete this.
bool test = prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
ASSERT(test == false);
}
return pInfo->pUpdateRes;
} else {
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
}
} else {
pSDB->info.type = STREAM_NORMAL;
getUpdateDataBlock(pInfo, true, pSDB, NULL);
return pSDB;
}
......@@ -1070,10 +1091,12 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
taosArrayDestroy(block.pDataBlock);
if (pInfo->pRes->pDataBlock == NULL) {
// TODO add log
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
pOperator->status = OP_EXEC_DONE;
pTaskInfo->code = terrno;
return NULL;
}
// currently only the tbname pseudo column
if (pInfo->numOfPseudoExpr > 0) {
addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes);
......@@ -1091,12 +1114,13 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
pOperator->resultInfo.totalRows += pBlockInfo->rows;
if (pBlockInfo->rows == 0) {
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
pOperator->status = OP_EXEC_DONE;
} else if (pInfo->pUpdateInfo) {
pInfo->tsArrayIndex = 0;
getUpdateDataBlock(pInfo, true, pInfo->pRes, pInfo->pUpdateRes);
if (pInfo->pUpdateRes->info.rows > 0) {
if (pInfo->pUpdateRes->info.type == STREAM_REPROCESS) {
if (pInfo->pUpdateRes->info.type == STREAM_CLEAR) {
pInfo->updateResIndex = 0;
pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
} else if (pInfo->pUpdateRes->info.type == STREAM_INVERT) {
......@@ -1209,6 +1233,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle,
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1};
pInfo->groupId = 0;
pInfo->pPullDataRes = createPullDataBlock();
pOperator->name = "StreamBlockScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
......
......@@ -15,6 +15,7 @@
#include "executorimpl.h"
#include "function.h"
#include "functionMgt.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "tfill.h"
#include "ttime.h"
......@@ -26,6 +27,16 @@ typedef enum SResultTsInterpType {
#define IS_FINAL_OP(op) ((op)->isFinal)
typedef struct SWinRes {
TSKEY ts;
uint64_t groupId;
} SWinRes;
typedef struct SPullWindowInfo {
STimeWindow window;
uint64_t groupId;
} SPullWindowInfo;
static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator);
static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo);
......@@ -684,11 +695,13 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
}
void printDataBlock(SSDataBlock* pBlock, const char* flag) {
if (pBlock == NULL) return;
SArray* blocks = taosArrayInit(1, sizeof(SSDataBlock));
taosArrayPush(blocks, pBlock);
blockDebugShowData(blocks, flag);
taosArrayDestroy(blocks);
if (pBlock == NULL){
qDebug("======printDataBlock Block is Null");
return;
}
char *pBuf = NULL;
qDebug("%s", dumpBlockData(pBlock, flag, &pBuf));
taosMemoryFree(pBuf);
}
typedef int64_t (*__get_value_fn_t)(void* data, int32_t index);
......@@ -1217,30 +1230,40 @@ void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SExprS
}
}
void doClearWindow(SAggSupporter* pAggSup, SExprSupp* pSup, char* pData, int16_t bytes, uint64_t groupId,
bool doClearWindow(SAggSupporter* pAggSup, SExprSupp* pSup, char* pData, int16_t bytes, uint64_t groupId,
int32_t numOfOutput) {
SET_RES_WINDOW_KEY(pAggSup->keyBuf, pData, bytes, groupId);
SResultRowPosition* p1 =
(SResultRowPosition*)taosHashGet(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
if (!p1) {
// window has been closed
return;
return false;
}
doClearWindowImpl(p1, pAggSup->pResultBuf, pSup, numOfOutput);
return true;
}
static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval* pInterval, int32_t tsIndex,
int32_t numOfOutput, SSDataBlock* pBlock, SArray* pUpWins) {
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, tsIndex);
TSKEY* tsCols = (TSKEY*)pColDataInfo->pData;
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, tsIndex);
TSKEY* tsCols = (TSKEY*)pTsCol->pData;
uint64_t* pGpDatas = NULL;
if (pBlock->info.type == STREAM_RETRIEVE) {
SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, 2);
pGpDatas = (uint64_t*)pGpCol->pData;
}
int32_t step = 0;
for (int32_t i = 0; i < pBlock->info.rows; i += step) {
SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[i], pInterval, pInterval->precision, NULL);
step = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, i, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
doClearWindow(pAggSup, pSup1, (char*)&win.skey, sizeof(TKEY), pBlock->info.groupId, numOfOutput);
if (pUpWins) {
uint64_t groupId = pBlock->info.groupId;
if (pGpDatas) {
groupId = pGpDatas[i];
}
bool res = doClearWindow(pAggSup, pSup1, (char*)&win.skey, sizeof(TKEY), groupId, numOfOutput);
if (pUpWins && res) {
taosArrayPush(pUpWins, &win);
}
}
......@@ -1268,8 +1291,8 @@ bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup) {
return pSup->maxTs != INT64_MIN && pWin->ekey < pSup->maxTs - pSup->waterMark;
}
static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval,
SArray* closeWins) {
static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
SInterval* pInterval, SHashObj* pPullDataMap, SArray* closeWins) {
void* pIte = NULL;
size_t keyLen = 0;
while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) {
......@@ -1280,10 +1303,20 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, ts, pInterval, pInterval->precision, NULL);
SWinRes winRe = {.ts = win.skey, .groupId = groupId,};
void* chIds = taosHashGet(pPullDataMap, &winRe, sizeof(SWinRes));
if (isCloseWindow(&win, pSup)) {
char keyBuf[GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))];
SET_RES_WINDOW_KEY(keyBuf, &ts, sizeof(TSKEY), groupId);
taosHashRemove(pHashMap, keyBuf, keyLen);
if (chIds && pPullDataMap) {
SArray* chAy = *(SArray**) chIds;
int32_t size = taosArrayGetSize(chAy);
qInfo("======window %ld wait child size:%d", win.skey ,size);
for (int32_t i = 0; i < size; i++) {
qInfo("======window %ld wait chid id:%d", win.skey ,*(int32_t*)taosArrayGet(chAy, i));
}
continue;
} else if (pPullDataMap) {
qInfo("======close window %ld", win.skey);
}
SResultRowPosition* pPos = (SResultRowPosition*)pIte;
if (pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
int32_t code = saveResult(ts, pPos->pageId, pPos->offset, groupId, closeWins);
......@@ -1291,11 +1324,25 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
return code;
}
}
char keyBuf[GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))];
SET_RES_WINDOW_KEY(keyBuf, &ts, sizeof(TSKEY), groupId);
taosHashRemove(pHashMap, keyBuf, keyLen);
}
}
return TSDB_CODE_SUCCESS;
}
static void closeChildIntervalWindow(SArray* pChildren, TSKEY maxTs) {
int32_t size = taosArrayGetSize(pChildren);
for (int32_t i = 0; i < size; i++) {
SOperatorInfo* pChildOp = taosArrayGetP(pChildren, i);
SStreamFinalIntervalOperatorInfo* pChInfo = pChildOp->info;
ASSERT(pChInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);
pChInfo->twAggSup.maxTs = TMAX(pChInfo->twAggSup.maxTs, maxTs);
closeIntervalWindow(pChInfo->aggSup.pResultRowHashTable, &pChInfo->twAggSup, &pChInfo->interval, NULL, NULL);
}
}
static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
SIntervalAggOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
......@@ -1324,7 +1371,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
break;
}
if (pBlock->info.type == STREAM_REPROCESS) {
if (pBlock->info.type == STREAM_CLEAR) {
doClearWindows(&pInfo->aggSup, &pOperator->exprSupp, &pInfo->interval, 0, pOperator->exprSupp.numOfExprs, pBlock,
NULL);
qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo));
......@@ -1345,7 +1392,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdated);
}
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pUpdated);
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdated);
finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pSup->rowEntryInfoOffset);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
......@@ -1373,6 +1420,11 @@ void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) {
SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo);
cleanupAggSup(&pInfo->aggSup);
//it should be empty.
taosHashCleanup(pInfo->pPullDataMap);
taosArrayDestroy(pInfo->pPullWins);
blockDataDestroy(pInfo->pPullDataRes);
if (pInfo->pChildren) {
int32_t size = taosArrayGetSize(pInfo->pChildren);
for (int32_t i = 0; i < size; i++) {
......@@ -2164,6 +2216,24 @@ bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup) {
return p1 == NULL;
}
int32_t getNexWindowPos(SInterval* pInterval, SDataBlockInfo* pBlockInfo, TSKEY* tsCols,
int32_t startPos, TSKEY eKey, STimeWindow* pNextWin) {
int32_t forwardRows = getNumOfRowsInTimeWindow(pBlockInfo, tsCols, startPos,
eKey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
int32_t prevEndPos = forwardRows - 1 + startPos;
return getNextQualifiedWindow(pInterval, pNextWin, pBlockInfo, tsCols, prevEndPos, TSDB_ORDER_ASC);
}
void addPullWindow(SHashObj* pMap, SWinRes* pWinRes, int32_t size) {
SArray* childIds = taosArrayInit(8, sizeof(int32_t));
for (int32_t i = 0; i < size; i++) {
taosArrayPush(childIds, &i);
}
taosHashPut(pMap, pWinRes, sizeof(SWinRes), &childIds, sizeof(void*));
}
static int32_t getChildIndex(SSDataBlock* pBlock) { return pBlock->info.childId; }
static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t tableGroupId,
SArray* pUpdated) {
SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)pOperatorInfo->info;
......@@ -2177,35 +2247,59 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
SResultRow* pResult = NULL;
int32_t forwardRows = 0;
if (pSDataBlock->pDataBlock != NULL) {
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
tsCols = (int64_t*)pColDataInfo->pData;
} else {
return;
}
ASSERT(pSDataBlock->pDataBlock != NULL);
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
tsCols = (int64_t*)pColDataInfo->pData;
int32_t startPos = ascScan ? 0 : (pSDataBlock->info.rows - 1);
TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols);
STimeWindow nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval,
pInfo->interval.precision, NULL);
while (1) {
if (IS_FINAL_OP(pInfo) && isCloseWindow(&nextWin, &pInfo->twAggSup) &&
isDeletedWindow(&nextWin, tableGroupId, &pInfo->aggSup)) {
SArray* pUpWins = taosArrayInit(8, sizeof(STimeWindow));
taosArrayPush(pUpWins, &nextWin);
rebuildIntervalWindow(pInfo, pSup, pUpWins, pInfo->binfo.pRes->info.groupId, pSup->numOfExprs,
pOperatorInfo->pTaskInfo);
taosArrayDestroy(pUpWins);
if (IS_FINAL_OP(pInfo) && isCloseWindow(&nextWin, &pInfo->twAggSup) && pInfo->pChildren) {
bool ignore = true;
SWinRes winRes = {.ts = nextWin.skey, .groupId = tableGroupId,};
void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinRes));
if (isDeletedWindow(&nextWin, tableGroupId, &pInfo->aggSup) && !chIds) {
SPullWindowInfo pull = {.window = nextWin, .groupId = tableGroupId};
// add pull data request
taosArrayPush(pInfo->pPullWins, &pull);
addPullWindow(pInfo->pPullDataMap, &winRes, taosArrayGetSize(pInfo->pChildren));
} else {
int32_t index = -1;
SArray* chArray = NULL;
if (chIds) {
chArray = *(void**) chIds;
int32_t chId = getChildIndex(pSDataBlock);
index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ);
}
if (index != -1 && pSDataBlock->info.type == STREAM_PUSH_DATA) {
taosArrayRemove(chArray, index);
if (taosArrayGetSize(chArray) == 0) {
// pull data is over
taosHashRemove(pInfo->pPullDataMap, &winRes, sizeof(SWinRes));
}
}
if ( index == -1 || pSDataBlock->info.type == STREAM_PUSH_DATA) {
ignore = false;
}
}
if (ignore) {
startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin);
if (startPos < 0) {
break;
}
continue;
}
}
int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, true, &pResult, tableGroupId, pSup->pCtx,
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t));
pos->groupId = tableGroupId;
pos->pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
*(int64_t*)pos->key = pResult->win.skey;
forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL,
TSDB_ORDER_ASC);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) {
......@@ -2230,14 +2324,17 @@ static void clearStreamIntervalOperator(SStreamFinalIntervalOperatorInfo* pInfo)
initResultRowInfo(&pInfo->binfo.resultRowInfo);
}
static void clearUpdateDataBlock(SSDataBlock* pBlock) {
static void clearSpecialDataBlock(SSDataBlock* pBlock) {
if (pBlock->info.rows <= 0) {
return;
}
blockDataCleanup(pBlock);
}
void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex) {
// ASSERT(pDest->info.capacity >= pSource->info.rows);
blockDataEnsureCapacity(pDest, pSource->info.rows);
clearUpdateDataBlock(pDest);
clearSpecialDataBlock(pDest);
SColumnInfoData* pDestCol = taosArrayGet(pDest->pDataBlock, 0);
SColumnInfoData* pSourceCol = taosArrayGet(pSource->pDataBlock, tsColIndex);
......@@ -2254,7 +2351,63 @@ void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsCol
blockDataUpdateTsWindow(pDest, 0);
}
static int32_t getChildIndex(SSDataBlock* pBlock) { return pBlock->info.childId; }
static bool needBreak(SStreamFinalIntervalOperatorInfo* pInfo) {
int32_t size = taosArrayGetSize(pInfo->pPullWins);
if (pInfo->pullIndex < size) {
return true;
}
return false;
}
static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pBlock) {
clearSpecialDataBlock(pBlock);
int32_t size = taosArrayGetSize(array);
if (size - (*pIndex) == 0) {
return;
}
blockDataEnsureCapacity(pBlock, size - (*pIndex) );
ASSERT(3 <= taosArrayGetSize(pBlock->pDataBlock));
for (; (*pIndex) < size; (*pIndex)++) {
SPullWindowInfo* pWin = taosArrayGet(array, (*pIndex) );
SColumnInfoData* pStartTs = (SColumnInfoData*) taosArrayGet(pBlock->pDataBlock, 0);
colDataAppend(pStartTs, pBlock->info.rows, (const char*)&pWin->window.skey, false);
SColumnInfoData* pEndTs = (SColumnInfoData*) taosArrayGet(pBlock->pDataBlock, 1);
colDataAppend(pEndTs, pBlock->info.rows, (const char*)&pWin->window.ekey, false);
SColumnInfoData* pGroupId = (SColumnInfoData*) taosArrayGet(pBlock->pDataBlock, 2);
colDataAppend(pGroupId, pBlock->info.rows, (const char*)&pWin->groupId, false);
pBlock->info.rows++;
}
if ((*pIndex) == size) {
*pIndex = 0;
taosArrayClear(array);
}
blockDataUpdateTsWindow(pBlock, 0);
}
void processPushEmpty(SSDataBlock* pBlock, SHashObj* pMap) {
SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, 0);
TSKEY* tsData = (TSKEY*)pStartCol->pData;
SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, 2);
uint64_t* groupIdData = (uint64_t*)pGroupCol->pData;
int32_t chId = getChildIndex(pBlock);
for (int32_t i = 0; i < pBlock->info.rows; i++) {
SWinRes winRes = {.ts = tsData[i], .groupId = groupIdData[i]};
void* chIds = taosHashGet(pMap, &winRes, sizeof(SWinRes));
if (chIds) {
SArray* chArray = *(SArray**) chIds;
int32_t index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ);
if (index != -1) {
taosArrayRemove(chArray, index);
if (taosArrayGetSize(chArray) == 0) {
// pull data is over
taosHashRemove(pMap, &winRes, sizeof(SWinRes));
}
}
}
}
}
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info;
......@@ -2270,28 +2423,50 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
if (pInfo->binfo.pRes->info.rows == 0) {
pOperator->status = OP_EXEC_DONE;
if (IS_FINAL_OP(pInfo) || pInfo->pUpdateRes->info.rows == 0) {
if (!IS_FINAL_OP(pInfo)) {
// semi interval operator clear disk buffer
clearStreamIntervalOperator(pInfo);
}
return NULL;
if (!IS_FINAL_OP(pInfo)) {
// semi interval operator clear disk buffer
clearStreamIntervalOperator(pInfo);
}
return NULL;
}
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
return pInfo->binfo.pRes;
} else {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
if (pInfo->binfo.pRes->info.rows != 0) {
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
return pInfo->binfo.pRes;
}
if (pInfo->pUpdateRes->info.rows != 0 && pInfo->returnUpdate) {
pInfo->returnUpdate = false;
ASSERT(!IS_FINAL_OP(pInfo));
printDataBlock(pInfo->pUpdateRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
// process the rest of the data
pOperator->status = OP_OPENED;
return pInfo->pUpdateRes;
}
return pInfo->binfo.pRes;
doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes);
if (pInfo->pPullDataRes->info.rows != 0) {
// process the rest of the data
ASSERT(IS_FINAL_OP(pInfo));
printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
return pInfo->pPullDataRes;
}
}
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
clearUpdateDataBlock(pInfo->pUpdateRes);
clearSpecialDataBlock(pInfo->pUpdateRes);
pOperator->status = OP_RES_TO_RETURN;
qInfo("Stream Final Interval return data");
break;
}
printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval Final recv" : "interval Semi recv");
maxTs = TMAX(maxTs, pBlock->info.window.ekey);
if (pBlock->info.type == STREAM_REPROCESS) {
if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PUSH_DATA || pBlock->info.type == STREAM_INVALID) {
pInfo->binfo.pRes->info.type = pBlock->info.type;
} else if (pBlock->info.type == STREAM_CLEAR) {
SArray* pUpWins = taosArrayInit(8, sizeof(STimeWindow));
doClearWindows(&pInfo->aggSup, pSup, &pInfo->interval, pInfo->primaryTsIndex, pOperator->exprSupp.numOfExprs,
pBlock, pUpWins);
......@@ -2310,11 +2485,25 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
}
removeResults(pUpWins, pUpdated);
copyUpdateDataBlock(pInfo->pUpdateRes, pBlock, pInfo->primaryTsIndex);
pInfo->returnUpdate = true;
taosArrayDestroy(pUpWins);
break;
} else if (pBlock->info.type == STREAM_GET_ALL && IS_FINAL_OP(pInfo)) {
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdated);
continue;
} else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_OP(pInfo)) {
SArray* pUpWins = taosArrayInit(8, sizeof(STimeWindow));
doClearWindows(&pInfo->aggSup, pSup, &pInfo->interval, 0, pOperator->exprSupp.numOfExprs,
pBlock, pUpWins);
removeResults(pUpWins, pUpdated);
taosArrayDestroy(pUpWins);
if (taosArrayGetSize(pUpdated) > 0) {
break;
}
continue;
} else if (pBlock->info.type == STREAM_PUSH_EMPTY && IS_FINAL_OP(pInfo)) {
processPushEmpty(pBlock, pInfo->pPullDataMap);
continue;
}
setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->order, MAIN_SCAN, true);
......@@ -2334,31 +2523,70 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SStreamFinalIntervalOperatorInfo* pChInfo = pChildOp->info;
setInputDataBlock(pChildOp, pChildOp->exprSupp.pCtx, pBlock, pChInfo->order, MAIN_SCAN, true);
doHashInterval(pChildOp, pBlock, pBlock->info.groupId, NULL);
pChInfo->twAggSup.maxTs = TMAX(pChInfo->twAggSup.maxTs, pBlock->info.window.ekey);
if (needBreak(pInfo)) {
break;
}
}
maxTs = TMAX(maxTs, pBlock->info.window.ekey);
}
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
if (IS_FINAL_OP(pInfo)) {
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pUpdated);
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup,
&pInfo->interval, pInfo->pPullDataMap, pUpdated);
closeChildIntervalWindow(pInfo->pChildren, pInfo->twAggSup.maxTs);
}
finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pSup->rowEntryInfoOffset);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
pOperator->status = OP_RES_TO_RETURN;
if (pInfo->binfo.pRes->info.rows == 0) {
pOperator->status = OP_EXEC_DONE;
if (pInfo->pUpdateRes->info.rows == 0) {
return NULL;
}
if (pInfo->binfo.pRes->info.rows != 0) {
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
return pInfo->binfo.pRes;
}
if (pInfo->pUpdateRes->info.rows != 0 && pInfo->returnUpdate) {
pInfo->returnUpdate = false;
ASSERT(!IS_FINAL_OP(pInfo));
printDataBlock(pInfo->pUpdateRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
// process the rest of the data
pOperator->status = OP_OPENED;
return pInfo->pUpdateRes;
}
return pInfo->binfo.pRes;
doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes);
if (pInfo->pPullDataRes->info.rows != 0) {
// process the rest of the data
ASSERT(IS_FINAL_OP(pInfo));
printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
return pInfo->pPullDataRes;
}
// ASSERT(false);
return NULL;
}
SSDataBlock* createPullDataBlock() {
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
pBlock->info.hasVarCol = false;
pBlock->info.groupId = 0;
pBlock->info.rows = 0;
pBlock->info.type = STREAM_RETRIEVE;
pBlock->info.rowSize = sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t);
pBlock->pDataBlock = taosArrayInit(3, sizeof(SColumnInfoData));
SColumnInfoData infoData = {0};
infoData.info.type = TSDB_DATA_TYPE_TIMESTAMP;
infoData.info.bytes = sizeof(TSKEY);
// window start ts
taosArrayPush(pBlock->pDataBlock, &infoData);
// window end ts
taosArrayPush(pBlock->pDataBlock, &infoData);
infoData.info.type = TSDB_DATA_TYPE_UBIGINT;
infoData.info.bytes = sizeof(uint64_t);
taosArrayPush(pBlock->pDataBlock, &infoData);
return pBlock;
}
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
......@@ -2412,23 +2640,30 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
goto _error;
}
}
// semi interval operator does not catch result
pInfo->pUpdateRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
pInfo->pUpdateRes->info.type = STREAM_REPROCESS;
pInfo->pUpdateRes->info.type = STREAM_CLEAR;
blockDataEnsureCapacity(pInfo->pUpdateRes, 128);
pInfo->returnUpdate = false;
pInfo->pPhyNode = (SPhysiNode*)nodesCloneNode((SNode*)pPhyNode);
if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) {
pInfo->isFinal = true;
pOperator->name = "StreamFinalIntervalOperator";
} else {
// semi interval operator does not catch result
pInfo->isFinal = false;
pOperator->name = "StreamSemiIntervalOperator";
}
if (!IS_FINAL_OP(pInfo)) {
if (!IS_FINAL_OP(pInfo) || numOfChild == 0) {
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
}
pInfo->pPullWins = taosArrayInit(8, sizeof(SPullWindowInfo));
pInfo->pullIndex = 0;
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pPullDataMap = taosHashInit(64, hashFn, false, HASH_NO_LOCK);
pInfo->pPullDataRes = createPullDataBlock();
pOperator->operatorType = pPhyNode->type;
pOperator->blocking = true;
......@@ -2811,11 +3046,6 @@ void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex,
}
}
typedef struct SWinRes {
TSKEY ts;
uint64_t groupId;
} SWinRes;
static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SHashObj* pStUpdated,
SHashObj* pStDeleted, bool hasEndTs) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
......@@ -3027,14 +3257,14 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
} else if (pOperator->status == OP_RES_TO_RETURN) {
doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
/*printDataBlock(pInfo->pDelRes, "session del");*/
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo)? "Final Session" : "Single Session");
return pInfo->pDelRes;
}
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
}
/*printDataBlock(pBInfo->pRes, "session insert");*/
printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo)? "Final Session" : "Single Session");
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
}
......@@ -3048,7 +3278,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
break;
}
if (pBlock->info.type == STREAM_REPROCESS) {
if (pBlock->info.type == STREAM_CLEAR) {
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
doClearSessionWindows(&pInfo->streamAggSup, &pOperator->exprSupp, pBlock, 0, pOperator->exprSupp.numOfExprs,
pInfo->gap, pWins);
......@@ -3102,11 +3332,11 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
/*printDataBlock(pInfo->pDelRes, "session del");*/
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo)? "Final Session" : "Single Session");
return pInfo->pDelRes;
}
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
/*printDataBlock(pBInfo->pRes, "session insert");*/
printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo)? "Final Session" : "Single Session");
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
}
......@@ -3169,11 +3399,11 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
clearUpdateDataBlock(pInfo->pUpdateRes);
clearSpecialDataBlock(pInfo->pUpdateRes);
break;
}
if (pBlock->info.type == STREAM_REPROCESS) {
if (pBlock->info.type == STREAM_CLEAR) {
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
doClearSessionWindows(&pInfo->streamAggSup, pSup, pBlock, 0, pSup->numOfExprs, pInfo->gap, pWins);
removeSessionResults(pStUpdated, pWins);
......@@ -3236,7 +3466,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
} else {
pInfo->isFinal = false;
pInfo->pUpdateRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
pInfo->pUpdateRes->info.type = STREAM_REPROCESS;
pInfo->pUpdateRes->info.type = STREAM_CLEAR;
blockDataEnsureCapacity(pInfo->pUpdateRes, 128);
pOperator->name = "StreamSessionSemiAggOperator";
pOperator->fpSet =
......@@ -3554,7 +3784,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
break;
}
if (pBlock->info.type == STREAM_REPROCESS) {
if (pBlock->info.type == STREAM_CLEAR) {
doClearStateWindows(&pInfo->streamAggSup, pBlock, pInfo->primaryTsIndex, &pInfo->stateCol, pInfo->stateCol.slotId,
pSeUpdated, pInfo->pSeDeleted);
continue;
......
......@@ -115,6 +115,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
.srcNodeId = pTask->nodeId,
.srcTaskId = pTask->taskId,
.pRetrieve = pRetrieve,
.retrieveLen = dataStrLen,
};
int32_t sz = taosArrayGetSize(pTask->childEpInfo);
......@@ -146,7 +147,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
.code = 0,
.msgType = TDMT_STREAM_RETRIEVE,
.pCont = buf,
.contLen = len,
.contLen = sizeof(SMsgHead) + len,
};
if (tmsgSendReq(&pEpInfo->epSet, &rpcMsg) < 0) {
......
......@@ -45,11 +45,16 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
ASSERT(false);
}
if (output == NULL) {
if (pItem->type == STREAM_INPUT__DATA_RETRIEVE && !hasData) {
SSDataBlock block = {0};
block.info.type = STREAM_PUSH_DATA;
block.info.childId = pTask->selfChildId;
taosArrayPush(pRes, &block);
if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
//SSDataBlock block = {0};
//block.info.type = STREAM_PUSH_EMPTY;
//block.info.childId = pTask->selfChildId;
SStreamDataBlock* pRetrieveBlock = (SStreamDataBlock*)data;
ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1);
SSDataBlock* pBlock = createOneDataBlock(taosArrayGet(pRetrieveBlock->blocks, 0), true);
pBlock->info.type = STREAM_PUSH_EMPTY;
pBlock->info.childId = pTask->selfChildId;
taosArrayPush(pRes, pBlock);
}
break;
}
......
......@@ -119,6 +119,7 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma
taosArrayPush(pInfo->pTsBuckets, &dumy);
}
pInfo->numBuckets = DEFAULT_BUCKET_SIZE;
pInfo->pCloseWinSBF = NULL;
return pInfo;
}
......@@ -154,6 +155,9 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts) {
TSKEY maxTs = *(TSKEY *)taosArrayGet(pInfo->pTsBuckets, index);
if (ts < maxTs - pInfo->watermark) {
// this window has been closed.
if (pInfo->pCloseWinSBF) {
return tScalableBfPut(pInfo->pCloseWinSBF, &ts, sizeof(TSKEY));
}
return true;
}
......@@ -193,3 +197,19 @@ void updateInfoDestroy(SUpdateInfo *pInfo) {
taosArrayDestroy(pInfo->pTsSBFs);
taosMemoryFree(pInfo);
}
void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo) {
if (pInfo->pCloseWinSBF) {
return;
}
int64_t rows = adjustExpEntries(pInfo->interval * ROWS_PER_MILLISECOND);
pInfo->pCloseWinSBF = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE);
}
void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo) {
if (!pInfo || !pInfo->pCloseWinSBF) {
return;
}
tScalableBfDestroy(pInfo->pCloseWinSBF);
pInfo->pCloseWinSBF = NULL;
}
......@@ -80,17 +80,17 @@ endi
if $data03 != 4 then
print ======$data03
return -1
goto loop1
endi
if $data04 != 52 then
print ======$data04
return -1
goto loop1
endi
if $data05 != 13 then
print ======$data05
return -1
goto loop1
endi
# row 1
......@@ -179,7 +179,7 @@ sql use test1;
sql create stable st(ts timestamp, a int, b int , c int) tags(ta int,tb int,tc int);
sql create table ts1 using st tags(1,1,1);
sql create table ts2 using st tags(2,2,2);
sql create stream stream_t2 trigger at_once into streamtST1 as select _wstartts, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6 from st interval(10s) ;
sql create stream stream_t2 trigger at_once watermark 20s into streamtST1 as select _wstartts, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6 from st interval(10s) ;
sql insert into ts1 values(1648791211000,1,2,3);
sql insert into ts1 values(1648791222001,2,2,3);
......
......@@ -74,7 +74,7 @@ sql create stable st(ts timestamp,a int,b int,c int,id int) tags(ta int,tb int,t
sql create table ts1 using st tags(1,1,1);
sql create table ts2 using st tags(2,2,2);
sql create stream stream_t2 trigger at_once into streamtST as select _wstartts, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6, max(id) c7 from st partition by ta interval(10s) ;
sql create stream stream_t2 trigger at_once watermark 20s into streamtST as select _wstartts, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6, max(id) c7 from st partition by ta interval(10s) ;
sql insert into ts1 values(1648791211000,1,2,3,1);
sql insert into ts1 values(1648791222001,2,2,3,2);
sql insert into ts2 values(1648791211000,1,2,3,3);
......@@ -83,16 +83,16 @@ sql insert into ts2 values(1648791222001,2,2,3,4);
sql insert into ts2 values(1648791222002,2,2,3,5);
sql insert into ts2 values(1648791222002,2,2,3,6);
sql insert into ts1 values(1648791211000,1,2,3,1);
sql insert into ts1 values(1648791222001,2,2,3,2);
sql insert into ts2 values(1648791211000,1,2,3,3);
sql insert into ts2 values(1648791222001,2,2,3,4);
sql insert into ts1 values(1648791211000,1,2,3,7);
sql insert into ts1 values(1648791222001,2,2,3,8);
sql insert into ts2 values(1648791211000,1,2,3,9);
sql insert into ts2 values(1648791222001,2,2,3,10);
$loop_count = 0
loop2:
sleep 300
sql select * from streamtST;
sql select * from streamtST order by c7 asc;
$loop_count = $loop_count + 1
if $loop_count == 10 then
......@@ -104,8 +104,18 @@ print =====data01=$data01
goto loop2
endi
if $data02 != 1 then
print =====data02=$data02
if $data11 != 1 then
print =====data11=$data11
goto loop2
endi
if $data21 != 1 then
print =====data21=$data21
goto loop2
endi
if $data31 != 2 then
print =====data31=$data31
goto loop2
endi
......@@ -114,8 +124,18 @@ print =====data03=$data03
goto loop2
endi
if $data04 != 2 then
print =====data04=$data04
if $data13 != 2 then
print =====data13=$data13
goto loop2
endi
if $data23 != 1 then
print =====data23=$data23
goto loop2
endi
if $data33 != 4 then
print =====data33=$data33
goto loop2
endi
......
......@@ -79,17 +79,17 @@ endi
if $data03 != 4 then
print ======$data03
return -1
goto loop1
endi
if $data04 != 52 then
print ======$data04
return -1
goto loop1
endi
if $data05 != 13 then
print ======$data05
return -1
goto loop1
endi
# row 1
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册