Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
17f62750
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
17f62750
编写于
6月 13, 2022
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add sma cache
上级
31f3bed3
变更
13
隐藏空白更改
内联
并排
Showing
13 changed file
with
124 addition
and
60 deletion
+124
-60
include/common/tmsg.h
include/common/tmsg.h
+9
-7
include/libs/catalog/catalog.h
include/libs/catalog/catalog.h
+4
-3
source/client/src/clientHb.c
source/client/src/clientHb.c
+10
-9
source/common/src/tmsg.c
source/common/src/tmsg.c
+5
-3
source/dnode/mnode/impl/inc/mndStb.h
source/dnode/mnode/impl/inc/mndStb.h
+1
-1
source/dnode/mnode/impl/src/mndProfile.c
source/dnode/mnode/impl/src/mndProfile.c
+1
-1
source/dnode/mnode/impl/src/mndSma.c
source/dnode/mnode/impl/src/mndSma.c
+14
-3
source/dnode/mnode/impl/src/mndStb.c
source/dnode/mnode/impl/src/mndStb.c
+38
-13
source/libs/catalog/inc/catalogInt.h
source/libs/catalog/inc/catalogInt.h
+1
-1
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+2
-2
source/libs/catalog/src/ctgCache.c
source/libs/catalog/src/ctgCache.c
+31
-9
source/libs/catalog/src/ctgUtil.c
source/libs/catalog/src/ctgUtil.c
+4
-4
source/libs/catalog/test/catalogTests.cpp
source/libs/catalog/test/catalogTests.cpp
+4
-4
未找到文件。
include/common/tmsg.h
浏览文件 @
17f62750
...
...
@@ -1136,12 +1136,13 @@ int32_t tDeserializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp
void
tFreeSTableMetaRsp
(
STableMetaRsp
*
pRsp
);
typedef
struct
{
SArray
*
pArray
;
// Array of STableMetaRsp
}
STableMetaBatchRsp
;
SArray
*
pArray
;
// Array of STableMetaRsp
STableIndexRsp
*
pIndexRsp
;
}
SSTbHbRsp
;
int32_t
tSerializeS
TableMetaBatchRsp
(
void
*
buf
,
int32_t
bufLen
,
STableMetaBatch
Rsp
*
pRsp
);
int32_t
tDeserializeS
TableMetaBatchRsp
(
void
*
buf
,
int32_t
bufLen
,
STableMetaBatch
Rsp
*
pRsp
);
void
tFreeS
TableMetaBatchRsp
(
STableMetaBatch
Rsp
*
pRsp
);
int32_t
tSerializeS
STbHbRsp
(
void
*
buf
,
int32_t
bufLen
,
SSTbHb
Rsp
*
pRsp
);
int32_t
tDeserializeS
STbHbRsp
(
void
*
buf
,
int32_t
bufLen
,
SSTbHb
Rsp
*
pRsp
);
void
tFreeS
STbHbRsp
(
SSTbHb
Rsp
*
pRsp
);
typedef
struct
{
int32_t
numOfTables
;
...
...
@@ -2513,8 +2514,9 @@ typedef struct {
}
STableIndexInfo
;
typedef
struct
{
int32_t
version
;
SArray
*
pIndex
;
uint64_t
suid
;
int32_t
version
;
SArray
*
pIndex
;
}
STableIndexRsp
;
int32_t
tSerializeSTableIndexRsp
(
void
*
buf
,
int32_t
bufLen
,
const
STableIndexRsp
*
pRsp
);
...
...
include/libs/catalog/catalog.h
浏览文件 @
17f62750
...
...
@@ -98,14 +98,15 @@ typedef struct SCatalogCfg {
uint32_t
stbRentSec
;
}
SCatalogCfg
;
typedef
struct
SSTable
Meta
Version
{
typedef
struct
SSTableVersion
{
char
dbFName
[
TSDB_DB_FNAME_LEN
];
char
stbName
[
TSDB_TABLE_NAME_LEN
];
uint64_t
dbId
;
uint64_t
suid
;
int16_t
sversion
;
int16_t
tversion
;
}
SSTableMetaVersion
;
int32_t
smaVer
;
}
SSTableVersion
;
typedef
struct
SDbVgVersion
{
char
dbFName
[
TSDB_DB_FNAME_LEN
];
...
...
@@ -269,7 +270,7 @@ int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, SRequestConnInfo* pConn, uint64_t
int32_t
catalogGetQnodeList
(
SCatalog
*
pCatalog
,
SRequestConnInfo
*
pConn
,
SArray
*
pQnodeList
);
int32_t
catalogGetExpiredSTables
(
SCatalog
*
pCatalog
,
SSTable
Meta
Version
**
stables
,
uint32_t
*
num
);
int32_t
catalogGetExpiredSTables
(
SCatalog
*
pCatalog
,
SSTableVersion
**
stables
,
uint32_t
*
num
);
int32_t
catalogGetExpiredDBs
(
SCatalog
*
pCatalog
,
SDbVgVersion
**
dbs
,
uint32_t
*
num
);
...
...
source/client/src/clientHb.c
浏览文件 @
17f62750
...
...
@@ -99,15 +99,15 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
static
int32_t
hbProcessStbInfoRsp
(
void
*
value
,
int32_t
valueLen
,
struct
SCatalog
*
pCatalog
)
{
int32_t
code
=
0
;
S
TableMetaBatchRsp
batchMeta
Rsp
=
{
0
};
if
(
tDeserializeS
TableMetaBatchRsp
(
value
,
valueLen
,
&
batchMeta
Rsp
)
!=
0
)
{
S
STbHbRsp
hb
Rsp
=
{
0
};
if
(
tDeserializeS
STbHbRsp
(
value
,
valueLen
,
&
hb
Rsp
)
!=
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
return
-
1
;
}
int32_t
numOfBatchs
=
taosArrayGetSize
(
batchMeta
Rsp
.
pArray
);
int32_t
numOfBatchs
=
taosArrayGetSize
(
hb
Rsp
.
pArray
);
for
(
int32_t
i
=
0
;
i
<
numOfBatchs
;
++
i
)
{
STableMetaRsp
*
rsp
=
taosArrayGet
(
batchMeta
Rsp
.
pArray
,
i
);
STableMetaRsp
*
rsp
=
taosArrayGet
(
hb
Rsp
.
pArray
,
i
);
if
(
rsp
->
numOfColumns
<
0
)
{
tscDebug
(
"hb remove stb, db:%s, stb:%s"
,
rsp
->
dbFName
,
rsp
->
stbName
);
...
...
@@ -116,7 +116,7 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo
tscDebug
(
"hb update stb, db:%s, stb:%s"
,
rsp
->
dbFName
,
rsp
->
stbName
);
if
(
rsp
->
pSchemas
[
0
].
colId
!=
PRIMARYKEY_TIMESTAMP_COL_ID
)
{
tscError
(
"invalid colId[%"
PRIi16
"] for the first column in table meta rsp msg"
,
rsp
->
pSchemas
[
0
].
colId
);
tFreeS
TableMetaBatchRsp
(
&
batchMeta
Rsp
);
tFreeS
STbHbRsp
(
&
hb
Rsp
);
return
TSDB_CODE_TSC_INVALID_VALUE
;
}
...
...
@@ -124,7 +124,7 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo
}
}
tFreeS
TableMetaBatchRsp
(
&
batchMeta
Rsp
);
tFreeS
STbHbRsp
(
&
hb
Rsp
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -455,7 +455,7 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl
}
int32_t
hbGetExpiredStbInfo
(
SClientHbKey
*
connKey
,
struct
SCatalog
*
pCatalog
,
SClientHbReq
*
req
)
{
SSTable
Meta
Version
*
stbs
=
NULL
;
SSTableVersion
*
stbs
=
NULL
;
uint32_t
stbNum
=
0
;
int32_t
code
=
0
;
...
...
@@ -469,15 +469,16 @@ int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SC
}
for
(
int32_t
i
=
0
;
i
<
stbNum
;
++
i
)
{
SSTable
Meta
Version
*
stb
=
&
stbs
[
i
];
SSTableVersion
*
stb
=
&
stbs
[
i
];
stb
->
suid
=
htobe64
(
stb
->
suid
);
stb
->
sversion
=
htons
(
stb
->
sversion
);
stb
->
tversion
=
htons
(
stb
->
tversion
);
stb
->
smaVer
=
htonl
(
stb
->
smaVer
);
}
SKv
kv
=
{
.
key
=
HEARTBEAT_KEY_STBINFO
,
.
valueLen
=
sizeof
(
SSTable
Meta
Version
)
*
stbNum
,
.
valueLen
=
sizeof
(
SSTableVersion
)
*
stbNum
,
.
value
=
stbs
,
};
...
...
source/common/src/tmsg.c
浏览文件 @
17f62750
...
...
@@ -2437,6 +2437,7 @@ int32_t tSerializeSTableIndexRsp(void *buf, int32_t bufLen, const STableIndexRsp
tEncoderInit
(
&
encoder
,
buf
,
bufLen
);
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
if
(
tEncodeU64
(
&
encoder
,
pRsp
->
suid
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pRsp
->
version
)
<
0
)
return
-
1
;
int32_t
num
=
taosArrayGetSize
(
pRsp
->
pIndex
);
if
(
tEncodeI32
(
&
encoder
,
num
)
<
0
)
return
-
1
;
...
...
@@ -2472,6 +2473,7 @@ int32_t tDeserializeSTableIndexRsp(void *buf, int32_t bufLen, STableIndexRsp *pR
tDecoderInit
(
&
decoder
,
buf
,
bufLen
);
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
if
(
tDecodeU64
(
&
decoder
,
&
pRsp
->
suid
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pRsp
->
version
)
<
0
)
return
-
1
;
int32_t
num
=
0
;
if
(
tDecodeI32
(
&
decoder
,
&
num
)
<
0
)
return
-
1
;
...
...
@@ -2632,7 +2634,7 @@ int32_t tSerializeSTableMetaRsp(void *buf, int32_t bufLen, STableMetaRsp *pRsp)
return
tlen
;
}
int32_t
tSerializeS
TableMetaBatchRsp
(
void
*
buf
,
int32_t
bufLen
,
STableMetaBatch
Rsp
*
pRsp
)
{
int32_t
tSerializeS
STbHbRsp
(
void
*
buf
,
int32_t
bufLen
,
SSTbHb
Rsp
*
pRsp
)
{
SEncoder
encoder
=
{
0
};
tEncoderInit
(
&
encoder
,
buf
,
bufLen
);
...
...
@@ -2663,7 +2665,7 @@ int32_t tDeserializeSTableMetaRsp(void *buf, int32_t bufLen, STableMetaRsp *pRsp
return
0
;
}
int32_t
tDeserializeS
TableMetaBatchRsp
(
void
*
buf
,
int32_t
bufLen
,
STableMetaBatch
Rsp
*
pRsp
)
{
int32_t
tDeserializeS
STbHbRsp
(
void
*
buf
,
int32_t
bufLen
,
SSTbHb
Rsp
*
pRsp
)
{
SDecoder
decoder
=
{
0
};
tDecoderInit
(
&
decoder
,
buf
,
bufLen
);
...
...
@@ -2691,7 +2693,7 @@ int32_t tDeserializeSTableMetaBatchRsp(void *buf, int32_t bufLen, STableMetaBatc
void
tFreeSTableMetaRsp
(
STableMetaRsp
*
pRsp
)
{
taosMemoryFreeClear
(
pRsp
->
pSchemas
);
}
void
tFreeS
TableMetaBatchRsp
(
STableMetaBatch
Rsp
*
pRsp
)
{
void
tFreeS
STbHbRsp
(
SSTbHb
Rsp
*
pRsp
)
{
int32_t
numOfBatch
=
taosArrayGetSize
(
pRsp
->
pArray
);
for
(
int32_t
i
=
0
;
i
<
numOfBatch
;
++
i
)
{
STableMetaRsp
*
pMetaRsp
=
taosArrayGet
(
pRsp
->
pArray
,
i
);
...
...
source/dnode/mnode/impl/inc/mndStb.h
浏览文件 @
17f62750
...
...
@@ -27,7 +27,7 @@ void mndCleanupStb(SMnode *pMnode);
SStbObj
*
mndAcquireStb
(
SMnode
*
pMnode
,
char
*
stbName
);
void
mndReleaseStb
(
SMnode
*
pMnode
,
SStbObj
*
pStb
);
SSdbRaw
*
mndStbActionEncode
(
SStbObj
*
pStb
);
int32_t
mndValidateStbInfo
(
SMnode
*
pMnode
,
SSTable
Meta
Version
*
pStbs
,
int32_t
numOfStbs
,
void
**
ppRsp
,
int32_t
mndValidateStbInfo
(
SMnode
*
pMnode
,
SSTableVersion
*
pStbs
,
int32_t
numOfStbs
,
void
**
ppRsp
,
int32_t
*
pRspLen
);
int32_t
mndGetNumOfStbs
(
SMnode
*
pMnode
,
char
*
dbName
,
int32_t
*
pNumOfStbs
);
...
...
source/dnode/mnode/impl/src/mndProfile.c
浏览文件 @
17f62750
...
...
@@ -433,7 +433,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
case
HEARTBEAT_KEY_STBINFO
:
{
void
*
rspMsg
=
NULL
;
int32_t
rspLen
=
0
;
mndValidateStbInfo
(
pMnode
,
kv
->
value
,
kv
->
valueLen
/
sizeof
(
SSTable
Meta
Version
),
&
rspMsg
,
&
rspLen
);
mndValidateStbInfo
(
pMnode
,
kv
->
value
,
kv
->
valueLen
/
sizeof
(
SSTableVersion
),
&
rspMsg
,
&
rspLen
);
if
(
rspMsg
&&
rspLen
>
0
)
{
SKv
kv1
=
{.
key
=
HEARTBEAT_KEY_STBINFO
,
.
valueLen
=
rspLen
,
.
value
=
rspMsg
};
taosArrayPush
(
hbRsp
.
info
,
&
kv1
);
...
...
source/dnode/mnode/impl/src/mndSma.c
浏览文件 @
17f62750
...
...
@@ -872,18 +872,29 @@ static int32_t mndGetSma(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp
return
code
;
}
static
int32_t
mndGetTableSma
(
SMnode
*
pMnode
,
STableIndexReq
*
indexReq
,
STableIndexRsp
*
rsp
,
bool
*
exist
)
{
static
int32_t
mndGetTableSma
(
SMnode
*
pMnode
,
char
*
tbFName
,
STableIndexRsp
*
rsp
,
bool
*
exist
)
{
int32_t
code
=
0
;
SSmaObj
*
pSma
=
NULL
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
void
*
pIter
=
NULL
;
STableIndexInfo
info
;
SStbObj
*
pStb
=
mndAcquireStb
(
pMnode
,
tbFName
);
if
(
NULL
==
pStb
)
{
*
exist
=
false
;
return
TSDB_CODE_SUCCESS
;
}
rsp
->
suid
=
pStb
->
uid
;
rsp
->
version
=
pStb
->
smaVer
;
mndReleaseStb
(
pMnode
,
pStb
);
while
(
1
)
{
pIter
=
sdbFetch
(
pSdb
,
SDB_SMA
,
pIter
,
(
void
**
)
&
pSma
);
if
(
pIter
==
NULL
)
break
;
if
(
pSma
->
stb
[
0
]
!=
indexReq
->
tbFName
[
0
]
||
strcmp
(
pSma
->
stb
,
indexReq
->
tbFName
))
{
if
(
pSma
->
stb
[
0
]
!=
tbFName
[
0
]
||
strcmp
(
pSma
->
stb
,
tbFName
))
{
continue
;
}
...
...
@@ -995,7 +1006,7 @@ static int32_t mndProcessGetTbSmaReq(SRpcMsg *pReq) {
goto
_OVER
;
}
code
=
mndGetTableSma
(
pMnode
,
&
indexReq
,
&
rsp
,
&
exist
);
code
=
mndGetTableSma
(
pMnode
,
indexReq
.
tbFName
,
&
rsp
,
&
exist
);
if
(
code
)
{
goto
_OVER
;
}
...
...
source/dnode/mnode/impl/src/mndStb.c
浏览文件 @
17f62750
...
...
@@ -1271,7 +1271,7 @@ static int32_t mndBuildStbSchemaImp(SDbObj *pDb, SStbObj *pStb, const char *tbNa
return
0
;
}
static
int32_t
mndBuildStbSchema
(
SMnode
*
pMnode
,
const
char
*
dbFName
,
const
char
*
tbName
,
STableMetaRsp
*
pRsp
)
{
static
int32_t
mndBuildStbSchema
(
SMnode
*
pMnode
,
const
char
*
dbFName
,
const
char
*
tbName
,
STableMetaRsp
*
pRsp
,
int32_t
*
smaVer
)
{
char
tbFName
[
TSDB_TABLE_FNAME_LEN
]
=
{
0
};
snprintf
(
tbFName
,
sizeof
(
tbFName
),
"%s.%s"
,
dbFName
,
tbName
);
...
...
@@ -1288,6 +1288,10 @@ static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char
return
-
1
;
}
if
(
smaVer
)
{
*
smaVer
=
pStb
->
smaVer
;
}
int32_t
code
=
mndBuildStbSchemaImp
(
pDb
,
pStb
,
tbName
,
pRsp
);
mndReleaseDb
(
pMnode
,
pDb
);
mndReleaseStb
(
pMnode
,
pStb
);
...
...
@@ -1667,51 +1671,72 @@ _OVER:
return
code
;
}
int32_t
mndValidateStbInfo
(
SMnode
*
pMnode
,
SSTable
Meta
Version
*
pStbVersions
,
int32_t
numOfStbs
,
void
**
ppRsp
,
int32_t
mndValidateStbInfo
(
SMnode
*
pMnode
,
SSTableVersion
*
pStbVersions
,
int32_t
numOfStbs
,
void
**
ppRsp
,
int32_t
*
pRspLen
)
{
S
TableMetaBatchRsp
batchMeta
Rsp
=
{
0
};
batchMeta
Rsp
.
pArray
=
taosArrayInit
(
numOfStbs
,
sizeof
(
STableMetaRsp
));
if
(
batchMeta
Rsp
.
pArray
==
NULL
)
{
S
STbHbRsp
hb
Rsp
=
{
0
};
hb
Rsp
.
pArray
=
taosArrayInit
(
numOfStbs
,
sizeof
(
STableMetaRsp
));
if
(
hb
Rsp
.
pArray
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
hbRsp
.
pIndexRsp
=
taosMemoryCalloc
(
1
,
sizeof
(
STableIndexRsp
));
if
(
NULL
==
hbRsp
.
pIndexRsp
)
{
taosArrayDestroy
(
hbRsp
.
pArray
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
for
(
int32_t
i
=
0
;
i
<
numOfStbs
;
++
i
)
{
SSTable
Meta
Version
*
pStbVersion
=
&
pStbVersions
[
i
];
SSTableVersion
*
pStbVersion
=
&
pStbVersions
[
i
];
pStbVersion
->
suid
=
be64toh
(
pStbVersion
->
suid
);
pStbVersion
->
sversion
=
ntohs
(
pStbVersion
->
sversion
);
pStbVersion
->
tversion
=
ntohs
(
pStbVersion
->
tversion
);
pStbVersion
->
smaVer
=
ntohl
(
pStbVersion
->
smaVer
);
STableMetaRsp
metaRsp
=
{
0
};
int32_t
smaVer
=
0
;
mDebug
(
"stb:%s.%s, start to retrieve meta"
,
pStbVersion
->
dbFName
,
pStbVersion
->
stbName
);
if
(
mndBuildStbSchema
(
pMnode
,
pStbVersion
->
dbFName
,
pStbVersion
->
stbName
,
&
metaRsp
)
!=
0
)
{
if
(
mndBuildStbSchema
(
pMnode
,
pStbVersion
->
dbFName
,
pStbVersion
->
stbName
,
&
metaRsp
,
&
smaVer
)
!=
0
)
{
metaRsp
.
numOfColumns
=
-
1
;
metaRsp
.
suid
=
pStbVersion
->
suid
;
taosArrayPush
(
hbRsp
.
pArray
,
&
metaRsp
);
continue
;
}
if
(
pStbVersion
->
sversion
!=
metaRsp
.
sversion
||
pStbVersion
->
tversion
!=
metaRsp
.
tversion
)
{
taosArrayPush
(
batchMeta
Rsp
.
pArray
,
&
metaRsp
);
taosArrayPush
(
hb
Rsp
.
pArray
,
&
metaRsp
);
}
else
{
tFreeSTableMetaRsp
(
&
metaRsp
);
}
if
(
pStbVersion
->
smaVer
!=
smaVer
)
{
bool
exist
=
false
;
char
tbFName
[
TSDB_TABLE_FNAME_LEN
];
sprintf
(
tbFName
,
"%s.%s"
,
pStbVersion
->
dbFName
,
pStbVersion
->
stbName
);
int32_t
code
=
mndGetTableSma
(
pMnode
,
tbFName
,
hbRsp
.
pIndexRsp
,
&
exist
);
if
(
code
||
!
exist
)
{
taosMemoryFreeClear
(
hbRsp
.
pIndexRsp
);
}
}
}
int32_t
rspLen
=
tSerializeS
TableMetaBatchRsp
(
NULL
,
0
,
&
batchMeta
Rsp
);
int32_t
rspLen
=
tSerializeS
STbHbRsp
(
NULL
,
0
,
&
hb
Rsp
);
if
(
rspLen
<
0
)
{
tFreeS
TableMetaBatchRsp
(
&
batchMeta
Rsp
);
tFreeS
STbHbRsp
(
&
hb
Rsp
);
terrno
=
TSDB_CODE_INVALID_MSG
;
return
-
1
;
}
void
*
pRsp
=
taosMemoryMalloc
(
rspLen
);
if
(
pRsp
==
NULL
)
{
tFreeS
TableMetaBatchRsp
(
&
batchMeta
Rsp
);
tFreeS
STbHbRsp
(
&
hb
Rsp
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
tSerializeS
TableMetaBatchRsp
(
pRsp
,
rspLen
,
&
batchMeta
Rsp
);
tFreeS
TableMetaBatchRsp
(
&
batchMeta
Rsp
);
tSerializeS
STbHbRsp
(
pRsp
,
rspLen
,
&
hb
Rsp
);
tFreeS
STbHbRsp
(
&
hb
Rsp
);
*
ppRsp
=
pRsp
;
*
pRspLen
=
rspLen
;
return
0
;
...
...
source/libs/catalog/inc/catalogInt.h
浏览文件 @
17f62750
...
...
@@ -156,7 +156,7 @@ typedef struct SCtgDBCache {
typedef
struct
SCtgRentSlot
{
SRWLatch
lock
;
bool
needSort
;
SArray
*
meta
;
// element is SDbVgVersion or SSTable
Meta
Version
SArray
*
meta
;
// element is SDbVgVersion or SSTableVersion
}
SCtgRentSlot
;
typedef
struct
SCtgRentMgmt
{
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
17f62750
...
...
@@ -1074,14 +1074,14 @@ _return:
CTG_API_LEAVE
(
TSDB_CODE_SUCCESS
);
}
int32_t
catalogGetExpiredSTables
(
SCatalog
*
pCtg
,
SSTable
Meta
Version
**
stables
,
uint32_t
*
num
)
{
int32_t
catalogGetExpiredSTables
(
SCatalog
*
pCtg
,
SSTableVersion
**
stables
,
uint32_t
*
num
)
{
CTG_API_ENTER
();
if
(
NULL
==
pCtg
||
NULL
==
stables
||
NULL
==
num
)
{
CTG_API_LEAVE
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
CTG_API_LEAVE
(
ctgMetaRentGet
(
&
pCtg
->
stbRent
,
(
void
**
)
stables
,
num
,
sizeof
(
SSTable
Meta
Version
)));
CTG_API_LEAVE
(
ctgMetaRentGet
(
&
pCtg
->
stbRent
,
(
void
**
)
stables
,
num
,
sizeof
(
SSTableVersion
)));
}
int32_t
catalogGetExpiredDBs
(
SCatalog
*
pCtg
,
SDbVgVersion
**
dbs
,
uint32_t
*
num
)
{
...
...
source/libs/catalog/src/ctgCache.c
浏览文件 @
17f62750
...
...
@@ -970,7 +970,7 @@ int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t si
void
*
orig
=
taosArraySearch
(
slot
->
meta
,
&
id
,
searchCompare
,
TD_EQ
);
if
(
NULL
==
orig
)
{
q
Error
(
"meta not found in slot, id:%"
PRIx64
", slot idx:%d, type:%d, size:%d"
,
id
,
widx
,
mgmt
->
type
,
(
int32_t
)
taosArrayGetSize
(
slot
->
meta
));
q
Debug
(
"meta not found in slot, id:%"
PRIx64
", slot idx:%d, type:%d, size:%d"
,
id
,
widx
,
mgmt
->
type
,
(
int32_t
)
taosArrayGetSize
(
slot
->
meta
));
CTG_ERR_JRET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
}
...
...
@@ -983,7 +983,7 @@ _return:
CTG_UNLOCK
(
CTG_WRITE
,
&
slot
->
lock
);
if
(
code
)
{
q
Warn
(
"meta in rent update failed, will try to add it, code:%x, id:%"
PRIx64
", slot idx:%d, type:%d"
,
code
,
id
,
widx
,
mgmt
->
type
);
q
Debug
(
"meta in rent update failed, will try to add it, code:%x, id:%"
PRIx64
", slot idx:%d, type:%d"
,
code
,
id
,
widx
,
mgmt
->
type
);
CTG_RET
(
ctgMetaRentAdd
(
mgmt
,
meta
,
id
,
size
));
}
...
...
@@ -1233,6 +1233,27 @@ int32_t ctgGetAddDBCache(SCatalog* pCtg, const char *dbFName, uint64_t dbId, SCt
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgUpdateRentStbVersion
(
SCatalog
*
pCtg
,
char
*
dbFName
,
char
*
tbName
,
uint64_t
dbId
,
uint64_t
suid
,
SCtgTbCache
*
pCache
)
{
SSTableVersion
metaRent
=
{.
dbId
=
dbId
,
.
suid
=
suid
};
if
(
pCache
->
pMeta
)
{
metaRent
.
sversion
=
pCache
->
pMeta
->
sversion
;
metaRent
.
tversion
=
pCache
->
pMeta
->
tversion
;
}
if
(
pCache
->
pIndex
)
{
metaRent
.
smaVer
=
pCache
->
pIndex
->
version
;
}
strcpy
(
metaRent
.
dbFName
,
dbFName
);
strcpy
(
metaRent
.
stbName
,
tbName
);
CTG_ERR_RET
(
ctgMetaRentUpdate
(
&
pCtg
->
stbRent
,
&
metaRent
,
metaRent
.
suid
,
sizeof
(
SSTableVersion
),
ctgStbVersionSortCompare
,
ctgStbVersionSearchCompare
));
ctgDebug
(
"db %s,%"
PRIx64
" stb %s,%"
PRIx64
" sver %d tver %d smaVer %d updated to stbRent"
,
dbFName
,
dbId
,
tbName
,
suid
,
metaRent
.
sversion
,
metaRent
.
tversion
,
metaRent
.
smaVer
);
}
int32_t
ctgWriteTbMetaToCache
(
SCatalog
*
pCtg
,
SCtgDBCache
*
dbCache
,
char
*
dbFName
,
uint64_t
dbId
,
char
*
tbName
,
STableMeta
*
meta
,
int32_t
metaSize
)
{
if
(
NULL
==
dbCache
->
tbCache
||
NULL
==
dbCache
->
stbCache
)
{
taosMemoryFree
(
meta
);
...
...
@@ -1263,7 +1284,6 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
ctgDebug
(
"stb removed from stbCache, dbFName:%s, stb:%s, suid:%"
PRIx64
,
dbFName
,
tbName
,
orig
->
suid
);
}
ctgMetaRentRemove
(
&
pCtg
->
stbRent
,
orig
->
suid
,
ctgStbVersionSortCompare
,
ctgStbVersionSearchCompare
);
origSuid
=
orig
->
suid
;
}
}
...
...
@@ -1303,15 +1323,12 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
ctgDebug
(
"stb updated to stbCache, dbFName:%s, tbName:%s, tbType:%d"
,
dbFName
,
tbName
,
meta
->
tableType
);
SSTableMetaVersion
metaRent
=
{.
dbId
=
dbId
,
.
suid
=
meta
->
suid
,
.
sversion
=
meta
->
sversion
,
.
tversion
=
meta
->
tversion
};
strcpy
(
metaRent
.
dbFName
,
dbFName
);
strcpy
(
metaRent
.
stbName
,
tbName
);
CTG_ERR_RET
(
ctgMetaRentAdd
(
&
pCtg
->
stbRent
,
&
metaRent
,
metaRent
.
suid
,
sizeof
(
SSTableMetaVersion
)));
CTG_ERR_RET
(
ctgUpdateRentStbVersion
(
pCtg
,
dbFName
,
tbName
,
dbId
,
meta
->
suid
,
pCache
));
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgWriteTbIndexToCache
(
SCatalog
*
pCtg
,
SCtgDBCache
*
dbCache
,
char
*
tbName
,
STableIndex
**
index
)
{
int32_t
ctgWriteTbIndexToCache
(
SCatalog
*
pCtg
,
SCtgDBCache
*
dbCache
,
char
*
dbFName
,
char
*
tbName
,
STableIndex
**
index
)
{
if
(
NULL
==
dbCache
->
tbCache
)
{
taosMemoryFreeClear
(
*
index
);
ctgError
(
"db is dropping, dbId:%"
PRIx64
,
dbCache
->
dbId
);
...
...
@@ -1332,6 +1349,9 @@ int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *tbNam
*
index
=
NULL
;
ctgDebug
(
"table %s index updated to cache, ver:%d, num:%d"
,
tbName
,
pIndex
->
version
,
taosArrayGetSize
(
pIndex
->
pIndex
));
CTG_ERR_RET
(
ctgUpdateRentStbVersion
(
pCtg
,
dbFName
,
tbName
,
dbCache
->
dbId
,
pIndex
->
suid
,
pCache
));
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1344,6 +1364,8 @@ int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *tbNam
*
index
=
NULL
;
ctgDebug
(
"table %s index updated to cache, ver:%d, num:%d"
,
tbName
,
pIndex
->
version
,
taosArrayGetSize
(
pIndex
->
pIndex
));
CTG_ERR_RET
(
ctgUpdateRentStbVersion
(
pCtg
,
dbFName
,
tbName
,
dbCache
->
dbId
,
pIndex
->
suid
,
pCache
));
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1728,7 +1750,7 @@ int32_t ctgOpUpdateTbIndex(SCtgCacheOperation *operation) {
CTG_ERR_JRET
(
code
);
}
CTG_ERR_JRET
(
ctgWriteTbIndexToCache
(
pCtg
,
dbCache
,
msg
->
tbName
,
&
pIndex
));
CTG_ERR_JRET
(
ctgWriteTbIndexToCache
(
pCtg
,
dbCache
,
msg
->
dbFName
,
msg
->
tbName
,
&
pIndex
));
_return:
...
...
source/libs/catalog/src/ctgUtil.c
浏览文件 @
17f62750
...
...
@@ -546,9 +546,9 @@ int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName
}
int32_t
ctgStbVersionSearchCompare
(
const
void
*
key1
,
const
void
*
key2
)
{
if
(
*
(
uint64_t
*
)
key1
<
((
SSTable
Meta
Version
*
)
key2
)
->
suid
)
{
if
(
*
(
uint64_t
*
)
key1
<
((
SSTableVersion
*
)
key2
)
->
suid
)
{
return
-
1
;
}
else
if
(
*
(
uint64_t
*
)
key1
>
((
SSTable
Meta
Version
*
)
key2
)
->
suid
)
{
}
else
if
(
*
(
uint64_t
*
)
key1
>
((
SSTableVersion
*
)
key2
)
->
suid
)
{
return
1
;
}
else
{
return
0
;
...
...
@@ -566,9 +566,9 @@ int32_t ctgDbVgVersionSearchCompare(const void* key1, const void* key2) {
}
int32_t
ctgStbVersionSortCompare
(
const
void
*
key1
,
const
void
*
key2
)
{
if
(((
SSTable
MetaVersion
*
)
key1
)
->
suid
<
((
SSTableMeta
Version
*
)
key2
)
->
suid
)
{
if
(((
SSTable
Version
*
)
key1
)
->
suid
<
((
SSTable
Version
*
)
key2
)
->
suid
)
{
return
-
1
;
}
else
if
(((
SSTable
MetaVersion
*
)
key1
)
->
suid
>
((
SSTableMeta
Version
*
)
key2
)
->
suid
)
{
}
else
if
(((
SSTable
Version
*
)
key1
)
->
suid
>
((
SSTable
Version
*
)
key2
)
->
suid
)
{
return
1
;
}
else
{
return
0
;
...
...
source/libs/catalog/test/catalogTests.cpp
浏览文件 @
17f62750
...
...
@@ -989,7 +989,7 @@ TEST(tableMeta, normalTable) {
ASSERT_EQ
(
tableMeta
->
tableInfo
.
rowSize
,
12
);
SDbVgVersion
*
dbs
=
NULL
;
SSTable
Meta
Version
*
stb
=
NULL
;
SSTableVersion
*
stb
=
NULL
;
uint32_t
dbNum
=
0
,
stbNum
=
0
,
allDbNum
=
0
,
allStbNum
=
0
;
int32_t
i
=
0
;
while
(
i
<
5
)
{
...
...
@@ -1098,7 +1098,7 @@ TEST(tableMeta, childTableCase) {
ASSERT_EQ
(
tableMeta
->
tableInfo
.
rowSize
,
12
);
SDbVgVersion
*
dbs
=
NULL
;
SSTable
Meta
Version
*
stb
=
NULL
;
SSTableVersion
*
stb
=
NULL
;
uint32_t
dbNum
=
0
,
stbNum
=
0
,
allDbNum
=
0
,
allStbNum
=
0
;
int32_t
i
=
0
;
while
(
i
<
5
)
{
...
...
@@ -1220,7 +1220,7 @@ TEST(tableMeta, superTableCase) {
ASSERT_EQ
(
tableMeta
->
tableInfo
.
rowSize
,
12
);
SDbVgVersion
*
dbs
=
NULL
;
SSTable
Meta
Version
*
stb
=
NULL
;
SSTableVersion
*
stb
=
NULL
;
uint32_t
dbNum
=
0
,
stbNum
=
0
,
allDbNum
=
0
,
allStbNum
=
0
;
int32_t
i
=
0
;
while
(
i
<
5
)
{
...
...
@@ -2299,7 +2299,7 @@ TEST(rentTest, allRent) {
SArray
*
vgList
=
NULL
;
ctgTestStop
=
false
;
SDbVgVersion
*
dbs
=
NULL
;
SSTable
Meta
Version
*
stable
=
NULL
;
SSTableVersion
*
stable
=
NULL
;
uint32_t
num
=
0
;
ctgTestInitLogFile
();
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录