提交 102ea9e0 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into refact/new_data_format

...@@ -238,6 +238,7 @@ There are about 200 keywords reserved by TDengine, they can't be used as the nam ...@@ -238,6 +238,7 @@ There are about 200 keywords reserved by TDengine, they can't be used as the nam
- TOPICS - TOPICS
- TRIGGER - TRIGGER
- TSERIES - TSERIES
- TTL
### U ### U
...@@ -253,6 +254,7 @@ There are about 200 keywords reserved by TDengine, they can't be used as the nam ...@@ -253,6 +254,7 @@ There are about 200 keywords reserved by TDengine, they can't be used as the nam
### V ### V
- VALUE
- VALUES - VALUES
- VARIABLE - VARIABLE
- VARIABLES - VARIABLES
......
...@@ -40,11 +40,11 @@ Comparing the connector support for TDengine functional features as follows. ...@@ -40,11 +40,11 @@ Comparing the connector support for TDengine functional features as follows.
### Using the native interface (taosc) ### Using the native interface (taosc)
| **Functional Features** | **Java** | **Python** | **Go** | **C#** | **Node.js** | **Rust** | | **Functional Features** | **Java** | **Python** | **Go** | **C#** | **Node.js** | **Rust** |
| -------------- | -------- | ---------- | ------ | ------ | ----------- | -------- | | ----------------------------- | ------------- | ---------- | ------------- | ------------- | ------------- | ------------- |
| **Connection Management** | Support | Support | Support | Support | Support | Support | | **Connection Management** | Support | Support | Support | Support | Support | Support |
| **Regular Query** | Support | Support | Support | Support | Support | Support | | **Regular Query** | Support | Support | Support | Support | Support | Support |
| **Parameter Binding** | Support | Support | Support | Support | Support | Support | | **Parameter Binding** | Support | Support | Support | Support | Support | Support |
| ** TMQ ** | Support | Support | Support | Support | Support | Support | | **Subscription (TMQ)** | Support | Support | Support | Support | Support | Support |
| **Schemaless** | Support | Support | Support | Support | Support | Support | | **Schemaless** | Support | Support | Support | Support | Support | Support |
| **DataFrame** | Not Supported | Support | Not Supported | Not Supported | Not Supported | Not Supported | | **DataFrame** | Not Supported | Support | Not Supported | Not Supported | Not Supported | Not Supported |
...@@ -54,16 +54,15 @@ The different database framework specifications for various programming language ...@@ -54,16 +54,15 @@ The different database framework specifications for various programming language
### Use HTTP Interfaces (REST or WebSocket) ### Use HTTP Interfaces (REST or WebSocket)
| **Functional Features** | **Java** | **Python** | **Go** | **C# (not supported yet)** | **Node.js** | **Rust** | | **Functional Features** | **Java** | **Python** | **Go** | **C#** | **Node.js** | **Rust** |
| ------------------------------ | -------- | ---------- | -------- | ------------------ | ----------- | -------- | | -------------------------------------- | ------------- | --------------- | ------------- | ------------- | ------------- | ------------- |
| **Connection Management** | Support | Support | Support | N/A | Support | Support | | **Connection Management** | Support | Support | Support | Support | Support | Support |
| **Regular Query** | Support | Support | Support | N/A | Support | Support | | **Regular Query** | Support | Support | Support | Support | Support | Support |
| **Continous Query ** | Support | Support | Support | N/A | Support | Support | | **Parameter Binding** | Not supported | Not supported | Not supported | Support | Not supported | Support |
| **Parameter Binding** | Not supported | Not supported | Not supported | N/A | Not supported | Support | | **Subscription (TMQ) ** | Not supported | Not supported | Not supported | Not supported | Not supported | Support |
| ** TMQ ** | Not supported | Not supported | Not supported | N/A | Not supported | Support | | **Schemaless** | Not supported | Not supported | Not supported | Not supported | Not supported | Not supported |
| **Schemaless** | Not supported | Not supported | Not supported | N/A | Not supported | Not supported | | **Bulk Pulling (based on WebSocket) ** | Support | Support | Not Supported | support | Not Supported | Supported |
| **Bulk Pulling (based on WebSocket) **| Support | Support | Not Supported | N/A | Not Supported | Supported | | **DataFrame** | Not supported | Support | Not supported | Not supported | Not supported | Not supported |
| **DataFrame** | Not supported | Support | Not supported | N/A | Not supported | Not supported |
:::warning :::warning
......
...@@ -42,11 +42,11 @@ TDengine 版本更新往往会增加新的功能特性,列表中的连接器 ...@@ -42,11 +42,11 @@ TDengine 版本更新往往会增加新的功能特性,列表中的连接器
### 使用原生接口(taosc) ### 使用原生接口(taosc)
| **功能特性** | **Java** | **Python** | **Go** | **C#** | **Node.js** | **Rust** | | **功能特性** | **Java** | **Python** | **Go** | **C#** | **Node.js** | **Rust** |
| -------------- | -------- | ---------- | ------ | ------ | ----------- | -------- | | ------------------- | -------- | ---------- | ------ | ------ | ----------- | -------- |
| **连接管理** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 | | **连接管理** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
| **普通查询** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 | | **普通查询** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
| **参数绑定** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 | | **参数绑定** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
| ** TMQ ** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 | | **数据订阅(TMQ)** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
| **Schemaless** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 | | **Schemaless** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
| **DataFrame** | 不支持 | 支持 | 不支持 | 不支持 | 不支持 | 不支持 | | **DataFrame** | 不支持 | 支持 | 不支持 | 不支持 | 不支持 | 不支持 |
...@@ -56,16 +56,15 @@ TDengine 版本更新往往会增加新的功能特性,列表中的连接器 ...@@ -56,16 +56,15 @@ TDengine 版本更新往往会增加新的功能特性,列表中的连接器
### 使用 http (REST 或 WebSocket) 接口 ### 使用 http (REST 或 WebSocket) 接口
| **功能特性** | **Java** | **Python** | **Go** | **C#(暂不支持)** | **Node.js** | **Rust** | | **功能特性** | **Java** | **Python** | **Go** | **C# ** | **Node.js** | **Rust** |
| ------------------------------ | -------- | ---------- | -------- | ------------------ | ----------- | -------- | | ------------------------------ | -------- | ---------- | -------- | -------- | ----------- | -------- |
| **连接管理** | 支持 | 支持 | 支持 | N/A | 支持 | 支持 | | **连接管理** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
| **普通查询** | 支持 | 支持 | 支持 | N/A | 支持 | 支持 | | **普通查询** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
| **连续查询** | 支持 | 支持 | 支持 | N/A | 支持 | 支持 | | **参数绑定** | 暂不支持 | 暂不支持 | 暂不支持 | 支持 | 暂不支持 | 支持 |
| **参数绑定** | 不支持 | 暂不支持 | 暂不支持 | N/A | 不支持 | 支持 | | **数据订阅(TMQ)** | 暂不支持 | 暂不支持 | 暂不支持 | 暂不支持 | 暂不支持 | 支持 |
| ** TMQ ** | 不支持 | 暂不支持 | 暂不支持 | N/A | 不支持 | 支持 | | **Schemaless** | 暂不支持 | 暂不支持 | 暂不支持 | 暂不支持 | 暂不支持 | 暂不支持 |
| **Schemaless** | 暂不支持 | 暂不支持 | 暂不支持 | N/A | 不支持 | 暂不支持 | | **批量拉取(基于 WebSocket)** | 支持 | 支持 | 暂不支持 | 支持 | 暂不支持 | 支持 |
| **批量拉取(基于 WebSocket)** | 支持 | 支持 | 暂不支持 | N/A | 不支持 | 支持 | | **DataFrame** | 不支持 | 支持 | 不支持 | 不支持 | 不支持 | 不支持 |
| **DataFrame** | 不支持 | 支持 | 不支持 | N/A | 不支持 | 不支持 |
:::warning :::warning
......
...@@ -256,6 +256,7 @@ description: TDengine 保留关键字的详细列表 ...@@ -256,6 +256,7 @@ description: TDengine 保留关键字的详细列表
### V ### V
- VALUE
- VALUES - VALUES
- VARIABLE - VARIABLE
- VARIABLES - VARIABLES
......
...@@ -319,7 +319,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { ...@@ -319,7 +319,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
bool multiTarget = pDbObj->cfg.numOfVgroups > 1; bool multiTarget = pDbObj->cfg.numOfVgroups > 1;
if (planTotLevel == 2 || externalTargetDB || multiTarget) { if (planTotLevel == 2 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) {
/*if (true) {*/ /*if (true) {*/
SArray* taskOneLevel = taosArrayInit(0, sizeof(void*)); SArray* taskOneLevel = taosArrayInit(0, sizeof(void*));
taosArrayPush(pStream->tasks, &taskOneLevel); taosArrayPush(pStream->tasks, &taskOneLevel);
......
...@@ -279,7 +279,7 @@ static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) { ...@@ -279,7 +279,7 @@ static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) {
goto _err; goto _err;
} }
if (size != pTsdb->fs.pDelFile->size) { if (size != tsdbLogicToFileSize(pTsdb->fs.pDelFile->size, pTsdb->pVnode->config.tsdbPageSize)) {
code = TSDB_CODE_FILE_CORRUPTED; code = TSDB_CODE_FILE_CORRUPTED;
goto _err; goto _err;
} }
......
...@@ -13,72 +13,23 @@ ...@@ -13,72 +13,23 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "trpc.h"
#include "query.h"
#include "tname.h"
#include "catalogInt.h" #include "catalogInt.h"
#include "query.h"
#include "systable.h" #include "systable.h"
#include "tname.h"
#include "trpc.h"
SCtgOperation gCtgCacheOperation[CTG_OP_MAX] = { SCtgOperation gCtgCacheOperation[CTG_OP_MAX] = {{CTG_OP_UPDATE_VGROUP, "update vgInfo", ctgOpUpdateVgroup},
{ {CTG_OP_UPDATE_TB_META, "update tbMeta", ctgOpUpdateTbMeta},
CTG_OP_UPDATE_VGROUP, {CTG_OP_DROP_DB_CACHE, "drop DB", ctgOpDropDbCache},
"update vgInfo", {CTG_OP_DROP_DB_VGROUP, "drop DBVgroup", ctgOpDropDbVgroup},
ctgOpUpdateVgroup {CTG_OP_DROP_STB_META, "drop stbMeta", ctgOpDropStbMeta},
}, {CTG_OP_DROP_TB_META, "drop tbMeta", ctgOpDropTbMeta},
{ {CTG_OP_UPDATE_USER, "update user", ctgOpUpdateUser},
CTG_OP_UPDATE_TB_META, {CTG_OP_UPDATE_VG_EPSET, "update epset", ctgOpUpdateEpset},
"update tbMeta", {CTG_OP_UPDATE_TB_INDEX, "update tbIndex", ctgOpUpdateTbIndex},
ctgOpUpdateTbMeta {CTG_OP_DROP_TB_INDEX, "drop tbIndex", ctgOpDropTbIndex},
}, {CTG_OP_CLEAR_CACHE, "clear cache", ctgOpClearCache}};
{
CTG_OP_DROP_DB_CACHE,
"drop DB",
ctgOpDropDbCache
},
{
CTG_OP_DROP_DB_VGROUP,
"drop DBVgroup",
ctgOpDropDbVgroup
},
{
CTG_OP_DROP_STB_META,
"drop stbMeta",
ctgOpDropStbMeta
},
{
CTG_OP_DROP_TB_META,
"drop tbMeta",
ctgOpDropTbMeta
},
{
CTG_OP_UPDATE_USER,
"update user",
ctgOpUpdateUser
},
{
CTG_OP_UPDATE_VG_EPSET,
"update epset",
ctgOpUpdateEpset
},
{
CTG_OP_UPDATE_TB_INDEX,
"update tbIndex",
ctgOpUpdateTbIndex
},
{
CTG_OP_DROP_TB_INDEX,
"drop tbIndex",
ctgOpDropTbIndex
},
{
CTG_OP_CLEAR_CACHE,
"clear cache",
ctgOpClearCache
}
};
int32_t ctgRLockVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache, bool *inCache) { int32_t ctgRLockVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache, bool *inCache) {
CTG_LOCK(CTG_READ, &dbCache->vgCache.vgLock); CTG_LOCK(CTG_READ, &dbCache->vgCache.vgLock);
...@@ -86,18 +37,17 @@ int32_t ctgRLockVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache, bool *inCache) { ...@@ -86,18 +37,17 @@ int32_t ctgRLockVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache, bool *inCache) {
if (dbCache->deleted) { if (dbCache->deleted) {
CTG_UNLOCK(CTG_READ, &dbCache->vgCache.vgLock); CTG_UNLOCK(CTG_READ, &dbCache->vgCache.vgLock);
ctgDebug("db is dropping, dbId:0x%"PRIx64, dbCache->dbId); ctgDebug("db is dropping, dbId:0x%" PRIx64, dbCache->dbId);
*inCache = false; *inCache = false;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (NULL == dbCache->vgCache.vgInfo) { if (NULL == dbCache->vgCache.vgInfo) {
CTG_UNLOCK(CTG_READ, &dbCache->vgCache.vgLock); CTG_UNLOCK(CTG_READ, &dbCache->vgCache.vgLock);
*inCache = false; *inCache = false;
ctgDebug("db vgInfo is empty, dbId:0x%"PRIx64, dbCache->dbId); ctgDebug("db vgInfo is empty, dbId:0x%" PRIx64, dbCache->dbId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -110,7 +60,7 @@ int32_t ctgWLockVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache) { ...@@ -110,7 +60,7 @@ int32_t ctgWLockVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache) {
CTG_LOCK(CTG_WRITE, &dbCache->vgCache.vgLock); CTG_LOCK(CTG_WRITE, &dbCache->vgCache.vgLock);
if (dbCache->deleted) { if (dbCache->deleted) {
ctgDebug("db is dropping, dbId:0x%"PRIx64, dbCache->dbId); ctgDebug("db is dropping, dbId:0x%" PRIx64, dbCache->dbId);
CTG_UNLOCK(CTG_WRITE, &dbCache->vgCache.vgLock); CTG_UNLOCK(CTG_WRITE, &dbCache->vgCache.vgLock);
CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED); CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
} }
...@@ -118,19 +68,13 @@ int32_t ctgWLockVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache) { ...@@ -118,19 +68,13 @@ int32_t ctgWLockVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void ctgRUnlockVgInfo(SCtgDBCache *dbCache) { void ctgRUnlockVgInfo(SCtgDBCache *dbCache) { CTG_UNLOCK(CTG_READ, &dbCache->vgCache.vgLock); }
CTG_UNLOCK(CTG_READ, &dbCache->vgCache.vgLock);
}
void ctgWUnlockVgInfo(SCtgDBCache *dbCache) { void ctgWUnlockVgInfo(SCtgDBCache *dbCache) { CTG_UNLOCK(CTG_WRITE, &dbCache->vgCache.vgLock); }
CTG_UNLOCK(CTG_WRITE, &dbCache->vgCache.vgLock);
}
void ctgReleaseDBCache(SCatalog *pCtg, SCtgDBCache *dbCache) { void ctgReleaseDBCache(SCatalog *pCtg, SCtgDBCache *dbCache) { CTG_UNLOCK(CTG_READ, &dbCache->dbLock); }
CTG_UNLOCK(CTG_READ, &dbCache->dbLock);
}
int32_t ctgAcquireDBCacheImpl(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache, bool acquire) { int32_t ctgAcquireDBCacheImpl(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache, bool acquire) {
char *p = strchr(dbFName, '.'); char *p = strchr(dbFName, '.');
if (p && IS_SYS_DBNAME(p + 1)) { if (p && IS_SYS_DBNAME(p + 1)) {
dbFName = p + 1; dbFName = p + 1;
...@@ -162,20 +106,20 @@ int32_t ctgAcquireDBCacheImpl(SCatalog* pCtg, const char *dbFName, SCtgDBCache * ...@@ -162,20 +106,20 @@ int32_t ctgAcquireDBCacheImpl(SCatalog* pCtg, const char *dbFName, SCtgDBCache *
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgAcquireDBCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache) { int32_t ctgAcquireDBCache(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache) {
CTG_RET(ctgAcquireDBCacheImpl(pCtg, dbFName, pCache, true)); CTG_RET(ctgAcquireDBCacheImpl(pCtg, dbFName, pCache, true));
} }
int32_t ctgGetDBCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache) { int32_t ctgGetDBCache(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache) {
CTG_RET(ctgAcquireDBCacheImpl(pCtg, dbFName, pCache, false)); CTG_RET(ctgAcquireDBCacheImpl(pCtg, dbFName, pCache, false));
} }
void ctgReleaseVgInfoToCache(SCatalog* pCtg, SCtgDBCache *dbCache) { void ctgReleaseVgInfoToCache(SCatalog *pCtg, SCtgDBCache *dbCache) {
ctgRUnlockVgInfo(dbCache); ctgRUnlockVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache); ctgReleaseDBCache(pCtg, dbCache);
} }
void ctgReleaseTbMetaToCache(SCatalog* pCtg, SCtgDBCache *dbCache, SCtgTbCache* pCache) { void ctgReleaseTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache) {
if (pCache) { if (pCache) {
CTG_UNLOCK(CTG_READ, &pCache->metaLock); CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache); taosHashRelease(dbCache->tbCache, pCache);
...@@ -186,7 +130,7 @@ void ctgReleaseTbMetaToCache(SCatalog* pCtg, SCtgDBCache *dbCache, SCtgTbCache* ...@@ -186,7 +130,7 @@ void ctgReleaseTbMetaToCache(SCatalog* pCtg, SCtgDBCache *dbCache, SCtgTbCache*
} }
} }
void ctgReleaseTbIndexToCache(SCatalog* pCtg, SCtgDBCache *dbCache, SCtgTbCache* pCache) { void ctgReleaseTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache) {
if (pCache) { if (pCache) {
CTG_UNLOCK(CTG_READ, &pCache->indexLock); CTG_UNLOCK(CTG_READ, &pCache->indexLock);
taosHashRelease(dbCache->tbCache, pCache); taosHashRelease(dbCache->tbCache, pCache);
...@@ -197,7 +141,7 @@ void ctgReleaseTbIndexToCache(SCatalog* pCtg, SCtgDBCache *dbCache, SCtgTbCache* ...@@ -197,7 +141,7 @@ void ctgReleaseTbIndexToCache(SCatalog* pCtg, SCtgDBCache *dbCache, SCtgTbCache*
} }
} }
int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache) { int32_t ctgAcquireVgInfoFromCache(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache) {
SCtgDBCache *dbCache = NULL; SCtgDBCache *dbCache = NULL;
ctgAcquireDBCache(pCtg, dbFName, &dbCache); ctgAcquireDBCache(pCtg, dbFName, &dbCache);
if (NULL == dbCache) { if (NULL == dbCache) {
...@@ -233,9 +177,9 @@ _return: ...@@ -233,9 +177,9 @@ _return:
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgAcquireTbMetaFromCache(SCatalog* pCtg, char *dbFName, char* tbName, SCtgDBCache **pDb, SCtgTbCache** pTb) { int32_t ctgAcquireTbMetaFromCache(SCatalog *pCtg, char *dbFName, char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb) {
SCtgDBCache *dbCache = NULL; SCtgDBCache *dbCache = NULL;
SCtgTbCache* pCache = NULL; SCtgTbCache *pCache = NULL;
ctgAcquireDBCache(pCtg, dbFName, &dbCache); ctgAcquireDBCache(pCtg, dbFName, &dbCache);
if (NULL == dbCache) { if (NULL == dbCache) {
ctgDebug("db %s not in cache", dbFName); ctgDebug("db %s not in cache", dbFName);
...@@ -272,16 +216,16 @@ _return: ...@@ -272,16 +216,16 @@ _return:
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgAcquireStbMetaFromCache(SCatalog* pCtg, char *dbFName, uint64_t suid, SCtgDBCache **pDb, SCtgTbCache** pTb) { int32_t ctgAcquireStbMetaFromCache(SCatalog *pCtg, char *dbFName, uint64_t suid, SCtgDBCache **pDb, SCtgTbCache **pTb) {
SCtgDBCache* dbCache = NULL; SCtgDBCache *dbCache = NULL;
SCtgTbCache* pCache = NULL; SCtgTbCache *pCache = NULL;
ctgAcquireDBCache(pCtg, dbFName, &dbCache); ctgAcquireDBCache(pCtg, dbFName, &dbCache);
if (NULL == dbCache) { if (NULL == dbCache) {
ctgDebug("db %s not in cache", dbFName); ctgDebug("db %s not in cache", dbFName);
goto _return; goto _return;
} }
char* stName = taosHashAcquire(dbCache->stbCache, &suid, sizeof(suid)); char *stName = taosHashAcquire(dbCache->stbCache, &suid, sizeof(suid));
if (NULL == stName) { if (NULL == stName) {
ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", suid, dbFName); ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", suid, dbFName);
goto _return; goto _return;
...@@ -321,10 +265,9 @@ _return: ...@@ -321,10 +265,9 @@ _return:
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgAcquireTbIndexFromCache(SCatalog *pCtg, char *dbFName, char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb) {
int32_t ctgAcquireTbIndexFromCache(SCatalog* pCtg, char *dbFName, char* tbName, SCtgDBCache **pDb, SCtgTbCache** pTb) {
SCtgDBCache *dbCache = NULL; SCtgDBCache *dbCache = NULL;
SCtgTbCache* pCache = NULL; SCtgTbCache *pCache = NULL;
ctgAcquireDBCache(pCtg, dbFName, &dbCache); ctgAcquireDBCache(pCtg, dbFName, &dbCache);
if (NULL == dbCache) { if (NULL == dbCache) {
ctgDebug("db %s not in cache", dbFName); ctgDebug("db %s not in cache", dbFName);
...@@ -362,8 +305,7 @@ _return: ...@@ -362,8 +305,7 @@ _return:
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgTbMetaExistInCache(SCatalog *pCtg, char *dbFName, char *tbName, int32_t *exist) {
int32_t ctgTbMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, int32_t *exist) {
SCtgDBCache *dbCache = NULL; SCtgDBCache *dbCache = NULL;
SCtgTbCache *tbCache = NULL; SCtgTbCache *tbCache = NULL;
ctgAcquireTbMetaFromCache(pCtg, dbFName, tbName, &dbCache, &tbCache); ctgAcquireTbMetaFromCache(pCtg, dbFName, tbName, &dbCache, &tbCache);
...@@ -380,7 +322,7 @@ int32_t ctgTbMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, int32 ...@@ -380,7 +322,7 @@ int32_t ctgTbMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, int32
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta) { int32_t ctgReadTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta **pTableMeta) {
int32_t code = 0; int32_t code = 0;
SCtgDBCache *dbCache = NULL; SCtgDBCache *dbCache = NULL;
SCtgTbCache *tbCache = NULL; SCtgTbCache *tbCache = NULL;
...@@ -399,7 +341,7 @@ int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta** ...@@ -399,7 +341,7 @@ int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta**
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
STableMeta* tbMeta = tbCache->pMeta; STableMeta *tbMeta = tbCache->pMeta;
ctx->tbInfo.inCache = true; ctx->tbInfo.inCache = true;
ctx->tbInfo.dbId = dbCache->dbId; ctx->tbInfo.dbId = dbCache->dbId;
ctx->tbInfo.suid = tbMeta->suid; ctx->tbInfo.suid = tbMeta->suid;
...@@ -431,8 +373,8 @@ int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta** ...@@ -431,8 +373,8 @@ int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta**
memcpy(*pTableMeta, tbMeta, metaSize); memcpy(*pTableMeta, tbMeta, metaSize);
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s", ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s", ctx->pName->tname,
ctx->pName->tname, ctx->tbInfo.tbType, dbFName); ctx->tbInfo.tbType, dbFName);
ctgAcquireStbMetaFromCache(pCtg, dbFName, ctx->tbInfo.suid, &dbCache, &tbCache); ctgAcquireStbMetaFromCache(pCtg, dbFName, ctx->tbInfo.suid, &dbCache, &tbCache);
if (NULL == tbCache) { if (NULL == tbCache) {
...@@ -442,10 +384,10 @@ int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta** ...@@ -442,10 +384,10 @@ int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta**
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
STableMeta* stbMeta = tbCache->pMeta; STableMeta *stbMeta = tbCache->pMeta;
if (stbMeta->suid != ctx->tbInfo.suid) { if (stbMeta->suid != ctx->tbInfo.suid) {
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid 0x%"PRIx64 , stbMeta->suid, ctx->tbInfo.suid); ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid 0x%" PRIx64, stbMeta->suid, ctx->tbInfo.suid);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
} }
...@@ -472,8 +414,8 @@ _return: ...@@ -472,8 +414,8 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgReadTbVerFromCache(SCatalog *pCtg, SName *pTableName, int32_t *sver, int32_t *tver, int32_t *tbType, uint64_t *suid, int32_t ctgReadTbVerFromCache(SCatalog *pCtg, SName *pTableName, int32_t *sver, int32_t *tver, int32_t *tbType,
char *stbName) { uint64_t *suid, char *stbName) {
*sver = -1; *sver = -1;
*tver = -1; *tver = -1;
...@@ -488,7 +430,7 @@ int32_t ctgReadTbVerFromCache(SCatalog *pCtg, SName *pTableName, int32_t *sver, ...@@ -488,7 +430,7 @@ int32_t ctgReadTbVerFromCache(SCatalog *pCtg, SName *pTableName, int32_t *sver,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
STableMeta* tbMeta = tbCache->pMeta; STableMeta *tbMeta = tbCache->pMeta;
*tbType = tbMeta->tableType; *tbType = tbMeta->tableType;
*suid = tbMeta->suid; *suid = tbMeta->suid;
...@@ -496,8 +438,8 @@ int32_t ctgReadTbVerFromCache(SCatalog *pCtg, SName *pTableName, int32_t *sver, ...@@ -496,8 +438,8 @@ int32_t ctgReadTbVerFromCache(SCatalog *pCtg, SName *pTableName, int32_t *sver,
*sver = tbMeta->sversion; *sver = tbMeta->sversion;
*tver = tbMeta->tversion; *tver = tbMeta->tversion;
ctgDebug("Got tb %s ver from cache, dbFName:%s, tbType:%d, sver:%d, tver:%d, suid:0x%" PRIx64, ctgDebug("Got tb %s ver from cache, dbFName:%s, tbType:%d, sver:%d, tver:%d, suid:0x%" PRIx64, pTableName->tname,
pTableName->tname, dbFName, *tbType, *sver, *tver, *suid); dbFName, *tbType, *sver, *tver, *suid);
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -515,10 +457,10 @@ int32_t ctgReadTbVerFromCache(SCatalog *pCtg, SName *pTableName, int32_t *sver, ...@@ -515,10 +457,10 @@ int32_t ctgReadTbVerFromCache(SCatalog *pCtg, SName *pTableName, int32_t *sver,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
STableMeta* stbMeta = tbCache->pMeta; STableMeta *stbMeta = tbCache->pMeta;
if (stbMeta->suid != *suid) { if (stbMeta->suid != *suid) {
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid:0x%" PRIx64 , stbMeta->suid, *suid); ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid:0x%" PRIx64, stbMeta->suid, *suid);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
} }
...@@ -533,13 +475,13 @@ int32_t ctgReadTbVerFromCache(SCatalog *pCtg, SName *pTableName, int32_t *sver, ...@@ -533,13 +475,13 @@ int32_t ctgReadTbVerFromCache(SCatalog *pCtg, SName *pTableName, int32_t *sver,
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
ctgDebug("Got tb %s sver %d tver %d from cache, type:%d, dbFName:%s", pTableName->tname, *sver, *tver, *tbType, dbFName); ctgDebug("Got tb %s sver %d tver %d from cache, type:%d, dbFName:%s", pTableName->tname, *sver, *tver, *tbType,
dbFName);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgReadTbTypeFromCache(SCatalog *pCtg, char *dbFName, char *tbName, int32_t *tbType) {
int32_t ctgReadTbTypeFromCache(SCatalog* pCtg, char* dbFName, char *tbName, int32_t *tbType) {
SCtgDBCache *dbCache = NULL; SCtgDBCache *dbCache = NULL;
SCtgTbCache *tbCache = NULL; SCtgTbCache *tbCache = NULL;
CTG_ERR_RET(ctgAcquireTbMetaFromCache(pCtg, dbFName, tbName, &dbCache, &tbCache)); CTG_ERR_RET(ctgAcquireTbMetaFromCache(pCtg, dbFName, tbName, &dbCache, &tbCache));
...@@ -556,7 +498,7 @@ int32_t ctgReadTbTypeFromCache(SCatalog* pCtg, char* dbFName, char *tbName, int3 ...@@ -556,7 +498,7 @@ int32_t ctgReadTbTypeFromCache(SCatalog* pCtg, char* dbFName, char *tbName, int3
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgReadTbIndexFromCache(SCatalog* pCtg, SName* pTableName, SArray** pRes) { int32_t ctgReadTbIndexFromCache(SCatalog *pCtg, SName *pTableName, SArray **pRes) {
int32_t code = 0; int32_t code = 0;
SCtgDBCache *dbCache = NULL; SCtgDBCache *dbCache = NULL;
SCtgTbCache *tbCache = NULL; SCtgTbCache *tbCache = NULL;
...@@ -580,7 +522,7 @@ _return: ...@@ -580,7 +522,7 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgChkAuthFromCache(SCatalog* pCtg, char* user, char* dbFName, AUTH_TYPE type, bool *inCache, bool *pass) { int32_t ctgChkAuthFromCache(SCatalog *pCtg, char *user, char *dbFName, AUTH_TYPE type, bool *inCache, bool *pass) {
char *p = strchr(dbFName, '.'); char *p = strchr(dbFName, '.');
if (p) { if (p) {
++p; ++p;
...@@ -651,8 +593,7 @@ void ctgDequeue(SCtgCacheOperation **op) { ...@@ -651,8 +593,7 @@ void ctgDequeue(SCtgCacheOperation **op) {
*op = node->op; *op = node->op;
} }
int32_t ctgEnqueue(SCatalog *pCtg, SCtgCacheOperation *operation) {
int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) {
SCtgQNode *node = taosMemoryCalloc(1, sizeof(SCtgQNode)); SCtgQNode *node = taosMemoryCalloc(1, sizeof(SCtgQNode));
if (NULL == node) { if (NULL == node) {
qError("calloc %d failed", (int32_t)sizeof(SCtgQNode)); qError("calloc %d failed", (int32_t)sizeof(SCtgQNode));
...@@ -660,7 +601,7 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) { ...@@ -660,7 +601,7 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) {
} }
bool syncOp = operation->syncOp; bool syncOp = operation->syncOp;
char* opName = gCtgCacheOperation[operation->opId].name; char *opName = gCtgCacheOperation[operation->opId].name;
if (operation->syncOp) { if (operation->syncOp) {
tsem_init(&operation->rspSem, 0, 0); tsem_init(&operation->rspSem, 0, 0);
} }
...@@ -699,8 +640,7 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) { ...@@ -699,8 +640,7 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgDropDbCacheEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId) {
int32_t ctgDropDbCacheEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId) {
int32_t code = 0; int32_t code = 0;
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_DROP_DB_CACHE; op->opId = CTG_OP_DROP_DB_CACHE;
...@@ -732,7 +672,7 @@ _return: ...@@ -732,7 +672,7 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgDropDbVgroupEnqueue(SCatalog* pCtg, const char *dbFName, bool syncOp) { int32_t ctgDropDbVgroupEnqueue(SCatalog *pCtg, const char *dbFName, bool syncOp) {
int32_t code = 0; int32_t code = 0;
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_DROP_DB_VGROUP; op->opId = CTG_OP_DROP_DB_VGROUP;
...@@ -764,9 +704,8 @@ _return: ...@@ -764,9 +704,8 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgDropStbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid,
bool syncOp) {
int32_t ctgDropStbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncOp) {
int32_t code = 0; int32_t code = 0;
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_DROP_STB_META; op->opId = CTG_OP_DROP_STB_META;
...@@ -796,9 +735,7 @@ _return: ...@@ -796,9 +735,7 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgDropTbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncOp) {
int32_t ctgDropTbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncOp) {
int32_t code = 0; int32_t code = 0;
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_DROP_TB_META; op->opId = CTG_OP_DROP_TB_META;
...@@ -827,7 +764,7 @@ _return: ...@@ -827,7 +764,7 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgUpdateVgroupEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, SDBVgInfo* dbInfo, bool syncOp) { int32_t ctgUpdateVgroupEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, SDBVgInfo *dbInfo, bool syncOp) {
int32_t code = 0; int32_t code = 0;
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_UPDATE_VGROUP; op->opId = CTG_OP_UPDATE_VGROUP;
...@@ -864,7 +801,7 @@ _return: ...@@ -864,7 +801,7 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool syncOp) { int32_t ctgUpdateTbMetaEnqueue(SCatalog *pCtg, STableMetaOutput *output, bool syncOp) {
int32_t code = 0; int32_t code = 0;
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_UPDATE_TB_META; op->opId = CTG_OP_UPDATE_TB_META;
...@@ -902,7 +839,7 @@ _return: ...@@ -902,7 +839,7 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char *dbFName, int32_t vgId, SEpSet* pEpSet) { int32_t ctgUpdateVgEpsetEnqueue(SCatalog *pCtg, char *dbFName, int32_t vgId, SEpSet *pEpSet) {
int32_t code = 0; int32_t code = 0;
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_UPDATE_VG_EPSET; op->opId = CTG_OP_UPDATE_VG_EPSET;
...@@ -931,9 +868,7 @@ _return: ...@@ -931,9 +868,7 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgUpdateUserEnqueue(SCatalog *pCtg, SGetUserAuthRsp *pAuth, bool syncOp) {
int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncOp) {
int32_t code = 0; int32_t code = 0;
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_UPDATE_USER; op->opId = CTG_OP_UPDATE_USER;
...@@ -962,7 +897,7 @@ _return: ...@@ -962,7 +897,7 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgUpdateTbIndexEnqueue(SCatalog* pCtg, STableIndex **pIndex, bool syncOp) { int32_t ctgUpdateTbIndexEnqueue(SCatalog *pCtg, STableIndex **pIndex, bool syncOp) {
int32_t code = 0; int32_t code = 0;
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_UPDATE_TB_INDEX; op->opId = CTG_OP_UPDATE_TB_INDEX;
...@@ -993,7 +928,7 @@ _return: ...@@ -993,7 +928,7 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgDropTbIndexEnqueue(SCatalog* pCtg, SName* pName, bool syncOp) { int32_t ctgDropTbIndexEnqueue(SCatalog *pCtg, SName *pName, bool syncOp) {
int32_t code = 0; int32_t code = 0;
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_DROP_TB_INDEX; op->opId = CTG_OP_DROP_TB_INDEX;
...@@ -1022,8 +957,7 @@ _return: ...@@ -1022,8 +957,7 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgClearCacheEnqueue(SCatalog *pCtg, bool freeCtg, bool stopQueue, bool syncOp) {
int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool freeCtg, bool stopQueue, bool syncOp) {
int32_t code = 0; int32_t code = 0;
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_CLEAR_CACHE; op->opId = CTG_OP_CLEAR_CACHE;
...@@ -1052,7 +986,6 @@ _return: ...@@ -1052,7 +986,6 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) { int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) {
mgmt->slotRIdx = 0; mgmt->slotRIdx = 0;
mgmt->slotNum = rentSec / CTG_RENT_SLOT_SECOND; mgmt->slotNum = rentSec / CTG_RENT_SLOT_SECOND;
...@@ -1071,7 +1004,6 @@ int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) { ...@@ -1071,7 +1004,6 @@ int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size) { int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size) {
int16_t widx = abs((int)(id % mgmt->slotNum)); int16_t widx = abs((int)(id % mgmt->slotNum));
...@@ -1082,19 +1014,20 @@ int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size) ...@@ -1082,19 +1014,20 @@ int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size)
if (NULL == slot->meta) { if (NULL == slot->meta) {
slot->meta = taosArrayInit(CTG_DEFAULT_RENT_SLOT_SIZE, size); slot->meta = taosArrayInit(CTG_DEFAULT_RENT_SLOT_SIZE, size);
if (NULL == slot->meta) { if (NULL == slot->meta) {
qError("taosArrayInit %d failed, id:0x%"PRIx64", slot idx:%d, type:%d", CTG_DEFAULT_RENT_SLOT_SIZE, id, widx, mgmt->type); qError("taosArrayInit %d failed, id:0x%" PRIx64 ", slot idx:%d, type:%d", CTG_DEFAULT_RENT_SLOT_SIZE, id, widx,
mgmt->type);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
} }
} }
if (NULL == taosArrayPush(slot->meta, meta)) { if (NULL == taosArrayPush(slot->meta, meta)) {
qError("taosArrayPush meta to rent failed, id:0x%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); qError("taosArrayPush meta to rent failed, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
} }
slot->needSort = true; slot->needSort = true;
qDebug("add meta to rent, id:0x%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); qDebug("add meta to rent, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type);
_return: _return:
...@@ -1102,7 +1035,8 @@ _return: ...@@ -1102,7 +1035,8 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t sortCompare, __compar_fn_t searchCompare) { int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t sortCompare,
__compar_fn_t searchCompare) {
int16_t widx = abs((int)(id % mgmt->slotNum)); int16_t widx = abs((int)(id % mgmt->slotNum));
SCtgRentSlot *slot = &mgmt->slots[widx]; SCtgRentSlot *slot = &mgmt->slots[widx];
...@@ -1110,12 +1044,13 @@ int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t si ...@@ -1110,12 +1044,13 @@ int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t si
CTG_LOCK(CTG_WRITE, &slot->lock); CTG_LOCK(CTG_WRITE, &slot->lock);
if (NULL == slot->meta) { if (NULL == slot->meta) {
qDebug("empty meta slot, id:0x%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); qDebug("empty meta slot, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
} }
if (slot->needSort) { if (slot->needSort) {
qDebug("meta slot before sorte, slot idx:%d, type:%d, size:%d", widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta)); qDebug("meta slot before sorte, slot idx:%d, type:%d, size:%d", widx, mgmt->type,
(int32_t)taosArrayGetSize(slot->meta));
taosArraySort(slot->meta, sortCompare); taosArraySort(slot->meta, sortCompare);
slot->needSort = false; slot->needSort = false;
qDebug("meta slot sorted, slot idx:%d, type:%d, size:%d", widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta)); qDebug("meta slot sorted, slot idx:%d, type:%d, size:%d", widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta));
...@@ -1123,20 +1058,22 @@ int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t si ...@@ -1123,20 +1058,22 @@ int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t si
void *orig = taosArraySearch(slot->meta, &id, searchCompare, TD_EQ); void *orig = taosArraySearch(slot->meta, &id, searchCompare, TD_EQ);
if (NULL == orig) { if (NULL == orig) {
qDebug("meta not found in slot, id:0x%"PRIx64", slot idx:%d, type:%d, size:%d", id, widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta)); qDebug("meta not found in slot, id:0x%" PRIx64 ", slot idx:%d, type:%d, size:%d", id, widx, mgmt->type,
(int32_t)taosArrayGetSize(slot->meta));
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
} }
memcpy(orig, meta, size); memcpy(orig, meta, size);
qDebug("meta in rent updated, id:0x%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); qDebug("meta in rent updated, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type);
_return: _return:
CTG_UNLOCK(CTG_WRITE, &slot->lock); CTG_UNLOCK(CTG_WRITE, &slot->lock);
if (code) { if (code) {
qDebug("meta in rent update failed, will try to add it, code:%x, id:0x%"PRIx64", slot idx:%d, type:%d", code, id, widx, mgmt->type); qDebug("meta in rent update failed, will try to add it, code:%x, id:0x%" PRIx64 ", slot idx:%d, type:%d", code, id,
widx, mgmt->type);
CTG_RET(ctgMetaRentAdd(mgmt, meta, id, size)); CTG_RET(ctgMetaRentAdd(mgmt, meta, id, size));
} }
...@@ -1151,7 +1088,7 @@ int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t sortComp ...@@ -1151,7 +1088,7 @@ int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t sortComp
CTG_LOCK(CTG_WRITE, &slot->lock); CTG_LOCK(CTG_WRITE, &slot->lock);
if (NULL == slot->meta) { if (NULL == slot->meta) {
qError("empty meta slot, id:0x%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); qError("empty meta slot, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
} }
...@@ -1163,13 +1100,13 @@ int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t sortComp ...@@ -1163,13 +1100,13 @@ int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t sortComp
int32_t idx = taosArraySearchIdx(slot->meta, &id, searchCompare, TD_EQ); int32_t idx = taosArraySearchIdx(slot->meta, &id, searchCompare, TD_EQ);
if (idx < 0) { if (idx < 0) {
qError("meta not found in slot, id:0x%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); qError("meta not found in slot, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
} }
taosArrayRemove(slot->meta, idx); taosArrayRemove(slot->meta, idx);
qDebug("meta in rent removed, id:0x%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); qDebug("meta in rent removed, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type);
_return: _return:
...@@ -1178,7 +1115,6 @@ _return: ...@@ -1178,7 +1115,6 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgMetaRentGetImpl(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) { int32_t ctgMetaRentGetImpl(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) {
int16_t ridx = atomic_add_fetch_16(&mgmt->slotRIdx, 1); int16_t ridx = atomic_add_fetch_16(&mgmt->slotRIdx, 1);
if (ridx >= mgmt->slotNum) { if (ridx >= mgmt->slotNum) {
...@@ -1254,13 +1190,15 @@ int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) { ...@@ -1254,13 +1190,15 @@ int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) {
SCtgDBCache newDBCache = {0}; SCtgDBCache newDBCache = {0};
newDBCache.dbId = dbId; newDBCache.dbId = dbId;
newDBCache.tbCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); newDBCache.tbCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY),
true, HASH_ENTRY_LOCK);
if (NULL == newDBCache.tbCache) { if (NULL == newDBCache.tbCache) {
ctgError("taosHashInit %d metaCache failed", gCtgMgmt.cfg.maxTblCacheNum); ctgError("taosHashInit %d metaCache failed", gCtgMgmt.cfg.maxTblCacheNum);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
newDBCache.stbCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK); newDBCache.stbCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT),
true, HASH_ENTRY_LOCK);
if (NULL == newDBCache.stbCache) { if (NULL == newDBCache.stbCache) {
ctgError("taosHashInit %d stbCache failed", gCtgMgmt.cfg.maxTblCacheNum); ctgError("taosHashInit %d stbCache failed", gCtgMgmt.cfg.maxTblCacheNum);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
...@@ -1282,11 +1220,11 @@ int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) { ...@@ -1282,11 +1220,11 @@ int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) {
SDbVgVersion vgVersion = {.dbId = newDBCache.dbId, .vgVersion = -1}; SDbVgVersion vgVersion = {.dbId = newDBCache.dbId, .vgVersion = -1};
strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName)); strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
ctgDebug("db added to cache, dbFName:%s, dbId:0x%"PRIx64, dbFName, dbId); ctgDebug("db added to cache, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbId);
CTG_ERR_RET(ctgMetaRentAdd(&pCtg->dbRent, &vgVersion, dbId, sizeof(SDbVgVersion))); CTG_ERR_RET(ctgMetaRentAdd(&pCtg->dbRent, &vgVersion, dbId, sizeof(SDbVgVersion)));
ctgDebug("db added to rent, dbFName:%s, vgVersion:%d, dbId:0x%"PRIx64, dbFName, vgVersion.vgVersion, dbId); ctgDebug("db added to rent, dbFName:%s, vgVersion:%d, dbId:0x%" PRIx64, dbFName, vgVersion.vgVersion, dbId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1297,8 +1235,7 @@ _return: ...@@ -1297,8 +1235,7 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
void ctgRemoveStbRent(SCatalog *pCtg, SCtgDBCache *dbCache) {
void ctgRemoveStbRent(SCatalog* pCtg, SCtgDBCache *dbCache) {
if (NULL == dbCache->stbCache) { if (NULL == dbCache->stbCache) {
return; return;
} }
...@@ -1308,19 +1245,19 @@ void ctgRemoveStbRent(SCatalog* pCtg, SCtgDBCache *dbCache) { ...@@ -1308,19 +1245,19 @@ void ctgRemoveStbRent(SCatalog* pCtg, SCtgDBCache *dbCache) {
uint64_t *suid = NULL; uint64_t *suid = NULL;
suid = taosHashGetKey(pIter, NULL); suid = taosHashGetKey(pIter, NULL);
if (TSDB_CODE_SUCCESS == ctgMetaRentRemove(&pCtg->stbRent, *suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare)) { if (TSDB_CODE_SUCCESS ==
ctgDebug("stb removed from rent, suid:0x%"PRIx64, *suid); ctgMetaRentRemove(&pCtg->stbRent, *suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare)) {
ctgDebug("stb removed from rent, suid:0x%" PRIx64, *suid);
} }
pIter = taosHashIterate(dbCache->stbCache, pIter); pIter = taosHashIterate(dbCache->stbCache, pIter);
} }
} }
int32_t ctgRemoveDBFromCache(SCatalog *pCtg, SCtgDBCache *dbCache, const char *dbFName) {
int32_t ctgRemoveDBFromCache(SCatalog* pCtg, SCtgDBCache *dbCache, const char* dbFName) {
uint64_t dbId = dbCache->dbId; uint64_t dbId = dbCache->dbId;
ctgInfo("start to remove db from cache, dbFName:%s, dbId:0x%"PRIx64, dbFName, dbCache->dbId); ctgInfo("start to remove db from cache, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbCache->dbId);
CTG_LOCK(CTG_WRITE, &dbCache->dbLock); CTG_LOCK(CTG_WRITE, &dbCache->dbLock);
...@@ -1331,7 +1268,7 @@ int32_t ctgRemoveDBFromCache(SCatalog* pCtg, SCtgDBCache *dbCache, const char* d ...@@ -1331,7 +1268,7 @@ int32_t ctgRemoveDBFromCache(SCatalog* pCtg, SCtgDBCache *dbCache, const char* d
CTG_UNLOCK(CTG_WRITE, &dbCache->dbLock); CTG_UNLOCK(CTG_WRITE, &dbCache->dbLock);
CTG_ERR_RET(ctgMetaRentRemove(&pCtg->dbRent, dbId, ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare)); CTG_ERR_RET(ctgMetaRentRemove(&pCtg->dbRent, dbId, ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare));
ctgDebug("db removed from rent, dbFName:%s, dbId:0x%"PRIx64, dbFName, dbId); ctgDebug("db removed from rent, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbId);
if (taosHashRemove(pCtg->dbCache, dbFName, strlen(dbFName))) { if (taosHashRemove(pCtg->dbCache, dbFName, strlen(dbFName))) {
ctgInfo("taosHashRemove from dbCache failed, may be removed, dbFName:%s", dbFName); ctgInfo("taosHashRemove from dbCache failed, may be removed, dbFName:%s", dbFName);
...@@ -1339,13 +1276,12 @@ int32_t ctgRemoveDBFromCache(SCatalog* pCtg, SCtgDBCache *dbCache, const char* d ...@@ -1339,13 +1276,12 @@ int32_t ctgRemoveDBFromCache(SCatalog* pCtg, SCtgDBCache *dbCache, const char* d
} }
CTG_CACHE_STAT_DEC(numOfDb, 1); CTG_CACHE_STAT_DEC(numOfDb, 1);
ctgInfo("db removed from cache, dbFName:%s, dbId:0x%"PRIx64, dbFName, dbId); ctgInfo("db removed from cache, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgGetAddDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId, SCtgDBCache **pCache) {
int32_t ctgGetAddDBCache(SCatalog* pCtg, const char *dbFName, uint64_t dbId, SCtgDBCache **pCache) {
int32_t code = 0; int32_t code = 0;
SCtgDBCache *dbCache = NULL; SCtgDBCache *dbCache = NULL;
ctgGetDBCache(pCtg, dbFName, &dbCache); ctgGetDBCache(pCtg, dbFName, &dbCache);
...@@ -1386,7 +1322,8 @@ int32_t ctgGetAddDBCache(SCatalog* pCtg, const char *dbFName, uint64_t dbId, SCt ...@@ -1386,7 +1322,8 @@ int32_t ctgGetAddDBCache(SCatalog* pCtg, const char *dbFName, uint64_t dbId, SCt
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgUpdateRentStbVersion(SCatalog *pCtg, char* dbFName, char* tbName, uint64_t dbId, uint64_t suid, SCtgTbCache* pCache) { int32_t ctgUpdateRentStbVersion(SCatalog *pCtg, char *dbFName, char *tbName, uint64_t dbId, uint64_t suid,
SCtgTbCache *pCache) {
SSTableVersion metaRent = {.dbId = dbId, .suid = suid}; SSTableVersion metaRent = {.dbId = dbId, .suid = suid};
if (pCache->pMeta) { if (pCache->pMeta) {
metaRent.sversion = pCache->pMeta->sversion; metaRent.sversion = pCache->pMeta->sversion;
...@@ -1400,24 +1337,25 @@ int32_t ctgUpdateRentStbVersion(SCatalog *pCtg, char* dbFName, char* tbName, uin ...@@ -1400,24 +1337,25 @@ int32_t ctgUpdateRentStbVersion(SCatalog *pCtg, char* dbFName, char* tbName, uin
strcpy(metaRent.dbFName, dbFName); strcpy(metaRent.dbFName, dbFName);
strcpy(metaRent.stbName, tbName); strcpy(metaRent.stbName, tbName);
CTG_ERR_RET(ctgMetaRentUpdate(&pCtg->stbRent, &metaRent, metaRent.suid, sizeof(SSTableVersion), ctgStbVersionSortCompare, ctgStbVersionSearchCompare)); CTG_ERR_RET(ctgMetaRentUpdate(&pCtg->stbRent, &metaRent, metaRent.suid, sizeof(SSTableVersion),
ctgStbVersionSortCompare, ctgStbVersionSearchCompare));
ctgDebug("db %s,0x%" PRIx64 " stb %s,0x%" PRIx64 " sver %d tver %d smaVer %d updated to stbRent", ctgDebug("db %s,0x%" PRIx64 " stb %s,0x%" PRIx64 " sver %d tver %d smaVer %d updated to stbRent", dbFName, dbId,
dbFName, dbId, tbName, suid, metaRent.sversion, metaRent.tversion, metaRent.smaVer); tbName, suid, metaRent.sversion, metaRent.tversion, metaRent.smaVer);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, uint64_t dbId, char *tbName,
int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, uint64_t dbId, char *tbName, STableMeta *meta, int32_t metaSize) { STableMeta *meta, int32_t metaSize) {
if (NULL == dbCache->tbCache || NULL == dbCache->stbCache) { if (NULL == dbCache->tbCache || NULL == dbCache->stbCache) {
taosMemoryFree(meta); taosMemoryFree(meta);
ctgError("db is dropping, dbId:0x%"PRIx64, dbCache->dbId); ctgError("db is dropping, dbId:0x%" PRIx64, dbCache->dbId);
CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED); CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
} }
bool isStb = meta->tableType == TSDB_SUPER_TABLE; bool isStb = meta->tableType == TSDB_SUPER_TABLE;
SCtgTbCache* pCache = taosHashGet(dbCache->tbCache, tbName, strlen(tbName)); SCtgTbCache *pCache = taosHashGet(dbCache->tbCache, tbName, strlen(tbName));
STableMeta *orig = (pCache ? pCache->pMeta : NULL); STableMeta *orig = (pCache ? pCache->pMeta : NULL);
int8_t origType = 0; int8_t origType = 0;
uint64_t origSuid = 0; uint64_t origSuid = 0;
...@@ -1425,7 +1363,8 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam ...@@ -1425,7 +1363,8 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
if (orig) { if (orig) {
origType = orig->tableType; origType = orig->tableType;
if (origType == meta->tableType && orig->uid == meta->uid && (origType == TSDB_CHILD_TABLE || (orig->sversion >= meta->sversion && orig->tversion >= meta->tversion))) { if (origType == meta->tableType && orig->uid == meta->uid &&
(origType == TSDB_CHILD_TABLE || (orig->sversion >= meta->sversion && orig->tversion >= meta->tversion))) {
taosMemoryFree(meta); taosMemoryFree(meta);
ctgDebug("ignore table %s meta update", tbName); ctgDebug("ignore table %s meta update", tbName);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1433,10 +1372,10 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam ...@@ -1433,10 +1372,10 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
if (origType == TSDB_SUPER_TABLE) { if (origType == TSDB_SUPER_TABLE) {
if (taosHashRemove(dbCache->stbCache, &orig->suid, sizeof(orig->suid))) { if (taosHashRemove(dbCache->stbCache, &orig->suid, sizeof(orig->suid))) {
ctgError("stb not exist in stbCache, dbFName:%s, stb:%s, suid:0x%"PRIx64, dbFName, tbName, orig->suid); ctgError("stb not exist in stbCache, dbFName:%s, stb:%s, suid:0x%" PRIx64, dbFName, tbName, orig->suid);
} else { } else {
CTG_CACHE_STAT_DEC(numOfStb, 1); CTG_CACHE_STAT_DEC(numOfStb, 1);
ctgDebug("stb removed from stbCache, dbFName:%s, stb:%s, suid:0x%"PRIx64, dbFName, tbName, orig->suid); ctgDebug("stb removed from stbCache, dbFName:%s, stb:%s, suid:0x%" PRIx64, dbFName, tbName, orig->suid);
} }
origSuid = orig->suid; origSuid = orig->suid;
...@@ -1469,31 +1408,33 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam ...@@ -1469,31 +1408,33 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (origSuid != meta->suid && taosHashPut(dbCache->stbCache, &meta->suid, sizeof(meta->suid), tbName, strlen(tbName) + 1) != 0) { if (origSuid != meta->suid &&
ctgError("taosHashPut to stable cache failed, suid:0x%"PRIx64, meta->suid); taosHashPut(dbCache->stbCache, &meta->suid, sizeof(meta->suid), tbName, strlen(tbName) + 1) != 0) {
ctgError("taosHashPut to stable cache failed, suid:0x%" PRIx64, meta->suid);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
CTG_CACHE_STAT_INC(numOfStb, 1); CTG_CACHE_STAT_INC(numOfStb, 1);
ctgDebug("stb 0x%" PRIx64 " updated to cache, dbFName:%s, tbName:%s, tbType:%d", meta->suid, dbFName, tbName, meta->tableType); ctgDebug("stb 0x%" PRIx64 " updated to cache, dbFName:%s, tbName:%s, tbType:%d", meta->suid, dbFName, tbName,
meta->tableType);
CTG_ERR_RET(ctgUpdateRentStbVersion(pCtg, dbFName, tbName, dbId, meta->suid, pCache)); CTG_ERR_RET(ctgUpdateRentStbVersion(pCtg, dbFName, tbName, dbId, meta->suid, pCache));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char* dbFName, char *tbName, STableIndex **index) { int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, char *tbName, STableIndex **index) {
if (NULL == dbCache->tbCache) { if (NULL == dbCache->tbCache) {
ctgFreeSTableIndex(*index); ctgFreeSTableIndex(*index);
taosMemoryFreeClear(*index); taosMemoryFreeClear(*index);
ctgError("db is dropping, dbId:0x%"PRIx64, dbCache->dbId); ctgError("db is dropping, dbId:0x%" PRIx64, dbCache->dbId);
CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED); CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
} }
STableIndex* pIndex = *index; STableIndex *pIndex = *index;
uint64_t suid = pIndex->suid; uint64_t suid = pIndex->suid;
SCtgTbCache* pCache = taosHashGet(dbCache->tbCache, tbName, strlen(tbName)); SCtgTbCache *pCache = taosHashGet(dbCache->tbCache, tbName, strlen(tbName));
if (NULL == pCache) { if (NULL == pCache) {
SCtgTbCache cache = {0}; SCtgTbCache cache = {0};
cache.pIndex = pIndex; cache.pIndex = pIndex;
...@@ -1506,7 +1447,8 @@ int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char* dbFNa ...@@ -1506,7 +1447,8 @@ int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char* dbFNa
} }
*index = NULL; *index = NULL;
ctgDebug("table %s index updated to cache, ver:%d, num:%d", tbName, pIndex->version, (int32_t)taosArrayGetSize(pIndex->pIndex)); ctgDebug("table %s index updated to cache, ver:%d, num:%d", tbName, pIndex->version,
(int32_t)taosArrayGetSize(pIndex->pIndex));
if (suid) { if (suid) {
CTG_ERR_RET(ctgUpdateRentStbVersion(pCtg, dbFName, tbName, dbCache->dbId, pIndex->suid, &cache)); CTG_ERR_RET(ctgUpdateRentStbVersion(pCtg, dbFName, tbName, dbCache->dbId, pIndex->suid, &cache));
...@@ -1526,7 +1468,8 @@ int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char* dbFNa ...@@ -1526,7 +1468,8 @@ int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char* dbFNa
pCache->pIndex = pIndex; pCache->pIndex = pIndex;
*index = NULL; *index = NULL;
ctgDebug("table %s index updated to cache, ver:%d, num:%d", tbName, pIndex->version, (int32_t)taosArrayGetSize(pIndex->pIndex)); ctgDebug("table %s index updated to cache, ver:%d, num:%d", tbName, pIndex->version,
(int32_t)taosArrayGetSize(pIndex->pIndex));
if (suid) { if (suid) {
CTG_ERR_RET(ctgUpdateRentStbVersion(pCtg, dbFName, tbName, dbCache->dbId, suid, pCache)); CTG_ERR_RET(ctgUpdateRentStbVersion(pCtg, dbFName, tbName, dbCache->dbId, suid, pCache));
...@@ -1535,8 +1478,8 @@ int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char* dbFNa ...@@ -1535,8 +1478,8 @@ int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char* dbFNa
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgUpdateTbMetaToCache(SCatalog* pCtg, STableMetaOutput* pOut, bool syncReq) { int32_t ctgUpdateTbMetaToCache(SCatalog *pCtg, STableMetaOutput *pOut, bool syncReq) {
STableMetaOutput* pOutput = NULL; STableMetaOutput *pOutput = NULL;
int32_t code = 0; int32_t code = 0;
CTG_ERR_RET(ctgCloneMetaOutput(pOut, &pOutput)); CTG_ERR_RET(ctgCloneMetaOutput(pOut, &pOutput));
...@@ -1551,11 +1494,11 @@ _return: ...@@ -1551,11 +1494,11 @@ _return:
} }
void ctgClearAllInstance(void) { void ctgClearAllInstance(void) {
SCatalog* pCtg = NULL; SCatalog *pCtg = NULL;
void* pIter = taosHashIterate(gCtgMgmt.pCluster, NULL); void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
while (pIter) { while (pIter) {
pCtg = *(SCatalog**)pIter; pCtg = *(SCatalog **)pIter;
if (pCtg) { if (pCtg) {
ctgClearHandle(pCtg); ctgClearHandle(pCtg);
...@@ -1566,11 +1509,11 @@ void ctgClearAllInstance(void) { ...@@ -1566,11 +1509,11 @@ void ctgClearAllInstance(void) {
} }
void ctgFreeAllInstance(void) { void ctgFreeAllInstance(void) {
SCatalog* pCtg = NULL; SCatalog *pCtg = NULL;
void* pIter = taosHashIterate(gCtgMgmt.pCluster, NULL); void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
while (pIter) { while (pIter) {
pCtg = *(SCatalog**)pIter; pCtg = *(SCatalog **)pIter;
if (pCtg) { if (pCtg) {
ctgFreeHandle(pCtg); ctgFreeHandle(pCtg);
...@@ -1582,21 +1525,20 @@ void ctgFreeAllInstance(void) { ...@@ -1582,21 +1525,20 @@ void ctgFreeAllInstance(void) {
taosHashClear(gCtgMgmt.pCluster); taosHashClear(gCtgMgmt.pCluster);
} }
int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) { int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
int32_t code = 0; int32_t code = 0;
SCtgUpdateVgMsg *msg = operation->data; SCtgUpdateVgMsg *msg = operation->data;
SDBVgInfo* dbInfo = msg->dbInfo; SDBVgInfo *dbInfo = msg->dbInfo;
char* dbFName = msg->dbFName; char *dbFName = msg->dbFName;
SCatalog* pCtg = msg->pCtg; SCatalog *pCtg = msg->pCtg;
if (NULL == dbInfo->vgHash) { if (NULL == dbInfo->vgHash) {
goto _return; goto _return;
} }
if (dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgHash) <= 0) { if (dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgHash) <= 0) {
ctgError("invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d, vgHashSize:%d", ctgError("invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d, vgHashSize:%d", dbFName, dbInfo->vgHash,
dbFName, dbInfo->vgHash, dbInfo->vgVersion, taosHashGetSize(dbInfo->vgHash)); dbInfo->vgVersion, taosHashGetSize(dbInfo->vgHash));
CTG_ERR_JRET(TSDB_CODE_APP_ERROR); CTG_ERR_JRET(TSDB_CODE_APP_ERROR);
} }
...@@ -1606,7 +1548,7 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) { ...@@ -1606,7 +1548,7 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
SCtgDBCache *dbCache = NULL; SCtgDBCache *dbCache = NULL;
CTG_ERR_JRET(ctgGetAddDBCache(msg->pCtg, dbFName, msg->dbId, &dbCache)); CTG_ERR_JRET(ctgGetAddDBCache(msg->pCtg, dbFName, msg->dbId, &dbCache));
if (NULL == dbCache) { if (NULL == dbCache) {
ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:0x%"PRIx64, dbFName, msg->dbId); ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:0x%" PRIx64, dbFName, msg->dbId);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
} }
...@@ -1624,7 +1566,8 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) { ...@@ -1624,7 +1566,8 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
} }
if (dbInfo->vgVersion == vgInfo->vgVersion && dbInfo->numOfTable == vgInfo->numOfTable) { if (dbInfo->vgVersion == vgInfo->vgVersion && dbInfo->numOfTable == vgInfo->numOfTable) {
ctgDebug("no new db vgVer or numOfTable, dbFName:%s, vgVer:%d, numOfTable:%d", dbFName, dbInfo->vgVersion, dbInfo->numOfTable); ctgDebug("no new db vgVer or numOfTable, dbFName:%s, vgVer:%d, numOfTable:%d", dbFName, dbInfo->vgVersion,
dbInfo->numOfTable);
ctgWUnlockVgInfo(dbCache); ctgWUnlockVgInfo(dbCache);
goto _return; goto _return;
...@@ -1636,14 +1579,15 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) { ...@@ -1636,14 +1579,15 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
vgCache->vgInfo = dbInfo; vgCache->vgInfo = dbInfo;
msg->dbInfo = NULL; msg->dbInfo = NULL;
ctgDebug("db vgInfo updated, dbFName:%s, vgVer:%d, dbId:0x%"PRIx64, dbFName, vgVersion.vgVersion, vgVersion.dbId); ctgDebug("db vgInfo updated, dbFName:%s, vgVer:%d, dbId:0x%" PRIx64, dbFName, vgVersion.vgVersion, vgVersion.dbId);
ctgWUnlockVgInfo(dbCache); ctgWUnlockVgInfo(dbCache);
dbCache = NULL; dbCache = NULL;
strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName)); strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
CTG_ERR_RET(ctgMetaRentUpdate(&msg->pCtg->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion), ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare)); CTG_ERR_JRET(ctgMetaRentUpdate(&msg->pCtg->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion),
ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare));
_return: _return:
...@@ -1656,7 +1600,7 @@ _return: ...@@ -1656,7 +1600,7 @@ _return:
int32_t ctgOpDropDbCache(SCtgCacheOperation *operation) { int32_t ctgOpDropDbCache(SCtgCacheOperation *operation) {
int32_t code = 0; int32_t code = 0;
SCtgDropDBMsg *msg = operation->data; SCtgDropDBMsg *msg = operation->data;
SCatalog* pCtg = msg->pCtg; SCatalog *pCtg = msg->pCtg;
SCtgDBCache *dbCache = NULL; SCtgDBCache *dbCache = NULL;
ctgGetDBCache(msg->pCtg, msg->dbFName, &dbCache); ctgGetDBCache(msg->pCtg, msg->dbFName, &dbCache);
...@@ -1665,7 +1609,8 @@ int32_t ctgOpDropDbCache(SCtgCacheOperation *operation) { ...@@ -1665,7 +1609,8 @@ int32_t ctgOpDropDbCache(SCtgCacheOperation *operation) {
} }
if (dbCache->dbId != msg->dbId) { if (dbCache->dbId != msg->dbId) {
ctgInfo("dbId already updated, dbFName:%s, dbId:0x%"PRIx64 ", targetId:0x%"PRIx64, msg->dbFName, dbCache->dbId, msg->dbId); ctgInfo("dbId already updated, dbFName:%s, dbId:0x%" PRIx64 ", targetId:0x%" PRIx64, msg->dbFName, dbCache->dbId,
msg->dbId);
goto _return; goto _return;
} }
...@@ -1681,7 +1626,7 @@ _return: ...@@ -1681,7 +1626,7 @@ _return:
int32_t ctgOpDropDbVgroup(SCtgCacheOperation *operation) { int32_t ctgOpDropDbVgroup(SCtgCacheOperation *operation) {
int32_t code = 0; int32_t code = 0;
SCtgDropDbVgroupMsg *msg = operation->data; SCtgDropDbVgroupMsg *msg = operation->data;
SCatalog* pCtg = msg->pCtg; SCatalog *pCtg = msg->pCtg;
SCtgDBCache *dbCache = NULL; SCtgDBCache *dbCache = NULL;
ctgGetDBCache(msg->pCtg, msg->dbFName, &dbCache); ctgGetDBCache(msg->pCtg, msg->dbFName, &dbCache);
...@@ -1689,7 +1634,7 @@ int32_t ctgOpDropDbVgroup(SCtgCacheOperation *operation) { ...@@ -1689,7 +1634,7 @@ int32_t ctgOpDropDbVgroup(SCtgCacheOperation *operation) {
goto _return; goto _return;
} }
CTG_ERR_RET(ctgWLockVgInfo(pCtg, dbCache)); CTG_ERR_JRET(ctgWLockVgInfo(pCtg, dbCache));
ctgFreeVgInfo(dbCache->vgCache.vgInfo); ctgFreeVgInfo(dbCache->vgCache.vgInfo);
dbCache->vgCache.vgInfo = NULL; dbCache->vgCache.vgInfo = NULL;
...@@ -1705,12 +1650,11 @@ _return: ...@@ -1705,12 +1650,11 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *operation) { int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *operation) {
int32_t code = 0; int32_t code = 0;
SCtgUpdateTbMetaMsg *msg = operation->data; SCtgUpdateTbMetaMsg *msg = operation->data;
SCatalog* pCtg = msg->pCtg; SCatalog *pCtg = msg->pCtg;
STableMetaOutput* pMeta = msg->pMeta; STableMetaOutput *pMeta = msg->pMeta;
SCtgDBCache *dbCache = NULL; SCtgDBCache *dbCache = NULL;
if ((!CTG_IS_META_CTABLE(pMeta->metaType)) && NULL == pMeta->tbMeta) { if ((!CTG_IS_META_CTABLE(pMeta->metaType)) && NULL == pMeta->tbMeta) {
...@@ -1731,17 +1675,19 @@ int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *operation) { ...@@ -1731,17 +1675,19 @@ int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *operation) {
if (CTG_IS_META_TABLE(pMeta->metaType) || CTG_IS_META_BOTH(pMeta->metaType)) { if (CTG_IS_META_TABLE(pMeta->metaType) || CTG_IS_META_BOTH(pMeta->metaType)) {
int32_t metaSize = CTG_META_SIZE(pMeta->tbMeta); int32_t metaSize = CTG_META_SIZE(pMeta->tbMeta);
CTG_ERR_JRET(ctgWriteTbMetaToCache(pCtg, dbCache, pMeta->dbFName, pMeta->dbId, pMeta->tbName, pMeta->tbMeta, metaSize)); CTG_ERR_JRET(
ctgWriteTbMetaToCache(pCtg, dbCache, pMeta->dbFName, pMeta->dbId, pMeta->tbName, pMeta->tbMeta, metaSize));
pMeta->tbMeta = NULL; pMeta->tbMeta = NULL;
} }
if (CTG_IS_META_CTABLE(pMeta->metaType) || CTG_IS_META_BOTH(pMeta->metaType)) { if (CTG_IS_META_CTABLE(pMeta->metaType) || CTG_IS_META_BOTH(pMeta->metaType)) {
SCTableMeta* ctbMeta = taosMemoryMalloc(sizeof(SCTableMeta)); SCTableMeta *ctbMeta = taosMemoryMalloc(sizeof(SCTableMeta));
if (NULL == ctbMeta) { if (NULL == ctbMeta) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
} }
memcpy(ctbMeta, &pMeta->ctbMeta, sizeof(SCTableMeta)); memcpy(ctbMeta, &pMeta->ctbMeta, sizeof(SCTableMeta));
CTG_ERR_JRET(ctgWriteTbMetaToCache(pCtg, dbCache, pMeta->dbFName, pMeta->dbId, pMeta->ctbName, (STableMeta *)ctbMeta, sizeof(SCTableMeta))); CTG_ERR_JRET(ctgWriteTbMetaToCache(pCtg, dbCache, pMeta->dbFName, pMeta->dbId, pMeta->ctbName,
(STableMeta *)ctbMeta, sizeof(SCTableMeta)));
} }
_return: _return:
...@@ -1756,31 +1702,31 @@ _return: ...@@ -1756,31 +1702,31 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) { int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) {
int32_t code = 0; int32_t code = 0;
SCtgDropStbMetaMsg *msg = operation->data; SCtgDropStbMetaMsg *msg = operation->data;
SCatalog* pCtg = msg->pCtg; SCatalog *pCtg = msg->pCtg;
SCtgDBCache *dbCache = NULL; SCtgDBCache *dbCache = NULL;
ctgGetDBCache(pCtg, msg->dbFName, &dbCache); ctgGetDBCache(pCtg, msg->dbFName, &dbCache);
if (NULL == dbCache) { if (NULL == dbCache) {
return TSDB_CODE_SUCCESS; goto _return;
} }
if (msg->dbId && (dbCache->dbId != msg->dbId)) { if (msg->dbId && (dbCache->dbId != msg->dbId)) {
ctgDebug("dbId already modified, dbFName:%s, current:0x%"PRIx64", dbId:0x%"PRIx64", stb:%s, suid:0x%"PRIx64, ctgDebug("dbId already modified, dbFName:%s, current:0x%" PRIx64 ", dbId:0x%" PRIx64 ", stb:%s, suid:0x%" PRIx64,
msg->dbFName, dbCache->dbId, msg->dbId, msg->stbName, msg->suid); msg->dbFName, dbCache->dbId, msg->dbId, msg->stbName, msg->suid);
return TSDB_CODE_SUCCESS; goto _return;
} }
if (taosHashRemove(dbCache->stbCache, &msg->suid, sizeof(msg->suid))) { if (taosHashRemove(dbCache->stbCache, &msg->suid, sizeof(msg->suid))) {
ctgDebug("stb not exist in stbCache, may be removed, dbFName:%s, stb:%s, suid:0x%"PRIx64, msg->dbFName, msg->stbName, msg->suid); ctgDebug("stb not exist in stbCache, may be removed, dbFName:%s, stb:%s, suid:0x%" PRIx64, msg->dbFName,
msg->stbName, msg->suid);
} else { } else {
CTG_CACHE_STAT_DEC(numOfStb, 1); CTG_CACHE_STAT_DEC(numOfStb, 1);
} }
SCtgTbCache* pTbCache = taosHashGet(dbCache->tbCache, msg->stbName, strlen(msg->stbName)); SCtgTbCache *pTbCache = taosHashGet(dbCache->tbCache, msg->stbName, strlen(msg->stbName));
if (NULL == pTbCache) { if (NULL == pTbCache) {
ctgDebug("stb %s already not in cache", msg->stbName); ctgDebug("stb %s already not in cache", msg->stbName);
goto _return; goto _return;
...@@ -1791,16 +1737,16 @@ int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) { ...@@ -1791,16 +1737,16 @@ int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) {
CTG_UNLOCK(CTG_WRITE, &pTbCache->metaLock); CTG_UNLOCK(CTG_WRITE, &pTbCache->metaLock);
if (taosHashRemove(dbCache->tbCache, msg->stbName, strlen(msg->stbName))) { if (taosHashRemove(dbCache->tbCache, msg->stbName, strlen(msg->stbName))) {
ctgError("stb not exist in cache, dbFName:%s, stb:%s, suid:0x%"PRIx64, msg->dbFName, msg->stbName, msg->suid); ctgError("stb not exist in cache, dbFName:%s, stb:%s, suid:0x%" PRIx64, msg->dbFName, msg->stbName, msg->suid);
} else { } else {
CTG_CACHE_STAT_DEC(numOfTbl, 1); CTG_CACHE_STAT_DEC(numOfTbl, 1);
} }
ctgInfo("stb removed from cache, dbFName:%s, stbName:%s, suid:0x%"PRIx64, msg->dbFName, msg->stbName, msg->suid); ctgInfo("stb removed from cache, dbFName:%s, stbName:%s, suid:0x%" PRIx64, msg->dbFName, msg->stbName, msg->suid);
CTG_ERR_JRET(ctgMetaRentRemove(&msg->pCtg->stbRent, msg->suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare)); CTG_ERR_JRET(ctgMetaRentRemove(&msg->pCtg->stbRent, msg->suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare));
ctgDebug("stb removed from rent, dbFName:%s, stbName:%s, suid:0x%"PRIx64, msg->dbFName, msg->stbName, msg->suid); ctgDebug("stb removed from rent, dbFName:%s, stbName:%s, suid:0x%" PRIx64, msg->dbFName, msg->stbName, msg->suid);
_return: _return:
...@@ -1812,7 +1758,7 @@ _return: ...@@ -1812,7 +1758,7 @@ _return:
int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) { int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) {
int32_t code = 0; int32_t code = 0;
SCtgDropTblMetaMsg *msg = operation->data; SCtgDropTblMetaMsg *msg = operation->data;
SCatalog* pCtg = msg->pCtg; SCatalog *pCtg = msg->pCtg;
SCtgDBCache *dbCache = NULL; SCtgDBCache *dbCache = NULL;
ctgGetDBCache(pCtg, msg->dbFName, &dbCache); ctgGetDBCache(pCtg, msg->dbFName, &dbCache);
...@@ -1821,11 +1767,12 @@ int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) { ...@@ -1821,11 +1767,12 @@ int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) {
} }
if (dbCache->dbId != msg->dbId) { if (dbCache->dbId != msg->dbId) {
ctgDebug("dbId 0x%" PRIx64 " not match with curId 0x%"PRIx64", dbFName:%s, tbName:%s", msg->dbId, dbCache->dbId, msg->dbFName, msg->tbName); ctgDebug("dbId 0x%" PRIx64 " not match with curId 0x%" PRIx64 ", dbFName:%s, tbName:%s", msg->dbId, dbCache->dbId,
msg->dbFName, msg->tbName);
goto _return; goto _return;
} }
SCtgTbCache* pTbCache = taosHashGet(dbCache->tbCache, msg->tbName, strlen(msg->tbName)); SCtgTbCache *pTbCache = taosHashGet(dbCache->tbCache, msg->tbName, strlen(msg->tbName));
if (NULL == pTbCache) { if (NULL == pTbCache) {
ctgDebug("tb %s already not in cache", msg->tbName); ctgDebug("tb %s already not in cache", msg->tbName);
goto _return; goto _return;
...@@ -1854,7 +1801,7 @@ _return: ...@@ -1854,7 +1801,7 @@ _return:
int32_t ctgOpUpdateUser(SCtgCacheOperation *operation) { int32_t ctgOpUpdateUser(SCtgCacheOperation *operation) {
int32_t code = 0; int32_t code = 0;
SCtgUpdateUserMsg *msg = operation->data; SCtgUpdateUserMsg *msg = operation->data;
SCatalog* pCtg = msg->pCtg; SCatalog *pCtg = msg->pCtg;
SCtgUserAuth *pUser = (SCtgUserAuth *)taosHashGet(pCtg->userCache, msg->userAuth.user, strlen(msg->userAuth.user)); SCtgUserAuth *pUser = (SCtgUserAuth *)taosHashGet(pCtg->userCache, msg->userAuth.user, strlen(msg->userAuth.user));
if (NULL == pUser) { if (NULL == pUser) {
...@@ -1908,7 +1855,7 @@ _return: ...@@ -1908,7 +1855,7 @@ _return:
int32_t ctgOpUpdateEpset(SCtgCacheOperation *operation) { int32_t ctgOpUpdateEpset(SCtgCacheOperation *operation) {
int32_t code = 0; int32_t code = 0;
SCtgUpdateEpsetMsg *msg = operation->data; SCtgUpdateEpsetMsg *msg = operation->data;
SCatalog* pCtg = msg->pCtg; SCatalog *pCtg = msg->pCtg;
SCtgDBCache *dbCache = NULL; SCtgDBCache *dbCache = NULL;
CTG_ERR_JRET(ctgGetDBCache(pCtg, msg->dbFName, &dbCache)); CTG_ERR_JRET(ctgGetDBCache(pCtg, msg->dbFName, &dbCache));
...@@ -1925,17 +1872,17 @@ int32_t ctgOpUpdateEpset(SCtgCacheOperation *operation) { ...@@ -1925,17 +1872,17 @@ int32_t ctgOpUpdateEpset(SCtgCacheOperation *operation) {
goto _return; goto _return;
} }
SVgroupInfo* pInfo = taosHashGet(vgInfo->vgHash, &msg->vgId, sizeof(msg->vgId)); SVgroupInfo *pInfo = taosHashGet(vgInfo->vgHash, &msg->vgId, sizeof(msg->vgId));
if (NULL == pInfo) { if (NULL == pInfo) {
ctgDebug("no vgroup %d in db %s, ignore epset update", msg->vgId, msg->dbFName); ctgDebug("no vgroup %d in db %s, ignore epset update", msg->vgId, msg->dbFName);
goto _return; goto _return;
} }
SEp* pOrigEp = &pInfo->epSet.eps[pInfo->epSet.inUse]; SEp *pOrigEp = &pInfo->epSet.eps[pInfo->epSet.inUse];
SEp* pNewEp = &msg->epSet.eps[msg->epSet.inUse]; SEp *pNewEp = &msg->epSet.eps[msg->epSet.inUse];
ctgDebug("vgroup %d epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d, dbFName:%s in ctg", ctgDebug("vgroup %d epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d, dbFName:%s in ctg", pInfo->vgId,
pInfo->vgId, pInfo->epSet.inUse, pInfo->epSet.numOfEps, pOrigEp->fqdn, pOrigEp->port, pInfo->epSet.inUse, pInfo->epSet.numOfEps, pOrigEp->fqdn, pOrigEp->port, msg->epSet.inUse,
msg->epSet.inUse, msg->epSet.numOfEps, pNewEp->fqdn, pNewEp->port, msg->dbFName); msg->epSet.numOfEps, pNewEp->fqdn, pNewEp->port, msg->dbFName);
pInfo->epSet = msg->epSet; pInfo->epSet = msg->epSet;
...@@ -1953,8 +1900,8 @@ _return: ...@@ -1953,8 +1900,8 @@ _return:
int32_t ctgOpUpdateTbIndex(SCtgCacheOperation *operation) { int32_t ctgOpUpdateTbIndex(SCtgCacheOperation *operation) {
int32_t code = 0; int32_t code = 0;
SCtgUpdateTbIndexMsg *msg = operation->data; SCtgUpdateTbIndexMsg *msg = operation->data;
SCatalog* pCtg = msg->pCtg; SCatalog *pCtg = msg->pCtg;
STableIndex* pIndex = msg->pIndex; STableIndex *pIndex = msg->pIndex;
SCtgDBCache *dbCache = NULL; SCtgDBCache *dbCache = NULL;
CTG_ERR_JRET(ctgGetAddDBCache(pCtg, pIndex->dbFName, 0, &dbCache)); CTG_ERR_JRET(ctgGetAddDBCache(pCtg, pIndex->dbFName, 0, &dbCache));
...@@ -1976,7 +1923,7 @@ _return: ...@@ -1976,7 +1923,7 @@ _return:
int32_t ctgOpDropTbIndex(SCtgCacheOperation *operation) { int32_t ctgOpDropTbIndex(SCtgCacheOperation *operation) {
int32_t code = 0; int32_t code = 0;
SCtgDropTbIndexMsg *msg = operation->data; SCtgDropTbIndexMsg *msg = operation->data;
SCatalog* pCtg = msg->pCtg; SCatalog *pCtg = msg->pCtg;
SCtgDBCache *dbCache = NULL; SCtgDBCache *dbCache = NULL;
CTG_ERR_JRET(ctgGetDBCache(pCtg, msg->dbFName, &dbCache)); CTG_ERR_JRET(ctgGetDBCache(pCtg, msg->dbFName, &dbCache));
...@@ -1984,7 +1931,7 @@ int32_t ctgOpDropTbIndex(SCtgCacheOperation *operation) { ...@@ -1984,7 +1931,7 @@ int32_t ctgOpDropTbIndex(SCtgCacheOperation *operation) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
STableIndex* pIndex = taosMemoryCalloc(1, sizeof(STableIndex)); STableIndex *pIndex = taosMemoryCalloc(1, sizeof(STableIndex));
if (NULL == pIndex) { if (NULL == pIndex) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
} }
...@@ -2006,11 +1953,10 @@ _return: ...@@ -2006,11 +1953,10 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgOpClearCache(SCtgCacheOperation *operation) { int32_t ctgOpClearCache(SCtgCacheOperation *operation) {
int32_t code = 0; int32_t code = 0;
SCtgClearCacheMsg *msg = operation->data; SCtgClearCacheMsg *msg = operation->data;
SCatalog* pCtg = msg->pCtg; SCatalog *pCtg = msg->pCtg;
CTG_LOCK(CTG_WRITE, &gCtgMgmt.lock); CTG_LOCK(CTG_WRITE, &gCtgMgmt.lock);
...@@ -2085,7 +2031,7 @@ void ctgCleanupCacheQueue(void) { ...@@ -2085,7 +2031,7 @@ void ctgCleanupCacheQueue(void) {
gCtgMgmt.queue.tail = NULL; gCtgMgmt.queue.tail = NULL;
} }
void* ctgUpdateThreadFunc(void* param) { void *ctgUpdateThreadFunc(void *param) {
setThreadName("catalog"); setThreadName("catalog");
qInfo("catalog update thread started"); qInfo("catalog update thread started");
...@@ -2095,7 +2041,7 @@ void* ctgUpdateThreadFunc(void* param) { ...@@ -2095,7 +2041,7 @@ void* ctgUpdateThreadFunc(void* param) {
qError("ctg tsem_wait failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno))); qError("ctg tsem_wait failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
} }
if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) { if (atomic_load_8((int8_t *)&gCtgMgmt.exit)) {
ctgCleanupCacheQueue(); ctgCleanupCacheQueue();
break; break;
} }
...@@ -2125,7 +2071,6 @@ void* ctgUpdateThreadFunc(void* param) { ...@@ -2125,7 +2071,6 @@ void* ctgUpdateThreadFunc(void* param) {
return NULL; return NULL;
} }
int32_t ctgStartUpdateThread() { int32_t ctgStartUpdateThread() {
TdThreadAttr thAttr; TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr); taosThreadAttrInit(&thAttr);
...@@ -2140,8 +2085,7 @@ int32_t ctgStartUpdateThread() { ...@@ -2140,8 +2085,7 @@ int32_t ctgStartUpdateThread() {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgGetTbMetaFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMetaCtx *ctx, STableMeta **pTableMeta) {
int32_t ctgGetTbMetaFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta) {
if (IS_SYS_DBNAME(ctx->pName->dbname)) { if (IS_SYS_DBNAME(ctx->pName->dbname)) {
CTG_FLAG_SET_SYS_DB(ctx->flag); CTG_FLAG_SET_SYS_DB(ctx->flag);
} }
...@@ -2221,13 +2165,14 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe ...@@ -2221,13 +2165,14 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
} }
#endif #endif
int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetasCtx* ctx, int32_t dbIdx, int32_t *fetchIdx, int32_t baseResIdx, SArray* pList) { int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMetasCtx *ctx, int32_t dbIdx,
int32_t *fetchIdx, int32_t baseResIdx, SArray *pList) {
int32_t tbNum = taosArrayGetSize(pList); int32_t tbNum = taosArrayGetSize(pList);
SName* pName = taosArrayGet(pList, 0); SName *pName = taosArrayGet(pList, 0);
char dbFName[TSDB_DB_FNAME_LEN] = {0}; char dbFName[TSDB_DB_FNAME_LEN] = {0};
int32_t flag = CTG_FLAG_UNKNOWN_STB; int32_t flag = CTG_FLAG_UNKNOWN_STB;
uint64_t lastSuid = 0; uint64_t lastSuid = 0;
STableMeta* lastTableMeta = NULL; STableMeta *lastTableMeta = NULL;
if (IS_SYS_DBNAME(pName->dbname)) { if (IS_SYS_DBNAME(pName->dbname)) {
CTG_FLAG_SET_SYS_DB(flag); CTG_FLAG_SET_SYS_DB(flag);
...@@ -2237,7 +2182,7 @@ int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe ...@@ -2237,7 +2182,7 @@ int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
} }
SCtgDBCache *dbCache = NULL; SCtgDBCache *dbCache = NULL;
SCtgTbCache* pCache = NULL; SCtgTbCache *pCache = NULL;
ctgAcquireDBCache(pCtg, dbFName, &dbCache); ctgAcquireDBCache(pCtg, dbFName, &dbCache);
if (NULL == dbCache) { if (NULL == dbCache) {
...@@ -2251,7 +2196,7 @@ int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe ...@@ -2251,7 +2196,7 @@ int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
} }
for (int32_t i = 0; i < tbNum; ++i) { for (int32_t i = 0; i < tbNum; ++i) {
SName* pName = taosArrayGet(pList, i); SName *pName = taosArrayGet(pList, i);
pCache = taosHashAcquire(dbCache->tbCache, pName->tname, strlen(pName->tname)); pCache = taosHashAcquire(dbCache->tbCache, pName->tname, strlen(pName->tname));
if (NULL == pCache) { if (NULL == pCache) {
...@@ -2271,7 +2216,7 @@ int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe ...@@ -2271,7 +2216,7 @@ int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
continue; continue;
} }
STableMeta* tbMeta = pCache->pMeta; STableMeta *tbMeta = pCache->pMeta;
SCtgTbMetaCtx nctx = {0}; SCtgTbMetaCtx nctx = {0};
nctx.flag = flag; nctx.flag = flag;
...@@ -2281,7 +2226,7 @@ int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe ...@@ -2281,7 +2226,7 @@ int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
nctx.tbInfo.tbType = tbMeta->tableType; nctx.tbInfo.tbType = tbMeta->tableType;
SMetaRes res = {0}; SMetaRes res = {0};
STableMeta* pTableMeta = NULL; STableMeta *pTableMeta = NULL;
if (tbMeta->tableType != TSDB_CHILD_TABLE) { if (tbMeta->tableType != TSDB_CHILD_TABLE) {
int32_t metaSize = CTG_META_SIZE(tbMeta); int32_t metaSize = CTG_META_SIZE(tbMeta);
pTableMeta = taosMemoryCalloc(1, metaSize); pTableMeta = taosMemoryCalloc(1, metaSize);
...@@ -2332,10 +2277,10 @@ int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe ...@@ -2332,10 +2277,10 @@ int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
CTG_UNLOCK(CTG_READ, &pCache->metaLock); CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache); taosHashRelease(dbCache->tbCache, pCache);
ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s", ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s", pName->tname,
pName->tname, nctx.tbInfo.tbType, dbFName); nctx.tbInfo.tbType, dbFName);
char* stName = taosHashAcquire(dbCache->stbCache, &pTableMeta->suid, sizeof(pTableMeta->suid)); char *stName = taosHashAcquire(dbCache->stbCache, &pTableMeta->suid, sizeof(pTableMeta->suid));
if (NULL == stName) { if (NULL == stName) {
ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", pTableMeta->suid, dbFName); ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", pTableMeta->suid, dbFName);
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag); ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
...@@ -2373,12 +2318,13 @@ int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe ...@@ -2373,12 +2318,13 @@ int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
continue; continue;
} }
STableMeta* stbMeta = pCache->pMeta; STableMeta *stbMeta = pCache->pMeta;
if (stbMeta->suid != nctx.tbInfo.suid) { if (stbMeta->suid != nctx.tbInfo.suid) {
CTG_UNLOCK(CTG_READ, &pCache->metaLock); CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache); taosHashRelease(dbCache->tbCache, pCache);
ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid 0x%"PRIx64 , stbMeta->suid, nctx.tbInfo.suid); ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid 0x%" PRIx64, stbMeta->suid,
nctx.tbInfo.suid);
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag); ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1); taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);
...@@ -2412,10 +2358,9 @@ int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe ...@@ -2412,10 +2358,9 @@ int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgRemoveTbMetaFromCache(SCatalog *pCtg, SName *pTableName, bool syncReq) {
int32_t ctgRemoveTbMetaFromCache(SCatalog* pCtg, SName* pTableName, bool syncReq) {
int32_t code = 0; int32_t code = 0;
STableMeta* tblMeta = NULL; STableMeta *tblMeta = NULL;
SCtgTbMetaCtx tbCtx = {0}; SCtgTbMetaCtx tbCtx = {0};
tbCtx.flag = CTG_FLAG_UNKNOWN_STB; tbCtx.flag = CTG_FLAG_UNKNOWN_STB;
tbCtx.pName = pTableName; tbCtx.pName = pTableName;
...@@ -2449,7 +2394,7 @@ int32_t ctgGetTbHashVgroupFromCache(SCatalog *pCtg, const SName *pTableName, SVg ...@@ -2449,7 +2394,7 @@ int32_t ctgGetTbHashVgroupFromCache(SCatalog *pCtg, const SName *pTableName, SVg
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
} }
SCtgDBCache* dbCache = NULL; SCtgDBCache *dbCache = NULL;
int32_t code = 0; int32_t code = 0;
char dbFName[TSDB_DB_FNAME_LEN] = {0}; char dbFName[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pTableName, dbFName); tNameGetFullDbName(pTableName, dbFName);
...@@ -2476,5 +2421,3 @@ _return: ...@@ -2476,5 +2421,3 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
...@@ -459,10 +459,11 @@ typedef struct SPartitionDataInfo { ...@@ -459,10 +459,11 @@ typedef struct SPartitionDataInfo {
SArray* rowIds; SArray* rowIds;
} SPartitionDataInfo; } SPartitionDataInfo;
typedef struct STimeWindowSupp { typedef struct STimeWindowAggSupp {
int8_t calTrigger; int8_t calTrigger;
int64_t waterMark; int64_t waterMark;
TSKEY maxTs; TSKEY maxTs;
TSKEY minTs;
SColumnInfoData timeWindowData; // query time window info for scalar function execution. SColumnInfoData timeWindowData; // query time window info for scalar function execution.
} STimeWindowAggSupp; } STimeWindowAggSupp;
......
...@@ -1355,7 +1355,7 @@ static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type ...@@ -1355,7 +1355,7 @@ static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type
} }
} }
void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SExprSupp* pSup, int32_t numOfOutput) { static void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SExprSupp* pSup, int32_t numOfOutput) {
SResultRow* pResult = getResultRowByPos(pResultBuf, p1, false); SResultRow* pResult = getResultRowByPos(pResultBuf, p1, false);
SqlFunctionCtx* pCtx = pSup->pCtx; SqlFunctionCtx* pCtx = pSup->pCtx;
for (int32_t i = 0; i < numOfOutput; ++i) { for (int32_t i = 0; i < numOfOutput; ++i) {
...@@ -1374,7 +1374,7 @@ void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SExprS ...@@ -1374,7 +1374,7 @@ void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SExprS
releaseBufPage(pResultBuf, bufPage); releaseBufPage(pResultBuf, bufPage);
} }
bool doClearWindow(SAggSupporter* pAggSup, SExprSupp* pSup, char* pData, int16_t bytes, uint64_t groupId, static bool doClearWindow(SAggSupporter* pAggSup, SExprSupp* pSup, char* pData, int16_t bytes, uint64_t groupId,
int32_t numOfOutput) { int32_t numOfOutput) {
SET_RES_WINDOW_KEY(pAggSup->keyBuf, pData, bytes, groupId); SET_RES_WINDOW_KEY(pAggSup->keyBuf, pData, bytes, groupId);
SResultRowPosition* p1 = SResultRowPosition* p1 =
...@@ -1402,18 +1402,21 @@ bool doDeleteIntervalWindow(SAggSupporter* pAggSup, TSKEY ts, uint64_t groupId) ...@@ -1402,18 +1402,21 @@ bool doDeleteIntervalWindow(SAggSupporter* pAggSup, TSKEY ts, uint64_t groupId)
return true; return true;
} }
void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock, SArray* pDelWins, SInterval* pInterval, static void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, STimeWindowAggSupp* pTwSup, SSDataBlock* pBlock,
SHashObj* pUpdatedMap) { SArray* pDelWins, SInterval* pInterval, SHashObj* pUpdatedMap) {
SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
TSKEY* tsStarts = (TSKEY*)pStartCol->pData; TSKEY* tsStarts = (TSKEY*)pStartCol->pData;
SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
TSKEY* tsEnds = (TSKEY*)pEndCol->pData; TSKEY* tsEnds = (TSKEY*)pEndCol->pData;
SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
uint64_t* groupIds = (uint64_t*)pGroupCol->pData; uint64_t* groupIds = (uint64_t*)pGroupCol->pData;
int64_t numOfWin = tSimpleHashGetSize(pAggSup->pResultRowHashTable);
for (int32_t i = 0; i < pBlock->info.rows; i++) { for (int32_t i = 0; i < pBlock->info.rows; i++) {
TSKEY startTs = TMAX(tsStarts[i], pTwSup->minTs);
TSKEY endTs = TMIN(tsEnds[i], pTwSup->maxTs);
SResultRowInfo dumyInfo; SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1; dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsStarts[i], pInterval, TSDB_ORDER_ASC); STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTs, pInterval, TSDB_ORDER_ASC);
do { do {
doDeleteIntervalWindow(pAggSup, win.skey, groupIds[i]); doDeleteIntervalWindow(pAggSup, win.skey, groupIds[i]);
SWinKey winRes = {.ts = win.skey, .groupId = groupIds[i]}; SWinKey winRes = {.ts = win.skey, .groupId = groupIds[i]};
...@@ -1424,7 +1427,7 @@ void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock, ...@@ -1424,7 +1427,7 @@ void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock,
taosHashRemove(pUpdatedMap, &winRes, sizeof(SWinKey)); taosHashRemove(pUpdatedMap, &winRes, sizeof(SWinKey));
} }
getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win); getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win);
} while (win.skey <= tsEnds[i]); } while (win.skey <= endTs);
} }
} }
...@@ -3030,6 +3033,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -3030,6 +3033,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
TSKEY maxTs = INT64_MIN; TSKEY maxTs = INT64_MIN;
TSKEY minTs = INT64_MAX;
SExprSupp* pSup = &pOperator->exprSupp; SExprSupp* pSup = &pOperator->exprSupp;
...@@ -3101,8 +3105,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -3101,8 +3105,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
break; break;
} }
printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval final recv" : "interval semi recv"); printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval final recv" : "interval semi recv");
maxTs = TMAX(maxTs, pBlock->info.window.ekey);
maxTs = TMAX(maxTs, pBlock->info.watermark);
ASSERT(pBlock->info.type != STREAM_INVERT); ASSERT(pBlock->info.type != STREAM_INVERT);
if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) { if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) {
...@@ -3129,13 +3131,13 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -3129,13 +3131,13 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
break; break;
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) { } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) {
SArray* delWins = taosArrayInit(8, sizeof(SWinKey)); SArray* delWins = taosArrayInit(8, sizeof(SWinKey));
doDeleteSpecifyIntervalWindow(&pInfo->aggSup, pBlock, delWins, &pInfo->interval, pUpdatedMap); doDeleteSpecifyIntervalWindow(&pInfo->aggSup, &pInfo->twAggSup, pBlock, delWins, &pInfo->interval, pUpdatedMap);
if (IS_FINAL_OP(pInfo)) { if (IS_FINAL_OP(pInfo)) {
int32_t childIndex = getChildIndex(pBlock); int32_t childIndex = getChildIndex(pBlock);
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
SStreamFinalIntervalOperatorInfo* pChildInfo = pChildOp->info; SStreamFinalIntervalOperatorInfo* pChildInfo = pChildOp->info;
SExprSupp* pChildSup = &pChildOp->exprSupp; SExprSupp* pChildSup = &pChildOp->exprSupp;
doDeleteSpecifyIntervalWindow(&pChildInfo->aggSup, pBlock, NULL, &pChildInfo->interval, NULL); doDeleteSpecifyIntervalWindow(&pChildInfo->aggSup, &pInfo->twAggSup, pBlock, NULL, &pChildInfo->interval, NULL);
rebuildIntervalWindow(pInfo, pSup, delWins, pInfo->binfo.pRes->info.groupId, pOperator->exprSupp.numOfExprs, rebuildIntervalWindow(pInfo, pSup, delWins, pInfo->binfo.pRes->info.groupId, pOperator->exprSupp.numOfExprs,
pOperator->pTaskInfo, pUpdatedMap); pOperator->pTaskInfo, pUpdatedMap);
addRetriveWindow(delWins, pInfo); addRetriveWindow(delWins, pInfo);
...@@ -3189,9 +3191,13 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -3189,9 +3191,13 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
setInputDataBlock(pChildOp, pChildOp->exprSupp.pCtx, pBlock, pChInfo->order, MAIN_SCAN, true); setInputDataBlock(pChildOp, pChildOp->exprSupp.pCtx, pBlock, pChInfo->order, MAIN_SCAN, true);
doHashIntervalAgg(pChildOp, pBlock, pBlock->info.groupId, NULL); doHashIntervalAgg(pChildOp, pBlock, pBlock->info.groupId, NULL);
} }
maxTs = TMAX(maxTs, pBlock->info.window.ekey);
maxTs = TMAX(maxTs, pBlock->info.watermark);
minTs = TMIN(minTs, pBlock->info.window.skey);
} }
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs);
if (IS_FINAL_OP(pInfo)) { if (IS_FINAL_OP(pInfo)) {
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pInfo->pPullDataMap, closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pInfo->pPullDataMap,
pUpdatedMap, pInfo->pRecycledPages, pInfo->aggSup.pResultBuf); pUpdatedMap, pInfo->pRecycledPages, pInfo->aggSup.pResultBuf);
...@@ -3264,6 +3270,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -3264,6 +3270,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
.waterMark = pIntervalPhyNode->window.watermark, .waterMark = pIntervalPhyNode->window.watermark,
.calTrigger = pIntervalPhyNode->window.triggerType, .calTrigger = pIntervalPhyNode->window.triggerType,
.maxTs = INT64_MIN, .maxTs = INT64_MIN,
.minTs = INT64_MAX,
}; };
ASSERT(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY); ASSERT(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY);
pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
...@@ -3507,7 +3514,11 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh ...@@ -3507,7 +3514,11 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
initDummyFunction(pInfo->pDummyCtx, pSup->pCtx, numOfCols); initDummyFunction(pInfo->pDummyCtx, pSup->pCtx, numOfCols);
pInfo->twAggSup = (STimeWindowAggSupp){ pInfo->twAggSup = (STimeWindowAggSupp){
.waterMark = pSessionNode->window.watermark, .calTrigger = pSessionNode->window.triggerType, .maxTs = INT64_MIN}; .waterMark = pSessionNode->window.watermark,
.calTrigger = pSessionNode->window.triggerType,
.maxTs = INT64_MIN,
.minTs = INT64_MAX,
};
initResultRowInfo(&pInfo->binfo.resultRowInfo); initResultRowInfo(&pInfo->binfo.resultRowInfo);
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
...@@ -4832,6 +4843,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -4832,6 +4843,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
.waterMark = pStateNode->window.watermark, .waterMark = pStateNode->window.watermark,
.calTrigger = pStateNode->window.triggerType, .calTrigger = pStateNode->window.triggerType,
.maxTs = INT64_MIN, .maxTs = INT64_MIN,
.minTs = INT64_MAX,
}; };
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
...@@ -5632,6 +5644,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5632,6 +5644,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info; SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int64_t maxTs = INT64_MIN; int64_t maxTs = INT64_MIN;
int64_t minTs = INT64_MAX;
SExprSupp* pSup = &pOperator->exprSupp; SExprSupp* pSup = &pOperator->exprSupp;
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
...@@ -5676,7 +5689,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5676,7 +5689,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo)); qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo));
continue; continue;
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) { } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) {
doDeleteSpecifyIntervalWindow(&pInfo->aggSup, pBlock, pInfo->pDelWins, &pInfo->interval, pUpdatedMap); doDeleteSpecifyIntervalWindow(&pInfo->aggSup, &pInfo->twAggSup, pBlock, pInfo->pDelWins, &pInfo->interval,
pUpdatedMap);
continue; continue;
} else if (pBlock->info.type == STREAM_GET_ALL) { } else if (pBlock->info.type == STREAM_GET_ALL) {
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap); getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap);
...@@ -5702,11 +5716,13 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5702,11 +5716,13 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
} }
maxTs = TMAX(maxTs, pBlock->info.window.ekey); maxTs = TMAX(maxTs, pBlock->info.window.ekey);
minTs = TMIN(minTs, pBlock->info.window.skey);
doStreamIntervalAggImpl(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdatedMap); doStreamIntervalAggImpl(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdatedMap);
// new disc buf // new disc buf
/*doStreamIntervalAggImpl2(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap);*/ /*doStreamIntervalAggImpl2(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap);*/
} }
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs);
#if 0 #if 0
if (pState) { if (pState) {
...@@ -5805,6 +5821,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -5805,6 +5821,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
.waterMark = pIntervalPhyNode->window.watermark, .waterMark = pIntervalPhyNode->window.watermark,
.calTrigger = pIntervalPhyNode->window.triggerType, .calTrigger = pIntervalPhyNode->window.triggerType,
.maxTs = INT64_MIN, .maxTs = INT64_MIN,
.minTs = INT64_MAX,
}; };
ASSERT(twAggSupp.calTrigger != STREAM_TRIGGER_MAX_DELAY); ASSERT(twAggSupp.calTrigger != STREAM_TRIGGER_MAX_DELAY);
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
......
...@@ -247,8 +247,9 @@ void *tSimpleHashGet(SSHashObj *pHashObj, const void *key, size_t keyLen) { ...@@ -247,8 +247,9 @@ void *tSimpleHashGet(SSHashObj *pHashObj, const void *key, size_t keyLen) {
} }
int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key, size_t keyLen) { int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key, size_t keyLen) {
int32_t code = TSDB_CODE_FAILED;
if (!pHashObj || !key) { if (!pHashObj || !key) {
return TSDB_CODE_FAILED; return code;
} }
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen); uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
...@@ -266,13 +267,14 @@ int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key, size_t keyLen) { ...@@ -266,13 +267,14 @@ int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key, size_t keyLen) {
} }
FREE_HASH_NODE(pNode); FREE_HASH_NODE(pNode);
atomic_sub_fetch_64(&pHashObj->size, 1); atomic_sub_fetch_64(&pHashObj->size, 1);
code = TSDB_CODE_SUCCESS;
break; break;
} }
pPrev = pNode; pPrev = pNode;
pNode = pNode->next; pNode = pNode->next;
} }
return TSDB_CODE_SUCCESS; return code;
} }
int32_t tSimpleHashIterateRemove(SSHashObj *pHashObj, const void *key, size_t keyLen, void **pIter, int32_t *iter) { int32_t tSimpleHashIterateRemove(SSHashObj *pHashObj, const void *key, size_t keyLen, void **pIter, int32_t *iter) {
......
...@@ -1129,7 +1129,7 @@ static int32_t parseTableOptions(SInsertParseContext* pCxt) { ...@@ -1129,7 +1129,7 @@ static int32_t parseTableOptions(SInsertParseContext* pCxt) {
NEXT_TOKEN_KEEP_SQL(pCxt->pSql, sToken, index); NEXT_TOKEN_KEEP_SQL(pCxt->pSql, sToken, index);
if (TK_TTL == sToken.type) { if (TK_TTL == sToken.type) {
pCxt->pSql += index; pCxt->pSql += index;
NEXT_TOKEN(pCxt->pSql, sToken); NEXT_TOKEN_WITH_PREV(pCxt->pSql, sToken);
if (TK_NK_INTEGER != sToken.type) { if (TK_NK_INTEGER != sToken.type) {
return buildSyntaxErrMsg(&pCxt->msg, "Invalid option ttl", sToken.z); return buildSyntaxErrMsg(&pCxt->msg, "Invalid option ttl", sToken.z);
} }
...@@ -1745,7 +1745,7 @@ static int32_t skipTableOptions(SInsertParseSyntaxCxt* pCxt) { ...@@ -1745,7 +1745,7 @@ static int32_t skipTableOptions(SInsertParseSyntaxCxt* pCxt) {
NEXT_TOKEN_KEEP_SQL(pCxt->pSql, sToken, index); NEXT_TOKEN_KEEP_SQL(pCxt->pSql, sToken, index);
if (TK_TTL == sToken.type || TK_COMMENT == sToken.type) { if (TK_TTL == sToken.type || TK_COMMENT == sToken.type) {
pCxt->pSql += index; pCxt->pSql += index;
NEXT_TOKEN(pCxt->pSql, sToken); NEXT_TOKEN_WITH_PREV(pCxt->pSql, sToken);
} else { } else {
break; break;
} }
......
...@@ -186,7 +186,9 @@ endi ...@@ -186,7 +186,9 @@ endi
sql drop stream if exists streams2; sql drop stream if exists streams2;
sql drop database if exists test2; sql drop database if exists test2;
sql drop database if exists test;
sql create database test2 vgroups 4; sql create database test2 vgroups 4;
sql create database test vgroups 1;
sql use test2; sql use test2;
sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int); sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1); sql create table t1 using st tags(1,1,1);
...@@ -411,6 +413,80 @@ if $data12 != 3 then ...@@ -411,6 +413,80 @@ if $data12 != 3 then
goto loop14 goto loop14
endi endi
return 1
sql drop stream if exists streams3;
sql drop database if exists test3;
sql drop database if exists test;
sql create database test3 vgroups 4;
sql create database test vgroups 1;
sql use test3;
sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams3 trigger at_once into test.streamt3 as select _wstart c1, count(*) c2, max(a) c3 from st interval(10s);
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
sql insert into t2 values(1648791213000,NULL,NULL,NULL,NULL);
$loop_count = 0
sql delete from t1;
loop15:
sleep 200
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 1 then
print =====rows=$rows
goto loop15
endi
$loop_count = 0
sql delete from t1 where ts > 100;
loop16:
sleep 200
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 1 then
print =====rows=$rows
goto loop16
endi
$loop_count = 0
sql delete from st;
loop17:
sleep 200
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 0 then
print =====rows=$rows
goto loop17
endi
$loop_all = $loop_all + 1 $loop_all = $loop_all + 1
print ============loop_all=$loop_all print ============loop_all=$loop_all
......
...@@ -137,7 +137,8 @@ class TDTestCase: ...@@ -137,7 +137,8 @@ class TDTestCase:
tdSql.execute(f"create table {dbname}.child_table2 using {dbname}.super_table1 tags(1) comment ''") tdSql.execute(f"create table {dbname}.child_table2 using {dbname}.super_table1 tags(1) comment ''")
tdSql.execute(f"create table {dbname}.child_table3 using {dbname}.super_table1 tags(1) comment 'child'") tdSql.execute(f"create table {dbname}.child_table3 using {dbname}.super_table1 tags(1) comment 'child'")
tdSql.execute(f"insert into {dbname}.child_table4 using {dbname}.super_table1 tags(1) values(now, 1)") tdSql.execute(f"insert into {dbname}.child_table4 using {dbname}.super_table1 tags(1) values(now, 1)")
tdSql.execute(f"insert into {dbname}.child_table5 using {dbname}.super_table1 tags(1) ttl 23 comment '' values(now, 1)")
tdSql.error(f"insert into {dbname}.child_table6 using {dbname}.super_table1 tags(1) ttl -23 comment '' values(now, 1)")
tdSql.query("select * from information_schema.ins_tables where table_name like 'child_table1'") tdSql.query("select * from information_schema.ins_tables where table_name like 'child_table1'")
tdSql.checkData(0, 0, 'child_table1') tdSql.checkData(0, 0, 'child_table1')
...@@ -160,6 +161,11 @@ class TDTestCase: ...@@ -160,6 +161,11 @@ class TDTestCase:
tdSql.checkData(0, 7, 0) tdSql.checkData(0, 7, 0)
tdSql.checkData(0, 8, None) tdSql.checkData(0, 8, None)
tdSql.query("select * from information_schema.ins_tables where table_name like 'child_table5'")
tdSql.checkData(0, 0, 'child_table5')
tdSql.checkData(0, 7, 23)
tdSql.checkData(0, 8, '')
tdSql.execute(f"alter table {dbname}.child_table1 comment 'nihao'") tdSql.execute(f"alter table {dbname}.child_table1 comment 'nihao'")
tdSql.query("select * from information_schema.ins_tables where table_name like 'child_table1'") tdSql.query("select * from information_schema.ins_tables where table_name like 'child_table1'")
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
set -e set -e
set -x set -x
python3 ./test.py -f 0-others/taosShell.py #python3 ./test.py -f 0-others/taosShell.py
python3 ./test.py -f 0-others/taosShellError.py python3 ./test.py -f 0-others/taosShellError.py
python3 ./test.py -f 0-others/taosShellNetChk.py python3 ./test.py -f 0-others/taosShellNetChk.py
python3 ./test.py -f 0-others/telemetry.py python3 ./test.py -f 0-others/telemetry.py
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册