提交 a9c16769 编写于 作者: A Alex Duan

feat(rpc): add probe msg

上级 9f20bf05
...@@ -402,9 +402,6 @@ typedef struct SSqlObj { ...@@ -402,9 +402,6 @@ typedef struct SSqlObj {
int64_t lastProbe; int64_t lastProbe;
int64_t lastAlive; int64_t lastAlive;
void * pPrevContext; void * pPrevContext;
void * pPrevConn;
void * pPrevFdObj;
int32_t prevFd;
} SSqlObj; } SSqlObj;
typedef struct SSqlStream { typedef struct SSqlStream {
......
...@@ -303,7 +303,7 @@ bool dealConnBroken(SSqlObj * pSql) { ...@@ -303,7 +303,7 @@ bool dealConnBroken(SSqlObj * pSql) {
} }
// error notify // error notify
tscInfo("PROBE 0x%"PRIx64" async result error. rpcRid=0x%" PRIx64, pSql->self, pSql->rpcRid); tscInfo("PROBE 0x%"PRIx64" call async result error." PRIx64, pSql->self);
tscAsyncResultOnError(pSql); tscAsyncResultOnError(pSql);
return true; return true;
...@@ -325,23 +325,22 @@ bool sendProbeConnMsg(SSqlObj* pSql) { ...@@ -325,23 +325,22 @@ bool sendProbeConnMsg(SSqlObj* pSql) {
int32_t diff = (int32_t)(taosGetTimestampMs() - stime); int32_t diff = (int32_t)(taosGetTimestampMs() - stime);
if (diff < tsProbeSeconds * 1000) { if (diff < tsProbeSeconds * 1000) {
// exec time short , need not probe alive // exec time short , need not probe alive
tscInfo("PROBE 0x%" PRIx64 "not arrived probe time. timeout=%ds, no need probe. lastAlive=%" PRId64 " stime=%" PRId64, \ tscInfo("PROBE 0x%" PRIx64 " not arrived probe time. cfg timeout=%ds, no need probe. lastAlive=%" PRId64 " stime=%" PRId64, \
pSql->self, tsProbeSeconds, pSql->lastAlive, pSql->stime); pSql->self, tsProbeSeconds, pSql->lastAlive, pSql->stime);
return true; return true;
} }
if (diff > tsProbeKillSeconds * 1000) { if (diff > tsProbeKillSeconds * 1000) {
// need kill query // need kill query
tscInfo("PROBE 0x%" PRIx64 "kill query by probe. because arrived kill time. timeout=%ds lastAlive=%" PRId64 " stime=%" PRId64, \ tscInfo("PROBE 0x%" PRIx64 " kill query by probe. because arrived kill time. time=%ds cfg timeout=%ds lastAlive=%" PRId64 " stime=%" PRId64, \
pSql->self, tsProbeKillSeconds, pSql->lastAlive, pSql->stime); pSql->self, diff/1000, tsProbeKillSeconds, pSql->lastAlive, pSql->stime);
return false; return false;
} }
if (pSql->pPrevContext == NULL || pSql->pPrevConn == NULL || pSql->pPrevFdObj == NULL || pSql->prevFd <= 0) { if (pSql->pPrevContext == NULL) {
// last connect info save uncompletely, so can't probe // last connect info save uncompletely, so can't probe
tscInfo("PROBE 0x%" PRIx64 "save last connect info uncompletely. prev context=%p conn=%p fdobj=%p fd=%d", \ tscInfo("PROBE 0x%" PRIx64 " save last connect info uncompletely. prev context is null", pSql->self);
pSql->self, pSql->pPrevContext, pSql->pPrevConn, pSql->pPrevFdObj, pSql->prevFd);
return true; return true;
} }
...@@ -354,8 +353,8 @@ bool sendProbeConnMsg(SSqlObj* pSql) { ...@@ -354,8 +353,8 @@ bool sendProbeConnMsg(SSqlObj* pSql) {
// It's long time from lastAlive, so need probe // It's long time from lastAlive, so need probe
pSql->lastProbe = taosGetTimestampMs(); pSql->lastProbe = taosGetTimestampMs();
bool ret = rpcSendProbe(pSql->rpcRid, pSql->pPrevContext, pSql->pPrevConn, pSql->pPrevFdObj, pSql->prevFd); bool ret = rpcSendProbe(pSql->rpcRid, pSql->pPrevContext);
tscInfo("PROBE 0x%" PRIx64 " rpcRid=0x%" PRIx64 " send probe msg, ret=%d", pSql->self, pSql->rpcRid, ret); tscInfo("PROBE 0x%" PRIx64 " send probe msg, ret=%d rpcRid=0x%" PRIx64, pSql->self, ret, pSql->rpcRid);
return ret; return ret;
} }
...@@ -365,7 +364,7 @@ void checkBrokenQueries(STscObj *pTscObj) { ...@@ -365,7 +364,7 @@ void checkBrokenQueries(STscObj *pTscObj) {
SSqlObj *pSql = pTscObj->sqlList; SSqlObj *pSql = pTscObj->sqlList;
while (pSql) { while (pSql) {
int32_t numOfSub = pSql->subState.numOfSub; int32_t numOfSub = pSql->subState.numOfSub;
tscInfo("PROBE 0x%" PRIx64 " numOfSub=%d sql=%s", pSql->self, numOfSub, pSql->sqlstr == NULL ? "" : pSql->sqlstr); tscInfo("PROBE 0x%" PRIx64 " check sql connection alive, numOfSub=%d sql=%s", pSql->self, numOfSub, pSql->sqlstr == NULL ? "" : pSql->sqlstr);
if (numOfSub == 0) { if (numOfSub == 0) {
// no sub sql // no sub sql
if(!sendProbeConnMsg(pSql)) { if(!sendProbeConnMsg(pSql)) {
...@@ -460,9 +459,10 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -460,9 +459,10 @@ int tscSendMsgToServer(SSqlObj *pSql) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
if(rpcSendRequest(pObj->pRpcObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid)) { if(rpcSendRequest(pObj->pRpcObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid)) {
if(pSql->cmd.command != TSDB_SQL_HB) if(pSql->cmd.command != TSDB_SQL_HB)
rpcSaveSendInfo(pSql->rpcRid, &pSql->pPrevContext, &pSql->pPrevConn, &pSql->pPrevFdObj, &pSql->prevFd); rpcSaveSendInfo(pSql->rpcRid, &pSql->pPrevContext);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -78,6 +78,13 @@ typedef struct SRpcInit { ...@@ -78,6 +78,13 @@ typedef struct SRpcInit {
int (*afp)(char *tableId, char *spi, char *encrypt, char *secret, char *ckey); int (*afp)(char *tableId, char *spi, char *encrypt, char *secret, char *ckey);
} SRpcInit; } SRpcInit;
// Snapshot of the connection a request was last sent on. It is embedded in
// SRpcReqContext as `sendInfo`, filled in by rpcSendReqToServer() just before
// the message is handed to the peer, and compared against the context's
// current connection in rpcSendProbe() before a probe is sent.
// NOTE(review): in SRpcReqContext this member is declared after `char msg[0]`,
// so its storage appears to overlap the message buffer -- confirm the layout.
typedef struct SSendInfo {
void *pContext;  // NOTE(review): not assigned in the visible send path -- confirm it is used
void *pConn;     // connection object used for the send; probe checks it equals pContext->pConn
void *pFdObj;    // pConn->chandle at send time; probe checks the handle has not changed
int32_t fd;      // taosGetFdID(pConn->chandle) at send time; probe checks the fd has not changed
} SSendInfo;
int32_t rpcInit(); int32_t rpcInit();
void rpcCleanup(); void rpcCleanup();
void *rpcOpen(const SRpcInit *pRpc); void *rpcOpen(const SRpcInit *pRpc);
...@@ -94,9 +101,9 @@ int rpcReportProgress(void *pConn, char *pCont, int contLen); ...@@ -94,9 +101,9 @@ int rpcReportProgress(void *pConn, char *pCont, int contLen);
void rpcCancelRequest(int64_t rid); void rpcCancelRequest(int64_t rid);
int32_t rpcUnusedSession(void * rpcInfo, bool bLock); int32_t rpcUnusedSession(void * rpcInfo, bool bLock);
// send rpc Refid connection probe alive message // send rpc Refid connection probe alive message
bool rpcSendProbe(int64_t rpcRid, void* pPrevContext, void* pPrevConn, void* pPrevFdObj, int32_t prevFd); bool rpcSendProbe(int64_t rpcRid, void* pPrevContext);
// after sql request send , save conn info // after sql request send , save conn info
bool rpcSaveSendInfo(int64_t rpcRid, void** ppContext, void** ppConn, void** ppFdObj, int32_t* pFd); bool rpcSaveSendInfo(int64_t rpcRid, void** ppContext);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -6369,7 +6369,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) { ...@@ -6369,7 +6369,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
// TEST TODU DELETE // TEST TODU DELETE
static int loop = 0; static int loop = 0;
taosMsleep(3*1000); taosMsleep(1*1000);
qInfo(" loop=%d pEnv=%p", loop++, pRuntimeEnv); qInfo(" loop=%d pEnv=%p", loop++, pRuntimeEnv);
......
...@@ -87,6 +87,7 @@ typedef struct { ...@@ -87,6 +87,7 @@ typedef struct {
tsem_t *pSem; // for synchronous API tsem_t *pSem; // for synchronous API
SRpcEpSet *pSet; // for synchronous API SRpcEpSet *pSet; // for synchronous API
char msg[0]; // RpcHead starts from here char msg[0]; // RpcHead starts from here
SSendInfo sendInfo; // save last send information
} SRpcReqContext; } SRpcReqContext;
typedef struct SRpcConn { typedef struct SRpcConn {
...@@ -1412,6 +1413,11 @@ static bool rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { ...@@ -1412,6 +1413,11 @@ static bool rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
if(pContext) if(pContext)
pConn->rid = pContext->rid; pConn->rid = pContext->rid;
// save
pContext->sendInfo.pConn = pConn;
pContext->sendInfo.pFdObj = pConn->chandle;
pContext->sendInfo.fd = taosGetFdID(pConn->chandle);
bool ret = rpcSendMsgToPeer(pConn, msg, msgLen); bool ret = rpcSendMsgToPeer(pConn, msg, msgLen);
if (pConn->connType != RPC_CONN_TCPC) if (pConn->connType != RPC_CONN_TCPC)
taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
...@@ -1787,7 +1793,7 @@ bool doRpcSendProbe(SRpcConn *pConn) { ...@@ -1787,7 +1793,7 @@ bool doRpcSendProbe(SRpcConn *pConn) {
} }
// send server syn // send server syn
bool rpcSendProbe(int64_t rpcRid, void* pPrevContext, void* pPrevConn, void* pPrevFdObj, int32_t prevFd) { bool rpcSendProbe(int64_t rpcRid, void* pPrevContext) {
// return false can kill query // return false can kill query
bool ret = false; bool ret = false;
if(rpcRid < 0) { if(rpcRid < 0) {
...@@ -1809,28 +1815,27 @@ bool rpcSendProbe(int64_t rpcRid, void* pPrevContext, void* pPrevConn, void* pPr ...@@ -1809,28 +1815,27 @@ bool rpcSendProbe(int64_t rpcRid, void* pPrevContext, void* pPrevConn, void* pPr
} }
// conn same // conn same
if (pContext->pConn != pPrevConn) { if (pContext->pConn != pContext->sendInfo.pConn) {
tError("PROBE rpcRid=0x%" PRIx64 " connect obj diff. pContext->pConn=%p pPreConn=%p", rpcRid, pContext->pConn, pPrevConn); tError("PROBE rpcRid=0x%" PRIx64 " connect obj diff. pContext->pConn=%p pPreConn=%p", rpcRid, pContext->pConn, pContext->sendInfo.pConn);
ret = pContext->pConn == NULL; ret = pContext->pConn == NULL;
goto _END; goto _END;
} }
// fdObj same // fdObj same
if (pContext->pConn->chandle != pPrevFdObj) { if (pContext->pConn->chandle != pContext->sendInfo.pFdObj) {
tError("PROBE rpcRid=0x%" PRIx64 " connect fdObj diff. pContext->pConn->chandle=%p pPrevFdObj=%p", rpcRid, pContext->pConn->chandle, pPrevFdObj); tError("PROBE rpcRid=0x%" PRIx64 " connect fdObj diff. pContext->pConn->chandle=%p pPrevFdObj=%p", rpcRid, pContext->pConn->chandle, pContext->sendInfo.pFdObj);
goto _END; goto _END;
} }
// fd same // fd same
int32_t fd = taosGetFdID(pContext->pConn->chandle); int32_t fd = taosGetFdID(pContext->pConn->chandle);
if (fd != prevFd) { if (fd != pContext->sendInfo.fd) {
tError("PROBE rpcRid=0x%" PRIx64 " connect fd diff.fd=%d prevFd=%d", rpcRid, fd, prevFd); tError("PROBE rpcRid=0x%" PRIx64 " connect fd diff.fd=%d prevFd=%d", rpcRid, fd, pContext->sendInfo.fd);
goto _END; goto _END;
} }
// send syn // send syn
ret = doRpcSendProbe(pContext->pConn); ret = doRpcSendProbe(pContext->pConn);
tInfo("PROBE 0x%" PRIx64 " rrpcRid=0x%" PRIx64 " send data ret=%d fd=%d.", (int64_t)pContext->ahandle, rpcRid, ret, fd);
_END: _END:
// put back req context // put back req context
...@@ -1839,7 +1844,7 @@ _END: ...@@ -1839,7 +1844,7 @@ _END:
} }
// after sql request send , save conn info // after sql request send , save conn info
bool rpcSaveSendInfo(int64_t rpcRid, void** ppContext, void** ppConn, void** ppFdObj, int32_t* pFd) { bool rpcSaveSendInfo(int64_t rpcRid, void** ppContext) {
if(rpcRid < 0) { if(rpcRid < 0) {
tError("PROBE saveSendInfo rpcRid=0x%" PRIx64 " less than zero, invalid.", rpcRid); tError("PROBE saveSendInfo rpcRid=0x%" PRIx64 " less than zero, invalid.", rpcRid);
return false; return false;
...@@ -1851,18 +1856,8 @@ bool rpcSaveSendInfo(int64_t rpcRid, void** ppContext, void** ppConn, void** ppF ...@@ -1851,18 +1856,8 @@ bool rpcSaveSendInfo(int64_t rpcRid, void** ppContext, void** ppConn, void** ppF
return false; return false;
} }
if (pContext->pConn == NULL || pContext->pConn->chandle == NULL) {
return false;
}
if (ppContext) if (ppContext)
*ppContext = pContext; *ppContext = pContext;
if (ppConn)
*ppConn = pContext->pConn;
if (ppFdObj && pContext->pConn)
*ppFdObj = pContext->pConn->chandle;
if (pFd && pContext->pConn)
*pFd = taosGetFdID(pContext->pConn->chandle);
taosReleaseRef(tsRpcRefId, rpcRid); taosReleaseRef(tsRpcRefId, rpcRid);
return true; return true;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册