提交 b2c24a03 编写于 作者: dengyihao's avatar dengyihao

modify transport

上级 fdb64e7c
......@@ -86,7 +86,7 @@ typedef struct SRpcInit {
int32_t rpcInit();
void rpcCleanup();
void *rpcOpen(const SRpcInit *pRpc);
void * rpcOpen(const SRpcInit *pRpc);
void rpcClose(void *);
void * rpcMallocCont(int contLen);
void rpcFreeCont(void *pCont);
......@@ -99,6 +99,9 @@ void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp)
int rpcReportProgress(void *pConn, char *pCont, int contLen);
void rpcCancelRequest(int64_t rid);
void rpcRefHandle(void *handle, int8_t type);
void rpcUnrefHandle(void *handle, int8_t type);
#ifdef __cplusplus
}
#endif
......
......@@ -242,8 +242,11 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf);
bool transReadComplete(SConnBuffer* connBuf);
int transSetConnOption(uv_tcp_t* stream);
// int transPackMsg(SRpcMsg *rpcMsg, bool sercured, bool auth, char **msg, int32_t *msgLen);
// int transUnpackMsg(char *msg, SRpcMsg *pMsg, bool );
void transRefSrvHandle(void* handle);
void transUnrefSrvHandle(void* handle);
void transRefCliHandle(void* handle);
void transUnrefCliHandle(void* handle);
#endif
......@@ -122,4 +122,17 @@ void rpcCleanup(void) {
//
return;
}
void (*taosRefHandle[])(void* handle) = {transRefSrvHandle, transRefCliHandle};
void (*taosUnRefHandle[])(void* handle) = {transUnrefSrvHandle, transUnrefCliHandle};
void rpcRefHandle(void* handle, int8_t type) {
assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT);
(*taosRefHandle[type])(handle);
}
void rpcUnrefHandle(void* handle, int8_t type) {
assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT);
(*taosUnRefHandle[type])(handle);
}
#endif
......@@ -351,14 +351,11 @@ static void clientConnDestroy(SCliConn* conn, bool clear) {
}
static void clientDestroy(uv_handle_t* handle) {
SCliConn* conn = handle->data;
// transDestroyBuffer(&conn->readBuf);
free(conn->stream);
free(conn->writeReq);
tTrace("client conn %p destroy successfully", conn);
tTrace("%s client conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
free(conn);
// clientConnDestroy(conn, false);
}
static void clientWriteCb(uv_write_t* req, int status) {
......@@ -454,8 +451,7 @@ static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
uint64_t et = taosGetTimestampUs();
uint64_t el = et - pMsg->st;
tTrace("client msg tran time cost: %" PRIu64 "us", el);
et = taosGetTimestampUs();
tTrace("%s client msg tran time cost: %" PRIu64 "us", ((SRpcInfo*)pThrd->pTransInst)->label, el);
STransConnCtx* pCtx = pMsg->ctx;
SRpcInfo* pTransInst = pThrd->pTransInst;
......@@ -630,8 +626,6 @@ static void transDestroyConnCtx(STransConnCtx* ctx) {
static void clientSendQuit(SCliThrdObj* thrd) {
// cli can stop gracefully
SCliMsg* msg = calloc(1, sizeof(SCliMsg));
msg->ctx = NULL; //
transSendAsync(thrd->asyncPool, &msg->q);
}
void taosCloseClient(void* arg) {
......@@ -650,6 +644,23 @@ static int clientRBChoseIdx(SRpcInfo* pTransInst) {
}
return index % pTransInst->numOfThreads;
}
void transRefCliHandle(void* handle) {
if (handle == NULL) {
return;
}
int ref = T_REF_INC((SCliConn*)handle);
UNUSED(ref);
}
void transUnrefCliHandle(void* handle) {
if (handle == NULL) {
return;
}
int ref = T_REF_DEC((SCliConn*)handle);
if (ref == 0) {
}
// unref cli handle
}
void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
// impl later
char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
......
......@@ -33,11 +33,11 @@ typedef struct SSrvConn {
void* ahandle; //
void* hostThrd;
SArray* srvMsgs;
// void* pSrvMsg;
bool broken; // conn broken;
struct sockaddr_in addr;
struct sockaddr_in locaddr;
// SRpcMsg sendMsg;
// del later
char secured;
......@@ -206,7 +206,6 @@ static void uvHandleReq(SSrvConn* pConn) {
}
pConn->inType = pHead->msgType;
// assert(transIsReq(pHead->msgType));
SRpcInfo* pRpc = (SRpcInfo*)p->shandle;
pHead->code = htonl(pHead->code);
......@@ -230,7 +229,8 @@ static void uvHandleReq(SSrvConn* pConn) {
rpcMsg.handle = pConn;
transClearBuffer(&pConn->readBuf);
pConn->ref++;
transRefSrvHandle(pConn);
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(rpcMsg.msgType),
inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr),
ntohs(pConn->locaddr.sin_port), rpcMsg.contLen);
......@@ -255,23 +255,20 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
}
return;
}
if (nread == UV_EOF) {
tError("server conn %p read error: %s", conn, uv_err_name(nread));
if (conn->ref > 1) {
conn->ref++; // ref > 1 signed that write is in progress
}
destroyConn(conn, true);
return;
}
if (nread == 0) {
return;
}
if (nread < 0 || nread != UV_EOF) {
if (conn->ref > 1) {
conn->ref++; // ref > 1 signed that write is in progress
}
tError("server conn %p read error: %s", conn, uv_err_name(nread));
destroyConn(conn, true);
tError("server conn %p read error: %s", conn, uv_err_name(nread));
if (nread < 0 || nread == UV_EOF) {
conn->broken = true;
transUnrefSrvHandle(conn);
// if (conn->ref > 1) {
// conn->ref++; // ref > 1 signed that write is in progress
//}
// tError("server conn %p read error: %s", conn, uv_err_name(nread));
// destroyConn(conn, true);
}
}
void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
......@@ -304,10 +301,9 @@ void uvOnWriteCb(uv_write_t* req, int status) {
}
} else {
tError("server conn %p failed to write data, %s", conn, uv_err_name(status));
//
destroyConn(conn, true);
conn->broken = false;
transUnrefSrvHandle(conn);
}
// opt
}
static void uvOnPipeWriteCb(uv_write_t* req, int status) {
if (status == 0) {
......@@ -353,15 +349,18 @@ static void uvStartSendRespInternal(SSrvMsg* smsg) {
SSrvConn* pConn = smsg->pConn;
uv_timer_stop(pConn->pTimer);
// pConn->pSrvMsg = smsg;
// conn->pWriter->data = smsg;
uv_write(pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnWriteCb);
}
static void uvStartSendResp(SSrvMsg* smsg) {
// impl
SSrvConn* pConn = smsg->pConn;
pConn->ref--; //
if (pConn->broken == true) {
transUnrefSrvHandle(pConn);
return;
}
transUnrefSrvHandle(pConn);
if (taosArrayGetSize(pConn->srvMsgs) > 0) {
tDebug("server conn %p push data to client %s:%d, local info: %s:%d", pConn, inet_ntoa(pConn->addr.sin_addr),
ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
......@@ -386,7 +385,8 @@ static void destroyAllConn(SWorkThrdObj* pThrd) {
QUEUE_INIT(h);
SSrvConn* c = QUEUE_DATA(h, SSrvConn, queue);
destroyConn(c, true);
transUnrefSrvHandle(c);
// destroyConn(c, true);
}
}
void uvWorkerAsyncCb(uv_async_t* handle) {
......@@ -394,11 +394,11 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
SWorkThrdObj* pThrd = item->pThrd;
SSrvConn* conn = NULL;
queue wq;
// batch process to avoid to lock/unlock frequently
pthread_mutex_lock(&item->mtx);
QUEUE_MOVE(&item->qmsg, &wq);
pthread_mutex_unlock(&item->mtx);
// pthread_mutex_unlock(&mtx);
while (!QUEUE_IS_EMPTY(&wq)) {
queue* head = QUEUE_HEAD(&wq);
......@@ -411,7 +411,6 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
}
if (msg->pConn == NULL) {
free(msg);
destroyAllConn(pThrd);
uv_loop_close(pThrd->loop);
......@@ -601,7 +600,9 @@ static SSrvConn* createConn(void* hThrd) {
QUEUE_PUSH(&pThrd->conn, &pConn->queue);
pConn->srvMsgs = taosArrayInit(2, sizeof(void*)); //
tTrace("conn %p created", pConn);
++pConn->ref;
pConn->broken = false;
transRefSrvHandle(pConn);
return pConn;
}
......@@ -609,10 +610,6 @@ static void destroyConn(SSrvConn* conn, bool clear) {
if (conn == NULL) {
return;
}
tTrace("server conn %p try to destroy, ref: %d", conn, conn->ref);
if (--conn->ref > 0) {
return;
}
transDestroyBuffer(&conn->readBuf);
for (int i = 0; i < taosArrayGetSize(conn->srvMsgs); i++) {
......@@ -624,9 +621,9 @@ static void destroyConn(SSrvConn* conn, bool clear) {
if (clear) {
tTrace("try to destroy conn %p", conn);
uv_tcp_close_reset(conn->pTcp, uvDestroyConn);
// uv_shutdown_t* req = malloc(sizeof(uv_shutdown_t));
// uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb);
// uv_tcp_close_reset(conn->pTcp, uvDestroyConn);
uv_shutdown_t* req = malloc(sizeof(uv_shutdown_t));
uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb);
// uv_unref((uv_handle_t*)conn->pTcp);
// uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn);
}
......@@ -722,8 +719,6 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) {
pthread_join(pThrd->thread, NULL);
free(pThrd->loop);
transDestroyAsyncPool(pThrd->asyncPool);
// free(pThrd->workerAsync);
free(pThrd);
}
void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
......@@ -757,6 +752,27 @@ void taosCloseServer(void* arg) {
free(srv);
}
void transRefSrvHandle(void* handle) {
if (handle == NULL) {
return;
}
SSrvConn* conn = handle;
int ref = T_REF_INC((SSrvConn*)handle);
UNUSED(ref);
}
void transUnrefSrvHandle(void* handle) {
if (handle == NULL) {
return;
}
int ref = T_REF_DEC((SSrvConn*)handle);
if (ref == 0) {
destroyConn((SSrvConn*)handle, true);
}
// unref srv handle
}
void rpcSendResponse(const SRpcMsg* pMsg) {
if (pMsg->handle == NULL) {
return;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册