From fb686a9825eda565a948b588df212f12d9494a17 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Wed, 17 Aug 2022 18:19:40 +0800 Subject: [PATCH] fix(query): add TBOOL type to solve async call --- src/client/src/tscServer.c | 2 +- src/inc/taos.h | 5 +++++ src/inc/trpc.h | 2 +- src/rpc/src/rpcMain.c | 16 +++++++++------- 4 files changed, 16 insertions(+), 9 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index cc3190a1d0..83e8105d48 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -453,7 +453,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { .code = 0 }; - if(rpcSendRequest(pObj->pRpcObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid)) { + if(rpcSendRequest(pObj->pRpcObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid) != BOOL_FALSE) { if(pSql->cmd.command == TSDB_SQL_SELECT ) rpcSaveSendInfo(pSql->rpcRid, &pSql->pPrevContext); return TSDB_CODE_SUCCESS; diff --git a/src/inc/taos.h b/src/inc/taos.h index ea8e1d9dad..1a33a2910b 100644 --- a/src/inc/taos.h +++ b/src/inc/taos.h @@ -48,6 +48,11 @@ typedef void **TAOS_ROW; #define TSDB_DATA_TYPE_UBIGINT 14 // 8 bytes #define TSDB_DATA_TYPE_JSON 15 // json string +typedef enum { + BOOL_FALSE = 0, + BOOL_TRUE = 1, + BOOL_ASYNC = 2 //request is processed by async for another thread, not now true or false +} TBOOL; typedef enum { TSDB_OPTION_LOCALE, TSDB_OPTION_CHARSET, diff --git a/src/inc/trpc.h b/src/inc/trpc.h index 52793423d7..b165415671 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -85,7 +85,7 @@ void rpcClose(void *); void *rpcMallocCont(int contLen); void rpcFreeCont(void *pCont); void *rpcReallocCont(void *ptr, int contLen); -bool rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); +TBOOL rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); void rpcSendResponse(const SRpcMsg *pMsg); void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet); int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 01c41b6e09..54db7f3a59 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -201,7 +201,7 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc); static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv); static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv); -static bool rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext); +static TBOOL rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext); static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code); static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code); static bool rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen); @@ -393,7 +393,7 @@ void *rpcReallocCont(void *ptr, int contLen) { return start + sizeof(SRpcReqContext) + sizeof(SRpcHead); } -bool rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) { +TBOOL rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) { SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcReqContext *pContext; @@ -1370,7 +1370,7 @@ static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) { return; } -static bool rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { +static TBOOL rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { SRpcHead *pHead = rpcHeadFromCont(pContext->pCont); char *msg = (char *)pHead; int msgLen = rpcMsgLenFromCont(pContext->contLen); @@ -1380,8 +1380,9 @@ static bool rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { SRpcConn *pConn = rpcSetupConnToServer(pContext); if (pConn == NULL) { pContext->code = terrno; + // in rpcProcessConnError if numOfTry over limit, could call rpcNotifyClient to stop query taosTmrStart(rpcProcessConnError, 1, pContext, pRpc->tmrCtrl); - return false; + return BOOL_ASYNC; } pContext->pConn = pConn; @@ -1422,7 +1423,7 @@ static bool rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); rpcUnlockConn(pConn); - return ret; + return ret ? BOOL_TRUE : BOOL_FALSE; } static bool rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { @@ -1464,7 +1465,6 @@ static void rpcProcessConnError(void *param, void *id) { return; } - tDebug("%s %p, connection error happens", pRpc->label, pContext->ahandle); if (pContext->numOfTry >= pContext->epSet.numOfEps || pContext->msgType == TSDB_MSG_TYPE_FETCH) { rpcMsg.msgType = pContext->msgType+1; @@ -1473,9 +1473,11 @@ static void rpcProcessConnError(void *param, void *id) { rpcMsg.pCont = NULL; rpcMsg.contLen = 0; + tWarn("%s %p, connection error. notify client query over. numOfTry=%d msgType=%d", pRpc->label, pContext->ahandle, pContext->numOfTry, pContext->msgType); rpcNotifyClient(pContext, &rpcMsg); } else { - // move to next IP + // move to next IP + tWarn("%s %p, connection error. retry to send request again. numOfTry=%d msgType=%d", pRpc->label, pContext->ahandle, pContext->numOfTry, pContext->msgType); pContext->epSet.inUse++; pContext->epSet.inUse = pContext->epSet.inUse % pContext->epSet.numOfEps; rpcSendReqToServer(pRpc, pContext); -- GitLab