提交 6c6573cc 编写于 作者: dengyihao's avatar dengyihao

optimize trans

上级 19517077
...@@ -211,6 +211,18 @@ typedef struct SConnBuffer { ...@@ -211,6 +211,18 @@ typedef struct SConnBuffer {
int left; int left;
} SConnBuffer; } SConnBuffer;
typedef void (*AsyncCB)(uv_async_t* handle);
typedef struct {
int index;
int nAsync;
uv_async_t* asyncs;
} SAsyncPool;
SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, void* arg, AsyncCB cb);
void transDestroyAsyncPool(SAsyncPool* pool);
int transSendAsync(SAsyncPool* pool);
int transInitBuffer(SConnBuffer* buf); int transInitBuffer(SConnBuffer* buf);
int transClearBuffer(SConnBuffer* buf); int transClearBuffer(SConnBuffer* buf);
int transDestroyBuffer(SConnBuffer* buf); int transDestroyBuffer(SConnBuffer* buf);
......
...@@ -42,9 +42,10 @@ typedef struct SCliMsg { ...@@ -42,9 +42,10 @@ typedef struct SCliMsg {
} SCliMsg; } SCliMsg;
typedef struct SCliThrdObj { typedef struct SCliThrdObj {
pthread_t thread; pthread_t thread;
uv_loop_t* loop; uv_loop_t* loop;
uv_async_t* cliAsync; // // uv_async_t* cliAsync; //
SAsyncPool* asyncPool;
uv_timer_t* timer; uv_timer_t* timer;
void* pool; // conn pool void* pool; // conn pool
queue msg; queue msg;
...@@ -379,7 +380,7 @@ static void clientConnCb(uv_connect_t* req, int status) { ...@@ -379,7 +380,7 @@ static void clientConnCb(uv_connect_t* req, int status) {
static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
tDebug("thread %p start to quit", pThrd); tDebug("thread %p start to quit", pThrd);
destroyCmsg(pMsg); destroyCmsg(pMsg);
uv_close((uv_handle_t*)pThrd->cliAsync, NULL); // transDestroyAsyncPool(pThr) uv_close((uv_handle_t*)pThrd->cliAsync, NULL);
uv_timer_stop(pThrd->timer); uv_timer_stop(pThrd->timer);
pThrd->quit = true; pThrd->quit = true;
// uv__async_stop(pThrd->cliAsync); // uv__async_stop(pThrd->cliAsync);
...@@ -501,6 +502,7 @@ static void destroyCmsg(SCliMsg* pMsg) { ...@@ -501,6 +502,7 @@ static void destroyCmsg(SCliMsg* pMsg) {
destroyUserdata(&pMsg->msg); destroyUserdata(&pMsg->msg);
free(pMsg); free(pMsg);
} }
static SCliThrdObj* createThrdObj() { static SCliThrdObj* createThrdObj() {
SCliThrdObj* pThrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj)); SCliThrdObj* pThrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj));
QUEUE_INIT(&pThrd->msg); QUEUE_INIT(&pThrd->msg);
...@@ -509,9 +511,7 @@ static SCliThrdObj* createThrdObj() { ...@@ -509,9 +511,7 @@ static SCliThrdObj* createThrdObj() {
pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
uv_loop_init(pThrd->loop); uv_loop_init(pThrd->loop);
pThrd->cliAsync = malloc(sizeof(uv_async_t)); pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, pThrd, clientAsyncCb);
uv_async_init(pThrd->loop, pThrd->cliAsync, clientAsyncCb);
pThrd->cliAsync->data = pThrd;
pThrd->timer = malloc(sizeof(uv_timer_t)); pThrd->timer = malloc(sizeof(uv_timer_t));
uv_timer_init(pThrd->loop, pThrd->timer); uv_timer_init(pThrd->loop, pThrd->timer);
...@@ -529,7 +529,8 @@ static void destroyThrdObj(SCliThrdObj* pThrd) { ...@@ -529,7 +529,8 @@ static void destroyThrdObj(SCliThrdObj* pThrd) {
uv_stop(pThrd->loop); uv_stop(pThrd->loop);
pthread_join(pThrd->thread, NULL); pthread_join(pThrd->thread, NULL);
pthread_mutex_destroy(&pThrd->msgMtx); pthread_mutex_destroy(&pThrd->msgMtx);
free(pThrd->cliAsync); transDestroyAsyncPool(pThrd->asyncPool);
// free(pThrd->cliAsync);
free(pThrd->timer); free(pThrd->timer);
free(pThrd->loop); free(pThrd->loop);
free(pThrd); free(pThrd);
...@@ -551,7 +552,8 @@ static void clientSendQuit(SCliThrdObj* thrd) { ...@@ -551,7 +552,8 @@ static void clientSendQuit(SCliThrdObj* thrd) {
QUEUE_PUSH(&thrd->msg, &msg->q); QUEUE_PUSH(&thrd->msg, &msg->q);
pthread_mutex_unlock(&thrd->msgMtx); pthread_mutex_unlock(&thrd->msgMtx);
uv_async_send(thrd->cliAsync); transSendAsync(thrd->asyncPool);
// uv_async_send(thrd->cliAsync);
} }
void taosCloseClient(void* arg) { void taosCloseClient(void* arg) {
// impl later // impl later
...@@ -600,6 +602,10 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* ...@@ -600,6 +602,10 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t*
QUEUE_PUSH(&thrd->msg, &cliMsg->q); QUEUE_PUSH(&thrd->msg, &cliMsg->q);
pthread_mutex_unlock(&thrd->msgMtx); pthread_mutex_unlock(&thrd->msgMtx);
uv_async_send(thrd->cliAsync); int start = taosGetTimestampUs();
transSendAsync(thrd->asyncPool);
// uv_async_send(thrd->cliAsync);
int end = taosGetTimestampUs() - start;
// tError("client sent to rpc, time cost: %d", (int)end);
} }
#endif #endif
...@@ -245,4 +245,36 @@ int transDestroyBuffer(SConnBuffer* buf) { ...@@ -245,4 +245,36 @@ int transDestroyBuffer(SConnBuffer* buf) {
} }
transClearBuffer(buf); transClearBuffer(buf);
} }
SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, void* arg, AsyncCB cb) {
static int sz = 20;
SAsyncPool* pool = calloc(1, sizeof(SAsyncPool));
pool->index = 0;
pool->nAsync = sz;
pool->asyncs = calloc(1, sizeof(uv_async_t) * pool->nAsync);
for (int i = 0; i < pool->nAsync; i++) {
uv_async_t* async = &(pool->asyncs[i]);
uv_async_init(loop, async, cb);
async->data = arg;
}
return pool;
}
void transDestroyAsyncPool(SAsyncPool* pool) {
for (int i = 0; i < pool->nAsync; i++) {
uv_async_t* async = &(pool->asyncs[i]);
}
free(pool->asyncs);
free(pool);
}
int transSendAsync(SAsyncPool* pool) {
int idx = pool->index;
idx = idx % pool->nAsync;
// no need mutex here
if (pool->index++ > pool->nAsync) {
pool->index = 0;
}
return uv_async_send(&(pool->asyncs[idx]));
}
#endif #endif
...@@ -22,7 +22,7 @@ typedef struct SSrvConn { ...@@ -22,7 +22,7 @@ typedef struct SSrvConn {
uv_write_t* pWriter; uv_write_t* pWriter;
uv_timer_t* pTimer; uv_timer_t* pTimer;
uv_async_t* pWorkerAsync; // uv_async_t* pWorkerAsync;
queue queue; queue queue;
int ref; int ref;
int persist; // persist connection or not int persist; // persist connection or not
...@@ -50,11 +50,12 @@ typedef struct SSrvMsg { ...@@ -50,11 +50,12 @@ typedef struct SSrvMsg {
} SSrvMsg; } SSrvMsg;
typedef struct SWorkThrdObj { typedef struct SWorkThrdObj {
pthread_t thread; pthread_t thread;
uv_pipe_t* pipe; uv_pipe_t* pipe;
int fd; int fd;
uv_loop_t* loop; uv_loop_t* loop;
uv_async_t* workerAsync; // SAsyncPool* asyncPool;
// uv_async_t* workerAsync; //
queue msg; queue msg;
pthread_mutex_t msgMtx; pthread_mutex_t msgMtx;
void* pTransInst; void* pTransInst;
...@@ -469,7 +470,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { ...@@ -469,7 +470,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
pConn->pTimer->data = pConn; pConn->pTimer->data = pConn;
pConn->hostThrd = pThrd; pConn->hostThrd = pThrd;
pConn->pWorkerAsync = pThrd->workerAsync; // thread safty // pConn->pWorkerAsync = pThrd->workerAsync; // thread safty
// init client handle // init client handle
pConn->pTcp = (uv_tcp_t*)malloc(sizeof(uv_tcp_t)); pConn->pTcp = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
...@@ -512,10 +513,7 @@ static bool addHandleToWorkloop(void* arg) { ...@@ -512,10 +513,7 @@ static bool addHandleToWorkloop(void* arg) {
QUEUE_INIT(&pThrd->msg); QUEUE_INIT(&pThrd->msg);
pthread_mutex_init(&pThrd->msgMtx, NULL); pthread_mutex_init(&pThrd->msgMtx, NULL);
pThrd->workerAsync = malloc(sizeof(uv_async_t)); pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, pThrd, uvWorkerAsyncCb);
uv_async_init(pThrd->loop, pThrd->workerAsync, uvWorkerAsyncCb);
pThrd->workerAsync->data = pThrd;
uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
return true; return true;
} }
...@@ -665,7 +663,9 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) { ...@@ -665,7 +663,9 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) {
} }
pthread_join(pThrd->thread, NULL); pthread_join(pThrd->thread, NULL);
free(pThrd->loop); free(pThrd->loop);
free(pThrd->workerAsync); transDestroyAsyncPool(pThrd->asyncPool);
// free(pThrd->workerAsync);
free(pThrd); free(pThrd);
} }
void sendQuitToWorkThrd(SWorkThrdObj* pThrd) { void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
...@@ -676,7 +676,8 @@ void sendQuitToWorkThrd(SWorkThrdObj* pThrd) { ...@@ -676,7 +676,8 @@ void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
pthread_mutex_unlock(&pThrd->msgMtx); pthread_mutex_unlock(&pThrd->msgMtx);
tDebug("send quit msg to work thread"); tDebug("send quit msg to work thread");
uv_async_send(pThrd->workerAsync); transSendAsync(pThrd->asyncPool);
// uv_async_send(pThrd->workerAsync);
} }
void taosCloseServer(void* arg) { void taosCloseServer(void* arg) {
...@@ -716,8 +717,8 @@ void rpcSendResponse(const SRpcMsg* pMsg) { ...@@ -716,8 +717,8 @@ void rpcSendResponse(const SRpcMsg* pMsg) {
pthread_mutex_unlock(&pThrd->msgMtx); pthread_mutex_unlock(&pThrd->msgMtx);
tDebug("conn %p start to send resp", pConn); tDebug("conn %p start to send resp", pConn);
transSendAsync(pThrd->asyncPool);
uv_async_send(pThrd->workerAsync); // uv_async_send(pThrd->workerAsync);
} }
#endif #endif
...@@ -12,6 +12,7 @@ ...@@ -12,6 +12,7 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <sys/time.h>
#include "os.h" #include "os.h"
#include "rpcLog.h" #include "rpcLog.h"
...@@ -52,6 +53,12 @@ static void *sendRequest(void *param) { ...@@ -52,6 +53,12 @@ static void *sendRequest(void *param) {
tDebug("thread:%d, start to send request", pInfo->index); tDebug("thread:%d, start to send request", pInfo->index);
tDebug("thread:%d, reqs: %d", pInfo->index, pInfo->numOfReqs);
int u100 = 0;
int u500 = 0;
int u1000 = 0;
int u10000 = 0;
while (pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) { while (pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) {
pInfo->num++; pInfo->num++;
rpcMsg.pCont = rpcMallocCont(pInfo->msgSize); rpcMsg.pCont = rpcMallocCont(pInfo->msgSize);
...@@ -59,15 +66,28 @@ static void *sendRequest(void *param) { ...@@ -59,15 +66,28 @@ static void *sendRequest(void *param) {
rpcMsg.ahandle = pInfo; rpcMsg.ahandle = pInfo;
rpcMsg.msgType = 1; rpcMsg.msgType = 1;
// tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); // tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
int64_t start = taosGetTimestampUs();
rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL); rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL);
if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num); if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
// tsem_wait(&pInfo->rspSem); // tsem_wait(&pInfo->rspSem);
tsem_wait(&pInfo->rspSem); tsem_wait(&pInfo->rspSem);
int64_t end = taosGetTimestampUs() - start;
if (end <= 100) {
u100++;
} else if (end > 100 && end <= 500) {
u500++;
} else if (end > 500 && end < 1000) {
u1000++;
} else {
u10000++;
}
tDebug("recv response succefully"); tDebug("recv response succefully");
// usleep(100000000); // usleep(100000000);
} }
tError("send and recv sum: %d, %d, %d, %d", u100, u500, u1000, u10000);
tDebug("thread:%d, it is over", pInfo->index); tDebug("thread:%d, it is over", pInfo->index);
tcount++; tcount++;
...@@ -163,8 +183,8 @@ int main(int argc, char *argv[]) { ...@@ -163,8 +183,8 @@ int main(int argc, char *argv[]) {
tInfo("client is initialized"); tInfo("client is initialized");
tInfo("threads:%d msgSize:%d requests:%d", appThreads, msgSize, numOfReqs); tInfo("threads:%d msgSize:%d requests:%d", appThreads, msgSize, numOfReqs);
// gettimeofday(&systemTime, NULL); gettimeofday(&systemTime, NULL);
// startTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec; startTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
SInfo *pInfo = (SInfo *)calloc(1, sizeof(SInfo) * appThreads); SInfo *pInfo = (SInfo *)calloc(1, sizeof(SInfo) * appThreads);
...@@ -186,13 +206,12 @@ int main(int argc, char *argv[]) { ...@@ -186,13 +206,12 @@ int main(int argc, char *argv[]) {
usleep(1); usleep(1);
} while (tcount < appThreads); } while (tcount < appThreads);
// gettimeofday(&systemTime, NULL); gettimeofday(&systemTime, NULL);
// endTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec; endTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
// float usedTime = (endTime - startTime) / 1000.0f; // mseconds float usedTime = (endTime - startTime) / 1000.0f; // mseconds
// tInfo("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs * appThreads); tInfo("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs * appThreads);
// tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0 * numOfReqs * appThreads / usedTime, tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0 * numOfReqs * appThreads / usedTime, msgSize);
// msgSize);
int ch = getchar(); int ch = getchar();
UNUSED(ch); UNUSED(ch);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册