提交 10e90ce9 编写于 作者: 5 54liuyao

feat(stream): auto pull data

上级 8e2b93f0
...@@ -42,12 +42,13 @@ enum { ...@@ -42,12 +42,13 @@ enum {
typedef enum EStreamType { typedef enum EStreamType {
STREAM_NORMAL = 1, STREAM_NORMAL = 1,
STREAM_INVERT, STREAM_INVERT,
STREAM_REPROCESS, STREAM_CLEAR,
STREAM_INVALID, STREAM_INVALID,
STREAM_GET_ALL, STREAM_GET_ALL,
STREAM_DELETE, STREAM_DELETE,
STREAM_RETRIEVE, STREAM_RETRIEVE,
STREAM_PUSH_DATA, STREAM_PUSH_DATA,
STREAM_PUSH_EMPTY,
} EStreamType; } EStreamType;
typedef struct { typedef struct {
......
...@@ -224,6 +224,7 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize); ...@@ -224,6 +224,7 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize);
int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n); int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n);
int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src); int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src);
int32_t copyDataBlock(SSDataBlock* dst, const SSDataBlock* src);
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData); SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData);
SSDataBlock* createDataBlock(); SSDataBlock* createDataBlock();
int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData); int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData);
...@@ -236,6 +237,8 @@ void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* ...@@ -236,6 +237,8 @@ void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t*
const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData); const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData);
void blockDebugShowData(const SArray* dataBlocks, const char* flag); void blockDebugShowData(const SArray* dataBlocks, const char* flag);
// for debug
char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf);
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId, int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
tb_uid_t suid); tb_uid_t suid);
......
...@@ -319,7 +319,7 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem ...@@ -319,7 +319,7 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem
return -1; return -1;
} }
taosWriteQitem(pTask->inputQueue->queue, pSubmitClone); taosWriteQitem(pTask->inputQueue->queue, pSubmitClone);
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK) { } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
taosWriteQitem(pTask->inputQueue->queue, pItem); taosWriteQitem(pTask->inputQueue->queue, pItem);
} else if (pItem->type == STREAM_INPUT__CHECKPOINT) { } else if (pItem->type == STREAM_INPUT__CHECKPOINT) {
taosWriteQitem(pTask->inputQueue->queue, pItem); taosWriteQitem(pTask->inputQueue->queue, pItem);
......
...@@ -32,12 +32,15 @@ typedef struct SUpdateInfo { ...@@ -32,12 +32,15 @@ typedef struct SUpdateInfo {
int64_t interval; int64_t interval;
int64_t watermark; int64_t watermark;
TSKEY minTS; TSKEY minTS;
SScalableBf* pCloseWinSBF;
} SUpdateInfo; } SUpdateInfo;
SUpdateInfo *updateInfoInitP(SInterval* pInterval, int64_t watermark); SUpdateInfo *updateInfoInitP(SInterval* pInterval, int64_t watermark);
SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark); SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark);
bool updateInfoIsUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts); bool updateInfoIsUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts);
void updateInfoDestroy(SUpdateInfo *pInfo); void updateInfoDestroy(SUpdateInfo *pInfo);
void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo);
void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -1164,7 +1164,7 @@ int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows) ...@@ -1164,7 +1164,7 @@ int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows)
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) { int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) {
int32_t code = 0; int32_t code = 0;
ASSERT(numOfRows > 0); //ASSERT(numOfRows > 0);
if (numOfRows == 0) { if (numOfRows == 0) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1230,6 +1230,32 @@ int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src) { ...@@ -1230,6 +1230,32 @@ int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src) {
return 0; return 0;
} }
int32_t copyDataBlock(SSDataBlock* dst, const SSDataBlock* src) {
ASSERT(src != NULL && dst != NULL);
blockDataCleanup(dst);
int32_t code = blockDataEnsureCapacity(dst, src->info.rows);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return code;
}
size_t numOfCols = taosArrayGetSize(src->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pDst = taosArrayGet(dst->pDataBlock, i);
SColumnInfoData* pSrc = taosArrayGet(src->pDataBlock, i);
if (pSrc->pData == NULL) {
continue;
}
colDataAssign(pDst, pSrc, src->info.rows, &src->info);
}
dst->info.rows = src->info.rows;
dst->info.window = src->info.window;
return TSDB_CODE_SUCCESS;
}
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
if (pDataBlock == NULL) { if (pDataBlock == NULL) {
return NULL; return NULL;
...@@ -1627,6 +1653,56 @@ void blockDebugShowData(const SArray* dataBlocks, const char* flag) { ...@@ -1627,6 +1653,56 @@ void blockDebugShowData(const SArray* dataBlocks, const char* flag) {
} }
} }
// for debug
char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) {
int32_t size = 2048;
*pDataBuf = taosMemoryCalloc(size, 1);
char* dumpBuf = *pDataBuf;
char pBuf[128] = {0};
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
int32_t rows = pDataBlock->info.rows;
int32_t len = 0;
len += snprintf(dumpBuf + len, size - len, "\n%s |block type %d |child id %d|\n", flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId);
for (int32_t j = 0; j < rows; j++) {
len += snprintf(dumpBuf + len, size - len, "%s |", flag);
for (int32_t k = 0; k < colNum; k++) {
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
if (colDataIsNull(pColInfoData, rows, j, NULL)) {
len += snprintf(dumpBuf + len, size - len, " %15s |", "NULL");
continue;
}
switch (pColInfoData->info.type) {
case TSDB_DATA_TYPE_TIMESTAMP:
formatTimestamp(pBuf, *(uint64_t*)var, TSDB_TIME_PRECISION_MILLI);
len += snprintf(dumpBuf + len, size - len, " %25s |", pBuf);
break;
case TSDB_DATA_TYPE_INT:
len += snprintf(dumpBuf + len, size - len, " %15d |", *(int32_t*)var);
break;
case TSDB_DATA_TYPE_UINT:
len += snprintf(dumpBuf + len, size - len, " %15u |", *(uint32_t*)var);
break;
case TSDB_DATA_TYPE_BIGINT:
len += snprintf(dumpBuf + len, size - len, " %15ld |", *(int64_t*)var);
break;
case TSDB_DATA_TYPE_UBIGINT:
len += snprintf(dumpBuf + len, size - len, " %15lu |", *(uint64_t*)var);
break;
case TSDB_DATA_TYPE_FLOAT:
len += snprintf(dumpBuf + len, size - len, " %15f |", *(float*)var);
break;
case TSDB_DATA_TYPE_DOUBLE:
len += snprintf(dumpBuf + len, size - len, " %15lf |", *(double*)var);
break;
}
}
len += snprintf(dumpBuf + len, size - len, "\n");
}
len += snprintf(dumpBuf + len, size - len, "%s |end\n", flag);
return dumpBuf;
}
/** /**
* @brief TODO: Assume that the final generated result it less than 3M * @brief TODO: Assume that the final generated result it less than 3M
* *
......
...@@ -257,7 +257,7 @@ int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) { ...@@ -257,7 +257,7 @@ int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) {
case TDMT_STREAM_TASK_RECOVER_RSP: case TDMT_STREAM_TASK_RECOVER_RSP:
return sndProcessTaskRecoverRsp(pSnode, pMsg); return sndProcessTaskRecoverRsp(pSnode, pMsg);
case TDMT_STREAM_RETRIEVE_RSP: case TDMT_STREAM_RETRIEVE_RSP:
return sndProcessTaskRecoverRsp(pSnode, pMsg); return sndProcessTaskRetrieveRsp(pSnode, pMsg);
default: default:
ASSERT(0); ASSERT(0);
} }
......
...@@ -109,11 +109,15 @@ int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t ...@@ -109,11 +109,15 @@ int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t
} }
bool tqNextDataBlock(STqReadHandle* pHandle) { bool tqNextDataBlock(STqReadHandle* pHandle) {
if (pHandle->pMsg == NULL) return false;
while (1) { while (1) {
if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) { if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
return false; return false;
} }
if (pHandle->pBlock == NULL) return false; if (pHandle->pBlock == NULL) {
pHandle->pMsg = NULL;
return false;
}
if (pHandle->tbIdHash == NULL) { if (pHandle->tbIdHash == NULL) {
return true; return true;
......
...@@ -293,6 +293,7 @@ typedef enum EStreamScanMode { ...@@ -293,6 +293,7 @@ typedef enum EStreamScanMode {
STREAM_SCAN_FROM_RES, STREAM_SCAN_FROM_RES,
STREAM_SCAN_FROM_UPDATERES, STREAM_SCAN_FROM_UPDATERES,
STREAM_SCAN_FROM_DATAREADER, STREAM_SCAN_FROM_DATAREADER,
STREAM_SCAN_FROM_DATAREADER_RETRIEVE,
} EStreamScanMode; } EStreamScanMode;
typedef struct SCatchSupporter { typedef struct SCatchSupporter {
...@@ -348,7 +349,9 @@ typedef struct SStreamBlockScanInfo { ...@@ -348,7 +349,9 @@ typedef struct SStreamBlockScanInfo {
SArray* childIds; SArray* childIds;
SessionWindowSupporter sessionSup; SessionWindowSupporter sessionSup;
bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA. bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA.
int32_t scanWinIndex; int32_t scanWinIndex; // for state operator
int32_t pullDataResIndex;
SSDataBlock* pPullDataRes; // pull data SSDataBlock
} SStreamBlockScanInfo; } SStreamBlockScanInfo;
typedef struct SSysTableScanInfo { typedef struct SSysTableScanInfo {
...@@ -427,8 +430,13 @@ typedef struct SStreamFinalIntervalOperatorInfo { ...@@ -427,8 +430,13 @@ typedef struct SStreamFinalIntervalOperatorInfo {
STimeWindowAggSupp twAggSup; STimeWindowAggSupp twAggSup;
SArray* pChildren; SArray* pChildren;
SSDataBlock* pUpdateRes; SSDataBlock* pUpdateRes;
bool returnUpdate;
SPhysiNode* pPhyNode; // create new child SPhysiNode* pPhyNode; // create new child
bool isFinal; bool isFinal;
SHashObj* pPullDataMap;
SArray* pPullWins; // SPullWindowInfo
int32_t pullIndex;
SSDataBlock* pPullDataRes;
} SStreamFinalIntervalOperatorInfo; } SStreamFinalIntervalOperatorInfo;
typedef struct SAggOperatorInfo { typedef struct SAggOperatorInfo {
...@@ -851,6 +859,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN ...@@ -851,6 +859,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex); void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex);
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey); int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey);
SSDataBlock* createPullDataBlock();
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -776,15 +776,14 @@ static bool isStateWindow(SStreamBlockScanInfo* pInfo) { ...@@ -776,15 +776,14 @@ static bool isStateWindow(SStreamBlockScanInfo* pInfo) {
return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE; return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE;
} }
static bool prepareDataScan(SStreamBlockScanInfo* pInfo) { static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
SSDataBlock* pSDB = pInfo->pUpdateRes;
STimeWindow win = { STimeWindow win = {
.skey = INT64_MIN, .skey = INT64_MIN,
.ekey = INT64_MAX, .ekey = INT64_MAX,
}; };
bool needRead = false; bool needRead = false;
if (!isStateWindow(pInfo) && pInfo->updateResIndex < pSDB->info.rows) { if (!isStateWindow(pInfo) && (*pRowIndex) < pSDB->info.rows) {
SColumnInfoData* pColDataInfo = taosArrayGet(pSDB->pDataBlock, pInfo->primaryTsIndex); SColumnInfoData* pColDataInfo = taosArrayGet(pSDB->pDataBlock, tsColIndex);
TSKEY* tsCols = (TSKEY*)pColDataInfo->pData; TSKEY* tsCols = (TSKEY*)pColDataInfo->pData;
SResultRowInfo dumyInfo; SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1; dumyInfo.cur.pageId = -1;
...@@ -793,14 +792,14 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) { ...@@ -793,14 +792,14 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
int64_t gap = pInfo->sessionSup.gap; int64_t gap = pInfo->sessionSup.gap;
int32_t winIndex = 0; int32_t winIndex = 0;
SResultWindowInfo* pCurWin = SResultWindowInfo* pCurWin =
getSessionTimeWindow(pAggSup, tsCols[pInfo->updateResIndex], INT64_MIN, pSDB->info.groupId, gap, &winIndex); getSessionTimeWindow(pAggSup, tsCols[(*pRowIndex)], INT64_MIN, pSDB->info.groupId, gap, &winIndex);
win = pCurWin->win; win = pCurWin->win;
pInfo->updateResIndex += (*pRowIndex) +=
updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, pInfo->updateResIndex, gap, NULL); updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, (*pRowIndex), gap, NULL);
} else { } else {
win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[pInfo->updateResIndex], &pInfo->interval, win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[(*pRowIndex)], &pInfo->interval,
pInfo->interval.precision, NULL); pInfo->interval.precision, NULL);
pInfo->updateResIndex += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, pInfo->updateResIndex, win.ekey, (*pRowIndex) += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, (*pRowIndex), win.ekey,
binarySearchForKey, NULL, TSDB_ORDER_ASC); binarySearchForKey, NULL, TSDB_ORDER_ASC);
} }
needRead = true; needRead = true;
...@@ -823,6 +822,9 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) { ...@@ -823,6 +822,9 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
pTableScanInfo->cond.twindows[0] = win; pTableScanInfo->cond.twindows[0] = win;
pTableScanInfo->curTWinIdx = 0; pTableScanInfo->curTWinIdx = 0;
// tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0); // tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
// if (!pTableScanInfo->dataReader) {
// return false;
// }
pTableScanInfo->scanTimes = 0; pTableScanInfo->scanTimes = 0;
pTableScanInfo->currentGroupId = -1; pTableScanInfo->currentGroupId = -1;
return true; return true;
...@@ -862,12 +864,12 @@ static uint64_t getGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_ ...@@ -862,12 +864,12 @@ static uint64_t getGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_
*/ */
} }
static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo) { static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
while (1) { while (1) {
SSDataBlock* pResult = NULL; SSDataBlock* pResult = NULL;
pResult = doTableScan(pInfo->pSnapshotReadOp); pResult = doTableScan(pInfo->pSnapshotReadOp);
if (pResult == NULL) { if (pResult == NULL) {
if (prepareDataScan(pInfo)) { if (prepareDataScan(pInfo, pSDB, tsColIndex, pRowIndex)) {
// scan next window data // scan next window data
pResult = doTableScan(pInfo->pSnapshotReadOp); pResult = doTableScan(pInfo->pSnapshotReadOp);
} }
...@@ -916,7 +918,7 @@ static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDa ...@@ -916,7 +918,7 @@ static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDa
pUpdateBlock->info.rows = i; pUpdateBlock->info.rows = i;
pInfo->tsArrayIndex += i; pInfo->tsArrayIndex += i;
pUpdateBlock->info.groupId = pInfo->groupId; pUpdateBlock->info.groupId = pInfo->groupId;
pUpdateBlock->info.type = STREAM_REPROCESS; pUpdateBlock->info.type = STREAM_CLEAR;
blockDataUpdateTsWindow(pUpdateBlock, 0); blockDataUpdateTsWindow(pUpdateBlock, 0);
} }
// all rows have same group id // all rows have same group id
...@@ -970,6 +972,14 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { ...@@ -970,6 +972,14 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
int32_t current = pInfo->validBlockIndex++; int32_t current = pInfo->validBlockIndex++;
SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current); SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current);
blockDataUpdateTsWindow(pBlock, 0); blockDataUpdateTsWindow(pBlock, 0);
if (pBlock->info.type == STREAM_RETRIEVE) {
pInfo->blockType = STREAM_DATA_TYPE_SUBMIT_BLOCK;
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE;
copyDataBlock(pInfo->pPullDataRes, pBlock);
pInfo->pullDataResIndex = 0;
prepareDataScan(pInfo, pInfo->pPullDataRes, 0, &pInfo->pullDataResIndex);
updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo);
}
return pBlock; return pBlock;
} else if (pInfo->blockType == STREAM_DATA_TYPE_SUBMIT_BLOCK) { } else if (pInfo->blockType == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
if (pInfo->scanMode == STREAM_SCAN_FROM_RES) { if (pInfo->scanMode == STREAM_SCAN_FROM_RES) {
...@@ -979,28 +989,39 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { ...@@ -979,28 +989,39 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
} else if (pInfo->scanMode == STREAM_SCAN_FROM_UPDATERES) { } else if (pInfo->scanMode == STREAM_SCAN_FROM_UPDATERES) {
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER; pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
if (!isStateWindow(pInfo)) { if (!isStateWindow(pInfo)) {
prepareDataScan(pInfo); prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
} }
return pInfo->pUpdateRes; return pInfo->pUpdateRes;
} else if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RETRIEVE) {
SSDataBlock* pSDB = doDataScan(pInfo, pInfo->pPullDataRes, 0, &pInfo->pullDataResIndex);
if (pSDB != NULL) {
getUpdateDataBlock(pInfo, true, pSDB, NULL);
pSDB->info.type = STREAM_PUSH_DATA;
return pSDB;
}
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
} else { } else {
if (isStateWindow(pInfo) && taosArrayGetSize(pInfo->sessionSup.pStreamAggSup->pScanWindow) > 0) { if (isStateWindow(pInfo) && taosArrayGetSize(pInfo->sessionSup.pStreamAggSup->pScanWindow) > 0) {
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER; pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
pInfo->updateResIndex = pInfo->pUpdateRes->info.rows; pInfo->updateResIndex = pInfo->pUpdateRes->info.rows;
prepareDataScan(pInfo); prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
} }
if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER) { if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER) {
SSDataBlock* pSDB = doDataScan(pInfo); SSDataBlock* pSDB = doDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
if (pSDB == NULL) { if (pSDB == NULL) {
setUpdateData(pInfo, pInfo->pRes, pInfo->pUpdateRes); setUpdateData(pInfo, pInfo->pRes, pInfo->pUpdateRes);
if (pInfo->pUpdateRes->info.rows > 0) { if (pInfo->pUpdateRes->info.rows > 0) {
if (!isStateWindow(pInfo)) { if (!isStateWindow(pInfo)) {
prepareDataScan(pInfo); // Todo(liuyao) mybe can delete this.
bool test = prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
ASSERT(test == false);
} }
return pInfo->pUpdateRes; return pInfo->pUpdateRes;
} else { } else {
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
} }
} else { } else {
pSDB->info.type = STREAM_NORMAL;
getUpdateDataBlock(pInfo, true, pSDB, NULL); getUpdateDataBlock(pInfo, true, pSDB, NULL);
return pSDB; return pSDB;
} }
...@@ -1070,10 +1091,12 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { ...@@ -1070,10 +1091,12 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
taosArrayDestroy(block.pDataBlock); taosArrayDestroy(block.pDataBlock);
if (pInfo->pRes->pDataBlock == NULL) { if (pInfo->pRes->pDataBlock == NULL) {
// TODO add log // TODO add log
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
pTaskInfo->code = terrno; pTaskInfo->code = terrno;
return NULL; return NULL;
} }
// currently only the tbname pseudo column // currently only the tbname pseudo column
if (pInfo->numOfPseudoExpr > 0) { if (pInfo->numOfPseudoExpr > 0) {
addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes); addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes);
...@@ -1091,12 +1114,13 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { ...@@ -1091,12 +1114,13 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
pOperator->resultInfo.totalRows += pBlockInfo->rows; pOperator->resultInfo.totalRows += pBlockInfo->rows;
if (pBlockInfo->rows == 0) { if (pBlockInfo->rows == 0) {
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
} else if (pInfo->pUpdateInfo) { } else if (pInfo->pUpdateInfo) {
pInfo->tsArrayIndex = 0; pInfo->tsArrayIndex = 0;
getUpdateDataBlock(pInfo, true, pInfo->pRes, pInfo->pUpdateRes); getUpdateDataBlock(pInfo, true, pInfo->pRes, pInfo->pUpdateRes);
if (pInfo->pUpdateRes->info.rows > 0) { if (pInfo->pUpdateRes->info.rows > 0) {
if (pInfo->pUpdateRes->info.type == STREAM_REPROCESS) { if (pInfo->pUpdateRes->info.type == STREAM_CLEAR) {
pInfo->updateResIndex = 0; pInfo->updateResIndex = 0;
pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES; pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
} else if (pInfo->pUpdateRes->info.type == STREAM_INVERT) { } else if (pInfo->pUpdateRes->info.type == STREAM_INVERT) {
...@@ -1209,6 +1233,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, ...@@ -1209,6 +1233,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle,
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1}; pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1};
pInfo->groupId = 0; pInfo->groupId = 0;
pInfo->pPullDataRes = createPullDataBlock();
pOperator->name = "StreamBlockScanOperator"; pOperator->name = "StreamBlockScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#include "executorimpl.h" #include "executorimpl.h"
#include "function.h" #include "function.h"
#include "functionMgt.h" #include "functionMgt.h"
#include "tcompare.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "tfill.h" #include "tfill.h"
#include "ttime.h" #include "ttime.h"
...@@ -26,6 +27,16 @@ typedef enum SResultTsInterpType { ...@@ -26,6 +27,16 @@ typedef enum SResultTsInterpType {
#define IS_FINAL_OP(op) ((op)->isFinal) #define IS_FINAL_OP(op) ((op)->isFinal)
typedef struct SWinRes {
TSKEY ts;
uint64_t groupId;
} SWinRes;
typedef struct SPullWindowInfo {
STimeWindow window;
uint64_t groupId;
} SPullWindowInfo;
static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator); static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator);
static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo); static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo);
...@@ -684,11 +695,13 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num ...@@ -684,11 +695,13 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
} }
void printDataBlock(SSDataBlock* pBlock, const char* flag) { void printDataBlock(SSDataBlock* pBlock, const char* flag) {
if (pBlock == NULL) return; if (pBlock == NULL){
SArray* blocks = taosArrayInit(1, sizeof(SSDataBlock)); qDebug("======printDataBlock Block is Null");
taosArrayPush(blocks, pBlock); return;
blockDebugShowData(blocks, flag); }
taosArrayDestroy(blocks); char *pBuf = NULL;
qDebug("%s", dumpBlockData(pBlock, flag, &pBuf));
taosMemoryFree(pBuf);
} }
typedef int64_t (*__get_value_fn_t)(void* data, int32_t index); typedef int64_t (*__get_value_fn_t)(void* data, int32_t index);
...@@ -1217,30 +1230,40 @@ void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SExprS ...@@ -1217,30 +1230,40 @@ void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SExprS
} }
} }
void doClearWindow(SAggSupporter* pAggSup, SExprSupp* pSup, char* pData, int16_t bytes, uint64_t groupId, bool doClearWindow(SAggSupporter* pAggSup, SExprSupp* pSup, char* pData, int16_t bytes, uint64_t groupId,
int32_t numOfOutput) { int32_t numOfOutput) {
SET_RES_WINDOW_KEY(pAggSup->keyBuf, pData, bytes, groupId); SET_RES_WINDOW_KEY(pAggSup->keyBuf, pData, bytes, groupId);
SResultRowPosition* p1 = SResultRowPosition* p1 =
(SResultRowPosition*)taosHashGet(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); (SResultRowPosition*)taosHashGet(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
if (!p1) { if (!p1) {
// window has been closed // window has been closed
return; return false;
} }
doClearWindowImpl(p1, pAggSup->pResultBuf, pSup, numOfOutput); doClearWindowImpl(p1, pAggSup->pResultBuf, pSup, numOfOutput);
return true;
} }
static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval* pInterval, int32_t tsIndex, static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval* pInterval, int32_t tsIndex,
int32_t numOfOutput, SSDataBlock* pBlock, SArray* pUpWins) { int32_t numOfOutput, SSDataBlock* pBlock, SArray* pUpWins) {
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, tsIndex); SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, tsIndex);
TSKEY* tsCols = (TSKEY*)pColDataInfo->pData; TSKEY* tsCols = (TSKEY*)pTsCol->pData;
uint64_t* pGpDatas = NULL;
if (pBlock->info.type == STREAM_RETRIEVE) {
SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, 2);
pGpDatas = (uint64_t*)pGpCol->pData;
}
int32_t step = 0; int32_t step = 0;
for (int32_t i = 0; i < pBlock->info.rows; i += step) { for (int32_t i = 0; i < pBlock->info.rows; i += step) {
SResultRowInfo dumyInfo; SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1; dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[i], pInterval, pInterval->precision, NULL); STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[i], pInterval, pInterval->precision, NULL);
step = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, i, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); step = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, i, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
doClearWindow(pAggSup, pSup1, (char*)&win.skey, sizeof(TKEY), pBlock->info.groupId, numOfOutput); uint64_t groupId = pBlock->info.groupId;
if (pUpWins) { if (pGpDatas) {
groupId = pGpDatas[i];
}
bool res = doClearWindow(pAggSup, pSup1, (char*)&win.skey, sizeof(TKEY), groupId, numOfOutput);
if (pUpWins && res) {
taosArrayPush(pUpWins, &win); taosArrayPush(pUpWins, &win);
} }
} }
...@@ -1268,8 +1291,8 @@ bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup) { ...@@ -1268,8 +1291,8 @@ bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup) {
return pSup->maxTs != INT64_MIN && pWin->ekey < pSup->maxTs - pSup->waterMark; return pSup->maxTs != INT64_MIN && pWin->ekey < pSup->maxTs - pSup->waterMark;
} }
static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval, static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
SArray* closeWins) { SInterval* pInterval, SHashObj* pPullDataMap, SArray* closeWins) {
void* pIte = NULL; void* pIte = NULL;
size_t keyLen = 0; size_t keyLen = 0;
while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) { while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) {
...@@ -1280,10 +1303,20 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, ...@@ -1280,10 +1303,20 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
SResultRowInfo dumyInfo; SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1; dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, ts, pInterval, pInterval->precision, NULL); STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, ts, pInterval, pInterval->precision, NULL);
SWinRes winRe = {.ts = win.skey, .groupId = groupId,};
void* chIds = taosHashGet(pPullDataMap, &winRe, sizeof(SWinRes));
if (isCloseWindow(&win, pSup)) { if (isCloseWindow(&win, pSup)) {
char keyBuf[GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))]; if (chIds && pPullDataMap) {
SET_RES_WINDOW_KEY(keyBuf, &ts, sizeof(TSKEY), groupId); SArray* chAy = *(SArray**) chIds;
taosHashRemove(pHashMap, keyBuf, keyLen); int32_t size = taosArrayGetSize(chAy);
qInfo("======window %ld wait child size:%d", win.skey ,size);
for (int32_t i = 0; i < size; i++) {
qInfo("======window %ld wait chid id:%d", win.skey ,*(int32_t*)taosArrayGet(chAy, i));
}
continue;
} else if (pPullDataMap) {
qInfo("======close window %ld", win.skey);
}
SResultRowPosition* pPos = (SResultRowPosition*)pIte; SResultRowPosition* pPos = (SResultRowPosition*)pIte;
if (pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { if (pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
int32_t code = saveResult(ts, pPos->pageId, pPos->offset, groupId, closeWins); int32_t code = saveResult(ts, pPos->pageId, pPos->offset, groupId, closeWins);
...@@ -1291,11 +1324,25 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, ...@@ -1291,11 +1324,25 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
return code; return code;
} }
} }
char keyBuf[GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))];
SET_RES_WINDOW_KEY(keyBuf, &ts, sizeof(TSKEY), groupId);
taosHashRemove(pHashMap, keyBuf, keyLen);
} }
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void closeChildIntervalWindow(SArray* pChildren, TSKEY maxTs) {
int32_t size = taosArrayGetSize(pChildren);
for (int32_t i = 0; i < size; i++) {
SOperatorInfo* pChildOp = taosArrayGetP(pChildren, i);
SStreamFinalIntervalOperatorInfo* pChInfo = pChildOp->info;
ASSERT(pChInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);
pChInfo->twAggSup.maxTs = TMAX(pChInfo->twAggSup.maxTs, maxTs);
closeIntervalWindow(pChInfo->aggSup.pResultRowHashTable, &pChInfo->twAggSup, &pChInfo->interval, NULL, NULL);
}
}
static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
SIntervalAggOperatorInfo* pInfo = pOperator->info; SIntervalAggOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
...@@ -1324,7 +1371,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -1324,7 +1371,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
break; break;
} }
if (pBlock->info.type == STREAM_REPROCESS) { if (pBlock->info.type == STREAM_CLEAR) {
doClearWindows(&pInfo->aggSup, &pOperator->exprSupp, &pInfo->interval, 0, pOperator->exprSupp.numOfExprs, pBlock, doClearWindows(&pInfo->aggSup, &pOperator->exprSupp, &pInfo->interval, 0, pOperator->exprSupp.numOfExprs, pBlock,
NULL); NULL);
qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo)); qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo));
...@@ -1345,7 +1392,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -1345,7 +1392,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdated); hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdated);
} }
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pUpdated); closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdated);
finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pSup->rowEntryInfoOffset); finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pSup->rowEntryInfoOffset);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
...@@ -1373,6 +1420,11 @@ void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -1373,6 +1420,11 @@ void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) {
SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)param; SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo); cleanupBasicInfo(&pInfo->binfo);
cleanupAggSup(&pInfo->aggSup); cleanupAggSup(&pInfo->aggSup);
//it should be empty.
taosHashCleanup(pInfo->pPullDataMap);
taosArrayDestroy(pInfo->pPullWins);
blockDataDestroy(pInfo->pPullDataRes);
if (pInfo->pChildren) { if (pInfo->pChildren) {
int32_t size = taosArrayGetSize(pInfo->pChildren); int32_t size = taosArrayGetSize(pInfo->pChildren);
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
...@@ -2164,6 +2216,24 @@ bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup) { ...@@ -2164,6 +2216,24 @@ bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup) {
return p1 == NULL; return p1 == NULL;
} }
int32_t getNexWindowPos(SInterval* pInterval, SDataBlockInfo* pBlockInfo, TSKEY* tsCols,
int32_t startPos, TSKEY eKey, STimeWindow* pNextWin) {
int32_t forwardRows = getNumOfRowsInTimeWindow(pBlockInfo, tsCols, startPos,
eKey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
int32_t prevEndPos = forwardRows - 1 + startPos;
return getNextQualifiedWindow(pInterval, pNextWin, pBlockInfo, tsCols, prevEndPos, TSDB_ORDER_ASC);
}
void addPullWindow(SHashObj* pMap, SWinRes* pWinRes, int32_t size) {
SArray* childIds = taosArrayInit(8, sizeof(int32_t));
for (int32_t i = 0; i < size; i++) {
taosArrayPush(childIds, &i);
}
taosHashPut(pMap, pWinRes, sizeof(SWinRes), &childIds, sizeof(void*));
}
static int32_t getChildIndex(SSDataBlock* pBlock) { return pBlock->info.childId; }
static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t tableGroupId, static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t tableGroupId,
SArray* pUpdated) { SArray* pUpdated) {
SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)pOperatorInfo->info; SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)pOperatorInfo->info;
...@@ -2177,35 +2247,59 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc ...@@ -2177,35 +2247,59 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
int32_t forwardRows = 0; int32_t forwardRows = 0;
if (pSDataBlock->pDataBlock != NULL) { ASSERT(pSDataBlock->pDataBlock != NULL);
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
tsCols = (int64_t*)pColDataInfo->pData; tsCols = (int64_t*)pColDataInfo->pData;
} else {
return;
}
int32_t startPos = ascScan ? 0 : (pSDataBlock->info.rows - 1); int32_t startPos = ascScan ? 0 : (pSDataBlock->info.rows - 1);
TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols); TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols);
STimeWindow nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, STimeWindow nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval,
pInfo->interval.precision, NULL); pInfo->interval.precision, NULL);
while (1) { while (1) {
if (IS_FINAL_OP(pInfo) && isCloseWindow(&nextWin, &pInfo->twAggSup) && if (IS_FINAL_OP(pInfo) && isCloseWindow(&nextWin, &pInfo->twAggSup) && pInfo->pChildren) {
isDeletedWindow(&nextWin, tableGroupId, &pInfo->aggSup)) { bool ignore = true;
SArray* pUpWins = taosArrayInit(8, sizeof(STimeWindow)); SWinRes winRes = {.ts = nextWin.skey, .groupId = tableGroupId,};
taosArrayPush(pUpWins, &nextWin); void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinRes));
rebuildIntervalWindow(pInfo, pSup, pUpWins, pInfo->binfo.pRes->info.groupId, pSup->numOfExprs, if (isDeletedWindow(&nextWin, tableGroupId, &pInfo->aggSup) && !chIds) {
pOperatorInfo->pTaskInfo); SPullWindowInfo pull = {.window = nextWin, .groupId = tableGroupId};
taosArrayDestroy(pUpWins); // add pull data request
taosArrayPush(pInfo->pPullWins, &pull);
addPullWindow(pInfo->pPullDataMap, &winRes, taosArrayGetSize(pInfo->pChildren));
} else {
int32_t index = -1;
SArray* chArray = NULL;
if (chIds) {
chArray = *(void**) chIds;
int32_t chId = getChildIndex(pSDataBlock);
index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ);
}
if (index != -1 && pSDataBlock->info.type == STREAM_PUSH_DATA) {
taosArrayRemove(chArray, index);
if (taosArrayGetSize(chArray) == 0) {
// pull data is over
taosHashRemove(pInfo->pPullDataMap, &winRes, sizeof(SWinRes));
}
}
if ( index == -1 || pSDataBlock->info.type == STREAM_PUSH_DATA) {
ignore = false;
}
}
if (ignore) {
startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin);
if (startPos < 0) {
break;
}
continue;
}
} }
int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, true, &pResult, tableGroupId, pSup->pCtx, int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, true, &pResult, tableGroupId, pSup->pCtx,
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) { if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t));
pos->groupId = tableGroupId;
pos->pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
*(int64_t*)pos->key = pResult->win.skey;
forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL, forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL,
TSDB_ORDER_ASC); TSDB_ORDER_ASC);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) { if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) {
...@@ -2230,14 +2324,17 @@ static void clearStreamIntervalOperator(SStreamFinalIntervalOperatorInfo* pInfo) ...@@ -2230,14 +2324,17 @@ static void clearStreamIntervalOperator(SStreamFinalIntervalOperatorInfo* pInfo)
initResultRowInfo(&pInfo->binfo.resultRowInfo); initResultRowInfo(&pInfo->binfo.resultRowInfo);
} }
static void clearUpdateDataBlock(SSDataBlock* pBlock) { static void clearSpecialDataBlock(SSDataBlock* pBlock) {
if (pBlock->info.rows <= 0) {
return;
}
blockDataCleanup(pBlock); blockDataCleanup(pBlock);
} }
void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex) { void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex) {
// ASSERT(pDest->info.capacity >= pSource->info.rows); // ASSERT(pDest->info.capacity >= pSource->info.rows);
blockDataEnsureCapacity(pDest, pSource->info.rows); blockDataEnsureCapacity(pDest, pSource->info.rows);
clearUpdateDataBlock(pDest); clearSpecialDataBlock(pDest);
SColumnInfoData* pDestCol = taosArrayGet(pDest->pDataBlock, 0); SColumnInfoData* pDestCol = taosArrayGet(pDest->pDataBlock, 0);
SColumnInfoData* pSourceCol = taosArrayGet(pSource->pDataBlock, tsColIndex); SColumnInfoData* pSourceCol = taosArrayGet(pSource->pDataBlock, tsColIndex);
...@@ -2254,7 +2351,63 @@ void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsCol ...@@ -2254,7 +2351,63 @@ void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsCol
blockDataUpdateTsWindow(pDest, 0); blockDataUpdateTsWindow(pDest, 0);
} }
static int32_t getChildIndex(SSDataBlock* pBlock) { return pBlock->info.childId; } static bool needBreak(SStreamFinalIntervalOperatorInfo* pInfo) {
int32_t size = taosArrayGetSize(pInfo->pPullWins);
if (pInfo->pullIndex < size) {
return true;
}
return false;
}
static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pBlock) {
clearSpecialDataBlock(pBlock);
int32_t size = taosArrayGetSize(array);
if (size - (*pIndex) == 0) {
return;
}
blockDataEnsureCapacity(pBlock, size - (*pIndex) );
ASSERT(3 <= taosArrayGetSize(pBlock->pDataBlock));
for (; (*pIndex) < size; (*pIndex)++) {
SPullWindowInfo* pWin = taosArrayGet(array, (*pIndex) );
SColumnInfoData* pStartTs = (SColumnInfoData*) taosArrayGet(pBlock->pDataBlock, 0);
colDataAppend(pStartTs, pBlock->info.rows, (const char*)&pWin->window.skey, false);
SColumnInfoData* pEndTs = (SColumnInfoData*) taosArrayGet(pBlock->pDataBlock, 1);
colDataAppend(pEndTs, pBlock->info.rows, (const char*)&pWin->window.ekey, false);
SColumnInfoData* pGroupId = (SColumnInfoData*) taosArrayGet(pBlock->pDataBlock, 2);
colDataAppend(pGroupId, pBlock->info.rows, (const char*)&pWin->groupId, false);
pBlock->info.rows++;
}
if ((*pIndex) == size) {
*pIndex = 0;
taosArrayClear(array);
}
blockDataUpdateTsWindow(pBlock, 0);
}
void processPushEmpty(SSDataBlock* pBlock, SHashObj* pMap) {
SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, 0);
TSKEY* tsData = (TSKEY*)pStartCol->pData;
SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, 2);
uint64_t* groupIdData = (uint64_t*)pGroupCol->pData;
int32_t chId = getChildIndex(pBlock);
for (int32_t i = 0; i < pBlock->info.rows; i++) {
SWinRes winRes = {.ts = tsData[i], .groupId = groupIdData[i]};
void* chIds = taosHashGet(pMap, &winRes, sizeof(SWinRes));
if (chIds) {
SArray* chArray = *(SArray**) chIds;
int32_t index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ);
if (index != -1) {
taosArrayRemove(chArray, index);
if (taosArrayGetSize(chArray) == 0) {
// pull data is over
taosHashRemove(pMap, &winRes, sizeof(SWinRes));
}
}
}
}
}
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info; SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info;
...@@ -2270,28 +2423,50 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2270,28 +2423,50 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
if (pInfo->binfo.pRes->info.rows == 0) { if (pInfo->binfo.pRes->info.rows == 0) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
if (IS_FINAL_OP(pInfo) || pInfo->pUpdateRes->info.rows == 0) { if (!IS_FINAL_OP(pInfo)) {
if (!IS_FINAL_OP(pInfo)) { // semi interval operator clear disk buffer
// semi interval operator clear disk buffer clearStreamIntervalOperator(pInfo);
clearStreamIntervalOperator(pInfo);
}
return NULL;
} }
return NULL;
}
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
return pInfo->binfo.pRes;
} else {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
if (pInfo->binfo.pRes->info.rows != 0) {
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
return pInfo->binfo.pRes;
}
if (pInfo->pUpdateRes->info.rows != 0 && pInfo->returnUpdate) {
pInfo->returnUpdate = false;
ASSERT(!IS_FINAL_OP(pInfo));
printDataBlock(pInfo->pUpdateRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
// process the rest of the data // process the rest of the data
pOperator->status = OP_OPENED;
return pInfo->pUpdateRes; return pInfo->pUpdateRes;
} }
return pInfo->binfo.pRes; doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes);
if (pInfo->pPullDataRes->info.rows != 0) {
// process the rest of the data
ASSERT(IS_FINAL_OP(pInfo));
printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
return pInfo->pPullDataRes;
}
} }
while (1) { while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) { if (pBlock == NULL) {
clearUpdateDataBlock(pInfo->pUpdateRes); clearSpecialDataBlock(pInfo->pUpdateRes);
pOperator->status = OP_RES_TO_RETURN;
qInfo("Stream Final Interval return data");
break; break;
} }
printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval Final recv" : "interval Semi recv");
maxTs = TMAX(maxTs, pBlock->info.window.ekey);
if (pBlock->info.type == STREAM_REPROCESS) { if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PUSH_DATA || pBlock->info.type == STREAM_INVALID) {
pInfo->binfo.pRes->info.type = pBlock->info.type;
} else if (pBlock->info.type == STREAM_CLEAR) {
SArray* pUpWins = taosArrayInit(8, sizeof(STimeWindow)); SArray* pUpWins = taosArrayInit(8, sizeof(STimeWindow));
doClearWindows(&pInfo->aggSup, pSup, &pInfo->interval, pInfo->primaryTsIndex, pOperator->exprSupp.numOfExprs, doClearWindows(&pInfo->aggSup, pSup, &pInfo->interval, pInfo->primaryTsIndex, pOperator->exprSupp.numOfExprs,
pBlock, pUpWins); pBlock, pUpWins);
...@@ -2310,11 +2485,25 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2310,11 +2485,25 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
} }
removeResults(pUpWins, pUpdated); removeResults(pUpWins, pUpdated);
copyUpdateDataBlock(pInfo->pUpdateRes, pBlock, pInfo->primaryTsIndex); copyUpdateDataBlock(pInfo->pUpdateRes, pBlock, pInfo->primaryTsIndex);
pInfo->returnUpdate = true;
taosArrayDestroy(pUpWins); taosArrayDestroy(pUpWins);
break; break;
} else if (pBlock->info.type == STREAM_GET_ALL && IS_FINAL_OP(pInfo)) { } else if (pBlock->info.type == STREAM_GET_ALL && IS_FINAL_OP(pInfo)) {
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdated); getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdated);
continue; continue;
} else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_OP(pInfo)) {
SArray* pUpWins = taosArrayInit(8, sizeof(STimeWindow));
doClearWindows(&pInfo->aggSup, pSup, &pInfo->interval, 0, pOperator->exprSupp.numOfExprs,
pBlock, pUpWins);
removeResults(pUpWins, pUpdated);
taosArrayDestroy(pUpWins);
if (taosArrayGetSize(pUpdated) > 0) {
break;
}
continue;
} else if (pBlock->info.type == STREAM_PUSH_EMPTY && IS_FINAL_OP(pInfo)) {
processPushEmpty(pBlock, pInfo->pPullDataMap);
continue;
} }
setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->order, MAIN_SCAN, true); setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->order, MAIN_SCAN, true);
...@@ -2334,31 +2523,70 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2334,31 +2523,70 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SStreamFinalIntervalOperatorInfo* pChInfo = pChildOp->info; SStreamFinalIntervalOperatorInfo* pChInfo = pChildOp->info;
setInputDataBlock(pChildOp, pChildOp->exprSupp.pCtx, pBlock, pChInfo->order, MAIN_SCAN, true); setInputDataBlock(pChildOp, pChildOp->exprSupp.pCtx, pBlock, pChInfo->order, MAIN_SCAN, true);
doHashInterval(pChildOp, pBlock, pBlock->info.groupId, NULL); doHashInterval(pChildOp, pBlock, pBlock->info.groupId, NULL);
pChInfo->twAggSup.maxTs = TMAX(pChInfo->twAggSup.maxTs, pBlock->info.window.ekey);
if (needBreak(pInfo)) {
break;
}
} }
maxTs = TMAX(maxTs, pBlock->info.window.ekey);
} }
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
if (IS_FINAL_OP(pInfo)) { if (IS_FINAL_OP(pInfo)) {
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pUpdated); closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup,
&pInfo->interval, pInfo->pPullDataMap, pUpdated);
closeChildIntervalWindow(pInfo->pChildren, pInfo->twAggSup.maxTs);
} }
finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pSup->rowEntryInfoOffset); finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pSup->rowEntryInfoOffset);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
pOperator->status = OP_RES_TO_RETURN; if (pInfo->binfo.pRes->info.rows != 0) {
if (pInfo->binfo.pRes->info.rows == 0) { printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
pOperator->status = OP_EXEC_DONE; return pInfo->binfo.pRes;
if (pInfo->pUpdateRes->info.rows == 0) { }
return NULL;
} if (pInfo->pUpdateRes->info.rows != 0 && pInfo->returnUpdate) {
pInfo->returnUpdate = false;
ASSERT(!IS_FINAL_OP(pInfo));
printDataBlock(pInfo->pUpdateRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
// process the rest of the data // process the rest of the data
pOperator->status = OP_OPENED;
return pInfo->pUpdateRes; return pInfo->pUpdateRes;
} }
return pInfo->binfo.pRes;
doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes);
if (pInfo->pPullDataRes->info.rows != 0) {
// process the rest of the data
ASSERT(IS_FINAL_OP(pInfo));
printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
return pInfo->pPullDataRes;
}
// ASSERT(false);
return NULL;
}
SSDataBlock* createPullDataBlock() {
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
pBlock->info.hasVarCol = false;
pBlock->info.groupId = 0;
pBlock->info.rows = 0;
pBlock->info.type = STREAM_RETRIEVE;
pBlock->info.rowSize = sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t);
pBlock->pDataBlock = taosArrayInit(3, sizeof(SColumnInfoData));
SColumnInfoData infoData = {0};
infoData.info.type = TSDB_DATA_TYPE_TIMESTAMP;
infoData.info.bytes = sizeof(TSKEY);
// window start ts
taosArrayPush(pBlock->pDataBlock, &infoData);
// window end ts
taosArrayPush(pBlock->pDataBlock, &infoData);
infoData.info.type = TSDB_DATA_TYPE_UBIGINT;
infoData.info.bytes = sizeof(uint64_t);
taosArrayPush(pBlock->pDataBlock, &infoData);
return pBlock;
} }
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
...@@ -2412,23 +2640,30 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -2412,23 +2640,30 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
goto _error; goto _error;
} }
} }
// semi interval operator does not catch result
pInfo->pUpdateRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc); pInfo->pUpdateRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
pInfo->pUpdateRes->info.type = STREAM_REPROCESS; pInfo->pUpdateRes->info.type = STREAM_CLEAR;
blockDataEnsureCapacity(pInfo->pUpdateRes, 128); blockDataEnsureCapacity(pInfo->pUpdateRes, 128);
pInfo->returnUpdate = false;
pInfo->pPhyNode = (SPhysiNode*)nodesCloneNode((SNode*)pPhyNode); pInfo->pPhyNode = (SPhysiNode*)nodesCloneNode((SNode*)pPhyNode);
if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) { if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) {
pInfo->isFinal = true; pInfo->isFinal = true;
pOperator->name = "StreamFinalIntervalOperator"; pOperator->name = "StreamFinalIntervalOperator";
} else { } else {
// semi interval operator does not catch result
pInfo->isFinal = false; pInfo->isFinal = false;
pOperator->name = "StreamSemiIntervalOperator"; pOperator->name = "StreamSemiIntervalOperator";
} }
if (!IS_FINAL_OP(pInfo)) { if (!IS_FINAL_OP(pInfo) || numOfChild == 0) {
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
} }
pInfo->pPullWins = taosArrayInit(8, sizeof(SPullWindowInfo));
pInfo->pullIndex = 0;
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pPullDataMap = taosHashInit(64, hashFn, false, HASH_NO_LOCK);
pInfo->pPullDataRes = createPullDataBlock();
pOperator->operatorType = pPhyNode->type; pOperator->operatorType = pPhyNode->type;
pOperator->blocking = true; pOperator->blocking = true;
...@@ -2811,11 +3046,6 @@ void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex, ...@@ -2811,11 +3046,6 @@ void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex,
} }
} }
typedef struct SWinRes {
TSKEY ts;
uint64_t groupId;
} SWinRes;
static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SHashObj* pStUpdated, static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SHashObj* pStUpdated,
SHashObj* pStDeleted, bool hasEndTs) { SHashObj* pStDeleted, bool hasEndTs) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
...@@ -3027,14 +3257,14 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { ...@@ -3027,14 +3257,14 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
} else if (pOperator->status == OP_RES_TO_RETURN) { } else if (pOperator->status == OP_RES_TO_RETURN) {
doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) { if (pInfo->pDelRes->info.rows > 0) {
/*printDataBlock(pInfo->pDelRes, "session del");*/ printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo)? "Final Session" : "Single Session");
return pInfo->pDelRes; return pInfo->pDelRes;
} }
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) { if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
} }
/*printDataBlock(pBInfo->pRes, "session insert");*/ printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo)? "Final Session" : "Single Session");
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes; return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
} }
...@@ -3048,7 +3278,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { ...@@ -3048,7 +3278,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
break; break;
} }
if (pBlock->info.type == STREAM_REPROCESS) { if (pBlock->info.type == STREAM_CLEAR) {
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo)); SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
doClearSessionWindows(&pInfo->streamAggSup, &pOperator->exprSupp, pBlock, 0, pOperator->exprSupp.numOfExprs, doClearSessionWindows(&pInfo->streamAggSup, &pOperator->exprSupp, pBlock, 0, pOperator->exprSupp.numOfExprs,
pInfo->gap, pWins); pInfo->gap, pWins);
...@@ -3102,11 +3332,11 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { ...@@ -3102,11 +3332,11 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) { if (pInfo->pDelRes->info.rows > 0) {
/*printDataBlock(pInfo->pDelRes, "session del");*/ printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo)? "Final Session" : "Single Session");
return pInfo->pDelRes; return pInfo->pDelRes;
} }
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
/*printDataBlock(pBInfo->pRes, "session insert");*/ printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo)? "Final Session" : "Single Session");
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes; return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
} }
...@@ -3169,11 +3399,11 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { ...@@ -3169,11 +3399,11 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
while (1) { while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) { if (pBlock == NULL) {
clearUpdateDataBlock(pInfo->pUpdateRes); clearSpecialDataBlock(pInfo->pUpdateRes);
break; break;
} }
if (pBlock->info.type == STREAM_REPROCESS) { if (pBlock->info.type == STREAM_CLEAR) {
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo)); SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
doClearSessionWindows(&pInfo->streamAggSup, pSup, pBlock, 0, pSup->numOfExprs, pInfo->gap, pWins); doClearSessionWindows(&pInfo->streamAggSup, pSup, pBlock, 0, pSup->numOfExprs, pInfo->gap, pWins);
removeSessionResults(pStUpdated, pWins); removeSessionResults(pStUpdated, pWins);
...@@ -3236,7 +3466,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream ...@@ -3236,7 +3466,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
} else { } else {
pInfo->isFinal = false; pInfo->isFinal = false;
pInfo->pUpdateRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc); pInfo->pUpdateRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
pInfo->pUpdateRes->info.type = STREAM_REPROCESS; pInfo->pUpdateRes->info.type = STREAM_CLEAR;
blockDataEnsureCapacity(pInfo->pUpdateRes, 128); blockDataEnsureCapacity(pInfo->pUpdateRes, 128);
pOperator->name = "StreamSessionSemiAggOperator"; pOperator->name = "StreamSessionSemiAggOperator";
pOperator->fpSet = pOperator->fpSet =
...@@ -3554,7 +3784,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { ...@@ -3554,7 +3784,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
break; break;
} }
if (pBlock->info.type == STREAM_REPROCESS) { if (pBlock->info.type == STREAM_CLEAR) {
doClearStateWindows(&pInfo->streamAggSup, pBlock, pInfo->primaryTsIndex, &pInfo->stateCol, pInfo->stateCol.slotId, doClearStateWindows(&pInfo->streamAggSup, pBlock, pInfo->primaryTsIndex, &pInfo->stateCol, pInfo->stateCol.slotId,
pSeUpdated, pInfo->pSeDeleted); pSeUpdated, pInfo->pSeDeleted);
continue; continue;
......
...@@ -115,6 +115,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) ...@@ -115,6 +115,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
.srcNodeId = pTask->nodeId, .srcNodeId = pTask->nodeId,
.srcTaskId = pTask->taskId, .srcTaskId = pTask->taskId,
.pRetrieve = pRetrieve, .pRetrieve = pRetrieve,
.retrieveLen = dataStrLen,
}; };
int32_t sz = taosArrayGetSize(pTask->childEpInfo); int32_t sz = taosArrayGetSize(pTask->childEpInfo);
...@@ -146,7 +147,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) ...@@ -146,7 +147,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
.code = 0, .code = 0,
.msgType = TDMT_STREAM_RETRIEVE, .msgType = TDMT_STREAM_RETRIEVE,
.pCont = buf, .pCont = buf,
.contLen = len, .contLen = sizeof(SMsgHead) + len,
}; };
if (tmsgSendReq(&pEpInfo->epSet, &rpcMsg) < 0) { if (tmsgSendReq(&pEpInfo->epSet, &rpcMsg) < 0) {
......
...@@ -45,11 +45,16 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) ...@@ -45,11 +45,16 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
ASSERT(false); ASSERT(false);
} }
if (output == NULL) { if (output == NULL) {
if (pItem->type == STREAM_INPUT__DATA_RETRIEVE && !hasData) { if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
SSDataBlock block = {0}; //SSDataBlock block = {0};
block.info.type = STREAM_PUSH_DATA; //block.info.type = STREAM_PUSH_EMPTY;
block.info.childId = pTask->selfChildId; //block.info.childId = pTask->selfChildId;
taosArrayPush(pRes, &block); SStreamDataBlock* pRetrieveBlock = (SStreamDataBlock*)data;
ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1);
SSDataBlock* pBlock = createOneDataBlock(taosArrayGet(pRetrieveBlock->blocks, 0), true);
pBlock->info.type = STREAM_PUSH_EMPTY;
pBlock->info.childId = pTask->selfChildId;
taosArrayPush(pRes, pBlock);
} }
break; break;
} }
......
...@@ -119,6 +119,7 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma ...@@ -119,6 +119,7 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma
taosArrayPush(pInfo->pTsBuckets, &dumy); taosArrayPush(pInfo->pTsBuckets, &dumy);
} }
pInfo->numBuckets = DEFAULT_BUCKET_SIZE; pInfo->numBuckets = DEFAULT_BUCKET_SIZE;
pInfo->pCloseWinSBF = NULL;
return pInfo; return pInfo;
} }
...@@ -154,6 +155,9 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts) { ...@@ -154,6 +155,9 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts) {
TSKEY maxTs = *(TSKEY *)taosArrayGet(pInfo->pTsBuckets, index); TSKEY maxTs = *(TSKEY *)taosArrayGet(pInfo->pTsBuckets, index);
if (ts < maxTs - pInfo->watermark) { if (ts < maxTs - pInfo->watermark) {
// this window has been closed. // this window has been closed.
if (pInfo->pCloseWinSBF) {
return tScalableBfPut(pInfo->pCloseWinSBF, &ts, sizeof(TSKEY));
}
return true; return true;
} }
...@@ -193,3 +197,19 @@ void updateInfoDestroy(SUpdateInfo *pInfo) { ...@@ -193,3 +197,19 @@ void updateInfoDestroy(SUpdateInfo *pInfo) {
taosArrayDestroy(pInfo->pTsSBFs); taosArrayDestroy(pInfo->pTsSBFs);
taosMemoryFree(pInfo); taosMemoryFree(pInfo);
} }
void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo) {
if (pInfo->pCloseWinSBF) {
return;
}
int64_t rows = adjustExpEntries(pInfo->interval * ROWS_PER_MILLISECOND);
pInfo->pCloseWinSBF = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE);
}
void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo) {
if (!pInfo || !pInfo->pCloseWinSBF) {
return;
}
tScalableBfDestroy(pInfo->pCloseWinSBF);
pInfo->pCloseWinSBF = NULL;
}
...@@ -80,17 +80,17 @@ endi ...@@ -80,17 +80,17 @@ endi
if $data03 != 4 then if $data03 != 4 then
print ======$data03 print ======$data03
return -1 goto loop1
endi endi
if $data04 != 52 then if $data04 != 52 then
print ======$data04 print ======$data04
return -1 goto loop1
endi endi
if $data05 != 13 then if $data05 != 13 then
print ======$data05 print ======$data05
return -1 goto loop1
endi endi
# row 1 # row 1
...@@ -179,7 +179,7 @@ sql use test1; ...@@ -179,7 +179,7 @@ sql use test1;
sql create stable st(ts timestamp, a int, b int , c int) tags(ta int,tb int,tc int); sql create stable st(ts timestamp, a int, b int , c int) tags(ta int,tb int,tc int);
sql create table ts1 using st tags(1,1,1); sql create table ts1 using st tags(1,1,1);
sql create table ts2 using st tags(2,2,2); sql create table ts2 using st tags(2,2,2);
sql create stream stream_t2 trigger at_once into streamtST1 as select _wstartts, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6 from st interval(10s) ; sql create stream stream_t2 trigger at_once watermark 20s into streamtST1 as select _wstartts, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6 from st interval(10s) ;
sql insert into ts1 values(1648791211000,1,2,3); sql insert into ts1 values(1648791211000,1,2,3);
sql insert into ts1 values(1648791222001,2,2,3); sql insert into ts1 values(1648791222001,2,2,3);
......
...@@ -74,7 +74,7 @@ sql create stable st(ts timestamp,a int,b int,c int,id int) tags(ta int,tb int,t ...@@ -74,7 +74,7 @@ sql create stable st(ts timestamp,a int,b int,c int,id int) tags(ta int,tb int,t
sql create table ts1 using st tags(1,1,1); sql create table ts1 using st tags(1,1,1);
sql create table ts2 using st tags(2,2,2); sql create table ts2 using st tags(2,2,2);
sql create stream stream_t2 trigger at_once into streamtST as select _wstartts, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6, max(id) c7 from st partition by ta interval(10s) ; sql create stream stream_t2 trigger at_once watermark 20s into streamtST as select _wstartts, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6, max(id) c7 from st partition by ta interval(10s) ;
sql insert into ts1 values(1648791211000,1,2,3,1); sql insert into ts1 values(1648791211000,1,2,3,1);
sql insert into ts1 values(1648791222001,2,2,3,2); sql insert into ts1 values(1648791222001,2,2,3,2);
sql insert into ts2 values(1648791211000,1,2,3,3); sql insert into ts2 values(1648791211000,1,2,3,3);
...@@ -83,16 +83,16 @@ sql insert into ts2 values(1648791222001,2,2,3,4); ...@@ -83,16 +83,16 @@ sql insert into ts2 values(1648791222001,2,2,3,4);
sql insert into ts2 values(1648791222002,2,2,3,5); sql insert into ts2 values(1648791222002,2,2,3,5);
sql insert into ts2 values(1648791222002,2,2,3,6); sql insert into ts2 values(1648791222002,2,2,3,6);
sql insert into ts1 values(1648791211000,1,2,3,1); sql insert into ts1 values(1648791211000,1,2,3,7);
sql insert into ts1 values(1648791222001,2,2,3,2); sql insert into ts1 values(1648791222001,2,2,3,8);
sql insert into ts2 values(1648791211000,1,2,3,3); sql insert into ts2 values(1648791211000,1,2,3,9);
sql insert into ts2 values(1648791222001,2,2,3,4); sql insert into ts2 values(1648791222001,2,2,3,10);
$loop_count = 0 $loop_count = 0
loop2: loop2:
sleep 300 sleep 300
sql select * from streamtST; sql select * from streamtST order by c7 asc;
$loop_count = $loop_count + 1 $loop_count = $loop_count + 1
if $loop_count == 10 then if $loop_count == 10 then
...@@ -104,8 +104,18 @@ print =====data01=$data01 ...@@ -104,8 +104,18 @@ print =====data01=$data01
goto loop2 goto loop2
endi endi
if $data02 != 1 then if $data11 != 1 then
print =====data02=$data02 print =====data11=$data11
goto loop2
endi
if $data21 != 1 then
print =====data21=$data21
goto loop2
endi
if $data31 != 2 then
print =====data31=$data31
goto loop2 goto loop2
endi endi
...@@ -114,8 +124,18 @@ print =====data03=$data03 ...@@ -114,8 +124,18 @@ print =====data03=$data03
goto loop2 goto loop2
endi endi
if $data04 != 2 then if $data13 != 2 then
print =====data04=$data04 print =====data13=$data13
goto loop2
endi
if $data23 != 1 then
print =====data23=$data23
goto loop2
endi
if $data33 != 4 then
print =====data33=$data33
goto loop2 goto loop2
endi endi
......
...@@ -79,17 +79,17 @@ endi ...@@ -79,17 +79,17 @@ endi
if $data03 != 4 then if $data03 != 4 then
print ======$data03 print ======$data03
return -1 goto loop1
endi endi
if $data04 != 52 then if $data04 != 52 then
print ======$data04 print ======$data04
return -1 goto loop1
endi endi
if $data05 != 13 then if $data05 != 13 then
print ======$data05 print ======$data05
return -1 goto loop1
endi endi
# row 1 # row 1
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册