提交 c803ebd8 编写于 作者: dengyihao's avatar dengyihao

[TD-6127]<feature> one taos driver connect two cluster

上级 2ab3c33f
...@@ -41,6 +41,15 @@ extern "C" { ...@@ -41,6 +41,15 @@ extern "C" {
#define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \ #define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_TEMP_TABLE)) (((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_TEMP_TABLE))
#define UTIL_GET_VGROUPMAP(pSql) \
(pSql->pTscObj->pClusterInfo->vgroupMap)
#define UTIL_GET_TABLEMETA(pSql) \
(pSql->pTscObj->pClusterInfo->tableMetaMap)
#define UTIL_GET_VGROUPLIST(pSql) \
(pSql->pTscObj->pClusterInfo->vgroupListBuf)
#pragma pack(push,1) #pragma pack(push,1)
// this struct is transfered as binary, padding two bytes to avoid // this struct is transfered as binary, padding two bytes to avoid
// an 'uid' whose low bytes is 0xff being recoginized as NULL, // an 'uid' whose low bytes is 0xff being recoginized as NULL,
...@@ -106,6 +115,7 @@ typedef struct SBlockKeyInfo { ...@@ -106,6 +115,7 @@ typedef struct SBlockKeyInfo {
SBlockKeyTuple* pKeyTuple; SBlockKeyTuple* pKeyTuple;
} SBlockKeyInfo; } SBlockKeyInfo;
int32_t converToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *len); int32_t converToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *len);
int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, SName* name, STableMeta* pTableMeta, STableDataBlocks** dataBlocks); int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, SName* name, STableMeta* pTableMeta, STableDataBlocks** dataBlocks);
...@@ -346,7 +356,7 @@ STableMeta* createSuperTableMeta(STableMetaMsg* pChild); ...@@ -346,7 +356,7 @@ STableMeta* createSuperTableMeta(STableMetaMsg* pChild);
uint32_t tscGetTableMetaSize(STableMeta* pTableMeta); uint32_t tscGetTableMetaSize(STableMeta* pTableMeta);
CChildTableMeta* tscCreateChildMeta(STableMeta* pTableMeta); CChildTableMeta* tscCreateChildMeta(STableMeta* pTableMeta);
uint32_t tscGetTableMetaMaxSize(); uint32_t tscGetTableMetaMaxSize();
int32_t tscCreateTableMetaFromSTableMeta(STableMeta** ppChild, const char* name, size_t *tableMetaCapacity, STableMeta **ppStable); int32_t tscCreateTableMetaFromSTableMeta(SSqlObj *pSql, STableMeta** ppChild, const char* name, size_t *tableMetaCapacity, STableMeta **ppStable);
STableMeta* tscTableMetaDup(STableMeta* pTableMeta); STableMeta* tscTableMetaDup(STableMeta* pTableMeta);
SVgroupsInfo* tscVgroupsInfoDup(SVgroupsInfo* pVgroupsInfo); SVgroupsInfo* tscVgroupsInfoDup(SVgroupsInfo* pVgroupsInfo);
...@@ -367,6 +377,7 @@ void tscRemoveCachedTableMeta(STableMetaInfo* pTableMetaInfo, uint64_t id); ...@@ -367,6 +377,7 @@ void tscRemoveCachedTableMeta(STableMetaInfo* pTableMetaInfo, uint64_t id);
char* cloneCurrentDBName(SSqlObj* pSql); char* cloneCurrentDBName(SSqlObj* pSql);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -48,6 +48,7 @@ struct SSqlInfo; ...@@ -48,6 +48,7 @@ struct SSqlInfo;
typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int32_t numOfRows); typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int32_t numOfRows);
typedef struct SNewVgroupInfo { typedef struct SNewVgroupInfo {
int32_t vgId; int32_t vgId;
int8_t inUse; int8_t inUse;
...@@ -139,6 +140,13 @@ typedef enum { ...@@ -139,6 +140,13 @@ typedef enum {
ROW_COMPARE_NEED = 1, ROW_COMPARE_NEED = 1,
} ERowCompareStat; } ERowCompareStat;
typedef struct {
void *vgroupMap;
void *tableMetaMap;
void *vgroupListBuf;
int64_t ref;
} SClusterInfo;
int tsParseTime(SStrToken *pToken, int64_t *time, char **next, char *error, int16_t timePrec); int tsParseTime(SStrToken *pToken, int64_t *time, char **next, char *error, int16_t timePrec);
int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint32_t nCols, uint32_t nBoundCols, int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint32_t nCols, uint32_t nBoundCols,
...@@ -334,6 +342,7 @@ typedef struct STscObj { ...@@ -334,6 +342,7 @@ typedef struct STscObj {
char acctId[TSDB_ACCT_ID_LEN]; char acctId[TSDB_ACCT_ID_LEN];
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
char sversion[TSDB_VERSION_LEN]; char sversion[TSDB_VERSION_LEN];
char clusterId[TSDB_CLUSTER_ID_LEN];
char writeAuth : 1; char writeAuth : 1;
char superAuth : 1; char superAuth : 1;
uint32_t connId; uint32_t connId;
...@@ -342,9 +351,11 @@ typedef struct STscObj { ...@@ -342,9 +351,11 @@ typedef struct STscObj {
struct SSqlObj * sqlList; struct SSqlObj * sqlList;
struct SSqlStream *streamList; struct SSqlStream *streamList;
SRpcObj *pRpcObj; SRpcObj *pRpcObj;
SClusterInfo *pClusterInfo;
SRpcCorEpSet *tscCorMgmtEpSet; SRpcCorEpSet *tscCorMgmtEpSet;
pthread_mutex_t mutex; pthread_mutex_t mutex;
int32_t numOfObj; // number of sqlObj from this tscObj int32_t numOfObj; // number of sqlObj from this tscObj
SReqOrigin from; SReqOrigin from;
} STscObj; } STscObj;
...@@ -428,6 +439,9 @@ int tscAcquireRpc(const char *key, const char *user, const char *secret,void ** ...@@ -428,6 +439,9 @@ int tscAcquireRpc(const char *key, const char *user, const char *secret,void **
void tscReleaseRpc(void *param); void tscReleaseRpc(void *param);
void tscInitMsgsFp(); void tscInitMsgsFp();
void *tscAcquireClusterInfo(const char *clusterId);
void tscReleaseClusterInfo(const char *clusterId);
int tsParseSql(SSqlObj *pSql, bool initial); int tsParseSql(SSqlObj *pSql, bool initial);
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet); void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet);
......
...@@ -924,8 +924,8 @@ int tscProcessLocalCmd(SSqlObj *pSql) { ...@@ -924,8 +924,8 @@ int tscProcessLocalCmd(SSqlObj *pSql) {
} else if (pCmd->command == TSDB_SQL_SHOW_CREATE_DATABASE) { } else if (pCmd->command == TSDB_SQL_SHOW_CREATE_DATABASE) {
pRes->code = tscProcessShowCreateDatabase(pSql); pRes->code = tscProcessShowCreateDatabase(pSql);
} else if (pCmd->command == TSDB_SQL_RESET_CACHE) { } else if (pCmd->command == TSDB_SQL_RESET_CACHE) {
taosHashClear(tscTableMetaMap); taosHashClear(UTIL_GET_TABLEMETA(pSql));
taosCacheEmpty(tscVgroupListBuf); taosCacheEmpty(UTIL_GET_VGROUPLIST(pSql));
pRes->code = TSDB_CODE_SUCCESS; pRes->code = TSDB_CODE_SUCCESS;
} else if (pCmd->command == TSDB_SQL_SERV_VERSION) { } else if (pCmd->command == TSDB_SQL_SERV_VERSION) {
pRes->code = tscProcessServerVer(pSql); pRes->code = tscProcessServerVer(pSql);
......
...@@ -608,7 +608,7 @@ static int32_t retrieveTableMeta(TAOS* taos, char* tableName, STableMeta** pTabl ...@@ -608,7 +608,7 @@ static int32_t retrieveTableMeta(TAOS* taos, char* tableName, STableMeta** pTabl
tscFreeRegisteredSqlObj(pSql); tscFreeRegisteredSqlObj(pSql);
size_t size = 0; size_t size = 0;
taosHashGetCloneExt(tscTableMetaMap, fullTableName, strlen(fullTableName), NULL, (void**)&tableMeta, &size); taosHashGetCloneExt(UTIL_GET_TABLEMETA(pSql), fullTableName, strlen(fullTableName), NULL, (void**)&tableMeta, &size);
} }
if (tableMeta != NULL) { if (tableMeta != NULL) {
......
...@@ -8511,7 +8511,7 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -8511,7 +8511,7 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) {
size_t len = strlen(name); size_t len = strlen(name);
if (NULL == taosHashGetCloneExt(tscTableMetaMap, name, len, NULL, (void **)&pTableMeta, &tableMetaCapacity)) { if (NULL == taosHashGetCloneExt(UTIL_GET_TABLEMETA(pSql), name, len, NULL, (void **)&pTableMeta, &tableMetaCapacity)) {
// not found // not found
tfree(pTableMeta); tfree(pTableMeta);
} }
...@@ -8522,7 +8522,7 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -8522,7 +8522,7 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) {
// avoid mem leak, may should update pTableMeta // avoid mem leak, may should update pTableMeta
void* pVgroupIdList = NULL; void* pVgroupIdList = NULL;
if (pTableMeta->tableType == TSDB_CHILD_TABLE) { if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
code = tscCreateTableMetaFromSTableMeta((STableMeta **)(&pTableMeta), name, &tableMetaCapacity, (STableMeta **)(&pSTMeta)); code = tscCreateTableMetaFromSTableMeta(pSql, (STableMeta **)(&pTableMeta), name, &tableMetaCapacity, (STableMeta **)(&pSTMeta));
pSql->pBuf = (void *)pSTMeta; pSql->pBuf = (void *)pSTMeta;
// create the child table meta from super table failed, try load it from mnode // create the child table meta from super table failed, try load it from mnode
...@@ -8534,7 +8534,7 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -8534,7 +8534,7 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) {
} else if (pTableMeta->tableType == TSDB_SUPER_TABLE) { } else if (pTableMeta->tableType == TSDB_SUPER_TABLE) {
// the vgroup list of super table is not kept in local buffer, so here need retrieve it from the mnode each time // the vgroup list of super table is not kept in local buffer, so here need retrieve it from the mnode each time
tscDebug("0x%"PRIx64" try to acquire cached super table %s vgroup id list", pSql->self, name); tscDebug("0x%"PRIx64" try to acquire cached super table %s vgroup id list", pSql->self, name);
void* pv = taosCacheAcquireByKey(tscVgroupListBuf, name, len); void* pv = taosCacheAcquireByKey(UTIL_GET_VGROUPLIST(pSql), name, len);
if (pv == NULL) { if (pv == NULL) {
char* t = strdup(name); char* t = strdup(name);
taosArrayPush(pVgroupList, &t); taosArrayPush(pVgroupList, &t);
...@@ -8547,7 +8547,7 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -8547,7 +8547,7 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) {
} }
taosArrayAddBatch(pVgroupIdList, pdata->data, (int32_t) pdata->num); taosArrayAddBatch(pVgroupIdList, pdata->data, (int32_t) pdata->num);
taosCacheRelease(tscVgroupListBuf, &pv, false); taosCacheRelease(UTIL_GET_VGROUPLIST(pSql), &pv, false);
} }
} }
...@@ -8697,7 +8697,7 @@ static int32_t doLoadAllTableMeta(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNod ...@@ -8697,7 +8697,7 @@ static int32_t doLoadAllTableMeta(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNod
// check if current buffer contains the vgroup info. If not, add it // check if current buffer contains the vgroup info. If not, add it
SNewVgroupInfo existVgroupInfo = {.inUse = -1,}; SNewVgroupInfo existVgroupInfo = {.inUse = -1,};
taosHashGetClone(tscVgroupMap, id, sizeof(*id), NULL, &existVgroupInfo); taosHashGetClone(UTIL_GET_VGROUPMAP(pSql), id, sizeof(*id), NULL, &existVgroupInfo);
assert(existVgroupInfo.inUse >= 0); assert(existVgroupInfo.inUse >= 0);
SVgroupInfo *pVgroup = &pTableMetaInfo->vgroupList->vgroups[j]; SVgroupInfo *pVgroup = &pTableMetaInfo->vgroupList->vgroups[j];
......
...@@ -157,7 +157,7 @@ static void tscUpdateVgroupInfo(SSqlObj *pSql, SRpcEpSet *pEpSet) { ...@@ -157,7 +157,7 @@ static void tscUpdateVgroupInfo(SSqlObj *pSql, SRpcEpSet *pEpSet) {
assert(vgId > 0); assert(vgId > 0);
SNewVgroupInfo vgroupInfo = {.vgId = -1}; SNewVgroupInfo vgroupInfo = {.vgId = -1};
taosHashGetClone(tscVgroupMap, &vgId, sizeof(vgId), NULL, &vgroupInfo); taosHashGetClone(UTIL_GET_VGROUPMAP(pSql), &vgId, sizeof(vgId), NULL, &vgroupInfo);
assert(vgroupInfo.numOfEps > 0 && vgroupInfo.vgId > 0); assert(vgroupInfo.numOfEps > 0 && vgroupInfo.vgId > 0);
tscDebug("before: Endpoint in use:%d, numOfEps:%d", vgroupInfo.inUse, vgroupInfo.numOfEps); tscDebug("before: Endpoint in use:%d, numOfEps:%d", vgroupInfo.inUse, vgroupInfo.numOfEps);
...@@ -169,7 +169,7 @@ static void tscUpdateVgroupInfo(SSqlObj *pSql, SRpcEpSet *pEpSet) { ...@@ -169,7 +169,7 @@ static void tscUpdateVgroupInfo(SSqlObj *pSql, SRpcEpSet *pEpSet) {
} }
tscDebug("after: EndPoint in use:%d, numOfEps:%d", vgroupInfo.inUse, vgroupInfo.numOfEps); tscDebug("after: EndPoint in use:%d, numOfEps:%d", vgroupInfo.inUse, vgroupInfo.numOfEps);
taosHashPut(tscVgroupMap, &vgId, sizeof(vgId), &vgroupInfo, sizeof(SNewVgroupInfo)); taosHashPut(UTIL_GET_VGROUPMAP(pSql), &vgId, sizeof(vgId), &vgroupInfo, sizeof(SNewVgroupInfo));
// Update the local cached epSet info cached by SqlObj // Update the local cached epSet info cached by SqlObj
int32_t inUse = pSql->epSet.inUse; int32_t inUse = pSql->epSet.inUse;
...@@ -653,7 +653,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -653,7 +653,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT; pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
SNewVgroupInfo vgroupInfo = {0}; SNewVgroupInfo vgroupInfo = {0};
taosHashGetClone(tscVgroupMap, &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo); taosHashGetClone(UTIL_GET_VGROUPMAP(pSql), &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo);
tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo); tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo);
tscDebug("0x%"PRIx64" submit msg built, numberOfEP:%d", pSql->self, pSql->epSet.numOfEps); tscDebug("0x%"PRIx64" submit msg built, numberOfEP:%d", pSql->self, pSql->epSet.numOfEps);
...@@ -741,7 +741,7 @@ static char *doSerializeTableInfo(SQueryTableMsg *pQueryMsg, SSqlObj *pSql, STab ...@@ -741,7 +741,7 @@ static char *doSerializeTableInfo(SQueryTableMsg *pQueryMsg, SSqlObj *pSql, STab
vgId = pTableMeta->vgId; vgId = pTableMeta->vgId;
SNewVgroupInfo vgroupInfo = {0}; SNewVgroupInfo vgroupInfo = {0};
taosHashGetClone(tscVgroupMap, &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo); taosHashGetClone(UTIL_GET_VGROUPMAP(pSql), &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo);
tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo); tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo);
} }
...@@ -1648,7 +1648,7 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) { ...@@ -1648,7 +1648,7 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
STableMeta *pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta; STableMeta *pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
SNewVgroupInfo vgroupInfo = {.vgId = -1}; SNewVgroupInfo vgroupInfo = {.vgId = -1};
taosHashGetClone(tscVgroupMap, &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo); taosHashGetClone(UTIL_GET_VGROUPMAP(pSql), &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo);
assert(vgroupInfo.vgId > 0); assert(vgroupInfo.vgId > 0);
tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo); tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo);
...@@ -2034,21 +2034,21 @@ static int32_t tableMetaMsgConvert(STableMetaMsg* pMetaMsg) { ...@@ -2034,21 +2034,21 @@ static int32_t tableMetaMsgConvert(STableMetaMsg* pMetaMsg) {
} }
// update the vgroupInfo if needed // update the vgroupInfo if needed
static void doUpdateVgroupInfo(int32_t vgId, SVgroupMsg *pVgroupMsg) { static void doUpdateVgroupInfo(SSqlObj *pSql, int32_t vgId, SVgroupMsg *pVgroupMsg) {
assert(vgId > 0); assert(vgId > 0);
SNewVgroupInfo vgroupInfo = {.inUse = -1}; SNewVgroupInfo vgroupInfo = {.inUse = -1};
taosHashGetClone(tscVgroupMap, &vgId, sizeof(vgId), NULL, &vgroupInfo); taosHashGetClone(UTIL_GET_VGROUPMAP(pSql), &vgId, sizeof(vgId), NULL, &vgroupInfo);
// vgroup info exists, compare with it // vgroup info exists, compare with it
if (((vgroupInfo.inUse >= 0) && !vgroupInfoIdentical(&vgroupInfo, pVgroupMsg)) || (vgroupInfo.inUse < 0)) { if (((vgroupInfo.inUse >= 0) && !vgroupInfoIdentical(&vgroupInfo, pVgroupMsg)) || (vgroupInfo.inUse < 0)) {
vgroupInfo = createNewVgroupInfo(pVgroupMsg); vgroupInfo = createNewVgroupInfo(pVgroupMsg);
taosHashPut(tscVgroupMap, &vgId, sizeof(vgId), &vgroupInfo, sizeof(vgroupInfo)); taosHashPut(UTIL_GET_VGROUPMAP(pSql), &vgId, sizeof(vgId), &vgroupInfo, sizeof(vgroupInfo));
tscDebug("add/update new VgroupInfo, vgId:%d, total cached:%d", vgId, (int32_t) taosHashGetSize(tscVgroupMap)); tscDebug("add/update new VgroupInfo, vgId:%d, total cached:%d", vgId, (int32_t) taosHashGetSize(UTIL_GET_VGROUPMAP(pSql)));
} }
} }
static void doAddTableMetaToLocalBuf(STableMeta* pTableMeta, STableMetaMsg* pMetaMsg, bool updateSTable) { static void doAddTableMetaToLocalBuf(SSqlObj *pSql, STableMeta* pTableMeta, STableMetaMsg* pMetaMsg, bool updateSTable) {
if (pTableMeta->tableType == TSDB_CHILD_TABLE) { if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
// add or update the corresponding super table meta data info // add or update the corresponding super table meta data info
int32_t len = (int32_t) strnlen(pTableMeta->sTableName, TSDB_TABLE_FNAME_LEN); int32_t len = (int32_t) strnlen(pTableMeta->sTableName, TSDB_TABLE_FNAME_LEN);
...@@ -2057,18 +2057,18 @@ static void doAddTableMetaToLocalBuf(STableMeta* pTableMeta, STableMetaMsg* pMet ...@@ -2057,18 +2057,18 @@ static void doAddTableMetaToLocalBuf(STableMeta* pTableMeta, STableMetaMsg* pMet
if (updateSTable) { if (updateSTable) {
STableMeta* pSupTableMeta = createSuperTableMeta(pMetaMsg); STableMeta* pSupTableMeta = createSuperTableMeta(pMetaMsg);
uint32_t size = tscGetTableMetaSize(pSupTableMeta); uint32_t size = tscGetTableMetaSize(pSupTableMeta);
int32_t code = taosHashPut(tscTableMetaMap, pTableMeta->sTableName, len, pSupTableMeta, size); int32_t code = taosHashPut(UTIL_GET_TABLEMETA(pSql), pTableMeta->sTableName, len, pSupTableMeta, size);
assert(code == TSDB_CODE_SUCCESS); assert(code == TSDB_CODE_SUCCESS);
tfree(pSupTableMeta); tfree(pSupTableMeta);
} }
CChildTableMeta* cMeta = tscCreateChildMeta(pTableMeta); CChildTableMeta* cMeta = tscCreateChildMeta(pTableMeta);
taosHashPut(tscTableMetaMap, pMetaMsg->tableFname, strlen(pMetaMsg->tableFname), cMeta, sizeof(CChildTableMeta)); taosHashPut(UTIL_GET_TABLEMETA(pSql), pMetaMsg->tableFname, strlen(pMetaMsg->tableFname), cMeta, sizeof(CChildTableMeta));
tfree(cMeta); tfree(cMeta);
} else { } else {
uint32_t s = tscGetTableMetaSize(pTableMeta); uint32_t s = tscGetTableMetaSize(pTableMeta);
taosHashPut(tscTableMetaMap, pMetaMsg->tableFname, strlen(pMetaMsg->tableFname), pTableMeta, s); taosHashPut(UTIL_GET_TABLEMETA(pSql), pMetaMsg->tableFname, strlen(pMetaMsg->tableFname), pTableMeta, s);
} }
} }
...@@ -2096,9 +2096,9 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { ...@@ -2096,9 +2096,9 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
tNameExtractFullName(&pTableMetaInfo->name, name); tNameExtractFullName(&pTableMetaInfo->name, name);
assert(strncmp(pMetaMsg->tableFname, name, tListLen(pMetaMsg->tableFname)) == 0); assert(strncmp(pMetaMsg->tableFname, name, tListLen(pMetaMsg->tableFname)) == 0);
doAddTableMetaToLocalBuf(pTableMeta, pMetaMsg, true); doAddTableMetaToLocalBuf(pSql, pTableMeta, pMetaMsg, true);
if (pTableMeta->tableType != TSDB_SUPER_TABLE) { if (pTableMeta->tableType != TSDB_SUPER_TABLE) {
doUpdateVgroupInfo(pTableMeta->vgId, &pMetaMsg->vgroup); doUpdateVgroupInfo(pSql, pTableMeta->vgId, &pMetaMsg->vgroup);
} }
tscDebug("0x%"PRIx64" recv table meta, uid:%" PRIu64 ", tid:%d, name:%s, numOfCols:%d, numOfTags:%d", pSql->self, tscDebug("0x%"PRIx64" recv table meta, uid:%" PRIu64 ", tid:%d, name:%s, numOfCols:%d, numOfTags:%d", pSql->self,
...@@ -2109,7 +2109,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { ...@@ -2109,7 +2109,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static SArray* createVgroupIdListFromMsg(char* pMsg, SHashObj* pSet, char* name, int32_t* size, uint64_t id) { static SArray* createVgroupIdListFromMsg(SSqlObj *pSql, char* pMsg, SHashObj* pSet, char* name, int32_t* size, uint64_t id) {
SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)pMsg; SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)pMsg;
pVgroupMsg->numOfVgroups = htonl(pVgroupMsg->numOfVgroups); pVgroupMsg->numOfVgroups = htonl(pVgroupMsg->numOfVgroups);
...@@ -2132,7 +2132,7 @@ static SArray* createVgroupIdListFromMsg(char* pMsg, SHashObj* pSet, char* name, ...@@ -2132,7 +2132,7 @@ static SArray* createVgroupIdListFromMsg(char* pMsg, SHashObj* pSet, char* name,
if (taosHashGet(pSet, &vmsg->vgId, sizeof(vmsg->vgId)) == NULL) { if (taosHashGet(pSet, &vmsg->vgId, sizeof(vmsg->vgId)) == NULL) {
taosHashPut(pSet, &vmsg->vgId, sizeof(vmsg->vgId), "", 0); taosHashPut(pSet, &vmsg->vgId, sizeof(vmsg->vgId), "", 0);
doUpdateVgroupInfo(vmsg->vgId, vmsg); doUpdateVgroupInfo(pSql, vmsg->vgId, vmsg);
} }
} }
} }
...@@ -2140,7 +2140,7 @@ static SArray* createVgroupIdListFromMsg(char* pMsg, SHashObj* pSet, char* name, ...@@ -2140,7 +2140,7 @@ static SArray* createVgroupIdListFromMsg(char* pMsg, SHashObj* pSet, char* name,
return vgroupIdList; return vgroupIdList;
} }
static SVgroupsInfo* createVgroupInfoFromMsg(char* pMsg, int32_t* size, uint64_t id) { static SVgroupsInfo* createVgroupInfoFromMsg(SSqlObj *pSql, char* pMsg, int32_t* size, uint64_t id) {
SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)pMsg; SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)pMsg;
pVgroupMsg->numOfVgroups = htonl(pVgroupMsg->numOfVgroups); pVgroupMsg->numOfVgroups = htonl(pVgroupMsg->numOfVgroups);
...@@ -2171,7 +2171,7 @@ static SVgroupsInfo* createVgroupInfoFromMsg(char* pMsg, int32_t* size, uint64_t ...@@ -2171,7 +2171,7 @@ static SVgroupsInfo* createVgroupInfoFromMsg(char* pMsg, int32_t* size, uint64_t
pVgroup->epAddr[k].fqdn = strndup(vmsg->epAddr[k].fqdn, TSDB_FQDN_LEN); pVgroup->epAddr[k].fqdn = strndup(vmsg->epAddr[k].fqdn, TSDB_FQDN_LEN);
} }
doUpdateVgroupInfo(pVgroup->vgId, vmsg); doUpdateVgroupInfo(pSql, pVgroup->vgId, vmsg);
} }
} }
...@@ -2306,12 +2306,12 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) { ...@@ -2306,12 +2306,12 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) {
} }
// create the tableMeta and add it into the TableMeta map // create the tableMeta and add it into the TableMeta map
doAddTableMetaToLocalBuf(pTableMeta, pMetaMsg, updateStableMeta); doAddTableMetaToLocalBuf(pParentSql, pTableMeta, pMetaMsg, updateStableMeta);
// for each vgroup, only update the information once. // for each vgroup, only update the information once.
int64_t vgId = pMetaMsg->vgroup.vgId; int64_t vgId = pMetaMsg->vgroup.vgId;
if (pTableMeta->tableType != TSDB_SUPER_TABLE && taosHashGet(pSet, &vgId, sizeof(vgId)) == NULL) { if (pTableMeta->tableType != TSDB_SUPER_TABLE && taosHashGet(pSet, &vgId, sizeof(vgId)) == NULL) {
doUpdateVgroupInfo((int32_t) vgId, &pMetaMsg->vgroup); doUpdateVgroupInfo(pParentSql, (int32_t) vgId, &pMetaMsg->vgroup);
taosHashPut(pSet, &vgId, sizeof(vgId), "", 0); taosHashPut(pSet, &vgId, sizeof(vgId), "", 0);
} }
...@@ -2336,7 +2336,7 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) { ...@@ -2336,7 +2336,7 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) {
taosArrayDestroy(p->vgroupIdList); taosArrayDestroy(p->vgroupIdList);
} }
p->vgroupIdList = createVgroupIdListFromMsg(pMsg, pSet, fname, &size, pSql->self); p->vgroupIdList = createVgroupIdListFromMsg(pParentSql, pMsg, pSet, fname, &size, pSql->self);
int32_t numOfVgId = (int32_t) taosArrayGetSize(p->vgroupIdList); int32_t numOfVgId = (int32_t) taosArrayGetSize(p->vgroupIdList);
int32_t s = sizeof(tFilePage) + numOfVgId * sizeof(int32_t); int32_t s = sizeof(tFilePage) + numOfVgId * sizeof(int32_t);
...@@ -2345,8 +2345,8 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) { ...@@ -2345,8 +2345,8 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) {
idList->num = numOfVgId; idList->num = numOfVgId;
memcpy(idList->data, TARRAY_GET_START(p->vgroupIdList), numOfVgId * sizeof(int32_t)); memcpy(idList->data, TARRAY_GET_START(p->vgroupIdList), numOfVgId * sizeof(int32_t));
void* idListInst = taosCachePut(tscVgroupListBuf, fname, len, idList, s, 5000); void* idListInst = taosCachePut(UTIL_GET_VGROUPLIST(pParentSql), fname, len, idList, s, 5000);
taosCacheRelease(tscVgroupListBuf, (void*) &idListInst, false); taosCacheRelease(UTIL_GET_VGROUPLIST(pParentSql), (void*) &idListInst, false);
tfree(idList); tfree(idList);
pMsg += size; pMsg += size;
...@@ -2436,7 +2436,7 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) { ...@@ -2436,7 +2436,7 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
continue; continue;
} }
int32_t size = 0; int32_t size = 0;
pInfo->vgroupList = createVgroupInfoFromMsg(pMsg, &size, pSql->self); pInfo->vgroupList = createVgroupInfoFromMsg(parent, pMsg, &size, pSql->self);
pMsg += size; pMsg += size;
} }
...@@ -2567,7 +2567,8 @@ int tscProcessConnectRsp(SSqlObj *pSql) { ...@@ -2567,7 +2567,8 @@ int tscProcessConnectRsp(SSqlObj *pSql) {
pObj->writeAuth = pConnect->writeAuth; pObj->writeAuth = pConnect->writeAuth;
pObj->superAuth = pConnect->superAuth; pObj->superAuth = pConnect->superAuth;
pObj->connId = htonl(pConnect->connId); pObj->connId = htonl(pConnect->connId);
tstrncpy(pObj->clusterId, pConnect->clusterId, sizeof(pObj->clusterId));
createHbObj(pObj); createHbObj(pObj);
//launch a timer to send heartbeat to maintain the connection and send status to mnode //launch a timer to send heartbeat to maintain the connection and send status to mnode
...@@ -2592,9 +2593,9 @@ int tscProcessDropDbRsp(SSqlObj *pSql) { ...@@ -2592,9 +2593,9 @@ int tscProcessDropDbRsp(SSqlObj *pSql) {
//TODO LOCK DB WHEN MODIFY IT //TODO LOCK DB WHEN MODIFY IT
//pSql->pTscObj->db[0] = 0; //pSql->pTscObj->db[0] = 0;
taosHashClear(tscTableMetaMap); taosHashClear(UTIL_GET_TABLEMETA(pSql));
taosHashClear(tscVgroupMap); taosHashClear(UTIL_GET_VGROUPMAP(pSql));
taosCacheEmpty(tscVgroupListBuf); taosCacheEmpty(UTIL_GET_VGROUPLIST(pSql));
return 0; return 0;
} }
...@@ -2614,11 +2615,11 @@ int tscProcessAlterTableMsgRsp(SSqlObj *pSql) { ...@@ -2614,11 +2615,11 @@ int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
tscDebug("0x%"PRIx64" remove tableMeta in hashMap after alter-table: %s", pSql->self, name); tscDebug("0x%"PRIx64" remove tableMeta in hashMap after alter-table: %s", pSql->self, name);
bool isSuperTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo); bool isSuperTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo);
taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); taosHashRemove(UTIL_GET_TABLEMETA(pSql), name, strnlen(name, TSDB_TABLE_FNAME_LEN));
tfree(pTableMetaInfo->pTableMeta); tfree(pTableMetaInfo->pTableMeta);
if (isSuperTable) { // if it is a super table, iterate the hashTable and remove all the childTableMeta if (isSuperTable) { // if it is a super table, iterate the hashTable and remove all the childTableMeta
taosHashClear(tscTableMetaMap); taosHashClear(UTIL_GET_TABLEMETA(pSql));
} }
return 0; return 0;
...@@ -2925,7 +2926,7 @@ int32_t tscGetTableMetaImpl(SSqlObj* pSql, STableMetaInfo *pTableMetaInfo, bool ...@@ -2925,7 +2926,7 @@ int32_t tscGetTableMetaImpl(SSqlObj* pSql, STableMetaInfo *pTableMetaInfo, bool
memset(pTableMetaInfo->pTableMeta, 0, pTableMetaInfo->tableMetaCapacity); memset(pTableMetaInfo->pTableMeta, 0, pTableMetaInfo->tableMetaCapacity);
} }
if (NULL == taosHashGetCloneExt(tscTableMetaMap, name, len, NULL, (void **)&(pTableMetaInfo->pTableMeta), &pTableMetaInfo->tableMetaCapacity)) { if (NULL == taosHashGetCloneExt(UTIL_GET_TABLEMETA(pSql), name, len, NULL, (void **)&(pTableMetaInfo->pTableMeta), &pTableMetaInfo->tableMetaCapacity)) {
tfree(pTableMetaInfo->pTableMeta); tfree(pTableMetaInfo->pTableMeta);
} }
...@@ -2935,7 +2936,7 @@ int32_t tscGetTableMetaImpl(SSqlObj* pSql, STableMetaInfo *pTableMetaInfo, bool ...@@ -2935,7 +2936,7 @@ int32_t tscGetTableMetaImpl(SSqlObj* pSql, STableMetaInfo *pTableMetaInfo, bool
if (pMeta && pMeta->id.uid > 0) { if (pMeta && pMeta->id.uid > 0) {
// in case of child table, here only get the // in case of child table, here only get the
if (pMeta->tableType == TSDB_CHILD_TABLE) { if (pMeta->tableType == TSDB_CHILD_TABLE) {
int32_t code = tscCreateTableMetaFromSTableMeta(&pTableMetaInfo->pTableMeta, name, &pTableMetaInfo->tableMetaCapacity, (STableMeta **)(&pSTMeta)); int32_t code = tscCreateTableMetaFromSTableMeta(pSql, &pTableMetaInfo->pTableMeta, name, &pTableMetaInfo->tableMetaCapacity, (STableMeta **)(&pSTMeta));
pSql->pBuf = (void *)(pSTMeta); pSql->pBuf = (void *)(pSTMeta);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return getTableMetaFromMnode(pSql, pTableMetaInfo, autocreate); return getTableMetaFromMnode(pSql, pTableMetaInfo, autocreate);
......
...@@ -194,6 +194,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, ...@@ -194,6 +194,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
tscBuildAndSendRequest(pSql, NULL); tscBuildAndSendRequest(pSql, NULL);
tsem_wait(&pSql->rspSem); tsem_wait(&pSql->rspSem);
pSql->pTscObj->pClusterInfo = (SClusterInfo *)tscAcquireClusterInfo(pSql->pTscObj->clusterId);
if (pSql->res.code != TSDB_CODE_SUCCESS) { if (pSql->res.code != TSDB_CODE_SUCCESS) {
terrno = pSql->res.code; terrno = pSql->res.code;
if (terrno ==TSDB_CODE_RPC_FQDN_ERROR) { if (terrno ==TSDB_CODE_RPC_FQDN_ERROR) {
......
...@@ -3229,7 +3229,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) ...@@ -3229,7 +3229,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
for(int32_t i = 0; i < pParentObj->cmd.insertParam.numOfTables; ++i) { for(int32_t i = 0; i < pParentObj->cmd.insertParam.numOfTables; ++i) {
char name[TSDB_TABLE_FNAME_LEN] = {0}; char name[TSDB_TABLE_FNAME_LEN] = {0};
tNameExtractFullName(pParentObj->cmd.insertParam.pTableNameList[i], name); tNameExtractFullName(pParentObj->cmd.insertParam.pTableNameList[i], name);
taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); taosHashRemove(UTIL_GET_TABLEMETA(pParentObj), name, strnlen(name, TSDB_TABLE_FNAME_LEN));
} }
pParentObj->res.code = TSDB_CODE_SUCCESS; pParentObj->res.code = TSDB_CODE_SUCCESS;
......
...@@ -33,9 +33,11 @@ ...@@ -33,9 +33,11 @@
int32_t sentinel = TSC_VAR_NOT_RELEASE; int32_t sentinel = TSC_VAR_NOT_RELEASE;
SHashObj *tscVgroupMap; // hash map to keep the vgroup info from mnode //SHashObj *tscVgroupMap; // hash map to keep the vgroup info from mnode
SHashObj *tscTableMetaMap; // table meta info buffer //SHashObj *tscTableMetaMap; // table meta info buffer
SCacheObj *tscVgroupListBuf; // super table vgroup list information, only survives 5 seconds for each super table vgroup list //SCacheObj *tscVgroupListBuf; // super table vgroup list information, only survives 5 seconds for each super table vgroup list
SHashObj *tscClusterMap = NULL; // cluster obj
static pthread_mutex_t clusterMutex; // mutex to protect open the cluster obj
int32_t tscObjRef = -1; int32_t tscObjRef = -1;
void *tscTmr; void *tscTmr;
...@@ -120,6 +122,58 @@ int32_t tscAcquireRpc(const char *key, const char *user, const char *secretEncry ...@@ -120,6 +122,58 @@ int32_t tscAcquireRpc(const char *key, const char *user, const char *secretEncry
return 0; return 0;
} }
void tscClusterInfoDestroy(SClusterInfo *pObj) {
if (pObj == NULL) { return; }
taosHashCleanup(pObj->vgroupMap);
taosHashCleanup(pObj->tableMetaMap);
taosCacheCleanup(pObj->vgroupListBuf);
tfree(pObj);
}
void *tscAcquireClusterInfo(const char *clusterId) {
pthread_mutex_lock(&clusterMutex);
int32_t len = strlen(clusterId);
SClusterInfo *pObj = NULL;
SClusterInfo **ppObj = taosHashGet(tscClusterMap, clusterId, len);
if (ppObj == NULL || *ppObj == NULL) {
pObj = calloc(1, sizeof(SClusterInfo));
if (pObj) {
pObj->vgroupMap = taosHashInit(256, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
pObj->tableMetaMap = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); //
pObj->vgroupListBuf = taosCacheInit(TSDB_DATA_TYPE_BINARY, 5, false, NULL, "stable-vgroup-list");
if (pObj->vgroupMap == NULL || pObj->tableMetaMap == NULL || pObj->vgroupListBuf == NULL) {
tscClusterInfoDestroy(pObj);
pObj = NULL;
} else {
taosHashPut(tscClusterMap, clusterId, len, &pObj, POINTER_BYTES);
}
}
} else {
pObj = *ppObj;
}
if (pObj) {
pObj->ref += 1;
}
pthread_mutex_unlock(&clusterMutex);
return pObj;
}
void tscReleaseClusterInfo(const char *clusterId) {
pthread_mutex_lock(&clusterMutex);
size_t len = strlen(clusterId);
SClusterInfo *pObj = NULL;
SClusterInfo **ppObj = taosHashGet(tscClusterMap, clusterId, len);
if (ppObj != NULL && *ppObj != NULL) {
pObj = *ppObj;
}
if (pObj && --pObj->ref == 0) {
taosHashRemove(tscClusterMap, clusterId, len);
tscClusterInfoDestroy(pObj);
}
pthread_mutex_unlock(&clusterMutex);
}
void taos_init_imp(void) { void taos_init_imp(void) {
char temp[128] = {0}; char temp[128] = {0};
...@@ -187,12 +241,16 @@ void taos_init_imp(void) { ...@@ -187,12 +241,16 @@ void taos_init_imp(void) {
taosTmrReset(tscCheckDiskUsage, 20 * 1000, NULL, tscTmr, &tscCheckDiskUsageTmr); taosTmrReset(tscCheckDiskUsage, 20 * 1000, NULL, tscTmr, &tscCheckDiskUsageTmr);
} }
if (tscTableMetaMap == NULL) { if (tscClusterMap == NULL) {
tscObjRef = taosOpenRef(40960, tscFreeRegisteredSqlObj); tscObjRef = taosOpenRef(40960, tscFreeRegisteredSqlObj);
tscVgroupMap = taosHashInit(256, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
tscTableMetaMap = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); tscClusterMap = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
tscVgroupListBuf = taosCacheInit(TSDB_DATA_TYPE_BINARY, 5, false, NULL, "stable-vgroup-list"); pthread_mutex_init(&clusterMutex, NULL);
tscDebug("TableMeta:%p, vgroup:%p is initialized", tscTableMetaMap, tscVgroupMap); //tscVgroupMap = taosHashInit(256, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
//tscTableMetaMap = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
//tscVgroupListBuf = taosCacheInit(TSDB_DATA_TYPE_BINARY, 5, false, NULL, "stable-vgroup-list");
//tscDebug("TableMeta:%p, vgroup:%p is initialized", tscTableMetaMap, tscVgroupMap);
} }
int refreshTime = 5; int refreshTime = 5;
...@@ -221,12 +279,6 @@ void taos_cleanup(void) { ...@@ -221,12 +279,6 @@ void taos_cleanup(void) {
scriptEnvPoolCleanup(); scriptEnvPoolCleanup();
} }
taosHashCleanup(tscTableMetaMap);
tscTableMetaMap = NULL;
taosHashCleanup(tscVgroupMap);
tscVgroupMap = NULL;
int32_t id = tscObjRef; int32_t id = tscObjRef;
tscObjRef = -1; tscObjRef = -1;
taosCloseRef(id); taosCloseRef(id);
...@@ -249,14 +301,15 @@ void taos_cleanup(void) { ...@@ -249,14 +301,15 @@ void taos_cleanup(void) {
pthread_mutex_destroy(&rpcObjMutex); pthread_mutex_destroy(&rpcObjMutex);
} }
taosCacheCleanup(tscVgroupListBuf);
tscVgroupListBuf = NULL;
if (tscEmbedded == 0) { if (tscEmbedded == 0) {
rpcCleanup(); rpcCleanup();
taosCloseLog(); taosCloseLog();
}; };
taosHashCleanup(tscClusterMap);
tscClusterMap = NULL;
pthread_mutex_destroy(&clusterMutex);
p = tscTmr; p = tscTmr;
tscTmr = NULL; tscTmr = NULL;
taosTmrCleanUp(p); taosTmrCleanUp(p);
......
...@@ -1577,7 +1577,7 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) { ...@@ -1577,7 +1577,7 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) {
char name[TSDB_TABLE_FNAME_LEN] = {0}; char name[TSDB_TABLE_FNAME_LEN] = {0};
tNameExtractFullName(&pDataBlock->tableName, name); tNameExtractFullName(&pDataBlock->tableName, name);
taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); //taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
} }
if (!pDataBlock->cloned) { if (!pDataBlock->cloned) {
...@@ -2072,6 +2072,7 @@ void tscCloseTscObj(void *param) { ...@@ -2072,6 +2072,7 @@ void tscCloseTscObj(void *param) {
tfree(pObj->tscCorMgmtEpSet); tfree(pObj->tscCorMgmtEpSet);
tscReleaseRpc(pObj->pRpcObj); tscReleaseRpc(pObj->pRpcObj);
pthread_mutex_destroy(&pObj->mutex); pthread_mutex_destroy(&pObj->mutex);
tscReleaseClusterInfo(pObj->clusterId);
tfree(pObj); tfree(pObj);
} }
...@@ -4546,7 +4547,7 @@ CChildTableMeta* tscCreateChildMeta(STableMeta* pTableMeta) { ...@@ -4546,7 +4547,7 @@ CChildTableMeta* tscCreateChildMeta(STableMeta* pTableMeta) {
return cMeta; return cMeta;
} }
int32_t tscCreateTableMetaFromSTableMeta(STableMeta** ppChild, const char* name, size_t *tableMetaCapacity, STableMeta**ppSTable) { int32_t tscCreateTableMetaFromSTableMeta(SSqlObj *pSql, STableMeta** ppChild, const char* name, size_t *tableMetaCapacity, STableMeta**ppSTable) {
assert(*ppChild != NULL); assert(*ppChild != NULL);
STableMeta* p = *ppSTable; STableMeta* p = *ppSTable;
STableMeta* pChild = *ppChild; STableMeta* pChild = *ppChild;
...@@ -4556,7 +4557,7 @@ int32_t tscCreateTableMetaFromSTableMeta(STableMeta** ppChild, const char* name, ...@@ -4556,7 +4557,7 @@ int32_t tscCreateTableMetaFromSTableMeta(STableMeta** ppChild, const char* name,
memset((char *)p, 0, sz); memset((char *)p, 0, sz);
} }
if (NULL == taosHashGetCloneExt(tscTableMetaMap, pChild->sTableName, strnlen(pChild->sTableName, TSDB_TABLE_FNAME_LEN), NULL, (void **)&p, &sz)) { if (NULL == taosHashGetCloneExt(UTIL_GET_TABLEMETA(pSql), pChild->sTableName, strnlen(pChild->sTableName, TSDB_TABLE_FNAME_LEN), NULL, (void **)&p, &sz)) {
tfree(p); tfree(p);
} else { } else {
*ppSTable = p; *ppSTable = p;
...@@ -4582,7 +4583,7 @@ int32_t tscCreateTableMetaFromSTableMeta(STableMeta** ppChild, const char* name, ...@@ -4582,7 +4583,7 @@ int32_t tscCreateTableMetaFromSTableMeta(STableMeta** ppChild, const char* name,
*ppChild = pChild; *ppChild = pChild;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { // super table has been removed, current tableMeta is also expired. remove it here } else { // super table has been removed, current tableMeta is also expired. remove it here
taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); taosHashRemove(UTIL_GET_TABLEMETA(pSql), name, strnlen(name, TSDB_TABLE_FNAME_LEN));
return -1; return -1;
} }
} }
...@@ -5100,18 +5101,20 @@ SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg) { ...@@ -5100,18 +5101,20 @@ SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg) {
void tscRemoveCachedTableMeta(STableMetaInfo* pTableMetaInfo, uint64_t id) { void tscRemoveCachedTableMeta(STableMetaInfo* pTableMetaInfo, uint64_t id) {
char fname[TSDB_TABLE_FNAME_LEN] = {0}; char fname[TSDB_TABLE_FNAME_LEN] = {0};
tNameExtractFullName(&pTableMetaInfo->name, fname); SSqlObj *p = (SSqlObj *)taosAcquireRef(tscObjRef, id);
tNameExtractFullName(&pTableMetaInfo->name, fname);
int32_t len = (int32_t) strnlen(fname, TSDB_TABLE_FNAME_LEN); int32_t len = (int32_t) strnlen(fname, TSDB_TABLE_FNAME_LEN);
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
void* pv = taosCacheAcquireByKey(tscVgroupListBuf, fname, len); void* pv = taosCacheAcquireByKey(UTIL_GET_VGROUPLIST(p), fname, len);
if (pv != NULL) { if (pv != NULL) {
taosCacheRelease(tscVgroupListBuf, &pv, true); taosCacheRelease(UTIL_GET_VGROUPLIST(p), &pv, true);
} }
} }
taosHashRemove(tscTableMetaMap, fname, len); taosHashRemove(UTIL_GET_TABLEMETA(p), fname, len);
tscDebug("0x%"PRIx64" remove table meta %s, numOfRemain:%d", id, fname, (int32_t) taosHashGetSize(tscTableMetaMap)); tscDebug("0x%"PRIx64" remove table meta %s, numOfRemain:%d", id, fname, (int32_t) taosHashGetSize(UTIL_GET_TABLEMETA(p)));
taosReleaseRef(tscObjRef, id);
} }
char* cloneCurrentDBName(SSqlObj* pSql) { char* cloneCurrentDBName(SSqlObj* pSql) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册