提交 41c3160d 编写于 作者: dengyihao's avatar dengyihao

refactor code

上级 6600ae49
......@@ -135,12 +135,13 @@ typedef struct SRpcConn {
uv_async_t* pWorkerAsync;
queue queue;
int ref;
int persist; // persist connection or not
SConnBuffer connBuf;
int persist; // persist connection or not
SConnBuffer connBuf; // read buf,
SConnBuffer writeBuf; // write buf
int count;
void* shandle; // rpc init
void* ahandle;
void* ahandle; //
void* hostThread;
// del later
char secured;
int spi;
......@@ -335,6 +336,11 @@ static int uvAuthMsg(SRpcConn* pConn, char* msg, int len) {
return code;
}
// refers specifically to query or insert timeout
static void uvHandleActivityTimeout(uv_timer_t* handle) {
// impl later
SRpcConn* conn = handle->data;
}
static void uvProcessData(SRpcConn* pConn) {
SRecvInfo info;
SRecvInfo* p = &info;
......@@ -358,8 +364,8 @@ static void uvProcessData(SRpcConn* pConn) {
int8_t code = uvAuthMsg(pConn, (char*)pHead, p->msgLen);
if (code != 0) {
terrno = code;
return;
}
// rpcCheckAuthentication(pConn, (char*)pHead, pBuf->len);
pHead->code = htonl(pHead->code);
SRpcMsg rpcMsg;
......@@ -371,7 +377,9 @@ static void uvProcessData(SRpcConn* pConn) {
rpcMsg.code = pHead->code;
rpcMsg.ahandle = pConn->ahandle;
rpcMsg.handle = pConn;
(*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL);
uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime, 0);
// auth
// validate msg type
}
......@@ -419,21 +427,23 @@ void uvOnWriteCb(uv_write_t* req, int status) {
}
void uvWorkerAsyncCb(uv_async_t* handle) {
SThreadObj* pObj = container_of(handle, SThreadObj, workerAsync);
SThreadObj* pThrd = container_of(handle, SThreadObj, workerAsync);
SRpcConn* conn = NULL;
// opt later
pthread_mutex_lock(&pObj->connMtx);
if (!QUEUE_IS_EMPTY(&pObj->conn)) {
queue* head = QUEUE_HEAD(&pObj->conn);
pthread_mutex_lock(&pThrd->connMtx);
if (!QUEUE_IS_EMPTY(&pThrd->conn)) {
queue* head = QUEUE_HEAD(&pThrd->conn);
conn = QUEUE_DATA(head, SRpcConn, queue);
QUEUE_REMOVE(&conn->queue);
}
pthread_mutex_unlock(&pObj->connMtx);
pthread_mutex_unlock(&pThrd->connMtx);
if (conn == NULL) {
tError("except occurred, do nothing");
return;
}
uv_buf_t wb = uv_buf_init(conn->writeBuf.buf, conn->writeBuf.len);
uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb);
}
void uvOnAcceptCb(uv_stream_t* stream, int status) {
......@@ -472,7 +482,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
assert(buf->base[0] == notify[0]);
free(buf->base);
SThreadObj* pObj = q->data;
SThreadObj* pThrd = q->data;
uv_pipe_t* pipe = (uv_pipe_t*)q;
if (!uv_pipe_pending_count(pipe)) {
......@@ -484,16 +494,18 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
assert(pending == UV_TCP);
SRpcConn* pConn = connCreate();
pConn->shandle = pObj->shandle;
pConn->shandle = pThrd->shandle;
/* init conn timer*/
pConn->pTimer = malloc(sizeof(uv_timer_t));
uv_timer_init(pObj->loop, pConn->pTimer);
uv_timer_init(pThrd->loop, pConn->pTimer);
pConn->pTimer->data = pConn;
pConn->pWorkerAsync = pObj->workerAsync; // thread safty
pConn->hostThread = pThrd;
pConn->pWorkerAsync = pThrd->workerAsync; // thread safty
// init client handle
pConn->pTcp = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
uv_tcp_init(pObj->loop, pConn->pTcp);
uv_tcp_init(pThrd->loop, pConn->pTcp);
pConn->pTcp->data = pConn;
// init write request, just
......@@ -527,23 +539,23 @@ void* acceptThread(void* arg) {
uv_run(srv->loop, UV_RUN_DEFAULT);
}
void* workerThread(void* arg) {
SThreadObj* pObj = (SThreadObj*)arg;
SThreadObj* pThrd = (SThreadObj*)arg;
pObj->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
uv_loop_init(pObj->loop);
pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
uv_loop_init(pThrd->loop);
uv_pipe_init(pObj->loop, pObj->pipe, 1);
uv_pipe_open(pObj->pipe, pObj->fd);
uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
uv_pipe_open(pThrd->pipe, pThrd->fd);
pObj->pipe->data = pObj;
pThrd->pipe->data = pThrd;
QUEUE_INIT(&pObj->conn);
QUEUE_INIT(&pThrd->conn);
pObj->workerAsync = malloc(sizeof(uv_async_t));
uv_async_init(pObj->loop, pObj->workerAsync, uvWorkerAsyncCb);
pThrd->workerAsync = malloc(sizeof(uv_async_t));
uv_async_init(pThrd->loop, pThrd->workerAsync, uvWorkerAsyncCb);
uv_read_start((uv_stream_t*)pObj->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
uv_run(pObj->loop, UV_RUN_DEFAULT);
uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
uv_run(pThrd->loop, UV_RUN_DEFAULT);
}
static SRpcConn* connCreate() {
SRpcConn* pConn = (SRpcConn*)calloc(1, sizeof(SRpcConn));
......@@ -583,9 +595,22 @@ void* rpcMallocCont(int contLen) { return NULL; }
void rpcFreeCont(void* cont) { return; }
void* rpcReallocCont(void* ptr, int contLen) { return NULL; }
void rpcSendRequest(void* thandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* rid) { return; }
void rpcSendRequest(void* thandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* rid) {
// impl later
return;
}
void rpcSendResponse(const SRpcMsg* pMsg) {
SRpcConn* pConn = pMsg->handle;
SThreadObj* pThrd = pConn->hostThread;
void rpcSendResponse(const SRpcMsg* pMsg) {}
// opt later
pthread_mutex_lock(&pThrd->connMtx);
QUEUE_PUSH(&pThrd->conn, &pConn->queue);
pthread_mutex_unlock(&pThrd->connMtx);
uv_async_send(pConn->pWorkerAsync);
}
void rpcSendRedirectRsp(void* pConn, const SEpSet* pEpSet) {}
int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return -1; }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册