From 2bcc139443e6e9ae9d9caffb1d0e051e97feff32 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 15 Mar 2022 14:56:33 +0800 Subject: [PATCH] add stream msg routing --- include/common/tmsg.h | 5 ++- include/dnode/snode/snode.h | 3 ++ source/dnode/mgmt/impl/src/dndSnode.c | 45 ++++++++++++++++++++++----- source/dnode/snode/inc/sndInt.h | 19 +++++------ 4 files changed, 54 insertions(+), 18 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 5a60761f11..221cf28f23 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -187,7 +187,10 @@ typedef struct SEp { typedef struct { int32_t contLen; - int32_t vgId; + union { + int32_t vgId; + int32_t streamTaskId; + }; } SMsgHead; // Submit message for one table diff --git a/include/dnode/snode/snode.h b/include/dnode/snode/snode.h index 21a93532e0..9dcd58a05f 100644 --- a/include/dnode/snode/snode.h +++ b/include/dnode/snode/snode.h @@ -23,6 +23,9 @@ extern "C" { #endif +#define SND_UNIQUE_THREAD_NUM 2 +#define SND_SHARED_THREAD_NUM 2 + /* ------------------------ TYPES EXPOSED ------------------------ */ typedef struct SDnode SDnode; typedef struct SSnode SSnode; diff --git a/source/dnode/mgmt/impl/src/dndSnode.c b/source/dnode/mgmt/impl/src/dndSnode.c index b27a25680a..5ea8a841d2 100644 --- a/source/dnode/mgmt/impl/src/dndSnode.c +++ b/source/dnode/mgmt/impl/src/dndSnode.c @@ -166,7 +166,7 @@ static int32_t dndWriteSnodeFile(SDnode *pDnode) { static int32_t dndStartSnodeWorker(SDnode *pDnode) { SSnodeMgmt *pMgmt = &pDnode->smgmt; pMgmt->uniqueWorkers = taosArrayInit(0, sizeof(void *)); - for (int32_t i = 0; i < 2; i++) { + for (int32_t i = 0; i < SND_UNIQUE_THREAD_NUM; i++) { SDnodeWorker *pUniqueWorker = malloc(sizeof(SDnodeWorker)); if (pUniqueWorker == NULL) { return -1; @@ -177,8 +177,8 @@ static int32_t dndStartSnodeWorker(SDnode *pDnode) { } taosArrayPush(pMgmt->uniqueWorkers, &pUniqueWorker); } - if (dndInitWorker(pDnode, &pMgmt->sharedWorker, DND_WORKER_SINGLE, "snode-shared", 4, 4, - dndProcessSnodeSharedQueue)) { + if (dndInitWorker(pDnode, &pMgmt->sharedWorker, DND_WORKER_SINGLE, "snode-shared", SND_SHARED_THREAD_NUM, + SND_SHARED_THREAD_NUM, dndProcessSnodeSharedQueue)) { dError("failed to start snode shared worker since %s", terrstr()); return -1; } @@ -369,13 +369,39 @@ static void dndProcessSnodeSharedQueue(SDnode *pDnode, SRpcMsg *pMsg) { taosFreeQitem(pMsg); } -static void dndWriteSnodeMsgToRandomWorker(SDnode *pDnode, SRpcMsg *pMsg) { +static FORCE_INLINE int32_t dndGetSWIdFromMsg(SRpcMsg *pMsg) { + SMsgHead *pHead = pMsg->pCont; + pHead->streamTaskId = htonl(pHead->streamTaskId); + return pHead->streamTaskId % SND_UNIQUE_THREAD_NUM; +} + +static void dndWriteSnodeMsgToWorkerByMsg(SDnode *pDnode, SRpcMsg *pMsg) { + int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED; + + SSnode *pSnode = dndAcquireSnode(pDnode); + if (pSnode != NULL) { + int32_t index = dndGetSWIdFromMsg(pMsg); + SDnodeWorker *pWorker = taosArrayGetP(pDnode->smgmt.uniqueWorkers, index); + code = dndWriteMsgToWorker(pWorker, pMsg, sizeof(SRpcMsg)); + } + + dndReleaseSnode(pDnode, pSnode); + + if (code != 0) { + if (pMsg->msgType & 1u) { + SRpcMsg rsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code}; + rpcSendResponse(&rsp); + } + rpcFreeCont(pMsg->pCont); + } +} + +static void dndWriteSnodeMsgToMgmtWorker(SDnode *pDnode, SRpcMsg *pMsg) { int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED; SSnode *pSnode = dndAcquireSnode(pDnode); if (pSnode != NULL) { - int32_t index = (pDnode->smgmt.uniqueWorkerInUse + 1) % taosArrayGetSize(pDnode->smgmt.uniqueWorkers); - SDnodeWorker *pWorker = taosArrayGet(pDnode->smgmt.uniqueWorkers, index); + SDnodeWorker *pWorker = taosArrayGet(pDnode->smgmt.uniqueWorkers, 0); code = dndWriteMsgToWorker(pWorker, pMsg, sizeof(SRpcMsg)); } dndReleaseSnode(pDnode, pSnode); @@ -407,9 +433,12 @@ static void dndWriteSnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpc } } +void dndProcessSnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { + dndWriteSnodeMsgToMgmtWorker(pDnode, pMsg); +} + void dndProcessSnodeUniqueMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - // judge from msg to write to unique queue - dndWriteSnodeMsgToRandomWorker(pDnode, pMsg); + dndWriteSnodeMsgToWorkerByMsg(pDnode, pMsg); } void dndProcessSnodeSharedMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { diff --git a/source/dnode/snode/inc/sndInt.h b/source/dnode/snode/inc/sndInt.h index 5851e18478..5c792c840d 100644 --- a/source/dnode/snode/inc/sndInt.h +++ b/source/dnode/snode/inc/sndInt.h @@ -30,21 +30,26 @@ extern "C" { #endif enum { - STREAM_STATUS__READY = 1, + STREAM_STATUS__RUNNING = 1, STREAM_STATUS__STOPPED, STREAM_STATUS__CREATING, STREAM_STATUS__STOPING, - STREAM_STATUS__RESUMING, + STREAM_STATUS__RESTORING, STREAM_STATUS__DELETING, }; enum { - STREAM_RUNNER__RUNNING = 1, - STREAM_RUNNER__STOP, + STREAM_TASK_STATUS__RUNNING = 1, + STREAM_TASK_STATUS__STOP, }; +typedef struct { + SHashObj* pHash; // taskId -> streamTask +} SStreamMeta; + typedef struct SSnode { - SSnodeOpt cfg; + SStreamMeta* pMeta; + SSnodeOpt cfg; } SSnode; typedef struct { @@ -62,10 +67,6 @@ typedef struct { // storage handle } SStreamRunner; -typedef struct { - SHashObj* pHash; -} SStreamMeta; - int32_t sndCreateStream(); int32_t sndDropStream(); -- GitLab