From c35b938698f913073b7ce52fc6cc8dd6354508c6 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 16 Feb 2023 17:01:09 +0800 Subject: [PATCH] fix invalid read/write --- source/libs/transport/src/transCli.c | 269 ++++++++++++++------------- 1 file changed, 143 insertions(+), 126 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 8063ac838b..54203e8d7c 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -20,6 +20,15 @@ typedef struct SConnList { int32_t size; } SConnList; +typedef struct { + queue wq; + int32_t wLen; + int32_t batchSize; // + int32_t batch; + char* dst; + char* ip; + uint16_t port; +} SCliBatch; typedef struct SCliConn { T_REF_DECLARE() uv_connect_t connReq; @@ -64,16 +73,6 @@ typedef struct SCliMsg { int sent; //(0: no send, 1: alread sent) } SCliMsg; -typedef struct { - queue wq; - int32_t wLen; - int32_t batchSize; // - int32_t batch; - char* dst; - char* ip; - uint16_t port; -} SCliBatch; - typedef struct SCliThrd { TdThread thread; // tid int64_t pid; // pid @@ -145,10 +144,12 @@ static void cliAsyncCb(uv_async_t* handle); static void cliIdleCb(uv_idle_t* handle); static void cliPrepareCb(uv_prepare_t* handle); -static void cliSendBatch(const SCliBatch* pBatch, SCliThrd* pThrd); +static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd); +// static void cliConnBatchCb(uv_connect_t* req, int status); static void cliSendBatchCb(uv_write_t* req, int status); -// callback after conn to server -static void cliConnBatchCb(uv_connect_t* req, int status); +// static void cliConnBatchCb(uv_connect_t* req, int status); +// callback after conn to server +// static void cliConnBatchCb(uv_connect_t* req, int status); static bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead); @@ -160,6 +161,7 @@ static SCliConn* cliCreateConn(SCliThrd* thrd); static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/); static void cliDestroy(uv_handle_t* handle); static void cliSend(SCliConn* pConn); +static void cliSendBatch(SCliConn* pConn); static void cliDestroyConnMsgs(SCliConn* conn, bool destroy); // cli util func @@ -825,7 +827,63 @@ static void cliSendCb(uv_write_t* req, int status) { } uv_read_start((uv_stream_t*)pConn->stream, cliAllocRecvBufferCb, cliRecvCb); } +void cliSendBatch(SCliConn* pConn) { + SCliThrd* pThrd = pConn->hostThrd; + STrans* pTransInst = pThrd->pTransInst; + + SCliBatch* pBatch = pConn->pBatch; + int32_t wLen = pBatch->wLen; + + uv_buf_t* wb = taosMemoryCalloc(wLen, sizeof(uv_buf_t)); + int i = 0; + + while (!QUEUE_IS_EMPTY(&pBatch->wq)) { + queue* h = QUEUE_HEAD(&pBatch->wq); + SCliMsg* pCliMsg = QUEUE_DATA(h, SCliMsg, q); + QUEUE_REMOVE(&pCliMsg->q); + + STransConnCtx* pCtx = pCliMsg->ctx; + + STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg); + if (pMsg->pCont == 0) { + pMsg->pCont = (void*)rpcMallocCont(0); + pMsg->contLen = 0; + } + int msgLen = transMsgLenFromCont(pMsg->contLen); + STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); + + if (pHead->comp == 0) { + pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0; + pHead->noResp = REQUEST_NO_RESP(pMsg) ? 1 : 0; + pHead->persist = REQUEST_PERSIS_HANDLE(pMsg) ? 1 : 0; + pHead->msgType = pMsg->msgType; + pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); + pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0; + memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user)); + pHead->traceId = pMsg->info.traceId; + pHead->magicNum = htonl(TRANS_MAGIC_NUM); + } + pHead->timestamp = taosHton64(taosGetTimestampUs()); + + if (pHead->comp == 0) { + if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) { + msgLen = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead); + pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); + } + } else { + msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen)); + } + + wb[i++] = uv_buf_init((char*)pHead, msgLen); + } + + pBatch->wLen = 0; + uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t)); + req->data = pConn; + uv_write(req, (uv_stream_t*)pConn->stream, wb, wLen, cliSendBatchCb); + taosMemoryFree(wb); +} void cliSend(SCliConn* pConn) { SCliThrd* pThrd = pConn->hostThrd; STrans* pTransInst = pThrd->pTransInst; @@ -911,8 +969,8 @@ _RETURN: } static SCliBatch* cliDumpBatch(SCliBatch* pBatch) { - SCliBatch* pNewBatch = taosMemCalloc(1, sizeof(SClicBatch)); - pNewBatch->wq = pBatch->wq; + SCliBatch* pNewBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); + memcpy(pNewBatch->wq, pBatch->wq, sizeof(pBatch->wq)); pNewBatch->batchSize = pBatch->batchSize; pNewBatch->batch = pBatch->batch; @@ -929,19 +987,19 @@ static SCliBatch* cliDumpBatch(SCliBatch* pBatch) { return pNewBatch; } static void cliDestroyBatch(SCliBatch* pBatch) { - while (!EMPTY_IS_EMPTY(&pBatch->wq)) { + while (!QUEUE_IS_EMPTY(&pBatch->wq)) { queue* h = QUEUE_HEAD(&pBatch->wq); - SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); + SCliMsg* p = QUEUE_DATA(h, SCliMsg, q); - QUEUE_REMOVE(&pMsg->q); + QUEUE_REMOVE(&p->q); destroyCmsg(p); } taosMemoryFree(pBatch->ip); taosMemoryFree(pBatch->dst); taosMemoryFree(pBatch); } -static void cliSendBatch(SCliBatch* pBatch, SCliThrd* pThrd) { - if (pBatch->wLen == 0 || EMPTY_IS_EMPTY(&pBatch->wq)) { +static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { + if (pBatch->wLen == 0 || QUEUE_IS_EMPTY(&pBatch->wq)) { return; } STrans* pTransInst = pThrd->pTransInst; @@ -961,33 +1019,32 @@ static void cliSendBatch(SCliBatch* pBatch, SCliThrd* pThrd) { taosArrayPush(pThrd->timerList, &conn->timer); conn->timer = NULL; - cliHandleExcept(conn); + cliHandleFastFail(conn, -1); return; } struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_addr.s_addr = ipaddr; - addr.sin_port = (uint16_t)htons(port); + addr.sin_port = (uint16_t)htons(pBatch->port); - tGTrace("%s conn %p try to connect to %s", pTransInst->label, conn, conn->ip); + tTrace("%s conn %p try to connect to %s", pTransInst->label, conn, pBatch->ip); int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 4); if (fd == -1) { - tGError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn, - tstrerror(TAOS_SYSTEM_ERROR(errno))); - cliHandleExcept(conn); - errno = 0; + tError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn, + tstrerror(TAOS_SYSTEM_ERROR(errno))); + cliHandleFastFail(conn, -1); return; } int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd); if (ret != 0) { - tGError("%s conn %p failed to set stream, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret)); - cliHandleExcept(conn); + tError("%s conn %p failed to set stream, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret)); + cliHandleFastFail(conn, -1); return; } ret = transSetConnOption((uv_tcp_t*)conn->stream); if (ret != 0) { - tGError("%s conn %p failed to set socket opt, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret)); - cliHandleExcept(conn); + tError("%s conn %p failed to set socket opt, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret)); + cliHandleFastFail(conn, -1); return; } @@ -997,8 +1054,7 @@ static void cliSendBatch(SCliBatch* pBatch, SCliThrd* pThrd) { conn->timer->data = NULL; taosArrayPush(pThrd->timerList, &conn->timer); conn->timer = NULL; - - cliHandleFastFail(conn, ret); + cliHandleFastFail(conn, -1); return; } uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0); @@ -1006,59 +1062,7 @@ static void cliSendBatch(SCliBatch* pBatch, SCliThrd* pThrd) { } conn->pBatch = pNewBatch; - - int32_t wLen = pBatch->wLen; - uv_buf_t* wb = taosMemoryCalloc(wLen, sizeof(uv_buf_t)); - int i = 0; - - while (!EMPTY_IS_EMPTY(&pBatch->wq)) { - queue* h = QUEUE_HEAD(&pBatch->wq); - SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); - QUEUE_REMOVE(&pMsg->q); - - transQueuePush(conn->cliMsgs, pMsg); - - STransConnCtx* pCtx = pCliMsg->ctx; - - STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg); - if (pMsg->pCont == 0) { - pMsg->pCont = (void*)rpcMallocCont(0); - pMsg->contLen = 0; - } - - int msgLen = transMsgLenFromCont(pMsg->contLen); - STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); - - if (pHead->comp == 0) { - pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0; - pHead->noResp = REQUEST_NO_RESP(pMsg) ? 1 : 0; - pHead->persist = REQUEST_PERSIS_HANDLE(pMsg) ? 1 : 0; - pHead->msgType = pMsg->msgType; - pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); - pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0; - memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user)); - pHead->traceId = pMsg->info.traceId; - pHead->magicNum = htonl(TRANS_MAGIC_NUM); - } - pHead->timestamp = taosHton64(taosGetTimestampUs()); - - if (pHead->comp == 0) { - if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) { - msgLen = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead); - pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); - } - } else { - msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen)); - } - - wb[i++] = uv_buf_init((char*)pHead, msgLen); - } - - pBatch->wLen = 0; - uv_write_t* req = taosMemCalloc(1, sizeof(uv_write_t)); - req->data = pConn; - uv_write(req, (uv_stream_t*)conn->stream, wb, wLen, cliSendBatchCb); - taosMemoryFree(wb); + cliSendBatch(conn); } static void cliSendBatchCb(uv_write_t* req, int status) { SCliConn* conn = req->data; @@ -1075,29 +1079,34 @@ static void cliSendBatchCb(uv_write_t* req, int status) { static void cliHandleFastFail(SCliConn* pConn, int status) { SCliThrd* pThrd = pConn->hostThrd; STrans* pTransInst = pThrd->pTransInst; - - SCliMsg* pMsg = transQueueGet(&pConn->cliMsgs, 0); - - STraceId* trace = &pMsg->msg.info.traceId; - tGError("%s msg %s failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn), - TMSG_INFO(pMsg->msg.msgType), pConn, pConn->ip, uv_strerror(status)); - - if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg) && - (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) { - SFailFastItem* item = taosHashGet(pThrd->failFastCache, pConn->ip, strlen(pConn->ip)); - int64_t cTimestamp = taosGetTimestampMs(); - if (item != NULL) { - int32_t elapse = cTimestamp - item->timestamp; - if (elapse >= 0 && elapse <= pTransInst->failFastInterval) { - item->count++; + tError("conn %p free twice", pConn); + if (pConn->pBatch == NULL) { + SCliMsg* pMsg = transQueueGet(&pConn->cliMsgs, 0); + + STraceId* trace = &pMsg->msg.info.traceId; + tGError("%s msg %s failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn), + TMSG_INFO(pMsg->msg.msgType), pConn, pConn->ip, uv_strerror(status)); + + if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg) && + (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) { + SFailFastItem* item = taosHashGet(pThrd->failFastCache, pConn->ip, strlen(pConn->ip)); + int64_t cTimestamp = taosGetTimestampMs(); + if (item != NULL) { + int32_t elapse = cTimestamp - item->timestamp; + if (elapse >= 0 && elapse <= pTransInst->failFastInterval) { + item->count++; + } else { + item->count = 1; + item->timestamp = cTimestamp; + } } else { - item->count = 1; - item->timestamp = cTimestamp; + SFailFastItem item = {.count = 1, .timestamp = cTimestamp}; + taosHashPut(pThrd->failFastCache, pConn->ip, strlen(pConn->ip), &item, sizeof(SFailFastItem)); } - } else { - SFailFastItem item = {.count = 1, .timestamp = cTimestamp}; - taosHashPut(pThrd->failFastCache, pConn->ip, strlen(pConn->ip), &item, sizeof(SFailFastItem)); } + } else { + cliDestroyBatch(pConn->pBatch); + pConn->pBatch = NULL; } cliHandleExcept(pConn); } @@ -1117,7 +1126,11 @@ void cliConnCb(uv_connect_t* req, int status) { } if (status != 0) { - if (timeout == false) cliHandleFastFail(pConn, status); + if (timeout == false) { + cliHandleFastFail(pConn, status); + } else if (timeout == true) { + // already deal by timeout + } return; } @@ -1135,8 +1148,11 @@ void cliConnCb(uv_connect_t* req, int status) { transSockInfo2Str(&sockname, pConn->src); tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn); - - cliSend(pConn); + if (pConn->pBatch != NULL) { + cliSendBatch(pConn); + } else { + cliSend(pConn); + } } static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) { @@ -1403,11 +1419,11 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { tGTrace("%s conn %p ready", pTransInst->label, conn); } -static void cliNoBatchDealReq(queue wq, SCliThrd* pThrd) { +static void cliNoBatchDealReq(queue* wq, SCliThrd* pThrd) { int count = 0; - while (!QUEUE_IS_EMPTY(&wq)) { - queue* h = QUEUE_HEAD(&wq); + while (!QUEUE_IS_EMPTY(wq)) { + queue* h = QUEUE_HEAD(wq); QUEUE_REMOVE(h); SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); @@ -1420,10 +1436,10 @@ static void cliNoBatchDealReq(queue wq, SCliThrd* pThrd) { } } -static void cliHandleBatch() static void cliBatchDealReq(queue wq, SCliThrd* pThrd) { +static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { int count = 0; - while (!QUEUE_IS_EMPTY(&wq)) { - queue* h = QUEUE_HEAD(&wq); + while (!QUEUE_IS_EMPTY(wq)) { + queue* h = QUEUE_HEAD(wq); QUEUE_REMOVE(h); SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); @@ -1435,8 +1451,8 @@ static void cliHandleBatch() static void cliBatchDealReq(queue wq, SCliThrd* pTh char key[TSDB_FQDN_LEN + 64] = {0}; CONN_CONSTRUCT_HASH_KEY(key, ip, port); - SCliBatch *ppBatch = taosHashGet(pThrd->batchCache, key, sizeof(key))); - if (*ppBatch == NULL) { + SCliBatch** ppBatch = taosHashGet(pThrd->batchCache, key, sizeof(key)); + if (ppBatch == NULL || *ppBatch == NULL) { SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); QUEUE_INIT(&pBatch->wq); QUEUE_PUSH(&pBatch->wq, h); @@ -1450,20 +1466,21 @@ static void cliHandleBatch() static void cliBatchDealReq(queue wq, SCliThrd* pTh taosHashPut(pThrd->batchCache, key, sizeof(key), &pBatch, sizeof(void*)); } else { QUEUE_PUSH(&(*ppBatch)->wq, h); - (*pBatch)->wLen += 1; - (*pBatch)->batchSize += pMsg->msg.contLen; + (*ppBatch)->wLen += 1; + (*ppBatch)->batchSize += pMsg->msg.contLen; } + return; } (*cliAsyncHandle[pMsg->type])(pMsg, pThrd); count++; } - void** pIter = taoskHashIterate(pThrd->batchCache, NULL); + void** pIter = taosHashIterate(pThrd->batchCache, NULL); while (pIter != NULL) { SCliBatch* batch = (SCliBatch*)(*pIter); - cliSendBatch(batch, pThrd); - pIter = (void**)taosHashIterate(info, pIter); + cliHandleBatchReq(batch, pThrd); + pIter = (void**)taosHashIterate(pThrd->batchCache, pIter); } if (count >= 2) { @@ -1483,11 +1500,11 @@ static void cliAsyncCb(uv_async_t* handle) { QUEUE_MOVE(&item->qmsg, &wq); taosThreadMutexUnlock(&item->mtx); - int8_t supportBatch = pTransInst->supprtBatch; + int8_t supportBatch = pTransInst->supportBatch; if (supportBatch == 0) { - cliNotBatchDealReq(wq, pThrd); + cliNoBatchDealReq(&wq, pThrd); } else if (supportBatch == 1) { - cliBatchDealReq(wq, pThrd); + cliBatchDealReq(&wq, pThrd); } if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd->stopMsg, pThrd); @@ -1704,7 +1721,7 @@ static SCliThrd* createThrdObj(void* trans) { pThrd->connLimitCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, pTransInst->connLimitLock == 0 ? HASH_NO_LOCK : HASH_ENTRY_LOCK); - pThrd->batchCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, hash_no_lock); + pThrd->batchCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pThrd->quit = false; return pThrd; -- GitLab