提交 930ffd3e 编写于 作者: dengyihao's avatar dengyihao

refactor rpc

上级 bcd6df15
...@@ -26,6 +26,7 @@ typedef struct SCliConn { ...@@ -26,6 +26,7 @@ typedef struct SCliConn {
queue conn; queue conn;
char spi; char spi;
char secured; char secured;
uint64_t expireTime;
} SCliConn; } SCliConn;
typedef struct SCliMsg { typedef struct SCliMsg {
...@@ -39,10 +40,13 @@ typedef struct SCliThrdObj { ...@@ -39,10 +40,13 @@ typedef struct SCliThrdObj {
pthread_t thread; pthread_t thread;
uv_loop_t* loop; uv_loop_t* loop;
uv_async_t* cliAsync; // uv_async_t* cliAsync; //
void* cache; // conn pool uv_timer_t* pTimer;
void* cache; // conn pool
queue msg; queue msg;
pthread_mutex_t msgMtx; pthread_mutex_t msgMtx;
void* shandle; uint64_t nextTimeout; // next timeout
void* shandle; //
} SCliThrdObj; } SCliThrdObj;
typedef struct SClientObj { typedef struct SClientObj {
...@@ -63,6 +67,8 @@ static void* connCacheDestroy(void* cache); ...@@ -63,6 +67,8 @@ static void* connCacheDestroy(void* cache);
static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port); static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port);
static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn); static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn);
// register timer in each thread to clear expire conn
static void clientTimeoutCb(uv_timer_t* handle);
// process data read from server, auth/decompress etc // process data read from server, auth/decompress etc
static void clientProcessData(SCliConn* conn); static void clientProcessData(SCliConn* conn);
// check whether already read complete packet from server // check whether already read complete packet from server
...@@ -84,21 +90,55 @@ static void clientMsgDestroy(SCliMsg* pMsg); ...@@ -84,21 +90,55 @@ static void clientMsgDestroy(SCliMsg* pMsg);
static void* clientThread(void* arg); static void* clientThread(void* arg);
static void clientProcessData(SCliConn* conn) { static void clientProcessData(SCliConn* conn) {
STransConnCtx* pCtx = ((SCliMsg*)conn->data)->ctx;
SRpcInfo* pRpc = pCtx->ahandle;
SRpcMsg rpcMsg;
rpcMsg.pCont = conn->readBuf.buf;
rpcMsg.contLen = conn->readBuf.len;
rpcMsg.ahandle = pCtx->ahandle;
(pRpc->cfp)(NULL, &rpcMsg, NULL);
// impl // impl
} }
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
static void clientTimeoutCb(uv_timer_t* handle) {
SCliThrdObj* pThrd = handle->data;
SRpcInfo* pRpc = pThrd->shandle;
int64_t currentTime = pThrd->nextTimeout;
SConnList* p = taosHashIterate((SHashObj*)pThrd->cache, NULL);
while (p != NULL) {
while (!QUEUE_IS_EMPTY(&p->conn)) {
queue* h = QUEUE_HEAD(&p->conn);
SCliConn* c = QUEUE_DATA(h, SCliConn, conn);
if (c->expireTime < currentTime) {
QUEUE_REMOVE(h);
clientConnDestroy(c);
} else {
break;
}
}
p = taosHashIterate((SHashObj*)pThrd->cache, p);
}
pThrd->nextTimeout = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10;
uv_timer_start(handle, clientTimeoutCb, pRpc->idleTime * 10, 0);
}
static void* connCacheCreate(int size) { static void* connCacheCreate(int size) {
SHashObj* cache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); SHashObj* cache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
return false; return false;
} }
static void* connCacheDestroy(void* cache) { static void* connCacheDestroy(void* cache) {
SConnList* connList = taosHashIterate((SHashObj*)cache, NULL); SConnList* connList = taosHashIterate((SHashObj*)cache, NULL);
while (!QUEUE_IS_EMPTY(&connList->conn)) { while (connList != NULL) {
queue* h = QUEUE_HEAD(&connList->conn); while (!QUEUE_IS_EMPTY(&connList->conn)) {
QUEUE_REMOVE(h); queue* h = QUEUE_HEAD(&connList->conn);
SCliConn* c = QUEUE_DATA(h, SCliConn, conn); QUEUE_REMOVE(h);
clientConnDestroy(c); SCliConn* c = QUEUE_DATA(h, SCliConn, conn);
clientConnDestroy(c);
}
connList = taosHashIterate((SHashObj*)cache, connList);
} }
taosHashClear(cache); taosHashClear(cache);
} }
...@@ -129,8 +169,10 @@ static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn) ...@@ -129,8 +169,10 @@ static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn)
tstrncpy(key, ip, strlen(ip)); tstrncpy(key, ip, strlen(ip));
tstrncpy(key + strlen(key), (char*)(&port), sizeof(port)); tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));
SHashObj* pCache = cache; STransConnCtx* ctx = ((SCliMsg*)conn->data)->ctx;
SConnList* plist = taosHashGet(pCache, key, strlen(key)); SRpcInfo* pRpc = ctx->pRpc;
conn->expireTime = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10;
SConnList* plist = taosHashGet((SHashObj*)cache, key, strlen(key));
// list already create before // list already create before
assert(plist != NULL); assert(plist != NULL);
QUEUE_PUSH(&plist->conn, &conn->conn); QUEUE_PUSH(&plist->conn, &conn->conn);
...@@ -206,6 +248,7 @@ static void clientConnDestroy(SCliConn* conn) { ...@@ -206,6 +248,7 @@ static void clientConnDestroy(SCliConn* conn) {
} }
static void clientDestroy(uv_handle_t* handle) { static void clientDestroy(uv_handle_t* handle) {
SCliConn* conn = handle->data; SCliConn* conn = handle->data;
QUEUE_REMOVE(&conn->conn);
clientConnDestroy(conn); clientConnDestroy(conn);
} }
...@@ -279,6 +322,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { ...@@ -279,6 +322,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));
conn->writeReq = malloc(sizeof(uv_write_t)); conn->writeReq = malloc(sizeof(uv_write_t));
QUEUE_INIT(&conn->conn);
conn->connReq.data = conn; conn->connReq.data = conn;
conn->data = pMsg; conn->data = pMsg;
...@@ -315,6 +359,9 @@ static void clientAsyncCb(uv_async_t* handle) { ...@@ -315,6 +359,9 @@ static void clientAsyncCb(uv_async_t* handle) {
static void* clientThread(void* arg) { static void* clientThread(void* arg) {
SCliThrdObj* pThrd = (SCliThrdObj*)arg; SCliThrdObj* pThrd = (SCliThrdObj*)arg;
SRpcInfo* pRpc = pThrd->shandle;
pThrd->nextTimeout = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10;
uv_timer_start(pThrd->pTimer, clientTimeoutCb, pRpc->idleTime * 10, 0);
uv_run(pThrd->loop, UV_RUN_DEFAULT); uv_run(pThrd->loop, UV_RUN_DEFAULT);
} }
...@@ -336,7 +383,11 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, ...@@ -336,7 +383,11 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
uv_async_init(pThrd->loop, pThrd->cliAsync, clientAsyncCb); uv_async_init(pThrd->loop, pThrd->cliAsync, clientAsyncCb);
pThrd->cliAsync->data = pThrd; pThrd->cliAsync->data = pThrd;
pThrd->pTimer = malloc(sizeof(uv_timer_t));
uv_timer_init(pThrd->loop, pThrd->pTimer);
pThrd->shandle = shandle; pThrd->shandle = shandle;
int err = pthread_create(&pThrd->thread, NULL, clientThread, (void*)(pThrd)); int err = pthread_create(&pThrd->thread, NULL, clientThread, (void*)(pThrd));
if (err == 0) { if (err == 0) {
tDebug("sucess to create tranport-client thread %d", i); tDebug("sucess to create tranport-client thread %d", i);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册