提交 eb08390d 编写于 作者: A Alex Duan

fix conn can not get

上级 c3b58766
......@@ -291,6 +291,9 @@ bool dealConnBroken(SSqlObj * pSql) {
return false;
}
// set error
pSql->res.code = TSDB_CODE_RPC_CONN_BROKEN;
// cancel
if (pSql->rpcRid > 0) {
tscDebug("PROBE 0x%" PRIx64 " rpc cancel request rpcRid=0x%" PRIx64 ".", pSql->self, pSql->rpcRid);
......
......@@ -28,8 +28,8 @@ void * dnodeAllocVQueryQueue(void *pVnode);
void * dnodeAllocVFetchQueue(void *pVnode);
void dnodeFreeVQueryQueue(void *pQqueue);
void dnodeFreeVFetchQueue(void *pFqueue);
// respond to a probe connection msg
void dnodeResponseProbeMsg(SRpcMsg *pMsg);
#ifdef __cplusplus
}
......
......@@ -77,6 +77,7 @@ int32_t dnodeInitShell() {
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE_FUNC] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeSendStartupStep;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_PROBE_CONN] = dnodeResponseProbeMsg;
int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0);
if (numOfThreads < 1) {
......
......@@ -155,4 +155,14 @@ static void *dnodeProcessReadQueue(void *wparam) {
}
return NULL;
}
// Answer a connection probe request coming from the shell side.
// A message whose type is not TSDB_MSG_TYPE_PROBE_CONN is ignored and no response is sent.
void dnodeResponseProbeMsg(SRpcMsg *pMsg) {
  // guard: only probe requests are answered here
  if (pMsg->msgType != TSDB_MSG_TYPE_PROBE_CONN) {
    return;
  }

  // reply on the same handle with a success code and the probe response type
  SRpcMsg probeRsp = {.handle = pMsg->handle, .code = 0, .msgType = TSDB_MSG_TYPE_PROBE_CONN_RSP};
  rpcSendResponse(&probeRsp);
}
\ No newline at end of file
......@@ -60,6 +60,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_APP_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0014) //"Database not ready"
#define TSDB_CODE_RPC_FQDN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0015) //"Unable to resolve FQDN"
#define TSDB_CODE_RPC_INVALID_VERSION TAOS_DEF_ERROR_CODE(0, 0x0016) //"Invalid app version"
#define TSDB_CODE_RPC_CONN_BROKEN TAOS_DEF_ERROR_CODE(0, 0x0017) //"Connection broken"
//common & util
#define TSDB_CODE_COM_OPS_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x0100) //"Operation not supported"
......
......@@ -16,6 +16,9 @@
#ifndef TDENGINE_RPCHEAD_H
#define TDENGINE_RPCHEAD_H
// RPC message types for the connection probe: the request and its response
#define TSDB_MSG_TYPE_PROBE_CONN 0xFE
#define TSDB_MSG_TYPE_PROBE_CONN_RSP 0xFF
#ifdef __cplusplus
extern "C" {
#endif
......
......@@ -124,6 +124,7 @@ typedef struct SRpcConn {
int8_t connType; // connection type
int64_t lockedBy; // lock for connection
SRpcReqContext *pContext; // request context
int64_t lastLiveTime; // last alive time with ms
} SRpcConn;
int tsRpcMaxUdpSize = 15000; // bytes
......@@ -993,6 +994,7 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont
pConn->peerPort = pRecv->port;
if (pHead->port) pConn->peerPort = htons(pHead->port);
/*
// probe msg
if(pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN) {
pConn->inType = pHead->msgType;
......@@ -1002,6 +1004,7 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont
pRecv->msg = NULL;
return pConn;
}
*/
terrno = rpcCheckAuthentication(pConn, (char *)pHead, pRecv->msgLen);
......@@ -1086,6 +1089,41 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) {
rpcUnlockConn(pConn);
}
// Handle connection probe traffic.
//   pRecv: received packet; pRecv->msg is read as an SRpcHead.
//   pConn: connection the packet was matched to, may be NULL.
// Returns true when the packet is a probe request that has been dealt with here
// (answered, or dropped because there is no connection to answer on).
// Returns false for a probe response and for every other message type.
static bool rpcProcessProbeMsg(SRecvInfo *pRecv, SRpcConn *pConn) {
  SRpcHead *pHead = (SRpcHead *)pRecv->msg;

  if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN) {
    // no connection: there are no ids to fill in and nothing to send on, so drop the probe
    // instead of dereferencing a NULL pConn below
    if (pConn == NULL) {
      tDebug("PROBE 0x%" PRIx64 " recv probe msg but no connection, dropped.", pHead->ahandle);
      return true;
    }

    // build a head-only response that echoes the identifying fields of the request
    SRpcHead rspHead;
    memset(&rspHead, 0, sizeof(rspHead));
    rspHead.msgType = TSDB_MSG_TYPE_PROBE_CONN_RSP;
    rspHead.version = 1;
    rspHead.ahandle = pHead->ahandle;
    rspHead.tranId = pHead->tranId;
    rspHead.code = 0;
    rspHead.spi = pHead->spi;
    rspHead.linkUid = pHead->linkUid;
    rspHead.sourceId = pConn->ownId;
    rspHead.destId = pConn->peerId;
    memcpy(rspHead.user, pHead->user, tListLen(pHead->user));

    bool ret = rpcSendMsgToPeer(pConn, &rspHead, sizeof(SRpcHead));
    tDebug("PROBE 0x%" PRIx64 " recv probe msg and response. ret=%d", pHead->ahandle, ret);
    return true;
  }

  if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) {
    if (pConn) {
      // the peer answered, so the connection is alive as of now
      pConn->lastLiveTime = taosGetTimestampMs();
      rpcProcessIncomingMsg(pConn, pHead, pConn->pContext);
    }
    tDebug("PROBE 0x%" PRIx64 " recv response probe msg and update lastLiveTime. pConn=%p", pHead->ahandle, pConn);
    // NOTE(review): false lets the caller continue with its regular handling after
    // rpcProcessIncomingMsg was already called above - confirm the response is not handled twice
    return false;
  }

  // not a probe message at all
  return false;
}
static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
SRpcHead *pHead = (SRpcHead *)pRecv->msg;
SRpcInfo *pRpc = (SRpcInfo *)pRecv->shandle;
......@@ -1105,6 +1143,14 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
SRpcReqContext *pContext;
pConn = rpcProcessMsgHead(pRpc, pRecv, &pContext);
// deal probe msg
if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN || pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) {
if (rpcProcessProbeMsg(pRecv, pConn)) {
rpcFreeMsg(pRecv->msg);
return pConn;
}
}
if (pHead->msgType >= 1 && pHead->msgType < TSDB_MSG_TYPE_MAX) {
tDebug("%s %p %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label,
pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType], pRecv->ip, pRecv->port, terrno, pRecv->msgLen,
......@@ -1712,7 +1758,7 @@ bool doRpcSendProbe(SRpcConn *pConn) {
pHead->msgType = TSDB_MSG_TYPE_PROBE_CONN;
pHead->spi = pConn->spi;
pHead->encrypt = 0;
pHead->tranId = pConn->inTranId;
pHead->tranId = (uint16_t)(taosRand() & 0xFFFF); // rand
pHead->sourceId = pConn->ownId;
pHead->destId = pConn->peerId;
pHead->linkUid = pConn->linkUid;
......@@ -1720,9 +1766,8 @@ bool doRpcSendProbe(SRpcConn *pConn) {
memcpy(pHead->user, pConn->user, tListLen(pHead->user));
pHead->code = htonl(code);
pConn->outType = pHead->msgType;
bool ret = rpcSendMsgToPeer(pConn, msg, sizeof(SRpcHead) + sizeof(int32_t));
pConn->secured = 1; // connection shall be secured
pConn->lastLiveTime = taosGetTimestampMs();
return ret;
}
......@@ -1731,44 +1776,45 @@ bool doRpcSendProbe(SRpcConn *pConn) {
bool rpcSendProbe(int64_t rpcRid, void* pPrevContext, void* pPrevConn, void* pPrevFdObj, int32_t prevFd) {
bool ret = false;
if(rpcRid < 0) {
tError("ACK rpcRid=%" PRId64 " less than zero, invalid.", rpcRid);
tError("PROBE rpcRid=%" PRId64 " less than zero, invalid.", rpcRid);
return false;
}
// get req content
SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, rpcRid);
if (pContext == NULL) {
tError("ACK rpcRid=%" PRId64 " get context NULL.", rpcRid);
tError("PROBE rpcRid=%" PRId64 " get context NULL.", rpcRid);
return false;
}
// context same
if(pContext != pPrevContext) {
tError("ACK rpcRid=%" PRId64 " context diff. pContext=%p pPreContent=%p", rpcRid, pContext, pPrevContext);
tError("PROBE rpcRid=%" PRId64 " context diff. pContext=%p pPreContent=%p", rpcRid, pContext, pPrevContext);
goto _END;
}
// conn same
if (pContext->pConn != pPrevConn) {
tError("ACK rpcRid=%" PRId64 " connect obj diff. pContext->pConn=%p pPreConn=%p", rpcRid, pContext->pConn, pPrevConn);
tError("PROBE rpcRid=%" PRId64 " connect obj diff. pContext->pConn=%p pPreConn=%p", rpcRid, pContext->pConn, pPrevConn);
goto _END;
}
// fdObj same
if (pContext->pConn->chandle != pPrevFdObj) {
tError("ACK rpcRid=%" PRId64 " connect fdObj diff. pContext->pConn->chandle=%p pPrevFdObj=%p", rpcRid, pContext->pConn->chandle, pPrevFdObj);
tError("PROBE rpcRid=%" PRId64 " connect fdObj diff. pContext->pConn->chandle=%p pPrevFdObj=%p", rpcRid, pContext->pConn->chandle, pPrevFdObj);
goto _END;
}
// fd same
int32_t fd = taosGetFdID(pContext->pConn->chandle);
if (fd != prevFd) {
tError("ACK rpcRid=%" PRId64 " connect fd diff.fd=%d prevFd=%d", rpcRid, fd, prevFd);
tError("PROBE rpcRid=%" PRId64 " connect fd diff.fd=%d prevFd=%d", rpcRid, fd, prevFd);
goto _END;
}
// send syn
ret = doRpcSendProbe(pContext->pConn);
tInfo("PROBE 0x%" PRIx64 " rpcRid=%" PRId64 " send data ret=%d fd=%d.", (int64_t)pContext->ahandle, rpcRid, ret, fd);
_END:
// put back req context
......@@ -1797,9 +1843,9 @@ bool rpcSaveSendInfo(int64_t rpcRid, void** ppContext, void** ppConn, void** ppF
*ppContext = pContext;
if (ppConn)
*ppConn = pContext->pConn;
if (ppFdObj)
if (ppFdObj && pContext->pConn)
*ppFdObj = pContext->pConn->chandle;
if (pFd)
if (pFd && pContext->pConn)
*pFd = taosGetFdID(pContext->pConn->chandle);
taosReleaseRef(tsRpcRefId, rpcRid);
......
......@@ -68,6 +68,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_TIME_STAMP, "Client and server's t
TAOS_DEFINE_ERROR(TSDB_CODE_APP_NOT_READY, "Database not ready")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, "Unable to resolve FQDN")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_VERSION, "Invalid app version")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_CONN_BROKEN, "Connection broken")
//common & util
TAOS_DEFINE_ERROR(TSDB_CODE_COM_OPS_NOT_SUPPORT, "Operation not supported")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册