提交 c172979a 编写于 作者: A Alex Duan

fix(query): check probe in timer

上级 44b031b2
...@@ -397,9 +397,15 @@ typedef struct SSqlObj { ...@@ -397,9 +397,15 @@ typedef struct SSqlObj {
int32_t retryReason; // previous error code int32_t retryReason; // previous error code
struct SSqlObj *prev, *next; struct SSqlObj *prev, *next;
int64_t self; int64_t self;
// connect alive // connect alive
int64_t lastUpdate; int64_t lastProbe;
int64_t lastAlive;
char noAckCnt; // no recevie ack from sever count char noAckCnt; // no recevie ack from sever count
void * pPrevContext;
void * pPrevConn;
void * pPrevFdObj;
int32_t prevFd;
} SSqlObj; } SSqlObj;
typedef struct SSqlStream { typedef struct SSqlStream {
......
...@@ -283,21 +283,40 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { ...@@ -283,21 +283,40 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
// pSql connection link is broken // pSql connection link is broken
bool dealConnBroken(SSqlObj * pSql) { bool dealConnBroken(SSqlObj * pSql) {
// TODO
return true; return true;
} }
// if return true, send probe connection msg to sever ok // if return true, send probe connection msg to sever ok
bool sendProbeConnMsg(SSqlObj* pSql) { bool sendProbeConnMsg(SSqlObj* pSql) {
pSql->noAckCnt++; // check time out
tscDebug("0x%"PRIx64" sendProbeConnMsg noAckCnt:%d", pSql->self, pSql->noAckCnt); int32_t probeTimeout = 60*1000; // over this value send probe msg
int32_t killTimeout = 3*60*1000; // over this value query can be killed
int64_t stime = MAX(pSql->stime, pSql->lastAlive);
int32_t diff = (int32_t)(taosGetTimestampMs() - stime);
if (diff < probeTimeout) {
// exec time short , need not probe alive
return true;
}
// send if (diff > killTimeout) {
pSql->rpcRid // need kill query
tscDebug("PROBE 0x%"PRIx64" need killed, noAckCnt:%d diff=%d", pSql->self, pSql->noAckCnt, diff);
return false;
}
if (pSql->pPrevContext == NULL || pSql->pPrevConn == NULL || pSql->pPrevFdObj == NULL || pSql->prevFd <= 0) {
// last connect info save uncompletely, so can't probe
return true;
}
return true; // It's long time from lastAlive, so need probe
pSql->noAckCnt++;
pSql->lastProbe = taosGetTimestampMs();
tscDebug("0x%"PRIx64" sendProbeConnMsg noAckCnt:%d diff=%d", pSql->self, pSql->noAckCnt, diff);
return rpcSendProbe(pSql->rpcRid, pSql->pPrevContext, pSql->pPrevConn, pSql->pPrevFdObj, pSql->prevFd);
} }
// check have broken link queries than killed // check have broken link queries than killed
...@@ -350,11 +369,10 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { ...@@ -350,11 +369,10 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
// call check if have query doing // call check if have query doing
if(pObj->sqlList && pObj->sqlList->next) { if(pObj->sqlList && pObj->sqlList->next) {
// have queries executing // have queries executing
checkBrokenQueies(); checkBrokenQueies(pObj);
} }
} }
// send self connetion and queries // send self connetion and queries
pHB->retry = 0; pHB->retry = 0;
int32_t code = tscBuildAndSendRequest(pHB, NULL); int32_t code = tscBuildAndSendRequest(pHB, NULL);
...@@ -398,8 +416,12 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -398,8 +416,12 @@ int tscSendMsgToServer(SSqlObj *pSql) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
rpcSendRequest(pObj->pRpcObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid); if(rpcSendRequest(pObj->pRpcObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid)) {
return TSDB_CODE_SUCCESS; saveSendInfo(pSql->rpcRid, &pSql->pPrevContext, &pSql->pPrevConn, &pSql->pPrevFdObj, &pSql->prevFd);
return TSDB_CODE_SUCCESS;
}
return TSDB_CODE_FAILED;
} }
// handle three situation // handle three situation
......
...@@ -85,7 +85,7 @@ void rpcClose(void *); ...@@ -85,7 +85,7 @@ void rpcClose(void *);
void *rpcMallocCont(int contLen); void *rpcMallocCont(int contLen);
void rpcFreeCont(void *pCont); void rpcFreeCont(void *pCont);
void *rpcReallocCont(void *ptr, int contLen); void *rpcReallocCont(void *ptr, int contLen);
void rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); bool rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid);
void rpcSendResponse(const SRpcMsg *pMsg); void rpcSendResponse(const SRpcMsg *pMsg);
void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet); void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet);
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
...@@ -93,7 +93,10 @@ void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp ...@@ -93,7 +93,10 @@ void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp
int rpcReportProgress(void *pConn, char *pCont, int contLen); int rpcReportProgress(void *pConn, char *pCont, int contLen);
void rpcCancelRequest(int64_t rid); void rpcCancelRequest(int64_t rid);
int32_t rpcUnusedSession(void * rpcInfo, bool bLock); int32_t rpcUnusedSession(void * rpcInfo, bool bLock);
// send rpc Refid connection probe alive message
bool rpcSendProbe(int64_t rpcRid, void* pPrevContext, void* pPrevConn, void* pPrevFdObj, int32_t prevFd);
// after sql request send , save conn info
bool saveSendInfo(int64_t rpcRid, void** ppContext, void** ppConn, void** ppFdObj, int32_t* pFd);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -193,7 +193,7 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc); ...@@ -193,7 +193,7 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc);
static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv); static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv);
static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv); static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv);
static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext); static bool rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext);
static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code); static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code);
static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code); static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code);
static bool rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen); static bool rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen);
...@@ -385,7 +385,7 @@ void *rpcReallocCont(void *ptr, int contLen) { ...@@ -385,7 +385,7 @@ void *rpcReallocCont(void *ptr, int contLen) {
return start + sizeof(SRpcReqContext) + sizeof(SRpcHead); return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
} }
void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) { bool rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) {
SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcInfo *pRpc = (SRpcInfo *)shandle;
SRpcReqContext *pContext; SRpcReqContext *pContext;
...@@ -415,7 +415,7 @@ void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64 ...@@ -415,7 +415,7 @@ void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64
pContext->rid = taosAddRef(tsRpcRefId, pContext); pContext->rid = taosAddRef(tsRpcRefId, pContext);
if (pRid) *pRid = pContext->rid; if (pRid) *pRid = pContext->rid;
rpcSendReqToServer(pRpc, pContext); return rpcSendReqToServer(pRpc, pContext);
} }
void rpcSendResponse(const SRpcMsg *pRsp) { void rpcSendResponse(const SRpcMsg *pRsp) {
...@@ -1302,7 +1302,7 @@ static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) { ...@@ -1302,7 +1302,7 @@ static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) {
return; return;
} }
static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { static bool rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
SRpcHead *pHead = rpcHeadFromCont(pContext->pCont); SRpcHead *pHead = rpcHeadFromCont(pContext->pCont);
char *msg = (char *)pHead; char *msg = (char *)pHead;
int msgLen = rpcMsgLenFromCont(pContext->contLen); int msgLen = rpcMsgLenFromCont(pContext->contLen);
...@@ -1313,7 +1313,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { ...@@ -1313,7 +1313,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
if (pConn == NULL) { if (pConn == NULL) {
pContext->code = terrno; pContext->code = terrno;
taosTmrStart(rpcProcessConnError, 1, pContext, pRpc->tmrCtrl); taosTmrStart(rpcProcessConnError, 1, pContext, pRpc->tmrCtrl);
return; return false;
} }
pContext->pConn = pConn; pContext->pConn = pConn;
...@@ -1342,11 +1342,12 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { ...@@ -1342,11 +1342,12 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
pConn->reqMsgLen = msgLen; pConn->reqMsgLen = msgLen;
pConn->pContext = pContext; pConn->pContext = pContext;
rpcSendMsgToPeer(pConn, msg, msgLen); bool ret = rpcSendMsgToPeer(pConn, msg, msgLen);
if (pConn->connType != RPC_CONN_TCPC) if (pConn->connType != RPC_CONN_TCPC)
taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
rpcUnlockConn(pConn); rpcUnlockConn(pConn);
return ret;
} }
static bool rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { static bool rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
...@@ -1763,7 +1764,28 @@ _END: ...@@ -1763,7 +1764,28 @@ _END:
return ret; return ret;
} }
// after sql request send , save conn info
bool saveSendInfo(int64_t rpcRid, void** ppContext, void** ppConn, void** ppFdObj, int32_t* pFd) { bool saveSendInfo(int64_t rpcRid, void** ppContext, void** ppConn, void** ppFdObj, int32_t* pFd) {
return true; if(rpcRid < 0) {
tError("ACK saveSendInfo rpcRid=%" PRId64 " less than zero, invalid.", rpcRid);
return false;
}
// get req content
SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, rpcRid);
if (pContext == NULL) {
tError("ACK rpcRid=%" PRId64 " get context NULL.", rpcRid);
return false;
}
if (ppContext)
*ppContext = pContext;
if (ppConn)
*ppConn = pContext->pConn;
if (ppFdObj)
*ppFdObj = pContext->pConn->chandle;
if (pFd)
*pFd = taosGetFdID(pContext->pConn->chandle);
taosReleaseRef(tsRpcRefId, rpcRid);
return true;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册