未验证 提交 f026d8ba 编写于 作者: dengyihao's avatar dengyihao 提交者: GitHub

Merge pull request #10776 from taosdata/feature/supportQuery

handle except
......@@ -30,6 +30,7 @@ typedef struct SCliConn {
void* hostThrd;
SConnBuffer readBuf;
void* data;
SArray* cliMsgs;
queue conn;
uint64_t expireTime;
int hThrdIdx;
......@@ -106,6 +107,7 @@ static void cliAsyncCb(uv_async_t* handle);
static SCliConn* cliCreateConn(SCliThrdObj* thrd);
static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
static void cliDestroy(uv_handle_t* handle);
static void cliSend(SCliConn* pConn);
// process data read from server, add decompress etc later
static void cliHandleResp(SCliConn* conn);
......@@ -158,6 +160,14 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
static void* cliWorkThread(void* arg);
bool cliMayContinueSendMsg(SCliConn* conn) {
if (taosArrayGetSize(conn->cliMsgs) > 0) {
cliSend(conn);
return true;
} else {
return false;
}
}
void cliHandleResp(SCliConn* conn) {
SCliThrdObj* pThrd = conn->hostThrd;
STrans* pTransInst = pThrd->pTransInst;
......@@ -173,18 +183,18 @@ void cliHandleResp(SCliConn* conn) {
transMsg.msgType = pHead->msgType;
transMsg.ahandle = NULL;
SCliMsg* pMsg = conn->data;
SCliMsg* pMsg = NULL;
if (taosArrayGetSize(conn->cliMsgs) > 0) {
pMsg = taosArrayGetP(conn->cliMsgs, 0);
taosArrayRemove(conn->cliMsgs, 0);
}
STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL;
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) {
transMsg.ahandle = pTransInst->mfp ? (*pTransInst->mfp)(pTransInst->parent, transMsg.msgType) : NULL;
} else {
transMsg.ahandle = pCtx ? pCtx->ahandle : NULL;
}
// if (rpcMsg.ahandle == NULL) {
// tDebug("%s cli conn %p handle except", CONN_GET_INST_LABEL(conn), conn);
// return;
//}
// buf's mem alread translated to transMsg.pCont
transClearBuffer(&conn->readBuf);
......@@ -214,12 +224,15 @@ void cliHandleResp(SCliConn* conn) {
memcpy((char*)pCtx->pRsp, (char*)&transMsg, sizeof(transMsg));
tsem_post(pCtx->pSem);
}
destroyCmsg(pMsg);
if (cliMayContinueSendMsg(conn) == true) {
return;
}
if (CONN_NO_PERSIST_BY_APP(conn)) {
addConnToPool(pThrd->pool, conn);
}
destroyCmsg(conn->data);
conn->data = NULL;
uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
// start thread's timer of conn pool if not active
......@@ -229,7 +242,7 @@ void cliHandleResp(SCliConn* conn) {
}
void cliHandleExcept(SCliConn* pConn) {
if (pConn->data == NULL) {
if (taosArrayGetSize(pConn->cliMsgs) == 0) {
if (pConn->broken == true || CONN_NO_PERSIST_BY_APP(pConn)) {
transUnrefCliHandle(pConn);
return;
......@@ -238,32 +251,38 @@ void cliHandleExcept(SCliConn* pConn) {
SCliThrdObj* pThrd = pConn->hostThrd;
STrans* pTransInst = pThrd->pTransInst;
SCliMsg* pMsg = pConn->data;
STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL;
do {
SCliMsg* pMsg = NULL;
if (taosArrayGetSize(pConn->cliMsgs) > 0) {
pMsg = taosArrayGetP(pConn->cliMsgs, 0);
taosArrayRemove(pConn->cliMsgs, 0);
}
STransMsg transMsg = {0};
transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0;
transMsg.ahandle = NULL;
STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL;
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) {
transMsg.ahandle = pTransInst->mfp ? (*pTransInst->mfp)(pTransInst->parent, transMsg.msgType) : NULL;
} else {
transMsg.ahandle = pCtx ? pCtx->ahandle : NULL;
}
STransMsg transMsg = {0};
transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0;
transMsg.ahandle = NULL;
if (pCtx == NULL || pCtx->pSem == NULL) {
tTrace("%s cli conn %p handle resp", pTransInst->label, pConn);
(pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
} else {
tTrace("%s cli conn(sync) %p handle resp", pTransInst->label, pConn);
memcpy((char*)(pCtx->pRsp), (char*)(&transMsg), sizeof(transMsg));
tsem_post(pCtx->pSem);
}
destroyCmsg(pConn->data);
pConn->data = NULL;
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) {
transMsg.ahandle = pTransInst->mfp ? (*pTransInst->mfp)(pTransInst->parent, transMsg.msgType) : NULL;
} else {
transMsg.ahandle = pCtx ? pCtx->ahandle : NULL;
}
if (pCtx == NULL || pCtx->pSem == NULL) {
tTrace("%s cli conn %p handle resp", pTransInst->label, pConn);
(pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
} else {
tTrace("%s cli conn(sync) %p handle resp", pTransInst->label, pConn);
memcpy((char*)(pCtx->pRsp), (char*)(&transMsg), sizeof(transMsg));
tsem_post(pCtx->pSem);
}
destroyCmsg(pMsg);
tTrace("%s cli conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn);
} while (taosArrayGetSize(pConn->cliMsgs) > 0);
tTrace("%s cli conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn);
transUnrefCliHandle(pConn);
}
......@@ -398,6 +417,7 @@ static SCliConn* cliCreateConn(SCliThrdObj* pThrd) {
conn->writeReq.data = conn;
conn->connReq.data = conn;
conn->cliMsgs = taosArrayInit(2, sizeof(void*));
QUEUE_INIT(&conn->conn);
conn->hostThrd = pThrd;
......@@ -417,6 +437,7 @@ static void cliDestroy(uv_handle_t* handle) {
SCliConn* conn = handle->data;
free(conn->ip);
free(conn->stream);
taosArrayDestroy(conn->cliMsgs);
tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
free(conn);
}
......@@ -426,11 +447,6 @@ static void cliSendCb(uv_write_t* req, int status) {
if (status == 0) {
tTrace("%s cli conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn);
SCliMsg* pMsg = pConn->data;
if (pMsg == NULL) {
return;
}
destroyUserdata(&pMsg->msg);
} else {
tError("%s cli conn %p failed to write: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(status));
cliHandleExcept(pConn);
......@@ -442,7 +458,8 @@ static void cliSendCb(uv_write_t* req, int status) {
void cliSend(SCliConn* pConn) {
CONN_HANDLE_BROKEN(pConn);
SCliMsg* pCliMsg = pConn->data;
assert(taosArrayGetSize(pConn->cliMsgs) > 0);
SCliMsg* pCliMsg = taosArrayGetP(pConn->cliMsgs, 0);
STransConnCtx* pCtx = pCliMsg->ctx;
SCliThrdObj* pThrd = pConn->hostThrd;
......@@ -480,6 +497,7 @@ void cliSend(SCliConn* pConn) {
TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
pConn->writeReq.data = pConn;
uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
return;
......@@ -502,8 +520,8 @@ void cliConnCb(uv_connect_t* req, int status) {
uv_tcp_getsockname((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->locaddr, &addrlen);
tTrace("%s cli conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn);
assert(pConn->stream == req->handle);
cliSend(pConn);
}
......@@ -521,8 +539,11 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) {
SCliConn* conn = pMsg->msg.handle;
tDebug("%s cli conn %p release to inst", CONN_GET_INST_LABEL(conn), conn);
destroyCmsg(pMsg);
conn->data = NULL;
while (taosArrayGetSize(conn->cliMsgs) > 0) {
SCliMsg* pMsg = taosArrayGetP(conn->cliMsgs, 0);
destroyCmsg(pMsg);
taosArrayRemove(conn->cliMsgs, 0);
}
transDestroyBuffer(&conn->readBuf);
if (conn->persist && T_REF_VAL_GET(conn) >= 2) {
......@@ -561,14 +582,19 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
SCliConn* conn = cliGetConn(pMsg, pThrd);
if (conn != NULL) {
conn->data = pMsg;
conn->hThrdIdx = pCtx->hThrdIdx;
if (taosArrayGetSize(conn->cliMsgs) > 0) {
taosArrayPush(conn->cliMsgs, &pMsg);
return;
}
taosArrayPush(conn->cliMsgs, &pMsg);
transDestroyBuffer(&conn->readBuf);
cliSend(conn);
} else {
conn = cliCreateConn(pThrd);
conn->data = pMsg;
taosArrayPush(conn->cliMsgs, &pMsg);
conn->hThrdIdx = pCtx->hThrdIdx;
conn->ip = strdup(pMsg->ctx->ip);
conn->port = pMsg->ctx->port;
......
......@@ -50,6 +50,7 @@ static void *ConstructArgForSpecificMsgType(void *parent, tmsg_t msgType) {
}
// server except
static bool handleExcept(void *parent, tmsg_t msgType) {
//
return msgType == TDMT_VND_QUERY || msgType == TDMT_VND_FETCH_RSP || msgType == TDMT_VND_RES_READY_RSP;
}
typedef void (*CB)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册