提交 df934ab4 编写于 作者: A Alex Duan

feat(rpc): support probe alive for connection

上级 fe45e56c
...@@ -396,6 +396,10 @@ typedef struct SSqlObj { ...@@ -396,6 +396,10 @@ typedef struct SSqlObj {
int32_t retryReason; // previous error code int32_t retryReason; // previous error code
struct SSqlObj *prev, *next; struct SSqlObj *prev, *next;
int64_t self; int64_t self;
// connect alive
int64_t lastAlive;
void * pPrevContext;
} SSqlObj; } SSqlObj;
typedef struct SSqlStream { typedef struct SSqlStream {
......
...@@ -312,7 +312,11 @@ static void tscAsyncResultCallback(SSchedMsg *pMsg) { ...@@ -312,7 +312,11 @@ static void tscAsyncResultCallback(SSchedMsg *pMsg) {
return; return;
} }
assert(pSql->res.code != TSDB_CODE_SUCCESS); // probe send error , but result be responsed by server async
if(pSql->res.code == TSDB_CODE_SUCCESS) {
return ;
}
tscError("0x%"PRIx64" async result callback, code:%s", pSql->self, tstrerror(pSql->res.code)); tscError("0x%"PRIx64" async result callback, code:%s", pSql->self, tstrerror(pSql->res.code));
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
......
...@@ -281,6 +281,102 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { ...@@ -281,6 +281,102 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
} }
} }
// if return true, send probe connection msg to sever ok
bool sendProbeConnMsg(SSqlObj* pSql, int64_t stime) {
if(stime == 0) {
// not start , no need probe
tscInfo("PROBE 0x%" PRIx64 " not start, no need probe.", pSql->self);
return true;
}
int64_t start = MAX(stime, pSql->lastAlive);
int32_t diff = (int32_t)(taosGetTimestampMs() - start);
if (diff < tsProbeSeconds * 1000) {
// exec time short , need not probe alive
tscInfo("PROBE 0x%" PRIx64 " not arrived probe time. cfg timeout=%ds, no need probe. lastAlive=%" PRId64 " stime=%" PRId64, \
pSql->self, tsProbeSeconds, pSql->lastAlive, pSql->stime);
return true;
}
if (diff > tsProbeKillSeconds * 1000) {
// need kill query
tscInfo("PROBE 0x%" PRIx64 " kill query by probe. because arrived kill time. time=%ds cfg timeout=%ds lastAlive=%" PRId64 " stime=%" PRId64, \
pSql->self, diff/1000, tsProbeKillSeconds, pSql->lastAlive, pSql->stime);
return false;
}
if (pSql->pPrevContext == NULL) {
// last connect info save uncompletely, so can't probe
tscInfo("PROBE 0x%" PRIx64 " save last connect info uncompletely. prev context is null", pSql->self);
return true;
}
if(pSql->rpcRid == -1) {
// cancel or reponse ok from server, so need not probe
tscInfo("PROBE 0x%" PRIx64 " rpcRid is -1, response ok. no need probe.", pSql->self);
return true;
}
bool ret = rpcSendProbe(pSql->rpcRid, pSql->pPrevContext);
tscInfo("PROBE 0x%" PRIx64 " send probe msg, ret=%d rpcRid=0x%" PRIx64, pSql->self, ret, pSql->rpcRid);
return ret;
}
// check have broken link queries than killed
void checkBrokenQueries(STscObj *pTscObj) {
tscDebug("PROBE checkBrokenQueries pTscObj=%p pTscObj->rid=0x%" PRIx64, pTscObj, pTscObj->rid);
SSqlObj *pSql = pTscObj->sqlList;
while (pSql) {
// avoid sqlobj may not be correctly removed from sql list
if (pSql->sqlstr == NULL) {
pSql = pSql->next;
continue;
}
bool kill = false;
int32_t numOfSub = pSql->subState.numOfSub;
tscInfo("PROBE 0x%" PRIx64 " start checking sql alive, numOfSub=%d sql=%s stime=%" PRId64 " alive=%" PRId64 " rpcRid=0x%" PRIx64 \
,pSql->self, numOfSub, pSql->sqlstr == NULL ? "" : pSql->sqlstr, pSql->stime, pSql->lastAlive, pSql->rpcRid);
if (numOfSub == 0) {
// no sub sql
if(!sendProbeConnMsg(pSql, pSql->stime)) {
// need kill
tscInfo("PROBE 0x%" PRIx64 " need break link done. rpcRid=0x%" PRIx64, pSql->self, pSql->rpcRid);
kill = true;
}
} else {
// lock subs
pthread_mutex_lock(&pSql->subState.mutex);
if (pSql->pSubs) {
// have sub sql
for (int i = 0; i < numOfSub; i++) {
SSqlObj *pSubSql = pSql->pSubs[i];
if(pSubSql) {
tscInfo("PROBE 0x%" PRIx64 " sub sql app is 0x%" PRIx64, pSql->self, pSubSql->self);
if(!sendProbeConnMsg(pSubSql, pSql->stime)) {
// need kill
tscInfo("PROBE 0x%" PRIx64 " i=%d sub app=0x%" PRIx64 " need break link done. rpcRid=0x%" PRIx64, pSql->self, i, pSubSql->self, pSubSql->rpcRid);
kill = true;
break;
}
}
}
}
// unlock
pthread_mutex_unlock(&pSql->subState.mutex);
}
// kill query
if(kill) {
taos_stop_query(pSql);
}
// move next
pSql = pSql->next;
}
}
void tscProcessActivityTimer(void *handle, void *tmrId) { void tscProcessActivityTimer(void *handle, void *tmrId) {
int64_t rid = (int64_t) handle; int64_t rid = (int64_t) handle;
STscObj *pObj = taosAcquireRef(tscRefId, rid); STscObj *pObj = taosAcquireRef(tscRefId, rid);
...@@ -296,6 +392,18 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { ...@@ -296,6 +392,18 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
assert(pHB->self == pObj->hbrid); assert(pHB->self == pObj->hbrid);
// check queries already death
static int activetyCnt = 0;
if (++activetyCnt > tsProbeInterval) { // 1.5s * 40 = 60s interval call check queries alive
activetyCnt = 0;
// call check if have query doing
if(pObj->sqlList) {
// have queries executing
checkBrokenQueries(pObj);
}
}
pHB->retry = 0; pHB->retry = 0;
int32_t code = tscBuildAndSendRequest(pHB, NULL); int32_t code = tscBuildAndSendRequest(pHB, NULL);
taosReleaseRef(tscObjRef, pObj->hbrid); taosReleaseRef(tscObjRef, pObj->hbrid);
......
...@@ -227,6 +227,11 @@ extern int32_t debugFlag; ...@@ -227,6 +227,11 @@ extern int32_t debugFlag;
extern int8_t tsClientMerge; extern int8_t tsClientMerge;
// probe alive connection
extern int32_t tsProbeSeconds;
extern int32_t tsProbeKillSeconds;
extern int32_t tsProbeInterval;
#ifdef TD_TSZ #ifdef TD_TSZ
// lossy // lossy
extern char lossyColumns[]; extern char lossyColumns[];
......
...@@ -277,6 +277,11 @@ int32_t fsDebugFlag = 135; ...@@ -277,6 +277,11 @@ int32_t fsDebugFlag = 135;
int8_t tsClientMerge = 0; int8_t tsClientMerge = 0;
// probe alive connection
int32_t tsProbeSeconds = 5 * 60; // start probe link alive after tsProbeSeconds from starting query
int32_t tsProbeKillSeconds = 10 * 60; // start kill query after tsProbeKillSeconds from last alive time
int32_t tsProbeInterval = 40; // 40 * 1.5s = 60 s interval time
#ifdef TD_TSZ #ifdef TD_TSZ
// //
// lossy compress 6 // lossy compress 6
...@@ -1782,6 +1787,39 @@ static void doInitGlobalConfig(void) { ...@@ -1782,6 +1787,39 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_MB; cfg.unitType = TAOS_CFG_UTYPE_MB;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
// probeSeconds
cfg.option = "probeSeconds";
cfg.ptr = &tsProbeSeconds;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT;
cfg.minValue = 0;
cfg.maxValue = 100000;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
// probeKillSeconds
cfg.option = "probeKillSeconds";
cfg.ptr = &tsProbeKillSeconds;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT;
cfg.minValue = 0;
cfg.maxValue = 100000;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
// probeInterval
cfg.option = "probeInterval";
cfg.ptr = &tsProbeInterval;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT;
cfg.minValue = 0;
cfg.maxValue = 100000;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
#ifdef TD_TSZ #ifdef TD_TSZ
// lossy compress // lossy compress
cfg.option = "lossyColumns"; cfg.option = "lossyColumns";
......
...@@ -29,6 +29,9 @@ void * dnodeAllocVFetchQueue(void *pVnode); ...@@ -29,6 +29,9 @@ void * dnodeAllocVFetchQueue(void *pVnode);
void dnodeFreeVQueryQueue(void *pQqueue); void dnodeFreeVQueryQueue(void *pQqueue);
void dnodeFreeVFetchQueue(void *pFqueue); void dnodeFreeVFetchQueue(void *pFqueue);
// reponse probe connection msg
void dnodeResponseProbeMsg(SRpcMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -77,6 +77,7 @@ int32_t dnodeInitShell() { ...@@ -77,6 +77,7 @@ int32_t dnodeInitShell() {
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE_FUNC] = dnodeDispatchToMReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE_FUNC] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeSendStartupStep; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeSendStartupStep;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_PROBE_CONN] = dnodeResponseProbeMsg;
int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0); int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0);
if (numOfThreads < 1) { if (numOfThreads < 1) {
......
...@@ -152,3 +152,13 @@ static void *dnodeProcessReadQueue(void *wparam) { ...@@ -152,3 +152,13 @@ static void *dnodeProcessReadQueue(void *wparam) {
return NULL; return NULL;
} }
// reponse probe connection msg
void dnodeResponseProbeMsg(SRpcMsg *pMsg) {
// check probe conn msg
if(pMsg->msgType == TSDB_MSG_TYPE_PROBE_CONN) {
SRpcMsg rpcRsp = {.handle = pMsg->handle, .code = 0, .msgType = TSDB_MSG_TYPE_PROBE_CONN_RSP};
rpcSendResponse(&rpcRsp);
return ;
}
}
\ No newline at end of file
...@@ -120,6 +120,9 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TP, "drop-tp" ) ...@@ -120,6 +120,9 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TP, "drop-tp" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_USE_TP, "use-tp" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_USE_TP, "use-tp" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TP, "alter-tp" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TP, "alter-tp" )
// syn -> ack probe connection msg
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_PROBE_CONN, "probe-connection-alive" )
#ifndef TAOS_MESSAGE_C #ifndef TAOS_MESSAGE_C
TSDB_MSG_TYPE_MAX // 105 TSDB_MSG_TYPE_MAX // 105
#endif #endif
......
...@@ -85,13 +85,17 @@ void rpcClose(void *); ...@@ -85,13 +85,17 @@ void rpcClose(void *);
void *rpcMallocCont(int contLen); void *rpcMallocCont(int contLen);
void rpcFreeCont(void *pCont); void rpcFreeCont(void *pCont);
void *rpcReallocCont(void *ptr, int contLen); void *rpcReallocCont(void *ptr, int contLen);
void rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); bool rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid);
void rpcSendResponse(const SRpcMsg *pMsg); void rpcSendResponse(const SRpcMsg *pMsg);
void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet); void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet);
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
int rpcReportProgress(void *pConn, char *pCont, int contLen); int rpcReportProgress(void *pConn, char *pCont, int contLen);
void rpcCancelRequest(int64_t rid); void rpcCancelRequest(int64_t rid);
// send rpc Refid connection probe alive message
bool rpcSendProbe(int64_t rpcRid, void* pPrevContext);
// after sql request send , save conn info
bool rpcSaveSendInfo(int64_t rpcRid, void** ppContext);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -32,6 +32,8 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin ...@@ -32,6 +32,8 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin
void taosCloseTcpConnection(void *chandle); void taosCloseTcpConnection(void *chandle);
int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle); int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle);
SOCKET taosGetFdID(void *chandle);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -69,6 +69,13 @@ typedef struct { ...@@ -69,6 +69,13 @@ typedef struct {
struct SRpcConn *connList; // connection list struct SRpcConn *connList; // connection list
} SRpcInfo; } SRpcInfo;
typedef struct SSendInfo {
void *pContext;
void *pConn;
void *pFdObj;
SOCKET fd;
} SSendInfo;
typedef struct { typedef struct {
SRpcInfo *pRpc; // associated SRpcInfo SRpcInfo *pRpc; // associated SRpcInfo
SRpcEpSet epSet; // ip list provided by app SRpcEpSet epSet; // ip list provided by app
......
...@@ -674,3 +674,12 @@ static void taosFreeFdObj(SFdObj *pFdObj) { ...@@ -674,3 +674,12 @@ static void taosFreeFdObj(SFdObj *pFdObj) {
tfree(pFdObj); tfree(pFdObj);
} }
SOCKET taosGetFdID(void *chandle) {
SFdObj * pFdObj = chandle;
if(pFdObj == NULL)
return -1;
if (pFdObj->signature != pFdObj)
return -1;
return pFdObj->fd;
}
\ No newline at end of file
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
extern "C" { extern "C" {
#endif #endif
#define TSDB_CFG_MAX_NUM 135 #define TSDB_CFG_MAX_NUM 138
#define TSDB_CFG_PRINT_LEN 23 #define TSDB_CFG_PRINT_LEN 23
#define TSDB_CFG_OPTION_LEN 24 #define TSDB_CFG_OPTION_LEN 24
#define TSDB_CFG_VALUE_LEN 41 #define TSDB_CFG_VALUE_LEN 41
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册