From 19686d9ab1cb5b1a6f73e6c4d5dd21b1ea09f1ce Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 9 Mar 2022 22:16:59 +0800 Subject: [PATCH] update transport --- source/libs/index/test/fstTest.cc | 12 +- source/libs/transport/inc/transComm.h | 6 +- source/libs/transport/src/transCli.c | 198 ++++++++++++-------------- source/libs/transport/src/transComm.c | 6 + source/libs/transport/src/transSrv.c | 24 ++-- source/libs/transport/test/rclient.c | 1 + source/libs/transport/test/rserver.c | 2 + 7 files changed, 121 insertions(+), 128 deletions(-) diff --git a/source/libs/index/test/fstTest.cc b/source/libs/index/test/fstTest.cc index cb3206a611..32f11b8af6 100644 --- a/source/libs/index/test/fstTest.cc +++ b/source/libs/index/test/fstTest.cc @@ -258,7 +258,7 @@ void checkFstCheckIterator() { // prefix search std::vector result; - AutomationCtx* ctx = automCtxCreate((void*)"ab", AUTOMATION_ALWAYS); + AutomationCtx* ctx = automCtxCreate((void*)"H", AUTOMATION_PREFIX); m->Search(ctx, result); std::cout << "size: " << result.size() << std::endl; // assert(result.size() == count); @@ -328,11 +328,11 @@ void iterTFileReader(char* path, char* uid, char* colName, char* ver) { int main(int argc, char* argv[]) { // tool to check all kind of fst test // if (argc > 1) { validateTFile(argv[1]); } - if (argc > 4) { - // path suid colName ver - iterTFileReader(argv[1], argv[2], argv[3], argv[4]); - } - // checkFstCheckIterator(); + // if (argc > 4) { + // path suid colName ver + // iterTFileReader(argv[1], argv[2], argv[3], argv[4]); + //} + checkFstCheckIterator(); // checkFstLongTerm(); // checkFstPrefixSearch(); diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index d4d9bff56c..f3e9e88583 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -123,9 +123,8 @@ typedef struct { } SRpcReqContext; typedef struct { - SRpcInfo* pTransInst; // associated SRpcInfo - SEpSet epSet; // ip list provided by app - void* ahandle; // handle provided by app + SEpSet epSet; // ip list provided by app + void* ahandle; // handle provided by app // struct SRpcConn* pConn; // pConn allocated tmsg_t msgType; // message type uint8_t* pCont; // content provided by app @@ -244,6 +243,7 @@ int transDestroyBuffer(SConnBuffer* buf); int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf); bool transReadComplete(SConnBuffer* connBuf); +int transSetConnOption(uv_tcp_t* stream); // int transPackMsg(SRpcMsg *rpcMsg, bool sercured, bool auth, char **msg, int32_t *msgLen); // int transUnpackMsg(char *msg, SRpcMsg *pMsg, bool ); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index fb76f38fe5..821b51c935 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -20,7 +20,10 @@ #define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1) #define CONN_PERSIST_TIME(para) (para * 1000 * 10) +#define CONN_GET_INST_LABEL(conn) (((SRpcInfo*)(((SCliThrdObj*)conn->hostThrd)->pTransInst))->label) + typedef struct SCliConn { + T_REF_DECLARE() uv_connect_t connReq; uv_stream_t* stream; uv_write_t* writeReq; @@ -32,8 +35,7 @@ typedef struct SCliConn { int8_t ctnRdCnt; // continue read count int hThrdIdx; - SRpcPush* push; - int persist; // + int persist; // // spi configure char spi; char secured; @@ -41,6 +43,7 @@ typedef struct SCliConn { // debug and log info struct sockaddr_in addr; struct sockaddr_in locaddr; + } SCliConn; typedef struct SCliMsg { @@ -54,14 +57,17 @@ typedef struct SCliThrdObj { pthread_t thread; uv_loop_t* loop; // uv_async_t* cliAsync; // - SAsyncPool* asyncPool; - uv_timer_t* timer; - void* pool; // conn pool + SAsyncPool* asyncPool; + uv_timer_t* timer; + void* pool; // conn pool + + // msg queue queue msg; pthread_mutex_t msgMtx; - uint64_t nextTimeout; // next timeout - void* pTransInst; // - bool quit; + + uint64_t nextTimeout; // next timeout + void* pTransInst; // + bool quit; } SCliThrdObj; typedef struct SClientObj { @@ -96,7 +102,7 @@ static void clientAsyncCb(uv_async_t* handle); static void clientDestroy(uv_handle_t* handle); static void clientConnDestroy(SCliConn* pConn, bool clear /*clear tcp handle or not*/); -// process data read from server, auth/decompress etc later +// process data read from server, add decompress etc later static void clientHandleResp(SCliConn* conn); // handle except about conn static void clientHandleExcept(SCliConn* conn); @@ -104,9 +110,10 @@ static void clientHandleExcept(SCliConn* conn); static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd); static void clientSendQuit(SCliThrdObj* thrd); - static void destroyUserdata(SRpcMsg* userdata); +static int clientRBChoseIdx(SRpcInfo* pTransInst); + static void destroyCmsg(SCliMsg* cmsg); static void transDestroyConnCtx(STransConnCtx* ctx); // thread obj @@ -115,10 +122,13 @@ static void destroyThrdObj(SCliThrdObj* pThrd); // thread static void* clientThread(void* arg); -static void clientHandleResp(SCliConn* conn) { +static void* clientNotifyApp() {} +static void clientHandleResp(SCliConn* conn) { SCliMsg* pMsg = conn->data; STransConnCtx* pCtx = pMsg->ctx; - SRpcInfo* pRpc = pCtx->pTransInst; + + SCliThrdObj* pThrd = conn->hostThrd; + SRpcInfo* pTransInst = pThrd->pTransInst; STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf); pHead->code = htonl(pHead->code); @@ -134,26 +144,24 @@ static void clientHandleResp(SCliConn* conn) { rpcMsg.msgType = pHead->msgType; rpcMsg.ahandle = pCtx->ahandle; - if (pRpc->pfp != NULL && (pRpc->pfp)(pRpc->parent, rpcMsg.msgType)) { + if (pTransInst->pfp != NULL && (pTransInst->pfp)(pTransInst->parent, rpcMsg.msgType)) { rpcMsg.handle = conn; conn->persist = 1; tDebug("client conn %p persist by app", conn); } - tDebug("%s client conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pRpc->label, conn, + tDebug("%s client conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn, TMSG_INFO(pHead->msgType), inet_ntoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), inet_ntoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), rpcMsg.contLen); conn->secured = pHead->secured; - if (conn->push != NULL && conn->ctnRdCnt != 0) { - (*conn->push->callback)(conn->push->arg, &rpcMsg); - conn->push = NULL; - } else { + + if (pCtx->pSem == NULL) { if (pCtx->pSem == NULL) { - tTrace("%s client conn %p handle resp", pRpc->label, conn); - (pRpc->cfp)(pRpc->parent, &rpcMsg, NULL); + tTrace("%s client conn %p handle resp", pTransInst->label, conn); + (pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL); } else { - tTrace("%s client conn(sync) %p handle resp", pRpc->label, conn); + tTrace("%s client conn(sync) %p handle resp", pTransInst->label, conn); memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg)); tsem_post(pCtx->pSem); } @@ -162,28 +170,33 @@ static void clientHandleResp(SCliConn* conn) { uv_read_start((uv_stream_t*)conn->stream, clientAllocBufferCb, clientReadCb); - SCliThrdObj* pThrd = conn->hostThrd; - // user owns conn->persist = 1 - if (conn->push == NULL && conn->persist == 0) { - if (pRpc->noPool == true) { + if (conn->persist == 0) { + if (pTransInst->noPool == true) { + destroyCmsg(conn->data); + clientConnDestroy(conn, true); } else { addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); + destroyCmsg(conn->data); + conn->data = NULL; } + } else { + // app decide to free or not } - destroyCmsg(conn->data); - conn->data = NULL; // start thread's timer of conn pool if not active - if (!uv_is_active((uv_handle_t*)pThrd->timer) && pRpc->idleTime > 0) { + if (!uv_is_active((uv_handle_t*)pThrd->timer) && pTransInst->idleTime > 0) { // uv_timer_start((uv_timer_t*)pThrd->timer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); } } static void clientHandleExcept(SCliConn* pConn) { - if (pConn->data == NULL && pConn->push == NULL) { + if (pConn->data == NULL) { // handle conn except in conn pool clientConnDestroy(pConn, true); return; } + SCliThrdObj* pThrd = pConn->hostThrd; + SRpcInfo* pTransInst = pThrd->pTransInst; + SCliMsg* pMsg = pConn->data; STransConnCtx* pCtx = pMsg->ctx; @@ -192,29 +205,14 @@ static void clientHandleExcept(SCliConn* pConn) { rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; rpcMsg.msgType = pMsg->msg.msgType + 1; - if (pConn->push != NULL && pConn->ctnRdCnt != 0) { - (*pConn->push->callback)(pConn->push->arg, &rpcMsg); - pConn->push = NULL; + if (pCtx->pSem == NULL) { + (pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL); } else { - if (pCtx->pSem == NULL) { - (pCtx->pTransInst->cfp)(pCtx->pTransInst->parent, &rpcMsg, NULL); - } else { - memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg)); - tsem_post(pCtx->pSem); - } - if (pConn->push != NULL) { - (*pConn->push->callback)(pConn->push->arg, &rpcMsg); - } - pConn->push = NULL; + memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg)); + tsem_post(pCtx->pSem); } - tTrace("%s client conn %p start to destroy", pCtx->pTransInst->label, pConn); - if (pConn->push == NULL) { - destroyCmsg(pConn->data); - pConn->data = NULL; - } - // transDestroyConnCtx(pCtx); + tTrace("%s client conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn); clientConnDestroy(pConn, true); - pConn->ctnRdCnt += 1; } static void clientTimeoutCb(uv_timer_t* handle) { @@ -316,17 +314,14 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf if (nread > 0) { pBuf->len += nread; if (transReadComplete(pBuf)) { - tTrace("client conn %p read complete", conn); + tTrace("%s client conn %p read complete", CONN_GET_INST_LABEL(conn), conn); clientHandleResp(conn); } else { - tTrace("client conn %p read partial packet, continue to read", conn); + tTrace("%s client conn %p read partial packet, continue to read", CONN_GET_INST_LABEL(conn), conn); } return; } - if (nread == UV_EOF) { - tError("client conn %p read error: %s", conn, uv_err_name(nread)); - clientHandleExcept(conn); - } + assert(nread <= 0); if (nread == 0) { // ref http://docs.libuv.org/en/v1.x/stream.html?highlight=uv_read_start#c.uv_read_cb @@ -335,18 +330,16 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf return; } if (nread < 0) { - tError("client conn %p read error: %s", conn, uv_err_name(nread)); + tError("%s client conn %p read error: %s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread)); clientHandleExcept(conn); } - // tDebug("Read error %s\n", uv_err_name(nread)); - // uv_close((uv_handle_t*)handle, clientDestroy); } static void clientConnDestroy(SCliConn* conn, bool clear) { // conn->ref--; if (conn->ref == 0) { - tTrace("client conn %p remove from conn pool", conn); + tTrace("%s client conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn); QUEUE_REMOVE(&conn->conn); if (clear) { uv_close((uv_handle_t*)conn->stream, clientDestroy); @@ -367,8 +360,9 @@ static void clientDestroy(uv_handle_t* handle) { static void clientWriteCb(uv_write_t* req, int status) { SCliConn* pConn = req->data; + if (status == 0) { - tTrace("client conn %p data already was written out", pConn); + tTrace("%s client conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn); SCliMsg* pMsg = pConn->data; if (pMsg == NULL) { // handle @@ -376,18 +370,19 @@ static void clientWriteCb(uv_write_t* req, int status) { } destroyUserdata(&pMsg->msg); } else { - tError("client conn %p failed to write: %s", pConn, uv_err_name(status)); + tError("%s client conn %p failed to write: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(status)); clientHandleExcept(pConn); return; } - SCliThrdObj* pThrd = pConn->hostThrd; uv_read_start((uv_stream_t*)pConn->stream, clientAllocBufferCb, clientReadCb); } static void clientWrite(SCliConn* pConn) { SCliMsg* pCliMsg = pConn->data; STransConnCtx* pCtx = pCliMsg->ctx; - SRpcInfo* pTransInst = pCtx->pTransInst; + + SCliThrdObj* pThrd = pConn->hostThrd; + SRpcInfo* pTransInst = pThrd->pTransInst; SRpcMsg* pMsg = (SRpcMsg*)(&pCliMsg->msg); @@ -416,20 +411,18 @@ static void clientWrite(SCliConn* pConn) { pHead->msgType = pMsg->msgType; pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); - // if (pHead->msgType == TDMT_VND_QUERY || pHead->msgType == TDMT_VND_) - uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); - tDebug("client conn %p %s is send to %s:%d, local info %s:%d", pConn, TMSG_INFO(pHead->msgType), - inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), - ntohs(pConn->locaddr.sin_port)); + tDebug("%s client conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn, + TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), + inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); + uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb); } static void clientConnCb(uv_connect_t* req, int status) { // impl later SCliConn* pConn = req->data; if (status != 0) { - // tError("failed to connect server(%s, %d), errmsg: %s", pCtx->ip, pCtx->port, uv_strerror(status)); - tError("client conn %p failed to connect server: %s", pConn, uv_strerror(status)); + tError("%s client conn %p failed to connect server: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_strerror(status)); clientHandleExcept(pConn); return; } @@ -439,7 +432,7 @@ static void clientConnCb(uv_connect_t* req, int status) { addrlen = sizeof(pConn->locaddr); uv_tcp_getsockname((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->locaddr, &addrlen); - tTrace("client conn %p connect to server successfully", pConn); + tTrace("%s client conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn); assert(pConn->stream == req->handle); clientWrite(pConn); @@ -462,20 +455,18 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { et = taosGetTimestampUs(); STransConnCtx* pCtx = pMsg->ctx; + SRpcInfo* pTransInst = pThrd->pTransInst; + SCliConn* conn = NULL; - SCliConn* conn = NULL; - if (pMsg->msg.handle == NULL) { - if (pCtx->pTransInst->noPool == true) { - } else { - conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port); - } + if (pMsg->msg.handle != NULL) { + conn = (SCliConn*)(pMsg->msg.handle); if (conn != NULL) { - tTrace("client conn %p get from conn pool", conn); + tTrace("%s client conn %p reused", CONN_GET_INST_LABEL(conn), conn); } } else { - conn = (SCliConn*)(pMsg->msg.handle); - if (conn != NULL) { - tTrace("client conn %p reused", conn); + if (pTransInst->noPool == false) { + conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port); + if (conn != NULL) tTrace("%s client conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn); } } @@ -489,7 +480,6 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { return; } clientWrite(conn); - } else { conn = calloc(1, sizeof(SCliConn)); conn->ref++; @@ -497,12 +487,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); conn->stream->data = conn; - uv_tcp_nodelay((uv_tcp_t*)conn->stream, 1); - int ret = uv_tcp_keepalive((uv_tcp_t*)conn->stream, 1, 1); - if (ret) { - tTrace("client conn %p failed to set keepalive, %s", conn, uv_err_name(ret)); - } - // write req handle + conn->writeReq = malloc(sizeof(uv_write_t)); conn->writeReq->data = conn; @@ -512,17 +497,17 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { conn->data = pMsg; conn->hostThrd = pThrd; - // conn->push = pMsg->msg.push; - // conn->ctnRdCnt = 0; - + int ret = transSetConnOption((uv_tcp_t*)conn->stream); + if (ret) { + tError("%s client conn %p failed to set conn option, errmsg %s", pTransInst->label, conn, uv_err_name(ret)); + } struct sockaddr_in addr; uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr); // handle error in callback if fail to connect - tTrace("client conn %p try to connect to %s:%d", conn, pMsg->ctx->ip, pMsg->ctx->port); + tTrace("%s client conn %p try to connect to %s:%d", pTransInst->label, conn, pMsg->ctx->ip, pMsg->ctx->port); uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, clientConnCb); } - conn->push = pMsg->msg.push; conn->ctnRdCnt = 0; conn->hThrdIdx = pCtx->hThrdIdx; } @@ -548,7 +533,6 @@ static void clientAsyncCb(uv_async_t* handle) { } else { clientHandleReq(pMsg, pThrd); } - // clientHandleReq(pMsg, pThrd); count++; } if (count >= 2) { @@ -656,37 +640,36 @@ void taosCloseClient(void* arg) { free(cli->pThreadObj); free(cli); } -static int clientRBChoseIdx(SRpcInfo* pRpc) { - int64_t index = pRpc->index; - if (pRpc->index++ >= pRpc->numOfThreads) { - pRpc->index = 0; +static int clientRBChoseIdx(SRpcInfo* pTransInst) { + int64_t index = pTransInst->index; + if (pTransInst->index++ >= pTransInst->numOfThreads) { + pTransInst->index = 0; } - return index % pRpc->numOfThreads; + return index % pTransInst->numOfThreads; } void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { // impl later char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); uint32_t port = pEpSet->eps[pEpSet->inUse].port; - SRpcInfo* pRpc = (SRpcInfo*)shandle; + SRpcInfo* pTransInst = (SRpcInfo*)shandle; int index = CONN_HOST_THREAD_INDEX(pMsg->handle); if (index == -1) { - index = clientRBChoseIdx(pRpc); + index = clientRBChoseIdx(pTransInst); } int32_t flen = 0; if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) { // imp later } STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx)); - pCtx->pTransInst = (SRpcInfo*)shandle; pCtx->ahandle = pMsg->ahandle; pCtx->msgType = pMsg->msgType; pCtx->ip = strdup(ip); pCtx->port = port; pCtx->hThrdIdx = index; - assert(pRpc->connType == TAOS_CONN_CLIENT); + assert(pTransInst->connType == TAOS_CONN_CLIENT); // atomic or not SCliMsg* cliMsg = malloc(sizeof(SCliMsg)); @@ -694,7 +677,7 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* cliMsg->msg = *pMsg; cliMsg->st = taosGetTimestampUs(); - SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index]; + SCliThrdObj* thrd = ((SClientObj*)pTransInst->tcphandle)->pThreadObj[index]; transSendAsync(thrd->asyncPool, &(cliMsg->q)); } @@ -702,15 +685,14 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); uint32_t port = pEpSet->eps[pEpSet->inUse].port; - SRpcInfo* pRpc = (SRpcInfo*)shandle; + SRpcInfo* pTransInst = (SRpcInfo*)shandle; int index = CONN_HOST_THREAD_INDEX(pReq->handle); if (index == -1) { - index = clientRBChoseIdx(pRpc); + index = clientRBChoseIdx(pTransInst); } STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx)); - pCtx->pTransInst = (SRpcInfo*)shandle; pCtx->ahandle = pReq->ahandle; pCtx->msgType = pReq->msgType; pCtx->ip = strdup(ip); @@ -725,7 +707,7 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { cliMsg->msg = *pReq; cliMsg->st = taosGetTimestampUs(); - SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index]; + SCliThrdObj* thrd = ((SClientObj*)pTransInst->tcphandle)->pThreadObj[index]; transSendAsync(thrd->asyncPool, &(cliMsg->q)); tsem_t* pSem = pCtx->pSem; tsem_wait(pSem); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 9a8607b0ed..92e42cb380 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -257,6 +257,12 @@ int transDestroyBuffer(SConnBuffer* buf) { transClearBuffer(buf); } +int transSetConnOption(uv_tcp_t* stream) { + uv_tcp_nodelay(stream, 1); + int ret = uv_tcp_keepalive(stream, 5, 5); + return ret; +} + SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) { SAsyncPool* pool = calloc(1, sizeof(SAsyncPool)); pool->index = 0; diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index c7b6ca2a2c..3d0da29e2d 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -18,6 +18,7 @@ #include "transComm.h" typedef struct SSrvConn { + T_REF_DECLARE() uv_tcp_t* pTcp; uv_write_t* pWriter; uv_timer_t* pTimer; @@ -67,16 +68,19 @@ typedef struct SWorkThrdObj { } SWorkThrdObj; typedef struct SServerObj { - pthread_t thread; - uv_tcp_t server; - uv_loop_t* loop; + pthread_t thread; + uv_tcp_t server; + uv_loop_t* loop; + + // work thread info int workerIdx; int numOfThreads; SWorkThrdObj** pThreadObj; - uv_pipe_t** pipe; - uint32_t ip; - uint32_t port; - uv_async_t* pAcceptAsync; // just to quit from from accept thread + + uv_pipe_t** pipe; + uint32_t ip; + uint32_t port; + uv_async_t* pAcceptAsync; // just to quit from from accept thread } SServerObj; static const char* notify = "a"; @@ -493,13 +497,11 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { uv_tcp_init(pThrd->loop, pConn->pTcp); pConn->pTcp->data = pConn; - // uv_tcp_nodelay(pConn->pTcp, 1); - // uv_tcp_keepalive(pConn->pTcp, 1, 1); - - // init write request, just pConn->pWriter = calloc(1, sizeof(uv_write_t)); pConn->pWriter->data = pConn; + transSetConnOption((uv_tcp_t*)pConn->pTcp); + if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) { uv_os_fd_t fd; uv_fileno((const uv_handle_t*)pConn->pTcp, &fd); diff --git a/source/libs/transport/test/rclient.c b/source/libs/transport/test/rclient.c index bcdf32bf6a..62ae0101ef 100644 --- a/source/libs/transport/test/rclient.c +++ b/source/libs/transport/test/rclient.c @@ -124,6 +124,7 @@ int main(int argc, char *argv[]) { rpcInit.ckey = "key"; rpcInit.spi = 1; rpcInit.connType = TAOS_CONN_CLIENT; + rpcDebugFlag = 143; for (int i = 1; i < argc; ++i) { if (strcmp(argv[i], "-p") == 0 && i < argc - 1) { diff --git a/source/libs/transport/test/rserver.c b/source/libs/transport/test/rserver.c index 5432a07649..ece3c7a500 100644 --- a/source/libs/transport/test/rserver.c +++ b/source/libs/transport/test/rserver.c @@ -125,6 +125,8 @@ int main(int argc, char *argv[]) { rpcInit.idleTime = 2 * 1500; rpcInit.afp = retrieveAuthInfo; + rpcDebugFlag = 143; + for (int i = 1; i < argc; ++i) { if (strcmp(argv[i], "-p") == 0 && i < argc - 1) { rpcInit.localPort = atoi(argv[++i]); -- GitLab