提交 a8bbf342 编写于 作者: A Alex Duan

fix(rpc): probe connection already alive

上级 8549bd82
......@@ -397,6 +397,9 @@ typedef struct SSqlObj {
int32_t retryReason; // previous error code
struct SSqlObj *prev, *next;
int64_t self;
// connect alive
int64_t lastUpdate;
char noAckCnt; // no recevie ack from sever count
} SSqlObj;
typedef struct SSqlStream {
......
......@@ -281,6 +281,52 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
}
}
// pSql connection link is broken
bool dealConnBroken(SSqlObj * pSql) {
return true;
}
// if return true, send probe connection msg to sever ok
bool sendProbeConnMsg(SSqlObj* pSql) {
pSql->noAckCnt++;
tscDebug("0x%"PRIx64" sendProbeConnMsg noAckCnt:%d", pSql->self, pSql->noAckCnt);
// send
pSql->rpcRid
return true;
}
// check have broken link queries than killed
void checkBrokenQueries(STscObj *pTscObj) {
//
SSqlObj *pSql = pTscObj->sqlList;
while (pSql) {
int32_t numOfSub = pSql->subState.numOfSub;
if (numOfSub == 0) {
// no sub sql
if(!sendProbeConnMsg(pSql)) {
// send failed , connect already broken
dealConnBroken(pSql);
}
return ;
}
// have sub sql
for (int i = 0; i < numOfSub; i++) {
SSqlObj *pSubSql = pSql->pSubs[i];
if(!sendProbeConnMsg(pSubSql)) {
// send failed , connect already broken
dealConnBroken(pSubSql);
}
}
}
}
void tscProcessActivityTimer(void *handle, void *tmrId) {
int64_t rid = (int64_t) handle;
STscObj *pObj = taosAcquireRef(tscRefId, rid);
......@@ -296,6 +342,20 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
assert(pHB->self == pObj->hbrid);
// check queries already death
static int activetyTimerCnt = 0;
if (++activetyTimerCnt > 10) { // 1.5s * 10 = 15s interval call
activetyTimerCnt = 0;
// call check if have query doing
if(pObj->sqlList && pObj->sqlList->next) {
// have queries executing
checkBrokenQueies();
}
}
// send self connetion and queries
pHB->retry = 0;
int32_t code = tscBuildAndSendRequest(pHB, NULL);
taosReleaseRef(tscObjRef, pObj->hbrid);
......@@ -417,6 +477,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
pSql->rpcRid = -1;
if (pObj->signature != pObj) {
tscDebug("0x%"PRIx64" DB connection is closed, cmd:%d pObj:%p signature:%p", pSql->self, pCmd->command, pObj, pObj->signature);
......@@ -427,6 +489,14 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
return;
}
// check msgtype
if(rpcMsg->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) {
pSql->noAckCnt = 0;
pSql->lastUpdate = taosGetTimestampMs();
tscInfo(" recv sql probe msg. sql=%s", pSql->sqlstr);
return ;
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) {
tscDebug("0x%"PRIx64" sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p",
......
......@@ -29,6 +29,8 @@ void * dnodeAllocVFetchQueue(void *pVnode);
void dnodeFreeVQueryQueue(void *pQqueue);
void dnodeFreeVFetchQueue(void *pFqueue);
#ifdef __cplusplus
}
#endif
......
......@@ -78,6 +78,8 @@ int32_t dnodeInitShell() {
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeSendStartupStep;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_PROBE_CONN] = dnodeDispatchToVReadQueue;
int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0);
if (numOfThreads < 1) {
numOfThreads = 1;
......
......@@ -57,6 +57,13 @@ void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) {
int32_t code = TSDB_CODE_VND_INVALID_VGROUP_ID;
char * pCont = pMsg->pCont;
// check probe conn msg
if(pMsg->msgType == TSDB_MSG_TYPE_PROBE_CONN) {
SRpcMsg rpcRsp = {.handle = pMsg->handle, .code = 0, .msgType = TSDB_MSG_TYPE_PROBE_CONN_RSP};
rpcSendResponse(&rpcRsp);
return ;
}
while (leftLen > 0) {
SMsgHead *pHead = (SMsgHead *)pCont;
pHead->vgId = htonl(pHead->vgId);
......
......@@ -123,6 +123,9 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TP, "alter-tp" )
// delete
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DELDATA, "delete-data" )
// syn -> ack probe connection msg
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_PROBE_CONN, "probe-connection-alive" )
#ifndef TAOS_MESSAGE_C
TSDB_MSG_TYPE_MAX // 105
......
......@@ -94,6 +94,7 @@ int rpcReportProgress(void *pConn, char *pCont, int contLen);
void rpcCancelRequest(int64_t rid);
int32_t rpcUnusedSession(void * rpcInfo, bool bLock);
#ifdef __cplusplus
}
#endif
......
......@@ -32,6 +32,8 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin
void taosCloseTcpConnection(void *chandle);
int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle);
int32_t taosGetFdID(void *chandle);
#ifdef __cplusplus
}
#endif
......
......@@ -196,7 +196,7 @@ static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv);
static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext);
static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code);
static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code);
static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen);
static bool rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen);
static void rpcSendReqHead(SRpcConn *pConn);
static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv);
......@@ -1349,9 +1349,10 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
rpcUnlockConn(pConn);
}
static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
static bool rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
int writtenLen = 0;
SRpcHead *pHead = (SRpcHead *)msg;
bool ret = true;
msgLen = rpcAddAuthPart(pConn, msg, msgLen);
......@@ -1371,9 +1372,11 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
if (writtenLen != msgLen) {
tError("%s, failed to send, msgLen:%d written:%d, reason:%s", pConn->info, msgLen, writtenLen, strerror(errno));
ret = false;
}
tDump(msg, msgLen);
return ret;
}
static void rpcProcessConnError(void *param, void *id) {
......@@ -1683,4 +1686,81 @@ int32_t rpcUnusedSession(void * rpcInfo, bool bLock) {
if(info == NULL)
return 0;
return taosIdPoolNumOfFree(info->idPool, bLock);
}
static void doRpcSendProbe(SRpcConn *pConn) {
char msg[RPC_MSG_OVERHEAD];
SRpcHead *pHead;
// set msg header
memset(msg, 0, sizeof(SRpcHead));
pHead = (SRpcHead *)msg;
pHead->version = 1;
pHead->msgType = TSDB_MSG_TYPE_PROBE_CONN;
pHead->spi = pConn->spi;
pHead->encrypt = 0;
pHead->tranId = pConn->inTranId;
pHead->sourceId = pConn->ownId;
pHead->destId = pConn->peerId;
pHead->linkUid = pConn->linkUid;
pHead->ahandle = (uint64_t)pConn->ahandle;
memcpy(pHead->user, pConn->user, tListLen(pHead->user));
pHead->code = htonl(code);
rpcSendMsgToPeer(pConn, msg, sizeof(SRpcHead));
pConn->secured = 1; // connection shall be secured
}
// send server syn
bool rpcSendProbe(int64_t rpcRid, void* pPrevContext, void* pPrevConn, void* pPrevFdObj, int32_t prevFd) {
bool ret = false;
if(rpcRid < 0) {
tError("ACK rpcRid=%" PRId64 " less than zero, invalid.", rpcRid);
return false;
}
// get req content
SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, rpcRid);
if (pContext == NULL) {
tError("ACK rpcRid=%" PRId64 " get context NULL.", rpcRid);
return false;
}
// context same
if(pContext != pPrevContext) {
tError("ACK rpcRid=%" PRId64 " context diff. pContext=%p pPreContent=%p", rpcRid, pContext, pPrevContext);
goto _END;
}
// conn same
if (pContext->pConn != pPrevConn) {
tError("ACK rpcRid=%" PRId64 " connect obj diff. pContext->pConn=%p pPreConn=%p", rpcRid, pContext->pConn, pPrevConn);
goto _END;
}
// fdObj same
if (pContext->pConn->chandle != pPrevFdObj) {
tError("ACK rpcRid=%" PRId64 " connect fdObj diff. pContext->pConn->chandle=%p pPrevFdObj=%p", rpcRid, pContext->pConn->chandle, pPrevFdObj);
goto _END;
}
// fd same
int32_t fd = taosGetFdID(pContext->pConn->chandle);
if (fd != prevFd) {
tError("ACK rpcRid=%" PRId64 " connect fd diff.fd=%d prevFd=%p", rpcRid, fd, prevFd);
goto _END;
}
// send syn
ret = doRpcSendProbe(pContext->pConn);
_END:
// put back req context
taosReleaseRef(tsRpcRefId, rpcRid);
return ret;
}
bool saveSendInfo(int64_t rpcRid, void** ppContext, void** ppConn, void** ppFdObj, int32_t* pFd) {
}
\ No newline at end of file
......@@ -674,3 +674,12 @@ static void taosFreeFdObj(SFdObj *pFdObj) {
tfree(pFdObj);
}
int32_t taosGetFdID(void *chandle) {
SFdObj * pFdObj = chandle;
if(pFdObj == NULL)
return -1;
if (pFdObj->signature != pFdObj)
return -1;
return pFdObj->fd;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册