未验证 提交 a0944e6e 编写于 作者: dengyihao's avatar dengyihao 提交者: GitHub

Merge pull request #10345 from taosdata/feature/trans_impl

refactor code
...@@ -24,6 +24,9 @@ static void processClientRsp(void* parent, SRpcMsg* pRsp, SEpSet* pEpSet) { ...@@ -24,6 +24,9 @@ static void processClientRsp(void* parent, SRpcMsg* pRsp, SEpSet* pEpSet) {
} }
void TestClient::SetRpcRsp(SRpcMsg* rsp) { void TestClient::SetRpcRsp(SRpcMsg* rsp) {
if (this->pRsp) {
free(this->pRsp);
}
this->pRsp = (SRpcMsg*)calloc(1, sizeof(SRpcMsg)); this->pRsp = (SRpcMsg*)calloc(1, sizeof(SRpcMsg));
this->pRsp->msgType = rsp->msgType; this->pRsp->msgType = rsp->msgType;
this->pRsp->code = rsp->code; this->pRsp->code = rsp->code;
...@@ -60,7 +63,7 @@ bool TestClient::Init(const char* user, const char* pass, const char* fqdn, uint ...@@ -60,7 +63,7 @@ bool TestClient::Init(const char* user, const char* pass, const char* fqdn, uint
strcpy(this->user, user); strcpy(this->user, user);
strcpy(this->pass, pass); strcpy(this->pass, pass);
this->port = port; this->port = port;
// this->pRsp = NULL; this->pRsp = NULL;
this->DoInit(); this->DoInit();
return true; return true;
} }
......
...@@ -242,6 +242,7 @@ int transInitBuffer(SConnBuffer* buf); ...@@ -242,6 +242,7 @@ int transInitBuffer(SConnBuffer* buf);
int transClearBuffer(SConnBuffer* buf); int transClearBuffer(SConnBuffer* buf);
int transDestroyBuffer(SConnBuffer* buf); int transDestroyBuffer(SConnBuffer* buf);
int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf); int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf);
bool transReadComplete(SConnBuffer* connBuf);
// int transPackMsg(SRpcMsg *rpcMsg, bool sercured, bool auth, char **msg, int32_t *msgLen); // int transPackMsg(SRpcMsg *rpcMsg, bool sercured, bool auth, char **msg, int32_t *msgLen);
......
...@@ -84,8 +84,6 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* co ...@@ -84,8 +84,6 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* co
// register timer in each thread to clear expire conn // register timer in each thread to clear expire conn
static void clientTimeoutCb(uv_timer_t* handle); static void clientTimeoutCb(uv_timer_t* handle);
// check whether already read complete packet from server
static bool clientReadComplete(SConnBuffer* pBuf);
// alloc buf for read // alloc buf for read
static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
// callback after read nbytes from socket // callback after read nbytes from socket
...@@ -309,32 +307,6 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) { ...@@ -309,32 +307,6 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) {
assert(plist != NULL); assert(plist != NULL);
QUEUE_PUSH(&plist->conn, &conn->conn); QUEUE_PUSH(&plist->conn, &conn->conn);
} }
static bool clientReadComplete(SConnBuffer* data) {
if (data->len >= sizeof(STransMsgHead)) {
STransMsgHead head;
memcpy((char*)&head, data->buf, sizeof(head));
int32_t msgLen = (int32_t)htonl(head.msgLen);
data->total = msgLen;
}
if (data->len == data->cap && data->total == data->cap) {
return true;
}
return false;
// if (data->len >= headLen) {
// memcpy((char*)&head, data->buf, headLen);
// int32_t msgLen = (int32_t)htonl((uint32_t)head.msgLen);
// if (msgLen > data->len) {
// data->left = msgLen - data->len;
// return false;
// } else if (msgLen == data->len) {
// data->left = 0;
// return true;
// }
//} else {
// return false;
//}
}
static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
SCliConn* conn = handle->data; SCliConn* conn = handle->data;
SConnBuffer* pBuf = &conn->readBuf; SConnBuffer* pBuf = &conn->readBuf;
...@@ -349,7 +321,7 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf ...@@ -349,7 +321,7 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
SConnBuffer* pBuf = &conn->readBuf; SConnBuffer* pBuf = &conn->readBuf;
if (nread > 0) { if (nread > 0) {
pBuf->len += nread; pBuf->len += nread;
if (clientReadComplete(pBuf)) { if (transReadComplete(pBuf)) {
tTrace("client conn %p read complete", conn); tTrace("client conn %p read complete", conn);
clientHandleResp(conn); clientHandleResp(conn);
} else { } else {
......
...@@ -222,23 +222,31 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) { ...@@ -222,23 +222,31 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
p->buf = (char*)calloc(CAPACITY, sizeof(char)); p->buf = (char*)calloc(CAPACITY, sizeof(char));
p->len = 0; p->len = 0;
p->cap = CAPACITY; p->cap = CAPACITY;
p->total = 0; p->total = -1;
uvBuf->base = p->buf; uvBuf->base = p->buf;
uvBuf->len = CAPACITY; uvBuf->len = CAPACITY;
} else { } else {
STransMsgHead head; p->cap = p->total;
memcpy((char*)&head, p->buf, sizeof(head));
int32_t msgLen = (int32_t)htonl(head.msgLen);
p->total = msgLen;
p->cap = msgLen;
p->buf = realloc(p->buf, p->cap); p->buf = realloc(p->buf, p->cap);
uvBuf->base = p->buf + p->len; uvBuf->base = p->buf + p->len;
uvBuf->len = p->cap - p->len; uvBuf->len = p->cap - p->len;
} }
return 0; return 0;
} }
// check whether already read complete
bool transReadComplete(SConnBuffer* connBuf) {
if (connBuf->total == -1 && connBuf->len >= sizeof(STransMsgHead)) {
STransMsgHead head;
memcpy((char*)&head, connBuf->buf, sizeof(head));
int32_t msgLen = (int32_t)htonl(head.msgLen);
connBuf->total = msgLen;
}
if (connBuf->len == connBuf->cap && connBuf->total == connBuf->cap) {
return true;
}
return false;
}
int transPackMsg(STransMsgHead* msgHead, bool sercured, bool auth) {} int transPackMsg(STransMsgHead* msgHead, bool sercured, bool auth) {}
int transUnpackMsg(STransMsgHead* msgHead) {} int transUnpackMsg(STransMsgHead* msgHead) {}
......
...@@ -104,7 +104,6 @@ static void uvStartSendResp(SSrvMsg* msg); ...@@ -104,7 +104,6 @@ static void uvStartSendResp(SSrvMsg* msg);
static void destroySmsg(SSrvMsg* smsg); static void destroySmsg(SSrvMsg* smsg);
// check whether already read complete packet // check whether already read complete packet
static bool readComplete(SConnBuffer* buf);
static SSrvConn* createConn(void* hThrd); static SSrvConn* createConn(void* hThrd);
static void destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/); static void destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/);
...@@ -124,45 +123,6 @@ void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b ...@@ -124,45 +123,6 @@ void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b
transAllocBuffer(pBuf, buf); transAllocBuffer(pBuf, buf);
} }
// check data read from socket complete or not
//
static bool readComplete(SConnBuffer* data) {
// TODO(yihao): handle pipeline later
if (data->len == data->cap && data->total == data->cap) {
return true;
}
return false;
// STransMsgHead head;
// int32_t headLen = sizeof(head);
// if (data->len >= headLen) {
// memcpy((char*)&head, data->buf, headLen);
// int32_t msgLen = (int32_t)htonl((uint32_t)head.msgLen);
// if (msgLen > data->len) {
// data->left = msgLen - data->len;
// return false;
// } else if (msgLen == data->len) {
// return true;
// } else if (msgLen < data->len) {
// return false;
// // handle other packet later
// }
//} else {
// return false;
//}
}
// static void uvDoProcess(SRecvInfo* pRecv) {
// // impl later
// STransMsgHead* pHead = (STransMsgHead*)pRecv->msg;
// SRpcInfo* pRpc = (SRpcInfo*)pRecv->shandle;
// SSrvConn* pConn = pRecv->thandle;
// tDump(pRecv->msg, pRecv->msgLen);
// terrno = 0;
// // SRpcReqContext* pContest;
//
// // do auth and check
//}
static int uvAuthMsg(SSrvConn* pConn, char* msg, int len) { static int uvAuthMsg(SSrvConn* pConn, char* msg, int len) {
STransMsgHead* pHead = (STransMsgHead*)msg; STransMsgHead* pHead = (STransMsgHead*)msg;
...@@ -283,7 +243,7 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { ...@@ -283,7 +243,7 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
if (nread > 0) { if (nread > 0) {
pBuf->len += nread; pBuf->len += nread;
tTrace("server conn %p read summary, total read: %d, current read: %d", conn, pBuf->len, (int)nread); tTrace("server conn %p read summary, total read: %d, current read: %d", conn, pBuf->len, (int)nread);
if (readComplete(pBuf)) { if (transReadComplete(pBuf)) {
tTrace("server conn %p alread read complete packet", conn); tTrace("server conn %p alread read complete packet", conn);
uvHandleReq(conn); uvHandleReq(conn);
} else { } else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册