diff --git a/.gitignore b/.gitignore index f70f5987b27b829ec74d8fd58e7fe29b6c0c6393..d5c7f763cf41939b0e577fc0ce72a2d8bf2436b6 100644 --- a/.gitignore +++ b/.gitignore @@ -110,3 +110,4 @@ contrib/* !contrib/CMakeLists.txt !contrib/test sql +debug*/ diff --git a/include/common/tmsg.h b/include/common/tmsg.h index d27e274eba8f15a3a70902b43676e6f6c8537d94..647da79e563a9d697c556b73c6aad2b701516fb2 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1010,6 +1010,10 @@ typedef struct { SReplica replicas[TSDB_MAX_REPLICA]; int32_t numOfRetensions; SArray* pRetensions; // SRetention + + // for tsma + int8_t isTsma; + } SCreateVnodeReq; int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq); diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 3c5278011a650fc03b92ac32b0ac7edec9b1065f..38602667252e429eb9840c75d2c23b98139df184 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -214,6 +214,7 @@ typedef enum ENodeType { QUERY_NODE_PHYSICAL_PLAN_FILL, QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW, QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW, + QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION_WINDOW, QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW, QUERY_NODE_PHYSICAL_PLAN_PARTITION, QUERY_NODE_PHYSICAL_PLAN_DISPATCH, diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 5e1405e0c64ec932e9296f5a0d852657bf98349a..708e1b2df4ca8de64c6dbbe4df2aec96c4abe41f 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -196,12 +196,14 @@ static const SSysDbTableSchema vgroupsSchema[] = { {.name = "status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "nfiles", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "file_size", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, + {.name = "tsma", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, }; static const SSysDbTableSchema smaSchema[] = { {.name = "sma_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "stable_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, }; static const SSysDbTableSchema transSchema[] = { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 6066a5fbfb090f7ea5dac74e0d385f5abd0561c7..22ac4124340752415ef320effacc781568b20897 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2917,6 +2917,8 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR if (tEncodeI8(&encoder, pRetension->freqUnit) < 0) return -1; if (tEncodeI8(&encoder, pRetension->keepUnit) < 0) return -1; } + + if (tEncodeI8(&encoder, pReq->isTsma) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -2979,6 +2981,8 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq * } } + if (tDecodeI8(&decoder, &pReq->isTsma) < 0) return -1; + tEndDecode(&decoder); tDecoderClear(&decoder); return 0; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 10e6ad4e1e3d5f942d0850d59b9a2d80a3d4b62b..0af503bbcff4db55d9dcf660fbe174612d0e8073 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -183,7 +183,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return -1; } - dDebug("vgId:%d, create vnode req is received", createReq.vgId); + dDebug("vgId:%d, create vnode req is received, tsma:%d", createReq.vgId, createReq.isTsma); SVnodeCfg vnodeCfg = {0}; vmGenerateVnodeCfg(&createReq, &vnodeCfg); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 8ad1131bc3e933206f63283bb913746680014704..6318f2e3f2b790b5aff25e774c762a8bb5f4c4da 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -330,6 +330,7 @@ typedef struct { int64_t compStorage; int64_t pointsWritten; int8_t compact; + int8_t isTsma; int8_t replica; SVnodeGid vnodeGid[TSDB_MAX_REPLICA]; } SVgObj; @@ -588,7 +589,8 @@ typedef struct { int8_t status; int8_t createdBy; // STREAM_CREATED_BY__USER or SMA int32_t fixedSinkVgId; // 0 for shuffle - int64_t smaId; // 0 for unused + SVgObj fixedSinkVg; + int64_t smaId; // 0 for unused int8_t trigger; int32_t triggerParam; int64_t waterMark; diff --git a/source/dnode/mnode/impl/inc/mndVgroup.h b/source/dnode/mnode/impl/inc/mndVgroup.h index 9bf7b6eb8937cee5078ddab38e04810e77734d05..c9099b6b050481b78030befbe93de59139df1b27 100644 --- a/source/dnode/mnode/impl/inc/mndVgroup.h +++ b/source/dnode/mnode/impl/inc/mndVgroup.h @@ -30,6 +30,7 @@ SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup); SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup); int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId); +int32_t mndAllocSmaVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup); int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups); SArray *mndBuildDnodesArray(SMnode *pMnode); int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray); diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 6921235f8bfdb007811dc92ccff0514a68999824..95d3383ee10e378c4c5a66e9d16de4fda90db9ed 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1155,7 +1155,7 @@ static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); if (pIter == NULL) break; - if (NULL == pDb || pVgroup->dbUid == pDb->uid) { + if ((NULL == pDb || pVgroup->dbUid == pDb->uid) && !pVgroup->isTsma) { SVgroupInfo vgInfo = {0}; vgInfo.vgId = pVgroup->vgId; vgInfo.hashBegin = pVgroup->hashBegin; diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index d62ca3a2c901496ab0fe09505a4d81a8320809c3..204c6c16a3844850a99731c977e93eb87fd6032c 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -237,11 +237,14 @@ int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStr taosArrayPush(tasks, &pTask); pTask->nodeId = pStream->fixedSinkVgId; +#if 0 SVgObj* pVgroup = mndAcquireVgroup(pMnode, pStream->fixedSinkVgId); if (pVgroup == NULL) { return -1; } pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup); +#endif + pTask->epSet = mndGetVgroupEpset(pMnode, &pStream->fixedSinkVg); // source pTask->sourceType = TASK_SOURCE__MERGE; pTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK; @@ -263,7 +266,8 @@ int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStr // dispatch pTask->dispatchType = TASK_DISPATCH__NONE; - mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId); + /*mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId);*/ + mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_VND_TASK_DEPLOY, pStream->fixedSinkVg.vgId); return 0; } diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index b38e901d49dee8e6c93dc629824e51bf44c71e0f..cb7d3e81f631c34d278f15c7427c141dbf30ead9 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -295,9 +295,9 @@ static void *mndBuildVCreateSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSm } static void *mndBuildVDropSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSma, int32_t *pContLen) { - SEncoder encoder = {0}; - int32_t contLen; - SName name = {0}; + SEncoder encoder = {0}; + int32_t contLen; + SName name = {0}; tNameFromString(&name, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); SVDropTSmaReq req = {0}; @@ -354,6 +354,22 @@ static int32_t mndSetCreateSmaCommitLogs(SMnode *pMnode, STrans *pTrans, SSmaObj return 0; } +static int32_t mndSetCreateSmaVgroupRedoLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) { + SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup); + if (pVgRaw == NULL) return -1; + if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) return -1; + if (sdbSetRawStatus(pVgRaw, SDB_STATUS_CREATING) != 0) return -1; + return 0; +} + +static int32_t mndSetCreateSmaVgroupCommitLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) { + SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup); + if (pVgRaw == NULL) return -1; + if (mndTransAppendCommitlog(pTrans, pVgRaw) != 0) return -1; + if (sdbSetRawStatus(pVgRaw, SDB_STATUS_READY) != 0) return -1; + return 0; +} + static int32_t mndSetCreateSmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SSmaObj *pSma) { SSdb *pSdb = pMnode->pSdb; SVgObj *pVgroup = NULL; @@ -393,6 +409,34 @@ static int32_t mndSetCreateSmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj return 0; } +static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) { + SVnodeGid *pVgid = pVgroup->vnodeGid + 0; + SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); + if (pDnode == NULL) return -1; + + STransAction action = {0}; + action.epSet = mndGetDnodeEpset(pDnode); + mndReleaseDnode(pMnode, pDnode); + + // todo add sma info here + + int32_t contLen = 0; + void *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen); + if (pReq == NULL) return -1; + + action.pCont = pReq; + action.contLen = contLen; + action.msgType = TDMT_DND_CREATE_VNODE; + action.acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED; + + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(pReq); + return -1; + } + + return 0; +} + static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCreate, SDbObj *pDb, SStbObj *pStb) { SSmaObj smaObj = {0}; memcpy(smaObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN); @@ -448,9 +492,14 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea streamObj.version = 1; streamObj.sql = pCreate->sql; streamObj.createdBy = STREAM_CREATED_BY__SMA; - streamObj.fixedSinkVgId = smaObj.dstVgId; streamObj.smaId = smaObj.uid; - /*streamObj.physicalPlan = "";*/ + + if (mndAllocSmaVgroup(pMnode, pDb, &streamObj.fixedSinkVg) != 0) { + mError("sma:%s, failed to create since %s", smaObj.name, terrstr()); + return -1; + } + smaObj.dstVgId = streamObj.fixedSinkVg.vgId; + streamObj.fixedSinkVgId = smaObj.dstVgId; int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_SMA, pReq); @@ -460,8 +509,11 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea mndTransSetDbInfo(pTrans, pDb); if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; + if (mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER; if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; + if (mndSetCreateSmaVgroupCommitLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER; if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER; + if (mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &streamObj.fixedSinkVg) != 0) goto _OVER; if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, STREAM_TRIGGER_AT_ONCE, 0, pTrans) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; @@ -480,7 +532,6 @@ static int32_t mndCheckCreateSmaReq(SMCreateSmaReq *pCreate) { if (pCreate->intervalUnit < 0) return -1; if (pCreate->slidingUnit < 0) return -1; if (pCreate->timezone < 0) return -1; - if (pCreate->dstVgId < 0) return -1; if (pCreate->interval < 0) return -1; if (pCreate->offset < 0) return -1; if (pCreate->sliding < 0) return -1; @@ -602,6 +653,24 @@ static int32_t mndSetDropSmaCommitLogs(SMnode *pMnode, STrans *pTrans, SSmaObj * return 0; } +static int32_t mndSetDropSmaVgroupRedoLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) { + SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup); + if (pVgRaw == NULL) return -1; + if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) return -1; + if (sdbSetRawStatus(pVgRaw, SDB_STATUS_DROPPING) != 0) return -1; + + return 0; +} + +static int32_t mndSetDropSmaVgroupCommitLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) { + SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup); + if (pVgRaw == NULL) return -1; + if (mndTransAppendCommitlog(pTrans, pVgRaw) != 0) return -1; + if (sdbSetRawStatus(pVgRaw, SDB_STATUS_DROPPED) != 0) return -1; + + return 0; +} + static int32_t mndSetDropSmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SSmaObj *pSma) { SSdb *pSdb = pMnode->pSdb; SVgObj *pVgroup = NULL; @@ -643,23 +712,59 @@ static int32_t mndSetDropSmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj * return 0; } +static int32_t mndSetDropSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) { + SVnodeGid *pVgid = pVgroup->vnodeGid + 0; + SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); + if (pDnode == NULL) return -1; + + STransAction action = {0}; + action.epSet = mndGetDnodeEpset(pDnode); + mndReleaseDnode(pMnode, pDnode); + + int32_t contLen = 0; + void *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen); + if (pReq == NULL) return -1; + + action.pCont = pReq; + action.contLen = contLen; + action.msgType = TDMT_DND_DROP_VNODE; + action.acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED; + + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(pReq); + return -1; + } + + return 0; +} + static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *pSma) { int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_SMA, pReq); + SVgObj *pVgroup = NULL; + STrans *pTrans = NULL; + + pVgroup = mndAcquireVgroup(pMnode, pSma->dstVgId); + if (pVgroup == NULL) goto _OVER; + + pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_SMA, pReq); if (pTrans == NULL) goto _OVER; mDebug("trans:%d, used to drop sma:%s", pTrans->id, pSma->name); mndTransSetDbInfo(pTrans, pDb); if (mndSetDropSmaRedoLogs(pMnode, pTrans, pSma) != 0) goto _OVER; + if (mndSetDropSmaVgroupRedoLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER; if (mndSetDropSmaCommitLogs(pMnode, pTrans, pSma) != 0) goto _OVER; + if (mndSetDropSmaVgroupCommitLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER; if (mndSetDropSmaRedoActions(pMnode, pTrans, pDb, pSma) != 0) goto _OVER; + if (mndSetDropSmaVgroupRedoActions(pMnode, pTrans, pDb, pVgroup) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; code = 0; _OVER: mndTransDrop(pTrans); + mndReleaseVgroup(pMnode, pVgroup); return code; } @@ -846,6 +951,9 @@ static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)n1, false); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pSma->dstVgId, false); + numOfRows++; sdbRelease(pSdb, pSma); } diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 444c4bb61998d41e2bbf57ae584c2dc9bc97b026..a7480f459aa5c03c5c5550b4d22767e0e5badc5a 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -829,7 +829,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) { sendRsp = true; } } else { - if (pTrans->stage == TRN_STAGE_REDO_ACTION && pTrans->failedTimes > 0) { + if (pTrans->stage == TRN_STAGE_REDO_ACTION && pTrans->failedTimes > 6) { if (code == 0) code = TSDB_CODE_MND_TRANS_UNKNOW_ERROR; sendRsp = true; } diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 62021c6a7edc467bd7cd62fba9ef9eddbef1193b..e05b38a7c0345293eb53caeab2eb680f6d113651 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -80,6 +80,7 @@ SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) { SDB_SET_INT32(pRaw, dataPos, pVgroup->hashEnd, _OVER) SDB_SET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, _OVER) SDB_SET_INT64(pRaw, dataPos, pVgroup->dbUid, _OVER) + SDB_SET_INT8(pRaw, dataPos, pVgroup->isTsma, _OVER) SDB_SET_INT8(pRaw, dataPos, pVgroup->replica, _OVER) for (int8_t i = 0; i < pVgroup->replica; ++i) { SVnodeGid *pVgid = &pVgroup->vnodeGid[i]; @@ -127,6 +128,7 @@ SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) { SDB_GET_INT32(pRaw, dataPos, &pVgroup->hashEnd, _OVER) SDB_GET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, _OVER) SDB_GET_INT64(pRaw, dataPos, &pVgroup->dbUid, _OVER) + SDB_GET_INT8(pRaw, dataPos, &pVgroup->isTsma, _OVER) SDB_GET_INT8(pRaw, dataPos, &pVgroup->replica, _OVER) for (int8_t i = 0; i < pVgroup->replica; ++i) { SVnodeGid *pVgid = &pVgroup->vnodeGid[i]; @@ -167,6 +169,7 @@ static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew) { pOld->hashBegin = pNew->hashBegin; pOld->hashEnd = pNew->hashEnd; pOld->replica = pNew->replica; + pOld->isTsma = pNew->isTsma; memcpy(pOld->vnodeGid, pNew->vnodeGid, TSDB_MAX_REPLICA * sizeof(SVnodeGid)); return 0; } @@ -426,6 +429,25 @@ static int32_t mndGetAvailableDnode(SMnode *pMnode, SVgObj *pVgroup, SArray *pAr return 0; } +int32_t mndAllocSmaVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup) { + SArray *pArray = mndBuildDnodesArray(pMnode); + if (pArray == NULL) return -1; + + pVgroup->vgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP); + pVgroup->isTsma = 1; + pVgroup->createdTime = taosGetTimestampMs(); + pVgroup->updateTime = pVgroup->createdTime; + pVgroup->version = 1; + memcpy(pVgroup->dbName, pDb->name, TSDB_DB_FNAME_LEN); + pVgroup->dbUid = pDb->uid; + pVgroup->replica = 1; + + if (mndGetAvailableDnode(pMnode, pVgroup, pArray) != 0) return -1; + + mInfo("db:%s, sma vgId:%d is alloced", pDb->name, pVgroup->vgId); + return 0; +} + int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) { int32_t code = -1; SArray *pArray = NULL; @@ -702,9 +724,12 @@ static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppendNULL(pColInfo, numOfRows); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppendNULL(pColInfo, numOfRows); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pVgroup->isTsma, false); + numOfRows++; sdbRelease(pSdb, pVgroup); } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index ec204a8f60645cc05fe83c4486a8da0f627dac20..b36d3566bf0abaf3c12760cb6fdc7311195e75da 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -465,6 +465,7 @@ typedef struct SStreamFinalIntervalOperatorInfo { SAggSupporter aggSup; // aggregate supporter int32_t order; // current SSDataBlock scan order STimeWindowAggSupp twAggSup; + SArray* pChildren; } SStreamFinalIntervalOperatorInfo; typedef struct SAggOperatorInfo { @@ -581,6 +582,7 @@ typedef struct SStreamSessionAggOperatorInfo { SSDataBlock* pDelRes; SHashObj* pStDeleted; void* pDelIterator; + SArray* pChildren; // cache for children's result; } SStreamSessionAggOperatorInfo; typedef struct STimeSliceOperatorInfo { @@ -722,7 +724,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, - STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); + STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, @@ -797,7 +799,7 @@ int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimary int32_t startPos, TSKEY ekey, __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order); int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); -int32_t initCatchSupporter(SCatchSupporter* pCatchSup, size_t rowSize, const char* pKey, +int32_t initCacheSupporter(SCatchSupporter* pCatchSup, size_t rowSize, const char* pKey, const char* pDir); int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey); SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 4ed6a2de0d7c9c8d6f8ce7ba4353a3eb2aab0d6f..998d5eb02df325a93579c6e120925acd0890694c 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -829,33 +829,14 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { size_t total = taosArrayGetSize(pInfo->pBlockLists); if (pInfo->blockType == STREAM_DATA_TYPE_SSDATA_BLOCK) { - if (pInfo->scanMode == STREAM_SCAN_FROM_UPDATERES) { - SSDataBlock* pDB = getDataFromCatch(pInfo); - if (pDB != NULL) { - return pDB; - } else { - pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; - } - } - if (pInfo->validBlockIndex >= total) { doClearBufferedBlocks(pInfo); pOperator->status = OP_EXEC_DONE; return NULL; } - int32_t current = pInfo->validBlockIndex++; - SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current); - if (pBlock->info.type == STREAM_REPROCESS) { - pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES; - } else { - int32_t code = catchDatablock(pBlock, &pInfo->childAggSup, pInfo->primaryTsIndex, 0); - if (code != TDB_CODE_SUCCESS) { - pTaskInfo->code = code; - longjmp(pTaskInfo->env, code); - } - } - return pBlock; + int32_t current = pInfo->validBlockIndex++; + return taosArrayGetP(pInfo->pBlockLists, current); } else { if (pInfo->scanMode == STREAM_SCAN_FROM_RES) { blockDataDestroy(pInfo->pUpdateRes); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 9346dbf54a7037d5f072619fa07fef6b4dfde58c..6965771c730345dcf4802bb9b101a295771b3deb 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1067,7 +1067,8 @@ void doClearWindow(SAggSupporter* pSup, SOptrBasicInfo* pBinfo, char* pData, } static void doClearWindows(SAggSupporter* pSup, SOptrBasicInfo* pBinfo, - SInterval* pIntrerval, int32_t tsIndex, int32_t numOfOutput, SSDataBlock* pBlock) { + SInterval* pIntrerval, int32_t tsIndex, int32_t numOfOutput, SSDataBlock* pBlock, + SArray* pUpWins) { SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, tsIndex); TSKEY *tsCols = (TSKEY*)pColDataInfo->pData; int32_t step = 0; @@ -1079,6 +1080,9 @@ static void doClearWindows(SAggSupporter* pSup, SOptrBasicInfo* pBinfo, step = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, i, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); doClearWindow(pSup, pBinfo, (char*)&win.skey, sizeof(TKEY), pBlock->info.groupId, numOfOutput); + if (pUpWins) { + taosArrayPush(pUpWins, &win); + } } } @@ -1119,7 +1123,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { if (pBlock->info.type == STREAM_REPROCESS) { doClearWindows(&pInfo->aggSup, &pInfo->binfo, &pInfo->interval, 0, - pOperator->numOfExprs, pBlock); + pOperator->numOfExprs, pBlock, NULL); qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo)); continue; } @@ -1154,6 +1158,15 @@ void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) { SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo *)param; doDestroyBasicInfo(&pInfo->binfo, numOfOutput); cleanupAggSup(&pInfo->aggSup); + if (pInfo->pChildren) { + int32_t size = taosArrayGetSize(pInfo->pChildren); + for (int32_t i = 0; i < size; i++) { + SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, i); + destroyIntervalOperatorInfo(pChildOp->info, numOfOutput); + taosMemoryFreeClear(pChildOp->info); + taosMemoryFreeClear(pChildOp); + } + } } bool allInvertible(SqlFunctionCtx* pFCtx, int32_t numOfCols) { @@ -1228,32 +1241,38 @@ _error: SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, - STimeWindowAggSupp* pTwAggSupp, const STableGroupInfo* pTableGroupInfo, - SExecTaskInfo* pTaskInfo) { + STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) { SStreamFinalIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamFinalIntervalOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { goto _error; } - pInfo->order = TSDB_ORDER_ASC; pInfo->interval = *pInterval; pInfo->twAggSup = *pTwAggSupp; pInfo->primaryTsIndex = primaryTsSlotId; - size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(pOperator, 4096); - int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); - initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); if (code != TSDB_CODE_SUCCESS) { goto _error; } - initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1); + int32_t numOfChild = 8;// Todo(liuyao) get it from phy plan + pInfo->pChildren = taosArrayInit(numOfChild, sizeof(SOperatorInfo)); + for (int32_t i = 0; i < numOfChild; i++) { + SSDataBlock* chRes = createOneDataBlock(pResBlock, false); + SOperatorInfo* pChildOp = createIntervalOperatorInfo(NULL, pExprInfo, numOfCols, + chRes, pInterval, primaryTsSlotId, pTwAggSupp, NULL, pTaskInfo); + if (pChildOp && chRes) { + taosArrayPush(pInfo->pChildren, &pChildOp); + continue; + } + goto _error; + } pOperator->name = "StreamFinalIntervalOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL; @@ -1703,6 +1722,51 @@ static SArray* doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataB return pUpdated; } +bool isFinalInterval(SStreamFinalIntervalOperatorInfo* pInfo) { + return pInfo->pChildren != NULL; +} + +void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, + int32_t numOfOutput, SExecTaskInfo* pTaskInfo) { + for (int32_t k = 0; k < numOfOutput; ++k) { + if (fmIsWindowPseudoColumnFunc(pDestCtx[k].functionId)) { + continue; + } + int32_t code = TSDB_CODE_SUCCESS; + if (functionNeedToExecute(&pDestCtx[k]) && pDestCtx[k].fpSet.combine != NULL) { + code = pDestCtx[k].fpSet.combine(&pDestCtx[k], &pSourceCtx[k]); + if (code != TSDB_CODE_SUCCESS) { + qError("%s apply functions error, code: %s", GET_TASKID(pTaskInfo), tstrerror(code)); + pTaskInfo->code = code; + longjmp(pTaskInfo->env, code); + } + } + } +} + +static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SArray *pWinArray, + int32_t groupId, int32_t numOfOutput, SExecTaskInfo* pTaskInfo) { + int32_t size = taosArrayGetSize(pWinArray); + ASSERT(pInfo->pChildren); + for (int32_t i = 0; i < size; i++) { + STimeWindow* pParentWin = taosArrayGet(pWinArray, i); + SResultRow* pCurResult = NULL; + setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, pParentWin, true, &pCurResult, 0, + pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, + pTaskInfo); + int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren); + for (int32_t j = 0; j < numOfChildren; j++) { + SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, j); + SIntervalAggOperatorInfo* pChInfo = pChildOp->info; + SResultRow* pChResult = NULL; + setTimeWindowOutputBuf(&pChInfo->binfo.resultRowInfo, pParentWin, true, &pChResult, + 0, pChInfo->binfo.pCtx, pChildOp->numOfExprs, pChInfo->binfo.rowCellInfoOffset, + &pChInfo->aggSup, pTaskInfo); + compactFunctions(pInfo->binfo.pCtx, pChInfo->binfo.pCtx, numOfOutput, pTaskInfo); + } + } +} + static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info; SOperatorInfo* downstream = pOperator->pDownstream[0]; @@ -1726,10 +1790,26 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pInfo->order, MAIN_SCAN, true); if (pBlock->info.type == STREAM_REPROCESS) { + SArray *pUpWins = taosArrayInit(8, sizeof(STimeWindow)); doClearWindows(&pInfo->aggSup, &pInfo->binfo, &pInfo->interval, - pInfo->primaryTsIndex, pOperator->numOfExprs, pBlock); + pInfo->primaryTsIndex, pOperator->numOfExprs, pBlock, pUpWins); + if (isFinalInterval(pInfo)) { + int32_t childIndex = 0; //Todo(liuyao) get child id from SSDataBlock + SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); + SIntervalAggOperatorInfo* pChildInfo = pChildOp->info; + doClearWindows(&pChildInfo->aggSup, &pChildInfo->binfo, &pChildInfo->interval, + pChildInfo->primaryTsIndex, pChildOp->numOfExprs, pBlock, NULL); + rebuildIntervalWindow(pInfo, pUpWins, pInfo->binfo.pRes->info.groupId, + pOperator->numOfExprs, pOperator->pTaskInfo); + } + taosArrayDestroy(pUpWins); continue; } + if (isFinalInterval(pInfo)) { + int32_t chIndex = 1; //Todo(liuyao) get it from SSDataBlock + SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, chIndex); + doStreamIntervalAgg(pChildOp); + } pUpdated = doHashInterval(pOperator, pBlock, 0); } @@ -1752,6 +1832,16 @@ void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) { doDestroyBasicInfo(&pInfo->binfo, numOfOutput); destroyStreamAggSupporter(&pInfo->streamAggSup); cleanupGroupResInfo(&pInfo->groupResInfo); + if (pInfo->pChildren != NULL) { + int32_t size = taosArrayGetSize(pInfo->pChildren); + for (int32_t i = 0; i < size; i++) { + SOperatorInfo *pChild = taosArrayGetP(pInfo->pChildren, i); + SStreamSessionAggOperatorInfo* pChInfo = pChild->info; + destroyStreamSessionAggOperatorInfo(pChInfo, numOfOutput); + taosMemoryFreeClear(pChild); + taosMemoryFreeClear(pChInfo); + } + } } int32_t initBiasicInfo(SOptrBasicInfo* pBasicInfo, SExprInfo* pExprInfo, @@ -1780,6 +1870,7 @@ void initDownStream(SOperatorInfo* downstream, SStreamSessionAggOperatorInfo* pI SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) { + int32_t code = TSDB_CODE_OUT_OF_MEMORY; SStreamSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamSessionAggOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -1789,7 +1880,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, initResultSizeInfo(pOperator, 4096); - int32_t code = initStreamAggSupporter(&pInfo->streamAggSup, "StreamSessionAggOperatorInfo"); + code = initStreamAggSupporter(&pInfo->streamAggSup, "StreamSessionAggOperatorInfo"); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -1820,6 +1911,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, pInfo->pDelIterator = NULL; pInfo->pDelRes = createOneDataBlock(pResBlock, false); blockDataEnsureCapacity(pInfo->pDelRes, 64); + pInfo->pChildren = NULL; pOperator->name = "StreamSessionWindowAggOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW; @@ -2068,24 +2160,6 @@ int32_t getNumCompactWindow(SArray* pWinInfos, int32_t startIndex, int64_t gap) return size - startIndex - 1; } -void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, - int32_t numOfOutput, SExecTaskInfo* pTaskInfo) { - for (int32_t k = 0; k < numOfOutput; ++k) { - if (fmIsWindowPseudoColumnFunc(pDestCtx[k].functionId)) { - continue; - } - int32_t code = TSDB_CODE_SUCCESS; - if (functionNeedToExecute(&pDestCtx[k]) && pDestCtx[k].fpSet.combine != NULL) { - code = pDestCtx[k].fpSet.combine(&pDestCtx[k], &pSourceCtx[k]); - if (code != TSDB_CODE_SUCCESS) { - qError("%s apply functions error, code: %s", GET_TASKID(pTaskInfo), tstrerror(code)); - pTaskInfo->code = code; - longjmp(pTaskInfo->env, code); - } - } - } -} - void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex, int32_t num, int32_t groupId, int32_t numOfOutput, SExecTaskInfo* pTaskInfo, SHashObj* pStUpdated, SHashObj* pStDeleted) { SResultWindowInfo* pCurWin = taosArrayGet(pInfo->streamAggSup.pResultRows, startIndex); @@ -2164,7 +2238,7 @@ static void doStreamSessionWindowAggImpl(SOperatorInfo* pOperator, } static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SOptrBasicInfo* pBinfo, - SSDataBlock* pBlock, int32_t tsIndex, int32_t numOfOutput, int64_t gap) { + SSDataBlock* pBlock, int32_t tsIndex, int32_t numOfOutput, int64_t gap, SArray* result) { SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, tsIndex); TSKEY *tsCols = (TSKEY*)pColDataInfo->pData; int32_t step = 0; @@ -2173,7 +2247,11 @@ static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SOptrBasicInfo* SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup->pResultRows, tsCols[i], gap, &winIndex); step = updateSessionWindowInfo(pCurWin, tsCols, pBlock->info.rows, i, gap, NULL); + ASSERT(isInWindow(pCurWin, tsCols[i], gap)); doClearWindowImpl(&pCurWin->pos, pAggSup->pResultBuf, pBinfo, numOfOutput); + if (result) { + taosArrayPush(result, pCurWin); + } } } @@ -2215,6 +2293,42 @@ void doBuildDeleteDataBlock(SHashObj* pStDeleted, SSDataBlock* pBlock, void** It } } +static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray *pWinArray, + int32_t groupId, int32_t numOfOutput, SExecTaskInfo* pTaskInfo) { + int32_t size = taosArrayGetSize(pWinArray); + ASSERT(pInfo->pChildren); + for (int32_t i = 0; i < size; i++) { + SResultWindowInfo* pParentWin = taosArrayGet(pWinArray, i); + SResultRow* pCurResult = NULL; + setWindowOutputBuf(pParentWin, &pCurResult, pInfo->binfo.pCtx, groupId, + numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->streamAggSup, pTaskInfo); + int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren); + for (int32_t j = 0; j < numOfChildren; j++) { + SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, j); + SStreamSessionAggOperatorInfo* pChInfo = pChild->info; + SArray* pChWins = pChInfo->streamAggSup.pResultRows; + int32_t chWinSize = taosArrayGetSize(pChWins); + int32_t index = binarySearch(pChWins, chWinSize, pParentWin->win.skey, + TSDB_ORDER_DESC, getSessionWindowEndkey); + for (int32_t k = index; k > 0 && k < chWinSize; k++) { + SResultWindowInfo* pcw = taosArrayGet(pChWins, k); + if (pParentWin->win.skey <= pcw->win.skey && pcw->win.ekey <= pParentWin->win.ekey) { + SResultRow* pChResult = NULL; + setWindowOutputBuf(pcw, &pChResult, pChInfo->binfo.pCtx, groupId, + numOfOutput, pChInfo->binfo.rowCellInfoOffset, &pChInfo->streamAggSup, pTaskInfo); + compactFunctions(pInfo->binfo.pCtx, pChInfo->binfo.pCtx, numOfOutput, pTaskInfo); + continue; + } + break; + } + } + } +} + +bool isFinalSession(SStreamSessionAggOperatorInfo* pInfo) { + return pInfo->pChildren != NULL; +} + static SSDataBlock* doStreamSessionWindowAgg(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -2247,10 +2361,25 @@ static SSDataBlock* doStreamSessionWindowAgg(SOperatorInfo* pOperator) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); if (pBlock->info.type == STREAM_REPROCESS) { + SArray *pWins = taosArrayInit(16, sizeof(SResultWindowInfo)); doClearSessionWindows(&pInfo->streamAggSup, &pInfo->binfo, pBlock, 0, - pOperator->numOfExprs, pInfo->gap); + pOperator->numOfExprs, pInfo->gap, pWins); + if (isFinalSession(pInfo)) { + int32_t childIndex = 0; //Todo(liuyao) get child id from SSDataBlock + SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); + SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info; + doClearSessionWindows(&pChildInfo->streamAggSup, &pChildInfo->binfo, pBlock, 0, + pChildOp->numOfExprs, pChildInfo->gap, NULL); + rebuildTimeWindow(pInfo, pWins, pInfo->binfo.pRes->info.groupId, pOperator->numOfExprs, pOperator->pTaskInfo); + } + taosArrayDestroy(pWins); continue; } + if (isFinalSession(pInfo)) { + int32_t childIndex = 0; //Todo(liuyao) get child id from SSDataBlock + SOptrBasicInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); + doStreamSessionWindowAggImpl(pOperator, pBlock, NULL, NULL); + } doStreamSessionWindowAggImpl(pOperator, pBlock, pStUpdated, pInfo->pStDeleted); } @@ -2271,3 +2400,39 @@ static SSDataBlock* doStreamSessionWindowAgg(SOperatorInfo* pOperator) { pInfo->streamAggSup.pResultBuf); return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes; } + +SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, + SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, + int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) { + int32_t code = TSDB_CODE_OUT_OF_MEMORY; + SStreamSessionAggOperatorInfo* pInfo = NULL; + SOperatorInfo* pOperator = createStreamSessionAggOperatorInfo(downstream, pExprInfo, + numOfCols, pResBlock, gap, tsSlotId, pTwAggSupp, pTaskInfo); + if (pOperator == NULL) { + goto _error; + } + pOperator->name = "StreamFinalSessionWindowAggOperator"; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION_WINDOW; + int32_t numOfChild = 1; //Todo(liuyao) get it from phy plan + pInfo = pOperator->info; + pInfo->pChildren = taosArrayInit(8, sizeof(void *)); + for (int32_t i = 0; i < numOfChild; i++) { + SOperatorInfo* pChild = createStreamSessionAggOperatorInfo(NULL, pExprInfo, + numOfCols, NULL, gap, tsSlotId, pTwAggSupp, pTaskInfo); + if (pChild == NULL) { + goto _error; + } + taosArrayPush(pInfo->pChildren, &pChild); + } + return pOperator; + +_error: + if (pInfo != NULL) { + destroyStreamSessionAggOperatorInfo(pInfo, numOfCols); + } + + taosMemoryFreeClear(pInfo); + taosMemoryFreeClear(pOperator); + pTaskInfo->code = code; + return NULL; +}