Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
021fcf2a
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
021fcf2a
编写于
7月 14, 2023
作者:
W
wade zhang
提交者:
GitHub
7月 14, 2023
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #22059 from taosdata/fix/TD-25215
fix/TD-25215: ttlMgrDeleteTtl should ignore ttl 0 tables
上级
1a19d29e
a39a9428
变更
4
显示空白变更内容
内联
并排
Showing
4 changed file
with
68 addition
and
41 deletion
+68
-41
source/dnode/vnode/src/inc/metaTtl.h
source/dnode/vnode/src/inc/metaTtl.h
+4
-1
source/dnode/vnode/src/meta/metaOpen.c
source/dnode/vnode/src/meta/metaOpen.c
+3
-1
source/dnode/vnode/src/meta/metaTable.c
source/dnode/vnode/src/meta/metaTable.c
+8
-1
source/dnode/vnode/src/meta/metaTtl.c
source/dnode/vnode/src/meta/metaTtl.c
+53
-38
未找到文件。
source/dnode/vnode/src/inc/metaTtl.h
浏览文件 @
021fcf2a
...
@@ -38,6 +38,8 @@ typedef struct STtlManger {
...
@@ -38,6 +38,8 @@ typedef struct STtlManger {
SHashObj
*
pTtlCache
;
// key: tuid, value: {ttl, ctime}
SHashObj
*
pTtlCache
;
// key: tuid, value: {ttl, ctime}
SHashObj
*
pDirtyUids
;
// dirty tuid
SHashObj
*
pDirtyUids
;
// dirty tuid
TTB
*
pTtlIdx
;
// btree<{deleteTime, tuid}, ttl>
TTB
*
pTtlIdx
;
// btree<{deleteTime, tuid}, ttl>
char
*
logPrefix
;
}
STtlManger
;
}
STtlManger
;
typedef
struct
{
typedef
struct
{
...
@@ -77,9 +79,10 @@ typedef struct {
...
@@ -77,9 +79,10 @@ typedef struct {
typedef
struct
{
typedef
struct
{
tb_uid_t
uid
;
tb_uid_t
uid
;
TXN
*
pTxn
;
TXN
*
pTxn
;
int64_t
ttlDays
;
}
STtlDelTtlCtx
;
}
STtlDelTtlCtx
;
int
ttlMgrOpen
(
STtlManger
**
ppTtlMgr
,
TDB
*
pEnv
,
int8_t
rollback
);
int
ttlMgrOpen
(
STtlManger
**
ppTtlMgr
,
TDB
*
pEnv
,
int8_t
rollback
,
const
char
*
logPrefix
);
void
ttlMgrClose
(
STtlManger
*
pTtlMgr
);
void
ttlMgrClose
(
STtlManger
*
pTtlMgr
);
int
ttlMgrPostOpen
(
STtlManger
*
pTtlMgr
,
void
*
pMeta
);
int
ttlMgrPostOpen
(
STtlManger
*
pTtlMgr
,
void
*
pMeta
);
...
...
source/dnode/vnode/src/meta/metaOpen.c
浏览文件 @
021fcf2a
...
@@ -128,7 +128,9 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
...
@@ -128,7 +128,9 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
}
}
// open pTtlMgr ("ttlv1.idx")
// open pTtlMgr ("ttlv1.idx")
ret
=
ttlMgrOpen
(
&
pMeta
->
pTtlMgr
,
pMeta
->
pEnv
,
0
);
char
logPrefix
[
128
]
=
{
0
};
sprintf
(
logPrefix
,
"vgId:%d"
,
TD_VID
(
pVnode
));
ret
=
ttlMgrOpen
(
&
pMeta
->
pTtlMgr
,
pMeta
->
pEnv
,
0
,
logPrefix
);
if
(
ret
<
0
)
{
if
(
ret
<
0
)
{
metaError
(
"vgId:%d, failed to open meta ttl index since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
metaError
(
"vgId:%d, failed to open meta ttl index since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
goto
_err
;
goto
_err
;
...
...
source/dnode/vnode/src/meta/metaTable.c
浏览文件 @
021fcf2a
...
@@ -974,7 +974,15 @@ static int metaBuildNColIdxKey(SNcolIdxKey *ncolKey, const SMetaEntry *pME) {
...
@@ -974,7 +974,15 @@ static int metaBuildNColIdxKey(SNcolIdxKey *ncolKey, const SMetaEntry *pME) {
}
}
static
int
metaDeleteTtl
(
SMeta
*
pMeta
,
const
SMetaEntry
*
pME
)
{
static
int
metaDeleteTtl
(
SMeta
*
pMeta
,
const
SMetaEntry
*
pME
)
{
if
(
pME
->
type
!=
TSDB_CHILD_TABLE
&&
pME
->
type
!=
TSDB_NORMAL_TABLE
)
return
0
;
STtlDelTtlCtx
ctx
=
{.
uid
=
pME
->
uid
,
.
pTxn
=
pMeta
->
txn
};
STtlDelTtlCtx
ctx
=
{.
uid
=
pME
->
uid
,
.
pTxn
=
pMeta
->
txn
};
if
(
pME
->
type
==
TSDB_CHILD_TABLE
)
{
ctx
.
ttlDays
=
pME
->
ctbEntry
.
ttlDays
;
}
else
{
ctx
.
ttlDays
=
pME
->
ntbEntry
.
ttlDays
;
}
return
ttlMgrDeleteTtl
(
pMeta
->
pTtlMgr
,
&
ctx
);
return
ttlMgrDeleteTtl
(
pMeta
->
pTtlMgr
,
&
ctx
);
}
}
...
@@ -1968,7 +1976,6 @@ static int metaUpdateTtl(SMeta *pMeta, const SMetaEntry *pME) {
...
@@ -1968,7 +1976,6 @@ static int metaUpdateTtl(SMeta *pMeta, const SMetaEntry *pME) {
if
(
pME
->
type
!=
TSDB_CHILD_TABLE
&&
pME
->
type
!=
TSDB_NORMAL_TABLE
)
return
0
;
if
(
pME
->
type
!=
TSDB_CHILD_TABLE
&&
pME
->
type
!=
TSDB_NORMAL_TABLE
)
return
0
;
STtlUpdTtlCtx
ctx
=
{.
uid
=
pME
->
uid
};
STtlUpdTtlCtx
ctx
=
{.
uid
=
pME
->
uid
};
if
(
pME
->
type
==
TSDB_CHILD_TABLE
)
{
if
(
pME
->
type
==
TSDB_CHILD_TABLE
)
{
ctx
.
ttlDays
=
pME
->
ctbEntry
.
ttlDays
;
ctx
.
ttlDays
=
pME
->
ctbEntry
.
ttlDays
;
ctx
.
changeTimeMs
=
pME
->
ctbEntry
.
btime
;
ctx
.
changeTimeMs
=
pME
->
ctbEntry
.
btime
;
...
...
source/dnode/vnode/src/meta/metaTtl.c
浏览文件 @
021fcf2a
...
@@ -39,7 +39,7 @@ static int32_t ttlMgrULock(STtlManger *pTtlMgr);
...
@@ -39,7 +39,7 @@ static int32_t ttlMgrULock(STtlManger *pTtlMgr);
const
char
*
ttlTbname
=
"ttl.idx"
;
const
char
*
ttlTbname
=
"ttl.idx"
;
const
char
*
ttlV1Tbname
=
"ttlv1.idx"
;
const
char
*
ttlV1Tbname
=
"ttlv1.idx"
;
int
ttlMgrOpen
(
STtlManger
**
ppTtlMgr
,
TDB
*
pEnv
,
int8_t
rollback
)
{
int
ttlMgrOpen
(
STtlManger
**
ppTtlMgr
,
TDB
*
pEnv
,
int8_t
rollback
,
const
char
*
logPrefix
)
{
int
ret
=
TSDB_CODE_SUCCESS
;
int
ret
=
TSDB_CODE_SUCCESS
;
int64_t
startNs
=
taosGetTimestampNs
();
int64_t
startNs
=
taosGetTimestampNs
();
...
@@ -48,9 +48,17 @@ int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback) {
...
@@ -48,9 +48,17 @@ int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback) {
STtlManger
*
pTtlMgr
=
(
STtlManger
*
)
tdbOsCalloc
(
1
,
sizeof
(
*
pTtlMgr
));
STtlManger
*
pTtlMgr
=
(
STtlManger
*
)
tdbOsCalloc
(
1
,
sizeof
(
*
pTtlMgr
));
if
(
pTtlMgr
==
NULL
)
return
TSDB_CODE_OUT_OF_MEMORY
;
if
(
pTtlMgr
==
NULL
)
return
TSDB_CODE_OUT_OF_MEMORY
;
char
*
logBuffer
=
(
char
*
)
tdbOsCalloc
(
1
,
strlen
(
logPrefix
)
+
1
);
if
(
logBuffer
==
NULL
)
{
tdbOsFree
(
pTtlMgr
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
strcpy
(
logBuffer
,
logPrefix
);
pTtlMgr
->
logPrefix
=
logBuffer
;
ret
=
tdbTbOpen
(
ttlV1Tbname
,
TDB_VARIANT_LEN
,
TDB_VARIANT_LEN
,
ttlIdxKeyV1Cmpr
,
pEnv
,
&
pTtlMgr
->
pTtlIdx
,
rollback
);
ret
=
tdbTbOpen
(
ttlV1Tbname
,
TDB_VARIANT_LEN
,
TDB_VARIANT_LEN
,
ttlIdxKeyV1Cmpr
,
pEnv
,
&
pTtlMgr
->
pTtlIdx
,
rollback
);
if
(
ret
<
0
)
{
if
(
ret
<
0
)
{
metaError
(
"
failed to open %s since %s"
,
ttlV1Tbname
,
tstrerror
(
terrno
));
metaError
(
"
%s, failed to open %s since %s"
,
pTtlMgr
->
logPrefix
,
ttlV1Tbname
,
tstrerror
(
terrno
));
tdbOsFree
(
pTtlMgr
);
tdbOsFree
(
pTtlMgr
);
return
ret
;
return
ret
;
}
}
...
@@ -62,14 +70,14 @@ int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback) {
...
@@ -62,14 +70,14 @@ int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback) {
ret
=
ttlMgrFillCache
(
pTtlMgr
);
ret
=
ttlMgrFillCache
(
pTtlMgr
);
if
(
ret
<
0
)
{
if
(
ret
<
0
)
{
metaError
(
"
failed to fill hash since %s"
,
tstrerror
(
terrno
));
metaError
(
"
%s, failed to fill hash since %s"
,
pTtlMgr
->
logPrefix
,
tstrerror
(
terrno
));
ttlMgrCleanup
(
pTtlMgr
);
ttlMgrCleanup
(
pTtlMgr
);
return
ret
;
return
ret
;
}
}
int64_t
endNs
=
taosGetTimestampNs
();
int64_t
endNs
=
taosGetTimestampNs
();
metaInfo
(
"
ttl mgr open end, hash size: %d, time consumed: %"
PRId64
" ns"
,
taosHashGetSize
(
pTtlMgr
->
pTtlCache
)
,
metaInfo
(
"
%s, ttl mgr open end, hash size: %d, time consumed: %"
PRId64
" ns"
,
pTtlMgr
->
logPrefix
,
endNs
-
startNs
);
taosHashGetSize
(
pTtlMgr
->
pTtlCache
),
endNs
-
startNs
);
*
ppTtlMgr
=
pTtlMgr
;
*
ppTtlMgr
=
pTtlMgr
;
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
...
@@ -91,37 +99,37 @@ int ttlMgrUpgrade(STtlManger *pTtlMgr, void *pMeta) {
...
@@ -91,37 +99,37 @@ int ttlMgrUpgrade(STtlManger *pTtlMgr, void *pMeta) {
if
(
!
tdbTbExist
(
ttlTbname
,
meta
->
pEnv
))
return
TSDB_CODE_SUCCESS
;
if
(
!
tdbTbExist
(
ttlTbname
,
meta
->
pEnv
))
return
TSDB_CODE_SUCCESS
;
metaInfo
(
"
ttl mgr start upgrade"
);
metaInfo
(
"
%s, ttl mgr start upgrade"
,
pTtlMgr
->
logPrefix
);
int64_t
startNs
=
taosGetTimestampNs
();
int64_t
startNs
=
taosGetTimestampNs
();
ret
=
tdbTbOpen
(
ttlTbname
,
sizeof
(
STtlIdxKey
),
0
,
ttlIdxKeyCmpr
,
meta
->
pEnv
,
&
pTtlMgr
->
pOldTtlIdx
,
0
);
ret
=
tdbTbOpen
(
ttlTbname
,
sizeof
(
STtlIdxKey
),
0
,
ttlIdxKeyCmpr
,
meta
->
pEnv
,
&
pTtlMgr
->
pOldTtlIdx
,
0
);
if
(
ret
<
0
)
{
if
(
ret
<
0
)
{
metaError
(
"
failed to open %s index since %s"
,
ttlTbname
,
tstrerror
(
terrno
));
metaError
(
"
%s, failed to open %s index since %s"
,
pTtlMgr
->
logPrefix
,
ttlTbname
,
tstrerror
(
terrno
));
goto
_out
;
goto
_out
;
}
}
ret
=
ttlMgrConvert
(
pTtlMgr
->
pOldTtlIdx
,
pTtlMgr
->
pTtlIdx
,
pMeta
);
ret
=
ttlMgrConvert
(
pTtlMgr
->
pOldTtlIdx
,
pTtlMgr
->
pTtlIdx
,
pMeta
);
if
(
ret
<
0
)
{
if
(
ret
<
0
)
{
metaError
(
"
failed to convert ttl index since %s"
,
tstrerror
(
terrno
));
metaError
(
"
%s, failed to convert ttl index since %s"
,
pTtlMgr
->
logPrefix
,
tstrerror
(
terrno
));
goto
_out
;
goto
_out
;
}
}
ret
=
tdbTbDropByName
(
ttlTbname
,
meta
->
pEnv
,
meta
->
txn
);
ret
=
tdbTbDropByName
(
ttlTbname
,
meta
->
pEnv
,
meta
->
txn
);
if
(
ret
<
0
)
{
if
(
ret
<
0
)
{
metaError
(
"
failed to drop old ttl index since %s"
,
tstrerror
(
terrno
));
metaError
(
"
%s, failed to drop old ttl index since %s"
,
pTtlMgr
->
logPrefix
,
tstrerror
(
terrno
));
goto
_out
;
goto
_out
;
}
}
ret
=
ttlMgrFillCache
(
pTtlMgr
);
ret
=
ttlMgrFillCache
(
pTtlMgr
);
if
(
ret
<
0
)
{
if
(
ret
<
0
)
{
metaError
(
"
failed to fill hash since %s"
,
tstrerror
(
terrno
));
metaError
(
"
%s, failed to fill hash since %s"
,
pTtlMgr
->
logPrefix
,
tstrerror
(
terrno
));
goto
_out
;
goto
_out
;
}
}
int64_t
endNs
=
taosGetTimestampNs
();
int64_t
endNs
=
taosGetTimestampNs
();
metaInfo
(
"
ttl mgr upgrade end, hash size: %d, time consumed: %"
PRId64
" ns"
,
taosHashGetSize
(
pTtlMgr
->
pTtlCache
)
,
metaInfo
(
"
%s, ttl mgr upgrade end, hash size: %d, time consumed: %"
PRId64
" ns"
,
pTtlMgr
->
logPrefix
,
endNs
-
startNs
);
taosHashGetSize
(
pTtlMgr
->
pTtlCache
),
endNs
-
startNs
);
_out:
_out:
tdbTbClose
(
pTtlMgr
->
pOldTtlIdx
);
tdbTbClose
(
pTtlMgr
->
pOldTtlIdx
);
pTtlMgr
->
pOldTtlIdx
=
NULL
;
pTtlMgr
->
pOldTtlIdx
=
NULL
;
...
@@ -130,11 +138,12 @@ _out:
...
@@ -130,11 +138,12 @@ _out:
}
}
static
void
ttlMgrCleanup
(
STtlManger
*
pTtlMgr
)
{
static
void
ttlMgrCleanup
(
STtlManger
*
pTtlMgr
)
{
taosMemoryFree
(
pTtlMgr
->
logPrefix
);
taosHashCleanup
(
pTtlMgr
->
pTtlCache
);
taosHashCleanup
(
pTtlMgr
->
pTtlCache
);
taosHashCleanup
(
pTtlMgr
->
pDirtyUids
);
taosHashCleanup
(
pTtlMgr
->
pDirtyUids
);
tdbTbClose
(
pTtlMgr
->
pTtlIdx
);
tdbTbClose
(
pTtlMgr
->
pTtlIdx
);
taosThreadRwlockDestroy
(
&
pTtlMgr
->
lock
);
taosThreadRwlockDestroy
(
&
pTtlMgr
->
lock
);
t
dbOs
Free
(
pTtlMgr
);
t
aosMemory
Free
(
pTtlMgr
);
}
}
static
void
ttlMgrBuildKey
(
STtlIdxKeyV1
*
pTtlKey
,
int64_t
ttlDays
,
int64_t
changeTimeMs
,
tb_uid_t
uid
)
{
static
void
ttlMgrBuildKey
(
STtlIdxKeyV1
*
pTtlKey
,
int64_t
ttlDays
,
int64_t
changeTimeMs
,
tb_uid_t
uid
)
{
...
@@ -250,13 +259,13 @@ int ttlMgrInsertTtl(STtlManger *pTtlMgr, const STtlUpdTtlCtx *updCtx) {
...
@@ -250,13 +259,13 @@ int ttlMgrInsertTtl(STtlManger *pTtlMgr, const STtlUpdTtlCtx *updCtx) {
int
ret
=
taosHashPut
(
pTtlMgr
->
pTtlCache
,
&
updCtx
->
uid
,
sizeof
(
updCtx
->
uid
),
&
cacheEntry
,
sizeof
(
cacheEntry
));
int
ret
=
taosHashPut
(
pTtlMgr
->
pTtlCache
,
&
updCtx
->
uid
,
sizeof
(
updCtx
->
uid
),
&
cacheEntry
,
sizeof
(
cacheEntry
));
if
(
ret
<
0
)
{
if
(
ret
<
0
)
{
metaError
(
"
ttlMgr insert failed to update ttl cache since %s"
,
tstrerror
(
terrno
));
metaError
(
"
%s, ttlMgr insert failed to update ttl cache since %s"
,
pTtlMgr
->
logPrefix
,
tstrerror
(
terrno
));
goto
_out
;
goto
_out
;
}
}
ret
=
taosHashPut
(
pTtlMgr
->
pDirtyUids
,
&
updCtx
->
uid
,
sizeof
(
updCtx
->
uid
),
&
dirtryEntry
,
sizeof
(
dirtryEntry
));
ret
=
taosHashPut
(
pTtlMgr
->
pDirtyUids
,
&
updCtx
->
uid
,
sizeof
(
updCtx
->
uid
),
&
dirtryEntry
,
sizeof
(
dirtryEntry
));
if
(
ret
<
0
)
{
if
(
ret
<
0
)
{
metaError
(
"
ttlMgr insert failed to update ttl dirty uids since %s"
,
tstrerror
(
terrno
));
metaError
(
"
%s, ttlMgr insert failed to update ttl dirty uids since %s"
,
pTtlMgr
->
logPrefix
,
tstrerror
(
terrno
));
goto
_out
;
goto
_out
;
}
}
...
@@ -264,20 +273,21 @@ int ttlMgrInsertTtl(STtlManger *pTtlMgr, const STtlUpdTtlCtx *updCtx) {
...
@@ -264,20 +273,21 @@ int ttlMgrInsertTtl(STtlManger *pTtlMgr, const STtlUpdTtlCtx *updCtx) {
_out:
_out:
ttlMgrULock
(
pTtlMgr
);
ttlMgrULock
(
pTtlMgr
);
metaDebug
(
"
ttl mgr insert ttl, uid: %"
PRId64
", ctime: %"
PRId64
", ttlDays: %"
PRId64
,
updCtx
->
uid
,
metaDebug
(
"
%s, ttl mgr insert ttl, uid: %"
PRId64
", ctime: %"
PRId64
", ttlDays: %"
PRId64
,
pTtlMgr
->
logPrefix
,
updCtx
->
changeTimeMs
,
updCtx
->
ttlDays
);
updCtx
->
uid
,
updCtx
->
changeTimeMs
,
updCtx
->
ttlDays
);
return
ret
;
return
ret
;
}
}
int
ttlMgrDeleteTtl
(
STtlManger
*
pTtlMgr
,
const
STtlDelTtlCtx
*
delCtx
)
{
int
ttlMgrDeleteTtl
(
STtlManger
*
pTtlMgr
,
const
STtlDelTtlCtx
*
delCtx
)
{
if
(
delCtx
->
ttlDays
==
0
)
return
0
;
ttlMgrWLock
(
pTtlMgr
);
ttlMgrWLock
(
pTtlMgr
);
STtlDirtyEntry
dirtryEntry
=
{.
type
=
ENTRY_TYPE_DEL
};
STtlDirtyEntry
dirtryEntry
=
{.
type
=
ENTRY_TYPE_DEL
};
int
ret
=
taosHashPut
(
pTtlMgr
->
pDirtyUids
,
&
delCtx
->
uid
,
sizeof
(
delCtx
->
uid
),
&
dirtryEntry
,
sizeof
(
dirtryEntry
));
int
ret
=
taosHashPut
(
pTtlMgr
->
pDirtyUids
,
&
delCtx
->
uid
,
sizeof
(
delCtx
->
uid
),
&
dirtryEntry
,
sizeof
(
dirtryEntry
));
if
(
ret
<
0
)
{
if
(
ret
<
0
)
{
metaError
(
"
ttlMgr del failed to update ttl dirty uids since %s"
,
tstrerror
(
terrno
));
metaError
(
"
%s, ttlMgr del failed to update ttl dirty uids since %s"
,
pTtlMgr
->
logPrefix
,
tstrerror
(
terrno
));
goto
_out
;
goto
_out
;
}
}
...
@@ -285,7 +295,7 @@ int ttlMgrDeleteTtl(STtlManger *pTtlMgr, const STtlDelTtlCtx *delCtx) {
...
@@ -285,7 +295,7 @@ int ttlMgrDeleteTtl(STtlManger *pTtlMgr, const STtlDelTtlCtx *delCtx) {
_out:
_out:
ttlMgrULock
(
pTtlMgr
);
ttlMgrULock
(
pTtlMgr
);
metaDebug
(
"
ttl mgr delete ttl, uid: %"
PRId64
,
delCtx
->
uid
);
metaDebug
(
"
%s, ttl mgr delete ttl, uid: %"
PRId64
,
pTtlMgr
->
logPrefix
,
delCtx
->
uid
);
return
ret
;
return
ret
;
}
}
...
@@ -293,6 +303,8 @@ _out:
...
@@ -293,6 +303,8 @@ _out:
int
ttlMgrUpdateChangeTime
(
STtlManger
*
pTtlMgr
,
const
STtlUpdCtimeCtx
*
pUpdCtimeCtx
)
{
int
ttlMgrUpdateChangeTime
(
STtlManger
*
pTtlMgr
,
const
STtlUpdCtimeCtx
*
pUpdCtimeCtx
)
{
ttlMgrWLock
(
pTtlMgr
);
ttlMgrWLock
(
pTtlMgr
);
int
ret
=
0
;
STtlCacheEntry
*
oldData
=
taosHashGet
(
pTtlMgr
->
pTtlCache
,
&
pUpdCtimeCtx
->
uid
,
sizeof
(
pUpdCtimeCtx
->
uid
));
STtlCacheEntry
*
oldData
=
taosHashGet
(
pTtlMgr
->
pTtlCache
,
&
pUpdCtimeCtx
->
uid
,
sizeof
(
pUpdCtimeCtx
->
uid
));
if
(
oldData
==
NULL
)
{
if
(
oldData
==
NULL
)
{
goto
_out
;
goto
_out
;
...
@@ -301,17 +313,17 @@ int ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtime
...
@@ -301,17 +313,17 @@ int ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtime
STtlCacheEntry
cacheEntry
=
{.
ttlDays
=
oldData
->
ttlDays
,
.
changeTimeMs
=
pUpdCtimeCtx
->
changeTimeMs
};
STtlCacheEntry
cacheEntry
=
{.
ttlDays
=
oldData
->
ttlDays
,
.
changeTimeMs
=
pUpdCtimeCtx
->
changeTimeMs
};
STtlDirtyEntry
dirtryEntry
=
{.
type
=
ENTRY_TYPE_UPSERT
};
STtlDirtyEntry
dirtryEntry
=
{.
type
=
ENTRY_TYPE_UPSERT
};
int
ret
=
ret
=
taosHashPut
(
pTtlMgr
->
pTtlCache
,
&
pUpdCtimeCtx
->
uid
,
sizeof
(
pUpdCtimeCtx
->
uid
),
&
cacheEntry
,
sizeof
(
cacheEntry
));
taosHashPut
(
pTtlMgr
->
pTtlCache
,
&
pUpdCtimeCtx
->
uid
,
sizeof
(
pUpdCtimeCtx
->
uid
),
&
cacheEntry
,
sizeof
(
cacheEntry
));
if
(
ret
<
0
)
{
if
(
ret
<
0
)
{
metaError
(
"
ttlMgr update ctime failed to update ttl cache since %s"
,
tstrerror
(
terrno
));
metaError
(
"
%s, ttlMgr update ctime failed to update ttl cache since %s"
,
pTtlMgr
->
logPrefix
,
tstrerror
(
terrno
));
goto
_out
;
goto
_out
;
}
}
ret
=
taosHashPut
(
pTtlMgr
->
pDirtyUids
,
&
pUpdCtimeCtx
->
uid
,
sizeof
(
pUpdCtimeCtx
->
uid
),
&
dirtryEntry
,
ret
=
taosHashPut
(
pTtlMgr
->
pDirtyUids
,
&
pUpdCtimeCtx
->
uid
,
sizeof
(
pUpdCtimeCtx
->
uid
),
&
dirtryEntry
,
sizeof
(
dirtryEntry
));
sizeof
(
dirtryEntry
));
if
(
ret
<
0
)
{
if
(
ret
<
0
)
{
metaError
(
"ttlMgr update ctime failed to update ttl dirty uids since %s"
,
tstrerror
(
terrno
));
metaError
(
"%s, ttlMgr update ctime failed to update ttl dirty uids since %s"
,
pTtlMgr
->
logPrefix
,
tstrerror
(
terrno
));
goto
_out
;
goto
_out
;
}
}
...
@@ -319,7 +331,8 @@ int ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtime
...
@@ -319,7 +331,8 @@ int ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtime
_out:
_out:
ttlMgrULock
(
pTtlMgr
);
ttlMgrULock
(
pTtlMgr
);
metaDebug
(
"ttl mgr update ctime, uid: %"
PRId64
", ctime: %"
PRId64
,
pUpdCtimeCtx
->
uid
,
pUpdCtimeCtx
->
changeTimeMs
);
metaDebug
(
"%s, ttl mgr update ctime, uid: %"
PRId64
", ctime: %"
PRId64
,
pTtlMgr
->
logPrefix
,
pUpdCtimeCtx
->
uid
,
pUpdCtimeCtx
->
changeTimeMs
);
return
ret
;
return
ret
;
}
}
...
@@ -366,7 +379,7 @@ _out:
...
@@ -366,7 +379,7 @@ _out:
int
ttlMgrFlush
(
STtlManger
*
pTtlMgr
,
TXN
*
pTxn
)
{
int
ttlMgrFlush
(
STtlManger
*
pTtlMgr
,
TXN
*
pTxn
)
{
ttlMgrWLock
(
pTtlMgr
);
ttlMgrWLock
(
pTtlMgr
);
metaInfo
(
"
ttl mgr flush start."
);
metaInfo
(
"
%s, ttl mgr flush start. dirty uids:%d"
,
pTtlMgr
->
logPrefix
,
taosHashGetSize
(
pTtlMgr
->
pDirtyUids
)
);
int
ret
=
-
1
;
int
ret
=
-
1
;
...
@@ -377,9 +390,9 @@ int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
...
@@ -377,9 +390,9 @@ int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
STtlCacheEntry
*
cacheEntry
=
taosHashGet
(
pTtlMgr
->
pTtlCache
,
pUid
,
sizeof
(
*
pUid
));
STtlCacheEntry
*
cacheEntry
=
taosHashGet
(
pTtlMgr
->
pTtlCache
,
pUid
,
sizeof
(
*
pUid
));
if
(
cacheEntry
==
NULL
)
{
if
(
cacheEntry
==
NULL
)
{
metaError
(
"
ttlMgr flush failed to get ttl cache since %s, uid: %"
PRId64
", type: %d"
,
tstrerror
(
terrno
),
*
pUid
,
metaError
(
"
%s, ttlMgr flush failed to get ttl cache since %s, uid: %"
PRId64
", type: %d"
,
pTtlMgr
->
logPrefix
,
pEntry
->
type
);
tstrerror
(
terrno
),
*
pUid
,
pEntry
->
type
);
goto
_out
;
continue
;
}
}
STtlIdxKeyV1
ttlKey
;
STtlIdxKeyV1
ttlKey
;
...
@@ -389,27 +402,29 @@ int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
...
@@ -389,27 +402,29 @@ int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
ret
=
tdbTbUpsert
(
pTtlMgr
->
pTtlIdx
,
&
ttlKey
,
sizeof
(
ttlKey
),
&
cacheEntry
->
ttlDays
,
sizeof
(
cacheEntry
->
ttlDays
),
ret
=
tdbTbUpsert
(
pTtlMgr
->
pTtlIdx
,
&
ttlKey
,
sizeof
(
ttlKey
),
&
cacheEntry
->
ttlDays
,
sizeof
(
cacheEntry
->
ttlDays
),
pTxn
);
pTxn
);
if
(
ret
<
0
)
{
if
(
ret
<
0
)
{
metaError
(
"
ttlMgr flush failed to flush ttl cache upsert since %s"
,
tstrerror
(
terrno
));
metaError
(
"
%s, ttlMgr flush failed to flush ttl cache upsert since %s"
,
pTtlMgr
->
logPrefix
,
tstrerror
(
terrno
));
goto
_out
;
goto
_out
;
}
}
}
else
if
(
pEntry
->
type
==
ENTRY_TYPE_DEL
)
{
}
else
if
(
pEntry
->
type
==
ENTRY_TYPE_DEL
)
{
ret
=
tdbTbDelete
(
pTtlMgr
->
pTtlIdx
,
&
ttlKey
,
sizeof
(
ttlKey
),
pTxn
);
ret
=
tdbTbDelete
(
pTtlMgr
->
pTtlIdx
,
&
ttlKey
,
sizeof
(
ttlKey
),
pTxn
);
if
(
ret
<
0
)
{
if
(
ret
<
0
)
{
metaError
(
"
ttlMgr flush failed to flush ttl cache del since %s"
,
tstrerror
(
terrno
));
metaError
(
"
%s, ttlMgr flush failed to flush ttl cache del since %s"
,
pTtlMgr
->
logPrefix
,
tstrerror
(
terrno
));
goto
_out
;
goto
_out
;
}
}
ret
=
taosHashRemove
(
pTtlMgr
->
pTtlCache
,
pUid
,
sizeof
(
*
pUid
));
ret
=
taosHashRemove
(
pTtlMgr
->
pTtlCache
,
pUid
,
sizeof
(
*
pUid
));
if
(
ret
<
0
)
{
if
(
ret
<
0
)
{
metaError
(
"
ttlMgr flush failed to delete ttl cache since %s"
,
tstrerror
(
terrno
));
metaError
(
"
%s, ttlMgr flush failed to delete ttl cache since %s"
,
pTtlMgr
->
logPrefix
,
tstrerror
(
terrno
));
goto
_out
;
goto
_out
;
}
}
}
else
{
}
else
{
metaError
(
"
ttlMgr flush failed to flush ttl cache, unknown type: %d"
,
pEntry
->
type
);
metaError
(
"
%s, ttlMgr flush failed to flush ttl cache, unknown type: %d"
,
pTtlMgr
->
logPrefix
,
pEntry
->
type
);
goto
_out
;
goto
_out
;
}
}
pIter
=
taosHashIterate
(
pTtlMgr
->
pDirtyUids
,
pIter
);
void
*
pIterTmp
=
pIter
;
pIter
=
taosHashIterate
(
pTtlMgr
->
pDirtyUids
,
pIterTmp
);
taosHashRemove
(
pTtlMgr
->
pDirtyUids
,
pUid
,
sizeof
(
tb_uid_t
));
}
}
taosHashClear
(
pTtlMgr
->
pDirtyUids
);
taosHashClear
(
pTtlMgr
->
pDirtyUids
);
...
@@ -418,7 +433,7 @@ int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
...
@@ -418,7 +433,7 @@ int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
_out:
_out:
ttlMgrULock
(
pTtlMgr
);
ttlMgrULock
(
pTtlMgr
);
metaInfo
(
"
ttl mgr flush end."
);
metaInfo
(
"
%s, ttl mgr flush end."
,
pTtlMgr
->
logPrefix
);
return
ret
;
return
ret
;
}
}
...
@@ -426,7 +441,7 @@ _out:
...
@@ -426,7 +441,7 @@ _out:
static
int32_t
ttlMgrRLock
(
STtlManger
*
pTtlMgr
)
{
static
int32_t
ttlMgrRLock
(
STtlManger
*
pTtlMgr
)
{
int32_t
ret
=
0
;
int32_t
ret
=
0
;
metaTrace
(
"
ttlMgr rlock %p"
,
&
pTtlMgr
->
lock
);
metaTrace
(
"
%s, ttlMgr rlock %p"
,
pTtlMgr
->
logPrefix
,
&
pTtlMgr
->
lock
);
ret
=
taosThreadRwlockRdlock
(
&
pTtlMgr
->
lock
);
ret
=
taosThreadRwlockRdlock
(
&
pTtlMgr
->
lock
);
...
@@ -436,7 +451,7 @@ static int32_t ttlMgrRLock(STtlManger *pTtlMgr) {
...
@@ -436,7 +451,7 @@ static int32_t ttlMgrRLock(STtlManger *pTtlMgr) {
static
int32_t
ttlMgrWLock
(
STtlManger
*
pTtlMgr
)
{
static
int32_t
ttlMgrWLock
(
STtlManger
*
pTtlMgr
)
{
int32_t
ret
=
0
;
int32_t
ret
=
0
;
metaTrace
(
"
ttlMgr wlock %p"
,
&
pTtlMgr
->
lock
);
metaTrace
(
"
%s, ttlMgr wlock %p"
,
pTtlMgr
->
logPrefix
,
&
pTtlMgr
->
lock
);
ret
=
taosThreadRwlockWrlock
(
&
pTtlMgr
->
lock
);
ret
=
taosThreadRwlockWrlock
(
&
pTtlMgr
->
lock
);
...
@@ -446,7 +461,7 @@ static int32_t ttlMgrWLock(STtlManger *pTtlMgr) {
...
@@ -446,7 +461,7 @@ static int32_t ttlMgrWLock(STtlManger *pTtlMgr) {
static
int32_t
ttlMgrULock
(
STtlManger
*
pTtlMgr
)
{
static
int32_t
ttlMgrULock
(
STtlManger
*
pTtlMgr
)
{
int32_t
ret
=
0
;
int32_t
ret
=
0
;
metaTrace
(
"
ttlMgr ulock %p"
,
&
pTtlMgr
->
lock
);
metaTrace
(
"
%s, ttlMgr ulock %p"
,
pTtlMgr
->
logPrefix
,
&
pTtlMgr
->
lock
);
ret
=
taosThreadRwlockUnlock
(
&
pTtlMgr
->
lock
);
ret
=
taosThreadRwlockUnlock
(
&
pTtlMgr
->
lock
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录