diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 239fcdad8d3afe8803b1a39243f4d1f8fdeb4405..842e656b9ded148534e1cc488dff53071eb95ae1 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -476,8 +476,10 @@ typedef struct SStreamMeta { SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc); void streamMetaClose(SStreamMeta* streamMeta); -int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask); -int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); +int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask); +int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, char* msg, int32_t msgLen); +int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); +SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId); int32_t streamMetaBegin(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index c093b2cd5d1eb6142dc33d0d2f4fa4693debdce7..a1dba41c941f9baf8a28304b80eb12530e929e9d 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -117,10 +117,9 @@ typedef struct { struct STQ { SVnode* pVnode; char* path; - SHashObj* pushMgr; // consumerId -> STqHandle* - SHashObj* handles; // subKey -> STqHandle - SHashObj* pStreamTasks; // taksId -> SStreamTask - SHashObj* pAlterInfo; // topic -> SAlterCheckInfo + SHashObj* pushMgr; // consumerId -> STqHandle* + SHashObj* handles; // subKey -> STqHandle + SHashObj* pAlterInfo; // topic -> SAlterCheckInfo STqOffsetStore* pOffsetStore; @@ -129,9 +128,7 @@ struct STQ { TTB* pAlterInfoStore; - TDB* pStreamStore; - TTB* pTaskDb; - TTB* pTaskState; + SStreamMeta* pStreamMeta; }; typedef struct { @@ -188,6 +185,9 @@ static FORCE_INLINE void tqOffsetResetToLog(STqOffsetVal* pOffsetVal, int64_t ve pOffsetVal->version = ver; } +// tqStream +int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask); + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 62e37f048edcaea1710db79418bf77697f252c5d..c1c680fc56df0ff76f5c1c9c13897bfd233cf34c 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -62,8 +62,6 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { pTq->handles = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); - pTq->pStreamTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); - pTq->pushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); pTq->pAlterInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); @@ -76,6 +74,11 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { ASSERT(0); } + pTq->pStreamMeta = streamMetaOpen(path, pTq, (FTaskExpand*)tqExpandTask); + if (pTq->pStreamMeta == NULL) { + ASSERT(0); + } + return pTq; } @@ -83,18 +86,11 @@ void tqClose(STQ* pTq) { if (pTq) { tqOffsetClose(pTq->pOffsetStore); taosHashCleanup(pTq->handles); - void* pIter = NULL; - while (1) { - pIter = taosHashIterate(pTq->pStreamTasks, pIter); - if (pIter == NULL) break; - SStreamTask* pTask = *(SStreamTask**)pIter; - tFreeSStreamTask(pTask); - } - taosHashCleanup(pTq->pStreamTasks); taosHashCleanup(pTq->pushMgr); taosHashCleanup(pTq->pAlterInfo); taosMemoryFree(pTq->path); tqMetaClose(pTq); + streamMetaClose(pTq->pStreamMeta); taosMemoryFree(pTq); } } @@ -672,6 +668,9 @@ FAIL: } int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) { + // + return streamMetaAddSerializedTask(pTq->pStreamMeta, msg, msgLen); +#if 0 SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); if (pTask == NULL) { return -1; @@ -695,6 +694,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) { FAIL: if (pTask) taosMemoryFree(pTask); return -1; +#endif } int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) { @@ -710,7 +710,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) { } while (1) { - pIter = taosHashIterate(pTq->pStreamTasks, pIter); + pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter); if (pIter == NULL) break; SStreamTask* pTask = *(SStreamTask**)pIter; if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue; @@ -744,9 +744,9 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { // SStreamTaskRunReq* pReq = pMsg->pCont; int32_t taskId = pReq->taskId; - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - if (ppTask) { - streamProcessRunReq(*ppTask); + SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId); + if (pTask) { + streamProcessRunReq(pTask); return 0; } else { return -1; @@ -762,14 +762,15 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) { SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen); tDecodeStreamDispatchReq(&decoder, &req); - int32_t taskId = req.taskId; - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - if (ppTask) { + int32_t taskId = req.taskId; + + SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId); + if (pTask) { SRpcMsg rsp = { .info = pMsg->info, .code = 0, }; - streamProcessDispatchReq(*ppTask, &req, &rsp, exec); + streamProcessDispatchReq(pTask, &req, &rsp, exec); return 0; } else { return -1; @@ -779,9 +780,9 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) { int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTaskRecoverReq* pReq = pMsg->pCont; int32_t taskId = pReq->taskId; - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - if (ppTask) { - streamProcessRecoverReq(*ppTask, pReq, pMsg); + SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId); + if (pTask) { + streamProcessRecoverReq(pTask, pReq, pMsg); return 0; } else { return -1; @@ -791,9 +792,9 @@ int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t taskId = pRsp->taskId; - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - if (ppTask) { - streamProcessDispatchRsp(*ppTask, pRsp); + SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId); + if (pTask) { + streamProcessDispatchRsp(pTask, pRsp); return 0; } else { return -1; @@ -803,9 +804,10 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) { SStreamTaskRecoverRsp* pRsp = pMsg->pCont; int32_t taskId = pRsp->rspTaskId; - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - if (ppTask) { - streamProcessRecoverRsp(*ppTask, pRsp); + + SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId); + if (pTask) { + streamProcessRecoverRsp(pTask, pRsp); return 0; } else { return -1; @@ -815,18 +817,7 @@ int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) { SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t)); - if (ppTask) { - SStreamTask* pTask = *ppTask; - taosHashRemove(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t)); - atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING); - } - // todo - // clear queue - // push drop req into queue - // launch exec to free memory - // remove from hash - return 0; + return streamMetaRemoveTask(pTq->pStreamMeta, pReq->taskId); } int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { @@ -837,18 +828,18 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { SDecoder decoder; tDecoderInit(&decoder, msgBody, msgLen); tDecodeStreamRetrieveReq(&decoder, &req); - int32_t taskId = req.dstTaskId; - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - if (ppTask) { + int32_t taskId = req.dstTaskId; + SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId); + if (pTask) { SRpcMsg rsp = { .info = pMsg->info, .code = 0, }; - streamProcessRetrieveReq(*ppTask, &req, &rsp); + streamProcessRetrieveReq(pTask, &req, &rsp); + return 0; } else { return -1; } - return 0; } int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) { @@ -871,16 +862,18 @@ void vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) { goto FAIL; } - int32_t taskId = req.taskId; - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - if (ppTask) { + int32_t taskId = req.taskId; + + SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId); + if (pTask) { SRpcMsg rsp = { .info = pMsg->info, .code = 0, }; - streamProcessDispatchReq(*ppTask, &req, &rsp, false); + streamProcessDispatchReq(pTask, &req, &rsp, false); return; } + FAIL: if (pMsg->info.handle == NULL) return; SRpcMsg rsp = { diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 1c48ef7535993eff1aea41e121923f11ae19b7c7..ae3fef9b4b7ebf02654e93e09b5bf6c52f6e2354 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -215,7 +215,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) walApplyVer(pTq->pVnode->pWal, ver); if (msgType == TDMT_VND_SUBMIT) { - if (taosHashGetSize(pTq->pStreamTasks) == 0) return 0; + if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) return 0; void* data = taosMemoryMalloc(msgLen); if (data == NULL) { diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 501789385323d84e1c97c7443472335930381ca4..6ce8dbe5d90e5d7448a9be03cb42eaf27c3f9bff 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -413,7 +413,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { } } while (1) { - pIter = taosHashIterate(pTq->pStreamTasks, pIter); + pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter); if (pIter == NULL) break; SStreamTask* pTask = *(SStreamTask**)pIter; if (pTask->taskLevel == TASK_LEVEL__SOURCE) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 085a0e4ce7a807a97f99aa3f63c1d6f386b7bb33..8faa22d6439487bad9c76ab5ad554489b752976e 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -36,8 +36,18 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF goto _err; } + pMeta->pTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); + if (pMeta->pTasks == NULL) { + goto _err; + } + + if (streamMetaBegin(pMeta) < 0) { + goto _err; + } + pMeta->ahandle = ahandle; pMeta->expandFunc = expandFunc; + return pMeta; _err: return NULL; @@ -48,6 +58,48 @@ void streamMetaClose(SStreamMeta* pMeta) { tdbTbClose(pMeta->pTaskDb); tdbTbClose(pMeta->pStateDb); tdbClose(pMeta->db); + + void* pIter = NULL; + while (1) { + pIter = taosHashIterate(pMeta->pTasks, pIter); + if (pIter == NULL) break; + SStreamTask* pTask = *(SStreamTask**)pIter; + tFreeSStreamTask(pTask); + } + taosHashCleanup(pMeta->pTasks); + taosMemoryFree(pMeta->path); + taosMemoryFree(pMeta); +} + +int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, char* msg, int32_t msgLen) { + SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); + if (pTask == NULL) { + return -1; + } + SDecoder decoder; + tDecoderInit(&decoder, (uint8_t*)msg, msgLen); + if (tDecodeSStreamTask(&decoder, pTask) < 0) { + ASSERT(0); + goto FAIL; + } + tDecoderClear(&decoder); + + if (pMeta->expandFunc(pMeta->ahandle, pTask) < 0) { + ASSERT(0); + goto FAIL; + } + + taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)); + + if (tdbTbUpsert(pMeta->pTaskDb, &pTask->taskId, sizeof(int32_t), msg, msgLen, &pMeta->txn) < 0) { + ASSERT(0); + return -1; + } + return 0; + +FAIL: + if (pTask) taosMemoryFree(pTask); + return -1; } int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask) { @@ -80,6 +132,16 @@ int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask) { return 0; } +SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId) { + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); + if (ppTask) { + ASSERT((*ppTask)->taskId == taskId); + return *ppTask; + } else { + return NULL; + } +} + int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); if (ppTask) {