Commit a8bb8373 authored by dengyihao

avoid rpc send-recv cpu imbalance

Parent 962ed386
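Note on the change, as read from the diff below: the client and server worker threads enlarge their uv async pools from 5 to 8 handles, the lock-free round-robin counters in cliRBChoseIdx and transAsyncSend now wrap only after numOfThreads * 2000 (respectively nAsync * 2000) increments instead of every cycle, and the idle-connection close message moves from tTrace to tDebug.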
@@ -1181,7 +1181,7 @@ static SCliThrd* createThrdObj(void* trans) {
   pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
   uv_loop_init(pThrd->loop);
-  pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 5, pThrd, cliAsyncCb);
+  pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 8, pThrd, cliAsyncCb);
   pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t));
   uv_prepare_init(pThrd->loop, pThrd->prepare);
@@ -1253,11 +1253,14 @@ void cliWalkCb(uv_handle_t* handle, void* arg) {
 }
 FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst) {
-  int8_t index = pTransInst->index;
+  int32_t index = pTransInst->index;
   if (pTransInst->numOfThreads == 0) {
     return -1;
   }
-  if (pTransInst->index++ >= pTransInst->numOfThreads) {
+  /*
+   * no lock, and to avoid CPU load imbalance, set limit pTransInst->numOfThreads * 2000;
+   */
+  if (pTransInst->index++ >= pTransInst->numOfThreads * 2000) {
     pTransInst->index = 0;
   }
   return index % pTransInst->numOfThreads;
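The hunk above is the core of the fix: the selector index is incremented without a lock, and letting it wrap only every numOfThreads * 2000 picks keeps racy resets rare, so index % numOfThreads stays close to uniform and no single thread runs hot. A minimal standalone sketch of the same pattern follows; the names (SRoundRobin, rrChooseIdx, nWorkers) are hypothetical and not part of the TDengine source, only the counter-with-large-wrap-limit idea comes from the diff.

/* sketch of a lock-free round-robin selector with a large wrap limit */
#include <stdint.h>

typedef struct {
  int32_t index;    /* racy counter, written by many threads without a lock */
  int32_t nWorkers; /* number of worker threads / async handles             */
} SRoundRobin;

static int rrChooseIdx(SRoundRobin* rr) {
  int32_t index = rr->index;
  if (rr->nWorkers == 0) {
    return -1;
  }
  /* Wrap rarely (every nWorkers * 2000 picks) instead of every cycle:
   * unsynchronized resets then perturb only a tiny fraction of picks,
   * keeping the modulo distribution close to uniform. */
  if (rr->index++ >= rr->nWorkers * 2000) {
    rr->index = 0;
  }
  return index % rr->nWorkers;
}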
@@ -1271,7 +1274,7 @@ static FORCE_INLINE void doDelayTask(void* param) {
 static void doCloseIdleConn(void* param) {
   STaskArg* arg = param;
   SCliConn* conn = arg->param1;
-  tTrace("%s conn %p idle, close it", CONN_GET_INST_LABEL(conn), conn);
+  tDebug("%s conn %p idle, close it", CONN_GET_INST_LABEL(conn), conn);
   conn->task = NULL;
   cliDestroyConn(conn, true);
   taosMemoryFree(arg);
...
@@ -252,7 +252,7 @@ int transAsyncSend(SAsyncPool* pool, queue* q) {
   int idx = pool->index % pool->nAsync;
   // no need mutex here
-  if (pool->index++ > pool->nAsync) {
+  if (pool->index++ > pool->nAsync * 2000) {
     pool->index = 0;
   }
   uv_async_t* async = &(pool->asyncs[idx]);
...
@@ -812,7 +812,7 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) {
   // conn set
   QUEUE_INIT(&pThrd->conn);
-  pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 5, pThrd, uvWorkerAsyncCb);
+  pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 8, pThrd, uvWorkerAsyncCb);
 #if defined(WINDOWS) || defined(DARWIN)
   uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb);
 #else
...