diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h
index 3f2aa1170e03d3ddfca7580e7daeebe24a57b14b..f4fe0b1f79fb905f2f0ef6616b6fcfd3066fb729 100644
--- a/source/libs/transport/inc/transComm.h
+++ b/source/libs/transport/inc/transComm.h
@@ -211,6 +211,18 @@ typedef struct SConnBuffer {
int left;
} SConnBuffer;
+typedef void (*AsyncCB)(uv_async_t* handle);
+
+typedef struct {
+ int index;
+ int nAsync;
+ uv_async_t* asyncs;
+} SAsyncPool;
+
+SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, void* arg, AsyncCB cb);
+void transDestroyAsyncPool(SAsyncPool* pool);
+int transSendAsync(SAsyncPool* pool);
+
int transInitBuffer(SConnBuffer* buf);
int transClearBuffer(SConnBuffer* buf);
int transDestroyBuffer(SConnBuffer* buf);
diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c
index 069ebaeb8acb427a0a70366fd7a349504d9da767..5037de1407e3b9ff6d2a6feeda201b5cb702839a 100644
--- a/source/libs/transport/src/transCli.c
+++ b/source/libs/transport/src/transCli.c
@@ -42,9 +42,10 @@ typedef struct SCliMsg {
} SCliMsg;
typedef struct SCliThrdObj {
- pthread_t thread;
- uv_loop_t* loop;
- uv_async_t* cliAsync; //
+ pthread_t thread;
+ uv_loop_t* loop;
+ // uv_async_t* cliAsync; //
+ SAsyncPool* asyncPool;
uv_timer_t* timer;
void* pool; // conn pool
queue msg;
@@ -379,7 +380,7 @@ static void clientConnCb(uv_connect_t* req, int status) {
static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
tDebug("thread %p start to quit", pThrd);
destroyCmsg(pMsg);
- uv_close((uv_handle_t*)pThrd->cliAsync, NULL);
+ // transDestroyAsyncPool(pThr) uv_close((uv_handle_t*)pThrd->cliAsync, NULL);
uv_timer_stop(pThrd->timer);
pThrd->quit = true;
// uv__async_stop(pThrd->cliAsync);
@@ -501,6 +502,7 @@ static void destroyCmsg(SCliMsg* pMsg) {
destroyUserdata(&pMsg->msg);
free(pMsg);
}
+
static SCliThrdObj* createThrdObj() {
SCliThrdObj* pThrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj));
QUEUE_INIT(&pThrd->msg);
@@ -509,9 +511,7 @@ static SCliThrdObj* createThrdObj() {
pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
uv_loop_init(pThrd->loop);
- pThrd->cliAsync = malloc(sizeof(uv_async_t));
- uv_async_init(pThrd->loop, pThrd->cliAsync, clientAsyncCb);
- pThrd->cliAsync->data = pThrd;
+ pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, pThrd, clientAsyncCb);
pThrd->timer = malloc(sizeof(uv_timer_t));
uv_timer_init(pThrd->loop, pThrd->timer);
@@ -529,7 +529,8 @@ static void destroyThrdObj(SCliThrdObj* pThrd) {
uv_stop(pThrd->loop);
pthread_join(pThrd->thread, NULL);
pthread_mutex_destroy(&pThrd->msgMtx);
- free(pThrd->cliAsync);
+ transDestroyAsyncPool(pThrd->asyncPool);
+ // free(pThrd->cliAsync);
free(pThrd->timer);
free(pThrd->loop);
free(pThrd);
@@ -551,7 +552,8 @@ static void clientSendQuit(SCliThrdObj* thrd) {
QUEUE_PUSH(&thrd->msg, &msg->q);
pthread_mutex_unlock(&thrd->msgMtx);
- uv_async_send(thrd->cliAsync);
+ transSendAsync(thrd->asyncPool);
+ // uv_async_send(thrd->cliAsync);
}
void taosCloseClient(void* arg) {
// impl later
@@ -600,6 +602,10 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t*
QUEUE_PUSH(&thrd->msg, &cliMsg->q);
pthread_mutex_unlock(&thrd->msgMtx);
- uv_async_send(thrd->cliAsync);
+ int start = taosGetTimestampUs();
+ transSendAsync(thrd->asyncPool);
+ // uv_async_send(thrd->cliAsync);
+ int end = taosGetTimestampUs() - start;
+ // tError("client sent to rpc, time cost: %d", (int)end);
}
#endif
diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c
index ca39f85eb3925ad4e39770cc9ac096ae698b891b..47eabd4320a8c8f6930cd3098694daee4a52ad3d 100644
--- a/source/libs/transport/src/transComm.c
+++ b/source/libs/transport/src/transComm.c
@@ -245,4 +245,36 @@ int transDestroyBuffer(SConnBuffer* buf) {
}
transClearBuffer(buf);
}
+
+SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, void* arg, AsyncCB cb) {
+ static int sz = 20;
+
+ SAsyncPool* pool = calloc(1, sizeof(SAsyncPool));
+ pool->index = 0;
+ pool->nAsync = sz;
+ pool->asyncs = calloc(1, sizeof(uv_async_t) * pool->nAsync);
+
+ for (int i = 0; i < pool->nAsync; i++) {
+ uv_async_t* async = &(pool->asyncs[i]);
+ uv_async_init(loop, async, cb);
+ async->data = arg;
+ }
+ return pool;
+}
+void transDestroyAsyncPool(SAsyncPool* pool) {
+ for (int i = 0; i < pool->nAsync; i++) {
+ uv_async_t* async = &(pool->asyncs[i]);
+ }
+ free(pool->asyncs);
+ free(pool);
+}
+int transSendAsync(SAsyncPool* pool) {
+ int idx = pool->index;
+ idx = idx % pool->nAsync;
+ // no need mutex here
+ if (pool->index++ > pool->nAsync) {
+ pool->index = 0;
+ }
+ return uv_async_send(&(pool->asyncs[idx]));
+}
#endif
diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c
index 475ef32b46c549d3e631a917cd7fad119feef2fb..826b91dc029fbec23bf26c4d370a0b50eca9e571 100644
--- a/source/libs/transport/src/transSrv.c
+++ b/source/libs/transport/src/transSrv.c
@@ -22,7 +22,7 @@ typedef struct SSrvConn {
uv_write_t* pWriter;
uv_timer_t* pTimer;
- uv_async_t* pWorkerAsync;
+ // uv_async_t* pWorkerAsync;
queue queue;
int ref;
int persist; // persist connection or not
@@ -50,11 +50,12 @@ typedef struct SSrvMsg {
} SSrvMsg;
typedef struct SWorkThrdObj {
- pthread_t thread;
- uv_pipe_t* pipe;
- int fd;
- uv_loop_t* loop;
- uv_async_t* workerAsync; //
+ pthread_t thread;
+ uv_pipe_t* pipe;
+ int fd;
+ uv_loop_t* loop;
+ SAsyncPool* asyncPool;
+ // uv_async_t* workerAsync; //
queue msg;
pthread_mutex_t msgMtx;
void* pTransInst;
@@ -469,7 +470,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
pConn->pTimer->data = pConn;
pConn->hostThrd = pThrd;
- pConn->pWorkerAsync = pThrd->workerAsync; // thread safty
+ // pConn->pWorkerAsync = pThrd->workerAsync; // thread safty
// init client handle
pConn->pTcp = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
@@ -512,10 +513,7 @@ static bool addHandleToWorkloop(void* arg) {
QUEUE_INIT(&pThrd->msg);
pthread_mutex_init(&pThrd->msgMtx, NULL);
- pThrd->workerAsync = malloc(sizeof(uv_async_t));
- uv_async_init(pThrd->loop, pThrd->workerAsync, uvWorkerAsyncCb);
- pThrd->workerAsync->data = pThrd;
-
+ pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, pThrd, uvWorkerAsyncCb);
uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
return true;
}
@@ -665,7 +663,9 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) {
}
pthread_join(pThrd->thread, NULL);
free(pThrd->loop);
- free(pThrd->workerAsync);
+ transDestroyAsyncPool(pThrd->asyncPool);
+
+ // free(pThrd->workerAsync);
free(pThrd);
}
void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
@@ -676,7 +676,8 @@ void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
pthread_mutex_unlock(&pThrd->msgMtx);
tDebug("send quit msg to work thread");
- uv_async_send(pThrd->workerAsync);
+ transSendAsync(pThrd->asyncPool);
+ // uv_async_send(pThrd->workerAsync);
}
void taosCloseServer(void* arg) {
@@ -716,8 +717,8 @@ void rpcSendResponse(const SRpcMsg* pMsg) {
pthread_mutex_unlock(&pThrd->msgMtx);
tDebug("conn %p start to send resp", pConn);
-
- uv_async_send(pThrd->workerAsync);
+ transSendAsync(pThrd->asyncPool);
+ // uv_async_send(pThrd->workerAsync);
}
#endif
diff --git a/source/libs/transport/test/rclient.c b/source/libs/transport/test/rclient.c
index f9ad20c0653d73960439d05280808fc3c35f3cdd..37c189406ab8c89f9c014db41afa30d767cbf769 100644
--- a/source/libs/transport/test/rclient.c
+++ b/source/libs/transport/test/rclient.c
@@ -12,6 +12,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*/
+#include
#include "os.h"
#include "rpcLog.h"
@@ -52,6 +53,12 @@ static void *sendRequest(void *param) {
tDebug("thread:%d, start to send request", pInfo->index);
+ tDebug("thread:%d, reqs: %d", pInfo->index, pInfo->numOfReqs);
+ int u100 = 0;
+ int u500 = 0;
+ int u1000 = 0;
+ int u10000 = 0;
+
while (pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) {
pInfo->num++;
rpcMsg.pCont = rpcMallocCont(pInfo->msgSize);
@@ -59,15 +66,28 @@ static void *sendRequest(void *param) {
rpcMsg.ahandle = pInfo;
rpcMsg.msgType = 1;
// tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
+ int64_t start = taosGetTimestampUs();
rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL);
if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
// tsem_wait(&pInfo->rspSem);
tsem_wait(&pInfo->rspSem);
+ int64_t end = taosGetTimestampUs() - start;
+ if (end <= 100) {
+ u100++;
+ } else if (end > 100 && end <= 500) {
+ u500++;
+ } else if (end > 500 && end < 1000) {
+ u1000++;
+ } else {
+ u10000++;
+ }
+
tDebug("recv response succefully");
// usleep(100000000);
}
+ tError("send and recv sum: %d, %d, %d, %d", u100, u500, u1000, u10000);
tDebug("thread:%d, it is over", pInfo->index);
tcount++;
@@ -163,8 +183,8 @@ int main(int argc, char *argv[]) {
tInfo("client is initialized");
tInfo("threads:%d msgSize:%d requests:%d", appThreads, msgSize, numOfReqs);
- // gettimeofday(&systemTime, NULL);
- // startTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
+ gettimeofday(&systemTime, NULL);
+ startTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
SInfo *pInfo = (SInfo *)calloc(1, sizeof(SInfo) * appThreads);
@@ -186,13 +206,12 @@ int main(int argc, char *argv[]) {
usleep(1);
} while (tcount < appThreads);
- // gettimeofday(&systemTime, NULL);
- // endTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
- // float usedTime = (endTime - startTime) / 1000.0f; // mseconds
+ gettimeofday(&systemTime, NULL);
+ endTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
+ float usedTime = (endTime - startTime) / 1000.0f; // mseconds
- // tInfo("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs * appThreads);
- // tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0 * numOfReqs * appThreads / usedTime,
- // msgSize);
+ tInfo("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs * appThreads);
+ tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0 * numOfReqs * appThreads / usedTime, msgSize);
int ch = getchar();
UNUSED(ch);