diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c index ff211bf1b289344396193d6692ce494b9360ff17..aaf71838bfe2cc3916f10c579bd89e34acb89553 100644 --- a/src/dnode/src/dnodeVRead.c +++ b/src/dnode/src/dnodeVRead.c @@ -81,6 +81,8 @@ void dnodeCleanupRead() { } taosCloseQset(readQset); + free(readPool.readWorker); + dPrint("dnode read is closed"); } diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 39d773f6a298f2a8cb01162f39f55a72029b3aef..cdd2d732e1c9f6065c052986b469ab67a65605e8 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -226,17 +226,6 @@ void *rpcOpen(const SRpcInit *pInit) { pRpc->cfp = pInit->cfp; pRpc->afp = pInit->afp; - pRpc->tcphandle = (*taosInitConn[pRpc->connType|RPC_CONN_TCP])(0, pRpc->localPort, pRpc->label, - pRpc->numOfThreads, rpcProcessMsgFromPeer, pRpc); - pRpc->udphandle = (*taosInitConn[pRpc->connType])(0, pRpc->localPort, pRpc->label, - pRpc->numOfThreads, rpcProcessMsgFromPeer, pRpc); - - if (pRpc->tcphandle == NULL || pRpc->udphandle == NULL) { - tError("%s failed to init network, port:%d", pRpc->label, pRpc->localPort); - rpcClose(pRpc); - return NULL; - } - size_t size = sizeof(SRpcConn) * pRpc->sessions; pRpc->connList = (SRpcConn *)calloc(1, size); if (pRpc->connList == NULL) { @@ -277,6 +266,17 @@ void *rpcOpen(const SRpcInit *pInit) { pthread_mutex_init(&pRpc->mutex, NULL); + pRpc->tcphandle = (*taosInitConn[pRpc->connType|RPC_CONN_TCP])(0, pRpc->localPort, pRpc->label, + pRpc->numOfThreads, rpcProcessMsgFromPeer, pRpc); + pRpc->udphandle = (*taosInitConn[pRpc->connType])(0, pRpc->localPort, pRpc->label, + pRpc->numOfThreads, rpcProcessMsgFromPeer, pRpc); + + if (pRpc->tcphandle == NULL || pRpc->udphandle == NULL) { + tError("%s failed to init network, port:%d", pRpc->label, pRpc->localPort); + rpcClose(pRpc); + return NULL; + } + tTrace("%s RPC is openned, numOfThreads:%d", pRpc->label, pRpc->numOfThreads); return pRpc; @@ -869,9 +869,9 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { pConn = rpcProcessMsgHead(pRpc, pRecv); if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16)) { - tTrace("%s %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d", + tTrace("%s %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label, pConn, taosMsg[pHead->msgType], pRecv->ip, pRecv->port, terrno, - pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId, pHead->port); + pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId, pHead->code); } int32_t code = terrno; @@ -939,6 +939,8 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { pContext->numOfTry = 0; memcpy(&pContext->ipSet, pHead->content, sizeof(pContext->ipSet)); tTrace("%s %p, redirect is received, numOfIps:%d", pRpc->label, pConn, pContext->ipSet.numOfIps); + for (int i=0; iipSet.numOfIps; ++i) + pContext->ipSet.port[i] = htons(pContext->ipSet.port[i]); rpcSendReqToServer(pRpc, pContext); } else if (pHead->code == TSDB_CODE_NOT_READY) { pContext->code = pHead->code;