提交 07cb902a 编写于 作者: dengyihao's avatar dengyihao

handle server/client execpet

上级 7d700361
......@@ -123,9 +123,9 @@ typedef struct {
} SRpcReqContext;
typedef struct {
SRpcInfo* pRpc; // associated SRpcInfo
SEpSet epSet; // ip list provided by app
void* ahandle; // handle provided by app
SRpcInfo* pTransInst; // associated SRpcInfo
SEpSet epSet; // ip list provided by app
void* ahandle; // handle provided by app
// struct SRpcConn* pConn; // pConn allocated
tmsg_t msgType; // message type
uint8_t* pCont; // content provided by app
......
......@@ -30,6 +30,7 @@ typedef struct SCliConn {
char spi;
char secured;
uint64_t expireTime;
int8_t notifyCount; // timers already notify to client
} SCliConn;
typedef struct SCliMsg {
......@@ -72,8 +73,6 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* co
// register timer in each thread to clear expire conn
static void clientTimeoutCb(uv_timer_t* handle);
// process data read from server, auth/decompress etc later
static void clientHandleResp(SCliConn* conn);
// check whether already read complete packet from server
static bool clientReadComplete(SConnBuffer* pBuf);
// alloc buf for read
......@@ -88,10 +87,15 @@ static void clientAsyncCb(uv_async_t* handle);
static void clientDestroy(uv_handle_t* handle);
static void clientConnDestroy(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
static void clientMsgDestroy(SCliMsg* pMsg);
// process data read from server, auth/decompress etc later
static void clientHandleResp(SCliConn* conn);
// handle except about conn
static void clientHandleExcept(SCliConn* conn);
// handle req from app
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
static void clientMsgDestroy(SCliMsg* pMsg);
static void destroyTransConnCtx(STransConnCtx* ctx);
// thread obj
static SCliThrdObj* createThrdObj();
static void destroyThrdObj(SCliThrdObj* pThrd);
......@@ -100,22 +104,38 @@ static void* clientThread(void* arg);
static void clientHandleResp(SCliConn* conn) {
STransConnCtx* pCtx = ((SCliMsg*)conn->data)->ctx;
SRpcInfo* pRpc = pCtx->pRpc;
SRpcInfo* pRpc = pCtx->pTransInst;
SRpcMsg rpcMsg;
rpcMsg.pCont = conn->readBuf.buf;
rpcMsg.contLen = conn->readBuf.len;
rpcMsg.ahandle = pCtx->ahandle;
(pRpc->cfp)(NULL, &rpcMsg, NULL);
conn->notifyCount += 1;
SCliThrdObj* pThrd = conn->hostThrd;
addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn);
// start thread's timer of conn pool if not active
if (!uv_is_active((uv_handle_t*)pThrd->pTimer) && pRpc->idleTime > 0) {
uv_timer_start((uv_timer_t*)pThrd->pTimer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
}
free(pCtx->ip);
free(pCtx);
// impl
destroyTransConnCtx(pCtx);
}
static void clientHandleExcept(SCliConn* pConn) {
SCliMsg* pMsg = pConn->data;
STransConnCtx* pCtx = pMsg->ctx;
SRpcInfo* pRpc = pCtx->pTransInst;
SRpcMsg rpcMsg;
rpcMsg.ahandle = pCtx->ahandle;
// SRpcInfo* pRpc = pMsg->ctx->pRpc;
(pRpc->cfp)(NULL, &rpcMsg, NULL);
pConn->notifyCount += 1;
destroyTransConnCtx(pCtx);
clientConnDestroy(pConn, true);
}
static void clientTimeoutCb(uv_timer_t* handle) {
......@@ -191,6 +211,7 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) {
SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst;
conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
conn->notifyCount = 0;
// list already create before
assert(plist != NULL);
QUEUE_PUSH(&plist->conn, &conn->conn);
......@@ -246,19 +267,21 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
if (nread > 0) {
pBuf->len += nread;
if (clientReadComplete(pBuf)) {
tDebug("alread read complete");
tDebug("conn read complete");
clientHandleResp(conn);
} else {
tDebug("read half packet, continue to read");
tDebug("conn read half packet, continue to read");
}
return;
}
assert(nread <= 0);
if (nread == 0) {
tError("conn closed");
return;
}
if (nread != UV_EOF) {
tDebug("read error %s", uv_err_name(nread));
if (nread < 0) {
tError("conn read error: %s", uv_err_name(nread));
clientHandleExcept(conn);
}
// tDebug("Read error %s\n", uv_err_name(nread));
// uv_close((uv_handle_t*)handle, clientDestroy);
......@@ -282,19 +305,20 @@ static void clientDestroy(uv_handle_t* handle) {
static void clientWriteCb(uv_write_t* req, int status) {
SCliConn* pConn = req->data;
if (status == 0) {
tDebug("data already was written on stream");
tDebug("conn data already was written out");
} else {
tError("failed to write: %s", uv_err_name(status));
clientConnDestroy(pConn, true);
clientHandleExcept(pConn);
return;
}
SCliThrdObj* pThrd = pConn->hostThrd;
if (pConn->stream == NULL) {
pConn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
uv_tcp_init(pThrd->loop, (uv_tcp_t*)pConn->stream);
pConn->stream->data = pConn;
}
// if (pConn->stream == NULL) {
// pConn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
// uv_tcp_init(pThrd->loop, (uv_tcp_t*)pConn->stream);
// pConn->stream->data = pConn;
//}
uv_read_start((uv_stream_t*)pConn->stream, clientAllocReadBufferCb, clientReadCb);
// impl later
}
......@@ -316,23 +340,10 @@ static void clientWrite(SCliConn* pConn) {
static void clientConnCb(uv_connect_t* req, int status) {
// impl later
SCliConn* pConn = req->data;
SCliMsg* pMsg = pConn->data;
STransConnCtx* pCtx = pMsg->ctx;
SRpcInfo* pRpc = pCtx->pRpc;
if (status != 0) {
// tError("failed to connect server(%s, %d), errmsg: %s", pCtx->ip, pCtx->port, uv_strerror(status));
tError("failed to connect server, errmsg: %s", uv_strerror(status));
// call user fp later
SRpcMsg rpcMsg;
rpcMsg.ahandle = pCtx->ahandle;
// SRpcInfo* pRpc = pMsg->ctx->pRpc;
(pRpc->cfp)(NULL, &rpcMsg, NULL);
clientConnDestroy(pConn, true);
// uv_close((uv_handle_t*)req->handle, clientDestroy);
return;
clientHandleExcept(pConn);
}
assert(pConn->stream == req->handle);
......@@ -462,6 +473,13 @@ static void destroyThrdObj(SCliThrdObj* pThrd) {
free(pThrd->loop);
free(pThrd);
}
static void destroyTransConnCtx(STransConnCtx* ctx) {
if (ctx != NULL) {
free(ctx->ip);
}
free(ctx);
}
//
void taosCloseClient(void* arg) {
// impl later
......@@ -472,7 +490,6 @@ void taosCloseClient(void* arg) {
free(cli->pThreadObj);
free(cli);
}
void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
// impl later
char* ip = (char*)(pEpSet->fqdn[pEpSet->inUse]);
......@@ -487,7 +504,7 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t*
STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx));
pCtx->pRpc = (SRpcInfo*)shandle;
pCtx->pTransInst = (SRpcInfo*)shandle;
pCtx->ahandle = pMsg->ahandle;
pCtx->msgType = pMsg->msgType;
pCtx->ip = strdup(ip);
......
......@@ -16,6 +16,7 @@
#ifdef USE_UV
#include "transComm.h"
typedef struct SConn {
uv_tcp_t* pTcp;
uv_write_t* pWriter;
......@@ -226,7 +227,7 @@ static void uvHandleActivityTimeout(uv_timer_t* handle) {
tDebug("%p timeout since no activity", conn);
}
static void uvProcessData(SConn* pConn) {
static void uvHandleReq(SConn* pConn) {
SRecvInfo info;
SRecvInfo* p = &info;
SConnBuffer* pBuf = &pConn->connBuf;
......@@ -283,20 +284,23 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
SConnBuffer* pBuf = &conn->connBuf;
if (nread > 0) {
pBuf->len += nread;
tDebug("on read %p, total read: %d, current read: %d", cli, pBuf->len, (int)nread);
tDebug("conn %p read summroy, total read: %d, current read: %d", conn, pBuf->len, (int)nread);
if (readComplete(pBuf)) {
tDebug("alread read complete packet");
uvProcessData(conn);
tDebug("conn %p alread read complete packet", conn);
uvHandleReq(conn);
} else {
tDebug("read half packet, continue to read");
tDebug("conn %p read partial packet, continue to read", conn);
}
return;
}
if (nread == 0) {
tDebug("conn %p except read", conn);
// destroyConn(conn, true);
return;
}
if (nread != UV_EOF) {
tDebug("read error %s", uv_err_name(nread));
tDebug("conn %p read error: %s", conn, uv_err_name(nread));
destroyConn(conn, true);
}
}
void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
......@@ -306,7 +310,8 @@ void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b
void uvOnTimeoutCb(uv_timer_t* handle) {
// opt
tDebug("time out");
SConn* pConn = handle->data;
tDebug("conn %p time out", pConn);
}
void uvOnWriteCb(uv_write_t* req, int status) {
......@@ -317,9 +322,9 @@ void uvOnWriteCb(uv_write_t* req, int status) {
memset(buf->buf, 0, buf->cap);
buf->left = -1;
if (status == 0) {
tDebug("data already was written on stream");
tDebug("conn %p data already was written on stream", conn);
} else {
tDebug("failed to write data, %s", uv_err_name(status));
tDebug("conn %p failed to write data, %s", conn, uv_err_name(status));
destroyConn(conn, true);
}
// opt
......@@ -334,7 +339,7 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) {
static void uvPrepareSendData(SConn* conn, uv_buf_t* wb) {
// impl later;
tDebug("prepare to send back");
tDebug("conn %p prepare to send resp", conn);
SRpcMsg* pMsg = &conn->sendMsg;
if (pMsg->pCont == 0) {
pMsg->pCont = (void*)rpcMallocCont(0);
......@@ -448,7 +453,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) {
uv_os_fd_t fd;
uv_fileno((const uv_handle_t*)pConn->pTcp, &fd);
tDebug("new connection created: %d", fd);
tDebug("conn %p created, fd: %d", pConn, fd);
uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb);
} else {
tDebug("failed to create new connection");
......@@ -517,17 +522,13 @@ static SConn* createConn() {
SConn* pConn = (SConn*)calloc(1, sizeof(SConn));
return pConn;
}
static void connCloseCb(uv_handle_t* handle) {
// impl later
//
}
static void destroyConn(SConn* conn, bool clear) {
if (conn == NULL) {
return;
}
if (clear) {
uv_handle_t handle = *((uv_handle_t*)conn->pTcp);
uv_close(&handle, NULL);
uv_close((uv_handle_t*)conn->pTcp, NULL);
}
uv_timer_stop(conn->pTimer);
free(conn->pTimer);
......@@ -646,6 +647,7 @@ void rpcSendResponse(const SRpcMsg* pMsg) {
pthread_mutex_lock(&pThrd->connMtx);
QUEUE_PUSH(&pThrd->conn, &pConn->queue);
pthread_mutex_unlock(&pThrd->connMtx);
tDebug("conn %p start to send resp", pConn);
uv_async_send(pConn->pWorkerAsync);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册