diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index c858bd5867c64da4c7397aed2035119ff414d112..d183fb4a721a57011ba7c09c304d4cf8ad7180dd 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -41,6 +41,15 @@ extern "C" { #define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \ (((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_TEMP_TABLE)) +#define UTIL_GET_VGROUPMAP(pSql) \ + (pSql->pTscObj->pClusterInfo->vgroupMap) + +#define UTIL_GET_TABLEMETA(pSql) \ + (pSql->pTscObj->pClusterInfo->tableMetaMap) + +#define UTIL_GET_VGROUPLIST(pSql) \ + (pSql->pTscObj->pClusterInfo->vgroupListBuf) + #pragma pack(push,1) // this struct is transfered as binary, padding two bytes to avoid // an 'uid' whose low bytes is 0xff being recoginized as NULL, @@ -106,10 +115,11 @@ typedef struct SBlockKeyInfo { SBlockKeyTuple* pKeyTuple; } SBlockKeyInfo; + int32_t converToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *len); int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, SName* name, STableMeta* pTableMeta, STableDataBlocks** dataBlocks); -void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta); +void tscDestroyDataBlock(SSqlObj *pSql, STableDataBlocks* pDataBlock, bool removeMeta); void tscSortRemoveDataBlockDupRowsRaw(STableDataBlocks* dataBuf); int tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo); int32_t tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows); @@ -120,12 +130,12 @@ void doRetrieveSubqueryData(SSchedMsg *pMsg); SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, int16_t bytes, uint32_t offset); -void* tscDestroyBlockArrayList(SArray* pDataBlockList); +void* tscDestroyBlockArrayList(SSqlObj* pSql, SArray* pDataBlockList); void* tscDestroyUdfArrayList(SArray* pUdfList); -void* tscDestroyBlockHashTable(SHashObj* pBlockHashTable, bool removeMeta); +void* tscDestroyBlockHashTable(SSqlObj* pSql, SHashObj* pBlockHashTable, bool removeMeta); int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock); -int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBlockMap); +int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertParam, bool freeBlockMap); int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, SName* pName, STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList); @@ -353,7 +363,7 @@ STableMeta* createSuperTableMeta(STableMetaMsg* pChild); uint32_t tscGetTableMetaSize(STableMeta* pTableMeta); CChildTableMeta* tscCreateChildMeta(STableMeta* pTableMeta); uint32_t tscGetTableMetaMaxSize(); -int32_t tscCreateTableMetaFromSTableMeta(STableMeta** ppChild, const char* name, size_t *tableMetaCapacity, STableMeta **ppStable); +int32_t tscCreateTableMetaFromSTableMeta(SSqlObj *pSql, STableMeta** ppChild, const char* name, size_t *tableMetaCapacity, STableMeta **ppStable); STableMeta* tscTableMetaDup(STableMeta* pTableMeta); SVgroupsInfo* tscVgroupsInfoDup(SVgroupsInfo* pVgroupsInfo); @@ -374,6 +384,7 @@ void tscRemoveCachedTableMeta(STableMetaInfo* pTableMetaInfo, uint64_t id); char* cloneCurrentDBName(SSqlObj* pSql); + #ifdef __cplusplus } #endif diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index dd4ff7eb57f20cfc8d31328630fbb14b7acf7017..eebb471e305132fa5b3b403d149d449d36072ab2 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -48,6 +48,7 @@ struct SSqlInfo; typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int32_t numOfRows); + typedef struct SNewVgroupInfo { int32_t vgId; int8_t inUse; @@ -139,6 +140,13 @@ typedef enum { ROW_COMPARE_NEED = 1, } ERowCompareStat; +typedef struct { + void *vgroupMap; + void *tableMetaMap; + void *vgroupListBuf; + int64_t ref; +} SClusterInfo; + int tsParseTime(SStrToken *pToken, int64_t *time, char **next, char *error, int16_t timePrec); int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint32_t nCols, uint32_t nBoundCols, @@ -324,6 +332,7 @@ typedef struct STscObj { char acctId[TSDB_ACCT_ID_LEN]; char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; char sversion[TSDB_VERSION_LEN]; + char clusterId[TSDB_CLUSTER_ID_LEN]; char writeAuth : 1; char superAuth : 1; uint32_t connId; @@ -332,9 +341,11 @@ typedef struct STscObj { struct SSqlObj * sqlList; struct SSqlStream *streamList; SRpcObj *pRpcObj; + SClusterInfo *pClusterInfo; SRpcCorEpSet *tscCorMgmtEpSet; pthread_mutex_t mutex; int32_t numOfObj; // number of sqlObj from this tscObj + SReqOrigin from; } STscObj; @@ -417,6 +428,9 @@ int tscAcquireRpc(const char *key, const char *user, const char *secret,void ** void tscReleaseRpc(void *param); void tscInitMsgsFp(); +void *tscAcquireClusterInfo(const char *clusterId); +void tscReleaseClusterInfo(const char *clusterId); + int tsParseSql(SSqlObj *pSql, bool initial); void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet); diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c index 07db18b498873f4a023d8ea76aadd7e76a4cd8d2..df9857e97e7b8cc2f3ed8543939bc30d1fe5608b 100644 --- a/src/client/src/tscLocal.c +++ b/src/client/src/tscLocal.c @@ -924,8 +924,8 @@ int tscProcessLocalCmd(SSqlObj *pSql) { } else if (pCmd->command == TSDB_SQL_SHOW_CREATE_DATABASE) { pRes->code = tscProcessShowCreateDatabase(pSql); } else if (pCmd->command == TSDB_SQL_RESET_CACHE) { - taosHashClear(tscTableMetaMap); - taosCacheEmpty(tscVgroupListBuf); + taosHashClear(UTIL_GET_TABLEMETA(pSql)); + taosCacheEmpty(UTIL_GET_VGROUPLIST(pSql)); pRes->code = TSDB_CODE_SUCCESS; } else if (pCmd->command == TSDB_SQL_SERV_VERSION) { pRes->code = tscProcessServerVer(pSql); diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 1bf27e6cad1d57fdfd4b786d1cdcea981bf3333b..4e2b89d64f95a284d1e50dd1847af61ab7b6b653 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -1529,7 +1529,7 @@ int tsParseInsertSql(SSqlObj *pSql) { // merge according to vgId if (!TSDB_QUERY_HAS_TYPE(pInsertParam->insertType, TSDB_QUERY_TYPE_STMT_INSERT) && taosHashGetSize(pInsertParam->pTableBlockHashList) > 0) { - if ((code = tscMergeTableDataBlocks(pInsertParam, true)) != TSDB_CODE_SUCCESS) { + if ((code = tscMergeTableDataBlocks(pSql, pInsertParam, true)) != TSDB_CODE_SUCCESS) { goto _clean; } } @@ -1635,7 +1635,7 @@ static int doPackSendDataBlock(SSqlObj* pSql, SInsertStatementParam *pInsertPara return tscInvalidOperationMsg(pInsertParam->msg, "too many rows in sql, total number of rows should be less than 32767", NULL); } - if ((code = tscMergeTableDataBlocks(pInsertParam, true)) != TSDB_CODE_SUCCESS) { + if ((code = tscMergeTableDataBlocks(pSql, pInsertParam, true)) != TSDB_CODE_SUCCESS) { return code; } @@ -1696,7 +1696,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow SInsertStatementParam *pInsertParam = &pCmd->insertParam; destroyTableNameList(pInsertParam); - pInsertParam->pDataBlocks = tscDestroyBlockArrayList(pInsertParam->pDataBlocks); + pInsertParam->pDataBlocks = tscDestroyBlockArrayList(pParentSql, pInsertParam->pDataBlocks); if (pInsertParam->pTableBlockHashList == NULL) { pInsertParam->pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); diff --git a/src/client/src/tscParseLineProtocol.c b/src/client/src/tscParseLineProtocol.c index 22392ba306faeed05af5d695ca0090057ac211cf..382198e8b3e8422651b0072818a73bbd7b5c6feb 100644 --- a/src/client/src/tscParseLineProtocol.c +++ b/src/client/src/tscParseLineProtocol.c @@ -574,10 +574,10 @@ static int32_t retrieveTableMeta(TAOS* taos, char* tableName, STableMeta** pTabl char fullTableName[TSDB_TABLE_FNAME_LEN] = {0}; memset(fullTableName, 0, tListLen(fullTableName)); tNameExtractFullName(&sname, fullTableName); - tscFreeRegisteredSqlObj(pSql); size_t size = 0; - taosHashGetCloneExt(tscTableMetaMap, fullTableName, strlen(fullTableName), NULL, (void**)&tableMeta, &size); + taosHashGetCloneExt(UTIL_GET_TABLEMETA(pSql), fullTableName, strlen(fullTableName), NULL, (void**)&tableMeta, &size); + tscFreeRegisteredSqlObj(pSql); } if (tableMeta != NULL) { diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index d0ac0ccf4ee4bfa381a78090409a761717ceb4b0..f6a64cb5b1c04b461e38acce2ea1cae65dec71e6 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -1163,7 +1163,7 @@ static int insertStmtExecute(STscStmt* stmt) { fillTablesColumnsNull(stmt->pSql); - int code = tscMergeTableDataBlocks(&stmt->pSql->cmd.insertParam, false); + int code = tscMergeTableDataBlocks(stmt->pSql, &stmt->pSql->cmd.insertParam, false); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1194,7 +1194,7 @@ static int insertStmtExecute(STscStmt* stmt) { pCmd->insertParam.numOfTables = 0; tfree(pCmd->insertParam.pTableNameList); - pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pCmd->insertParam.pDataBlocks); + pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pSql, pCmd->insertParam.pDataBlocks); return pSql->res.code; } @@ -1215,7 +1215,7 @@ static void insertBatchClean(STscStmt* pStmt) { tfree(pCmd->insertParam.pTableNameList); - pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pCmd->insertParam.pDataBlocks); + pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pSql, pCmd->insertParam.pDataBlocks); pCmd->insertParam.numOfTables = 0; taosHashClear(pCmd->insertParam.pTableBlockHashList); @@ -1242,7 +1242,7 @@ static int insertBatchStmtExecute(STscStmt* pStmt) { fillTablesColumnsNull(pStmt->pSql); - if ((code = tscMergeTableDataBlocks(&pStmt->pSql->cmd.insertParam, false)) != TSDB_CODE_SUCCESS) { + if ((code = tscMergeTableDataBlocks(pStmt->pSql, &pStmt->pSql->cmd.insertParam, false)) != TSDB_CODE_SUCCESS) { return code; } @@ -1773,8 +1773,8 @@ int taos_stmt_close(TAOS_STMT* stmt) { if (pStmt->pSql && pStmt->pSql->res.code != 0) { rmMeta = true; } - tscDestroyDataBlock(pStmt->mtb.lastBlock, rmMeta); - pStmt->mtb.pTableBlockHashList = tscDestroyBlockHashTable(pStmt->mtb.pTableBlockHashList, rmMeta); + tscDestroyDataBlock(pStmt->pSql, pStmt->mtb.lastBlock, rmMeta); + pStmt->mtb.pTableBlockHashList = tscDestroyBlockHashTable(pStmt->pSql, pStmt->mtb.pTableBlockHashList, rmMeta); if (pStmt->pSql){ taosHashCleanup(pStmt->pSql->cmd.insertParam.pTableBlockHashList); } diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index a62a8ac3efca0836faab778224aa4a831e84e580..4d5bfe350aa6435ef04a9646491fe2c93a996e92 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -8280,7 +8280,7 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) { size_t len = strlen(name); - if (NULL == taosHashGetCloneExt(tscTableMetaMap, name, len, NULL, (void **)&pTableMeta, &tableMetaCapacity)) { + if (NULL == taosHashGetCloneExt(UTIL_GET_TABLEMETA(pSql), name, len, NULL, (void **)&pTableMeta, &tableMetaCapacity)) { // not found tfree(pTableMeta); } @@ -8291,7 +8291,7 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) { // avoid mem leak, may should update pTableMeta void* pVgroupIdList = NULL; if (pTableMeta->tableType == TSDB_CHILD_TABLE) { - code = tscCreateTableMetaFromSTableMeta((STableMeta **)(&pTableMeta), name, &tableMetaCapacity, (STableMeta **)(&pSTMeta)); + code = tscCreateTableMetaFromSTableMeta(pSql, (STableMeta **)(&pTableMeta), name, &tableMetaCapacity, (STableMeta **)(&pSTMeta)); pSql->pBuf = (void *)pSTMeta; // create the child table meta from super table failed, try load it from mnode @@ -8303,7 +8303,7 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) { } else if (pTableMeta->tableType == TSDB_SUPER_TABLE) { // the vgroup list of super table is not kept in local buffer, so here need retrieve it from the mnode each time tscDebug("0x%"PRIx64" try to acquire cached super table %s vgroup id list", pSql->self, name); - void* pv = taosCacheAcquireByKey(tscVgroupListBuf, name, len); + void* pv = taosCacheAcquireByKey(UTIL_GET_VGROUPLIST(pSql), name, len); if (pv == NULL) { char* t = strdup(name); taosArrayPush(pVgroupList, &t); @@ -8316,7 +8316,7 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) { } taosArrayAddBatch(pVgroupIdList, pdata->data, (int32_t) pdata->num); - taosCacheRelease(tscVgroupListBuf, &pv, false); + taosCacheRelease(UTIL_GET_VGROUPLIST(pSql), &pv, false); } } @@ -8466,7 +8466,7 @@ static int32_t doLoadAllTableMeta(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNod // check if current buffer contains the vgroup info. If not, add it SNewVgroupInfo existVgroupInfo = {.inUse = -1,}; - taosHashGetClone(tscVgroupMap, id, sizeof(*id), NULL, &existVgroupInfo); + taosHashGetClone(UTIL_GET_VGROUPMAP(pSql), id, sizeof(*id), NULL, &existVgroupInfo); assert(existVgroupInfo.inUse >= 0); SVgroupMsg *pVgroup = &pTableMetaInfo->vgroupList->vgroups[j]; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index dcfbc857d5d6792b3796a098ea61046439fc5d0f..b7661d51f910dd320af95acfaae857bb51056b20 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -158,7 +158,7 @@ static void tscUpdateVgroupInfo(SSqlObj *pSql, SRpcEpSet *pEpSet) { assert(vgId > 0); SNewVgroupInfo vgroupInfo = {.vgId = -1}; - taosHashGetClone(tscVgroupMap, &vgId, sizeof(vgId), NULL, &vgroupInfo); + taosHashGetClone(UTIL_GET_VGROUPMAP(pSql), &vgId, sizeof(vgId), NULL, &vgroupInfo); assert(vgroupInfo.numOfEps > 0 && vgroupInfo.vgId > 0); tscDebug("before: Endpoint in use:%d, numOfEps:%d", vgroupInfo.inUse, vgroupInfo.numOfEps); @@ -170,7 +170,7 @@ static void tscUpdateVgroupInfo(SSqlObj *pSql, SRpcEpSet *pEpSet) { } tscDebug("after: EndPoint in use:%d, numOfEps:%d", vgroupInfo.inUse, vgroupInfo.numOfEps); - taosHashPut(tscVgroupMap, &vgId, sizeof(vgId), &vgroupInfo, sizeof(SNewVgroupInfo)); + taosHashPut(UTIL_GET_VGROUPMAP(pSql), &vgId, sizeof(vgId), &vgroupInfo, sizeof(SNewVgroupInfo)); // Update the local cached epSet info cached by SqlObj int32_t inUse = pSql->epSet.inUse; @@ -654,7 +654,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT; SNewVgroupInfo vgroupInfo = {0}; - taosHashGetClone(tscVgroupMap, &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo); + taosHashGetClone(UTIL_GET_VGROUPMAP(pSql), &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo); tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo); tscDebug("0x%"PRIx64" submit msg built, numberOfEP:%d", pSql->self, pSql->epSet.numOfEps); @@ -737,7 +737,7 @@ static char *doSerializeTableInfo(SQueryTableMsg *pQueryMsg, SSqlObj *pSql, STab vgId = pTableMeta->vgId; SNewVgroupInfo vgroupInfo = {0}; - taosHashGetClone(tscVgroupMap, &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo); + taosHashGetClone(UTIL_GET_VGROUPMAP(pSql), &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo); tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo); } @@ -1650,7 +1650,7 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) { STableMeta *pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta; SNewVgroupInfo vgroupInfo = {.vgId = -1}; - taosHashGetClone(tscVgroupMap, &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo); + taosHashGetClone(UTIL_GET_VGROUPMAP(pSql), &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo); assert(vgroupInfo.vgId > 0); tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo); @@ -2036,21 +2036,21 @@ static int32_t tableMetaMsgConvert(STableMetaMsg* pMetaMsg) { } // update the vgroupInfo if needed -static void doUpdateVgroupInfo(int32_t vgId, SVgroupMsg *pVgroupMsg) { +static void doUpdateVgroupInfo(SSqlObj *pSql, int32_t vgId, SVgroupMsg *pVgroupMsg) { assert(vgId > 0); SNewVgroupInfo vgroupInfo = {.inUse = -1}; - taosHashGetClone(tscVgroupMap, &vgId, sizeof(vgId), NULL, &vgroupInfo); + taosHashGetClone(UTIL_GET_VGROUPMAP(pSql), &vgId, sizeof(vgId), NULL, &vgroupInfo); // vgroup info exists, compare with it if (((vgroupInfo.inUse >= 0) && !vgroupInfoIdentical(&vgroupInfo, pVgroupMsg)) || (vgroupInfo.inUse < 0)) { vgroupInfo = createNewVgroupInfo(pVgroupMsg); - taosHashPut(tscVgroupMap, &vgId, sizeof(vgId), &vgroupInfo, sizeof(vgroupInfo)); - tscDebug("add/update new VgroupInfo, vgId:%d, total cached:%d", vgId, (int32_t) taosHashGetSize(tscVgroupMap)); + taosHashPut(UTIL_GET_VGROUPMAP(pSql), &vgId, sizeof(vgId), &vgroupInfo, sizeof(vgroupInfo)); + tscDebug("add/update new VgroupInfo, vgId:%d, total cached:%d", vgId, (int32_t) taosHashGetSize(UTIL_GET_VGROUPMAP(pSql))); } } -static void doAddTableMetaToLocalBuf(STableMeta* pTableMeta, STableMetaMsg* pMetaMsg, bool updateSTable) { +static void doAddTableMetaToLocalBuf(SSqlObj *pSql, STableMeta* pTableMeta, STableMetaMsg* pMetaMsg, bool updateSTable) { if (pTableMeta->tableType == TSDB_CHILD_TABLE) { // add or update the corresponding super table meta data info int32_t len = (int32_t) strnlen(pTableMeta->sTableName, TSDB_TABLE_FNAME_LEN); @@ -2059,18 +2059,18 @@ static void doAddTableMetaToLocalBuf(STableMeta* pTableMeta, STableMetaMsg* pMet if (updateSTable) { STableMeta* pSupTableMeta = createSuperTableMeta(pMetaMsg); uint32_t size = tscGetTableMetaSize(pSupTableMeta); - int32_t code = taosHashPut(tscTableMetaMap, pTableMeta->sTableName, len, pSupTableMeta, size); + int32_t code = taosHashPut(UTIL_GET_TABLEMETA(pSql), pTableMeta->sTableName, len, pSupTableMeta, size); assert(code == TSDB_CODE_SUCCESS); tfree(pSupTableMeta); } CChildTableMeta* cMeta = tscCreateChildMeta(pTableMeta); - taosHashPut(tscTableMetaMap, pMetaMsg->tableFname, strlen(pMetaMsg->tableFname), cMeta, sizeof(CChildTableMeta)); + taosHashPut(UTIL_GET_TABLEMETA(pSql), pMetaMsg->tableFname, strlen(pMetaMsg->tableFname), cMeta, sizeof(CChildTableMeta)); tfree(cMeta); } else { uint32_t s = tscGetTableMetaSize(pTableMeta); - taosHashPut(tscTableMetaMap, pMetaMsg->tableFname, strlen(pMetaMsg->tableFname), pTableMeta, s); + taosHashPut(UTIL_GET_TABLEMETA(pSql), pMetaMsg->tableFname, strlen(pMetaMsg->tableFname), pTableMeta, s); } } @@ -2098,9 +2098,9 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { tNameExtractFullName(&pTableMetaInfo->name, name); assert(strncmp(pMetaMsg->tableFname, name, tListLen(pMetaMsg->tableFname)) == 0); - doAddTableMetaToLocalBuf(pTableMeta, pMetaMsg, true); + doAddTableMetaToLocalBuf(pSql, pTableMeta, pMetaMsg, true); if (pTableMeta->tableType != TSDB_SUPER_TABLE) { - doUpdateVgroupInfo(pTableMeta->vgId, &pMetaMsg->vgroup); + doUpdateVgroupInfo(pSql, pTableMeta->vgId, &pMetaMsg->vgroup); } tscDebug("0x%"PRIx64" recv table meta, uid:%" PRIu64 ", tid:%d, name:%s, numOfCols:%d, numOfTags:%d", pSql->self, @@ -2111,7 +2111,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { return TSDB_CODE_SUCCESS; } -static SArray* createVgroupIdListFromMsg(char* pMsg, SHashObj* pSet, char* name, int32_t* size, uint64_t id) { +static SArray* createVgroupIdListFromMsg(SSqlObj *pSql, char* pMsg, SHashObj* pSet, char* name, int32_t* size, uint64_t id) { SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)pMsg; pVgroupMsg->numOfVgroups = htonl(pVgroupMsg->numOfVgroups); @@ -2134,7 +2134,7 @@ static SArray* createVgroupIdListFromMsg(char* pMsg, SHashObj* pSet, char* name, if (taosHashGet(pSet, &vmsg->vgId, sizeof(vmsg->vgId)) == NULL) { taosHashPut(pSet, &vmsg->vgId, sizeof(vmsg->vgId), "", 0); - doUpdateVgroupInfo(vmsg->vgId, vmsg); + doUpdateVgroupInfo(pSql, vmsg->vgId, vmsg); } } } @@ -2142,7 +2142,7 @@ static SArray* createVgroupIdListFromMsg(char* pMsg, SHashObj* pSet, char* name, return vgroupIdList; } -static SVgroupsInfo* createVgroupInfoFromMsg(char* pMsg, int32_t* size, uint64_t id) { +static SVgroupsInfo* createVgroupInfoFromMsg(SSqlObj *pSql, char* pMsg, int32_t* size, uint64_t id) { SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)pMsg; pVgroupMsg->numOfVgroups = htonl(pVgroupMsg->numOfVgroups); @@ -2174,7 +2174,7 @@ static SVgroupsInfo* createVgroupInfoFromMsg(char* pMsg, int32_t* size, uint64_t // pVgroup->epAddr[k].fqdn = strndup(vmsg->epAddr[k].fqdn, TSDB_FQDN_LEN); } - doUpdateVgroupInfo(pVgroup->vgId, vmsg); + doUpdateVgroupInfo(pSql, pVgroup->vgId, vmsg); } } @@ -2309,12 +2309,12 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) { } // create the tableMeta and add it into the TableMeta map - doAddTableMetaToLocalBuf(pTableMeta, pMetaMsg, updateStableMeta); + doAddTableMetaToLocalBuf(pParentSql, pTableMeta, pMetaMsg, updateStableMeta); // for each vgroup, only update the information once. int64_t vgId = pMetaMsg->vgroup.vgId; if (pTableMeta->tableType != TSDB_SUPER_TABLE && taosHashGet(pSet, &vgId, sizeof(vgId)) == NULL) { - doUpdateVgroupInfo((int32_t) vgId, &pMetaMsg->vgroup); + doUpdateVgroupInfo(pParentSql, (int32_t) vgId, &pMetaMsg->vgroup); taosHashPut(pSet, &vgId, sizeof(vgId), "", 0); } @@ -2339,7 +2339,7 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) { taosArrayDestroy(p->vgroupIdList); } - p->vgroupIdList = createVgroupIdListFromMsg(pMsg, pSet, fname, &size, pSql->self); + p->vgroupIdList = createVgroupIdListFromMsg(pParentSql, pMsg, pSet, fname, &size, pSql->self); int32_t numOfVgId = (int32_t) taosArrayGetSize(p->vgroupIdList); int32_t s = sizeof(tFilePage) + numOfVgId * sizeof(int32_t); @@ -2348,8 +2348,8 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) { idList->num = numOfVgId; memcpy(idList->data, TARRAY_GET_START(p->vgroupIdList), numOfVgId * sizeof(int32_t)); - void* idListInst = taosCachePut(tscVgroupListBuf, fname, len, idList, s, 5000); - taosCacheRelease(tscVgroupListBuf, (void*) &idListInst, false); + void* idListInst = taosCachePut(UTIL_GET_VGROUPLIST(pParentSql), fname, len, idList, s, 5000); + taosCacheRelease(UTIL_GET_VGROUPLIST(pParentSql), (void*) &idListInst, false); tfree(idList); pMsg += size; @@ -2439,7 +2439,7 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) { continue; } int32_t size = 0; - pInfo->vgroupList = createVgroupInfoFromMsg(pMsg, &size, pSql->self); + pInfo->vgroupList = createVgroupInfoFromMsg(parent, pMsg, &size, pSql->self); pMsg += size; } @@ -2570,7 +2570,8 @@ int tscProcessConnectRsp(SSqlObj *pSql) { pObj->writeAuth = pConnect->writeAuth; pObj->superAuth = pConnect->superAuth; pObj->connId = htonl(pConnect->connId); - + tstrncpy(pObj->clusterId, pConnect->clusterId, sizeof(pObj->clusterId)); + createHbObj(pObj); //launch a timer to send heartbeat to maintain the connection and send status to mnode @@ -2595,9 +2596,9 @@ int tscProcessDropDbRsp(SSqlObj *pSql) { //TODO LOCK DB WHEN MODIFY IT //pSql->pTscObj->db[0] = 0; - taosHashClear(tscTableMetaMap); - taosHashClear(tscVgroupMap); - taosCacheEmpty(tscVgroupListBuf); + taosHashClear(UTIL_GET_TABLEMETA(pSql)); + taosHashClear(UTIL_GET_VGROUPMAP(pSql)); + taosCacheEmpty(UTIL_GET_VGROUPLIST(pSql)); return 0; } @@ -2617,7 +2618,7 @@ int tscProcessAlterTableMsgRsp(SSqlObj *pSql) { tscDebug("0x%"PRIx64" remove tableMeta in hashMap after alter-table: %s", pSql->self, name); bool isSuperTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo); - taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); + taosHashRemove(UTIL_GET_TABLEMETA(pSql), name, strnlen(name, TSDB_TABLE_FNAME_LEN)); tfree(pTableMetaInfo->pTableMeta); if (isSuperTable) { // if it is a super table, iterate the hashTable and remove all the childTableMeta @@ -2932,7 +2933,7 @@ int32_t tscGetTableMetaImpl(SSqlObj* pSql, STableMetaInfo *pTableMetaInfo, bool memset(pTableMetaInfo->pTableMeta, 0, pTableMetaInfo->tableMetaCapacity); } - if (NULL == taosHashGetCloneExt(tscTableMetaMap, name, len, NULL, (void **)&(pTableMetaInfo->pTableMeta), &pTableMetaInfo->tableMetaCapacity)) { + if (NULL == taosHashGetCloneExt(UTIL_GET_TABLEMETA(pSql), name, len, NULL, (void **)&(pTableMetaInfo->pTableMeta), &pTableMetaInfo->tableMetaCapacity)) { tfree(pTableMetaInfo->pTableMeta); } @@ -2942,7 +2943,7 @@ int32_t tscGetTableMetaImpl(SSqlObj* pSql, STableMetaInfo *pTableMetaInfo, bool if (pMeta && pMeta->id.uid > 0) { // in case of child table, here only get the if (pMeta->tableType == TSDB_CHILD_TABLE) { - int32_t code = tscCreateTableMetaFromSTableMeta(&pTableMetaInfo->pTableMeta, name, &pTableMetaInfo->tableMetaCapacity, (STableMeta **)(&pSTMeta)); + int32_t code = tscCreateTableMetaFromSTableMeta(pSql, &pTableMetaInfo->pTableMeta, name, &pTableMetaInfo->tableMetaCapacity, (STableMeta **)(&pSTMeta)); pSql->pBuf = (void *)(pSTMeta); if (code != TSDB_CODE_SUCCESS) { return getTableMetaFromMnode(pSql, pTableMetaInfo, autocreate); diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 5fdaad0d667c19548f699a9a8cfed7c9f017ad1b..2c8ef9b764a0aa2bbf8dd9304776175a8a599a27 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -194,6 +194,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, tscBuildAndSendRequest(pSql, NULL); tsem_wait(&pSql->rspSem); + pSql->pTscObj->pClusterInfo = (SClusterInfo *)tscAcquireClusterInfo(pSql->pTscObj->clusterId); if (pSql->res.code != TSDB_CODE_SUCCESS) { terrno = pSql->res.code; if (terrno ==TSDB_CODE_RPC_FQDN_ERROR) { @@ -256,6 +257,7 @@ static void asyncConnCallback(void *param, TAOS_RES *tres, int code) { SSqlObj *pSql = (SSqlObj *) tres; assert(pSql != NULL); + pSql->pTscObj->pClusterInfo = (SClusterInfo *)tscAcquireClusterInfo(pSql->pTscObj->clusterId); pSql->fetchFp(pSql->param, tres, code); } @@ -268,7 +270,6 @@ TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, } if (taos) *taos = pObj; - pSql->fetchFp = fp; pSql->res.code = tscBuildAndSendRequest(pSql, NULL); tscDebug("%p DB async connection is opening", taos); diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 99a2a79dc60c89530eb9c2c7f6b5645ca0133ba1..58fb6c979b6d46cd71727b87c2e887d20abcdc15 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -3213,7 +3213,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) for(int32_t i = 0; i < pParentObj->cmd.insertParam.numOfTables; ++i) { char name[TSDB_TABLE_FNAME_LEN] = {0}; tNameExtractFullName(pParentObj->cmd.insertParam.pTableNameList[i], name); - taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); + taosHashRemove(UTIL_GET_TABLEMETA(pParentObj), name, strnlen(name, TSDB_TABLE_FNAME_LEN)); } pParentObj->res.code = TSDB_CODE_SUCCESS; @@ -3358,7 +3358,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { goto _error; } - pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pCmd->insertParam.pDataBlocks); + pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pSql, pCmd->insertParam.pDataBlocks); // use the local variable for (int32_t j = 0; j < numOfSub; ++j) { diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index b3b83db80a70c19f79d1cd6a732d729817436dd3..edb8169f761e2b5aaba1ddfd7cda8a9008298948 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -33,9 +33,11 @@ int32_t sentinel = TSC_VAR_NOT_RELEASE; -SHashObj *tscVgroupMap; // hash map to keep the vgroup info from mnode -SHashObj *tscTableMetaMap; // table meta info buffer -SCacheObj *tscVgroupListBuf; // super table vgroup list information, only survives 5 seconds for each super table vgroup list +//SHashObj *tscVgroupMap; // hash map to keep the vgroup info from mnode +//SHashObj *tscTableMetaMap; // table meta info buffer +//SCacheObj *tscVgroupListBuf; // super table vgroup list information, only survives 5 seconds for each super table vgroup list +SHashObj *tscClusterMap = NULL; // cluster obj +static pthread_mutex_t clusterMutex; // mutex to protect open the cluster obj int32_t tscObjRef = -1; void *tscTmr; @@ -121,6 +123,57 @@ int32_t tscAcquireRpc(const char *key, const char *user, const char *secretEncry return 0; } +void tscClusterInfoDestroy(SClusterInfo *pObj) { + if (pObj == NULL) { return; } + taosHashCleanup(pObj->vgroupMap); + taosHashCleanup(pObj->tableMetaMap); + taosCacheCleanup(pObj->vgroupListBuf); + tfree(pObj); +} + +void *tscAcquireClusterInfo(const char *clusterId) { + pthread_mutex_lock(&clusterMutex); + + size_t len = strlen(clusterId); + SClusterInfo *pObj = NULL; + SClusterInfo **ppObj = taosHashGet(tscClusterMap, clusterId, len); + if (ppObj == NULL || *ppObj == NULL) { + pObj = calloc(1, sizeof(SClusterInfo)); + if (pObj) { + pObj->vgroupMap = taosHashInit(256, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + pObj->tableMetaMap = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); // + pObj->vgroupListBuf = taosCacheInit(TSDB_DATA_TYPE_BINARY, 5, false, NULL, "stable-vgroup-list"); + if (pObj->vgroupMap == NULL || pObj->tableMetaMap == NULL || pObj->vgroupListBuf == NULL) { + tscClusterInfoDestroy(pObj); + pObj = NULL; + } else { + taosHashPut(tscClusterMap, clusterId, len, &pObj, POINTER_BYTES); + } + } + } else { + pObj = *ppObj; + } + + if (pObj) { pObj->ref += 1; } + + pthread_mutex_unlock(&clusterMutex); + return pObj; +} +void tscReleaseClusterInfo(const char *clusterId) { + pthread_mutex_lock(&clusterMutex); + + size_t len = strlen(clusterId); + SClusterInfo *pObj = NULL; + SClusterInfo **ppObj = taosHashGet(tscClusterMap, clusterId, len); + if (ppObj != NULL && *ppObj != NULL) { + pObj = *ppObj; + } + if (pObj && --pObj->ref == 0) { + taosHashRemove(tscClusterMap, clusterId, len); + tscClusterInfoDestroy(pObj); + } + pthread_mutex_unlock(&clusterMutex); +} void taos_init_imp(void) { char temp[128] = {0}; @@ -188,12 +241,16 @@ void taos_init_imp(void) { taosTmrReset(tscCheckDiskUsage, 20 * 1000, NULL, tscTmr, &tscCheckDiskUsageTmr); } - if (tscTableMetaMap == NULL) { + if (tscClusterMap == NULL) { tscObjRef = taosOpenRef(40960, tscFreeRegisteredSqlObj); - tscVgroupMap = taosHashInit(256, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); - tscTableMetaMap = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); - tscVgroupListBuf = taosCacheInit(TSDB_DATA_TYPE_BINARY, 5, false, NULL, "stable-vgroup-list"); - tscDebug("TableMeta:%p, vgroup:%p is initialized", tscTableMetaMap, tscVgroupMap); + + tscClusterMap = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + pthread_mutex_init(&clusterMutex, NULL); + //tscVgroupMap = taosHashInit(256, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + //tscTableMetaMap = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + //tscVgroupListBuf = taosCacheInit(TSDB_DATA_TYPE_BINARY, 5, false, NULL, "stable-vgroup-list"); + //tscDebug("TableMeta:%p, vgroup:%p is initialized", tscTableMetaMap, tscVgroupMap); + } int refreshTime = 5; @@ -222,12 +279,6 @@ void taos_cleanup(void) { scriptEnvPoolCleanup(); } - taosHashCleanup(tscTableMetaMap); - tscTableMetaMap = NULL; - - taosHashCleanup(tscVgroupMap); - tscVgroupMap = NULL; - int32_t id = tscObjRef; tscObjRef = -1; taosCloseRef(id); @@ -251,14 +302,16 @@ void taos_cleanup(void) { } pthread_mutex_destroy(&setConfMutex); - taosCacheCleanup(tscVgroupListBuf); - tscVgroupListBuf = NULL; if (tscEmbedded == 0) { rpcCleanup(); taosCloseLog(); }; + taosHashCleanup(tscClusterMap); + tscClusterMap = NULL; + pthread_mutex_destroy(&clusterMutex); + p = tscTmr; tscTmr = NULL; taosTmrCleanUp(p); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 04d1472bb0101e903aa697ccc443d60e386c4ba2..de588a953b5b38b94de7617e81eeef3bdb96037f 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1421,6 +1421,7 @@ void destroyTableNameList(SInsertStatementParam* pInsertParam) { } void tscResetSqlCmd(SSqlCmd* pCmd, bool clearCachedMeta, uint64_t id) { + SSqlObj *pSql = (SSqlObj*)taosAcquireRef(tscObjRef, id); pCmd->command = 0; pCmd->numOfCols = 0; pCmd->count = 0; @@ -1429,13 +1430,14 @@ void tscResetSqlCmd(SSqlCmd* pCmd, bool clearCachedMeta, uint64_t id) { pCmd->insertParam.sql = NULL; destroyTableNameList(&pCmd->insertParam); - pCmd->insertParam.pTableBlockHashList = tscDestroyBlockHashTable(pCmd->insertParam.pTableBlockHashList, clearCachedMeta); - pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pCmd->insertParam.pDataBlocks); + pCmd->insertParam.pTableBlockHashList = tscDestroyBlockHashTable(pSql, pCmd->insertParam.pTableBlockHashList, clearCachedMeta); + pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pSql, pCmd->insertParam.pDataBlocks); tfree(pCmd->insertParam.tagData.data); pCmd->insertParam.tagData.dataLen = 0; tscFreeQueryInfo(pCmd, clearCachedMeta, id); pCmd->pTableMetaMap = tscCleanupTableMetaMap(pCmd->pTableMetaMap); + taosReleaseRef(tscObjRef, id); } void* tscCleanupTableMetaMap(SHashObj* pTableMetaMap) { @@ -1571,7 +1573,7 @@ void tscDestroyBoundColumnInfo(SParsedDataColInfo* pColInfo) { tfree(pColInfo->colIdxInfo); } -void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) { +void tscDestroyDataBlock(SSqlObj *pSql, STableDataBlocks* pDataBlock, bool removeMeta) { if (pDataBlock == NULL) { return; } @@ -1582,7 +1584,7 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) { char name[TSDB_TABLE_FNAME_LEN] = {0}; tNameExtractFullName(&pDataBlock->tableName, name); - taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); + taosHashRemove(UTIL_GET_TABLEMETA(pSql), name, strnlen(name, TSDB_TABLE_FNAME_LEN)); } if (!pDataBlock->cloned) { @@ -1623,7 +1625,7 @@ SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint return param; } -void* tscDestroyBlockArrayList(SArray* pDataBlockList) { +void* tscDestroyBlockArrayList(SSqlObj *pSql, SArray* pDataBlockList) { if (pDataBlockList == NULL) { return NULL; } @@ -1631,7 +1633,7 @@ void* tscDestroyBlockArrayList(SArray* pDataBlockList) { size_t size = taosArrayGetSize(pDataBlockList); for (int32_t i = 0; i < size; i++) { void* d = taosArrayGetP(pDataBlockList, i); - tscDestroyDataBlock(d, false); + tscDestroyDataBlock(pSql, d, false); } taosArrayDestroy(pDataBlockList); @@ -1679,14 +1681,14 @@ void* tscDestroyUdfArrayList(SArray* pUdfList) { -void* tscDestroyBlockHashTable(SHashObj* pBlockHashTable, bool removeMeta) { +void* tscDestroyBlockHashTable(SSqlObj *pSql, SHashObj* pBlockHashTable, bool removeMeta) { if (pBlockHashTable == NULL) { return NULL; } STableDataBlocks** p = taosHashIterate(pBlockHashTable, NULL); while(p) { - tscDestroyDataBlock(*p, removeMeta); + tscDestroyDataBlock(pSql, *p, removeMeta); p = taosHashIterate(pBlockHashTable, p); } @@ -1927,7 +1929,7 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) { return result; } -static void extractTableNameList(SInsertStatementParam *pInsertParam, bool freeBlockMap) { +static void extractTableNameList(SSqlObj *pSql, SInsertStatementParam *pInsertParam, bool freeBlockMap) { pInsertParam->numOfTables = (int32_t) taosHashGetSize(pInsertParam->pTableBlockHashList); if (pInsertParam->pTableNameList == NULL) { pInsertParam->pTableNameList = malloc(pInsertParam->numOfTables * POINTER_BYTES); @@ -1944,11 +1946,11 @@ static void extractTableNameList(SInsertStatementParam *pInsertParam, bool freeB } if (freeBlockMap) { - pInsertParam->pTableBlockHashList = tscDestroyBlockHashTable(pInsertParam->pTableBlockHashList, false); + pInsertParam->pTableBlockHashList = tscDestroyBlockHashTable(pSql, pInsertParam->pTableBlockHashList, false); } } -int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBlockMap) { +int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertParam, bool freeBlockMap) { const int INSERT_HEAD_SIZE = sizeof(SMsgDesc) + sizeof(SSubmitMsg); int code = 0; bool isRawPayload = IS_RAW_PAYLOAD(pInsertParam->payloadType); @@ -1973,7 +1975,7 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl if (ret != TSDB_CODE_SUCCESS) { tscError("0x%"PRIx64" failed to prepare the data block buffer for merging table data, code:%d", pInsertParam->objectId, ret); taosHashCleanup(pVnodeDataBlockHashList); - tscDestroyBlockArrayList(pVnodeDataBlockList); + tscDestroyBlockArrayList(pSql, pVnodeDataBlockList); tfree(blkKeyInfo.pKeyTuple); return ret; } @@ -1992,7 +1994,7 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl tscError("0x%"PRIx64" failed to allocate memory for merging submit block, size:%d", pInsertParam->objectId, dataBuf->nAllocSize); taosHashCleanup(pVnodeDataBlockHashList); - tscDestroyBlockArrayList(pVnodeDataBlockList); + tscDestroyBlockArrayList(pSql, pVnodeDataBlockList); tfree(dataBuf->pData); tfree(blkKeyInfo.pKeyTuple); @@ -2010,7 +2012,7 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl } else { if ((code = tscSortRemoveDataBlockDupRows(pOneTableBlock, &blkKeyInfo)) != 0) { taosHashCleanup(pVnodeDataBlockHashList); - tscDestroyBlockArrayList(pVnodeDataBlockList); + tscDestroyBlockArrayList(pSql, pVnodeDataBlockList); tfree(dataBuf->pData); tfree(blkKeyInfo.pKeyTuple); return code; @@ -2057,7 +2059,7 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl pOneTableBlock = *p; } - extractTableNameList(pInsertParam, freeBlockMap); + extractTableNameList(pSql, pInsertParam, freeBlockMap); // free the table data blocks; pInsertParam->pDataBlocks = pVnodeDataBlockList; @@ -2077,6 +2079,7 @@ void tscCloseTscObj(void *param) { tfree(pObj->tscCorMgmtEpSet); tscReleaseRpc(pObj->pRpcObj); pthread_mutex_destroy(&pObj->mutex); + tscReleaseClusterInfo(pObj->clusterId); tfree(pObj); } @@ -4550,7 +4553,7 @@ CChildTableMeta* tscCreateChildMeta(STableMeta* pTableMeta) { return cMeta; } -int32_t tscCreateTableMetaFromSTableMeta(STableMeta** ppChild, const char* name, size_t *tableMetaCapacity, STableMeta**ppSTable) { +int32_t tscCreateTableMetaFromSTableMeta(SSqlObj *pSql, STableMeta** ppChild, const char* name, size_t *tableMetaCapacity, STableMeta**ppSTable) { assert(*ppChild != NULL); STableMeta* p = *ppSTable; STableMeta* pChild = *ppChild; @@ -4560,11 +4563,10 @@ int32_t tscCreateTableMetaFromSTableMeta(STableMeta** ppChild, const char* name, memset((char *)p, 0, sz); } - if (NULL == taosHashGetCloneExt(tscTableMetaMap, pChild->sTableName, strnlen(pChild->sTableName, TSDB_TABLE_FNAME_LEN), NULL, (void **)&p, &sz)) { + if (NULL == taosHashGetCloneExt(UTIL_GET_TABLEMETA(pSql), pChild->sTableName, strnlen(pChild->sTableName, TSDB_TABLE_FNAME_LEN), NULL, (void **)&p, &sz)) { tfree(p); - } else { - *ppSTable = p; - } + } + *ppSTable = p; // tableMeta exists, build child table meta according to the super table meta // the uid need to be checked in addition to the general name of the super table. @@ -4586,7 +4588,7 @@ int32_t tscCreateTableMetaFromSTableMeta(STableMeta** ppChild, const char* name, *ppChild = pChild; return TSDB_CODE_SUCCESS; } else { // super table has been removed, current tableMeta is also expired. remove it here - taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); + taosHashRemove(UTIL_GET_TABLEMETA(pSql), name, strnlen(name, TSDB_TABLE_FNAME_LEN)); return -1; } } @@ -5105,18 +5107,20 @@ SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg) { void tscRemoveCachedTableMeta(STableMetaInfo* pTableMetaInfo, uint64_t id) { char fname[TSDB_TABLE_FNAME_LEN] = {0}; - tNameExtractFullName(&pTableMetaInfo->name, fname); + SSqlObj *p = (SSqlObj *)taosAcquireRef(tscObjRef, id); + tNameExtractFullName(&pTableMetaInfo->name, fname); int32_t len = (int32_t) strnlen(fname, TSDB_TABLE_FNAME_LEN); if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { - void* pv = taosCacheAcquireByKey(tscVgroupListBuf, fname, len); + void* pv = taosCacheAcquireByKey(UTIL_GET_VGROUPLIST(p), fname, len); if (pv != NULL) { - taosCacheRelease(tscVgroupListBuf, &pv, true); + taosCacheRelease(UTIL_GET_VGROUPLIST(p), &pv, true); } } - taosHashRemove(tscTableMetaMap, fname, len); - tscDebug("0x%"PRIx64" remove table meta %s, numOfRemain:%d", id, fname, (int32_t) taosHashGetSize(tscTableMetaMap)); + taosHashRemove(UTIL_GET_TABLEMETA(p), fname, len); + tscDebug("0x%"PRIx64" remove table meta %s, numOfRemain:%d", id, fname, (int32_t) taosHashGetSize(UTIL_GET_TABLEMETA(p))); + taosReleaseRef(tscObjRef, id); } char* cloneCurrentDBName(SSqlObj* pSql) { diff --git a/tests/examples/c/connect_two_cluster.c b/tests/examples/c/connect_two_cluster.c new file mode 100644 index 0000000000000000000000000000000000000000..fa54dd437036f12915d62a60f96b90e6a7adc45f --- /dev/null +++ b/tests/examples/c/connect_two_cluster.c @@ -0,0 +1,162 @@ +#include +#include +#include +#include +#include "taos.h" +int numOfThreads = 1; + +void* connectClusterAndDeal(void *arg) { + int port = *(int *)arg; + const char *host = "127.0.0.1"; + const char *user = "root"; + const char *passwd = "taosdata"; + TAOS* taos1 = taos_connect(host, user, passwd, "", port); + TAOS* taos2 = taos_connect(host, user, passwd, "", port + 1000); + if (NULL == taos1 || NULL == taos2) { + printf("connect to (%d/%d) failed \n", port, port + 1000); + return NULL; + } + TAOS_RES *result = NULL; + result = taos_query(taos1, "drop database if exists db"); + if (0 != taos_errno(result)) { + printf("failed %s\n", taos_errstr(result)); + } + taos_free_result(result); + + taos_query(taos2, "drop database if exists db"); + if (0 != taos_errno(result)) { + printf("failed %s\n", taos_errstr(result)); + } + + taos_free_result(result); + // ========= build database + { + result = taos_query(taos1, "create database db"); + if (0 != taos_errno(result)) { + printf("failed %s\n", taos_errstr(result)); + } + + taos_free_result(result); + } + { + result = taos_query(taos2, "create database db"); + if (0 != taos_errno(result)) { + printf("failed %s\n", taos_errstr(result)); + } + taos_free_result(result); + } + + //======== create table + { + result = taos_query(taos1, "create stable db.stest (ts timestamp, port int) tags(tport int)"); + if (0 != taos_errno(result)) { + printf("failed %s\n", taos_errstr(result)); + } + taos_free_result(result); + } + { + result = taos_query(taos2, "create stable db.stest (ts timestamp, port int) tags(tport int)"); + if (0 != taos_errno(result)) { + printf("failed %s\n", taos_errstr(result)); + } + taos_free_result(result); + + } + //======== create table + { + result = taos_query(taos1, "use db"); + if (0 != taos_errno(result)) { + printf("failed %s\n", taos_errstr(result)); + } + taos_free_result(result); + } + { + result = taos_query(taos2, "use db"); + if (0 != taos_errno(result)) { + printf("failed %s\n", taos_errstr(result)); + } + taos_free_result(result); + + } + { + char buf[1024] = {0}; + sprintf(buf, "insert into db.t1 using stest tags(%d) values(now, %d)", port, port); + for (int i = 0; i < 100000; i++) { + //printf("error here\t"); + result = taos_query(taos1, buf); + if (0 != taos_errno(result)) { + printf("failed %s\n", taos_errstr(result)); + } + taos_free_result(result); + //sleep(1); + } + } + + { + char buf[1024] = {0}; + sprintf(buf, "insert into db.t1 using stest tags(%d) values(now, %d)", port + 1000, port + 1000); + for (int i = 0; i < 100000; i++) { + result = taos_query(taos2, buf); + if (0 != taos_errno(result)) { + printf("failed %s\n", taos_errstr(result)); + } + taos_free_result(result); + //sleep(1); + } + } + // query result + { + result = taos_query(taos1, "select * from stest"); + if (result == NULL || taos_errno(result) != 0) { + printf("query failed %s\n", taos_errstr(result)); + taos_free_result(result); + } + TAOS_ROW row; + int rows = 0; + int num_fields = taos_field_count(result); + TAOS_FIELD *fields = taos_fetch_fields(result); + while ((row = taos_fetch_row(result))) { + char temp[1024] = {0}; + rows++; + taos_print_row(temp, row, fields , num_fields); + printf("%s\n", temp); + } + taos_free_result(result); + } + + // query result + { + result = taos_query(taos2, "select * from stest"); + if (result == NULL || taos_errno(result) != 0) { + printf("query failed %s\n", taos_errstr(result)); + taos_free_result(result); + } + TAOS_ROW row; + int rows = 0; + int num_fields = taos_field_count(result); + TAOS_FIELD *fields = taos_fetch_fields(result); + while ((row = taos_fetch_row(result))) { + char temp[1024] = {0}; + rows++; + taos_print_row(temp, row, fields , num_fields); + printf("%s\n", temp); + } + taos_free_result(result); + } + taos_close(taos1); + taos_close(taos2); + return NULL; +} +int main(int argc, char* argv[]) { + pthread_t *pthreads = malloc(sizeof(pthread_t) * numOfThreads); + + int *port = malloc(sizeof(int) * numOfThreads); + port[0] = 6030; + for (int i = 0; i < numOfThreads; i++) { + pthread_create(&pthreads[i], NULL, connectClusterAndDeal, (void *)&port[i]); + } + for (int i = 0; i < numOfThreads; i++) { + pthread_join(pthreads[i], NULL); + } + free(port); +} diff --git a/tests/pytest/client/one_client_connect_two_server.py b/tests/pytest/client/one_client_connect_two_server.py new file mode 100644 index 0000000000000000000000000000000000000000..4d5b127405ffbdaa533a9f628b4bb2323b168d71 --- /dev/null +++ b/tests/pytest/client/one_client_connect_two_server.py @@ -0,0 +1,342 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import json +import taos +import time +import random + +from util.log import * +from util.cases import * +from util.sql import * +from util.dnodes import * + + +''' + +Before test start,Two TDengine services have been set up on different servers + +''' + +host1 = '192.168.1.101' +host2 = '192.168.1.102' +user = 'root' +password = 'taosdata' +cfgdir = '/home/cp/taos/TDengine/sim/dnode1/cfg' + +conn1 = taos.connect(host=host1, user=user, password=password, config=cfgdir) +conn2 = taos.connect(host=host2, user=user, password=password, config=cfgdir) +cursor1 = conn1.cursor() +cursor2 = conn2.cursor() +tdSql1 = TDSql() +tdSql2 = TDSql() +tdSql1.init(cursor1) +tdSql2.init(cursor2) + +dbname11 = 'db11' +dbname12 = 'db12' +dbname21 = 'db21' +stbname11 = 'stb11' +stbname12 = 'stb12' +stbname21 = 'stb21' +tbnum = 100 +data_row = 100 +db1_stb1_column = 'ts timestamp, c1 int, c2 float, c3 timestamp, c4 binary(16), c5 double, c6 bool, c7 bigint, c8 smallint, c9 tinyint, c10 nchar(16) ' +db1_stb1_tag = 'st1 int, st2 float, st3 timestamp, st4 binary(16), st5 double, st6 bool, st7 bigint, st8 smallint, st9 tinyint, st10 nchar(16)' + +def dbsql(dbname): + return f"create database {dbname} keep 3650" + +def stbsql(stbname, columntype, tagtype): + return f'create stable {stbname} ({columntype}) tags ({tagtype}) ' + +def tbsql(tbname, stbname, tags): + return f'create table {tbname} using {stbname} tags ({tags})' + +def datasql(tbname, data): + return f'insert into {tbname} values ({data})' + +def testquery(): + ti = random.randint(0,tbnum-1) + + tdSql1.query(f"select c1, c2, c3, c4, c5, c6, c7, c8, c9, c10 from tm{ti}") + tdSql1.checkData(0, 0, ti) + tdSql1.checkData(0, 1, ti) + tdSql1.checkData(0, 2, ti) + tdSql1.checkData(0, 3, f'binary_{ti}') + tdSql1.checkData(0, 4, ti) + tdSql1.checkData(0, 5, ti%2) + tdSql1.checkData(0, 6, ti) + tdSql1.checkData(0, 7, ti%32768) + tdSql1.checkData(0, 8, ti%128) + tdSql1.checkData(0, 9, f'nchar_{ti}') + tdSql2.query(f"select c1, c2, c3, c4, c5, c6, c7, c8, c9, c10 from tn{ti}") + tdSql2.checkData(0, 0, ti+10000) + tdSql2.checkData(0, 1, ti+10000) + tdSql2.checkData(0, 2, ti+10000) + tdSql2.checkData(0, 3, f'binary_{ti+10000}') + tdSql2.checkData(0, 4, ti+10000) + tdSql2.checkData(0, 5, (ti+10000)%2) + tdSql2.checkData(0, 6, ti+10000) + tdSql2.checkData(0, 7, (ti+10000)%32768) + tdSql2.checkData(0, 8, (ti+10000)%128) + tdSql2.checkData(0, 9, f'nchar_{ti+10000}') + + tdSql1.query(f"select last(c1, c2, c3, c4, c5, c6, c7, c8, c9, c10) from {stbname11}") + tdSql1.checkData(0, 0, data_row-1) + tdSql1.checkData(0, 1, data_row-1) + tdSql1.checkData(0, 2, data_row-1) + tdSql1.checkData(0, 3, f'binary_{data_row-1}') + tdSql1.checkData(0, 4, data_row-1) + tdSql1.checkData(0, 5, (data_row-1)%2) + tdSql1.checkData(0, 6, data_row-1) + tdSql1.checkData(0, 7, (data_row-1)%32768) + tdSql1.checkData(0, 8, (data_row-1)%128) + tdSql1.checkData(0, 9, f'nchar_{data_row-1}') + + tdSql1.query(f"select first(c1, c2, c3, c4, c5, c6, c7, c8, c9, c10) from {stbname11}") + tdSql1.checkData(0, 0, 0) + tdSql1.checkData(0, 1, 0) + tdSql1.checkData(0, 2, 0) + tdSql1.checkData(0, 3, f'binary_0') + tdSql1.checkData(0, 4, 0) + tdSql1.checkData(0, 5, 0) + tdSql1.checkData(0, 6, 0) + tdSql1.checkData(0, 7, 0) + tdSql1.checkData(0, 8, 0) + tdSql1.checkData(0, 9, f'nchar_0') + + tdSql1.error("select * from") + + tdSql1.query(f"select last(*) from tm1") + tdSql1.checkData(0, 1, 1) + tdSql1.checkData(0, 4, "binary_1") + + + tdSql1.query(f"select min(c1),max(c2) from {stbname11}") + tdSql1.checkData(0, 0, 0) + tdSql1.checkData(0, 1, data_row-1) + + tdSql2.query(f"select count(*), count(c1) from {stbname21}") + tdSql2.checkData(0, 0, data_row) + tdSql2.checkData(0, 1, data_row) + + tdSql2.query(f"select first(c1, c2, c3, c4, c5, c6, c7, c8, c9, c10) from {stbname21}") + tdSql2.checkData(0, 0, 10000) + tdSql2.checkData(0, 1, 10000) + tdSql2.checkData(0, 2, 10000) + tdSql2.checkData(0, 3, f'binary_10000') + tdSql2.checkData(0, 4, 10000) + tdSql2.checkData(0, 5, 10000%2) + tdSql2.checkData(0, 6, 10000) + tdSql2.checkData(0, 7, 10000%32768) + tdSql2.checkData(0, 8, 10000%128) + tdSql2.checkData(0, 9, f'nchar_10000') + + tdSql2.query(f"select last(c1, c2, c3, c4, c5, c6, c7, c8, c9, c10) from {stbname21}") + tdSql2.checkData(0, 0, data_row+9999) + tdSql2.checkData(0, 1, data_row+9999) + tdSql2.checkData(0, 2, data_row+9999) + tdSql2.checkData(0, 3, f'binary_{data_row+9999}') + tdSql2.checkData(0, 4, data_row+9999) + tdSql2.checkData(0, 5, (data_row+9999)%2) + tdSql2.checkData(0, 6, data_row+9999) + tdSql2.checkData(0, 7, (data_row+9999)%32768) + tdSql2.checkData(0, 8, (data_row+9999)%128) + tdSql2.checkData(0, 9, f'nchar_{data_row+9999}') + + tdSql1.query(f"select max(c1) from (select top(c1,10) c1 from {stbname11})") + tdSql1.checkData(0, 0, data_row-1) + tdSql2.query(f"select max(c1) from (select top(c1,10) c1 from {stbname21})") + tdSql2.checkData(0, 0, data_row+9999) + + tdSql1.query(f"select avg(c1) from {stbname11}") + tdSql1.checkData(0, 0, sum(range(data_row))/data_row) + tdSql2.query(f"select avg(c1) from {stbname21}") + tdSql2.checkData(0, 0, sum(range(data_row))/data_row+10000) + + tdSql1.query(f"select spread(c1) from {stbname11}") + tdSql1.checkData(0, 0, data_row-1) + tdSql2.query(f"select spread(c1) from {stbname21}") + tdSql2.checkData(0, 0, data_row-1) + + tdSql1.query(f"select max(c1)*2 from {stbname11}") + tdSql1.checkData(0, 0, (data_row-1)*2) + tdSql2.query(f"select max(c1)*2 from {stbname21}") + tdSql2.checkData(0, 0, (data_row+9999)*2) + + tdSql1.query(f"select avg(c1) from {stbname11} where c1 <= 10") + tdSql1.checkData(0, 0, 5) + tdSql2.query(f"select avg(c1) from {stbname21} where c1 <= 10010") + tdSql2.checkData(0, 0, 10005) + + tdSql1.query(f"select * from {stbname11} where tbname like 'tn%'") + tdSql1.checkRows(0) + tdSql2.query(f"select * from {stbname21} where tbname like 'tm%'") + tdSql2.checkRows(0) + + tdSql1.query(f"select max(c1) from {stbname11} group by tbname") + tdSql1.checkRows(tbnum) + tdSql2.query(f"select max(c1) from {stbname21} group by tbname") + tdSql2.checkRows(tbnum) + + tdSql1.error(f"select * from {stbname11}, {stbname21} where {stbname11}.ts = {stbname21}.ts and {stbname11}.st1 = {stbname21}.st1") + tdSql2.error(f"select * from {stbname11}, {stbname21} where {stbname11}.ts = {stbname21}.ts and {stbname11}.st1 = {stbname21}.st1") + +if __name__ == '__main__': + + tdSql1.execute('reset query cache') + tdSql2.execute('reset query cache') + tdSql1.execute(f'drop database if exists {dbname11}') + tdSql1.execute(f'drop database if exists {dbname12}') + tdSql1.execute(f'drop database if exists {dbname21}') + tdSql2.execute(f'drop database if exists {dbname21}') + tdSql2.execute(f'drop database if exists {dbname11}') + tdSql2.execute(f'drop database if exists {dbname12}') + + tdSql1.execute(dbsql(dbname11)) + tdSql1.query('show databases') + tdSql1.checkRows(1) + tdSql2.query('show databases') + tdSql2.checkRows(0) + + tdSql2.execute(dbsql(dbname21)) + + tdSql1.query(f'show databases') + tdSql1.checkData(0, 0, dbname11) + tdSql2.query(f'show databases') + tdSql2.checkData(0, 0, dbname21) + + tdSql1.execute(f'use {dbname11}') + tdSql1.query("show stables") + tdSql1.checkRows(0) + tdSql2.error("show stables") + + + ### conn1 create stable + tdSql1.execute(stbsql(stbname11, db1_stb1_column, db1_stb1_tag)) + tdSql1.query(f"show stables like '{stbname11}' ") + tdSql1.checkRows(1) + tdSql2.error("show stables") + + # 'st1 int, st2 float, st3 timestamp, st4 binary(16), st5 double, st6 bool, st7 bigint, st8 smallint, st9 tinyint, st10 nchar(16)' + for i in range(100): + t1name = f"t{i}" + stname = stbname11 + tags = f'{i}, {i}, {i}, "binary_{i}", {i}, {i%2}, {i}, {i%32768}, {i%128}, "nchar_{i}"' + tdSql1.execute(tbsql(t1name, stname, tags)) + + tdSql2.error(f'select * from t{random.randint(0, 99)}') + + tdSql1.query("show tables") + tdSql1.checkRows(100) + tdSql2.error("show tables") + + tdSql1.query(f'select * from {stbname11}') + # tdSql1.query(f'select * from t1') + tdSql1.checkRows(0) + tdSql2.error(f'select * from {stname}') + + # conn1 insert data + # 'ts timestamp, c1 int, c2 float, c3 timestamp, c4 binary(16), c5 double, c6 bool, c7 bigint, c8 smallint, c9 tinyint, c10 nchar(16) ' + nowtime = int(round(time.time() * 1000)) + for i in range(100): + data = f'{nowtime+i*10}, {i}, {i}, {i}, "binary_{i}", {i}, {i%2}, {i}, {i%32768}, {i%128}, "nchar_{i}"' + tdSql1.execute(datasql(f"t{i}", data)) + # tdSql2.error(datasql(f't{i}', data)) + ti = random.randint(0,99) + tdSql1.query(f"select c1, c2, c3, c4, c5, c6, c7, c8, c9, c10 from t{ti}") + tdSql1.checkData(0, 0, ti) + tdSql1.checkData(0, 1, ti) + tdSql1.checkData(0, 2, ti) + tdSql1.checkData(0, 3, f'binary_{ti}') + tdSql1.checkData(0, 4, ti) + tdSql1.checkData(0, 5, ti%2) + tdSql1.checkData(0, 6, ti) + tdSql1.checkData(0, 7, ti%32768) + tdSql1.checkData(0, 8, ti%128) + tdSql1.checkData(0, 9, f'nchar_{ti}') + tdSql2.error(f"select c1, c2, c3, c4, c5, c6, c7, c8, c9, c10 from t{ti}") + + # delete conn1.database and reinsert the data to conn1.db and conn2.db + tdSql1.execute(f"drop database if exists {dbname11}") + tdSql1.query("show databases") + tdSql1.checkRows(0) + tdSql2.query(f"show databases") + tdSql2.checkData(0, 0, dbname21) + + tdSql1.execute(dbsql(dbname11)) + tdSql1.query(f"show databases") + tdSql1.checkData(0, 0, dbname11) + tdSql2.query(f"show databases ") + tdSql2.checkData(0, 0, dbname21) + + tdSql1.execute(dbsql(dbname12)) + tdSql1.query("show databases") + tdSql1.checkData(0, 0, dbname11) + tdSql1.checkData(1, 0, dbname12) + tdSql2.query("show databases") + tdSql2.checkData(0, 0, dbname21) + + tdSql1.execute(f"use {dbname11}") + tdSql1.query("show stables") + tdSql1.checkRows(0) + tdSql2.error("show stables") + + tdSql2.execute(f"use {dbname21}") + tdSql2.query("show stables") + tdSql2.checkRows(0) + tdSql2.error(f"use {dbname12}") + + tdSql1.execute(stbsql(stbname11, db1_stb1_column, db1_stb1_tag)) + tdSql1.query("show stables") + tdSql1.checkRows(1) + tdSql2.query("show stables") + tdSql2.checkRows(0) + + tdSql2.execute(stbsql(stbname21, db1_stb1_column, db1_stb1_tag)) + tdSql1.query("show stables ") + tdSql1.checkRows(1) + tdSql1.query(f"show stables like '{stbname11}' ") + tdSql1.checkRows(1) + tdSql2.query("show stables ") + tdSql1.checkRows(1) + tdSql2.query(f"show stables like '{stbname21}' ") + tdSql1.checkRows(1) + + for i in range(tbnum): + t1name = f"tm{i}" + t2name = f"tn{i}" + s1tname = stbname11 + s2tname = stbname21 + tags = f'{i}, {i}, {i}, "binary_{i}", {i}, {i % 2}, {i}, {i % 32768}, {i % 128}, "nchar_{i}"' + tdSql1.execute(tbsql(t1name, s1tname, tags)) + # tdSql2.error(f'select * from {t1name}') + tdSql2.execute(tbsql(t2name, s2tname, tags)) + # tdSql2.query(f'select * from {t2name}') + # tdSql1.error(f'select * from {t2name}') + + + tdSql1.query("show tables like 'tm%' ") + tdSql1.checkRows(tbnum) + tdSql2.query("show tables like 'tn%' ") + tdSql2.checkRows(tbnum) + + for i in range(data_row): + data1 = f'{nowtime + i * 10}, {i}, {i}, {i}, "binary_{i}", {i}, {i % 2}, {i}, {i % 32768}, {i % 128}, "nchar_{i}"' + data2 = f'{nowtime+i*10}, {i+10000}, {i+10000}, {i+10000}, "binary_{i+10000}", {i+10000}, {(i+10000)%2}, {i+10000}, {(i+10000)%32768}, {(i+10000)%128}, "nchar_{i+10000}" ' + tdSql1.execute(datasql(f"tm{i}", data1)) + tdSql2.execute(datasql(f'tn{i}', data2)) + + testquery() \ No newline at end of file