From a6a8daec23f83c5553c688ab15ba3db44fbd7ab7 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 16 Feb 2023 15:09:39 +0800 Subject: [PATCH] fix: batch write --- include/libs/transport/trpc.h | 3 +- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 1 + source/libs/transport/inc/transportInt.h | 1 + source/libs/transport/src/trans.c | 1 + source/libs/transport/src/transCli.c | 269 +++++++++++++++++- 5 files changed, 265 insertions(+), 10 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 5787f41772..acfd5dfb51 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -115,7 +115,8 @@ typedef struct SRpcInit { int32_t connLimitNum; int32_t connLimitLock; - void *parent; + int8_t supportBatch; // 0: no batch, 1. batch + void *parent; } SRpcInit; typedef struct { diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index b16ff6efac..8751b575f3 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -290,6 +290,7 @@ int32_t dmInitClient(SDnode *pDnode) { rpcInit.connLimitNum = connLimitNum; rpcInit.connLimitLock = 1; + rpcInit.supportBatch = 1; pTrans->clientRpc = rpcOpen(&rpcInit); if (pTrans->clientRpc == NULL) { diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 1fe32955b9..13adb4d2b4 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -66,6 +66,7 @@ typedef struct { int32_t connLimitNum; int8_t connLimitLock; // 0: no lock. 1. lock + int8_t supportBatch; // 0: no batch, 1: support batch int index; void* parent; diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 6eec54b370..38ec1c7fdc 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -69,6 +69,7 @@ void* rpcOpen(const SRpcInit* pInit) { pRpc->failFastFp = pInit->ffp; pRpc->connLimitNum = pInit->connLimitNum; pRpc->connLimitLock = pInit->connLimitLock; + pRpc->supportBatch = pInit->supportBatch; pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; if (pRpc->numOfThreads <= 0) { diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 2d09822dc7..8063ac838b 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -40,6 +40,8 @@ typedef struct SCliConn { bool broken; // link broken or not ConnStatus status; // + SCliBatch* pBatch; + int64_t refId; char* ip; @@ -62,6 +64,16 @@ typedef struct SCliMsg { int sent; //(0: no send, 1: alread sent) } SCliMsg; +typedef struct { + queue wq; + int32_t wLen; + int32_t batchSize; // + int32_t batch; + char* dst; + char* ip; + uint16_t port; +} SCliBatch; + typedef struct SCliThrd { TdThread thread; // tid int64_t pid; // pid @@ -86,6 +98,7 @@ typedef struct SCliThrd { SHashObj* failFastCache; SHashObj* connLimitCache; + SHashObj* batchCache; SCliMsg* stopMsg; @@ -132,6 +145,11 @@ static void cliAsyncCb(uv_async_t* handle); static void cliIdleCb(uv_idle_t* handle); static void cliPrepareCb(uv_prepare_t* handle); +static void cliSendBatch(const SCliBatch* pBatch, SCliThrd* pThrd); +static void cliSendBatchCb(uv_write_t* req, int status); +// callback after conn to server +static void cliConnBatchCb(uv_connect_t* req, int status); + static bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead); static int32_t allocConnRef(SCliConn* conn, bool update); @@ -167,6 +185,8 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd); static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd); static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease, NULL, cliHandleUpdate}; +/// static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease, +/// NULL,cliHandleUpdate}; static FORCE_INLINE void destroyUserdata(STransMsg* userdata); static FORCE_INLINE void destroyCmsg(void* cmsg); @@ -287,6 +307,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { } destroyCmsg(msg); } + transQueueClear(&conn->cliMsgs); memset(&conn->ctx, 0, sizeof(conn->ctx)); } bool cliMaySendCachedMsg(SCliConn* conn) { @@ -888,6 +909,169 @@ void cliSend(SCliConn* pConn) { _RETURN: return; } + +static SCliBatch* cliDumpBatch(SCliBatch* pBatch) { + SCliBatch* pNewBatch = taosMemCalloc(1, sizeof(SClicBatch)); + pNewBatch->wq = pBatch->wq; + + pNewBatch->batchSize = pBatch->batchSize; + pNewBatch->batch = pBatch->batch; + pNewBatch->wLen = pBatch->wLen; + pNewBatch->dst = strdup(pBatch->dst); + pNewBatch->ip = strdup(pBatch->ip); + pNewBatch->port = pBatch->port; + + QUEUE_INIT(&pBatch->wq); + pBatch->batchSize = 0; + pBatch->batch = 0; + pBatch->wLen = 0; + + return pNewBatch; +} +static void cliDestroyBatch(SCliBatch* pBatch) { + while (!EMPTY_IS_EMPTY(&pBatch->wq)) { + queue* h = QUEUE_HEAD(&pBatch->wq); + SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); + + QUEUE_REMOVE(&pMsg->q); + destroyCmsg(p); + } + taosMemoryFree(pBatch->ip); + taosMemoryFree(pBatch->dst); + taosMemoryFree(pBatch); +} +static void cliSendBatch(SCliBatch* pBatch, SCliThrd* pThrd) { + if (pBatch->wLen == 0 || EMPTY_IS_EMPTY(&pBatch->wq)) { + return; + } + STrans* pTransInst = pThrd->pTransInst; + + SCliBatch* pNewBatch = cliDumpBatch(pBatch); + + SCliConn* conn = getConnFromPool(pThrd->pool, pBatch->ip, pBatch->port); + if (conn == NULL) { + conn = cliCreateConn(pThrd); + conn->pBatch = pNewBatch; + conn->ip = strdup(conn->pBatch->ip); + + uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, conn->ip); + if (ipaddr == 0xffffffff) { + uv_timer_stop(conn->timer); + conn->timer->data = NULL; + taosArrayPush(pThrd->timerList, &conn->timer); + conn->timer = NULL; + + cliHandleExcept(conn); + return; + } + struct sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = ipaddr; + addr.sin_port = (uint16_t)htons(port); + + tGTrace("%s conn %p try to connect to %s", pTransInst->label, conn, conn->ip); + int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 4); + if (fd == -1) { + tGError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn, + tstrerror(TAOS_SYSTEM_ERROR(errno))); + cliHandleExcept(conn); + errno = 0; + return; + } + int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd); + if (ret != 0) { + tGError("%s conn %p failed to set stream, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret)); + cliHandleExcept(conn); + return; + } + ret = transSetConnOption((uv_tcp_t*)conn->stream); + if (ret != 0) { + tGError("%s conn %p failed to set socket opt, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret)); + cliHandleExcept(conn); + return; + } + + ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); + if (ret != 0) { + uv_timer_stop(conn->timer); + conn->timer->data = NULL; + taosArrayPush(pThrd->timerList, &conn->timer); + conn->timer = NULL; + + cliHandleFastFail(conn, ret); + return; + } + uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0); + return; + } + + conn->pBatch = pNewBatch; + + int32_t wLen = pBatch->wLen; + uv_buf_t* wb = taosMemoryCalloc(wLen, sizeof(uv_buf_t)); + int i = 0; + + while (!EMPTY_IS_EMPTY(&pBatch->wq)) { + queue* h = QUEUE_HEAD(&pBatch->wq); + SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); + QUEUE_REMOVE(&pMsg->q); + + transQueuePush(conn->cliMsgs, pMsg); + + STransConnCtx* pCtx = pCliMsg->ctx; + + STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg); + if (pMsg->pCont == 0) { + pMsg->pCont = (void*)rpcMallocCont(0); + pMsg->contLen = 0; + } + + int msgLen = transMsgLenFromCont(pMsg->contLen); + STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); + + if (pHead->comp == 0) { + pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0; + pHead->noResp = REQUEST_NO_RESP(pMsg) ? 1 : 0; + pHead->persist = REQUEST_PERSIS_HANDLE(pMsg) ? 1 : 0; + pHead->msgType = pMsg->msgType; + pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); + pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0; + memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user)); + pHead->traceId = pMsg->info.traceId; + pHead->magicNum = htonl(TRANS_MAGIC_NUM); + } + pHead->timestamp = taosHton64(taosGetTimestampUs()); + + if (pHead->comp == 0) { + if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) { + msgLen = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead); + pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); + } + } else { + msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen)); + } + + wb[i++] = uv_buf_init((char*)pHead, msgLen); + } + + pBatch->wLen = 0; + uv_write_t* req = taosMemCalloc(1, sizeof(uv_write_t)); + req->data = pConn; + uv_write(req, (uv_stream_t*)conn->stream, wb, wLen, cliSendBatchCb); + taosMemoryFree(wb); +} +static void cliSendBatchCb(uv_write_t* req, int status) { + SCliConn* conn = req->data; + SCliThrd* thrd = conn->hostThrd; + cliDestroyBatch(conn->pBatch); + conn->pBatch = NULL; + + if (status != 0) { + cliHandleExcept(conn); + } else { + addConnToPool(thrd->pool, conn); + } +} static void cliHandleFastFail(SCliConn* pConn, int status) { SCliThrd* pThrd = pConn->hostThrd; STrans* pTransInst = pThrd->pTransInst; @@ -1218,29 +1402,93 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { } tGTrace("%s conn %p ready", pTransInst->label, conn); } -static void cliAsyncCb(uv_async_t* handle) { - SAsyncItem* item = handle->data; - SCliThrd* pThrd = item->pThrd; - SCliMsg* pMsg = NULL; - // batch process to avoid to lock/unlock frequently - queue wq; - taosThreadMutexLock(&item->mtx); - QUEUE_MOVE(&item->qmsg, &wq); - taosThreadMutexUnlock(&item->mtx); +static void cliNoBatchDealReq(queue wq, SCliThrd* pThrd) { + int count = 0; + while (!QUEUE_IS_EMPTY(&wq)) { + queue* h = QUEUE_HEAD(&wq); + QUEUE_REMOVE(h); + + SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); + (*cliAsyncHandle[pMsg->type])(pMsg, pThrd); + + count++; + } + if (count >= 2) { + tTrace("cli process batch size:%d", count); + } +} + +static void cliHandleBatch() static void cliBatchDealReq(queue wq, SCliThrd* pThrd) { int count = 0; while (!QUEUE_IS_EMPTY(&wq)) { queue* h = QUEUE_HEAD(&wq); QUEUE_REMOVE(h); SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); + if (REQUEST_NO_RESP(&pMsg->msg)) { + STransConnCtx* pCtx = pMsg->ctx; + + char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet); + uint32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet); + char key[TSDB_FQDN_LEN + 64] = {0}; + CONN_CONSTRUCT_HASH_KEY(key, ip, port); + + SCliBatch *ppBatch = taosHashGet(pThrd->batchCache, key, sizeof(key))); + if (*ppBatch == NULL) { + SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); + QUEUE_INIT(&pBatch->wq); + QUEUE_PUSH(&pBatch->wq, h); + pBatch->wLen += 1; + pBatch->batchSize += pMsg->msg.contLen; + + pBatch->dst = strdup(key); + pBatch->ip = strdup(ip); + pBatch->port = (uint16_t)port; + + taosHashPut(pThrd->batchCache, key, sizeof(key), &pBatch, sizeof(void*)); + } else { + QUEUE_PUSH(&(*ppBatch)->wq, h); + (*pBatch)->wLen += 1; + (*pBatch)->batchSize += pMsg->msg.contLen; + } + } (*cliAsyncHandle[pMsg->type])(pMsg, pThrd); count++; } + + void** pIter = taoskHashIterate(pThrd->batchCache, NULL); + while (pIter != NULL) { + SCliBatch* batch = (SCliBatch*)(*pIter); + + cliSendBatch(batch, pThrd); + pIter = (void**)taosHashIterate(info, pIter); + } + if (count >= 2) { tTrace("cli process batch size:%d", count); } +} + +static void cliAsyncCb(uv_async_t* handle) { + SAsyncItem* item = handle->data; + SCliThrd* pThrd = item->pThrd; + STrans* pTransInst = pThrd->pTransInst; + + SCliMsg* pMsg = NULL; + // batch process to avoid to lock/unlock frequently + queue wq; + taosThreadMutexLock(&item->mtx); + QUEUE_MOVE(&item->qmsg, &wq); + taosThreadMutexUnlock(&item->mtx); + + int8_t supportBatch = pTransInst->supprtBatch; + if (supportBatch == 0) { + cliNotBatchDealReq(wq, pThrd); + } else if (supportBatch == 1) { + cliBatchDealReq(wq, pThrd); + } if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd->stopMsg, pThrd); } @@ -1456,6 +1704,8 @@ static SCliThrd* createThrdObj(void* trans) { pThrd->connLimitCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, pTransInst->connLimitLock == 0 ? HASH_NO_LOCK : HASH_ENTRY_LOCK); + pThrd->batchCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, hash_no_lock); + pThrd->quit = false; return pThrd; } @@ -1484,6 +1734,7 @@ static void destroyThrdObj(SCliThrd* pThrd) { taosHashCleanup(pThrd->fqdn2ipCache); taosHashCleanup(pThrd->failFastCache); taosHashCleanup(pThrd->connLimitCache); + taosHashCleanup(pThrd->batchCache); taosMemoryFree(pThrd); } -- GitLab