提交 dc7ce530 编写于 作者: dengyihao's avatar dengyihao

refactor code

上级 46bd5e86
...@@ -103,14 +103,6 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port); ...@@ -103,14 +103,6 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port);
static void addConnToPool(void* pool, SCliConn* conn); static void addConnToPool(void* pool, SCliConn* conn);
static void doCloseIdleConn(void* param); static void doCloseIdleConn(void* param);
static int sockDebugInfo(struct sockaddr* sockname, char* dst) {
struct sockaddr_in addr = *(struct sockaddr_in*)sockname;
char buf[16] = {0};
int r = uv_ip4_name(&addr, (char*)buf, sizeof(buf));
sprintf(dst, "%s:%d", buf, ntohs(addr.sin_port));
return r;
}
// register timer for read // register timer for read
static void cliReadTimeoutCb(uv_timer_t* handle); static void cliReadTimeoutCb(uv_timer_t* handle);
// register timer in each thread to clear expire conn // register timer in each thread to clear expire conn
...@@ -127,6 +119,8 @@ static void cliAsyncCb(uv_async_t* handle); ...@@ -127,6 +119,8 @@ static void cliAsyncCb(uv_async_t* handle);
static void cliIdleCb(uv_idle_t* handle); static void cliIdleCb(uv_idle_t* handle);
static void cliPrepareCb(uv_prepare_t* handle); static void cliPrepareCb(uv_prepare_t* handle);
static bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead);
static int32_t allocConnRef(SCliConn* conn, bool update); static int32_t allocConnRef(SCliConn* conn, bool update);
static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg); static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg);
...@@ -361,6 +355,9 @@ void cliHandleResp(SCliConn* conn) { ...@@ -361,6 +355,9 @@ void cliHandleResp(SCliConn* conn) {
SCliMsg* pMsg = NULL; SCliMsg* pMsg = NULL;
STransConnCtx* pCtx = NULL; STransConnCtx* pCtx = NULL;
if (cliRecvReleaseReq(conn, pHead)) {
return;
}
CONN_SHOULD_RELEASE(conn, pHead); CONN_SHOULD_RELEASE(conn, pHead);
if (CONN_NO_PERSIST_BY_APP(conn)) { if (CONN_NO_PERSIST_BY_APP(conn)) {
...@@ -383,7 +380,7 @@ void cliHandleResp(SCliConn* conn) { ...@@ -383,7 +380,7 @@ void cliHandleResp(SCliConn* conn) {
transMsg.info.ahandle); transMsg.info.ahandle);
} }
} else { } else {
pCtx = pMsg ? pMsg->ctx : NULL; pCtx = pMsg->ctx;
transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL; transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL;
tDebug("%s conn %p get ahandle %p, persist: 1", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle); tDebug("%s conn %p get ahandle %p, persist: 1", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle);
} }
...@@ -395,7 +392,6 @@ void cliHandleResp(SCliConn* conn) { ...@@ -395,7 +392,6 @@ void cliHandleResp(SCliConn* conn) {
} }
STraceId* trace = &transMsg.info.traceId; STraceId* trace = &transMsg.info.traceId;
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, code str:%s", CONN_GET_INST_LABEL(conn), conn, tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, code str:%s", CONN_GET_INST_LABEL(conn), conn,
TMSG_INFO(pHead->msgType), conn->dst, conn->src, transMsg.contLen, tstrerror(transMsg.code)); TMSG_INFO(pHead->msgType), conn->dst, conn->src, transMsg.contLen, tstrerror(transMsg.code));
...@@ -1053,6 +1049,30 @@ static void cliPrepareCb(uv_prepare_t* handle) { ...@@ -1053,6 +1049,30 @@ static void cliPrepareCb(uv_prepare_t* handle) {
if (thrd->stopMsg != NULL) cliHandleQuit(thrd->stopMsg, thrd); if (thrd->stopMsg != NULL) cliHandleQuit(thrd->stopMsg, thrd);
} }
bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) {
if (pHead->release == 1 && (pHead->msgLen) == sizeof(*pHead)) {
uint64_t ahandle = pHead->ahandle;
SCliMsg* pMsg = NULL;
CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle);
transClearBuffer(&conn->readBuf);
transFreeMsg(transContFromHead((char*)pHead));
if (transQueueSize(&conn->cliMsgs) > 0 && ahandle == 0) {
SCliMsg* cliMsg = transQueueGet(&conn->cliMsgs, 0);
if (cliMsg->type == Release) return true;
}
tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, conn->refId);
if (T_REF_VAL_GET(conn) > 1) {
transUnrefCliHandle(conn);
}
destroyCmsg(pMsg);
cliReleaseUnfinishedMsg(conn);
transQueueClear(&conn->cliMsgs);
addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn);
return true;
}
return false;
}
static void* cliWorkThread(void* arg) { static void* cliWorkThread(void* arg) {
SCliThrd* pThrd = (SCliThrd*)arg; SCliThrd* pThrd = (SCliThrd*)arg;
pThrd->pid = taosGetSelfPthreadId(); pThrd->pid = taosGetSelfPthreadId();
......
...@@ -114,6 +114,8 @@ static void uvAcceptAsyncCb(uv_async_t* handle); ...@@ -114,6 +114,8 @@ static void uvAcceptAsyncCb(uv_async_t* handle);
static void uvShutDownCb(uv_shutdown_t* req, int status); static void uvShutDownCb(uv_shutdown_t* req, int status);
static void uvPrepareCb(uv_prepare_t* handle); static void uvPrepareCb(uv_prepare_t* handle);
static bool uvRecvReleaseReq(SSvrConn* conn, STransMsgHead* pHead);
/* /*
* time-consuming task throwed into BG work thread * time-consuming task throwed into BG work thread
*/ */
...@@ -123,7 +125,7 @@ static void uvWorkAfterTask(uv_work_t* req, int status); ...@@ -123,7 +125,7 @@ static void uvWorkAfterTask(uv_work_t* req, int status);
static void uvWalkCb(uv_handle_t* handle, void* arg); static void uvWalkCb(uv_handle_t* handle, void* arg);
static void uvFreeCb(uv_handle_t* handle); static void uvFreeCb(uv_handle_t* handle);
static void uvStartSendRespInternal(SSvrMsg* smsg); static void uvStartSendRespImpl(SSvrMsg* smsg);
static void uvPrepareSendData(SSvrMsg* msg, uv_buf_t* wb); static void uvPrepareSendData(SSvrMsg* msg, uv_buf_t* wb);
static void uvStartSendResp(SSvrMsg* msg); static void uvStartSendResp(SSvrMsg* msg);
...@@ -154,37 +156,6 @@ static void* transAcceptThread(void* arg); ...@@ -154,37 +156,6 @@ static void* transAcceptThread(void* arg);
static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName); static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName);
static bool addHandleToAcceptloop(void* arg); static bool addHandleToAcceptloop(void* arg);
#define CONN_SHOULD_RELEASE(conn, head) \
do { \
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
reallocConnRef(conn); \
tTrace("conn %p received release request", conn); \
\
STraceId traceId = head->traceId; \
conn->status = ConnRelease; \
transClearBuffer(&conn->readBuf); \
transFreeMsg(transContFromHead((char*)head)); \
\
STransMsg tmsg = { \
.code = 0, .info.handle = (void*)conn, .info.traceId = traceId, .info.ahandle = (void*)0x9527}; \
SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg)); \
srvMsg->msg = tmsg; \
srvMsg->type = Release; \
srvMsg->pConn = conn; \
if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \
return; \
} \
if (conn->regArg.init) { \
tTrace("conn %p release, notify server app", conn); \
STrans* pTransInst = conn->pTransInst; \
(*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); \
memset(&conn->regArg, 0, sizeof(conn->regArg)); \
} \
uvStartSendRespInternal(srvMsg); \
return; \
} \
} while (0)
#define SRV_RELEASE_UV(loop) \ #define SRV_RELEASE_UV(loop) \
do { \ do { \
uv_walk(loop, uvWalkCb, NULL); \ uv_walk(loop, uvWalkCb, NULL); \
...@@ -230,7 +201,9 @@ static void uvHandleReq(SSvrConn* pConn) { ...@@ -230,7 +201,9 @@ static void uvHandleReq(SSvrConn* pConn) {
// transRefSrvHandle(pConn); // transRefSrvHandle(pConn);
// uv_queue_work(((SWorkThrd*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask); // uv_queue_work(((SWorkThrd*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask);
CONN_SHOULD_RELEASE(pConn, pHead); if (uvRecvReleaseReq(pConn, pHead)) {
return;
}
STransMsg transMsg; STransMsg transMsg;
memset(&transMsg, 0, sizeof(transMsg)); memset(&transMsg, 0, sizeof(transMsg));
...@@ -356,10 +329,10 @@ void uvOnSendCb(uv_write_t* req, int status) { ...@@ -356,10 +329,10 @@ void uvOnSendCb(uv_write_t* req, int status) {
msg = (SSvrMsg*)transQueueGet(&conn->srvMsgs, 0); msg = (SSvrMsg*)transQueueGet(&conn->srvMsgs, 0);
if (msg != NULL) { if (msg != NULL) {
uvStartSendRespInternal(msg); uvStartSendRespImpl(msg);
} }
} else { } else {
uvStartSendRespInternal(msg); uvStartSendRespImpl(msg);
} }
} }
} }
...@@ -423,7 +396,7 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { ...@@ -423,7 +396,7 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
wb->len = len; wb->len = len;
} }
static void uvStartSendRespInternal(SSvrMsg* smsg) { static void uvStartSendRespImpl(SSvrMsg* smsg) {
SSvrConn* pConn = smsg->pConn; SSvrConn* pConn = smsg->pConn;
if (pConn->broken) { if (pConn->broken) {
return; return;
...@@ -453,7 +426,7 @@ static void uvStartSendResp(SSvrMsg* smsg) { ...@@ -453,7 +426,7 @@ static void uvStartSendResp(SSvrMsg* smsg) {
if (!transQueuePush(&pConn->srvMsgs, smsg)) { if (!transQueuePush(&pConn->srvMsgs, smsg)) {
return; return;
} }
uvStartSendRespInternal(smsg); uvStartSendRespImpl(smsg);
return; return;
} }
...@@ -544,6 +517,35 @@ static void uvShutDownCb(uv_shutdown_t* req, int status) { ...@@ -544,6 +517,35 @@ static void uvShutDownCb(uv_shutdown_t* req, int status) {
uv_close((uv_handle_t*)req->handle, uvDestroyConn); uv_close((uv_handle_t*)req->handle, uvDestroyConn);
taosMemoryFree(req); taosMemoryFree(req);
} }
static bool uvRecvReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) {
if ((pHead)->release == 1 && (pHead->msgLen) == sizeof(*pHead)) {
reallocConnRef(pConn);
tTrace("conn %p received release request", pConn);
STraceId traceId = pHead->traceId;
pConn->status = ConnRelease;
transClearBuffer(&pConn->readBuf);
transFreeMsg(transContFromHead((char*)pHead));
STransMsg tmsg = {.code = 0, .info.handle = (void*)pConn, .info.traceId = traceId, .info.ahandle = (void*)0x9527};
SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg));
srvMsg->msg = tmsg;
srvMsg->type = Release;
srvMsg->pConn = pConn;
if (!transQueuePush(&pConn->srvMsgs, srvMsg)) {
return true;
}
if (pConn->regArg.init) {
tTrace("conn %p release, notify server app", pConn);
STrans* pTransInst = pConn->pTransInst;
(*pTransInst->cfp)(pTransInst->parent, &(pConn->regArg.msg), NULL);
memset(&pConn->regArg, 0, sizeof(pConn->regArg));
}
uvStartSendRespImpl(srvMsg);
return true;
}
return false;
}
static void uvPrepareCb(uv_prepare_t* handle) { static void uvPrepareCb(uv_prepare_t* handle) {
// prepare callback // prepare callback
SWorkThrd* pThrd = handle->data; SWorkThrd* pThrd = handle->data;
...@@ -992,7 +994,7 @@ void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd) { ...@@ -992,7 +994,7 @@ void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd) {
if (!transQueuePush(&conn->srvMsgs, msg)) { if (!transQueuePush(&conn->srvMsgs, msg)) {
return; return;
} }
uvStartSendRespInternal(msg); uvStartSendRespImpl(msg);
return; return;
} else if (conn->status == ConnRelease || conn->status == ConnNormal) { } else if (conn->status == ConnRelease || conn->status == ConnNormal) {
tDebug("%s conn %p already released, ignore release-msg", transLabel(thrd->pTransInst), conn); tDebug("%s conn %p already released, ignore release-msg", transLabel(thrd->pTransInst), conn);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册