未验证 提交 04cc3142 编写于 作者: M Minglei Jin 提交者: GitHub

Merge pull request #15700 from taosdata/fix/TS-1566-V26

Fix/TS-1566-V26  RPC Probe connection alive
...@@ -398,6 +398,10 @@ typedef struct SSqlObj { ...@@ -398,6 +398,10 @@ typedef struct SSqlObj {
int32_t retryReason; // previous error code int32_t retryReason; // previous error code
struct SSqlObj *prev, *next; struct SSqlObj *prev, *next;
int64_t self; int64_t self;
// connect alive
int64_t lastAlive;
void * pPrevContext;
} SSqlObj; } SSqlObj;
typedef struct SSqlStream { typedef struct SSqlStream {
......
...@@ -314,7 +314,11 @@ static void tscAsyncResultCallback(SSchedMsg *pMsg) { ...@@ -314,7 +314,11 @@ static void tscAsyncResultCallback(SSchedMsg *pMsg) {
return; return;
} }
assert(pSql->res.code != TSDB_CODE_SUCCESS); // probe send error , but result be responsed by server async
if(pSql->res.code == TSDB_CODE_SUCCESS) {
return ;
}
if (tsShortcutFlag && (pSql->res.code == TSDB_CODE_RPC_SHORTCUT)) { if (tsShortcutFlag && (pSql->res.code == TSDB_CODE_RPC_SHORTCUT)) {
tscDebug("0x%" PRIx64 " async result callback, code:%s", pSql->self, tstrerror(pSql->res.code)); tscDebug("0x%" PRIx64 " async result callback, code:%s", pSql->self, tstrerror(pSql->res.code));
pSql->res.code = TSDB_CODE_SUCCESS; pSql->res.code = TSDB_CODE_SUCCESS;
......
...@@ -281,6 +281,102 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { ...@@ -281,6 +281,102 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
} }
} }
// if return true, send probe connection msg to sever ok
bool sendProbeConnMsg(SSqlObj* pSql, int64_t stime) {
if(stime == 0) {
// not start , no need probe
tscInfo("PROBE 0x%" PRIx64 " not start, no need probe.", pSql->self);
return true;
}
int64_t start = MAX(stime, pSql->lastAlive);
int32_t diff = (int32_t)(taosGetTimestampMs() - start);
if (diff < tsProbeSeconds * 1000) {
// exec time short , need not probe alive
tscInfo("PROBE 0x%" PRIx64 " not arrived probe time. cfg timeout=%ds, no need probe. lastAlive=%" PRId64 " stime=%" PRId64, \
pSql->self, tsProbeSeconds, pSql->lastAlive, pSql->stime);
return true;
}
if (diff > tsProbeKillSeconds * 1000) {
// need kill query
tscInfo("PROBE 0x%" PRIx64 " kill query by probe. because arrived kill time. time=%ds cfg timeout=%ds lastAlive=%" PRId64 " stime=%" PRId64, \
pSql->self, diff/1000, tsProbeKillSeconds, pSql->lastAlive, pSql->stime);
return false;
}
if (pSql->pPrevContext == NULL) {
// last connect info save uncompletely, so can't probe
tscInfo("PROBE 0x%" PRIx64 " save last connect info uncompletely. prev context is null", pSql->self);
return true;
}
if(pSql->rpcRid == -1) {
// cancel or reponse ok from server, so need not probe
tscInfo("PROBE 0x%" PRIx64 " rpcRid is -1, response ok. no need probe.", pSql->self);
return true;
}
bool ret = rpcSendProbe(pSql->rpcRid, pSql->pPrevContext);
tscInfo("PROBE 0x%" PRIx64 " send probe msg, ret=%d rpcRid=0x%" PRIx64, pSql->self, ret, pSql->rpcRid);
return ret;
}
// check have broken link queries than killed
void checkBrokenQueries(STscObj *pTscObj) {
tscDebug("PROBE checkBrokenQueries pTscObj=%p pTscObj->rid=0x%" PRIx64, pTscObj, pTscObj->rid);
SSqlObj *pSql = pTscObj->sqlList;
while (pSql) {
// avoid sqlobj may not be correctly removed from sql list
if (pSql->sqlstr == NULL) {
pSql = pSql->next;
continue;
}
bool kill = false;
int32_t numOfSub = pSql->subState.numOfSub;
tscInfo("PROBE 0x%" PRIx64 " start checking sql alive, numOfSub=%d sql=%s stime=%" PRId64 " alive=%" PRId64 " rpcRid=0x%" PRIx64 \
,pSql->self, numOfSub, pSql->sqlstr == NULL ? "" : pSql->sqlstr, pSql->stime, pSql->lastAlive, pSql->rpcRid);
if (numOfSub == 0) {
// no sub sql
if(!sendProbeConnMsg(pSql, pSql->stime)) {
// need kill
tscInfo("PROBE 0x%" PRIx64 " need break link done. rpcRid=0x%" PRIx64, pSql->self, pSql->rpcRid);
kill = true;
}
} else {
// lock subs
pthread_mutex_lock(&pSql->subState.mutex);
if (pSql->pSubs) {
// have sub sql
for (int i = 0; i < numOfSub; i++) {
SSqlObj *pSubSql = pSql->pSubs[i];
if(pSubSql) {
tscInfo("PROBE 0x%" PRIx64 " sub sql app is 0x%" PRIx64, pSql->self, pSubSql->self);
if(!sendProbeConnMsg(pSubSql, pSql->stime)) {
// need kill
tscInfo("PROBE 0x%" PRIx64 " i=%d sub app=0x%" PRIx64 " need break link done. rpcRid=0x%" PRIx64, pSql->self, i, pSubSql->self, pSubSql->rpcRid);
kill = true;
break;
}
}
}
}
// unlock
pthread_mutex_unlock(&pSql->subState.mutex);
}
// kill query
if(kill) {
taos_stop_query(pSql);
}
// move next
pSql = pSql->next;
}
}
void tscProcessActivityTimer(void *handle, void *tmrId) { void tscProcessActivityTimer(void *handle, void *tmrId) {
int64_t rid = (int64_t) handle; int64_t rid = (int64_t) handle;
STscObj *pObj = taosAcquireRef(tscRefId, rid); STscObj *pObj = taosAcquireRef(tscRefId, rid);
...@@ -296,6 +392,19 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { ...@@ -296,6 +392,19 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
assert(pHB->self == pObj->hbrid); assert(pHB->self == pObj->hbrid);
// check queries already death
static int activetyCnt = 0;
if (++activetyCnt > tsProbeInterval) { // 1.5s * 40 = 60s interval call check queries alive
activetyCnt = 0;
// call check if have query doing
if(pObj->sqlList) {
// have queries executing
checkBrokenQueries(pObj);
}
}
// send self connetion and queries
pHB->retry = 0; pHB->retry = 0;
int32_t code = tscBuildAndSendRequest(pHB, NULL); int32_t code = tscBuildAndSendRequest(pHB, NULL);
taosReleaseRef(tscObjRef, pObj->hbrid); taosReleaseRef(tscObjRef, pObj->hbrid);
...@@ -338,7 +447,13 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -338,7 +447,13 @@ int tscSendMsgToServer(SSqlObj *pSql) {
return TSDB_CODE_RPC_SHORTCUT; return TSDB_CODE_RPC_SHORTCUT;
} }
rpcSendRequest(pObj->pRpcObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid);
if(rpcSendRequest(pObj->pRpcObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid)) {
if(pSql->cmd.command < TSDB_SQL_HB)
rpcSaveSendInfo(pSql->rpcRid, &pSql->pPrevContext);
return TSDB_CODE_SUCCESS;
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -413,10 +528,20 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { ...@@ -413,10 +528,20 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
assert(pSql->self == handle); assert(pSql->self == handle);
// check msgtype
if(rpcMsg->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) {
pSql->lastAlive = taosGetTimestampMs();
tscInfo("PROBE 0x%" PRIx64 " recv probe msg response. rpcRid=0x%" PRIx64, pSql->self, pSql->rpcRid);
rpcFreeCont(rpcMsg->pCont);
return ;
}
STscObj *pObj = pSql->pTscObj; STscObj *pObj = pSql->pTscObj;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
pSql->rpcRid = -1; pSql->rpcRid = -1;
if (pObj->signature != pObj) { if (pObj->signature != pObj) {
tscDebug("0x%"PRIx64" DB connection is closed, cmd:%d pObj:%p signature:%p", pSql->self, pCmd->command, pObj, pObj->signature); tscDebug("0x%"PRIx64" DB connection is closed, cmd:%d pObj:%p signature:%p", pSql->self, pCmd->command, pObj, pObj->signature);
......
...@@ -3391,6 +3391,10 @@ static void doFreeInsertSupporter(SSqlObj* pSqlObj) { ...@@ -3391,6 +3391,10 @@ static void doFreeInsertSupporter(SSqlObj* pSqlObj) {
} }
static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) { static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) {
if(param == NULL) {
tscError("callback multiVnodeInsertFinalize param is NULL. tres=%p numOfRows=%d", tres, numOfRows);
return ;
}
SInsertSupporter *pSupporter = (SInsertSupporter *)param; SInsertSupporter *pSupporter = (SInsertSupporter *)param;
SSqlObj* pParentObj = pSupporter->pSql; SSqlObj* pParentObj = pSupporter->pSql;
......
...@@ -218,6 +218,11 @@ extern int32_t debugFlag; ...@@ -218,6 +218,11 @@ extern int32_t debugFlag;
extern int8_t tsClientMerge; extern int8_t tsClientMerge;
// probe alive connection
extern int32_t tsProbeSeconds;
extern int32_t tsProbeKillSeconds;
extern int32_t tsProbeInterval;
#ifdef TD_TSZ #ifdef TD_TSZ
// lossy // lossy
extern char lossyColumns[]; extern char lossyColumns[];
......
...@@ -268,6 +268,12 @@ int32_t fsDebugFlag = 135; ...@@ -268,6 +268,12 @@ int32_t fsDebugFlag = 135;
int8_t tsClientMerge = 0; int8_t tsClientMerge = 0;
// probe alive connection
int32_t tsProbeSeconds = 5 * 60; // start probe link alive after tsProbeSeconds from starting query
int32_t tsProbeKillSeconds = 10 * 60; // start kill query after tsProbeKillSeconds from last alive time
int32_t tsProbeInterval = 40; // 40 * 1.5s = 60 s interval time
#ifdef TD_TSZ #ifdef TD_TSZ
// //
// lossy compress 6 // lossy compress 6
...@@ -1733,6 +1739,39 @@ static void doInitGlobalConfig(void) { ...@@ -1733,6 +1739,39 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE; cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
// probeSeconds
cfg.option = "probeSeconds";
cfg.ptr = &tsProbeSeconds;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT;
cfg.minValue = 0;
cfg.maxValue = 100000;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
// probeKillSeconds
cfg.option = "probeKillSeconds";
cfg.ptr = &tsProbeKillSeconds;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT;
cfg.minValue = 0;
cfg.maxValue = 100000;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
// probeInterval
cfg.option = "probeInterval";
cfg.ptr = &tsProbeInterval;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT;
cfg.minValue = 0;
cfg.maxValue = 100000;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
#ifdef TD_TSZ #ifdef TD_TSZ
// lossy compress // lossy compress
cfg.option = "lossyColumns"; cfg.option = "lossyColumns";
......
...@@ -28,6 +28,8 @@ void * dnodeAllocVQueryQueue(void *pVnode); ...@@ -28,6 +28,8 @@ void * dnodeAllocVQueryQueue(void *pVnode);
void * dnodeAllocVFetchQueue(void *pVnode); void * dnodeAllocVFetchQueue(void *pVnode);
void dnodeFreeVQueryQueue(void *pQqueue); void dnodeFreeVQueryQueue(void *pQqueue);
void dnodeFreeVFetchQueue(void *pFqueue); void dnodeFreeVFetchQueue(void *pFqueue);
// reponse probe connection msg
void dnodeResponseProbeMsg(SRpcMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -77,6 +77,7 @@ int32_t dnodeInitShell() { ...@@ -77,6 +77,7 @@ int32_t dnodeInitShell() {
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE_FUNC] = dnodeDispatchToMReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE_FUNC] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeSendStartupStep; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeSendStartupStep;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_PROBE_CONN] = dnodeResponseProbeMsg;
int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0); int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0);
if (numOfThreads < 1) { if (numOfThreads < 1) {
......
...@@ -152,3 +152,13 @@ static void *dnodeProcessReadQueue(void *wparam) { ...@@ -152,3 +152,13 @@ static void *dnodeProcessReadQueue(void *wparam) {
return NULL; return NULL;
} }
// reponse probe connection msg
void dnodeResponseProbeMsg(SRpcMsg *pMsg) {
// check probe conn msg
if(pMsg->msgType == TSDB_MSG_TYPE_PROBE_CONN) {
SRpcMsg rpcRsp = {.handle = pMsg->handle, .code = 0, .msgType = TSDB_MSG_TYPE_PROBE_CONN_RSP};
rpcSendResponse(&rpcRsp);
return ;
}
}
\ No newline at end of file
...@@ -123,6 +123,9 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TP, "alter-tp" ) ...@@ -123,6 +123,9 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TP, "alter-tp" )
// delete // delete
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DELDATA, "delete-data" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DELDATA, "delete-data" )
// syn -> ack probe connection msg
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_PROBE_CONN, "probe-connection-alive" )
#ifndef TAOS_MESSAGE_C #ifndef TAOS_MESSAGE_C
TSDB_MSG_TYPE_MAX // 105 TSDB_MSG_TYPE_MAX // 105
......
...@@ -85,7 +85,7 @@ void rpcClose(void *); ...@@ -85,7 +85,7 @@ void rpcClose(void *);
void *rpcMallocCont(int contLen); void *rpcMallocCont(int contLen);
void rpcFreeCont(void *pCont); void rpcFreeCont(void *pCont);
void *rpcReallocCont(void *ptr, int contLen); void *rpcReallocCont(void *ptr, int contLen);
void rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); bool rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid);
void rpcSendResponse(const SRpcMsg *pMsg); void rpcSendResponse(const SRpcMsg *pMsg);
void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet); void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet);
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
...@@ -93,6 +93,10 @@ void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp ...@@ -93,6 +93,10 @@ void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp
int rpcReportProgress(void *pConn, char *pCont, int contLen); int rpcReportProgress(void *pConn, char *pCont, int contLen);
void rpcCancelRequest(int64_t rid); void rpcCancelRequest(int64_t rid);
int32_t rpcUnusedSession(void * rpcInfo, bool bLock); int32_t rpcUnusedSession(void * rpcInfo, bool bLock);
// send rpc Refid connection probe alive message
bool rpcSendProbe(int64_t rpcRid, void* pPrevContext);
// after sql request send , save conn info
bool rpcSaveSendInfo(int64_t rpcRid, void** ppContext);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -32,6 +32,8 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin ...@@ -32,6 +32,8 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin
void taosCloseTcpConnection(void *chandle); void taosCloseTcpConnection(void *chandle);
int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle); int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle);
SOCKET taosGetFdID(void *chandle);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -69,6 +69,13 @@ typedef struct { ...@@ -69,6 +69,13 @@ typedef struct {
struct SRpcConn *connList; // connection list struct SRpcConn *connList; // connection list
} SRpcInfo; } SRpcInfo;
typedef struct SSendInfo {
void *pContext;
void *pConn;
void *pFdObj;
SOCKET fd;
} SSendInfo;
typedef struct { typedef struct {
SRpcInfo *pRpc; // associated SRpcInfo SRpcInfo *pRpc; // associated SRpcInfo
SRpcEpSet epSet; // ip list provided by app SRpcEpSet epSet; // ip list provided by app
...@@ -86,6 +93,7 @@ typedef struct { ...@@ -86,6 +93,7 @@ typedef struct {
SRpcMsg *pRsp; // for synchronous API SRpcMsg *pRsp; // for synchronous API
tsem_t *pSem; // for synchronous API tsem_t *pSem; // for synchronous API
SRpcEpSet *pSet; // for synchronous API SRpcEpSet *pSet; // for synchronous API
SSendInfo sendInfo; // save last send information
char msg[0]; // RpcHead starts from here char msg[0]; // RpcHead starts from here
} SRpcReqContext; } SRpcReqContext;
...@@ -124,6 +132,7 @@ typedef struct SRpcConn { ...@@ -124,6 +132,7 @@ typedef struct SRpcConn {
int8_t connType; // connection type int8_t connType; // connection type
int64_t lockedBy; // lock for connection int64_t lockedBy; // lock for connection
SRpcReqContext *pContext; // request context SRpcReqContext *pContext; // request context
int64_t rid; // probe msg use rid get pContext
} SRpcConn; } SRpcConn;
int tsRpcMaxUdpSize = 15000; // bytes int tsRpcMaxUdpSize = 15000; // bytes
...@@ -193,10 +202,10 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc); ...@@ -193,10 +202,10 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc);
static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv); static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv);
static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv); static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv);
static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext); static bool rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext);
static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code); static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code);
static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code); static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code);
static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen); static bool rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen);
static void rpcSendReqHead(SRpcConn *pConn); static void rpcSendReqHead(SRpcConn *pConn);
static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv); static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv);
...@@ -385,7 +394,7 @@ void *rpcReallocCont(void *ptr, int contLen) { ...@@ -385,7 +394,7 @@ void *rpcReallocCont(void *ptr, int contLen) {
return start + sizeof(SRpcReqContext) + sizeof(SRpcHead); return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
} }
void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) { bool rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) {
SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcInfo *pRpc = (SRpcInfo *)shandle;
SRpcReqContext *pContext; SRpcReqContext *pContext;
...@@ -415,7 +424,7 @@ void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64 ...@@ -415,7 +424,7 @@ void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64
pContext->rid = taosAddRef(tsRpcRefId, pContext); pContext->rid = taosAddRef(tsRpcRefId, pContext);
if (pRid) *pRid = pContext->rid; if (pRid) *pRid = pContext->rid;
rpcSendReqToServer(pRpc, pContext); return rpcSendReqToServer(pRpc, pContext);
} }
void rpcSendResponse(const SRpcMsg *pRsp) { void rpcSendResponse(const SRpcMsg *pRsp) {
...@@ -980,6 +989,10 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont ...@@ -980,6 +989,10 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont
return NULL; return NULL;
} }
if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN || pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) {
return pConn;
}
rpcLockConn(pConn); rpcLockConn(pConn);
if (rpcIsReq(pHead->msgType)) { if (rpcIsReq(pHead->msgType)) {
...@@ -1076,6 +1089,58 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) { ...@@ -1076,6 +1089,58 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) {
rpcUnlockConn(pConn); rpcUnlockConn(pConn);
} }
// process probe msg , return true is probe msg, false is not probe msg
static void rpcProcessProbeMsg(SRecvInfo *pRecv, SRpcConn *pConn) {
SRpcHead *pHead = (SRpcHead *)pRecv->msg;
uint64_t ahandle = pHead->ahandle;
if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN) {
// response to
char msg[RPC_MSG_OVERHEAD];
SRpcHead *pRspHead;
// set msg header
memset(msg, 0, sizeof(SRpcHead));
pRspHead = (SRpcHead *)msg;
pRspHead->msgType = TSDB_MSG_TYPE_PROBE_CONN_RSP;
pRspHead->version = 1;
pRspHead->ahandle = pHead->ahandle;
pRspHead->tranId = pHead->tranId;
pRspHead->code = 0;
pRspHead->linkUid = pHead->linkUid;
rpcLockConn(pConn);
pRspHead->sourceId = pConn->ownId;
pRspHead->destId = pConn->peerId;
memcpy(pRspHead->user, pHead->user, tListLen(pHead->user));
bool ret = rpcSendMsgToPeer(pConn, pRspHead, sizeof(SRpcHead));
tInfo("PROBE 0x%" PRIx64 " recv probe msg and do response. ret=%d", ahandle, ret);
rpcUnlockConn(pConn);
rpcFreeMsg(pRecv->msg);
} else if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) {
if(pConn) {
rpcLockConn(pConn);
// get req content
SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, pConn->rid);
if (pContext) {
rpcProcessIncomingMsg(pConn, pHead, pContext);
taosReleaseRef(tsRpcRefId, pConn->rid);
} else {
tInfo("PROBE 0x%" PRIx64 " recv response probe msg but pContext is NULL. pConn->rid=0x%" PRIX64, ahandle, pConn->rid);
rpcFreeMsg(pRecv->msg);
}
rpcUnlockConn(pConn);
} else {
tInfo("PROBE 0x%" PRIx64 " recv response probe msg but pConn is NULL.", ahandle);
rpcFreeMsg(pRecv->msg);
}
}
}
static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
SRpcHead *pHead = (SRpcHead *)pRecv->msg; SRpcHead *pHead = (SRpcHead *)pRecv->msg;
SRpcInfo *pRpc = (SRpcInfo *)pRecv->shandle; SRpcInfo *pRpc = (SRpcInfo *)pRecv->shandle;
...@@ -1095,6 +1160,12 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { ...@@ -1095,6 +1160,12 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
SRpcReqContext *pContext; SRpcReqContext *pContext;
pConn = rpcProcessMsgHead(pRpc, pRecv, &pContext); pConn = rpcProcessMsgHead(pRpc, pRecv, &pContext);
// deal probe msg
if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN || pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) {
rpcProcessProbeMsg(pRecv, pConn);
return pConn;
}
if (pHead->msgType >= 1 && pHead->msgType < TSDB_MSG_TYPE_MAX) { if (pHead->msgType >= 1 && pHead->msgType < TSDB_MSG_TYPE_MAX) {
tDebug("%s %p %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label, tDebug("%s %p %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label,
pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType], pRecv->ip, pRecv->port, terrno, pRecv->msgLen, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType], pRecv->ip, pRecv->port, terrno, pRecv->msgLen,
...@@ -1147,7 +1218,10 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) { ...@@ -1147,7 +1218,10 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
} }
// free the request message // free the request message
if(pMsg->msgType != TSDB_MSG_TYPE_PROBE_CONN && pMsg->msgType != TSDB_MSG_TYPE_PROBE_CONN_RSP) {
taosRemoveRef(tsRpcRefId, pContext->rid); taosRemoveRef(tsRpcRefId, pContext->rid);
}
} }
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) { static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) {
...@@ -1185,6 +1259,14 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte ...@@ -1185,6 +1259,14 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte
// it's a response // it's a response
rpcMsg.handle = pContext; rpcMsg.handle = pContext;
rpcMsg.ahandle = pContext->ahandle; rpcMsg.ahandle = pContext->ahandle;
if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) {
// probe msg
rpcNotifyClient(pContext, &rpcMsg);
return ;
}
// reset pConn NULL
pContext->pConn = NULL; pContext->pConn = NULL;
// for UDP, port may be changed by server, the port in epSet shall be used for cache // for UDP, port may be changed by server, the port in epSet shall be used for cache
...@@ -1302,7 +1384,7 @@ static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) { ...@@ -1302,7 +1384,7 @@ static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) {
return; return;
} }
static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { static bool rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
SRpcHead *pHead = rpcHeadFromCont(pContext->pCont); SRpcHead *pHead = rpcHeadFromCont(pContext->pCont);
char *msg = (char *)pHead; char *msg = (char *)pHead;
int msgLen = rpcMsgLenFromCont(pContext->contLen); int msgLen = rpcMsgLenFromCont(pContext->contLen);
...@@ -1313,7 +1395,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { ...@@ -1313,7 +1395,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
if (pConn == NULL) { if (pConn == NULL) {
pContext->code = terrno; pContext->code = terrno;
taosTmrStart(rpcProcessConnError, 1, pContext, pRpc->tmrCtrl); taosTmrStart(rpcProcessConnError, 1, pContext, pRpc->tmrCtrl);
return; return false;
} }
pContext->pConn = pConn; pContext->pConn = pConn;
...@@ -1341,17 +1423,26 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { ...@@ -1341,17 +1423,26 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
pConn->pReqMsg = msg; pConn->pReqMsg = msg;
pConn->reqMsgLen = msgLen; pConn->reqMsgLen = msgLen;
pConn->pContext = pContext; pConn->pContext = pContext;
if(pContext)
pConn->rid = pContext->rid;
rpcSendMsgToPeer(pConn, msg, msgLen); // save
pContext->sendInfo.pConn = pConn;
pContext->sendInfo.pFdObj = pConn->chandle;
pContext->sendInfo.fd = taosGetFdID(pConn->chandle);
bool ret = rpcSendMsgToPeer(pConn, msg, msgLen);
if (pConn->connType != RPC_CONN_TCPC) if (pConn->connType != RPC_CONN_TCPC)
taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
rpcUnlockConn(pConn); rpcUnlockConn(pConn);
return ret;
} }
static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { static bool rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
int writtenLen = 0; int writtenLen = 0;
SRpcHead *pHead = (SRpcHead *)msg; SRpcHead *pHead = (SRpcHead *)msg;
bool ret = true;
msgLen = rpcAddAuthPart(pConn, msg, msgLen); msgLen = rpcAddAuthPart(pConn, msg, msgLen);
...@@ -1371,9 +1462,11 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { ...@@ -1371,9 +1462,11 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
if (writtenLen != msgLen) { if (writtenLen != msgLen) {
tError("%s, failed to send, msgLen:%d written:%d, reason:%s", pConn->info, msgLen, writtenLen, strerror(errno)); tError("%s, failed to send, msgLen:%d written:%d, reason:%s", pConn->info, msgLen, writtenLen, strerror(errno));
ret = false;
} }
tDump(msg, msgLen); tDump(msg, msgLen);
return ret;
} }
static void rpcProcessConnError(void *param, void *id) { static void rpcProcessConnError(void *param, void *id) {
...@@ -1684,3 +1777,103 @@ int32_t rpcUnusedSession(void * rpcInfo, bool bLock) { ...@@ -1684,3 +1777,103 @@ int32_t rpcUnusedSession(void * rpcInfo, bool bLock) {
return 0; return 0;
return taosIdPoolNumOfFree(info->idPool, bLock); return taosIdPoolNumOfFree(info->idPool, bLock);
} }
bool doRpcSendProbe(SRpcConn *pConn) {
char msg[RPC_MSG_OVERHEAD];
SRpcHead *pHead;
int code = 0;
// set msg header
memset(msg, 0, sizeof(SRpcHead));
pHead = (SRpcHead *)msg;
pHead->version = 1;
pHead->msgVer = htonl(tsVersion >> 8);
pHead->msgType = TSDB_MSG_TYPE_PROBE_CONN;
pHead->spi = pConn->spi;
pHead->encrypt = 0;
pHead->tranId = (uint16_t)(taosRand() & 0xFFFF); // rand
pHead->sourceId = pConn->ownId;
pHead->destId = pConn->peerId;
pHead->linkUid = pConn->linkUid;
pHead->ahandle = (uint64_t)pConn->ahandle;
memcpy(pHead->user, pConn->user, tListLen(pHead->user));
pHead->code = htonl(code);
bool ret = rpcSendMsgToPeer(pConn, msg, sizeof(SRpcHead) + sizeof(int32_t));
return ret;
}
// send server syn
bool rpcSendProbe(int64_t rpcRid, void* pPrevContext) {
// return false can kill query
bool ret = false;
if(rpcRid < 0) {
tError("PROBE rpcRid=0x%" PRIx64 " less than zero, invalid.", rpcRid);
return true;
}
// get req content
SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, rpcRid);
if (pContext == NULL) {
tError("PROBE rpcRid=0x%" PRIx64 " get context NULL. sql finished no need send probe.", rpcRid);
return true;
}
// context same
if(pContext != pPrevContext) {
tError("PROBE rpcRid=0x%" PRIx64 " context diff. pContext=%p pPreContent=%p", rpcRid, pContext, pPrevContext);
goto _END;
}
// conn same
if(pContext->pConn == NULL) {
tInfo("PROBE rpcRid=0x%" PRIx64 " connect obj is NULL. ", rpcRid);
ret = true;
goto _END;
} else if (pContext->pConn != pContext->sendInfo.pConn) {
tInfo("PROBE rpcRid=0x%" PRIx64 " connect obj diff. pContext->pConn=%p pPreConn=%p", rpcRid, pContext->pConn, pContext->sendInfo.pConn);
goto _END;
}
// fdObj same
if (pContext->pConn->chandle != pContext->sendInfo.pFdObj) {
tInfo("PROBE rpcRid=0x%" PRIx64 " connect fdObj diff. pContext->pConn->chandle=%p pPrevFdObj=%p", rpcRid, pContext->pConn->chandle, pContext->sendInfo.pFdObj);
goto _END;
}
// fd same
SOCKET fd = taosGetFdID(pContext->pConn->chandle);
if (fd != pContext->sendInfo.fd) {
tInfo("PROBE rpcRid=0x%" PRIx64 " connect fd diff.fd=%d prevFd=%d", rpcRid, fd, pContext->sendInfo.fd);
goto _END;
}
// send syn
ret = doRpcSendProbe(pContext->pConn);
_END:
// put back req context
taosReleaseRef(tsRpcRefId, rpcRid);
return ret;
}
// after sql request send , save conn info
bool rpcSaveSendInfo(int64_t rpcRid, void** ppContext) {
if(rpcRid < 0) {
tError("PROBE saveSendInfo rpcRid=0x%" PRIx64 " less than zero, invalid.", rpcRid);
return false;
}
// get req content
SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, rpcRid);
if (pContext == NULL) {
tError("PROBE saveSendInfo rpcRid=0x%" PRIx64 " get context NULL.", rpcRid);
return false;
}
if (ppContext)
*ppContext = pContext;
taosReleaseRef(tsRpcRefId, rpcRid);
return true;
}
\ No newline at end of file
...@@ -674,3 +674,12 @@ static void taosFreeFdObj(SFdObj *pFdObj) { ...@@ -674,3 +674,12 @@ static void taosFreeFdObj(SFdObj *pFdObj) {
tfree(pFdObj); tfree(pFdObj);
} }
SOCKET taosGetFdID(void *chandle) {
SFdObj * pFdObj = chandle;
if(pFdObj == NULL)
return -1;
if (pFdObj->signature != pFdObj)
return -1;
return pFdObj->fd;
}
\ No newline at end of file
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
extern "C" { extern "C" {
#endif #endif
#define TSDB_CFG_MAX_NUM 131 #define TSDB_CFG_MAX_NUM 134
#define TSDB_CFG_PRINT_LEN 23 #define TSDB_CFG_PRINT_LEN 23
#define TSDB_CFG_OPTION_LEN 24 #define TSDB_CFG_OPTION_LEN 24
#define TSDB_CFG_VALUE_LEN 41 #define TSDB_CFG_VALUE_LEN 41
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册