提交 e016c8f7 编写于 作者: dengyihao's avatar dengyihao

fix invalid packet

上级 9eb99615
...@@ -100,6 +100,10 @@ typedef void* queue[2]; ...@@ -100,6 +100,10 @@ typedef void* queue[2];
#define TRANS_READ_TIMEOUT 3000 // read timeout (ms) #define TRANS_READ_TIMEOUT 3000 // read timeout (ms)
#define TRANS_PACKET_LIMIT 1024 * 1024 * 512 #define TRANS_PACKET_LIMIT 1024 * 1024 * 512
#define TRANS_MAGIC_NUM 0x5f375a86
#define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0)
typedef SRpcMsg STransMsg; typedef SRpcMsg STransMsg;
typedef SRpcCtx STransCtx; typedef SRpcCtx STransCtx;
typedef SRpcCtxVal STransCtxVal; typedef SRpcCtxVal STransCtxVal;
...@@ -152,6 +156,7 @@ typedef struct { ...@@ -152,6 +156,7 @@ typedef struct {
char hasEpSet : 2; // contain epset or not, 0(default): no epset, 1: contain epset char hasEpSet : 2; // contain epset or not, 0(default): no epset, 1: contain epset
char user[TSDB_UNI_LEN]; char user[TSDB_UNI_LEN];
uint32_t magicNum;
STraceId traceId; STraceId traceId;
uint64_t ahandle; // ahandle assigned by client uint64_t ahandle; // ahandle assigned by client
uint32_t code; // del later uint32_t code; // del later
...@@ -204,6 +209,7 @@ typedef struct SConnBuffer { ...@@ -204,6 +209,7 @@ typedef struct SConnBuffer {
int cap; int cap;
int left; int left;
int total; int total;
int invalid;
} SConnBuffer; } SConnBuffer;
typedef void (*AsyncCB)(uv_async_t* handle); typedef void (*AsyncCB)(uv_async_t* handle);
......
...@@ -759,6 +759,7 @@ void cliSend(SCliConn* pConn) { ...@@ -759,6 +759,7 @@ void cliSend(SCliConn* pConn) {
pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0; pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user)); memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user));
pHead->traceId = pMsg->info.traceId; pHead->traceId = pMsg->info.traceId;
pHead->magicNum = htonl(TRANS_MAGIC_NUM);
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
......
...@@ -91,6 +91,7 @@ int transInitBuffer(SConnBuffer* buf) { ...@@ -91,6 +91,7 @@ int transInitBuffer(SConnBuffer* buf) {
buf->left = -1; buf->left = -1;
buf->len = 0; buf->len = 0;
buf->total = 0; buf->total = 0;
buf->invalid = 0;
return 0; return 0;
} }
int transDestroyBuffer(SConnBuffer* p) { int transDestroyBuffer(SConnBuffer* p) {
...@@ -108,6 +109,7 @@ int transClearBuffer(SConnBuffer* buf) { ...@@ -108,6 +109,7 @@ int transClearBuffer(SConnBuffer* buf) {
p->left = -1; p->left = -1;
p->len = 0; p->len = 0;
p->total = 0; p->total = 0;
p->invalid = 0;
return 0; return 0;
} }
...@@ -119,7 +121,7 @@ int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) { ...@@ -119,7 +121,7 @@ int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) {
return -1; return -1;
} }
int total = connBuf->total; int total = connBuf->total;
if (total >= HEADSIZE) { if (total >= HEADSIZE && !p->invalid) {
*buf = taosMemoryCalloc(1, total); *buf = taosMemoryCalloc(1, total);
memcpy(*buf, p->buf, total); memcpy(*buf, p->buf, total);
transResetBuffer(connBuf); transResetBuffer(connBuf);
...@@ -178,6 +180,7 @@ bool transReadComplete(SConnBuffer* connBuf) { ...@@ -178,6 +180,7 @@ bool transReadComplete(SConnBuffer* connBuf) {
memcpy((char*)&head, connBuf->buf, sizeof(head)); memcpy((char*)&head, connBuf->buf, sizeof(head));
int32_t msgLen = (int32_t)htonl(head.msgLen); int32_t msgLen = (int32_t)htonl(head.msgLen);
p->total = msgLen; p->total = msgLen;
p->invalid = TRANS_NOVALID_PACKET(htonl(head.magicNum));
} }
if (p->total >= p->len) { if (p->total >= p->len) {
p->left = p->total - p->len; p->left = p->total - p->len;
...@@ -185,7 +188,8 @@ bool transReadComplete(SConnBuffer* connBuf) { ...@@ -185,7 +188,8 @@ bool transReadComplete(SConnBuffer* connBuf) {
p->left = 0; p->left = 0;
} }
} }
return p->left == 0 ? true : false;
return (p->left == 0 || p->invalid) ? true : false;
} }
int transSetConnOption(uv_tcp_t* stream) { int transSetConnOption(uv_tcp_t* stream) {
......
...@@ -183,15 +183,14 @@ static void uvHandleActivityTimeout(uv_timer_t* handle) { ...@@ -183,15 +183,14 @@ static void uvHandleActivityTimeout(uv_timer_t* handle) {
tDebug("%p timeout since no activity", conn); tDebug("%p timeout since no activity", conn);
} }
static void uvHandleReq(SSvrConn* pConn) { static bool uvHandleReq(SSvrConn* pConn) {
STrans* pTransInst = pConn->pTransInst; STrans* pTransInst = pConn->pTransInst;
STransMsgHead* msg = NULL; STransMsgHead* msg = NULL;
int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&msg); int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&msg);
if (msgLen <= 0) { if (msgLen <= 0) {
tError("%s conn %p alread read complete packet", transLabel(pTransInst), pConn); tError("%s conn %p read invalid packet", transLabel(pTransInst), pConn);
transUnrefSrvHandle(pConn); return false;
return;
} }
STransMsgHead* pHead = (STransMsgHead*)msg; STransMsgHead* pHead = (STransMsgHead*)msg;
...@@ -207,7 +206,7 @@ static void uvHandleReq(SSvrConn* pConn) { ...@@ -207,7 +206,7 @@ static void uvHandleReq(SSvrConn* pConn) {
// uv_queue_work(((SWorkThrd*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask); // uv_queue_work(((SWorkThrd*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask);
if (uvRecvReleaseReq(pConn, pHead)) { if (uvRecvReleaseReq(pConn, pHead)) {
return; return true;
} }
STransMsg transMsg; STransMsg transMsg;
...@@ -262,6 +261,7 @@ static void uvHandleReq(SSvrConn* pConn) { ...@@ -262,6 +261,7 @@ static void uvHandleReq(SSvrConn* pConn) {
transReleaseExHandle(transGetRefMgt(), pConn->refId); transReleaseExHandle(transGetRefMgt(), pConn->refId);
(*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
return true;
} }
void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
...@@ -275,7 +275,10 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { ...@@ -275,7 +275,10 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
if (pBuf->len <= TRANS_PACKET_LIMIT) { if (pBuf->len <= TRANS_PACKET_LIMIT) {
while (transReadComplete(pBuf)) { while (transReadComplete(pBuf)) {
tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn); tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn);
uvHandleReq(conn); if (uvHandleReq(conn) == false) {
destroyConn(conn, true);
break;
}
} }
return; return;
} else { } else {
...@@ -374,6 +377,7 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { ...@@ -374,6 +377,7 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
pHead->ahandle = (uint64_t)pMsg->info.ahandle; pHead->ahandle = (uint64_t)pMsg->info.ahandle;
pHead->traceId = pMsg->info.traceId; pHead->traceId = pMsg->info.traceId;
pHead->hasEpSet = pMsg->info.hasEpSet; pHead->hasEpSet = pMsg->info.hasEpSet;
pHead->magicNum = htonl(TRANS_MAGIC_NUM);
if (pConn->status == ConnNormal) { if (pConn->status == ConnNormal) {
pHead->msgType = (0 == pMsg->msgType ? pConn->inType + 1 : pMsg->msgType); pHead->msgType = (0 == pMsg->msgType ? pConn->inType + 1 : pMsg->msgType);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册