diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 386ea95dd795b93bcaa2826d471a6e4c97f81b7b..e880a5abdba4cc36383e93ec51214e58fe69c8a6 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -155,6 +155,8 @@ static void clientSentCb(uv_write_t* req, int32_t status) { if (status != 0) { terrno = TAOS_SYSTEM_ERROR(status); uError("http-report failed to send data %s", uv_strerror(status)); + uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); + return; } else { uTrace("http-report succ to send data"); } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 7052b0b915137678d6aff528a26540a973cd74f5..41688c733079f12fbd04683183dd80db3b65606d 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -16,7 +16,7 @@ #include "transComm.h" typedef struct SConnList { - queue conn; + queue conns; int32_t size; } SConnList; @@ -107,11 +107,11 @@ static void doCloseIdleConn(void* param); static void cliReadTimeoutCb(uv_timer_t* handle); // register timer in each thread to clear expire conn // static void cliTimeoutCb(uv_timer_t* handle); -// alloc buf for recv +// alloc buffer for recv static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); -// callback after read nbytes from socket +// callback after recv nbytes from socket static void cliRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); -// callback after write data to socket +// callback after send data to socket static void cliSendCb(uv_write_t* req, int status); // callback after conn to server static void cliConnCb(uv_connect_t* req, int status); @@ -129,19 +129,14 @@ static SCliConn* cliCreateConn(SCliThrd* thrd); static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/); static void cliDestroy(uv_handle_t* handle); static void cliSend(SCliConn* pConn); +static void cliDestroyConnMsgs(SCliConn* conn, bool destroy); -static bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx) { - if (code != 0) return false; - if (pCtx->retryCnt == 0) return false; - if (transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet)) return false; - return true; -} +// cli util func +static bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx); +static void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr); + +static int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* resp); -void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr); -/* - * set TCP connection timeout per-socket level - */ -static int cliCreateSocket(); // process data read from server, add decompress etc later static void cliHandleResp(SCliConn* conn); // handle except about conn @@ -169,15 +164,14 @@ static void destroyThrdObj(SCliThrd* pThrd); static void cliWalkCb(uv_handle_t* handle, void* arg); static void cliReleaseUnfinishedMsg(SCliConn* conn) { - SCliMsg* pMsg = NULL; for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) { - pMsg = transQueueGet(&conn->cliMsgs, i); - if (pMsg != NULL && pMsg->ctx != NULL) { - if (conn->ctx.freeFunc != NULL) { - conn->ctx.freeFunc(pMsg->ctx->ahandle); + SCliMsg* msg = transQueueGet(&conn->cliMsgs, i); + if (msg != NULL && msg->ctx != NULL) { + if (conn->ctx.freeFunc != NULL && msg->ctx->ahandle != NULL) { + conn->ctx.freeFunc(msg->ctx->ahandle); } } - destroyCmsg(pMsg); + destroyCmsg(msg); } } #define CLI_RELEASE_UV(loop) \ @@ -217,8 +211,10 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { } \ if (i == sz) { \ pMsg = NULL; \ + tDebug("msg not found, %" PRIu64 "", ahandle); \ } else { \ pMsg = transQueueRm(&conn->cliMsgs, i); \ + tDebug("msg found, %" PRIu64 "", ahandle); \ } \ } while (0) #define CONN_GET_NEXT_SENDMSG(conn) \ @@ -470,8 +466,8 @@ void* createConnPool(int size) { void* destroyConnPool(void* pool) { SConnList* connList = taosHashIterate((SHashObj*)pool, NULL); while (connList != NULL) { - while (!QUEUE_IS_EMPTY(&connList->conn)) { - queue* h = QUEUE_HEAD(&connList->conn); + while (!QUEUE_IS_EMPTY(&connList->conns)) { + queue* h = QUEUE_HEAD(&connList->conns); SCliConn* c = QUEUE_DATA(h, SCliConn, q); cliDestroyConn(c, true); } @@ -484,21 +480,21 @@ void* destroyConnPool(void* pool) { static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { char key[32] = {0}; CONN_CONSTRUCT_HASH_KEY(key, ip, port); - SHashObj* pPool = pool; - SConnList* plist = taosHashGet(pPool, key, strlen(key)); + + SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); if (plist == NULL) { SConnList list = {0}; - taosHashPut(pPool, key, strlen(key), (void*)&list, sizeof(list)); - plist = taosHashGet(pPool, key, strlen(key)); - QUEUE_INIT(&plist->conn); + taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list)); + plist = taosHashGet((SHashObj*)pool, key, strlen(key)); + QUEUE_INIT(&plist->conns); } - if (QUEUE_IS_EMPTY(&plist->conn)) { + if (QUEUE_IS_EMPTY(&plist->conns)) { return NULL; } plist->size -= 1; - queue* h = QUEUE_HEAD(&plist->conn); + queue* h = QUEUE_HEAD(&plist->conns); SCliConn* conn = QUEUE_DATA(h, SCliConn, q); conn->status = ConnNormal; QUEUE_REMOVE(&conn->q); @@ -514,22 +510,21 @@ static void addConnToPool(void* pool, SCliConn* conn) { if (conn->status == ConnInPool) { return; } - SCliThrd* thrd = conn->hostThrd; - CONN_HANDLE_THREAD_QUIT(thrd); - allocConnRef(conn, true); + SCliThrd* thrd = conn->hostThrd; if (conn->timer != NULL) { uv_timer_stop(conn->timer); taosArrayPush(thrd->timerList, &conn->timer); conn->timer->data = NULL; conn->timer = NULL; } + if (T_REF_VAL_GET(conn) > 1) { + transUnrefCliHandle(conn); + } + + cliDestroyConnMsgs(conn, false); - STrans* pTransInst = thrd->pTransInst; - cliReleaseUnfinishedMsg(conn); - transQueueClear(&conn->cliMsgs); - transCtxCleanup(&conn->ctx); conn->status = ConnInPool; if (conn->list == NULL) { @@ -540,18 +535,15 @@ static void addConnToPool(void* pool, SCliConn* conn) { } else { tTrace("%s conn %p added to conn pool, read buf cap:%d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap); } - assert(conn->list != NULL); - QUEUE_INIT(&conn->q); - QUEUE_PUSH(&conn->list->conn, &conn->q); + QUEUE_PUSH(&conn->list->conns, &conn->q); conn->list->size += 1; - conn->task = NULL; - assert(!QUEUE_IS_EMPTY(&conn->list->conn)); - if (conn->list->size >= 50) { STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg)); arg->param1 = conn; arg->param2 = thrd; + + STrans* pTransInst = thrd->pTransInst; conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime)); } } @@ -691,11 +683,10 @@ static void cliDestroy(uv_handle_t* handle) { transRemoveExHandle(transGetRefMgt(), conn->refId); taosMemoryFree(conn->ip); - conn->stream->data = NULL; taosMemoryFree(conn->stream); - transCtxCleanup(&conn->ctx); - cliReleaseUnfinishedMsg(conn); - transQueueDestroy(&conn->cliMsgs); + + cliDestroyConnMsgs(conn, true); + tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); transReqQueueClear(&conn->wreqQueue); transDestroyBuffer(&conn->readBuf); @@ -738,8 +729,6 @@ static void cliSendCb(uv_write_t* req, int status) { } void cliSend(SCliConn* pConn) { - CONN_HANDLE_BROKEN(pConn); - assert(!transQueueEmpty(&pConn->cliMsgs)); SCliMsg* pCliMsg = NULL; @@ -756,8 +745,8 @@ void cliSend(SCliConn* pConn) { pMsg->pCont = (void*)rpcMallocCont(0); pMsg->contLen = 0; } - int msgLen = transMsgLenFromCont(pMsg->contLen); + int msgLen = transMsgLenFromCont(pMsg->contLen); STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0; pHead->noResp = REQUEST_NO_RESP(pMsg) ? 1 : 0; @@ -769,8 +758,6 @@ void cliSend(SCliConn* pConn) { pHead->traceId = pMsg->info.traceId; pHead->magicNum = htonl(TRANS_MAGIC_NUM); - uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); - STraceId* trace = &pMsg->info.traceId; tGDebug("%s conn %p %s is sent to %s, local info %s, len:%d", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, pMsg->contLen); @@ -792,6 +779,8 @@ void cliSend(SCliConn* pConn) { tGTrace("%s conn %p start timer for msg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType)); uv_timer_start((uv_timer_t*)pConn->timer, cliReadTimeoutCb, TRANS_READ_TIMEOUT, 0); } + + uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); uv_write_t* req = transReqQueuePush(&pConn->wreqQueue); uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); return; @@ -807,7 +796,6 @@ void cliConnCb(uv_connect_t* req, int status) { cliHandleExcept(pConn); return; } - // int addrlen = sizeof(pConn->addr); struct sockaddr peername, sockname; int addrlen = sizeof(peername); @@ -840,7 +828,7 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) { int64_t refId = (int64_t)(pMsg->msg.info.handle); SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId); if (exh == NULL) { - tDebug("%" PRId64 " already release", refId); + tDebug("%" PRId64 " already released", refId); destroyCmsg(pMsg); return; } @@ -856,6 +844,9 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) { return; } cliSend(conn); + } else { + tError("%s conn %p already released", CONN_GET_INST_LABEL(conn), conn); + destroyCmsg(pMsg); } } static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd) { @@ -905,6 +896,27 @@ void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr) { } } } + +bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx) { + if (code != 0) return false; + if (pCtx->retryCnt == 0) return false; + if (transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet)) return false; + return true; +} + +int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) { + if (pMsg == NULL) return -1; + + memset(pResp, 0, sizeof(STransMsg)); + + pResp->code = TSDB_CODE_RPC_BROKEN_LINK; + pResp->msgType = pMsg->msg.msgType + 1; + pResp->info.ahandle = pMsg->ctx ? pMsg->ctx->ahandle : NULL; + pResp->info.traceId = pMsg->msg.info.traceId; + + return 0; +} + void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { STrans* pTransInst = pThrd->pTransInst; STransConnCtx* pCtx = pMsg->ctx; @@ -920,13 +932,8 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { SCliConn* conn = cliGetConn(pMsg, pThrd, &ignore); if (ignore == true) { // persist conn already release by server - STransMsg resp = {0}; - resp.code = TSDB_CODE_RPC_BROKEN_LINK; - resp.msgType = pMsg->msg.msgType + 1; - - resp.info.ahandle = pMsg && pMsg->ctx ? pMsg->ctx->ahandle : NULL; - resp.info.traceId = pMsg->msg.info.traceId; - + STransMsg resp; + cliBuildExceptResp(pMsg, &resp); pTransInst->cfp(pTransInst->parent, &resp, NULL); destroyCmsg(pMsg); return; @@ -991,9 +998,6 @@ static void cliAsyncCb(uv_async_t* handle) { QUEUE_REMOVE(h); SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); - if (pMsg == NULL) { - continue; - } (*cliAsyncHandle[pMsg->type])(pMsg, pThrd); count++; } @@ -1035,24 +1039,58 @@ static void cliPrepareCb(uv_prepare_t* handle) { if (thrd->stopMsg != NULL) cliHandleQuit(thrd->stopMsg, thrd); } +void cliDestroyConnMsgs(SCliConn* conn, bool destroy) { + transCtxCleanup(&conn->ctx); + cliReleaseUnfinishedMsg(conn); + if (destroy == 1) { + transQueueDestroy(&conn->cliMsgs); + } else { + transQueueClear(&conn->cliMsgs); + } +} + +void cliIteraConnMsgs(SCliConn* conn) { + SCliThrd* pThrd = conn->hostThrd; + STrans* pTransInst = pThrd->pTransInst; + + for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) { + SCliMsg* cmsg = transQueueGet(&conn->cliMsgs, i); + if (cmsg->type == Release || REQUEST_NO_RESP(&cmsg->msg) || cmsg->msg.msgType == TDMT_SCH_DROP_TASK) { + continue; + } + + STransMsg resp = {0}; + if (-1 == cliBuildExceptResp(cmsg, &resp)) { + continue; + } + pTransInst->cfp(pTransInst->parent, &resp, NULL); + + cmsg->ctx->ahandle = NULL; + } +} bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) { if (pHead->release == 1 && (pHead->msgLen) == sizeof(*pHead)) { uint64_t ahandle = pHead->ahandle; + tDebug("ahandle = %" PRIu64 "", ahandle); SCliMsg* pMsg = NULL; CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); + transClearBuffer(&conn->readBuf); transFreeMsg(transContFromHead((char*)pHead)); - if (transQueueSize(&conn->cliMsgs) > 0 && ahandle == 0) { - SCliMsg* cliMsg = transQueueGet(&conn->cliMsgs, 0); - if (cliMsg->type == Release) return true; + + for (int i = 0; ahandle == 0 && i < transQueueSize(&conn->cliMsgs); i++) { + SCliMsg* cliMsg = transQueueGet(&conn->cliMsgs, i); + if (cliMsg->type == Release) { + assert(pMsg == NULL); + return true; + } } + + cliIteraConnMsgs(conn); + tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, conn->refId); - if (T_REF_VAL_GET(conn) > 1) { - transUnrefCliHandle(conn); - } destroyCmsg(pMsg); - cliReleaseUnfinishedMsg(conn); - transQueueClear(&conn->cliMsgs); + addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn); return true; } diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 207b967923fad439ed043a71752d127bac13934c..46046b2a95f5600ec96b1d69904c2f83e87851d7 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -492,7 +492,6 @@ void uvWorkerAsyncCb(uv_async_t* handle) { // release handle to rpc init if (msg->type == Quit) { (*transAsyncHandle[msg->type])(msg, pThrd); - continue; } else { STransMsg transMsg = msg->msg; @@ -771,7 +770,7 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) { // conn set QUEUE_INIT(&pThrd->conn); - pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 1, pThrd, uvWorkerAsyncCb); + pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 5, pThrd, uvWorkerAsyncCb); uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb); // uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); return true;