未验证 提交 590f7c4c 编写于 作者: dengyihao's avatar dengyihao 提交者: GitHub

Merge pull request #14915 from taosdata/enh/preallocHandle

enh:  prealloc handle
...@@ -135,6 +135,7 @@ void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg ...@@ -135,6 +135,7 @@ void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg
int32_t rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); int32_t rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
void rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn); void rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn);
int64_t rpcAllocHandle();
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -298,6 +298,8 @@ void transSendResponse(const STransMsg* msg); ...@@ -298,6 +298,8 @@ void transSendResponse(const STransMsg* msg);
void transRegisterMsg(const STransMsg* msg); void transRegisterMsg(const STransMsg* msg);
void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn); void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn);
int64_t transAllocHandle();
void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
......
...@@ -149,8 +149,6 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) { ...@@ -149,8 +149,6 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) {
void rpcSendResponse(const SRpcMsg* pMsg) { transSendResponse(pMsg); } void rpcSendResponse(const SRpcMsg* pMsg) { transSendResponse(pMsg); }
int32_t rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return 0; }
void rpcRefHandle(void* handle, int8_t type) { void rpcRefHandle(void* handle, int8_t type) {
assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT); assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT);
(*taosRefHandle[type])(handle); (*taosRefHandle[type])(handle);
...@@ -172,6 +170,8 @@ void rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) { ...@@ -172,6 +170,8 @@ void rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) {
transSetDefaultAddr(thandle, ip, fqdn); transSetDefaultAddr(thandle, ip, fqdn);
} }
int64_t rpcAllocHandle() { return transAllocHandle(); }
int32_t rpcInit() { int32_t rpcInit() {
transInit(); transInit();
return 0; return 0;
......
...@@ -504,7 +504,7 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { ...@@ -504,7 +504,7 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
assert(h == &conn->conn); assert(h == &conn->conn);
return conn; return conn;
} }
static void allocConnRef(SCliConn* conn, bool update) { static int32_t allocConnRef(SCliConn* conn, bool update) {
if (update) { if (update) {
transRemoveExHandle(transGetRefMgt(), conn->refId); transRemoveExHandle(transGetRefMgt(), conn->refId);
conn->refId = -1; conn->refId = -1;
...@@ -514,6 +514,24 @@ static void allocConnRef(SCliConn* conn, bool update) { ...@@ -514,6 +514,24 @@ static void allocConnRef(SCliConn* conn, bool update) {
exh->pThrd = conn->hostThrd; exh->pThrd = conn->hostThrd;
exh->refId = transAddExHandle(transGetRefMgt(), exh); exh->refId = transAddExHandle(transGetRefMgt(), exh);
conn->refId = exh->refId; conn->refId = exh->refId;
return 0;
}
static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) {
if (update) {
transRemoveExHandle(transGetRefMgt(), conn->refId);
conn->refId = -1;
}
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle);
if (exh == NULL) {
return -1;
}
exh->handle = conn;
exh->pThrd = conn->hostThrd;
conn->refId = exh->refId;
transReleaseExHandle(transGetRefMgt(), handle);
return 0;
} }
static void addConnToPool(void* pool, SCliConn* conn) { static void addConnToPool(void* pool, SCliConn* conn) {
if (conn->status == ConnInPool) { if (conn->status == ConnInPool) {
...@@ -753,9 +771,12 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) { ...@@ -753,9 +771,12 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) {
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId); SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
if (exh == NULL) { if (exh == NULL) {
tDebug("%" PRId64 " already release", refId); tDebug("%" PRId64 " already release", refId);
return;
} }
SCliConn* conn = exh->handle; SCliConn* conn = exh->handle;
transReleaseExHandle(transGetRefMgt(), refId);
tDebug("%s conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn); tDebug("%s conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn);
if (T_REF_VAL_GET(conn) == 2) { if (T_REF_VAL_GET(conn) == 2) {
...@@ -773,8 +794,10 @@ static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd) { ...@@ -773,8 +794,10 @@ static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd) {
} }
SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) { SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) {
SCliConn* conn = NULL; STransConnCtx* pCtx = pMsg->ctx;
int64_t refId = (int64_t)(pMsg->msg.info.handle); SCliConn* conn = NULL;
int64_t refId = (int64_t)(pMsg->msg.info.handle);
if (refId != 0) { if (refId != 0) {
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId); SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
if (exh == NULL) { if (exh == NULL) {
...@@ -783,12 +806,15 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) { ...@@ -783,12 +806,15 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) {
return NULL; return NULL;
} else { } else {
conn = exh->handle; conn = exh->handle;
if (conn == NULL) {
conn = getConnFromPool(pThrd->pool, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet));
*ignore = (conn && 0 == specifyConnRef(conn, true, refId)) ? false : true;
}
transReleaseExHandle(transGetRefMgt(), refId); transReleaseExHandle(transGetRefMgt(), refId);
} }
return conn; return conn;
}; };
STransConnCtx* pCtx = pMsg->ctx;
conn = getConnFromPool(pThrd->pool, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet)); conn = getConnFromPool(pThrd->pool, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet));
if (conn != NULL) { if (conn != NULL) {
tTrace("%s conn %p get from conn pool:%p", CONN_GET_INST_LABEL(conn), conn, pThrd->pool); tTrace("%s conn %p get from conn pool:%p", CONN_GET_INST_LABEL(conn), conn, pThrd->pool);
...@@ -1161,27 +1187,31 @@ void transUnrefCliHandle(void* handle) { ...@@ -1161,27 +1187,31 @@ void transUnrefCliHandle(void* handle) {
cliDestroyConn((SCliConn*)handle, true); cliDestroyConn((SCliConn*)handle, true);
} }
} }
SCliThrd* transGetWorkThrdFromHandle(int64_t handle) { SCliThrd* transGetWorkThrdFromHandle(int64_t handle, bool* validHandle) {
SCliThrd* pThrd = NULL; SCliThrd* pThrd = NULL;
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle); SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle);
if (exh == NULL) { if (exh == NULL) {
return NULL; return NULL;
} }
*validHandle = true;
pThrd = exh->pThrd; pThrd = exh->pThrd;
transReleaseExHandle(transGetRefMgt(), handle); transReleaseExHandle(transGetRefMgt(), handle);
return pThrd; return pThrd;
} }
SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle) { SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle, bool* validHandle) {
if (handle == 0) { if (handle == 0) {
int idx = cliRBChoseIdx(trans); int idx = cliRBChoseIdx(trans);
if (idx < 0) return NULL; if (idx < 0) return NULL;
return ((SCliObj*)trans->tcphandle)->pThreadObj[idx]; return ((SCliObj*)trans->tcphandle)->pThreadObj[idx];
} }
return transGetWorkThrdFromHandle(handle); return transGetWorkThrdFromHandle(handle, validHandle);
} }
void transReleaseCliHandle(void* handle) { void transReleaseCliHandle(void* handle) {
int idx = -1; int idx = -1;
SCliThrd* pThrd = transGetWorkThrdFromHandle((int64_t)handle); bool valid = false;
SCliThrd* pThrd = transGetWorkThrdFromHandle((int64_t)handle, &valid);
if (pThrd == NULL) { if (pThrd == NULL) {
return; return;
} }
...@@ -1198,8 +1228,9 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra ...@@ -1198,8 +1228,9 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
if (pTransInst == NULL) return; if (pTransInst == NULL) return;
SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle); bool valid = false;
if (pThrd == NULL) { SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle, &valid);
if (pThrd == NULL && valid == false) {
transFreeMsg(pReq->pCont); transFreeMsg(pReq->pCont);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return; return;
...@@ -1236,12 +1267,14 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM ...@@ -1236,12 +1267,14 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
if (pTransInst == NULL) return; if (pTransInst == NULL) return;
SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle); bool valid = false;
if (pThrd == NULL) { SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle, &valid);
if (pThrd == NULL && valid == false) {
transFreeMsg(pReq->pCont); transFreeMsg(pReq->pCont);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return; return;
} }
tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t)); tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t));
tsem_init(sem, 0, 0); tsem_init(sem, 0, 0);
...@@ -1303,4 +1336,11 @@ void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { ...@@ -1303,4 +1336,11 @@ void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
} }
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
} }
int64_t transAllocHandle() {
SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle));
exh->refId = transAddExHandle(transGetRefMgt(), exh);
tDebug("pre alloc refId %" PRId64 "", exh->refId);
return exh->refId;
}
#endif #endif
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册