提交 d5f5c33c 编写于 作者: dengyihao's avatar dengyihao

refactor code

上级 bfe5ab77
......@@ -45,9 +45,11 @@ typedef struct STraceId {
#define TRACE_GET_MSGID(traceId) (traceId)->msgId
#define TRACE_TO_STR(traceId, buf) \
do { \
sprintf(buf, "0x%" PRIx64 ":0x%" PRIx64 "", traceId->rootId, traceId->msgId); \
#define TRACE_TO_STR(traceId, buf) \
do { \
int64_t rootId = (traceId) != NULL ? (traceId)->rootId : 0; \
int64_t msgId = (traceId) != NULL ? (traceId)->msgId : 0; \
sprintf(buf, "0x%" PRIx64 ":0x%" PRIx64 "", rootId, msgId); \
} while (0)
#ifdef __cplusplus
......
......@@ -105,6 +105,13 @@ typedef SRpcCtxVal STransCtxVal;
typedef SRpcInfo STrans;
typedef SRpcConnInfo STransHandleInfo;
// ref mgt
// handle
typedef struct SExHandle {
void* handle;
int64_t refId;
void* pThrd;
} SExHandle;
/*convet from fqdn to ip */
typedef struct SCvtAddr {
char ip[TSDB_FQDN_LEN];
......@@ -239,30 +246,30 @@ int transSendAsync(SAsyncPool* pool, queue* mq);
} \
} while (0)
#define ASYNC_CHECK_HANDLE(exh1, refId) \
do { \
if (refId > 0) { \
tTrace("handle step1"); \
SExHandle* exh2 = transAcquireExHandle(refMgt, refId); \
if (exh2 == NULL || refId != exh2->refId) { \
tTrace("handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, \
exh2 ? exh2->refId : 0, refId); \
goto _return1; \
} \
} else if (refId == 0) { \
tTrace("handle step2"); \
SExHandle* exh2 = transAcquireExHandle(refMgt, refId); \
if (exh2 == NULL || refId != exh2->refId) { \
tTrace("handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, refId, \
exh2 ? exh2->refId : 0); \
goto _return1; \
} else { \
refId = exh1->refId; \
} \
} else if (refId < 0) { \
tTrace("handle step3"); \
goto _return2; \
} \
#define ASYNC_CHECK_HANDLE(exh1, id) \
do { \
if (id > 0) { \
tTrace("handle step1"); \
SExHandle* exh2 = transAcquireExHandle(refMgt, id); \
if (exh2 == NULL || id != exh2->refId) { \
tTrace("handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, \
exh2 ? exh2->refId : 0, id); \
goto _return1; \
} \
} else if (id == 0) { \
tTrace("handle step2"); \
SExHandle* exh2 = transAcquireExHandle(refMgt, id); \
if (exh2 == NULL || id == exh2->refId) { \
tTrace("handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, id, \
exh2 ? exh2->refId : 0); \
goto _return1; \
} else { \
id = exh1->refId; \
} \
} else if (id < 0) { \
tTrace("handle step3"); \
goto _return2; \
} \
} while (0)
int transInitBuffer(SConnBuffer* buf);
int transClearBuffer(SConnBuffer* buf);
......@@ -381,14 +388,6 @@ bool transEpSetIsEqual(SEpSet* a, SEpSet* b);
*/
void transThreadOnce();
// ref mgt
// handle
typedef struct SExHandle {
void* handle;
int64_t refId;
void* pThrd;
} SExHandle;
void transInitEnv();
int32_t transOpenExHandleMgt(int size);
void transCloseExHandleMgt(int32_t mgt);
......
......@@ -18,10 +18,6 @@
static int32_t transSCliInst = 0;
static int32_t refMgt = 0;
typedef struct SExHandleWrap {
void* exhandle;
int64_t refId;
} SExHandleWrap;
typedef struct SCliConn {
T_REF_DECLARE()
uv_connect_t connReq;
......@@ -177,8 +173,8 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
if (exh == NULL) { \
idx = -1; \
} else { \
ASYNC_CHECK_HANDLE(exh, refId); \
pThrd = (SCliThrdObj*)exh->pThrd; \
ASYNC_CHECK_HANDLE((exh), refId); \
pThrd = (SCliThrdObj*)(exh)->pThrd; \
} \
} while (0)
#define CONN_PERSIST_TIME(para) (para * 1000 * 10)
......@@ -201,6 +197,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
if (status != ConnInPool) { \
addConnToPool(((SCliThrdObj*)conn->hostThrd)->pool, conn); \
} \
transRemoveExHandle(refMgt, conn->refId); \
return; \
} \
} while (0)
......@@ -335,18 +332,8 @@ void cliHandleResp(SCliConn* conn) {
}
// buf's mem alread translated to transMsg.pCont
transClearBuffer(&conn->readBuf);
if (!CONN_NO_PERSIST_BY_APP(conn)) {
SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle));
exh->handle = conn;
exh->pThrd = pThrd;
exh->refId = transAddExHandle(refMgt, exh);
SExHandleWrap* wrap = taosMemoryCalloc(1, sizeof(SExHandleWrap));
wrap->exhandle = exh;
wrap->refId = exh->refId;
conn->refId = exh->refId;
transMsg.info.handle = wrap;
transMsg.info.handle = (void*)conn->refId;
tDebug("%s conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
}
......@@ -357,12 +344,10 @@ void cliHandleResp(SCliConn* conn) {
if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) {
tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn);
// transUnrefCliHandle(conn);
return;
}
if (CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) {
tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn);
// transUnrefCliHandle(conn);
return;
}
......@@ -433,7 +418,7 @@ void cliHandleExcept(SCliConn* pConn) {
return;
}
destroyCmsg(pMsg);
tTrace("%s conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn);
tTrace("%s conn %p start to destroy, ref:%d", CONN_GET_INST_LABEL(pConn), pConn, T_REF_VAL_GET(pConn));
} while (!transQueueEmpty(&pConn->cliMsgs));
transUnrefCliHandle(pConn);
}
......@@ -505,10 +490,22 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
QUEUE_INIT(&conn->conn);
return conn;
}
static void allocConnRef(SCliConn* conn, bool update) {
if (update) {
transRemoveExHandle(refMgt, conn->refId);
}
SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle));
exh->handle = conn;
exh->pThrd = conn->hostThrd;
exh->refId = transAddExHandle(refMgt, exh);
conn->refId = exh->refId;
}
static void addConnToPool(void* pool, SCliConn* conn) {
SCliThrdObj* thrd = conn->hostThrd;
CONN_HANDLE_THREAD_QUIT(thrd);
allocConnRef(conn, true);
STrans* pTransInst = thrd->pTransInst;
conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
transQueueClear(&conn->cliMsgs);
......@@ -558,7 +555,8 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
return;
}
if (nread < 0) {
tError("%s conn %p read error: %s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread));
tError("%s conn %p read error: %s, ref: %d", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread),
T_REF_VAL_GET(conn));
conn->broken = true;
cliHandleExcept(conn);
}
......@@ -581,11 +579,7 @@ static SCliConn* cliCreateConn(SCliThrdObj* pThrd) {
conn->broken = 0;
transRefCliHandle(conn);
SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle));
exh->handle = conn;
exh->pThrd = pThrd;
exh->refId = transAddExHandle(refMgt, exh);
conn->refId = exh->refId;
allocConnRef(conn, false);
return conn;
}
......@@ -749,25 +743,27 @@ static void cliHandleUpdate(SCliMsg* pMsg, SCliThrdObj* pThrd) {
}
SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) {
SCliConn* conn = NULL;
SRpcHandleInfo* pInfo = &pMsg->msg.info;
SExHandle* exh = transAcquireExHandle(refMgt, pInfo->refId);
if (exh == NULL) {
if (pInfo->refId != 0) {
tTrace("%s conn %p ignore msg", CONN_GET_INST_LABEL(conn), conn);
assert(0);
return NULL;
}
} else {
transReleaseExHandle(refMgt, pInfo->refId);
return exh->handle;
}
SCliConn* conn = NULL;
// SExHandleWrap* exWrap = &pMsg->msg.info.handle;
// if (exWrap != NULL) {
//}
// SExHandle* exh = transAcquireExHandle(refMgt, exWrap->refId);
// if (exh == NULL) {
// if (pInfo->refId != 0) {
// tTrace("%s conn %p ignore msg", CONN_GET_INST_LABEL(conn), conn);
// assert(0);
// return NULL;
// }
//} else {
// transReleaseExHandle(refMgt, pInfo->refId);
// return exh->handle;
//}
STransConnCtx* pCtx = pMsg->ctx;
conn = getConnFromPool(pThrd->pool, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet));
if (conn != NULL) {
exh = taosMemoryCalloc(1, sizeof(SExHandle));
SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle));
exh->handle = conn;
exh->pThrd = pThrd;
exh->refId = transAddExHandle(refMgt, exh);
......@@ -790,10 +786,6 @@ void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr) {
}
}
void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
uint64_t et = taosGetTimestampUs();
uint64_t el = et - pMsg->st;
// tTrace("%s cli msg tran time cost: %" PRIu64 "us", ((STrans*)pThrd->pTransInst)->label, el);
STransConnCtx* pCtx = pMsg->ctx;
STrans* pTransInst = pThrd->pTransInst;
......@@ -1014,7 +1006,6 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
tTrace("try to send req to next node");
pMsg->st = taosGetTimestampUs();
taosMemoryFree(pResp->info.handle);
pCtx->retryCount += 1;
if (pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
if (pCtx->retryCount < pEpSet->numOfEps * 3) {
......@@ -1060,16 +1051,16 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
STraceId* trace = &pResp->info.traceId;
if (pCtx->pSem != NULL) {
tGTrace("conn %p(sync) handle resp", pConn);
tGTrace("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn);
if (pCtx->pRsp == NULL) {
tGTrace("conn %p(sync) failed to resp, ignore", pConn);
tGTrace("%s conn %p(sync) failed to resp, ignore", CONN_GET_INST_LABEL(pConn), pConn);
} else {
memcpy((char*)pCtx->pRsp, (char*)pResp, sizeof(*pResp));
}
tsem_post(pCtx->pSem);
pCtx->pRsp = NULL;
} else {
tGTrace("conn %p handle resp", pConn);
tGTrace("%s conn %p handle resp", CONN_GET_INST_LABEL(pConn), pConn);
if (pResp->code != 0 || pCtx->retryCount == 0 || transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet)) {
pTransInst->cfp(pTransInst->parent, pResp, NULL);
} else {
......@@ -1105,14 +1096,33 @@ void transUnrefCliHandle(void* handle) {
return;
}
int ref = T_REF_DEC((SCliConn*)handle);
tTrace("%s conn %p ref %d", CONN_GET_INST_LABEL((SCliConn*)handle), handle, ref);
tTrace("%s conn %p ref:%d", CONN_GET_INST_LABEL((SCliConn*)handle), handle, ref);
if (ref == 0) {
cliDestroyConn((SCliConn*)handle, true);
}
}
SCliThrdObj* transGetWorkThrdFromHandle(int64_t handle) {
SCliThrdObj* pThrd = NULL;
SExHandle* exh = transAcquireExHandle(refMgt, handle);
if (exh == NULL) {
return NULL;
}
pThrd = exh->pThrd;
transReleaseExHandle(refMgt, handle);
return pThrd;
}
SCliThrdObj* transGetWorkThrd(STrans* trans, int64_t handle) {
int idx = -1;
if (handle == 0) {
idx = cliRBChoseIdx(trans);
return ((SCliObj*)trans->tcphandle)->pThreadObj[idx];
}
return transGetWorkThrdFromHandle(handle);
}
void transReleaseCliHandle(void* handle) {
SCliThrdObj* thrd = CONN_GET_HOST_THREAD(handle);
if (thrd == NULL) {
int idx = -1;
SCliThrdObj* pThrd = transGetWorkThrdFromHandle((int64_t)handle);
if (pThrd == NULL) {
return;
}
......@@ -1121,26 +1131,18 @@ void transReleaseCliHandle(void* handle) {
cmsg->msg = tmsg;
cmsg->type = Release;
transSendAsync(thrd->asyncPool, &cmsg->q);
transSendAsync(pThrd->asyncPool, &cmsg->q);
return;
}
void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) {
STrans* pTransInst = (STrans*)shandle;
SRpcHandleInfo* info = &pReq->info;
int idx = -1;
SCliThrdObj* pThrd = NULL;
SExHandle* exh = info->handle;
int64_t refId = -1;
if (exh != NULL) {
refId = exh->refId;
STrans* pTransInst = (STrans*)shandle;
SCliThrdObj* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle);
if (pThrd == NULL) {
transFreeMsg(pReq->pCont);
return;
}
CONN_HOST_THREAD_IDX1(idx, exh, refId, pThrd);
if (idx == -1) {
idx = cliRBChoseIdx(pTransInst);
pThrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[idx];
}
TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
......@@ -1163,28 +1165,17 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra
tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
ASSERT(transSendAsync(pThrd->asyncPool, &(cliMsg->q)) == 0);
_return1:
return;
_return2:
return;
}
void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) {
STrans* pTransInst = (STrans*)shandle;
SRpcHandleInfo* info = &pReq->info;
SCliThrdObj* pThrd = NULL;
int idx = -1;
SExHandle* exh = info->handle;
int64_t refId = -1;
if (exh != NULL) {
refId = exh->refId;
STrans* pTransInst = (STrans*)shandle;
SCliThrdObj* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle);
if (pThrd == NULL) {
transFreeMsg(pReq->pCont);
return;
}
CONN_HOST_THREAD_IDX1(idx, exh, refId, pThrd);
if (idx == -1) {
idx = cliRBChoseIdx(pTransInst);
pThrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[idx];
}
tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t));
tsem_init(sem, 0, 0);
......@@ -1211,16 +1202,13 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM
tsem_wait(sem);
tsem_destroy(sem);
taosMemoryFree(sem);
_return1:
return;
_return2:
return;
}
/*
*
**/
void transSetDefaultAddr(void* ahandle, const char* ip, const char* fqdn) {
STrans* pTransInst = ahandle;
void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
STrans* pTransInst = shandle;
SCvtAddr cvtAddr = {0};
if (ip != NULL && fqdn != NULL) {
......
......@@ -133,7 +133,7 @@ static SSvrConn* createConn(void* hThrd);
static void destroyConn(SSvrConn* conn, bool clear /*clear handle or not*/);
static void destroyConnRegArg(SSvrConn* conn);
static int reallocConnRefHandle(SSvrConn* conn);
static int reallocConnRef(SSvrConn* conn);
static void uvHandleQuit(SSvrMsg* msg, SWorkThrdObj* thrd);
static void uvHandleRelease(SSvrMsg* msg, SWorkThrdObj* thrd);
......@@ -176,7 +176,7 @@ static bool addHandleToAcceptloop(void* arg);
srvMsg->msg = tmsg; \
srvMsg->type = Release; \
srvMsg->pConn = conn; \
reallocConnRefHandle(conn); \
reallocConnRef(conn); \
if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \
return; \
} \
......@@ -353,7 +353,7 @@ void uvOnSendCb(uv_write_t* req, int status) {
// if (msg->type == Release && conn->status != ConnNormal) {
// conn->status = ConnNormal;
// transUnrefSrvHandle(conn);
// reallocConnRefHandle(conn);
// reallocConnRef(conn);
// destroySmsg(msg);
// transQueueClear(&conn->srvMsgs);
// return;
......@@ -800,7 +800,7 @@ static void destroyConnRegArg(SSvrConn* conn) {
conn->regArg.init = 0;
}
}
static int reallocConnRefHandle(SSvrConn* conn) {
static int reallocConnRef(SSvrConn* conn) {
transReleaseExHandle(refMgt, conn->refId);
transRemoveExHandle(refMgt, conn->refId);
// avoid app continue to send msg on invalid handle
......@@ -945,7 +945,7 @@ void uvHandleQuit(SSvrMsg* msg, SWorkThrdObj* thrd) {
void uvHandleRelease(SSvrMsg* msg, SWorkThrdObj* thrd) {
SSvrConn* conn = msg->pConn;
if (conn->status == ConnAcquire) {
reallocConnRefHandle(conn);
reallocConnRef(conn);
if (!transQueuePush(&conn->srvMsgs, msg)) {
return;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册