提交 ecd560cc 编写于 作者: dengyihao's avatar dengyihao

start timer for particular msg

上级 67e2f901
...@@ -41,12 +41,13 @@ typedef struct { ...@@ -41,12 +41,13 @@ typedef struct {
typedef struct SRpcHandleInfo { typedef struct SRpcHandleInfo {
// rpc info // rpc info
void *handle; // rpc handle returned to app void *handle; // rpc handle returned to app
int64_t refId; // refid, used by server int64_t refId; // refid, used by server
int32_t noResp; // has response or not(default 0, 0: resp, 1: no resp); int8_t noResp; // has response or not(default 0, 0: resp, 1: no resp)
int32_t persistHandle; // persist handle or not int8_t persistHandle; // persist handle or not
int8_t hasEpSet;
STraceId traceId; STraceId traceId;
int8_t hasEpSet;
// app info // app info
void *ahandle; // app handle set by client void *ahandle; // app handle set by client
...@@ -69,8 +70,9 @@ typedef struct SRpcMsg { ...@@ -69,8 +70,9 @@ typedef struct SRpcMsg {
SRpcHandleInfo info; SRpcHandleInfo info;
} SRpcMsg; } SRpcMsg;
typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *rf); typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *epset);
typedef bool (*RpcRfp)(int32_t code, tmsg_t msgType); typedef bool (*RpcRfp)(int32_t code, tmsg_t msgType);
typedef bool (*RpcTfp)(int32_t code, tmsg_t msgType);
typedef struct SRpcInit { typedef struct SRpcInit {
char localFqdn[TSDB_FQDN_LEN]; char localFqdn[TSDB_FQDN_LEN];
...@@ -84,12 +86,15 @@ typedef struct SRpcInit { ...@@ -84,12 +86,15 @@ typedef struct SRpcInit {
// the following is for client app ecurity only // the following is for client app ecurity only
char *user; // user name char *user; // user name
// call back to process incoming msg, code shall be ignored by server app // call back to process incoming msg
RpcCfp cfp; RpcCfp cfp;
// user defined retry func // retry not not for particular msg
RpcRfp rfp; RpcRfp rfp;
// set up timeout for particular msg
RpcTfp tfp;
void *parent; void *parent;
} SRpcInit; } SRpcInit;
......
...@@ -46,6 +46,7 @@ int32_t* taosGetErrno(); ...@@ -46,6 +46,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_RPC_FQDN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0015) #define TSDB_CODE_RPC_FQDN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0015)
#define TSDB_CODE_RPC_PORT_EADDRINUSE TAOS_DEF_ERROR_CODE(0, 0x0017) #define TSDB_CODE_RPC_PORT_EADDRINUSE TAOS_DEF_ERROR_CODE(0, 0x0017)
#define TSDB_CODE_RPC_BROKEN_LINK TAOS_DEF_ERROR_CODE(0, 0x0018) #define TSDB_CODE_RPC_BROKEN_LINK TAOS_DEF_ERROR_CODE(0, 0x0018)
#define TSDB_CODE_RPC_TIMEOUT TAOS_DEF_ERROR_CODE(0, 0x0019)
//common & util //common & util
#define TSDB_CODE_TIME_UNSYNCED TAOS_DEF_ERROR_CODE(0, 0x0013) #define TSDB_CODE_TIME_UNSYNCED TAOS_DEF_ERROR_CODE(0, 0x0013)
......
...@@ -60,7 +60,7 @@ static int32_t registerRequest(SRequestObj *pRequest, STscObj *pTscObj) { ...@@ -60,7 +60,7 @@ static int32_t registerRequest(SRequestObj *pRequest, STscObj *pTscObj) {
} }
static void deregisterRequest(SRequestObj *pRequest) { static void deregisterRequest(SRequestObj *pRequest) {
const static int64_t SLOW_QUERY_INTERVAL = 3000000L; // todo configurable const static int64_t SLOW_QUERY_INTERVAL = 3000000L; // todo configurable
assert(pRequest != NULL); assert(pRequest != NULL);
STscObj *pTscObj = pRequest->pTscObj; STscObj *pTscObj = pRequest->pTscObj;
...@@ -77,13 +77,13 @@ static void deregisterRequest(SRequestObj *pRequest) { ...@@ -77,13 +77,13 @@ static void deregisterRequest(SRequestObj *pRequest) {
if (QUERY_NODE_VNODE_MODIF_STMT == pRequest->stmtType) { if (QUERY_NODE_VNODE_MODIF_STMT == pRequest->stmtType) {
atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration); atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration);
} else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) { } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration); atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration);
} }
if (duration >= SLOW_QUERY_INTERVAL) { if (duration >= SLOW_QUERY_INTERVAL) {
atomic_add_fetch_64((int64_t *)&pActivity->numOfSlowQueries, 1); atomic_add_fetch_64((int64_t *)&pActivity->numOfSlowQueries, 1);
} }
releaseTscObj(pTscObj->id); releaseTscObj(pTscObj->id);
} }
...@@ -109,6 +109,12 @@ static bool clientRpcRfp(int32_t code, tmsg_t msgType) { ...@@ -109,6 +109,12 @@ static bool clientRpcRfp(int32_t code, tmsg_t msgType) {
} }
} }
// start timer for particular msgType
static bool clientRpcTfp(int32_t code, tmsg_t msgType) {
//
return false;
}
// TODO refactor // TODO refactor
void *openTransporter(const char *user, const char *auth, int32_t numOfThread) { void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
SRpcInit rpcInit; SRpcInit rpcInit;
...@@ -118,6 +124,7 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) { ...@@ -118,6 +124,7 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
rpcInit.numOfThreads = numOfThread; rpcInit.numOfThreads = numOfThread;
rpcInit.cfp = processMsgFromServer; rpcInit.cfp = processMsgFromServer;
rpcInit.rfp = clientRpcRfp; rpcInit.rfp = clientRpcRfp;
rpcInit.tfp = clientRpcTfp;
rpcInit.sessions = 1024; rpcInit.sessions = 1024;
rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.user = (char *)user; rpcInit.user = (char *)user;
...@@ -375,7 +382,7 @@ void taos_init_imp(void) { ...@@ -375,7 +382,7 @@ void taos_init_imp(void) {
initQueryModuleMsgHandle(); initQueryModuleMsgHandle();
taosConvInit(); taosConvInit();
rpcInit(); rpcInit();
SCatalogCfg cfg = {.maxDBCacheNum = 100, .maxTblCacheNum = 100}; SCatalogCfg cfg = {.maxDBCacheNum = 100, .maxTblCacheNum = 100};
......
...@@ -95,8 +95,9 @@ typedef void* queue[2]; ...@@ -95,8 +95,9 @@ typedef void* queue[2];
#define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field)))) #define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field))))
#define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit #define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit
#define TRANS_RETRY_INTERVAL 15 // ms retry interval #define TRANS_RETRY_INTERVAL 15 // retry interval (ms)
#define TRANS_CONN_TIMEOUT 3 // connect timeout #define TRANS_CONN_TIMEOUT 3 // connect timeout (s)
#define TRANS_READ_TIMEOUT 200 // read timeout (ms)
typedef SRpcMsg STransMsg; typedef SRpcMsg STransMsg;
typedef SRpcCtx STransCtx; typedef SRpcCtx STransCtx;
......
...@@ -53,6 +53,7 @@ typedef struct { ...@@ -53,6 +53,7 @@ typedef struct {
void (*cfp)(void* parent, SRpcMsg*, SEpSet*); void (*cfp)(void* parent, SRpcMsg*, SEpSet*);
bool (*retry)(int32_t code, tmsg_t msgType); bool (*retry)(int32_t code, tmsg_t msgType);
bool (*startTimer)(int32_t code, tmsg_t msgType);
int index; int index;
void* parent; void* parent;
......
...@@ -48,6 +48,7 @@ void* rpcOpen(const SRpcInit* pInit) { ...@@ -48,6 +48,7 @@ void* rpcOpen(const SRpcInit* pInit) {
// register callback handle // register callback handle
pRpc->cfp = pInit->cfp; pRpc->cfp = pInit->cfp;
pRpc->retry = pInit->rfp; pRpc->retry = pInit->rfp;
pRpc->startTimer = pInit->tfp;
if (pInit->connType == TAOS_CONN_SERVER) { if (pInit->connType == TAOS_CONN_SERVER) {
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
......
...@@ -24,6 +24,7 @@ typedef struct SCliConn { ...@@ -24,6 +24,7 @@ typedef struct SCliConn {
uv_connect_t connReq; uv_connect_t connReq;
uv_stream_t* stream; uv_stream_t* stream;
queue wreqQueue; queue wreqQueue;
uv_timer_t* timer;
void* hostThrd; void* hostThrd;
...@@ -108,6 +109,8 @@ static int sockDebugInfo(struct sockaddr* sockname, char* dst) { ...@@ -108,6 +109,8 @@ static int sockDebugInfo(struct sockaddr* sockname, char* dst) {
sprintf(dst, "%s:%d", buf, ntohs(addr.sin_port)); sprintf(dst, "%s:%d", buf, ntohs(addr.sin_port));
return r; return r;
} }
// register timer for read
static void cliReadTimeoutCb(uv_timer_t* handle);
// register timer in each thread to clear expire conn // register timer in each thread to clear expire conn
// static void cliTimeoutCb(uv_timer_t* handle); // static void cliTimeoutCb(uv_timer_t* handle);
// alloc buf for recv // alloc buf for recv
...@@ -330,6 +333,11 @@ void cliHandleResp(SCliConn* conn) { ...@@ -330,6 +333,11 @@ void cliHandleResp(SCliConn* conn) {
SCliThrd* pThrd = conn->hostThrd; SCliThrd* pThrd = conn->hostThrd;
STrans* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
if (uv_is_active((uv_handle_t*)conn->timer)) {
tDebug("%s conn %p stop timer", CONN_GET_INST_LABEL(conn), conn);
uv_timer_stop(conn->timer);
}
STransMsgHead* pHead = NULL; STransMsgHead* pHead = NULL;
transDumpFromBuffer(&conn->readBuf, (char**)&pHead); transDumpFromBuffer(&conn->readBuf, (char**)&pHead);
pHead->code = htonl(pHead->code); pHead->code = htonl(pHead->code);
...@@ -409,7 +417,7 @@ void cliHandleResp(SCliConn* conn) { ...@@ -409,7 +417,7 @@ void cliHandleResp(SCliConn* conn) {
uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb); uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
} }
void cliHandleExcept(SCliConn* pConn) { void cliHandleExceptImpl(SCliConn* pConn, int32_t code) {
if (transQueueEmpty(&pConn->cliMsgs)) { if (transQueueEmpty(&pConn->cliMsgs)) {
if (pConn->broken == true && CONN_NO_PERSIST_BY_APP(pConn)) { if (pConn->broken == true && CONN_NO_PERSIST_BY_APP(pConn)) {
tTrace("%s conn %p handle except, persist:0", CONN_GET_INST_LABEL(pConn), pConn); tTrace("%s conn %p handle except, persist:0", CONN_GET_INST_LABEL(pConn), pConn);
...@@ -428,7 +436,7 @@ void cliHandleExcept(SCliConn* pConn) { ...@@ -428,7 +436,7 @@ void cliHandleExcept(SCliConn* pConn) {
STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL; STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL;
STransMsg transMsg = {0}; STransMsg transMsg = {0};
transMsg.code = pConn->broken ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL; transMsg.code = code == -1 ? (pConn->broken ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL) : code;
transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0; transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0;
transMsg.info.ahandle = NULL; transMsg.info.ahandle = NULL;
...@@ -459,31 +467,17 @@ void cliHandleExcept(SCliConn* pConn) { ...@@ -459,31 +467,17 @@ void cliHandleExcept(SCliConn* pConn) {
} while (!transQueueEmpty(&pConn->cliMsgs)); } while (!transQueueEmpty(&pConn->cliMsgs));
transUnrefCliHandle(pConn); transUnrefCliHandle(pConn);
} }
void cliHandleExcept(SCliConn* conn) {
tTrace("%s conn except ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn));
cliHandleExceptImpl(conn, -1);
}
// void cliTimeoutCb(uv_timer_t* handle) { void cliReadTimeoutCb(uv_timer_t* handle) {
// SCliThrd* pThrd = handle->data; // set up timeout cb
// STrans* pTransInst = pThrd->pTransInst; SCliConn* conn = handle->data;
// int64_t currentTime = pThrd->nextTimeout; tTrace("%s conn %p timeout, ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn));
// tTrace("%s conn timeout, try to remove expire conn from conn pool", pTransInst->label); cliHandleExceptImpl(conn, TSDB_CODE_RPC_TIMEOUT);
// }
// SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL);
// while (p != NULL) {
// while (!QUEUE_IS_EMPTY(&p->conn)) {
// queue* h = QUEUE_HEAD(&p->conn);
// SCliConn* c = QUEUE_DATA(h, SCliConn, q);
// if (c->expireTime < currentTime) {
// QUEUE_REMOVE(h);
// transUnrefCliHandle(c);
// } else {
// break;
// }
// }
// p = taosHashIterate((SHashObj*)pThrd->pool, p);
// }
//
// pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
// uv_timer_start(handle, cliTimeoutCb, CONN_PERSIST_TIME(pTransInst->idleTime) / 2, 0);
// }
void* createConnPool(int size) { void* createConnPool(int size) {
// thread local, no lock // thread local, no lock
...@@ -638,6 +632,11 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { ...@@ -638,6 +632,11 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) {
conn->connReq.data = conn; conn->connReq.data = conn;
// set read timeout
conn->timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
uv_timer_init(pThrd->loop, conn->timer);
conn->timer->data = conn;
transReqQueueInit(&conn->wreqQueue); transReqQueueInit(&conn->wreqQueue);
transQueueInit(&conn->cliMsgs, NULL); transQueueInit(&conn->cliMsgs, NULL);
...@@ -660,7 +659,6 @@ static void cliDestroyConn(SCliConn* conn, bool clear) { ...@@ -660,7 +659,6 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
transRemoveExHandle(transGetRefMgt(), conn->refId); transRemoveExHandle(transGetRefMgt(), conn->refId);
conn->refId = -1; conn->refId = -1;
if (conn->task != NULL) transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);
if (clear) { if (clear) {
if (!uv_is_closing((uv_handle_t*)conn->stream)) { if (!uv_is_closing((uv_handle_t*)conn->stream)) {
...@@ -675,6 +673,15 @@ static void cliDestroy(uv_handle_t* handle) { ...@@ -675,6 +673,15 @@ static void cliDestroy(uv_handle_t* handle) {
} }
SCliConn* conn = handle->data; SCliConn* conn = handle->data;
if (conn->timer != NULL) {
uv_timer_stop(conn->timer);
taosMemoryFree(conn->timer);
conn->timer = NULL;
}
if (conn->task != NULL) {
transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);
conn->task = NULL;
}
transRemoveExHandle(transGetRefMgt(), conn->refId); transRemoveExHandle(transGetRefMgt(), conn->refId);
taosMemoryFree(conn->ip); taosMemoryFree(conn->ip);
conn->stream->data = NULL; conn->stream->data = NULL;
...@@ -764,6 +771,10 @@ void cliSend(SCliConn* pConn) { ...@@ -764,6 +771,10 @@ void cliSend(SCliConn* pConn) {
CONN_SET_PERSIST_BY_APP(pConn); CONN_SET_PERSIST_BY_APP(pConn);
} }
if (pTransInst->startTimer != NULL && pTransInst->startTimer(0, pMsg->msgType)) {
tGTrace("%s conn %p start timer for msg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType));
uv_timer_start((uv_timer_t*)pConn->timer, cliReadTimeoutCb, TRANS_READ_TIMEOUT, 0);
}
uv_write_t* req = transReqQueuePush(&pConn->wreqQueue); uv_write_t* req = transReqQueuePush(&pConn->wreqQueue);
uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
return; return;
......
...@@ -333,7 +333,7 @@ int32_t taosWriteMsg(TdSocketPtr pSocket, void *buf, int32_t nbytes) { ...@@ -333,7 +333,7 @@ int32_t taosWriteMsg(TdSocketPtr pSocket, void *buf, int32_t nbytes) {
return -1; return -1;
} }
int32_t nleft, nwritten; int32_t nleft, nwritten;
char * ptr = (char *)buf; char *ptr = (char *)buf;
nleft = nbytes; nleft = nbytes;
...@@ -362,7 +362,7 @@ int32_t taosReadMsg(TdSocketPtr pSocket, void *buf, int32_t nbytes) { ...@@ -362,7 +362,7 @@ int32_t taosReadMsg(TdSocketPtr pSocket, void *buf, int32_t nbytes) {
return -1; return -1;
} }
int32_t nleft, nread; int32_t nleft, nread;
char * ptr = (char *)buf; char *ptr = (char *)buf;
nleft = nbytes; nleft = nbytes;
...@@ -912,7 +912,7 @@ uint32_t taosGetIpv4FromFqdn(const char *fqdn) { ...@@ -912,7 +912,7 @@ uint32_t taosGetIpv4FromFqdn(const char *fqdn) {
int32_t ret = getaddrinfo(fqdn, NULL, &hints, &result); int32_t ret = getaddrinfo(fqdn, NULL, &hints, &result);
if (result) { if (result) {
struct sockaddr * sa = result->ai_addr; struct sockaddr *sa = result->ai_addr;
struct sockaddr_in *si = (struct sockaddr_in *)sa; struct sockaddr_in *si = (struct sockaddr_in *)sa;
struct in_addr ia = si->sin_addr; struct in_addr ia = si->sin_addr;
uint32_t ip = ia.s_addr; uint32_t ip = ia.s_addr;
......
...@@ -52,6 +52,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_UNAVAIL, "Unable to establish c ...@@ -52,6 +52,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_UNAVAIL, "Unable to establish c
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, "Unable to resolve FQDN") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, "Unable to resolve FQDN")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_PORT_EADDRINUSE, "Port already in use") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_PORT_EADDRINUSE, "Port already in use")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_BROKEN_LINK, "Conn is broken") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_BROKEN_LINK, "Conn is broken")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_TIMEOUT, "Conn read timeout")
//common & util //common & util
TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized") TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册