diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 5e9089cec9fa3d80921a22df7d7492062c1e39e4..69385c3a46b7f129bc1704385a0b723f4671a2d0 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -34,11 +34,6 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); -// for sma -// TODO refactor -int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); -int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); - #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index d880fb28d432874a6e0bf15f733cb9cf90eab0bb..ef24cd0ba45c80eac2ec38176feccd23ec1e9e3e 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -857,24 +857,6 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p mDebug("trans:%d, used to drop sma:%s", pTrans->id, pSma->name); mndTransSetDbName(pTrans, pDb->name, NULL); - SStreamObj *pStream = mndAcquireStream(pMnode, pSma->name); - if (pStream == NULL || pStream->smaId != pSma->uid) { - sdbRelease(pMnode->pSdb, pStream); - goto _OVER; - } else { - if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) { - mError("stream:%s, failed to drop task since %s", pStream->name, terrstr()); - sdbRelease(pMnode->pSdb, pStream); - goto _OVER; - } - - // drop stream - if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) { - sdbRelease(pMnode->pSdb, pStream); - goto _OVER; - } - } - if (mndSetDropSmaRedoLogs(pMnode, pTrans, pSma) != 0) goto _OVER; if (mndSetDropSmaVgroupRedoLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER; if (mndSetDropSmaCommitLogs(pMnode, pTrans, pSma) != 0) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 444a8864efaecbee19012f7c93c20f4bf67a351e..b78756d8b887a1e3939f2f1f3351a2693ec606d4 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -490,7 +490,7 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) { return 0; } -int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { +static int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { int32_t lv = taosArrayGetSize(pStream->tasks); for (int32_t i = 0; i < lv; i++) { SArray *pTasks = taosArrayGetP(pStream->tasks, i); diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 0bb9918488c7d0da7a0e9a3ab981e1739a326188..9abc2f639b8739c79189c5c7de2a3f48e1fb82f0 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -43,7 +43,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo taosArrayPush(tagArray, &tagVal); tTagNew(tagArray, 1, false, &pTag); if (pTag == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + taosArrayDestroy(schemaReqs); taosArrayDestroy(tagArray); return NULL; } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index eb2c6f51025b8ecb958de65fd8dbe774f376a27b..36f81e86ffeebbec4a5d94a0b139359a578e862c 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -348,6 +348,7 @@ typedef struct SStreamBlockScanInfo { SInterval interval; // if the upstream is an interval operator, the interval info is also kept here. SArray* childIds; SessionWindowSupporter sessionSup; + bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA. int32_t scanWinIndex; // for state operator int32_t pullDataResIndex; SSDataBlock* pPullDataRes; // pull data SSDataBlock diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 3fd491885fcf18ac5d35d8d9fe3d6ecbafdd0145..6de364e63a3c2bbe6e5044a53a7f916b1d2d8da1 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -19,7 +19,8 @@ #include "tdatablock.h" #include "vnode.h" -static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) { +static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, bool assignUid, + char* id) { ASSERT(pOperator != NULL); if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->numOfDownstream == 0) { @@ -32,11 +33,12 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu return TSDB_CODE_QRY_APP_ERROR; } pOperator->status = OP_NOT_OPENED; - return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, id); + return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, assignUid, id); } else { pOperator->status = OP_NOT_OPENED; SStreamBlockScanInfo* pInfo = pOperator->info; + pInfo->assignBlockUid = assignUid; // TODO: if a block was set but not consumed, // prevent setting a different type of block @@ -74,7 +76,7 @@ int32_t qStreamScanSnapshot(qTaskInfo_t tinfo) { return TSDB_CODE_QRY_APP_ERROR; } SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - return doSetStreamBlock(pTaskInfo->pRoot, NULL, 0, STREAM_DATA_TYPE_FROM_SNAPSHOT, NULL); + return doSetStreamBlock(pTaskInfo->pRoot, NULL, 0, STREAM_DATA_TYPE_FROM_SNAPSHOT, 0, NULL); } int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool assignUid) { @@ -92,7 +94,8 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void**)pBlocks, numOfBlocks, type, GET_TASKID(pTaskInfo)); + int32_t code = + doSetStreamBlock(pTaskInfo->pRoot, (void**)pBlocks, numOfBlocks, type, assignUid, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo)); } else { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index aadf7b3b7e0354f0fb3fa64cf5f3cefc1dcb366a..15803a4b7342b2bb03443b3430b40034059ca8f0 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1051,6 +1051,14 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { pInfo->pRes->info.type = STREAM_NORMAL; pInfo->pRes->info.capacity = numOfRows; + // for generating rollup SMA result, each time is an independent time serie. + // TODO temporarily used, when the statement of "partition by tbname" is ready, remove this + if (pInfo->assignBlockUid) { + pInfo->pRes->info.groupId = uid; + } else { + pInfo->pRes->info.groupId = groupId; + } + uint64_t* groupIdPre = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &uid, sizeof(int64_t)); if (groupIdPre) { pInfo->pRes->info.groupId = *groupIdPre; diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 20fa5f1f2b20c0f5fc467bc53a79aaea68447a74..682afbb785784f0e158f95750fc376130a6fd696 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -103,7 +103,6 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) { wError("cannot open file %s, since %s", fnameStr, terrstr()); return -1; } - pRead->pReadLogTFile = pLogTFile; walBuildIdxName(pRead->pWal, fileFirstVer, fnameStr); TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_READ); @@ -113,6 +112,7 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) { return -1; } + pRead->pReadLogTFile = pLogTFile; pRead->pReadIdxTFile = pIdxTFile; return 0; }