提交 de5a5152 编写于 作者: A Alex Duan

fix(query): add TBOOL type

上级 e99f512e
...@@ -460,7 +460,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -460,7 +460,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
} }
if(rpcSendRequest(pObj->pRpcObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid)) { if(rpcSendRequest(pObj->pRpcObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid) != BOOL_FALSE) {
if(pSql->cmd.command == TSDB_SQL_SELECT) if(pSql->cmd.command == TSDB_SQL_SELECT)
rpcSaveSendInfo(pSql->rpcRid, &pSql->pPrevContext); rpcSaveSendInfo(pSql->rpcRid, &pSql->pPrevContext);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -48,6 +48,12 @@ typedef void **TAOS_ROW; ...@@ -48,6 +48,12 @@ typedef void **TAOS_ROW;
#define TSDB_DATA_TYPE_UBIGINT 14 // 8 bytes #define TSDB_DATA_TYPE_UBIGINT 14 // 8 bytes
#define TSDB_DATA_TYPE_JSON 15 // json string #define TSDB_DATA_TYPE_JSON 15 // json string
typedef enum {
BOOL_FALSE = 0,
BOOL_TRUE = 1,
BOOL_ASYNC = 2 //request is processed by async for another thread, not now true or false
} TBOOL;
typedef enum { typedef enum {
TSDB_OPTION_LOCALE, TSDB_OPTION_LOCALE,
TSDB_OPTION_CHARSET, TSDB_OPTION_CHARSET,
......
...@@ -85,7 +85,7 @@ void rpcClose(void *); ...@@ -85,7 +85,7 @@ void rpcClose(void *);
void *rpcMallocCont(int contLen); void *rpcMallocCont(int contLen);
void rpcFreeCont(void *pCont); void rpcFreeCont(void *pCont);
void *rpcReallocCont(void *ptr, int contLen); void *rpcReallocCont(void *ptr, int contLen);
bool rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); TBOOL rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid);
void rpcSendResponse(const SRpcMsg *pMsg); void rpcSendResponse(const SRpcMsg *pMsg);
void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet); void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet);
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
......
...@@ -202,7 +202,7 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc); ...@@ -202,7 +202,7 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc);
static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv); static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv);
static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv); static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv);
static bool rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext); static TBOOL rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext);
static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code); static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code);
static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code); static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code);
static bool rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen); static bool rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen);
...@@ -394,7 +394,7 @@ void *rpcReallocCont(void *ptr, int contLen) { ...@@ -394,7 +394,7 @@ void *rpcReallocCont(void *ptr, int contLen) {
return start + sizeof(SRpcReqContext) + sizeof(SRpcHead); return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
} }
bool rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) { TBOOL rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) {
SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcInfo *pRpc = (SRpcInfo *)shandle;
SRpcReqContext *pContext; SRpcReqContext *pContext;
...@@ -1384,7 +1384,7 @@ static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) { ...@@ -1384,7 +1384,7 @@ static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) {
return; return;
} }
static bool rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { static TBOOL rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
SRpcHead *pHead = rpcHeadFromCont(pContext->pCont); SRpcHead *pHead = rpcHeadFromCont(pContext->pCont);
char *msg = (char *)pHead; char *msg = (char *)pHead;
int msgLen = rpcMsgLenFromCont(pContext->contLen); int msgLen = rpcMsgLenFromCont(pContext->contLen);
...@@ -1394,8 +1394,9 @@ static bool rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { ...@@ -1394,8 +1394,9 @@ static bool rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
SRpcConn *pConn = rpcSetupConnToServer(pContext); SRpcConn *pConn = rpcSetupConnToServer(pContext);
if (pConn == NULL) { if (pConn == NULL) {
pContext->code = terrno; pContext->code = terrno;
// in rpcProcessConnError if numOfTry over limit, could call rpcNotifyClient to stop query
taosTmrStart(rpcProcessConnError, 1, pContext, pRpc->tmrCtrl); taosTmrStart(rpcProcessConnError, 1, pContext, pRpc->tmrCtrl);
return false; return BOOL_ASYNC;
} }
pContext->pConn = pConn; pContext->pConn = pConn;
...@@ -1436,7 +1437,7 @@ static bool rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { ...@@ -1436,7 +1437,7 @@ static bool rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
rpcUnlockConn(pConn); rpcUnlockConn(pConn);
return ret; return ret ? BOOL_TRUE : BOOL_FALSE;
} }
static bool rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { static bool rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
...@@ -1478,8 +1479,6 @@ static void rpcProcessConnError(void *param, void *id) { ...@@ -1478,8 +1479,6 @@ static void rpcProcessConnError(void *param, void *id) {
return; return;
} }
tDebug("%s %p, connection error happens", pRpc->label, pContext->ahandle);
if (pContext->numOfTry >= pContext->epSet.numOfEps || pContext->msgType == TSDB_MSG_TYPE_FETCH) { if (pContext->numOfTry >= pContext->epSet.numOfEps || pContext->msgType == TSDB_MSG_TYPE_FETCH) {
rpcMsg.msgType = pContext->msgType+1; rpcMsg.msgType = pContext->msgType+1;
rpcMsg.ahandle = pContext->ahandle; rpcMsg.ahandle = pContext->ahandle;
...@@ -1487,9 +1486,11 @@ static void rpcProcessConnError(void *param, void *id) { ...@@ -1487,9 +1486,11 @@ static void rpcProcessConnError(void *param, void *id) {
rpcMsg.pCont = NULL; rpcMsg.pCont = NULL;
rpcMsg.contLen = 0; rpcMsg.contLen = 0;
tWarn("%s %p, connection error. notify client query over. numOfTry=%d msgType=%d", pRpc->label, pContext->ahandle, pContext->numOfTry, pContext->msgType);
rpcNotifyClient(pContext, &rpcMsg); rpcNotifyClient(pContext, &rpcMsg);
} else { } else {
// move to next IP // move to next IP
tWarn("%s %p, connection error. retry to send request again. numOfTry=%d msgType=%d", pRpc->label, pContext->ahandle, pContext->numOfTry, pContext->msgType);
pContext->epSet.inUse++; pContext->epSet.inUse++;
pContext->epSet.inUse = pContext->epSet.inUse % pContext->epSet.numOfEps; pContext->epSet.inUse = pContext->epSet.inUse % pContext->epSet.numOfEps;
rpcSendReqToServer(pRpc, pContext); rpcSendReqToServer(pRpc, pContext);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册