提交 70c2d5ae 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into enh/tsdb_optimize

......@@ -314,7 +314,6 @@ connection.backoff.ms=5000
topic.prefix=tdengine-source-
poll.interval.ms=1000
fetch.max.rows=100
out.format=line
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
```
......@@ -353,7 +352,7 @@ confluent local services connect connector load TDengineSourceConnector --config
### View topic data
Use the kafka-console-consumer command-line tool to monitor data in the topic tdengine-source-test. In the beginning, all historical data will be output. After inserting two new data into TDengine, kafka-console-consumer immediately outputs the two new data.
Use the kafka-console-consumer command-line tool to monitor data in the topic tdengine-source-test. In the beginning, all historical data will be output. After inserting two new data into TDengine, kafka-console-consumer immediately outputs the two new data. The output is in InfluxDB line protocol format.
````
kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic tdengine-source-test
......@@ -428,9 +427,8 @@ The following configuration items apply to TDengine Sink Connector and TDengine
3. `timestamp.initial`: Data synchronization start time. The format is 'yyyy-MM-dd HH:mm:ss'. If it is not set, the data importing to Kafka will be started from the first/oldest row in the database.
4. `poll.interval.ms`: The time interval for checking newly created tables or removed tables, default value is 1000.
5. `fetch.max.rows`: The maximum number of rows retrieved when retrieving the database, default is 100.
6. `out.format`: The data format. The value could be `line`, which represents the InfluxDB Line protocol format.
7. 7. `query.interval.ms`: The time range of reading data from TDengine each time, its unit is millisecond. It should be adjusted according to the data flow in rate, the default value is 1000.
8. `topic.per.stable`: If it's set to true, it means one super table in TDengine corresponds to a topic in Kafka, the topic naming rule is `<topic.prefix>-<connection.database>-<stable.name>`; if it's set to false, it means the whole DB corresponds to a topic in Kafka, the topic naming rule is `<topic.prefix>-<connection.database>`.
6. `query.interval.ms`: The time range of reading data from TDengine each time, its unit is millisecond. It should be adjusted according to the data flow in rate, the default value is 1000.
7. `topic.per.stable`: If it's set to true, it means one super table in TDengine corresponds to a topic in Kafka, the topic naming rule is `<topic.prefix>-<connection.database>-<stable.name>`; if it's set to false, it means the whole DB corresponds to a topic in Kafka, the topic naming rule is `<topic.prefix>-<connection.database>`.
......
......@@ -318,7 +318,6 @@ connection.backoff.ms=5000
topic.prefix=tdengine-source-
poll.interval.ms=1000
fetch.max.rows=100
out.format=line
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
```
......@@ -357,7 +356,7 @@ confluent local services connect connector load TDengineSourceConnector --config
### 查看 topic 数据
使用 kafka-console-consumer 命令行工具监控主题 tdengine-source-test 中的数据。一开始会输出所有历史数据, 往 TDengine 插入两条新的数据之后,kafka-console-consumer 也立即输出了新增的两条数据。
使用 kafka-console-consumer 命令行工具监控主题 tdengine-source-test 中的数据。一开始会输出所有历史数据, 往 TDengine 插入两条新的数据之后,kafka-console-consumer 也立即输出了新增的两条数据。 输出数据 InfluxDB line protocol 的格式。
```
kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic tdengine-source-test
......@@ -438,9 +437,8 @@ confluent local services connect connector unload TDengineSourceConnector
3. `timestamp.initial`: 数据同步起始时间。格式为'yyyy-MM-dd HH:mm:ss',若未指定则从指定 DB 中最早的一条记录开始。
4. `poll.interval.ms`: 检查是否有新建或删除的表的时间间隔,单位为 ms。默认为 1000。
5. `fetch.max.rows` : 检索数据库时最大检索条数。 默认为 100。
6. `out.format`: 数据格式。取值为 `line`, 表示 InfluxDB Line 协议格式
7. `query.interval.ms`: 从 TDengine 一次读取数据的时间跨度,需要根据表中的数据特征合理配置,避免一次查询的数据量过大或过小;在具体的环境中建议通过测试设置一个较优值,默认值为 1000.
8. `topic.per.stable`: 如果设置为true,表示一个超级表对应一个 Kafka topic,topic的命名规则 `<topic.prefix>-<connection.database>-<stable.name>`;如果设置为 false,则指定的 DB 中的所有数据进入一个 Kafka topic,topic 的命名规则为 `<topic.prefix>-<connection.database>`
6. `query.interval.ms`: 从 TDengine 一次读取数据的时间跨度,需要根据表中的数据特征合理配置,避免一次查询的数据量过大或过小;在具体的环境中建议通过测试设置一个较优值,默认值为 1000.
7. `topic.per.stable`: 如果设置为true,表示一个超级表对应一个 Kafka topic,topic的命名规则 `<topic.prefix>-<connection.database>-<stable.name>`;如果设置为 false,则指定的 DB 中的所有数据进入一个 Kafka topic,topic 的命名规则为 `<topic.prefix>-<connection.database>`
## 其他说明
......
......@@ -124,6 +124,7 @@ extern int32_t tsRedirectFactor;
extern int32_t tsRedirectMaxPeriod;
extern int32_t tsMaxRetryWaitTime;
extern bool tsUseAdapter;
extern int32_t tsMetaCacheMaxSize;
extern int32_t tsSlowLogThreshold;
extern int32_t tsSlowLogScope;
......@@ -193,7 +194,7 @@ struct SConfig *taosGetCfg();
void taosSetAllDebugFlag(int32_t flag, bool rewrite);
void taosSetDebugFlag(int32_t *pFlagPtr, const char *flagName, int32_t flagVal, bool rewrite);
int32_t taosSetCfg(SConfig *pCfg, char *name);
int32_t taosApplyLocalCfg(SConfig *pCfg, char *name);
void taosLocalCfgForbiddenToChange(char *name, bool *forbidden);
#ifdef __cplusplus
......
......@@ -3191,7 +3191,8 @@ typedef struct {
char dbFName[TSDB_DB_FNAME_LEN];
uint64_t suid;
int32_t version;
SArray* pIndex;
int32_t indexSize;
SArray* pIndex; // STableIndexInfo
} STableIndexRsp;
int32_t tSerializeSTableIndexRsp(void* buf, int32_t bufLen, const STableIndexRsp* pRsp);
......
......@@ -214,7 +214,7 @@ int32_t catalogGetSTableMeta(SCatalog* pCatalog, SRequestConnInfo* pConn, const
int32_t catalogUpdateTableMeta(SCatalog* pCatalog, STableMetaRsp* rspMsg);
int32_t catalogUpdateTableMeta(SCatalog* pCatalog, STableMetaRsp* rspMsg);
int32_t catalogAsyncUpdateTableMeta(SCatalog* pCtg, STableMetaRsp* pMsg);
int32_t catalogGetCachedTableMeta(SCatalog* pCtg, const SName* pTableName, STableMeta** pTableMeta);
......
......@@ -90,28 +90,23 @@ typedef struct STbVerInfo {
int32_t tversion;
} STbVerInfo;
/*
* ASSERT(sizeof(SCTableMeta) == 24)
* ASSERT(tableType == TSDB_CHILD_TABLE)
* The cached child table meta info. For each child table, 24 bytes are required to keep the essential table info.
*/
#pragma pack(push, 1)
typedef struct SCTableMeta {
int32_t vgId : 24;
int8_t tableType;
uint64_t uid;
uint64_t suid;
int32_t vgId;
int8_t tableType;
} SCTableMeta;
#pragma pack(pop)
/*
* Note that the first 24 bytes of STableMeta are identical to SCTableMeta, it is safe to cast a STableMeta to be a
* SCTableMeta.
*/
#pragma pack(push, 1)
typedef struct STableMeta {
// BEGIN: KEEP THIS PART SAME WITH SCTableMeta
int32_t vgId : 24;
int8_t tableType;
uint64_t uid;
uint64_t suid;
int32_t vgId;
int8_t tableType;
// END: KEEP THIS PART SAME WITH SCTableMeta
// if the table is TSDB_CHILD_TABLE, the following information is acquired from the corresponding super table meta
......@@ -121,6 +116,7 @@ typedef struct STableMeta {
STableComInfo tableInfo;
SSchema schema[];
} STableMeta;
#pragma pack(pop)
typedef struct SDBVgInfo {
int32_t vgVersion;
......@@ -130,7 +126,7 @@ typedef struct SDBVgInfo {
int32_t numOfTable; // DB's table num, unit is TSDB_TABLE_NUM_UNIT
int64_t stateTs;
SHashObj* vgHash; // key:vgId, value:SVgroupInfo
SArray* vgArray;
SArray* vgArray; // SVgroupInfo
} SDBVgInfo;
typedef struct SUseDbOutput {
......
......@@ -656,7 +656,7 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
} else {
tscInfo("set cfg:%s to %s", pItem->name, str);
if (TSDB_OPTION_SHELL_ACTIVITY_TIMER == option || TSDB_OPTION_USE_ADAPTER == option) {
code = taosSetCfg(pCfg, pItem->name);
code = taosApplyLocalCfg(pCfg, pItem->name);
}
}
......
......@@ -224,7 +224,7 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo
return TSDB_CODE_TSC_INVALID_VALUE;
}
catalogUpdateTableMeta(pCatalog, rsp);
catalogAsyncUpdateTableMeta(pCatalog, rsp);
}
}
......
......@@ -815,7 +815,7 @@ int32_t handleAlterTbExecRes(void* res, SCatalog* pCatalog) {
}
int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog) {
return catalogUpdateTableMeta(pCatalog, (STableMetaRsp*)res);
return catalogAsyncUpdateTableMeta(pCatalog, (STableMetaRsp*)res);
}
int32_t handleQueryExecRsp(SRequestObj* pRequest) {
......
......@@ -117,12 +117,10 @@ int32_t tsRedirectFactor = 2;
int32_t tsRedirectMaxPeriod = 1000;
int32_t tsMaxRetryWaitTime = 10000;
bool tsUseAdapter = false;
int32_t tsMetaCacheMaxSize = -1; // MB
int32_t tsSlowLogThreshold = 3; // seconds
int32_t tsSlowLogScope = SLOW_LOG_TYPE_ALL;
/*
* denote if the server needs to compress response message at the application layer to client, including query rsp,
* metricmeta rsp, and multi-meter query rsp message body. The client compress the submit message to server.
......@@ -351,6 +349,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "useAdapter", tsUseAdapter, true) != 0) return -1;
if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, true) != 0) return -1;
if (cfgAddInt64(pCfg, "queryMaxConcurrentTables", tsQueryMaxConcurrentTables, INT64_MIN, INT64_MAX, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "metaCacheMaxSize", tsMetaCacheMaxSize, -1, INT32_MAX, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "slowLogThreshold", tsSlowLogThreshold, 0, INT32_MAX, true) != 0) return -1;
if (cfgAddString(pCfg, "slowLogScope", "", true) != 0) return -1;
......@@ -788,6 +787,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tsUseAdapter = cfgGetItem(pCfg, "useAdapter")->bval;
tsEnableCrashReport = cfgGetItem(pCfg, "crashReporting")->bval;
tsQueryMaxConcurrentTables = cfgGetItem(pCfg, "queryMaxConcurrentTables")->i64;
tsMetaCacheMaxSize = cfgGetItem(pCfg, "metaCacheMaxSize")->i32;
tsSlowLogThreshold = cfgGetItem(pCfg, "slowLogThreshold")->i32;
if (taosSetSlowLogScope(cfgGetItem(pCfg, "slowLogScope")->str)) {
return -1;
......@@ -916,7 +916,7 @@ void taosLocalCfgForbiddenToChange(char *name, bool *forbidden) {
*forbidden = false;
}
int32_t taosSetCfg(SConfig *pCfg, char *name) {
int32_t taosApplyLocalCfg(SConfig *pCfg, char *name) {
int32_t len = strlen(name);
char lowcaseName[CFG_NAME_MAX_LEN + 1] = {0};
strntolower(lowcaseName, name, TMIN(CFG_NAME_MAX_LEN, len));
......@@ -1051,6 +1051,12 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
}
break;
}
case 'e': {
if (strcasecmp("metaCacheMaxSize", name) == 0) {
atomic_store_32(&tsMetaCacheMaxSize, cfgGetItem(pCfg, "metaCacheMaxSize")->i32);
}
break;
}
case 'i': {
if (strcasecmp("minimalTmpDirGB", name) == 0) {
tsTempSpace.reserved = (int64_t)(((double)cfgGetItem(pCfg, "minimalTmpDirGB")->fval) * 1024 * 1024 * 1024);
......
......@@ -1566,21 +1566,21 @@ int32_t tDeserializeSGetUserAuthRspImpl(SDecoder *pDecoder, SGetUserAuthRsp *pRs
char db[TSDB_DB_FNAME_LEN] = {0};
if (tDecodeCStrTo(pDecoder, db) < 0) return -1;
int32_t len = strlen(db);
taosHashPut(pRsp->createdDbs, db, len, db, len);
taosHashPut(pRsp->createdDbs, db, len, db, len + 1);
}
for (int32_t i = 0; i < numOfReadDbs; ++i) {
char db[TSDB_DB_FNAME_LEN] = {0};
if (tDecodeCStrTo(pDecoder, db) < 0) return -1;
int32_t len = strlen(db);
taosHashPut(pRsp->readDbs, db, len, db, len);
taosHashPut(pRsp->readDbs, db, len, db, len + 1);
}
for (int32_t i = 0; i < numOfWriteDbs; ++i) {
char db[TSDB_DB_FNAME_LEN] = {0};
if (tDecodeCStrTo(pDecoder, db) < 0) return -1;
int32_t len = strlen(db);
taosHashPut(pRsp->writeDbs, db, len, db, len);
taosHashPut(pRsp->writeDbs, db, len, db, len + 1);
}
if (!tDecodeIsEnd(pDecoder)) {
......@@ -3416,6 +3416,7 @@ int32_t tSerializeSTableIndexRsp(void *buf, int32_t bufLen, const STableIndexRsp
if (tEncodeCStr(&encoder, pRsp->dbFName) < 0) return -1;
if (tEncodeU64(&encoder, pRsp->suid) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->version) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->indexSize) < 0) return -1;
int32_t num = taosArrayGetSize(pRsp->pIndex);
if (tEncodeI32(&encoder, num) < 0) return -1;
if (num > 0) {
......@@ -3461,6 +3462,7 @@ int32_t tDeserializeSTableIndexRsp(void *buf, int32_t bufLen, STableIndexRsp *pR
if (tDecodeCStrTo(&decoder, pRsp->dbFName) < 0) return -1;
if (tDecodeU64(&decoder, &pRsp->suid) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->version) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->indexSize) < 0) return -1;
int32_t num = 0;
if (tDecodeI32(&decoder, &num) < 0) return -1;
if (num > 0) {
......@@ -3735,6 +3737,7 @@ int32_t tSerializeSSTbHbRsp(void *buf, int32_t bufLen, SSTbHbRsp *pRsp) {
if (tEncodeCStr(&encoder, pIndexRsp->dbFName) < 0) return -1;
if (tEncodeU64(&encoder, pIndexRsp->suid) < 0) return -1;
if (tEncodeI32(&encoder, pIndexRsp->version) < 0) return -1;
if (tEncodeI32(&encoder, pIndexRsp->indexSize) < 0) return -1;
int32_t num = taosArrayGetSize(pIndexRsp->pIndex);
if (tEncodeI32(&encoder, num) < 0) return -1;
for (int32_t i = 0; i < num; ++i) {
......@@ -3797,6 +3800,7 @@ int32_t tDeserializeSSTbHbRsp(void *buf, int32_t bufLen, SSTbHbRsp *pRsp) {
if (tDecodeCStrTo(&decoder, tableIndexRsp.dbFName) < 0) return -1;
if (tDecodeU64(&decoder, &tableIndexRsp.suid) < 0) return -1;
if (tDecodeI32(&decoder, &tableIndexRsp.version) < 0) return -1;
if (tDecodeI32(&decoder, &tableIndexRsp.indexSize) < 0) return -1;
int32_t num = 0;
if (tDecodeI32(&decoder, &num) < 0) return -1;
if (num > 0) {
......
......@@ -1114,6 +1114,7 @@ int32_t mndGetTableSma(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool
return code;
}
rsp->indexSize += sizeof(info) + pSma->exprLen + 1;
*exist = true;
sdbRelease(pSdb, pSma);
......
......@@ -23,6 +23,8 @@ extern "C" {
#include "catalog.h"
#include "query.h"
#include "tcommon.h"
#include "ttimer.h"
#include "tglobal.h"
#define CTG_DEFAULT_CACHE_CLUSTER_NUMBER 6
#define CTG_DEFAULT_CACHE_VGROUP_NUMBER 100
......@@ -34,6 +36,8 @@ extern "C" {
#define CTG_DEFAULT_BATCH_NUM 64
#define CTG_DEFAULT_FETCH_NUM 8
#define CTG_MAX_COMMAND_LEN 512
#define CTG_DEFAULT_CACHE_MON_MSEC 5000
#define CTG_CLEAR_CACHE_ROUND_TB_NUM 3000
#define CTG_RENT_SLOT_SECOND 1.5
......@@ -131,6 +135,7 @@ typedef struct SCtgDebug {
typedef struct SCtgCacheStat {
uint64_t cacheNum[CTG_CI_MAX_VALUE];
uint64_t cacheSize[CTG_CI_MAX_VALUE];
uint64_t cacheHit[CTG_CI_MAX_VALUE];
uint64_t cacheNHit[CTG_CI_MAX_VALUE];
} SCtgCacheStat;
......@@ -239,8 +244,8 @@ typedef STableIndexRsp STableIndex;
typedef struct SCtgTbCache {
SRWLatch metaLock;
STableMeta* pMeta;
SRWLatch indexLock;
STableMeta* pMeta;
STableIndex* pIndex;
} SCtgTbCache;
......@@ -263,6 +268,7 @@ typedef struct SCtgDBCache {
SHashObj* tbCache; // key:tbname, value:SCtgTbCache
SHashObj* stbCache; // key:suid, value:char*
uint64_t dbCacheNum[CTG_CI_MAX_VALUE];
uint64_t dbCacheSize;
} SCtgDBCache;
typedef struct SCtgRentSlot {
......@@ -276,12 +282,15 @@ typedef struct SCtgRentMgmt {
uint16_t slotNum;
uint16_t slotRIdx;
int64_t lastReadMsec;
uint64_t rentCacheSize;
int32_t metaSize;
SCtgRentSlot* slots;
} SCtgRentMgmt;
typedef struct SCtgUserAuth {
SRWLatch lock;
SGetUserAuthRsp userAuth;
uint64_t userCacheSize;
} SCtgUserAuth;
typedef struct SCatalog {
......@@ -412,6 +421,7 @@ typedef struct SCtgRuntimeStat {
uint64_t numOfOpAbort;
uint64_t numOfOpEnqueue;
uint64_t numOfOpDequeue;
uint64_t numOfOpClearMeta;
uint64_t numOfOpClearCache;
} SCtgRuntimeStat;
......@@ -488,6 +498,7 @@ typedef struct SCtgDropTbIndexMsg {
typedef struct SCtgClearCacheMsg {
SCatalog* pCtg;
bool clearMeta;
bool freeCtg;
} SCtgClearCacheMsg;
......@@ -526,6 +537,8 @@ typedef struct SCatalogMgmt {
int32_t jobPool;
SRWLatch lock;
SCtgQueue queue;
void *timer;
tmr_h cacheTimer;
TdThread updateThread;
SHashObj* pCluster; // key: clusterId, value: SCatalog*
SCatalogStat statInfo;
......@@ -542,8 +555,8 @@ typedef struct SCtgOperation {
} SCtgOperation;
typedef struct SCtgCacheItemInfo {
char* name;
int32_t flag;
char* name;
int32_t flag;
} SCtgCacheItemInfo;
#define CTG_AUTH_READ(_t) ((_t) == AUTH_TYPE_READ || (_t) == AUTH_TYPE_READ_OR_WRITE)
......@@ -556,11 +569,6 @@ typedef struct SCtgCacheItemInfo {
#define CTG_STAT_DEC(_item, _n) atomic_sub_fetch_64(&(_item), _n)
#define CTG_STAT_GET(_item) atomic_load_64(&(_item))
#define CTG_DB_NUM_INC(_item) dbCache->dbCacheNum[_item] += 1
#define CTG_DB_NUM_DEC(_item) dbCache->dbCacheNum[_item] -= 1
#define CTG_DB_NUM_SET(_item) dbCache->dbCacheNum[_item] = 1
#define CTG_DB_NUM_RESET(_item) dbCache->dbCacheNum[_item] = 0
#define CTG_STAT_API_INC(item, n) (CTG_STAT_INC(gCtgMgmt.statInfo.api.item, n))
#define CTG_STAT_RT_INC(item, n) (CTG_STAT_INC(gCtgMgmt.statInfo.runtime.item, n))
#define CTG_STAT_NUM_INC(item, n) (CTG_STAT_INC(gCtgMgmt.statInfo.cache.cacheNum[item], n))
......@@ -575,6 +583,11 @@ typedef struct SCtgCacheItemInfo {
#define CTG_CACHE_HIT_INC(item, n) (CTG_STAT_INC(pCtg->cacheStat.cacheHit[item], n))
#define CTG_CACHE_NHIT_INC(item, n) (CTG_STAT_INC(pCtg->cacheStat.cacheNHit[item], n))
#define CTG_DB_NUM_INC(_item) dbCache->dbCacheNum[_item] += 1
#define CTG_DB_NUM_DEC(_item) dbCache->dbCacheNum[_item] -= 1
#define CTG_DB_NUM_SET(_item) dbCache->dbCacheNum[_item] = 1
#define CTG_DB_NUM_RESET(_item) dbCache->dbCacheNum[_item] = 0
#define CTG_META_NUM_INC(type) \
do { \
switch (type) { \
......@@ -685,6 +698,10 @@ typedef struct SCtgCacheItemInfo {
#define CTG_DB_NOT_EXIST(code) \
(code == TSDB_CODE_MND_DB_NOT_EXIST || code == TSDB_CODE_MND_DB_IN_CREATING || code == TSDB_CODE_MND_DB_IN_DROPPING)
#define CTG_CACHE_OVERFLOW(_csize, _maxsize) ((_maxsize >= 0) ? ((_csize) >= (_maxsize) * 1048576L * 0.9) : false)
#define CTG_CACHE_LOW(_csize, _maxsize) ((_maxsize >= 0) ? ((_csize) <= (_maxsize) * 1048576L * 0.75) : true)
#define ctgFatal(param, ...) qFatal("CTG:%p " param, pCtg, __VA_ARGS__)
#define ctgError(param, ...) qError("CTG:%p " param, pCtg, __VA_ARGS__)
#define ctgWarn(param, ...) qWarn("CTG:%p " param, pCtg, __VA_ARGS__)
......@@ -787,6 +804,12 @@ typedef struct SCtgCacheItemInfo {
CTG_RET(__code); \
} while (0)
#define CTG_API_NLEAVE() \
do { \
CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock); \
CTG_API_DEBUG("CTG API leave %s", __FUNCTION__); \
} while (0)
#define CTG_API_ENTER() \
do { \
CTG_API_DEBUG("CTG API enter %s", __FUNCTION__); \
......@@ -796,6 +819,15 @@ typedef struct SCtgCacheItemInfo {
} \
} while (0)
#define CTG_API_NENTER() \
do { \
CTG_API_DEBUG("CTG API enter %s", __FUNCTION__); \
CTG_LOCK(CTG_READ, &gCtgMgmt.lock); \
if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) { \
CTG_API_NLEAVE(); \
} \
} while (0)
#define CTG_API_JENTER() \
do { \
CTG_API_DEBUG("CTG API enter %s", __FUNCTION__); \
......@@ -859,8 +891,8 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput* output, bool sy
int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp* pAuth, bool syncReq);
int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char* dbFName, int32_t vgId, SEpSet* pEpSet);
int32_t ctgUpdateTbIndexEnqueue(SCatalog* pCtg, STableIndex** pIndex, bool syncOp);
int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool freeCtg, bool stopQueue, bool syncOp);
int32_t ctgMetaRentInit(SCtgRentMgmt* mgmt, uint32_t rentSec, int8_t type);
int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool clearMeta, bool freeCtg, bool stopQueue, bool syncOp);
int32_t ctgMetaRentInit(SCtgRentMgmt* mgmt, uint32_t rentSec, int8_t type, int32_t size);
int32_t ctgMetaRentAdd(SCtgRentMgmt* mgmt, void* meta, int64_t id, int32_t size);
int32_t ctgMetaRentGet(SCtgRentMgmt* mgmt, void** res, uint32_t* num, int32_t size);
int32_t ctgUpdateTbMetaToCache(SCatalog* pCtg, STableMetaOutput* pOut, bool syncReq);
......@@ -941,7 +973,7 @@ void ctgFreeSTableIndex(void* info);
void ctgClearSubTaskRes(SCtgSubRes* pRes);
void ctgFreeQNode(SCtgQNode* node);
void ctgClearHandle(SCatalog* pCtg);
void ctgFreeTbCacheImpl(SCtgTbCache* pCache);
void ctgFreeTbCacheImpl(SCtgTbCache* pCache, bool lock);
int32_t ctgRemoveTbMeta(SCatalog* pCtg, SName* pTableName);
int32_t ctgGetTbHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* pVgroup,
bool* exists);
......@@ -960,6 +992,16 @@ void ctgReleaseVgMetaToCache(SCatalog* pCtg, SCtgDBCache* dbCache, SCtgTbCach
void ctgReleaseTbMetaToCache(SCatalog* pCtg, SCtgDBCache* dbCache, SCtgTbCache* pCache);
void ctgGetGlobalCacheStat(SCtgCacheStat* pStat);
int32_t ctgChkSetAuthRes(SCatalog* pCtg, SCtgAuthReq* req, SCtgAuthRsp* res);
void ctgGetGlobalCacheSize(uint64_t *pSize);
uint64_t ctgGetTbIndexCacheSize(STableIndex *pIndex);
uint64_t ctgGetTbMetaCacheSize(STableMeta *pMeta);
uint64_t ctgGetDbVgroupCacheSize(SDBVgInfo *pVg);
uint64_t ctgGetUserCacheSize(SGetUserAuthRsp *pAuth);
uint64_t ctgGetClusterCacheSize(SCatalog *pCtg);
void ctgClearHandleMeta(SCatalog* pCtg, int64_t *pClearedSize, int64_t *pCleardNum, bool *roundDone);
void ctgClearAllHandleMeta(int64_t *clearedSize, int64_t *clearedNum, bool *roundDone);
void ctgProcessTimerEvent(void *param, void *tmrId);
int32_t ctgGetTbMeta(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta);
int32_t ctgGetCachedStbNameFromSuid(SCatalog* pCtg, char* dbFName, uint64_t suid, char **stbName);
int32_t ctgGetTbTagCb(SCtgTask* pTask);
......
......@@ -668,6 +668,36 @@ _return:
CTG_RET(code);
}
void ctgProcessTimerEvent(void *param, void *tmrId) {
CTG_API_NENTER();
int32_t cacheMaxSize = atomic_load_32(&tsMetaCacheMaxSize);
if (cacheMaxSize >= 0) {
uint64_t cacheSize = 0;
ctgGetGlobalCacheSize(&cacheSize);
bool overflow = CTG_CACHE_OVERFLOW(cacheSize, cacheMaxSize);
qDebug("catalog cache size: %" PRIu64"B, maxCaseSize:%dMB, %s", cacheSize, cacheMaxSize, overflow ? "overflow" : "NO overflow");
if (overflow) {
int32_t code = ctgClearCacheEnqueue(NULL, true, false, false, false);
if (code) {
qError("clear cache enqueue failed, error:%s", tstrerror(code));
taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer);
}
goto _return;
}
}
qTrace("reset catalog timer");
taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer);
_return:
CTG_API_NLEAVE();
}
int32_t ctgGetDBCfg(SCatalog* pCtg, SRequestConnInfo* pConn, const char* dbFName, SDbCfgInfo* pDbCfg) {
CTG_ERR_RET(ctgReadDBCfgFromCache(pCtg, dbFName, pDbCfg));
......@@ -686,6 +716,7 @@ int32_t ctgGetDBCfg(SCatalog* pCtg, SRequestConnInfo* pConn, const char* dbFName
int32_t catalogInit(SCatalogCfg* cfg) {
qDebug("catalogInit start");
if (gCtgMgmt.pCluster) {
qError("catalog already initialized");
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
......@@ -743,6 +774,18 @@ int32_t catalogInit(SCatalogCfg* cfg) {
CTG_ERR_RET(terrno);
}
gCtgMgmt.timer = taosTmrInit(0, 0, 0, "catalog");
if (NULL == gCtgMgmt.timer) {
qError("init timer failed, error:%s", tstrerror(terrno));
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
gCtgMgmt.cacheTimer = taosTmrStart(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer);
if (NULL == gCtgMgmt.cacheTimer) {
qError("start cache timer failed");
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
CTG_ERR_RET(ctgStartUpdateThread());
qDebug("catalog initialized, maxDb:%u, maxTbl:%u, dbRentSec:%u, stbRentSec:%u", gCtgMgmt.cfg.maxDBCacheNum,
......@@ -786,8 +829,8 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) {
clusterCtg->clusterId = clusterId;
CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->dbRent, gCtgMgmt.cfg.dbRentSec, CTG_RENT_DB));
CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->stbRent, gCtgMgmt.cfg.stbRentSec, CTG_RENT_STABLE));
CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->dbRent, gCtgMgmt.cfg.dbRentSec, CTG_RENT_DB, sizeof(SDbCacheInfo)));
CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->stbRent, gCtgMgmt.cfg.stbRentSec, CTG_RENT_STABLE, sizeof(SSTableVersion)));
clusterCtg->dbCache = taosHashInit(gCtgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY),
false, HASH_ENTRY_LOCK);
......@@ -1139,6 +1182,22 @@ _return:
CTG_API_LEAVE(code);
}
int32_t catalogAsyncUpdateTableMeta(SCatalog* pCtg, STableMetaRsp* pMsg) {
CTG_API_ENTER();
if (NULL == pCtg || NULL == pMsg) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
int32_t code = 0;
CTG_ERR_JRET(ctgUpdateTbMeta(pCtg, pMsg, false));
_return:
CTG_API_LEAVE(code);
}
int32_t catalogChkTbMetaVersion(SCatalog* pCtg, SRequestConnInfo* pConn, SArray* pTables) {
CTG_API_ENTER();
......@@ -1600,11 +1659,11 @@ int32_t catalogClearCache(void) {
qInfo("start to clear catalog cache");
if (NULL == gCtgMgmt.pCluster || atomic_load_8((int8_t*)&gCtgMgmt.exit)) {
if (NULL == gCtgMgmt.pCluster) {
CTG_API_LEAVE_NOLOCK(TSDB_CODE_SUCCESS);
}
int32_t code = ctgClearCacheEnqueue(NULL, false, false, true);
int32_t code = ctgClearCacheEnqueue(NULL, false, false, false, true);
qInfo("clear catalog cache end, code: %s", tstrerror(code));
......@@ -1618,10 +1677,17 @@ void catalogDestroy(void) {
return;
}
if (gCtgMgmt.cacheTimer) {
taosTmrStop(gCtgMgmt.cacheTimer);
gCtgMgmt.cacheTimer = NULL;
taosTmrCleanUp(gCtgMgmt.timer);
gCtgMgmt.timer = NULL;
}
atomic_store_8((int8_t*)&gCtgMgmt.exit, true);
if (!taosCheckCurrentInDll()) {
ctgClearCacheEnqueue(NULL, true, true, true);
ctgClearCacheEnqueue(NULL, false, true, true, true);
taosThreadJoin(gCtgMgmt.updateThread, NULL);
}
......
......@@ -33,24 +33,25 @@ SCtgOperation gCtgCacheOperation[CTG_OP_MAX] = {{CTG_OP_UPDATE_VGROUP, "update v
{CTG_OP_CLEAR_CACHE, "clear cache", ctgOpClearCache}};
SCtgCacheItemInfo gCtgStatItem[CTG_CI_MAX_VALUE] = {
{"Cluster ", CTG_CI_FLAG_LEVEL_GLOBAL}, // CTG_CI_CLUSTER
{"Dnode ", CTG_CI_FLAG_LEVEL_CLUSTER}, // CTG_CI_DNODE,
{"Qnode ", CTG_CI_FLAG_LEVEL_CLUSTER}, // CTG_CI_QNODE,
{"DB ", CTG_CI_FLAG_LEVEL_CLUSTER}, // CTG_CI_DB,
{"DbVgroup ", CTG_CI_FLAG_LEVEL_DB}, // CTG_CI_DB_VGROUP,
{"DbCfg ", CTG_CI_FLAG_LEVEL_DB}, // CTG_CI_DB_CFG,
{"DbInfo ", CTG_CI_FLAG_LEVEL_DB}, // CTG_CI_DB_INFO,
{"StbMeta ", CTG_CI_FLAG_LEVEL_DB}, // CTG_CI_STABLE_META,
{"NtbMeta ", CTG_CI_FLAG_LEVEL_DB}, // CTG_CI_NTABLE_META,
{"CtbMeta ", CTG_CI_FLAG_LEVEL_DB}, // CTG_CI_CTABLE_META,
{"SysTblMeta", CTG_CI_FLAG_LEVEL_DB}, // CTG_CI_SYSTABLE_META,
{"OthTblMeta", CTG_CI_FLAG_LEVEL_DB}, // CTG_CI_OTHERTABLE_META,
{"TblSMA ", CTG_CI_FLAG_LEVEL_DB}, // CTG_CI_TBL_SMA,
{"TblCfg ", CTG_CI_FLAG_LEVEL_DB}, // CTG_CI_TBL_CFG,
{"IndexInfo ", CTG_CI_FLAG_LEVEL_DB}, // CTG_CI_INDEX_INFO,
{"User ", CTG_CI_FLAG_LEVEL_CLUSTER}, // CTG_CI_USER,
{"UDF ", CTG_CI_FLAG_LEVEL_CLUSTER}, // CTG_CI_UDF,
{"SvrVer ", CTG_CI_FLAG_LEVEL_CLUSTER} // CTG_CI_SVR_VER,
{"Cluster ", CTG_CI_FLAG_LEVEL_GLOBAL}, //CTG_CI_CLUSTER
{"Dnode ", CTG_CI_FLAG_LEVEL_CLUSTER}, //CTG_CI_DNODE,
{"Qnode ", CTG_CI_FLAG_LEVEL_CLUSTER}, //CTG_CI_QNODE,
{"DB ", CTG_CI_FLAG_LEVEL_CLUSTER}, //CTG_CI_DB,
{"DbVgroup ", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_DB_VGROUP,
{"DbCfg ", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_DB_CFG,
{"DbInfo ", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_DB_INFO,
{"StbMeta ", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_STABLE_META,
{"NtbMeta ", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_NTABLE_META,
{"CtbMeta ", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_CTABLE_META,
{"SysTblMeta", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_SYSTABLE_META,
{"OthTblMeta", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_OTHERTABLE_META,
{"TblSMA ", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_TBL_SMA,
{"TblCfg ", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_TBL_CFG,
{"TblTag ", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_TBL_TAG,
{"IndexInfo ", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_INDEX_INFO,
{"User ", CTG_CI_FLAG_LEVEL_CLUSTER}, //CTG_CI_USER,
{"UDF ", CTG_CI_FLAG_LEVEL_CLUSTER}, //CTG_CI_UDF,
{"SvrVer ", CTG_CI_FLAG_LEVEL_CLUSTER} //CTG_CI_SVR_VER,
};
int32_t ctgRLockVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache, bool *inCache) {
......@@ -860,7 +861,7 @@ int32_t ctgEnqueue(SCatalog *pCtg, SCtgCacheOperation *operation) {
CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);
ctgDebug("action [%s] added into queue", opName);
ctgDebug("%sync action [%s] added into queue", syncOp ? "S": "As", opName);
CTG_QUEUE_INC();
CTG_STAT_RT_INC(numOfOpEnqueue, 1);
......@@ -1242,7 +1243,7 @@ _return:
CTG_RET(code);
}
int32_t ctgClearCacheEnqueue(SCatalog *pCtg, bool freeCtg, bool stopQueue, bool syncOp) {
int32_t ctgClearCacheEnqueue(SCatalog *pCtg, bool clearMeta, bool freeCtg, bool stopQueue, bool syncOp) {
int32_t code = 0;
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_CLEAR_CACHE;
......@@ -1258,6 +1259,7 @@ int32_t ctgClearCacheEnqueue(SCatalog *pCtg, bool freeCtg, bool stopQueue, bool
}
msg->pCtg = pCtg;
msg->clearMeta = clearMeta;
msg->freeCtg = freeCtg;
op->data = msg;
......@@ -1270,10 +1272,11 @@ _return:
CTG_RET(code);
}
int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) {
int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type, int32_t size) {
mgmt->slotRIdx = 0;
mgmt->slotNum = rentSec / CTG_RENT_SLOT_SECOND;
mgmt->type = type;
mgmt->metaSize = size;
size_t msgSize = sizeof(SCtgRentSlot) * mgmt->slotNum;
......@@ -1283,6 +1286,8 @@ int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
mgmt->rentCacheSize = msgSize;
qDebug("meta rent initialized, type:%d, slotNum:%d", type, mgmt->slotNum);
return TSDB_CODE_SUCCESS;
......@@ -1309,6 +1314,7 @@ int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size)
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
mgmt->rentCacheSize += size;
slot->needSort = true;
qDebug("add meta to rent, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type);
......@@ -1389,6 +1395,7 @@ int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t sortComp
}
taosArrayRemove(slot->meta, idx);
mgmt->rentCacheSize -= mgmt->metaSize;
qDebug("meta in rent removed, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type);
......@@ -1656,10 +1663,15 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
}
if (origType == TSDB_SUPER_TABLE) {
if (taosHashRemove(dbCache->stbCache, &orig->suid, sizeof(orig->suid))) {
ctgError("stb not exist in stbCache, dbFName:%s, stb:%s, suid:0x%" PRIx64, dbFName, tbName, orig->suid);
} else {
ctgDebug("stb removed from stbCache, dbFName:%s, stb:%s, suid:0x%" PRIx64, dbFName, tbName, orig->suid);
char *stbName = taosHashGet(dbCache->stbCache, &orig->suid, sizeof(orig->suid));
if (stbName) {
uint64_t metaSize = strlen(stbName) + 1 + sizeof(orig->suid);
if (taosHashRemove(dbCache->stbCache, &orig->suid, sizeof(orig->suid))) {
ctgError("stb not exist in stbCache, dbFName:%s, stb:%s, suid:0x%" PRIx64, dbFName, tbName, orig->suid);
} else {
ctgDebug("stb removed from stbCache, dbFName:%s, stb:%s, suid:0x%" PRIx64, dbFName, tbName, orig->suid);
atomic_sub_fetch_64(&dbCache->dbCacheSize, metaSize);
}
}
}
}
......@@ -1673,14 +1685,20 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
atomic_add_fetch_64(&dbCache->dbCacheSize, strlen(tbName) + sizeof(SCtgTbCache) + ctgGetTbMetaCacheSize(meta));
pCache = taosHashGet(dbCache->tbCache, tbName, strlen(tbName));
} else {
CTG_LOCK(CTG_WRITE, &pCache->metaLock);
if (orig) {
CTG_META_NUM_DEC(origType);
}
atomic_add_fetch_64(&dbCache->dbCacheSize, ctgGetTbMetaCacheSize(meta) - ctgGetTbMetaCacheSize(pCache->pMeta));
taosMemoryFree(pCache->pMeta);
pCache->pMeta = meta;
CTG_UNLOCK(CTG_WRITE, &pCache->metaLock);
}
......@@ -1698,6 +1716,8 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
atomic_add_fetch_64(&dbCache->dbCacheSize, sizeof(meta->suid) + strlen(tbName) + 1);
ctgDebug("stb 0x%" PRIx64 " updated to cache, dbFName:%s, tbName:%s, tbType:%d", meta->suid, dbFName, tbName,
meta->tableType);
......@@ -1730,6 +1750,8 @@ int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNa
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
atomic_add_fetch_64(&dbCache->dbCacheSize, strlen(tbName) + sizeof(SCtgTbCache) + ctgGetTbIndexCacheSize(pIndex));
CTG_DB_NUM_INC(CTG_CI_TBL_SMA);
*index = NULL;
......@@ -1746,6 +1768,7 @@ int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNa
CTG_LOCK(CTG_WRITE, &pCache->indexLock);
if (pCache->pIndex) {
atomic_sub_fetch_64(&dbCache->dbCacheSize, ctgGetTbIndexCacheSize(pCache->pIndex));
if (0 == suid) {
suid = pCache->pIndex->suid;
}
......@@ -1756,6 +1779,8 @@ int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNa
pCache->pIndex = pIndex;
CTG_UNLOCK(CTG_WRITE, &pCache->indexLock);
atomic_add_fetch_64(&dbCache->dbCacheSize, ctgGetTbIndexCacheSize(pIndex));
*index = NULL;
ctgDebug("table %s index updated to cache, ver:%d, num:%d", tbName, pIndex->version,
......@@ -1785,7 +1810,7 @@ _return:
CTG_RET(code);
}
void ctgClearAllInstance(void) {
void ctgClearAllHandles(void) {
SCatalog *pCtg = NULL;
void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
......@@ -1800,7 +1825,7 @@ void ctgClearAllInstance(void) {
}
}
void ctgFreeAllInstance(void) {
void ctgFreeAllHandles(void) {
SCatalog *pCtg = NULL;
void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
......@@ -1841,7 +1866,7 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
goto _return;
}
if (dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgHash) <= 0) {
if (dbInfo->vgVersion < 0 || (taosHashGetSize(dbInfo->vgHash) <= 0 && !IS_SYS_DBNAME(dbFName))) {
ctgDebug("invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d, vgHashSize:%d", dbFName, dbInfo->vgHash,
dbInfo->vgVersion, taosHashGetSize(dbInfo->vgHash));
CTG_ERR_JRET(TSDB_CODE_APP_ERROR);
......@@ -1859,6 +1884,7 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
}
SCtgVgCache *vgCache = &dbCache->vgCache;
CTG_ERR_JRET(ctgWLockVgInfo(msg->pCtg, dbCache));
if (vgCache->vgInfo) {
......@@ -1881,6 +1907,11 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
goto _return;
}
uint64_t groupCacheSize = ctgGetDbVgroupCacheSize(vgCache->vgInfo);
ctgDebug("sub dbGroupCacheSize %" PRIu64 " from db, dbFName:%s", groupCacheSize, dbFName);
atomic_sub_fetch_64(&dbCache->dbCacheSize, groupCacheSize);
freeVgInfo(vgInfo);
CTG_DB_NUM_RESET(CTG_CI_DB_VGROUP);
}
......@@ -1898,6 +1929,10 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
ctgWUnlockVgInfo(dbCache);
uint64_t groupCacheSize = ctgGetDbVgroupCacheSize(vgCache->vgInfo);
atomic_add_fetch_64(&dbCache->dbCacheSize, groupCacheSize);
ctgDebug("add dbGroupCacheSize %" PRIu64 " from db, dbFName:%s", groupCacheSize, dbFName);
dbCache = NULL;
// if (!IS_SYS_DBNAME(dbFName)) {
......@@ -2022,6 +2057,8 @@ int32_t ctgOpDropDbVgroup(SCtgCacheOperation *operation) {
CTG_ERR_JRET(ctgWLockVgInfo(pCtg, dbCache));
atomic_sub_fetch_64(&dbCache->dbCacheSize, ctgGetDbVgroupCacheSize(dbCache->vgCache.vgInfo));
freeVgInfo(dbCache->vgCache.vgInfo);
dbCache->vgCache.vgInfo = NULL;
......@@ -2113,26 +2150,32 @@ int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) {
goto _return;
}
if (taosHashRemove(dbCache->stbCache, &msg->suid, sizeof(msg->suid))) {
ctgDebug("stb not exist in stbCache, may be removed, dbFName:%s, stb:%s, suid:0x%" PRIx64, msg->dbFName,
msg->stbName, msg->suid);
char *stbName = taosHashGet(dbCache->stbCache, &msg->suid, sizeof(msg->suid));
if (stbName) {
uint64_t metaSize = strlen(stbName) + 1 + sizeof(msg->suid);
if (taosHashRemove(dbCache->stbCache, &msg->suid, sizeof(msg->suid))) {
ctgDebug("stb not exist in stbCache, may be removed, dbFName:%s, stb:%s, suid:0x%" PRIx64, msg->dbFName,
msg->stbName, msg->suid);
} else {
atomic_sub_fetch_64(&dbCache->dbCacheSize, metaSize);
}
}
SCtgTbCache *pTbCache = taosHashGet(dbCache->tbCache, msg->stbName, strlen(msg->stbName));
if (NULL == pTbCache) {
ctgDebug("stb %s already not in cache", msg->stbName);
goto _return;
}
CTG_LOCK(CTG_WRITE, &pTbCache->metaLock);
tblType = pTbCache->pMeta->tableType;
ctgFreeTbCacheImpl(pTbCache);
CTG_UNLOCK(CTG_WRITE, &pTbCache->metaLock);
atomic_sub_fetch_64(&dbCache->dbCacheSize, ctgGetTbMetaCacheSize(pTbCache->pMeta) + ctgGetTbIndexCacheSize(pTbCache->pIndex));
ctgFreeTbCacheImpl(pTbCache, true);
if (taosHashRemove(dbCache->tbCache, msg->stbName, strlen(msg->stbName))) {
ctgError("stb not exist in cache, dbFName:%s, stb:%s, suid:0x%" PRIx64, msg->dbFName, msg->stbName, msg->suid);
} else {
CTG_META_NUM_DEC(tblType);
atomic_sub_fetch_64(&dbCache->dbCacheSize, sizeof(*pTbCache) + strlen(msg->stbName));
}
ctgInfo("stb removed from cache, dbFName:%s, stbName:%s, suid:0x%" PRIx64, msg->dbFName, msg->stbName, msg->suid);
......@@ -2176,15 +2219,15 @@ int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) {
goto _return;
}
CTG_LOCK(CTG_WRITE, &pTbCache->metaLock);
tblType = pTbCache->pMeta->tableType;
ctgFreeTbCacheImpl(pTbCache);
CTG_UNLOCK(CTG_WRITE, &pTbCache->metaLock);
atomic_sub_fetch_64(&dbCache->dbCacheSize, ctgGetTbMetaCacheSize(pTbCache->pMeta) + ctgGetTbIndexCacheSize(pTbCache->pIndex));
ctgFreeTbCacheImpl(pTbCache, true);
if (taosHashRemove(dbCache->tbCache, msg->tbName, strlen(msg->tbName))) {
ctgError("tb %s not exist in cache, dbFName:%s", msg->tbName, msg->dbFName);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
} else {
atomic_sub_fetch_64(&dbCache->dbCacheSize, sizeof(*pTbCache) + strlen(msg->tbName));
CTG_META_NUM_DEC(tblType);
}
......@@ -2211,7 +2254,8 @@ int32_t ctgOpUpdateUser(SCtgCacheOperation *operation) {
SCtgUserAuth userAuth = {0};
memcpy(&userAuth.userAuth, &msg->userAuth, sizeof(msg->userAuth));
userAuth.userCacheSize = ctgGetUserCacheSize(&userAuth.userAuth);
if (taosHashPut(pCtg->userCache, msg->userAuth.user, strlen(msg->userAuth.user), &userAuth, sizeof(userAuth))) {
ctgError("taosHashPut user %s to cache failed", msg->userAuth.user);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
......@@ -2243,6 +2287,8 @@ int32_t ctgOpUpdateUser(SCtgCacheOperation *operation) {
msg->userAuth.useDbs = NULL;
CTG_UNLOCK(CTG_WRITE, &pUser->lock);
atomic_store_64(&pUser->userCacheSize, ctgGetUserCacheSize(&pUser->userAuth));
_return:
......@@ -2378,33 +2424,78 @@ _return:
CTG_RET(code);
}
int32_t ctgOpClearCache(SCtgCacheOperation *operation) {
int32_t code = 0;
void ctgClearFreeCache(SCtgCacheOperation *operation) {
SCtgClearCacheMsg *msg = operation->data;
SCatalog *pCtg = msg->pCtg;
CTG_LOCK(CTG_WRITE, &gCtgMgmt.lock);
if (pCtg) {
if (msg->freeCtg) {
ctgFreeHandle(pCtg);
} else {
ctgClearHandle(pCtg);
}
} else if (msg->freeCtg) {
ctgFreeAllHandles();
} else {
ctgClearAllHandles();
}
CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.lock);
}
goto _return;
void ctgClearMetaCache(SCtgCacheOperation *operation) {
SCtgClearCacheMsg *msg = operation->data;
SCatalog *pCtg = msg->pCtg;
int64_t clearedSize = 0;
int64_t clearedNum = 0;
int64_t remainSize = 0;
bool roundDone = false;
if (pCtg) {
ctgClearHandleMeta(pCtg, &clearedSize, &clearedNum, &roundDone);
} else {
ctgClearAllHandleMeta(&clearedSize, &clearedNum, &roundDone);
}
qDebug("catalog finish one round meta clear, clearedSize:%" PRId64 ", clearedNum:%" PRId64 ", done:%d", clearedSize, clearedNum, roundDone);
ctgGetGlobalCacheSize(&remainSize);
int32_t cacheMaxSize = atomic_load_32(&tsMetaCacheMaxSize);
if (CTG_CACHE_LOW(remainSize, cacheMaxSize)) {
qDebug("catalog finish meta clear, remainSize:%" PRId64 ", cacheMaxSize:%dMB", remainSize, cacheMaxSize);
taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer);
return;
}
if (!roundDone) {
qDebug("catalog all meta cleared, remainSize:%" PRId64 ", cacheMaxSize:%dMB, to clear handle", remainSize, cacheMaxSize);
ctgClearFreeCache(operation);
taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer);
return;
}
int32_t code = ctgClearCacheEnqueue(NULL, true, false, false, false);
if (code) {
qError("clear cache enqueue failed, error:%s", tstrerror(code));
taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer);
}
}
if (msg->freeCtg) {
ctgFreeAllInstance();
int32_t ctgOpClearCache(SCtgCacheOperation *operation) {
int32_t code = 0;
SCtgClearCacheMsg *msg = operation->data;
if (msg->clearMeta) {
ctgClearMetaCache(operation);
} else {
ctgClearAllInstance();
ctgClearFreeCache(operation);
}
_return:
CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.lock);
taosMemoryFreeClear(msg);
CTG_RET(code);
......
......@@ -19,7 +19,7 @@
#include "trpc.h"
extern SCatalogMgmt gCtgMgmt;
SCtgDebug gCTGDebug = {0};
SCtgDebug gCTGDebug = {.statEnable = true};
#if 0
......@@ -547,7 +547,10 @@ int32_t ctgdShowStatInfo(void) {
CTG_API_ENTER();
SCtgCacheStat cache;
uint64_t cacheSize = 0;
ctgGetGlobalCacheStat(&cache);
ctgGetGlobalCacheSize(&cacheSize);
qDebug("## Global Stat Info %s ##", "begin");
qDebug("## \t%s \t%s \t%s ##", "Num", "Hit", "Nhit");
......@@ -555,6 +558,7 @@ int32_t ctgdShowStatInfo(void) {
qDebug("# %s \t%" PRIu64 " \t%" PRIu64 " \t%" PRIu64 " #", gCtgStatItem[i].name, cache.cacheNum[i], cache.cacheHit[i], cache.cacheNHit[i]);
}
qDebug("## Global Stat Info %s ##", "end");
qDebug("## Global Cache Size: %" PRIu64, cacheSize);
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
......
......@@ -199,6 +199,7 @@ void ctgFreeMetaRent(SCtgRentMgmt* mgmt) {
}
taosMemoryFreeClear(mgmt->slots);
mgmt->rentCacheSize = 0;
}
void ctgFreeStbMetaCache(SCtgDBCache* dbCache) {
......@@ -211,12 +212,26 @@ void ctgFreeStbMetaCache(SCtgDBCache* dbCache) {
dbCache->stbCache = NULL;
}
void ctgFreeTbCacheImpl(SCtgTbCache* pCache) {
qDebug("tbMeta freed, p:%p", pCache->pMeta);
taosMemoryFreeClear(pCache->pMeta);
void ctgFreeTbCacheImpl(SCtgTbCache* pCache, bool lock) {
if (pCache->pMeta) {
if (lock) {
CTG_LOCK(CTG_WRITE, &pCache->metaLock);
}
taosMemoryFreeClear(pCache->pMeta);
if (lock) {
CTG_UNLOCK(CTG_WRITE, &pCache->metaLock);
}
}
if (pCache->pIndex) {
if (lock) {
CTG_LOCK(CTG_WRITE, &pCache->indexLock);
}
taosArrayDestroyEx(pCache->pIndex->pIndex, tFreeSTableIndexInfo);
taosMemoryFreeClear(pCache->pIndex);
if (lock) {
CTG_UNLOCK(CTG_WRITE, &pCache->indexLock);
}
}
}
......@@ -228,7 +243,7 @@ void ctgFreeTbCache(SCtgDBCache* dbCache) {
int32_t tblNum = taosHashGetSize(dbCache->tbCache);
SCtgTbCache* pCache = taosHashIterate(dbCache->tbCache, NULL);
while (NULL != pCache) {
ctgFreeTbCacheImpl(pCache);
ctgFreeTbCacheImpl(pCache, false);
pCache = taosHashIterate(dbCache->tbCache, pCache);
}
taosHashCleanup(dbCache->tbCache);
......@@ -316,21 +331,86 @@ void ctgFreeHandle(SCatalog* pCtg) {
ctgInfo("handle freed, clusterId:0x%" PRIx64, clusterId);
}
void ctgClearHandleMeta(SCatalog* pCtg, int64_t *pClearedSize, int64_t *pCleardNum, bool *roundDone) {
int64_t cacheSize = 0;
void* pIter = taosHashIterate(pCtg->dbCache, NULL);
while (pIter) {
SCtgDBCache* dbCache = pIter;
SCtgTbCache* pCache = taosHashIterate(dbCache->tbCache, NULL);
while (NULL != pCache) {
size_t len = 0;
void* key = taosHashGetKey(pCache, &len);
if (pCache->pMeta && TSDB_SUPER_TABLE == pCache->pMeta->tableType) {
pCache = taosHashIterate(dbCache->tbCache, pCache);
continue;
}
taosHashRemove(dbCache->tbCache, key, len);
cacheSize = len + sizeof(SCtgTbCache) + ctgGetTbMetaCacheSize(pCache->pMeta) + ctgGetTbIndexCacheSize(pCache->pIndex);
atomic_sub_fetch_64(&dbCache->dbCacheSize, cacheSize);
*pClearedSize += cacheSize;
(*pCleardNum)++;
if (pCache->pMeta) {
CTG_META_NUM_DEC(pCache->pMeta->tableType);
}
ctgFreeTbCacheImpl(pCache, true);
if (*pCleardNum >= CTG_CLEAR_CACHE_ROUND_TB_NUM) {
taosHashCancelIterate(dbCache->tbCache, pCache);
goto _return;
}
pCache = taosHashIterate(dbCache->tbCache, pCache);
}
pIter = taosHashIterate(pCtg->dbCache, pIter);
}
_return:
if (*pCleardNum >= CTG_CLEAR_CACHE_ROUND_TB_NUM) {
*roundDone = true;
}
}
void ctgClearAllHandleMeta(int64_t *clearedSize, int64_t *clearedNum, bool *roundDone) {
SCatalog *pCtg = NULL;
void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
while (pIter) {
pCtg = *(SCatalog **)pIter;
if (pCtg) {
ctgClearHandleMeta(pCtg, clearedSize, clearedNum, roundDone);
if (*roundDone) {
taosHashCancelIterate(gCtgMgmt.pCluster, pIter);
break;
}
}
pIter = taosHashIterate(gCtgMgmt.pCluster, pIter);
}
}
void ctgClearHandle(SCatalog* pCtg) {
if (NULL == pCtg) {
return;
}
uint64_t clusterId = pCtg->clusterId;
ctgFreeMetaRent(&pCtg->dbRent);
ctgFreeMetaRent(&pCtg->stbRent);
ctgFreeInstDbCache(pCtg->dbCache);
ctgFreeInstUserCache(pCtg->userCache);
ctgMetaRentInit(&pCtg->dbRent, gCtgMgmt.cfg.dbRentSec, CTG_RENT_DB);
ctgMetaRentInit(&pCtg->stbRent, gCtgMgmt.cfg.stbRentSec, CTG_RENT_STABLE);
ctgMetaRentInit(&pCtg->dbRent, gCtgMgmt.cfg.dbRentSec, CTG_RENT_DB, sizeof(SDbCacheInfo));
ctgMetaRentInit(&pCtg->stbRent, gCtgMgmt.cfg.stbRentSec, CTG_RENT_STABLE, sizeof(SSTableVersion));
pCtg->dbCache = taosHashInit(gCtgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false,
HASH_ENTRY_LOCK);
......@@ -1624,6 +1704,130 @@ void catalogFreeMetaData(SMetaData* pData) {
}
#endif
uint64_t ctgGetTbIndexCacheSize(STableIndex *pIndex) {
if (NULL == pIndex) {
return 0;
}
return sizeof(*pIndex) + pIndex->indexSize;
}
FORCE_INLINE uint64_t ctgGetTbMetaCacheSize(STableMeta *pMeta) {
if (NULL == pMeta) {
return 0;
}
switch (pMeta->tableType) {
case TSDB_SUPER_TABLE:
return sizeof(*pMeta) + (pMeta->tableInfo.numOfColumns + pMeta->tableInfo.numOfTags) * sizeof(SSchema);
case TSDB_CHILD_TABLE:
return sizeof(SCTableMeta);
default:
return sizeof(*pMeta) + pMeta->tableInfo.numOfColumns * sizeof(SSchema);
}
return 0;
}
uint64_t ctgGetDbVgroupCacheSize(SDBVgInfo *pVg) {
if (NULL == pVg) {
return 0;
}
return sizeof(*pVg) + taosHashGetSize(pVg->vgHash) * (sizeof(SVgroupInfo) + sizeof(int32_t))
+ taosArrayGetSize(pVg->vgArray) * sizeof(SVgroupInfo);
}
uint64_t ctgGetUserCacheSize(SGetUserAuthRsp *pAuth) {
if (NULL == pAuth) {
return 0;
}
uint64_t cacheSize = 0;
char* p = taosHashIterate(pAuth->createdDbs, NULL);
while (p != NULL) {
size_t len = 0;
void* key = taosHashGetKey(p, &len);
cacheSize += len + strlen(p);
p = taosHashIterate(pAuth->createdDbs, p);
}
p = taosHashIterate(pAuth->readDbs, NULL);
while (p != NULL) {
size_t len = 0;
void* key = taosHashGetKey(p, &len);
cacheSize += len + strlen(p);
p = taosHashIterate(pAuth->readDbs, p);
}
p = taosHashIterate(pAuth->writeDbs, NULL);
while (p != NULL) {
size_t len = 0;
void* key = taosHashGetKey(p, &len);
cacheSize += len + strlen(p);
p = taosHashIterate(pAuth->writeDbs, p);
}
p = taosHashIterate(pAuth->readTbs, NULL);
while (p != NULL) {
size_t len = 0;
void* key = taosHashGetKey(p, &len);
cacheSize += len + strlen(p);
p = taosHashIterate(pAuth->readTbs, p);
}
p = taosHashIterate(pAuth->writeTbs, NULL);
while (p != NULL) {
size_t len = 0;
void* key = taosHashGetKey(p, &len);
cacheSize += len + strlen(p);
p = taosHashIterate(pAuth->writeTbs, p);
}
int32_t *ref = taosHashIterate(pAuth->useDbs, NULL);
while (ref != NULL) {
size_t len = 0;
void* key = taosHashGetKey(ref, &len);
cacheSize += len + sizeof(*ref);
ref = taosHashIterate(pAuth->useDbs, ref);
}
return cacheSize;
}
uint64_t ctgGetClusterCacheSize(SCatalog *pCtg) {
uint64_t cacheSize = sizeof(SCatalog);
SCtgUserAuth* pAuth = taosHashIterate(pCtg->userCache, NULL);
while (pAuth != NULL) {
size_t len = 0;
void* key = taosHashGetKey(pAuth, &len);
cacheSize += len + sizeof(SCtgUserAuth) + atomic_load_64(&pAuth->userCacheSize);
pAuth = taosHashIterate(pCtg->userCache, pAuth);
}
SCtgDBCache* pDb = taosHashIterate(pCtg->dbCache, NULL);
while (pDb != NULL) {
size_t len = 0;
void* key = taosHashGetKey(pDb, &len);
cacheSize += len + sizeof(SCtgDBCache) + atomic_load_64(&pDb->dbCacheSize);
pDb = taosHashIterate(pCtg->dbCache, pDb);
}
cacheSize += pCtg->dbRent.rentCacheSize;
cacheSize += pCtg->stbRent.rentCacheSize;
return cacheSize;
}
void ctgGetClusterCacheStat(SCatalog* pCtg) {
for (int32_t i = 0; i < CTG_CI_MAX_VALUE; ++i) {
if (0 == (gCtgStatItem[i].flag & CTG_CI_FLAG_LEVEL_DB)) {
......@@ -1688,3 +1892,23 @@ void ctgGetGlobalCacheStat(SCtgCacheStat* pStat) {
memcpy(pStat, &gCtgMgmt.statInfo.cache, sizeof(gCtgMgmt.statInfo.cache));
}
void ctgGetGlobalCacheSize(uint64_t *pSize) {
*pSize = 0;
SCatalog* pCtg = NULL;
void* pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
while (pIter) {
size_t len = 0;
void* key = taosHashGetKey(pIter, &len);
*pSize += len + POINTER_BYTES;
pCtg = *(SCatalog**)pIter;
if (pCtg) {
*pSize += ctgGetClusterCacheSize(pCtg);
}
pIter = taosHashIterate(gCtgMgmt.pCluster, pIter);
}
}
......@@ -37,6 +37,7 @@
#include "tglobal.h"
#include "trpc.h"
#include "tvariant.h"
#include "ttimer.h"
namespace {
......@@ -150,6 +151,7 @@ void ctgTestInitLogFile() {
tsAsyncLog = 0;
qDebugFlag = 159;
tmrDebugFlag = 159;
strcpy(tsLogDir, TD_LOG_DIR_PATH);
ctgdEnableDebug("api", true);
......@@ -1746,6 +1748,8 @@ TEST(tableMeta, updateStbMeta) {
code = catalogUpdateTableMeta(pCtg, &rsp);
ASSERT_EQ(code, 0);
code = catalogAsyncUpdateTableMeta(pCtg, &rsp);
ASSERT_EQ(code, 0);
taosMemoryFreeClear(rsp.pSchemas);
while (true) {
......
......@@ -748,7 +748,7 @@ static int32_t execAlterLocal(SAlterLocalStmt* pStmt) {
return terrno;
}
if (taosSetCfg(tsCfg, pStmt->config)) {
if (taosApplyLocalCfg(tsCfg, pStmt->config)) {
return terrno;
}
......
......@@ -25,7 +25,7 @@
#include "tref.h"
#include "ttimer.h"
#define MAX_TABLE_NAME_NUM 2000000
#define MAX_TABLE_NAME_NUM 200000
int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
if (pWin1->groupId > pWin2->groupId) {
......
......@@ -556,6 +556,7 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons
len += vsnprintf(buffer + len, LOG_MAX_LINE_DUMP_BUFFER_SIZE - 2 - len, format, argpointer);
va_end(argpointer);
len = len > LOG_MAX_LINE_DUMP_BUFFER_SIZE - 2 ? LOG_MAX_LINE_DUMP_BUFFER_SIZE - 2 : len;
buffer[len++] = '\n';
buffer[len] = 0;
......
......@@ -113,7 +113,7 @@ typedef struct time_wheel_t {
static int32_t tsMaxTmrCtrl = TSDB_MAX_VNODES_PER_DB + 100;
static TdThreadOnce tmrModuleInit = PTHREAD_ONCE_INIT;
static int32_t tmrModuleInit = 0;
static TdThreadMutex tmrCtrlMutex;
static tmr_ctrl_t* tmrCtrls;
static tmr_ctrl_t* unusedTmrCtrl = NULL;
......@@ -512,11 +512,11 @@ bool taosTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* han
return stopped;
}
static void taosTmrModuleInit(void) {
static int32_t taosTmrModuleInit(void) {
tmrCtrls = taosMemoryMalloc(sizeof(tmr_ctrl_t) * tsMaxTmrCtrl);
if (tmrCtrls == NULL) {
tmrError("failed to allocate memory for timer controllers.");
return;
return -1;
}
memset(&timerMap, 0, sizeof(timerMap));
......@@ -535,14 +535,14 @@ static void taosTmrModuleInit(void) {
time_wheel_t* wheel = wheels + i;
if (taosThreadMutexInit(&wheel->mutex, NULL) != 0) {
tmrError("failed to create the mutex for wheel, reason:%s", strerror(errno));
return;
return -1;
}
wheel->nextScanAt = now + wheel->resolution;
wheel->index = 0;
wheel->slots = (tmr_obj_t**)taosMemoryCalloc(wheel->size, sizeof(tmr_obj_t*));
if (wheel->slots == NULL) {
tmrError("failed to allocate wheel slots");
return;
return -1;
}
timerMap.size += wheel->size;
}
......@@ -551,20 +551,48 @@ static void taosTmrModuleInit(void) {
timerMap.slots = (timer_list_t*)taosMemoryCalloc(timerMap.size, sizeof(timer_list_t));
if (timerMap.slots == NULL) {
tmrError("failed to allocate hash map");
return;
return -1;
}
tmrQhandle = taosInitScheduler(10000, taosTmrThreads, "tmr", NULL);
taosInitTimer(taosTimerLoopFunc, MSECONDS_PER_TICK);
tmrDebug("timer module is initialized, number of threads: %d", taosTmrThreads);
return 2;
}
static int32_t taosTmrInitModule(void) {
if (atomic_load_32(&tmrModuleInit) == 2) {
return 0;
}
if (atomic_load_32(&tmrModuleInit) < 0) {
return -1;
}
while (true) {
if (0 == atomic_val_compare_exchange_32(&tmrModuleInit, 0, 1)) {
atomic_store_32(&tmrModuleInit, taosTmrModuleInit());
} else if (atomic_load_32(&tmrModuleInit) < 0) {
return -1;
} else if (atomic_load_32(&tmrModuleInit) == 2) {
return 0;
} else {
taosMsleep(1);
}
}
return -1;
}
void* taosTmrInit(int32_t maxNumOfTmrs, int32_t resolution, int32_t longest, const char* label) {
const char* ret = taosMonotonicInit();
tmrDebug("ttimer monotonic clock source:%s", ret);
taosThreadOnce(&tmrModuleInit, taosTmrModuleInit);
if (taosTmrInitModule() < 0) {
return NULL;
}
taosThreadMutexLock(&tmrCtrlMutex);
tmr_ctrl_t* ctrl = unusedTmrCtrl;
......@@ -581,6 +609,7 @@ void* taosTmrInit(int32_t maxNumOfTmrs, int32_t resolution, int32_t longest, con
}
tstrncpy(ctrl->label, label, sizeof(ctrl->label));
tmrDebug("%s timer controller is initialized, number of timer controllers: %d.", label, numOfTmrCtrl);
return ctrl;
}
......@@ -629,8 +658,6 @@ void taosTmrCleanUp(void* handle) {
tmrCtrls = NULL;
unusedTmrCtrl = NULL;
#if defined(LINUX)
tmrModuleInit = PTHREAD_ONCE_INIT; // to support restart
#endif
atomic_store_32(&tmrModuleInit, 0);
}
}
......@@ -442,9 +442,8 @@ sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams21 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s, 5s);
sql create stream streams22 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s, 5s);
sql create stream streams23 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt3 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(20s) sliding(10s);
sql create stream streams21 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt21 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s, 5s);
sql create stream streams22 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt22 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s, 5s);
sql insert into t1 values(1648791213000,1,1,1,1.0);
sql insert into t1 values(1648791223001,2,2,2,1.1);
......@@ -468,7 +467,7 @@ if $loop_count == 10 then
return -1
endi
sql select * from streamt;
sql select * from streamt21;
# row 0
if $data01 != 2 then
......@@ -526,7 +525,7 @@ if $loop_count == 10 then
return -1
endi
sql select * from streamt2;
sql select * from streamt22;
# row 0
if $data01 != 4 then
......@@ -585,7 +584,39 @@ if $loop_count == 10 then
return -1
endi
sql select * from streamt3;
sql create database test3 vgroups 6;
sql use test3;
sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams23 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt23 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(20s) sliding(10s);
sql insert into t1 values(1648791213000,1,1,1,1.0);
sql insert into t1 values(1648791223001,2,2,2,1.1);
sql insert into t1 values(1648791233002,3,3,3,2.1);
sql insert into t1 values(1648791243003,4,4,4,3.1);
sql insert into t1 values(1648791213004,4,5,5,4.1);
sql insert into t2 values(1648791213000,1,6,6,1.0);
sql insert into t2 values(1648791223001,2,7,7,1.1);
sql insert into t2 values(1648791233002,3,8,8,2.1);
sql insert into t2 values(1648791243003,4,9,9,3.1);
sql insert into t2 values(1648791213004,4,10,10,4.1);
$loop_count = 0
print step 7
loop4:
sleep 100
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from streamt23;
# row 0
if $rows != 5 then
......@@ -629,7 +660,7 @@ if $loop_count == 10 then
return -1
endi
sql select * from streamt3;
sql select * from streamt23;
# row 0
if $rows != 7 then
......@@ -688,6 +719,8 @@ sql create stream streams4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 in
sql insert into t1 values(1648791213000,1,1,1,1.0);
sql insert into t1 values(1648791243000,2,1,1,1.0);
sleep 1000
sql insert into t2 values(1648791273000,3,1,1,1.0);
sql insert into t2 values(1648791313000,4,1,1,1.0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册