Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
42c65898
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
42c65898
编写于
5月 19, 2023
作者:
D
dapan1121
提交者:
GitHub
5月 19, 2023
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #21335 from taosdata/feat/TD-22746
feat: support client meta control
上级
b7075c35
efbc3fc7
变更
18
隐藏空白更改
内联
并排
Showing
18 changed file
with
577 addition
and
110 deletion
+577
-110
include/common/tglobal.h
include/common/tglobal.h
+2
-1
include/common/tmsg.h
include/common/tmsg.h
+2
-1
include/libs/catalog/catalog.h
include/libs/catalog/catalog.h
+1
-1
include/libs/qcom/query.h
include/libs/qcom/query.h
+10
-14
source/client/src/clientEnv.c
source/client/src/clientEnv.c
+1
-1
source/client/src/clientHb.c
source/client/src/clientHb.c
+1
-1
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+1
-1
source/common/src/tglobal.c
source/common/src/tglobal.c
+10
-4
source/common/src/tmsg.c
source/common/src/tmsg.c
+7
-3
source/dnode/mnode/impl/src/mndSma.c
source/dnode/mnode/impl/src/mndSma.c
+1
-0
source/libs/catalog/inc/catalogInt.h
source/libs/catalog/inc/catalogInt.h
+53
-11
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+71
-5
source/libs/catalog/src/ctgCache.c
source/libs/catalog/src/ctgCache.c
+139
-48
source/libs/catalog/src/ctgDbg.c
source/libs/catalog/src/ctgDbg.c
+5
-1
source/libs/catalog/src/ctgUtil.c
source/libs/catalog/src/ctgUtil.c
+231
-7
source/libs/catalog/test/catalogTests.cpp
source/libs/catalog/test/catalogTests.cpp
+4
-0
source/libs/command/src/command.c
source/libs/command/src/command.c
+1
-1
source/util/src/ttimer.c
source/util/src/ttimer.c
+37
-10
未找到文件。
include/common/tglobal.h
浏览文件 @
42c65898
...
@@ -124,6 +124,7 @@ extern int32_t tsRedirectFactor;
...
@@ -124,6 +124,7 @@ extern int32_t tsRedirectFactor;
extern
int32_t
tsRedirectMaxPeriod
;
extern
int32_t
tsRedirectMaxPeriod
;
extern
int32_t
tsMaxRetryWaitTime
;
extern
int32_t
tsMaxRetryWaitTime
;
extern
bool
tsUseAdapter
;
extern
bool
tsUseAdapter
;
extern
int32_t
tsMetaCacheMaxSize
;
extern
int32_t
tsSlowLogThreshold
;
extern
int32_t
tsSlowLogThreshold
;
extern
int32_t
tsSlowLogScope
;
extern
int32_t
tsSlowLogScope
;
...
@@ -193,7 +194,7 @@ struct SConfig *taosGetCfg();
...
@@ -193,7 +194,7 @@ struct SConfig *taosGetCfg();
void
taosSetAllDebugFlag
(
int32_t
flag
,
bool
rewrite
);
void
taosSetAllDebugFlag
(
int32_t
flag
,
bool
rewrite
);
void
taosSetDebugFlag
(
int32_t
*
pFlagPtr
,
const
char
*
flagName
,
int32_t
flagVal
,
bool
rewrite
);
void
taosSetDebugFlag
(
int32_t
*
pFlagPtr
,
const
char
*
flagName
,
int32_t
flagVal
,
bool
rewrite
);
int32_t
taos
Set
Cfg
(
SConfig
*
pCfg
,
char
*
name
);
int32_t
taos
ApplyLocal
Cfg
(
SConfig
*
pCfg
,
char
*
name
);
void
taosLocalCfgForbiddenToChange
(
char
*
name
,
bool
*
forbidden
);
void
taosLocalCfgForbiddenToChange
(
char
*
name
,
bool
*
forbidden
);
#ifdef __cplusplus
#ifdef __cplusplus
...
...
include/common/tmsg.h
浏览文件 @
42c65898
...
@@ -3191,7 +3191,8 @@ typedef struct {
...
@@ -3191,7 +3191,8 @@ typedef struct {
char
dbFName
[
TSDB_DB_FNAME_LEN
];
char
dbFName
[
TSDB_DB_FNAME_LEN
];
uint64_t
suid
;
uint64_t
suid
;
int32_t
version
;
int32_t
version
;
SArray
*
pIndex
;
int32_t
indexSize
;
SArray
*
pIndex
;
// STableIndexInfo
}
STableIndexRsp
;
}
STableIndexRsp
;
int32_t
tSerializeSTableIndexRsp
(
void
*
buf
,
int32_t
bufLen
,
const
STableIndexRsp
*
pRsp
);
int32_t
tSerializeSTableIndexRsp
(
void
*
buf
,
int32_t
bufLen
,
const
STableIndexRsp
*
pRsp
);
...
...
include/libs/catalog/catalog.h
浏览文件 @
42c65898
...
@@ -214,7 +214,7 @@ int32_t catalogGetSTableMeta(SCatalog* pCatalog, SRequestConnInfo* pConn, const
...
@@ -214,7 +214,7 @@ int32_t catalogGetSTableMeta(SCatalog* pCatalog, SRequestConnInfo* pConn, const
int32_t
catalogUpdateTableMeta
(
SCatalog
*
pCatalog
,
STableMetaRsp
*
rspMsg
);
int32_t
catalogUpdateTableMeta
(
SCatalog
*
pCatalog
,
STableMetaRsp
*
rspMsg
);
int32_t
catalog
UpdateTableMeta
(
SCatalog
*
pCatalog
,
STableMetaRsp
*
rs
pMsg
);
int32_t
catalog
AsyncUpdateTableMeta
(
SCatalog
*
pCtg
,
STableMetaRsp
*
pMsg
);
int32_t
catalogGetCachedTableMeta
(
SCatalog
*
pCtg
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
);
int32_t
catalogGetCachedTableMeta
(
SCatalog
*
pCtg
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
);
...
...
include/libs/qcom/query.h
浏览文件 @
42c65898
...
@@ -90,28 +90,23 @@ typedef struct STbVerInfo {
...
@@ -90,28 +90,23 @@ typedef struct STbVerInfo {
int32_t
tversion
;
int32_t
tversion
;
}
STbVerInfo
;
}
STbVerInfo
;
/*
#pragma pack(push, 1)
* ASSERT(sizeof(SCTableMeta) == 24)
* ASSERT(tableType == TSDB_CHILD_TABLE)
* The cached child table meta info. For each child table, 24 bytes are required to keep the essential table info.
*/
typedef
struct
SCTableMeta
{
typedef
struct
SCTableMeta
{
int32_t
vgId
:
24
;
int8_t
tableType
;
uint64_t
uid
;
uint64_t
uid
;
uint64_t
suid
;
uint64_t
suid
;
int32_t
vgId
;
int8_t
tableType
;
}
SCTableMeta
;
}
SCTableMeta
;
#pragma pack(pop)
/*
* Note that the first 24 bytes of STableMeta are identical to SCTableMeta, it is safe to cast a STableMeta to be a
#pragma pack(push, 1)
* SCTableMeta.
*/
typedef
struct
STableMeta
{
typedef
struct
STableMeta
{
// BEGIN: KEEP THIS PART SAME WITH SCTableMeta
// BEGIN: KEEP THIS PART SAME WITH SCTableMeta
int32_t
vgId
:
24
;
int8_t
tableType
;
uint64_t
uid
;
uint64_t
uid
;
uint64_t
suid
;
uint64_t
suid
;
int32_t
vgId
;
int8_t
tableType
;
// END: KEEP THIS PART SAME WITH SCTableMeta
// END: KEEP THIS PART SAME WITH SCTableMeta
// if the table is TSDB_CHILD_TABLE, the following information is acquired from the corresponding super table meta
// if the table is TSDB_CHILD_TABLE, the following information is acquired from the corresponding super table meta
...
@@ -121,6 +116,7 @@ typedef struct STableMeta {
...
@@ -121,6 +116,7 @@ typedef struct STableMeta {
STableComInfo
tableInfo
;
STableComInfo
tableInfo
;
SSchema
schema
[];
SSchema
schema
[];
}
STableMeta
;
}
STableMeta
;
#pragma pack(pop)
typedef
struct
SDBVgInfo
{
typedef
struct
SDBVgInfo
{
int32_t
vgVersion
;
int32_t
vgVersion
;
...
@@ -130,7 +126,7 @@ typedef struct SDBVgInfo {
...
@@ -130,7 +126,7 @@ typedef struct SDBVgInfo {
int32_t
numOfTable
;
// DB's table num, unit is TSDB_TABLE_NUM_UNIT
int32_t
numOfTable
;
// DB's table num, unit is TSDB_TABLE_NUM_UNIT
int64_t
stateTs
;
int64_t
stateTs
;
SHashObj
*
vgHash
;
// key:vgId, value:SVgroupInfo
SHashObj
*
vgHash
;
// key:vgId, value:SVgroupInfo
SArray
*
vgArray
;
SArray
*
vgArray
;
// SVgroupInfo
}
SDBVgInfo
;
}
SDBVgInfo
;
typedef
struct
SUseDbOutput
{
typedef
struct
SUseDbOutput
{
...
...
source/client/src/clientEnv.c
浏览文件 @
42c65898
...
@@ -656,7 +656,7 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
...
@@ -656,7 +656,7 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
}
else
{
}
else
{
tscInfo
(
"set cfg:%s to %s"
,
pItem
->
name
,
str
);
tscInfo
(
"set cfg:%s to %s"
,
pItem
->
name
,
str
);
if
(
TSDB_OPTION_SHELL_ACTIVITY_TIMER
==
option
||
TSDB_OPTION_USE_ADAPTER
==
option
)
{
if
(
TSDB_OPTION_SHELL_ACTIVITY_TIMER
==
option
||
TSDB_OPTION_USE_ADAPTER
==
option
)
{
code
=
taos
Set
Cfg
(
pCfg
,
pItem
->
name
);
code
=
taos
ApplyLocal
Cfg
(
pCfg
,
pItem
->
name
);
}
}
}
}
...
...
source/client/src/clientHb.c
浏览文件 @
42c65898
...
@@ -224,7 +224,7 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo
...
@@ -224,7 +224,7 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo
return
TSDB_CODE_TSC_INVALID_VALUE
;
return
TSDB_CODE_TSC_INVALID_VALUE
;
}
}
catalogUpdateTableMeta
(
pCatalog
,
rsp
);
catalog
Async
UpdateTableMeta
(
pCatalog
,
rsp
);
}
}
}
}
...
...
source/client/src/clientImpl.c
浏览文件 @
42c65898
...
@@ -815,7 +815,7 @@ int32_t handleAlterTbExecRes(void* res, SCatalog* pCatalog) {
...
@@ -815,7 +815,7 @@ int32_t handleAlterTbExecRes(void* res, SCatalog* pCatalog) {
}
}
int32_t
handleCreateTbExecRes
(
void
*
res
,
SCatalog
*
pCatalog
)
{
int32_t
handleCreateTbExecRes
(
void
*
res
,
SCatalog
*
pCatalog
)
{
return
catalogUpdateTableMeta
(
pCatalog
,
(
STableMetaRsp
*
)
res
);
return
catalog
Async
UpdateTableMeta
(
pCatalog
,
(
STableMetaRsp
*
)
res
);
}
}
int32_t
handleQueryExecRsp
(
SRequestObj
*
pRequest
)
{
int32_t
handleQueryExecRsp
(
SRequestObj
*
pRequest
)
{
...
...
source/common/src/tglobal.c
浏览文件 @
42c65898
...
@@ -117,12 +117,10 @@ int32_t tsRedirectFactor = 2;
...
@@ -117,12 +117,10 @@ int32_t tsRedirectFactor = 2;
int32_t
tsRedirectMaxPeriod
=
1000
;
int32_t
tsRedirectMaxPeriod
=
1000
;
int32_t
tsMaxRetryWaitTime
=
10000
;
int32_t
tsMaxRetryWaitTime
=
10000
;
bool
tsUseAdapter
=
false
;
bool
tsUseAdapter
=
false
;
int32_t
tsMetaCacheMaxSize
=
-
1
;
// MB
int32_t
tsSlowLogThreshold
=
3
;
// seconds
int32_t
tsSlowLogThreshold
=
3
;
// seconds
int32_t
tsSlowLogScope
=
SLOW_LOG_TYPE_ALL
;
int32_t
tsSlowLogScope
=
SLOW_LOG_TYPE_ALL
;
/*
/*
* denote if the server needs to compress response message at the application layer to client, including query rsp,
* denote if the server needs to compress response message at the application layer to client, including query rsp,
* metricmeta rsp, and multi-meter query rsp message body. The client compress the submit message to server.
* metricmeta rsp, and multi-meter query rsp message body. The client compress the submit message to server.
...
@@ -351,6 +349,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
...
@@ -351,6 +349,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if
(
cfgAddBool
(
pCfg
,
"useAdapter"
,
tsUseAdapter
,
true
)
!=
0
)
return
-
1
;
if
(
cfgAddBool
(
pCfg
,
"useAdapter"
,
tsUseAdapter
,
true
)
!=
0
)
return
-
1
;
if
(
cfgAddBool
(
pCfg
,
"crashReporting"
,
tsEnableCrashReport
,
true
)
!=
0
)
return
-
1
;
if
(
cfgAddBool
(
pCfg
,
"crashReporting"
,
tsEnableCrashReport
,
true
)
!=
0
)
return
-
1
;
if
(
cfgAddInt64
(
pCfg
,
"queryMaxConcurrentTables"
,
tsQueryMaxConcurrentTables
,
INT64_MIN
,
INT64_MAX
,
1
)
!=
0
)
return
-
1
;
if
(
cfgAddInt64
(
pCfg
,
"queryMaxConcurrentTables"
,
tsQueryMaxConcurrentTables
,
INT64_MIN
,
INT64_MAX
,
1
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"metaCacheMaxSize"
,
tsMetaCacheMaxSize
,
-
1
,
INT32_MAX
,
1
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"slowLogThreshold"
,
tsSlowLogThreshold
,
0
,
INT32_MAX
,
true
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"slowLogThreshold"
,
tsSlowLogThreshold
,
0
,
INT32_MAX
,
true
)
!=
0
)
return
-
1
;
if
(
cfgAddString
(
pCfg
,
"slowLogScope"
,
""
,
true
)
!=
0
)
return
-
1
;
if
(
cfgAddString
(
pCfg
,
"slowLogScope"
,
""
,
true
)
!=
0
)
return
-
1
;
...
@@ -788,6 +787,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
...
@@ -788,6 +787,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tsUseAdapter
=
cfgGetItem
(
pCfg
,
"useAdapter"
)
->
bval
;
tsUseAdapter
=
cfgGetItem
(
pCfg
,
"useAdapter"
)
->
bval
;
tsEnableCrashReport
=
cfgGetItem
(
pCfg
,
"crashReporting"
)
->
bval
;
tsEnableCrashReport
=
cfgGetItem
(
pCfg
,
"crashReporting"
)
->
bval
;
tsQueryMaxConcurrentTables
=
cfgGetItem
(
pCfg
,
"queryMaxConcurrentTables"
)
->
i64
;
tsQueryMaxConcurrentTables
=
cfgGetItem
(
pCfg
,
"queryMaxConcurrentTables"
)
->
i64
;
tsMetaCacheMaxSize
=
cfgGetItem
(
pCfg
,
"metaCacheMaxSize"
)
->
i32
;
tsSlowLogThreshold
=
cfgGetItem
(
pCfg
,
"slowLogThreshold"
)
->
i32
;
tsSlowLogThreshold
=
cfgGetItem
(
pCfg
,
"slowLogThreshold"
)
->
i32
;
if
(
taosSetSlowLogScope
(
cfgGetItem
(
pCfg
,
"slowLogScope"
)
->
str
))
{
if
(
taosSetSlowLogScope
(
cfgGetItem
(
pCfg
,
"slowLogScope"
)
->
str
))
{
return
-
1
;
return
-
1
;
...
@@ -916,7 +916,7 @@ void taosLocalCfgForbiddenToChange(char *name, bool *forbidden) {
...
@@ -916,7 +916,7 @@ void taosLocalCfgForbiddenToChange(char *name, bool *forbidden) {
*
forbidden
=
false
;
*
forbidden
=
false
;
}
}
int32_t
taos
Set
Cfg
(
SConfig
*
pCfg
,
char
*
name
)
{
int32_t
taos
ApplyLocal
Cfg
(
SConfig
*
pCfg
,
char
*
name
)
{
int32_t
len
=
strlen
(
name
);
int32_t
len
=
strlen
(
name
);
char
lowcaseName
[
CFG_NAME_MAX_LEN
+
1
]
=
{
0
};
char
lowcaseName
[
CFG_NAME_MAX_LEN
+
1
]
=
{
0
};
strntolower
(
lowcaseName
,
name
,
TMIN
(
CFG_NAME_MAX_LEN
,
len
));
strntolower
(
lowcaseName
,
name
,
TMIN
(
CFG_NAME_MAX_LEN
,
len
));
...
@@ -1051,6 +1051,12 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
...
@@ -1051,6 +1051,12 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
}
}
break
;
break
;
}
}
case
'e'
:
{
if
(
strcasecmp
(
"metaCacheMaxSize"
,
name
)
==
0
)
{
atomic_store_32
(
&
tsMetaCacheMaxSize
,
cfgGetItem
(
pCfg
,
"metaCacheMaxSize"
)
->
i32
);
}
break
;
}
case
'i'
:
{
case
'i'
:
{
if
(
strcasecmp
(
"minimalTmpDirGB"
,
name
)
==
0
)
{
if
(
strcasecmp
(
"minimalTmpDirGB"
,
name
)
==
0
)
{
tsTempSpace
.
reserved
=
(
int64_t
)(((
double
)
cfgGetItem
(
pCfg
,
"minimalTmpDirGB"
)
->
fval
)
*
1024
*
1024
*
1024
);
tsTempSpace
.
reserved
=
(
int64_t
)(((
double
)
cfgGetItem
(
pCfg
,
"minimalTmpDirGB"
)
->
fval
)
*
1024
*
1024
*
1024
);
...
...
source/common/src/tmsg.c
浏览文件 @
42c65898
...
@@ -1566,21 +1566,21 @@ int32_t tDeserializeSGetUserAuthRspImpl(SDecoder *pDecoder, SGetUserAuthRsp *pRs
...
@@ -1566,21 +1566,21 @@ int32_t tDeserializeSGetUserAuthRspImpl(SDecoder *pDecoder, SGetUserAuthRsp *pRs
char
db
[
TSDB_DB_FNAME_LEN
]
=
{
0
};
char
db
[
TSDB_DB_FNAME_LEN
]
=
{
0
};
if
(
tDecodeCStrTo
(
pDecoder
,
db
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
pDecoder
,
db
)
<
0
)
return
-
1
;
int32_t
len
=
strlen
(
db
);
int32_t
len
=
strlen
(
db
);
taosHashPut
(
pRsp
->
createdDbs
,
db
,
len
,
db
,
len
);
taosHashPut
(
pRsp
->
createdDbs
,
db
,
len
,
db
,
len
+
1
);
}
}
for
(
int32_t
i
=
0
;
i
<
numOfReadDbs
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
numOfReadDbs
;
++
i
)
{
char
db
[
TSDB_DB_FNAME_LEN
]
=
{
0
};
char
db
[
TSDB_DB_FNAME_LEN
]
=
{
0
};
if
(
tDecodeCStrTo
(
pDecoder
,
db
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
pDecoder
,
db
)
<
0
)
return
-
1
;
int32_t
len
=
strlen
(
db
);
int32_t
len
=
strlen
(
db
);
taosHashPut
(
pRsp
->
readDbs
,
db
,
len
,
db
,
len
);
taosHashPut
(
pRsp
->
readDbs
,
db
,
len
,
db
,
len
+
1
);
}
}
for
(
int32_t
i
=
0
;
i
<
numOfWriteDbs
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
numOfWriteDbs
;
++
i
)
{
char
db
[
TSDB_DB_FNAME_LEN
]
=
{
0
};
char
db
[
TSDB_DB_FNAME_LEN
]
=
{
0
};
if
(
tDecodeCStrTo
(
pDecoder
,
db
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
pDecoder
,
db
)
<
0
)
return
-
1
;
int32_t
len
=
strlen
(
db
);
int32_t
len
=
strlen
(
db
);
taosHashPut
(
pRsp
->
writeDbs
,
db
,
len
,
db
,
len
);
taosHashPut
(
pRsp
->
writeDbs
,
db
,
len
,
db
,
len
+
1
);
}
}
if
(
!
tDecodeIsEnd
(
pDecoder
))
{
if
(
!
tDecodeIsEnd
(
pDecoder
))
{
...
@@ -3416,6 +3416,7 @@ int32_t tSerializeSTableIndexRsp(void *buf, int32_t bufLen, const STableIndexRsp
...
@@ -3416,6 +3416,7 @@ int32_t tSerializeSTableIndexRsp(void *buf, int32_t bufLen, const STableIndexRsp
if
(
tEncodeCStr
(
&
encoder
,
pRsp
->
dbFName
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pRsp
->
dbFName
)
<
0
)
return
-
1
;
if
(
tEncodeU64
(
&
encoder
,
pRsp
->
suid
)
<
0
)
return
-
1
;
if
(
tEncodeU64
(
&
encoder
,
pRsp
->
suid
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pRsp
->
version
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pRsp
->
version
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pRsp
->
indexSize
)
<
0
)
return
-
1
;
int32_t
num
=
taosArrayGetSize
(
pRsp
->
pIndex
);
int32_t
num
=
taosArrayGetSize
(
pRsp
->
pIndex
);
if
(
tEncodeI32
(
&
encoder
,
num
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
num
)
<
0
)
return
-
1
;
if
(
num
>
0
)
{
if
(
num
>
0
)
{
...
@@ -3461,6 +3462,7 @@ int32_t tDeserializeSTableIndexRsp(void *buf, int32_t bufLen, STableIndexRsp *pR
...
@@ -3461,6 +3462,7 @@ int32_t tDeserializeSTableIndexRsp(void *buf, int32_t bufLen, STableIndexRsp *pR
if
(
tDecodeCStrTo
(
&
decoder
,
pRsp
->
dbFName
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pRsp
->
dbFName
)
<
0
)
return
-
1
;
if
(
tDecodeU64
(
&
decoder
,
&
pRsp
->
suid
)
<
0
)
return
-
1
;
if
(
tDecodeU64
(
&
decoder
,
&
pRsp
->
suid
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pRsp
->
version
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pRsp
->
version
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pRsp
->
indexSize
)
<
0
)
return
-
1
;
int32_t
num
=
0
;
int32_t
num
=
0
;
if
(
tDecodeI32
(
&
decoder
,
&
num
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
num
)
<
0
)
return
-
1
;
if
(
num
>
0
)
{
if
(
num
>
0
)
{
...
@@ -3735,6 +3737,7 @@ int32_t tSerializeSSTbHbRsp(void *buf, int32_t bufLen, SSTbHbRsp *pRsp) {
...
@@ -3735,6 +3737,7 @@ int32_t tSerializeSSTbHbRsp(void *buf, int32_t bufLen, SSTbHbRsp *pRsp) {
if
(
tEncodeCStr
(
&
encoder
,
pIndexRsp
->
dbFName
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pIndexRsp
->
dbFName
)
<
0
)
return
-
1
;
if
(
tEncodeU64
(
&
encoder
,
pIndexRsp
->
suid
)
<
0
)
return
-
1
;
if
(
tEncodeU64
(
&
encoder
,
pIndexRsp
->
suid
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pIndexRsp
->
version
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pIndexRsp
->
version
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pIndexRsp
->
indexSize
)
<
0
)
return
-
1
;
int32_t
num
=
taosArrayGetSize
(
pIndexRsp
->
pIndex
);
int32_t
num
=
taosArrayGetSize
(
pIndexRsp
->
pIndex
);
if
(
tEncodeI32
(
&
encoder
,
num
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
num
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
...
@@ -3797,6 +3800,7 @@ int32_t tDeserializeSSTbHbRsp(void *buf, int32_t bufLen, SSTbHbRsp *pRsp) {
...
@@ -3797,6 +3800,7 @@ int32_t tDeserializeSSTbHbRsp(void *buf, int32_t bufLen, SSTbHbRsp *pRsp) {
if
(
tDecodeCStrTo
(
&
decoder
,
tableIndexRsp
.
dbFName
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
tableIndexRsp
.
dbFName
)
<
0
)
return
-
1
;
if
(
tDecodeU64
(
&
decoder
,
&
tableIndexRsp
.
suid
)
<
0
)
return
-
1
;
if
(
tDecodeU64
(
&
decoder
,
&
tableIndexRsp
.
suid
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
tableIndexRsp
.
version
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
tableIndexRsp
.
version
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
tableIndexRsp
.
indexSize
)
<
0
)
return
-
1
;
int32_t
num
=
0
;
int32_t
num
=
0
;
if
(
tDecodeI32
(
&
decoder
,
&
num
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
num
)
<
0
)
return
-
1
;
if
(
num
>
0
)
{
if
(
num
>
0
)
{
...
...
source/dnode/mnode/impl/src/mndSma.c
浏览文件 @
42c65898
...
@@ -1114,6 +1114,7 @@ int32_t mndGetTableSma(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool
...
@@ -1114,6 +1114,7 @@ int32_t mndGetTableSma(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool
return
code
;
return
code
;
}
}
rsp
->
indexSize
+=
sizeof
(
info
)
+
pSma
->
exprLen
+
1
;
*
exist
=
true
;
*
exist
=
true
;
sdbRelease
(
pSdb
,
pSma
);
sdbRelease
(
pSdb
,
pSma
);
...
...
source/libs/catalog/inc/catalogInt.h
浏览文件 @
42c65898
...
@@ -23,6 +23,8 @@ extern "C" {
...
@@ -23,6 +23,8 @@ extern "C" {
#include "catalog.h"
#include "catalog.h"
#include "query.h"
#include "query.h"
#include "tcommon.h"
#include "tcommon.h"
#include "ttimer.h"
#include "tglobal.h"
#define CTG_DEFAULT_CACHE_CLUSTER_NUMBER 6
#define CTG_DEFAULT_CACHE_CLUSTER_NUMBER 6
#define CTG_DEFAULT_CACHE_VGROUP_NUMBER 100
#define CTG_DEFAULT_CACHE_VGROUP_NUMBER 100
...
@@ -34,6 +36,8 @@ extern "C" {
...
@@ -34,6 +36,8 @@ extern "C" {
#define CTG_DEFAULT_BATCH_NUM 64
#define CTG_DEFAULT_BATCH_NUM 64
#define CTG_DEFAULT_FETCH_NUM 8
#define CTG_DEFAULT_FETCH_NUM 8
#define CTG_MAX_COMMAND_LEN 512
#define CTG_MAX_COMMAND_LEN 512
#define CTG_DEFAULT_CACHE_MON_MSEC 5000
#define CTG_CLEAR_CACHE_ROUND_TB_NUM 3000
#define CTG_RENT_SLOT_SECOND 1.5
#define CTG_RENT_SLOT_SECOND 1.5
...
@@ -131,6 +135,7 @@ typedef struct SCtgDebug {
...
@@ -131,6 +135,7 @@ typedef struct SCtgDebug {
typedef
struct
SCtgCacheStat
{
typedef
struct
SCtgCacheStat
{
uint64_t
cacheNum
[
CTG_CI_MAX_VALUE
];
uint64_t
cacheNum
[
CTG_CI_MAX_VALUE
];
uint64_t
cacheSize
[
CTG_CI_MAX_VALUE
];
uint64_t
cacheHit
[
CTG_CI_MAX_VALUE
];
uint64_t
cacheHit
[
CTG_CI_MAX_VALUE
];
uint64_t
cacheNHit
[
CTG_CI_MAX_VALUE
];
uint64_t
cacheNHit
[
CTG_CI_MAX_VALUE
];
}
SCtgCacheStat
;
}
SCtgCacheStat
;
...
@@ -239,8 +244,8 @@ typedef STableIndexRsp STableIndex;
...
@@ -239,8 +244,8 @@ typedef STableIndexRsp STableIndex;
typedef
struct
SCtgTbCache
{
typedef
struct
SCtgTbCache
{
SRWLatch
metaLock
;
SRWLatch
metaLock
;
STableMeta
*
pMeta
;
SRWLatch
indexLock
;
SRWLatch
indexLock
;
STableMeta
*
pMeta
;
STableIndex
*
pIndex
;
STableIndex
*
pIndex
;
}
SCtgTbCache
;
}
SCtgTbCache
;
...
@@ -263,6 +268,7 @@ typedef struct SCtgDBCache {
...
@@ -263,6 +268,7 @@ typedef struct SCtgDBCache {
SHashObj
*
tbCache
;
// key:tbname, value:SCtgTbCache
SHashObj
*
tbCache
;
// key:tbname, value:SCtgTbCache
SHashObj
*
stbCache
;
// key:suid, value:char*
SHashObj
*
stbCache
;
// key:suid, value:char*
uint64_t
dbCacheNum
[
CTG_CI_MAX_VALUE
];
uint64_t
dbCacheNum
[
CTG_CI_MAX_VALUE
];
uint64_t
dbCacheSize
;
}
SCtgDBCache
;
}
SCtgDBCache
;
typedef
struct
SCtgRentSlot
{
typedef
struct
SCtgRentSlot
{
...
@@ -276,12 +282,15 @@ typedef struct SCtgRentMgmt {
...
@@ -276,12 +282,15 @@ typedef struct SCtgRentMgmt {
uint16_t
slotNum
;
uint16_t
slotNum
;
uint16_t
slotRIdx
;
uint16_t
slotRIdx
;
int64_t
lastReadMsec
;
int64_t
lastReadMsec
;
uint64_t
rentCacheSize
;
int32_t
metaSize
;
SCtgRentSlot
*
slots
;
SCtgRentSlot
*
slots
;
}
SCtgRentMgmt
;
}
SCtgRentMgmt
;
typedef
struct
SCtgUserAuth
{
typedef
struct
SCtgUserAuth
{
SRWLatch
lock
;
SRWLatch
lock
;
SGetUserAuthRsp
userAuth
;
SGetUserAuthRsp
userAuth
;
uint64_t
userCacheSize
;
}
SCtgUserAuth
;
}
SCtgUserAuth
;
typedef
struct
SCatalog
{
typedef
struct
SCatalog
{
...
@@ -412,6 +421,7 @@ typedef struct SCtgRuntimeStat {
...
@@ -412,6 +421,7 @@ typedef struct SCtgRuntimeStat {
uint64_t
numOfOpAbort
;
uint64_t
numOfOpAbort
;
uint64_t
numOfOpEnqueue
;
uint64_t
numOfOpEnqueue
;
uint64_t
numOfOpDequeue
;
uint64_t
numOfOpDequeue
;
uint64_t
numOfOpClearMeta
;
uint64_t
numOfOpClearCache
;
uint64_t
numOfOpClearCache
;
}
SCtgRuntimeStat
;
}
SCtgRuntimeStat
;
...
@@ -488,6 +498,7 @@ typedef struct SCtgDropTbIndexMsg {
...
@@ -488,6 +498,7 @@ typedef struct SCtgDropTbIndexMsg {
typedef
struct
SCtgClearCacheMsg
{
typedef
struct
SCtgClearCacheMsg
{
SCatalog
*
pCtg
;
SCatalog
*
pCtg
;
bool
clearMeta
;
bool
freeCtg
;
bool
freeCtg
;
}
SCtgClearCacheMsg
;
}
SCtgClearCacheMsg
;
...
@@ -526,6 +537,8 @@ typedef struct SCatalogMgmt {
...
@@ -526,6 +537,8 @@ typedef struct SCatalogMgmt {
int32_t
jobPool
;
int32_t
jobPool
;
SRWLatch
lock
;
SRWLatch
lock
;
SCtgQueue
queue
;
SCtgQueue
queue
;
void
*
timer
;
tmr_h
cacheTimer
;
TdThread
updateThread
;
TdThread
updateThread
;
SHashObj
*
pCluster
;
// key: clusterId, value: SCatalog*
SHashObj
*
pCluster
;
// key: clusterId, value: SCatalog*
SCatalogStat
statInfo
;
SCatalogStat
statInfo
;
...
@@ -542,8 +555,8 @@ typedef struct SCtgOperation {
...
@@ -542,8 +555,8 @@ typedef struct SCtgOperation {
}
SCtgOperation
;
}
SCtgOperation
;
typedef
struct
SCtgCacheItemInfo
{
typedef
struct
SCtgCacheItemInfo
{
char
*
name
;
char
*
name
;
int32_t
flag
;
int32_t
flag
;
}
SCtgCacheItemInfo
;
}
SCtgCacheItemInfo
;
#define CTG_AUTH_READ(_t) ((_t) == AUTH_TYPE_READ || (_t) == AUTH_TYPE_READ_OR_WRITE)
#define CTG_AUTH_READ(_t) ((_t) == AUTH_TYPE_READ || (_t) == AUTH_TYPE_READ_OR_WRITE)
...
@@ -556,11 +569,6 @@ typedef struct SCtgCacheItemInfo {
...
@@ -556,11 +569,6 @@ typedef struct SCtgCacheItemInfo {
#define CTG_STAT_DEC(_item, _n) atomic_sub_fetch_64(&(_item), _n)
#define CTG_STAT_DEC(_item, _n) atomic_sub_fetch_64(&(_item), _n)
#define CTG_STAT_GET(_item) atomic_load_64(&(_item))
#define CTG_STAT_GET(_item) atomic_load_64(&(_item))
#define CTG_DB_NUM_INC(_item) dbCache->dbCacheNum[_item] += 1
#define CTG_DB_NUM_DEC(_item) dbCache->dbCacheNum[_item] -= 1
#define CTG_DB_NUM_SET(_item) dbCache->dbCacheNum[_item] = 1
#define CTG_DB_NUM_RESET(_item) dbCache->dbCacheNum[_item] = 0
#define CTG_STAT_API_INC(item, n) (CTG_STAT_INC(gCtgMgmt.statInfo.api.item, n))
#define CTG_STAT_API_INC(item, n) (CTG_STAT_INC(gCtgMgmt.statInfo.api.item, n))
#define CTG_STAT_RT_INC(item, n) (CTG_STAT_INC(gCtgMgmt.statInfo.runtime.item, n))
#define CTG_STAT_RT_INC(item, n) (CTG_STAT_INC(gCtgMgmt.statInfo.runtime.item, n))
#define CTG_STAT_NUM_INC(item, n) (CTG_STAT_INC(gCtgMgmt.statInfo.cache.cacheNum[item], n))
#define CTG_STAT_NUM_INC(item, n) (CTG_STAT_INC(gCtgMgmt.statInfo.cache.cacheNum[item], n))
...
@@ -575,6 +583,11 @@ typedef struct SCtgCacheItemInfo {
...
@@ -575,6 +583,11 @@ typedef struct SCtgCacheItemInfo {
#define CTG_CACHE_HIT_INC(item, n) (CTG_STAT_INC(pCtg->cacheStat.cacheHit[item], n))
#define CTG_CACHE_HIT_INC(item, n) (CTG_STAT_INC(pCtg->cacheStat.cacheHit[item], n))
#define CTG_CACHE_NHIT_INC(item, n) (CTG_STAT_INC(pCtg->cacheStat.cacheNHit[item], n))
#define CTG_CACHE_NHIT_INC(item, n) (CTG_STAT_INC(pCtg->cacheStat.cacheNHit[item], n))
#define CTG_DB_NUM_INC(_item) dbCache->dbCacheNum[_item] += 1
#define CTG_DB_NUM_DEC(_item) dbCache->dbCacheNum[_item] -= 1
#define CTG_DB_NUM_SET(_item) dbCache->dbCacheNum[_item] = 1
#define CTG_DB_NUM_RESET(_item) dbCache->dbCacheNum[_item] = 0
#define CTG_META_NUM_INC(type) \
#define CTG_META_NUM_INC(type) \
do { \
do { \
switch (type) { \
switch (type) { \
...
@@ -685,6 +698,10 @@ typedef struct SCtgCacheItemInfo {
...
@@ -685,6 +698,10 @@ typedef struct SCtgCacheItemInfo {
#define CTG_DB_NOT_EXIST(code) \
#define CTG_DB_NOT_EXIST(code) \
(code == TSDB_CODE_MND_DB_NOT_EXIST || code == TSDB_CODE_MND_DB_IN_CREATING || code == TSDB_CODE_MND_DB_IN_DROPPING)
(code == TSDB_CODE_MND_DB_NOT_EXIST || code == TSDB_CODE_MND_DB_IN_CREATING || code == TSDB_CODE_MND_DB_IN_DROPPING)
#define CTG_CACHE_OVERFLOW(_csize, _maxsize) ((_maxsize >= 0) ? ((_csize) >= (_maxsize) * 1048576L * 0.9) : false)
#define CTG_CACHE_LOW(_csize, _maxsize) ((_maxsize >= 0) ? ((_csize) <= (_maxsize) * 1048576L * 0.75) : true)
#define ctgFatal(param, ...) qFatal("CTG:%p " param, pCtg, __VA_ARGS__)
#define ctgFatal(param, ...) qFatal("CTG:%p " param, pCtg, __VA_ARGS__)
#define ctgError(param, ...) qError("CTG:%p " param, pCtg, __VA_ARGS__)
#define ctgError(param, ...) qError("CTG:%p " param, pCtg, __VA_ARGS__)
#define ctgWarn(param, ...) qWarn("CTG:%p " param, pCtg, __VA_ARGS__)
#define ctgWarn(param, ...) qWarn("CTG:%p " param, pCtg, __VA_ARGS__)
...
@@ -787,6 +804,12 @@ typedef struct SCtgCacheItemInfo {
...
@@ -787,6 +804,12 @@ typedef struct SCtgCacheItemInfo {
CTG_RET(__code); \
CTG_RET(__code); \
} while (0)
} while (0)
#define CTG_API_NLEAVE() \
do { \
CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock); \
CTG_API_DEBUG("CTG API leave %s", __FUNCTION__); \
} while (0)
#define CTG_API_ENTER() \
#define CTG_API_ENTER() \
do { \
do { \
CTG_API_DEBUG("CTG API enter %s", __FUNCTION__); \
CTG_API_DEBUG("CTG API enter %s", __FUNCTION__); \
...
@@ -796,6 +819,15 @@ typedef struct SCtgCacheItemInfo {
...
@@ -796,6 +819,15 @@ typedef struct SCtgCacheItemInfo {
} \
} \
} while (0)
} while (0)
#define CTG_API_NENTER() \
do { \
CTG_API_DEBUG("CTG API enter %s", __FUNCTION__); \
CTG_LOCK(CTG_READ, &gCtgMgmt.lock); \
if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) { \
CTG_API_NLEAVE(); \
} \
} while (0)
#define CTG_API_JENTER() \
#define CTG_API_JENTER() \
do { \
do { \
CTG_API_DEBUG("CTG API enter %s", __FUNCTION__); \
CTG_API_DEBUG("CTG API enter %s", __FUNCTION__); \
...
@@ -859,8 +891,8 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput* output, bool sy
...
@@ -859,8 +891,8 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput* output, bool sy
int32_t
ctgUpdateUserEnqueue
(
SCatalog
*
pCtg
,
SGetUserAuthRsp
*
pAuth
,
bool
syncReq
);
int32_t
ctgUpdateUserEnqueue
(
SCatalog
*
pCtg
,
SGetUserAuthRsp
*
pAuth
,
bool
syncReq
);
int32_t
ctgUpdateVgEpsetEnqueue
(
SCatalog
*
pCtg
,
char
*
dbFName
,
int32_t
vgId
,
SEpSet
*
pEpSet
);
int32_t
ctgUpdateVgEpsetEnqueue
(
SCatalog
*
pCtg
,
char
*
dbFName
,
int32_t
vgId
,
SEpSet
*
pEpSet
);
int32_t
ctgUpdateTbIndexEnqueue
(
SCatalog
*
pCtg
,
STableIndex
**
pIndex
,
bool
syncOp
);
int32_t
ctgUpdateTbIndexEnqueue
(
SCatalog
*
pCtg
,
STableIndex
**
pIndex
,
bool
syncOp
);
int32_t
ctgClearCacheEnqueue
(
SCatalog
*
pCtg
,
bool
freeCtg
,
bool
stopQueue
,
bool
syncOp
);
int32_t
ctgClearCacheEnqueue
(
SCatalog
*
pCtg
,
bool
clearMeta
,
bool
freeCtg
,
bool
stopQueue
,
bool
syncOp
);
int32_t
ctgMetaRentInit
(
SCtgRentMgmt
*
mgmt
,
uint32_t
rentSec
,
int8_t
type
);
int32_t
ctgMetaRentInit
(
SCtgRentMgmt
*
mgmt
,
uint32_t
rentSec
,
int8_t
type
,
int32_t
size
);
int32_t
ctgMetaRentAdd
(
SCtgRentMgmt
*
mgmt
,
void
*
meta
,
int64_t
id
,
int32_t
size
);
int32_t
ctgMetaRentAdd
(
SCtgRentMgmt
*
mgmt
,
void
*
meta
,
int64_t
id
,
int32_t
size
);
int32_t
ctgMetaRentGet
(
SCtgRentMgmt
*
mgmt
,
void
**
res
,
uint32_t
*
num
,
int32_t
size
);
int32_t
ctgMetaRentGet
(
SCtgRentMgmt
*
mgmt
,
void
**
res
,
uint32_t
*
num
,
int32_t
size
);
int32_t
ctgUpdateTbMetaToCache
(
SCatalog
*
pCtg
,
STableMetaOutput
*
pOut
,
bool
syncReq
);
int32_t
ctgUpdateTbMetaToCache
(
SCatalog
*
pCtg
,
STableMetaOutput
*
pOut
,
bool
syncReq
);
...
@@ -941,7 +973,7 @@ void ctgFreeSTableIndex(void* info);
...
@@ -941,7 +973,7 @@ void ctgFreeSTableIndex(void* info);
void
ctgClearSubTaskRes
(
SCtgSubRes
*
pRes
);
void
ctgClearSubTaskRes
(
SCtgSubRes
*
pRes
);
void
ctgFreeQNode
(
SCtgQNode
*
node
);
void
ctgFreeQNode
(
SCtgQNode
*
node
);
void
ctgClearHandle
(
SCatalog
*
pCtg
);
void
ctgClearHandle
(
SCatalog
*
pCtg
);
void
ctgFreeTbCacheImpl
(
SCtgTbCache
*
pCache
);
void
ctgFreeTbCacheImpl
(
SCtgTbCache
*
pCache
,
bool
lock
);
int32_t
ctgRemoveTbMeta
(
SCatalog
*
pCtg
,
SName
*
pTableName
);
int32_t
ctgRemoveTbMeta
(
SCatalog
*
pCtg
,
SName
*
pTableName
);
int32_t
ctgGetTbHashVgroup
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
const
SName
*
pTableName
,
SVgroupInfo
*
pVgroup
,
int32_t
ctgGetTbHashVgroup
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
const
SName
*
pTableName
,
SVgroupInfo
*
pVgroup
,
bool
*
exists
);
bool
*
exists
);
...
@@ -960,6 +992,16 @@ void ctgReleaseVgMetaToCache(SCatalog* pCtg, SCtgDBCache* dbCache, SCtgTbCach
...
@@ -960,6 +992,16 @@ void ctgReleaseVgMetaToCache(SCatalog* pCtg, SCtgDBCache* dbCache, SCtgTbCach
void
ctgReleaseTbMetaToCache
(
SCatalog
*
pCtg
,
SCtgDBCache
*
dbCache
,
SCtgTbCache
*
pCache
);
void
ctgReleaseTbMetaToCache
(
SCatalog
*
pCtg
,
SCtgDBCache
*
dbCache
,
SCtgTbCache
*
pCache
);
void
ctgGetGlobalCacheStat
(
SCtgCacheStat
*
pStat
);
void
ctgGetGlobalCacheStat
(
SCtgCacheStat
*
pStat
);
int32_t
ctgChkSetAuthRes
(
SCatalog
*
pCtg
,
SCtgAuthReq
*
req
,
SCtgAuthRsp
*
res
);
int32_t
ctgChkSetAuthRes
(
SCatalog
*
pCtg
,
SCtgAuthReq
*
req
,
SCtgAuthRsp
*
res
);
void
ctgGetGlobalCacheSize
(
uint64_t
*
pSize
);
uint64_t
ctgGetTbIndexCacheSize
(
STableIndex
*
pIndex
);
uint64_t
ctgGetTbMetaCacheSize
(
STableMeta
*
pMeta
);
uint64_t
ctgGetDbVgroupCacheSize
(
SDBVgInfo
*
pVg
);
uint64_t
ctgGetUserCacheSize
(
SGetUserAuthRsp
*
pAuth
);
uint64_t
ctgGetClusterCacheSize
(
SCatalog
*
pCtg
);
void
ctgClearHandleMeta
(
SCatalog
*
pCtg
,
int64_t
*
pClearedSize
,
int64_t
*
pCleardNum
,
bool
*
roundDone
);
void
ctgClearAllHandleMeta
(
int64_t
*
clearedSize
,
int64_t
*
clearedNum
,
bool
*
roundDone
);
void
ctgProcessTimerEvent
(
void
*
param
,
void
*
tmrId
);
int32_t
ctgGetTbMeta
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
SCtgTbMetaCtx
*
ctx
,
STableMeta
**
pTableMeta
);
int32_t
ctgGetTbMeta
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
SCtgTbMetaCtx
*
ctx
,
STableMeta
**
pTableMeta
);
int32_t
ctgGetCachedStbNameFromSuid
(
SCatalog
*
pCtg
,
char
*
dbFName
,
uint64_t
suid
,
char
**
stbName
);
int32_t
ctgGetCachedStbNameFromSuid
(
SCatalog
*
pCtg
,
char
*
dbFName
,
uint64_t
suid
,
char
**
stbName
);
int32_t
ctgGetTbTagCb
(
SCtgTask
*
pTask
);
int32_t
ctgGetTbTagCb
(
SCtgTask
*
pTask
);
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
42c65898
...
@@ -668,6 +668,36 @@ _return:
...
@@ -668,6 +668,36 @@ _return:
CTG_RET
(
code
);
CTG_RET
(
code
);
}
}
void
ctgProcessTimerEvent
(
void
*
param
,
void
*
tmrId
)
{
CTG_API_NENTER
();
int32_t
cacheMaxSize
=
atomic_load_32
(
&
tsMetaCacheMaxSize
);
if
(
cacheMaxSize
>=
0
)
{
uint64_t
cacheSize
=
0
;
ctgGetGlobalCacheSize
(
&
cacheSize
);
bool
overflow
=
CTG_CACHE_OVERFLOW
(
cacheSize
,
cacheMaxSize
);
qDebug
(
"catalog cache size: %"
PRIu64
"B, maxCaseSize:%dMB, %s"
,
cacheSize
,
cacheMaxSize
,
overflow
?
"overflow"
:
"NO overflow"
);
if
(
overflow
)
{
int32_t
code
=
ctgClearCacheEnqueue
(
NULL
,
true
,
false
,
false
,
false
);
if
(
code
)
{
qError
(
"clear cache enqueue failed, error:%s"
,
tstrerror
(
code
));
taosTmrReset
(
ctgProcessTimerEvent
,
CTG_DEFAULT_CACHE_MON_MSEC
,
NULL
,
gCtgMgmt
.
timer
,
&
gCtgMgmt
.
cacheTimer
);
}
goto
_return
;
}
}
qTrace
(
"reset catalog timer"
);
taosTmrReset
(
ctgProcessTimerEvent
,
CTG_DEFAULT_CACHE_MON_MSEC
,
NULL
,
gCtgMgmt
.
timer
,
&
gCtgMgmt
.
cacheTimer
);
_return:
CTG_API_NLEAVE
();
}
int32_t
ctgGetDBCfg
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
const
char
*
dbFName
,
SDbCfgInfo
*
pDbCfg
)
{
int32_t
ctgGetDBCfg
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
const
char
*
dbFName
,
SDbCfgInfo
*
pDbCfg
)
{
CTG_ERR_RET
(
ctgReadDBCfgFromCache
(
pCtg
,
dbFName
,
pDbCfg
));
CTG_ERR_RET
(
ctgReadDBCfgFromCache
(
pCtg
,
dbFName
,
pDbCfg
));
...
@@ -686,6 +716,7 @@ int32_t ctgGetDBCfg(SCatalog* pCtg, SRequestConnInfo* pConn, const char* dbFName
...
@@ -686,6 +716,7 @@ int32_t ctgGetDBCfg(SCatalog* pCtg, SRequestConnInfo* pConn, const char* dbFName
int32_t
catalogInit
(
SCatalogCfg
*
cfg
)
{
int32_t
catalogInit
(
SCatalogCfg
*
cfg
)
{
qDebug
(
"catalogInit start"
);
if
(
gCtgMgmt
.
pCluster
)
{
if
(
gCtgMgmt
.
pCluster
)
{
qError
(
"catalog already initialized"
);
qError
(
"catalog already initialized"
);
CTG_ERR_RET
(
TSDB_CODE_CTG_INVALID_INPUT
);
CTG_ERR_RET
(
TSDB_CODE_CTG_INVALID_INPUT
);
...
@@ -743,6 +774,18 @@ int32_t catalogInit(SCatalogCfg* cfg) {
...
@@ -743,6 +774,18 @@ int32_t catalogInit(SCatalogCfg* cfg) {
CTG_ERR_RET
(
terrno
);
CTG_ERR_RET
(
terrno
);
}
}
gCtgMgmt
.
timer
=
taosTmrInit
(
0
,
0
,
0
,
"catalog"
);
if
(
NULL
==
gCtgMgmt
.
timer
)
{
qError
(
"init timer failed, error:%s"
,
tstrerror
(
terrno
));
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
gCtgMgmt
.
cacheTimer
=
taosTmrStart
(
ctgProcessTimerEvent
,
CTG_DEFAULT_CACHE_MON_MSEC
,
NULL
,
gCtgMgmt
.
timer
);
if
(
NULL
==
gCtgMgmt
.
cacheTimer
)
{
qError
(
"start cache timer failed"
);
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
CTG_ERR_RET
(
ctgStartUpdateThread
());
CTG_ERR_RET
(
ctgStartUpdateThread
());
qDebug
(
"catalog initialized, maxDb:%u, maxTbl:%u, dbRentSec:%u, stbRentSec:%u"
,
gCtgMgmt
.
cfg
.
maxDBCacheNum
,
qDebug
(
"catalog initialized, maxDb:%u, maxTbl:%u, dbRentSec:%u, stbRentSec:%u"
,
gCtgMgmt
.
cfg
.
maxDBCacheNum
,
...
@@ -786,8 +829,8 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) {
...
@@ -786,8 +829,8 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) {
clusterCtg
->
clusterId
=
clusterId
;
clusterCtg
->
clusterId
=
clusterId
;
CTG_ERR_JRET
(
ctgMetaRentInit
(
&
clusterCtg
->
dbRent
,
gCtgMgmt
.
cfg
.
dbRentSec
,
CTG_RENT_DB
));
CTG_ERR_JRET
(
ctgMetaRentInit
(
&
clusterCtg
->
dbRent
,
gCtgMgmt
.
cfg
.
dbRentSec
,
CTG_RENT_DB
,
sizeof
(
SDbCacheInfo
)
));
CTG_ERR_JRET
(
ctgMetaRentInit
(
&
clusterCtg
->
stbRent
,
gCtgMgmt
.
cfg
.
stbRentSec
,
CTG_RENT_STABLE
));
CTG_ERR_JRET
(
ctgMetaRentInit
(
&
clusterCtg
->
stbRent
,
gCtgMgmt
.
cfg
.
stbRentSec
,
CTG_RENT_STABLE
,
sizeof
(
SSTableVersion
)
));
clusterCtg
->
dbCache
=
taosHashInit
(
gCtgMgmt
.
cfg
.
maxDBCacheNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
clusterCtg
->
dbCache
=
taosHashInit
(
gCtgMgmt
.
cfg
.
maxDBCacheNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_ENTRY_LOCK
);
false
,
HASH_ENTRY_LOCK
);
...
@@ -1139,6 +1182,22 @@ _return:
...
@@ -1139,6 +1182,22 @@ _return:
CTG_API_LEAVE
(
code
);
CTG_API_LEAVE
(
code
);
}
}
int32_t
catalogAsyncUpdateTableMeta
(
SCatalog
*
pCtg
,
STableMetaRsp
*
pMsg
)
{
CTG_API_ENTER
();
if
(
NULL
==
pCtg
||
NULL
==
pMsg
)
{
CTG_API_LEAVE
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
int32_t
code
=
0
;
CTG_ERR_JRET
(
ctgUpdateTbMeta
(
pCtg
,
pMsg
,
false
));
_return:
CTG_API_LEAVE
(
code
);
}
int32_t
catalogChkTbMetaVersion
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
SArray
*
pTables
)
{
int32_t
catalogChkTbMetaVersion
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
SArray
*
pTables
)
{
CTG_API_ENTER
();
CTG_API_ENTER
();
...
@@ -1600,11 +1659,11 @@ int32_t catalogClearCache(void) {
...
@@ -1600,11 +1659,11 @@ int32_t catalogClearCache(void) {
qInfo
(
"start to clear catalog cache"
);
qInfo
(
"start to clear catalog cache"
);
if
(
NULL
==
gCtgMgmt
.
pCluster
||
atomic_load_8
((
int8_t
*
)
&
gCtgMgmt
.
exit
)
)
{
if
(
NULL
==
gCtgMgmt
.
pCluster
)
{
CTG_API_LEAVE_NOLOCK
(
TSDB_CODE_SUCCESS
);
CTG_API_LEAVE_NOLOCK
(
TSDB_CODE_SUCCESS
);
}
}
int32_t
code
=
ctgClearCacheEnqueue
(
NULL
,
false
,
false
,
true
);
int32_t
code
=
ctgClearCacheEnqueue
(
NULL
,
false
,
false
,
false
,
true
);
qInfo
(
"clear catalog cache end, code: %s"
,
tstrerror
(
code
));
qInfo
(
"clear catalog cache end, code: %s"
,
tstrerror
(
code
));
...
@@ -1618,10 +1677,17 @@ void catalogDestroy(void) {
...
@@ -1618,10 +1677,17 @@ void catalogDestroy(void) {
return
;
return
;
}
}
if
(
gCtgMgmt
.
cacheTimer
)
{
taosTmrStop
(
gCtgMgmt
.
cacheTimer
);
gCtgMgmt
.
cacheTimer
=
NULL
;
taosTmrCleanUp
(
gCtgMgmt
.
timer
);
gCtgMgmt
.
timer
=
NULL
;
}
atomic_store_8
((
int8_t
*
)
&
gCtgMgmt
.
exit
,
true
);
atomic_store_8
((
int8_t
*
)
&
gCtgMgmt
.
exit
,
true
);
if
(
!
taosCheckCurrentInDll
())
{
if
(
!
taosCheckCurrentInDll
())
{
ctgClearCacheEnqueue
(
NULL
,
true
,
true
,
true
);
ctgClearCacheEnqueue
(
NULL
,
false
,
true
,
true
,
true
);
taosThreadJoin
(
gCtgMgmt
.
updateThread
,
NULL
);
taosThreadJoin
(
gCtgMgmt
.
updateThread
,
NULL
);
}
}
...
...
source/libs/catalog/src/ctgCache.c
浏览文件 @
42c65898
...
@@ -33,24 +33,25 @@ SCtgOperation gCtgCacheOperation[CTG_OP_MAX] = {{CTG_OP_UPDATE_VGROUP, "update v
...
@@ -33,24 +33,25 @@ SCtgOperation gCtgCacheOperation[CTG_OP_MAX] = {{CTG_OP_UPDATE_VGROUP, "update v
{
CTG_OP_CLEAR_CACHE
,
"clear cache"
,
ctgOpClearCache
}};
{
CTG_OP_CLEAR_CACHE
,
"clear cache"
,
ctgOpClearCache
}};
SCtgCacheItemInfo
gCtgStatItem
[
CTG_CI_MAX_VALUE
]
=
{
SCtgCacheItemInfo
gCtgStatItem
[
CTG_CI_MAX_VALUE
]
=
{
{
"Cluster "
,
CTG_CI_FLAG_LEVEL_GLOBAL
},
// CTG_CI_CLUSTER
{
"Cluster "
,
CTG_CI_FLAG_LEVEL_GLOBAL
},
//CTG_CI_CLUSTER
{
"Dnode "
,
CTG_CI_FLAG_LEVEL_CLUSTER
},
// CTG_CI_DNODE,
{
"Dnode "
,
CTG_CI_FLAG_LEVEL_CLUSTER
},
//CTG_CI_DNODE,
{
"Qnode "
,
CTG_CI_FLAG_LEVEL_CLUSTER
},
// CTG_CI_QNODE,
{
"Qnode "
,
CTG_CI_FLAG_LEVEL_CLUSTER
},
//CTG_CI_QNODE,
{
"DB "
,
CTG_CI_FLAG_LEVEL_CLUSTER
},
// CTG_CI_DB,
{
"DB "
,
CTG_CI_FLAG_LEVEL_CLUSTER
},
//CTG_CI_DB,
{
"DbVgroup "
,
CTG_CI_FLAG_LEVEL_DB
},
// CTG_CI_DB_VGROUP,
{
"DbVgroup "
,
CTG_CI_FLAG_LEVEL_DB
},
//CTG_CI_DB_VGROUP,
{
"DbCfg "
,
CTG_CI_FLAG_LEVEL_DB
},
// CTG_CI_DB_CFG,
{
"DbCfg "
,
CTG_CI_FLAG_LEVEL_DB
},
//CTG_CI_DB_CFG,
{
"DbInfo "
,
CTG_CI_FLAG_LEVEL_DB
},
// CTG_CI_DB_INFO,
{
"DbInfo "
,
CTG_CI_FLAG_LEVEL_DB
},
//CTG_CI_DB_INFO,
{
"StbMeta "
,
CTG_CI_FLAG_LEVEL_DB
},
// CTG_CI_STABLE_META,
{
"StbMeta "
,
CTG_CI_FLAG_LEVEL_DB
},
//CTG_CI_STABLE_META,
{
"NtbMeta "
,
CTG_CI_FLAG_LEVEL_DB
},
// CTG_CI_NTABLE_META,
{
"NtbMeta "
,
CTG_CI_FLAG_LEVEL_DB
},
//CTG_CI_NTABLE_META,
{
"CtbMeta "
,
CTG_CI_FLAG_LEVEL_DB
},
// CTG_CI_CTABLE_META,
{
"CtbMeta "
,
CTG_CI_FLAG_LEVEL_DB
},
//CTG_CI_CTABLE_META,
{
"SysTblMeta"
,
CTG_CI_FLAG_LEVEL_DB
},
// CTG_CI_SYSTABLE_META,
{
"SysTblMeta"
,
CTG_CI_FLAG_LEVEL_DB
},
//CTG_CI_SYSTABLE_META,
{
"OthTblMeta"
,
CTG_CI_FLAG_LEVEL_DB
},
// CTG_CI_OTHERTABLE_META,
{
"OthTblMeta"
,
CTG_CI_FLAG_LEVEL_DB
},
//CTG_CI_OTHERTABLE_META,
{
"TblSMA "
,
CTG_CI_FLAG_LEVEL_DB
},
// CTG_CI_TBL_SMA,
{
"TblSMA "
,
CTG_CI_FLAG_LEVEL_DB
},
//CTG_CI_TBL_SMA,
{
"TblCfg "
,
CTG_CI_FLAG_LEVEL_DB
},
// CTG_CI_TBL_CFG,
{
"TblCfg "
,
CTG_CI_FLAG_LEVEL_DB
},
//CTG_CI_TBL_CFG,
{
"IndexInfo "
,
CTG_CI_FLAG_LEVEL_DB
},
// CTG_CI_INDEX_INFO,
{
"TblTag "
,
CTG_CI_FLAG_LEVEL_DB
},
//CTG_CI_TBL_TAG,
{
"User "
,
CTG_CI_FLAG_LEVEL_CLUSTER
},
// CTG_CI_USER,
{
"IndexInfo "
,
CTG_CI_FLAG_LEVEL_DB
},
//CTG_CI_INDEX_INFO,
{
"UDF "
,
CTG_CI_FLAG_LEVEL_CLUSTER
},
// CTG_CI_UDF,
{
"User "
,
CTG_CI_FLAG_LEVEL_CLUSTER
},
//CTG_CI_USER,
{
"SvrVer "
,
CTG_CI_FLAG_LEVEL_CLUSTER
}
// CTG_CI_SVR_VER,
{
"UDF "
,
CTG_CI_FLAG_LEVEL_CLUSTER
},
//CTG_CI_UDF,
{
"SvrVer "
,
CTG_CI_FLAG_LEVEL_CLUSTER
}
//CTG_CI_SVR_VER,
};
};
int32_t
ctgRLockVgInfo
(
SCatalog
*
pCtg
,
SCtgDBCache
*
dbCache
,
bool
*
inCache
)
{
int32_t
ctgRLockVgInfo
(
SCatalog
*
pCtg
,
SCtgDBCache
*
dbCache
,
bool
*
inCache
)
{
...
@@ -860,7 +861,7 @@ int32_t ctgEnqueue(SCatalog *pCtg, SCtgCacheOperation *operation) {
...
@@ -860,7 +861,7 @@ int32_t ctgEnqueue(SCatalog *pCtg, SCtgCacheOperation *operation) {
CTG_UNLOCK
(
CTG_WRITE
,
&
gCtgMgmt
.
queue
.
qlock
);
CTG_UNLOCK
(
CTG_WRITE
,
&
gCtgMgmt
.
queue
.
qlock
);
ctgDebug
(
"
action [%s] added into queue
"
,
opName
);
ctgDebug
(
"
%sync action [%s] added into queue"
,
syncOp
?
"S"
:
"As
"
,
opName
);
CTG_QUEUE_INC
();
CTG_QUEUE_INC
();
CTG_STAT_RT_INC
(
numOfOpEnqueue
,
1
);
CTG_STAT_RT_INC
(
numOfOpEnqueue
,
1
);
...
@@ -1242,7 +1243,7 @@ _return:
...
@@ -1242,7 +1243,7 @@ _return:
CTG_RET
(
code
);
CTG_RET
(
code
);
}
}
int32_t
ctgClearCacheEnqueue
(
SCatalog
*
pCtg
,
bool
freeCtg
,
bool
stopQueue
,
bool
syncOp
)
{
int32_t
ctgClearCacheEnqueue
(
SCatalog
*
pCtg
,
bool
clearMeta
,
bool
freeCtg
,
bool
stopQueue
,
bool
syncOp
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
SCtgCacheOperation
*
op
=
taosMemoryCalloc
(
1
,
sizeof
(
SCtgCacheOperation
));
SCtgCacheOperation
*
op
=
taosMemoryCalloc
(
1
,
sizeof
(
SCtgCacheOperation
));
op
->
opId
=
CTG_OP_CLEAR_CACHE
;
op
->
opId
=
CTG_OP_CLEAR_CACHE
;
...
@@ -1258,6 +1259,7 @@ int32_t ctgClearCacheEnqueue(SCatalog *pCtg, bool freeCtg, bool stopQueue, bool
...
@@ -1258,6 +1259,7 @@ int32_t ctgClearCacheEnqueue(SCatalog *pCtg, bool freeCtg, bool stopQueue, bool
}
}
msg
->
pCtg
=
pCtg
;
msg
->
pCtg
=
pCtg
;
msg
->
clearMeta
=
clearMeta
;
msg
->
freeCtg
=
freeCtg
;
msg
->
freeCtg
=
freeCtg
;
op
->
data
=
msg
;
op
->
data
=
msg
;
...
@@ -1270,10 +1272,11 @@ _return:
...
@@ -1270,10 +1272,11 @@ _return:
CTG_RET
(
code
);
CTG_RET
(
code
);
}
}
int32_t
ctgMetaRentInit
(
SCtgRentMgmt
*
mgmt
,
uint32_t
rentSec
,
int8_t
type
)
{
int32_t
ctgMetaRentInit
(
SCtgRentMgmt
*
mgmt
,
uint32_t
rentSec
,
int8_t
type
,
int32_t
size
)
{
mgmt
->
slotRIdx
=
0
;
mgmt
->
slotRIdx
=
0
;
mgmt
->
slotNum
=
rentSec
/
CTG_RENT_SLOT_SECOND
;
mgmt
->
slotNum
=
rentSec
/
CTG_RENT_SLOT_SECOND
;
mgmt
->
type
=
type
;
mgmt
->
type
=
type
;
mgmt
->
metaSize
=
size
;
size_t
msgSize
=
sizeof
(
SCtgRentSlot
)
*
mgmt
->
slotNum
;
size_t
msgSize
=
sizeof
(
SCtgRentSlot
)
*
mgmt
->
slotNum
;
...
@@ -1283,6 +1286,8 @@ int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) {
...
@@ -1283,6 +1286,8 @@ int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) {
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
}
mgmt
->
rentCacheSize
=
msgSize
;
qDebug
(
"meta rent initialized, type:%d, slotNum:%d"
,
type
,
mgmt
->
slotNum
);
qDebug
(
"meta rent initialized, type:%d, slotNum:%d"
,
type
,
mgmt
->
slotNum
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
...
@@ -1309,6 +1314,7 @@ int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size)
...
@@ -1309,6 +1314,7 @@ int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size)
CTG_ERR_JRET
(
TSDB_CODE_OUT_OF_MEMORY
);
CTG_ERR_JRET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
}
mgmt
->
rentCacheSize
+=
size
;
slot
->
needSort
=
true
;
slot
->
needSort
=
true
;
qDebug
(
"add meta to rent, id:0x%"
PRIx64
", slot idx:%d, type:%d"
,
id
,
widx
,
mgmt
->
type
);
qDebug
(
"add meta to rent, id:0x%"
PRIx64
", slot idx:%d, type:%d"
,
id
,
widx
,
mgmt
->
type
);
...
@@ -1389,6 +1395,7 @@ int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t sortComp
...
@@ -1389,6 +1395,7 @@ int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t sortComp
}
}
taosArrayRemove
(
slot
->
meta
,
idx
);
taosArrayRemove
(
slot
->
meta
,
idx
);
mgmt
->
rentCacheSize
-=
mgmt
->
metaSize
;
qDebug
(
"meta in rent removed, id:0x%"
PRIx64
", slot idx:%d, type:%d"
,
id
,
widx
,
mgmt
->
type
);
qDebug
(
"meta in rent removed, id:0x%"
PRIx64
", slot idx:%d, type:%d"
,
id
,
widx
,
mgmt
->
type
);
...
@@ -1656,10 +1663,15 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
...
@@ -1656,10 +1663,15 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
}
}
if
(
origType
==
TSDB_SUPER_TABLE
)
{
if
(
origType
==
TSDB_SUPER_TABLE
)
{
if
(
taosHashRemove
(
dbCache
->
stbCache
,
&
orig
->
suid
,
sizeof
(
orig
->
suid
)))
{
char
*
stbName
=
taosHashGet
(
dbCache
->
stbCache
,
&
orig
->
suid
,
sizeof
(
orig
->
suid
));
ctgError
(
"stb not exist in stbCache, dbFName:%s, stb:%s, suid:0x%"
PRIx64
,
dbFName
,
tbName
,
orig
->
suid
);
if
(
stbName
)
{
}
else
{
uint64_t
metaSize
=
strlen
(
stbName
)
+
1
+
sizeof
(
orig
->
suid
);
ctgDebug
(
"stb removed from stbCache, dbFName:%s, stb:%s, suid:0x%"
PRIx64
,
dbFName
,
tbName
,
orig
->
suid
);
if
(
taosHashRemove
(
dbCache
->
stbCache
,
&
orig
->
suid
,
sizeof
(
orig
->
suid
)))
{
ctgError
(
"stb not exist in stbCache, dbFName:%s, stb:%s, suid:0x%"
PRIx64
,
dbFName
,
tbName
,
orig
->
suid
);
}
else
{
ctgDebug
(
"stb removed from stbCache, dbFName:%s, stb:%s, suid:0x%"
PRIx64
,
dbFName
,
tbName
,
orig
->
suid
);
atomic_sub_fetch_64
(
&
dbCache
->
dbCacheSize
,
metaSize
);
}
}
}
}
}
}
}
...
@@ -1673,14 +1685,20 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
...
@@ -1673,14 +1685,20 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
}
atomic_add_fetch_64
(
&
dbCache
->
dbCacheSize
,
strlen
(
tbName
)
+
sizeof
(
SCtgTbCache
)
+
ctgGetTbMetaCacheSize
(
meta
));
pCache
=
taosHashGet
(
dbCache
->
tbCache
,
tbName
,
strlen
(
tbName
));
pCache
=
taosHashGet
(
dbCache
->
tbCache
,
tbName
,
strlen
(
tbName
));
}
else
{
}
else
{
CTG_LOCK
(
CTG_WRITE
,
&
pCache
->
metaLock
);
CTG_LOCK
(
CTG_WRITE
,
&
pCache
->
metaLock
);
if
(
orig
)
{
if
(
orig
)
{
CTG_META_NUM_DEC
(
origType
);
CTG_META_NUM_DEC
(
origType
);
}
}
atomic_add_fetch_64
(
&
dbCache
->
dbCacheSize
,
ctgGetTbMetaCacheSize
(
meta
)
-
ctgGetTbMetaCacheSize
(
pCache
->
pMeta
));
taosMemoryFree
(
pCache
->
pMeta
);
taosMemoryFree
(
pCache
->
pMeta
);
pCache
->
pMeta
=
meta
;
pCache
->
pMeta
=
meta
;
CTG_UNLOCK
(
CTG_WRITE
,
&
pCache
->
metaLock
);
CTG_UNLOCK
(
CTG_WRITE
,
&
pCache
->
metaLock
);
}
}
...
@@ -1698,6 +1716,8 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
...
@@ -1698,6 +1716,8 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
}
atomic_add_fetch_64
(
&
dbCache
->
dbCacheSize
,
sizeof
(
meta
->
suid
)
+
strlen
(
tbName
)
+
1
);
ctgDebug
(
"stb 0x%"
PRIx64
" updated to cache, dbFName:%s, tbName:%s, tbType:%d"
,
meta
->
suid
,
dbFName
,
tbName
,
ctgDebug
(
"stb 0x%"
PRIx64
" updated to cache, dbFName:%s, tbName:%s, tbType:%d"
,
meta
->
suid
,
dbFName
,
tbName
,
meta
->
tableType
);
meta
->
tableType
);
...
@@ -1730,6 +1750,8 @@ int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNa
...
@@ -1730,6 +1750,8 @@ int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNa
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
}
atomic_add_fetch_64
(
&
dbCache
->
dbCacheSize
,
strlen
(
tbName
)
+
sizeof
(
SCtgTbCache
)
+
ctgGetTbIndexCacheSize
(
pIndex
));
CTG_DB_NUM_INC
(
CTG_CI_TBL_SMA
);
CTG_DB_NUM_INC
(
CTG_CI_TBL_SMA
);
*
index
=
NULL
;
*
index
=
NULL
;
...
@@ -1746,6 +1768,7 @@ int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNa
...
@@ -1746,6 +1768,7 @@ int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNa
CTG_LOCK
(
CTG_WRITE
,
&
pCache
->
indexLock
);
CTG_LOCK
(
CTG_WRITE
,
&
pCache
->
indexLock
);
if
(
pCache
->
pIndex
)
{
if
(
pCache
->
pIndex
)
{
atomic_sub_fetch_64
(
&
dbCache
->
dbCacheSize
,
ctgGetTbIndexCacheSize
(
pCache
->
pIndex
));
if
(
0
==
suid
)
{
if
(
0
==
suid
)
{
suid
=
pCache
->
pIndex
->
suid
;
suid
=
pCache
->
pIndex
->
suid
;
}
}
...
@@ -1756,6 +1779,8 @@ int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNa
...
@@ -1756,6 +1779,8 @@ int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNa
pCache
->
pIndex
=
pIndex
;
pCache
->
pIndex
=
pIndex
;
CTG_UNLOCK
(
CTG_WRITE
,
&
pCache
->
indexLock
);
CTG_UNLOCK
(
CTG_WRITE
,
&
pCache
->
indexLock
);
atomic_add_fetch_64
(
&
dbCache
->
dbCacheSize
,
ctgGetTbIndexCacheSize
(
pIndex
));
*
index
=
NULL
;
*
index
=
NULL
;
ctgDebug
(
"table %s index updated to cache, ver:%d, num:%d"
,
tbName
,
pIndex
->
version
,
ctgDebug
(
"table %s index updated to cache, ver:%d, num:%d"
,
tbName
,
pIndex
->
version
,
...
@@ -1785,7 +1810,7 @@ _return:
...
@@ -1785,7 +1810,7 @@ _return:
CTG_RET
(
code
);
CTG_RET
(
code
);
}
}
void
ctgClearAll
Instance
(
void
)
{
void
ctgClearAll
Handles
(
void
)
{
SCatalog
*
pCtg
=
NULL
;
SCatalog
*
pCtg
=
NULL
;
void
*
pIter
=
taosHashIterate
(
gCtgMgmt
.
pCluster
,
NULL
);
void
*
pIter
=
taosHashIterate
(
gCtgMgmt
.
pCluster
,
NULL
);
...
@@ -1800,7 +1825,7 @@ void ctgClearAllInstance(void) {
...
@@ -1800,7 +1825,7 @@ void ctgClearAllInstance(void) {
}
}
}
}
void
ctgFreeAll
Instance
(
void
)
{
void
ctgFreeAll
Handles
(
void
)
{
SCatalog
*
pCtg
=
NULL
;
SCatalog
*
pCtg
=
NULL
;
void
*
pIter
=
taosHashIterate
(
gCtgMgmt
.
pCluster
,
NULL
);
void
*
pIter
=
taosHashIterate
(
gCtgMgmt
.
pCluster
,
NULL
);
...
@@ -1841,7 +1866,7 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
...
@@ -1841,7 +1866,7 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
goto
_return
;
goto
_return
;
}
}
if
(
dbInfo
->
vgVersion
<
0
||
taosHashGetSize
(
dbInfo
->
vgHash
)
<=
0
)
{
if
(
dbInfo
->
vgVersion
<
0
||
(
taosHashGetSize
(
dbInfo
->
vgHash
)
<=
0
&&
!
IS_SYS_DBNAME
(
dbFName
))
)
{
ctgDebug
(
"invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d, vgHashSize:%d"
,
dbFName
,
dbInfo
->
vgHash
,
ctgDebug
(
"invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d, vgHashSize:%d"
,
dbFName
,
dbInfo
->
vgHash
,
dbInfo
->
vgVersion
,
taosHashGetSize
(
dbInfo
->
vgHash
));
dbInfo
->
vgVersion
,
taosHashGetSize
(
dbInfo
->
vgHash
));
CTG_ERR_JRET
(
TSDB_CODE_APP_ERROR
);
CTG_ERR_JRET
(
TSDB_CODE_APP_ERROR
);
...
@@ -1859,6 +1884,7 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
...
@@ -1859,6 +1884,7 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
}
}
SCtgVgCache
*
vgCache
=
&
dbCache
->
vgCache
;
SCtgVgCache
*
vgCache
=
&
dbCache
->
vgCache
;
CTG_ERR_JRET
(
ctgWLockVgInfo
(
msg
->
pCtg
,
dbCache
));
CTG_ERR_JRET
(
ctgWLockVgInfo
(
msg
->
pCtg
,
dbCache
));
if
(
vgCache
->
vgInfo
)
{
if
(
vgCache
->
vgInfo
)
{
...
@@ -1881,6 +1907,11 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
...
@@ -1881,6 +1907,11 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
goto
_return
;
goto
_return
;
}
}
uint64_t
groupCacheSize
=
ctgGetDbVgroupCacheSize
(
vgCache
->
vgInfo
);
ctgDebug
(
"sub dbGroupCacheSize %"
PRIu64
" from db, dbFName:%s"
,
groupCacheSize
,
dbFName
);
atomic_sub_fetch_64
(
&
dbCache
->
dbCacheSize
,
groupCacheSize
);
freeVgInfo
(
vgInfo
);
freeVgInfo
(
vgInfo
);
CTG_DB_NUM_RESET
(
CTG_CI_DB_VGROUP
);
CTG_DB_NUM_RESET
(
CTG_CI_DB_VGROUP
);
}
}
...
@@ -1898,6 +1929,10 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
...
@@ -1898,6 +1929,10 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
ctgWUnlockVgInfo
(
dbCache
);
ctgWUnlockVgInfo
(
dbCache
);
uint64_t
groupCacheSize
=
ctgGetDbVgroupCacheSize
(
vgCache
->
vgInfo
);
atomic_add_fetch_64
(
&
dbCache
->
dbCacheSize
,
groupCacheSize
);
ctgDebug
(
"add dbGroupCacheSize %"
PRIu64
" from db, dbFName:%s"
,
groupCacheSize
,
dbFName
);
dbCache
=
NULL
;
dbCache
=
NULL
;
// if (!IS_SYS_DBNAME(dbFName)) {
// if (!IS_SYS_DBNAME(dbFName)) {
...
@@ -2022,6 +2057,8 @@ int32_t ctgOpDropDbVgroup(SCtgCacheOperation *operation) {
...
@@ -2022,6 +2057,8 @@ int32_t ctgOpDropDbVgroup(SCtgCacheOperation *operation) {
CTG_ERR_JRET
(
ctgWLockVgInfo
(
pCtg
,
dbCache
));
CTG_ERR_JRET
(
ctgWLockVgInfo
(
pCtg
,
dbCache
));
atomic_sub_fetch_64
(
&
dbCache
->
dbCacheSize
,
ctgGetDbVgroupCacheSize
(
dbCache
->
vgCache
.
vgInfo
));
freeVgInfo
(
dbCache
->
vgCache
.
vgInfo
);
freeVgInfo
(
dbCache
->
vgCache
.
vgInfo
);
dbCache
->
vgCache
.
vgInfo
=
NULL
;
dbCache
->
vgCache
.
vgInfo
=
NULL
;
...
@@ -2113,26 +2150,32 @@ int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) {
...
@@ -2113,26 +2150,32 @@ int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) {
goto
_return
;
goto
_return
;
}
}
if
(
taosHashRemove
(
dbCache
->
stbCache
,
&
msg
->
suid
,
sizeof
(
msg
->
suid
)))
{
char
*
stbName
=
taosHashGet
(
dbCache
->
stbCache
,
&
msg
->
suid
,
sizeof
(
msg
->
suid
));
ctgDebug
(
"stb not exist in stbCache, may be removed, dbFName:%s, stb:%s, suid:0x%"
PRIx64
,
msg
->
dbFName
,
if
(
stbName
)
{
msg
->
stbName
,
msg
->
suid
);
uint64_t
metaSize
=
strlen
(
stbName
)
+
1
+
sizeof
(
msg
->
suid
);
if
(
taosHashRemove
(
dbCache
->
stbCache
,
&
msg
->
suid
,
sizeof
(
msg
->
suid
)))
{
ctgDebug
(
"stb not exist in stbCache, may be removed, dbFName:%s, stb:%s, suid:0x%"
PRIx64
,
msg
->
dbFName
,
msg
->
stbName
,
msg
->
suid
);
}
else
{
atomic_sub_fetch_64
(
&
dbCache
->
dbCacheSize
,
metaSize
);
}
}
}
SCtgTbCache
*
pTbCache
=
taosHashGet
(
dbCache
->
tbCache
,
msg
->
stbName
,
strlen
(
msg
->
stbName
));
SCtgTbCache
*
pTbCache
=
taosHashGet
(
dbCache
->
tbCache
,
msg
->
stbName
,
strlen
(
msg
->
stbName
));
if
(
NULL
==
pTbCache
)
{
if
(
NULL
==
pTbCache
)
{
ctgDebug
(
"stb %s already not in cache"
,
msg
->
stbName
);
ctgDebug
(
"stb %s already not in cache"
,
msg
->
stbName
);
goto
_return
;
goto
_return
;
}
}
CTG_LOCK
(
CTG_WRITE
,
&
pTbCache
->
metaLock
);
tblType
=
pTbCache
->
pMeta
->
tableType
;
tblType
=
pTbCache
->
pMeta
->
tableType
;
ctgFreeTbCacheImpl
(
pTbCache
);
atomic_sub_fetch_64
(
&
dbCache
->
dbCacheSize
,
ctgGetTbMetaCacheSize
(
pTbCache
->
pMeta
)
+
ctgGetTbIndexCacheSize
(
pTbCache
->
pIndex
)
);
CTG_UNLOCK
(
CTG_WRITE
,
&
pTbCache
->
metaLock
);
ctgFreeTbCacheImpl
(
pTbCache
,
true
);
if
(
taosHashRemove
(
dbCache
->
tbCache
,
msg
->
stbName
,
strlen
(
msg
->
stbName
)))
{
if
(
taosHashRemove
(
dbCache
->
tbCache
,
msg
->
stbName
,
strlen
(
msg
->
stbName
)))
{
ctgError
(
"stb not exist in cache, dbFName:%s, stb:%s, suid:0x%"
PRIx64
,
msg
->
dbFName
,
msg
->
stbName
,
msg
->
suid
);
ctgError
(
"stb not exist in cache, dbFName:%s, stb:%s, suid:0x%"
PRIx64
,
msg
->
dbFName
,
msg
->
stbName
,
msg
->
suid
);
}
else
{
}
else
{
CTG_META_NUM_DEC
(
tblType
);
CTG_META_NUM_DEC
(
tblType
);
atomic_sub_fetch_64
(
&
dbCache
->
dbCacheSize
,
sizeof
(
*
pTbCache
)
+
strlen
(
msg
->
stbName
));
}
}
ctgInfo
(
"stb removed from cache, dbFName:%s, stbName:%s, suid:0x%"
PRIx64
,
msg
->
dbFName
,
msg
->
stbName
,
msg
->
suid
);
ctgInfo
(
"stb removed from cache, dbFName:%s, stbName:%s, suid:0x%"
PRIx64
,
msg
->
dbFName
,
msg
->
stbName
,
msg
->
suid
);
...
@@ -2176,15 +2219,15 @@ int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) {
...
@@ -2176,15 +2219,15 @@ int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) {
goto
_return
;
goto
_return
;
}
}
CTG_LOCK
(
CTG_WRITE
,
&
pTbCache
->
metaLock
);
tblType
=
pTbCache
->
pMeta
->
tableType
;
tblType
=
pTbCache
->
pMeta
->
tableType
;
ctgFreeTbCacheImpl
(
pTbCache
);
atomic_sub_fetch_64
(
&
dbCache
->
dbCacheSize
,
ctgGetTbMetaCacheSize
(
pTbCache
->
pMeta
)
+
ctgGetTbIndexCacheSize
(
pTbCache
->
pIndex
)
);
CTG_UNLOCK
(
CTG_WRITE
,
&
pTbCache
->
metaLock
);
ctgFreeTbCacheImpl
(
pTbCache
,
true
);
if
(
taosHashRemove
(
dbCache
->
tbCache
,
msg
->
tbName
,
strlen
(
msg
->
tbName
)))
{
if
(
taosHashRemove
(
dbCache
->
tbCache
,
msg
->
tbName
,
strlen
(
msg
->
tbName
)))
{
ctgError
(
"tb %s not exist in cache, dbFName:%s"
,
msg
->
tbName
,
msg
->
dbFName
);
ctgError
(
"tb %s not exist in cache, dbFName:%s"
,
msg
->
tbName
,
msg
->
dbFName
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
}
else
{
}
else
{
atomic_sub_fetch_64
(
&
dbCache
->
dbCacheSize
,
sizeof
(
*
pTbCache
)
+
strlen
(
msg
->
tbName
));
CTG_META_NUM_DEC
(
tblType
);
CTG_META_NUM_DEC
(
tblType
);
}
}
...
@@ -2211,7 +2254,8 @@ int32_t ctgOpUpdateUser(SCtgCacheOperation *operation) {
...
@@ -2211,7 +2254,8 @@ int32_t ctgOpUpdateUser(SCtgCacheOperation *operation) {
SCtgUserAuth
userAuth
=
{
0
};
SCtgUserAuth
userAuth
=
{
0
};
memcpy
(
&
userAuth
.
userAuth
,
&
msg
->
userAuth
,
sizeof
(
msg
->
userAuth
));
memcpy
(
&
userAuth
.
userAuth
,
&
msg
->
userAuth
,
sizeof
(
msg
->
userAuth
));
userAuth
.
userCacheSize
=
ctgGetUserCacheSize
(
&
userAuth
.
userAuth
);
if
(
taosHashPut
(
pCtg
->
userCache
,
msg
->
userAuth
.
user
,
strlen
(
msg
->
userAuth
.
user
),
&
userAuth
,
sizeof
(
userAuth
)))
{
if
(
taosHashPut
(
pCtg
->
userCache
,
msg
->
userAuth
.
user
,
strlen
(
msg
->
userAuth
.
user
),
&
userAuth
,
sizeof
(
userAuth
)))
{
ctgError
(
"taosHashPut user %s to cache failed"
,
msg
->
userAuth
.
user
);
ctgError
(
"taosHashPut user %s to cache failed"
,
msg
->
userAuth
.
user
);
CTG_ERR_JRET
(
TSDB_CODE_OUT_OF_MEMORY
);
CTG_ERR_JRET
(
TSDB_CODE_OUT_OF_MEMORY
);
...
@@ -2243,6 +2287,8 @@ int32_t ctgOpUpdateUser(SCtgCacheOperation *operation) {
...
@@ -2243,6 +2287,8 @@ int32_t ctgOpUpdateUser(SCtgCacheOperation *operation) {
msg
->
userAuth
.
useDbs
=
NULL
;
msg
->
userAuth
.
useDbs
=
NULL
;
CTG_UNLOCK
(
CTG_WRITE
,
&
pUser
->
lock
);
CTG_UNLOCK
(
CTG_WRITE
,
&
pUser
->
lock
);
atomic_store_64
(
&
pUser
->
userCacheSize
,
ctgGetUserCacheSize
(
&
pUser
->
userAuth
));
_return:
_return:
...
@@ -2378,33 +2424,78 @@ _return:
...
@@ -2378,33 +2424,78 @@ _return:
CTG_RET
(
code
);
CTG_RET
(
code
);
}
}
int32_t
ctgOpClearCache
(
SCtgCacheOperation
*
operation
)
{
void
ctgClearFreeCache
(
SCtgCacheOperation
*
operation
)
{
int32_t
code
=
0
;
SCtgClearCacheMsg
*
msg
=
operation
->
data
;
SCtgClearCacheMsg
*
msg
=
operation
->
data
;
SCatalog
*
pCtg
=
msg
->
pCtg
;
SCatalog
*
pCtg
=
msg
->
pCtg
;
CTG_LOCK
(
CTG_WRITE
,
&
gCtgMgmt
.
lock
);
CTG_LOCK
(
CTG_WRITE
,
&
gCtgMgmt
.
lock
);
if
(
pCtg
)
{
if
(
pCtg
)
{
if
(
msg
->
freeCtg
)
{
if
(
msg
->
freeCtg
)
{
ctgFreeHandle
(
pCtg
);
ctgFreeHandle
(
pCtg
);
}
else
{
}
else
{
ctgClearHandle
(
pCtg
);
ctgClearHandle
(
pCtg
);
}
}
}
else
if
(
msg
->
freeCtg
)
{
ctgFreeAllHandles
();
}
else
{
ctgClearAllHandles
();
}
CTG_UNLOCK
(
CTG_WRITE
,
&
gCtgMgmt
.
lock
);
}
goto
_return
;
void
ctgClearMetaCache
(
SCtgCacheOperation
*
operation
)
{
SCtgClearCacheMsg
*
msg
=
operation
->
data
;
SCatalog
*
pCtg
=
msg
->
pCtg
;
int64_t
clearedSize
=
0
;
int64_t
clearedNum
=
0
;
int64_t
remainSize
=
0
;
bool
roundDone
=
false
;
if
(
pCtg
)
{
ctgClearHandleMeta
(
pCtg
,
&
clearedSize
,
&
clearedNum
,
&
roundDone
);
}
else
{
ctgClearAllHandleMeta
(
&
clearedSize
,
&
clearedNum
,
&
roundDone
);
}
qDebug
(
"catalog finish one round meta clear, clearedSize:%"
PRId64
", clearedNum:%"
PRId64
", done:%d"
,
clearedSize
,
clearedNum
,
roundDone
);
ctgGetGlobalCacheSize
(
&
remainSize
);
int32_t
cacheMaxSize
=
atomic_load_32
(
&
tsMetaCacheMaxSize
);
if
(
CTG_CACHE_LOW
(
remainSize
,
cacheMaxSize
))
{
qDebug
(
"catalog finish meta clear, remainSize:%"
PRId64
", cacheMaxSize:%dMB"
,
remainSize
,
cacheMaxSize
);
taosTmrReset
(
ctgProcessTimerEvent
,
CTG_DEFAULT_CACHE_MON_MSEC
,
NULL
,
gCtgMgmt
.
timer
,
&
gCtgMgmt
.
cacheTimer
);
return
;
}
if
(
!
roundDone
)
{
qDebug
(
"catalog all meta cleared, remainSize:%"
PRId64
", cacheMaxSize:%dMB, to clear handle"
,
remainSize
,
cacheMaxSize
);
ctgClearFreeCache
(
operation
);
taosTmrReset
(
ctgProcessTimerEvent
,
CTG_DEFAULT_CACHE_MON_MSEC
,
NULL
,
gCtgMgmt
.
timer
,
&
gCtgMgmt
.
cacheTimer
);
return
;
}
int32_t
code
=
ctgClearCacheEnqueue
(
NULL
,
true
,
false
,
false
,
false
);
if
(
code
)
{
qError
(
"clear cache enqueue failed, error:%s"
,
tstrerror
(
code
));
taosTmrReset
(
ctgProcessTimerEvent
,
CTG_DEFAULT_CACHE_MON_MSEC
,
NULL
,
gCtgMgmt
.
timer
,
&
gCtgMgmt
.
cacheTimer
);
}
}
}
if
(
msg
->
freeCtg
)
{
int32_t
ctgOpClearCache
(
SCtgCacheOperation
*
operation
)
{
ctgFreeAllInstance
();
int32_t
code
=
0
;
SCtgClearCacheMsg
*
msg
=
operation
->
data
;
if
(
msg
->
clearMeta
)
{
ctgClearMetaCache
(
operation
);
}
else
{
}
else
{
ctgClear
AllInstance
(
);
ctgClear
FreeCache
(
operation
);
}
}
_return:
_return:
CTG_UNLOCK
(
CTG_WRITE
,
&
gCtgMgmt
.
lock
);
taosMemoryFreeClear
(
msg
);
taosMemoryFreeClear
(
msg
);
CTG_RET
(
code
);
CTG_RET
(
code
);
...
...
source/libs/catalog/src/ctgDbg.c
浏览文件 @
42c65898
...
@@ -19,7 +19,7 @@
...
@@ -19,7 +19,7 @@
#include "trpc.h"
#include "trpc.h"
extern
SCatalogMgmt
gCtgMgmt
;
extern
SCatalogMgmt
gCtgMgmt
;
SCtgDebug
gCTGDebug
=
{
0
};
SCtgDebug
gCTGDebug
=
{
.
statEnable
=
true
};
#if 0
#if 0
...
@@ -547,7 +547,10 @@ int32_t ctgdShowStatInfo(void) {
...
@@ -547,7 +547,10 @@ int32_t ctgdShowStatInfo(void) {
CTG_API_ENTER
();
CTG_API_ENTER
();
SCtgCacheStat
cache
;
SCtgCacheStat
cache
;
uint64_t
cacheSize
=
0
;
ctgGetGlobalCacheStat
(
&
cache
);
ctgGetGlobalCacheStat
(
&
cache
);
ctgGetGlobalCacheSize
(
&
cacheSize
);
qDebug
(
"## Global Stat Info %s ##"
,
"begin"
);
qDebug
(
"## Global Stat Info %s ##"
,
"begin"
);
qDebug
(
"##
\t
%s
\t
%s
\t
%s ##"
,
"Num"
,
"Hit"
,
"Nhit"
);
qDebug
(
"##
\t
%s
\t
%s
\t
%s ##"
,
"Num"
,
"Hit"
,
"Nhit"
);
...
@@ -555,6 +558,7 @@ int32_t ctgdShowStatInfo(void) {
...
@@ -555,6 +558,7 @@ int32_t ctgdShowStatInfo(void) {
qDebug
(
"# %s
\t
%"
PRIu64
"
\t
%"
PRIu64
"
\t
%"
PRIu64
" #"
,
gCtgStatItem
[
i
].
name
,
cache
.
cacheNum
[
i
],
cache
.
cacheHit
[
i
],
cache
.
cacheNHit
[
i
]);
qDebug
(
"# %s
\t
%"
PRIu64
"
\t
%"
PRIu64
"
\t
%"
PRIu64
" #"
,
gCtgStatItem
[
i
].
name
,
cache
.
cacheNum
[
i
],
cache
.
cacheHit
[
i
],
cache
.
cacheNHit
[
i
]);
}
}
qDebug
(
"## Global Stat Info %s ##"
,
"end"
);
qDebug
(
"## Global Stat Info %s ##"
,
"end"
);
qDebug
(
"## Global Cache Size: %"
PRIu64
,
cacheSize
);
CTG_API_LEAVE
(
TSDB_CODE_SUCCESS
);
CTG_API_LEAVE
(
TSDB_CODE_SUCCESS
);
}
}
...
...
source/libs/catalog/src/ctgUtil.c
浏览文件 @
42c65898
...
@@ -199,6 +199,7 @@ void ctgFreeMetaRent(SCtgRentMgmt* mgmt) {
...
@@ -199,6 +199,7 @@ void ctgFreeMetaRent(SCtgRentMgmt* mgmt) {
}
}
taosMemoryFreeClear
(
mgmt
->
slots
);
taosMemoryFreeClear
(
mgmt
->
slots
);
mgmt
->
rentCacheSize
=
0
;
}
}
void
ctgFreeStbMetaCache
(
SCtgDBCache
*
dbCache
)
{
void
ctgFreeStbMetaCache
(
SCtgDBCache
*
dbCache
)
{
...
@@ -211,12 +212,26 @@ void ctgFreeStbMetaCache(SCtgDBCache* dbCache) {
...
@@ -211,12 +212,26 @@ void ctgFreeStbMetaCache(SCtgDBCache* dbCache) {
dbCache
->
stbCache
=
NULL
;
dbCache
->
stbCache
=
NULL
;
}
}
void
ctgFreeTbCacheImpl
(
SCtgTbCache
*
pCache
)
{
void
ctgFreeTbCacheImpl
(
SCtgTbCache
*
pCache
,
bool
lock
)
{
qDebug
(
"tbMeta freed, p:%p"
,
pCache
->
pMeta
);
if
(
pCache
->
pMeta
)
{
taosMemoryFreeClear
(
pCache
->
pMeta
);
if
(
lock
)
{
CTG_LOCK
(
CTG_WRITE
,
&
pCache
->
metaLock
);
}
taosMemoryFreeClear
(
pCache
->
pMeta
);
if
(
lock
)
{
CTG_UNLOCK
(
CTG_WRITE
,
&
pCache
->
metaLock
);
}
}
if
(
pCache
->
pIndex
)
{
if
(
pCache
->
pIndex
)
{
if
(
lock
)
{
CTG_LOCK
(
CTG_WRITE
,
&
pCache
->
indexLock
);
}
taosArrayDestroyEx
(
pCache
->
pIndex
->
pIndex
,
tFreeSTableIndexInfo
);
taosArrayDestroyEx
(
pCache
->
pIndex
->
pIndex
,
tFreeSTableIndexInfo
);
taosMemoryFreeClear
(
pCache
->
pIndex
);
taosMemoryFreeClear
(
pCache
->
pIndex
);
if
(
lock
)
{
CTG_UNLOCK
(
CTG_WRITE
,
&
pCache
->
indexLock
);
}
}
}
}
}
...
@@ -228,7 +243,7 @@ void ctgFreeTbCache(SCtgDBCache* dbCache) {
...
@@ -228,7 +243,7 @@ void ctgFreeTbCache(SCtgDBCache* dbCache) {
int32_t
tblNum
=
taosHashGetSize
(
dbCache
->
tbCache
);
int32_t
tblNum
=
taosHashGetSize
(
dbCache
->
tbCache
);
SCtgTbCache
*
pCache
=
taosHashIterate
(
dbCache
->
tbCache
,
NULL
);
SCtgTbCache
*
pCache
=
taosHashIterate
(
dbCache
->
tbCache
,
NULL
);
while
(
NULL
!=
pCache
)
{
while
(
NULL
!=
pCache
)
{
ctgFreeTbCacheImpl
(
pCache
);
ctgFreeTbCacheImpl
(
pCache
,
false
);
pCache
=
taosHashIterate
(
dbCache
->
tbCache
,
pCache
);
pCache
=
taosHashIterate
(
dbCache
->
tbCache
,
pCache
);
}
}
taosHashCleanup
(
dbCache
->
tbCache
);
taosHashCleanup
(
dbCache
->
tbCache
);
...
@@ -316,21 +331,86 @@ void ctgFreeHandle(SCatalog* pCtg) {
...
@@ -316,21 +331,86 @@ void ctgFreeHandle(SCatalog* pCtg) {
ctgInfo
(
"handle freed, clusterId:0x%"
PRIx64
,
clusterId
);
ctgInfo
(
"handle freed, clusterId:0x%"
PRIx64
,
clusterId
);
}
}
void
ctgClearHandleMeta
(
SCatalog
*
pCtg
,
int64_t
*
pClearedSize
,
int64_t
*
pCleardNum
,
bool
*
roundDone
)
{
int64_t
cacheSize
=
0
;
void
*
pIter
=
taosHashIterate
(
pCtg
->
dbCache
,
NULL
);
while
(
pIter
)
{
SCtgDBCache
*
dbCache
=
pIter
;
SCtgTbCache
*
pCache
=
taosHashIterate
(
dbCache
->
tbCache
,
NULL
);
while
(
NULL
!=
pCache
)
{
size_t
len
=
0
;
void
*
key
=
taosHashGetKey
(
pCache
,
&
len
);
if
(
pCache
->
pMeta
&&
TSDB_SUPER_TABLE
==
pCache
->
pMeta
->
tableType
)
{
pCache
=
taosHashIterate
(
dbCache
->
tbCache
,
pCache
);
continue
;
}
taosHashRemove
(
dbCache
->
tbCache
,
key
,
len
);
cacheSize
=
len
+
sizeof
(
SCtgTbCache
)
+
ctgGetTbMetaCacheSize
(
pCache
->
pMeta
)
+
ctgGetTbIndexCacheSize
(
pCache
->
pIndex
);
atomic_sub_fetch_64
(
&
dbCache
->
dbCacheSize
,
cacheSize
);
*
pClearedSize
+=
cacheSize
;
(
*
pCleardNum
)
++
;
if
(
pCache
->
pMeta
)
{
CTG_META_NUM_DEC
(
pCache
->
pMeta
->
tableType
);
}
ctgFreeTbCacheImpl
(
pCache
,
true
);
if
(
*
pCleardNum
>=
CTG_CLEAR_CACHE_ROUND_TB_NUM
)
{
taosHashCancelIterate
(
dbCache
->
tbCache
,
pCache
);
goto
_return
;
}
pCache
=
taosHashIterate
(
dbCache
->
tbCache
,
pCache
);
}
pIter
=
taosHashIterate
(
pCtg
->
dbCache
,
pIter
);
}
_return:
if
(
*
pCleardNum
>=
CTG_CLEAR_CACHE_ROUND_TB_NUM
)
{
*
roundDone
=
true
;
}
}
void
ctgClearAllHandleMeta
(
int64_t
*
clearedSize
,
int64_t
*
clearedNum
,
bool
*
roundDone
)
{
SCatalog
*
pCtg
=
NULL
;
void
*
pIter
=
taosHashIterate
(
gCtgMgmt
.
pCluster
,
NULL
);
while
(
pIter
)
{
pCtg
=
*
(
SCatalog
**
)
pIter
;
if
(
pCtg
)
{
ctgClearHandleMeta
(
pCtg
,
clearedSize
,
clearedNum
,
roundDone
);
if
(
*
roundDone
)
{
taosHashCancelIterate
(
gCtgMgmt
.
pCluster
,
pIter
);
break
;
}
}
pIter
=
taosHashIterate
(
gCtgMgmt
.
pCluster
,
pIter
);
}
}
void
ctgClearHandle
(
SCatalog
*
pCtg
)
{
void
ctgClearHandle
(
SCatalog
*
pCtg
)
{
if
(
NULL
==
pCtg
)
{
if
(
NULL
==
pCtg
)
{
return
;
return
;
}
}
uint64_t
clusterId
=
pCtg
->
clusterId
;
uint64_t
clusterId
=
pCtg
->
clusterId
;
ctgFreeMetaRent
(
&
pCtg
->
dbRent
);
ctgFreeMetaRent
(
&
pCtg
->
dbRent
);
ctgFreeMetaRent
(
&
pCtg
->
stbRent
);
ctgFreeMetaRent
(
&
pCtg
->
stbRent
);
ctgFreeInstDbCache
(
pCtg
->
dbCache
);
ctgFreeInstDbCache
(
pCtg
->
dbCache
);
ctgFreeInstUserCache
(
pCtg
->
userCache
);
ctgFreeInstUserCache
(
pCtg
->
userCache
);
ctgMetaRentInit
(
&
pCtg
->
dbRent
,
gCtgMgmt
.
cfg
.
dbRentSec
,
CTG_RENT_DB
);
ctgMetaRentInit
(
&
pCtg
->
dbRent
,
gCtgMgmt
.
cfg
.
dbRentSec
,
CTG_RENT_DB
,
sizeof
(
SDbCacheInfo
)
);
ctgMetaRentInit
(
&
pCtg
->
stbRent
,
gCtgMgmt
.
cfg
.
stbRentSec
,
CTG_RENT_STABLE
);
ctgMetaRentInit
(
&
pCtg
->
stbRent
,
gCtgMgmt
.
cfg
.
stbRentSec
,
CTG_RENT_STABLE
,
sizeof
(
SSTableVersion
)
);
pCtg
->
dbCache
=
taosHashInit
(
gCtgMgmt
.
cfg
.
maxDBCacheNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
pCtg
->
dbCache
=
taosHashInit
(
gCtgMgmt
.
cfg
.
maxDBCacheNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_ENTRY_LOCK
);
HASH_ENTRY_LOCK
);
...
@@ -1624,6 +1704,130 @@ void catalogFreeMetaData(SMetaData* pData) {
...
@@ -1624,6 +1704,130 @@ void catalogFreeMetaData(SMetaData* pData) {
}
}
#endif
#endif
uint64_t
ctgGetTbIndexCacheSize
(
STableIndex
*
pIndex
)
{
if
(
NULL
==
pIndex
)
{
return
0
;
}
return
sizeof
(
*
pIndex
)
+
pIndex
->
indexSize
;
}
FORCE_INLINE
uint64_t
ctgGetTbMetaCacheSize
(
STableMeta
*
pMeta
)
{
if
(
NULL
==
pMeta
)
{
return
0
;
}
switch
(
pMeta
->
tableType
)
{
case
TSDB_SUPER_TABLE
:
return
sizeof
(
*
pMeta
)
+
(
pMeta
->
tableInfo
.
numOfColumns
+
pMeta
->
tableInfo
.
numOfTags
)
*
sizeof
(
SSchema
);
case
TSDB_CHILD_TABLE
:
return
sizeof
(
SCTableMeta
);
default:
return
sizeof
(
*
pMeta
)
+
pMeta
->
tableInfo
.
numOfColumns
*
sizeof
(
SSchema
);
}
return
0
;
}
uint64_t
ctgGetDbVgroupCacheSize
(
SDBVgInfo
*
pVg
)
{
if
(
NULL
==
pVg
)
{
return
0
;
}
return
sizeof
(
*
pVg
)
+
taosHashGetSize
(
pVg
->
vgHash
)
*
(
sizeof
(
SVgroupInfo
)
+
sizeof
(
int32_t
))
+
taosArrayGetSize
(
pVg
->
vgArray
)
*
sizeof
(
SVgroupInfo
);
}
uint64_t
ctgGetUserCacheSize
(
SGetUserAuthRsp
*
pAuth
)
{
if
(
NULL
==
pAuth
)
{
return
0
;
}
uint64_t
cacheSize
=
0
;
char
*
p
=
taosHashIterate
(
pAuth
->
createdDbs
,
NULL
);
while
(
p
!=
NULL
)
{
size_t
len
=
0
;
void
*
key
=
taosHashGetKey
(
p
,
&
len
);
cacheSize
+=
len
+
strlen
(
p
);
p
=
taosHashIterate
(
pAuth
->
createdDbs
,
p
);
}
p
=
taosHashIterate
(
pAuth
->
readDbs
,
NULL
);
while
(
p
!=
NULL
)
{
size_t
len
=
0
;
void
*
key
=
taosHashGetKey
(
p
,
&
len
);
cacheSize
+=
len
+
strlen
(
p
);
p
=
taosHashIterate
(
pAuth
->
readDbs
,
p
);
}
p
=
taosHashIterate
(
pAuth
->
writeDbs
,
NULL
);
while
(
p
!=
NULL
)
{
size_t
len
=
0
;
void
*
key
=
taosHashGetKey
(
p
,
&
len
);
cacheSize
+=
len
+
strlen
(
p
);
p
=
taosHashIterate
(
pAuth
->
writeDbs
,
p
);
}
p
=
taosHashIterate
(
pAuth
->
readTbs
,
NULL
);
while
(
p
!=
NULL
)
{
size_t
len
=
0
;
void
*
key
=
taosHashGetKey
(
p
,
&
len
);
cacheSize
+=
len
+
strlen
(
p
);
p
=
taosHashIterate
(
pAuth
->
readTbs
,
p
);
}
p
=
taosHashIterate
(
pAuth
->
writeTbs
,
NULL
);
while
(
p
!=
NULL
)
{
size_t
len
=
0
;
void
*
key
=
taosHashGetKey
(
p
,
&
len
);
cacheSize
+=
len
+
strlen
(
p
);
p
=
taosHashIterate
(
pAuth
->
writeTbs
,
p
);
}
int32_t
*
ref
=
taosHashIterate
(
pAuth
->
useDbs
,
NULL
);
while
(
ref
!=
NULL
)
{
size_t
len
=
0
;
void
*
key
=
taosHashGetKey
(
ref
,
&
len
);
cacheSize
+=
len
+
sizeof
(
*
ref
);
ref
=
taosHashIterate
(
pAuth
->
useDbs
,
ref
);
}
return
cacheSize
;
}
uint64_t
ctgGetClusterCacheSize
(
SCatalog
*
pCtg
)
{
uint64_t
cacheSize
=
sizeof
(
SCatalog
);
SCtgUserAuth
*
pAuth
=
taosHashIterate
(
pCtg
->
userCache
,
NULL
);
while
(
pAuth
!=
NULL
)
{
size_t
len
=
0
;
void
*
key
=
taosHashGetKey
(
pAuth
,
&
len
);
cacheSize
+=
len
+
sizeof
(
SCtgUserAuth
)
+
atomic_load_64
(
&
pAuth
->
userCacheSize
);
pAuth
=
taosHashIterate
(
pCtg
->
userCache
,
pAuth
);
}
SCtgDBCache
*
pDb
=
taosHashIterate
(
pCtg
->
dbCache
,
NULL
);
while
(
pDb
!=
NULL
)
{
size_t
len
=
0
;
void
*
key
=
taosHashGetKey
(
pDb
,
&
len
);
cacheSize
+=
len
+
sizeof
(
SCtgDBCache
)
+
atomic_load_64
(
&
pDb
->
dbCacheSize
);
pDb
=
taosHashIterate
(
pCtg
->
dbCache
,
pDb
);
}
cacheSize
+=
pCtg
->
dbRent
.
rentCacheSize
;
cacheSize
+=
pCtg
->
stbRent
.
rentCacheSize
;
return
cacheSize
;
}
void
ctgGetClusterCacheStat
(
SCatalog
*
pCtg
)
{
void
ctgGetClusterCacheStat
(
SCatalog
*
pCtg
)
{
for
(
int32_t
i
=
0
;
i
<
CTG_CI_MAX_VALUE
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
CTG_CI_MAX_VALUE
;
++
i
)
{
if
(
0
==
(
gCtgStatItem
[
i
].
flag
&
CTG_CI_FLAG_LEVEL_DB
))
{
if
(
0
==
(
gCtgStatItem
[
i
].
flag
&
CTG_CI_FLAG_LEVEL_DB
))
{
...
@@ -1688,3 +1892,23 @@ void ctgGetGlobalCacheStat(SCtgCacheStat* pStat) {
...
@@ -1688,3 +1892,23 @@ void ctgGetGlobalCacheStat(SCtgCacheStat* pStat) {
memcpy
(
pStat
,
&
gCtgMgmt
.
statInfo
.
cache
,
sizeof
(
gCtgMgmt
.
statInfo
.
cache
));
memcpy
(
pStat
,
&
gCtgMgmt
.
statInfo
.
cache
,
sizeof
(
gCtgMgmt
.
statInfo
.
cache
));
}
}
void
ctgGetGlobalCacheSize
(
uint64_t
*
pSize
)
{
*
pSize
=
0
;
SCatalog
*
pCtg
=
NULL
;
void
*
pIter
=
taosHashIterate
(
gCtgMgmt
.
pCluster
,
NULL
);
while
(
pIter
)
{
size_t
len
=
0
;
void
*
key
=
taosHashGetKey
(
pIter
,
&
len
);
*
pSize
+=
len
+
POINTER_BYTES
;
pCtg
=
*
(
SCatalog
**
)
pIter
;
if
(
pCtg
)
{
*
pSize
+=
ctgGetClusterCacheSize
(
pCtg
);
}
pIter
=
taosHashIterate
(
gCtgMgmt
.
pCluster
,
pIter
);
}
}
source/libs/catalog/test/catalogTests.cpp
浏览文件 @
42c65898
...
@@ -37,6 +37,7 @@
...
@@ -37,6 +37,7 @@
#include "tglobal.h"
#include "tglobal.h"
#include "trpc.h"
#include "trpc.h"
#include "tvariant.h"
#include "tvariant.h"
#include "ttimer.h"
namespace
{
namespace
{
...
@@ -150,6 +151,7 @@ void ctgTestInitLogFile() {
...
@@ -150,6 +151,7 @@ void ctgTestInitLogFile() {
tsAsyncLog
=
0
;
tsAsyncLog
=
0
;
qDebugFlag
=
159
;
qDebugFlag
=
159
;
tmrDebugFlag
=
159
;
strcpy
(
tsLogDir
,
TD_LOG_DIR_PATH
);
strcpy
(
tsLogDir
,
TD_LOG_DIR_PATH
);
ctgdEnableDebug
(
"api"
,
true
);
ctgdEnableDebug
(
"api"
,
true
);
...
@@ -1746,6 +1748,8 @@ TEST(tableMeta, updateStbMeta) {
...
@@ -1746,6 +1748,8 @@ TEST(tableMeta, updateStbMeta) {
code
=
catalogUpdateTableMeta
(
pCtg
,
&
rsp
);
code
=
catalogUpdateTableMeta
(
pCtg
,
&
rsp
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
code
,
0
);
code
=
catalogAsyncUpdateTableMeta
(
pCtg
,
&
rsp
);
ASSERT_EQ
(
code
,
0
);
taosMemoryFreeClear
(
rsp
.
pSchemas
);
taosMemoryFreeClear
(
rsp
.
pSchemas
);
while
(
true
)
{
while
(
true
)
{
...
...
source/libs/command/src/command.c
浏览文件 @
42c65898
...
@@ -748,7 +748,7 @@ static int32_t execAlterLocal(SAlterLocalStmt* pStmt) {
...
@@ -748,7 +748,7 @@ static int32_t execAlterLocal(SAlterLocalStmt* pStmt) {
return
terrno
;
return
terrno
;
}
}
if
(
taos
Set
Cfg
(
tsCfg
,
pStmt
->
config
))
{
if
(
taos
ApplyLocal
Cfg
(
tsCfg
,
pStmt
->
config
))
{
return
terrno
;
return
terrno
;
}
}
...
...
source/util/src/ttimer.c
浏览文件 @
42c65898
...
@@ -113,7 +113,7 @@ typedef struct time_wheel_t {
...
@@ -113,7 +113,7 @@ typedef struct time_wheel_t {
static
int32_t
tsMaxTmrCtrl
=
TSDB_MAX_VNODES_PER_DB
+
100
;
static
int32_t
tsMaxTmrCtrl
=
TSDB_MAX_VNODES_PER_DB
+
100
;
static
TdThreadOnce
tmrModuleInit
=
PTHREAD_ONCE_INIT
;
static
int32_t
tmrModuleInit
=
0
;
static
TdThreadMutex
tmrCtrlMutex
;
static
TdThreadMutex
tmrCtrlMutex
;
static
tmr_ctrl_t
*
tmrCtrls
;
static
tmr_ctrl_t
*
tmrCtrls
;
static
tmr_ctrl_t
*
unusedTmrCtrl
=
NULL
;
static
tmr_ctrl_t
*
unusedTmrCtrl
=
NULL
;
...
@@ -512,11 +512,11 @@ bool taosTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* han
...
@@ -512,11 +512,11 @@ bool taosTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* han
return
stopped
;
return
stopped
;
}
}
static
void
taosTmrModuleInit
(
void
)
{
static
int32_t
taosTmrModuleInit
(
void
)
{
tmrCtrls
=
taosMemoryMalloc
(
sizeof
(
tmr_ctrl_t
)
*
tsMaxTmrCtrl
);
tmrCtrls
=
taosMemoryMalloc
(
sizeof
(
tmr_ctrl_t
)
*
tsMaxTmrCtrl
);
if
(
tmrCtrls
==
NULL
)
{
if
(
tmrCtrls
==
NULL
)
{
tmrError
(
"failed to allocate memory for timer controllers."
);
tmrError
(
"failed to allocate memory for timer controllers."
);
return
;
return
-
1
;
}
}
memset
(
&
timerMap
,
0
,
sizeof
(
timerMap
));
memset
(
&
timerMap
,
0
,
sizeof
(
timerMap
));
...
@@ -535,14 +535,14 @@ static void taosTmrModuleInit(void) {
...
@@ -535,14 +535,14 @@ static void taosTmrModuleInit(void) {
time_wheel_t
*
wheel
=
wheels
+
i
;
time_wheel_t
*
wheel
=
wheels
+
i
;
if
(
taosThreadMutexInit
(
&
wheel
->
mutex
,
NULL
)
!=
0
)
{
if
(
taosThreadMutexInit
(
&
wheel
->
mutex
,
NULL
)
!=
0
)
{
tmrError
(
"failed to create the mutex for wheel, reason:%s"
,
strerror
(
errno
));
tmrError
(
"failed to create the mutex for wheel, reason:%s"
,
strerror
(
errno
));
return
;
return
-
1
;
}
}
wheel
->
nextScanAt
=
now
+
wheel
->
resolution
;
wheel
->
nextScanAt
=
now
+
wheel
->
resolution
;
wheel
->
index
=
0
;
wheel
->
index
=
0
;
wheel
->
slots
=
(
tmr_obj_t
**
)
taosMemoryCalloc
(
wheel
->
size
,
sizeof
(
tmr_obj_t
*
));
wheel
->
slots
=
(
tmr_obj_t
**
)
taosMemoryCalloc
(
wheel
->
size
,
sizeof
(
tmr_obj_t
*
));
if
(
wheel
->
slots
==
NULL
)
{
if
(
wheel
->
slots
==
NULL
)
{
tmrError
(
"failed to allocate wheel slots"
);
tmrError
(
"failed to allocate wheel slots"
);
return
;
return
-
1
;
}
}
timerMap
.
size
+=
wheel
->
size
;
timerMap
.
size
+=
wheel
->
size
;
}
}
...
@@ -551,20 +551,48 @@ static void taosTmrModuleInit(void) {
...
@@ -551,20 +551,48 @@ static void taosTmrModuleInit(void) {
timerMap
.
slots
=
(
timer_list_t
*
)
taosMemoryCalloc
(
timerMap
.
size
,
sizeof
(
timer_list_t
));
timerMap
.
slots
=
(
timer_list_t
*
)
taosMemoryCalloc
(
timerMap
.
size
,
sizeof
(
timer_list_t
));
if
(
timerMap
.
slots
==
NULL
)
{
if
(
timerMap
.
slots
==
NULL
)
{
tmrError
(
"failed to allocate hash map"
);
tmrError
(
"failed to allocate hash map"
);
return
;
return
-
1
;
}
}
tmrQhandle
=
taosInitScheduler
(
10000
,
taosTmrThreads
,
"tmr"
,
NULL
);
tmrQhandle
=
taosInitScheduler
(
10000
,
taosTmrThreads
,
"tmr"
,
NULL
);
taosInitTimer
(
taosTimerLoopFunc
,
MSECONDS_PER_TICK
);
taosInitTimer
(
taosTimerLoopFunc
,
MSECONDS_PER_TICK
);
tmrDebug
(
"timer module is initialized, number of threads: %d"
,
taosTmrThreads
);
tmrDebug
(
"timer module is initialized, number of threads: %d"
,
taosTmrThreads
);
return
2
;
}
static
int32_t
taosTmrInitModule
(
void
)
{
if
(
atomic_load_32
(
&
tmrModuleInit
)
==
2
)
{
return
0
;
}
if
(
atomic_load_32
(
&
tmrModuleInit
)
<
0
)
{
return
-
1
;
}
while
(
true
)
{
if
(
0
==
atomic_val_compare_exchange_32
(
&
tmrModuleInit
,
0
,
1
))
{
atomic_store_32
(
&
tmrModuleInit
,
taosTmrModuleInit
());
}
else
if
(
atomic_load_32
(
&
tmrModuleInit
)
<
0
)
{
return
-
1
;
}
else
if
(
atomic_load_32
(
&
tmrModuleInit
)
==
2
)
{
return
0
;
}
else
{
taosMsleep
(
1
);
}
}
return
-
1
;
}
}
void
*
taosTmrInit
(
int32_t
maxNumOfTmrs
,
int32_t
resolution
,
int32_t
longest
,
const
char
*
label
)
{
void
*
taosTmrInit
(
int32_t
maxNumOfTmrs
,
int32_t
resolution
,
int32_t
longest
,
const
char
*
label
)
{
const
char
*
ret
=
taosMonotonicInit
();
const
char
*
ret
=
taosMonotonicInit
();
tmrDebug
(
"ttimer monotonic clock source:%s"
,
ret
);
tmrDebug
(
"ttimer monotonic clock source:%s"
,
ret
);
taosThreadOnce
(
&
tmrModuleInit
,
taosTmrModuleInit
);
if
(
taosTmrInitModule
()
<
0
)
{
return
NULL
;
}
taosThreadMutexLock
(
&
tmrCtrlMutex
);
taosThreadMutexLock
(
&
tmrCtrlMutex
);
tmr_ctrl_t
*
ctrl
=
unusedTmrCtrl
;
tmr_ctrl_t
*
ctrl
=
unusedTmrCtrl
;
...
@@ -581,6 +609,7 @@ void* taosTmrInit(int32_t maxNumOfTmrs, int32_t resolution, int32_t longest, con
...
@@ -581,6 +609,7 @@ void* taosTmrInit(int32_t maxNumOfTmrs, int32_t resolution, int32_t longest, con
}
}
tstrncpy
(
ctrl
->
label
,
label
,
sizeof
(
ctrl
->
label
));
tstrncpy
(
ctrl
->
label
,
label
,
sizeof
(
ctrl
->
label
));
tmrDebug
(
"%s timer controller is initialized, number of timer controllers: %d."
,
label
,
numOfTmrCtrl
);
tmrDebug
(
"%s timer controller is initialized, number of timer controllers: %d."
,
label
,
numOfTmrCtrl
);
return
ctrl
;
return
ctrl
;
}
}
...
@@ -629,8 +658,6 @@ void taosTmrCleanUp(void* handle) {
...
@@ -629,8 +658,6 @@ void taosTmrCleanUp(void* handle) {
tmrCtrls
=
NULL
;
tmrCtrls
=
NULL
;
unusedTmrCtrl
=
NULL
;
unusedTmrCtrl
=
NULL
;
#if defined(LINUX)
atomic_store_32
(
&
tmrModuleInit
,
0
);
tmrModuleInit
=
PTHREAD_ONCE_INIT
;
// to support restart
#endif
}
}
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录