Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
de439d47
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
de439d47
编写于
2月 06, 2022
作者:
D
dapan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feature/qnode
上级
dd0a08c9
变更
11
隐藏空白更改
内联
并排
Showing
11 changed file
with
323 addition
and
26 deletion
+323
-26
include/libs/catalog/catalog.h
include/libs/catalog/catalog.h
+5
-1
source/client/src/clientHb.c
source/client/src/clientHb.c
+122
-5
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+1
-1
source/client/src/clientMsgHandler.c
source/client/src/clientMsgHandler.c
+1
-1
source/dnode/mnode/impl/inc/mndStb.h
source/dnode/mnode/impl/inc/mndStb.h
+3
-0
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+3
-3
source/dnode/mnode/impl/src/mndProfile.c
source/dnode/mnode/impl/src/mndProfile.c
+10
-2
source/dnode/mnode/impl/src/mndStb.c
source/dnode/mnode/impl/src/mndStb.c
+104
-1
source/libs/catalog/inc/catalogInt.h
source/libs/catalog/inc/catalogInt.h
+1
-1
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+68
-7
source/libs/catalog/test/catalogTests.cpp
source/libs/catalog/test/catalogTests.cpp
+5
-4
未找到文件。
include/libs/catalog/catalog.h
浏览文件 @
de439d47
...
...
@@ -53,13 +53,15 @@ typedef struct SCatalogCfg {
}
SCatalogCfg
;
typedef
struct
SSTableMetaVersion
{
char
dbFName
[
TSDB_DB_FNAME_LEN
];
char
stbName
[
TSDB_TABLE_NAME_LEN
];
uint64_t
suid
;
int16_t
sversion
;
int16_t
tversion
;
}
SSTableMetaVersion
;
typedef
struct
SDbVgVersion
{
char
dbName
[
TSDB_DB_FNAME_LEN
];
char
db
F
Name
[
TSDB_DB_FNAME_LEN
];
int64_t
dbId
;
int32_t
vgVersion
;
}
SDbVgVersion
;
...
...
@@ -101,6 +103,8 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB
int32_t
catalogRemoveDB
(
struct
SCatalog
*
pCatalog
,
const
char
*
dbName
,
uint64_t
dbId
);
int32_t
catalogRemoveSTableMeta
(
struct
SCatalog
*
pCatalog
,
const
char
*
dbName
,
const
char
*
stbName
,
uint64_t
suid
);
/**
* Get a table's meta data.
* @param pCatalog (input, got with catalogGetHandle)
...
...
source/client/src/clientHb.c
浏览文件 @
de439d47
...
...
@@ -85,6 +85,75 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
hbProcessStbInfoRsp
(
void
*
value
,
int32_t
valueLen
,
struct
SCatalog
*
pCatalog
)
{
int32_t
msgLen
=
0
;
int32_t
code
=
0
;
int32_t
schemaNum
=
0
;
while
(
msgLen
<
valueLen
)
{
STableMetaRsp
*
rsp
=
(
STableMetaRsp
*
)((
char
*
)
value
+
msgLen
);
rsp
->
numOfColumns
=
ntohl
(
rsp
->
numOfColumns
);
rsp
->
suid
=
be64toh
(
rsp
->
suid
);
if
(
rsp
->
numOfColumns
<
0
)
{
schemaNum
=
0
;
tscDebug
(
"hb remove stb, db:%s, stb:%s"
,
rsp
->
dbFName
,
rsp
->
stbName
);
code
=
catalogRemoveSTableMeta
(
pCatalog
,
rsp
->
dbFName
,
rsp
->
stbName
,
rsp
->
suid
);
}
else
{
rsp
->
numOfTags
=
ntohl
(
rsp
->
numOfTags
);
schemaNum
=
rsp
->
numOfColumns
+
rsp
->
numOfTags
;
/*
rsp->vgNum = ntohl(rsp->vgNum);
rsp->uid = be64toh(rsp->uid);
SDBVgroupInfo vgInfo = {0};
vgInfo.dbId = rsp->uid;
vgInfo.vgVersion = rsp->vgVersion;
vgInfo.hashMethod = rsp->hashMethod;
vgInfo.vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (NULL == vgInfo.vgHash) {
tscError("hash init[%d] failed", rsp->vgNum);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < rsp->vgNum; ++i) {
rsp->vgroupInfo[i].vgId = ntohl(rsp->vgroupInfo[i].vgId);
rsp->vgroupInfo[i].hashBegin = ntohl(rsp->vgroupInfo[i].hashBegin);
rsp->vgroupInfo[i].hashEnd = ntohl(rsp->vgroupInfo[i].hashEnd);
for (int32_t n = 0; n < rsp->vgroupInfo[i].epset.numOfEps; ++n) {
rsp->vgroupInfo[i].epset.eps[n].port = ntohs(rsp->vgroupInfo[i].epset.eps[n].port);
}
if (0 != taosHashPut(vgInfo.vgHash, &rsp->vgroupInfo[i].vgId, sizeof(rsp->vgroupInfo[i].vgId), &rsp->vgroupInfo[i], sizeof(rsp->vgroupInfo[i]))) {
tscError("hash push failed, errno:%d", errno);
taosHashCleanup(vgInfo.vgHash);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
}
code = catalogUpdateDBVgroup(pCatalog, rsp->db, &vgInfo);
if (code) {
taosHashCleanup(vgInfo.vgHash);
}
*/
}
if
(
code
)
{
return
code
;
}
msgLen
+=
sizeof
(
STableMetaRsp
)
+
schemaNum
*
sizeof
(
SSchema
);
}
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
hbQueryHbRspHandle
(
struct
SAppHbMgr
*
pAppHbMgr
,
SClientHbRsp
*
pRsp
)
{
SHbConnInfo
*
info
=
taosHashGet
(
pAppHbMgr
->
connInfo
,
&
pRsp
->
connKey
,
sizeof
(
SClientHbKey
));
if
(
NULL
==
info
)
{
...
...
@@ -117,9 +186,24 @@ static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRs
hbProcessDBInfoRsp
(
kv
->
value
,
kv
->
valueLen
,
pCatalog
);
break
;
}
case
HEARTBEAT_KEY_STBINFO
:
case
HEARTBEAT_KEY_STBINFO
:{
if
(
kv
->
valueLen
<=
0
||
NULL
==
kv
->
value
)
{
tscError
(
"invalid hb stb info, len:%d, value:%p"
,
kv
->
valueLen
,
kv
->
value
);
break
;
}
int64_t
*
clusterId
=
(
int64_t
*
)
info
->
param
;
struct
SCatalog
*
pCatalog
=
NULL
;
int32_t
code
=
catalogGetHandle
(
*
clusterId
,
&
pCatalog
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tscWarn
(
"catalogGetHandle failed, clusterId:%"
PRIx64
", error:%s"
,
*
clusterId
,
tstrerror
(
code
));
break
;
}
hbProcessStbInfoRsp
(
kv
->
value
,
kv
->
valueLen
,
pCatalog
);
break
;
}
default:
tscError
(
"invalid hb key type:%d"
,
kv
->
key
);
break
;
...
...
@@ -152,7 +236,7 @@ static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code
tfree
(
param
);
if
(
rspNum
)
{
tscDebug
(
"hb got %d rsp, %d empty rsp
prior
"
,
rspNum
,
atomic_val_compare_exchange_32
(
&
emptyRspNum
,
emptyRspNum
,
0
));
tscDebug
(
"hb got %d rsp, %d empty rsp
received before
"
,
rspNum
,
atomic_val_compare_exchange_32
(
&
emptyRspNum
,
emptyRspNum
,
0
));
}
else
{
atomic_add_fetch_32
(
&
emptyRspNum
,
1
);
}
...
...
@@ -199,6 +283,37 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl
return
TSDB_CODE_SUCCESS
;
}
int32_t
hbGetExpiredStbInfo
(
SClientHbKey
*
connKey
,
struct
SCatalog
*
pCatalog
,
SClientHbReq
*
req
)
{
SSTableMetaVersion
*
stbs
=
NULL
;
uint32_t
stbNum
=
0
;
int32_t
code
=
0
;
code
=
catalogGetExpiredSTables
(
pCatalog
,
&
stbs
,
&
stbNum
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
return
code
;
}
if
(
stbNum
<=
0
)
{
return
TSDB_CODE_SUCCESS
;
}
for
(
int32_t
i
=
0
;
i
<
stbNum
;
++
i
)
{
SSTableMetaVersion
*
stb
=
&
stbs
[
i
];
stb
->
suid
=
htobe64
(
stb
->
suid
);
stb
->
sversion
=
htons
(
stb
->
sversion
);
stb
->
tversion
=
htons
(
stb
->
tversion
);
}
SKv
kv
=
{.
key
=
HEARTBEAT_KEY_STBINFO
,
.
valueLen
=
sizeof
(
SSTableMetaVersion
)
*
stbNum
,
.
value
=
stbs
};
tscDebug
(
"hb got %d expired stb, valueLen:%d"
,
stbNum
,
kv
.
valueLen
);
taosHashPut
(
req
->
info
,
&
kv
.
key
,
sizeof
(
kv
.
key
),
&
kv
,
sizeof
(
kv
));
return
TSDB_CODE_SUCCESS
;
}
int32_t
hbQueryHbReqHandle
(
SClientHbKey
*
connKey
,
void
*
param
,
SClientHbReq
*
req
)
{
int64_t
*
clusterId
=
(
int64_t
*
)
param
;
struct
SCatalog
*
pCatalog
=
NULL
;
...
...
@@ -214,6 +329,11 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void* param, SClientHbReq *req
return
code
;
}
code
=
hbGetExpiredStbInfo
(
connKey
,
pCatalog
,
req
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
return
code
;
}
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -384,7 +504,6 @@ static void hbStopThread() {
}
SAppHbMgr
*
appHbMgrInit
(
SAppInstInfo
*
pAppInstInfo
,
char
*
key
)
{
return
NULL
;
hbMgrInit
();
SAppHbMgr
*
pAppHbMgr
=
malloc
(
sizeof
(
SAppHbMgr
));
if
(
pAppHbMgr
==
NULL
)
{
...
...
@@ -442,7 +561,6 @@ void appHbMgrCleanup(void) {
}
int
hbMgrInit
()
{
return
0
;
// init once
int8_t
old
=
atomic_val_compare_exchange_8
(
&
clientHbMgr
.
inited
,
0
,
1
);
if
(
old
==
1
)
return
0
;
...
...
@@ -499,7 +617,6 @@ int hbRegisterConnImpl(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, SHbConnInfo *
}
int
hbRegisterConn
(
SAppHbMgr
*
pAppHbMgr
,
int32_t
connId
,
int64_t
clusterId
,
int32_t
hbType
)
{
return
0
;
SClientHbKey
connKey
=
{.
connId
=
connId
,
.
hbType
=
HEARTBEAT_TYPE_QUERY
};
SHbConnInfo
info
=
{
0
};
...
...
source/client/src/clientImpl.c
浏览文件 @
de439d47
...
...
@@ -109,7 +109,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
SAppInstInfo
*
p
=
calloc
(
1
,
sizeof
(
struct
SAppInstInfo
));
p
->
mgmtEp
=
epSet
;
p
->
pTransporter
=
openTransporter
(
user
,
secretEncrypt
,
tsNumOfCores
);
/*p->pAppHbMgr = appHbMgrInit(p, key);*/
p
->
pAppHbMgr
=
appHbMgrInit
(
p
,
key
);
taosHashPut
(
appInfo
.
pInstMap
,
key
,
strlen
(
key
),
&
p
,
POINTER_BYTES
);
pInst
=
&
p
;
...
...
source/client/src/clientMsgHandler.c
浏览文件 @
de439d47
...
...
@@ -76,7 +76,7 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
pTscObj
->
connType
=
HEARTBEAT_TYPE_QUERY
;
/*hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pConnect->connId, pConnect->clusterId, HEARTBEAT_TYPE_QUERY);*/
hbRegisterConn
(
pTscObj
->
pAppInfo
->
pAppHbMgr
,
pConnect
->
connId
,
pConnect
->
clusterId
,
HEARTBEAT_TYPE_QUERY
);
// pRequest->body.resInfo.pRspMsg = pMsg->pData;
tscDebug
(
"0x%"
PRIx64
" clusterId:%"
PRId64
", totalConn:%"
PRId64
,
pRequest
->
requestId
,
pConnect
->
clusterId
,
...
...
source/dnode/mnode/impl/inc/mndStb.h
浏览文件 @
de439d47
...
...
@@ -28,6 +28,9 @@ void mndCleanupStb(SMnode *pMnode);
SStbObj
*
mndAcquireStb
(
SMnode
*
pMnode
,
char
*
stbName
);
void
mndReleaseStb
(
SMnode
*
pMnode
,
SStbObj
*
pStb
);
int32_t
mndValidateStbInfo
(
SMnode
*
pMnode
,
SSTableMetaVersion
*
stbs
,
int32_t
num
,
void
**
rsp
,
int32_t
*
rspLen
);
#ifdef __cplusplus
}
#endif
...
...
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
de439d47
...
...
@@ -907,9 +907,9 @@ int32_t mndValidateDBInfo(SMnode *pMnode, SDbVgVersion *dbs, int32_t num, void *
len
=
0
;
SDbObj
*
pDb
=
mndAcquireDb
(
pMnode
,
db
->
dbName
);
SDbObj
*
pDb
=
mndAcquireDb
(
pMnode
,
db
->
db
F
Name
);
if
(
pDb
==
NULL
)
{
mInfo
(
"db %s not exist"
,
db
->
dbName
);
mInfo
(
"db %s not exist"
,
db
->
db
F
Name
);
len
=
sizeof
(
SUseDbRsp
);
}
else
if
(
pDb
->
uid
!=
db
->
dbId
||
db
->
vgVersion
<
pDb
->
vgVersion
)
{
...
...
@@ -929,7 +929,7 @@ int32_t mndValidateDBInfo(SMnode *pMnode, SDbVgVersion *dbs, int32_t num, void *
}
pRsp
=
(
SUseDbRsp
*
)((
char
*
)
buf
+
bufOffset
);
memcpy
(
pRsp
->
db
,
db
->
dbName
,
TSDB_DB_FNAME_LEN
);
memcpy
(
pRsp
->
db
,
db
->
db
F
Name
,
TSDB_DB_FNAME_LEN
);
if
(
pDb
)
{
int32_t
vgNum
=
0
;
mndBuildDBVgroupInfo
(
pDb
,
pMnode
,
pRsp
->
vgroupInfo
,
&
vgNum
);
...
...
source/dnode/mnode/impl/src/mndProfile.c
浏览文件 @
de439d47
...
...
@@ -18,6 +18,7 @@
#include "mndProfile.h"
//#include "mndConsumer.h"
#include "mndDb.h"
#include "mndStb.h"
#include "mndMnode.h"
#include "mndShow.h"
//#include "mndTopic.h"
...
...
@@ -376,9 +377,16 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
}
break
;
}
case
HEARTBEAT_KEY_STBINFO
:
case
HEARTBEAT_KEY_STBINFO
:
{
void
*
rspMsg
=
NULL
;
int32_t
rspLen
=
0
;
mndValidateStbInfo
(
pMnode
,
(
SSTableMetaVersion
*
)
kv
->
value
,
kv
->
valueLen
/
sizeof
(
SSTableMetaVersion
),
&
rspMsg
,
&
rspLen
);
if
(
rspMsg
&&
rspLen
>
0
)
{
SKv
kv
=
{.
key
=
HEARTBEAT_KEY_STBINFO
,
.
valueLen
=
rspLen
,
.
value
=
rspMsg
};
taosArrayPush
(
hbRsp
.
info
,
&
kv
);
}
break
;
}
default:
mError
(
"invalid kv key:%d"
,
kv
->
key
);
hbRsp
.
status
=
TSDB_CODE_MND_APP_ERROR
;
...
...
source/dnode/mnode/impl/src/mndStb.c
浏览文件 @
de439d47
...
...
@@ -728,7 +728,7 @@ static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq) {
mDebug
(
"stb:%s, start to retrieve meta"
,
tbFName
);
SDbObj
*
pDb
=
mndAcquireDb
ByStb
(
pMnode
,
t
bFName
);
SDbObj
*
pDb
=
mndAcquireDb
(
pMnode
,
pInfo
->
d
bFName
);
if
(
pDb
==
NULL
)
{
terrno
=
TSDB_CODE_MND_DB_NOT_SELECTED
;
mError
(
"stb:%s, failed to retrieve meta since %s"
,
tbFName
,
terrstr
());
...
...
@@ -788,6 +788,109 @@ static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq) {
return
0
;
}
int32_t
mndValidateStbInfo
(
SMnode
*
pMnode
,
SSTableMetaVersion
*
stbs
,
int32_t
num
,
void
**
rsp
,
int32_t
*
rspLen
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
int32_t
bufSize
=
num
*
(
sizeof
(
STableMetaRsp
)
+
4
*
sizeof
(
SSchema
));
void
*
buf
=
malloc
(
bufSize
);
int32_t
len
=
0
;
int32_t
contLen
=
0
;
STableMetaRsp
*
pRsp
=
NULL
;
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
SSTableMetaVersion
*
stb
=
&
stbs
[
i
];
stb
->
suid
=
be64toh
(
stb
->
suid
);
stb
->
sversion
=
ntohs
(
stb
->
sversion
);
stb
->
tversion
=
ntohs
(
stb
->
tversion
);
if
((
contLen
+
sizeof
(
STableMetaRsp
))
>
bufSize
)
{
bufSize
=
contLen
+
(
num
-
i
)
*
(
sizeof
(
STableMetaRsp
)
+
4
*
sizeof
(
SSchema
));
buf
=
realloc
(
buf
,
bufSize
);
}
pRsp
=
(
STableMetaRsp
*
)((
char
*
)
buf
+
contLen
);
strcpy
(
pRsp
->
dbFName
,
stb
->
dbFName
);
strcpy
(
pRsp
->
tbName
,
stb
->
stbName
);
strcpy
(
pRsp
->
stbName
,
stb
->
stbName
);
mDebug
(
"start to retrieve meta, db:%s, stb:%s"
,
stb
->
dbFName
,
stb
->
stbName
);
SDbObj
*
pDb
=
mndAcquireDb
(
pMnode
,
stb
->
dbFName
);
if
(
pDb
==
NULL
)
{
pRsp
->
numOfColumns
=
-
1
;
pRsp
->
suid
=
htobe64
(
stb
->
suid
);
contLen
+=
sizeof
(
STableMetaRsp
);
mWarn
(
"db:%s, failed to require db since %s"
,
stb
->
dbFName
,
terrstr
());
continue
;
}
char
tbFName
[
TSDB_TABLE_FNAME_LEN
]
=
{
0
};
snprintf
(
tbFName
,
sizeof
(
tbFName
),
"%s.%s"
,
stb
->
dbFName
,
stb
->
stbName
);
SStbObj
*
pStb
=
mndAcquireStb
(
pMnode
,
tbFName
);
if
(
pStb
==
NULL
)
{
mndReleaseDb
(
pMnode
,
pDb
);
pRsp
->
numOfColumns
=
-
1
;
pRsp
->
suid
=
htobe64
(
stb
->
suid
);
contLen
+=
sizeof
(
STableMetaRsp
);
mWarn
(
"stb:%s, failed to get meta since %s"
,
tbFName
,
terrstr
());
continue
;
}
taosRLockLatch
(
&
pStb
->
lock
);
if
(
stb
->
suid
==
pStb
->
uid
&&
stb
->
sversion
==
pStb
->
version
)
{
taosRUnLockLatch
(
&
pStb
->
lock
);
mndReleaseDb
(
pMnode
,
pDb
);
mndReleaseStb
(
pMnode
,
pStb
);
continue
;
}
int32_t
totalCols
=
pStb
->
numOfColumns
+
pStb
->
numOfTags
;
int32_t
len
=
totalCols
*
sizeof
(
SSchema
);
contLen
+=
sizeof
(
STableMetaRsp
)
+
len
;
if
(
contLen
>
bufSize
)
{
bufSize
=
contLen
+
(
num
-
i
-
1
)
*
(
sizeof
(
STableMetaRsp
)
+
4
*
sizeof
(
SSchema
));
buf
=
realloc
(
buf
,
bufSize
);
}
pRsp
->
numOfTags
=
htonl
(
pStb
->
numOfTags
);
pRsp
->
numOfColumns
=
htonl
(
pStb
->
numOfColumns
);
pRsp
->
precision
=
pDb
->
cfg
.
precision
;
pRsp
->
tableType
=
TSDB_SUPER_TABLE
;
pRsp
->
update
=
pDb
->
cfg
.
update
;
pRsp
->
sversion
=
htonl
(
pStb
->
version
);
pRsp
->
suid
=
htobe64
(
pStb
->
uid
);
pRsp
->
tuid
=
htobe64
(
pStb
->
uid
);
for
(
int32_t
i
=
0
;
i
<
totalCols
;
++
i
)
{
SSchema
*
pSchema
=
&
pRsp
->
pSchema
[
i
];
SSchema
*
pSrcSchema
=
&
pStb
->
pSchema
[
i
];
memcpy
(
pSchema
->
name
,
pSrcSchema
->
name
,
TSDB_COL_NAME_LEN
);
pSchema
->
type
=
pSrcSchema
->
type
;
pSchema
->
colId
=
htonl
(
pSrcSchema
->
colId
);
pSchema
->
bytes
=
htonl
(
pSrcSchema
->
bytes
);
}
taosRUnLockLatch
(
&
pStb
->
lock
);
mndReleaseDb
(
pMnode
,
pDb
);
mndReleaseStb
(
pMnode
,
pStb
);
}
if
(
contLen
>
0
)
{
*
rsp
=
buf
;
*
rspLen
=
contLen
;
}
else
{
*
rsp
=
NULL
;
tfree
(
buf
);
*
rspLen
=
0
;
}
return
0
;
}
static
int32_t
mndGetNumOfStbs
(
SMnode
*
pMnode
,
char
*
dbName
,
int32_t
*
pNumOfStbs
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
...
...
source/libs/catalog/inc/catalogInt.h
浏览文件 @
de439d47
...
...
@@ -27,7 +27,7 @@ extern "C" {
#define CTG_DEFAULT_CACHE_CLUSTER_NUMBER 6
#define CTG_DEFAULT_CACHE_VGROUP_NUMBER 100
#define CTG_DEFAULT_CACHE_DB_NUMBER 20
#define CTG_DEFAULT_CACHE_TABLEMETA_NUMBER 10000
0
#define CTG_DEFAULT_CACHE_TABLEMETA_NUMBER 10000
#define CTG_DEFAULT_RENT_SECOND 10
#define CTG_DEFAULT_RENT_SLOT_SIZE 10
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
de439d47
...
...
@@ -527,9 +527,9 @@ int32_t ctgGetVgInfoFromHashValue(struct SCatalog *pCatalog, SDBVgroupInfo *dbIn
}
int32_t
ctgSTableVersionCompare
(
const
void
*
key1
,
const
void
*
key2
)
{
if
(
((
SSTableMetaVersion
*
)
key1
)
->
suid
<
((
SSTableMetaVersion
*
)
key2
)
->
suid
)
{
if
(
*
(
uint64_t
*
)
key1
<
((
SSTableMetaVersion
*
)
key2
)
->
suid
)
{
return
-
1
;
}
else
if
(
((
SSTableMetaVersion
*
)
key1
)
->
suid
>
((
SSTableMetaVersion
*
)
key2
)
->
suid
)
{
}
else
if
(
*
(
uint64_t
*
)
key1
>
((
SSTableMetaVersion
*
)
key2
)
->
suid
)
{
return
1
;
}
else
{
return
0
;
...
...
@@ -557,7 +557,7 @@ int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) {
mgmt
->
slots
=
calloc
(
1
,
msgSize
);
if
(
NULL
==
mgmt
->
slots
)
{
qError
(
"calloc %d failed"
,
(
int32_t
)
msgSize
);
return
TSDB_CODE_CTG_MEM_ERROR
;
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
)
;
}
qDebug
(
"meta rent initialized, type:%d, slotNum:%d"
,
type
,
mgmt
->
slotNum
);
...
...
@@ -825,6 +825,8 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out
if
(
TSDB_SUPER_TABLE
==
output
->
tbMeta
->
tableType
)
{
bool
newAdded
=
false
;
SSTableMetaVersion
metaRent
=
{.
suid
=
output
->
tbMeta
->
suid
,
.
sversion
=
output
->
tbMeta
->
sversion
,
.
tversion
=
output
->
tbMeta
->
tversion
};
strcpy
(
metaRent
.
dbFName
,
output
->
dbFName
);
strcpy
(
metaRent
.
stbName
,
output
->
tbName
);
CTG_LOCK
(
CTG_WRITE
,
&
dbCache
->
tbCache
.
stbLock
);
if
(
taosHashPut
(
dbCache
->
tbCache
.
cache
,
output
->
tbName
,
strlen
(
output
->
tbName
),
output
->
tbMeta
,
tbSize
)
!=
0
)
{
...
...
@@ -951,6 +953,39 @@ int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, const char* dbName, ui
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgValidateAndRemoveStbMeta
(
struct
SCatalog
*
pCatalog
,
const
char
*
dbName
,
const
char
*
stbName
,
uint64_t
suid
,
bool
*
removed
)
{
*
removed
=
false
;
SCtgDBCache
*
dbCache
=
(
SCtgDBCache
*
)
taosHashAcquire
(
pCatalog
->
dbCache
,
dbName
,
strlen
(
dbName
));
if
(
NULL
==
dbCache
)
{
ctgInfo
(
"db not exist in dbCache, may be removed, db:%s"
,
dbName
);
return
TSDB_CODE_SUCCESS
;
}
CTG_LOCK
(
CTG_WRITE
,
&
dbCache
->
tbCache
.
stbLock
);
if
(
taosHashRemove
(
dbCache
->
tbCache
.
stbCache
,
&
suid
,
sizeof
(
suid
)))
{
CTG_UNLOCK
(
CTG_WRITE
,
&
dbCache
->
tbCache
.
stbLock
);
taosHashRelease
(
pCatalog
->
dbCache
,
dbCache
);
ctgInfo
(
"stb not exist in stbCache, may be removed, db:%s, stb:%s, suid:%"
PRIx64
,
dbName
,
stbName
,
suid
);
return
TSDB_CODE_SUCCESS
;
}
if
(
taosHashRemove
(
dbCache
->
tbCache
.
cache
,
stbName
,
strlen
(
stbName
)))
{
CTG_UNLOCK
(
CTG_WRITE
,
&
dbCache
->
tbCache
.
stbLock
);
taosHashRelease
(
pCatalog
->
dbCache
,
dbCache
);
ctgError
(
"stb not exist in cache, db:%s, stb:%s, suid:%"
PRIx64
,
dbName
,
stbName
,
suid
);
CTG_ERR_RET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
}
CTG_UNLOCK
(
CTG_WRITE
,
&
dbCache
->
tbCache
.
stbLock
);
taosHashRelease
(
pCatalog
->
dbCache
,
dbCache
);
*
removed
=
true
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgRenewTableMetaImpl
(
struct
SCatalog
*
pCatalog
,
void
*
pTransporter
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
int32_t
isSTable
)
{
if
(
NULL
==
pCatalog
||
NULL
==
pTransporter
||
NULL
==
pMgmtEps
||
NULL
==
pTableName
)
{
...
...
@@ -1065,7 +1100,7 @@ int32_t ctgGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMg
int32_t
catalogInit
(
SCatalogCfg
*
cfg
)
{
if
(
ctgMgmt
.
pCluster
)
{
qError
(
"catalog already init"
);
qError
(
"catalog already init
ialized
"
);
CTG_ERR_RET
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
...
...
@@ -1111,7 +1146,7 @@ int32_t catalogGetHandle(uint64_t clusterId, struct SCatalog** catalogHandle) {
}
if
(
NULL
==
ctgMgmt
.
pCluster
)
{
qError
(
"cluster cache are not ready, clusterId:%"
PRIx64
,
clusterId
);
qError
(
"c
atalog c
luster cache are not ready, clusterId:%"
PRIx64
,
clusterId
);
CTG_ERR_RET
(
TSDB_CODE_CTG_NOT_READY
);
}
...
...
@@ -1312,7 +1347,7 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB
ctgMetaRentRemove
(
&
pCatalog
->
dbRent
,
dbCache
->
vgInfo
->
dbId
,
ctgDbVgVersionCompare
);
newAdded
=
true
;
}
else
if
(
dbInfo
->
vgVersion
<=
dbCache
->
vgInfo
->
vgVersion
)
{
ctgInfo
(
"db vgVersion is
not new
, db:%s, vgVersion:%d, current:%d"
,
dbName
,
dbInfo
->
vgVersion
,
dbCache
->
vgInfo
->
vgVersion
);
ctgInfo
(
"db vgVersion is
old
, db:%s, vgVersion:%d, current:%d"
,
dbName
,
dbInfo
->
vgVersion
,
dbCache
->
vgInfo
->
vgVersion
);
CTG_UNLOCK
(
CTG_WRITE
,
&
dbCache
->
vgLock
);
taosHashRelease
(
pCatalog
->
dbCache
,
dbCache
);
...
...
@@ -1345,7 +1380,7 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB
dbInfo
=
NULL
;
strncpy
(
vgVersion
.
db
Name
,
dbName
,
sizeof
(
vgVersion
.
db
Name
));
strncpy
(
vgVersion
.
db
FName
,
dbName
,
sizeof
(
vgVersion
.
dbF
Name
));
if
(
newAdded
)
{
CTG_ERR_JRET
(
ctgMetaRentAdd
(
&
pCatalog
->
dbRent
,
&
vgVersion
,
vgVersion
.
dbId
,
sizeof
(
SDbVgVersion
)));
...
...
@@ -1394,6 +1429,32 @@ int32_t catalogRemoveDB(struct SCatalog* pCatalog, const char* dbName, uint64_t
CTG_RET
(
code
);
}
int32_t
catalogRemoveSTableMeta
(
struct
SCatalog
*
pCatalog
,
const
char
*
dbName
,
const
char
*
stbName
,
uint64_t
suid
)
{
int32_t
code
=
0
;
bool
removed
=
false
;
if
(
NULL
==
pCatalog
||
NULL
==
dbName
||
NULL
==
stbName
)
{
CTG_ERR_RET
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
if
(
NULL
==
pCatalog
->
dbCache
)
{
return
TSDB_CODE_SUCCESS
;
}
CTG_ERR_RET
(
ctgValidateAndRemoveStbMeta
(
pCatalog
,
dbName
,
stbName
,
suid
,
&
removed
));
if
(
!
removed
)
{
return
TSDB_CODE_SUCCESS
;
}
ctgInfo
(
"stb removed from cache, db:%s, stbName:%s, suid:%"
PRIx64
,
dbName
,
stbName
,
suid
);
CTG_ERR_RET
(
ctgMetaRentRemove
(
&
pCatalog
->
stbRent
,
suid
,
ctgSTableVersionCompare
));
ctgDebug
(
"stb removed from rent, db:%s, stbName:%s, suid:%"
PRIx64
,
dbName
,
stbName
,
suid
);
CTG_RET
(
code
);
}
int32_t
catalogGetTableMeta
(
struct
SCatalog
*
pCatalog
,
void
*
pTransporter
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
)
{
return
ctgGetTableMeta
(
pCatalog
,
pTransporter
,
pMgmtEps
,
pTableName
,
false
,
pTableMeta
,
-
1
);
...
...
source/libs/catalog/test/catalogTests.cpp
浏览文件 @
de439d47
...
...
@@ -748,7 +748,7 @@ TEST(tableMeta, normalTable) {
}
if
(
stbNum
)
{
printf
(
"got expired stb,suid:%"
PRId64
"
\n
"
,
stb
->
suid
);
printf
(
"got expired stb,suid:%"
PRId64
"
,dbFName:%s, stbName:%s
\n
"
,
stb
->
suid
,
stb
->
dbFName
,
stb
->
stbName
);
free
(
stb
);
stb
=
NULL
;
}
else
{
...
...
@@ -844,7 +844,7 @@ TEST(tableMeta, childTableCase) {
}
if
(
stbNum
)
{
printf
(
"got expired stb,suid:%"
PRId64
"
\n
"
,
stb
->
suid
);
printf
(
"got expired stb,suid:%"
PRId64
"
,dbFName:%s, stbName:%s
\n
"
,
stb
->
suid
,
stb
->
dbFName
,
stb
->
stbName
);
free
(
stb
);
stb
=
NULL
;
}
else
{
...
...
@@ -945,7 +945,8 @@ TEST(tableMeta, superTableCase) {
}
if
(
stbNum
)
{
printf
(
"got expired stb,suid:%"
PRId64
"
\n
"
,
stb
->
suid
);
printf
(
"got expired stb,suid:%"
PRId64
",dbFName:%s, stbName:%s
\n
"
,
stb
->
suid
,
stb
->
dbFName
,
stb
->
stbName
);
free
(
stb
);
stb
=
NULL
;
}
else
{
...
...
@@ -1289,7 +1290,7 @@ TEST(rentTest, allRent) {
printf
(
"%d - expired stableNum:%d
\n
"
,
i
,
num
);
if
(
stable
)
{
for
(
int32_t
n
=
0
;
n
<
num
;
++
n
)
{
printf
(
"suid:%"
PRId64
",
sversion:%d, tversion:%d
\n
"
,
stable
[
n
].
suid
,
stable
[
n
].
sversion
,
stable
[
n
].
tversion
);
printf
(
"suid:%"
PRId64
",
dbFName:%s, stbName:%s, sversion:%d, tversion:%d
\n
"
,
stable
[
n
].
suid
,
stable
[
n
].
dbFName
,
stable
[
n
].
stbName
,
stable
[
n
].
sversion
,
stable
[
n
].
tversion
);
}
free
(
stable
);
stable
=
NULL
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录