提交 54005072 编写于 作者: 5 54liuyao

feat(stream): stream state operator

上级 7fa2d3d0
......@@ -219,6 +219,7 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW,
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION_WINDOW,
QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW,
QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW,
QUERY_NODE_PHYSICAL_PLAN_PARTITION,
QUERY_NODE_PHYSICAL_PLAN_DISPATCH,
QUERY_NODE_PHYSICAL_PLAN_INSERT,
......
......@@ -338,6 +338,8 @@ typedef struct SStateWinodwPhysiNode {
SNode* pStateKey;
} SStateWinodwPhysiNode;
typedef SStateWinodwPhysiNode SStreamStateWinodwPhysiNode;
typedef struct SSortPhysiNode {
SPhysiNode node;
SNodeList* pExprs; // these are expression list of order_by_clause and parameter expression of aggregate function
......
......@@ -366,16 +366,18 @@ typedef struct SCatchSupporter {
} SCatchSupporter;
typedef struct SStreamAggSupporter {
SArray* pResultRows; // SResultWindowInfo
SArray* pResultRows;
int32_t keySize;
char* pKeyBuf; // window key buffer
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
SArray* pScanWindow;
} SStreamAggSupporter;
typedef struct SessionWindowSupporter {
SStreamAggSupporter* pStreamAggSup;
int64_t gap;
uint8_t parentType;
} SessionWindowSupporter;
typedef struct SStreamBlockScanInfo {
......@@ -406,6 +408,7 @@ typedef struct SStreamBlockScanInfo {
SArray* childIds;
SessionWindowSupporter sessionSup;
bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA.
int32_t scanWinIndex;
} SStreamBlockScanInfo;
typedef struct SSysTableScanInfo {
......@@ -593,6 +596,11 @@ typedef struct SResultWindowInfo {
bool isClosed;
} SResultWindowInfo;
typedef struct SStateWindowInfo {
SResultWindowInfo winInfo;
SStateKeys stateKey;
} SStateWindowInfo;
typedef struct SStreamSessionAggOperatorInfo {
SOptrBasicInfo binfo;
SStreamAggSupporter streamAggSup;
......@@ -606,7 +614,7 @@ typedef struct SStreamSessionAggOperatorInfo {
SSDataBlock* pDelRes;
SHashObj* pStDeleted;
void* pDelIterator;
SArray* pChildren; // cache for children's result;
SArray* pChildren; // cache for children's result; final stream operator
} SStreamSessionAggOperatorInfo;
typedef struct STimeSliceOperatorInfo {
......@@ -630,6 +638,22 @@ typedef struct SStateWindowOperatorInfo {
// bool reptScan;
} SStateWindowOperatorInfo;
typedef struct SStreamStateAggOperatorInfo {
SOptrBasicInfo binfo;
SStreamAggSupporter streamAggSup;
SGroupResInfo groupResInfo;
int32_t primaryTsIndex; // primary timestamp slot id
int32_t order; // current SSDataBlock scan order
STimeWindowAggSupp twAggSup;
SColumn stateCol; // start row index
SqlFunctionCtx* pDummyCtx; // for combine
SSDataBlock* pDelRes;
SHashObj* pSeDeleted;
void* pDelIterator;
SArray* pScanWindow;
SArray* pChildren; // cache for children's result;
} SStreamStateAggOperatorInfo;
typedef struct SSortedMergeOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo;
......@@ -787,6 +811,10 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pE
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream,
SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap,
int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream,
SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
#if 0
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
#endif
......@@ -838,7 +866,8 @@ int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimary
int32_t startPos, TSKEY ekey, __block_search_fn_t searchFn, STableQueryInfo* item,
int32_t order);
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey);
int32_t initSessionAggSupporter(SStreamAggSupporter* pSup, const char* pKey);
int32_t initStateAggSupporter(SStreamAggSupporter* pSup, const char* pKey);
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize);
SResultWindowInfo* getSessionTimeWindow(SArray* pWinInfos, TSKEY ts, int64_t gap,
int32_t* pIndex);
......
......@@ -4600,6 +4600,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr;
SColumn col = extractColumnFromColumnNode(pColNode);
pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, tsSlotId, &col, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW == type) {
pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_JOIN == type) {
SJoinPhysiNode* pJoinNode = (SJoinPhysiNode*)pPhyNode;
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
......@@ -5185,14 +5187,17 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo
return TSDB_CODE_SUCCESS;
}
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey) {
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey,
size_t size) {
pSup->keySize = sizeof(int64_t) + sizeof(TSKEY);
pSup->pKeyBuf = taosMemoryCalloc(1, pSup->keySize);
pSup->pResultRows = taosArrayInit(1024, sizeof(SResultWindowInfo));
pSup->pResultRows = taosArrayInit(1024, size);
if (pSup->pKeyBuf == NULL || pSup->pResultRows == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pSup->pScanWindow = taosArrayInit(4, sizeof(STimeWindow));
int32_t pageSize = 4096;
while (pageSize < pSup->resultRowSize * 4) {
pageSize <<= 1u;
......@@ -5205,6 +5210,14 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey) {
return createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, pKey, TD_TMP_DIR_PATH);
}
int32_t initSessionAggSupporter(SStreamAggSupporter* pSup, const char* pKey) {
return initStreamAggSupporter(pSup, pKey, sizeof(SResultWindowInfo));
}
int32_t initStateAggSupporter(SStreamAggSupporter* pSup, const char* pKey) {
return initStreamAggSupporter(pSup, pKey, sizeof(SStateWindowInfo));
}
int64_t getSmaWaterMark(int64_t interval, double filesFactor) {
int64_t waterMark = 0;
ASSERT(FLT_GREATEREQUAL(filesFactor, 0.000000));
......
......@@ -706,16 +706,23 @@ static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) {
taosArrayClear(pInfo->pBlockLists);
}
static bool isSessionWindow(SStreamBlockScanInfo* pInfo) { return pInfo->sessionSup.pStreamAggSup != NULL; }
static bool isSessionWindow(SStreamBlockScanInfo* pInfo) {
return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW;
}
static bool isStateWindow(SStreamBlockScanInfo* pInfo) {
return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW;
}
static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
SSDataBlock* pSDB = pInfo->pUpdateRes;
if (pInfo->updateResIndex < pSDB->info.rows) {
SColumnInfoData* pColDataInfo = taosArrayGet(pSDB->pDataBlock, 0);
STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX,};
bool needRead = false;
if (!isStateWindow(pInfo) && pInfo->updateResIndex < pSDB->info.rows) {
SColumnInfoData* pColDataInfo = taosArrayGet(pSDB->pDataBlock, pInfo->primaryTsIndex);
TSKEY* tsCols = (TSKEY*)pColDataInfo->pData;
SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1;
STimeWindow win;
if (isSessionWindow(pInfo)) {
SStreamAggSupporter* pAggSup = pInfo->sessionSup.pStreamAggSup;
int64_t gap = pInfo->sessionSup.gap;
......@@ -731,15 +738,28 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
pInfo->updateResIndex += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, pInfo->updateResIndex, win.ekey,
binarySearchForKey, NULL, TSDB_ORDER_ASC);
}
STableScanInfo* pTableScanInfo = pInfo->pOperatorDumy->info;
pTableScanInfo->cond.twindows[0] = win;
pTableScanInfo->curTWinIdx = 0;
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
pTableScanInfo->scanTimes = 0;
return true;
} else {
needRead = true;
} else if (isStateWindow(pInfo)) {
SArray* pWins = pInfo->sessionSup.pStreamAggSup->pScanWindow;
int32_t size = taosArrayGetSize(pWins);
if (pInfo->scanWinIndex < size) {
win = *(STimeWindow *)taosArrayGet(pWins, pInfo->scanWinIndex);
pInfo->scanWinIndex++;
needRead = true;
} else {
pInfo->scanWinIndex = 0;
taosArrayClear(pWins);
}
}
if (!needRead) {
return false;
}
STableScanInfo* pTableScanInfo = pInfo->pOperatorDumy->info;
pTableScanInfo->cond.twindows[0] = win;
pTableScanInfo->curTWinIdx = 0;
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
pTableScanInfo->scanTimes = 0;
return true;
}
static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo) {
......@@ -754,36 +774,39 @@ static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo) {
return pResult;
}
static SSDataBlock* getUpdateDataBlock(SStreamBlockScanInfo* pInfo, bool invertible) {
SColumnInfoData* pColDataInfo = taosArrayGet(pInfo->pRes->pDataBlock, pInfo->primaryTsIndex);
static void getUpdateDataBlock(SStreamBlockScanInfo* pInfo, bool invertible,
SSDataBlock* pBlock, SSDataBlock* pUpdateBlock) {
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
TSKEY* ts = (TSKEY*)pColDataInfo->pData;
for (int32_t i = 0; i < pInfo->pRes->info.rows; i++) {
if (updateInfoIsUpdated(pInfo->pUpdateInfo, pInfo->pRes->info.uid, ts[i])) {
for (int32_t i = 0; i < pBlock->info.rows; i++) {
if (updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, ts[i])) {
taosArrayPush(pInfo->tsArray, ts + i);
}
}
if (!pUpdateBlock) {
taosArrayClear(pInfo->tsArray);
return;
}
int32_t size = taosArrayGetSize(pInfo->tsArray);
if (size > 0 && invertible) {
// TODO(liuyao) get from tsdb
// SSDataBlock* p = createOneDataBlock(pInfo->pRes, true);
// Todo(liuyao) get from tsdb
// SSDataBlock* p = createOneDataBlock(pBlock, true);
// p->info.type = STREAM_INVERT;
// taosArrayClear(pInfo->tsArray);
// return p;
SSDataBlock* pDataBlock = createOneDataBlock(pInfo->pRes, false);
SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pDataBlock->pDataBlock, 0);
SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pUpdateBlock->pDataBlock, pInfo->primaryTsIndex);
ASSERT(pCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
colInfoDataEnsureCapacity(pCol, 0, size);
for (int32_t i = 0; i < size; i++) {
TSKEY* pTs = (TSKEY*)taosArrayGet(pInfo->tsArray, i);
colDataAppend(pCol, i, (char*)pTs, false);
}
pDataBlock->info.rows = size;
pDataBlock->info.type = STREAM_REPROCESS;
blockDataUpdateTsWindow(pDataBlock, 0);
pUpdateBlock->info.rows = size;
pUpdateBlock->info.type = STREAM_REPROCESS;
blockDataUpdateTsWindow(pUpdateBlock, 0);
taosArrayClear(pInfo->tsArray);
return pDataBlock;
}
return NULL;
}
static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
......@@ -815,14 +838,25 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
} else if (pInfo->scanMode == STREAM_SCAN_FROM_UPDATERES) {
blockDataCleanup(pInfo->pRes);
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
prepareDataScan(pInfo);
if (!isStateWindow(pInfo)) {
prepareDataScan(pInfo);
}
return pInfo->pUpdateRes;
} else if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER) {
SSDataBlock* pSDB = doDataScan(pInfo);
if (pSDB == NULL) {
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
} else {
return pSDB;
} else {
if (isStateWindow(pInfo) &&
taosArrayGetSize(pInfo->sessionSup.pStreamAggSup->pScanWindow) > 0) {
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
pInfo->updateResIndex = pInfo->pUpdateRes->info.rows;
prepareDataScan(pInfo);
}
if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER) {
SSDataBlock* pSDB = doDataScan(pInfo);
if (pSDB == NULL) {
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
} else {
getUpdateDataBlock(pInfo, true, pSDB, NULL);
return pSDB;
}
}
}
......@@ -906,7 +940,8 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
if (rows == 0) {
pOperator->status = OP_EXEC_DONE;
} else if (pInfo->pUpdateInfo) {
SSDataBlock* upRes = getUpdateDataBlock(pInfo, true);
SSDataBlock* upRes = createOneDataBlock(pInfo->pRes, false);
getUpdateDataBlock(pInfo, true, pInfo->pRes, upRes);
if (upRes) {
pInfo->pUpdateRes = upRes;
if (upRes->info.type == STREAM_REPROCESS) {
......@@ -950,6 +985,9 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
int16_t colId = id->colId;
taosArrayPush(pColIds, &colId);
if (id->colId == pTableScanNode->tsColId) {
pInfo->primaryTsIndex = id->targetSlotId;
}
}
// set the extract column id to streamHandle
......@@ -974,7 +1012,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
pTwSup->waterMark = getSmaWaterMark(pSTInfo->interval.interval,
pTableScanNode->filesFactor);
}
pInfo->primaryTsIndex = 0; // pTableScanNode->tsColId;
if (pSTInfo->interval.interval > 0 && pDataReader) {
pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, pTwSup->waterMark);
} else {
......
......@@ -610,8 +610,7 @@ int32_t sumCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
} else if (type == TSDB_DATA_TYPE_DOUBLE || type == TSDB_DATA_TYPE_FLOAT) {
pDBuf->dsum += pSBuf->dsum;
}
SET_VAL(pDResInfo, *((int64_t*)pDBuf), 1);
pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes);
return TSDB_CODE_SUCCESS;
}
......@@ -1394,7 +1393,7 @@ int32_t minMaxCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3
pDBuf->v = pSBuf->v;
}
}
SET_VAL(pDResInfo, *((int64_t*)pDBuf), 1);
pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes);
return TSDB_CODE_SUCCESS;
}
......@@ -1639,6 +1638,7 @@ int32_t stddevCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
pDBuf->quadraticDSum += pSBuf->quadraticDSum;
}
pDBuf->count += pSBuf->count;
pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes);
return TSDB_CODE_SUCCESS;
}
......
......@@ -244,6 +244,8 @@ const char* nodesNodeName(ENodeType type) {
return "PhysiStreamSessionWindow";
case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW:
return "PhysiStateWindow";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW:
return "PhysiStreamStateWindow";
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
return "PhysiPartition";
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
......@@ -2744,6 +2746,28 @@ static int32_t jsonToOrderByExprNode(const SJson* pJson, void* pObj) {
return code;
}
static const char* jkStateWindowCol = "StateWindowCol";
static const char* jkStateWindowExpr = "StateWindowExpr";
static int32_t stateWindowNodeToJson(const void* pObj, SJson* pJson) {
const SStateWindowNode* pNode = (const SStateWindowNode*)pObj;
int32_t code = tjsonAddObject(pJson, jkStateWindowCol, nodeToJson, pNode->pCol);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkStateWindowExpr, nodeToJson, pNode->pExpr);
}
return code;
}
static int32_t jsonToStateWindowNode(const SJson* pJson, void* pObj) {
SStateWindowNode* pNode = (SStateWindowNode*)pObj;
int32_t code = jsonToNodeObject(pJson, jkStateWindowCol, (SNode**)&pNode->pCol);
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkStateWindowExpr, (SNode**)&pNode->pExpr);
}
return code;
}
static const char* jkSessionWindowTsPrimaryKey = "TsPrimaryKey";
static const char* jkSessionWindowGap = "Gap";
......@@ -3521,8 +3545,9 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
case QUERY_NODE_ORDER_BY_EXPR:
return orderByExprNodeToJson(pObj, pJson);
case QUERY_NODE_LIMIT:
case QUERY_NODE_STATE_WINDOW:
break;
case QUERY_NODE_STATE_WINDOW:
return stateWindowNodeToJson(pObj, pJson);
case QUERY_NODE_SESSION_WINDOW:
return sessionWindowNodeToJson(pObj, pJson);
case QUERY_NODE_INTERVAL_WINDOW:
......@@ -3626,6 +3651,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW:
return physiSessionWindowNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW:
return physiStateWindowNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
return physiPartitionNodeToJson(pObj, pJson);
......@@ -3662,6 +3688,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToTempTableNode(pJson, pObj);
case QUERY_NODE_ORDER_BY_EXPR:
return jsonToOrderByExprNode(pJson, pObj);
case QUERY_NODE_STATE_WINDOW:
return jsonToStateWindowNode(pJson, pObj);
case QUERY_NODE_SESSION_WINDOW:
return jsonToSessionWindowNode(pJson, pObj);
case QUERY_NODE_INTERVAL_WINDOW:
......@@ -3745,6 +3773,7 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW:
return jsonToPhysiSessionWindowNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW:
return jsonToPhysiStateWindowNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
return jsonToPhysiPartitionNode(pJson, pObj);
......
......@@ -520,7 +520,8 @@ static EDealRes dispatchPhysiPlan(SNode* pNode, ETraversalOrder order, FNodeWalk
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW:
res = walkWindowPhysi((SWinodwPhysiNode*)pNode, order, walker, pContext);
break;
case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW: {
case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW: {
SStateWinodwPhysiNode* pState = (SStateWinodwPhysiNode*)pNode;
res = walkWindowPhysi((SWinodwPhysiNode*)pNode, order, walker, pContext);
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
......
......@@ -274,6 +274,8 @@ SNodeptr nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SStreamSessionWinodwPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW:
return makeNode(type, sizeof(SStateWinodwPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW:
return makeNode(type, sizeof(SStreamStateWinodwPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
return makeNode(type, sizeof(SPartitionPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
......
......@@ -108,7 +108,8 @@ static bool osdMayBeOptimized(SLogicNode* pNode) {
return false;
}
if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent)) {
return (WINDOW_TYPE_INTERVAL == ((SWindowLogicNode*)pNode->pParent)->winType);
return true;
// return (WINDOW_TYPE_INTERVAL == ((SWindowLogicNode*)pNode->pParent)->winType);
}
return !osdHaveNormalCol(((SAggLogicNode*)pNode->pParent)->pGroupKeys);
}
......@@ -217,8 +218,7 @@ static int32_t osdGetDataRequired(SNodeList* pFuncs) {
}
static void setScanWindowInfo(SScanLogicNode* pScan) {
if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pScan->node.pParent) &&
WINDOW_TYPE_INTERVAL == ((SWindowLogicNode*)pScan->node.pParent)->winType) {
if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pScan->node.pParent)) {
pScan->interval = ((SWindowLogicNode*)pScan->node.pParent)->interval;
pScan->offset = ((SWindowLogicNode*)pScan->node.pParent)->offset;
pScan->sliding = ((SWindowLogicNode*)pScan->node.pParent)->sliding;
......
......@@ -980,7 +980,9 @@ static int32_t createSessionWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList*
static int32_t createStateWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
SStateWinodwPhysiNode* pState = (SStateWinodwPhysiNode*)makePhysiNode(
pCxt, getPrecision(pChildren), (SLogicNode*)pWindowLogicNode, QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW);
pCxt, getPrecision(pChildren), (SLogicNode*)pWindowLogicNode,
(pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW:
QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW));
if (NULL == pState) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......
......@@ -70,42 +70,42 @@ endi
# row 1
if $data11 != 3 then
print ======$data01
print ======$data11
return -1
endi
if $data12 != 10 then
print ======$data02
print ======$data12
return -1
endi
if $data13 != 10 then
print ======$data03
print ======$data13
return -1
endi
if $data14 != 1.100000000 then
print ======$data04
print ======$data14
return -1
endi
if $data15 != 0.000000000 then
print ======$data05
print ======$data15
return -1
endi
if $data16 != 10 then
print ======$data05
print ======$data15
return -1
endi
if $data17 != 1.100000000 then
print ======$data05
print ======$data17
return -1
endi
if $data18 != 5 then
print ======$data05
print ======$data18
return -1
endi
......@@ -145,17 +145,17 @@ if $data05 != 0.816496581 then
endi
if $data06 != 3 then
print ======$data05
print ======$data06
return -1
endi
if $data07 != 1.100000000 then
print ======$data05
print ======$data07
return -1
endi
if $data08 != 13 then
print ======$data05
print ======$data08
return -1
endi
......
......@@ -187,4 +187,14 @@ if $data34 != 19 then
return -1
endi
sql insert into t1 values(1648791000000,1,1,1,1.1,23);
sleep 300
sql select * from streamt order by s desc;
# row 0
if $data01 != 1 then
print ======$data01
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print =============== create database
sql create database test vgroups 1
sql show databases
if $rows != 3 then
return -1
endi
print $data00 $data01 $data02
sql use test
sql create table t1(ts timestamp, a int, b int , c int, d double, id int);
sql create stream streams1 trigger at_once into streamt1 as select _wstartts, count(*) c1, count(d) c2 , sum(a) c3 , max(a) c4, min(c) c5, max(id) c from t1 state_window(a);
sql insert into t1 values(1648791213000,1,2,3,1.0,1);
sql insert into t1 values(1648791213000,1,2,3,1.0,2);
sql select * from streamt1 order by c desc;
sleep 300
if $rows != 1 then
print ======$rows
return -1;
endi
sql insert into t1 values(1648791214000,1,2,3,1.0,3);
sql select * from streamt1 order by c desc;
sleep 300
if $rows != 1 then
print ======$rows
return -1;
endi
sql insert into t1 values(1648791213010,2,2,3,1.0,4);
sql insert into t1 values(1648791213000,1,2,3,1.0,5);
sql insert into t1 values(1648791214000,1,2,3,1.0,6);
$loop_count = 0
loop1:
sql select * from streamt1 where c >=4 order by `_wstartts`;
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 3 then
print ======$rows
goto loop1
return -1
endi
# row 0
if $data01 != 1 then
print ======$data01
return -1
endi
if $data02 != 1 then
print ======$data02
return -1
endi
if $data03 != 1 then
print ======$data03
return -1
endi
if $data04 != 1 then
print ======$data04
return -1
endi
if $data05 != 3 then
print ======$data05
return -1
endi
if $data06 != 5 then
print ======$data06
return -1
endi
# row 1
if $data11 != 1 then
print ======$data11
return -1
endi
if $data12 != 1 then
print ======$data12
return -1
endi
if $data13 != 2 then
print ======$data13
return -1
endi
if $data14 != 2 then
print ======$data14
return -1
endi
if $data15 != 3 then
print ======$data15
return -1
endi
if $data16 != 4 then
print ======$data16
return -1
endi
# row 2
if $data21 != 1 then
print ======$data21
return -1
endi
if $data22 != 1 then
print ======$data22
return -1
endi
if $data23 != 1 then
print ======$data23
return -1
endi
if $data24 != 1 then
print ======$data24
return -1
endi
if $data25 != 3 then
print ======$data25
return -1
endi
if $data26 != 6 then
print ======$data26
return -1
endi
sql insert into t1 values(1648791213011,1,2,3,1.0,7);
loop2:
$loop_count = 0
sql select * from streamt1 where c in (5,4,7) order by `_wstartts`;
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
# row 2
if $data21 != 2 then
print ======$data21
goto loop2
return -1
endi
if $data22 != 2 then
print ======$data22
goto loop2
return -1
endi
if $data23 != 2 then
print ======$data23
goto loop2
return -1
endi
if $data24 != 1 then
print ======$data24
goto loop2
return -1
endi
if $data25 != 3 then
print ======$data25
goto loop2
return -1
endi
if $data26 != 7 then
print ======$data26
goto loop2
return -1
endi
sql insert into t1 values(1648791213011,1,2,3,1.0,8);
loop21:
$loop_count = 0
sql select * from streamt1 where c in (5,4,8) order by `_wstartts`;
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data26 != 8 then
print ======$data26
goto loop21
return -1
endi
sql insert into t1 values(1648791213020,1,2,3,1.0,9);
sql insert into t1 values(1648791213020,3,2,3,1.0,10);
sql insert into t1 values(1648791214000,1,2,3,1.0,11);
sql insert into t1 values(1648791213011,10,20,10,10.0,12);
loop3:
$loop_count = 0
sql select * from streamt1 where c in (5,4,10,11,12) order by `_wstartts`;
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
# row 2
if $data21 != 1 then
print ======$data21
goto loop3
return -1
endi
if $data22 != 1 then
print ======$data22
goto loop3
return -1
endi
if $data23 != 10 then
print ======$data23
goto loop3
return -1
endi
if $data24 != 10 then
print ======$data24
goto loop3
return -1
endi
if $data25 != 10 then
print ======$data25
goto loop3
return -1
endi
if $data26 != 12 then
print ======$data26
goto loop3
return -1
endi
# row 3
if $data31 != 1 then
print ======$data31
goto loop3
return -1
endi
if $data32 != 1 then
print ======$data32
goto loop3
return -1
endi
if $data33 != 3 then
print ======$data33
goto loop3
return -1
endi
if $data34 != 3 then
print ======$data34
goto loop3
return -1
endi
if $data35 != 3 then
print ======$data35
goto loop3
return -1
endi
if $data36 != 10 then
print ======$data36
goto loop3
return -1
endi
# row 4
if $data41 != 1 then
print ======$data41
goto loop3
return -1
endi
if $data42 != 1 then
print ======$data42
goto loop3
return -1
endi
if $data43 != 1 then
print ======$data43
goto loop3
return -1
endi
if $data44 != 1 then
print ======$data44
goto loop3
return -1
endi
if $data45 != 3 then
print ======$data45
goto loop3
return -1
endi
if $data46 != 11 then
print ======$data46
goto loop3
return -1
endi
sql insert into t1 values(1648791213030,3,12,12,12.0,13);
sql insert into t1 values(1648791214040,1,13,13,13.0,14);
sql insert into t1 values(1648791213030,3,14,14,14.0,15) (1648791214020,15,15,15,15.0,16);
loop4:
$loop_count = 0
sql select * from streamt1 where c in (14,15,16) order by `_wstartts`;
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 3 then
print ======$rows
goto loop4
return -1;
endi
# row 0
if $data01 != 2 then
print ======$data01
goto loop4
return -1
endi
if $data02 != 2 then
print ======$data02
goto loop4
return -1
endi
if $data03 != 6 then
print ======$data03
goto loop4
return -1
endi
if $data04 != 3 then
print ======$data04
goto loop4
return -1
endi
if $data05 != 3 then
print ======$data05
goto loop4
return -1
endi
if $data06 != 15 then
print ======$data06
goto loop4
return -1
endi
# row 1
if $data11 != 1 then
print ======$data11
goto loop4
return -1
endi
if $data12 != 1 then
print ======$data12
goto loop4
return -1
endi
if $data13 != 15 then
print ======$data13
goto loop4
return -1
endi
if $data14 != 15 then
print ======$data14
goto loop4
return -1
endi
if $data15 != 15 then
print ======$data15
goto loop4
return -1
endi
if $data16 != 16 then
print ======$data16
goto loop4
return -1
endi
# row 2
if $data21 != 1 then
print ======$data21
goto loop4
return -1
endi
if $data22 != 1 then
print ======$data22
goto loop4
return -1
endi
if $data23 != 1 then
print ======$data23
goto loop4
return -1
endi
if $data24 != 1 then
print ======$data24
goto loop4
return -1
endi
if $data25 != 13 then
print ======$data25
goto loop4
return -1
endi
if $data26 != 14 then
print ======$data26
goto loop4
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
......@@ -102,4 +102,4 @@ if $data21 != 1 then
return -1
endi
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册