diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index b48992b5f22914455c5a41f8bf46eb7c0b27f5cb..34a0bc86576d2d9d27d26c15f8da9730d1e4bd35 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -346,6 +346,7 @@ struct SStreamTask { int32_t refCnt; int64_t checkpointingId; int32_t checkpointAlignCnt; + int32_t transferStateAlignCnt; struct SStreamMeta* pMeta; SSHashObj* pNameMap; }; @@ -630,6 +631,8 @@ int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStre int32_t streamTaskReleaseState(SStreamTask* pTask); int32_t streamTaskReloadState(SStreamTask* pTask); +int32_t streamAlignTransferState(SStreamTask* pTask); + #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 68b697ca67c576eb9611b6618290cf0e07f570d8..6f323b2b427b9187198a227b07500ec9e4f87a18 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -889,11 +889,11 @@ _OVER: } int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) { - SSdb *pSdb = pMnode->pSdb; - SSmaObj *pSma = NULL; - void *pIter = NULL; - SVgObj *pVgroup = NULL; - int32_t code = -1; + SSdb *pSdb = pMnode->pSdb; + SSmaObj *pSma = NULL; + void *pIter = NULL; + SVgObj *pVgroup = NULL; + int32_t code = -1; while (1) { pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma); @@ -911,12 +911,18 @@ int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *p if (pStream != NULL && pStream->smaId == pSma->uid) { if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) { mError("stream:%s, failed to drop task since %s", pStream->name, terrstr()); + mndReleaseStream(pMnode, pStream); goto _OVER; } + if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) { + mndReleaseStream(pMnode, pStream); goto _OVER; } + + mndReleaseStream(pMnode, pStream); } + if (mndSetDropSmaVgroupCommitLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER; if (mndSetDropSmaVgroupRedoActions(pMnode, pTrans, pDb, pVgroup) != 0) goto _OVER; if (mndSetDropSmaCommitLogs(pMnode, pTrans, pSma) != 0) goto _OVER; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index efe5f358bf562c424d002981c2063b092773e905..fbb7f59349038a6efe41bac6fec545f9966ce87f 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -248,7 +248,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg); -int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); +int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqCheckLogInWal(STQ* pTq, int64_t version); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 20061911bc4daa676c24fa26b868e4920b447d12..41e0a97d7986fbe749b5f0c2bec0c027be72f4c0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -934,7 +934,6 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { }; SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); - if (pTask != NULL) { rsp.status = streamTaskCheckStatus(pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask); @@ -1106,7 +1105,15 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { // 1. stop the related stream task, get the current scan wal version of stream task, ver. pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId); if (pStreamTask == NULL) { - // todo handle error + qError("failed to find s-task:0x%x, it may have been destroyed, drop fill history task:%s", + pTask->streamTaskId.taskId, pTask->id.idStr); + + pTask->status.taskStatus = TASK_STATUS__DROPPING; + tqDebug("s-task:%s scan-history-task set status to be dropping", pId); + + streamMetaSaveTask(pMeta, pTask); + streamMetaReleaseTask(pMeta, pTask); + return -1; } ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE); @@ -1213,11 +1220,14 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } // notify the downstream tasks to transfer executor state after handle all history blocks. -int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { - SStreamTransferReq req; +int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) { + char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + + SStreamTransferReq req = {0}; SDecoder decoder; - tDecoderInit(&decoder, (uint8_t*)msg, msgLen); + tDecoderInit(&decoder, (uint8_t*)pReq, len); int32_t code = tDecodeStreamScanHistoryFinishReq(&decoder, &req); tDecoderClear(&decoder); @@ -1227,25 +1237,33 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t sversion, char* msg, int return -1; } + int32_t remain = streamAlignTransferState(pTask); + if (remain > 0) { + tqDebug("s-task:%s receive transfer state msg, remain:%d", pTask->id.idStr, remain); + return 0; + } + // transfer the ownership of executor state - streamTaskReleaseState(pTask); - tqDebug("s-task:%s receive state transfer req", pTask->id.idStr); + tqDebug("s-task:%s all upstream tasks end transfer msg", pTask->id.idStr); // related stream task load the state from the state storage backend SStreamTask* pStreamTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->streamTaskId.taskId); if (pStreamTask == NULL) { + streamMetaReleaseTask(pTq->pStreamMeta, pTask); tqError("failed to find related stream task:0x%x, it may have been dropped already", req.taskId); return -1; } + // when all upstream tasks have notified the this task to start transfer state, then we start the transfer procedure. + streamTaskReleaseState(pTask); streamTaskReloadState(pStreamTask); + streamMetaReleaseTask(pTq->pStreamMeta, pStreamTask); ASSERT(pTask->streamTaskId.taskId != 0); pTask->status.transferState = true; streamSchedExec(pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask); - return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 88a260b3a37f706e8a09820aa6ab56cc56780a8b..d4efa38c3e364030171c767d6c45f60f00de7386 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -659,11 +659,8 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg); case TDMT_VND_STREAM_SCAN_HISTORY: return tqProcessTaskScanHistory(pVnode->pTq, pMsg); - case TDMT_STREAM_TRANSFER_STATE: { - char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t len = pMsg->contLen - sizeof(SMsgHead); - return tqProcessTaskTransferStateReq(pVnode->pTq, 0, pReq, len); - } + case TDMT_STREAM_TRANSFER_STATE: + return tqProcessTaskTransferStateReq(pVnode->pTq, pMsg); case TDMT_STREAM_SCAN_HISTORY_FINISH: return tqProcessStreamTaskScanHistoryFinishReq(pVnode->pTq, pMsg); case TDMT_STREAM_SCAN_HISTORY_FINISH_RSP: diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 900505acb32fd5256a9481b96c9d800387d37138..06b90d0a516729f77a20c008c81aa828285e557e 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -288,9 +288,8 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3 return pTaskInfo; } - struct SSubplan* pPlan = NULL; - - int32_t code = qStringToSubplan(msg, &pPlan); + SSubplan* pPlan = NULL; + int32_t code = qStringToSubplan(msg, &pPlan); if (code != TSDB_CODE_SUCCESS) { terrno = code; return NULL; @@ -335,6 +334,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v qTaskInfo_t pTaskInfo = NULL; code = qCreateExecTask(readers, vgId, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM); if (code != TSDB_CODE_SUCCESS) { + nodesDestroyNode((SNode*)pPlan); qDestroyTask(pTaskInfo); terrno = code; return NULL; diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index ddbc8da3ecc8cc920157f3a8a356850a5c917da0..07d7cb30407b569f45cd21f55c65fbcdc677db71 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -61,7 +61,7 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) { return taosStrdup(buf); } -void streamSchedByTimer(void* param, void* tmrId) { +static void streamSchedByTimer(void* param, void* tmrId) { SStreamTask* pTask = (void*)param; int8_t status = atomic_load_8(&pTask->triggerStatus); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index d0d63215e613e945457fd6cafd546a8398aaeafd..9adae2a2f5c95745ea898b2eec581d9f499dbfb0 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -352,11 +352,12 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) { static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { SStreamTask* pStreamTask = streamMetaAcquireTask(pTask->pMeta, pTask->streamTaskId.taskId); if (pStreamTask == NULL) { - qError("s-task:%s failed to find related stream task:0x%x, it may have been destoryed or closed", + qError("s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed", pTask->id.idStr, pTask->streamTaskId.taskId); return TSDB_CODE_STREAM_TASK_NOT_EXIST; } else { - qDebug("s-task:%s scan history task end, update stream task:%s info, transfer exec state", pTask->id.idStr, pStreamTask->id.idStr); + qDebug("s-task:%s fill-history task end, update related stream task:%s info, transfer exec state", pTask->id.idStr, + pStreamTask->id.idStr); } ASSERT(pStreamTask != NULL && pStreamTask->historyTaskId.taskId == pTask->id.taskId); @@ -369,6 +370,7 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { } else { ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__NORMAL); pStreamTask->status.taskStatus = TASK_STATUS__HALT; + qDebug("s-task:%s status: halt by related fill history task:%s", pStreamTask->id.idStr, pTask->id.idStr); } // wait for the stream task to be idle @@ -477,6 +479,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { ASSERT(batchSize == 0); if (pTask->info.fillHistory && pTask->status.transferState) { int32_t code = streamTransferStateToStreamTask(pTask); + pTask->status.transferState = false; // reset this value, to avoid transfer state again if (code != TSDB_CODE_SUCCESS) { // todo handle this return 0; } @@ -611,3 +614,13 @@ int32_t streamTaskReloadState(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } } + +int32_t streamAlignTransferState(SStreamTask* pTask) { + int32_t numOfUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList); + int32_t old = atomic_val_compare_exchange_32(&pTask->transferStateAlignCnt, 0, numOfUpstream); + if (old == 0) { + qDebug("s-task:%s set the transfer state aligncnt %d", pTask->id.idStr, numOfUpstream); + } + + return atomic_sub_fetch_32(&pTask->transferStateAlignCnt, 1); +} diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index e1f625dd52535fcbf4116ea6adcfd06b3625fa09..a2b5d0e396bb6452843db4f290f91bbcdda73aa3 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -264,8 +264,9 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) { SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); if (ppTask != NULL) { if (!streamTaskShouldStop(&(*ppTask)->status)) { - atomic_add_fetch_32(&(*ppTask)->refCnt, 1); + int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1); taosRUnLockLatch(&pMeta->lock); + qDebug("s-task:%s acquire task, ref:%d", (*ppTask)->id.idStr, ref); return *ppTask; } } @@ -275,12 +276,24 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) { } void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) { - int32_t left = atomic_sub_fetch_32(&pTask->refCnt, 1); - if (left < 0) { - qError("task ref is invalid, ref:%d, %s", left, pTask->id.idStr); - } else if (left == 0) { + int32_t ref = atomic_sub_fetch_32(&pTask->refCnt, 1); + if (ref > 0) { + qDebug("s-task:%s release task, ref:%d", pTask->id.idStr, ref); + } else if (ref == 0) { ASSERT(streamTaskShouldStop(&pTask->status)); tFreeStreamTask(pTask); + } else if (ref < 0) { + qError("task ref is invalid, ref:%d, %s", ref, pTask->id.idStr); + } +} + +static void doRemoveIdFromList(SStreamMeta* pMeta, int32_t num, int32_t taskId) { + for (int32_t i = 0; i < num; ++i) { + int32_t* pTaskId = taosArrayGet(pMeta->pTaskList, i); + if (*pTaskId == taskId) { + taosArrayRemove(pMeta->pTaskList, i); + break; + } } } @@ -333,17 +346,17 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { int32_t num = taosArrayGetSize(pMeta->pTaskList); qDebug("s-task:%s set the drop task flag, remain running s-task:%d", pTask->id.idStr, num - 1); - for (int32_t i = 0; i < num; ++i) { - int32_t* pTaskId = taosArrayGet(pMeta->pTaskList, i); - if (*pTaskId == taskId) { - taosArrayRemove(pMeta->pTaskList, i); - break; - } + doRemoveIdFromList(pMeta, num, pTask->id.taskId); + + // remove the ref by timer + if (pTask->triggerParam != 0) { + taosTmrStop(pTask->schedTimer); + streamMetaReleaseTask(pMeta, pTask); } streamMetaReleaseTask(pMeta, pTask); } else { - qDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId); + qDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", pMeta->vgId, taskId); } taosWUnLockLatch(&pMeta->lock); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index a3fc3418aa8f78c14e18ea26184e0f5a14d5bb72..0f2281ea735bfd647bd3eeb0210097f00273dcb2 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -43,6 +43,7 @@ const char* streamGetTaskStatusStr(int32_t status) { case TASK_STATUS__SCAN_HISTORY: return "scan-history"; case TASK_STATUS__HALT: return "halt"; case TASK_STATUS__PAUSE: return "paused"; + case TASK_STATUS__DROPPING: return "dropping"; default:return ""; } } @@ -205,7 +206,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs qDebug("s-task:%s all %d downstream tasks are ready, now enter into scan-history-data stage, status:%s", id, numOfReqs, streamGetTaskStatusStr(pTask->status.taskStatus)); streamTaskLaunchScanHistory(pTask); - } else { // todo add assert, agg tasks? + } else { ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL); qDebug("s-task:%s fixed downstream task is ready, now ready for data from wal, status:%s", id, streamGetTaskStatusStr(pTask->status.taskStatus)); @@ -258,9 +259,15 @@ int32_t streamRestoreParam(SStreamTask* pTask) { } int32_t streamSetStatusNormal(SStreamTask* pTask) { - qDebug("s-task:%s set task status to be normal, prev:%s", pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus)); - atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL); - return 0; + int32_t status = atomic_load_8(&pTask->status.taskStatus); + if (status == TASK_STATUS__DROPPING) { + qError("s-task:%s cannot be set normal, since in dropping state", pTask->id.idStr); + return -1; + } else { + qDebug("s-task:%s set task status to be normal, prev:%s", pTask->id.idStr, streamGetTaskStatusStr(status)); + atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL); + return 0; + } } // source @@ -344,7 +351,8 @@ static int32_t doDispatchTransferMsg(SStreamTask* pTask, const SStreamTransferRe msg.info.noResp = 1; tmsgSendReq(pEpSet, &msg); - qDebug("s-task:%s dispatch transfer state msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pReq->taskId, vgId); + qDebug("s-task:%s level:%d, status:%s dispatch transfer state msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, + pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), pReq->taskId, vgId); return 0; } @@ -354,9 +362,6 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) { // serialize if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { - qDebug("s-task:%s send transfer state msg to downstream (fix-dispatch) to taskId:0x%x, status:%s", pTask->id.idStr, - pTask->fixedEpDispatcher.taskId, streamGetTaskStatusStr(pTask->status.taskStatus)); - req.taskId = pTask->fixedEpDispatcher.taskId; doDispatchTransferMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { @@ -451,6 +456,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { const char* pStatus = streamGetTaskStatusStr((*ppTask)->status.taskStatus); qDebug("s-task:%s status:%s quit timer task", (*ppTask)->id.idStr, pStatus); + taosMemoryFree(pInfo); (*ppTask)->status.timerActive = 0; taosWUnLockLatch(&pMeta->lock); return; @@ -511,6 +517,7 @@ int32_t streamCheckHistoryTaskDownstream(SStreamTask* pTask) { pTask->launchTaskTimer = taosTmrStart(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer); if (pTask->launchTaskTimer == NULL) { // todo failed to create timer + taosMemoryFree(pInfo); } else { pTask->status.timerActive = 1; // timer is active qDebug("s-task:%s set timer active flag", pTask->id.idStr); @@ -553,8 +560,10 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) { streamSetStatusNormal(pTask); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); - // todo check rsp, commit data + taosWLockLatch(&pMeta->lock); streamMetaSaveTask(pMeta, pTask); + taosWUnLockLatch(&pMeta->lock); + return 0; } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 06da72188c4b3252d32bb50d5c2dc61c9f14d2bc..ef83583ea4dd19f70599d9c0b45a00bcb0cf94ae 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -205,13 +205,16 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { void tFreeStreamTask(SStreamTask* pTask) { qDebug("free s-task:%s", pTask->id.idStr); + int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus)); if (pTask->inputQueue) { streamQueueClose(pTask->inputQueue); } + if (pTask->outputQueue) { streamQueueClose(pTask->outputQueue); } + if (pTask->exec.qmsg) { taosMemoryFree(pTask->exec.qmsg); } @@ -230,9 +233,7 @@ void tFreeStreamTask(SStreamTask* pTask) { tDeleteSchemaWrapper(pTask->tbSink.pSchemaWrapper); taosMemoryFree(pTask->tbSink.pTSchema); tSimpleHashCleanup(pTask->tbSink.pTblInfo); - } - - if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { + } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos); taosArrayDestroy(pTask->checkReqIds); pTask->checkReqIds = NULL;