提交 d5228ebd 编写于 作者: dengyihao's avatar dengyihao

refactor code

上级 7dba7215
......@@ -233,7 +233,7 @@ typedef struct {
uv_async_t* asyncs;
} SAsyncPool;
SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, void* arg, AsyncCB cb);
SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb);
void transDestroyAsyncPool(SAsyncPool* pool);
int transSendAsync(SAsyncPool* pool, queue* mq);
......
......@@ -38,6 +38,7 @@ typedef struct SCliConn {
int32_t ref;
// debug and log info
struct sockaddr_in addr;
struct sockaddr_in locaddr;
} SCliConn;
typedef struct SCliMsg {
......@@ -130,8 +131,9 @@ static void clientHandleResp(SCliConn* conn) {
rpcMsg.msgType = pHead->msgType;
rpcMsg.ahandle = pCtx->ahandle;
tDebug("client conn %p %s received from %s:%d", conn, TMSG_INFO(pHead->msgType), inet_ntoa(conn->addr.sin_addr),
ntohs(conn->addr.sin_port));
tDebug("client conn %p %s received from %s:%d, local info: %s:%d", conn, TMSG_INFO(pHead->msgType),
inet_ntoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), inet_ntoa(conn->locaddr.sin_addr),
ntohs(conn->locaddr.sin_port));
if (conn->push != NULL && conn->notifyCount != 0) {
(*conn->push->callback)(conn->push->arg, &rpcMsg);
......@@ -417,8 +419,9 @@ static void clientWrite(SCliConn* pConn) {
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
tDebug("client conn %p %s is send to %s:%d", pConn, TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr),
ntohs(pConn->addr.sin_port));
tDebug("client conn %p %s is send to %s:%d, local info %s:%d", pConn, TMSG_INFO(pHead->msgType),
inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr),
ntohs(pConn->locaddr.sin_port));
uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb);
}
static void clientConnCb(uv_connect_t* req, int status) {
......@@ -433,6 +436,9 @@ static void clientConnCb(uv_connect_t* req, int status) {
int addrlen = sizeof(pConn->addr);
uv_tcp_getpeername((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->addr, &addrlen);
addrlen = sizeof(pConn->locaddr);
uv_tcp_getsockname((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->locaddr, &addrlen);
tTrace("client conn %p create", pConn);
assert(pConn->stream == req->handle);
......@@ -579,7 +585,7 @@ static SCliThrdObj* createThrdObj() {
pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
uv_loop_init(pThrd->loop);
pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, pThrd, clientAsyncCb);
pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, clientAsyncCb);
pThrd->timer = malloc(sizeof(uv_timer_t));
uv_timer_init(pThrd->loop, pThrd->timer);
......
......@@ -250,9 +250,7 @@ int transDestroyBuffer(SConnBuffer* buf) {
transClearBuffer(buf);
}
SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, void* arg, AsyncCB cb) {
static int sz = 10;
SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) {
SAsyncPool* pool = calloc(1, sizeof(SAsyncPool));
pool->index = 0;
pool->nAsync = sz;
......
......@@ -31,7 +31,8 @@ typedef struct SSrvConn {
void* pTransInst; // rpc init
void* ahandle; //
void* hostThrd;
void* pSrvMsg;
SArray* srvMsgs;
// void* pSrvMsg;
struct sockaddr_in addr;
......@@ -94,6 +95,7 @@ static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf)
static void uvWorkerAsyncCb(uv_async_t* handle);
static void uvAcceptAsyncCb(uv_async_t* handle);
static void uvStartSendRespInternal(SSrvMsg* smsg);
static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb);
static void uvStartSendResp(SSrvMsg* msg);
......@@ -310,14 +312,19 @@ void uvOnTimeoutCb(uv_timer_t* handle) {
void uvOnWriteCb(uv_write_t* req, int status) {
SSrvConn* conn = req->data;
SSrvMsg* smsg = conn->pSrvMsg;
destroySmsg(smsg);
conn->pSrvMsg = NULL;
transClearBuffer(&conn->readBuf);
if (status == 0) {
tTrace("server conn %p data already was written on stream", conn);
assert(taosArrayGetSize(conn->srvMsgs) >= 1);
SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, 0);
taosArrayRemove(conn->srvMsgs, 0);
destroySmsg(msg);
// send second data, just use for push
if (taosArrayGetSize(conn->srvMsgs) > 0) {
msg = (SSrvMsg*)taosArrayGetP(conn->srvMsgs, 0);
uvStartSendRespInternal(msg);
}
} else {
tError("server conn %p failed to write data, %s", conn, uv_err_name(status));
//
......@@ -361,20 +368,29 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
wb->base = msg;
wb->len = len;
}
static void uvStartSendResp(SSrvMsg* smsg) {
// impl
static void uvStartSendRespInternal(SSrvMsg* smsg) {
uv_buf_t wb;
uvPrepareSendData(smsg, &wb);
SSrvConn* pConn = smsg->pConn;
uv_timer_stop(pConn->pTimer);
pConn->pSrvMsg = smsg;
// pConn->pSrvMsg = smsg;
// conn->pWriter->data = smsg;
uv_write(pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnWriteCb);
// SRpcMsg* rpcMsg = smsg->msg;
}
static void uvStartSendResp(SSrvMsg* smsg) {
// impl
SSrvConn* pConn = smsg->pConn;
if (taosArrayGetSize(pConn->srvMsgs) > 0) {
tDebug("server conn %p push data to client %s:%d", pConn, inet_ntoa(pConn->addr.sin_addr),
ntohs(pConn->addr.sin_port));
taosArrayPush(pConn->srvMsgs, &smsg);
return;
}
taosArrayPush(pConn->srvMsgs, &smsg);
uvStartSendRespInternal(smsg);
return;
}
static void destroySmsg(SSrvMsg* smsg) {
......@@ -531,7 +547,7 @@ static bool addHandleToWorkloop(void* arg) {
QUEUE_INIT(&pThrd->msg);
pthread_mutex_init(&pThrd->msgMtx, NULL);
pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, pThrd, uvWorkerAsyncCb);
pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 4, pThrd, uvWorkerAsyncCb);
uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
return true;
}
......@@ -571,6 +587,7 @@ void* workerThread(void* arg) {
static SSrvConn* createConn() {
SSrvConn* pConn = (SSrvConn*)calloc(1, sizeof(SSrvConn));
pConn->srvMsgs = taosArrayInit(2, sizeof(void*)); //
tTrace("conn %p created", pConn);
++pConn->ref;
return pConn;
......@@ -585,8 +602,15 @@ static void destroyConn(SSrvConn* conn, bool clear) {
return;
}
transDestroyBuffer(&conn->readBuf);
destroySmsg(conn->pSrvMsg);
conn->pSrvMsg = NULL;
for (int i = 0; i < taosArrayGetSize(conn->srvMsgs); i++) {
SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, i);
destroySmsg(msg);
}
taosArrayDestroy(conn->srvMsgs);
// destroySmsg(conn->pSrvMsg);
// conn->pSrvMsg = NULL;
if (clear) {
uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn);
......
......@@ -77,7 +77,7 @@ void processShellMsg() {
taosFreeQitem(pRpcMsg);
{
sleep(1);
// sleep(1);
SRpcMsg nRpcMsg = {0};
nRpcMsg.pCont = rpcMallocCont(msgSize);
nRpcMsg.contLen = msgSize;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册