提交 d1c4e9ae 编写于 作者: dengyihao's avatar dengyihao

handle except

上级 a9692987
...@@ -30,6 +30,7 @@ typedef struct SCliConn { ...@@ -30,6 +30,7 @@ typedef struct SCliConn {
void* hostThrd; void* hostThrd;
SConnBuffer readBuf; SConnBuffer readBuf;
void* data; void* data;
SArray* cliMsgs;
queue conn; queue conn;
uint64_t expireTime; uint64_t expireTime;
int hThrdIdx; int hThrdIdx;
...@@ -106,6 +107,7 @@ static void cliAsyncCb(uv_async_t* handle); ...@@ -106,6 +107,7 @@ static void cliAsyncCb(uv_async_t* handle);
static SCliConn* cliCreateConn(SCliThrdObj* thrd); static SCliConn* cliCreateConn(SCliThrdObj* thrd);
static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/); static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
static void cliDestroy(uv_handle_t* handle); static void cliDestroy(uv_handle_t* handle);
static void cliSend(SCliConn* pConn);
// process data read from server, add decompress etc later // process data read from server, add decompress etc later
static void cliHandleResp(SCliConn* conn); static void cliHandleResp(SCliConn* conn);
...@@ -158,6 +160,14 @@ static void destroyThrdObj(SCliThrdObj* pThrd); ...@@ -158,6 +160,14 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
static void* cliWorkThread(void* arg); static void* cliWorkThread(void* arg);
bool cliMayContinueSendMsg(SCliConn* conn) {
if (taosArrayGetSize(conn->cliMsgs) > 0) {
cliSend(conn);
return true;
} else {
return false;
}
}
void cliHandleResp(SCliConn* conn) { void cliHandleResp(SCliConn* conn) {
SCliThrdObj* pThrd = conn->hostThrd; SCliThrdObj* pThrd = conn->hostThrd;
STrans* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
...@@ -173,18 +183,18 @@ void cliHandleResp(SCliConn* conn) { ...@@ -173,18 +183,18 @@ void cliHandleResp(SCliConn* conn) {
transMsg.msgType = pHead->msgType; transMsg.msgType = pHead->msgType;
transMsg.ahandle = NULL; transMsg.ahandle = NULL;
SCliMsg* pMsg = conn->data; SCliMsg* pMsg = NULL;
if (taosArrayGetSize(conn->cliMsgs) > 0) {
pMsg = taosArrayGetP(conn->cliMsgs, 0);
taosArrayRemove(conn->cliMsgs, 0);
}
STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL; STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL;
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) { if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) {
transMsg.ahandle = pTransInst->mfp ? (*pTransInst->mfp)(pTransInst->parent, transMsg.msgType) : NULL; transMsg.ahandle = pTransInst->mfp ? (*pTransInst->mfp)(pTransInst->parent, transMsg.msgType) : NULL;
} else { } else {
transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; transMsg.ahandle = pCtx ? pCtx->ahandle : NULL;
} }
// if (rpcMsg.ahandle == NULL) {
// tDebug("%s cli conn %p handle except", CONN_GET_INST_LABEL(conn), conn);
// return;
//}
// buf's mem alread translated to transMsg.pCont // buf's mem alread translated to transMsg.pCont
transClearBuffer(&conn->readBuf); transClearBuffer(&conn->readBuf);
...@@ -214,12 +224,15 @@ void cliHandleResp(SCliConn* conn) { ...@@ -214,12 +224,15 @@ void cliHandleResp(SCliConn* conn) {
memcpy((char*)pCtx->pRsp, (char*)&transMsg, sizeof(transMsg)); memcpy((char*)pCtx->pRsp, (char*)&transMsg, sizeof(transMsg));
tsem_post(pCtx->pSem); tsem_post(pCtx->pSem);
} }
destroyCmsg(pMsg);
if (cliMayContinueSendMsg(conn) == true) {
return;
}
if (CONN_NO_PERSIST_BY_APP(conn)) { if (CONN_NO_PERSIST_BY_APP(conn)) {
addConnToPool(pThrd->pool, conn); addConnToPool(pThrd->pool, conn);
} }
destroyCmsg(conn->data);
conn->data = NULL;
uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb); uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
// start thread's timer of conn pool if not active // start thread's timer of conn pool if not active
...@@ -229,7 +242,7 @@ void cliHandleResp(SCliConn* conn) { ...@@ -229,7 +242,7 @@ void cliHandleResp(SCliConn* conn) {
} }
void cliHandleExcept(SCliConn* pConn) { void cliHandleExcept(SCliConn* pConn) {
if (pConn->data == NULL) { if (taosArrayGetSize(pConn->cliMsgs) == 0) {
if (pConn->broken == true || CONN_NO_PERSIST_BY_APP(pConn)) { if (pConn->broken == true || CONN_NO_PERSIST_BY_APP(pConn)) {
transUnrefCliHandle(pConn); transUnrefCliHandle(pConn);
return; return;
...@@ -238,32 +251,38 @@ void cliHandleExcept(SCliConn* pConn) { ...@@ -238,32 +251,38 @@ void cliHandleExcept(SCliConn* pConn) {
SCliThrdObj* pThrd = pConn->hostThrd; SCliThrdObj* pThrd = pConn->hostThrd;
STrans* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
SCliMsg* pMsg = pConn->data; do {
STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL; SCliMsg* pMsg = NULL;
if (taosArrayGetSize(pConn->cliMsgs) > 0) {
pMsg = taosArrayGetP(pConn->cliMsgs, 0);
taosArrayRemove(pConn->cliMsgs, 0);
}
STransMsg transMsg = {0}; STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL;
transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0;
transMsg.ahandle = NULL;
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) { STransMsg transMsg = {0};
transMsg.ahandle = pTransInst->mfp ? (*pTransInst->mfp)(pTransInst->parent, transMsg.msgType) : NULL; transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
} else { transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0;
transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; transMsg.ahandle = NULL;
}
if (pCtx == NULL || pCtx->pSem == NULL) { if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) {
tTrace("%s cli conn %p handle resp", pTransInst->label, pConn); transMsg.ahandle = pTransInst->mfp ? (*pTransInst->mfp)(pTransInst->parent, transMsg.msgType) : NULL;
(pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); } else {
} else { transMsg.ahandle = pCtx ? pCtx->ahandle : NULL;
tTrace("%s cli conn(sync) %p handle resp", pTransInst->label, pConn); }
memcpy((char*)(pCtx->pRsp), (char*)(&transMsg), sizeof(transMsg));
tsem_post(pCtx->pSem); if (pCtx == NULL || pCtx->pSem == NULL) {
} tTrace("%s cli conn %p handle resp", pTransInst->label, pConn);
destroyCmsg(pConn->data); (pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
pConn->data = NULL; } else {
tTrace("%s cli conn(sync) %p handle resp", pTransInst->label, pConn);
memcpy((char*)(pCtx->pRsp), (char*)(&transMsg), sizeof(transMsg));
tsem_post(pCtx->pSem);
}
destroyCmsg(pMsg);
tTrace("%s cli conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn);
} while (taosArrayGetSize(pConn->cliMsgs) > 0);
tTrace("%s cli conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn);
transUnrefCliHandle(pConn); transUnrefCliHandle(pConn);
} }
...@@ -398,6 +417,7 @@ static SCliConn* cliCreateConn(SCliThrdObj* pThrd) { ...@@ -398,6 +417,7 @@ static SCliConn* cliCreateConn(SCliThrdObj* pThrd) {
conn->writeReq.data = conn; conn->writeReq.data = conn;
conn->connReq.data = conn; conn->connReq.data = conn;
conn->cliMsgs = taosArrayInit(2, sizeof(void*));
QUEUE_INIT(&conn->conn); QUEUE_INIT(&conn->conn);
conn->hostThrd = pThrd; conn->hostThrd = pThrd;
...@@ -417,6 +437,7 @@ static void cliDestroy(uv_handle_t* handle) { ...@@ -417,6 +437,7 @@ static void cliDestroy(uv_handle_t* handle) {
SCliConn* conn = handle->data; SCliConn* conn = handle->data;
free(conn->ip); free(conn->ip);
free(conn->stream); free(conn->stream);
taosArrayDestroy(conn->cliMsgs);
tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
free(conn); free(conn);
} }
...@@ -426,11 +447,6 @@ static void cliSendCb(uv_write_t* req, int status) { ...@@ -426,11 +447,6 @@ static void cliSendCb(uv_write_t* req, int status) {
if (status == 0) { if (status == 0) {
tTrace("%s cli conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn); tTrace("%s cli conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn);
SCliMsg* pMsg = pConn->data;
if (pMsg == NULL) {
return;
}
destroyUserdata(&pMsg->msg);
} else { } else {
tError("%s cli conn %p failed to write: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(status)); tError("%s cli conn %p failed to write: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(status));
cliHandleExcept(pConn); cliHandleExcept(pConn);
...@@ -442,7 +458,8 @@ static void cliSendCb(uv_write_t* req, int status) { ...@@ -442,7 +458,8 @@ static void cliSendCb(uv_write_t* req, int status) {
void cliSend(SCliConn* pConn) { void cliSend(SCliConn* pConn) {
CONN_HANDLE_BROKEN(pConn); CONN_HANDLE_BROKEN(pConn);
SCliMsg* pCliMsg = pConn->data; assert(taosArrayGetSize(pConn->cliMsgs) > 0);
SCliMsg* pCliMsg = taosArrayGetP(pConn->cliMsgs, 0);
STransConnCtx* pCtx = pCliMsg->ctx; STransConnCtx* pCtx = pCliMsg->ctx;
SCliThrdObj* pThrd = pConn->hostThrd; SCliThrdObj* pThrd = pConn->hostThrd;
...@@ -480,6 +497,7 @@ void cliSend(SCliConn* pConn) { ...@@ -480,6 +497,7 @@ void cliSend(SCliConn* pConn) {
TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
pConn->writeReq.data = pConn;
uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
return; return;
...@@ -502,8 +520,8 @@ void cliConnCb(uv_connect_t* req, int status) { ...@@ -502,8 +520,8 @@ void cliConnCb(uv_connect_t* req, int status) {
uv_tcp_getsockname((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->locaddr, &addrlen); uv_tcp_getsockname((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->locaddr, &addrlen);
tTrace("%s cli conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn); tTrace("%s cli conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn);
assert(pConn->stream == req->handle); assert(pConn->stream == req->handle);
cliSend(pConn); cliSend(pConn);
} }
...@@ -521,8 +539,11 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) { ...@@ -521,8 +539,11 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) {
SCliConn* conn = pMsg->msg.handle; SCliConn* conn = pMsg->msg.handle;
tDebug("%s cli conn %p release to inst", CONN_GET_INST_LABEL(conn), conn); tDebug("%s cli conn %p release to inst", CONN_GET_INST_LABEL(conn), conn);
destroyCmsg(pMsg); while (taosArrayGetSize(conn->cliMsgs) > 0) {
conn->data = NULL; SCliMsg* pMsg = taosArrayGetP(conn->cliMsgs, 0);
destroyCmsg(pMsg);
taosArrayRemove(conn->cliMsgs, 0);
}
transDestroyBuffer(&conn->readBuf); transDestroyBuffer(&conn->readBuf);
if (conn->persist && T_REF_VAL_GET(conn) >= 2) { if (conn->persist && T_REF_VAL_GET(conn) >= 2) {
...@@ -561,14 +582,14 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { ...@@ -561,14 +582,14 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
SCliConn* conn = cliGetConn(pMsg, pThrd); SCliConn* conn = cliGetConn(pMsg, pThrd);
if (conn != NULL) { if (conn != NULL) {
conn->data = pMsg; taosArrayPush(conn->cliMsgs, &pMsg);
conn->hThrdIdx = pCtx->hThrdIdx; conn->hThrdIdx = pCtx->hThrdIdx;
transDestroyBuffer(&conn->readBuf); transDestroyBuffer(&conn->readBuf);
cliSend(conn); cliSend(conn);
} else { } else {
conn = cliCreateConn(pThrd); conn = cliCreateConn(pThrd);
conn->data = pMsg; taosArrayPush(conn->cliMsgs, &pMsg);
conn->hThrdIdx = pCtx->hThrdIdx; conn->hThrdIdx = pCtx->hThrdIdx;
conn->ip = strdup(pMsg->ctx->ip); conn->ip = strdup(pMsg->ctx->ip);
conn->port = pMsg->ctx->port; conn->port = pMsg->ctx->port;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册