diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 402b4cb4a628fb632f502ae3b11e1248e4aab7de..276333e18de415148e194a0fe34bd574e2b657f4 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -92,7 +92,8 @@ typedef struct { int64_t rid; // refId returned by taosAddRef SRpcMsg *pRsp; // for synchronous API tsem_t *pSem; // for synchronous API - SRpcEpSet *pSet; // for synchronous API + SRpcEpSet *pSet; // for synchronous API + SSendInfo sendInfo; // save last send information char msg[0]; // RpcHead starts from here } SRpcReqContext; @@ -131,6 +132,7 @@ typedef struct SRpcConn { int8_t connType; // connection type int64_t lockedBy; // lock for connection SRpcReqContext *pContext; // request context + int64_t rid; // probe msg use rid get pContext } SRpcConn; int tsRpcMaxUdpSize = 15000; // bytes @@ -199,10 +201,10 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc); static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv); static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv); -static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext); +static bool rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext); static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code); static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code); -static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen); +static bool rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen); static void rpcSendReqHead(SRpcConn *pConn); static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv); @@ -391,7 +393,7 @@ void *rpcReallocCont(void *ptr, int contLen) { return start + sizeof(SRpcReqContext) + sizeof(SRpcHead); } -void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) { +bool rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) { SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcReqContext *pContext; @@ -421,7 +423,7 @@ void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64 pContext->rid = taosAddRef(tsRpcRefId, pContext); if (pRid) *pRid = pContext->rid; - rpcSendReqToServer(pRpc, pContext); + return rpcSendReqToServer(pRpc, pContext); } void rpcSendResponse(const SRpcMsg *pRsp) { @@ -986,6 +988,10 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont return NULL; } + if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN || pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) { + return pConn; + } + rpcLockConn(pConn); if (rpcIsReq(pHead->msgType)) { @@ -1083,6 +1089,58 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) { rpcUnlockConn(pConn); } +// process probe msg , return true is probe msg, false is not probe msg +static void rpcProcessProbeMsg(SRecvInfo *pRecv, SRpcConn *pConn) { + SRpcHead *pHead = (SRpcHead *)pRecv->msg; + uint64_t ahandle = pHead->ahandle; + if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN) { + // response to + char msg[RPC_MSG_OVERHEAD]; + SRpcHead *pRspHead; + + // set msg header + memset(msg, 0, sizeof(SRpcHead)); + pRspHead = (SRpcHead *)msg; + + pRspHead->msgType = TSDB_MSG_TYPE_PROBE_CONN_RSP; + pRspHead->version = 1; + pRspHead->ahandle = pHead->ahandle; + pRspHead->tranId = pHead->tranId; + pRspHead->code = 0; + pRspHead->linkUid = pHead->linkUid; + + rpcLockConn(pConn); + pRspHead->sourceId = pConn->ownId; + pRspHead->destId = pConn->peerId; + memcpy(pRspHead->user, pHead->user, tListLen(pHead->user)); + + bool ret = rpcSendMsgToPeer(pConn, pRspHead, sizeof(SRpcHead)); + tInfo("PROBE 0x%" PRIx64 " recv probe msg and do response. ret=%d", ahandle, ret); + + rpcUnlockConn(pConn); + rpcFreeMsg(pRecv->msg); + } else if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) { + if(pConn) { + rpcLockConn(pConn); + // get req content + SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, pConn->rid); + + if (pContext) { + rpcProcessIncomingMsg(pConn, pHead, pContext); + taosReleaseRef(tsRpcRefId, pConn->rid); + } else { + tInfo("PROBE 0x%" PRIx64 " recv response probe msg but pContext is NULL. pConn->rid=0x%" PRIX64, ahandle, pConn->rid); + rpcFreeMsg(pRecv->msg); + } + + rpcUnlockConn(pConn); + } else { + tInfo("PROBE 0x%" PRIx64 " recv response probe msg but pConn is NULL.", ahandle); + rpcFreeMsg(pRecv->msg); + } + } +} + static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { SRpcHead *pHead = (SRpcHead *)pRecv->msg; SRpcInfo *pRpc = (SRpcInfo *)pRecv->shandle; @@ -1154,7 +1212,10 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) { } // free the request message - taosRemoveRef(tsRpcRefId, pContext->rid); + if(pMsg->msgType != TSDB_MSG_TYPE_PROBE_CONN && pMsg->msgType != TSDB_MSG_TYPE_PROBE_CONN_RSP) { + taosRemoveRef(tsRpcRefId, pContext->rid); + } + } static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) { @@ -1179,6 +1240,13 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte // it's a response rpcMsg.handle = pContext; rpcMsg.ahandle = pContext->ahandle; + if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) { + // probe msg + rpcNotifyClient(pContext, &rpcMsg); + return ; + } + + // reset pConn NULL pContext->pConn = NULL; // for UDP, port may be changed by server, the port in epSet shall be used for cache @@ -1296,7 +1364,7 @@ static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) { return; } -static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { +static bool rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { SRpcHead *pHead = rpcHeadFromCont(pContext->pCont); char *msg = (char *)pHead; int msgLen = rpcMsgLenFromCont(pContext->contLen); @@ -1335,17 +1403,26 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { pConn->pReqMsg = msg; pConn->reqMsgLen = msgLen; pConn->pContext = pContext; + if(pContext) + pConn->rid = pContext->rid; - rpcSendMsgToPeer(pConn, msg, msgLen); + // save + pContext->sendInfo.pConn = pConn; + pContext->sendInfo.pFdObj = pConn->chandle; + pContext->sendInfo.fd = taosGetFdID(pConn->chandle); + + bool ret = rpcSendMsgToPeer(pConn, msg, msgLen); if (pConn->connType != RPC_CONN_TCPC) taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); rpcUnlockConn(pConn); + return ret; } -static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { +static bool rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { int writtenLen = 0; SRpcHead *pHead = (SRpcHead *)msg; + bool ret = true; msgLen = rpcAddAuthPart(pConn, msg, msgLen); @@ -1365,9 +1442,11 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { if (writtenLen != msgLen) { tError("%s, failed to send, msgLen:%d written:%d, reason:%s", pConn->info, msgLen, writtenLen, strerror(errno)); + ret = false; } tDump(msg, msgLen); + return ret; } static void rpcProcessConnError(void *param, void *id) { @@ -1672,3 +1751,103 @@ static void rpcDecRef(SRpcInfo *pRpc) } } +bool doRpcSendProbe(SRpcConn *pConn) { + char msg[RPC_MSG_OVERHEAD]; + SRpcHead *pHead; + int code = 0; + + // set msg header + memset(msg, 0, sizeof(SRpcHead)); + pHead = (SRpcHead *)msg; + pHead->version = 1; + pHead->msgVer = htonl(tsVersion >> 8); + pHead->msgType = TSDB_MSG_TYPE_PROBE_CONN; + pHead->spi = pConn->spi; + pHead->encrypt = 0; + pHead->tranId = (uint16_t)(taosRand() & 0xFFFF); // rand + pHead->sourceId = pConn->ownId; + pHead->destId = pConn->peerId; + pHead->linkUid = pConn->linkUid; + pHead->ahandle = (uint64_t)pConn->ahandle; + memcpy(pHead->user, pConn->user, tListLen(pHead->user)); + pHead->code = htonl(code); + + bool ret = rpcSendMsgToPeer(pConn, msg, sizeof(SRpcHead) + sizeof(int32_t)); + pConn->lastLiveTime = taosGetTimestampMs(); + + return ret; +} + +// send server syn +bool rpcSendProbe(int64_t rpcRid, void* pPrevContext) { + // return false can kill query + bool ret = false; + if(rpcRid < 0) { + tError("PROBE rpcRid=0x%" PRIx64 " less than zero, invalid.", rpcRid); + return true; + } + + // get req content + SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, rpcRid); + if (pContext == NULL) { + tError("PROBE rpcRid=0x%" PRIx64 " get context NULL. sql finished no need send probe.", rpcRid); + return true; + } + + // context same + if(pContext != pPrevContext) { + tError("PROBE rpcRid=0x%" PRIx64 " context diff. pContext=%p pPreContent=%p", rpcRid, pContext, pPrevContext); + goto _END; + } + + // conn same + if(pContext->pConn == NULL) { + tInfo("PROBE rpcRid=0x%" PRIx64 " connect obj is NULL. ", rpcRid); + ret = true; + goto _END; + } else if (pContext->pConn != pContext->sendInfo.pConn) { + tInfo("PROBE rpcRid=0x%" PRIx64 " connect obj diff. pContext->pConn=%p pPreConn=%p", rpcRid, pContext->pConn, pContext->sendInfo.pConn); + goto _END; + } + + // fdObj same + if (pContext->pConn->chandle != pContext->sendInfo.pFdObj) { + tInfo("PROBE rpcRid=0x%" PRIx64 " connect fdObj diff. pContext->pConn->chandle=%p pPrevFdObj=%p", rpcRid, pContext->pConn->chandle, pContext->sendInfo.pFdObj); + goto _END; + } + + // fd same + SOCKET fd = taosGetFdID(pContext->pConn->chandle); + if (fd != pContext->sendInfo.fd) { + tInfo("PROBE rpcRid=0x%" PRIx64 " connect fd diff.fd=%d prevFd=%d", rpcRid, fd, pContext->sendInfo.fd); + goto _END; + } + + // send syn + ret = doRpcSendProbe(pContext->pConn); + +_END: + // put back req context + taosReleaseRef(tsRpcRefId, rpcRid); + return ret; +} + +// after sql request send , save conn info +bool rpcSaveSendInfo(int64_t rpcRid, void** ppContext) { + if(rpcRid < 0) { + tError("PROBE saveSendInfo rpcRid=0x%" PRIx64 " less than zero, invalid.", rpcRid); + return false; + } + // get req content + SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, rpcRid); + if (pContext == NULL) { + tError("PROBE saveSendInfo rpcRid=0x%" PRIx64 " get context NULL.", rpcRid); + return false; + } + + if (ppContext) + *ppContext = pContext; + + taosReleaseRef(tsRpcRefId, rpcRid); + return true; +} \ No newline at end of file