提交 96257226 编写于 作者: A Alex Duan

fix(rpc): add global config and broken link method

上级 f3134e5a
...@@ -283,16 +283,34 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { ...@@ -283,16 +283,34 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
// pSql connection link is broken // pSql connection link is broken
bool dealConnBroken(SSqlObj * pSql) { bool dealConnBroken(SSqlObj * pSql) {
// TODO // check valid
if (pSql == NULL || pSql->signature != pSql) {
return false;
}
if (pSql->cmd.command >= TSDB_SQL_LOCAL) {
return false;
}
// cancel
if (pSql->rpcRid > 0) {
tscDebug("PROBE 0x%" PRIx64 " rpc cancel request rpcRid=0x%" PRIx64 ".", pSql->self, pSql->rpcRid);
rpcCancelRequest(pSql->rpcRid);
pSql->rpcRid = -1;
}
// error
tscDebug("PROBE 0x%"PRIx64" async result error.", pSql->self);
tscAsyncResultOnError(pSql);
return true; return true;
} }
// if return true, send probe connection msg to sever ok // if return true, send probe connection msg to sever ok
bool sendProbeConnMsg(SSqlObj* pSql) { bool sendProbeConnMsg(SSqlObj* pSql) {
// check time out // TEST TODO DELETE
int32_t probeTimeout = 1*1000; // over this value send probe msg tsProbeSeconds = 1; // over this value send probe msg
int32_t killTimeout = 3*60*1000; // over this value query can be killed tsProbeKillSeconds = 3*60; // over this value query can be killed
if(pSql->stime == 0) { if(pSql->stime == 0) {
// not start , no need probe // not start , no need probe
return true; return true;
...@@ -300,12 +318,12 @@ bool sendProbeConnMsg(SSqlObj* pSql) { ...@@ -300,12 +318,12 @@ bool sendProbeConnMsg(SSqlObj* pSql) {
int64_t stime = MAX(pSql->stime, pSql->lastAlive); int64_t stime = MAX(pSql->stime, pSql->lastAlive);
int32_t diff = (int32_t)(taosGetTimestampMs() - stime); int32_t diff = (int32_t)(taosGetTimestampMs() - stime);
if (diff < probeTimeout) { if (diff < tsProbeSeconds * 1000) {
// exec time short , need not probe alive // exec time short , need not probe alive
return true; return true;
} }
if (diff > killTimeout) { if (diff > tsProbeKillSeconds * 1000) {
// need kill query // need kill query
tscDebug("PROBE 0x%"PRIx64" need killed, noAckCnt:%d diff=%d", pSql->self, pSql->noAckCnt, diff); tscDebug("PROBE 0x%"PRIx64" need killed, noAckCnt:%d diff=%d", pSql->self, pSql->noAckCnt, diff);
//return false; //return false;
...@@ -529,7 +547,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { ...@@ -529,7 +547,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
if(rpcMsg->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) { if(rpcMsg->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) {
pSql->noAckCnt = 0; pSql->noAckCnt = 0;
pSql->lastAlive = taosGetTimestampMs(); pSql->lastAlive = taosGetTimestampMs();
tscInfo(" recv sql probe msg. sql=%s", pSql->sqlstr); tscDebug(" PROBE %p recv probe msg. sql=%s", pSql->self, pSql->sqlstr);
return ; return ;
} }
......
...@@ -218,6 +218,10 @@ extern int32_t debugFlag; ...@@ -218,6 +218,10 @@ extern int32_t debugFlag;
extern int8_t tsClientMerge; extern int8_t tsClientMerge;
// probe alive connection
extern int32_t tsProbeSeconds;
extern int32_t tsProbeKillSeconds;
#ifdef TD_TSZ #ifdef TD_TSZ
// lossy // lossy
extern char lossyColumns[]; extern char lossyColumns[];
......
...@@ -268,6 +268,11 @@ int32_t fsDebugFlag = 135; ...@@ -268,6 +268,11 @@ int32_t fsDebugFlag = 135;
int8_t tsClientMerge = 0; int8_t tsClientMerge = 0;
// probe alive connection
int32_t tsProbeSeconds = 10 * 60; // start probe link alive after tsProbeSeconds from starting query
int32_t tsProbeKillSeconds = 30 * 60; // start kill query after tsProbeKillSeconds from starting query
#ifdef TD_TSZ #ifdef TD_TSZ
// //
// lossy compress 6 // lossy compress 6
...@@ -1733,6 +1738,28 @@ static void doInitGlobalConfig(void) { ...@@ -1733,6 +1738,28 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE; cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
// probeSeconds
cfg.option = "probeSeconds";
cfg.ptr = &tsProbeSeconds;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT;
cfg.minValue = 0;
cfg.maxValue = 0;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
// probeKillSeconds
cfg.option = "probeKillSeconds";
cfg.ptr = &tsProbeKillSeconds;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT;
cfg.minValue = 0;
cfg.maxValue = 0;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
#ifdef TD_TSZ #ifdef TD_TSZ
// lossy compress // lossy compress
cfg.option = "lossyColumns"; cfg.option = "lossyColumns";
......
...@@ -28,8 +28,6 @@ void * dnodeAllocVQueryQueue(void *pVnode); ...@@ -28,8 +28,6 @@ void * dnodeAllocVQueryQueue(void *pVnode);
void * dnodeAllocVFetchQueue(void *pVnode); void * dnodeAllocVFetchQueue(void *pVnode);
void dnodeFreeVQueryQueue(void *pQqueue); void dnodeFreeVQueryQueue(void *pQqueue);
void dnodeFreeVFetchQueue(void *pFqueue); void dnodeFreeVFetchQueue(void *pFqueue);
// reponse probe connection msg
void dnodeResponseProbeMsg(SRpcMsg *pMsg);
......
...@@ -78,8 +78,6 @@ int32_t dnodeInitShell() { ...@@ -78,8 +78,6 @@ int32_t dnodeInitShell() {
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeSendStartupStep; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeSendStartupStep;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_PROBE_CONN] = dnodeResponseProbeMsg;
int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0); int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0);
if (numOfThreads < 1) { if (numOfThreads < 1) {
numOfThreads = 1; numOfThreads = 1;
......
...@@ -155,14 +155,4 @@ static void *dnodeProcessReadQueue(void *wparam) { ...@@ -155,14 +155,4 @@ static void *dnodeProcessReadQueue(void *wparam) {
} }
return NULL; return NULL;
} }
\ No newline at end of file
// reponse probe connection msg
void dnodeResponseProbeMsg(SRpcMsg *pMsg) {
// check probe conn msg
if(pMsg->msgType == TSDB_MSG_TYPE_PROBE_CONN) {
SRpcMsg rpcRsp = {.handle = pMsg->handle, .code = 0, .msgType = TSDB_MSG_TYPE_PROBE_CONN_RSP};
rpcSendResponse(&rpcRsp);
return ;
}
}
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
extern "C" { extern "C" {
#endif #endif
#define TSDB_CFG_MAX_NUM 131 #define TSDB_CFG_MAX_NUM 133
#define TSDB_CFG_PRINT_LEN 23 #define TSDB_CFG_PRINT_LEN 23
#define TSDB_CFG_OPTION_LEN 24 #define TSDB_CFG_OPTION_LEN 24
#define TSDB_CFG_VALUE_LEN 41 #define TSDB_CFG_VALUE_LEN 41
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册