提交 a5420dfa 编写于 作者: dengyihao's avatar dengyihao

fix rpc perf

上级 46c2dfa4
...@@ -16,7 +16,8 @@ ...@@ -16,7 +16,8 @@
#include "transComm.h" #include "transComm.h"
typedef struct SConnList { typedef struct SConnList {
queue conn; queue conn;
int32_t size;
} SConnList; } SConnList;
typedef struct SCliConn { typedef struct SCliConn {
...@@ -518,15 +519,18 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { ...@@ -518,15 +519,18 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
if (QUEUE_IS_EMPTY(&plist->conn)) { if (QUEUE_IS_EMPTY(&plist->conn)) {
return NULL; return NULL;
} }
plist->size -= 1;
queue* h = QUEUE_HEAD(&plist->conn); queue* h = QUEUE_HEAD(&plist->conn);
SCliConn* conn = QUEUE_DATA(h, SCliConn, q); SCliConn* conn = QUEUE_DATA(h, SCliConn, q);
conn->status = ConnNormal; conn->status = ConnNormal;
QUEUE_REMOVE(&conn->q); QUEUE_REMOVE(&conn->q);
QUEUE_INIT(&conn->q); QUEUE_INIT(&conn->q);
transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task); if (conn->task != NULL) {
conn->task = NULL; transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);
conn->task = NULL;
}
return conn; return conn;
} }
static void addConnToPool(void* pool, SCliConn* conn) { static void addConnToPool(void* pool, SCliConn* conn) {
...@@ -555,13 +559,17 @@ static void addConnToPool(void* pool, SCliConn* conn) { ...@@ -555,13 +559,17 @@ static void addConnToPool(void* pool, SCliConn* conn) {
assert(conn->list != NULL); assert(conn->list != NULL);
QUEUE_INIT(&conn->q); QUEUE_INIT(&conn->q);
QUEUE_PUSH(&conn->list->conn, &conn->q); QUEUE_PUSH(&conn->list->conn, &conn->q);
conn->list->size += 1;
conn->task = NULL;
assert(!QUEUE_IS_EMPTY(&conn->list->conn)); assert(!QUEUE_IS_EMPTY(&conn->list->conn));
STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg)); if (conn->list->size >= 10) {
arg->param1 = conn; STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
arg->param2 = thrd; arg->param1 = conn;
conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime)); arg->param2 = thrd;
conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime));
}
} }
static int32_t allocConnRef(SCliConn* conn, bool update) { static int32_t allocConnRef(SCliConn* conn, bool update) {
if (update) { if (update) {
......
...@@ -75,7 +75,6 @@ typedef struct SWorkThrd { ...@@ -75,7 +75,6 @@ typedef struct SWorkThrd {
SAsyncPool* asyncPool; SAsyncPool* asyncPool;
uv_prepare_t* prepare; uv_prepare_t* prepare;
queue msg; queue msg;
TdThreadMutex msgMtx;
queue conn; queue conn;
void* pTransInst; void* pTransInst;
...@@ -499,6 +498,7 @@ void uvWorkerAsyncCb(uv_async_t* handle) { ...@@ -499,6 +498,7 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
tError("unexcept occurred, continue"); tError("unexcept occurred, continue");
continue; continue;
} }
// release handle to rpc init // release handle to rpc init
if (msg->type == Quit) { if (msg->type == Quit) {
(*transAsyncHandle[msg->type])(msg, pThrd); (*transAsyncHandle[msg->type])(msg, pThrd);
...@@ -743,7 +743,6 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) { ...@@ -743,7 +743,6 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) {
pThrd->pipe->data = pThrd; pThrd->pipe->data = pThrd;
QUEUE_INIT(&pThrd->msg); QUEUE_INIT(&pThrd->msg);
taosThreadMutexInit(&pThrd->msgMtx, NULL);
pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t)); pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t));
uv_prepare_init(pThrd->loop, pThrd->prepare); uv_prepare_init(pThrd->loop, pThrd->prepare);
......
...@@ -75,15 +75,14 @@ void processShellMsg() { ...@@ -75,15 +75,14 @@ void processShellMsg() {
void *handle = pRpcMsg->info.handle; void *handle = pRpcMsg->info.handle;
taosFreeQitem(pRpcMsg); taosFreeQitem(pRpcMsg);
//{
{ // SRpcMsg nRpcMsg = {0};
SRpcMsg nRpcMsg = {0}; // nRpcMsg.pCont = rpcMallocCont(msgSize);
nRpcMsg.pCont = rpcMallocCont(msgSize); // nRpcMsg.contLen = msgSize;
nRpcMsg.contLen = msgSize; // nRpcMsg.info.handle = handle;
nRpcMsg.info.handle = handle; // nRpcMsg.code = TSDB_CODE_CTG_NOT_READY;
nRpcMsg.code = TSDB_CODE_CTG_NOT_READY; // rpcSendResponse(&nRpcMsg);
rpcSendResponse(&nRpcMsg); //}
}
} }
taosUpdateItemSize(qinfo.queue, numOfMsgs); taosUpdateItemSize(qinfo.queue, numOfMsgs);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册