diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 0f2f0513a30ffa37acbfbd35609a6d852592b5fe..c11c524f9666ba9f6fa959699c66a7ac7b4a0b71 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -290,6 +290,12 @@ typedef struct { struct SLocalReducer *pLocalReducer; } SSqlRes; +typedef struct { + char key[512]; + SRpcCorEpSet *tscCorMgmtEpSet; + void *pDnodeConn; +} SRpcObj; + typedef struct STscObj { void * signature; void * pTimer; @@ -305,9 +311,7 @@ typedef struct STscObj { int64_t hbrid; struct SSqlObj * sqlList; struct SSqlStream *streamList; - SRpcCorEpSet *tscCorMgmtEpSet; - void* pDnodeConn; - void* pRpcObj; + SRpcObj *pRpcObj; pthread_mutex_t mutex; int32_t numOfObj; // number of sqlObj from this tscObj } STscObj; @@ -378,14 +382,10 @@ typedef struct SSqlStream { void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable); -typedef struct { - char key[512]; - void *pDnodeConn; -} SRpcObj; -void *tscAcquireRpc(const char *insKey); +void *tscAcquireRpc(const char *key); void tscReleaseRpc(void *param); -int32_t tscInitRpc(const char *key, const char *user, const char *secret, void **pRpcObj, void** pDnodeConn); +int32_t tscInitRpc(const char *key, const char *user, const char *secret, void **pRpcObj); void tscInitMsgsFp(); int tsParseSql(SSqlObj *pSql, bool initial); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index f7a223626261d645687b32878f9f3c7b1fc72425..8a56ccdc9bd6b23178c044c791411f922f76b7df 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -72,7 +72,7 @@ static void tscSetDnodeEpSet(SRpcEpSet* pEpSet, SVgroupInfo* pVgroupInfo) { } static void tscDumpMgmtEpSet(SSqlObj *pSql) { - SRpcCorEpSet *pCorEpSet = pSql->pTscObj->tscCorMgmtEpSet; + SRpcCorEpSet *pCorEpSet = pSql->pTscObj->pRpcObj->tscCorMgmtEpSet; taosCorBeginRead(&pCorEpSet->version); pSql->epSet = pCorEpSet->epSet; taosCorEndRead(&pCorEpSet->version); @@ -95,7 +95,7 @@ bool tscEpSetIsEqual(SRpcEpSet *s1, SRpcEpSet *s2) { } void tscUpdateMgmtEpSet(SSqlObj *pSql, SRpcEpSet *pEpSet) { // no need to update if equal - SRpcCorEpSet *pCorEpSet = pSql->pTscObj->tscCorMgmtEpSet; + SRpcCorEpSet *pCorEpSet = pSql->pTscObj->pRpcObj->tscCorMgmtEpSet; taosCorBeginWrite(&pCorEpSet->version); pCorEpSet->epSet = *pEpSet; taosCorEndWrite(&pCorEpSet->version); @@ -151,13 +151,16 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { SRpcEpSet *epSet = &pRsp->epSet; if (epSet->numOfEps > 0) { tscEpSetHtons(epSet); - if (!tscEpSetIsEqual(&pSql->pTscObj->tscCorMgmtEpSet->epSet, epSet)) { - tscTrace("%p updating epset: numOfEps: %d, inUse: %d", pSql, epSet->numOfEps, epSet->inUse); - for (int8_t i = 0; i < epSet->numOfEps; i++) { - tscTrace("endpoint %d: fqdn=%s, port=%d", i, epSet->fqdn[i], epSet->port[i]); - } - tscUpdateMgmtEpSet(pSql, epSet); - } + + //SRpcCorEpSet *pCorEpSet = pSql->pTscObj->pRpcObj->tscCorMgmtEpSet; + //if (!tscEpSetIsEqual(&pCorEpSet->epSet, epSet)) { + // tscTrace("%p updating epset: numOfEps: %d, inUse: %d", pSql, epSet->numOfEps, epSet->inUse); + // for (int8_t i = 0; i < epSet->numOfEps; i++) { + // tscTrace("endpoint %d: fqdn=%s, port=%d", i, epSet->fqdn[i], epSet->port[i]); + // } + //} + //concurrency problem, update mgmt epset anyway + tscUpdateMgmtEpSet(pSql, epSet); } pSql->pTscObj->connId = htonl(pRsp->connId); @@ -264,7 +267,8 @@ int tscSendMsgToServer(SSqlObj *pSql) { .code = 0 }; - rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid); + + rpcSendRequest(pObj->pRpcObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid); return TSDB_CODE_SUCCESS; } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index e70db12a7b2de37cc6ae5dd820c0efb2b2258d66..0f1824c1ed7bcfd0681e6dc9ccaf8e32df865aef 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -94,8 +94,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa sprintf(rpcKey, "%s:%s:%s:%d", user, pass, ip, port); void *pRpcObj = NULL; - void *pDnodeConn = NULL; - if (tscInitRpc(rpcKey, user, secretEncrypt, &pRpcObj, &pDnodeConn) != 0) { + if (tscInitRpc(rpcKey, user, secretEncrypt, &pRpcObj) != 0) { terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL; return NULL; } @@ -106,20 +105,9 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa tscReleaseRpc(pRpcObj); return NULL; } - // set up tscObj's mgmtEpSet - pObj->tscCorMgmtEpSet = (SRpcCorEpSet *)malloc(sizeof(SRpcCorEpSet)); - if (NULL == pObj->tscCorMgmtEpSet) { - terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; - tscReleaseRpc(pRpcObj); - free(pObj->tscCorMgmtEpSet); - free(pObj); - } - memcpy(pObj->tscCorMgmtEpSet, &corMgmtEpSet, sizeof(SRpcCorEpSet)); - pObj->signature = pObj; - pObj->pRpcObj = pRpcObj; - pObj->pDnodeConn = pDnodeConn; - + pObj->pRpcObj = (SRpcObj *)pRpcObj; + memcpy(pObj->pRpcObj->tscCorMgmtEpSet, &corMgmtEpSet, sizeof(SRpcCorEpSet)); tstrncpy(pObj->user, user, sizeof(pObj->user)); secretEncryptLen = MIN(secretEncryptLen, sizeof(pObj->pass)); memcpy(pObj->pass, secretEncrypt, secretEncryptLen); @@ -130,7 +118,6 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa if (len >= TSDB_DB_NAME_LEN) { terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH; tscReleaseRpc(pRpcObj); - free(pObj->tscCorMgmtEpSet); free(pObj); return NULL; } @@ -148,7 +135,6 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa if (NULL == pSql) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; tscReleaseRpc(pRpcObj); - free(pObj->tscCorMgmtEpSet); free(pObj); return NULL; } @@ -166,7 +152,6 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; tscReleaseRpc(pRpcObj); free(pSql); - free(pObj->tscCorMgmtEpSet); free(pObj); return NULL; } @@ -205,7 +190,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, return NULL; } - tscDebug("%p DB connection is opening, dnodeConn:%p", pObj, pObj->pDnodeConn); + tscDebug("%p DB connection is opening, rpcObj: %p, dnodeConn:%p", pObj, pObj->pRpcObj, pObj->pRpcObj->pDnodeConn); taos_free_result(pSql); // version compare only requires the first 3 segments of the version string @@ -282,7 +267,7 @@ void taos_close(TAOS *taos) { return; } - tscDebug("%p try to free tscObj and close dnodeConn:%p", pObj, pObj->pDnodeConn); + tscDebug("%p try to free tscObj", pObj); if (pObj->signature != pObj) { tscDebug("%p already closed or invalid tscObj", pObj); return; @@ -302,7 +287,7 @@ void taos_close(TAOS *taos) { } } - tscDebug("%p all sqlObj are freed, free tscObj and close dnodeConn:%p", pObj, pObj->pDnodeConn); + tscDebug("%p all sqlObj are freed, free tscObj", pObj); taosRemoveRef(tscRefId, pObj->rid); } diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 4e23bd8a0359388e4d77003fb763a89ca7fbc1a1..f37daee3ca42b33c7ff74c9e54b43b8a4cea36a9 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -53,6 +53,7 @@ void tscFreeRpcObj(void *param) { assert(param); SRpcObj *pRpcObj = (SRpcObj *)(param); rpcClose(pRpcObj->pDnodeConn); + tfree(pRpcObj->tscCorMgmtEpSet); } void *tscAcquireRpc(const char *key) { SRpcObj *pRpcObj = taosCacheAcquireByKey(tscRpcCache, key, strlen(key)); @@ -71,13 +72,12 @@ void tscReleaseRpc(void *param) { pthread_mutex_unlock(&rpcObjMutex); } -int32_t tscInitRpc(const char *key, const char *user, const char *secretEncrypt, void **ppRpcObj, void **pDnodeConn) { +int32_t tscInitRpc(const char *key, const char *user, const char *secretEncrypt, void **ppRpcObj) { pthread_mutex_lock(&rpcObjMutex); SRpcObj *pRpcObj = (SRpcObj *)tscAcquireRpc(key); if (pRpcObj != NULL) { *ppRpcObj = pRpcObj; - *pDnodeConn = pRpcObj->pDnodeConn; pthread_mutex_unlock(&rpcObjMutex); return 0; } @@ -86,7 +86,7 @@ int32_t tscInitRpc(const char *key, const char *user, const char *secretEncrypt, memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localPort = 0; rpcInit.label = "TSC"; - rpcInit.numOfThreads = 1; + rpcInit.numOfThreads = tscNumOfThreads * 2; rpcInit.cfp = tscProcessMsgFromServer; rpcInit.sessions = tsMaxConnections; rpcInit.connType = TAOS_CONN_CLIENT; @@ -111,9 +111,13 @@ int32_t tscInitRpc(const char *key, const char *user, const char *secretEncrypt, pthread_mutex_unlock(&rpcObjMutex); return -1; } + pRpcObj->tscCorMgmtEpSet = malloc(sizeof(SRpcCorEpSet)); + if (pRpcObj->tscCorMgmtEpSet == NULL) { + rpcClose(rpcObj.pDnodeConn); + pthread_mutex_unlock(&rpcObjMutex); + } *ppRpcObj = pRpcObj; - *pDnodeConn = pRpcObj->pDnodeConn; pthread_mutex_unlock(&rpcObjMutex); return 0; } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index c6182dcb5f43f3374b1a9419eca990c6c04f2683..5507748720a3795e2a287d15b08378ebe8234fe5 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -896,7 +896,6 @@ void tscCloseTscObj(void *param) { taosTmrStopA(&(pObj->pTimer)); tscReleaseRpc(pObj->pRpcObj); - tfree(pObj->tscCorMgmtEpSet); pthread_mutex_destroy(&pObj->mutex); tfree(pObj); diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index 178b96c423641ccc6be84c57ca71fe7b85fd7a33..b82e197bf719513070cd8de451952133ccfe8c71 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -55,6 +55,13 @@ typedef struct SThreadObj { void *(*processData)(SRecvInfo *pPacket); } SThreadObj; +typedef struct { + char label[TSDB_LABEL_LEN]; + int32_t index; + int numOfThreads; + SThreadObj **pThreadObj; +} SClientObj; + typedef struct { SOCKET fd; uint32_t ip; @@ -121,6 +128,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread pThreadObj->processData = fp; tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label)); pThreadObj->shandle = shandle; + pThreadObj->stop = false; } // initialize mutex, thread, fd which may fail @@ -171,6 +179,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread } static void taosStopTcpThread(SThreadObj* pThreadObj) { + if (pThreadObj == NULL) { return;} // save thread into local variable and signal thread to stop pthread_t thread = pThreadObj->thread; if (!taosCheckPthreadValid(thread)) { @@ -275,48 +284,77 @@ static void *taosAcceptTcpConnection(void *arg) { return NULL; } -void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *fp, void *shandle) { - SThreadObj *pThreadObj; - pthread_attr_t thattr; - - pThreadObj = (SThreadObj *)malloc(sizeof(SThreadObj)); - memset(pThreadObj, 0, sizeof(SThreadObj)); - tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label)); - pThreadObj->ip = ip; - pThreadObj->shandle = shandle; - - if (pthread_mutex_init(&(pThreadObj->mutex), NULL) < 0) { - tError("%s failed to init TCP client mutex(%s)", label, strerror(errno)); - free(pThreadObj); +void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) { + SClientObj *pClientObj = (SClientObj *)calloc(1, sizeof(SClientObj)); + if (pClientObj == NULL) { + tError("TCP:%s no enough memory", label); terrno = TAOS_SYSTEM_ERROR(errno); return NULL; - } + } - pThreadObj->pollFd = (SOCKET)epoll_create(10); // size does not matter - if (pThreadObj->pollFd < 0) { - tError("%s failed to create TCP client epoll", label); - free(pThreadObj); + + tstrncpy(pClientObj->label, label, sizeof(pClientObj->label)); + pClientObj->numOfThreads = numOfThreads; + pClientObj->pThreadObj = (SThreadObj **)calloc(numOfThreads, sizeof(SThreadObj*)); + if (pClientObj->pThreadObj == NULL) { + tError("TCP:%s no enough memory", label); + tfree(pClientObj); terrno = TAOS_SYSTEM_ERROR(errno); - return NULL; } - pThreadObj->processData = fp; - + int code = 0; + pthread_attr_t thattr; pthread_attr_init(&thattr); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); - int code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj)); - pthread_attr_destroy(&thattr); - if (code != 0) { - taosCloseSocket(pThreadObj->pollFd); - free(pThreadObj); - terrno = TAOS_SYSTEM_ERROR(errno); - tError("%s failed to create TCP read data thread(%s)", label, strerror(errno)); - return NULL; + + for (int i = 0; i < numOfThreads; ++i) { + SThreadObj *pThreadObj = (SThreadObj *)calloc(1, sizeof(SThreadObj)); + if (pThreadObj == NULL) { + tError("TCP:%s no enough memory", label); + terrno = TAOS_SYSTEM_ERROR(errno); + for (int j=0; jpThreadObj[j]); + free(pClientObj); + pthread_attr_destroy(&thattr); + return NULL; + } + pClientObj->pThreadObj[i] = pThreadObj; + taosResetPthread(&pThreadObj->thread); + pThreadObj->ip = ip; + pThreadObj->stop = false; + tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label)); + pThreadObj->shandle = shandle; + pThreadObj->processData = fp; } - tDebug("%s TCP client is initialized, ip:%u:%hu", label, ip, port); + // initialize mutex, thread, fd which may fail + for (int i = 0; i < numOfThreads; ++i) { + SThreadObj *pThreadObj = pClientObj->pThreadObj[i]; + code = pthread_mutex_init(&(pThreadObj->mutex), NULL); + if (code < 0) { + tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno)); + break; + } - return pThreadObj; + pThreadObj->pollFd = (int64_t)epoll_create(10); // size does not matter + if (pThreadObj->pollFd < 0) { + tError("%s failed to create TCP epoll", label); + code = -1; + break; + } + + code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj)); + if (code != 0) { + tError("%s failed to create TCP process data thread(%s)", label, strerror(errno)); + break; + } + pThreadObj->threadId = i; + } + if (code != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + taosCleanUpTcpClient(pClientObj); + pClientObj = NULL; + } + return pClientObj; } void taosStopTcpClient(void *chandle) { @@ -327,15 +365,23 @@ void taosStopTcpClient(void *chandle) { } void taosCleanUpTcpClient(void *chandle) { - SThreadObj *pThreadObj = chandle; - if (pThreadObj == NULL) return; - - tDebug ("%s TCP client will be cleaned up", pThreadObj->label); - taosStopTcpThread(pThreadObj); + SClientObj *pClientObj = chandle; + if (pClientObj == NULL) return; + for (int i = 0; i < pClientObj->numOfThreads; ++i) { + SThreadObj *pThreadObj= pClientObj->pThreadObj[i]; + taosStopTcpThread(pThreadObj); + } + + tDebug("%s TCP client is cleaned up", pClientObj->label); + tfree(pClientObj->pThreadObj); + tfree(pClientObj); } void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) { - SThreadObj * pThreadObj = shandle; + SClientObj * pClientObj = shandle; + int32_t index = atomic_load_32(&pClientObj->index) % pClientObj->numOfThreads; + atomic_store_32(&pClientObj->index, index + 1); + SThreadObj *pThreadObj = pClientObj->pThreadObj[index]; SOCKET fd = taosOpenTcpClientSocket(ip, port, pThreadObj->ip); if (fd < 0) return NULL;