diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 790cbf906da887c01bec011a875a67db8f359b7b..e6a4dd1d493969a333005a64f515ba35dde34573 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -33,7 +33,7 @@ extern bool gRaftDetailLog; #define SYNC_MAX_READ_RANGE 2 #define SYNC_MAX_PROGRESS_WAIT_MS 4000 #define SYNC_MAX_START_TIME_RANGE_MS (1000 * 20) -#define SYNC_MAX_RECV_TIME_RANGE_MS 1000 +#define SYNC_MAX_RECV_TIME_RANGE_MS 1200 #define SYNC_ADD_QUORUM_COUNT 3 #define SYNC_MAX_BATCH_SIZE 1 diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 6b52c7427177bdd1662db63d57308e231348e99b..bc1c6386f6e0d9a61b21e5ca0e3aa198c1da043e 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -98,6 +98,11 @@ typedef void* queue[2]; #define TRANS_RETRY_INTERVAL 15 // retry interval (ms) #define TRANS_CONN_TIMEOUT 3 // connect timeout (s) #define TRANS_READ_TIMEOUT 3000 // read timeout (ms) +#define TRANS_PACKET_LIMIT 1024 * 1024 * 512 + +#define TRANS_MAGIC_NUM 0x5f375a86 + +#define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0) typedef SRpcMsg STransMsg; typedef SRpcCtx STransCtx; @@ -151,6 +156,7 @@ typedef struct { char hasEpSet : 2; // contain epset or not, 0(default): no epset, 1: contain epset char user[TSDB_UNI_LEN]; + uint32_t magicNum; STraceId traceId; uint64_t ahandle; // ahandle assigned by client uint32_t code; // del later @@ -203,6 +209,7 @@ typedef struct SConnBuffer { int cap; int left; int total; + int invalid; } SConnBuffer; typedef void (*AsyncCB)(uv_async_t* handle); diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 62277a7569a836f84e2ea143dc648737c208b3c8..dc0f9128b0540ee96637de619017c4a67c335659 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -221,7 +221,6 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32 uError("failed to receive response from %s:%u since %s", server, port, terrstr()); goto SEND_OVER; } - code = 0; SEND_OVER: diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index add007e14d2a49a18bb2be13deb17430eb8841c6..8fdfcd5309c927e4e49386340fbd44496a4a7688 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -759,6 +759,7 @@ void cliSend(SCliConn* pConn) { pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0; memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user)); pHead->traceId = pMsg->info.traceId; + pHead->magicNum = htonl(TRANS_MAGIC_NUM); uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 4272ec0b1c201d930e4c63c9266312a77bef97bd..3ba8186e9dbebb52260eeb24766de603f957a9e3 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -91,6 +91,7 @@ int transInitBuffer(SConnBuffer* buf) { buf->left = -1; buf->len = 0; buf->total = 0; + buf->invalid = 0; return 0; } int transDestroyBuffer(SConnBuffer* p) { @@ -108,19 +109,25 @@ int transClearBuffer(SConnBuffer* buf) { p->left = -1; p->len = 0; p->total = 0; + p->invalid = 0; return 0; } int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) { + static const int HEADSIZE = sizeof(STransMsgHead); + SConnBuffer* p = connBuf; if (p->left != 0) { return -1; } int total = connBuf->total; - *buf = taosMemoryCalloc(1, total); - memcpy(*buf, p->buf, total); - - transResetBuffer(connBuf); + if (total >= HEADSIZE && !p->invalid) { + *buf = taosMemoryCalloc(1, total); + memcpy(*buf, p->buf, total); + transResetBuffer(connBuf); + } else { + total = -1; + } return total; } @@ -173,6 +180,7 @@ bool transReadComplete(SConnBuffer* connBuf) { memcpy((char*)&head, connBuf->buf, sizeof(head)); int32_t msgLen = (int32_t)htonl(head.msgLen); p->total = msgLen; + p->invalid = TRANS_NOVALID_PACKET(htonl(head.magicNum)); } if (p->total >= p->len) { p->left = p->total - p->len; @@ -180,7 +188,8 @@ bool transReadComplete(SConnBuffer* connBuf) { p->left = 0; } } - return p->left == 0 ? true : false; + + return (p->left == 0 || p->invalid) ? true : false; } int transSetConnOption(uv_tcp_t* stream) { diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 3512b27bf86f71c34562f7bc928d4ecd18807684..c980b70abddd570e63d424842205e5147e2ae31f 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -183,11 +183,15 @@ static void uvHandleActivityTimeout(uv_timer_t* handle) { tDebug("%p timeout since no activity", conn); } -static void uvHandleReq(SSvrConn* pConn) { - STransMsgHead* msg = NULL; - int msgLen = 0; +static bool uvHandleReq(SSvrConn* pConn) { + STrans* pTransInst = pConn->pTransInst; - msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&msg); + STransMsgHead* msg = NULL; + int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&msg); + if (msgLen <= 0) { + tError("%s conn %p read invalid packet", transLabel(pTransInst), pConn); + return false; + } STransMsgHead* pHead = (STransMsgHead*)msg; pHead->code = htonl(pHead->code); @@ -200,9 +204,8 @@ static void uvHandleReq(SSvrConn* pConn) { // uv_read_stop((uv_stream_t*)pConn->pTcp); // transRefSrvHandle(pConn); // uv_queue_work(((SWorkThrd*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask); - if (uvRecvReleaseReq(pConn, pHead)) { - return; + return true; } STransMsg transMsg; @@ -220,7 +223,6 @@ static void uvHandleReq(SSvrConn* pConn) { tDebug("conn %p acquired by server app", pConn); } } - STrans* pTransInst = pConn->pTransInst; STraceId* trace = &pHead->traceId; if (pConn->status == ConnNormal && pHead->noResp == 0) { transRefSrvHandle(pConn); @@ -258,21 +260,31 @@ static void uvHandleReq(SSvrConn* pConn) { transReleaseExHandle(transGetRefMgt(), pConn->refId); (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); + return true; } void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { - // opt - SSvrConn* conn = cli->data; + SSvrConn* conn = cli->data; + STrans* pTransInst = conn->pTransInst; + SConnBuffer* pBuf = &conn->readBuf; - STrans* pTransInst = conn->pTransInst; if (nread > 0) { pBuf->len += nread; tTrace("%s conn %p total read:%d, current read:%d", transLabel(pTransInst), conn, pBuf->len, (int)nread); - while (transReadComplete(pBuf)) { - tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn); - uvHandleReq(conn); + if (pBuf->len <= TRANS_PACKET_LIMIT) { + while (transReadComplete(pBuf)) { + tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn); + if (uvHandleReq(conn) == false) { + destroyConn(conn, true); + return; + } + } + return; + } else { + tError("%s conn %p read unexpected packet, exceed limit", transLabel(pTransInst), conn); + destroyConn(conn, true); + return; } - return; } if (nread == 0) { return; @@ -364,6 +376,7 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { pHead->ahandle = (uint64_t)pMsg->info.ahandle; pHead->traceId = pMsg->info.traceId; pHead->hasEpSet = pMsg->info.hasEpSet; + pHead->magicNum = htonl(TRANS_MAGIC_NUM); if (pConn->status == ConnNormal) { pHead->msgType = (0 == pMsg->msgType ? pConn->inType + 1 : pMsg->msgType);