提交 6ecc4abc 编写于 作者: A Alex Duan

fix(rpc): probe alive test

上级 0775971f
...@@ -291,7 +291,7 @@ bool dealConnBroken(SSqlObj * pSql) { ...@@ -291,7 +291,7 @@ bool dealConnBroken(SSqlObj * pSql) {
// if return true, send probe connection msg to sever ok // if return true, send probe connection msg to sever ok
bool sendProbeConnMsg(SSqlObj* pSql) { bool sendProbeConnMsg(SSqlObj* pSql) {
// check time out // check time out
int32_t probeTimeout = 60*1000; // over this value send probe msg int32_t probeTimeout = 1*1000; // over this value send probe msg
int32_t killTimeout = 3*60*1000; // over this value query can be killed int32_t killTimeout = 3*60*1000; // over this value query can be killed
if(pSql->stime == 0) { if(pSql->stime == 0) {
// not start , no need probe // not start , no need probe
...@@ -430,7 +430,8 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -430,7 +430,8 @@ int tscSendMsgToServer(SSqlObj *pSql) {
} }
if(rpcSendRequest(pObj->pRpcObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid)) { if(rpcSendRequest(pObj->pRpcObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid)) {
saveSendInfo(pSql->rpcRid, &pSql->pPrevContext, &pSql->pPrevConn, &pSql->pPrevFdObj, &pSql->prevFd); if(pSql->cmd.command != TSDB_SQL_HB)
rpcSaveSendInfo(pSql->rpcRid, &pSql->pPrevContext, &pSql->pPrevConn, &pSql->pPrevFdObj, &pSql->prevFd);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -28,6 +28,8 @@ void * dnodeAllocVQueryQueue(void *pVnode); ...@@ -28,6 +28,8 @@ void * dnodeAllocVQueryQueue(void *pVnode);
void * dnodeAllocVFetchQueue(void *pVnode); void * dnodeAllocVFetchQueue(void *pVnode);
void dnodeFreeVQueryQueue(void *pQqueue); void dnodeFreeVQueryQueue(void *pQqueue);
void dnodeFreeVFetchQueue(void *pFqueue); void dnodeFreeVFetchQueue(void *pFqueue);
// reponse probe connection msg
void dnodeResponseProbeMsg(SRpcMsg *pMsg);
......
...@@ -78,7 +78,7 @@ int32_t dnodeInitShell() { ...@@ -78,7 +78,7 @@ int32_t dnodeInitShell() {
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeSendStartupStep; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeSendStartupStep;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_PROBE_CONN] = dnodeDispatchToVReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_PROBE_CONN] = dnodeResponseProbeMsg;
int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0); int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0);
if (numOfThreads < 1) { if (numOfThreads < 1) {
......
...@@ -143,7 +143,11 @@ static void *dnodeProcessReadQueue(void *wparam) { ...@@ -143,7 +143,11 @@ static void *dnodeProcessReadQueue(void *wparam) {
int32_t code = vnodeProcessRead(pVnode, pRead); int32_t code = vnodeProcessRead(pVnode, pRead);
if (qtype == TAOS_QTYPE_RPC && code != TSDB_CODE_QRY_NOT_READY) { if(code == 9999) {
dInfo(" ******* doNotRsp Test **** msg:%p, app:%p type:%s will be processed in vquery queue, qtype:%d", pRead, pRead->rpcAhandle,taosMsg[pRead->msgType], qtype);
printf(" ******* doNotRsp Test **** msg:%p, app:%p type:%s will be processed in vquery queue, qtype:%d", pRead, pRead->rpcAhandle,taosMsg[pRead->msgType], qtype);
} else if (qtype == TAOS_QTYPE_RPC && code != TSDB_CODE_QRY_NOT_READY) {
dnodeSendRpcVReadRsp(pVnode, pRead, code); dnodeSendRpcVReadRsp(pVnode, pRead, code);
} else { } else {
if (code == TSDB_CODE_QRY_HAS_RSP) { if (code == TSDB_CODE_QRY_HAS_RSP) {
...@@ -159,3 +163,13 @@ static void *dnodeProcessReadQueue(void *wparam) { ...@@ -159,3 +163,13 @@ static void *dnodeProcessReadQueue(void *wparam) {
return NULL; return NULL;
} }
// reponse probe connection msg
void dnodeResponseProbeMsg(SRpcMsg *pMsg) {
// check probe conn msg
if(pMsg->msgType == TSDB_MSG_TYPE_PROBE_CONN) {
SRpcMsg rpcRsp = {.handle = pMsg->handle, .code = 0, .msgType = TSDB_MSG_TYPE_PROBE_CONN_RSP};
rpcSendResponse(&rpcRsp);
return ;
}
}
...@@ -96,7 +96,7 @@ int32_t rpcUnusedSession(void * rpcInfo, bool bLock); ...@@ -96,7 +96,7 @@ int32_t rpcUnusedSession(void * rpcInfo, bool bLock);
// send rpc Refid connection probe alive message // send rpc Refid connection probe alive message
bool rpcSendProbe(int64_t rpcRid, void* pPrevContext, void* pPrevConn, void* pPrevFdObj, int32_t prevFd); bool rpcSendProbe(int64_t rpcRid, void* pPrevContext, void* pPrevConn, void* pPrevFdObj, int32_t prevFd);
// after sql request send , save conn info // after sql request send , save conn info
bool saveSendInfo(int64_t rpcRid, void** ppContext, void** ppConn, void** ppFdObj, int32_t* pFd); bool rpcSaveSendInfo(int64_t rpcRid, void** ppContext, void** ppConn, void** ppFdObj, int32_t* pFd);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -1698,6 +1698,7 @@ bool doRpcSendProbe(SRpcConn *pConn) { ...@@ -1698,6 +1698,7 @@ bool doRpcSendProbe(SRpcConn *pConn) {
memset(msg, 0, sizeof(SRpcHead)); memset(msg, 0, sizeof(SRpcHead));
pHead = (SRpcHead *)msg; pHead = (SRpcHead *)msg;
pHead->version = 1; pHead->version = 1;
pHead->msgVer = htonl(tsVersion >> 8);
pHead->msgType = TSDB_MSG_TYPE_PROBE_CONN; pHead->msgType = TSDB_MSG_TYPE_PROBE_CONN;
pHead->spi = pConn->spi; pHead->spi = pConn->spi;
pHead->encrypt = 0; pHead->encrypt = 0;
...@@ -1765,7 +1766,7 @@ _END: ...@@ -1765,7 +1766,7 @@ _END:
} }
// after sql request send , save conn info // after sql request send , save conn info
bool saveSendInfo(int64_t rpcRid, void** ppContext, void** ppConn, void** ppFdObj, int32_t* pFd) { bool rpcSaveSendInfo(int64_t rpcRid, void** ppContext, void** ppConn, void** ppFdObj, int32_t* pFd) {
if(rpcRid < 0) { if(rpcRid < 0) {
tError("ACK saveSendInfo rpcRid=%" PRId64 " less than zero, invalid.", rpcRid); tError("ACK saveSendInfo rpcRid=%" PRId64 " less than zero, invalid.", rpcRid);
return false; return false;
......
...@@ -339,8 +339,9 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { ...@@ -339,8 +339,9 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, freehandle); qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, freehandle);
} }
} }
// TEST CODE
//code = 9999;
} }
return code; return code;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册