diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index d8dea8a1bed03493ba57365c30cb3a84c071b515..7441b383215dc9c7c4e24e791d588a693cbde870 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -135,6 +135,7 @@ void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg int32_t rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); void rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn); +int64_t rpcAllocHandle(); #ifdef __cplusplus } diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 5bb6349d9b88b515656ccc42d1da1a74787df649..2972f512f17feee20ca6b07596a55218c86fcaec 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -298,6 +298,8 @@ void transSendResponse(const STransMsg* msg); void transRegisterMsg(const STransMsg* msg); void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn); +int64_t transAllocHandle(); + void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index e9e439971c1e5f2c308a636c9a0ad0d79a6c063c..8a16b20a6f63b16351d7a2db4713dbc0b4199315 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -149,8 +149,6 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) { void rpcSendResponse(const SRpcMsg* pMsg) { transSendResponse(pMsg); } -int32_t rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return 0; } - void rpcRefHandle(void* handle, int8_t type) { assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT); (*taosRefHandle[type])(handle); @@ -172,6 +170,8 @@ void rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) { transSetDefaultAddr(thandle, ip, fqdn); } +int64_t rpcAllocHandle() { return transAllocHandle(); } + int32_t rpcInit() { transInit(); return 0; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 5d087d57693bf13e51f2ece38dd1abf0b79b9c97..40227f02cc05e362ae0994ffb20f144f5d9c1ce8 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -504,7 +504,7 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { assert(h == &conn->conn); return conn; } -static void allocConnRef(SCliConn* conn, bool update) { +static int32_t allocConnRef(SCliConn* conn, bool update) { if (update) { transRemoveExHandle(transGetRefMgt(), conn->refId); conn->refId = -1; @@ -514,6 +514,24 @@ static void allocConnRef(SCliConn* conn, bool update) { exh->pThrd = conn->hostThrd; exh->refId = transAddExHandle(transGetRefMgt(), exh); conn->refId = exh->refId; + return 0; +} + +static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) { + if (update) { + transRemoveExHandle(transGetRefMgt(), conn->refId); + conn->refId = -1; + } + SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle); + if (exh == NULL) { + return -1; + } + exh->handle = conn; + exh->pThrd = conn->hostThrd; + conn->refId = exh->refId; + + transReleaseExHandle(transGetRefMgt(), handle); + return 0; } static void addConnToPool(void* pool, SCliConn* conn) { if (conn->status == ConnInPool) { @@ -753,9 +771,12 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) { SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId); if (exh == NULL) { tDebug("%" PRId64 " already release", refId); + return; } SCliConn* conn = exh->handle; + transReleaseExHandle(transGetRefMgt(), refId); + tDebug("%s conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn); if (T_REF_VAL_GET(conn) == 2) { @@ -773,8 +794,10 @@ static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd) { } SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) { - SCliConn* conn = NULL; - int64_t refId = (int64_t)(pMsg->msg.info.handle); + STransConnCtx* pCtx = pMsg->ctx; + SCliConn* conn = NULL; + + int64_t refId = (int64_t)(pMsg->msg.info.handle); if (refId != 0) { SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId); if (exh == NULL) { @@ -783,12 +806,15 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) { return NULL; } else { conn = exh->handle; + if (conn == NULL) { + conn = getConnFromPool(pThrd->pool, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet)); + *ignore = (conn && 0 == specifyConnRef(conn, true, refId)) ? false : true; + } transReleaseExHandle(transGetRefMgt(), refId); } return conn; }; - STransConnCtx* pCtx = pMsg->ctx; conn = getConnFromPool(pThrd->pool, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet)); if (conn != NULL) { tTrace("%s conn %p get from conn pool:%p", CONN_GET_INST_LABEL(conn), conn, pThrd->pool); @@ -1161,27 +1187,31 @@ void transUnrefCliHandle(void* handle) { cliDestroyConn((SCliConn*)handle, true); } } -SCliThrd* transGetWorkThrdFromHandle(int64_t handle) { +SCliThrd* transGetWorkThrdFromHandle(int64_t handle, bool* validHandle) { SCliThrd* pThrd = NULL; SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle); if (exh == NULL) { return NULL; } + + *validHandle = true; pThrd = exh->pThrd; transReleaseExHandle(transGetRefMgt(), handle); return pThrd; } -SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle) { +SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle, bool* validHandle) { if (handle == 0) { int idx = cliRBChoseIdx(trans); if (idx < 0) return NULL; return ((SCliObj*)trans->tcphandle)->pThreadObj[idx]; } - return transGetWorkThrdFromHandle(handle); + return transGetWorkThrdFromHandle(handle, validHandle); } void transReleaseCliHandle(void* handle) { - int idx = -1; - SCliThrd* pThrd = transGetWorkThrdFromHandle((int64_t)handle); + int idx = -1; + bool valid = false; + + SCliThrd* pThrd = transGetWorkThrdFromHandle((int64_t)handle, &valid); if (pThrd == NULL) { return; } @@ -1198,8 +1228,9 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); if (pTransInst == NULL) return; - SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle); - if (pThrd == NULL) { + bool valid = false; + SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle, &valid); + if (pThrd == NULL && valid == false) { transFreeMsg(pReq->pCont); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); return; @@ -1236,12 +1267,14 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); if (pTransInst == NULL) return; - SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle); - if (pThrd == NULL) { + bool valid = false; + SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle, &valid); + if (pThrd == NULL && valid == false) { transFreeMsg(pReq->pCont); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); return; } + tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t)); tsem_init(sem, 0, 0); @@ -1303,4 +1336,11 @@ void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { } transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); } + +int64_t transAllocHandle() { + SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle)); + exh->refId = transAddExHandle(transGetRefMgt(), exh); + tDebug("pre alloc refId %" PRId64 "", exh->refId); + return exh->refId; +} #endif