未验证 提交 bdd4134f 编写于 作者: L Liu Jicong 提交者: GitHub

Revert "fix(sma): drop stream when drop sma"

上级 e571567e
...@@ -34,11 +34,6 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); ...@@ -34,11 +34,6 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
// for sma
// TODO refactor
int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -857,24 +857,6 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p ...@@ -857,24 +857,6 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p
mDebug("trans:%d, used to drop sma:%s", pTrans->id, pSma->name); mDebug("trans:%d, used to drop sma:%s", pTrans->id, pSma->name);
mndTransSetDbName(pTrans, pDb->name, NULL); mndTransSetDbName(pTrans, pDb->name, NULL);
SStreamObj *pStream = mndAcquireStream(pMnode, pSma->name);
if (pStream == NULL || pStream->smaId != pSma->uid) {
sdbRelease(pMnode->pSdb, pStream);
goto _OVER;
} else {
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
mError("stream:%s, failed to drop task since %s", pStream->name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
goto _OVER;
}
// drop stream
if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) {
sdbRelease(pMnode->pSdb, pStream);
goto _OVER;
}
}
if (mndSetDropSmaRedoLogs(pMnode, pTrans, pSma) != 0) goto _OVER; if (mndSetDropSmaRedoLogs(pMnode, pTrans, pSma) != 0) goto _OVER;
if (mndSetDropSmaVgroupRedoLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER; if (mndSetDropSmaVgroupRedoLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER;
if (mndSetDropSmaCommitLogs(pMnode, pTrans, pSma) != 0) goto _OVER; if (mndSetDropSmaCommitLogs(pMnode, pTrans, pSma) != 0) goto _OVER;
......
...@@ -490,7 +490,7 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) { ...@@ -490,7 +490,7 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) {
return 0; return 0;
} }
int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { static int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
int32_t lv = taosArrayGetSize(pStream->tasks); int32_t lv = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < lv; i++) { for (int32_t i = 0; i < lv; i++) {
SArray *pTasks = taosArrayGetP(pStream->tasks, i); SArray *pTasks = taosArrayGetP(pStream->tasks, i);
......
...@@ -43,7 +43,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo ...@@ -43,7 +43,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
taosArrayPush(tagArray, &tagVal); taosArrayPush(tagArray, &tagVal);
tTagNew(tagArray, 1, false, &pTag); tTagNew(tagArray, 1, false, &pTag);
if (pTag == NULL) { if (pTag == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; taosArrayDestroy(schemaReqs);
taosArrayDestroy(tagArray); taosArrayDestroy(tagArray);
return NULL; return NULL;
} }
......
...@@ -348,6 +348,7 @@ typedef struct SStreamBlockScanInfo { ...@@ -348,6 +348,7 @@ typedef struct SStreamBlockScanInfo {
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here. SInterval interval; // if the upstream is an interval operator, the interval info is also kept here.
SArray* childIds; SArray* childIds;
SessionWindowSupporter sessionSup; SessionWindowSupporter sessionSup;
bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA.
int32_t scanWinIndex; // for state operator int32_t scanWinIndex; // for state operator
int32_t pullDataResIndex; int32_t pullDataResIndex;
SSDataBlock* pPullDataRes; // pull data SSDataBlock SSDataBlock* pPullDataRes; // pull data SSDataBlock
......
...@@ -19,7 +19,8 @@ ...@@ -19,7 +19,8 @@
#include "tdatablock.h" #include "tdatablock.h"
#include "vnode.h" #include "vnode.h"
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) { static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, bool assignUid,
char* id) {
ASSERT(pOperator != NULL); ASSERT(pOperator != NULL);
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
if (pOperator->numOfDownstream == 0) { if (pOperator->numOfDownstream == 0) {
...@@ -32,11 +33,12 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu ...@@ -32,11 +33,12 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, id); return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, assignUid, id);
} else { } else {
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
SStreamBlockScanInfo* pInfo = pOperator->info; SStreamBlockScanInfo* pInfo = pOperator->info;
pInfo->assignBlockUid = assignUid;
// TODO: if a block was set but not consumed, // TODO: if a block was set but not consumed,
// prevent setting a different type of block // prevent setting a different type of block
...@@ -74,7 +76,7 @@ int32_t qStreamScanSnapshot(qTaskInfo_t tinfo) { ...@@ -74,7 +76,7 @@ int32_t qStreamScanSnapshot(qTaskInfo_t tinfo) {
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
return doSetStreamBlock(pTaskInfo->pRoot, NULL, 0, STREAM_DATA_TYPE_FROM_SNAPSHOT, NULL); return doSetStreamBlock(pTaskInfo->pRoot, NULL, 0, STREAM_DATA_TYPE_FROM_SNAPSHOT, 0, NULL);
} }
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool assignUid) { int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool assignUid) {
...@@ -92,7 +94,8 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO ...@@ -92,7 +94,8 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void**)pBlocks, numOfBlocks, type, GET_TASKID(pTaskInfo)); int32_t code =
doSetStreamBlock(pTaskInfo->pRoot, (void**)pBlocks, numOfBlocks, type, assignUid, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo)); qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo));
} else { } else {
......
...@@ -1051,6 +1051,14 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { ...@@ -1051,6 +1051,14 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
pInfo->pRes->info.type = STREAM_NORMAL; pInfo->pRes->info.type = STREAM_NORMAL;
pInfo->pRes->info.capacity = numOfRows; pInfo->pRes->info.capacity = numOfRows;
// for generating rollup SMA result, each time is an independent time serie.
// TODO temporarily used, when the statement of "partition by tbname" is ready, remove this
if (pInfo->assignBlockUid) {
pInfo->pRes->info.groupId = uid;
} else {
pInfo->pRes->info.groupId = groupId;
}
uint64_t* groupIdPre = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &uid, sizeof(int64_t)); uint64_t* groupIdPre = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &uid, sizeof(int64_t));
if (groupIdPre) { if (groupIdPre) {
pInfo->pRes->info.groupId = *groupIdPre; pInfo->pRes->info.groupId = *groupIdPre;
......
...@@ -103,7 +103,6 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) { ...@@ -103,7 +103,6 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
wError("cannot open file %s, since %s", fnameStr, terrstr()); wError("cannot open file %s, since %s", fnameStr, terrstr());
return -1; return -1;
} }
pRead->pReadLogTFile = pLogTFile;
walBuildIdxName(pRead->pWal, fileFirstVer, fnameStr); walBuildIdxName(pRead->pWal, fileFirstVer, fnameStr);
TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_READ); TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_READ);
...@@ -113,6 +112,7 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) { ...@@ -113,6 +112,7 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
return -1; return -1;
} }
pRead->pReadLogTFile = pLogTFile;
pRead->pReadIdxTFile = pIdxTFile; pRead->pReadIdxTFile = pIdxTFile;
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册