diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 04b58da570f81aba36766d3ce9795742c12bc3b4..117455f722e0fe225eef8e373273dd9dec0434fb 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -99,6 +99,12 @@ typedef void* queue[2]; #define TRANS_CONN_TIMEOUT 3 // connect timeout (s) #define TRANS_READ_TIMEOUT 3000 // read timeout (ms) +#define TRANS_PACKET_LIMIT 1024 * 1024 * 512 + +#define TRANS_MAGIC_NUM 0x5f375a86 + +#define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0) + typedef SRpcMsg STransMsg; typedef SRpcCtx STransCtx; typedef SRpcCtxVal STransCtxVal; @@ -151,6 +157,7 @@ typedef struct { char hasEpSet : 2; // contain epset or not, 0(default): no epset, 1: contain epset char user[TSDB_UNI_LEN]; + uint32_t magicNum; STraceId traceId; uint64_t ahandle; // ahandle assigned by client uint32_t code; // del later @@ -203,6 +210,7 @@ typedef struct SConnBuffer { int cap; int left; int total; + int invalid; } SConnBuffer; typedef void (*AsyncCB)(uv_async_t* handle); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 9eea43be2354e5407a31a7bd10d9f215d8dd0cb5..5428b8acf692fe05401c56c52e60d7685ee04925 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -127,6 +127,8 @@ static void cliAsyncCb(uv_async_t* handle); static void cliIdleCb(uv_idle_t* handle); static void cliPrepareCb(uv_prepare_t* handle); +static bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead); + static int32_t allocConnRef(SCliConn* conn, bool update); static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg); @@ -211,28 +213,6 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { #define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para)) #define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL) #define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label) -#define CONN_SHOULD_RELEASE(conn, head) \ - do { \ - if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ - uint64_t ahandle = head->ahandle; \ - CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); \ - transClearBuffer(&conn->readBuf); \ - transFreeMsg(transContFromHead((char*)head)); \ - if (transQueueSize(&conn->cliMsgs) > 0 && ahandle == 0) { \ - SCliMsg* cliMsg = transQueueGet(&conn->cliMsgs, 0); \ - if (cliMsg->type == Release) return; \ - } \ - tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, conn->refId); \ - if (T_REF_VAL_GET(conn) > 1) { \ - transUnrefCliHandle(conn); \ - } \ - destroyCmsg(pMsg); \ - cliReleaseUnfinishedMsg(conn); \ - transQueueClear(&conn->cliMsgs); \ - addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn); \ - return; \ - } \ - } while (0) #define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \ do { \ @@ -346,10 +326,17 @@ void cliHandleResp(SCliConn* conn) { } STransMsgHead* pHead = NULL; - transDumpFromBuffer(&conn->readBuf, (char**)&pHead); + if (transDumpFromBuffer(&conn->readBuf, (char**)&pHead) <= 0) { + tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn); + return; + } pHead->code = htonl(pHead->code); pHead->msgLen = htonl(pHead->msgLen); + if (cliRecvReleaseReq(conn, pHead)) { + return; + } + STransMsg transMsg = {0}; transMsg.contLen = transContLenFromMsg(pHead->msgLen); transMsg.pCont = transContFromHead((char*)pHead); @@ -361,7 +348,6 @@ void cliHandleResp(SCliConn* conn) { SCliMsg* pMsg = NULL; STransConnCtx* pCtx = NULL; - CONN_SHOULD_RELEASE(conn, pHead); if (CONN_NO_PERSIST_BY_APP(conn)) { pMsg = transQueuePop(&conn->cliMsgs); @@ -625,7 +611,12 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { pBuf->len += nread; while (transReadComplete(pBuf)) { tTrace("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn); - cliHandleResp(conn); + if (pBuf->invalid) { + cliHandleExcept(conn); + break; + } else { + cliHandleResp(conn); + } } return; } @@ -786,6 +777,7 @@ void cliSend(SCliConn* pConn) { pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0; memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user)); pHead->traceId = pMsg->info.traceId; + pHead->magicNum = htonl(TRANS_MAGIC_NUM); uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); @@ -1053,6 +1045,30 @@ static void cliPrepareCb(uv_prepare_t* handle) { if (thrd->stopMsg != NULL) cliHandleQuit(thrd->stopMsg, thrd); } +bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) { + if (pHead->release == 1 && (pHead->msgLen) == sizeof(*pHead)) { + uint64_t ahandle = pHead->ahandle; + SCliMsg* pMsg = NULL; + CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); + transClearBuffer(&conn->readBuf); + transFreeMsg(transContFromHead((char*)pHead)); + if (transQueueSize(&conn->cliMsgs) > 0 && ahandle == 0) { + SCliMsg* cliMsg = transQueueGet(&conn->cliMsgs, 0); + if (cliMsg->type == Release) return true; + } + tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, conn->refId); + if (T_REF_VAL_GET(conn) > 1) { + transUnrefCliHandle(conn); + } + destroyCmsg(pMsg); + cliReleaseUnfinishedMsg(conn); + transQueueClear(&conn->cliMsgs); + addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn); + return true; + } + return false; +} + static void* cliWorkThread(void* arg) { SCliThrd* pThrd = (SCliThrd*)arg; pThrd->pid = taosGetSelfPthreadId(); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index b568163e23ec75dfa626c2c3618391e94c838d2e..c50d0d3e5cbcfec1e07e133161de4ebf0572da70 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -91,6 +91,7 @@ int transInitBuffer(SConnBuffer* buf) { buf->left = -1; buf->len = 0; buf->total = 0; + buf->invalid = 0; return 0; } int transDestroyBuffer(SConnBuffer* p) { @@ -108,19 +109,24 @@ int transClearBuffer(SConnBuffer* buf) { p->left = -1; p->len = 0; p->total = 0; + p->invalid = 0; return 0; } int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) { - SConnBuffer* p = connBuf; + static const int HEADSIZE = sizeof(STransMsgHead); + SConnBuffer* p = connBuf; if (p->left != 0) { return -1; } int total = connBuf->total; - *buf = taosMemoryCalloc(1, total); - memcpy(*buf, p->buf, total); - - transResetBuffer(connBuf); + if (total >= HEADSIZE && !p->invalid) { + *buf = taosMemoryCalloc(1, total); + memcpy(*buf, p->buf, total); + transResetBuffer(connBuf); + } else { + total = -1; + } return total; } @@ -173,6 +179,7 @@ bool transReadComplete(SConnBuffer* connBuf) { memcpy((char*)&head, connBuf->buf, sizeof(head)); int32_t msgLen = (int32_t)htonl(head.msgLen); p->total = msgLen; + p->invalid = TRANS_NOVALID_PACKET(htonl(head.magicNum)); } if (p->total >= p->len) { p->left = p->total - p->len; @@ -180,7 +187,7 @@ bool transReadComplete(SConnBuffer* connBuf) { p->left = 0; } } - return p->left == 0 ? true : false; + return (p->left == 0 || p->invalid) ? true : false; } int transSetConnOption(uv_tcp_t* stream) { diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 4d35e346b17d81e108d33e5c2f1e6da376047f5e..f50711c59c82e3552169ea74cc45419a23d328fc 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -114,6 +114,8 @@ static void uvAcceptAsyncCb(uv_async_t* handle); static void uvShutDownCb(uv_shutdown_t* req, int status); static void uvPrepareCb(uv_prepare_t* handle); +static bool uvRecvReleaseReq(SSvrConn* conn, STransMsgHead* pHead); + /* * time-consuming task throwed into BG work thread */ @@ -154,37 +156,6 @@ static void* transAcceptThread(void* arg); static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName); static bool addHandleToAcceptloop(void* arg); -#define CONN_SHOULD_RELEASE(conn, head) \ - do { \ - if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ - reallocConnRef(conn); \ - tTrace("conn %p received release request", conn); \ - \ - STraceId traceId = head->traceId; \ - conn->status = ConnRelease; \ - transClearBuffer(&conn->readBuf); \ - transFreeMsg(transContFromHead((char*)head)); \ - \ - STransMsg tmsg = { \ - .code = 0, .info.handle = (void*)conn, .info.traceId = traceId, .info.ahandle = (void*)0x9527}; \ - SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg)); \ - srvMsg->msg = tmsg; \ - srvMsg->type = Release; \ - srvMsg->pConn = conn; \ - if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \ - return; \ - } \ - if (conn->regArg.init) { \ - tTrace("conn %p release, notify server app", conn); \ - STrans* pTransInst = conn->pTransInst; \ - (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); \ - memset(&conn->regArg, 0, sizeof(conn->regArg)); \ - } \ - uvStartSendRespInternal(srvMsg); \ - return; \ - } \ - } while (0) - #define SRV_RELEASE_UV(loop) \ do { \ uv_walk(loop, uvWalkCb, NULL); \ @@ -212,17 +183,25 @@ static void uvHandleActivityTimeout(uv_timer_t* handle) { tDebug("%p timeout since no activity", conn); } -static void uvHandleReq(SSvrConn* pConn) { - STransMsgHead* msg = NULL; - int msgLen = 0; +static bool uvHandleReq(SSvrConn* pConn) { + STrans* pTransInst = pConn->pTransInst; - msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&msg); + STransMsgHead* msg = NULL; + int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&msg); + if (msgLen <= 0) { + tError("%s conn %p read invalid packet", transLabel(pTransInst), pConn); + return false; + } STransMsgHead* pHead = (STransMsgHead*)msg; pHead->code = htonl(pHead->code); pHead->msgLen = htonl(pHead->msgLen); memcpy(pConn->user, pHead->user, strlen(pHead->user)); + if (uvRecvReleaseReq(pConn, pHead)) { + return true; + } + // TODO(dengyihao): time-consuming task throwed into BG Thread // uv_work_t* wreq = taosMemoryMalloc(sizeof(uv_work_t)); // wreq->data = pConn; @@ -230,8 +209,6 @@ static void uvHandleReq(SSvrConn* pConn) { // transRefSrvHandle(pConn); // uv_queue_work(((SWorkThrd*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask); - CONN_SHOULD_RELEASE(pConn, pHead); - STransMsg transMsg; memset(&transMsg, 0, sizeof(transMsg)); transMsg.contLen = transContLenFromMsg(pHead->msgLen); @@ -247,7 +224,6 @@ static void uvHandleReq(SSvrConn* pConn) { tDebug("conn %p acquired by server app", pConn); } } - STrans* pTransInst = pConn->pTransInst; STraceId* trace = &pHead->traceId; if (pConn->status == ConnNormal && pHead->noResp == 0) { transRefSrvHandle(pConn); @@ -285,6 +261,7 @@ static void uvHandleReq(SSvrConn* pConn) { transReleaseExHandle(transGetRefMgt(), pConn->refId); (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); + return true; } void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { @@ -295,11 +272,22 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { if (nread > 0) { pBuf->len += nread; tTrace("%s conn %p total read:%d, current read:%d", transLabel(pTransInst), conn, pBuf->len, (int)nread); - while (transReadComplete(pBuf)) { - tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn); - uvHandleReq(conn); + if (pBuf->len <= TRANS_PACKET_LIMIT) { + while (transReadComplete(pBuf)) { + tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn); + if (pBuf->invalid) { + tTrace("%s conn %p alread read invalid packet", transLabel(pTransInst), conn); + destroyConn(conn, true); + break; + } else { + if (false == uvHandleReq(conn)) break; + } + } + return; + } else { + destroyConn(conn, true); + return; } - return; } if (nread == 0) { return; @@ -391,6 +379,7 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { pHead->ahandle = (uint64_t)pMsg->info.ahandle; pHead->traceId = pMsg->info.traceId; pHead->hasEpSet = pMsg->info.hasEpSet; + pHead->magicNum = htonl(TRANS_MAGIC_NUM); if (pConn->status == ConnNormal) { pHead->msgType = (0 == pMsg->msgType ? pConn->inType + 1 : pMsg->msgType); @@ -591,6 +580,36 @@ static void uvPrepareCb(uv_prepare_t* handle) { } } +static bool uvRecvReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) { + if ((pHead)->release == 1 && (pHead->msgLen) == sizeof(*pHead)) { + reallocConnRef(pConn); + tTrace("conn %p received release request", pConn); + + STraceId traceId = pHead->traceId; + pConn->status = ConnRelease; + transClearBuffer(&pConn->readBuf); + transFreeMsg(transContFromHead((char*)pHead)); + + STransMsg tmsg = {.code = 0, .info.handle = (void*)pConn, .info.traceId = traceId, .info.ahandle = (void*)0x9527}; + SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg)); + srvMsg->msg = tmsg; + srvMsg->type = Release; + srvMsg->pConn = pConn; + if (!transQueuePush(&pConn->srvMsgs, srvMsg)) { + return true; + } + if (pConn->regArg.init) { + tTrace("conn %p release, notify server app", pConn); + STrans* pTransInst = pConn->pTransInst; + (*pTransInst->cfp)(pTransInst->parent, &(pConn->regArg.msg), NULL); + memset(&pConn->regArg, 0, sizeof(pConn->regArg)); + } + uvStartSendRespInternal(srvMsg); + return true; + } + return false; +} + static void uvWorkDoTask(uv_work_t* req) { // doing time-consumeing task // only auth conn currently, add more func later @@ -857,6 +876,7 @@ static int reallocConnRef(SSvrConn* conn) { } static void uvDestroyConn(uv_handle_t* handle) { SSvrConn* conn = handle->data; + if (conn == NULL) { return; } @@ -872,9 +892,8 @@ static void uvDestroyConn(uv_handle_t* handle) { SSvrMsg* msg = transQueueGet(&conn->srvMsgs, i); destroySmsg(msg); } - - transReqQueueClear(&conn->wreqQueue); transQueueDestroy(&conn->srvMsgs); + transReqQueueClear(&conn->wreqQueue); QUEUE_REMOVE(&conn->queue); taosMemoryFree(conn->pTcp);