From ecd560ccb0ae7ecd8d93654a90ad6427c0994c0e Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 6 Aug 2022 15:30:11 +0800 Subject: [PATCH] start timer for particular msg --- include/libs/transport/trpc.h | 21 +++++--- include/util/taoserror.h | 1 + source/client/src/clientEnv.c | 17 +++++-- source/libs/transport/inc/transComm.h | 5 +- source/libs/transport/inc/transportInt.h | 1 + source/libs/transport/src/trans.c | 1 + source/libs/transport/src/transCli.c | 65 ++++++++++++++---------- source/os/src/osSocket.c | 6 +-- source/util/src/terror.c | 1 + 9 files changed, 73 insertions(+), 45 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 50f9959177..467f0a9ff0 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -41,12 +41,13 @@ typedef struct { typedef struct SRpcHandleInfo { // rpc info - void *handle; // rpc handle returned to app - int64_t refId; // refid, used by server - int32_t noResp; // has response or not(default 0, 0: resp, 1: no resp); - int32_t persistHandle; // persist handle or not + void *handle; // rpc handle returned to app + int64_t refId; // refid, used by server + int8_t noResp; // has response or not(default 0, 0: resp, 1: no resp) + int8_t persistHandle; // persist handle or not + int8_t hasEpSet; + STraceId traceId; - int8_t hasEpSet; // app info void *ahandle; // app handle set by client @@ -69,8 +70,9 @@ typedef struct SRpcMsg { SRpcHandleInfo info; } SRpcMsg; -typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *rf); +typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *epset); typedef bool (*RpcRfp)(int32_t code, tmsg_t msgType); +typedef bool (*RpcTfp)(int32_t code, tmsg_t msgType); typedef struct SRpcInit { char localFqdn[TSDB_FQDN_LEN]; @@ -84,12 +86,15 @@ typedef struct SRpcInit { // the following is for client app ecurity only char *user; // user name - // call back to process incoming msg, code shall be ignored by server app + // call back to process incoming msg RpcCfp cfp; - // user defined retry func + // retry not not for particular msg RpcRfp rfp; + // set up timeout for particular msg + RpcTfp tfp; + void *parent; } SRpcInit; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 97a664d776..27fb057b44 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -46,6 +46,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_RPC_FQDN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0015) #define TSDB_CODE_RPC_PORT_EADDRINUSE TAOS_DEF_ERROR_CODE(0, 0x0017) #define TSDB_CODE_RPC_BROKEN_LINK TAOS_DEF_ERROR_CODE(0, 0x0018) +#define TSDB_CODE_RPC_TIMEOUT TAOS_DEF_ERROR_CODE(0, 0x0019) //common & util #define TSDB_CODE_TIME_UNSYNCED TAOS_DEF_ERROR_CODE(0, 0x0013) diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index d88a587f6a..ae80868167 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -60,7 +60,7 @@ static int32_t registerRequest(SRequestObj *pRequest, STscObj *pTscObj) { } static void deregisterRequest(SRequestObj *pRequest) { - const static int64_t SLOW_QUERY_INTERVAL = 3000000L; // todo configurable + const static int64_t SLOW_QUERY_INTERVAL = 3000000L; // todo configurable assert(pRequest != NULL); STscObj *pTscObj = pRequest->pTscObj; @@ -77,13 +77,13 @@ static void deregisterRequest(SRequestObj *pRequest) { if (QUERY_NODE_VNODE_MODIF_STMT == pRequest->stmtType) { atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration); } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) { - atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration); + atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration); } - + if (duration >= SLOW_QUERY_INTERVAL) { atomic_add_fetch_64((int64_t *)&pActivity->numOfSlowQueries, 1); } - + releaseTscObj(pTscObj->id); } @@ -109,6 +109,12 @@ static bool clientRpcRfp(int32_t code, tmsg_t msgType) { } } +// start timer for particular msgType +static bool clientRpcTfp(int32_t code, tmsg_t msgType) { + // + return false; +} + // TODO refactor void *openTransporter(const char *user, const char *auth, int32_t numOfThread) { SRpcInit rpcInit; @@ -118,6 +124,7 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) { rpcInit.numOfThreads = numOfThread; rpcInit.cfp = processMsgFromServer; rpcInit.rfp = clientRpcRfp; + rpcInit.tfp = clientRpcTfp; rpcInit.sessions = 1024; rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.user = (char *)user; @@ -375,7 +382,7 @@ void taos_init_imp(void) { initQueryModuleMsgHandle(); taosConvInit(); - + rpcInit(); SCatalogCfg cfg = {.maxDBCacheNum = 100, .maxTblCacheNum = 100}; diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index a81d6db80f..44075f316c 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -95,8 +95,9 @@ typedef void* queue[2]; #define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field)))) #define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit -#define TRANS_RETRY_INTERVAL 15 // ms retry interval -#define TRANS_CONN_TIMEOUT 3 // connect timeout +#define TRANS_RETRY_INTERVAL 15 // retry interval (ms) +#define TRANS_CONN_TIMEOUT 3 // connect timeout (s) +#define TRANS_READ_TIMEOUT 200 // read timeout (ms) typedef SRpcMsg STransMsg; typedef SRpcCtx STransCtx; diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 5fb2980ceb..6aeeffa192 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -53,6 +53,7 @@ typedef struct { void (*cfp)(void* parent, SRpcMsg*, SEpSet*); bool (*retry)(int32_t code, tmsg_t msgType); + bool (*startTimer)(int32_t code, tmsg_t msgType); int index; void* parent; diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 7633820292..0a0dcef378 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -48,6 +48,7 @@ void* rpcOpen(const SRpcInit* pInit) { // register callback handle pRpc->cfp = pInit->cfp; pRpc->retry = pInit->rfp; + pRpc->startTimer = pInit->tfp; if (pInit->connType == TAOS_CONN_SERVER) { pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 431e479123..6458fc99b7 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -24,6 +24,7 @@ typedef struct SCliConn { uv_connect_t connReq; uv_stream_t* stream; queue wreqQueue; + uv_timer_t* timer; void* hostThrd; @@ -108,6 +109,8 @@ static int sockDebugInfo(struct sockaddr* sockname, char* dst) { sprintf(dst, "%s:%d", buf, ntohs(addr.sin_port)); return r; } +// register timer for read +static void cliReadTimeoutCb(uv_timer_t* handle); // register timer in each thread to clear expire conn // static void cliTimeoutCb(uv_timer_t* handle); // alloc buf for recv @@ -330,6 +333,11 @@ void cliHandleResp(SCliConn* conn) { SCliThrd* pThrd = conn->hostThrd; STrans* pTransInst = pThrd->pTransInst; + if (uv_is_active((uv_handle_t*)conn->timer)) { + tDebug("%s conn %p stop timer", CONN_GET_INST_LABEL(conn), conn); + uv_timer_stop(conn->timer); + } + STransMsgHead* pHead = NULL; transDumpFromBuffer(&conn->readBuf, (char**)&pHead); pHead->code = htonl(pHead->code); @@ -409,7 +417,7 @@ void cliHandleResp(SCliConn* conn) { uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb); } -void cliHandleExcept(SCliConn* pConn) { +void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { if (transQueueEmpty(&pConn->cliMsgs)) { if (pConn->broken == true && CONN_NO_PERSIST_BY_APP(pConn)) { tTrace("%s conn %p handle except, persist:0", CONN_GET_INST_LABEL(pConn), pConn); @@ -428,7 +436,7 @@ void cliHandleExcept(SCliConn* pConn) { STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL; STransMsg transMsg = {0}; - transMsg.code = pConn->broken ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL; + transMsg.code = code == -1 ? (pConn->broken ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL) : code; transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0; transMsg.info.ahandle = NULL; @@ -459,31 +467,17 @@ void cliHandleExcept(SCliConn* pConn) { } while (!transQueueEmpty(&pConn->cliMsgs)); transUnrefCliHandle(pConn); } +void cliHandleExcept(SCliConn* conn) { + tTrace("%s conn except ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn)); + cliHandleExceptImpl(conn, -1); +} -// void cliTimeoutCb(uv_timer_t* handle) { -// SCliThrd* pThrd = handle->data; -// STrans* pTransInst = pThrd->pTransInst; -// int64_t currentTime = pThrd->nextTimeout; -// tTrace("%s conn timeout, try to remove expire conn from conn pool", pTransInst->label); -// -// SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL); -// while (p != NULL) { -// while (!QUEUE_IS_EMPTY(&p->conn)) { -// queue* h = QUEUE_HEAD(&p->conn); -// SCliConn* c = QUEUE_DATA(h, SCliConn, q); -// if (c->expireTime < currentTime) { -// QUEUE_REMOVE(h); -// transUnrefCliHandle(c); -// } else { -// break; -// } -// } -// p = taosHashIterate((SHashObj*)pThrd->pool, p); -// } -// -// pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); -// uv_timer_start(handle, cliTimeoutCb, CONN_PERSIST_TIME(pTransInst->idleTime) / 2, 0); -// } +void cliReadTimeoutCb(uv_timer_t* handle) { + // set up timeout cb + SCliConn* conn = handle->data; + tTrace("%s conn %p timeout, ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn)); + cliHandleExceptImpl(conn, TSDB_CODE_RPC_TIMEOUT); +} void* createConnPool(int size) { // thread local, no lock @@ -638,6 +632,11 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { conn->connReq.data = conn; + // set read timeout + conn->timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); + uv_timer_init(pThrd->loop, conn->timer); + conn->timer->data = conn; + transReqQueueInit(&conn->wreqQueue); transQueueInit(&conn->cliMsgs, NULL); @@ -660,7 +659,6 @@ static void cliDestroyConn(SCliConn* conn, bool clear) { transRemoveExHandle(transGetRefMgt(), conn->refId); conn->refId = -1; - if (conn->task != NULL) transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task); if (clear) { if (!uv_is_closing((uv_handle_t*)conn->stream)) { @@ -675,6 +673,15 @@ static void cliDestroy(uv_handle_t* handle) { } SCliConn* conn = handle->data; + if (conn->timer != NULL) { + uv_timer_stop(conn->timer); + taosMemoryFree(conn->timer); + conn->timer = NULL; + } + if (conn->task != NULL) { + transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task); + conn->task = NULL; + } transRemoveExHandle(transGetRefMgt(), conn->refId); taosMemoryFree(conn->ip); conn->stream->data = NULL; @@ -764,6 +771,10 @@ void cliSend(SCliConn* pConn) { CONN_SET_PERSIST_BY_APP(pConn); } + if (pTransInst->startTimer != NULL && pTransInst->startTimer(0, pMsg->msgType)) { + tGTrace("%s conn %p start timer for msg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType)); + uv_timer_start((uv_timer_t*)pConn->timer, cliReadTimeoutCb, TRANS_READ_TIMEOUT, 0); + } uv_write_t* req = transReqQueuePush(&pConn->wreqQueue); uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); return; diff --git a/source/os/src/osSocket.c b/source/os/src/osSocket.c index 98dfaa4972..f34032056c 100644 --- a/source/os/src/osSocket.c +++ b/source/os/src/osSocket.c @@ -333,7 +333,7 @@ int32_t taosWriteMsg(TdSocketPtr pSocket, void *buf, int32_t nbytes) { return -1; } int32_t nleft, nwritten; - char * ptr = (char *)buf; + char *ptr = (char *)buf; nleft = nbytes; @@ -362,7 +362,7 @@ int32_t taosReadMsg(TdSocketPtr pSocket, void *buf, int32_t nbytes) { return -1; } int32_t nleft, nread; - char * ptr = (char *)buf; + char *ptr = (char *)buf; nleft = nbytes; @@ -912,7 +912,7 @@ uint32_t taosGetIpv4FromFqdn(const char *fqdn) { int32_t ret = getaddrinfo(fqdn, NULL, &hints, &result); if (result) { - struct sockaddr * sa = result->ai_addr; + struct sockaddr *sa = result->ai_addr; struct sockaddr_in *si = (struct sockaddr_in *)sa; struct in_addr ia = si->sin_addr; uint32_t ip = ia.s_addr; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index e4b6983dcb..4780d85a30 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -52,6 +52,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_UNAVAIL, "Unable to establish c TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, "Unable to resolve FQDN") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_PORT_EADDRINUSE, "Port already in use") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_BROKEN_LINK, "Conn is broken") +TAOS_DEFINE_ERROR(TSDB_CODE_RPC_TIMEOUT, "Conn read timeout") //common & util TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized") -- GitLab