From 846129fd38affe6aa7d316c32d0a5e7fc3428376 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 18 Mar 2022 14:54:08 +0800 Subject: [PATCH] shm --- include/dnode/snode/snode.h | 16 +- source/dnode/mgmt/bnode/src/bmWorker.c | 2 +- source/dnode/mgmt/container/inc/dnd.h | 2 +- source/dnode/mgmt/container/inc/dndInt.h | 4 +- source/dnode/mgmt/container/src/dndMsg.c | 2 +- source/dnode/mgmt/container/src/dndWorker.c | 22 +-- source/dnode/mgmt/dnode/src/dmWorker.c | 6 +- source/dnode/mgmt/mnode/src/mmWorker.c | 2 +- source/dnode/mgmt/qnode/src/qmWorker.c | 4 +- source/dnode/mgmt/snode/inc/sm.h | 2 +- source/dnode/mgmt/snode/inc/smInt.h | 9 +- source/dnode/mgmt/snode/src/smWorker.c | 196 ++++++-------------- source/dnode/mgmt/vnode/src/vmWorker.c | 2 +- source/dnode/snode/src/snode.c | 15 +- 14 files changed, 82 insertions(+), 202 deletions(-) diff --git a/include/dnode/snode/snode.h b/include/dnode/snode/snode.h index 14819eecbd..4202859359 100644 --- a/include/dnode/snode/snode.h +++ b/include/dnode/snode/snode.h @@ -29,7 +29,6 @@ typedef struct SMgmtWrapper SMgmtWrapper; typedef struct SSnode SSnode; typedef struct { - int64_t numOfErrors; } SSnodeLoad; typedef struct { @@ -73,20 +72,9 @@ int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad); * @param pSnode The snode object. * @param pMsg The request message * @param pRsp The response message - * @return int32_t 0 for success, -1 for failure */ -// int32_t sndProcessMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp); - -int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg); - -int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg); - -/** - * @brief Drop a snode. - * - * @param path Path of the snode. - */ -void sndDestroy(const char *path); +void sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg); +void sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/bnode/src/bmWorker.c b/source/dnode/mgmt/bnode/src/bmWorker.c index 324c555735..6b77309a20 100644 --- a/source/dnode/mgmt/bnode/src/bmWorker.c +++ b/source/dnode/mgmt/bnode/src/bmWorker.c @@ -65,7 +65,7 @@ int32_t bmProcessWriteMsg(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) { SDnodeWorker *pWorker = &pMgmt->writeWorker; dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); - return dndWriteMsgToWorker(pWorker, pMsg, 0); + return dndWriteMsgToWorker(pWorker, pMsg); } int32_t bmStartWorker(SBnodeMgmt *pMgmt) { diff --git a/source/dnode/mgmt/container/inc/dnd.h b/source/dnode/mgmt/container/inc/dnd.h index 930d4323a2..83b44e9f9c 100644 --- a/source/dnode/mgmt/container/inc/dnd.h +++ b/source/dnode/mgmt/container/inc/dnd.h @@ -163,7 +163,7 @@ void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp); int32_t dndInitWorker(void *param, SDnodeWorker *pWorker, EWorkerType type, const char *name, int32_t minNum, int32_t maxNum, void *queueFp); void dndCleanupWorker(SDnodeWorker *pWorker); -int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pCont, int32_t contLen); +int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pMsg); int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg); diff --git a/source/dnode/mgmt/container/inc/dndInt.h b/source/dnode/mgmt/container/inc/dndInt.h index 25998575d1..9ae70874fe 100644 --- a/source/dnode/mgmt/container/inc/dndInt.h +++ b/source/dnode/mgmt/container/inc/dndInt.h @@ -21,8 +21,8 @@ #include "bm.h" #include "dm.h" #include "mm.h" -#include "qmInt.h" -#include "smInt.h" +#include "qm.h" +#include "sm.h" #include "vm.h" #ifdef __cplusplus diff --git a/source/dnode/mgmt/container/src/dndMsg.c b/source/dnode/mgmt/container/src/dndMsg.c index adb4b19c77..ac180b137b 100644 --- a/source/dnode/mgmt/container/src/dndMsg.c +++ b/source/dnode/mgmt/container/src/dndMsg.c @@ -154,7 +154,7 @@ int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg) { case TDMT_DND_CREATE_SNODE: return dndProcessCreateNodeMsg(pDnode, SNODE, pMsg); case TDMT_DND_DROP_SNODE: - return dndProcessDropNodeMsg(pDnode, MNODE, pMsg); + return dndProcessDropNodeMsg(pDnode, SNODE, pMsg); case TDMT_DND_CREATE_BNODE: return dndProcessCreateNodeMsg(pDnode, BNODE, pMsg); case TDMT_DND_DROP_BNODE: diff --git a/source/dnode/mgmt/container/src/dndWorker.c b/source/dnode/mgmt/container/src/dndWorker.c index 2c2158a060..5d99ec5f0d 100644 --- a/source/dnode/mgmt/container/src/dndWorker.c +++ b/source/dnode/mgmt/container/src/dndWorker.c @@ -81,31 +81,13 @@ void dndCleanupWorker(SDnodeWorker *pWorker) { } } -int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pCont, int32_t contLen) { - if (pWorker == NULL || pWorker->queue == NULL ) { +int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pMsg) { + if (pWorker == NULL || pWorker->queue == NULL) { terrno = TSDB_CODE_INVALID_PARA; return -1; } - void *pMsg = NULL; - if (contLen != 0) { - pMsg = taosAllocateQitem(contLen); - if (pMsg != NULL) { - memcpy(pMsg, pCont, contLen); - } - } else { - pMsg = pCont; - } - - if (pMsg == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - if (taosWriteQitem(pWorker->queue, pMsg) != 0) { - if (contLen != 0) { - taosFreeQitem(pMsg); - } terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } diff --git a/source/dnode/mgmt/dnode/src/dmWorker.c b/source/dnode/mgmt/dnode/src/dmWorker.c index c157db0e13..5aff064de3 100644 --- a/source/dnode/mgmt/dnode/src/dmWorker.c +++ b/source/dnode/mgmt/dnode/src/dmWorker.c @@ -17,8 +17,8 @@ #include "bm.h" #include "dmInt.h" #include "mm.h" -#include "qmInt.h" -#include "smInt.h" +#include "qm.h" +#include "sm.h" #include "vm.h" static void *dmThreadRoutine(void *param) { @@ -136,5 +136,5 @@ int32_t dmProcessMgmtMsg(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { } dTrace("msg:%p, will be written to worker %s", pMsg, pWorker->name); - return dndWriteMsgToWorker(pWorker, pMsg, 0); + return dndWriteMsgToWorker(pWorker, pMsg); } diff --git a/source/dnode/mgmt/mnode/src/mmWorker.c b/source/dnode/mgmt/mnode/src/mmWorker.c index e68faa88b4..7699d5a35b 100644 --- a/source/dnode/mgmt/mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mnode/src/mmWorker.c @@ -85,7 +85,7 @@ static int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SDnodeWorker *pWorker, SNodeM if (pMnode == NULL) return -1; dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); - int32_t code = dndWriteMsgToWorker(pWorker, pMsg, 0); + int32_t code = dndWriteMsgToWorker(pWorker, pMsg); mmRelease(pMgmt, pMnode); return code; } diff --git a/source/dnode/mgmt/qnode/src/qmWorker.c b/source/dnode/mgmt/qnode/src/qmWorker.c index a55253ef70..09b25a97f0 100644 --- a/source/dnode/mgmt/qnode/src/qmWorker.c +++ b/source/dnode/mgmt/qnode/src/qmWorker.c @@ -43,14 +43,14 @@ int32_t qmProcessQueryMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { SDnodeWorker *pWorker = &pMgmt->queryWorker; dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); - return dndWriteMsgToWorker(pWorker, pMsg, 0); + return dndWriteMsgToWorker(pWorker, pMsg); } int32_t qmProcessFetchMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { SDnodeWorker *pWorker = &pMgmt->fetchWorker; dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); - return dndWriteMsgToWorker(pWorker, pMsg, 0); + return dndWriteMsgToWorker(pWorker, pMsg); } int32_t qmStartWorker(SQnodeMgmt *pMgmt) { diff --git a/source/dnode/mgmt/snode/inc/sm.h b/source/dnode/mgmt/snode/inc/sm.h index 3ab5102340..55f13e22e1 100644 --- a/source/dnode/mgmt/snode/inc/sm.h +++ b/source/dnode/mgmt/snode/inc/sm.h @@ -22,7 +22,7 @@ extern "C" { #endif -void bmGetMgmtFp(SMgmtWrapper *pWrapper); +void smGetMgmtFp(SMgmtWrapper *pWrapper); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/snode/inc/smInt.h b/source/dnode/mgmt/snode/inc/smInt.h index 95b130b023..485d077375 100644 --- a/source/dnode/mgmt/snode/inc/smInt.h +++ b/source/dnode/mgmt/snode/inc/smInt.h @@ -43,10 +43,11 @@ int32_t smProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); // smWorker.c -int32_t smStartWorker(SQnodeMgmt *pMgmt); -void smStopWorker(SQnodeMgmt *pMgmt); -int32_t smProcessQueryMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg); -int32_t smProcessFetchMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t smStartWorker(SSnodeMgmt *pMgmt); +void smStopWorker(SSnodeMgmt *pMgmt); +int32_t smProcessMgmtMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t smProcessUniqueMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t smProcessSharedMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/snode/src/smWorker.c b/source/dnode/mgmt/snode/src/smWorker.c index 67476efd46..1bf07d2232 100644 --- a/source/dnode/mgmt/snode/src/smWorker.c +++ b/source/dnode/mgmt/snode/src/smWorker.c @@ -16,27 +16,54 @@ #define _DEFAULT_SOURCE #include "smInt.h" +static void smProcessUniqueQueue(SSnodeMgmt *pMgmt, STaosQall *qall, int32_t numOfMsgs) { + for (int32_t i = 0; i < numOfMsgs; i++) { + SNodeMsg *pMsg = NULL; + taosGetQitem(qall, (void **)&pMsg); -static void dndProcessSnodeSharedQueue(SDnode *pDnode, SRpcMsg *pMsg); + dTrace("msg:%p, will be processed in snode unique queue", pMsg); + sndProcessUMsg(pMgmt->pSnode, &pMsg->rpcMsg); -static void dndProcessSnodeUniqueQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfMsgs); + dTrace("msg:%p, is freed", pMsg); + rpcFreeCont(pMsg->rpcMsg.pCont); + taosFreeQitem(pMsg); + } +} + +static void smProcessSharedQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) { + dTrace("msg:%p, will be processed in snode shared queue", pMsg); + sndProcessSMsg(pMgmt->pSnode, pMsg); + + dTrace("msg:%p, is freed", pMsg); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); +} -static int32_t dndStartSnodeWorker(SDnode *pDnode) { - SSnodeMgmt *pMgmt = &pDnode->smgmt; +int32_t smStartWorker(SSnodeMgmt *pMgmt) { pMgmt->uniqueWorkers = taosArrayInit(0, sizeof(void *)); + if (pMgmt->uniqueWorkers == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + for (int32_t i = 0; i < SND_UNIQUE_THREAD_NUM; i++) { SDnodeWorker *pUniqueWorker = malloc(sizeof(SDnodeWorker)); if (pUniqueWorker == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - if (dndInitWorker(pDnode, pUniqueWorker, DND_WORKER_MULTI, "snode-unique", 1, 1, dndProcessSnodeSharedQueue) != 0) { + if (dndInitWorker(pMgmt, pUniqueWorker, DND_WORKER_MULTI, "snode-unique", 1, 1, smProcessSharedQueue) != 0) { dError("failed to start snode unique worker since %s", terrstr()); return -1; } - taosArrayPush(pMgmt->uniqueWorkers, &pUniqueWorker); + if (taosArrayPush(pMgmt->uniqueWorkers, &pUniqueWorker) == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } } - if (dndInitWorker(pDnode, &pMgmt->sharedWorker, DND_WORKER_SINGLE, "snode-shared", SND_SHARED_THREAD_NUM, - SND_SHARED_THREAD_NUM, dndProcessSnodeSharedQueue)) { + + if (dndInitWorker(pMgmt, &pMgmt->sharedWorker, DND_WORKER_SINGLE, "snode-shared", SND_SHARED_THREAD_NUM, + SND_SHARED_THREAD_NUM, smProcessSharedQueue)) { dError("failed to start snode shared worker since %s", terrstr()); return -1; } @@ -44,156 +71,47 @@ static int32_t dndStartSnodeWorker(SDnode *pDnode) { return 0; } -static void dndStopSnodeWorker(SDnode *pDnode) { - SSnodeMgmt *pMgmt = &pDnode->smgmt; - - taosWLockLatch(&pMgmt->latch); - pMgmt->deployed = 0; - taosWUnLockLatch(&pMgmt->latch); - - while (pMgmt->refCount > 0) { - taosMsleep(10); - } - +void smStopWorker(SSnodeMgmt *pMgmt) { for (int32_t i = 0; i < taosArrayGetSize(pMgmt->uniqueWorkers); i++) { SDnodeWorker *worker = taosArrayGetP(pMgmt->uniqueWorkers, i); dndCleanupWorker(worker); } taosArrayDestroy(pMgmt->uniqueWorkers); + dndCleanupWorker(&pMgmt->sharedWorker); } -static void dndProcessSnodeUniqueQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfMsgs) { - /*SSnodeMgmt *pMgmt = &pDnode->smgmt;*/ - int32_t code = TSDB_CODE_NODE_NOT_DEPLOYED; - - SSnode *pSnode = dndAcquireSnode(pDnode); - if (pSnode != NULL) { - for (int32_t i = 0; i < numOfMsgs; i++) { - SRpcMsg *pMsg = NULL; - taosGetQitem(qall, (void **)&pMsg); - - sndProcessUMsg(pSnode, pMsg); - - rpcFreeCont(pMsg->pCont); - taosFreeQitem(pMsg); - } - dndReleaseSnode(pDnode, pSnode); - } else { - for (int32_t i = 0; i < numOfMsgs; i++) { - SRpcMsg *pMsg = NULL; - taosGetQitem(qall, (void **)&pMsg); - SRpcMsg rpcRsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code}; - rpcSendResponse(&rpcRsp); - - rpcFreeCont(pMsg->pCont); - taosFreeQitem(pMsg); - } - } -} - -static void dndProcessSnodeSharedQueue(SDnode *pDnode, SRpcMsg *pMsg) { - /*SSnodeMgmt *pMgmt = &pDnode->smgmt;*/ - int32_t code = TSDB_CODE_NODE_NOT_DEPLOYED; - - SSnode *pSnode = dndAcquireSnode(pDnode); - if (pSnode != NULL) { - sndProcessSMsg(pSnode, pMsg); - dndReleaseSnode(pDnode, pSnode); - } else { - SRpcMsg rpcRsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code}; - rpcSendResponse(&rpcRsp); - } - -#if 0 - if (pMsg->msgType & 1u) { - if (pRsp != NULL) { - pRsp->ahandle = pMsg->ahandle; - rpcSendResponse(pRsp); - free(pRsp); - } else { - if (code != 0) code = terrno; - SRpcMsg rpcRsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code}; - rpcSendResponse(&rpcRsp); - } - } -#endif - - rpcFreeCont(pMsg->pCont); - taosFreeQitem(pMsg); -} - -static FORCE_INLINE int32_t dndGetSWIdFromMsg(SRpcMsg *pMsg) { +static FORCE_INLINE int32_t smGetSWIdFromMsg(SRpcMsg *pMsg) { SMsgHead *pHead = pMsg->pCont; pHead->streamTaskId = htonl(pHead->streamTaskId); return pHead->streamTaskId % SND_UNIQUE_THREAD_NUM; } -static void dndWriteSnodeMsgToWorkerByMsg(SDnode *pDnode, SRpcMsg *pMsg) { - int32_t code = TSDB_CODE_NODE_NOT_DEPLOYED; - - SSnode *pSnode = dndAcquireSnode(pDnode); - if (pSnode != NULL) { - int32_t index = dndGetSWIdFromMsg(pMsg); - SDnodeWorker *pWorker = taosArrayGetP(pDnode->smgmt.uniqueWorkers, index); - code = dndWriteMsgToWorker(pWorker, pMsg, sizeof(SRpcMsg)); - } - - dndReleaseSnode(pDnode, pSnode); - - if (code != 0) { - if (pMsg->msgType & 1u) { - SRpcMsg rsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code}; - rpcSendResponse(&rsp); - } - rpcFreeCont(pMsg->pCont); - } -} - -static void dndWriteSnodeMsgToMgmtWorker(SDnode *pDnode, SRpcMsg *pMsg) { - int32_t code = TSDB_CODE_NODE_NOT_DEPLOYED; - - SSnode *pSnode = dndAcquireSnode(pDnode); - if (pSnode != NULL) { - SDnodeWorker *pWorker = taosArrayGet(pDnode->smgmt.uniqueWorkers, 0); - code = dndWriteMsgToWorker(pWorker, pMsg, sizeof(SRpcMsg)); +int32_t smProcessMgmtMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { + SDnodeWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, 0); + if (pWorker == NULL) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; } - dndReleaseSnode(pDnode, pSnode); - if (code != 0) { - if (pMsg->msgType & 1u) { - SRpcMsg rsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code}; - rpcSendResponse(&rsp); - } - rpcFreeCont(pMsg->pCont); - } + dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); + return dndWriteMsgToWorker(pWorker, pMsg); } -static void dndWriteSnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pMsg) { - int32_t code = TSDB_CODE_NODE_NOT_DEPLOYED; - - SSnode *pSnode = dndAcquireSnode(pDnode); - if (pSnode != NULL) { - code = dndWriteMsgToWorker(pWorker, pMsg, sizeof(SRpcMsg)); - } - dndReleaseSnode(pDnode, pSnode); - - if (code != 0) { - if (pMsg->msgType & 1u) { - SRpcMsg rsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code}; - rpcSendResponse(&rsp); - } - rpcFreeCont(pMsg->pCont); +int32_t smProcessUniqueMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { + int32_t index = smGetSWIdFromMsg(&pMsg->rpcMsg); + SDnodeWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, index); + if (pWorker == NULL) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; } -} -void dndProcessSnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - dndWriteSnodeMsgToMgmtWorker(pDnode, pMsg); + dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); + return dndWriteMsgToWorker(pWorker, pMsg); } -void dndProcessSnodeUniqueMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - dndWriteSnodeMsgToWorkerByMsg(pDnode, pMsg); -} +int32_t smProcessSharedMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { + SDnodeWorker *pWorker = &pMgmt->sharedWorker; -void dndProcessSnodeSharedMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - dndWriteSnodeMsgToWorker(pDnode, &pDnode->smgmt.sharedWorker, pMsg); + dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); + return dndWriteMsgToWorker(pWorker, pMsg); } diff --git a/source/dnode/mgmt/vnode/src/vmWorker.c b/source/dnode/mgmt/vnode/src/vmWorker.c index 14e57e2949..74ab0773cc 100644 --- a/source/dnode/mgmt/vnode/src/vmWorker.c +++ b/source/dnode/mgmt/vnode/src/vmWorker.c @@ -303,5 +303,5 @@ void vmStopWorker(SVnodesMgmt *pMgmt) { int32_t vmProcessMgmtMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { SDnodeWorker *pWorker = &pMgmt->mgmtWorker; dTrace("msg:%p, will be written to worker %s", pMsg, pWorker->name); - return dndWriteMsgToWorker(pWorker, pMsg, 0); + return dndWriteMsgToWorker(pWorker, pMsg); } \ No newline at end of file diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 2ecaeb00e9..c0bfc2c3f5 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -38,13 +38,6 @@ void sndClose(SSnode *pSnode) { int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad) { return 0; } -/*int32_t sndProcessMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {*/ -/**pRsp = NULL;*/ -/*return 0;*/ -/*}*/ - -void sndDestroy(const char *path) {} - SStreamMeta *sndMetaNew() { SStreamMeta *pMeta = calloc(1, sizeof(SStreamMeta)); if (pMeta == NULL) { @@ -93,7 +86,7 @@ static int32_t sndProcessTaskExecReq(SSnode *pSnode, SRpcMsg *pMsg) { return 0; } -int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) { +void sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) { // stream deploy // stream stop/resume // operator exec @@ -101,7 +94,7 @@ int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) { void *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); SStreamTask *pTask = malloc(sizeof(SStreamTask)); if (pTask == NULL) { - return -1; + ASSERT(0); } SCoder decoder; tCoderInit(&decoder, TD_LITTLE_ENDIAN, msg, pMsg->contLen - sizeof(SMsgHead), TD_DECODER); @@ -114,15 +107,13 @@ int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) { } else { ASSERT(0); } - return 0; } -int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) { +void sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) { // operator exec if (pMsg->msgType == TDMT_SND_TASK_EXEC) { sndProcessTaskExecReq(pSnode, pMsg); } else { ASSERT(0); } - return 0; } -- GitLab