Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
7a78d812
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
7a78d812
编写于
6月 13, 2022
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix compile issue
上级
eff9f9be
变更
12
隐藏空白更改
内联
并排
Showing
12 changed file
with
141 addition
and
76 deletion
+141
-76
include/common/tmsg.h
include/common/tmsg.h
+1
-0
include/libs/catalog/catalog.h
include/libs/catalog/catalog.h
+2
-0
source/common/src/tmsg.c
source/common/src/tmsg.c
+0
-1
source/dnode/mnode/impl/inc/mndSma.h
source/dnode/mnode/impl/inc/mndSma.h
+1
-0
source/dnode/mnode/impl/src/mndSma.c
source/dnode/mnode/impl/src/mndSma.c
+1
-1
source/dnode/mnode/impl/src/mndStb.c
source/dnode/mnode/impl/src/mndStb.c
+5
-4
source/libs/catalog/inc/catalogInt.h
source/libs/catalog/inc/catalogInt.h
+10
-4
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+31
-33
source/libs/catalog/src/ctgAsync.c
source/libs/catalog/src/ctgAsync.c
+6
-6
source/libs/catalog/src/ctgCache.c
source/libs/catalog/src/ctgCache.c
+65
-18
source/libs/catalog/src/ctgDbg.c
source/libs/catalog/src/ctgDbg.c
+9
-9
source/libs/catalog/src/ctgUtil.c
source/libs/catalog/src/ctgUtil.c
+10
-0
未找到文件。
include/common/tmsg.h
浏览文件 @
7a78d812
...
...
@@ -1134,6 +1134,7 @@ void tFreeSMAlterStbRsp(SMAlterStbRsp* pRsp);
int32_t
tSerializeSTableMetaRsp
(
void
*
buf
,
int32_t
bufLen
,
STableMetaRsp
*
pRsp
);
int32_t
tDeserializeSTableMetaRsp
(
void
*
buf
,
int32_t
bufLen
,
STableMetaRsp
*
pRsp
);
void
tFreeSTableMetaRsp
(
STableMetaRsp
*
pRsp
);
void
tFreeSTableIndexRsp
(
void
*
info
);
typedef
struct
{
SArray
*
pMetaRsp
;
// Array of STableMetaRsp
...
...
include/libs/catalog/catalog.h
浏览文件 @
7a78d812
...
...
@@ -282,6 +282,8 @@ int32_t catalogGetIndexMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const char*
int32_t
catalogGetTableIndex
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
const
SName
*
pTableName
,
SArray
**
pRes
);
int32_t
catalogUpdateTableIndex
(
SCatalog
*
pCtg
,
STableIndexRsp
*
pRsp
);
int32_t
catalogGetUdfInfo
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
const
char
*
funcName
,
SFuncInfo
*
pInfo
);
int32_t
catalogChkAuth
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
const
char
*
user
,
const
char
*
dbFName
,
AUTH_TYPE
type
,
bool
*
pass
);
...
...
source/common/src/tmsg.c
浏览文件 @
7a78d812
...
...
@@ -2758,7 +2758,6 @@ void tFreeSTableIndexRsp(void *info) {
taosArrayDestroyEx
(
pInfo
->
pIndex
,
tFreeSTableIndexInfo
);
}
void
tFreeSSTbHbRsp
(
SSTbHbRsp
*
pRsp
)
{
int32_t
numOfMeta
=
taosArrayGetSize
(
pRsp
->
pMetaRsp
);
for
(
int32_t
i
=
0
;
i
<
numOfMeta
;
++
i
)
{
...
...
source/dnode/mnode/impl/inc/mndSma.h
浏览文件 @
7a78d812
...
...
@@ -26,6 +26,7 @@ int32_t mndInitSma(SMnode *pMnode);
void
mndCleanupSma
(
SMnode
*
pMnode
);
SSmaObj
*
mndAcquireSma
(
SMnode
*
pMnode
,
char
*
smaName
);
void
mndReleaseSma
(
SMnode
*
pMnode
,
SSmaObj
*
pSma
);
int32_t
mndGetTableSma
(
SMnode
*
pMnode
,
char
*
tbFName
,
STableIndexRsp
*
rsp
,
bool
*
exist
);
#ifdef __cplusplus
}
...
...
source/dnode/mnode/impl/src/mndSma.c
浏览文件 @
7a78d812
...
...
@@ -872,7 +872,7 @@ static int32_t mndGetSma(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp
return
code
;
}
static
int32_t
mndGetTableSma
(
SMnode
*
pMnode
,
char
*
tbFName
,
STableIndexRsp
*
rsp
,
bool
*
exist
)
{
int32_t
mndGetTableSma
(
SMnode
*
pMnode
,
char
*
tbFName
,
STableIndexRsp
*
rsp
,
bool
*
exist
)
{
int32_t
code
=
0
;
SSmaObj
*
pSma
=
NULL
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
...
...
source/dnode/mnode/impl/src/mndStb.c
浏览文件 @
7a78d812
...
...
@@ -27,6 +27,7 @@
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "mndSma.h"
#include "tname.h"
#define STB_VER_NUMBER 1
...
...
@@ -1638,7 +1639,7 @@ static int32_t mndProcessTableMetaReq(SRpcMsg *pReq) {
}
}
else
{
mDebug
(
"stb:%s.%s, start to retrieve meta"
,
infoReq
.
dbFName
,
infoReq
.
tbName
);
if
(
mndBuildStbSchema
(
pMnode
,
infoReq
.
dbFName
,
infoReq
.
tbName
,
&
metaRsp
)
!=
0
)
{
if
(
mndBuildStbSchema
(
pMnode
,
infoReq
.
dbFName
,
infoReq
.
tbName
,
&
metaRsp
,
NULL
)
!=
0
)
{
goto
_OVER
;
}
}
...
...
@@ -1710,7 +1711,7 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbVersions, int32_t
tFreeSTableMetaRsp
(
&
metaRsp
);
}
if
(
pStbVersion
->
smaVer
!=
smaVer
)
{
if
(
pStbVersion
->
smaVer
&&
pStbVersion
->
smaVer
!=
smaVer
)
{
bool
exist
=
false
;
char
tbFName
[
TSDB_TABLE_FNAME_LEN
];
STableIndexRsp
indexRsp
=
{
0
};
...
...
@@ -1722,8 +1723,8 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbVersions, int32_t
indexRsp
.
pIndex
=
NULL
;
}
strcpy
(
indexRsp
->
dbFName
,
pStbVersion
->
dbFName
);
strcpy
(
indexRsp
->
tbName
,
pStbVersion
->
stbName
);
strcpy
(
indexRsp
.
dbFName
,
pStbVersion
->
dbFName
);
strcpy
(
indexRsp
.
tbName
,
pStbVersion
->
stbName
);
taosArrayPush
(
hbRsp
.
pIndexRsp
,
&
indexRsp
);
}
...
...
source/libs/catalog/inc/catalogInt.h
浏览文件 @
7a78d812
...
...
@@ -491,8 +491,8 @@ void ctgReleaseDBCache(SCatalog *pCtg, SCtgDBCache *dbCache);
void
ctgRUnlockVgInfo
(
SCtgDBCache
*
dbCache
);
int32_t
ctgTbMetaExistInCache
(
SCatalog
*
pCtg
,
char
*
dbFName
,
char
*
tbName
,
int32_t
*
exist
);
int32_t
ctgReadTbMetaFromCache
(
SCatalog
*
pCtg
,
SCtgTbMetaCtx
*
ctx
,
STableMeta
**
pTableMeta
);
int32_t
ctgReadTbVerFromCache
(
SCatalog
*
pCtg
,
const
SName
*
pTableName
,
int32_t
*
sver
,
int32_t
*
tver
,
int32_t
*
tbType
,
uint64_t
*
suid
,
char
*
stbName
);
int32_t
ctgChkAuthFromCache
(
SCatalog
*
pCtg
,
c
onst
char
*
user
,
const
char
*
dbFName
,
AUTH_TYPE
type
,
bool
*
inCache
,
bool
*
pass
);
int32_t
ctgReadTbVerFromCache
(
SCatalog
*
pCtg
,
SName
*
pTableName
,
int32_t
*
sver
,
int32_t
*
tver
,
int32_t
*
tbType
,
uint64_t
*
suid
,
char
*
stbName
);
int32_t
ctgChkAuthFromCache
(
SCatalog
*
pCtg
,
c
har
*
user
,
char
*
dbFName
,
AUTH_TYPE
type
,
bool
*
inCache
,
bool
*
pass
);
int32_t
ctgDropDbCacheEnqueue
(
SCatalog
*
pCtg
,
const
char
*
dbFName
,
int64_t
dbId
);
int32_t
ctgDropDbVgroupEnqueue
(
SCatalog
*
pCtg
,
const
char
*
dbFName
,
bool
syncReq
);
int32_t
ctgDropStbMetaEnqueue
(
SCatalog
*
pCtg
,
const
char
*
dbFName
,
int64_t
dbId
,
const
char
*
stbName
,
uint64_t
suid
,
bool
syncReq
);
...
...
@@ -501,13 +501,18 @@ int32_t ctgUpdateVgroupEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId
int32_t
ctgUpdateTbMetaEnqueue
(
SCatalog
*
pCtg
,
STableMetaOutput
*
output
,
bool
syncReq
);
int32_t
ctgUpdateUserEnqueue
(
SCatalog
*
pCtg
,
SGetUserAuthRsp
*
pAuth
,
bool
syncReq
);
int32_t
ctgUpdateVgEpsetEnqueue
(
SCatalog
*
pCtg
,
char
*
dbFName
,
int32_t
vgId
,
SEpSet
*
pEpSet
);
int32_t
ctgUpdateTbIndexEnqueue
(
SCatalog
*
pCtg
,
STableIndex
*
pIndex
,
bool
syncOp
);
int32_t
ctgUpdateTbIndexEnqueue
(
SCatalog
*
pCtg
,
STableIndex
*
*
pIndex
,
bool
syncOp
);
int32_t
ctgMetaRentInit
(
SCtgRentMgmt
*
mgmt
,
uint32_t
rentSec
,
int8_t
type
);
int32_t
ctgMetaRentAdd
(
SCtgRentMgmt
*
mgmt
,
void
*
meta
,
int64_t
id
,
int32_t
size
);
int32_t
ctgMetaRentGet
(
SCtgRentMgmt
*
mgmt
,
void
**
res
,
uint32_t
*
num
,
int32_t
size
);
int32_t
ctgUpdateTbMetaToCache
(
SCatalog
*
pCtg
,
STableMetaOutput
*
pOut
,
bool
syncReq
);
int32_t
ctgStartUpdateThread
();
int32_t
ctgRelaunchGetTbMetaTask
(
SCtgTask
*
pTask
);
void
ctgReleaseVgInfoToCache
(
SCatalog
*
pCtg
,
SCtgDBCache
*
dbCache
);
int32_t
ctgReadTbIndexFromCache
(
SCatalog
*
pCtg
,
SName
*
pTableName
,
SArray
**
pRes
);
int32_t
ctgDropTbIndexEnqueue
(
SCatalog
*
pCtg
,
SName
*
pName
,
bool
syncOp
);
int32_t
ctgOpDropTbIndex
(
SCtgCacheOperation
*
operation
);
int32_t
ctgOpUpdateTbIndex
(
SCtgCacheOperation
*
operation
);
...
...
@@ -516,7 +521,7 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildU
int32_t
ctgGetQnodeListFromMnode
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
SArray
*
out
,
SCtgTask
*
pTask
);
int32_t
ctgGetDBCfgFromMnode
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
const
char
*
dbFName
,
SDbCfgInfo
*
out
,
SCtgTask
*
pTask
);
int32_t
ctgGetIndexInfoFromMnode
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
const
char
*
indexName
,
SIndexInfo
*
out
,
SCtgTask
*
pTask
);
int32_t
ctgGetTbIndexFromMnode
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
SName
*
name
,
SArray
*
*
out
,
SCtgTask
*
pTask
);
int32_t
ctgGetTbIndexFromMnode
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
SName
*
name
,
STableIndex
*
out
,
SCtgTask
*
pTask
);
int32_t
ctgGetUdfInfoFromMnode
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
const
char
*
funcName
,
SFuncInfo
*
out
,
SCtgTask
*
pTask
);
int32_t
ctgGetUserDbAuthFromMnode
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
const
char
*
user
,
SGetUserAuthRsp
*
out
,
SCtgTask
*
pTask
);
int32_t
ctgGetTbMetaFromMnodeImpl
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
char
*
dbFName
,
char
*
tbName
,
STableMetaOutput
*
out
,
SCtgTask
*
pTask
);
...
...
@@ -545,6 +550,7 @@ int32_t ctgUpdateMsgCtx(SCtgMsgCtx* pCtx, int32_t reqType, void* out, char* targ
char
*
ctgTaskTypeStr
(
CTG_TASK_TYPE
type
);
int32_t
ctgUpdateSendTargetInfo
(
SMsgSendInfo
*
pMsgSendInfo
,
int32_t
msgType
,
SCtgTask
*
pTask
);
int32_t
ctgCloneTableIndex
(
SArray
*
pIndex
,
SArray
**
pRes
);
void
ctgFreeSTableIndex
(
void
*
info
);
extern
SCatalogMgmt
gCtgMgmt
;
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
7a78d812
...
...
@@ -350,7 +350,7 @@ int32_t ctgChkAuth(SCatalog* pCtg, SRequestConnInfo *pConn, const char* user, co
*
pass
=
false
;
CTG_ERR_RET
(
ctgChkAuthFromCache
(
pCtg
,
user
,
dbFName
,
type
,
&
inCache
,
pass
));
CTG_ERR_RET
(
ctgChkAuthFromCache
(
pCtg
,
(
char
*
)
user
,
(
char
*
)
dbFName
,
type
,
&
inCache
,
pass
));
if
(
inCache
)
{
return
TSDB_CODE_SUCCESS
;
...
...
@@ -382,15 +382,38 @@ _return:
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgGetTbIndex
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
const
SName
*
pTableName
,
SArray
**
pRes
)
{
int32_t
ctgGetTbIndex
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
SName
*
pTableName
,
SArray
**
pRes
)
{
CTG_ERR_RET
(
ctgReadTbIndexFromCache
(
pCtg
,
pTableName
,
pRes
));
if
(
*
pRes
)
{
return
TSDB_CODE_SUCCESS
;
}
CTG_ERR_RET
(
ctgGetTbIndexFromMnode
(
pCtg
,
pConn
,
(
SName
*
)
pTableName
,
pRes
,
NULL
));
STableIndex
*
pIndex
=
taosMemoryCalloc
(
1
,
sizeof
(
STableIndex
));
if
(
NULL
==
pIndex
)
{
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
int32_t
code
=
0
;
CTG_ERR_JRET
(
ctgGetTbIndexFromMnode
(
pCtg
,
pConn
,
(
SName
*
)
pTableName
,
pIndex
,
NULL
));
SArray
*
pInfo
=
NULL
;
CTG_ERR_JRET
(
ctgCloneTableIndex
(
pIndex
->
pIndex
,
&
pInfo
));
*
pRes
=
pInfo
;
CTG_ERR_JRET
(
ctgUpdateTbIndexEnqueue
(
pCtg
,
&
pIndex
,
false
));
return
TSDB_CODE_SUCCESS
;
_return:
tFreeSTableIndexRsp
(
pIndex
);
taosMemoryFree
(
pIndex
);
taosArrayDestroyEx
(
*
pRes
,
tFreeSTableIndexInfo
);
*
pRes
=
NULL
;
CTG_RET
(
code
);
}
...
...
@@ -416,7 +439,7 @@ int32_t ctgGetTbDistVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, SName* pTabl
CTG_ERR_JRET
(
ctgGetDBVgInfo
(
pCtg
,
pConn
,
db
,
&
dbCache
,
&
vgInfo
));
if
(
dbCache
)
{
vgHash
=
dbCache
->
vgInfo
->
vgHash
;
vgHash
=
dbCache
->
vg
Cache
.
vg
Info
->
vgHash
;
}
else
{
vgHash
=
vgInfo
->
vgHash
;
}
...
...
@@ -640,9 +663,9 @@ int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* vers
CTG_API_LEAVE
(
TSDB_CODE_SUCCESS
);
}
*
version
=
dbCache
->
vgInfo
->
vgVersion
;
*
version
=
dbCache
->
vg
Cache
.
vg
Info
->
vgVersion
;
*
dbId
=
dbCache
->
dbId
;
*
tableNum
=
dbCache
->
vgInfo
->
numOfTable
;
*
tableNum
=
dbCache
->
vg
Cache
.
vg
Info
->
numOfTable
;
ctgReleaseVgInfoToCache
(
pCtg
,
dbCache
);
...
...
@@ -969,7 +992,7 @@ int32_t catalogGetTableHashVgroup(SCatalog *pCtg, SRequestConnInfo *pConn, const
SDBVgInfo
*
vgInfo
=
NULL
;
CTG_ERR_JRET
(
ctgGetDBVgInfo
(
pCtg
,
pConn
,
db
,
&
dbCache
,
&
vgInfo
));
CTG_ERR_JRET
(
ctgGetVgInfoFromHashValue
(
pCtg
,
vgInfo
?
vgInfo
:
dbCache
->
vgInfo
,
pTableName
,
pVgroup
));
CTG_ERR_JRET
(
ctgGetVgInfoFromHashValue
(
pCtg
,
vgInfo
?
vgInfo
:
dbCache
->
vg
Cache
.
vg
Info
,
pTableName
,
pVgroup
));
_return:
...
...
@@ -1177,36 +1200,11 @@ int32_t catalogGetTableIndex(SCatalog* pCtg, SRequestConnInfo *pConn, const SNam
CTG_API_LEAVE
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
CTG_ERR_RET
(
ctgReadTbIndexFromCache
(
pCtg
,
pTableName
,
pRes
));
if
(
*
pRes
)
{
CTG_API_LEAVE
(
TSDB_CODE_SUCCESS
);
}
STableIndex
*
pIndex
=
taosMemoryCalloc
(
1
,
sizeof
(
STableIndex
));
if
(
NULL
==
pIndex
)
{
CTG_API_LEAVE
(
TSDB_CODE_OUT_OF_MEMORY
);
}
int32_t
code
=
0
;
CTG_ERR_JRET
(
ctgGetTbIndexFromMnode
(
pCtg
,
pConn
,
(
SName
*
)
pTableName
,
pIndex
,
NULL
));
SArray
*
pInfo
=
NULL
;
CTG_ERR_JRET
(
ctgCloneTableIndex
(
pIndex
->
pIndex
,
&
pInfo
));
*
pRes
=
pInfo
;
CTG_ERR_JRET
(
ctgUpdateTbIndexEnqueue
(
pCtg
,
pTableName
,
&
pIndex
,
false
));
CTG_API_LEAVE
(
code
);
CTG_ERR_JRET
(
ctgGetTbIndex
(
pCtg
,
pConn
,
(
SName
*
)
pTableName
,
pRes
));
_return:
tFreeSTableIndexRsp
(
pIndex
);
taosMemoryFree
(
pIndex
);
taosArrayDestroyEx
(
*
pRes
,
tFreeSTableIndexInfo
);
*
pRes
=
NULL
;
CTG_API_LEAVE
(
code
);
}
...
...
source/libs/catalog/src/ctgAsync.c
浏览文件 @
7a78d812
...
...
@@ -685,7 +685,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
CTG_ERR_RET
(
ctgAcquireVgInfoFromCache
(
pCtg
,
dbFName
,
&
dbCache
));
if
(
NULL
!=
dbCache
)
{
SVgroupInfo
vgInfo
=
{
0
};
CTG_ERR_JRET
(
ctgGetVgInfoFromHashValue
(
pCtg
,
dbCache
->
vgInfo
,
ctx
->
pName
,
&
vgInfo
));
CTG_ERR_JRET
(
ctgGetVgInfoFromHashValue
(
pCtg
,
dbCache
->
vg
Cache
.
vg
Info
,
ctx
->
pName
,
&
vgInfo
));
ctgDebug
(
"will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d"
,
tNameGetTableName
(
ctx
->
pName
),
ctx
->
flag
);
...
...
@@ -1016,7 +1016,7 @@ int32_t ctgAsyncRefreshTbMeta(SCtgTask *pTask) {
CTG_ERR_RET
(
ctgAcquireVgInfoFromCache
(
pCtg
,
dbFName
,
&
dbCache
));
if
(
dbCache
)
{
SVgroupInfo
vgInfo
=
{
0
};
CTG_ERR_RET
(
ctgGetVgInfoFromHashValue
(
pCtg
,
dbCache
->
vgInfo
,
ctx
->
pName
,
&
vgInfo
));
CTG_ERR_RET
(
ctgGetVgInfoFromHashValue
(
pCtg
,
dbCache
->
vg
Cache
.
vg
Info
,
ctx
->
pName
,
&
vgInfo
));
ctgDebug
(
"will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d"
,
tNameGetTableName
(
ctx
->
pName
),
ctx
->
flag
);
...
...
@@ -1064,7 +1064,7 @@ int32_t ctgLaunchGetDbVgTask(SCtgTask *pTask) {
CTG_ERR_RET
(
ctgAcquireVgInfoFromCache
(
pCtg
,
pCtx
->
dbFName
,
&
dbCache
));
if
(
NULL
!=
dbCache
)
{
CTG_ERR_JRET
(
ctgGenerateVgList
(
pCtg
,
dbCache
->
vgInfo
->
vgHash
,
(
SArray
**
)
&
pTask
->
res
));
CTG_ERR_JRET
(
ctgGenerateVgList
(
pCtg
,
dbCache
->
vg
Cache
.
vg
Info
->
vgHash
,
(
SArray
**
)
&
pTask
->
res
));
CTG_ERR_JRET
(
ctgHandleTaskEnd
(
pTask
,
0
));
}
else
{
...
...
@@ -1098,7 +1098,7 @@ int32_t ctgLaunchGetTbHashTask(SCtgTask *pTask) {
if
(
NULL
==
pTask
->
res
)
{
CTG_ERR_JRET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
CTG_ERR_JRET
(
ctgGetVgInfoFromHashValue
(
pCtg
,
dbCache
->
vgInfo
,
pCtx
->
pName
,
(
SVgroupInfo
*
)
pTask
->
res
));
CTG_ERR_JRET
(
ctgGetVgInfoFromHashValue
(
pCtg
,
dbCache
->
vg
Cache
.
vg
Info
,
pCtx
->
pName
,
(
SVgroupInfo
*
)
pTask
->
res
));
CTG_ERR_JRET
(
ctgHandleTaskEnd
(
pTask
,
0
));
}
else
{
...
...
@@ -1171,9 +1171,9 @@ int32_t ctgLaunchGetDbInfoTask(SCtgTask *pTask) {
SDbInfo
*
pInfo
=
(
SDbInfo
*
)
pTask
->
res
;
CTG_ERR_RET
(
ctgAcquireVgInfoFromCache
(
pCtg
,
pCtx
->
dbFName
,
&
dbCache
));
if
(
NULL
!=
dbCache
)
{
pInfo
->
vgVer
=
dbCache
->
vgInfo
->
vgVersion
;
pInfo
->
vgVer
=
dbCache
->
vg
Cache
.
vg
Info
->
vgVersion
;
pInfo
->
dbId
=
dbCache
->
dbId
;
pInfo
->
tbNum
=
dbCache
->
vgInfo
->
numOfTable
;
pInfo
->
tbNum
=
dbCache
->
vg
Cache
.
vg
Info
->
numOfTable
;
}
else
{
pInfo
->
vgVer
=
CTG_DEFAULT_INVALID_VERSION
;
}
...
...
source/libs/catalog/src/ctgCache.c
浏览文件 @
7a78d812
...
...
@@ -64,8 +64,12 @@ SCtgOperation gCtgCacheOperation[CTG_OP_MAX] = {
CTG_OP_UPDATE_TB_INDEX
,
"update tbIndex"
,
ctgOpUpdateTbIndex
}
},
{
CTG_OP_DROP_TB_INDEX
,
"drop tbIndex"
,
ctgOpDropTbIndex
}
};
...
...
@@ -305,6 +309,7 @@ _return:
int32_t
ctgTbMetaExistInCache
(
SCatalog
*
pCtg
,
char
*
dbFName
,
char
*
tbName
,
int32_t
*
exist
)
{
SCtgDBCache
*
dbCache
=
NULL
;
SCtgTbCache
*
tbCache
=
NULL
;
ctgAcquireTbMetaFromCache
(
pCtg
,
dbFName
,
tbName
,
&
dbCache
,
&
tbCache
);
if
(
NULL
==
tbCache
)
{
ctgReleaseTbMetaToCache
(
pCtg
,
dbCache
,
tbCache
);
...
...
@@ -383,7 +388,7 @@ _return:
CTG_RET
(
code
);
}
int32_t
ctgReadTbVerFromCache
(
SCatalog
*
pCtg
,
const
SName
*
pTableName
,
int32_t
*
sver
,
int32_t
*
tver
,
int32_t
*
tbType
,
uint64_t
*
suid
,
int32_t
ctgReadTbVerFromCache
(
SCatalog
*
pCtg
,
SName
*
pTableName
,
int32_t
*
sver
,
int32_t
*
tver
,
int32_t
*
tbType
,
uint64_t
*
suid
,
char
*
stbName
)
{
*
sver
=
-
1
;
*
tver
=
-
1
;
...
...
@@ -444,10 +449,10 @@ int32_t ctgReadTbVerFromCache(SCatalog *pCtg, const SName *pTableName, int32_t *
}
int32_t
ctgReadTbTypeFromCache
(
SCatalog
*
pCtg
,
c
onst
char
*
dbFName
,
const
char
*
tableName
,
int32_t
*
tbType
)
{
int32_t
ctgReadTbTypeFromCache
(
SCatalog
*
pCtg
,
c
har
*
dbFName
,
char
*
tableName
,
int32_t
*
tbType
)
{
SCtgDBCache
*
dbCache
=
NULL
;
SCtgTbCache
*
tbCache
=
NULL
;
ctgAcquireTbMetaFromCache
(
pCtg
,
dbFName
,
tableName
,
&
dbCache
,
&
tbCache
);
CTG_ERR_RET
(
ctgAcquireTbMetaFromCache
(
pCtg
,
dbFName
,
tableName
,
&
dbCache
,
&
tbCache
)
);
if
(
NULL
==
tbCache
)
{
ctgReleaseTbMetaToCache
(
pCtg
,
dbCache
,
tbCache
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -461,7 +466,7 @@ int32_t ctgReadTbTypeFromCache(SCatalog* pCtg, const char* dbFName, const char *
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgReadTbIndexFromCache
(
SCatalog
*
pCtg
,
const
SName
*
pTableName
,
SArray
**
pRes
)
{
int32_t
ctgReadTbIndexFromCache
(
SCatalog
*
pCtg
,
SName
*
pTableName
,
SArray
**
pRes
)
{
int32_t
code
=
0
;
SCtgDBCache
*
dbCache
=
NULL
;
SCtgTbCache
*
tbCache
=
NULL
;
...
...
@@ -485,7 +490,7 @@ _return:
CTG_RET
(
code
);
}
int32_t
ctgChkAuthFromCache
(
SCatalog
*
pCtg
,
c
onst
char
*
user
,
const
char
*
dbFName
,
AUTH_TYPE
type
,
bool
*
inCache
,
bool
*
pass
)
{
int32_t
ctgChkAuthFromCache
(
SCatalog
*
pCtg
,
c
har
*
user
,
char
*
dbFName
,
AUTH_TYPE
type
,
bool
*
inCache
,
bool
*
pass
)
{
if
(
NULL
==
pCtg
->
userCache
)
{
ctgDebug
(
"empty user auth cache, user:%s"
,
user
);
goto
_return
;
...
...
@@ -862,7 +867,7 @@ int32_t ctgUpdateTbIndexEnqueue(SCatalog* pCtg, STableIndex **pIndex, bool syncO
_return:
taosArrayDestroyEx
(
*
pIndex
,
tFreeSTableIndexInfo
);
taosArrayDestroyEx
(
(
*
pIndex
)
->
pIndex
,
tFreeSTableIndexInfo
);
taosMemoryFreeClear
(
*
pIndex
);
taosMemoryFreeClear
(
msg
);
...
...
@@ -893,7 +898,6 @@ int32_t ctgDropTbIndexEnqueue(SCatalog* pCtg, SName* pName, bool syncOp) {
_return:
taosArrayDestroyEx
(
pIndex
,
tFreeSTableIndexInfo
);
taosMemoryFreeClear
(
msg
);
CTG_RET
(
code
);
...
...
@@ -1172,7 +1176,7 @@ int32_t ctgRemoveDBFromCache(SCatalog* pCtg, SCtgDBCache *dbCache, const char* d
CTG_LOCK
(
CTG_WRITE
,
&
dbCache
->
dbLock
);
atomic_store_8
(
&
dbCache
->
deleted
,
1
);
ctgRemoveStbRent
(
pCtg
,
&
dbCache
);
ctgRemoveStbRent
(
pCtg
,
dbCache
);
ctgFreeDbCache
(
dbCache
);
CTG_UNLOCK
(
CTG_WRITE
,
&
dbCache
->
dbLock
);
...
...
@@ -1251,6 +1255,8 @@ int32_t ctgUpdateRentStbVersion(SCatalog *pCtg, char* dbFName, char* tbName, uin
ctgDebug
(
"db %s,%"
PRIx64
" stb %s,%"
PRIx64
" sver %d tver %d smaVer %d updated to stbRent"
,
dbFName
,
dbId
,
tbName
,
suid
,
metaRent
.
sversion
,
metaRent
.
tversion
,
metaRent
.
smaVer
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1330,32 +1336,40 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
int32_t
ctgWriteTbIndexToCache
(
SCatalog
*
pCtg
,
SCtgDBCache
*
dbCache
,
char
*
dbFName
,
char
*
tbName
,
STableIndex
**
index
)
{
if
(
NULL
==
dbCache
->
tbCache
)
{
ctgFreeSTableIndex
(
*
index
);
taosMemoryFreeClear
(
*
index
);
ctgError
(
"db is dropping, dbId:%"
PRIx64
,
dbCache
->
dbId
);
CTG_ERR_RET
(
TSDB_CODE_CTG_DB_DROPPED
);
}
STableIndex
*
pIndex
=
*
index
;
uint64_t
suid
=
pIndex
->
suid
;
SCtgTbCache
*
pCache
=
taosHashGet
(
dbCache
->
tbCache
,
tbName
,
strlen
(
tbName
));
if
(
NULL
==
pCache
)
{
SCtgTbCache
cache
=
{
0
};
cache
.
pIndex
=
pIndex
;
if
(
taosHashPut
(
table
->
tbCache
,
tbName
,
strlen
(
tbName
),
&
cache
,
sizeof
(
cache
))
!=
0
)
{
if
(
taosHashPut
(
dbCache
->
tbCache
,
tbName
,
strlen
(
tbName
),
&
cache
,
sizeof
(
cache
))
!=
0
)
{
ctgFreeSTableIndex
(
*
index
);
taosMemoryFreeClear
(
*
index
);
ctgError
(
"taosHashPut new tbCache failed, tbName:%s"
,
tbName
);
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
*
index
=
NULL
;
ctgDebug
(
"table %s index updated to cache, ver:%d, num:%d"
,
tbName
,
pIndex
->
version
,
taosArrayGetSize
(
pIndex
->
pIndex
));
CTG_ERR_RET
(
ctgUpdateRentStbVersion
(
pCtg
,
dbFName
,
tbName
,
dbCache
->
dbId
,
pIndex
->
suid
,
pCache
));
ctgDebug
(
"table %s index updated to cache, ver:%d, num:%d"
,
tbName
,
pIndex
->
version
,
(
int32_t
)
taosArrayGetSize
(
pIndex
->
pIndex
));
if
(
suid
)
{
CTG_ERR_RET
(
ctgUpdateRentStbVersion
(
pCtg
,
dbFName
,
tbName
,
dbCache
->
dbId
,
pIndex
->
suid
,
pCache
));
}
return
TSDB_CODE_SUCCESS
;
}
if
(
pCache
->
pIndex
)
{
if
(
0
==
suid
)
{
suid
=
pCache
->
pIndex
->
suid
;
}
taosArrayDestroyEx
(
pCache
->
pIndex
->
pIndex
,
tFreeSTableIndexInfo
);
taosMemoryFreeClear
(
pCache
->
pIndex
);
}
...
...
@@ -1363,9 +1377,11 @@ int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char* dbFNa
pCache
->
pIndex
=
pIndex
;
*
index
=
NULL
;
ctgDebug
(
"table %s index updated to cache, ver:%d, num:%d"
,
tbName
,
pIndex
->
version
,
taosArrayGetSize
(
pIndex
->
pIndex
));
ctgDebug
(
"table %s index updated to cache, ver:%d, num:%d"
,
tbName
,
pIndex
->
version
,
(
int32_t
)
taosArrayGetSize
(
pIndex
->
pIndex
));
CTG_ERR_RET
(
ctgUpdateRentStbVersion
(
pCtg
,
dbFName
,
tbName
,
dbCache
->
dbId
,
pIndex
->
suid
,
pCache
));
if
(
suid
)
{
CTG_ERR_RET
(
ctgUpdateRentStbVersion
(
pCtg
,
dbFName
,
tbName
,
dbCache
->
dbId
,
suid
,
pCache
));
}
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1390,6 +1406,7 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
SCtgUpdateVgMsg
*
msg
=
operation
->
data
;
SDBVgInfo
*
dbInfo
=
msg
->
dbInfo
;
char
*
dbFName
=
msg
->
dbFName
;
SCatalog
*
pCtg
=
msg
->
pCtg
;
if
(
NULL
==
dbInfo
->
vgHash
)
{
return
TSDB_CODE_SUCCESS
;
...
...
@@ -1611,7 +1628,7 @@ int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) {
}
if
(
dbCache
->
dbId
!=
msg
->
dbId
)
{
ctgDebug
(
"dbId %"
PRIx64
" not match with curId %"
PRIx64
", dbFName:%s, tbName:%s"
msg
->
dbId
,
dbCache
->
dbId
,
msg
->
dbFName
,
msg
->
tbName
);
ctgDebug
(
"dbId %"
PRIx64
" not match with curId %"
PRIx64
", dbFName:%s, tbName:%s"
,
msg
->
dbId
,
dbCache
->
dbId
,
msg
->
dbFName
,
msg
->
tbName
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1746,9 +1763,39 @@ int32_t ctgOpUpdateTbIndex(SCtgCacheOperation *operation) {
SCtgDBCache
*
dbCache
=
NULL
;
CTG_ERR_JRET
(
ctgGetAddDBCache
(
pCtg
,
pIndex
->
dbFName
,
0
,
&
dbCache
));
CTG_ERR_JRET
(
ctgWriteTbIndexToCache
(
pCtg
,
dbCache
,
pIndex
->
dbFName
,
pIndex
->
tbName
,
&
pIndex
));
_return:
if
(
pIndex
)
{
taosArrayDestroyEx
(
pIndex
->
pIndex
,
tFreeSTableIndexInfo
);
taosMemoryFreeClear
(
pIndex
);
}
taosMemoryFreeClear
(
msg
);
CTG_RET
(
code
);
}
int32_t
ctgOpDropTbIndex
(
SCtgCacheOperation
*
operation
)
{
int32_t
code
=
0
;
SCtgDropTbIndexMsg
*
msg
=
operation
->
data
;
SCatalog
*
pCtg
=
msg
->
pCtg
;
SCtgDBCache
*
dbCache
=
NULL
;
CTG_ERR_JRET
(
ctgGetDBCache
(
pCtg
,
msg
->
dbFName
,
&
dbCache
));
if
(
NULL
==
dbCache
)
{
CTG_ERR_JRET
(
code
);
return
TSDB_CODE_SUCCESS
;
}
STableIndex
*
pIndex
=
taosMemoryCalloc
(
1
,
sizeof
(
STableIndex
));
if
(
NULL
==
pIndex
)
{
CTG_ERR_JRET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
strcpy
(
pIndex
->
tbName
,
msg
->
tbName
);
strcpy
(
pIndex
->
dbFName
,
msg
->
dbFName
);
pIndex
->
version
=
-
1
;
CTG_ERR_JRET
(
ctgWriteTbIndexToCache
(
pCtg
,
dbCache
,
pIndex
->
dbFName
,
pIndex
->
tbName
,
&
pIndex
));
...
...
source/libs/catalog/src/ctgDbg.c
浏览文件 @
7a78d812
...
...
@@ -266,11 +266,11 @@ int32_t ctgdGetStatNum(char *option, void *res) {
}
int32_t
ctgdGetTbMetaNum
(
SCtgDBCache
*
dbCache
)
{
return
dbCache
->
t
able
.
tbCache
?
(
int32_t
)
taosHashGetSize
(
dbCache
->
table
.
tbCache
)
:
0
;
return
dbCache
->
t
bCache
?
(
int32_t
)
taosHashGetSize
(
dbCache
->
tbCache
)
:
0
;
}
int32_t
ctgdGetStbNum
(
SCtgDBCache
*
dbCache
)
{
return
dbCache
->
table
.
stbCache
?
(
int32_t
)
taosHashGetSize
(
dbCache
->
table
.
stbCache
)
:
0
;
return
dbCache
->
stbCache
?
(
int32_t
)
taosHashGetSize
(
dbCache
->
stbCache
)
:
0
;
}
int32_t
ctgdGetRentNum
(
SCtgRentMgmt
*
rent
)
{
...
...
@@ -363,17 +363,17 @@ void ctgdShowDBCache(SCatalog* pCtg, SHashObj *dbHash) {
dbFName
=
taosHashGetKey
(
pIter
,
&
len
);
int32_t
metaNum
=
dbCache
->
t
able
.
tbCache
?
taosHashGetSize
(
dbCache
->
table
.
tbCache
)
:
0
;
int32_t
stbNum
=
dbCache
->
table
.
stbCache
?
taosHashGetSize
(
dbCache
->
table
.
stbCache
)
:
0
;
int32_t
metaNum
=
dbCache
->
t
bCache
?
taosHashGetSize
(
dbCache
->
tbCache
)
:
0
;
int32_t
stbNum
=
dbCache
->
stbCache
?
taosHashGetSize
(
dbCache
->
stbCache
)
:
0
;
int32_t
vgVersion
=
CTG_DEFAULT_INVALID_VERSION
;
int32_t
hashMethod
=
-
1
;
int32_t
vgNum
=
0
;
if
(
dbCache
->
vgInfo
)
{
vgVersion
=
dbCache
->
vgInfo
->
vgVersion
;
hashMethod
=
dbCache
->
vgInfo
->
hashMethod
;
if
(
dbCache
->
vgInfo
->
vgHash
)
{
vgNum
=
taosHashGetSize
(
dbCache
->
vgInfo
->
vgHash
);
if
(
dbCache
->
vg
Cache
.
vg
Info
)
{
vgVersion
=
dbCache
->
vg
Cache
.
vg
Info
->
vgVersion
;
hashMethod
=
dbCache
->
vg
Cache
.
vg
Info
->
hashMethod
;
if
(
dbCache
->
vg
Cache
.
vg
Info
->
vgHash
)
{
vgNum
=
taosHashGetSize
(
dbCache
->
vg
Cache
.
vg
Info
->
vgHash
);
}
}
...
...
source/libs/catalog/src/ctgUtil.c
浏览文件 @
7a78d812
...
...
@@ -44,6 +44,16 @@ char *ctgTaskTypeStr(CTG_TASK_TYPE type) {
}
}
void
ctgFreeSTableIndex
(
void
*
info
)
{
if
(
NULL
==
info
)
{
return
;
}
STableIndex
*
pInfo
=
(
STableIndex
*
)
info
;
taosArrayDestroyEx
(
pInfo
->
pIndex
,
tFreeSTableIndexInfo
);
}
void
ctgFreeSMetaData
(
SMetaData
*
pData
)
{
taosArrayDestroy
(
pData
->
pTableMeta
);
pData
->
pTableMeta
=
NULL
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录