提交 9ea0ad57 编写于 作者: L Liu Jicong

refactor(stream): destroy stream task

上级 a717042a
......@@ -343,7 +343,7 @@ tmq_list_t* tmq_list_new() {
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
SArray* container = &list->container;
char* topic = strDupUnquo(src);
char* topic = strdup(src);
if (taosArrayPush(container, &topic) == NULL) return -1;
return 0;
}
......
......@@ -235,7 +235,8 @@ static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64
}
static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) {
SNode *pAst = NULL;
SNode *pAst = NULL;
SQueryPlan *pPlan = NULL;
mDebug("stream:%s to create", pCreate->name);
memcpy(pObj->name, pCreate->name, TSDB_STREAM_FNAME_LEN);
......@@ -293,7 +294,6 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
goto FAIL;
}
SQueryPlan *pPlan = NULL;
SPlanContext cxt = {
.pAstRoot = pAst,
.topicQuery = false,
......@@ -317,6 +317,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
FAIL:
if (pAst != NULL) nodesDestroyNode(pAst);
if (pPlan != NULL) qDestroyQueryPlan(pPlan);
return 0;
}
......@@ -541,7 +542,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
// build stream obj from request
SStreamObj streamObj = {0};
if (mndBuildStreamObjFromCreateReq(pMnode, &streamObj, &createStreamReq) < 0) {
ASSERT(0);
/*ASSERT(0);*/
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
goto _OVER;
}
......@@ -689,7 +690,14 @@ int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
terrno = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
return -1;
} else {
// TODO drop all task on snode
#if 0
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
mError("stream:%s, failed to drop task since %s", pStream->name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
sdbCancelFetch(pSdb, pIter);
return -1;
}
#endif
if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) {
sdbRelease(pSdb, pStream);
sdbCancelFetch(pSdb, pIter);
......
......@@ -82,6 +82,13 @@ void tqClose(STQ* pTq) {
if (pTq) {
tqOffsetClose(pTq->pOffsetStore);
taosHashCleanup(pTq->handles);
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pTq->pStreamTasks, pIter);
if (pIter == NULL) break;
SStreamTask* pTask = *(SStreamTask**)pIter;
tFreeSStreamTask(pTask);
}
taosHashCleanup(pTq->pStreamTasks);
taosHashCleanup(pTq->pushMgr);
taosMemoryFree(pTq->path);
......@@ -608,7 +615,8 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
streamSetupTrigger(pTask);
tqInfo("deploy stream task id %d child id %d on vgId:%d", pTask->taskId, pTask->selfChildId, TD_VID(pTq->pVnode));
tqInfo("deploy stream task on vg %d, task id %d, child id %d", TD_VID(pTq->pVnode), pTask->taskId,
pTask->selfChildId);
taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*));
......@@ -634,9 +642,6 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) {
pIter = taosHashIterate(pTq->pStreamTasks, pIter);
if (pIter == NULL) break;
SStreamTask* pTask = *(SStreamTask**)pIter;
if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) {
continue;
}
if (!pTask->isDataScan) continue;
if (!failed) {
......@@ -665,11 +670,12 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRunReq* pReq = pMsg->pCont;
int32_t taskId = pReq->taskId;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) {
if (pTask) {
streamProcessRunReq(pTask);
return 0;
} else {
return -1;
}
streamProcessRunReq(pTask);
return 0;
}
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
......@@ -682,55 +688,62 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
tDecodeStreamDispatchReq(&decoder, &req);
int32_t taskId = req.taskId;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) {
if (pTask) {
SRpcMsg rsp = {
.info = pMsg->info,
.code = 0,
};
streamProcessDispatchReq(pTask, &req, &rsp);
return 0;
} else {
return -1;
}
SRpcMsg rsp = {
.info = pMsg->info,
.code = 0,
};
streamProcessDispatchReq(pTask, &req, &rsp);
return 0;
}
int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRecoverReq* pReq = pMsg->pCont;
int32_t taskId = pReq->taskId;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) {
if (pTask) {
streamProcessRecoverReq(pTask, pReq, pMsg);
return 0;
} else {
return -1;
}
streamProcessRecoverReq(pTask, pReq, pMsg);
return 0;
}
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t taskId = pRsp->taskId;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) {
if (pTask) {
streamProcessDispatchRsp(pTask, pRsp);
return 0;
} else {
return -1;
}
streamProcessDispatchRsp(pTask, pRsp);
return 0;
}
int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRecoverRsp* pRsp = pMsg->pCont;
int32_t taskId = pRsp->taskId;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) {
if (pTask) {
streamProcessRecoverRsp(pTask, pRsp);
return 0;
} else {
return -1;
}
streamProcessRecoverRsp(pTask, pRsp);
return 0;
}
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t));
atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING);
if (pTask) {
taosHashRemove(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t));
atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING);
}
// todo
// clear queue
// push drop req into queue
......
......@@ -98,8 +98,21 @@ STqReader* tqOpenReader(SVnode* pVnode) {
void tqCloseReader(STqReader* pReader) {
// close wal reader
if (pReader->pWalReader) {
walCloseReader(pReader->pWalReader);
}
// free cached schema
if (pReader->pSchema) {
taosMemoryFree(pReader->pSchema);
}
if (pReader->pSchemaWrapper) {
tDeleteSSchemaWrapper(pReader->pSchemaWrapper);
}
if (pReader->pColIdList) {
taosArrayDestroy(pReader->pColIdList);
}
// free hash
taosHashCleanup(pReader->tbIdHash);
taosMemoryFree(pReader);
}
......
......@@ -146,7 +146,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
const STColumn* pColumn = &pTSchema->columns[k];
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, k);
if (colDataIsNull_s(pColData, j)) {
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NONE, NULL, false, pColumn->offset, k);
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, pColumn->offset, k);
} else {
void* data = colDataGetData(pColData, j);
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, pColumn->offset, k);
......
......@@ -147,7 +147,6 @@ typedef struct {
SSDataBlock* pullOverBlk; // for streaming
SWalFilterCond cond;
int64_t lastScanUid;
SStreamQueue* inputQueue;
} SStreamTaskInfo;
typedef struct SExecTaskInfo {
......
......@@ -44,13 +44,6 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
goto _error;
}
if (model == OPTR_EXEC_MODEL_STREAM) {
(*pTask)->streamInfo.inputQueue = streamQueueOpen();
if ((*pTask)->streamInfo.inputQueue == NULL) {
goto _error;
}
}
SDataSinkMgtCfg cfg = {.maxDataBlockNum = 1000, .maxDataBlockNumPerQuery = 100};
code = dsDataSinkMgtInit(&cfg);
if (code != TSDB_CODE_SUCCESS) {
......@@ -259,12 +252,14 @@ int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) {
}
}
#if 0
int32_t qStreamInput(qTaskInfo_t tinfo, void* pItem) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
taosWriteQitem(pTaskInfo->streamInfo.inputQueue->queue, pItem);
return 0;
}
#endif
void* qExtractReaderFromStreamScanner(void* scanner) {
SStreamScanInfo* pInfo = scanner;
......
......@@ -884,22 +884,22 @@ static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_
return true;
}
static STimeWindow getSlidingWindow(TSKEY* tsCol, SInterval* pInterval, SDataBlockInfo* pDataBlockInfo, int32_t* pRowIndex) {
SResultRowInfo dumyInfo;
static STimeWindow getSlidingWindow(TSKEY* tsCol, SInterval* pInterval, SDataBlockInfo* pDataBlockInfo,
int32_t* pRowIndex) {
SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[*pRowIndex], pInterval,
TSDB_ORDER_ASC);
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[*pRowIndex], pInterval, TSDB_ORDER_ASC);
STimeWindow endWin = win;
STimeWindow preWin = win;
while (1) {
(*pRowIndex) += getNumOfRowsInTimeWindow(pDataBlockInfo, tsCol, *pRowIndex, endWin.ekey,
binarySearchForKey, NULL, TSDB_ORDER_ASC);
(*pRowIndex) += getNumOfRowsInTimeWindow(pDataBlockInfo, tsCol, *pRowIndex, endWin.ekey, binarySearchForKey, NULL,
TSDB_ORDER_ASC);
do {
preWin = endWin;
getNextTimeWindow(pInterval, &endWin, TSDB_ORDER_ASC);
} while (tsCol[(*pRowIndex) - 1] >= endWin.skey);
endWin = preWin;
if (win.ekey == endWin.ekey || (*pRowIndex) == pDataBlockInfo->rows ) {
if (win.ekey == endWin.ekey || (*pRowIndex) == pDataBlockInfo->rows) {
win.ekey = endWin.ekey;
return win;
}
......@@ -933,7 +933,8 @@ static bool prepareDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t t
pInfo->updateWin.ekey = tsCols[*pRowIndex - 1];
// win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[*pRowIndex], &pInfo->interval, TSDB_ORDER_ASC);
// (*pRowIndex) +=
// getNumOfRowsInTimeWindow(&pSDB->info, tsCols, *pRowIndex, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
// getNumOfRowsInTimeWindow(&pSDB->info, tsCols, *pRowIndex, win.ekey, binarySearchForKey, NULL,
// TSDB_ORDER_ASC);
}
needRead = true;
} else if (isStateWindow(pInfo)) {
......@@ -1447,6 +1448,28 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNo
return NULL;
}
static void destroyStreamScanOperatorInfo(void* param, int32_t numOfOutput) {
SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param;
#if 0
if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) {
STableScanInfo* pTableScanInfo = pStreamScan->pTableScanOp->info;
destroyTableScanOperatorInfo(pTableScanInfo, 1);
}
#endif
if (pStreamScan->tqReader) {
tqCloseReader(pStreamScan->tqReader);
}
if (pStreamScan->pColMatchInfo) {
taosArrayDestroy(pStreamScan->pColMatchInfo);
}
blockDataDestroy(pStreamScan->pRes);
blockDataDestroy(pStreamScan->pUpdateRes);
blockDataDestroy(pStreamScan->pPullDataRes);
blockDataDestroy(pStreamScan->pDeleteDataRes);
taosArrayDestroy(pStreamScan->pBlockLists);
taosMemoryFree(pStreamScan);
}
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode,
SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup, uint64_t queryId,
uint64_t taskId) {
......@@ -1561,8 +1584,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doStreamScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL);
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamScan, NULL, NULL, destroyStreamScanOperatorInfo,
NULL, NULL, NULL);
return pOperator;
......
......@@ -74,63 +74,62 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
}
static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
int32_t cnt = 0;
void* data = NULL;
while (1) {
int32_t cnt = 0;
void* data = NULL;
while (1) {
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
if (qItem == NULL) {
qDebug("stream exec over, queue empty");
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
if (qItem == NULL) {
qDebug("stream exec over, queue empty");
break;
}
if (data == NULL) {
data = qItem;
streamQueueProcessSuccess(pTask->inputQueue);
continue;
} else {
if (streamAppendQueueItem(data, qItem) < 0) {
streamQueueProcessFail(pTask->inputQueue);
break;
}
if (data == NULL) {
data = qItem;
streamQueueProcessSuccess(pTask->inputQueue);
continue;
} else {
if (streamAppendQueueItem(data, qItem) < 0) {
streamQueueProcessFail(pTask->inputQueue);
break;
} else {
cnt++;
streamQueueProcessSuccess(pTask->inputQueue);
taosArrayDestroy(((SStreamDataBlock*)qItem)->blocks);
taosFreeQitem(qItem);
}
cnt++;
streamQueueProcessSuccess(pTask->inputQueue);
taosArrayDestroy(((SStreamDataBlock*)qItem)->blocks);
taosFreeQitem(qItem);
}
}
if (data == NULL) break;
}
if (pTask->taskStatus == TASK_STATUS__DROPPING) {
if (data) streamFreeQitem(data);
taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock);
return NULL;
}
qDebug("stream task %d exec begin, batch msg: %d", pTask->taskId, cnt);
streamTaskExecImpl(pTask, data, pRes);
qDebug("stream task %d exec end", pTask->taskId);
if (data == NULL) return pRes;
if (pTask->taskStatus == TASK_STATUS__DROPPING) {
taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock);
qDebug("stream task %d exec begin, msg batch: %d", pTask->taskId, cnt);
streamTaskExecImpl(pTask, data, pRes);
qDebug("stream task %d exec end", pTask->taskId);
if (taosArrayGetSize(pRes) != 0) {
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
if (qRes == NULL) {
streamQueueProcessFail(pTask->inputQueue);
taosArrayDestroy(pRes);
return NULL;
}
if (taosArrayGetSize(pRes) != 0) {
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
if (qRes == NULL) {
streamQueueProcessFail(pTask->inputQueue);
taosArrayDestroy(pRes);
return NULL;
}
qRes->type = STREAM_INPUT__DATA_BLOCK;
qRes->blocks = pRes;
if (streamTaskOutput(pTask, qRes) < 0) {
/*streamQueueProcessFail(pTask->inputQueue);*/
taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock);
taosFreeQitem(qRes);
return NULL;
}
/*streamQueueProcessSuccess(pTask->inputQueue);*/
pRes = taosArrayInit(0, sizeof(SSDataBlock));
qRes->type = STREAM_INPUT__DATA_BLOCK;
qRes->blocks = pRes;
if (streamTaskOutput(pTask, qRes) < 0) {
/*streamQueueProcessFail(pTask->inputQueue);*/
taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock);
taosFreeQitem(qRes);
return NULL;
}
streamFreeQitem(data);
/*streamQueueProcessSuccess(pTask->inputQueue);*/
pRes = taosArrayInit(0, sizeof(SSDataBlock));
}
streamFreeQitem(data);
return pRes;
}
......@@ -171,6 +170,11 @@ int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb) {
}
FAIL:
if (pRes) taosArrayDestroy(pRes);
atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE);
return -1;
if (pTask->taskStatus == TASK_STATUS__DROPPING) {
tFreeSStreamTask(pTask);
return 0;
} else {
atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE);
return -1;
}
}
......@@ -163,6 +163,6 @@ void tFreeSStreamTask(SStreamTask* pTask) {
streamQueueClose(pTask->inputQueue);
streamQueueClose(pTask->outputQueue);
if (pTask->exec.qmsg) taosMemoryFree(pTask->exec.qmsg);
qDestroyTask(pTask->exec.executor);
if (pTask->exec.executor) qDestroyTask(pTask->exec.executor);
taosMemoryFree(pTask);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册