提交 47ae534c 编写于 作者: L Liu Jicong

fix(sma): drop stream when drop sma

上级 814b3caa
...@@ -34,6 +34,11 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); ...@@ -34,6 +34,11 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
// for sma
// TODO refactor
int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -857,6 +857,24 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p ...@@ -857,6 +857,24 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p
mDebug("trans:%d, used to drop sma:%s", pTrans->id, pSma->name); mDebug("trans:%d, used to drop sma:%s", pTrans->id, pSma->name);
mndTransSetDbName(pTrans, pDb->name, NULL); mndTransSetDbName(pTrans, pDb->name, NULL);
SStreamObj *pStream = mndAcquireStream(pMnode, pSma->name);
if (pStream == NULL || pStream->smaId != pSma->uid) {
sdbRelease(pMnode->pSdb, pStream);
goto _OVER;
} else {
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
mError("stream:%s, failed to drop task since %s", pStream->name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
goto _OVER;
}
// drop stream
if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) {
sdbRelease(pMnode->pSdb, pStream);
goto _OVER;
}
}
if (mndSetDropSmaRedoLogs(pMnode, pTrans, pSma) != 0) goto _OVER; if (mndSetDropSmaRedoLogs(pMnode, pTrans, pSma) != 0) goto _OVER;
if (mndSetDropSmaVgroupRedoLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER; if (mndSetDropSmaVgroupRedoLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER;
if (mndSetDropSmaCommitLogs(pMnode, pTrans, pSma) != 0) goto _OVER; if (mndSetDropSmaCommitLogs(pMnode, pTrans, pSma) != 0) goto _OVER;
......
...@@ -494,7 +494,7 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) { ...@@ -494,7 +494,7 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) {
return 0; return 0;
} }
static int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
int32_t lv = taosArrayGetSize(pStream->tasks); int32_t lv = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < lv; i++) { for (int32_t i = 0; i < lv; i++) {
SArray *pTasks = taosArrayGetP(pStream->tasks, i); SArray *pTasks = taosArrayGetP(pStream->tasks, i);
......
...@@ -43,7 +43,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo ...@@ -43,7 +43,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
taosArrayPush(tagArray, &tagVal); taosArrayPush(tagArray, &tagVal);
tTagNew(tagArray, 1, false, &pTag); tTagNew(tagArray, 1, false, &pTag);
if (pTag == NULL) { if (pTag == NULL) {
taosArrayDestroy(schemaReqs); terrno = TSDB_CODE_OUT_OF_MEMORY;
taosArrayDestroy(tagArray); taosArrayDestroy(tagArray);
return NULL; return NULL;
} }
......
...@@ -347,7 +347,6 @@ typedef struct SStreamBlockScanInfo { ...@@ -347,7 +347,6 @@ typedef struct SStreamBlockScanInfo {
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here. SInterval interval; // if the upstream is an interval operator, the interval info is also kept here.
SArray* childIds; SArray* childIds;
SessionWindowSupporter sessionSup; SessionWindowSupporter sessionSup;
bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA.
int32_t scanWinIndex; int32_t scanWinIndex;
} SStreamBlockScanInfo; } SStreamBlockScanInfo;
......
...@@ -19,8 +19,7 @@ ...@@ -19,8 +19,7 @@
#include "tdatablock.h" #include "tdatablock.h"
#include "vnode.h" #include "vnode.h"
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, bool assignUid, static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
char* id) {
ASSERT(pOperator != NULL); ASSERT(pOperator != NULL);
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
if (pOperator->numOfDownstream == 0) { if (pOperator->numOfDownstream == 0) {
...@@ -33,12 +32,11 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu ...@@ -33,12 +32,11 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, assignUid, id); return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, id);
} else { } else {
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
SStreamBlockScanInfo* pInfo = pOperator->info; SStreamBlockScanInfo* pInfo = pOperator->info;
pInfo->assignBlockUid = assignUid;
// TODO: if a block was set but not consumed, // TODO: if a block was set but not consumed,
// prevent setting a different type of block // prevent setting a different type of block
...@@ -76,7 +74,7 @@ int32_t qStreamScanSnapshot(qTaskInfo_t tinfo) { ...@@ -76,7 +74,7 @@ int32_t qStreamScanSnapshot(qTaskInfo_t tinfo) {
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
return doSetStreamBlock(pTaskInfo->pRoot, NULL, 0, STREAM_DATA_TYPE_FROM_SNAPSHOT, 0, NULL); return doSetStreamBlock(pTaskInfo->pRoot, NULL, 0, STREAM_DATA_TYPE_FROM_SNAPSHOT, NULL);
} }
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool assignUid) { int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool assignUid) {
...@@ -94,8 +92,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO ...@@ -94,8 +92,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
int32_t code = int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void**)pBlocks, numOfBlocks, type, GET_TASKID(pTaskInfo));
doSetStreamBlock(pTaskInfo->pRoot, (void**)pBlocks, numOfBlocks, type, assignUid, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo)); qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo));
} else { } else {
......
...@@ -507,20 +507,21 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { ...@@ -507,20 +507,21 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
STableScanInfo* pInfo = pOperator->info; STableScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if(pInfo->currentGroupId == -1){ if (pInfo->currentGroupId == -1) {
pInfo->currentGroupId++; pInfo->currentGroupId++;
if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) { if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) {
setTaskStatus(pTaskInfo, TASK_COMPLETED); setTaskStatus(pTaskInfo, TASK_COMPLETED);
return NULL; return NULL;
} }
SArray *tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId); SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId);
tsdbCleanupReadHandle(pInfo->dataReader); tsdbCleanupReadHandle(pInfo->dataReader);
tsdbReaderT* pReader = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, tableList, pInfo->queryId, pInfo->taskId); tsdbReaderT* pReader =
tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, tableList, pInfo->queryId, pInfo->taskId);
pInfo->dataReader = pReader; pInfo->dataReader = pReader;
} }
SSDataBlock* result = doTableScanGroup(pOperator); SSDataBlock* result = doTableScanGroup(pOperator);
if(result){ if (result) {
return result; return result;
} }
...@@ -530,7 +531,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { ...@@ -530,7 +531,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
return NULL; return NULL;
} }
SArray *tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId); SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId);
tsdbSetTableList(pInfo->dataReader, tableList); tsdbSetTableList(pInfo->dataReader, tableList);
tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0); tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0);
...@@ -538,7 +539,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { ...@@ -538,7 +539,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
pInfo->scanTimes = 0; pInfo->scanTimes = 0;
result = doTableScanGroup(pOperator); result = doTableScanGroup(pOperator);
if(result){ if (result) {
return result; return result;
} }
...@@ -822,7 +823,7 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) { ...@@ -822,7 +823,7 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
STableScanInfo* pTableScanInfo = pInfo->pSnapshotReadOp->info; STableScanInfo* pTableScanInfo = pInfo->pSnapshotReadOp->info;
pTableScanInfo->cond.twindows[0] = win; pTableScanInfo->cond.twindows[0] = win;
pTableScanInfo->curTWinIdx = 0; pTableScanInfo->curTWinIdx = 0;
// tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0); // tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
pTableScanInfo->scanTimes = 0; pTableScanInfo->scanTimes = 0;
pTableScanInfo->currentGroupId = -1; pTableScanInfo->currentGroupId = -1;
return true; return true;
...@@ -1030,14 +1031,6 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { ...@@ -1030,14 +1031,6 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
pInfo->pRes->info.type = STREAM_NORMAL; pInfo->pRes->info.type = STREAM_NORMAL;
pInfo->pRes->info.capacity = numOfRows; pInfo->pRes->info.capacity = numOfRows;
// for generating rollup SMA result, each time is an independent time serie.
// TODO temporarily used, when the statement of "partition by tbname" is ready, remove this
if (pInfo->assignBlockUid) {
pInfo->pRes->info.groupId = uid;
} else {
pInfo->pRes->info.groupId = groupId;
}
uint64_t* groupIdPre = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &uid, sizeof(int64_t)); uint64_t* groupIdPre = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &uid, sizeof(int64_t));
if (groupIdPre) { if (groupIdPre) {
pInfo->pRes->info.groupId = *groupIdPre; pInfo->pRes->info.groupId = *groupIdPre;
...@@ -1132,9 +1125,9 @@ static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) { ...@@ -1132,9 +1125,9 @@ static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) {
return tableIdList; return tableIdList;
} }
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode,
STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo, SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup, uint64_t queryId,
STimeWindowAggSupp* pTwSup, uint64_t queryId, uint64_t taskId) { uint64_t taskId) {
SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo)); SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
...@@ -1934,11 +1927,11 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi ...@@ -1934,11 +1927,11 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
goto _error; goto _error;
} }
pInfo->pTableList = pTableListInfo; pInfo->pTableList = pTableListInfo;
pInfo->pColMatchInfo = colList; pInfo->pColMatchInfo = colList;
pInfo->pRes = createResDataBlock(pDescNode); pInfo->pRes = createResDataBlock(pDescNode);
pInfo->readHandle = *pReadHandle; pInfo->readHandle = *pReadHandle;
pInfo->curPos = 0; pInfo->curPos = 0;
pOperator->name = "TagScanOperator"; pOperator->name = "TagScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
...@@ -2216,8 +2209,8 @@ SArray* generateSortByTsInfo(int32_t order) { ...@@ -2216,8 +2209,8 @@ SArray* generateSortByTsInfo(int32_t order) {
return pList; return pList;
} }
static int32_t createMultipleDataReaders(SQueryTableDataCond* pQueryCond, SReadHandle* pHandle, SArray* tableList, SArray* arrayReader, uint64_t queryId, static int32_t createMultipleDataReaders(SQueryTableDataCond* pQueryCond, SReadHandle* pHandle, SArray* tableList,
uint64_t taskId) { SArray* arrayReader, uint64_t queryId, uint64_t taskId) {
for (int32_t i = 0; i < taosArrayGetSize(tableList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(tableList); ++i) {
SArray* tmp = taosArrayInit(1, sizeof(STableKeyInfo)); SArray* tmp = taosArrayInit(1, sizeof(STableKeyInfo));
taosArrayPush(tmp, taosArrayGet(tableList, i)); taosArrayPush(tmp, taosArrayGet(tableList, i));
...@@ -2237,13 +2230,13 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { ...@@ -2237,13 +2230,13 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
SArray* tableList = taosArrayGetP(pInfo->tableListInfo->pGroupList, pInfo->currentGroupId); SArray* tableList = taosArrayGetP(pInfo->tableListInfo->pGroupList, pInfo->currentGroupId);
createMultipleDataReaders(&pInfo->cond, &pInfo->readHandle, tableList, createMultipleDataReaders(&pInfo->cond, &pInfo->readHandle, tableList, pInfo->dataReaders, pInfo->queryId,
pInfo->dataReaders, pInfo->queryId, pInfo->taskId); pInfo->taskId);
// todo the total available buffer should be determined by total capacity of buffer of this task. // todo the total available buffer should be determined by total capacity of buffer of this task.
// the additional one is reserved for merge result // the additional one is reserved for merge result
int32_t tableLen = taosArrayGetSize(tableList); int32_t tableLen = taosArrayGetSize(tableList);
pInfo->sortBufSize = pInfo->bufPageSize * ((tableLen==0?1:tableLen) + 1); pInfo->sortBufSize = pInfo->bufPageSize * ((tableLen == 0 ? 1 : tableLen) + 1);
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage, pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
pInfo->pSortInputBlock, pTaskInfo->id.str); pInfo->pSortInputBlock, pTaskInfo->id.str);
...@@ -2342,7 +2335,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { ...@@ -2342,7 +2335,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
SSDataBlock* pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator); SSDataBlock* pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator);
if (pBlock != NULL) { if (pBlock != NULL) {
uint64_t* groupId = taosHashGet(pInfo->tableListInfo->map, &(pBlock->info.uid), sizeof(uint64_t)); uint64_t* groupId = taosHashGet(pInfo->tableListInfo->map, &(pBlock->info.uid), sizeof(uint64_t));
if(groupId) pBlock->info.groupId = *groupId; if (groupId) pBlock->info.groupId = *groupId;
pOperator->resultInfo.totalRows += pBlock->info.rows; pOperator->resultInfo.totalRows += pBlock->info.rows;
return pBlock; return pBlock;
...@@ -2359,7 +2352,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { ...@@ -2359,7 +2352,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator); pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator);
if (pBlock != NULL) { if (pBlock != NULL) {
uint64_t* groupId = taosHashGet(pInfo->tableListInfo->map, &(pBlock->info.uid), sizeof(uint64_t)); uint64_t* groupId = taosHashGet(pInfo->tableListInfo->map, &(pBlock->info.uid), sizeof(uint64_t));
if(groupId) pBlock->info.groupId = *groupId; if (groupId) pBlock->info.groupId = *groupId;
pOperator->resultInfo.totalRows += pBlock->info.rows; pOperator->resultInfo.totalRows += pBlock->info.rows;
return pBlock; return pBlock;
......
...@@ -103,6 +103,7 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) { ...@@ -103,6 +103,7 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
wError("cannot open file %s, since %s", fnameStr, terrstr()); wError("cannot open file %s, since %s", fnameStr, terrstr());
return -1; return -1;
} }
pRead->pReadLogTFile = pLogTFile;
walBuildIdxName(pRead->pWal, fileFirstVer, fnameStr); walBuildIdxName(pRead->pWal, fileFirstVer, fnameStr);
TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_READ); TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_READ);
...@@ -112,7 +113,6 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) { ...@@ -112,7 +113,6 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
return -1; return -1;
} }
pRead->pReadLogTFile = pLogTFile;
pRead->pReadIdxTFile = pIdxTFile; pRead->pReadIdxTFile = pIdxTFile;
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册