未验证 提交 5fadee58 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #7822 from taosdata/feature/TD-6217

Feature/td 6217
......@@ -41,6 +41,15 @@ extern "C" {
#define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_TEMP_TABLE))
#define UTIL_GET_VGROUPMAP(pSql) \
(pSql->pTscObj->pClusterInfo->vgroupMap)
#define UTIL_GET_TABLEMETA(pSql) \
(pSql->pTscObj->pClusterInfo->tableMetaMap)
#define UTIL_GET_VGROUPLIST(pSql) \
(pSql->pTscObj->pClusterInfo->vgroupListBuf)
#pragma pack(push,1)
// this struct is transfered as binary, padding two bytes to avoid
// an 'uid' whose low bytes is 0xff being recoginized as NULL,
......@@ -106,10 +115,11 @@ typedef struct SBlockKeyInfo {
SBlockKeyTuple* pKeyTuple;
} SBlockKeyInfo;
int32_t converToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *len);
int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, SName* name, STableMeta* pTableMeta, STableDataBlocks** dataBlocks);
void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta);
void tscDestroyDataBlock(SSqlObj *pSql, STableDataBlocks* pDataBlock, bool removeMeta);
void tscSortRemoveDataBlockDupRowsRaw(STableDataBlocks* dataBuf);
int tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo);
int32_t tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows);
......@@ -120,12 +130,12 @@ void doRetrieveSubqueryData(SSchedMsg *pMsg);
SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, int16_t bytes,
uint32_t offset);
void* tscDestroyBlockArrayList(SArray* pDataBlockList);
void* tscDestroyBlockArrayList(SSqlObj* pSql, SArray* pDataBlockList);
void* tscDestroyUdfArrayList(SArray* pUdfList);
void* tscDestroyBlockHashTable(SHashObj* pBlockHashTable, bool removeMeta);
void* tscDestroyBlockHashTable(SSqlObj* pSql, SHashObj* pBlockHashTable, bool removeMeta);
int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock);
int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBlockMap);
int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertParam, bool freeBlockMap);
int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, SName* pName, STableMeta* pTableMeta,
STableDataBlocks** dataBlocks, SArray* pBlockList);
......@@ -353,7 +363,7 @@ STableMeta* createSuperTableMeta(STableMetaMsg* pChild);
uint32_t tscGetTableMetaSize(STableMeta* pTableMeta);
CChildTableMeta* tscCreateChildMeta(STableMeta* pTableMeta);
uint32_t tscGetTableMetaMaxSize();
int32_t tscCreateTableMetaFromSTableMeta(STableMeta** ppChild, const char* name, size_t *tableMetaCapacity, STableMeta **ppStable);
int32_t tscCreateTableMetaFromSTableMeta(SSqlObj *pSql, STableMeta** ppChild, const char* name, size_t *tableMetaCapacity, STableMeta **ppStable);
STableMeta* tscTableMetaDup(STableMeta* pTableMeta);
SVgroupsInfo* tscVgroupsInfoDup(SVgroupsInfo* pVgroupsInfo);
......@@ -374,6 +384,7 @@ void tscRemoveCachedTableMeta(STableMetaInfo* pTableMetaInfo, uint64_t id);
char* cloneCurrentDBName(SSqlObj* pSql);
#ifdef __cplusplus
}
#endif
......
......@@ -48,6 +48,7 @@ struct SSqlInfo;
typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int32_t numOfRows);
typedef struct SNewVgroupInfo {
int32_t vgId;
int8_t inUse;
......@@ -139,6 +140,13 @@ typedef enum {
ROW_COMPARE_NEED = 1,
} ERowCompareStat;
typedef struct {
void *vgroupMap;
void *tableMetaMap;
void *vgroupListBuf;
int64_t ref;
} SClusterInfo;
int tsParseTime(SStrToken *pToken, int64_t *time, char **next, char *error, int16_t timePrec);
int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint32_t nCols, uint32_t nBoundCols,
......@@ -324,6 +332,7 @@ typedef struct STscObj {
char acctId[TSDB_ACCT_ID_LEN];
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
char sversion[TSDB_VERSION_LEN];
char clusterId[TSDB_CLUSTER_ID_LEN];
char writeAuth : 1;
char superAuth : 1;
uint32_t connId;
......@@ -332,9 +341,11 @@ typedef struct STscObj {
struct SSqlObj * sqlList;
struct SSqlStream *streamList;
SRpcObj *pRpcObj;
SClusterInfo *pClusterInfo;
SRpcCorEpSet *tscCorMgmtEpSet;
pthread_mutex_t mutex;
int32_t numOfObj; // number of sqlObj from this tscObj
SReqOrigin from;
} STscObj;
......@@ -417,6 +428,9 @@ int tscAcquireRpc(const char *key, const char *user, const char *secret,void **
void tscReleaseRpc(void *param);
void tscInitMsgsFp();
void *tscAcquireClusterInfo(const char *clusterId);
void tscReleaseClusterInfo(const char *clusterId);
int tsParseSql(SSqlObj *pSql, bool initial);
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet);
......
......@@ -924,8 +924,8 @@ int tscProcessLocalCmd(SSqlObj *pSql) {
} else if (pCmd->command == TSDB_SQL_SHOW_CREATE_DATABASE) {
pRes->code = tscProcessShowCreateDatabase(pSql);
} else if (pCmd->command == TSDB_SQL_RESET_CACHE) {
taosHashClear(tscTableMetaMap);
taosCacheEmpty(tscVgroupListBuf);
taosHashClear(UTIL_GET_TABLEMETA(pSql));
taosCacheEmpty(UTIL_GET_VGROUPLIST(pSql));
pRes->code = TSDB_CODE_SUCCESS;
} else if (pCmd->command == TSDB_SQL_SERV_VERSION) {
pRes->code = tscProcessServerVer(pSql);
......
......@@ -1529,7 +1529,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
// merge according to vgId
if (!TSDB_QUERY_HAS_TYPE(pInsertParam->insertType, TSDB_QUERY_TYPE_STMT_INSERT) && taosHashGetSize(pInsertParam->pTableBlockHashList) > 0) {
if ((code = tscMergeTableDataBlocks(pInsertParam, true)) != TSDB_CODE_SUCCESS) {
if ((code = tscMergeTableDataBlocks(pSql, pInsertParam, true)) != TSDB_CODE_SUCCESS) {
goto _clean;
}
}
......@@ -1635,7 +1635,7 @@ static int doPackSendDataBlock(SSqlObj* pSql, SInsertStatementParam *pInsertPara
return tscInvalidOperationMsg(pInsertParam->msg, "too many rows in sql, total number of rows should be less than 32767", NULL);
}
if ((code = tscMergeTableDataBlocks(pInsertParam, true)) != TSDB_CODE_SUCCESS) {
if ((code = tscMergeTableDataBlocks(pSql, pInsertParam, true)) != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -1696,7 +1696,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow
SInsertStatementParam *pInsertParam = &pCmd->insertParam;
destroyTableNameList(pInsertParam);
pInsertParam->pDataBlocks = tscDestroyBlockArrayList(pInsertParam->pDataBlocks);
pInsertParam->pDataBlocks = tscDestroyBlockArrayList(pParentSql, pInsertParam->pDataBlocks);
if (pInsertParam->pTableBlockHashList == NULL) {
pInsertParam->pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
......
......@@ -574,10 +574,10 @@ static int32_t retrieveTableMeta(TAOS* taos, char* tableName, STableMeta** pTabl
char fullTableName[TSDB_TABLE_FNAME_LEN] = {0};
memset(fullTableName, 0, tListLen(fullTableName));
tNameExtractFullName(&sname, fullTableName);
tscFreeRegisteredSqlObj(pSql);
size_t size = 0;
taosHashGetCloneExt(tscTableMetaMap, fullTableName, strlen(fullTableName), NULL, (void**)&tableMeta, &size);
taosHashGetCloneExt(UTIL_GET_TABLEMETA(pSql), fullTableName, strlen(fullTableName), NULL, (void**)&tableMeta, &size);
tscFreeRegisteredSqlObj(pSql);
}
if (tableMeta != NULL) {
......
......@@ -1163,7 +1163,7 @@ static int insertStmtExecute(STscStmt* stmt) {
fillTablesColumnsNull(stmt->pSql);
int code = tscMergeTableDataBlocks(&stmt->pSql->cmd.insertParam, false);
int code = tscMergeTableDataBlocks(stmt->pSql, &stmt->pSql->cmd.insertParam, false);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -1194,7 +1194,7 @@ static int insertStmtExecute(STscStmt* stmt) {
pCmd->insertParam.numOfTables = 0;
tfree(pCmd->insertParam.pTableNameList);
pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pCmd->insertParam.pDataBlocks);
pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pSql, pCmd->insertParam.pDataBlocks);
return pSql->res.code;
}
......@@ -1215,7 +1215,7 @@ static void insertBatchClean(STscStmt* pStmt) {
tfree(pCmd->insertParam.pTableNameList);
pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pCmd->insertParam.pDataBlocks);
pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pSql, pCmd->insertParam.pDataBlocks);
pCmd->insertParam.numOfTables = 0;
taosHashClear(pCmd->insertParam.pTableBlockHashList);
......@@ -1242,7 +1242,7 @@ static int insertBatchStmtExecute(STscStmt* pStmt) {
fillTablesColumnsNull(pStmt->pSql);
if ((code = tscMergeTableDataBlocks(&pStmt->pSql->cmd.insertParam, false)) != TSDB_CODE_SUCCESS) {
if ((code = tscMergeTableDataBlocks(pStmt->pSql, &pStmt->pSql->cmd.insertParam, false)) != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -1773,8 +1773,8 @@ int taos_stmt_close(TAOS_STMT* stmt) {
if (pStmt->pSql && pStmt->pSql->res.code != 0) {
rmMeta = true;
}
tscDestroyDataBlock(pStmt->mtb.lastBlock, rmMeta);
pStmt->mtb.pTableBlockHashList = tscDestroyBlockHashTable(pStmt->mtb.pTableBlockHashList, rmMeta);
tscDestroyDataBlock(pStmt->pSql, pStmt->mtb.lastBlock, rmMeta);
pStmt->mtb.pTableBlockHashList = tscDestroyBlockHashTable(pStmt->pSql, pStmt->mtb.pTableBlockHashList, rmMeta);
if (pStmt->pSql){
taosHashCleanup(pStmt->pSql->cmd.insertParam.pTableBlockHashList);
}
......
......@@ -8280,7 +8280,7 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) {
size_t len = strlen(name);
if (NULL == taosHashGetCloneExt(tscTableMetaMap, name, len, NULL, (void **)&pTableMeta, &tableMetaCapacity)) {
if (NULL == taosHashGetCloneExt(UTIL_GET_TABLEMETA(pSql), name, len, NULL, (void **)&pTableMeta, &tableMetaCapacity)) {
// not found
tfree(pTableMeta);
}
......@@ -8291,7 +8291,7 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) {
// avoid mem leak, may should update pTableMeta
void* pVgroupIdList = NULL;
if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
code = tscCreateTableMetaFromSTableMeta((STableMeta **)(&pTableMeta), name, &tableMetaCapacity, (STableMeta **)(&pSTMeta));
code = tscCreateTableMetaFromSTableMeta(pSql, (STableMeta **)(&pTableMeta), name, &tableMetaCapacity, (STableMeta **)(&pSTMeta));
pSql->pBuf = (void *)pSTMeta;
// create the child table meta from super table failed, try load it from mnode
......@@ -8303,7 +8303,7 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) {
} else if (pTableMeta->tableType == TSDB_SUPER_TABLE) {
// the vgroup list of super table is not kept in local buffer, so here need retrieve it from the mnode each time
tscDebug("0x%"PRIx64" try to acquire cached super table %s vgroup id list", pSql->self, name);
void* pv = taosCacheAcquireByKey(tscVgroupListBuf, name, len);
void* pv = taosCacheAcquireByKey(UTIL_GET_VGROUPLIST(pSql), name, len);
if (pv == NULL) {
char* t = strdup(name);
taosArrayPush(pVgroupList, &t);
......@@ -8316,7 +8316,7 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) {
}
taosArrayAddBatch(pVgroupIdList, pdata->data, (int32_t) pdata->num);
taosCacheRelease(tscVgroupListBuf, &pv, false);
taosCacheRelease(UTIL_GET_VGROUPLIST(pSql), &pv, false);
}
}
......@@ -8466,7 +8466,7 @@ static int32_t doLoadAllTableMeta(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNod
// check if current buffer contains the vgroup info. If not, add it
SNewVgroupInfo existVgroupInfo = {.inUse = -1,};
taosHashGetClone(tscVgroupMap, id, sizeof(*id), NULL, &existVgroupInfo);
taosHashGetClone(UTIL_GET_VGROUPMAP(pSql), id, sizeof(*id), NULL, &existVgroupInfo);
assert(existVgroupInfo.inUse >= 0);
SVgroupMsg *pVgroup = &pTableMetaInfo->vgroupList->vgroups[j];
......
......@@ -158,7 +158,7 @@ static void tscUpdateVgroupInfo(SSqlObj *pSql, SRpcEpSet *pEpSet) {
assert(vgId > 0);
SNewVgroupInfo vgroupInfo = {.vgId = -1};
taosHashGetClone(tscVgroupMap, &vgId, sizeof(vgId), NULL, &vgroupInfo);
taosHashGetClone(UTIL_GET_VGROUPMAP(pSql), &vgId, sizeof(vgId), NULL, &vgroupInfo);
assert(vgroupInfo.numOfEps > 0 && vgroupInfo.vgId > 0);
tscDebug("before: Endpoint in use:%d, numOfEps:%d", vgroupInfo.inUse, vgroupInfo.numOfEps);
......@@ -170,7 +170,7 @@ static void tscUpdateVgroupInfo(SSqlObj *pSql, SRpcEpSet *pEpSet) {
}
tscDebug("after: EndPoint in use:%d, numOfEps:%d", vgroupInfo.inUse, vgroupInfo.numOfEps);
taosHashPut(tscVgroupMap, &vgId, sizeof(vgId), &vgroupInfo, sizeof(SNewVgroupInfo));
taosHashPut(UTIL_GET_VGROUPMAP(pSql), &vgId, sizeof(vgId), &vgroupInfo, sizeof(SNewVgroupInfo));
// Update the local cached epSet info cached by SqlObj
int32_t inUse = pSql->epSet.inUse;
......@@ -654,7 +654,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
SNewVgroupInfo vgroupInfo = {0};
taosHashGetClone(tscVgroupMap, &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo);
taosHashGetClone(UTIL_GET_VGROUPMAP(pSql), &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo);
tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo);
tscDebug("0x%"PRIx64" submit msg built, numberOfEP:%d", pSql->self, pSql->epSet.numOfEps);
......@@ -737,7 +737,7 @@ static char *doSerializeTableInfo(SQueryTableMsg *pQueryMsg, SSqlObj *pSql, STab
vgId = pTableMeta->vgId;
SNewVgroupInfo vgroupInfo = {0};
taosHashGetClone(tscVgroupMap, &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo);
taosHashGetClone(UTIL_GET_VGROUPMAP(pSql), &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo);
tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo);
}
......@@ -1650,7 +1650,7 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
STableMeta *pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
SNewVgroupInfo vgroupInfo = {.vgId = -1};
taosHashGetClone(tscVgroupMap, &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo);
taosHashGetClone(UTIL_GET_VGROUPMAP(pSql), &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo);
assert(vgroupInfo.vgId > 0);
tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo);
......@@ -2036,21 +2036,21 @@ static int32_t tableMetaMsgConvert(STableMetaMsg* pMetaMsg) {
}
// update the vgroupInfo if needed
static void doUpdateVgroupInfo(int32_t vgId, SVgroupMsg *pVgroupMsg) {
static void doUpdateVgroupInfo(SSqlObj *pSql, int32_t vgId, SVgroupMsg *pVgroupMsg) {
assert(vgId > 0);
SNewVgroupInfo vgroupInfo = {.inUse = -1};
taosHashGetClone(tscVgroupMap, &vgId, sizeof(vgId), NULL, &vgroupInfo);
taosHashGetClone(UTIL_GET_VGROUPMAP(pSql), &vgId, sizeof(vgId), NULL, &vgroupInfo);
// vgroup info exists, compare with it
if (((vgroupInfo.inUse >= 0) && !vgroupInfoIdentical(&vgroupInfo, pVgroupMsg)) || (vgroupInfo.inUse < 0)) {
vgroupInfo = createNewVgroupInfo(pVgroupMsg);
taosHashPut(tscVgroupMap, &vgId, sizeof(vgId), &vgroupInfo, sizeof(vgroupInfo));
tscDebug("add/update new VgroupInfo, vgId:%d, total cached:%d", vgId, (int32_t) taosHashGetSize(tscVgroupMap));
taosHashPut(UTIL_GET_VGROUPMAP(pSql), &vgId, sizeof(vgId), &vgroupInfo, sizeof(vgroupInfo));
tscDebug("add/update new VgroupInfo, vgId:%d, total cached:%d", vgId, (int32_t) taosHashGetSize(UTIL_GET_VGROUPMAP(pSql)));
}
}
static void doAddTableMetaToLocalBuf(STableMeta* pTableMeta, STableMetaMsg* pMetaMsg, bool updateSTable) {
static void doAddTableMetaToLocalBuf(SSqlObj *pSql, STableMeta* pTableMeta, STableMetaMsg* pMetaMsg, bool updateSTable) {
if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
// add or update the corresponding super table meta data info
int32_t len = (int32_t) strnlen(pTableMeta->sTableName, TSDB_TABLE_FNAME_LEN);
......@@ -2059,18 +2059,18 @@ static void doAddTableMetaToLocalBuf(STableMeta* pTableMeta, STableMetaMsg* pMet
if (updateSTable) {
STableMeta* pSupTableMeta = createSuperTableMeta(pMetaMsg);
uint32_t size = tscGetTableMetaSize(pSupTableMeta);
int32_t code = taosHashPut(tscTableMetaMap, pTableMeta->sTableName, len, pSupTableMeta, size);
int32_t code = taosHashPut(UTIL_GET_TABLEMETA(pSql), pTableMeta->sTableName, len, pSupTableMeta, size);
assert(code == TSDB_CODE_SUCCESS);
tfree(pSupTableMeta);
}
CChildTableMeta* cMeta = tscCreateChildMeta(pTableMeta);
taosHashPut(tscTableMetaMap, pMetaMsg->tableFname, strlen(pMetaMsg->tableFname), cMeta, sizeof(CChildTableMeta));
taosHashPut(UTIL_GET_TABLEMETA(pSql), pMetaMsg->tableFname, strlen(pMetaMsg->tableFname), cMeta, sizeof(CChildTableMeta));
tfree(cMeta);
} else {
uint32_t s = tscGetTableMetaSize(pTableMeta);
taosHashPut(tscTableMetaMap, pMetaMsg->tableFname, strlen(pMetaMsg->tableFname), pTableMeta, s);
taosHashPut(UTIL_GET_TABLEMETA(pSql), pMetaMsg->tableFname, strlen(pMetaMsg->tableFname), pTableMeta, s);
}
}
......@@ -2098,9 +2098,9 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
tNameExtractFullName(&pTableMetaInfo->name, name);
assert(strncmp(pMetaMsg->tableFname, name, tListLen(pMetaMsg->tableFname)) == 0);
doAddTableMetaToLocalBuf(pTableMeta, pMetaMsg, true);
doAddTableMetaToLocalBuf(pSql, pTableMeta, pMetaMsg, true);
if (pTableMeta->tableType != TSDB_SUPER_TABLE) {
doUpdateVgroupInfo(pTableMeta->vgId, &pMetaMsg->vgroup);
doUpdateVgroupInfo(pSql, pTableMeta->vgId, &pMetaMsg->vgroup);
}
tscDebug("0x%"PRIx64" recv table meta, uid:%" PRIu64 ", tid:%d, name:%s, numOfCols:%d, numOfTags:%d", pSql->self,
......@@ -2111,7 +2111,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
return TSDB_CODE_SUCCESS;
}
static SArray* createVgroupIdListFromMsg(char* pMsg, SHashObj* pSet, char* name, int32_t* size, uint64_t id) {
static SArray* createVgroupIdListFromMsg(SSqlObj *pSql, char* pMsg, SHashObj* pSet, char* name, int32_t* size, uint64_t id) {
SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)pMsg;
pVgroupMsg->numOfVgroups = htonl(pVgroupMsg->numOfVgroups);
......@@ -2134,7 +2134,7 @@ static SArray* createVgroupIdListFromMsg(char* pMsg, SHashObj* pSet, char* name,
if (taosHashGet(pSet, &vmsg->vgId, sizeof(vmsg->vgId)) == NULL) {
taosHashPut(pSet, &vmsg->vgId, sizeof(vmsg->vgId), "", 0);
doUpdateVgroupInfo(vmsg->vgId, vmsg);
doUpdateVgroupInfo(pSql, vmsg->vgId, vmsg);
}
}
}
......@@ -2142,7 +2142,7 @@ static SArray* createVgroupIdListFromMsg(char* pMsg, SHashObj* pSet, char* name,
return vgroupIdList;
}
static SVgroupsInfo* createVgroupInfoFromMsg(char* pMsg, int32_t* size, uint64_t id) {
static SVgroupsInfo* createVgroupInfoFromMsg(SSqlObj *pSql, char* pMsg, int32_t* size, uint64_t id) {
SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)pMsg;
pVgroupMsg->numOfVgroups = htonl(pVgroupMsg->numOfVgroups);
......@@ -2174,7 +2174,7 @@ static SVgroupsInfo* createVgroupInfoFromMsg(char* pMsg, int32_t* size, uint64_t
// pVgroup->epAddr[k].fqdn = strndup(vmsg->epAddr[k].fqdn, TSDB_FQDN_LEN);
}
doUpdateVgroupInfo(pVgroup->vgId, vmsg);
doUpdateVgroupInfo(pSql, pVgroup->vgId, vmsg);
}
}
......@@ -2309,12 +2309,12 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) {
}
// create the tableMeta and add it into the TableMeta map
doAddTableMetaToLocalBuf(pTableMeta, pMetaMsg, updateStableMeta);
doAddTableMetaToLocalBuf(pParentSql, pTableMeta, pMetaMsg, updateStableMeta);
// for each vgroup, only update the information once.
int64_t vgId = pMetaMsg->vgroup.vgId;
if (pTableMeta->tableType != TSDB_SUPER_TABLE && taosHashGet(pSet, &vgId, sizeof(vgId)) == NULL) {
doUpdateVgroupInfo((int32_t) vgId, &pMetaMsg->vgroup);
doUpdateVgroupInfo(pParentSql, (int32_t) vgId, &pMetaMsg->vgroup);
taosHashPut(pSet, &vgId, sizeof(vgId), "", 0);
}
......@@ -2339,7 +2339,7 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) {
taosArrayDestroy(p->vgroupIdList);
}
p->vgroupIdList = createVgroupIdListFromMsg(pMsg, pSet, fname, &size, pSql->self);
p->vgroupIdList = createVgroupIdListFromMsg(pParentSql, pMsg, pSet, fname, &size, pSql->self);
int32_t numOfVgId = (int32_t) taosArrayGetSize(p->vgroupIdList);
int32_t s = sizeof(tFilePage) + numOfVgId * sizeof(int32_t);
......@@ -2348,8 +2348,8 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) {
idList->num = numOfVgId;
memcpy(idList->data, TARRAY_GET_START(p->vgroupIdList), numOfVgId * sizeof(int32_t));
void* idListInst = taosCachePut(tscVgroupListBuf, fname, len, idList, s, 5000);
taosCacheRelease(tscVgroupListBuf, (void*) &idListInst, false);
void* idListInst = taosCachePut(UTIL_GET_VGROUPLIST(pParentSql), fname, len, idList, s, 5000);
taosCacheRelease(UTIL_GET_VGROUPLIST(pParentSql), (void*) &idListInst, false);
tfree(idList);
pMsg += size;
......@@ -2439,7 +2439,7 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
continue;
}
int32_t size = 0;
pInfo->vgroupList = createVgroupInfoFromMsg(pMsg, &size, pSql->self);
pInfo->vgroupList = createVgroupInfoFromMsg(parent, pMsg, &size, pSql->self);
pMsg += size;
}
......@@ -2570,7 +2570,8 @@ int tscProcessConnectRsp(SSqlObj *pSql) {
pObj->writeAuth = pConnect->writeAuth;
pObj->superAuth = pConnect->superAuth;
pObj->connId = htonl(pConnect->connId);
tstrncpy(pObj->clusterId, pConnect->clusterId, sizeof(pObj->clusterId));
createHbObj(pObj);
//launch a timer to send heartbeat to maintain the connection and send status to mnode
......@@ -2595,9 +2596,9 @@ int tscProcessDropDbRsp(SSqlObj *pSql) {
//TODO LOCK DB WHEN MODIFY IT
//pSql->pTscObj->db[0] = 0;
taosHashClear(tscTableMetaMap);
taosHashClear(tscVgroupMap);
taosCacheEmpty(tscVgroupListBuf);
taosHashClear(UTIL_GET_TABLEMETA(pSql));
taosHashClear(UTIL_GET_VGROUPMAP(pSql));
taosCacheEmpty(UTIL_GET_VGROUPLIST(pSql));
return 0;
}
......@@ -2617,7 +2618,7 @@ int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
tscDebug("0x%"PRIx64" remove tableMeta in hashMap after alter-table: %s", pSql->self, name);
bool isSuperTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo);
taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
taosHashRemove(UTIL_GET_TABLEMETA(pSql), name, strnlen(name, TSDB_TABLE_FNAME_LEN));
tfree(pTableMetaInfo->pTableMeta);
if (isSuperTable) { // if it is a super table, iterate the hashTable and remove all the childTableMeta
......@@ -2932,7 +2933,7 @@ int32_t tscGetTableMetaImpl(SSqlObj* pSql, STableMetaInfo *pTableMetaInfo, bool
memset(pTableMetaInfo->pTableMeta, 0, pTableMetaInfo->tableMetaCapacity);
}
if (NULL == taosHashGetCloneExt(tscTableMetaMap, name, len, NULL, (void **)&(pTableMetaInfo->pTableMeta), &pTableMetaInfo->tableMetaCapacity)) {
if (NULL == taosHashGetCloneExt(UTIL_GET_TABLEMETA(pSql), name, len, NULL, (void **)&(pTableMetaInfo->pTableMeta), &pTableMetaInfo->tableMetaCapacity)) {
tfree(pTableMetaInfo->pTableMeta);
}
......@@ -2942,7 +2943,7 @@ int32_t tscGetTableMetaImpl(SSqlObj* pSql, STableMetaInfo *pTableMetaInfo, bool
if (pMeta && pMeta->id.uid > 0) {
// in case of child table, here only get the
if (pMeta->tableType == TSDB_CHILD_TABLE) {
int32_t code = tscCreateTableMetaFromSTableMeta(&pTableMetaInfo->pTableMeta, name, &pTableMetaInfo->tableMetaCapacity, (STableMeta **)(&pSTMeta));
int32_t code = tscCreateTableMetaFromSTableMeta(pSql, &pTableMetaInfo->pTableMeta, name, &pTableMetaInfo->tableMetaCapacity, (STableMeta **)(&pSTMeta));
pSql->pBuf = (void *)(pSTMeta);
if (code != TSDB_CODE_SUCCESS) {
return getTableMetaFromMnode(pSql, pTableMetaInfo, autocreate);
......
......@@ -194,6 +194,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
tscBuildAndSendRequest(pSql, NULL);
tsem_wait(&pSql->rspSem);
pSql->pTscObj->pClusterInfo = (SClusterInfo *)tscAcquireClusterInfo(pSql->pTscObj->clusterId);
if (pSql->res.code != TSDB_CODE_SUCCESS) {
terrno = pSql->res.code;
if (terrno ==TSDB_CODE_RPC_FQDN_ERROR) {
......@@ -256,6 +257,7 @@ static void asyncConnCallback(void *param, TAOS_RES *tres, int code) {
SSqlObj *pSql = (SSqlObj *) tres;
assert(pSql != NULL);
pSql->pTscObj->pClusterInfo = (SClusterInfo *)tscAcquireClusterInfo(pSql->pTscObj->clusterId);
pSql->fetchFp(pSql->param, tres, code);
}
......@@ -268,7 +270,6 @@ TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port,
}
if (taos) *taos = pObj;
pSql->fetchFp = fp;
pSql->res.code = tscBuildAndSendRequest(pSql, NULL);
tscDebug("%p DB async connection is opening", taos);
......
......@@ -3213,7 +3213,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
for(int32_t i = 0; i < pParentObj->cmd.insertParam.numOfTables; ++i) {
char name[TSDB_TABLE_FNAME_LEN] = {0};
tNameExtractFullName(pParentObj->cmd.insertParam.pTableNameList[i], name);
taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
taosHashRemove(UTIL_GET_TABLEMETA(pParentObj), name, strnlen(name, TSDB_TABLE_FNAME_LEN));
}
pParentObj->res.code = TSDB_CODE_SUCCESS;
......@@ -3358,7 +3358,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
goto _error;
}
pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pCmd->insertParam.pDataBlocks);
pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pSql, pCmd->insertParam.pDataBlocks);
// use the local variable
for (int32_t j = 0; j < numOfSub; ++j) {
......
......@@ -33,9 +33,11 @@
int32_t sentinel = TSC_VAR_NOT_RELEASE;
SHashObj *tscVgroupMap; // hash map to keep the vgroup info from mnode
SHashObj *tscTableMetaMap; // table meta info buffer
SCacheObj *tscVgroupListBuf; // super table vgroup list information, only survives 5 seconds for each super table vgroup list
//SHashObj *tscVgroupMap; // hash map to keep the vgroup info from mnode
//SHashObj *tscTableMetaMap; // table meta info buffer
//SCacheObj *tscVgroupListBuf; // super table vgroup list information, only survives 5 seconds for each super table vgroup list
SHashObj *tscClusterMap = NULL; // cluster obj
static pthread_mutex_t clusterMutex; // mutex to protect open the cluster obj
int32_t tscObjRef = -1;
void *tscTmr;
......@@ -121,6 +123,57 @@ int32_t tscAcquireRpc(const char *key, const char *user, const char *secretEncry
return 0;
}
void tscClusterInfoDestroy(SClusterInfo *pObj) {
if (pObj == NULL) { return; }
taosHashCleanup(pObj->vgroupMap);
taosHashCleanup(pObj->tableMetaMap);
taosCacheCleanup(pObj->vgroupListBuf);
tfree(pObj);
}
void *tscAcquireClusterInfo(const char *clusterId) {
pthread_mutex_lock(&clusterMutex);
size_t len = strlen(clusterId);
SClusterInfo *pObj = NULL;
SClusterInfo **ppObj = taosHashGet(tscClusterMap, clusterId, len);
if (ppObj == NULL || *ppObj == NULL) {
pObj = calloc(1, sizeof(SClusterInfo));
if (pObj) {
pObj->vgroupMap = taosHashInit(256, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
pObj->tableMetaMap = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); //
pObj->vgroupListBuf = taosCacheInit(TSDB_DATA_TYPE_BINARY, 5, false, NULL, "stable-vgroup-list");
if (pObj->vgroupMap == NULL || pObj->tableMetaMap == NULL || pObj->vgroupListBuf == NULL) {
tscClusterInfoDestroy(pObj);
pObj = NULL;
} else {
taosHashPut(tscClusterMap, clusterId, len, &pObj, POINTER_BYTES);
}
}
} else {
pObj = *ppObj;
}
if (pObj) { pObj->ref += 1; }
pthread_mutex_unlock(&clusterMutex);
return pObj;
}
void tscReleaseClusterInfo(const char *clusterId) {
pthread_mutex_lock(&clusterMutex);
size_t len = strlen(clusterId);
SClusterInfo *pObj = NULL;
SClusterInfo **ppObj = taosHashGet(tscClusterMap, clusterId, len);
if (ppObj != NULL && *ppObj != NULL) {
pObj = *ppObj;
}
if (pObj && --pObj->ref == 0) {
taosHashRemove(tscClusterMap, clusterId, len);
tscClusterInfoDestroy(pObj);
}
pthread_mutex_unlock(&clusterMutex);
}
void taos_init_imp(void) {
char temp[128] = {0};
......@@ -188,12 +241,16 @@ void taos_init_imp(void) {
taosTmrReset(tscCheckDiskUsage, 20 * 1000, NULL, tscTmr, &tscCheckDiskUsageTmr);
}
if (tscTableMetaMap == NULL) {
if (tscClusterMap == NULL) {
tscObjRef = taosOpenRef(40960, tscFreeRegisteredSqlObj);
tscVgroupMap = taosHashInit(256, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
tscTableMetaMap = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
tscVgroupListBuf = taosCacheInit(TSDB_DATA_TYPE_BINARY, 5, false, NULL, "stable-vgroup-list");
tscDebug("TableMeta:%p, vgroup:%p is initialized", tscTableMetaMap, tscVgroupMap);
tscClusterMap = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
pthread_mutex_init(&clusterMutex, NULL);
//tscVgroupMap = taosHashInit(256, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
//tscTableMetaMap = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
//tscVgroupListBuf = taosCacheInit(TSDB_DATA_TYPE_BINARY, 5, false, NULL, "stable-vgroup-list");
//tscDebug("TableMeta:%p, vgroup:%p is initialized", tscTableMetaMap, tscVgroupMap);
}
int refreshTime = 5;
......@@ -222,12 +279,6 @@ void taos_cleanup(void) {
scriptEnvPoolCleanup();
}
taosHashCleanup(tscTableMetaMap);
tscTableMetaMap = NULL;
taosHashCleanup(tscVgroupMap);
tscVgroupMap = NULL;
int32_t id = tscObjRef;
tscObjRef = -1;
taosCloseRef(id);
......@@ -251,14 +302,16 @@ void taos_cleanup(void) {
}
pthread_mutex_destroy(&setConfMutex);
taosCacheCleanup(tscVgroupListBuf);
tscVgroupListBuf = NULL;
if (tscEmbedded == 0) {
rpcCleanup();
taosCloseLog();
};
taosHashCleanup(tscClusterMap);
tscClusterMap = NULL;
pthread_mutex_destroy(&clusterMutex);
p = tscTmr;
tscTmr = NULL;
taosTmrCleanUp(p);
......
......@@ -1421,6 +1421,7 @@ void destroyTableNameList(SInsertStatementParam* pInsertParam) {
}
void tscResetSqlCmd(SSqlCmd* pCmd, bool clearCachedMeta, uint64_t id) {
SSqlObj *pSql = (SSqlObj*)taosAcquireRef(tscObjRef, id);
pCmd->command = 0;
pCmd->numOfCols = 0;
pCmd->count = 0;
......@@ -1429,13 +1430,14 @@ void tscResetSqlCmd(SSqlCmd* pCmd, bool clearCachedMeta, uint64_t id) {
pCmd->insertParam.sql = NULL;
destroyTableNameList(&pCmd->insertParam);
pCmd->insertParam.pTableBlockHashList = tscDestroyBlockHashTable(pCmd->insertParam.pTableBlockHashList, clearCachedMeta);
pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pCmd->insertParam.pDataBlocks);
pCmd->insertParam.pTableBlockHashList = tscDestroyBlockHashTable(pSql, pCmd->insertParam.pTableBlockHashList, clearCachedMeta);
pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pSql, pCmd->insertParam.pDataBlocks);
tfree(pCmd->insertParam.tagData.data);
pCmd->insertParam.tagData.dataLen = 0;
tscFreeQueryInfo(pCmd, clearCachedMeta, id);
pCmd->pTableMetaMap = tscCleanupTableMetaMap(pCmd->pTableMetaMap);
taosReleaseRef(tscObjRef, id);
}
void* tscCleanupTableMetaMap(SHashObj* pTableMetaMap) {
......@@ -1571,7 +1573,7 @@ void tscDestroyBoundColumnInfo(SParsedDataColInfo* pColInfo) {
tfree(pColInfo->colIdxInfo);
}
void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) {
void tscDestroyDataBlock(SSqlObj *pSql, STableDataBlocks* pDataBlock, bool removeMeta) {
if (pDataBlock == NULL) {
return;
}
......@@ -1582,7 +1584,7 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) {
char name[TSDB_TABLE_FNAME_LEN] = {0};
tNameExtractFullName(&pDataBlock->tableName, name);
taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
taosHashRemove(UTIL_GET_TABLEMETA(pSql), name, strnlen(name, TSDB_TABLE_FNAME_LEN));
}
if (!pDataBlock->cloned) {
......@@ -1623,7 +1625,7 @@ SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint
return param;
}
void* tscDestroyBlockArrayList(SArray* pDataBlockList) {
void* tscDestroyBlockArrayList(SSqlObj *pSql, SArray* pDataBlockList) {
if (pDataBlockList == NULL) {
return NULL;
}
......@@ -1631,7 +1633,7 @@ void* tscDestroyBlockArrayList(SArray* pDataBlockList) {
size_t size = taosArrayGetSize(pDataBlockList);
for (int32_t i = 0; i < size; i++) {
void* d = taosArrayGetP(pDataBlockList, i);
tscDestroyDataBlock(d, false);
tscDestroyDataBlock(pSql, d, false);
}
taosArrayDestroy(pDataBlockList);
......@@ -1679,14 +1681,14 @@ void* tscDestroyUdfArrayList(SArray* pUdfList) {
void* tscDestroyBlockHashTable(SHashObj* pBlockHashTable, bool removeMeta) {
void* tscDestroyBlockHashTable(SSqlObj *pSql, SHashObj* pBlockHashTable, bool removeMeta) {
if (pBlockHashTable == NULL) {
return NULL;
}
STableDataBlocks** p = taosHashIterate(pBlockHashTable, NULL);
while(p) {
tscDestroyDataBlock(*p, removeMeta);
tscDestroyDataBlock(pSql, *p, removeMeta);
p = taosHashIterate(pBlockHashTable, p);
}
......@@ -1927,7 +1929,7 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) {
return result;
}
static void extractTableNameList(SInsertStatementParam *pInsertParam, bool freeBlockMap) {
static void extractTableNameList(SSqlObj *pSql, SInsertStatementParam *pInsertParam, bool freeBlockMap) {
pInsertParam->numOfTables = (int32_t) taosHashGetSize(pInsertParam->pTableBlockHashList);
if (pInsertParam->pTableNameList == NULL) {
pInsertParam->pTableNameList = malloc(pInsertParam->numOfTables * POINTER_BYTES);
......@@ -1944,11 +1946,11 @@ static void extractTableNameList(SInsertStatementParam *pInsertParam, bool freeB
}
if (freeBlockMap) {
pInsertParam->pTableBlockHashList = tscDestroyBlockHashTable(pInsertParam->pTableBlockHashList, false);
pInsertParam->pTableBlockHashList = tscDestroyBlockHashTable(pSql, pInsertParam->pTableBlockHashList, false);
}
}
int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBlockMap) {
int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertParam, bool freeBlockMap) {
const int INSERT_HEAD_SIZE = sizeof(SMsgDesc) + sizeof(SSubmitMsg);
int code = 0;
bool isRawPayload = IS_RAW_PAYLOAD(pInsertParam->payloadType);
......@@ -1973,7 +1975,7 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl
if (ret != TSDB_CODE_SUCCESS) {
tscError("0x%"PRIx64" failed to prepare the data block buffer for merging table data, code:%d", pInsertParam->objectId, ret);
taosHashCleanup(pVnodeDataBlockHashList);
tscDestroyBlockArrayList(pVnodeDataBlockList);
tscDestroyBlockArrayList(pSql, pVnodeDataBlockList);
tfree(blkKeyInfo.pKeyTuple);
return ret;
}
......@@ -1992,7 +1994,7 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl
tscError("0x%"PRIx64" failed to allocate memory for merging submit block, size:%d", pInsertParam->objectId, dataBuf->nAllocSize);
taosHashCleanup(pVnodeDataBlockHashList);
tscDestroyBlockArrayList(pVnodeDataBlockList);
tscDestroyBlockArrayList(pSql, pVnodeDataBlockList);
tfree(dataBuf->pData);
tfree(blkKeyInfo.pKeyTuple);
......@@ -2010,7 +2012,7 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl
} else {
if ((code = tscSortRemoveDataBlockDupRows(pOneTableBlock, &blkKeyInfo)) != 0) {
taosHashCleanup(pVnodeDataBlockHashList);
tscDestroyBlockArrayList(pVnodeDataBlockList);
tscDestroyBlockArrayList(pSql, pVnodeDataBlockList);
tfree(dataBuf->pData);
tfree(blkKeyInfo.pKeyTuple);
return code;
......@@ -2057,7 +2059,7 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl
pOneTableBlock = *p;
}
extractTableNameList(pInsertParam, freeBlockMap);
extractTableNameList(pSql, pInsertParam, freeBlockMap);
// free the table data blocks;
pInsertParam->pDataBlocks = pVnodeDataBlockList;
......@@ -2077,6 +2079,7 @@ void tscCloseTscObj(void *param) {
tfree(pObj->tscCorMgmtEpSet);
tscReleaseRpc(pObj->pRpcObj);
pthread_mutex_destroy(&pObj->mutex);
tscReleaseClusterInfo(pObj->clusterId);
tfree(pObj);
}
......@@ -4550,7 +4553,7 @@ CChildTableMeta* tscCreateChildMeta(STableMeta* pTableMeta) {
return cMeta;
}
int32_t tscCreateTableMetaFromSTableMeta(STableMeta** ppChild, const char* name, size_t *tableMetaCapacity, STableMeta**ppSTable) {
int32_t tscCreateTableMetaFromSTableMeta(SSqlObj *pSql, STableMeta** ppChild, const char* name, size_t *tableMetaCapacity, STableMeta**ppSTable) {
assert(*ppChild != NULL);
STableMeta* p = *ppSTable;
STableMeta* pChild = *ppChild;
......@@ -4560,11 +4563,10 @@ int32_t tscCreateTableMetaFromSTableMeta(STableMeta** ppChild, const char* name,
memset((char *)p, 0, sz);
}
if (NULL == taosHashGetCloneExt(tscTableMetaMap, pChild->sTableName, strnlen(pChild->sTableName, TSDB_TABLE_FNAME_LEN), NULL, (void **)&p, &sz)) {
if (NULL == taosHashGetCloneExt(UTIL_GET_TABLEMETA(pSql), pChild->sTableName, strnlen(pChild->sTableName, TSDB_TABLE_FNAME_LEN), NULL, (void **)&p, &sz)) {
tfree(p);
} else {
*ppSTable = p;
}
}
*ppSTable = p;
// tableMeta exists, build child table meta according to the super table meta
// the uid need to be checked in addition to the general name of the super table.
......@@ -4586,7 +4588,7 @@ int32_t tscCreateTableMetaFromSTableMeta(STableMeta** ppChild, const char* name,
*ppChild = pChild;
return TSDB_CODE_SUCCESS;
} else { // super table has been removed, current tableMeta is also expired. remove it here
taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
taosHashRemove(UTIL_GET_TABLEMETA(pSql), name, strnlen(name, TSDB_TABLE_FNAME_LEN));
return -1;
}
}
......@@ -5105,18 +5107,20 @@ SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg) {
void tscRemoveCachedTableMeta(STableMetaInfo* pTableMetaInfo, uint64_t id) {
char fname[TSDB_TABLE_FNAME_LEN] = {0};
tNameExtractFullName(&pTableMetaInfo->name, fname);
SSqlObj *p = (SSqlObj *)taosAcquireRef(tscObjRef, id);
tNameExtractFullName(&pTableMetaInfo->name, fname);
int32_t len = (int32_t) strnlen(fname, TSDB_TABLE_FNAME_LEN);
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
void* pv = taosCacheAcquireByKey(tscVgroupListBuf, fname, len);
void* pv = taosCacheAcquireByKey(UTIL_GET_VGROUPLIST(p), fname, len);
if (pv != NULL) {
taosCacheRelease(tscVgroupListBuf, &pv, true);
taosCacheRelease(UTIL_GET_VGROUPLIST(p), &pv, true);
}
}
taosHashRemove(tscTableMetaMap, fname, len);
tscDebug("0x%"PRIx64" remove table meta %s, numOfRemain:%d", id, fname, (int32_t) taosHashGetSize(tscTableMetaMap));
taosHashRemove(UTIL_GET_TABLEMETA(p), fname, len);
tscDebug("0x%"PRIx64" remove table meta %s, numOfRemain:%d", id, fname, (int32_t) taosHashGetSize(UTIL_GET_TABLEMETA(p)));
taosReleaseRef(tscObjRef, id);
}
char* cloneCurrentDBName(SSqlObj* pSql) {
......
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include "taos.h"
int numOfThreads = 1;
void* connectClusterAndDeal(void *arg) {
int port = *(int *)arg;
const char *host = "127.0.0.1";
const char *user = "root";
const char *passwd = "taosdata";
TAOS* taos1 = taos_connect(host, user, passwd, "", port);
TAOS* taos2 = taos_connect(host, user, passwd, "", port + 1000);
if (NULL == taos1 || NULL == taos2) {
printf("connect to (%d/%d) failed \n", port, port + 1000);
return NULL;
}
TAOS_RES *result = NULL;
result = taos_query(taos1, "drop database if exists db");
if (0 != taos_errno(result)) {
printf("failed %s\n", taos_errstr(result));
}
taos_free_result(result);
taos_query(taos2, "drop database if exists db");
if (0 != taos_errno(result)) {
printf("failed %s\n", taos_errstr(result));
}
taos_free_result(result);
// ========= build database
{
result = taos_query(taos1, "create database db");
if (0 != taos_errno(result)) {
printf("failed %s\n", taos_errstr(result));
}
taos_free_result(result);
}
{
result = taos_query(taos2, "create database db");
if (0 != taos_errno(result)) {
printf("failed %s\n", taos_errstr(result));
}
taos_free_result(result);
}
//======== create table
{
result = taos_query(taos1, "create stable db.stest (ts timestamp, port int) tags(tport int)");
if (0 != taos_errno(result)) {
printf("failed %s\n", taos_errstr(result));
}
taos_free_result(result);
}
{
result = taos_query(taos2, "create stable db.stest (ts timestamp, port int) tags(tport int)");
if (0 != taos_errno(result)) {
printf("failed %s\n", taos_errstr(result));
}
taos_free_result(result);
}
//======== create table
{
result = taos_query(taos1, "use db");
if (0 != taos_errno(result)) {
printf("failed %s\n", taos_errstr(result));
}
taos_free_result(result);
}
{
result = taos_query(taos2, "use db");
if (0 != taos_errno(result)) {
printf("failed %s\n", taos_errstr(result));
}
taos_free_result(result);
}
{
char buf[1024] = {0};
sprintf(buf, "insert into db.t1 using stest tags(%d) values(now, %d)", port, port);
for (int i = 0; i < 100000; i++) {
//printf("error here\t");
result = taos_query(taos1, buf);
if (0 != taos_errno(result)) {
printf("failed %s\n", taos_errstr(result));
}
taos_free_result(result);
//sleep(1);
}
}
{
char buf[1024] = {0};
sprintf(buf, "insert into db.t1 using stest tags(%d) values(now, %d)", port + 1000, port + 1000);
for (int i = 0; i < 100000; i++) {
result = taos_query(taos2, buf);
if (0 != taos_errno(result)) {
printf("failed %s\n", taos_errstr(result));
}
taos_free_result(result);
//sleep(1);
}
}
// query result
{
result = taos_query(taos1, "select * from stest");
if (result == NULL || taos_errno(result) != 0) {
printf("query failed %s\n", taos_errstr(result));
taos_free_result(result);
}
TAOS_ROW row;
int rows = 0;
int num_fields = taos_field_count(result);
TAOS_FIELD *fields = taos_fetch_fields(result);
while ((row = taos_fetch_row(result))) {
char temp[1024] = {0};
rows++;
taos_print_row(temp, row, fields , num_fields);
printf("%s\n", temp);
}
taos_free_result(result);
}
// query result
{
result = taos_query(taos2, "select * from stest");
if (result == NULL || taos_errno(result) != 0) {
printf("query failed %s\n", taos_errstr(result));
taos_free_result(result);
}
TAOS_ROW row;
int rows = 0;
int num_fields = taos_field_count(result);
TAOS_FIELD *fields = taos_fetch_fields(result);
while ((row = taos_fetch_row(result))) {
char temp[1024] = {0};
rows++;
taos_print_row(temp, row, fields , num_fields);
printf("%s\n", temp);
}
taos_free_result(result);
}
taos_close(taos1);
taos_close(taos2);
return NULL;
}
int main(int argc, char* argv[]) {
pthread_t *pthreads = malloc(sizeof(pthread_t) * numOfThreads);
int *port = malloc(sizeof(int) * numOfThreads);
port[0] = 6030;
for (int i = 0; i < numOfThreads; i++) {
pthread_create(&pthreads[i], NULL, connectClusterAndDeal, (void *)&port[i]);
}
for (int i = 0; i < numOfThreads; i++) {
pthread_join(pthreads[i], NULL);
}
free(port);
}
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import json
import taos
import time
import random
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import *
'''
Before test start,Two TDengine services have been set up on different servers
'''
host1 = '192.168.1.101'
host2 = '192.168.1.102'
user = 'root'
password = 'taosdata'
cfgdir = '/home/cp/taos/TDengine/sim/dnode1/cfg'
conn1 = taos.connect(host=host1, user=user, password=password, config=cfgdir)
conn2 = taos.connect(host=host2, user=user, password=password, config=cfgdir)
cursor1 = conn1.cursor()
cursor2 = conn2.cursor()
tdSql1 = TDSql()
tdSql2 = TDSql()
tdSql1.init(cursor1)
tdSql2.init(cursor2)
dbname11 = 'db11'
dbname12 = 'db12'
dbname21 = 'db21'
stbname11 = 'stb11'
stbname12 = 'stb12'
stbname21 = 'stb21'
tbnum = 100
data_row = 100
db1_stb1_column = 'ts timestamp, c1 int, c2 float, c3 timestamp, c4 binary(16), c5 double, c6 bool, c7 bigint, c8 smallint, c9 tinyint, c10 nchar(16) '
db1_stb1_tag = 'st1 int, st2 float, st3 timestamp, st4 binary(16), st5 double, st6 bool, st7 bigint, st8 smallint, st9 tinyint, st10 nchar(16)'
def dbsql(dbname):
return f"create database {dbname} keep 3650"
def stbsql(stbname, columntype, tagtype):
return f'create stable {stbname} ({columntype}) tags ({tagtype}) '
def tbsql(tbname, stbname, tags):
return f'create table {tbname} using {stbname} tags ({tags})'
def datasql(tbname, data):
return f'insert into {tbname} values ({data})'
def testquery():
ti = random.randint(0,tbnum-1)
tdSql1.query(f"select c1, c2, c3, c4, c5, c6, c7, c8, c9, c10 from tm{ti}")
tdSql1.checkData(0, 0, ti)
tdSql1.checkData(0, 1, ti)
tdSql1.checkData(0, 2, ti)
tdSql1.checkData(0, 3, f'binary_{ti}')
tdSql1.checkData(0, 4, ti)
tdSql1.checkData(0, 5, ti%2)
tdSql1.checkData(0, 6, ti)
tdSql1.checkData(0, 7, ti%32768)
tdSql1.checkData(0, 8, ti%128)
tdSql1.checkData(0, 9, f'nchar_{ti}')
tdSql2.query(f"select c1, c2, c3, c4, c5, c6, c7, c8, c9, c10 from tn{ti}")
tdSql2.checkData(0, 0, ti+10000)
tdSql2.checkData(0, 1, ti+10000)
tdSql2.checkData(0, 2, ti+10000)
tdSql2.checkData(0, 3, f'binary_{ti+10000}')
tdSql2.checkData(0, 4, ti+10000)
tdSql2.checkData(0, 5, (ti+10000)%2)
tdSql2.checkData(0, 6, ti+10000)
tdSql2.checkData(0, 7, (ti+10000)%32768)
tdSql2.checkData(0, 8, (ti+10000)%128)
tdSql2.checkData(0, 9, f'nchar_{ti+10000}')
tdSql1.query(f"select last(c1, c2, c3, c4, c5, c6, c7, c8, c9, c10) from {stbname11}")
tdSql1.checkData(0, 0, data_row-1)
tdSql1.checkData(0, 1, data_row-1)
tdSql1.checkData(0, 2, data_row-1)
tdSql1.checkData(0, 3, f'binary_{data_row-1}')
tdSql1.checkData(0, 4, data_row-1)
tdSql1.checkData(0, 5, (data_row-1)%2)
tdSql1.checkData(0, 6, data_row-1)
tdSql1.checkData(0, 7, (data_row-1)%32768)
tdSql1.checkData(0, 8, (data_row-1)%128)
tdSql1.checkData(0, 9, f'nchar_{data_row-1}')
tdSql1.query(f"select first(c1, c2, c3, c4, c5, c6, c7, c8, c9, c10) from {stbname11}")
tdSql1.checkData(0, 0, 0)
tdSql1.checkData(0, 1, 0)
tdSql1.checkData(0, 2, 0)
tdSql1.checkData(0, 3, f'binary_0')
tdSql1.checkData(0, 4, 0)
tdSql1.checkData(0, 5, 0)
tdSql1.checkData(0, 6, 0)
tdSql1.checkData(0, 7, 0)
tdSql1.checkData(0, 8, 0)
tdSql1.checkData(0, 9, f'nchar_0')
tdSql1.error("select * from")
tdSql1.query(f"select last(*) from tm1")
tdSql1.checkData(0, 1, 1)
tdSql1.checkData(0, 4, "binary_1")
tdSql1.query(f"select min(c1),max(c2) from {stbname11}")
tdSql1.checkData(0, 0, 0)
tdSql1.checkData(0, 1, data_row-1)
tdSql2.query(f"select count(*), count(c1) from {stbname21}")
tdSql2.checkData(0, 0, data_row)
tdSql2.checkData(0, 1, data_row)
tdSql2.query(f"select first(c1, c2, c3, c4, c5, c6, c7, c8, c9, c10) from {stbname21}")
tdSql2.checkData(0, 0, 10000)
tdSql2.checkData(0, 1, 10000)
tdSql2.checkData(0, 2, 10000)
tdSql2.checkData(0, 3, f'binary_10000')
tdSql2.checkData(0, 4, 10000)
tdSql2.checkData(0, 5, 10000%2)
tdSql2.checkData(0, 6, 10000)
tdSql2.checkData(0, 7, 10000%32768)
tdSql2.checkData(0, 8, 10000%128)
tdSql2.checkData(0, 9, f'nchar_10000')
tdSql2.query(f"select last(c1, c2, c3, c4, c5, c6, c7, c8, c9, c10) from {stbname21}")
tdSql2.checkData(0, 0, data_row+9999)
tdSql2.checkData(0, 1, data_row+9999)
tdSql2.checkData(0, 2, data_row+9999)
tdSql2.checkData(0, 3, f'binary_{data_row+9999}')
tdSql2.checkData(0, 4, data_row+9999)
tdSql2.checkData(0, 5, (data_row+9999)%2)
tdSql2.checkData(0, 6, data_row+9999)
tdSql2.checkData(0, 7, (data_row+9999)%32768)
tdSql2.checkData(0, 8, (data_row+9999)%128)
tdSql2.checkData(0, 9, f'nchar_{data_row+9999}')
tdSql1.query(f"select max(c1) from (select top(c1,10) c1 from {stbname11})")
tdSql1.checkData(0, 0, data_row-1)
tdSql2.query(f"select max(c1) from (select top(c1,10) c1 from {stbname21})")
tdSql2.checkData(0, 0, data_row+9999)
tdSql1.query(f"select avg(c1) from {stbname11}")
tdSql1.checkData(0, 0, sum(range(data_row))/data_row)
tdSql2.query(f"select avg(c1) from {stbname21}")
tdSql2.checkData(0, 0, sum(range(data_row))/data_row+10000)
tdSql1.query(f"select spread(c1) from {stbname11}")
tdSql1.checkData(0, 0, data_row-1)
tdSql2.query(f"select spread(c1) from {stbname21}")
tdSql2.checkData(0, 0, data_row-1)
tdSql1.query(f"select max(c1)*2 from {stbname11}")
tdSql1.checkData(0, 0, (data_row-1)*2)
tdSql2.query(f"select max(c1)*2 from {stbname21}")
tdSql2.checkData(0, 0, (data_row+9999)*2)
tdSql1.query(f"select avg(c1) from {stbname11} where c1 <= 10")
tdSql1.checkData(0, 0, 5)
tdSql2.query(f"select avg(c1) from {stbname21} where c1 <= 10010")
tdSql2.checkData(0, 0, 10005)
tdSql1.query(f"select * from {stbname11} where tbname like 'tn%'")
tdSql1.checkRows(0)
tdSql2.query(f"select * from {stbname21} where tbname like 'tm%'")
tdSql2.checkRows(0)
tdSql1.query(f"select max(c1) from {stbname11} group by tbname")
tdSql1.checkRows(tbnum)
tdSql2.query(f"select max(c1) from {stbname21} group by tbname")
tdSql2.checkRows(tbnum)
tdSql1.error(f"select * from {stbname11}, {stbname21} where {stbname11}.ts = {stbname21}.ts and {stbname11}.st1 = {stbname21}.st1")
tdSql2.error(f"select * from {stbname11}, {stbname21} where {stbname11}.ts = {stbname21}.ts and {stbname11}.st1 = {stbname21}.st1")
if __name__ == '__main__':
tdSql1.execute('reset query cache')
tdSql2.execute('reset query cache')
tdSql1.execute(f'drop database if exists {dbname11}')
tdSql1.execute(f'drop database if exists {dbname12}')
tdSql1.execute(f'drop database if exists {dbname21}')
tdSql2.execute(f'drop database if exists {dbname21}')
tdSql2.execute(f'drop database if exists {dbname11}')
tdSql2.execute(f'drop database if exists {dbname12}')
tdSql1.execute(dbsql(dbname11))
tdSql1.query('show databases')
tdSql1.checkRows(1)
tdSql2.query('show databases')
tdSql2.checkRows(0)
tdSql2.execute(dbsql(dbname21))
tdSql1.query(f'show databases')
tdSql1.checkData(0, 0, dbname11)
tdSql2.query(f'show databases')
tdSql2.checkData(0, 0, dbname21)
tdSql1.execute(f'use {dbname11}')
tdSql1.query("show stables")
tdSql1.checkRows(0)
tdSql2.error("show stables")
### conn1 create stable
tdSql1.execute(stbsql(stbname11, db1_stb1_column, db1_stb1_tag))
tdSql1.query(f"show stables like '{stbname11}' ")
tdSql1.checkRows(1)
tdSql2.error("show stables")
# 'st1 int, st2 float, st3 timestamp, st4 binary(16), st5 double, st6 bool, st7 bigint, st8 smallint, st9 tinyint, st10 nchar(16)'
for i in range(100):
t1name = f"t{i}"
stname = stbname11
tags = f'{i}, {i}, {i}, "binary_{i}", {i}, {i%2}, {i}, {i%32768}, {i%128}, "nchar_{i}"'
tdSql1.execute(tbsql(t1name, stname, tags))
tdSql2.error(f'select * from t{random.randint(0, 99)}')
tdSql1.query("show tables")
tdSql1.checkRows(100)
tdSql2.error("show tables")
tdSql1.query(f'select * from {stbname11}')
# tdSql1.query(f'select * from t1')
tdSql1.checkRows(0)
tdSql2.error(f'select * from {stname}')
# conn1 insert data
# 'ts timestamp, c1 int, c2 float, c3 timestamp, c4 binary(16), c5 double, c6 bool, c7 bigint, c8 smallint, c9 tinyint, c10 nchar(16) '
nowtime = int(round(time.time() * 1000))
for i in range(100):
data = f'{nowtime+i*10}, {i}, {i}, {i}, "binary_{i}", {i}, {i%2}, {i}, {i%32768}, {i%128}, "nchar_{i}"'
tdSql1.execute(datasql(f"t{i}", data))
# tdSql2.error(datasql(f't{i}', data))
ti = random.randint(0,99)
tdSql1.query(f"select c1, c2, c3, c4, c5, c6, c7, c8, c9, c10 from t{ti}")
tdSql1.checkData(0, 0, ti)
tdSql1.checkData(0, 1, ti)
tdSql1.checkData(0, 2, ti)
tdSql1.checkData(0, 3, f'binary_{ti}')
tdSql1.checkData(0, 4, ti)
tdSql1.checkData(0, 5, ti%2)
tdSql1.checkData(0, 6, ti)
tdSql1.checkData(0, 7, ti%32768)
tdSql1.checkData(0, 8, ti%128)
tdSql1.checkData(0, 9, f'nchar_{ti}')
tdSql2.error(f"select c1, c2, c3, c4, c5, c6, c7, c8, c9, c10 from t{ti}")
# delete conn1.database and reinsert the data to conn1.db and conn2.db
tdSql1.execute(f"drop database if exists {dbname11}")
tdSql1.query("show databases")
tdSql1.checkRows(0)
tdSql2.query(f"show databases")
tdSql2.checkData(0, 0, dbname21)
tdSql1.execute(dbsql(dbname11))
tdSql1.query(f"show databases")
tdSql1.checkData(0, 0, dbname11)
tdSql2.query(f"show databases ")
tdSql2.checkData(0, 0, dbname21)
tdSql1.execute(dbsql(dbname12))
tdSql1.query("show databases")
tdSql1.checkData(0, 0, dbname11)
tdSql1.checkData(1, 0, dbname12)
tdSql2.query("show databases")
tdSql2.checkData(0, 0, dbname21)
tdSql1.execute(f"use {dbname11}")
tdSql1.query("show stables")
tdSql1.checkRows(0)
tdSql2.error("show stables")
tdSql2.execute(f"use {dbname21}")
tdSql2.query("show stables")
tdSql2.checkRows(0)
tdSql2.error(f"use {dbname12}")
tdSql1.execute(stbsql(stbname11, db1_stb1_column, db1_stb1_tag))
tdSql1.query("show stables")
tdSql1.checkRows(1)
tdSql2.query("show stables")
tdSql2.checkRows(0)
tdSql2.execute(stbsql(stbname21, db1_stb1_column, db1_stb1_tag))
tdSql1.query("show stables ")
tdSql1.checkRows(1)
tdSql1.query(f"show stables like '{stbname11}' ")
tdSql1.checkRows(1)
tdSql2.query("show stables ")
tdSql1.checkRows(1)
tdSql2.query(f"show stables like '{stbname21}' ")
tdSql1.checkRows(1)
for i in range(tbnum):
t1name = f"tm{i}"
t2name = f"tn{i}"
s1tname = stbname11
s2tname = stbname21
tags = f'{i}, {i}, {i}, "binary_{i}", {i}, {i % 2}, {i}, {i % 32768}, {i % 128}, "nchar_{i}"'
tdSql1.execute(tbsql(t1name, s1tname, tags))
# tdSql2.error(f'select * from {t1name}')
tdSql2.execute(tbsql(t2name, s2tname, tags))
# tdSql2.query(f'select * from {t2name}')
# tdSql1.error(f'select * from {t2name}')
tdSql1.query("show tables like 'tm%' ")
tdSql1.checkRows(tbnum)
tdSql2.query("show tables like 'tn%' ")
tdSql2.checkRows(tbnum)
for i in range(data_row):
data1 = f'{nowtime + i * 10}, {i}, {i}, {i}, "binary_{i}", {i}, {i % 2}, {i}, {i % 32768}, {i % 128}, "nchar_{i}"'
data2 = f'{nowtime+i*10}, {i+10000}, {i+10000}, {i+10000}, "binary_{i+10000}", {i+10000}, {(i+10000)%2}, {i+10000}, {(i+10000)%32768}, {(i+10000)%128}, "nchar_{i+10000}" '
tdSql1.execute(datasql(f"tm{i}", data1))
tdSql2.execute(datasql(f'tn{i}', data2))
testquery()
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册