diff --git a/source/dnode/mgmt/container/src/dndInt.c b/source/dnode/mgmt/container/src/dndInt.c index dc1bde6b06a7ace4f9e596aab7baf13ec4a13527..ca80dcb9e4045234c0f987bc35dbbf2a7053f117 100644 --- a/source/dnode/mgmt/container/src/dndInt.c +++ b/source/dnode/mgmt/container/src/dndInt.c @@ -135,6 +135,7 @@ void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) { dDebug("startup req is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished); - SRpcMsg rpcRsp = {.handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupReq), .ahandle = NULL}; + SRpcMsg rpcRsp = { + .handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupReq), .ahandle = pReq->ahandle}; rpcSendResponse(&rpcRsp); } diff --git a/source/dnode/mgmt/dnode/src/dmWorker.c b/source/dnode/mgmt/dnode/src/dmWorker.c index b62c18655a01632807689b699e23e3e14ec3ff04..13902985670e31386662e560d6b05a08f227516d 100644 --- a/source/dnode/mgmt/dnode/src/dmWorker.c +++ b/source/dnode/mgmt/dnode/src/dmWorker.c @@ -23,7 +23,7 @@ static void *dmThreadRoutine(void *param) { SDnodeMgmt *pMgmt = param; - SDnode *pDnode = pMgmt->pDnode; + SDnode * pDnode = pMgmt->pDnode; int64_t lastStatusTime = taosGetTimestampMs(); int64_t lastMonitorTime = lastStatusTime; @@ -55,7 +55,7 @@ static void *dmThreadRoutine(void *param) { static void dmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { SDnodeMgmt *pMgmt = pInfo->ahandle; - SDnode *pDnode = pMgmt->pDnode; + SDnode * pDnode = pMgmt->pDnode; SRpcMsg *pRpc = &pMsg->rpcMsg; int32_t code = -1; dTrace("msg:%p, will be processed in dnode queue", pMsg); diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 962acfee2c2d2517ac032118a423a86f2bbcc37c..bd915d6ebe4d16d8ab8897756a0ca1eff93fa651 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -183,7 +183,7 @@ typedef struct { #pragma pack(pop) typedef enum { Normal, Quit, Release, Register } STransMsgType; -typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken } ConnStatus; +typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } ConnStatus; #define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member))) #define RPC_RESERVE_SIZE (sizeof(STranConnCtx)) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 8694d4098ca8ebdbfd69a6b19e188838a659cc77..7150ec470e844455b0cafe8811b9125713c2ac91 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -53,6 +53,7 @@ typedef struct SCliMsg { queue q; uint64_t st; STransMsgType type; + int sent; //(0: no send, 1: alread sent) } SCliMsg; typedef struct SCliThrdObj { @@ -135,6 +136,8 @@ static void destroyThrdObj(SCliThrdObj* pThrd); #define CONN_SHOULD_RELEASE(conn, head) \ do { \ if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ + uint64_t ahandle = head->ahandle; \ + CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); \ conn->status = ConnRelease; \ transClearBuffer(&conn->readBuf); \ transFreeMsg(transContFromHead((char*)head)); \ @@ -146,6 +149,7 @@ static void destroyThrdObj(SCliThrdObj* pThrd); SCliThrdObj* thrd = conn->hostThrd; \ addConnToPool(thrd->pool, conn); \ } \ + destroyCmsg(pMsg); \ return; \ } \ } while (0) @@ -198,8 +202,18 @@ static void* cliWorkThread(void* arg); bool cliMaySendCachedMsg(SCliConn* conn) { if (!transQueueEmpty(&conn->cliMsgs)) { + SCliMsg* pCliMsg = NULL; + int i = 0; + do { + pCliMsg = transQueueGet(&conn->cliMsgs, i++); + if (pCliMsg && 0 == pCliMsg->sent) { + break; + } + } while (pCliMsg != NULL); + if (pCliMsg == NULL) { + return false; + } cliSend(conn); - return true; } return false; } @@ -218,33 +232,27 @@ void cliHandleResp(SCliConn* conn) { transMsg.msgType = pHead->msgType; transMsg.ahandle = NULL; - CONN_SHOULD_RELEASE(conn, pHead); - SCliMsg* pMsg = NULL; STransConnCtx* pCtx = NULL; + CONN_SHOULD_RELEASE(conn, pHead); if (CONN_NO_PERSIST_BY_APP(conn)) { pMsg = transQueuePop(&conn->cliMsgs); + /// uint64_t ahandle = (uint64_t)pHead->ahandle; + // CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); pCtx = pMsg ? pMsg->ctx : NULL; - if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) { - transMsg.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType); - if (transMsg.ahandle == NULL) { - transMsg.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType)); - } - tDebug("cli conn %p construct ahandle %p, persist: 0", conn, transMsg.ahandle); - } else { - transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; - tDebug("cli conn %p get ahandle %p, persist: 0", conn, transMsg.ahandle); - } + transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; + tDebug("cli conn %p get ahandle %p, persist: 0", conn, transMsg.ahandle); } else { uint64_t ahandle = (uint64_t)pHead->ahandle; CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); if (pMsg == NULL) { transMsg.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType); + tDebug("cli conn %p construct ahandle %p by %d, persist: 1", conn, transMsg.ahandle, transMsg.msgType); if (transMsg.ahandle == NULL) { + tDebug("cli conn %p construct ahandle %p due brokenlink, persist: 1", conn, transMsg.ahandle); transMsg.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType)); } - tDebug("cli conn %p construct ahandle %p, persist: 1", conn, transMsg.ahandle); } else { pCtx = pMsg ? pMsg->ctx : NULL; transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; @@ -419,7 +427,7 @@ static void addConnToPool(void* pool, SCliConn* conn) { conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); transCtxCleanup(&conn->ctx); transQueueClear(&conn->cliMsgs); - conn->status = ConnNormal; + conn->status = ConnInPool; char key[128] = {0}; tstrncpy(key, conn->ip, strlen(conn->ip)); @@ -546,7 +554,21 @@ void cliSend(SCliConn* pConn) { // assert(taosArrayGetSize(pConn->cliMsgs) > 0); assert(!transQueueEmpty(&pConn->cliMsgs)); - SCliMsg* pCliMsg = transQueueGet(&pConn->cliMsgs, 0); + + SCliMsg* pCliMsg = NULL; + int i = 0; + do { + pCliMsg = transQueueGet(&pConn->cliMsgs, i++); + if (pCliMsg && 0 == pCliMsg->sent) { + break; + } + } while (pCliMsg != NULL); + if (pCliMsg == NULL) { + return; + } + + pCliMsg->sent = 1; + STransConnCtx* pCtx = pCliMsg->ctx; SCliThrdObj* pThrd = pConn->hostThrd; @@ -558,7 +580,7 @@ void cliSend(SCliConn* pConn) { pMsg->contLen = 0; } STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); - pHead->ahandle = (uint64_t)pCtx->ahandle; + pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0; int msgLen = transMsgLenFromCont(pMsg->contLen); @@ -868,6 +890,7 @@ void transReleaseCliHandle(void* handle) { STransMsg tmsg = {.handle = handle}; SCliMsg* cmsg = calloc(1, sizeof(SCliMsg)); + cmsg->msg = tmsg; cmsg->type = Release; diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 691c572022a8923301b2db00348b48bd3d79fa0c..15dcc29232041dc7206e1b2b227bc3efce897b91 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -93,25 +93,25 @@ typedef struct SServerObj { static const char* notify = "a"; -#define CONN_SHOULD_RELEASE(conn, head) \ - do { \ - if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ - conn->status = ConnRelease; \ - transClearBuffer(&conn->readBuf); \ - transFreeMsg(transContFromHead((char*)head)); \ - tTrace("server conn %p received release request", conn); \ - \ - STransMsg tmsg = {.handle = (void*)conn, .code = 0}; \ - SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg)); \ - srvMsg->msg = tmsg; \ - srvMsg->type = Release; \ - srvMsg->pConn = conn; \ - if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \ - return; \ - } \ - uvStartSendRespInternal(srvMsg); \ - return; \ - } \ +#define CONN_SHOULD_RELEASE(conn, head) \ + do { \ + if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ + conn->status = ConnRelease; \ + transClearBuffer(&conn->readBuf); \ + transFreeMsg(transContFromHead((char*)head)); \ + tTrace("server conn %p received release request", conn); \ + \ + STransMsg tmsg = {.code = 0, .handle = (void*)conn, .ahandle = NULL}; \ + SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg)); \ + srvMsg->msg = tmsg; \ + srvMsg->type = Release; \ + srvMsg->pConn = conn; \ + if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \ + return; \ + } \ + uvStartSendRespInternal(srvMsg); \ + return; \ + } \ } while (0) static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); @@ -823,7 +823,7 @@ void transReleaseSrvHandle(void* handle) { SSrvConn* pConn = handle; SWorkThrdObj* pThrd = pConn->hostThrd; - STransMsg tmsg = {.handle = handle, .code = 0}; + STransMsg tmsg = {.code = 0, .handle = handle, .ahandle = NULL}; SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg)); srvMsg->msg = tmsg;