diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 6628e3087456c52b2daf1b9ef84efa9577ad4f36..c48f5098d919979c7cd0060e04b8e1cdd88262e3 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -277,7 +277,7 @@ bool hasMoreClauseToTry(SSqlObj* pSql); void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp); void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows); void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp); -int tscSetMgmtEpSetFromCfg(const char *first, const char *second); +int tscSetMgmtEpSetFromCfg(const char *first, const char *second, SRpcCorEpSet *corEpSet); bool tscSetSqlOwner(SSqlObj* pSql); void tscClearSqlOwner(SSqlObj* pSql); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 4a732fc11f9f6ac8fbe59d6540c4a3e59b1957b3..9cfeffb2d80996b99e81413f072e31f165db17c4 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -334,6 +334,7 @@ typedef struct STscObj { int64_t hbrid; struct SSqlObj * sqlList; struct SSqlStream *streamList; + SRpcCorEpSet *tscCorMgmtEpSet; void* pDnodeConn; pthread_mutex_t mutex; T_REF_DECLARE() @@ -512,8 +513,6 @@ extern int tsInsertHeadSize; extern int tscNumOfThreads; extern int tscRefId; -extern SRpcCorEpSet tscMgmtEpSet; - extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo); void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 80e05f1ba2d0fb9d217dcbeb915bbc8e001bd173..fbe9d0db5fe90edcb9003e1b2953930ea1420d77 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -26,8 +26,6 @@ #include "ttimer.h" #include "tlockfree.h" -SRpcCorEpSet tscMgmtEpSet; - int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0}; int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql); @@ -73,33 +71,35 @@ static void tscSetDnodeEpSet(SSqlObj* pSql, SVgroupInfo* pVgroupInfo) { assert(hasFqdn); } -static void tscDumpMgmtEpSet(SRpcEpSet *epSet) { - taosCorBeginRead(&tscMgmtEpSet.version); - *epSet = tscMgmtEpSet.epSet; - taosCorEndRead(&tscMgmtEpSet.version); -} +static void tscDumpMgmtEpSet(SSqlObj *pSql) { + SRpcCorEpSet *pCorEpSet = pSql->pTscObj->tscCorMgmtEpSet; + taosCorBeginRead(&pCorEpSet->version); + pSql->epSet = pCorEpSet->epSet; + taosCorEndRead(&pCorEpSet->version); +} static void tscEpSetHtons(SRpcEpSet *s) { - for (int32_t i = 0; i < s->numOfEps; i++) { - s->port[i] = htons(s->port[i]); - } + for (int32_t i = 0; i < s->numOfEps; i++) { + s->port[i] = htons(s->port[i]); + } } bool tscEpSetIsEqual(SRpcEpSet *s1, SRpcEpSet *s2) { - if (s1->numOfEps != s2->numOfEps || s1->inUse != s2->inUse) { - return false; - } - for (int32_t i = 0; i < s1->numOfEps; i++) { - if (s1->port[i] != s2->port[i] + if (s1->numOfEps != s2->numOfEps || s1->inUse != s2->inUse) { + return false; + } + for (int32_t i = 0; i < s1->numOfEps; i++) { + if (s1->port[i] != s2->port[i] || strncmp(s1->fqdn[i], s2->fqdn[i], TSDB_FQDN_LEN) != 0) - return false; - } - return true; + return false; + } + return true; } -void tscUpdateMgmtEpSet(SRpcEpSet *pEpSet) { - // no need to update if equal - taosCorBeginWrite(&tscMgmtEpSet.version); - tscMgmtEpSet.epSet = *pEpSet; - taosCorEndWrite(&tscMgmtEpSet.version); +void tscUpdateMgmtEpSet(SSqlObj *pSql, SRpcEpSet *pEpSet) { + SRpcCorEpSet *pCorEpSet = pSql->pTscObj->tscCorMgmtEpSet; + taosCorBeginWrite(&pCorEpSet->version); + pCorEpSet->epSet = *pEpSet; + taosCorEndWrite(&pCorEpSet->version); } + static void tscDumpEpSetFromVgroupInfo(SCorVgroupInfo *pVgroupInfo, SRpcEpSet *pEpSet) { if (pVgroupInfo == NULL) { return;} taosCorBeginRead(&pVgroupInfo->version); @@ -133,18 +133,6 @@ static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) { taosCorEndWrite(&pVgroupInfo->version); } -void tscPrintMgmtEp() { - SRpcEpSet dump; - tscDumpMgmtEpSet(&dump); - if (dump.numOfEps <= 0) { - tscError("invalid mnode EP list:%d", dump.numOfEps); - } else { - for (int i = 0; i < dump.numOfEps; ++i) { - tscDebug("mnode index:%d %s:%d", i, dump.fqdn[i], dump.port[i]); - } - } -} - void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { STscObj *pObj = (STscObj *)param; if (pObj == NULL) return; @@ -162,7 +150,7 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { SRpcEpSet * epSet = &pRsp->epSet; if (epSet->numOfEps > 0) { tscEpSetHtons(epSet); - tscUpdateMgmtEpSet(epSet); + tscUpdateMgmtEpSet(pSql, epSet); } pSql->pTscObj->connId = htonl(pRsp->connId); @@ -208,7 +196,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { int tscSendMsgToServer(SSqlObj *pSql) { STscObj* pObj = pSql->pTscObj; SSqlCmd* pCmd = &pSql->cmd; - + char *pMsg = rpcMallocCont(pCmd->payloadLen); if (NULL == pMsg) { tscError("%p msg:%s malloc failed", pSql, taosMsg[pSql->cmd.msgType]); @@ -217,18 +205,18 @@ int tscSendMsgToServer(SSqlObj *pSql) { // set the mgmt ip list if (pSql->cmd.command >= TSDB_SQL_MGMT) { - tscDumpMgmtEpSet(&pSql->epSet); + tscDumpMgmtEpSet(pSql); } memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen); SRpcMsg rpcMsg = { - .msgType = pSql->cmd.msgType, - .pCont = pMsg, - .contLen = pSql->cmd.payloadLen, - .ahandle = (void *)pSql->self, - .handle = NULL, - .code = 0 + .msgType = pSql->cmd.msgType, + .pCont = pMsg, + .contLen = pSql->cmd.payloadLen, + .ahandle = (void *)pSql->self, + .handle = NULL, + .code = 0 }; // NOTE: the rpc context should be acquired before sending data to server. @@ -280,7 +268,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { if (pCmd->command < TSDB_SQL_MGMT) { tscUpdateVgroupInfo(pSql, pEpSet); } else { - tscUpdateMgmtEpSet(pEpSet); + tscUpdateMgmtEpSet(pSql, pEpSet); } } } @@ -1996,7 +1984,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) { if (pConnect->epSet.numOfEps > 0) { tscEpSetHtons(&pConnect->epSet); - tscUpdateMgmtEpSet(&pConnect->epSet); + tscUpdateMgmtEpSet(pSql, &pConnect->epSet); } strcpy(pObj->sversion, pConnect->serverVersion); diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 1514d75c412b2dbf4603abbd28472f259ea0c6f8..addb65a0a8348e5c45d445160747e4d350a596cc 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -60,6 +60,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa return NULL; } + SRpcCorEpSet corMgmtEpSet; char secretEncrypt[32] = {0}; int secretEncryptLen = 0; if (auth == NULL) { @@ -83,24 +84,35 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa } secretEncryptLen = outlen; } - if (ip) { - if (tscSetMgmtEpSetFromCfg(ip, NULL) < 0) return NULL; - if (port) tscMgmtEpSet.epSet.port[0] = port; - } - + if (tscSetMgmtEpSetFromCfg(ip, NULL, &corMgmtEpSet) < 0) return NULL; + if (port) corMgmtEpSet.epSet.port[0] = port; + } else { + if (tscSetMgmtEpSetFromCfg(tsFirst, tsSecond, &corMgmtEpSet) < 0) return NULL; + } + void *pDnodeConn = NULL; if (tscInitRpc(user, secretEncrypt, &pDnodeConn) != 0) { terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL; return NULL; } - + STscObj *pObj = (STscObj *)calloc(1, sizeof(STscObj)); if (NULL == pObj) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; rpcClose(pDnodeConn); return NULL; } + // set up tscObj's mgmtEpSet + pObj->tscCorMgmtEpSet = (SRpcCorEpSet *)malloc(sizeof(SRpcCorEpSet)); + if (NULL == pObj->tscCorMgmtEpSet) { + terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; + rpcClose(pDnodeConn); + free(pObj->tscCorMgmtEpSet); + free(pObj); + } + memcpy(pObj->tscCorMgmtEpSet, &corMgmtEpSet, sizeof(SRpcCorEpSet)); + pObj->signature = pObj; pObj->pDnodeConn = pDnodeConn; @@ -116,6 +128,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa if (len >= TSDB_DB_NAME_LEN) { terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH; rpcClose(pDnodeConn); + free(pObj->tscCorMgmtEpSet); free(pObj); return NULL; } @@ -133,6 +146,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa if (NULL == pSql) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; rpcClose(pDnodeConn); + free(pObj->tscCorMgmtEpSet); free(pObj); return NULL; } @@ -150,6 +164,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; rpcClose(pDnodeConn); free(pSql); + free(pObj->tscCorMgmtEpSet); free(pObj); return NULL; } @@ -168,12 +183,12 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa static void syncConnCallback(void *param, TAOS_RES *tres, int code) { SSqlObj *pSql = (SSqlObj *) tres; assert(pSql != NULL); - + tsem_post(&pSql->rspSem); } TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, - uint16_t port) { + uint16_t port) { STscObj *pObj = NULL; SSqlObj *pSql = taosConnectImpl(ip, user, pass, auth, db, port, syncConnCallback, NULL, (void **)&pObj); if (pSql != NULL) { @@ -189,10 +204,10 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, taos_close(pObj); return NULL; } - + tscDebug("%p DB connection is opening, dnodeConn:%p", pObj, pObj->pDnodeConn); taos_free_result(pSql); - + // version compare only requires the first 3 segments of the version string int code = taosCheckVersion(version, taos_get_server_info(pObj), 3); if (code != 0) { @@ -224,7 +239,7 @@ TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, cons } TAOS *taos_connect_c(const char *ip, uint8_t ipLen, const char *user, uint8_t userLen, const char *pass, - uint8_t passLen, const char *db, uint8_t dbLen, uint16_t port) { + uint8_t passLen, const char *db, uint8_t dbLen, uint16_t port) { char ipBuf[TSDB_EP_LEN] = {0}; char userBuf[TSDB_USER_LEN] = {0}; char passBuf[TSDB_PASSWORD_LEN] = {0}; @@ -239,12 +254,12 @@ TAOS *taos_connect_c(const char *ip, uint8_t ipLen, const char *user, uint8_t us static void asyncConnCallback(void *param, TAOS_RES *tres, int code) { SSqlObj *pSql = (SSqlObj *) tres; assert(pSql != NULL); - + pSql->fetchFp(pSql->param, tres, code); } TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int), - void *param, TAOS **taos) { + void *param, TAOS **taos) { STscObj *pObj = NULL; SSqlObj *pSql = taosConnectImpl(ip, user, pass, NULL, db, port, asyncConnCallback, param, (void **)&pObj); if (pSql == NULL) { @@ -304,7 +319,7 @@ void taos_close(TAOS *taos) { void waitForQueryRsp(void *param, TAOS_RES *tres, int code) { assert(tres != NULL); - + SSqlObj *pSql = (SSqlObj *) tres; tsem_post(&pSql->rspSem); } @@ -320,7 +335,7 @@ TAOS_RES* taos_query_c(TAOS *taos, const char *sqlstr, uint32_t sqlLen, TAOS_RES terrno = TSDB_CODE_TSC_DISCONNECTED; return NULL; } - + if (sqlLen > (uint32_t)tsMaxSQLStringLen) { tscError("sql string exceeds max length:%d", tsMaxSQLStringLen); terrno = TSDB_CODE_TSC_INVALID_SQL; @@ -335,7 +350,7 @@ TAOS_RES* taos_query_c(TAOS *taos, const char *sqlstr, uint32_t sqlLen, TAOS_RES terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; return NULL; } - + tsem_init(&pSql->rspSem, 0, 0); doAsyncQuery(pObj, pSql, waitForQueryRsp, taos, sqlstr, sqlLen); @@ -381,7 +396,7 @@ int taos_num_fields(TAOS_RES *res) { num++; } } - + return num; } @@ -407,7 +422,7 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) { if (pQueryInfo == NULL) { return NULL; } - + size_t numOfCols = tscNumOfFields(pQueryInfo); if (numOfCols == 0) { return NULL; @@ -497,10 +512,10 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { terrno = TSDB_CODE_TSC_DISCONNECTED; return NULL; } - + SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; - + if (pRes->qhandle == 0 || pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED || pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT || @@ -597,7 +612,7 @@ int taos_select_db(TAOS *taos, const char *db) { SSqlObj* pSql = taos_query(taos, sql); int32_t code = pSql->res.code; taos_free_result(pSql); - + return code; } @@ -673,10 +688,10 @@ static bool hasAdditionalErrorInfo(int32_t code, SSqlCmd *pCmd) { char *z = NULL; if (len > 0) { - z = strstr(pCmd->payload, "invalid SQL"); - if (z == NULL) { - z = strstr(pCmd->payload, "syntax error"); - } + z = strstr(pCmd->payload, "invalid SQL"); + if (z == NULL) { + z = strstr(pCmd->payload, "syntax error"); + } } return z != NULL; } @@ -714,7 +729,7 @@ int* taos_fetch_lengths(TAOS_RES *res) { if (pSql == NULL || pSql->signature != pSql) { return NULL; } - + return pSql->res.length; } @@ -819,36 +834,36 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) break; case TSDB_DATA_TYPE_FLOAT: { - float fv = 0; - fv = GET_FLOAT_VAL(row[i]); - len += sprintf(str + len, "%f", fv); - } break; + float fv = 0; + fv = GET_FLOAT_VAL(row[i]); + len += sprintf(str + len, "%f", fv); + } break; case TSDB_DATA_TYPE_DOUBLE: { - double dv = 0; - dv = GET_DOUBLE_VAL(row[i]); - len += sprintf(str + len, "%lf", dv); - } break; + double dv = 0; + dv = GET_DOUBLE_VAL(row[i]); + len += sprintf(str + len, "%lf", dv); + } break; case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_NCHAR: { - size_t xlen = 0; - for (xlen = 0; xlen < fields[i].bytes - VARSTR_HEADER_SIZE; xlen++) { - char c = ((char *)row[i])[xlen]; - if (c == 0) break; - str[len++] = c; - } - str[len] = 0; - } break; + size_t xlen = 0; + for (xlen = 0; xlen < fields[i].bytes - VARSTR_HEADER_SIZE; xlen++) { + char c = ((char *)row[i])[xlen]; + if (c == 0) break; + str[len++] = c; + } + str[len] = 0; + } break; case TSDB_DATA_TYPE_TIMESTAMP: - len += sprintf(str + len, "%" PRId64, *((int64_t *)row[i])); - break; + len += sprintf(str + len, "%" PRId64, *((int64_t *)row[i])); + break; case TSDB_DATA_TYPE_BOOL: - len += sprintf(str + len, "%d", *((int8_t *)row[i])); + len += sprintf(str + len, "%d", *((int8_t *)row[i])); default: - break; + break; } } @@ -876,7 +891,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) { SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; - + pRes->numOfTotal = 0; pRes->numOfClauseTotal = 0; diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 69753b9623c2eaefb2ac53416bcd88f408f5c4ef..96eba7a417c94316e9058cbe946505fd58fab82d 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -116,11 +116,6 @@ void taos_init_imp(void) { taosInitNote(tsNumOfLogLines / 10, 1, (char*)"tsc_note"); } - if (tscSetMgmtEpSetFromCfg(tsFirst, tsSecond) < 0) { - tscError("failed to init mnode EP list"); - return; - } - tscInitMsgsFp(); int queueSize = tsMaxConnections*2; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index d3732a2b9ff8c3303e81328f8078bcac7cefd26f..e3f717046b9359aa59587398d13ce241e37a033a 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -783,7 +783,7 @@ void tscCloseTscObj(void *param) { rpcClose(pObj->pDnodeConn); pObj->pDnodeConn = NULL; } - + tfree(pObj->tscCorMgmtEpSet); pthread_mutex_destroy(&pObj->mutex); tscDebug("%p DB connection is closed, dnodeConn:%p", pObj, p); @@ -2378,10 +2378,10 @@ char* strdup_throw(const char* str) { return p; } -int tscSetMgmtEpSetFromCfg(const char *first, const char *second) { +int tscSetMgmtEpSetFromCfg(const char *first, const char *second, SRpcCorEpSet *corMgmtEpSet) { // init mgmt ip set - tscMgmtEpSet.version = 0; - SRpcEpSet *mgmtEpSet = &(tscMgmtEpSet.epSet); + corMgmtEpSet->version = 0; + SRpcEpSet *mgmtEpSet = &(corMgmtEpSet->epSet); mgmtEpSet->numOfEps = 0; mgmtEpSet->inUse = 0;