diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index bde27d2932a5bacd09864c76ee81faa6adef04a7..cfcecc658257fbdc93b4a58a7fa0b3af6afb1c9e 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -278,7 +278,7 @@ bool hasMoreClauseToTry(SSqlObj* pSql); void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp); void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows); void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp); -int tscSetMgmtEpSetFromCfg(const char *first, const char *second); +int tscSetMgmtEpSetFromCfg(const char *first, const char *second, SRpcCorEpSet *corEpSet); bool tscSetSqlOwner(SSqlObj* pSql); void tscClearSqlOwner(SSqlObj* pSql); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 5f1108ae9750dd63739456f1418cc9617e3c2dca..a1b6174de0d21f02fdde1e367bcb45901707c6ba 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -339,6 +339,7 @@ typedef struct STscObj { int64_t hbrid; struct SSqlObj * sqlList; struct SSqlStream *streamList; + SRpcCorEpSet *tscCorMgmtEpSet; void* pDnodeConn; pthread_mutex_t mutex; T_REF_DECLARE() @@ -518,7 +519,6 @@ extern int tsInsertHeadSize; extern int tscNumOfThreads; extern int tscRefId; -extern SRpcCorEpSet tscMgmtEpSet; extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 5e3acaa0b742ae53622cc610b0f3383f3e071ff4..66ca4faa616236d2189314e10def4a3abbf50808 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -26,7 +26,7 @@ #include "ttimer.h" #include "tlockfree.h" -SRpcCorEpSet tscMgmtEpSet; +///SRpcCorEpSet tscMgmtEpSet; int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0}; @@ -73,10 +73,11 @@ static void tscSetDnodeEpSet(SSqlObj* pSql, SVgroupInfo* pVgroupInfo) { assert(hasFqdn); } -static void tscDumpMgmtEpSet(SRpcEpSet *epSet) { - taosCorBeginRead(&tscMgmtEpSet.version); - *epSet = tscMgmtEpSet.epSet; - taosCorEndRead(&tscMgmtEpSet.version); +static void tscDumpMgmtEpSet(SSqlObj *pSql) { + SRpcCorEpSet *pCorEpSet = pSql->pTscObj->tscCorMgmtEpSet; + taosCorBeginRead(&pCorEpSet->version); + pSql->epSet = pCorEpSet->epSet; + taosCorEndRead(&pCorEpSet->version); } static void tscEpSetHtons(SRpcEpSet *s) { for (int32_t i = 0; i < s->numOfEps; i++) { @@ -94,11 +95,12 @@ bool tscEpSetIsEqual(SRpcEpSet *s1, SRpcEpSet *s2) { } return true; } -void tscUpdateMgmtEpSet(SRpcEpSet *pEpSet) { +void tscUpdateMgmtEpSet(SSqlObj *pSql, SRpcEpSet *pEpSet) { // no need to update if equal - taosCorBeginWrite(&tscMgmtEpSet.version); - tscMgmtEpSet.epSet = *pEpSet; - taosCorEndWrite(&tscMgmtEpSet.version); + SRpcCorEpSet *pCorEpSet = pSql->pTscObj->tscCorMgmtEpSet; + taosCorBeginWrite(&pCorEpSet->version); + pCorEpSet->epSet = *pEpSet; + taosCorEndWrite(&pCorEpSet->version); } static void tscDumpEpSetFromVgroupInfo(SCorVgroupInfo *pVgroupInfo, SRpcEpSet *pEpSet) { if (pVgroupInfo == NULL) { return;} @@ -133,18 +135,6 @@ static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) { taosCorEndWrite(&pVgroupInfo->version); } -void tscPrintMgmtEp() { - SRpcEpSet dump; - tscDumpMgmtEpSet(&dump); - if (dump.numOfEps <= 0) { - tscError("invalid mnode EP list:%d", dump.numOfEps); - } else { - for (int i = 0; i < dump.numOfEps; ++i) { - tscDebug("mnode index:%d %s:%d", i, dump.fqdn[i], dump.port[i]); - } - } -} - void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { STscObj *pObj = (STscObj *)param; if (pObj == NULL) return; @@ -162,7 +152,7 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { SRpcEpSet * epSet = &pRsp->epSet; if (epSet->numOfEps > 0) { tscEpSetHtons(epSet); - tscUpdateMgmtEpSet(epSet); + tscUpdateMgmtEpSet(pSql, epSet); } pSql->pTscObj->connId = htonl(pRsp->connId); @@ -219,7 +209,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { // set the mgmt ip list if (pSql->cmd.command >= TSDB_SQL_MGMT) { - tscDumpMgmtEpSet(&pSql->epSet); + tscDumpMgmtEpSet(pSql); } memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen); @@ -277,7 +267,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { if (pCmd->command < TSDB_SQL_MGMT) { tscUpdateVgroupInfo(pSql, pEpSet); } else { - tscUpdateMgmtEpSet(pEpSet); + tscUpdateMgmtEpSet(pSql, pEpSet); } } } @@ -2058,11 +2048,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) { if (pConnect->epSet.numOfEps > 0) { tscEpSetHtons(&pConnect->epSet); - tscUpdateMgmtEpSet(&pConnect->epSet); - - for (int i = 0; i < pConnect->epSet.numOfEps; ++i) { - tscDebug("%p epSet.fqdn[%d]: %s, pObj:%p", pSql, i, pConnect->epSet.fqdn[i], pObj); - } + tscUpdateMgmtEpSet(pSql, &pConnect->epSet); } strcpy(pObj->sversion, pConnect->serverVersion); diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index dcca63ff0ebc2b6939c1bfa074ddfad15e5f6560..70b91bd68547568c0b85d14ee16744d3cf7bf0be 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -58,6 +58,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa terrno = TSDB_CODE_TSC_INVALID_USER_LENGTH; return NULL; } + SRpcCorEpSet corMgmtEpSet; char secretEncrypt[32] = {0}; int secretEncryptLen = 0; @@ -82,11 +83,13 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa } secretEncryptLen = outlen; } - + if (ip) { - if (tscSetMgmtEpSetFromCfg(ip, NULL) < 0) return NULL; - if (port) tscMgmtEpSet.epSet.port[0] = port; - } + if (tscSetMgmtEpSetFromCfg(ip, NULL, &corMgmtEpSet) < 0) return NULL; + if (port) corMgmtEpSet.epSet.port[0] = port; + } else { + if (tscSetMgmtEpSetFromCfg(tsFirst, tsSecond, &corMgmtEpSet) < 0) return NULL; + } void *pDnodeConn = NULL; if (tscInitRpc(user, secretEncrypt, &pDnodeConn) != 0) { @@ -100,10 +103,20 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa rpcClose(pDnodeConn); return NULL; } + // set up tscObj's mgmtEpSet + pObj->tscCorMgmtEpSet = (SRpcCorEpSet *)malloc(sizeof(SRpcCorEpSet)); + if (NULL == pObj->tscCorMgmtEpSet) { + terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; + rpcClose(pDnodeConn); + free(pObj->tscCorMgmtEpSet); + free(pObj); + } + memcpy(pObj->tscCorMgmtEpSet, &corMgmtEpSet, sizeof(SRpcCorEpSet)); pObj->signature = pObj; pObj->pDnodeConn = pDnodeConn; T_REF_INIT_VAL(pObj, 1); + tstrncpy(pObj->user, user, sizeof(pObj->user)); secretEncryptLen = MIN(secretEncryptLen, sizeof(pObj->pass)); @@ -115,6 +128,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa if (len >= TSDB_DB_NAME_LEN) { terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH; rpcClose(pDnodeConn); + free(pObj->tscCorMgmtEpSet); free(pObj); return NULL; } @@ -132,6 +146,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa if (NULL == pSql) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; rpcClose(pDnodeConn); + free(pObj->tscCorMgmtEpSet); free(pObj); return NULL; } @@ -149,6 +164,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; rpcClose(pDnodeConn); free(pSql); + free(pObj->tscCorMgmtEpSet); free(pObj); return NULL; } diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 6aea03d2141f3f512c0df042c5984f4a408fcb52..03b6ac8404c1f3d55a1afce81342c497101a6589 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -116,11 +116,6 @@ void taos_init_imp(void) { taosInitNote(tsNumOfLogLines / 10, 1, (char*)"tsc_note"); } - if (tscSetMgmtEpSetFromCfg(tsFirst, tsSecond) < 0) { - tscError("failed to init mnode EP list"); - return; - } - tscInitMsgsFp(); int queueSize = tsMaxConnections*2; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index af17f9b45729ea244a36b76218a0f1aed6de7db0..a98132d319cf5403503c415d99c5fd0a72fc1941 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -864,7 +864,7 @@ void tscCloseTscObj(void *param) { rpcClose(pObj->pDnodeConn); pObj->pDnodeConn = NULL; } - + tfree(pObj->tscCorMgmtEpSet); pthread_mutex_destroy(&pObj->mutex); tscDebug("%p DB connection is closed, dnodeConn:%p", pObj, p); @@ -2440,10 +2440,10 @@ char* strdup_throw(const char* str) { return p; } -int tscSetMgmtEpSetFromCfg(const char *first, const char *second) { +int tscSetMgmtEpSetFromCfg(const char *first, const char *second, SRpcCorEpSet *corMgmtEpSet) { + corMgmtEpSet->version = 0; // init mgmt ip set - tscMgmtEpSet.version = 0; - SRpcEpSet *mgmtEpSet = &(tscMgmtEpSet.epSet); + SRpcEpSet *mgmtEpSet = &(corMgmtEpSet->epSet); mgmtEpSet->numOfEps = 0; mgmtEpSet->inUse = 0;