diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index d8c920daa7c0ad79412419d228ab8e87180c25b3..dc55e3f15995ec6c432a07b41056df91629caa2c 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -241,11 +241,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { .code = 0 }; - // NOTE: the rpc context should be acquired before sending data to server. - // Otherwise, the pSql object may have been released already during the response function, which is - // processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely - // cause crash. - pSql->rpcRid = rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg); + rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid); return TSDB_CODE_SUCCESS; } diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 01046a2840aab2e8cdfb27682778f572d3179dfb..9e9a00550a4497ae11cbdffe48991c2fd7b742d6 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -105,6 +105,7 @@ void taos_init_imp(void) { taosReadGlobalCfg(); taosCheckGlobalCfg(); + rpcInit(); tscDebug("starting to initialize TAOS client ..."); tscDebug("Local End Point is:%s", tsLocalEp); } @@ -179,6 +180,7 @@ void taos_cleanup(void) { taosCloseRef(tscRefId); taosCleanupKeywordsTable(); taosCloseLog(); + if (tscEmbedded == 0) rpcCleanup(); m = tscTmr; if (m != NULL && atomic_val_compare_exchange_ptr(&tscTmr, m, 0) == m) { diff --git a/src/dnode/src/dnodeMain.c b/src/dnode/src/dnodeMain.c index f4c0ee565e4246e0d8a5c4fe4d67a590721e8de2..130be0af202a4882a100335713ba0f81af7a3c11 100644 --- a/src/dnode/src/dnodeMain.c +++ b/src/dnode/src/dnodeMain.c @@ -20,6 +20,7 @@ #include "tconfig.h" #include "tglobal.h" #include "twal.h" +#include "trpc.h" #include "dnode.h" #include "dnodeInt.h" #include "dnodeMgmt.h" @@ -54,6 +55,7 @@ typedef struct { } SDnodeComponent; static const SDnodeComponent tsDnodeComponents[] = { + {"rpc", rpcInit, rpcCleanup}, {"storage", dnodeInitStorage, dnodeCleanupStorage}, {"dnodecfg", dnodeInitCfg, dnodeCleanupCfg}, {"dnodeeps", dnodeInitEps, dnodeCleanupEps}, diff --git a/src/dnode/src/dnodePeer.c b/src/dnode/src/dnodePeer.c index afa712a965e8233baaf9d4f5f14abeee8e88cc38..fe8b7442e040aa1866c6a1fa7722df68bd3695f5 100644 --- a/src/dnode/src/dnodePeer.c +++ b/src/dnode/src/dnodePeer.c @@ -169,7 +169,7 @@ void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) { } void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) { - rpcSendRequest(tsClientRpc, epSet, rpcMsg); + rpcSendRequest(tsClientRpc, epSet, rpcMsg, NULL); } void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) { @@ -180,4 +180,4 @@ void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) { void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet) { rpcSendRecv(tsClientRpc, epSet, rpcMsg, rpcRsp); -} \ No newline at end of file +} diff --git a/src/inc/trpc.h b/src/inc/trpc.h index e430a43807bc6fb0dd5c48d8a912223c1ba015af..0ce2e3da14d1cec204fc755db13da53f08295bff 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -78,12 +78,14 @@ typedef struct SRpcInit { int (*afp)(char *tableId, char *spi, char *encrypt, char *secret, char *ckey); } SRpcInit; +int32_t rpcInit(); +void rpcCleanup(); void *rpcOpen(const SRpcInit *pRpc); void rpcClose(void *); void *rpcMallocCont(int contLen); void rpcFreeCont(void *pCont); void *rpcReallocCont(void *ptr, int contLen); -int64_t rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg); +void rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); void rpcSendResponse(const SRpcMsg *pMsg); void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet); int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index f963eeb68dd4ffcecf4a945c81b7658924670b71..acceaf9d7a63f3378afbc8d8f485f14f77b50af6 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -135,7 +135,7 @@ int tsRpcOverhead; static int tsRpcRefId = -1; static int32_t tsRpcNum = 0; -static pthread_once_t tsRpcInit = PTHREAD_ONCE_INIT; +//static pthread_once_t tsRpcInit = PTHREAD_ONCE_INIT; // server:0 client:1 tcp:2 udp:0 #define RPC_CONN_UDPS 0 @@ -221,13 +221,15 @@ static void rpcFree(void *p) { free(p); } -void rpcInit(void) { +int32_t rpcInit(void) { tsProgressTimer = tsRpcTimer/2; tsRpcMaxRetry = tsRpcMaxTime * 1000/tsProgressTimer; tsRpcHeadSize = RPC_MSG_OVERHEAD; tsRpcOverhead = sizeof(SRpcReqContext); tsRpcRefId = taosOpenRef(200, rpcFree); + + return 0; } void rpcCleanup(void) { @@ -238,7 +240,7 @@ void rpcCleanup(void) { void *rpcOpen(const SRpcInit *pInit) { SRpcInfo *pRpc; - pthread_once(&tsRpcInit, rpcInit); + //pthread_once(&tsRpcInit, rpcInit); pRpc = (SRpcInfo *)calloc(1, sizeof(SRpcInfo)); if (pRpc == NULL) return NULL; @@ -379,7 +381,7 @@ void *rpcReallocCont(void *ptr, int contLen) { return start + sizeof(SRpcReqContext) + sizeof(SRpcHead); } -int64_t rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) { +void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) { SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcReqContext *pContext; @@ -405,14 +407,10 @@ int64_t rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) { || type == TSDB_MSG_TYPE_CM_SHOW ) pContext->connType = RPC_CONN_TCPC; - // set the handle to pContext, so app can cancel the request - if (pMsg->handle) *((void **)pMsg->handle) = pContext; - pContext->rid = taosAddRef(tsRpcRefId, pContext); + if (pRid) *pRid = pContext->rid; rpcSendReqToServer(pRpc, pContext); - - return pContext->rid; } void rpcSendResponse(const SRpcMsg *pRsp) { @@ -528,7 +526,7 @@ void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) pContext->pRsp = pRsp; pContext->pSet = pEpSet; - rpcSendRequest(shandle, pEpSet, pMsg); + rpcSendRequest(shandle, pEpSet, pMsg, NULL); tsem_wait(&sem); tsem_destroy(&sem); diff --git a/src/rpc/test/rclient.c b/src/rpc/test/rclient.c index 7a963e9ce47f7c89edc0204d6502d548dbdb40eb..5721525adee3fc847a1ba2476ccb0995fb50a65c 100644 --- a/src/rpc/test/rclient.c +++ b/src/rpc/test/rclient.c @@ -57,7 +57,7 @@ static void *sendRequest(void *param) { rpcMsg.ahandle = pInfo; rpcMsg.msgType = 1; tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); - rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg); + rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL); if ( pInfo->num % 20000 == 0 ) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num); tsem_wait(&pInfo->rspSem); diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index 93739ca3d14a2895aedb3bbe37085c95ac722c96..302f08bdb9be10a4704a1126e4b33e9988f392b3 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -578,6 +578,7 @@ static void syncChooseMaster(SSyncNode *pNode) { #if 0 for (int32_t i = 0; i < pNode->replica; ++i) { + if (i == index) continue; pPeer = pNode->peerInfo[i]; if (pPeer->version == nodeVersion) { pPeer->role = TAOS_SYNC_ROLE_SLAVE; diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c index 968f5becaddc7b06c06171a4d91cc0e0ffffc0da..21151f119937519e477e73d0e5e7cb82c1155788 100644 --- a/src/sync/src/syncRetrieve.c +++ b/src/sync/src/syncRetrieve.c @@ -182,6 +182,8 @@ static int32_t syncReadOneWalRecord(int32_t sfd, SWalHead *pHead, uint32_t *pEve return 0; } + assert(pHead->len <= TSDB_MAX_WAL_SIZE); + ret = read(sfd, pHead->cont, pHead->len); if (ret < 0) return -1; diff --git a/src/sync/test/syncClient.c b/src/sync/test/syncClient.c index 23264dc8a0d969e238f35951b0a02e10261ab0c3..23ea54ee0c19b6ad2f93d7577d8d711874b10968 100644 --- a/src/sync/test/syncClient.c +++ b/src/sync/test/syncClient.c @@ -57,7 +57,7 @@ void *sendRequest(void *param) { rpcMsg.ahandle = pInfo; rpcMsg.msgType = 1; uDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); - rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg); + rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL); if (pInfo->num % 20000 == 0) { uInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num); } diff --git a/src/util/src/tref.c b/src/util/src/tref.c index 760d1c0eb47c8af1991d6551cf063630a24ce2ab..4c1a87c96070534e25149357b766a8539e362680 100644 --- a/src/util/src/tref.c +++ b/src/util/src/tref.c @@ -426,11 +426,11 @@ static int taosDecRefCount(int rsetId, int64_t rid, int remove) { (*pSet->fp)(pNode->p); - uTrace("rsetId:%d p:%p rid:%" PRId64 "is removed, count:%d, free mem: %p", rsetId, pNode->p, rid, pSet->count, pNode); + uTrace("rsetId:%d p:%p rid:%" PRId64 " is removed, count:%d, free mem: %p", rsetId, pNode->p, rid, pSet->count, pNode); free(pNode); released = 1; } else { - uTrace("rsetId:%d p:%p rid:%" PRId64 "is released, count:%d", rsetId, pNode->p, rid, pNode->count); + uTrace("rsetId:%d p:%p rid:%" PRId64 " is released, count:%d", rsetId, pNode->p, rid, pNode->count); } } else { uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to release/remove", rsetId, rid); diff --git a/src/util/src/tsocket.c b/src/util/src/tsocket.c index 1003bc6178754e2eb6a3f8dbc61d0a878022e8f5..11932ac03abf8f14d4033aa2bd691498cc55a9cb 100644 --- a/src/util/src/tsocket.c +++ b/src/util/src/tsocket.c @@ -107,7 +107,7 @@ int32_t taosWriteMsg(SOCKET fd, void *buf, int32_t nbytes) { while (nleft > 0) { nwritten = (int32_t)taosWriteSocket(fd, (char *)ptr, (size_t)nleft); if (nwritten <= 0) { - if (errno == EINTR) + if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) continue; else return -1; @@ -133,7 +133,7 @@ int32_t taosReadMsg(SOCKET fd, void *buf, int32_t nbytes) { if (nread == 0) { break; } else if (nread < 0) { - if (errno == EINTR) { + if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) { continue; } else { return -1;