diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index cc3190a1d03f2423790a8de18720280b7f001329..83e8105d48d482aefb159c9bb48864432c96f8f4 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -453,7 +453,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { .code = 0 }; - if(rpcSendRequest(pObj->pRpcObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid)) { + if(rpcSendRequest(pObj->pRpcObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid) != BOOL_FALSE) { if(pSql->cmd.command == TSDB_SQL_SELECT ) rpcSaveSendInfo(pSql->rpcRid, &pSql->pPrevContext); return TSDB_CODE_SUCCESS; diff --git a/src/inc/taos.h b/src/inc/taos.h index ea8e1d9dad61bdd513e8beded93d996ae66137dd..1a33a2910b84268f7fc0639ca436b85b6919c2f0 100644 --- a/src/inc/taos.h +++ b/src/inc/taos.h @@ -48,6 +48,11 @@ typedef void **TAOS_ROW; #define TSDB_DATA_TYPE_UBIGINT 14 // 8 bytes #define TSDB_DATA_TYPE_JSON 15 // json string +typedef enum { + BOOL_FALSE = 0, + BOOL_TRUE = 1, + BOOL_ASYNC = 2 //request is processed by async for another thread, not now true or false +} TBOOL; typedef enum { TSDB_OPTION_LOCALE, TSDB_OPTION_CHARSET, diff --git a/src/inc/trpc.h b/src/inc/trpc.h index 52793423d7700aba9cfe51c9c5cbb8690f8621e7..b165415671ba8dfd16c07c4bb7ee5e0647008fba 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -85,7 +85,7 @@ void rpcClose(void *); void *rpcMallocCont(int contLen); void rpcFreeCont(void *pCont); void *rpcReallocCont(void *ptr, int contLen); -bool rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); +TBOOL rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); void rpcSendResponse(const SRpcMsg *pMsg); void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet); int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 01c41b6e09e5e868aa680a841455486c2417dfa0..54db7f3a5990ae22f4fdc26de157280a4a4a26d1 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -201,7 +201,7 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc); static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv); static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv); -static bool rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext); +static TBOOL rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext); static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code); static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code); static bool rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen); @@ -393,7 +393,7 @@ void *rpcReallocCont(void *ptr, int contLen) { return start + sizeof(SRpcReqContext) + sizeof(SRpcHead); } -bool rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) { +TBOOL rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) { SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcReqContext *pContext; @@ -1370,7 +1370,7 @@ static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) { return; } -static bool rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { +static TBOOL rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { SRpcHead *pHead = rpcHeadFromCont(pContext->pCont); char *msg = (char *)pHead; int msgLen = rpcMsgLenFromCont(pContext->contLen); @@ -1380,8 +1380,9 @@ static bool rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { SRpcConn *pConn = rpcSetupConnToServer(pContext); if (pConn == NULL) { pContext->code = terrno; + // in rpcProcessConnError if numOfTry over limit, could call rpcNotifyClient to stop query taosTmrStart(rpcProcessConnError, 1, pContext, pRpc->tmrCtrl); - return false; + return BOOL_ASYNC; } pContext->pConn = pConn; @@ -1422,7 +1423,7 @@ static bool rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); rpcUnlockConn(pConn); - return ret; + return ret ? BOOL_TRUE : BOOL_FALSE; } static bool rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { @@ -1464,7 +1465,6 @@ static void rpcProcessConnError(void *param, void *id) { return; } - tDebug("%s %p, connection error happens", pRpc->label, pContext->ahandle); if (pContext->numOfTry >= pContext->epSet.numOfEps || pContext->msgType == TSDB_MSG_TYPE_FETCH) { rpcMsg.msgType = pContext->msgType+1; @@ -1473,9 +1473,11 @@ static void rpcProcessConnError(void *param, void *id) { rpcMsg.pCont = NULL; rpcMsg.contLen = 0; + tWarn("%s %p, connection error. notify client query over. numOfTry=%d msgType=%d", pRpc->label, pContext->ahandle, pContext->numOfTry, pContext->msgType); rpcNotifyClient(pContext, &rpcMsg); } else { - // move to next IP + // move to next IP + tWarn("%s %p, connection error. retry to send request again. numOfTry=%d msgType=%d", pRpc->label, pContext->ahandle, pContext->numOfTry, pContext->msgType); pContext->epSet.inUse++; pContext->epSet.inUse = pContext->epSet.inUse % pContext->epSet.numOfEps; rpcSendReqToServer(pRpc, pContext);