diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 2b8c6a895e92e8fadbbce8f3dea1d5484ce37dec..8471aa8286da029f53587b258ab417362badf952 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -45,7 +45,7 @@ typedef struct SRpcHandleInfo { int32_t noResp; // has response or not(default 0, 0: resp, 1: no resp); int32_t persistHandle; // persist handle or not STraceId traceId; - // int64_t traceId; + int8_t hasEpSet; // app info void *ahandle; // app handle set by client @@ -123,7 +123,7 @@ void * rpcReallocCont(void *ptr, int32_t contLen); void rpcSendRequest(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); void rpcSendResponse(const SRpcMsg *pMsg); void rpcRegisterBrokenLinkArg(SRpcMsg *msg); -void rpcReleaseHandle(void *handle, int8_t type); // just release client conn to rpc instance, no close sock +void rpcReleaseHandle(void *handle, int8_t type); // just release conn to rpc instance, no close sock // These functions will not be called in the child process void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet); diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 4b5f11a49716539290f473c62e4e15c27af952c4..bb554793d9929dda540463736630234829460d9c 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -148,6 +148,7 @@ typedef struct { char release : 2; char secured : 2; char spi : 2; + char hasEpSet : 2; // contain epset or not, 0(default): no epset, 1: contain epset char user[TSDB_UNI_LEN]; STraceId traceId; @@ -378,13 +379,10 @@ typedef struct SDelayQueue { uv_loop_t* loop; } SDelayQueue; -int transDQCreate(uv_loop_t* loop, SDelayQueue** queue); - +int transDQCreate(uv_loop_t* loop, SDelayQueue** queue); void transDQDestroy(SDelayQueue* queue); +int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs); -int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs); - -// void transPrintEpSet(SEpSet* pEpSet); bool transEpSetIsEqual(SEpSet* a, SEpSet* b); /* * init global func diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 9c1de0b64ee527ecffebfeed96d733d44d695732..587e2bad98710b7a42b37522e89d44b5cb89833f 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -309,6 +309,7 @@ void cliHandleResp(SCliConn* conn) { transMsg.msgType = pHead->msgType; transMsg.info.ahandle = NULL; transMsg.info.traceId = pHead->traceId; + transMsg.info.hasEpSet = pHead->hasEpSet; SCliMsg* pMsg = NULL; STransConnCtx* pCtx = NULL; @@ -1011,6 +1012,23 @@ void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) { *val = newVal; } } + +bool cliTryToExtractEpSet(STransMsg* pResp, SEpSet* dst) { + if (pResp == NULL || pResp->info.hasEpSet == 0) { + return false; + } + tDeserializeSEpSet(pResp->pCont, pResp->contLen, dst); + int32_t tlen = tSerializeSEpSet(NULL, 0, dst); + + int32_t bufLen = pResp->contLen - tlen; + char* buf = rpcMallocCont(bufLen); + + memcpy(buf, (char*)pResp->pCont + tlen, bufLen); + + pResp->pCont = buf; + pResp->contLen = bufLen; + return true; +} int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { SCliThrd* pThrd = pConn->hostThrd; STrans* pTransInst = pThrd->pTransInst; @@ -1055,6 +1073,12 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { } STraceId* trace = &pResp->info.traceId; + + if (cliTryToExtractEpSet(pResp, &pCtx->epSet)) { + char tbuf[256] = {0}; + EPSET_DEBUG_STR(&pCtx->epSet, tbuf); + tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn); + } if (pCtx->pSem != NULL) { tGTrace("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn); if (pCtx->pRsp == NULL) { diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 56572ce767c19d163515fd58e06bf4474742524f..9809f3d564a4ca3e31b3b235e432b9fb62972353 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -337,16 +337,8 @@ void uvOnSendCb(uv_write_t* req, int status) { tTrace("conn %p data already was written on stream", conn); if (!transQueueEmpty(&conn->srvMsgs)) { SSvrMsg* msg = transQueuePop(&conn->srvMsgs); - // if (msg->type == Release && conn->status != ConnNormal) { - // conn->status = ConnNormal; - // transUnrefSrvHandle(conn); - // reallocConnRef(conn); - // destroySmsg(msg); - // transQueueClear(&conn->srvMsgs); - // return; - //} destroySmsg(msg); - // send second data, just use for push + // send cached data if (!transQueueEmpty(&conn->srvMsgs)) { msg = (SSvrMsg*)transQueueGet(&conn->srvMsgs, 0); if (msg->type == Register && conn->status == ConnAcquire) { @@ -383,7 +375,6 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) { tError("fail to dispatch conn to work thread"); } uv_close((uv_handle_t*)req->data, uvFreeCb); - // taosMemoryFree(req->data); taosMemoryFree(req); } @@ -397,6 +388,7 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); pHead->ahandle = (uint64_t)pMsg->info.ahandle; pHead->traceId = pMsg->info.traceId; + pHead->hasEpSet = pMsg->info.hasEpSet; if (pConn->status == ConnNormal) { pHead->msgType = pConn->inType + 1; @@ -409,6 +401,7 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { transUnrefSrvHandle(pConn); } else { pHead->msgType = pMsg->msgType; + // set up resp msg type if (pHead->msgType == 0 && transMsgLenFromCont(pMsg->contLen) == sizeof(STransMsgHead)) pHead->msgType = pConn->inType + 1; } @@ -1121,6 +1114,4 @@ _return2: rpcFreeCont(msg->pCont); } -int transGetConnInfo(void* thandle, STransHandleInfo* pConnInfo) { return -1; } - #endif