From c803ebd8e7f5c2216829df0268042e87a816f02a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 7 Sep 2021 20:16:42 +0800 Subject: [PATCH] [TD-6127] one taos driver connect two cluster --- src/client/inc/tscUtil.h | 13 +++- src/client/inc/tsclient.h | 14 +++++ src/client/src/tscLocal.c | 4 +- src/client/src/tscParseLineProtocol.c | 2 +- src/client/src/tscSQLParser.c | 10 +-- src/client/src/tscServer.c | 67 +++++++++++---------- src/client/src/tscSql.c | 1 + src/client/src/tscSubquery.c | 2 +- src/client/src/tscSystem.c | 87 +++++++++++++++++++++------ src/client/src/tscUtil.c | 21 ++++--- 10 files changed, 152 insertions(+), 69 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index c59ec3e624..359530a00a 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -41,6 +41,15 @@ extern "C" { #define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \ (((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_TEMP_TABLE)) +#define UTIL_GET_VGROUPMAP(pSql) \ + (pSql->pTscObj->pClusterInfo->vgroupMap) + +#define UTIL_GET_TABLEMETA(pSql) \ + (pSql->pTscObj->pClusterInfo->tableMetaMap) + +#define UTIL_GET_VGROUPLIST(pSql) \ + (pSql->pTscObj->pClusterInfo->vgroupListBuf) + #pragma pack(push,1) // this struct is transfered as binary, padding two bytes to avoid // an 'uid' whose low bytes is 0xff being recoginized as NULL, @@ -106,6 +115,7 @@ typedef struct SBlockKeyInfo { SBlockKeyTuple* pKeyTuple; } SBlockKeyInfo; + int32_t converToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *len); int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, SName* name, STableMeta* pTableMeta, STableDataBlocks** dataBlocks); @@ -346,7 +356,7 @@ STableMeta* createSuperTableMeta(STableMetaMsg* pChild); uint32_t tscGetTableMetaSize(STableMeta* pTableMeta); CChildTableMeta* tscCreateChildMeta(STableMeta* pTableMeta); uint32_t tscGetTableMetaMaxSize(); -int32_t tscCreateTableMetaFromSTableMeta(STableMeta** ppChild, const char* name, size_t *tableMetaCapacity, STableMeta **ppStable); +int32_t tscCreateTableMetaFromSTableMeta(SSqlObj *pSql, STableMeta** ppChild, const char* name, size_t *tableMetaCapacity, STableMeta **ppStable); STableMeta* tscTableMetaDup(STableMeta* pTableMeta); SVgroupsInfo* tscVgroupsInfoDup(SVgroupsInfo* pVgroupsInfo); @@ -367,6 +377,7 @@ void tscRemoveCachedTableMeta(STableMetaInfo* pTableMetaInfo, uint64_t id); char* cloneCurrentDBName(SSqlObj* pSql); + #ifdef __cplusplus } #endif diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index b8eb0a5286..87c44c0823 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -48,6 +48,7 @@ struct SSqlInfo; typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int32_t numOfRows); + typedef struct SNewVgroupInfo { int32_t vgId; int8_t inUse; @@ -139,6 +140,13 @@ typedef enum { ROW_COMPARE_NEED = 1, } ERowCompareStat; +typedef struct { + void *vgroupMap; + void *tableMetaMap; + void *vgroupListBuf; + int64_t ref; +} SClusterInfo; + int tsParseTime(SStrToken *pToken, int64_t *time, char **next, char *error, int16_t timePrec); int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint32_t nCols, uint32_t nBoundCols, @@ -334,6 +342,7 @@ typedef struct STscObj { char acctId[TSDB_ACCT_ID_LEN]; char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; char sversion[TSDB_VERSION_LEN]; + char clusterId[TSDB_CLUSTER_ID_LEN]; char writeAuth : 1; char superAuth : 1; uint32_t connId; @@ -342,9 +351,11 @@ typedef struct STscObj { struct SSqlObj * sqlList; struct SSqlStream *streamList; SRpcObj *pRpcObj; + SClusterInfo *pClusterInfo; SRpcCorEpSet *tscCorMgmtEpSet; pthread_mutex_t mutex; int32_t numOfObj; // number of sqlObj from this tscObj + SReqOrigin from; } STscObj; @@ -428,6 +439,9 @@ int tscAcquireRpc(const char *key, const char *user, const char *secret,void ** void tscReleaseRpc(void *param); void tscInitMsgsFp(); +void *tscAcquireClusterInfo(const char *clusterId); +void tscReleaseClusterInfo(const char *clusterId); + int tsParseSql(SSqlObj *pSql, bool initial); void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet); diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c index 07db18b498..df9857e97e 100644 --- a/src/client/src/tscLocal.c +++ b/src/client/src/tscLocal.c @@ -924,8 +924,8 @@ int tscProcessLocalCmd(SSqlObj *pSql) { } else if (pCmd->command == TSDB_SQL_SHOW_CREATE_DATABASE) { pRes->code = tscProcessShowCreateDatabase(pSql); } else if (pCmd->command == TSDB_SQL_RESET_CACHE) { - taosHashClear(tscTableMetaMap); - taosCacheEmpty(tscVgroupListBuf); + taosHashClear(UTIL_GET_TABLEMETA(pSql)); + taosCacheEmpty(UTIL_GET_VGROUPLIST(pSql)); pRes->code = TSDB_CODE_SUCCESS; } else if (pCmd->command == TSDB_SQL_SERV_VERSION) { pRes->code = tscProcessServerVer(pSql); diff --git a/src/client/src/tscParseLineProtocol.c b/src/client/src/tscParseLineProtocol.c index 778c0cfb47..3f35c90134 100644 --- a/src/client/src/tscParseLineProtocol.c +++ b/src/client/src/tscParseLineProtocol.c @@ -608,7 +608,7 @@ static int32_t retrieveTableMeta(TAOS* taos, char* tableName, STableMeta** pTabl tscFreeRegisteredSqlObj(pSql); size_t size = 0; - taosHashGetCloneExt(tscTableMetaMap, fullTableName, strlen(fullTableName), NULL, (void**)&tableMeta, &size); + taosHashGetCloneExt(UTIL_GET_TABLEMETA(pSql), fullTableName, strlen(fullTableName), NULL, (void**)&tableMeta, &size); } if (tableMeta != NULL) { diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 87b6b07652..cbd96be74d 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -8511,7 +8511,7 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) { size_t len = strlen(name); - if (NULL == taosHashGetCloneExt(tscTableMetaMap, name, len, NULL, (void **)&pTableMeta, &tableMetaCapacity)) { + if (NULL == taosHashGetCloneExt(UTIL_GET_TABLEMETA(pSql), name, len, NULL, (void **)&pTableMeta, &tableMetaCapacity)) { // not found tfree(pTableMeta); } @@ -8522,7 +8522,7 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) { // avoid mem leak, may should update pTableMeta void* pVgroupIdList = NULL; if (pTableMeta->tableType == TSDB_CHILD_TABLE) { - code = tscCreateTableMetaFromSTableMeta((STableMeta **)(&pTableMeta), name, &tableMetaCapacity, (STableMeta **)(&pSTMeta)); + code = tscCreateTableMetaFromSTableMeta(pSql, (STableMeta **)(&pTableMeta), name, &tableMetaCapacity, (STableMeta **)(&pSTMeta)); pSql->pBuf = (void *)pSTMeta; // create the child table meta from super table failed, try load it from mnode @@ -8534,7 +8534,7 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) { } else if (pTableMeta->tableType == TSDB_SUPER_TABLE) { // the vgroup list of super table is not kept in local buffer, so here need retrieve it from the mnode each time tscDebug("0x%"PRIx64" try to acquire cached super table %s vgroup id list", pSql->self, name); - void* pv = taosCacheAcquireByKey(tscVgroupListBuf, name, len); + void* pv = taosCacheAcquireByKey(UTIL_GET_VGROUPLIST(pSql), name, len); if (pv == NULL) { char* t = strdup(name); taosArrayPush(pVgroupList, &t); @@ -8547,7 +8547,7 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) { } taosArrayAddBatch(pVgroupIdList, pdata->data, (int32_t) pdata->num); - taosCacheRelease(tscVgroupListBuf, &pv, false); + taosCacheRelease(UTIL_GET_VGROUPLIST(pSql), &pv, false); } } @@ -8697,7 +8697,7 @@ static int32_t doLoadAllTableMeta(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNod // check if current buffer contains the vgroup info. If not, add it SNewVgroupInfo existVgroupInfo = {.inUse = -1,}; - taosHashGetClone(tscVgroupMap, id, sizeof(*id), NULL, &existVgroupInfo); + taosHashGetClone(UTIL_GET_VGROUPMAP(pSql), id, sizeof(*id), NULL, &existVgroupInfo); assert(existVgroupInfo.inUse >= 0); SVgroupInfo *pVgroup = &pTableMetaInfo->vgroupList->vgroups[j]; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 9d523f2730..4a284d6c5c 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -157,7 +157,7 @@ static void tscUpdateVgroupInfo(SSqlObj *pSql, SRpcEpSet *pEpSet) { assert(vgId > 0); SNewVgroupInfo vgroupInfo = {.vgId = -1}; - taosHashGetClone(tscVgroupMap, &vgId, sizeof(vgId), NULL, &vgroupInfo); + taosHashGetClone(UTIL_GET_VGROUPMAP(pSql), &vgId, sizeof(vgId), NULL, &vgroupInfo); assert(vgroupInfo.numOfEps > 0 && vgroupInfo.vgId > 0); tscDebug("before: Endpoint in use:%d, numOfEps:%d", vgroupInfo.inUse, vgroupInfo.numOfEps); @@ -169,7 +169,7 @@ static void tscUpdateVgroupInfo(SSqlObj *pSql, SRpcEpSet *pEpSet) { } tscDebug("after: EndPoint in use:%d, numOfEps:%d", vgroupInfo.inUse, vgroupInfo.numOfEps); - taosHashPut(tscVgroupMap, &vgId, sizeof(vgId), &vgroupInfo, sizeof(SNewVgroupInfo)); + taosHashPut(UTIL_GET_VGROUPMAP(pSql), &vgId, sizeof(vgId), &vgroupInfo, sizeof(SNewVgroupInfo)); // Update the local cached epSet info cached by SqlObj int32_t inUse = pSql->epSet.inUse; @@ -653,7 +653,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT; SNewVgroupInfo vgroupInfo = {0}; - taosHashGetClone(tscVgroupMap, &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo); + taosHashGetClone(UTIL_GET_VGROUPMAP(pSql), &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo); tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo); tscDebug("0x%"PRIx64" submit msg built, numberOfEP:%d", pSql->self, pSql->epSet.numOfEps); @@ -741,7 +741,7 @@ static char *doSerializeTableInfo(SQueryTableMsg *pQueryMsg, SSqlObj *pSql, STab vgId = pTableMeta->vgId; SNewVgroupInfo vgroupInfo = {0}; - taosHashGetClone(tscVgroupMap, &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo); + taosHashGetClone(UTIL_GET_VGROUPMAP(pSql), &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo); tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo); } @@ -1648,7 +1648,7 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) { STableMeta *pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta; SNewVgroupInfo vgroupInfo = {.vgId = -1}; - taosHashGetClone(tscVgroupMap, &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo); + taosHashGetClone(UTIL_GET_VGROUPMAP(pSql), &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo); assert(vgroupInfo.vgId > 0); tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo); @@ -2034,21 +2034,21 @@ static int32_t tableMetaMsgConvert(STableMetaMsg* pMetaMsg) { } // update the vgroupInfo if needed -static void doUpdateVgroupInfo(int32_t vgId, SVgroupMsg *pVgroupMsg) { +static void doUpdateVgroupInfo(SSqlObj *pSql, int32_t vgId, SVgroupMsg *pVgroupMsg) { assert(vgId > 0); SNewVgroupInfo vgroupInfo = {.inUse = -1}; - taosHashGetClone(tscVgroupMap, &vgId, sizeof(vgId), NULL, &vgroupInfo); + taosHashGetClone(UTIL_GET_VGROUPMAP(pSql), &vgId, sizeof(vgId), NULL, &vgroupInfo); // vgroup info exists, compare with it if (((vgroupInfo.inUse >= 0) && !vgroupInfoIdentical(&vgroupInfo, pVgroupMsg)) || (vgroupInfo.inUse < 0)) { vgroupInfo = createNewVgroupInfo(pVgroupMsg); - taosHashPut(tscVgroupMap, &vgId, sizeof(vgId), &vgroupInfo, sizeof(vgroupInfo)); - tscDebug("add/update new VgroupInfo, vgId:%d, total cached:%d", vgId, (int32_t) taosHashGetSize(tscVgroupMap)); + taosHashPut(UTIL_GET_VGROUPMAP(pSql), &vgId, sizeof(vgId), &vgroupInfo, sizeof(vgroupInfo)); + tscDebug("add/update new VgroupInfo, vgId:%d, total cached:%d", vgId, (int32_t) taosHashGetSize(UTIL_GET_VGROUPMAP(pSql))); } } -static void doAddTableMetaToLocalBuf(STableMeta* pTableMeta, STableMetaMsg* pMetaMsg, bool updateSTable) { +static void doAddTableMetaToLocalBuf(SSqlObj *pSql, STableMeta* pTableMeta, STableMetaMsg* pMetaMsg, bool updateSTable) { if (pTableMeta->tableType == TSDB_CHILD_TABLE) { // add or update the corresponding super table meta data info int32_t len = (int32_t) strnlen(pTableMeta->sTableName, TSDB_TABLE_FNAME_LEN); @@ -2057,18 +2057,18 @@ static void doAddTableMetaToLocalBuf(STableMeta* pTableMeta, STableMetaMsg* pMet if (updateSTable) { STableMeta* pSupTableMeta = createSuperTableMeta(pMetaMsg); uint32_t size = tscGetTableMetaSize(pSupTableMeta); - int32_t code = taosHashPut(tscTableMetaMap, pTableMeta->sTableName, len, pSupTableMeta, size); + int32_t code = taosHashPut(UTIL_GET_TABLEMETA(pSql), pTableMeta->sTableName, len, pSupTableMeta, size); assert(code == TSDB_CODE_SUCCESS); tfree(pSupTableMeta); } CChildTableMeta* cMeta = tscCreateChildMeta(pTableMeta); - taosHashPut(tscTableMetaMap, pMetaMsg->tableFname, strlen(pMetaMsg->tableFname), cMeta, sizeof(CChildTableMeta)); + taosHashPut(UTIL_GET_TABLEMETA(pSql), pMetaMsg->tableFname, strlen(pMetaMsg->tableFname), cMeta, sizeof(CChildTableMeta)); tfree(cMeta); } else { uint32_t s = tscGetTableMetaSize(pTableMeta); - taosHashPut(tscTableMetaMap, pMetaMsg->tableFname, strlen(pMetaMsg->tableFname), pTableMeta, s); + taosHashPut(UTIL_GET_TABLEMETA(pSql), pMetaMsg->tableFname, strlen(pMetaMsg->tableFname), pTableMeta, s); } } @@ -2096,9 +2096,9 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { tNameExtractFullName(&pTableMetaInfo->name, name); assert(strncmp(pMetaMsg->tableFname, name, tListLen(pMetaMsg->tableFname)) == 0); - doAddTableMetaToLocalBuf(pTableMeta, pMetaMsg, true); + doAddTableMetaToLocalBuf(pSql, pTableMeta, pMetaMsg, true); if (pTableMeta->tableType != TSDB_SUPER_TABLE) { - doUpdateVgroupInfo(pTableMeta->vgId, &pMetaMsg->vgroup); + doUpdateVgroupInfo(pSql, pTableMeta->vgId, &pMetaMsg->vgroup); } tscDebug("0x%"PRIx64" recv table meta, uid:%" PRIu64 ", tid:%d, name:%s, numOfCols:%d, numOfTags:%d", pSql->self, @@ -2109,7 +2109,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { return TSDB_CODE_SUCCESS; } -static SArray* createVgroupIdListFromMsg(char* pMsg, SHashObj* pSet, char* name, int32_t* size, uint64_t id) { +static SArray* createVgroupIdListFromMsg(SSqlObj *pSql, char* pMsg, SHashObj* pSet, char* name, int32_t* size, uint64_t id) { SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)pMsg; pVgroupMsg->numOfVgroups = htonl(pVgroupMsg->numOfVgroups); @@ -2132,7 +2132,7 @@ static SArray* createVgroupIdListFromMsg(char* pMsg, SHashObj* pSet, char* name, if (taosHashGet(pSet, &vmsg->vgId, sizeof(vmsg->vgId)) == NULL) { taosHashPut(pSet, &vmsg->vgId, sizeof(vmsg->vgId), "", 0); - doUpdateVgroupInfo(vmsg->vgId, vmsg); + doUpdateVgroupInfo(pSql, vmsg->vgId, vmsg); } } } @@ -2140,7 +2140,7 @@ static SArray* createVgroupIdListFromMsg(char* pMsg, SHashObj* pSet, char* name, return vgroupIdList; } -static SVgroupsInfo* createVgroupInfoFromMsg(char* pMsg, int32_t* size, uint64_t id) { +static SVgroupsInfo* createVgroupInfoFromMsg(SSqlObj *pSql, char* pMsg, int32_t* size, uint64_t id) { SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)pMsg; pVgroupMsg->numOfVgroups = htonl(pVgroupMsg->numOfVgroups); @@ -2171,7 +2171,7 @@ static SVgroupsInfo* createVgroupInfoFromMsg(char* pMsg, int32_t* size, uint64_t pVgroup->epAddr[k].fqdn = strndup(vmsg->epAddr[k].fqdn, TSDB_FQDN_LEN); } - doUpdateVgroupInfo(pVgroup->vgId, vmsg); + doUpdateVgroupInfo(pSql, pVgroup->vgId, vmsg); } } @@ -2306,12 +2306,12 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) { } // create the tableMeta and add it into the TableMeta map - doAddTableMetaToLocalBuf(pTableMeta, pMetaMsg, updateStableMeta); + doAddTableMetaToLocalBuf(pParentSql, pTableMeta, pMetaMsg, updateStableMeta); // for each vgroup, only update the information once. int64_t vgId = pMetaMsg->vgroup.vgId; if (pTableMeta->tableType != TSDB_SUPER_TABLE && taosHashGet(pSet, &vgId, sizeof(vgId)) == NULL) { - doUpdateVgroupInfo((int32_t) vgId, &pMetaMsg->vgroup); + doUpdateVgroupInfo(pParentSql, (int32_t) vgId, &pMetaMsg->vgroup); taosHashPut(pSet, &vgId, sizeof(vgId), "", 0); } @@ -2336,7 +2336,7 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) { taosArrayDestroy(p->vgroupIdList); } - p->vgroupIdList = createVgroupIdListFromMsg(pMsg, pSet, fname, &size, pSql->self); + p->vgroupIdList = createVgroupIdListFromMsg(pParentSql, pMsg, pSet, fname, &size, pSql->self); int32_t numOfVgId = (int32_t) taosArrayGetSize(p->vgroupIdList); int32_t s = sizeof(tFilePage) + numOfVgId * sizeof(int32_t); @@ -2345,8 +2345,8 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) { idList->num = numOfVgId; memcpy(idList->data, TARRAY_GET_START(p->vgroupIdList), numOfVgId * sizeof(int32_t)); - void* idListInst = taosCachePut(tscVgroupListBuf, fname, len, idList, s, 5000); - taosCacheRelease(tscVgroupListBuf, (void*) &idListInst, false); + void* idListInst = taosCachePut(UTIL_GET_VGROUPLIST(pParentSql), fname, len, idList, s, 5000); + taosCacheRelease(UTIL_GET_VGROUPLIST(pParentSql), (void*) &idListInst, false); tfree(idList); pMsg += size; @@ -2436,7 +2436,7 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) { continue; } int32_t size = 0; - pInfo->vgroupList = createVgroupInfoFromMsg(pMsg, &size, pSql->self); + pInfo->vgroupList = createVgroupInfoFromMsg(parent, pMsg, &size, pSql->self); pMsg += size; } @@ -2567,7 +2567,8 @@ int tscProcessConnectRsp(SSqlObj *pSql) { pObj->writeAuth = pConnect->writeAuth; pObj->superAuth = pConnect->superAuth; pObj->connId = htonl(pConnect->connId); - + tstrncpy(pObj->clusterId, pConnect->clusterId, sizeof(pObj->clusterId)); + createHbObj(pObj); //launch a timer to send heartbeat to maintain the connection and send status to mnode @@ -2592,9 +2593,9 @@ int tscProcessDropDbRsp(SSqlObj *pSql) { //TODO LOCK DB WHEN MODIFY IT //pSql->pTscObj->db[0] = 0; - taosHashClear(tscTableMetaMap); - taosHashClear(tscVgroupMap); - taosCacheEmpty(tscVgroupListBuf); + taosHashClear(UTIL_GET_TABLEMETA(pSql)); + taosHashClear(UTIL_GET_VGROUPMAP(pSql)); + taosCacheEmpty(UTIL_GET_VGROUPLIST(pSql)); return 0; } @@ -2614,11 +2615,11 @@ int tscProcessAlterTableMsgRsp(SSqlObj *pSql) { tscDebug("0x%"PRIx64" remove tableMeta in hashMap after alter-table: %s", pSql->self, name); bool isSuperTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo); - taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); + taosHashRemove(UTIL_GET_TABLEMETA(pSql), name, strnlen(name, TSDB_TABLE_FNAME_LEN)); tfree(pTableMetaInfo->pTableMeta); if (isSuperTable) { // if it is a super table, iterate the hashTable and remove all the childTableMeta - taosHashClear(tscTableMetaMap); + taosHashClear(UTIL_GET_TABLEMETA(pSql)); } return 0; @@ -2925,7 +2926,7 @@ int32_t tscGetTableMetaImpl(SSqlObj* pSql, STableMetaInfo *pTableMetaInfo, bool memset(pTableMetaInfo->pTableMeta, 0, pTableMetaInfo->tableMetaCapacity); } - if (NULL == taosHashGetCloneExt(tscTableMetaMap, name, len, NULL, (void **)&(pTableMetaInfo->pTableMeta), &pTableMetaInfo->tableMetaCapacity)) { + if (NULL == taosHashGetCloneExt(UTIL_GET_TABLEMETA(pSql), name, len, NULL, (void **)&(pTableMetaInfo->pTableMeta), &pTableMetaInfo->tableMetaCapacity)) { tfree(pTableMetaInfo->pTableMeta); } @@ -2935,7 +2936,7 @@ int32_t tscGetTableMetaImpl(SSqlObj* pSql, STableMetaInfo *pTableMetaInfo, bool if (pMeta && pMeta->id.uid > 0) { // in case of child table, here only get the if (pMeta->tableType == TSDB_CHILD_TABLE) { - int32_t code = tscCreateTableMetaFromSTableMeta(&pTableMetaInfo->pTableMeta, name, &pTableMetaInfo->tableMetaCapacity, (STableMeta **)(&pSTMeta)); + int32_t code = tscCreateTableMetaFromSTableMeta(pSql, &pTableMetaInfo->pTableMeta, name, &pTableMetaInfo->tableMetaCapacity, (STableMeta **)(&pSTMeta)); pSql->pBuf = (void *)(pSTMeta); if (code != TSDB_CODE_SUCCESS) { return getTableMetaFromMnode(pSql, pTableMetaInfo, autocreate); diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 5fdaad0d66..e3e50e58f8 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -194,6 +194,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, tscBuildAndSendRequest(pSql, NULL); tsem_wait(&pSql->rspSem); + pSql->pTscObj->pClusterInfo = (SClusterInfo *)tscAcquireClusterInfo(pSql->pTscObj->clusterId); if (pSql->res.code != TSDB_CODE_SUCCESS) { terrno = pSql->res.code; if (terrno ==TSDB_CODE_RPC_FQDN_ERROR) { diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index e3bec2c2ea..1875c5faa6 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -3229,7 +3229,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) for(int32_t i = 0; i < pParentObj->cmd.insertParam.numOfTables; ++i) { char name[TSDB_TABLE_FNAME_LEN] = {0}; tNameExtractFullName(pParentObj->cmd.insertParam.pTableNameList[i], name); - taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); + taosHashRemove(UTIL_GET_TABLEMETA(pParentObj), name, strnlen(name, TSDB_TABLE_FNAME_LEN)); } pParentObj->res.code = TSDB_CODE_SUCCESS; diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 8af340030c..a1cc09ce1c 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -33,9 +33,11 @@ int32_t sentinel = TSC_VAR_NOT_RELEASE; -SHashObj *tscVgroupMap; // hash map to keep the vgroup info from mnode -SHashObj *tscTableMetaMap; // table meta info buffer -SCacheObj *tscVgroupListBuf; // super table vgroup list information, only survives 5 seconds for each super table vgroup list +//SHashObj *tscVgroupMap; // hash map to keep the vgroup info from mnode +//SHashObj *tscTableMetaMap; // table meta info buffer +//SCacheObj *tscVgroupListBuf; // super table vgroup list information, only survives 5 seconds for each super table vgroup list +SHashObj *tscClusterMap = NULL; // cluster obj +static pthread_mutex_t clusterMutex; // mutex to protect open the cluster obj int32_t tscObjRef = -1; void *tscTmr; @@ -120,6 +122,58 @@ int32_t tscAcquireRpc(const char *key, const char *user, const char *secretEncry return 0; } +void tscClusterInfoDestroy(SClusterInfo *pObj) { + if (pObj == NULL) { return; } + taosHashCleanup(pObj->vgroupMap); + taosHashCleanup(pObj->tableMetaMap); + taosCacheCleanup(pObj->vgroupListBuf); + tfree(pObj); +} + +void *tscAcquireClusterInfo(const char *clusterId) { + pthread_mutex_lock(&clusterMutex); + + int32_t len = strlen(clusterId); + SClusterInfo *pObj = NULL; + SClusterInfo **ppObj = taosHashGet(tscClusterMap, clusterId, len); + if (ppObj == NULL || *ppObj == NULL) { + pObj = calloc(1, sizeof(SClusterInfo)); + if (pObj) { + pObj->vgroupMap = taosHashInit(256, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + pObj->tableMetaMap = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); // + pObj->vgroupListBuf = taosCacheInit(TSDB_DATA_TYPE_BINARY, 5, false, NULL, "stable-vgroup-list"); + if (pObj->vgroupMap == NULL || pObj->tableMetaMap == NULL || pObj->vgroupListBuf == NULL) { + tscClusterInfoDestroy(pObj); + pObj = NULL; + } else { + taosHashPut(tscClusterMap, clusterId, len, &pObj, POINTER_BYTES); + } + } + } else { + pObj = *ppObj; + } + + if (pObj) { + pObj->ref += 1; + } + pthread_mutex_unlock(&clusterMutex); + return pObj; +} +void tscReleaseClusterInfo(const char *clusterId) { + pthread_mutex_lock(&clusterMutex); + + size_t len = strlen(clusterId); + SClusterInfo *pObj = NULL; + SClusterInfo **ppObj = taosHashGet(tscClusterMap, clusterId, len); + if (ppObj != NULL && *ppObj != NULL) { + pObj = *ppObj; + } + if (pObj && --pObj->ref == 0) { + taosHashRemove(tscClusterMap, clusterId, len); + tscClusterInfoDestroy(pObj); + } + pthread_mutex_unlock(&clusterMutex); +} void taos_init_imp(void) { char temp[128] = {0}; @@ -187,12 +241,16 @@ void taos_init_imp(void) { taosTmrReset(tscCheckDiskUsage, 20 * 1000, NULL, tscTmr, &tscCheckDiskUsageTmr); } - if (tscTableMetaMap == NULL) { + if (tscClusterMap == NULL) { tscObjRef = taosOpenRef(40960, tscFreeRegisteredSqlObj); - tscVgroupMap = taosHashInit(256, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); - tscTableMetaMap = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); - tscVgroupListBuf = taosCacheInit(TSDB_DATA_TYPE_BINARY, 5, false, NULL, "stable-vgroup-list"); - tscDebug("TableMeta:%p, vgroup:%p is initialized", tscTableMetaMap, tscVgroupMap); + + tscClusterMap = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + pthread_mutex_init(&clusterMutex, NULL); + //tscVgroupMap = taosHashInit(256, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + //tscTableMetaMap = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + //tscVgroupListBuf = taosCacheInit(TSDB_DATA_TYPE_BINARY, 5, false, NULL, "stable-vgroup-list"); + //tscDebug("TableMeta:%p, vgroup:%p is initialized", tscTableMetaMap, tscVgroupMap); + } int refreshTime = 5; @@ -221,12 +279,6 @@ void taos_cleanup(void) { scriptEnvPoolCleanup(); } - taosHashCleanup(tscTableMetaMap); - tscTableMetaMap = NULL; - - taosHashCleanup(tscVgroupMap); - tscVgroupMap = NULL; - int32_t id = tscObjRef; tscObjRef = -1; taosCloseRef(id); @@ -249,14 +301,15 @@ void taos_cleanup(void) { pthread_mutex_destroy(&rpcObjMutex); } - taosCacheCleanup(tscVgroupListBuf); - tscVgroupListBuf = NULL; - if (tscEmbedded == 0) { rpcCleanup(); taosCloseLog(); }; + taosHashCleanup(tscClusterMap); + tscClusterMap = NULL; + pthread_mutex_destroy(&clusterMutex); + p = tscTmr; tscTmr = NULL; taosTmrCleanUp(p); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 31631560af..da8e53cab3 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1577,7 +1577,7 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) { char name[TSDB_TABLE_FNAME_LEN] = {0}; tNameExtractFullName(&pDataBlock->tableName, name); - taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); + //taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); } if (!pDataBlock->cloned) { @@ -2072,6 +2072,7 @@ void tscCloseTscObj(void *param) { tfree(pObj->tscCorMgmtEpSet); tscReleaseRpc(pObj->pRpcObj); pthread_mutex_destroy(&pObj->mutex); + tscReleaseClusterInfo(pObj->clusterId); tfree(pObj); } @@ -4546,7 +4547,7 @@ CChildTableMeta* tscCreateChildMeta(STableMeta* pTableMeta) { return cMeta; } -int32_t tscCreateTableMetaFromSTableMeta(STableMeta** ppChild, const char* name, size_t *tableMetaCapacity, STableMeta**ppSTable) { +int32_t tscCreateTableMetaFromSTableMeta(SSqlObj *pSql, STableMeta** ppChild, const char* name, size_t *tableMetaCapacity, STableMeta**ppSTable) { assert(*ppChild != NULL); STableMeta* p = *ppSTable; STableMeta* pChild = *ppChild; @@ -4556,7 +4557,7 @@ int32_t tscCreateTableMetaFromSTableMeta(STableMeta** ppChild, const char* name, memset((char *)p, 0, sz); } - if (NULL == taosHashGetCloneExt(tscTableMetaMap, pChild->sTableName, strnlen(pChild->sTableName, TSDB_TABLE_FNAME_LEN), NULL, (void **)&p, &sz)) { + if (NULL == taosHashGetCloneExt(UTIL_GET_TABLEMETA(pSql), pChild->sTableName, strnlen(pChild->sTableName, TSDB_TABLE_FNAME_LEN), NULL, (void **)&p, &sz)) { tfree(p); } else { *ppSTable = p; @@ -4582,7 +4583,7 @@ int32_t tscCreateTableMetaFromSTableMeta(STableMeta** ppChild, const char* name, *ppChild = pChild; return TSDB_CODE_SUCCESS; } else { // super table has been removed, current tableMeta is also expired. remove it here - taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); + taosHashRemove(UTIL_GET_TABLEMETA(pSql), name, strnlen(name, TSDB_TABLE_FNAME_LEN)); return -1; } } @@ -5100,18 +5101,20 @@ SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg) { void tscRemoveCachedTableMeta(STableMetaInfo* pTableMetaInfo, uint64_t id) { char fname[TSDB_TABLE_FNAME_LEN] = {0}; - tNameExtractFullName(&pTableMetaInfo->name, fname); + SSqlObj *p = (SSqlObj *)taosAcquireRef(tscObjRef, id); + tNameExtractFullName(&pTableMetaInfo->name, fname); int32_t len = (int32_t) strnlen(fname, TSDB_TABLE_FNAME_LEN); if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { - void* pv = taosCacheAcquireByKey(tscVgroupListBuf, fname, len); + void* pv = taosCacheAcquireByKey(UTIL_GET_VGROUPLIST(p), fname, len); if (pv != NULL) { - taosCacheRelease(tscVgroupListBuf, &pv, true); + taosCacheRelease(UTIL_GET_VGROUPLIST(p), &pv, true); } } - taosHashRemove(tscTableMetaMap, fname, len); - tscDebug("0x%"PRIx64" remove table meta %s, numOfRemain:%d", id, fname, (int32_t) taosHashGetSize(tscTableMetaMap)); + taosHashRemove(UTIL_GET_TABLEMETA(p), fname, len); + tscDebug("0x%"PRIx64" remove table meta %s, numOfRemain:%d", id, fname, (int32_t) taosHashGetSize(UTIL_GET_TABLEMETA(p))); + taosReleaseRef(tscObjRef, id); } char* cloneCurrentDBName(SSqlObj* pSql) { -- GitLab