diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index f90c38f341edccf801d7f7d470228c524a8f794d..634d708260790fb26ec5c6fb28987fedbecbc75f 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -74,7 +74,7 @@ typedef enum { * @param vgId * @return */ -qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t vgId); +qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t vgId, int32_t taskId); /** * Create the exec task for queue mode @@ -95,8 +95,6 @@ int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray **tableList */ void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId); -//void qSetTaskCode(qTaskInfo_t tinfo, int32_t code); - int32_t qSetStreamOpOpen(qTaskInfo_t tinfo); // todo refactor diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 7235a56691ce580d8cb67d923245e26fdb53c82f..558180a3c23f8293d33587d87ba8e7b47fe4db40 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -88,7 +88,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { SReadHandle handle = { .vnode = NULL, .numOfVgroups = numOfChildEp, .pStateBackend = pTask->pState, .fillHistory = pTask->info.fillHistory }; initStreamStateAPI(&handle.api); - pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, 0); + pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, 0, pTask->id.taskId); ASSERT(pTask->exec.pExecutor); taosThreadMutexInit(&pTask->lock, NULL); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 9fd493844877d343e07d9f251b6b0882f25f89e0..1e7de3c5263049a02984b47d171104bf43ba496c 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -267,7 +267,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat SReadHandle handle = {.vnode = pVnode, .initTqReader = 1, .pStateBackend = pStreamState}; initStorageAPI(&handle.api); - pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle, TD_VID(pVnode)); + pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle, TD_VID(pVnode), 0); if (!pRSmaInfo->taskInfo[idx]) { terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE; return TSDB_CODE_FAILED; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 41e9268452ebb8f24be8a0376b5b40103ca219d1..af336adc6aaff9ee6cf0331e362956b33fc47019 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -956,7 +956,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { .winRange = pTask->dataRange.window}; initStorageAPI(&handle.api); - pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId); + pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId, pTask->id.taskId); if (pTask->exec.pExecutor == NULL) { return -1; } @@ -983,7 +983,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { .winRange = pTask->dataRange.window}; initStorageAPI(&handle.api); - pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId); + pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId, pTask->id.taskId); if (pTask->exec.pExecutor == NULL) { return -1; } @@ -1149,32 +1149,27 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms taosWLockLatch(&pStreamMeta->lock); code = streamMetaRegisterTask(pStreamMeta, sversion, pTask, &added); int32_t numOfTasks = streamMetaGetNumOfTasks(pStreamMeta); + taosWUnLockLatch(&pStreamMeta->lock); if (code < 0) { tqError("vgId:%d failed to add s-task:0x%x, total:%d", vgId, pTask->id.taskId, numOfTasks); tFreeStreamTask(pTask); - taosWUnLockLatch(&pStreamMeta->lock); return -1; } // not added into meta store - if (!added) { + if (added) { + tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks); + SStreamTask* p = streamMetaAcquireTask(pStreamMeta, taskId); + if (p != NULL) { // reset the downstreamReady flag. + streamTaskCheckDownstreamTasks(p); + } + streamMetaReleaseTask(pStreamMeta, p); + } else { tqWarn("vgId:%d failed to add s-task:0x%x, already exists in meta store", vgId, taskId); tFreeStreamTask(pTask); - pTask = NULL; - } - - taosWUnLockLatch(&pStreamMeta->lock); - - tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks); - - // 3. It's an fill history task, do nothing. wait for the main task to start it - SStreamTask* p = streamMetaAcquireTask(pStreamMeta, taskId); - if (p != NULL) { // reset the downstreamReady flag. - streamTaskCheckDownstreamTasks(p); } - streamMetaReleaseTask(pStreamMeta, p); return 0; } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 7832834cee13fe64364da4c0eefd3564c65bf5e9..a6059c7c4200551c654847cb00c3f738be9e36fc 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -304,7 +304,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3 return pTaskInfo; } -qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t vgId) { +qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t vgId, int32_t taskId) { if (msg == NULL) { return NULL; } @@ -317,7 +317,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v } qTaskInfo_t pTaskInfo = NULL; - code = qCreateExecTask(readers, vgId, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM); + code = qCreateExecTask(readers, vgId, taskId, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM); if (code != TSDB_CODE_SUCCESS) { nodesDestroyNode((SNode*)pPlan); qDestroyTask(pTaskInfo); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index dc4e5ff4a65a3326c3079e45ef9cd09b99343375..9056fa8d93397bea55b216c202d5090259ab5419 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -216,7 +216,7 @@ static void freeItem(void* p) { } void tFreeStreamTask(SStreamTask* pTask) { - qDebug("free s-task:%s, %p", pTask->id.idStr, pTask); + qDebug("free s-task:0x%x, %p", pTask->id.taskId, pTask); int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus)); if (pTask->inputQueue) {