提交 09eceed6 编写于 作者: dengyihao's avatar dengyihao

fix transport quit fix

上级 b2be5169
...@@ -57,7 +57,7 @@ typedef struct { ...@@ -57,7 +57,7 @@ typedef struct {
void* parent; void* parent;
void* tcphandle; // returned handle from TCP initialization void* tcphandle; // returned handle from TCP initialization
int32_t refMgt; int64_t refId;
TdThreadMutex mutex; TdThreadMutex mutex;
} SRpcInfo; } SRpcInfo;
......
...@@ -76,12 +76,16 @@ void* rpcOpen(const SRpcInit* pInit) { ...@@ -76,12 +76,16 @@ void* rpcOpen(const SRpcInit* pInit) {
if (pInit->user) { if (pInit->user) {
memcpy(pRpc->user, pInit->user, strlen(pInit->user)); memcpy(pRpc->user, pInit->user, strlen(pInit->user));
} }
int64_t refId = taosAddRef(transGetInstMgt(), pRpc);
int64_t refId = transAddExHandle(transGetInstMgt(), pRpc);
transAcquireExHandle(transGetInstMgt(), refId);
pRpc->refId = refId;
return (void*)refId; return (void*)refId;
} }
void rpcClose(void* arg) { void rpcClose(void* arg) {
tInfo("start to close rpc"); tInfo("start to close rpc");
taosRemoveRef(transGetInstMgt(), (int64_t)arg); transRemoveExHandle(transGetInstMgt(), (int64_t)arg);
transReleaseExHandle(transGetInstMgt(), (int64_t)arg);
tInfo("finish to close rpc"); tInfo("finish to close rpc");
return; return;
} }
......
...@@ -47,6 +47,7 @@ typedef struct SCliMsg { ...@@ -47,6 +47,7 @@ typedef struct SCliMsg {
queue q; queue q;
STransMsgType type; STransMsgType type;
int64_t refId;
uint64_t st; uint64_t st;
int sent; //(0: no send, 1: alread sent) int sent; //(0: no send, 1: alread sent)
} SCliMsg; } SCliMsg;
...@@ -606,11 +607,9 @@ static void cliDestroyConn(SCliConn* conn, bool clear) { ...@@ -606,11 +607,9 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
if (clear) { if (clear) {
if (!uv_is_closing((uv_handle_t*)conn->stream)) { if (!uv_is_closing((uv_handle_t*)conn->stream)) {
uv_read_stop(conn->stream);
uv_close((uv_handle_t*)conn->stream, cliDestroy); uv_close((uv_handle_t*)conn->stream, cliDestroy);
} }
//} else {
// cliDestroy((uv_handle_t*)conn->stream);
//}
} }
} }
static void cliDestroy(uv_handle_t* handle) { static void cliDestroy(uv_handle_t* handle) {
...@@ -635,7 +634,6 @@ static bool cliHandleNoResp(SCliConn* conn) { ...@@ -635,7 +634,6 @@ static bool cliHandleNoResp(SCliConn* conn) {
SCliMsg* pMsg = transQueueGet(&conn->cliMsgs, 0); SCliMsg* pMsg = transQueueGet(&conn->cliMsgs, 0);
if (REQUEST_NO_RESP(&pMsg->msg)) { if (REQUEST_NO_RESP(&pMsg->msg)) {
transQueuePop(&conn->cliMsgs); transQueuePop(&conn->cliMsgs);
// taosArrayRemove(msgs, 0);
destroyCmsg(pMsg); destroyCmsg(pMsg);
res = true; res = true;
} }
...@@ -979,6 +977,7 @@ void cliSendQuit(SCliThrd* thrd) { ...@@ -979,6 +977,7 @@ void cliSendQuit(SCliThrd* thrd) {
} }
void cliWalkCb(uv_handle_t* handle, void* arg) { void cliWalkCb(uv_handle_t* handle, void* arg) {
if (!uv_is_closing(handle)) { if (!uv_is_closing(handle)) {
uv_read_stop((uv_stream_t*)handle);
uv_close(handle, cliDestroy); uv_close(handle, cliDestroy);
} }
} }
...@@ -1213,6 +1212,7 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra ...@@ -1213,6 +1212,7 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra
cliMsg->msg = *pReq; cliMsg->msg = *pReq;
cliMsg->st = taosGetTimestampUs(); cliMsg->st = taosGetTimestampUs();
cliMsg->type = Normal; cliMsg->type = Normal;
cliMsg->refId = (int64_t)shandle;
STraceId* trace = &pReq->info.traceId; STraceId* trace = &pReq->info.traceId;
tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", transLabel(pTransInst), pThrd->pid, tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
...@@ -1250,6 +1250,7 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM ...@@ -1250,6 +1250,7 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM
cliMsg->msg = *pReq; cliMsg->msg = *pReq;
cliMsg->st = taosGetTimestampUs(); cliMsg->st = taosGetTimestampUs();
cliMsg->type = Normal; cliMsg->type = Normal;
cliMsg->refId = (int64_t)shandle;
STraceId* trace = &pReq->info.traceId; STraceId* trace = &pReq->info.traceId;
tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", transLabel(pTransInst), pThrd->pid, tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
...@@ -1283,6 +1284,7 @@ void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { ...@@ -1283,6 +1284,7 @@ void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg)); SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
cliMsg->ctx = pCtx; cliMsg->ctx = pCtx;
cliMsg->type = Update; cliMsg->type = Update;
cliMsg->refId = (int64_t)shandle;
SCliThrd* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i]; SCliThrd* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i];
tDebug("%s update epset at thread:%08" PRId64 "", pTransInst->label, thrd->pid); tDebug("%s update epset at thread:%08" PRId64 "", pTransInst->label, thrd->pid);
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT; static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;
static int32_t refMgt; static int32_t refMgt;
int32_t instMgt; static int32_t instMgt;
int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) { int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) {
T_MD5_CTX context; T_MD5_CTX context;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册