diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 94c1348bbc42f9242d5d2f1eea5e87ab359d2930..a3db44c3826e823f38a4c1a5533a713bf58bc927 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -398,6 +398,10 @@ typedef struct SSqlObj { int32_t retryReason; // previous error code struct SSqlObj *prev, *next; int64_t self; + + // connect alive + int64_t lastAlive; + void * pPrevContext; } SSqlObj; typedef struct SSqlStream { diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 78dce41455ead3bf24252f36ded1ba70e9c7d440..b0900513e999a1d9f5be182f5aa242794307541e 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -314,7 +314,11 @@ static void tscAsyncResultCallback(SSchedMsg *pMsg) { return; } - assert(pSql->res.code != TSDB_CODE_SUCCESS); + // probe send error , but result be responsed by server async + if(pSql->res.code == TSDB_CODE_SUCCESS) { + return ; + } + if (tsShortcutFlag && (pSql->res.code == TSDB_CODE_RPC_SHORTCUT)) { tscDebug("0x%" PRIx64 " async result callback, code:%s", pSql->self, tstrerror(pSql->res.code)); pSql->res.code = TSDB_CODE_SUCCESS; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 61ee447152758181aa053497e068489abf31f51c..c753ee3160dd452bfa62038d5048ba24c816bdd3 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -281,6 +281,102 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { } } +// if return true, send probe connection msg to sever ok +bool sendProbeConnMsg(SSqlObj* pSql, int64_t stime) { + if(stime == 0) { + // not start , no need probe + tscInfo("PROBE 0x%" PRIx64 " not start, no need probe.", pSql->self); + return true; + } + + int64_t start = MAX(stime, pSql->lastAlive); + int32_t diff = (int32_t)(taosGetTimestampMs() - start); + if (diff < tsProbeSeconds * 1000) { + // exec time short , need not probe alive + tscInfo("PROBE 0x%" PRIx64 " not arrived probe time. cfg timeout=%ds, no need probe. lastAlive=%" PRId64 " stime=%" PRId64, \ + pSql->self, tsProbeSeconds, pSql->lastAlive, pSql->stime); + return true; + } + + if (diff > tsProbeKillSeconds * 1000) { + // need kill query + tscInfo("PROBE 0x%" PRIx64 " kill query by probe. because arrived kill time. time=%ds cfg timeout=%ds lastAlive=%" PRId64 " stime=%" PRId64, \ + pSql->self, diff/1000, tsProbeKillSeconds, pSql->lastAlive, pSql->stime); + + return false; + } + + if (pSql->pPrevContext == NULL) { + // last connect info save uncompletely, so can't probe + tscInfo("PROBE 0x%" PRIx64 " save last connect info uncompletely. prev context is null", pSql->self); + return true; + } + + if(pSql->rpcRid == -1) { + // cancel or reponse ok from server, so need not probe + tscInfo("PROBE 0x%" PRIx64 " rpcRid is -1, response ok. no need probe.", pSql->self); + return true; + } + + bool ret = rpcSendProbe(pSql->rpcRid, pSql->pPrevContext); + tscInfo("PROBE 0x%" PRIx64 " send probe msg, ret=%d rpcRid=0x%" PRIx64, pSql->self, ret, pSql->rpcRid); + return ret; +} + +// check have broken link queries than killed +void checkBrokenQueries(STscObj *pTscObj) { + tscDebug("PROBE checkBrokenQueries pTscObj=%p pTscObj->rid=0x%" PRIx64, pTscObj, pTscObj->rid); + SSqlObj *pSql = pTscObj->sqlList; + while (pSql) { + // avoid sqlobj may not be correctly removed from sql list + if (pSql->sqlstr == NULL) { + pSql = pSql->next; + continue; + } + + bool kill = false; + int32_t numOfSub = pSql->subState.numOfSub; + tscInfo("PROBE 0x%" PRIx64 " start checking sql alive, numOfSub=%d sql=%s stime=%" PRId64 " alive=%" PRId64 " rpcRid=0x%" PRIx64 \ + ,pSql->self, numOfSub, pSql->sqlstr == NULL ? "" : pSql->sqlstr, pSql->stime, pSql->lastAlive, pSql->rpcRid); + if (numOfSub == 0) { + // no sub sql + if(!sendProbeConnMsg(pSql, pSql->stime)) { + // need kill + tscInfo("PROBE 0x%" PRIx64 " need break link done. rpcRid=0x%" PRIx64, pSql->self, pSql->rpcRid); + kill = true; + } + } else { + // lock subs + pthread_mutex_lock(&pSql->subState.mutex); + if (pSql->pSubs) { + // have sub sql + for (int i = 0; i < numOfSub; i++) { + SSqlObj *pSubSql = pSql->pSubs[i]; + if(pSubSql) { + tscInfo("PROBE 0x%" PRIx64 " sub sql app is 0x%" PRIx64, pSql->self, pSubSql->self); + if(!sendProbeConnMsg(pSubSql, pSql->stime)) { + // need kill + tscInfo("PROBE 0x%" PRIx64 " i=%d sub app=0x%" PRIx64 " need break link done. rpcRid=0x%" PRIx64, pSql->self, i, pSubSql->self, pSubSql->rpcRid); + kill = true; + break; + } + } + } + } + // unlock + pthread_mutex_unlock(&pSql->subState.mutex); + } + + // kill query + if(kill) { + taos_stop_query(pSql); + } + + // move next + pSql = pSql->next; + } +} + void tscProcessActivityTimer(void *handle, void *tmrId) { int64_t rid = (int64_t) handle; STscObj *pObj = taosAcquireRef(tscRefId, rid); @@ -296,6 +392,19 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { assert(pHB->self == pObj->hbrid); + // check queries already death + static int activetyCnt = 0; + if (++activetyCnt > tsProbeInterval) { // 1.5s * 40 = 60s interval call check queries alive + activetyCnt = 0; + + // call check if have query doing + if(pObj->sqlList) { + // have queries executing + checkBrokenQueries(pObj); + } + } + + // send self connetion and queries pHB->retry = 0; int32_t code = tscBuildAndSendRequest(pHB, NULL); taosReleaseRef(tscObjRef, pObj->hbrid); @@ -338,7 +447,13 @@ int tscSendMsgToServer(SSqlObj *pSql) { return TSDB_CODE_RPC_SHORTCUT; } - rpcSendRequest(pObj->pRpcObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid); + + if(rpcSendRequest(pObj->pRpcObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid)) { + if(pSql->cmd.command < TSDB_SQL_HB) + rpcSaveSendInfo(pSql->rpcRid, &pSql->pPrevContext); + return TSDB_CODE_SUCCESS; + } + return TSDB_CODE_SUCCESS; } @@ -413,10 +528,20 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { assert(pSql->self == handle); + // check msgtype + if(rpcMsg->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) { + pSql->lastAlive = taosGetTimestampMs(); + tscInfo("PROBE 0x%" PRIx64 " recv probe msg response. rpcRid=0x%" PRIx64, pSql->self, pSql->rpcRid); + rpcFreeCont(rpcMsg->pCont); + return ; + } + STscObj *pObj = pSql->pTscObj; SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; + + pSql->rpcRid = -1; if (pObj->signature != pObj) { tscDebug("0x%"PRIx64" DB connection is closed, cmd:%d pObj:%p signature:%p", pSql->self, pCmd->command, pObj, pObj->signature); diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 5f1d996b54b76cf922510fccf36dc913f117a47a..ab34d99bfc4ba339c5200292d178b96f9b42488a 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -3391,6 +3391,10 @@ static void doFreeInsertSupporter(SSqlObj* pSqlObj) { } static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) { + if(param == NULL) { + tscError("callback multiVnodeInsertFinalize param is NULL. tres=%p numOfRows=%d", tres, numOfRows); + return ; + } SInsertSupporter *pSupporter = (SInsertSupporter *)param; SSqlObj* pParentObj = pSupporter->pSql; diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index 3add0b566dffb88d0a60e70c6098fd6586d3b689..e1b7cff8be8c551d0985ea293fcaff3b52f0d97b 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -218,6 +218,11 @@ extern int32_t debugFlag; extern int8_t tsClientMerge; +// probe alive connection +extern int32_t tsProbeSeconds; +extern int32_t tsProbeKillSeconds; +extern int32_t tsProbeInterval; + #ifdef TD_TSZ // lossy extern char lossyColumns[]; diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index 44a53efd8c01ae47d4c6d069ca6d6d5c53448b10..77540cd0b61ed54c461fa44984abd20d507a74ab 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -268,6 +268,12 @@ int32_t fsDebugFlag = 135; int8_t tsClientMerge = 0; +// probe alive connection +int32_t tsProbeSeconds = 5 * 60; // start probe link alive after tsProbeSeconds from starting query +int32_t tsProbeKillSeconds = 10 * 60; // start kill query after tsProbeKillSeconds from last alive time +int32_t tsProbeInterval = 40; // 40 * 1.5s = 60 s interval time + + #ifdef TD_TSZ // // lossy compress 6 @@ -1733,6 +1739,39 @@ static void doInitGlobalConfig(void) { cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); + // probeSeconds + cfg.option = "probeSeconds"; + cfg.ptr = &tsProbeSeconds; + cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT; + cfg.minValue = 0; + cfg.maxValue = 100000; + cfg.ptrLength = 0; + cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosInitConfigOption(cfg); + + // probeKillSeconds + cfg.option = "probeKillSeconds"; + cfg.ptr = &tsProbeKillSeconds; + cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT; + cfg.minValue = 0; + cfg.maxValue = 100000; + cfg.ptrLength = 0; + cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosInitConfigOption(cfg); + + // probeInterval + cfg.option = "probeInterval"; + cfg.ptr = &tsProbeInterval; + cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT; + cfg.minValue = 0; + cfg.maxValue = 100000; + cfg.ptrLength = 0; + cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosInitConfigOption(cfg); + #ifdef TD_TSZ // lossy compress cfg.option = "lossyColumns"; diff --git a/src/dnode/inc/dnodeVRead.h b/src/dnode/inc/dnodeVRead.h index 9c88886f88bedaa63a5071b9dd2d773d4ff1cc0c..768075da9128d3bafbfc4035f929ec589c83f01c 100644 --- a/src/dnode/inc/dnodeVRead.h +++ b/src/dnode/inc/dnodeVRead.h @@ -28,6 +28,8 @@ void * dnodeAllocVQueryQueue(void *pVnode); void * dnodeAllocVFetchQueue(void *pVnode); void dnodeFreeVQueryQueue(void *pQqueue); void dnodeFreeVFetchQueue(void *pFqueue); +// reponse probe connection msg +void dnodeResponseProbeMsg(SRpcMsg *pMsg); #ifdef __cplusplus } diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index 26a2fc9651ceaaa9f73485839c24d0c4c52c0066..af1afc9766af70b180b06cb56bb35d90de40d2eb 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -77,6 +77,7 @@ int32_t dnodeInitShell() { dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE_FUNC] = dnodeDispatchToMReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeSendStartupStep; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_PROBE_CONN] = dnodeResponseProbeMsg; int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0); if (numOfThreads < 1) { diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c index c404ab1a55c3788f5756c99f7914764e6e9af295..0b0bf29e504e1779ebe7921d8d374b1b31729b7a 100644 --- a/src/dnode/src/dnodeVRead.c +++ b/src/dnode/src/dnodeVRead.c @@ -152,3 +152,13 @@ static void *dnodeProcessReadQueue(void *wparam) { return NULL; } + +// reponse probe connection msg +void dnodeResponseProbeMsg(SRpcMsg *pMsg) { + // check probe conn msg + if(pMsg->msgType == TSDB_MSG_TYPE_PROBE_CONN) { + SRpcMsg rpcRsp = {.handle = pMsg->handle, .code = 0, .msgType = TSDB_MSG_TYPE_PROBE_CONN_RSP}; + rpcSendResponse(&rpcRsp); + return ; + } +} \ No newline at end of file diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 1a8907efabc483233f7e12ce2511bfec339a8d6f..280747afed543eaddf2ed657c6067f4aad31a321 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -123,6 +123,9 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TP, "alter-tp" ) // delete TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DELDATA, "delete-data" ) +// syn -> ack probe connection msg +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_PROBE_CONN, "probe-connection-alive" ) + #ifndef TAOS_MESSAGE_C TSDB_MSG_TYPE_MAX // 105 diff --git a/src/inc/trpc.h b/src/inc/trpc.h index 5e3fbd571ef4b6425f2e5a58c308c8fc9da0b12e..59908253c2e825c0d26a27332f3ede5a90aa0060 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -85,7 +85,7 @@ void rpcClose(void *); void *rpcMallocCont(int contLen); void rpcFreeCont(void *pCont); void *rpcReallocCont(void *ptr, int contLen); -void rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); +bool rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); void rpcSendResponse(const SRpcMsg *pMsg); void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet); int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); @@ -93,6 +93,10 @@ void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp int rpcReportProgress(void *pConn, char *pCont, int contLen); void rpcCancelRequest(int64_t rid); int32_t rpcUnusedSession(void * rpcInfo, bool bLock); +// send rpc Refid connection probe alive message +bool rpcSendProbe(int64_t rpcRid, void* pPrevContext); +// after sql request send , save conn info +bool rpcSaveSendInfo(int64_t rpcRid, void** ppContext); #ifdef __cplusplus } diff --git a/src/rpc/inc/rpcTcp.h b/src/rpc/inc/rpcTcp.h index 6ef8fc2d921a3379532bbc0efd2f226ef3389fc5..a47fa39ceb3b3a642c7839902885cbc8728abe16 100644 --- a/src/rpc/inc/rpcTcp.h +++ b/src/rpc/inc/rpcTcp.h @@ -32,6 +32,8 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin void taosCloseTcpConnection(void *chandle); int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle); +SOCKET taosGetFdID(void *chandle); + #ifdef __cplusplus } #endif diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 4f67c6088db5712c8ef3139bbe758b0c2d66c02b..bcc759f845e7c0e4c110854953d674f5a917cfcf 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -69,6 +69,13 @@ typedef struct { struct SRpcConn *connList; // connection list } SRpcInfo; +typedef struct SSendInfo { + void *pContext; + void *pConn; + void *pFdObj; + SOCKET fd; +} SSendInfo; + typedef struct { SRpcInfo *pRpc; // associated SRpcInfo SRpcEpSet epSet; // ip list provided by app @@ -86,6 +93,7 @@ typedef struct { SRpcMsg *pRsp; // for synchronous API tsem_t *pSem; // for synchronous API SRpcEpSet *pSet; // for synchronous API + SSendInfo sendInfo; // save last send information char msg[0]; // RpcHead starts from here } SRpcReqContext; @@ -124,6 +132,7 @@ typedef struct SRpcConn { int8_t connType; // connection type int64_t lockedBy; // lock for connection SRpcReqContext *pContext; // request context + int64_t rid; // probe msg use rid get pContext } SRpcConn; int tsRpcMaxUdpSize = 15000; // bytes @@ -193,10 +202,10 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc); static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv); static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv); -static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext); +static bool rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext); static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code); static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code); -static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen); +static bool rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen); static void rpcSendReqHead(SRpcConn *pConn); static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv); @@ -385,7 +394,7 @@ void *rpcReallocCont(void *ptr, int contLen) { return start + sizeof(SRpcReqContext) + sizeof(SRpcHead); } -void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) { +bool rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) { SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcReqContext *pContext; @@ -415,7 +424,7 @@ void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64 pContext->rid = taosAddRef(tsRpcRefId, pContext); if (pRid) *pRid = pContext->rid; - rpcSendReqToServer(pRpc, pContext); + return rpcSendReqToServer(pRpc, pContext); } void rpcSendResponse(const SRpcMsg *pRsp) { @@ -980,6 +989,10 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont return NULL; } + if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN || pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) { + return pConn; + } + rpcLockConn(pConn); if (rpcIsReq(pHead->msgType)) { @@ -1076,6 +1089,58 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) { rpcUnlockConn(pConn); } +// process probe msg , return true is probe msg, false is not probe msg +static void rpcProcessProbeMsg(SRecvInfo *pRecv, SRpcConn *pConn) { + SRpcHead *pHead = (SRpcHead *)pRecv->msg; + uint64_t ahandle = pHead->ahandle; + if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN) { + // response to + char msg[RPC_MSG_OVERHEAD]; + SRpcHead *pRspHead; + + // set msg header + memset(msg, 0, sizeof(SRpcHead)); + pRspHead = (SRpcHead *)msg; + + pRspHead->msgType = TSDB_MSG_TYPE_PROBE_CONN_RSP; + pRspHead->version = 1; + pRspHead->ahandle = pHead->ahandle; + pRspHead->tranId = pHead->tranId; + pRspHead->code = 0; + pRspHead->linkUid = pHead->linkUid; + + rpcLockConn(pConn); + pRspHead->sourceId = pConn->ownId; + pRspHead->destId = pConn->peerId; + memcpy(pRspHead->user, pHead->user, tListLen(pHead->user)); + + bool ret = rpcSendMsgToPeer(pConn, pRspHead, sizeof(SRpcHead)); + tInfo("PROBE 0x%" PRIx64 " recv probe msg and do response. ret=%d", ahandle, ret); + + rpcUnlockConn(pConn); + rpcFreeMsg(pRecv->msg); + } else if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) { + if(pConn) { + rpcLockConn(pConn); + // get req content + SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, pConn->rid); + + if (pContext) { + rpcProcessIncomingMsg(pConn, pHead, pContext); + taosReleaseRef(tsRpcRefId, pConn->rid); + } else { + tInfo("PROBE 0x%" PRIx64 " recv response probe msg but pContext is NULL. pConn->rid=0x%" PRIX64, ahandle, pConn->rid); + rpcFreeMsg(pRecv->msg); + } + + rpcUnlockConn(pConn); + } else { + tInfo("PROBE 0x%" PRIx64 " recv response probe msg but pConn is NULL.", ahandle); + rpcFreeMsg(pRecv->msg); + } + } +} + static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { SRpcHead *pHead = (SRpcHead *)pRecv->msg; SRpcInfo *pRpc = (SRpcInfo *)pRecv->shandle; @@ -1095,6 +1160,12 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { SRpcReqContext *pContext; pConn = rpcProcessMsgHead(pRpc, pRecv, &pContext); + // deal probe msg + if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN || pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) { + rpcProcessProbeMsg(pRecv, pConn); + return pConn; + } + if (pHead->msgType >= 1 && pHead->msgType < TSDB_MSG_TYPE_MAX) { tDebug("%s %p %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType], pRecv->ip, pRecv->port, terrno, pRecv->msgLen, @@ -1147,7 +1218,10 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) { } // free the request message - taosRemoveRef(tsRpcRefId, pContext->rid); + if(pMsg->msgType != TSDB_MSG_TYPE_PROBE_CONN && pMsg->msgType != TSDB_MSG_TYPE_PROBE_CONN_RSP) { + taosRemoveRef(tsRpcRefId, pContext->rid); + } + } static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) { @@ -1185,6 +1259,14 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte // it's a response rpcMsg.handle = pContext; rpcMsg.ahandle = pContext->ahandle; + + if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) { + // probe msg + rpcNotifyClient(pContext, &rpcMsg); + return ; + } + + // reset pConn NULL pContext->pConn = NULL; // for UDP, port may be changed by server, the port in epSet shall be used for cache @@ -1302,7 +1384,7 @@ static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) { return; } -static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { +static bool rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { SRpcHead *pHead = rpcHeadFromCont(pContext->pCont); char *msg = (char *)pHead; int msgLen = rpcMsgLenFromCont(pContext->contLen); @@ -1313,7 +1395,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { if (pConn == NULL) { pContext->code = terrno; taosTmrStart(rpcProcessConnError, 1, pContext, pRpc->tmrCtrl); - return; + return false; } pContext->pConn = pConn; @@ -1341,17 +1423,26 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { pConn->pReqMsg = msg; pConn->reqMsgLen = msgLen; pConn->pContext = pContext; + if(pContext) + pConn->rid = pContext->rid; - rpcSendMsgToPeer(pConn, msg, msgLen); + // save + pContext->sendInfo.pConn = pConn; + pContext->sendInfo.pFdObj = pConn->chandle; + pContext->sendInfo.fd = taosGetFdID(pConn->chandle); + + bool ret = rpcSendMsgToPeer(pConn, msg, msgLen); if (pConn->connType != RPC_CONN_TCPC) taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); rpcUnlockConn(pConn); + return ret; } -static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { +static bool rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { int writtenLen = 0; SRpcHead *pHead = (SRpcHead *)msg; + bool ret = true; msgLen = rpcAddAuthPart(pConn, msg, msgLen); @@ -1371,9 +1462,11 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { if (writtenLen != msgLen) { tError("%s, failed to send, msgLen:%d written:%d, reason:%s", pConn->info, msgLen, writtenLen, strerror(errno)); + ret = false; } tDump(msg, msgLen); + return ret; } static void rpcProcessConnError(void *param, void *id) { @@ -1684,3 +1777,103 @@ int32_t rpcUnusedSession(void * rpcInfo, bool bLock) { return 0; return taosIdPoolNumOfFree(info->idPool, bLock); } + +bool doRpcSendProbe(SRpcConn *pConn) { + char msg[RPC_MSG_OVERHEAD]; + SRpcHead *pHead; + int code = 0; + + // set msg header + memset(msg, 0, sizeof(SRpcHead)); + pHead = (SRpcHead *)msg; + pHead->version = 1; + pHead->msgVer = htonl(tsVersion >> 8); + pHead->msgType = TSDB_MSG_TYPE_PROBE_CONN; + pHead->spi = pConn->spi; + pHead->encrypt = 0; + pHead->tranId = (uint16_t)(taosRand() & 0xFFFF); // rand + pHead->sourceId = pConn->ownId; + pHead->destId = pConn->peerId; + pHead->linkUid = pConn->linkUid; + pHead->ahandle = (uint64_t)pConn->ahandle; + memcpy(pHead->user, pConn->user, tListLen(pHead->user)); + pHead->code = htonl(code); + + bool ret = rpcSendMsgToPeer(pConn, msg, sizeof(SRpcHead) + sizeof(int32_t)); + + return ret; +} + +// send server syn +bool rpcSendProbe(int64_t rpcRid, void* pPrevContext) { + // return false can kill query + bool ret = false; + if(rpcRid < 0) { + tError("PROBE rpcRid=0x%" PRIx64 " less than zero, invalid.", rpcRid); + return true; + } + + // get req content + SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, rpcRid); + if (pContext == NULL) { + tError("PROBE rpcRid=0x%" PRIx64 " get context NULL. sql finished no need send probe.", rpcRid); + return true; + } + + // context same + if(pContext != pPrevContext) { + tError("PROBE rpcRid=0x%" PRIx64 " context diff. pContext=%p pPreContent=%p", rpcRid, pContext, pPrevContext); + goto _END; + } + + // conn same + if(pContext->pConn == NULL) { + tInfo("PROBE rpcRid=0x%" PRIx64 " connect obj is NULL. ", rpcRid); + ret = true; + goto _END; + } else if (pContext->pConn != pContext->sendInfo.pConn) { + tInfo("PROBE rpcRid=0x%" PRIx64 " connect obj diff. pContext->pConn=%p pPreConn=%p", rpcRid, pContext->pConn, pContext->sendInfo.pConn); + goto _END; + } + + // fdObj same + if (pContext->pConn->chandle != pContext->sendInfo.pFdObj) { + tInfo("PROBE rpcRid=0x%" PRIx64 " connect fdObj diff. pContext->pConn->chandle=%p pPrevFdObj=%p", rpcRid, pContext->pConn->chandle, pContext->sendInfo.pFdObj); + goto _END; + } + + // fd same + SOCKET fd = taosGetFdID(pContext->pConn->chandle); + if (fd != pContext->sendInfo.fd) { + tInfo("PROBE rpcRid=0x%" PRIx64 " connect fd diff.fd=%d prevFd=%d", rpcRid, fd, pContext->sendInfo.fd); + goto _END; + } + + // send syn + ret = doRpcSendProbe(pContext->pConn); + +_END: + // put back req context + taosReleaseRef(tsRpcRefId, rpcRid); + return ret; +} + +// after sql request send , save conn info +bool rpcSaveSendInfo(int64_t rpcRid, void** ppContext) { + if(rpcRid < 0) { + tError("PROBE saveSendInfo rpcRid=0x%" PRIx64 " less than zero, invalid.", rpcRid); + return false; + } + // get req content + SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, rpcRid); + if (pContext == NULL) { + tError("PROBE saveSendInfo rpcRid=0x%" PRIx64 " get context NULL.", rpcRid); + return false; + } + + if (ppContext) + *ppContext = pContext; + + taosReleaseRef(tsRpcRefId, rpcRid); + return true; +} \ No newline at end of file diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index 44686fa9e01815138d7ea8b1963bf7e72f8606d6..530a279cc72a291525356aa1f2a6495f605946a0 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -674,3 +674,12 @@ static void taosFreeFdObj(SFdObj *pFdObj) { tfree(pFdObj); } + +SOCKET taosGetFdID(void *chandle) { + SFdObj * pFdObj = chandle; + if(pFdObj == NULL) + return -1; + if (pFdObj->signature != pFdObj) + return -1; + return pFdObj->fd; +} \ No newline at end of file diff --git a/src/util/inc/tconfig.h b/src/util/inc/tconfig.h index fd9a340a25a752b18ab07a8fbb2691038af3b71b..872da82a8e16549facd03fb3249b03b150a8f842 100644 --- a/src/util/inc/tconfig.h +++ b/src/util/inc/tconfig.h @@ -20,7 +20,7 @@ extern "C" { #endif -#define TSDB_CFG_MAX_NUM 131 +#define TSDB_CFG_MAX_NUM 134 #define TSDB_CFG_PRINT_LEN 23 #define TSDB_CFG_OPTION_LEN 24 #define TSDB_CFG_VALUE_LEN 41