未验证 提交 90f38515 编写于 作者: dengyihao's avatar dengyihao 提交者: GitHub

Merge pull request #15446 from taosdata/rpc/optBuf

refactor: opt rpc
...@@ -7,8 +7,7 @@ ...@@ -7,8 +7,7 @@
* *
* This program is distributed in the hope that it will be useful, but WITHOUT * This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. * FITNESS FOR A PARTICULAR PURPOSE. *
*
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
...@@ -211,6 +210,7 @@ typedef struct SConnBuffer { ...@@ -211,6 +210,7 @@ typedef struct SConnBuffer {
char* buf; char* buf;
int len; int len;
int cap; int cap;
int left;
int total; int total;
} SConnBuffer; } SConnBuffer;
...@@ -282,6 +282,8 @@ int transClearBuffer(SConnBuffer* buf); ...@@ -282,6 +282,8 @@ int transClearBuffer(SConnBuffer* buf);
int transDestroyBuffer(SConnBuffer* buf); int transDestroyBuffer(SConnBuffer* buf);
int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf); int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf);
bool transReadComplete(SConnBuffer* connBuf); bool transReadComplete(SConnBuffer* connBuf);
int transResetBuffer(SConnBuffer* connBuf);
int transDumpFromBuffer(SConnBuffer* connBuf, char** buf);
int transSetConnOption(uv_tcp_t* stream); int transSetConnOption(uv_tcp_t* stream);
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#ifdef USE_UV #ifdef USE_UV
#include <uv.h> #include <uv.h>
#endif #endif
// clang-format off
#include "zlib.h" #include "zlib.h"
#include "thttp.h" #include "thttp.h"
#include "taoserror.h" #include "taoserror.h"
...@@ -174,7 +175,7 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32 ...@@ -174,7 +175,7 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32
#else #else
int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) {
int32_t code = -1; int32_t code = -1;
TdSocketPtr pSocket = NULL; TdSocketPtr pSocket = NULL;
uint32_t ip = taosGetIpv4FromFqdn(server); uint32_t ip = taosGetIpv4FromFqdn(server);
...@@ -231,4 +232,5 @@ SEND_OVER: ...@@ -231,4 +232,5 @@ SEND_OVER:
return code; return code;
} }
#endif // clang-format on
\ No newline at end of file #endif
...@@ -323,7 +323,8 @@ void cliHandleResp(SCliConn* conn) { ...@@ -323,7 +323,8 @@ void cliHandleResp(SCliConn* conn) {
SCliThrd* pThrd = conn->hostThrd; SCliThrd* pThrd = conn->hostThrd;
STrans* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf); STransMsgHead* pHead = NULL;
transDumpFromBuffer(&conn->readBuf, (char**)&pHead);
pHead->code = htonl(pHead->code); pHead->code = htonl(pHead->code);
pHead->msgLen = htonl(pHead->msgLen); pHead->msgLen = htonl(pHead->msgLen);
...@@ -366,7 +367,6 @@ void cliHandleResp(SCliConn* conn) { ...@@ -366,7 +367,6 @@ void cliHandleResp(SCliConn* conn) {
} }
} }
// buf's mem alread translated to transMsg.pCont // buf's mem alread translated to transMsg.pCont
transClearBuffer(&conn->readBuf);
if (!CONN_NO_PERSIST_BY_APP(conn)) { if (!CONN_NO_PERSIST_BY_APP(conn)) {
transMsg.info.handle = (void*)conn->refId; transMsg.info.handle = (void*)conn->refId;
tDebug("%s conn %p ref by app", CONN_GET_INST_LABEL(conn), conn); tDebug("%s conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
...@@ -636,6 +636,8 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { ...@@ -636,6 +636,8 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) {
transReqQueueInit(&conn->wreqQueue); transReqQueueInit(&conn->wreqQueue);
transQueueInit(&conn->cliMsgs, NULL); transQueueInit(&conn->cliMsgs, NULL);
transInitBuffer(&conn->readBuf);
QUEUE_INIT(&conn->q); QUEUE_INIT(&conn->q);
conn->hostThrd = pThrd; conn->hostThrd = pThrd;
conn->status = ConnNormal; conn->status = ConnNormal;
...@@ -651,8 +653,9 @@ static void cliDestroyConn(SCliConn* conn, bool clear) { ...@@ -651,8 +653,9 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
QUEUE_REMOVE(&conn->q); QUEUE_REMOVE(&conn->q);
QUEUE_INIT(&conn->q); QUEUE_INIT(&conn->q);
transRemoveExHandle(transGetRefMgt(), conn->refId); transRemoveExHandle(transGetRefMgt(), conn->refId);
conn->refId = -1; transDestroyBuffer(&conn->readBuf);
conn->refId = -1;
if (conn->task != NULL) transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task); if (conn->task != NULL) transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);
if (clear) { if (clear) {
...@@ -678,7 +681,6 @@ static void cliDestroy(uv_handle_t* handle) { ...@@ -678,7 +681,6 @@ static void cliDestroy(uv_handle_t* handle) {
tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
transReqQueueClear(&conn->wreqQueue); transReqQueueClear(&conn->wreqQueue);
transDestroyBuffer(&conn->readBuf);
taosMemoryFree(conn); taosMemoryFree(conn);
} }
static bool cliHandleNoResp(SCliConn* conn) { static bool cliHandleNoResp(SCliConn* conn) {
......
...@@ -16,6 +16,8 @@ ...@@ -16,6 +16,8 @@
#include "transComm.h" #include "transComm.h"
#define BUFFER_CAP 4096
static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT; static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;
static int32_t refMgt; static int32_t refMgt;
...@@ -111,12 +113,56 @@ int transGetSockDebugInfo(struct sockaddr* sockname, char* dst) { ...@@ -111,12 +113,56 @@ int transGetSockDebugInfo(struct sockaddr* sockname, char* dst) {
return r; return r;
} }
int transInitBuffer(SConnBuffer* buf) { int transInitBuffer(SConnBuffer* buf) {
transClearBuffer(buf); buf->cap = BUFFER_CAP;
buf->buf = taosMemoryCalloc(1, BUFFER_CAP);
buf->left = -1;
buf->len = 0;
buf->total = 0;
return 0; return 0;
} }
int transDestroyBuffer(SConnBuffer* buf) {
taosMemoryFree(buf->buf);
return 0;
}
int transClearBuffer(SConnBuffer* buf) { int transClearBuffer(SConnBuffer* buf) {
memset(buf, 0, sizeof(*buf)); SConnBuffer* p = buf;
buf->total = -1; if (p->cap > BUFFER_CAP) {
p->cap = BUFFER_CAP;
p->buf = taosMemoryRealloc(p->buf, BUFFER_CAP);
}
p->left = -1;
p->len = 0;
p->total = 0;
return 0;
}
int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) {
SConnBuffer* p = connBuf;
if (p->left != 0) {
return -1;
}
int total = connBuf->total;
*buf = taosMemoryCalloc(1, total);
memcpy(*buf, p->buf, total);
transResetBuffer(connBuf);
return total;
}
int transResetBuffer(SConnBuffer* connBuf) {
SConnBuffer* p = connBuf;
if (p->total <= p->len) {
int left = p->len - p->total;
memmove(p->buf, p->buf + p->total, left);
p->left = -1;
p->total = 0;
p->len = left;
} else {
p->left = -1;
p->total = 0;
p->len = 0;
}
return 0; return 0;
} }
int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) { int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
...@@ -126,54 +172,39 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) { ...@@ -126,54 +172,39 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
* |<------STransMsgHead------->|<-------------------userdata--------------->|<-----auth data----->|<----user * |<------STransMsgHead------->|<-------------------userdata--------------->|<-----auth data----->|<----user
* info--->| * info--->|
*/ */
static const int CAPACITY = sizeof(STransMsgHead);
SConnBuffer* p = connBuf; SConnBuffer* p = connBuf;
if (p->cap == 0) {
p->buf = (char*)taosMemoryCalloc(CAPACITY, sizeof(char));
tTrace("internal malloc mem:%p, size:%d", p->buf, CAPACITY);
p->len = 0;
p->cap = CAPACITY;
p->total = -1;
uvBuf->base = p->buf;
uvBuf->len = CAPACITY;
} else if (p->total == -1 && p->len < CAPACITY) {
uvBuf->base = p->buf + p->len;
uvBuf->len = CAPACITY - p->len;
} else {
p->cap = p->total;
p->buf = taosMemoryRealloc(p->buf, p->cap);
tTrace("internal realloc mem:%p, size:%d", p->buf, p->cap);
uvBuf->base = p->buf + p->len; uvBuf->base = p->buf + p->len;
if (p->left == -1) {
uvBuf->len = p->cap - p->len; uvBuf->len = p->cap - p->len;
} else {
if (p->left < p->cap - p->len) {
uvBuf->len = p->left;
} else {
p->buf = taosMemoryRealloc(p->buf, p->left + p->len);
uvBuf->base = p->buf + p->len;
uvBuf->len = p->left;
}
} }
return 0; return 0;
} }
// check whether already read complete // check whether already read complete
bool transReadComplete(SConnBuffer* connBuf) { bool transReadComplete(SConnBuffer* connBuf) {
if (connBuf->total == -1 && connBuf->len >= sizeof(STransMsgHead)) { SConnBuffer* p = connBuf;
STransMsgHead head; if (p->len >= sizeof(STransMsgHead)) {
memcpy((char*)&head, connBuf->buf, sizeof(head)); if (p->left == -1) {
int32_t msgLen = (int32_t)htonl(head.msgLen); STransMsgHead head;
connBuf->total = msgLen; memcpy((char*)&head, connBuf->buf, sizeof(head));
} int32_t msgLen = (int32_t)htonl(head.msgLen);
if (connBuf->len == connBuf->cap && connBuf->total == connBuf->cap) { p->total = msgLen;
return true; }
} if (p->total >= p->len) {
return false; p->left = p->total - p->len;
} } else {
int transPackMsg(STransMsgHead* msgHead, bool sercured, bool auth) { return 0; } p->left = 0;
}
int transUnpackMsg(STransMsgHead* msgHead) { return 0; }
int transDestroyBuffer(SConnBuffer* buf) {
if (buf->cap > 0) {
taosMemoryFreeClear(buf->buf);
} }
transClearBuffer(buf); return p->left == 0 ? true : false;
return 0;
} }
int transSetConnOption(uv_tcp_t* stream) { int transSetConnOption(uv_tcp_t* stream) {
......
...@@ -212,9 +212,10 @@ static void uvHandleActivityTimeout(uv_timer_t* handle) { ...@@ -212,9 +212,10 @@ static void uvHandleActivityTimeout(uv_timer_t* handle) {
} }
static void uvHandleReq(SSvrConn* pConn) { static void uvHandleReq(SSvrConn* pConn) {
SConnBuffer* pBuf = &pConn->readBuf; STransMsgHead* msg = NULL;
char* msg = pBuf->buf; int msgLen = 0;
uint32_t msgLen = pBuf->len;
msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&msg);
STransMsgHead* pHead = (STransMsgHead*)msg; STransMsgHead* pHead = (STransMsgHead*)msg;
pHead->code = htonl(pHead->code); pHead->code = htonl(pHead->code);
...@@ -761,6 +762,7 @@ static SSvrConn* createConn(void* hThrd) { ...@@ -761,6 +762,7 @@ static SSvrConn* createConn(void* hThrd) {
memset(&pConn->regArg, 0, sizeof(pConn->regArg)); memset(&pConn->regArg, 0, sizeof(pConn->regArg));
pConn->broken = false; pConn->broken = false;
pConn->status = ConnNormal; pConn->status = ConnNormal;
transInitBuffer(&pConn->readBuf);
SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle)); SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle));
exh->handle = pConn; exh->handle = pConn;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册