提交 538a0217 编写于 作者: dengyihao's avatar dengyihao

merge 3.0

上级 96db2f50
...@@ -173,6 +173,7 @@ static void destroyThrdObj(SCliThrdObj* pThrd); ...@@ -173,6 +173,7 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
#define REQUEST_NO_RESP(msg) ((msg)->noResp == 1) #define REQUEST_NO_RESP(msg) ((msg)->noResp == 1)
#define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1) #define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1)
#define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release)
static void* cliWorkThread(void* arg); static void* cliWorkThread(void* arg);
...@@ -509,7 +510,10 @@ void cliSend(SCliConn* pConn) { ...@@ -509,7 +510,10 @@ void cliSend(SCliConn* pConn) {
STrans* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg); STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg);
if (pMsg->pCont == 0) {
pMsg->pCont = (void*)rpcMallocCont(0);
pMsg->contLen = 0;
}
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
int msgLen = transMsgLenFromCont(pMsg->contLen); int msgLen = transMsgLenFromCont(pMsg->contLen);
...@@ -537,6 +541,7 @@ void cliSend(SCliConn* pConn) { ...@@ -537,6 +541,7 @@ void cliSend(SCliConn* pConn) {
pHead->persist = REQUEST_PERSIS_HANDLE(pMsg) ? 1 : 0; pHead->persist = REQUEST_PERSIS_HANDLE(pMsg) ? 1 : 0;
pHead->msgType = pMsg->msgType; pHead->msgType = pMsg->msgType;
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn, tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn,
...@@ -546,6 +551,7 @@ void cliSend(SCliConn* pConn) { ...@@ -546,6 +551,7 @@ void cliSend(SCliConn* pConn) {
if (pHead->persist == 1) { if (pHead->persist == 1) {
CONN_SET_PERSIST_BY_APP(pConn); CONN_SET_PERSIST_BY_APP(pConn);
} }
pConn->writeReq.data = pConn; pConn->writeReq.data = pConn;
uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
...@@ -586,22 +592,13 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { ...@@ -586,22 +592,13 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
} }
static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) { static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) {
SCliConn* conn = pMsg->msg.handle; SCliConn* conn = pMsg->msg.handle;
tDebug("%s cli conn %p release to inst", CONN_GET_INST_LABEL(conn), conn); tDebug("%s cli conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn);
while (taosArrayGetSize(conn->cliMsgs) > 0) {
SCliMsg* pMsg = taosArrayGetP(conn->cliMsgs, 0);
destroyCmsg(pMsg);
taosArrayRemove(conn->cliMsgs, 0);
}
transDestroyBuffer(&conn->readBuf); taosArrayPush(conn->cliMsgs, &pMsg);
conn->status = ConnRelease; if (taosArrayGetSize(conn->cliMsgs) >= 2) {
int ref = T_REF_VAL_GET(conn); return; // send one by one
if (ref == 2) {
transUnrefCliHandle(conn);
} else if (ref == 1) {
addConnToPool(pThrd->pool, conn);
} }
cliSend(conn);
} }
SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) { SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) {
......
...@@ -93,6 +93,15 @@ typedef struct SServerObj { ...@@ -93,6 +93,15 @@ typedef struct SServerObj {
static const char* notify = "a"; static const char* notify = "a";
#define CONN_SHOULD_RELEASE(conn, head) \
do { \
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
conn->status = ConnRelease; \
transClearBuffer(&conn->readBuf); \
transFreeMsg(transContFromHead((char*)head)); \
goto _RETURE; \
} \
} while (0)
// refactor later // refactor later
static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen); static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen);
...@@ -233,6 +242,7 @@ static void uvHandleReq(SSrvConn* pConn) { ...@@ -233,6 +242,7 @@ static void uvHandleReq(SSrvConn* pConn) {
pHead->msgLen -= sizeof(STransUserMsg); pHead->msgLen -= sizeof(STransUserMsg);
} }
} }
CONN_SHOULD_RELEASE(pConn, pHead);
STransMsg transMsg; STransMsg transMsg;
transMsg.contLen = transContLenFromMsg(pHead->msgLen); transMsg.contLen = transContLenFromMsg(pHead->msgLen);
...@@ -257,8 +267,8 @@ static void uvHandleReq(SSrvConn* pConn) { ...@@ -257,8 +267,8 @@ static void uvHandleReq(SSrvConn* pConn) {
ntohs(pConn->locaddr.sin_port), transMsg.contLen); ntohs(pConn->locaddr.sin_port), transMsg.contLen);
} else { } else {
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d ", pConn, tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d ", pConn,
TMSG_INFO(transMsg.msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen, pHead->noResp); taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen, pHead->noResp);
// no ref here // no ref here
} }
...@@ -270,6 +280,8 @@ static void uvHandleReq(SSrvConn* pConn) { ...@@ -270,6 +280,8 @@ static void uvHandleReq(SSrvConn* pConn) {
(*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
// uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0); // uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
// auth // auth
_RETURE:
return;
} }
void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
...@@ -350,7 +362,7 @@ void uvOnSendCb(uv_write_t* req, int status) { ...@@ -350,7 +362,7 @@ void uvOnSendCb(uv_write_t* req, int status) {
} }
} else { } else {
tError("server conn %p failed to write data, %s", conn, uv_err_name(status)); tError("server conn %p failed to write data, %s", conn, uv_err_name(status));
conn->broken = false; conn->broken = true;
transUnrefSrvHandle(conn); transUnrefSrvHandle(conn);
} }
} }
...@@ -407,6 +419,7 @@ static void uvStartSendResp(SSrvMsg* smsg) { ...@@ -407,6 +419,7 @@ static void uvStartSendResp(SSrvMsg* smsg) {
SSrvConn* pConn = smsg->pConn; SSrvConn* pConn = smsg->pConn;
if (pConn->broken == true) { if (pConn->broken == true) {
// persist by
transUnrefSrvHandle(pConn); transUnrefSrvHandle(pConn);
return; return;
} }
...@@ -415,8 +428,8 @@ static void uvStartSendResp(SSrvMsg* smsg) { ...@@ -415,8 +428,8 @@ static void uvStartSendResp(SSrvMsg* smsg) {
} }
if (taosArrayGetSize(pConn->srvMsgs) > 0) { if (taosArrayGetSize(pConn->srvMsgs) > 0) {
tDebug("server conn %p send data to client %s:%d, local info: %s:%d", pConn, inet_ntoa(pConn->addr.sin_addr), tDebug("server conn %p send data to client %s:%d, local info: %s:%d", pConn, taosInetNtoa(pConn->addr.sin_addr),
ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
taosArrayPush(pConn->srvMsgs, &smsg); taosArrayPush(pConn->srvMsgs, &smsg);
return; return;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册