diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index b021149c7acce0022fccb5f2ee5c3d0940b17920..1e964591665cb57c737fe8819c36f4127d2a3afe 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -17,6 +17,7 @@ #include "transComm.h" +#define CONN_HOST_THREAD_INDEX(conn ? ((SCliConn*)conn)->hThrdIdx : -1) #define CONN_PERSIST_TIME(para) (para * 1000 * 10) typedef struct SCliConn { @@ -28,7 +29,8 @@ typedef struct SCliConn { void* data; queue conn; uint64_t expireTime; - int8_t notifyCount; // timers already notify to client + int8_t ctnRdCnt; // timers already notify to client + int hostThreadIndex; SRpcPush* push; int persist; // @@ -131,11 +133,16 @@ static void clientHandleResp(SCliConn* conn) { rpcMsg.msgType = pHead->msgType; rpcMsg.ahandle = pCtx->ahandle; + if (rpcMsg.msgType == TDMT_VND_QUERY_RSP || rpcMsg.msgType == TDMT_VND_FETCH_RSP) { + rpcMsg.handle = conn; + conn->persist = 1; + } + tDebug("client conn %p %s received from %s:%d, local info: %s:%d", conn, TMSG_INFO(pHead->msgType), inet_ntoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), inet_ntoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port)); - if (conn->push != NULL && conn->notifyCount != 0) { + if (conn->push != NULL && conn->ctnRdCnt != 0) { (*conn->push->callback)(conn->push->arg, &rpcMsg); conn->push = NULL; } else { @@ -148,7 +155,7 @@ static void clientHandleResp(SCliConn* conn) { tsem_post(pCtx->pSem); } } - conn->notifyCount += 1; + conn->ctnRdCnt += 1; conn->secured = pHead->secured; // buf's mem alread translated to rpcMsg.pCont @@ -157,13 +164,15 @@ static void clientHandleResp(SCliConn* conn) { uv_read_start((uv_stream_t*)conn->stream, clientAllocBufferCb, clientReadCb); SCliThrdObj* pThrd = conn->hostThrd; + // user owns conn->persist = 1 - if (conn->push == NULL) { + if (conn->push == NULL || conn->persist == 0) { addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); destroyCmsg(conn->data); conn->data = NULL; } + // start thread's timer of conn pool if not active if (!uv_is_active((uv_handle_t*)pThrd->timer) && pRpc->idleTime > 0) { uv_timer_start((uv_timer_t*)pThrd->timer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); @@ -189,7 +198,7 @@ static void clientHandleExcept(SCliConn* pConn) { rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; rpcMsg.msgType = msgType + 1; - if (pConn->push != NULL && pConn->notifyCount != 0) { + if (pConn->push != NULL && pConn->ctnRdCnt != 0) { (*pConn->push->callback)(pConn->push->arg, &rpcMsg); pConn->push = NULL; } else { @@ -210,7 +219,7 @@ static void clientHandleExcept(SCliConn* pConn) { } // transDestroyConnCtx(pCtx); clientConnDestroy(pConn, true); - pConn->notifyCount += 1; + pConn->ctnRdCnt += 1; } static void clientTimeoutCb(uv_timer_t* handle) { @@ -292,7 +301,7 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) { conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); - conn->notifyCount = 0; + conn->ctnRdCnt = 0; // list already create before assert(plist != NULL); QUEUE_PUSH(&plist->conn, &conn->conn); @@ -423,6 +432,8 @@ static void clientWrite(SCliConn* pConn) { pHead->msgType = pMsg->msgType; pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); + // if (pHead->msgType == TDMT_VND_QUERY || pHead->msgType == TDMT_VND_) + uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); tDebug("client conn %p %s is send to %s:%d, local info %s:%d", pConn, TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), @@ -462,14 +473,25 @@ static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { uint64_t et = taosGetTimestampUs(); uint64_t el = et - pMsg->st; - tTrace("client msg tran time cost: %" PRIu64 "", el); + tTrace("client msg tran time cost: %" PRIu64 "us", el); et = taosGetTimestampUs(); STransConnCtx* pCtx = pMsg->ctx; - SCliConn* conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port); + + SCliConn* conn = NULL; + if (pMsg->msg.handle == NULL) { + conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port); + if (conn != NULL) { + tTrace("client conn %d get from conn pool", conn); + } + } else { + conn = (SCliConn*)(pMsg->msg.handle); + if (conn != NULL) { + tTrace("client conn %d reused", conn); + } + } + if (conn != NULL) { - // impl later - tTrace("client get conn %p from pool", conn); conn->data = pMsg; conn->writeReq->data = conn; transDestroyBuffer(&conn->readBuf); @@ -480,8 +502,6 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { } clientWrite(conn); - conn->push = pMsg->msg.push; - } else { SCliConn* conn = calloc(1, sizeof(SCliConn)); conn->ref++; @@ -500,13 +520,18 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { conn->data = pMsg; conn->hostThrd = pThrd; - conn->push = pMsg->msg.push; + // conn->push = pMsg->msg.push; + // conn->ctnRdCnt = 0; struct sockaddr_in addr; uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr); // handle error in callback if fail to connect uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, clientConnCb); } + + conn->push = pMsg->msg.push; + conn->ctnRdCnt = 0; + conn->hThrdIdx = pCtx->hThrdIdx; } static void clientAsyncCb(uv_async_t* handle) { SAsyncItem* item = handle->data; @@ -644,6 +669,13 @@ void taosCloseClient(void* arg) { free(cli->pThreadObj); free(cli); } +static int clientRBChoseIdx(SRpcInfo* pRpc) { + int64_t index = pRpc->index; + if (pRpc->index++ >= pRpc->numOfThreads) { + pRpc->index = 0; + } + return index % pRpc->numOfThreads; +} void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { // impl later char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); @@ -651,48 +683,45 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* SRpcInfo* pRpc = (SRpcInfo*)shandle; + int index = CONN_HOST_THREAD_INDEX(pMsg.handle); + if (index == -1) { + index = clientRBChoseIdx(pRpc); + } int32_t flen = 0; if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) { // imp later } - STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx)); - pCtx->pTransInst = (SRpcInfo*)shandle; pCtx->ahandle = pMsg->ahandle; pCtx->msgType = pMsg->msgType; pCtx->ip = strdup(ip); pCtx->port = port; + pCtx->hThrdIdx = index; assert(pRpc->connType == TAOS_CONN_CLIENT); // atomic or not - int64_t index = pRpc->index; - if (pRpc->index++ >= pRpc->numOfThreads) { - pRpc->index = 0; - } + SCliMsg* cliMsg = malloc(sizeof(SCliMsg)); cliMsg->ctx = pCtx; cliMsg->msg = *pMsg; cliMsg->st = taosGetTimestampUs(); - SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index % pRpc->numOfThreads]; - - // pthread_mutex_lock(&thrd->msgMtx); - // QUEUE_PUSH(&thrd->msg, &cliMsg->q); - // pthread_mutex_unlock(&thrd->msgMtx); - - // int start = taosGetTimestampUs(); + SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index]; transSendAsync(thrd->asyncPool, &(cliMsg->q)); - // uv_async_send(thrd->cliAsync); - // int end = taosGetTimestampUs() - start; - // tError("client sent to rpc, time cost: %d", (int)end); } + void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); uint32_t port = pEpSet->eps[pEpSet->inUse].port; SRpcInfo* pRpc = (SRpcInfo*)shandle; + int index = CONN_HOST_THREAD_INDEX(pMsg.handle); + if (index == -1) { + index = clientRBChoseIdx(pRpc); + } + STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx)); pCtx->pTransInst = (SRpcInfo*)shandle; pCtx->ahandle = pReq->ahandle; @@ -703,23 +732,13 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { pCtx->pRsp = pRsp; tsem_init(pCtx->pSem, 0, 0); - int64_t index = pRpc->index; - if (pRpc->index++ >= pRpc->numOfThreads) { - pRpc->index = 0; - } SCliMsg* cliMsg = malloc(sizeof(SCliMsg)); cliMsg->ctx = pCtx; cliMsg->msg = *pReq; cliMsg->st = taosGetTimestampUs(); - SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index % pRpc->numOfThreads]; - - // pthread_mutex_lock(&thrd->msgMtx); - // QUEUE_PUSH(&thrd->msg, &cliMsg->q); - // pthread_mutex_unlock(&thrd->msgMtx); - // int start = taosGetTimestampUs(); + SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index]; transSendAsync(thrd->asyncPool, &(cliMsg->q)); - tsem_t* pSem = pCtx->pSem; tsem_wait(pSem); tsem_destroy(pSem);