Unverified commit ee980837, authored by dengyihao, committed by GitHub

Merge pull request #9827 from taosdata/feature/rpc

refactor code
@@ -135,12 +135,13 @@ typedef struct SRpcConn {
uv_async_t* pWorkerAsync;
queue queue;
int ref;
int persist; // persist connection or not
SConnBuffer connBuf;
int persist; // persist connection or not
SConnBuffer connBuf; // read buf
SConnBuffer writeBuf; // write buf
int count;
void* shandle; // rpc init
void* ahandle;
void* ahandle; //
void* hostThread;
// del later
char secured;
int spi;
@@ -151,6 +152,7 @@ typedef struct SRpcConn {
} SRpcConn;
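Review note: this hunk splits the connection's single buffer into separate read and write buffers (`connBuf` / `writeBuf`) and records the owning thread in `hostThread`, which `rpcSendResponse` below relies on. Only `buf`, `len`, and `left` of `SConnBuffer` are visible in this diff; a hedged sketch of the implied layout, with the capacity field an assumption:

```c
// Sketch of the buffer type implied by this diff; only buf, len and left
// appear in the hunks below, the cap field is an assumption.
typedef struct SConnBuffer {
  char* buf;   // storage; the first RPC_RESERVE_SIZE bytes are reserved space
  int   len;   // payload bytes accumulated so far (excluding the reserve)
  int   cap;   // allocated capacity (assumed, not shown in this diff)
  int   left;  // bytes still missing for the in-flight message
} SConnBuffer;
```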
// auth function
static int uvAuthMsg(SRpcConn* pConn, char* msg, int msgLen);
static int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey);
static void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey);
static int rpcAddAuthPart(SRpcConn* pConn, char* msg, int msgLen);
@@ -259,7 +261,7 @@ static bool isReadAll(SConnBuffer* data) {
SRpcHead rpcHead;
int32_t headLen = sizeof(rpcHead);
if (data->len >= headLen) {
memcpy((char*)&rpcHead, data->buf, headLen);
memcpy((char*)&rpcHead, data->buf + RPC_RESERVE_SIZE, headLen);
int32_t msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen);
if (msgLen > data->len) {
data->left = msgLen - data->len;
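The memcpy change is the substance of this hunk: the head is now read from `data->buf + RPC_RESERVE_SIZE` instead of offset 0, consistent with `uvProcessData` below taking `pBuf->buf + RPC_RESERVE_SIZE` as the message start. Filling in the branches the diff elides, the whole check plausibly reads as follows (a reconstruction under that assumption, not the verbatim function):

```c
// Reconstruction of isReadAll() from the visible hunk lines; the elided
// early-return paths are filled in by assumption, so treat this as a sketch.
static bool isReadAll(SConnBuffer* data) {
  SRpcHead rpcHead;
  int32_t  headLen = sizeof(rpcHead);
  if (data->len < headLen) {
    return false;  // not even the head has arrived yet
  }
  // the head sits after the reserved prefix, not at offset 0
  memcpy((char*)&rpcHead, data->buf + RPC_RESERVE_SIZE, headLen);
  // htonl doubles as ntohl here: both are the same byte swap
  int32_t msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen);
  if (msgLen > data->len) {
    data->left = msgLen - data->len;  // remember how many bytes are missing
    return false;
  }
  return true;
}
```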
@@ -283,7 +285,7 @@ static void uvDoProcess(SRecvInfo* pRecv) {
// do auth and check
}
static int uvAuthData(SRpcConn* pConn, char* msg, int len) {
static int uvAuthMsg(SRpcConn* pConn, char* msg, int len) {
SRpcHead* pHead = (SRpcHead*)msg;
int code = 0;
@@ -334,16 +336,21 @@ static int uvAuthData(SRpcConn* pConn, char* msg, int len) {
return code;
}
static void uvProcessData(SRpcConn* ctx) {
// refers specifically to query or insert timeouts
static void uvHandleActivityTimeout(uv_timer_t* handle) {
// impl later
SRpcConn* conn = handle->data;
}
static void uvProcessData(SRpcConn* pConn) {
SRecvInfo info;
SRecvInfo* p = &info;
SConnBuffer* pBuf = &ctx->connBuf;
SConnBuffer* pBuf = &pConn->connBuf;
p->msg = pBuf->buf + RPC_RESERVE_SIZE;
p->msgLen = pBuf->len;
p->ip = 0;
p->port = 0;
p->shandle = ctx->shandle; //
p->thandle = ctx;
p->shandle = pConn->shandle; //
p->thandle = pConn;
p->chandle = NULL;
//
@@ -351,9 +358,14 @@ static void uvProcessData(SRpcConn* ctx) {
assert(rpcIsReq(pHead->msgType));
SRpcInfo* pRpc = (SRpcInfo*)p->shandle;
SRpcConn* pConn = (SRpcConn*)p->thandle;
pConn->ahandle = (void*)pHead->ahandle;
// auth here
int8_t code = uvAuthMsg(pConn, (char*)pHead, p->msgLen);
if (code != 0) {
terrno = code;
return;
}
pHead->code = htonl(pHead->code);
SRpcMsg rpcMsg;
@@ -365,7 +377,9 @@ static void uvProcessData(SRpcConn* ctx) {
rpcMsg.code = pHead->code;
rpcMsg.ahandle = pConn->ahandle;
rpcMsg.handle = pConn;
(*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL);
uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime, 0);
// auth
// validate msg type
}
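`uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime, 0)` arms a one-shot timer after each dispatched request; since restarting an active uv timer resets its clock, the callback only fires once the connection has been quiet for `idleTime`. The pattern in isolation (illustrative names, not TDengine code):

```c
#include <stdint.h>
#include <stdio.h>
#include <uv.h>

/* Standalone sketch of the idle-timer pattern used above: every processed
 * message re-arms a one-shot timer, so the callback only fires after the
 * connection has been quiet for idleMs. Names are illustrative. */
typedef struct {
  uv_timer_t timer;
  int        id;  // stand-in for real connection state
} DemoConn;

static void onIdle(uv_timer_t* t) {
  DemoConn* c = t->data;
  printf("conn %d idle, would tear it down here\n", c->id);
}

static void onMessage(DemoConn* c, uint64_t idleMs) {
  // restarting an active timer resets it; timeout = idleMs, repeat = 0
  uv_timer_start(&c->timer, onIdle, idleMs, 0);
}

int main(void) {
  uv_loop_t* loop = uv_default_loop();
  DemoConn conn = {.id = 1};
  uv_timer_init(loop, &conn.timer);
  conn.timer.data = &conn;
  onMessage(&conn, 100);  // pretend a request just arrived
  return uv_run(loop, UV_RUN_DEFAULT);
}
```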
@@ -383,6 +397,9 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
}
return;
}
if (terrno != 0) {
// handle err code
}
if (nread != UV_EOF) {
tDebug("Read error %s\n", uv_err_name(nread));
@@ -410,21 +427,23 @@ void uvOnWriteCb(uv_write_t* req, int status) {
}
void uvWorkerAsyncCb(uv_async_t* handle) {
SThreadObj* pObj = container_of(handle, SThreadObj, workerAsync);
SThreadObj* pThrd = container_of(handle, SThreadObj, workerAsync);
SRpcConn* conn = NULL;
// opt later
pthread_mutex_lock(&pObj->connMtx);
if (!QUEUE_IS_EMPTY(&pObj->conn)) {
queue* head = QUEUE_HEAD(&pObj->conn);
pthread_mutex_lock(&pThrd->connMtx);
if (!QUEUE_IS_EMPTY(&pThrd->conn)) {
queue* head = QUEUE_HEAD(&pThrd->conn);
conn = QUEUE_DATA(head, SRpcConn, queue);
QUEUE_REMOVE(&conn->queue);
}
pthread_mutex_unlock(&pObj->connMtx);
pthread_mutex_unlock(&pThrd->connMtx);
if (conn == NULL) {
tError("except occurred, do nothing");
return;
}
uv_buf_t wb = uv_buf_init(conn->writeBuf.buf, conn->writeBuf.len);
uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb);
}
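One caveat with this callback: libuv may coalesce several `uv_async_send()` calls into a single wakeup, so popping exactly one connection per callback can leave queued responses stranded until the next send. A drain-loop variant, written against the same types as the file above (the name is hypothetical):

```c
// Drain-loop variant (hypothetical name), using this file's own types:
// uv_async_send() calls may be coalesced into one callback invocation,
// so the queue is drained completely on every wakeup.
void uvWorkerAsyncCbDrain(uv_async_t* handle) {
  SThreadObj* pThrd = container_of(handle, SThreadObj, workerAsync);
  for (;;) {
    SRpcConn* conn = NULL;
    pthread_mutex_lock(&pThrd->connMtx);
    if (!QUEUE_IS_EMPTY(&pThrd->conn)) {
      queue* head = QUEUE_HEAD(&pThrd->conn);
      conn = QUEUE_DATA(head, SRpcConn, queue);
      QUEUE_REMOVE(&conn->queue);
    }
    pthread_mutex_unlock(&pThrd->connMtx);
    if (conn == NULL) break;  // queue fully drained; wait for the next wakeup
    uv_buf_t wb = uv_buf_init(conn->writeBuf.buf, conn->writeBuf.len);
    uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb);
  }
}
```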
void uvOnAcceptCb(uv_stream_t* stream, int status) {
@@ -463,7 +482,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
assert(buf->base[0] == notify[0]);
free(buf->base);
SThreadObj* pObj = q->data;
SThreadObj* pThrd = q->data;
uv_pipe_t* pipe = (uv_pipe_t*)q;
if (!uv_pipe_pending_count(pipe)) {
@@ -475,16 +494,18 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
assert(pending == UV_TCP);
SRpcConn* pConn = connCreate();
pConn->shandle = pObj->shandle;
pConn->shandle = pThrd->shandle;
/* init conn timer*/
pConn->pTimer = malloc(sizeof(uv_timer_t));
uv_timer_init(pObj->loop, pConn->pTimer);
uv_timer_init(pThrd->loop, pConn->pTimer);
pConn->pTimer->data = pConn;
pConn->pWorkerAsync = pObj->workerAsync; // thread safety
pConn->hostThread = pThrd;
pConn->pWorkerAsync = pThrd->workerAsync; // thread safety
// init client handle
pConn->pTcp = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
uv_tcp_init(pObj->loop, pConn->pTcp);
uv_tcp_init(pThrd->loop, pConn->pTcp);
pConn->pTcp->data = pConn;
// init write request, just
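This callback is the receiving half of libuv handle passing: the accept thread writes accepted TCP handles into a pipe opened with `ipc = 1`, and each worker `uv_accept`s them onto its own loop. Reduced to its essentials (illustrative names, independent of the TDengine types above):

```c
#include <stdlib.h>
#include <uv.h>

// Receiving half of libuv handle passing over an IPC pipe; a sketch only.
static void onClosed(uv_handle_t* h) { free(h); }

static void onPipeRead(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
  (void)nread; (void)buf;
  uv_pipe_t* pipe = (uv_pipe_t*)q;
  if (uv_pipe_pending_count(pipe) == 0) return;      // no handle attached
  if (uv_pipe_pending_type(pipe) != UV_TCP) return;  // only TCP is expected
  uv_tcp_t* client = malloc(sizeof(uv_tcp_t));
  uv_tcp_init(q->loop, client);  // the handle will live on this worker's loop
  if (uv_accept(q, (uv_stream_t*)client) == 0) {
    // start uv_read_start() on client, arm its idle timer, etc.
  } else {
    uv_close((uv_handle_t*)client, onClosed);
  }
}
```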
@@ -518,23 +539,23 @@ void* acceptThread(void* arg) {
uv_run(srv->loop, UV_RUN_DEFAULT);
}
void* workerThread(void* arg) {
SThreadObj* pObj = (SThreadObj*)arg;
SThreadObj* pThrd = (SThreadObj*)arg;
pObj->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
uv_loop_init(pObj->loop);
pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
uv_loop_init(pThrd->loop);
uv_pipe_init(pObj->loop, pObj->pipe, 1);
uv_pipe_open(pObj->pipe, pObj->fd);
uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
uv_pipe_open(pThrd->pipe, pThrd->fd);
pObj->pipe->data = pObj;
pThrd->pipe->data = pThrd;
QUEUE_INIT(&pObj->conn);
QUEUE_INIT(&pThrd->conn);
pObj->workerAsync = malloc(sizeof(uv_async_t));
uv_async_init(pObj->loop, pObj->workerAsync, uvWorkerAsyncCb);
pThrd->workerAsync = malloc(sizeof(uv_async_t));
uv_async_init(pThrd->loop, pThrd->workerAsync, uvWorkerAsyncCb);
uv_read_start((uv_stream_t*)pObj->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
uv_run(pObj->loop, UV_RUN_DEFAULT);
uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
uv_run(pThrd->loop, UV_RUN_DEFAULT);
}
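Worth noting for the two hunks above: each worker owns a private `uv_loop_t`, and every handle hanging off a connection (timer, tcp, async) belongs to that loop, so it may only be touched from that worker's thread. This is why the response path below has to go through `uv_async_send` rather than calling `uv_write` directly.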
static SRpcConn* connCreate() {
SRpcConn* pConn = (SRpcConn*)calloc(1, sizeof(SRpcConn));
@@ -547,6 +568,7 @@ static void connDestroy(SRpcConn* conn) {
uv_timer_stop(conn->pTimer);
free(conn->pTimer);
uv_close((uv_handle_t*)conn->pTcp, NULL);
free(conn->connBuf.buf);
free(conn->pTcp);
free(conn->pWriter);
free(conn);
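A lifetime caveat in `connDestroy`: `uv_close()` is asynchronous, and libuv keeps using the handle until its close callback runs on the loop, so freeing `conn->pTcp` immediately after `uv_close(..., NULL)` risks a use-after-free (the malloc'd timer handle has the same issue once it is registered with the loop). The usual deferred shape, as a sketch:

```c
// uv_close() is asynchronous: handle memory must stay valid until the close
// callback fires. Freeing it there is the safe pattern (hypothetical name).
static void onHandleClosed(uv_handle_t* handle) {
  free(handle);  // libuv is finished with the handle at this point
}

static void connDestroyDeferred(SRpcConn* conn) {
  uv_timer_stop(conn->pTimer);
  uv_close((uv_handle_t*)conn->pTimer, onHandleClosed);  // timer handle too
  uv_close((uv_handle_t*)conn->pTcp, onHandleClosed);
  free(conn->connBuf.buf);
  free(conn->pWriter);
  free(conn);  // safe: the close callbacks never touch conn itself
}
```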
@@ -573,9 +595,22 @@ void* rpcMallocCont(int contLen) { return NULL; }
void rpcFreeCont(void* cont) { return; }
void* rpcReallocCont(void* ptr, int contLen) { return NULL; }
void rpcSendRequest(void* thandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* rid) { return; }
void rpcSendRequest(void* thandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* rid) {
// impl later
return;
}
void rpcSendResponse(const SRpcMsg* pMsg) {}
void rpcSendResponse(const SRpcMsg* pMsg) {
SRpcConn* pConn = pMsg->handle;
SThreadObj* pThrd = pConn->hostThread;
// opt later
pthread_mutex_lock(&pThrd->connMtx);
QUEUE_PUSH(&pThrd->conn, &pConn->queue);
pthread_mutex_unlock(&pThrd->connMtx);
uv_async_send(pConn->pWorkerAsync);
}
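Together with `uvWorkerAsyncCb`, this completes the standard libuv cross-thread handoff: handles are not thread-safe, and `uv_async_send()` is the one call documented as safe to invoke from another thread, so `rpcSendResponse` merely queues the connection under the mutex and wakes the owning worker, which performs the actual `uv_write()` on its own loop thread. Note that nothing in this hunk serializes `pMsg` into `conn->writeBuf` yet; presumably that arrives with the remaining "impl later" pieces.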
void rpcSendRedirectRsp(void* pConn, const SEpSet* pEpSet) {}
int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return -1; }
......