提交 4c4a59b7 编写于 作者: A Alex Duan

feat(rpc): fix build error

上级 df934ab4
...@@ -92,7 +92,8 @@ typedef struct { ...@@ -92,7 +92,8 @@ typedef struct {
int64_t rid; // refId returned by taosAddRef int64_t rid; // refId returned by taosAddRef
SRpcMsg *pRsp; // for synchronous API SRpcMsg *pRsp; // for synchronous API
tsem_t *pSem; // for synchronous API tsem_t *pSem; // for synchronous API
SRpcEpSet *pSet; // for synchronous API SRpcEpSet *pSet; // for synchronous API
SSendInfo sendInfo; // save last send information
char msg[0]; // RpcHead starts from here char msg[0]; // RpcHead starts from here
} SRpcReqContext; } SRpcReqContext;
...@@ -131,6 +132,7 @@ typedef struct SRpcConn { ...@@ -131,6 +132,7 @@ typedef struct SRpcConn {
int8_t connType; // connection type int8_t connType; // connection type
int64_t lockedBy; // lock for connection int64_t lockedBy; // lock for connection
SRpcReqContext *pContext; // request context SRpcReqContext *pContext; // request context
int64_t rid; // probe msg use rid get pContext
} SRpcConn; } SRpcConn;
int tsRpcMaxUdpSize = 15000; // bytes int tsRpcMaxUdpSize = 15000; // bytes
...@@ -199,10 +201,10 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc); ...@@ -199,10 +201,10 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc);
static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv); static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv);
static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv); static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv);
static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext); static bool rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext);
static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code); static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code);
static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code); static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code);
static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen); static bool rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen);
static void rpcSendReqHead(SRpcConn *pConn); static void rpcSendReqHead(SRpcConn *pConn);
static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv); static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv);
...@@ -391,7 +393,7 @@ void *rpcReallocCont(void *ptr, int contLen) { ...@@ -391,7 +393,7 @@ void *rpcReallocCont(void *ptr, int contLen) {
return start + sizeof(SRpcReqContext) + sizeof(SRpcHead); return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
} }
void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) { bool rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) {
SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcInfo *pRpc = (SRpcInfo *)shandle;
SRpcReqContext *pContext; SRpcReqContext *pContext;
...@@ -421,7 +423,7 @@ void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64 ...@@ -421,7 +423,7 @@ void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64
pContext->rid = taosAddRef(tsRpcRefId, pContext); pContext->rid = taosAddRef(tsRpcRefId, pContext);
if (pRid) *pRid = pContext->rid; if (pRid) *pRid = pContext->rid;
rpcSendReqToServer(pRpc, pContext); return rpcSendReqToServer(pRpc, pContext);
} }
void rpcSendResponse(const SRpcMsg *pRsp) { void rpcSendResponse(const SRpcMsg *pRsp) {
...@@ -986,6 +988,10 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont ...@@ -986,6 +988,10 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont
return NULL; return NULL;
} }
if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN || pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) {
return pConn;
}
rpcLockConn(pConn); rpcLockConn(pConn);
if (rpcIsReq(pHead->msgType)) { if (rpcIsReq(pHead->msgType)) {
...@@ -1083,6 +1089,58 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) { ...@@ -1083,6 +1089,58 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) {
rpcUnlockConn(pConn); rpcUnlockConn(pConn);
} }
// process probe msg , return true is probe msg, false is not probe msg
static void rpcProcessProbeMsg(SRecvInfo *pRecv, SRpcConn *pConn) {
SRpcHead *pHead = (SRpcHead *)pRecv->msg;
uint64_t ahandle = pHead->ahandle;
if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN) {
// response to
char msg[RPC_MSG_OVERHEAD];
SRpcHead *pRspHead;
// set msg header
memset(msg, 0, sizeof(SRpcHead));
pRspHead = (SRpcHead *)msg;
pRspHead->msgType = TSDB_MSG_TYPE_PROBE_CONN_RSP;
pRspHead->version = 1;
pRspHead->ahandle = pHead->ahandle;
pRspHead->tranId = pHead->tranId;
pRspHead->code = 0;
pRspHead->linkUid = pHead->linkUid;
rpcLockConn(pConn);
pRspHead->sourceId = pConn->ownId;
pRspHead->destId = pConn->peerId;
memcpy(pRspHead->user, pHead->user, tListLen(pHead->user));
bool ret = rpcSendMsgToPeer(pConn, pRspHead, sizeof(SRpcHead));
tInfo("PROBE 0x%" PRIx64 " recv probe msg and do response. ret=%d", ahandle, ret);
rpcUnlockConn(pConn);
rpcFreeMsg(pRecv->msg);
} else if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) {
if(pConn) {
rpcLockConn(pConn);
// get req content
SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, pConn->rid);
if (pContext) {
rpcProcessIncomingMsg(pConn, pHead, pContext);
taosReleaseRef(tsRpcRefId, pConn->rid);
} else {
tInfo("PROBE 0x%" PRIx64 " recv response probe msg but pContext is NULL. pConn->rid=0x%" PRIX64, ahandle, pConn->rid);
rpcFreeMsg(pRecv->msg);
}
rpcUnlockConn(pConn);
} else {
tInfo("PROBE 0x%" PRIx64 " recv response probe msg but pConn is NULL.", ahandle);
rpcFreeMsg(pRecv->msg);
}
}
}
static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
SRpcHead *pHead = (SRpcHead *)pRecv->msg; SRpcHead *pHead = (SRpcHead *)pRecv->msg;
SRpcInfo *pRpc = (SRpcInfo *)pRecv->shandle; SRpcInfo *pRpc = (SRpcInfo *)pRecv->shandle;
...@@ -1154,7 +1212,10 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) { ...@@ -1154,7 +1212,10 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
} }
// free the request message // free the request message
taosRemoveRef(tsRpcRefId, pContext->rid); if(pMsg->msgType != TSDB_MSG_TYPE_PROBE_CONN && pMsg->msgType != TSDB_MSG_TYPE_PROBE_CONN_RSP) {
taosRemoveRef(tsRpcRefId, pContext->rid);
}
} }
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) { static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) {
...@@ -1179,6 +1240,13 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte ...@@ -1179,6 +1240,13 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte
// it's a response // it's a response
rpcMsg.handle = pContext; rpcMsg.handle = pContext;
rpcMsg.ahandle = pContext->ahandle; rpcMsg.ahandle = pContext->ahandle;
if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) {
// probe msg
rpcNotifyClient(pContext, &rpcMsg);
return ;
}
// reset pConn NULL
pContext->pConn = NULL; pContext->pConn = NULL;
// for UDP, port may be changed by server, the port in epSet shall be used for cache // for UDP, port may be changed by server, the port in epSet shall be used for cache
...@@ -1296,7 +1364,7 @@ static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) { ...@@ -1296,7 +1364,7 @@ static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) {
return; return;
} }
static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { static bool rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
SRpcHead *pHead = rpcHeadFromCont(pContext->pCont); SRpcHead *pHead = rpcHeadFromCont(pContext->pCont);
char *msg = (char *)pHead; char *msg = (char *)pHead;
int msgLen = rpcMsgLenFromCont(pContext->contLen); int msgLen = rpcMsgLenFromCont(pContext->contLen);
...@@ -1335,17 +1403,26 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { ...@@ -1335,17 +1403,26 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
pConn->pReqMsg = msg; pConn->pReqMsg = msg;
pConn->reqMsgLen = msgLen; pConn->reqMsgLen = msgLen;
pConn->pContext = pContext; pConn->pContext = pContext;
if(pContext)
pConn->rid = pContext->rid;
rpcSendMsgToPeer(pConn, msg, msgLen); // save
pContext->sendInfo.pConn = pConn;
pContext->sendInfo.pFdObj = pConn->chandle;
pContext->sendInfo.fd = taosGetFdID(pConn->chandle);
bool ret = rpcSendMsgToPeer(pConn, msg, msgLen);
if (pConn->connType != RPC_CONN_TCPC) if (pConn->connType != RPC_CONN_TCPC)
taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
rpcUnlockConn(pConn); rpcUnlockConn(pConn);
return ret;
} }
static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { static bool rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
int writtenLen = 0; int writtenLen = 0;
SRpcHead *pHead = (SRpcHead *)msg; SRpcHead *pHead = (SRpcHead *)msg;
bool ret = true;
msgLen = rpcAddAuthPart(pConn, msg, msgLen); msgLen = rpcAddAuthPart(pConn, msg, msgLen);
...@@ -1365,9 +1442,11 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { ...@@ -1365,9 +1442,11 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
if (writtenLen != msgLen) { if (writtenLen != msgLen) {
tError("%s, failed to send, msgLen:%d written:%d, reason:%s", pConn->info, msgLen, writtenLen, strerror(errno)); tError("%s, failed to send, msgLen:%d written:%d, reason:%s", pConn->info, msgLen, writtenLen, strerror(errno));
ret = false;
} }
tDump(msg, msgLen); tDump(msg, msgLen);
return ret;
} }
static void rpcProcessConnError(void *param, void *id) { static void rpcProcessConnError(void *param, void *id) {
...@@ -1672,3 +1751,103 @@ static void rpcDecRef(SRpcInfo *pRpc) ...@@ -1672,3 +1751,103 @@ static void rpcDecRef(SRpcInfo *pRpc)
} }
} }
bool doRpcSendProbe(SRpcConn *pConn) {
char msg[RPC_MSG_OVERHEAD];
SRpcHead *pHead;
int code = 0;
// set msg header
memset(msg, 0, sizeof(SRpcHead));
pHead = (SRpcHead *)msg;
pHead->version = 1;
pHead->msgVer = htonl(tsVersion >> 8);
pHead->msgType = TSDB_MSG_TYPE_PROBE_CONN;
pHead->spi = pConn->spi;
pHead->encrypt = 0;
pHead->tranId = (uint16_t)(taosRand() & 0xFFFF); // rand
pHead->sourceId = pConn->ownId;
pHead->destId = pConn->peerId;
pHead->linkUid = pConn->linkUid;
pHead->ahandle = (uint64_t)pConn->ahandle;
memcpy(pHead->user, pConn->user, tListLen(pHead->user));
pHead->code = htonl(code);
bool ret = rpcSendMsgToPeer(pConn, msg, sizeof(SRpcHead) + sizeof(int32_t));
pConn->lastLiveTime = taosGetTimestampMs();
return ret;
}
// send server syn
bool rpcSendProbe(int64_t rpcRid, void* pPrevContext) {
// return false can kill query
bool ret = false;
if(rpcRid < 0) {
tError("PROBE rpcRid=0x%" PRIx64 " less than zero, invalid.", rpcRid);
return true;
}
// get req content
SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, rpcRid);
if (pContext == NULL) {
tError("PROBE rpcRid=0x%" PRIx64 " get context NULL. sql finished no need send probe.", rpcRid);
return true;
}
// context same
if(pContext != pPrevContext) {
tError("PROBE rpcRid=0x%" PRIx64 " context diff. pContext=%p pPreContent=%p", rpcRid, pContext, pPrevContext);
goto _END;
}
// conn same
if(pContext->pConn == NULL) {
tInfo("PROBE rpcRid=0x%" PRIx64 " connect obj is NULL. ", rpcRid);
ret = true;
goto _END;
} else if (pContext->pConn != pContext->sendInfo.pConn) {
tInfo("PROBE rpcRid=0x%" PRIx64 " connect obj diff. pContext->pConn=%p pPreConn=%p", rpcRid, pContext->pConn, pContext->sendInfo.pConn);
goto _END;
}
// fdObj same
if (pContext->pConn->chandle != pContext->sendInfo.pFdObj) {
tInfo("PROBE rpcRid=0x%" PRIx64 " connect fdObj diff. pContext->pConn->chandle=%p pPrevFdObj=%p", rpcRid, pContext->pConn->chandle, pContext->sendInfo.pFdObj);
goto _END;
}
// fd same
SOCKET fd = taosGetFdID(pContext->pConn->chandle);
if (fd != pContext->sendInfo.fd) {
tInfo("PROBE rpcRid=0x%" PRIx64 " connect fd diff.fd=%d prevFd=%d", rpcRid, fd, pContext->sendInfo.fd);
goto _END;
}
// send syn
ret = doRpcSendProbe(pContext->pConn);
_END:
// put back req context
taosReleaseRef(tsRpcRefId, rpcRid);
return ret;
}
// after sql request send , save conn info
bool rpcSaveSendInfo(int64_t rpcRid, void** ppContext) {
if(rpcRid < 0) {
tError("PROBE saveSendInfo rpcRid=0x%" PRIx64 " less than zero, invalid.", rpcRid);
return false;
}
// get req content
SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, rpcRid);
if (pContext == NULL) {
tError("PROBE saveSendInfo rpcRid=0x%" PRIx64 " get context NULL.", rpcRid);
return false;
}
if (ppContext)
*ppContext = pContext;
taosReleaseRef(tsRpcRefId, rpcRid);
return true;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册