From 1e8579e8c5e8497a93ae00b621a4cab2bd004ed4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 9 Aug 2023 16:04:48 +0800 Subject: [PATCH] fix(stream): fix other cases. --- include/libs/stream/tstream.h | 2 +- source/dnode/snode/src/snode.c | 2 +- source/dnode/vnode/src/tq/tq.c | 4 +- source/libs/stream/src/streamExec.c | 3 +- source/libs/stream/src/streamMeta.c | 57 ++++++-------------------- source/libs/stream/src/streamRecover.c | 7 +++- 6 files changed, 22 insertions(+), 53 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index add6660cef..b9b24917f3 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -644,7 +644,7 @@ void streamMetaClose(SStreamMeta* streamMeta); int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded); -int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId); +int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); // todo remove it SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 51b25b2476..91346e1d83 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -185,7 +185,7 @@ int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) { return 0; } - streamMetaUnregisterTask(pSnode->pMeta, pReq->taskId); + streamMetaUnregisterTask(pSnode->pMeta, pReq->streamId, pReq->taskId); streamMetaReleaseTask(pSnode->pMeta, pTask); return 0; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index e80aa800bc..bf52f77ce3 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1242,7 +1242,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { tqDebug("s-task:%s fill-history task set status to be dropping", id); - streamMetaUnregisterTask(pMeta, pTask->id.taskId); + streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId); streamMetaReleaseTask(pMeta, pTask); return -1; } @@ -1575,7 +1575,7 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgL return 0; } - streamMetaUnregisterTask(pTq->pStreamMeta, pReq->taskId); + streamMetaUnregisterTask(pTq->pStreamMeta, pReq->streamId, pReq->taskId); streamMetaReleaseTask(pTq->pStreamMeta, pTask); return 0; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 2a35cb4978..c7da80fdaf 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -350,10 +350,9 @@ static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { streamTaskResumeFromHalt(pStreamTask); qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr); - int32_t taskId = pTask->id.taskId; // 5. free it and remove fill-history task from disk meta-store - streamMetaUnregisterTask(pMeta, taskId); + streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId); // 6. save to disk taosWLockLatch(&pMeta->lock); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 2bfad78ebf..6d1dca0561 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -161,43 +161,6 @@ void streamMetaClose(SStreamMeta* pMeta) { taosMemoryFree(pMeta); } -#if 0 -int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t ver, char* msg, int32_t msgLen) { - SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); - if (pTask == NULL) { - return -1; - } - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t*)msg, msgLen); - if (tDecodeStreamTask(&decoder, pTask) < 0) { - tDecoderClear(&decoder); - goto FAIL; - } - tDecoderClear(&decoder); - - if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { - ASSERT(0); - goto FAIL; - } - - if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) { - goto FAIL; - } - - if (tdbTbUpsert(pMeta->pTaskDb, &pTask->id.taskId, sizeof(int32_t), msg, msgLen, pMeta->txn) < 0) { - taosHashRemove(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t)); - ASSERT(0); - goto FAIL; - } - - return 0; - -FAIL: - if (pTask) tFreeStreamTask(pTask); - return -1; -} -#endif - int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { void* buf = NULL; int32_t len; @@ -241,7 +204,8 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded) { *pAdded = false; - void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId)); + int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId}; + void* p = taosHashGet(pMeta->pTasks, keys, sizeof(keys)); if (p == NULL) { if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { tFreeStreamTask(pTask); @@ -263,7 +227,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa return 0; } - taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, POINTER_BYTES); + taosHashPut(pMeta->pTasks, keys, sizeof(keys), &pTask, POINTER_BYTES); *pAdded = true; return 0; } @@ -315,12 +279,14 @@ static void doRemoveIdFromList(SStreamMeta* pMeta, int32_t num, SStreamId* id) { } } -int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId) { +int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { SStreamTask* pTask = NULL; // pre-delete operation taosWLockLatch(&pMeta->lock); - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); + + int64_t keys[2] = {streamId, taskId}; + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys)); if (ppTask) { pTask = *ppTask; atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING); @@ -336,7 +302,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId) { while (1) { taosRLockLatch(&pMeta->lock); - ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); + ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys)); if (ppTask) { if ((*ppTask)->status.timerActive == 0) { @@ -355,7 +321,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId) { // let's do delete of stream task taosWLockLatch(&pMeta->lock); - ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); + ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys)); if (ppTask) { taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t)); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING); @@ -472,7 +438,8 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { } // do duplicate task check. - void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId)); + int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId}; + void* p = taosHashGet(pMeta->pTasks, keys, sizeof(keys)); if (p == NULL) { if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.version) < 0) { tdbFree(pKey); @@ -492,7 +459,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { continue; } - if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, sizeof(void*)) < 0) { + if (taosHashPut(pMeta->pTasks, keys, sizeof(keys), &pTask, sizeof(void*)) < 0) { tdbFree(pKey); tdbFree(pVal); tdbTbcClose(pCur); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 830637adbc..79f856ee0b 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -541,7 +541,9 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { qDebug("s-task:0x%x in timer to launch related history task", pInfo->taskId); taosWLockLatch(&pMeta->lock); - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &pInfo->taskId, sizeof(int32_t)); + int64_t keys[2] = {pInfo->streamId, pInfo->taskId}; + + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys)); if (ppTask) { ASSERT((*ppTask)->status.timerActive == 1); @@ -596,8 +598,9 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { SStreamMeta* pMeta = pTask->pMeta; int32_t hTaskId = pTask->historyTaskId.taskId; + int64_t keys[2] = {pTask->historyTaskId.streamId, pTask->historyTaskId.taskId}; // Set the execute conditions, including the query time window and the version range - SStreamTask** pHTask = taosHashGet(pMeta->pTasks, &hTaskId, sizeof(hTaskId)); + SStreamTask** pHTask = taosHashGet(pMeta->pTasks, keys, sizeof(keys)); if (pHTask == NULL) { qWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it is not built yet", pTask->id.idStr, pMeta->vgId, hTaskId); -- GitLab