Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
0597f3b9
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
0597f3b9
编写于
6月 26, 2022
作者:
C
Cary Xu
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor: rsma resource release
上级
ef84dcbe
变更
18
显示空白变更内容
内联
并排
Showing
18 changed file
with
524 addition
and
196 deletion
+524
-196
source/common/src/tmsg.c
source/common/src/tmsg.c
+3
-7
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+1
-0
source/dnode/vnode/src/inc/meta.h
source/dnode/vnode/src/inc/meta.h
+1
-0
source/dnode/vnode/src/inc/sma.h
source/dnode/vnode/src/inc/sma.h
+20
-12
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+6
-0
source/dnode/vnode/src/meta/metaOpen.c
source/dnode/vnode/src/meta/metaOpen.c
+9
-0
source/dnode/vnode/src/meta/metaQuery.c
source/dnode/vnode/src/meta/metaQuery.c
+64
-0
source/dnode/vnode/src/meta/metaTable.c
source/dnode/vnode/src/meta/metaTable.c
+12
-0
source/dnode/vnode/src/sma/smaEnv.c
source/dnode/vnode/src/sma/smaEnv.c
+84
-55
source/dnode/vnode/src/sma/smaOpen.c
source/dnode/vnode/src/sma/smaOpen.c
+15
-3
source/dnode/vnode/src/sma/smaRollup.c
source/dnode/vnode/src/sma/smaRollup.c
+255
-110
source/dnode/vnode/src/sma/smaUtil.c
source/dnode/vnode/src/sma/smaUtil.c
+8
-3
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+24
-0
source/dnode/vnode/src/vnd/vnodeCommit.c
source/dnode/vnode/src/vnd/vnodeCommit.c
+6
-1
source/dnode/vnode/src/vnd/vnodeOpen.c
source/dnode/vnode/src/vnd/vnodeOpen.c
+4
-2
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+1
-0
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+9
-1
source/os/src/osFile.c
source/os/src/osFile.c
+2
-2
未找到文件。
source/common/src/tmsg.c
浏览文件 @
0597f3b9
...
@@ -3696,7 +3696,7 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
...
@@ -3696,7 +3696,7 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
isTsma
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
isTsma
)
<
0
)
return
-
1
;
if
(
pReq
->
isTsma
)
{
if
(
pReq
->
isTsma
)
{
if
(
tDecodeBinary
Alloc
(
&
decoder
,
&
pReq
->
pTsma
,
NULL
)
<
0
)
return
-
1
;
if
(
tDecodeBinary
(
&
decoder
,
(
uint8_t
**
)
&
pReq
->
pTsma
,
NULL
)
<
0
)
return
-
1
;
}
}
tEndDecode
(
&
decoder
);
tEndDecode
(
&
decoder
);
...
@@ -3707,9 +3707,6 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
...
@@ -3707,9 +3707,6 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
int32_t
tFreeSCreateVnodeReq
(
SCreateVnodeReq
*
pReq
)
{
int32_t
tFreeSCreateVnodeReq
(
SCreateVnodeReq
*
pReq
)
{
taosArrayDestroy
(
pReq
->
pRetensions
);
taosArrayDestroy
(
pReq
->
pRetensions
);
pReq
->
pRetensions
=
NULL
;
pReq
->
pRetensions
=
NULL
;
if
(
pReq
->
isTsma
)
{
taosMemoryFreeClear
(
pReq
->
pTsma
);
}
return
0
;
return
0
;
}
}
...
@@ -4747,9 +4744,8 @@ int32_t tDecodeSRSmaParam(SDecoder *pCoder, SRSmaParam *pRSmaParam) {
...
@@ -4747,9 +4744,8 @@ int32_t tDecodeSRSmaParam(SDecoder *pCoder, SRSmaParam *pRSmaParam) {
if
(
tDecodeI64v
(
pCoder
,
&
pRSmaParam
->
watermark
[
i
])
<
0
)
return
-
1
;
if
(
tDecodeI64v
(
pCoder
,
&
pRSmaParam
->
watermark
[
i
])
<
0
)
return
-
1
;
if
(
tDecodeI32v
(
pCoder
,
&
pRSmaParam
->
qmsgLen
[
i
])
<
0
)
return
-
1
;
if
(
tDecodeI32v
(
pCoder
,
&
pRSmaParam
->
qmsgLen
[
i
])
<
0
)
return
-
1
;
if
(
pRSmaParam
->
qmsgLen
[
i
]
>
0
)
{
if
(
pRSmaParam
->
qmsgLen
[
i
]
>
0
)
{
uint64_t
len
;
tDecoderMalloc
(
pCoder
,
pRSmaParam
->
qmsgLen
[
i
]);
if
(
tDecodeBinaryAlloc
(
pCoder
,
(
void
**
)
&
pRSmaParam
->
qmsg
[
i
],
&
len
)
<
0
)
if
(
tDecodeBinary
(
pCoder
,
(
uint8_t
**
)
&
pRSmaParam
->
qmsg
[
i
],
NULL
)
<
0
)
return
-
1
;
// qmsgLen contains len of '\0'
return
-
1
;
// qmsgLen contains len of '\0'
}
else
{
}
else
{
pRSmaParam
->
qmsg
[
i
]
=
NULL
;
pRSmaParam
->
qmsg
[
i
]
=
NULL
;
}
}
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
0597f3b9
...
@@ -125,6 +125,7 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT *pReader, STableBlockDistInfo
...
@@ -125,6 +125,7 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT *pReader, STableBlockDistInfo
bool
isTsdbCacheLastRow
(
tsdbReaderT
*
pReader
);
bool
isTsdbCacheLastRow
(
tsdbReaderT
*
pReader
);
int32_t
tsdbGetAllTableList
(
SMeta
*
pMeta
,
uint64_t
uid
,
SArray
*
list
);
int32_t
tsdbGetAllTableList
(
SMeta
*
pMeta
,
uint64_t
uid
,
SArray
*
list
);
int32_t
tsdbGetCtbIdList
(
SMeta
*
pMeta
,
int64_t
suid
,
SArray
*
list
);
int32_t
tsdbGetCtbIdList
(
SMeta
*
pMeta
,
int64_t
suid
,
SArray
*
list
);
int32_t
tsdbGetStbIdList
(
SMeta
*
pMeta
,
int64_t
suid
,
SArray
*
list
);
void
*
tsdbGetIdx
(
SMeta
*
pMeta
);
void
*
tsdbGetIdx
(
SMeta
*
pMeta
);
void
*
tsdbGetIvtIdx
(
SMeta
*
pMeta
);
void
*
tsdbGetIvtIdx
(
SMeta
*
pMeta
);
int64_t
tsdbGetNumOfRowsInMemTable
(
tsdbReaderT
*
pHandle
);
int64_t
tsdbGetNumOfRowsInMemTable
(
tsdbReaderT
*
pHandle
);
...
...
source/dnode/vnode/src/inc/meta.h
浏览文件 @
0597f3b9
...
@@ -69,6 +69,7 @@ struct SMeta {
...
@@ -69,6 +69,7 @@ struct SMeta {
TTB
*
pUidIdx
;
TTB
*
pUidIdx
;
TTB
*
pNameIdx
;
TTB
*
pNameIdx
;
TTB
*
pCtbIdx
;
TTB
*
pCtbIdx
;
TTB
*
pSuidIdx
;
// ivt idx and idx
// ivt idx and idx
void
*
pTagIvtIdx
;
void
*
pTagIvtIdx
;
TTB
*
pTagIdx
;
TTB
*
pTagIdx
;
...
...
source/dnode/vnode/src/inc/sma.h
浏览文件 @
0597f3b9
...
@@ -60,8 +60,9 @@ struct SRSmaStat {
...
@@ -60,8 +60,9 @@ struct SRSmaStat {
SSma
*
pSma
;
SSma
*
pSma
;
void
*
tmrHandle
;
void
*
tmrHandle
;
tmr_h
tmrId
;
tmr_h
tmrId
;
int8_t
tmrStat
;
int32_t
tmrSeconds
;
int32_t
tmrSeconds
;
int8_t
triggerStat
;
int8_t
runningStat
;
SHashObj
*
rsmaInfoHash
;
// key: stbUid, value: SRSmaInfo;
SHashObj
*
rsmaInfoHash
;
// key: stbUid, value: SRSmaInfo;
};
};
...
@@ -74,11 +75,19 @@ struct SSmaStat {
...
@@ -74,11 +75,19 @@ struct SSmaStat {
};
};
#define SMA_TSMA_STAT(s) (&(s)->tsmaStat)
#define SMA_TSMA_STAT(s) (&(s)->tsmaStat)
#define SMA_RSMA_STAT(s) (&(s)->rsmaStat)
#define SMA_RSMA_STAT(s) (&(s)->rsmaStat)
#define SMA_RSMA_INFO_HASH(s) ((s)->rsmaStat.rsmaInfoHash)
#define SMA_RSMA_TMR_HANDLE(s) ((s)->rsmaStat.tmrHandle)
#define SMA_RSMA_TMR_STAT(s) ((s)->rsmaStat.tmrStat)
#define RSMA_INFO_HASH(r) ((r)->rsmaInfoHash)
#define RSMA_INFO_HASH(r) ((r)->rsmaInfoHash)
#define RSMA_TMR_ID(r) ((r)->tmrId)
#define RSMA_TMR_HANDLE(r) ((r)->tmrHandle)
#define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat)
#define RSMA_RUNNING_STAT(r) (&(r)->runningStat)
enum
{
TASK_TRIGGER_STAT_INIT
=
0
,
TASK_TRIGGER_STAT_ACTIVE
=
1
,
TASK_TRIGGER_STAT_INACTIVE
=
2
,
TASK_TRIGGER_STAT_CANCELLED
=
3
,
TASK_TRIGGER_STAT_FINISHED
=
4
,
};
void
tdDestroySmaEnv
(
SSmaEnv
*
pSmaEnv
);
void
tdDestroySmaEnv
(
SSmaEnv
*
pSmaEnv
);
void
*
tdFreeSmaEnv
(
SSmaEnv
*
pSmaEnv
);
void
*
tdFreeSmaEnv
(
SSmaEnv
*
pSmaEnv
);
...
@@ -174,6 +183,8 @@ int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg);
...
@@ -174,6 +183,8 @@ int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg);
int32_t
tdProcessTSmaInsertImpl
(
SSma
*
pSma
,
int64_t
indexUid
,
const
char
*
msg
);
int32_t
tdProcessTSmaInsertImpl
(
SSma
*
pSma
,
int64_t
indexUid
,
const
char
*
msg
);
int32_t
tdProcessTSmaGetDaysImpl
(
SVnodeCfg
*
pCfg
,
void
*
pCont
,
uint32_t
contLen
,
int32_t
*
days
);
int32_t
tdProcessTSmaGetDaysImpl
(
SVnodeCfg
*
pCfg
,
void
*
pCont
,
uint32_t
contLen
,
int32_t
*
days
);
// smaFileUtil ================
typedef
struct
STFInfo
STFInfo
;
typedef
struct
STFInfo
STFInfo
;
typedef
struct
STFile
STFile
;
typedef
struct
STFile
STFile
;
...
@@ -181,7 +192,7 @@ struct STFInfo {
...
@@ -181,7 +192,7 @@ struct STFInfo {
uint32_t
magic
;
uint32_t
magic
;
uint32_t
ftype
;
uint32_t
ftype
;
uint32_t
fver
;
uint32_t
fver
;
uint64_t
fsize
;
int64_t
fsize
;
};
};
struct
STFile
{
struct
STFile
{
...
@@ -199,11 +210,8 @@ struct STFile {
...
@@ -199,11 +210,8 @@ struct STFile {
#define TD_FILE_OPENED(tf) (TD_FILE_PFILE(tf) != NULL)
#define TD_FILE_OPENED(tf) (TD_FILE_PFILE(tf) != NULL)
#define TD_FILE_CLOSED(tf) (!TD_FILE_OPENED(tf))
#define TD_FILE_CLOSED(tf) (!TD_FILE_OPENED(tf))
#define TD_FILE_SET_CLOSED(f) (TD_FILE_PFILE(f) = NULL)
#define TD_FILE_SET_CLOSED(f) (TD_FILE_PFILE(f) = NULL)
#define TD_FILE_STATE(tf) ((tf)->state)
#define TD_FILE_SET_STATE(tf, s) ((tf)->state = (s))
#define TD_FILE_SET_STATE(tf, s) ((tf)->state = (s))
#define TD_FILE_DID(tf) (TD_FILE_F(tf)->did)
#define TD_FILE_DID(tf) (TD_FILE_F(tf)->did)
#define TD_FILE_IS_OK(tf) (TD_FILE_STATE(tf) == TD_FILE_STATE_OK)
#define TD_FILE_IS_BAD(tf) (TD_FILE_STATE(tf) == TD_FILE_STATE_BAD)
int32_t
tdInitTFile
(
STFile
*
pTFile
,
STfs
*
pTfs
,
const
char
*
fname
);
int32_t
tdInitTFile
(
STFile
*
pTFile
,
STfs
*
pTfs
,
const
char
*
fname
);
int32_t
tdCreateTFile
(
STFile
*
pTFile
,
STfs
*
pTfs
,
bool
updateHeader
,
int8_t
fType
);
int32_t
tdCreateTFile
(
STFile
*
pTFile
,
STfs
*
pTfs
,
bool
updateHeader
,
int8_t
fType
);
...
...
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
0597f3b9
...
@@ -76,6 +76,7 @@ void vnodeFree(void* p);
...
@@ -76,6 +76,7 @@ void vnodeFree(void* p);
// meta
// meta
typedef
struct
SMCtbCursor
SMCtbCursor
;
typedef
struct
SMCtbCursor
SMCtbCursor
;
typedef
struct
SMStbCursor
SMStbCursor
;
typedef
struct
STbUidStore
STbUidStore
;
typedef
struct
STbUidStore
STbUidStore
;
int
metaOpen
(
SVnode
*
pVnode
,
SMeta
**
ppMeta
);
int
metaOpen
(
SVnode
*
pVnode
,
SMeta
**
ppMeta
);
...
@@ -97,6 +98,9 @@ int metaGetTbNum(SMeta* pMeta);
...
@@ -97,6 +98,9 @@ int metaGetTbNum(SMeta* pMeta);
SMCtbCursor
*
metaOpenCtbCursor
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
SMCtbCursor
*
metaOpenCtbCursor
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
void
metaCloseCtbCursor
(
SMCtbCursor
*
pCtbCur
);
void
metaCloseCtbCursor
(
SMCtbCursor
*
pCtbCur
);
tb_uid_t
metaCtbCursorNext
(
SMCtbCursor
*
pCtbCur
);
tb_uid_t
metaCtbCursorNext
(
SMCtbCursor
*
pCtbCur
);
SMStbCursor
*
metaOpenStbCursor
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
void
metaCloseStbCursor
(
SMStbCursor
*
pStbCur
);
tb_uid_t
metaStbCursorNext
(
SMStbCursor
*
pStbCur
);
STSma
*
metaGetSmaInfoByIndex
(
SMeta
*
pMeta
,
int64_t
indexUid
);
STSma
*
metaGetSmaInfoByIndex
(
SMeta
*
pMeta
,
int64_t
indexUid
);
STSmaWrapper
*
metaGetSmaInfoByTable
(
SMeta
*
pMeta
,
tb_uid_t
uid
,
bool
deepCopy
);
STSmaWrapper
*
metaGetSmaInfoByTable
(
SMeta
*
pMeta
,
tb_uid_t
uid
,
bool
deepCopy
);
SArray
*
metaGetSmaIdsByTable
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
SArray
*
metaGetSmaIdsByTable
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
...
@@ -158,6 +162,8 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool
...
@@ -158,6 +162,8 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool
// sma
// sma
int32_t
smaOpen
(
SVnode
*
pVnode
);
int32_t
smaOpen
(
SVnode
*
pVnode
);
int32_t
smaClose
(
SSma
*
pSma
);
int32_t
smaClose
(
SSma
*
pSma
);
int32_t
smaCloseEnv
(
SSma
*
pSma
);
int32_t
smaCloseEx
(
SSma
*
pSma
);
int32_t
tdProcessTSmaCreate
(
SSma
*
pSma
,
int64_t
version
,
const
char
*
msg
);
int32_t
tdProcessTSmaCreate
(
SSma
*
pSma
,
int64_t
version
,
const
char
*
msg
);
int32_t
tdProcessTSmaInsert
(
SSma
*
pSma
,
int64_t
indexUid
,
const
char
*
msg
);
int32_t
tdProcessTSmaInsert
(
SSma
*
pSma
,
int64_t
indexUid
,
const
char
*
msg
);
...
...
source/dnode/vnode/src/meta/metaOpen.c
浏览文件 @
0597f3b9
...
@@ -92,6 +92,13 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
...
@@ -92,6 +92,13 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
goto
_err
;
goto
_err
;
}
}
// open pSuidIdx
ret
=
tdbTbOpen
(
"suid.idx"
,
sizeof
(
tb_uid_t
),
0
,
uidIdxKeyCmpr
,
pMeta
->
pEnv
,
&
pMeta
->
pSuidIdx
);
if
(
ret
<
0
)
{
metaError
(
"vgId:%d, failed to open meta super table index since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
goto
_err
;
}
// open pTagIdx
// open pTagIdx
// TODO(yihaoDeng), refactor later
// TODO(yihaoDeng), refactor later
char
indexFullPath
[
128
]
=
{
0
};
char
indexFullPath
[
128
]
=
{
0
};
...
@@ -141,6 +148,7 @@ _err:
...
@@ -141,6 +148,7 @@ _err:
if
(
pMeta
->
pTagIvtIdx
)
indexClose
(
pMeta
->
pTagIvtIdx
);
if
(
pMeta
->
pTagIvtIdx
)
indexClose
(
pMeta
->
pTagIvtIdx
);
if
(
pMeta
->
pTagIdx
)
tdbTbClose
(
pMeta
->
pTagIdx
);
if
(
pMeta
->
pTagIdx
)
tdbTbClose
(
pMeta
->
pTagIdx
);
if
(
pMeta
->
pCtbIdx
)
tdbTbClose
(
pMeta
->
pCtbIdx
);
if
(
pMeta
->
pCtbIdx
)
tdbTbClose
(
pMeta
->
pCtbIdx
);
if
(
pMeta
->
pSuidIdx
)
tdbTbClose
(
pMeta
->
pSuidIdx
);
if
(
pMeta
->
pNameIdx
)
tdbTbClose
(
pMeta
->
pNameIdx
);
if
(
pMeta
->
pNameIdx
)
tdbTbClose
(
pMeta
->
pNameIdx
);
if
(
pMeta
->
pUidIdx
)
tdbTbClose
(
pMeta
->
pUidIdx
);
if
(
pMeta
->
pUidIdx
)
tdbTbClose
(
pMeta
->
pUidIdx
);
if
(
pMeta
->
pSkmDb
)
tdbTbClose
(
pMeta
->
pSkmDb
);
if
(
pMeta
->
pSkmDb
)
tdbTbClose
(
pMeta
->
pSkmDb
);
...
@@ -162,6 +170,7 @@ int metaClose(SMeta *pMeta) {
...
@@ -162,6 +170,7 @@ int metaClose(SMeta *pMeta) {
if
(
pMeta
->
pTagIdx
)
tdbTbClose
(
pMeta
->
pTagIdx
);
if
(
pMeta
->
pTagIdx
)
tdbTbClose
(
pMeta
->
pTagIdx
);
#endif
#endif
if
(
pMeta
->
pCtbIdx
)
tdbTbClose
(
pMeta
->
pCtbIdx
);
if
(
pMeta
->
pCtbIdx
)
tdbTbClose
(
pMeta
->
pCtbIdx
);
if
(
pMeta
->
pSuidIdx
)
tdbTbClose
(
pMeta
->
pSuidIdx
);
if
(
pMeta
->
pNameIdx
)
tdbTbClose
(
pMeta
->
pNameIdx
);
if
(
pMeta
->
pNameIdx
)
tdbTbClose
(
pMeta
->
pNameIdx
);
if
(
pMeta
->
pUidIdx
)
tdbTbClose
(
pMeta
->
pUidIdx
);
if
(
pMeta
->
pUidIdx
)
tdbTbClose
(
pMeta
->
pUidIdx
);
if
(
pMeta
->
pSkmDb
)
tdbTbClose
(
pMeta
->
pSkmDb
);
if
(
pMeta
->
pSkmDb
)
tdbTbClose
(
pMeta
->
pSkmDb
);
...
...
source/dnode/vnode/src/meta/metaQuery.c
浏览文件 @
0597f3b9
...
@@ -325,6 +325,70 @@ tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) {
...
@@ -325,6 +325,70 @@ tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) {
return
pCtbIdxKey
->
uid
;
return
pCtbIdxKey
->
uid
;
}
}
struct
SMStbCursor
{
SMeta
*
pMeta
;
TBC
*
pCur
;
tb_uid_t
suid
;
void
*
pKey
;
void
*
pVal
;
int
kLen
;
int
vLen
;
};
SMStbCursor
*
metaOpenStbCursor
(
SMeta
*
pMeta
,
tb_uid_t
suid
)
{
SMStbCursor
*
pStbCur
=
NULL
;
int
ret
=
0
;
int
c
=
0
;
pStbCur
=
(
SMStbCursor
*
)
taosMemoryCalloc
(
1
,
sizeof
(
*
pStbCur
));
if
(
pStbCur
==
NULL
)
{
return
NULL
;
}
pStbCur
->
pMeta
=
pMeta
;
pStbCur
->
suid
=
suid
;
metaRLock
(
pMeta
);
ret
=
tdbTbcOpen
(
pMeta
->
pSuidIdx
,
&
pStbCur
->
pCur
,
NULL
);
if
(
ret
<
0
)
{
metaULock
(
pMeta
);
taosMemoryFree
(
pStbCur
);
return
NULL
;
}
// move to the suid
tdbTbcMoveTo
(
pStbCur
->
pCur
,
&
suid
,
sizeof
(
suid
),
&
c
);
if
(
c
>
0
)
{
tdbTbcMoveToNext
(
pStbCur
->
pCur
);
}
return
pStbCur
;
}
void
metaCloseStbCursor
(
SMStbCursor
*
pStbCur
)
{
if
(
pStbCur
)
{
if
(
pStbCur
->
pMeta
)
metaULock
(
pStbCur
->
pMeta
);
if
(
pStbCur
->
pCur
)
{
tdbTbcClose
(
pStbCur
->
pCur
);
tdbFree
(
pStbCur
->
pKey
);
tdbFree
(
pStbCur
->
pVal
);
}
taosMemoryFree
(
pStbCur
);
}
}
tb_uid_t
metaStbCursorNext
(
SMStbCursor
*
pStbCur
)
{
int
ret
;
ret
=
tdbTbcNext
(
pStbCur
->
pCur
,
&
pStbCur
->
pKey
,
&
pStbCur
->
kLen
,
&
pStbCur
->
pVal
,
&
pStbCur
->
vLen
);
if
(
ret
<
0
)
{
return
0
;
}
return
*
(
tb_uid_t
*
)
pStbCur
->
pKey
;
}
STSchema
*
metaGetTbTSchema
(
SMeta
*
pMeta
,
tb_uid_t
uid
,
int32_t
sver
)
{
STSchema
*
metaGetTbTSchema
(
SMeta
*
pMeta
,
tb_uid_t
uid
,
int32_t
sver
)
{
// SMetaReader mr = {0};
// SMetaReader mr = {0};
STSchema
*
pTSchema
=
NULL
;
STSchema
*
pTSchema
=
NULL
;
...
...
source/dnode/vnode/src/meta/metaTable.c
浏览文件 @
0597f3b9
...
@@ -23,6 +23,7 @@ static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME);
...
@@ -23,6 +23,7 @@ static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME);
static
int
metaUpdateTtlIdx
(
SMeta
*
pMeta
,
const
SMetaEntry
*
pME
);
static
int
metaUpdateTtlIdx
(
SMeta
*
pMeta
,
const
SMetaEntry
*
pME
);
static
int
metaSaveToSkmDb
(
SMeta
*
pMeta
,
const
SMetaEntry
*
pME
);
static
int
metaSaveToSkmDb
(
SMeta
*
pMeta
,
const
SMetaEntry
*
pME
);
static
int
metaUpdateCtbIdx
(
SMeta
*
pMeta
,
const
SMetaEntry
*
pME
);
static
int
metaUpdateCtbIdx
(
SMeta
*
pMeta
,
const
SMetaEntry
*
pME
);
static
int
metaUpdateSuidIdx
(
SMeta
*
pMeta
,
const
SMetaEntry
*
pME
);
static
int
metaUpdateTagIdx
(
SMeta
*
pMeta
,
const
SMetaEntry
*
pCtbEntry
);
static
int
metaUpdateTagIdx
(
SMeta
*
pMeta
,
const
SMetaEntry
*
pCtbEntry
);
static
int
metaDropTableByUid
(
SMeta
*
pMeta
,
tb_uid_t
uid
,
int
*
type
);
static
int
metaDropTableByUid
(
SMeta
*
pMeta
,
tb_uid_t
uid
,
int
*
type
);
...
@@ -209,6 +210,7 @@ _drop_super_table:
...
@@ -209,6 +210,7 @@ _drop_super_table:
&
pMeta
->
txn
);
&
pMeta
->
txn
);
tdbTbDelete
(
pMeta
->
pNameIdx
,
pReq
->
name
,
strlen
(
pReq
->
name
)
+
1
,
&
pMeta
->
txn
);
tdbTbDelete
(
pMeta
->
pNameIdx
,
pReq
->
name
,
strlen
(
pReq
->
name
)
+
1
,
&
pMeta
->
txn
);
tdbTbDelete
(
pMeta
->
pUidIdx
,
&
pReq
->
suid
,
sizeof
(
tb_uid_t
),
&
pMeta
->
txn
);
tdbTbDelete
(
pMeta
->
pUidIdx
,
&
pReq
->
suid
,
sizeof
(
tb_uid_t
),
&
pMeta
->
txn
);
tdbTbDelete
(
pMeta
->
pSuidIdx
,
&
pReq
->
suid
,
sizeof
(
tb_uid_t
),
&
pMeta
->
txn
);
metaULock
(
pMeta
);
metaULock
(
pMeta
);
...
@@ -436,11 +438,13 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
...
@@ -436,11 +438,13 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
tdbTbDelete
(
pMeta
->
pUidIdx
,
&
uid
,
sizeof
(
uid
),
&
pMeta
->
txn
);
tdbTbDelete
(
pMeta
->
pUidIdx
,
&
uid
,
sizeof
(
uid
),
&
pMeta
->
txn
);
if
(
e
.
type
!=
TSDB_SUPER_TABLE
)
metaDeleteTtlIdx
(
pMeta
,
&
e
);
if
(
e
.
type
!=
TSDB_SUPER_TABLE
)
metaDeleteTtlIdx
(
pMeta
,
&
e
);
if
(
e
.
type
==
TSDB_CHILD_TABLE
)
{
if
(
e
.
type
==
TSDB_CHILD_TABLE
)
{
tdbTbDelete
(
pMeta
->
pCtbIdx
,
&
(
SCtbIdxKey
){.
suid
=
e
.
ctbEntry
.
suid
,
.
uid
=
uid
},
sizeof
(
SCtbIdxKey
),
&
pMeta
->
txn
);
tdbTbDelete
(
pMeta
->
pCtbIdx
,
&
(
SCtbIdxKey
){.
suid
=
e
.
ctbEntry
.
suid
,
.
uid
=
uid
},
sizeof
(
SCtbIdxKey
),
&
pMeta
->
txn
);
}
else
if
(
e
.
type
==
TSDB_NORMAL_TABLE
)
{
}
else
if
(
e
.
type
==
TSDB_NORMAL_TABLE
)
{
// drop schema.db (todo)
// drop schema.db (todo)
}
else
if
(
e
.
type
==
TSDB_SUPER_TABLE
)
{
}
else
if
(
e
.
type
==
TSDB_SUPER_TABLE
)
{
tdbTbDelete
(
pMeta
->
pSuidIdx
,
&
e
.
uid
,
sizeof
(
tb_uid_t
),
&
pMeta
->
txn
);
// drop schema.db (todo)
// drop schema.db (todo)
}
}
...
@@ -911,6 +915,10 @@ static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME) {
...
@@ -911,6 +915,10 @@ static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME) {
return
tdbTbInsert
(
pMeta
->
pUidIdx
,
&
pME
->
uid
,
sizeof
(
tb_uid_t
),
&
pME
->
version
,
sizeof
(
int64_t
),
&
pMeta
->
txn
);
return
tdbTbInsert
(
pMeta
->
pUidIdx
,
&
pME
->
uid
,
sizeof
(
tb_uid_t
),
&
pME
->
version
,
sizeof
(
int64_t
),
&
pMeta
->
txn
);
}
}
static
int
metaUpdateSuidIdx
(
SMeta
*
pMeta
,
const
SMetaEntry
*
pME
)
{
return
tdbTbInsert
(
pMeta
->
pSuidIdx
,
&
pME
->
uid
,
sizeof
(
tb_uid_t
),
NULL
,
0
,
&
pMeta
->
txn
);
}
static
int
metaUpdateNameIdx
(
SMeta
*
pMeta
,
const
SMetaEntry
*
pME
)
{
static
int
metaUpdateNameIdx
(
SMeta
*
pMeta
,
const
SMetaEntry
*
pME
)
{
return
tdbTbInsert
(
pMeta
->
pNameIdx
,
pME
->
name
,
strlen
(
pME
->
name
)
+
1
,
&
pME
->
uid
,
sizeof
(
tb_uid_t
),
&
pMeta
->
txn
);
return
tdbTbInsert
(
pMeta
->
pNameIdx
,
pME
->
name
,
strlen
(
pME
->
name
)
+
1
,
&
pME
->
uid
,
sizeof
(
tb_uid_t
),
&
pMeta
->
txn
);
}
}
...
@@ -1081,6 +1089,10 @@ static int metaHandleEntry(SMeta *pMeta, const SMetaEntry *pME) {
...
@@ -1081,6 +1089,10 @@ static int metaHandleEntry(SMeta *pMeta, const SMetaEntry *pME) {
}
else
{
}
else
{
// update schema.db
// update schema.db
if
(
metaSaveToSkmDb
(
pMeta
,
pME
)
<
0
)
goto
_err
;
if
(
metaSaveToSkmDb
(
pMeta
,
pME
)
<
0
)
goto
_err
;
if
(
pME
->
type
==
TSDB_SUPER_TABLE
)
{
if
(
metaUpdateSuidIdx
(
pMeta
,
pME
)
<
0
)
goto
_err
;
}
}
}
if
(
pME
->
type
!=
TSDB_SUPER_TABLE
)
{
if
(
pME
->
type
!=
TSDB_SUPER_TABLE
)
{
...
...
source/dnode/vnode/src/sma/smaEnv.c
浏览文件 @
0597f3b9
...
@@ -126,22 +126,21 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
...
@@ -126,22 +126,21 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
}
}
if
(
smaType
==
TSDB_SMA_TYPE_ROLLUP
)
{
if
(
smaType
==
TSDB_SMA_TYPE_ROLLUP
)
{
SMA_RSMA_STAT
(
*
pSmaStat
)
->
pSma
=
(
SSma
*
)
pSma
;
SRSmaStat
*
pRSmaStat
=
(
SRSmaStat
*
)(
*
pSmaStat
);
pRSmaStat
->
pSma
=
(
SSma
*
)
pSma
;
// init timer
// init timer
SMA_RSMA_TMR_HANDLE
(
*
pSmaStat
)
=
taosTmrInit
(
10000
,
100
,
10000
,
"RSMA_G
"
);
RSMA_TMR_HANDLE
(
pRSmaStat
)
=
taosTmrInit
(
10000
,
100
,
10000
,
"RSMA
"
);
if
(
!
SMA_RSMA_TMR_HANDLE
(
*
p
SmaStat
))
{
if
(
!
RSMA_TMR_HANDLE
(
pR
SmaStat
))
{
taosMemoryFreeClear
(
*
pSmaStat
);
taosMemoryFreeClear
(
*
pSmaStat
);
return
TSDB_CODE_FAILED
;
return
TSDB_CODE_FAILED
;
}
}
atomic_store_8
(
&
SMA_RSMA_TMR_STAT
(
*
pSmaStat
),
TASK_TRIGGER_STATUS__ACTIVE
);
// init hash
// init hash
SMA_RSMA_INFO_HASH
(
*
p
SmaStat
)
=
taosHashInit
(
RSMA_INFO_HASH
(
pR
SmaStat
)
=
taosHashInit
(
RSMA_TASK_INFO_HASH_SLOT
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
HASH_ENTRY_LOCK
);
RSMA_TASK_INFO_HASH_SLOT
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
HASH_ENTRY_LOCK
);
if
(
!
SMA_RSMA_INFO_HASH
(
*
p
SmaStat
))
{
if
(
!
RSMA_INFO_HASH
(
pR
SmaStat
))
{
if
(
SMA_RSMA_TMR_HANDLE
(
*
p
SmaStat
))
{
if
(
RSMA_TMR_HANDLE
(
pR
SmaStat
))
{
taosTmrCleanUp
(
SMA_RSMA_TMR_HANDLE
(
*
p
SmaStat
));
taosTmrCleanUp
(
RSMA_TMR_HANDLE
(
pR
SmaStat
));
}
}
taosMemoryFreeClear
(
*
pSmaStat
);
taosMemoryFreeClear
(
*
pSmaStat
);
return
TSDB_CODE_FAILED
;
return
TSDB_CODE_FAILED
;
...
@@ -155,12 +154,79 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
...
@@ -155,12 +154,79 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
static
void
*
tdFree
TSmaStat
(
STSmaStat
*
pStat
)
{
static
void
tdDestroy
TSmaStat
(
STSmaStat
*
pStat
)
{
if
(
pStat
)
{
if
(
pStat
)
{
tDestroyTSma
(
pStat
->
pTSma
);
tDestroyTSma
(
pStat
->
pTSma
);
taosMemoryFreeClear
(
pStat
->
pTSma
);
taosMemoryFreeClear
(
pStat
->
pTSma
);
taosMemoryFreeClear
(
pStat
->
pTSchema
);
}
}
static
void
*
tdFreeTSmaStat
(
STSmaStat
*
pStat
)
{
tdDestroyTSmaStat
(
pStat
);
taosMemoryFreeClear
(
pStat
);
taosMemoryFreeClear
(
pStat
);
return
NULL
;
}
static
void
tdDestroyRSmaStat
(
SRSmaStat
*
pStat
)
{
if
(
pStat
)
{
smaDebug
(
"vgId:%d, %s:%d free rsma stat"
,
SMA_VID
(
pStat
->
pSma
),
__func__
,
__LINE__
);
// step 1: set persistence task cancelled
atomic_store_8
(
RSMA_TRIGGER_STAT
(
pStat
),
TASK_TRIGGER_STAT_CANCELLED
);
// step 2: clean timer
taosTmrStopA
(
&
RSMA_TMR_ID
(
pStat
));
if
(
RSMA_TMR_HANDLE
(
pStat
))
{
taosTmrCleanUp
(
RSMA_TMR_HANDLE
(
pStat
));
}
// step 3: wait the persistence thread to finish
int32_t
nLoops
=
0
;
if
(
atomic_load_8
(
RSMA_RUNNING_STAT
(
pStat
))
==
1
)
{
while
(
1
)
{
if
(
atomic_load_8
(
RSMA_TRIGGER_STAT
(
pStat
))
==
TASK_TRIGGER_STAT_FINISHED
)
{
break
;
}
else
{
smaDebug
(
"not destroyed since rsma stat in %"
PRIi8
,
atomic_load_8
(
RSMA_TRIGGER_STAT
(
pStat
)));
}
++
nLoops
;
if
(
nLoops
>
1000
)
{
sched_yield
();
nLoops
=
0
;
}
taosMsleep
(
1000
);
// TODO: remove this line when release
}
}
// step 4: destroy the rsma info and associated fetch tasks
// TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready.
void
*
infoHash
=
taosHashIterate
(
RSMA_INFO_HASH
(
pStat
),
NULL
);
while
(
infoHash
)
{
SRSmaInfo
*
pSmaInfo
=
*
(
SRSmaInfo
**
)
infoHash
;
tdFreeRSmaInfo
(
pSmaInfo
);
infoHash
=
taosHashIterate
(
RSMA_INFO_HASH
(
pStat
),
infoHash
);
}
taosHashCleanup
(
RSMA_INFO_HASH
(
pStat
));
// step 5: wait all triggered fetch tasks finished
nLoops
=
0
;
while
(
1
)
{
if
(
T_REF_VAL_GET
((
SSmaStat
*
)
pStat
)
==
0
)
{
break
;
}
}
++
nLoops
;
if
(
nLoops
>
1000
)
{
sched_yield
();
nLoops
=
0
;
}
taosMsleep
(
1000
);
// TODO: remove this line when release
}
}
}
static
void
*
tdFreeRSmaStat
(
SRSmaStat
*
pStat
)
{
tdDestroyRSmaStat
(
pStat
);
taosMemoryFreeClear
(
pStat
);
return
NULL
;
return
NULL
;
}
}
...
@@ -179,22 +245,16 @@ void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) {
...
@@ -179,22 +245,16 @@ void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) {
int32_t
tdDestroySmaState
(
SSmaStat
*
pSmaStat
,
int8_t
smaType
)
{
int32_t
tdDestroySmaState
(
SSmaStat
*
pSmaStat
,
int8_t
smaType
)
{
if
(
pSmaStat
)
{
if
(
pSmaStat
)
{
if
(
smaType
==
TSDB_SMA_TYPE_TIME_RANGE
)
{
if
(
smaType
==
TSDB_SMA_TYPE_TIME_RANGE
)
{
tdFreeTSmaStat
(
&
pSmaStat
->
tsmaStat
);
smaDebug
(
"%s:%d destroy tsma stat"
,
__func__
,
__LINE__
);
tdDestroyTSmaStat
(
SMA_TSMA_STAT
(
pSmaStat
));
}
else
if
(
smaType
==
TSDB_SMA_TYPE_ROLLUP
)
{
}
else
if
(
smaType
==
TSDB_SMA_TYPE_ROLLUP
)
{
if
(
SMA_RSMA_TMR_HANDLE
(
pSmaStat
))
{
smaDebug
(
"%s:%d destroy rsma stat"
,
__func__
,
__LINE__
);
taosTmrCleanUp
(
SMA_RSMA_TMR_HANDLE
(
pSmaStat
));
tdDestroyRSmaStat
(
SMA_RSMA_STAT
(
pSmaStat
));
}
// TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready.
void
*
infoHash
=
taosHashIterate
(
SMA_RSMA_INFO_HASH
(
pSmaStat
),
NULL
);
while
(
infoHash
)
{
SRSmaInfo
*
pInfoHash
=
*
(
SRSmaInfo
**
)
infoHash
;
tdFreeRSmaInfo
(
pInfoHash
);
infoHash
=
taosHashIterate
(
SMA_RSMA_INFO_HASH
(
pSmaStat
),
infoHash
);
}
taosHashCleanup
(
SMA_RSMA_INFO_HASH
(
pSmaStat
));
}
else
{
}
else
{
ASSERT
(
0
);
ASSERT
(
0
);
}
}
}
else
{
smaDebug
(
"%s:%d no need to destroy rsma stat"
,
__func__
,
__LINE__
);
}
}
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
...
@@ -260,34 +320,3 @@ int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType) {
...
@@ -260,34 +320,3 @@ int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType) {
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
};
};
\ No newline at end of file
int32_t
smaTimerInit
(
void
**
timer
,
int8_t
*
initFlag
,
const
char
*
label
)
{
int8_t
old
;
while
(
1
)
{
old
=
atomic_val_compare_exchange_8
(
initFlag
,
0
,
2
);
if
(
old
!=
2
)
break
;
}
if
(
old
==
0
)
{
*
timer
=
taosTmrInit
(
10000
,
100
,
10000
,
label
);
if
(
!
(
*
timer
))
{
atomic_store_8
(
initFlag
,
0
);
return
-
1
;
}
atomic_store_8
(
initFlag
,
1
);
}
return
0
;
}
void
smaTimerCleanUp
(
void
*
timer
,
int8_t
*
initFlag
)
{
int8_t
old
;
while
(
1
)
{
old
=
atomic_val_compare_exchange_8
(
initFlag
,
1
,
2
);
if
(
old
!=
2
)
break
;
}
if
(
old
==
1
)
{
taosTmrCleanUp
(
timer
);
atomic_store_8
(
initFlag
,
0
);
}
}
\ No newline at end of file
source/dnode/vnode/src/sma/smaOpen.c
浏览文件 @
0597f3b9
...
@@ -126,19 +126,31 @@ _err:
...
@@ -126,19 +126,31 @@ _err:
return
-
1
;
return
-
1
;
}
}
int32_t
smaClose
(
SSma
*
pSma
)
{
int32_t
smaCloseEnv
(
SSma
*
pSma
)
{
if
(
pSma
)
{
SMA_TSMA_ENV
(
pSma
)
=
tdFreeSmaEnv
(
SMA_TSMA_ENV
(
pSma
));
SMA_RSMA_ENV
(
pSma
)
=
tdFreeSmaEnv
(
SMA_RSMA_ENV
(
pSma
));
}
return
0
;
}
int32_t
smaCloseEx
(
SSma
*
pSma
)
{
if
(
pSma
)
{
if
(
pSma
)
{
taosThreadMutexDestroy
(
&
pSma
->
mutex
);
taosThreadMutexDestroy
(
&
pSma
->
mutex
);
if
SMA_RSMA_TSDB0
(
pSma
)
tsdbClose
(
&
SMA_RSMA_TSDB0
(
pSma
));
if
SMA_RSMA_TSDB0
(
pSma
)
tsdbClose
(
&
SMA_RSMA_TSDB0
(
pSma
));
if
SMA_RSMA_TSDB1
(
pSma
)
tsdbClose
(
&
SMA_RSMA_TSDB1
(
pSma
));
if
SMA_RSMA_TSDB1
(
pSma
)
tsdbClose
(
&
SMA_RSMA_TSDB1
(
pSma
));
if
SMA_RSMA_TSDB2
(
pSma
)
tsdbClose
(
&
SMA_RSMA_TSDB2
(
pSma
));
if
SMA_RSMA_TSDB2
(
pSma
)
tsdbClose
(
&
SMA_RSMA_TSDB2
(
pSma
));
// SMA_TSMA_ENV(pSma) = tdFreeSmaEnv(SMA_TSMA_ENV(pSma));
// SMA_RSMA_ENV(pSma) = tdFreeSmaEnv(SMA_RSMA_ENV(pSma));
taosMemoryFreeClear
(
pSma
);
taosMemoryFreeClear
(
pSma
);
}
}
return
0
;
return
0
;
}
}
int32_t
smaClose
(
SSma
*
pSma
)
{
smaCloseEnv
(
pSma
);
smaCloseEx
(
pSma
);
return
0
;
}
/**
/**
* @brief rsma env restore
* @brief rsma env restore
*
*
...
...
source/dnode/vnode/src/sma/smaRollup.c
浏览文件 @
0597f3b9
...
@@ -14,8 +14,8 @@
...
@@ -14,8 +14,8 @@
*/
*/
#include "sma.h"
#include "sma.h"
#include "tstream.h"
#define RSMA_QTASK_PERSIST_MS 7200000
typedef
enum
{
TD_QTASK_TMP_FILE
=
0
,
TD_QTASK_CUR_FILE
}
TD_QTASK_FILE_T
;
typedef
enum
{
TD_QTASK_TMP_FILE
=
0
,
TD_QTASK_CUR_FILE
}
TD_QTASK_FILE_T
;
static
const
char
*
tdQTaskInfoFname
[]
=
{
"qtaskinfo.t"
,
"qtaskinfo"
};
static
const
char
*
tdQTaskInfoFname
[]
=
{
"qtaskinfo.t"
,
"qtaskinfo"
};
...
@@ -31,13 +31,13 @@ static void tdRSmaPersistTrigger(void *param, void *tmrId);
...
@@ -31,13 +31,13 @@ static void tdRSmaPersistTrigger(void *param, void *tmrId);
struct
SRSmaInfoItem
{
struct
SRSmaInfoItem
{
SRSmaInfo
*
pRsmaInfo
;
SRSmaInfo
*
pRsmaInfo
;
void
*
taskInfo
;
// qTaskInfo_t
void
*
taskInfo
;
// qTaskInfo_t
void
*
tmrHandle
;
tmr_h
tmrId
;
tmr_h
tmrId
;
int8_t
level
;
int8_t
level
;
int8_t
tmrInitFlag
;
int8_t
tmrInitFlag
;
int8_t
triggerStat
us
;
// TASK_TRIGGER_STATUS__IN_ACTIVE/TASK_TRIGGER_STATUS__ACTIVE
int8_t
triggerStat
;
int32_t
maxDelay
;
int32_t
maxDelay
;
};
};
struct
SRSmaInfo
{
struct
SRSmaInfo
{
STSchema
*
pTSchema
;
STSchema
*
pTSchema
;
SSma
*
pSma
;
SSma
*
pSma
;
...
@@ -45,11 +45,14 @@ struct SRSmaInfo {
...
@@ -45,11 +45,14 @@ struct SRSmaInfo {
SRSmaInfoItem
items
[
TSDB_RETENTION_L2
];
SRSmaInfoItem
items
[
TSDB_RETENTION_L2
];
};
};
static
FORCE_INLINE
void
tdFreeTaskHandle
(
qTaskInfo_t
*
taskHandle
)
{
static
FORCE_INLINE
void
tdFreeTaskHandle
(
qTaskInfo_t
*
taskHandle
,
int32_t
vgId
,
int32_t
level
)
{
// Note: free/kill may in RC
// Note: free/kill may in RC
qTaskInfo_t
otaskHandle
=
atomic_load_ptr
(
taskHandle
);
qTaskInfo_t
otaskHandle
=
atomic_load_ptr
(
taskHandle
);
if
(
otaskHandle
&&
atomic_val_compare_exchange_ptr
(
taskHandle
,
otaskHandle
,
NULL
))
{
if
(
otaskHandle
&&
atomic_val_compare_exchange_ptr
(
taskHandle
,
otaskHandle
,
NULL
))
{
smaDebug
(
"vgId:%d, %s:%d free qTaskInfo_t %p of level %d"
,
vgId
,
__func__
,
__LINE__
,
otaskHandle
,
level
);
qDestroyTask
(
otaskHandle
);
qDestroyTask
(
otaskHandle
);
}
else
{
smaDebug
(
"vgId:%d, %s:%d not free qTaskInfo_t %p of level %d"
,
vgId
,
__func__
,
__LINE__
,
otaskHandle
,
level
);
}
}
}
}
...
@@ -58,14 +61,19 @@ void *tdFreeRSmaInfo(SRSmaInfo *pInfo) {
...
@@ -58,14 +61,19 @@ void *tdFreeRSmaInfo(SRSmaInfo *pInfo) {
for
(
int32_t
i
=
0
;
i
<
TSDB_RETENTION_L2
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
TSDB_RETENTION_L2
;
++
i
)
{
SRSmaInfoItem
*
pItem
=
&
pInfo
->
items
[
i
];
SRSmaInfoItem
*
pItem
=
&
pInfo
->
items
[
i
];
if
(
pItem
->
taskInfo
)
{
if
(
pItem
->
taskInfo
)
{
tdFreeTaskHandle
(
pItem
->
taskInfo
);
smaDebug
(
"vgId:%d, stb %"
PRIi64
" stop fetch-timer %p level %d"
,
SMA_VID
(
pInfo
->
pSma
),
pInfo
->
suid
,
}
pItem
->
tmrId
,
i
+
1
);
if
(
pItem
->
tmrHandle
)
{
taosTmrStopA
(
&
pItem
->
tmrId
);
taosTmrCleanUp
(
pItem
->
tmrHandle
);
tdFreeTaskHandle
(
&
pItem
->
taskInfo
,
SMA_VID
(
pInfo
->
pSma
),
i
+
1
);
}
else
{
smaDebug
(
"vgId:%d, stb %"
PRIi64
" no need to destroy rsma info level %d since empty taskInfo"
,
SMA_VID
(
pInfo
->
pSma
),
pInfo
->
suid
,
i
+
1
);
}
}
}
}
taosMemoryFree
(
pInfo
->
pTSchema
);
taosMemoryFree
(
pInfo
->
pTSchema
);
taosMemoryFree
(
pInfo
);
taosMemoryFree
(
pInfo
);
}
else
{
smaDebug
(
"vgId:%d, stb %"
PRIi64
" no need to destroy rsma info since empty"
,
SMA_VID
(
pInfo
->
pSma
),
pInfo
->
suid
);
}
}
return
NULL
;
return
NULL
;
...
@@ -83,7 +91,7 @@ static FORCE_INLINE int32_t tdUidStoreInit(STbUidStore **pStore) {
...
@@ -83,7 +91,7 @@ static FORCE_INLINE int32_t tdUidStoreInit(STbUidStore **pStore) {
static
FORCE_INLINE
int32_t
tdUpdateTbUidListImpl
(
SSma
*
pSma
,
tb_uid_t
*
suid
,
SArray
*
tbUids
)
{
static
FORCE_INLINE
int32_t
tdUpdateTbUidListImpl
(
SSma
*
pSma
,
tb_uid_t
*
suid
,
SArray
*
tbUids
)
{
SSmaEnv
*
pEnv
=
SMA_RSMA_ENV
(
pSma
);
SSmaEnv
*
pEnv
=
SMA_RSMA_ENV
(
pSma
);
S
SmaStat
*
pStat
=
SMA_ENV_STAT
(
pEnv
);
S
RSmaStat
*
pStat
=
(
SRSmaStat
*
)
SMA_ENV_STAT
(
pEnv
);
SRSmaInfo
*
pRSmaInfo
=
NULL
;
SRSmaInfo
*
pRSmaInfo
=
NULL
;
if
(
!
suid
||
!
tbUids
)
{
if
(
!
suid
||
!
tbUids
)
{
...
@@ -92,28 +100,32 @@ static FORCE_INLINE int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SA
...
@@ -92,28 +100,32 @@ static FORCE_INLINE int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SA
return
TSDB_CODE_FAILED
;
return
TSDB_CODE_FAILED
;
}
}
pRSmaInfo
=
taosHashGet
(
SMA_
RSMA_INFO_HASH
(
pStat
),
suid
,
sizeof
(
tb_uid_t
));
pRSmaInfo
=
taosHashGet
(
RSMA_INFO_HASH
(
pStat
),
suid
,
sizeof
(
tb_uid_t
));
if
(
!
pRSmaInfo
||
!
(
pRSmaInfo
=
*
(
SRSmaInfo
**
)
pRSmaInfo
))
{
if
(
!
pRSmaInfo
||
!
(
pRSmaInfo
=
*
(
SRSmaInfo
**
)
pRSmaInfo
))
{
smaError
(
"vgId:%d, failed to get rsma info for uid:%"
PRIi64
,
SMA_VID
(
pSma
),
*
suid
);
smaError
(
"vgId:%d, failed to get rsma info for uid:%"
PRIi64
,
SMA_VID
(
pSma
),
*
suid
);
terrno
=
TSDB_CODE_RSMA_INVALID_STAT
;
terrno
=
TSDB_CODE_RSMA_INVALID_STAT
;
return
TSDB_CODE_FAILED
;
return
TSDB_CODE_FAILED
;
}
}
if
(
pRSmaInfo
->
items
[
0
].
taskInfo
&&
(
qUpdateQualifiedTableId
(
pRSmaInfo
->
items
[
0
].
taskInfo
,
tbUids
,
true
)
<
0
))
{
if
(
pRSmaInfo
->
items
[
0
].
taskInfo
)
{
if
((
qUpdateQualifiedTableId
(
pRSmaInfo
->
items
[
0
].
taskInfo
,
tbUids
,
true
)
<
0
))
{
smaError
(
"vgId:%d, update tbUidList failed for uid:%"
PRIi64
" since %s"
,
SMA_VID
(
pSma
),
*
suid
,
terrstr
(
terrno
));
smaError
(
"vgId:%d, update tbUidList failed for uid:%"
PRIi64
" since %s"
,
SMA_VID
(
pSma
),
*
suid
,
terrstr
(
terrno
));
return
TSDB_CODE_FAILED
;
return
TSDB_CODE_FAILED
;
}
else
{
}
else
{
smaDebug
(
"vgId:%d, update tbUidList succeed for qTaskInfo:%p with suid:%"
PRIi64
", uid:%"
PRIi64
,
SMA_VID
(
pSma
),
smaDebug
(
"vgId:%d, update tbUidList succeed for qTaskInfo:%p with suid:%"
PRIi64
", uid:%"
PRIi64
,
SMA_VID
(
pSma
),
pRSmaInfo
->
items
[
0
].
taskInfo
,
*
suid
,
*
(
int64_t
*
)
taosArrayGet
(
tbUids
,
0
));
pRSmaInfo
->
items
[
0
].
taskInfo
,
*
suid
,
*
(
int64_t
*
)
taosArrayGet
(
tbUids
,
0
));
}
}
}
if
(
pRSmaInfo
->
items
[
1
].
taskInfo
&&
(
qUpdateQualifiedTableId
(
pRSmaInfo
->
items
[
1
].
taskInfo
,
tbUids
,
true
)
<
0
))
{
if
(
pRSmaInfo
->
items
[
1
].
taskInfo
)
{
if
((
qUpdateQualifiedTableId
(
pRSmaInfo
->
items
[
1
].
taskInfo
,
tbUids
,
true
)
<
0
))
{
smaError
(
"vgId:%d, update tbUidList failed for uid:%"
PRIi64
" since %s"
,
SMA_VID
(
pSma
),
*
suid
,
terrstr
(
terrno
));
smaError
(
"vgId:%d, update tbUidList failed for uid:%"
PRIi64
" since %s"
,
SMA_VID
(
pSma
),
*
suid
,
terrstr
(
terrno
));
return
TSDB_CODE_FAILED
;
return
TSDB_CODE_FAILED
;
}
else
{
}
else
{
smaDebug
(
"vgId:%d, update tbUidList succeed for qTaskInfo:%p with suid:%"
PRIi64
", uid:%"
PRIi64
,
SMA_VID
(
pSma
),
smaDebug
(
"vgId:%d, update tbUidList succeed for qTaskInfo:%p with suid:%"
PRIi64
", uid:%"
PRIi64
,
SMA_VID
(
pSma
),
pRSmaInfo
->
items
[
1
].
taskInfo
,
*
suid
,
*
(
int64_t
*
)
taosArrayGet
(
tbUids
,
0
));
pRSmaInfo
->
items
[
1
].
taskInfo
,
*
suid
,
*
(
int64_t
*
)
taosArrayGet
(
tbUids
,
0
));
}
}
}
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
...
@@ -159,9 +171,9 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui
...
@@ -159,9 +171,9 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
S
SmaStat
*
pStat
=
SMA_ENV_STAT
(
pEnv
);
S
RSmaStat
*
pStat
=
(
SRSmaStat
*
)
SMA_ENV_STAT
(
pEnv
);
SHashObj
*
infoHash
=
NULL
;
SHashObj
*
infoHash
=
NULL
;
if
(
!
pStat
||
!
(
infoHash
=
SMA_
RSMA_INFO_HASH
(
pStat
)))
{
if
(
!
pStat
||
!
(
infoHash
=
RSMA_INFO_HASH
(
pStat
)))
{
terrno
=
TSDB_CODE_RSMA_INVALID_STAT
;
terrno
=
TSDB_CODE_RSMA_INVALID_STAT
;
return
TSDB_CODE_FAILED
;
return
TSDB_CODE_FAILED
;
}
}
...
@@ -195,11 +207,11 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaInfo
...
@@ -195,11 +207,11 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaInfo
if
(
param
->
qmsg
[
idx
])
{
if
(
param
->
qmsg
[
idx
])
{
SRSmaInfoItem
*
pItem
=
&
(
pRSmaInfo
->
items
[
idx
]);
SRSmaInfoItem
*
pItem
=
&
(
pRSmaInfo
->
items
[
idx
]);
pItem
->
pRsmaInfo
=
pRSmaInfo
;
pItem
->
pRsmaInfo
=
pRSmaInfo
;
pItem
->
taskInfo
=
qCreateStreamExecTaskInfo
(
param
->
qmsg
[
0
],
pReadHandle
);
pItem
->
taskInfo
=
qCreateStreamExecTaskInfo
(
param
->
qmsg
[
idx
],
pReadHandle
);
if
(
!
pItem
->
taskInfo
)
{
if
(
!
pItem
->
taskInfo
)
{
goto
_err
;
goto
_err
;
}
}
pItem
->
triggerStat
us
=
TASK_TRIGGER_STATUS__IN_
ACTIVE
;
pItem
->
triggerStat
=
TASK_TRIGGER_STAT_IN
ACTIVE
;
if
(
param
->
maxdelay
[
idx
]
<
TSDB_MIN_ROLLUP_MAX_DELAY
)
{
if
(
param
->
maxdelay
[
idx
]
<
TSDB_MIN_ROLLUP_MAX_DELAY
)
{
int64_t
msInterval
=
int64_t
msInterval
=
convertTimeFromPrecisionToUnit
(
pRetention
[
idx
+
1
].
freq
,
pTsdbCfg
->
precision
,
TIME_UNIT_MILLISECOND
);
convertTimeFromPrecisionToUnit
(
pRetention
[
idx
+
1
].
freq
,
pTsdbCfg
->
precision
,
TIME_UNIT_MILLISECOND
);
...
@@ -211,10 +223,6 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaInfo
...
@@ -211,10 +223,6 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaInfo
pItem
->
maxDelay
=
TSDB_MAX_ROLLUP_MAX_DELAY
;
pItem
->
maxDelay
=
TSDB_MAX_ROLLUP_MAX_DELAY
;
}
}
pItem
->
level
=
(
idx
==
0
?
TSDB_RETENTION_L1
:
TSDB_RETENTION_L2
);
pItem
->
level
=
(
idx
==
0
?
TSDB_RETENTION_L1
:
TSDB_RETENTION_L2
);
pItem
->
tmrHandle
=
taosTmrInit
(
10000
,
100
,
10000
,
"RSMA"
);
if
(
!
pItem
->
tmrHandle
)
{
goto
_err
;
}
}
}
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
_err:
_err:
...
@@ -251,10 +259,10 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) {
...
@@ -251,10 +259,10 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) {
}
}
SSmaEnv
*
pEnv
=
SMA_RSMA_ENV
(
pSma
);
SSmaEnv
*
pEnv
=
SMA_RSMA_ENV
(
pSma
);
S
SmaStat
*
pStat
=
SMA_ENV_STAT
(
pEnv
);
S
RSmaStat
*
pStat
=
(
SRSmaStat
*
)
SMA_ENV_STAT
(
pEnv
);
SRSmaInfo
*
pRSmaInfo
=
NULL
;
SRSmaInfo
*
pRSmaInfo
=
NULL
;
pRSmaInfo
=
taosHashGet
(
SMA_
RSMA_INFO_HASH
(
pStat
),
&
pReq
->
suid
,
sizeof
(
tb_uid_t
));
pRSmaInfo
=
taosHashGet
(
RSMA_INFO_HASH
(
pStat
),
&
pReq
->
suid
,
sizeof
(
tb_uid_t
));
if
(
pRSmaInfo
)
{
if
(
pRSmaInfo
)
{
ASSERT
(
0
);
// TODO: free original pRSmaInfo is exists abnormally
ASSERT
(
0
);
// TODO: free original pRSmaInfo is exists abnormally
smaWarn
(
"vgId:%d, rsma info already exists for stb: %s, %"
PRIi64
,
SMA_VID
(
pSma
),
pReq
->
name
,
pReq
->
suid
);
smaWarn
(
"vgId:%d, rsma info already exists for stb: %s, %"
PRIi64
,
SMA_VID
(
pSma
),
pReq
->
name
,
pReq
->
suid
);
...
@@ -293,16 +301,23 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) {
...
@@ -293,16 +301,23 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) {
if
(
tdSetRSmaInfoItemParams
(
pSma
,
param
,
pRSmaInfo
,
&
handle
,
0
)
<
0
)
{
if
(
tdSetRSmaInfoItemParams
(
pSma
,
param
,
pRSmaInfo
,
&
handle
,
0
)
<
0
)
{
goto
_err
;
goto
_err
;
}
}
if
(
tdSetRSmaInfoItemParams
(
pSma
,
param
,
pRSmaInfo
,
&
handle
,
1
)
<
0
)
{
if
(
tdSetRSmaInfoItemParams
(
pSma
,
param
,
pRSmaInfo
,
&
handle
,
1
)
<
0
)
{
goto
_err
;
goto
_err
;
}
}
if
(
taosHashPut
(
SMA_
RSMA_INFO_HASH
(
pStat
),
&
pReq
->
suid
,
sizeof
(
tb_uid_t
),
&
pRSmaInfo
,
sizeof
(
pRSmaInfo
))
<
0
)
{
if
(
taosHashPut
(
RSMA_INFO_HASH
(
pStat
),
&
pReq
->
suid
,
sizeof
(
tb_uid_t
),
&
pRSmaInfo
,
sizeof
(
pRSmaInfo
))
<
0
)
{
goto
_err
;
goto
_err
;
}
else
{
}
else
{
smaDebug
(
"vgId:%d, register rsma info succeed for suid:%"
PRIi64
,
SMA_VID
(
pSma
),
pReq
->
suid
);
smaDebug
(
"vgId:%d, register rsma info succeed for suid:%"
PRIi64
,
SMA_VID
(
pSma
),
pReq
->
suid
);
}
}
// start the persist timer
if
(
TASK_TRIGGER_STAT_INIT
==
atomic_val_compare_exchange_8
(
RSMA_TRIGGER_STAT
(
pStat
),
TASK_TRIGGER_STAT_INIT
,
TASK_TRIGGER_STAT_ACTIVE
))
{
taosTmrStart
(
tdRSmaPersistTrigger
,
RSMA_QTASK_PERSIST_MS
,
pStat
,
RSMA_TMR_HANDLE
(
pStat
));
}
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
_err:
_err:
tdFreeRSmaInfo
(
pRSmaInfo
);
tdFreeRSmaInfo
(
pRSmaInfo
);
...
@@ -432,6 +447,16 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) {
...
@@ -432,6 +447,16 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) {
return
0
;
return
0
;
}
}
static
void
tdDestroySDataBlockArray
(
SArray
*
pArray
)
{
#if 0
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
SSDataBlock *pDataBlock = taosArrayGet(pArray, i);
blockDestroyInner(pDataBlock);
}
#endif
taosArrayDestroy
(
pArray
);
}
static
int32_t
tdFetchAndSubmitRSmaResult
(
SRSmaInfoItem
*
pItem
,
int8_t
blkType
)
{
static
int32_t
tdFetchAndSubmitRSmaResult
(
SRSmaInfoItem
*
pItem
,
int8_t
blkType
)
{
SArray
*
pResult
=
NULL
;
SArray
*
pResult
=
NULL
;
SRSmaInfo
*
pRSmaInfo
=
pItem
->
pRsmaInfo
;
SRSmaInfo
*
pRSmaInfo
=
pItem
->
pRsmaInfo
;
...
@@ -466,6 +491,7 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType)
...
@@ -466,6 +491,7 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType)
#endif
#endif
STsdb
*
sinkTsdb
=
(
pItem
->
level
==
TSDB_RETENTION_L1
?
pSma
->
pRSmaTsdb1
:
pSma
->
pRSmaTsdb2
);
STsdb
*
sinkTsdb
=
(
pItem
->
level
==
TSDB_RETENTION_L1
?
pSma
->
pRSmaTsdb1
:
pSma
->
pRSmaTsdb2
);
SSubmitReq
*
pReq
=
NULL
;
SSubmitReq
*
pReq
=
NULL
;
// TODO: the schema update should be handled
if
(
buildSubmitReqFromDataBlock
(
&
pReq
,
pResult
,
pRSmaInfo
->
pTSchema
,
SMA_VID
(
pSma
),
pRSmaInfo
->
suid
)
<
0
)
{
if
(
buildSubmitReqFromDataBlock
(
&
pReq
,
pResult
,
pRSmaInfo
->
pTSchema
,
SMA_VID
(
pSma
),
pRSmaInfo
->
suid
)
<
0
)
{
goto
_err
;
goto
_err
;
}
}
...
@@ -477,17 +503,13 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType)
...
@@ -477,17 +503,13 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType)
taosMemoryFreeClear
(
pReq
);
taosMemoryFreeClear
(
pReq
);
}
else
{
}
else
{
smaDebug
(
"vgId:%d, no rsma %
"
PRIi8
" data generated since %s"
,
SMA_VID
(
pSma
),
pItem
->
level
,
tstrerror
(
terrno
));
smaDebug
(
"vgId:%d, no rsma %"
PRIi8
" data generated since %s"
,
SMA_VID
(
pSma
),
pItem
->
level
,
tstrerror
(
terrno
));
}
}
if
(
blkType
==
STREAM_DATA_TYPE_SUBMIT_BLOCK
)
{
tdDestroySDataBlockArray
(
pResult
);
atomic_store_8
(
&
pItem
->
triggerStatus
,
TASK_TRIGGER_STATUS__ACTIVE
);
}
taosArrayDestroy
(
pResult
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
_err:
_err:
t
aosArrayDestro
y
(
pResult
);
t
dDestroySDataBlockArra
y
(
pResult
);
return
TSDB_CODE_FAILED
;
return
TSDB_CODE_FAILED
;
}
}
...
@@ -499,22 +521,34 @@ _err:
...
@@ -499,22 +521,34 @@ _err:
*/
*/
static
void
tdRSmaFetchTrigger
(
void
*
param
,
void
*
tmrId
)
{
static
void
tdRSmaFetchTrigger
(
void
*
param
,
void
*
tmrId
)
{
SRSmaInfoItem
*
pItem
=
param
;
SRSmaInfoItem
*
pItem
=
param
;
SSma
*
pSma
=
pItem
->
pRsmaInfo
->
pSma
;
SRSmaStat
*
pStat
=
(
SRSmaStat
*
)
SMA_ENV_STAT
((
SSmaEnv
*
)
pSma
->
pRSmaEnv
);
if
(
atomic_load_8
(
&
pItem
->
triggerStatus
)
==
TASK_TRIGGER_STATUS__ACTIVE
)
{
int8_t
rsmaTriggerStat
=
atomic_load_8
(
RSMA_TRIGGER_STAT
(
pStat
));
smaWarn
(
"%s:%d THREAD:%"
PRIi64
" level %"
PRIi8
" status is active for tb suid:%"
PRIi64
,
__func__
,
__LINE__
,
if
(
rsmaTriggerStat
==
TASK_TRIGGER_STAT_CANCELLED
||
rsmaTriggerStat
==
TASK_TRIGGER_STAT_FINISHED
)
{
taosGetSelfPthreadId
(),
pItem
->
level
,
pItem
->
pRsmaInfo
->
suid
);
smaDebug
(
"vgId:%d, %s:%d level %"
PRIi8
" not fetch since stat is cancelled for table suid:%"
PRIi64
,
SMA_VID
(
pSma
),
SSDataBlock
dataBlock
=
{.
info
.
type
=
STREAM_GET_ALL
};
__func__
,
__LINE__
,
pItem
->
level
,
pItem
->
pRsmaInfo
->
suid
);
return
;
}
atomic_store_8
(
&
pItem
->
triggerStatus
,
TASK_TRIGGER_STATUS__IN_ACTIVE
);
int8_t
fetchTriggerStat
=
qSetStreamInput
(
pItem
->
taskInfo
,
&
dataBlock
,
STREAM_DATA_TYPE_SSDATA_BLOCK
,
false
);
atomic_val_compare_exchange_8
(
&
pItem
->
triggerStat
,
TASK_TRIGGER_STAT_ACTIVE
,
TASK_TRIGGER_STAT_INACTIVE
);
if
(
fetchTriggerStat
==
TASK_TRIGGER_STAT_ACTIVE
)
{
smaDebug
(
"vgId:%d, %s:%d level %"
PRIi8
" stat is active for table suid:%"
PRIi64
,
SMA_VID
(
pSma
),
__func__
,
__LINE__
,
pItem
->
level
,
pItem
->
pRsmaInfo
->
suid
);
tdRefSmaStat
(
pSma
,
(
SSmaStat
*
)
pStat
);
SSDataBlock
dataBlock
=
{.
info
.
type
=
STREAM_GET_ALL
};
qSetStreamInput
(
pItem
->
taskInfo
,
&
dataBlock
,
STREAM_DATA_TYPE_SSDATA_BLOCK
,
false
);
tdFetchAndSubmitRSmaResult
(
pItem
,
STREAM_DATA_TYPE_SSDATA_BLOCK
);
tdFetchAndSubmitRSmaResult
(
pItem
,
STREAM_DATA_TYPE_SSDATA_BLOCK
);
tdUnRefSmaStat
(
pSma
,
(
SSmaStat
*
)
pStat
);
}
else
{
}
else
{
sma
Warn
(
"%s:%d THREAD:%"
PRIi64
" level %"
PRIi8
" status is inactive for tb suid:%"
PRIi64
,
__func__
,
__LINE
__
,
sma
Debug
(
"vgId:%d, %s:%d level %"
PRIi8
" stat is inactive for table suid:%"
PRIi64
,
SMA_VID
(
pSma
),
__func
__
,
taosGetSelfPthreadId
()
,
pItem
->
level
,
pItem
->
pRsmaInfo
->
suid
);
__LINE__
,
pItem
->
level
,
pItem
->
pRsmaInfo
->
suid
);
}
}
// taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, pItem->tmrHandle, &pItem->tmrId);
}
}
static
FORCE_INLINE
int32_t
tdExecuteRSmaImpl
(
SSma
*
pSma
,
const
void
*
pMsg
,
int32_t
inputType
,
SRSmaInfoItem
*
pItem
,
static
FORCE_INLINE
int32_t
tdExecuteRSmaImpl
(
SSma
*
pSma
,
const
void
*
pMsg
,
int32_t
inputType
,
SRSmaInfoItem
*
pItem
,
...
@@ -533,14 +567,17 @@ static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int3
...
@@ -533,14 +567,17 @@ static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int3
}
}
tdFetchAndSubmitRSmaResult
(
pItem
,
STREAM_DATA_TYPE_SUBMIT_BLOCK
);
tdFetchAndSubmitRSmaResult
(
pItem
,
STREAM_DATA_TYPE_SUBMIT_BLOCK
);
atomic_store_8
(
&
pItem
->
triggerStat
us
,
TASK_TRIGGER_STATUS_
_ACTIVE
);
atomic_store_8
(
&
pItem
->
triggerStat
,
TASK_TRIGGER_STAT
_ACTIVE
);
sma
Warn
(
"%s:%d THREAD:%"
PRIi64
" process rsma insert"
,
__func__
,
__LINE__
,
taosGetSelfPthreadId
()
);
sma
Debug
(
"vgId:%d, %s:%d process rsma insert"
,
SMA_VID
(
pSma
),
__func__
,
__LINE__
);
SSmaEnv
*
pEnv
=
SMA_RSMA_ENV
(
pSma
);
SSmaEnv
*
pEnv
=
SMA_RSMA_ENV
(
pSma
);
SRSmaStat
*
pStat
=
SMA_RSMA_STAT
(
pEnv
->
pStat
);
SRSmaStat
*
pStat
=
SMA_RSMA_STAT
(
pEnv
->
pStat
);
taosTmrStart
(
tdRSmaPersistTrigger
,
5000
,
pStat
,
pStat
->
tmrHandle
);
if
(
pStat
->
tmrHandle
)
{
taosTmrReset
(
tdRSmaFetchTrigger
,
pItem
->
maxDelay
,
pItem
,
pItem
->
tmrHandle
,
&
pItem
->
tmrId
);
taosTmrReset
(
tdRSmaFetchTrigger
,
pItem
->
maxDelay
,
pItem
,
pStat
->
tmrHandle
,
&
pItem
->
tmrId
);
}
else
{
ASSERT
(
0
);
}
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
...
@@ -552,10 +589,10 @@ static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb
...
@@ -552,10 +589,10 @@ static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
S
SmaStat
*
pStat
=
SMA_ENV_STAT
(
pEnv
);
S
RSmaStat
*
pStat
=
(
SRSmaStat
*
)
SMA_ENV_STAT
(
pEnv
);
SRSmaInfo
*
pRSmaInfo
=
NULL
;
SRSmaInfo
*
pRSmaInfo
=
NULL
;
pRSmaInfo
=
taosHashGet
(
SMA_
RSMA_INFO_HASH
(
pStat
),
&
suid
,
sizeof
(
tb_uid_t
));
pRSmaInfo
=
taosHashGet
(
RSMA_INFO_HASH
(
pStat
),
&
suid
,
sizeof
(
tb_uid_t
));
if
(
!
pRSmaInfo
||
!
(
pRSmaInfo
=
*
(
SRSmaInfo
**
)
pRSmaInfo
))
{
if
(
!
pRSmaInfo
||
!
(
pRSmaInfo
=
*
(
SRSmaInfo
**
)
pRSmaInfo
))
{
smaDebug
(
"vgId:%d, return as no rsma info for suid:%"
PRIu64
,
SMA_VID
(
pSma
),
suid
);
smaDebug
(
"vgId:%d, return as no rsma info for suid:%"
PRIu64
,
SMA_VID
(
pSma
),
suid
);
...
@@ -608,7 +645,7 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
...
@@ -608,7 +645,7 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
void
tdRSmaQTaskGetFName
(
int32_t
vid
,
int8_t
ftype
,
char
*
outputName
)
{
void
tdRSmaQTaskGetFName
(
int32_t
vid
,
int8_t
ftype
,
char
*
outputName
)
{
tdGetVndFileName
(
vid
,
"rsma"
,
tdQTaskInfoFname
[
ftype
],
outputName
);
tdGetVndFileName
(
vid
,
"rsma"
,
tdQTaskInfoFname
[
ftype
],
outputName
);
}
}
...
@@ -618,6 +655,23 @@ static void *tdRSmaPersistExec(void *param) {
...
@@ -618,6 +655,23 @@ static void *tdRSmaPersistExec(void *param) {
SSma
*
pSma
=
pRSmaStat
->
pSma
;
SSma
*
pSma
=
pRSmaStat
->
pSma
;
STfs
*
pTfs
=
pSma
->
pVnode
->
pTfs
;
STfs
*
pTfs
=
pSma
->
pVnode
->
pTfs
;
int64_t
toffset
=
0
;
int64_t
toffset
=
0
;
bool
isFileCreated
=
false
;
if
(
TASK_TRIGGER_STAT_CANCELLED
==
atomic_load_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
)))
{
goto
_end
;
}
#if 0
SArray *suidList = taosArrayInit(1, sizeof(tb_uid_t));
if (tsdbGetStbIdList(SMA_META(pSma), 0, suidList) < 0) {
ASSERT(0);
} else {
for (int32_t i = 0; i < taosArrayGetSize(suidList); ++i) {
tb_uid_t suid = *(tb_uid_t *)taosArrayGet(suidList, i);
smaDebug("suid [%d] is %" PRIi64, i, suid);
}
}
#endif
void
*
infoHash
=
taosHashIterate
(
RSMA_INFO_HASH
(
pRSmaStat
),
NULL
);
void
*
infoHash
=
taosHashIterate
(
RSMA_INFO_HASH
(
pRSmaStat
),
NULL
);
if
(
!
infoHash
)
{
if
(
!
infoHash
)
{
...
@@ -625,36 +679,81 @@ static void *tdRSmaPersistExec(void *param) {
...
@@ -625,36 +679,81 @@ static void *tdRSmaPersistExec(void *param) {
}
}
STFile
tFile
=
{
0
};
STFile
tFile
=
{
0
};
int32_t
vid
=
2
;
int32_t
vid
=
SMA_VID
(
pSma
);
char
qTaskInfoFName
[
TSDB_FILENAME_LEN
];
tdRSmaQTaskGetFName
(
vid
,
TD_QTASK_TMP_FILE
,
qTaskInfoFName
);
tdInitTFile
(
&
tFile
,
pTfs
,
qTaskInfoFName
);
tdCreateTFile
(
&
tFile
,
pTfs
,
true
,
-
1
);
while
(
infoHash
)
{
while
(
infoHash
)
{
SRSmaInfo
*
pRSmaInfo
=
*
(
SRSmaInfo
**
)
infoHash
;
SRSmaInfo
*
pRSmaInfo
=
*
(
SRSmaInfo
**
)
infoHash
;
#if 0
smaDebug("table %" PRIi64 " sleep 15s start ...", pRSmaInfo->items[0].pRsmaInfo->suid);
for (int32_t i = 15; i > 0; --i) {
taosSsleep(1);
smaDebug("table %" PRIi64 " countdown %d", pRSmaInfo->items[0].pRsmaInfo->suid, i);
}
smaDebug("table %" PRIi64 " sleep 15s end ...", pRSmaInfo->items[0].pRsmaInfo->suid);
#endif
for
(
int32_t
i
=
0
;
i
<
TSDB_RETENTION_L2
;
++
i
)
{
qTaskInfo_t
taskInfo
=
pRSmaInfo
->
items
[
i
].
taskInfo
;
if
(
!
taskInfo
)
{
smaDebug
(
"vgId:%d, table %"
PRIi64
" level %d qTaskInfo is NULL"
,
vid
,
pRSmaInfo
->
suid
,
i
+
1
);
continue
;
}
char
*
pOutput
=
NULL
;
char
*
pOutput
=
NULL
;
int32_t
len
=
0
;
int32_t
len
=
0
;
if
(
qSerializeTaskStatus
(
pRSmaInfo
->
items
[
0
].
taskInfo
,
&
pOutput
,
&
len
)
<
0
)
{
int8_t
type
=
0
;
smaError
(
"serialize rsma task for table %"
PRIi64
" failed since %s"
,
pRSmaInfo
->
items
[
0
].
pRsmaInfo
->
suid
,
if
(
qSerializeTaskStatus
(
taskInfo
,
&
pOutput
,
&
len
)
<
0
)
{
smaError
(
"vgId:%d, table %"
PRIi64
" level %d serialize rsma task failed since %s"
,
vid
,
pRSmaInfo
->
suid
,
i
+
1
,
terrstr
(
terrno
));
terrstr
(
terrno
));
goto
_err
;
}
else
{
}
else
{
smaWarn
(
"serialize rsma task for table %"
PRIi64
" success and len is %d"
,
pRSmaInfo
->
items
[
0
].
pRsmaInfo
->
suid
,
if
(
!
pOutput
)
{
len
);
smaDebug
(
"vgId:%d, table %"
PRIi64
" level %d serialize rsma task success but no output(len %d) and no need to persist"
,
vid
,
pRSmaInfo
->
suid
,
i
+
1
,
len
);
continue
;
}
else
if
(
len
<=
0
)
{
smaDebug
(
"vgId:%d, table %"
PRIi64
" level %d serialize rsma task success with len %d and no need to persist"
,
vid
,
pRSmaInfo
->
suid
,
i
+
1
,
len
);
taosMemoryFree
(
pOutput
);
}
smaDebug
(
"vgId:%d, table %"
PRIi64
" level %d serialize rsma task success with len %d and need persist"
,
vid
,
pRSmaInfo
->
suid
,
i
+
1
,
len
);
#if 1
if
(
qDeserializeTaskStatus
(
taskInfo
,
pOutput
,
len
)
<
0
)
{
smaError
(
"vgId:%d, table %"
PRIi64
"level %d deserialize rsma task failed since %s"
,
vid
,
pRSmaInfo
->
suid
,
i
+
1
,
terrstr
(
terrno
));
}
else
{
smaDebug
(
"vgId:%d, table %"
PRIi64
" level %d deserialize rsma task success"
,
vid
,
pRSmaInfo
->
suid
,
i
+
1
);
}
#endif
}
}
if
(
!
isFileCreated
)
{
char
qTaskInfoFName
[
TSDB_FILENAME_LEN
];
tdRSmaQTaskGetFName
(
vid
,
TD_QTASK_TMP_FILE
,
qTaskInfoFName
);
tdInitTFile
(
&
tFile
,
pTfs
,
qTaskInfoFName
);
tdCreateTFile
(
&
tFile
,
pTfs
,
true
,
-
1
);
isFileCreated
=
true
;
}
len
+=
(
sizeof
(
len
)
+
sizeof
(
pRSmaInfo
->
suid
));
tdAppendTFile
(
&
tFile
,
&
len
,
sizeof
(
len
),
&
toffset
);
tdAppendTFile
(
&
tFile
,
&
len
,
sizeof
(
len
),
&
toffset
);
tdAppendTFile
(
&
tFile
,
&
pRSmaInfo
->
suid
,
sizeof
(
pRSmaInfo
->
suid
),
&
toffset
);
tdAppendTFile
(
&
tFile
,
pOutput
,
len
,
&
toffset
);
tdAppendTFile
(
&
tFile
,
pOutput
,
len
,
&
toffset
);
taosMemoryFree
(
pOutput
);
taosMemoryFree
(
pOutput
);
}
infoHash
=
taosHashIterate
(
RSMA_INFO_HASH
(
pRSmaStat
),
infoHash
);
infoHash
=
taosHashIterate
(
RSMA_INFO_HASH
(
pRSmaStat
),
infoHash
);
}
}
_
end
:
_
normal
:
if
(
isFileCreated
)
{
if
(
tdUpdateTFileHeader
(
&
tFile
)
<
0
)
{
if
(
tdUpdateTFileHeader
(
&
tFile
)
<
0
)
{
smaError
(
"vgId:%d, failed to update tfile %s header since %s"
,
vid
,
TD_FILE_FULL_NAME
(
&
tFile
),
tstrerror
(
terrno
));
smaError
(
"vgId:%d, failed to update tfile %s header since %s"
,
vid
,
TD_FILE_FULL_NAME
(
&
tFile
),
tstrerror
(
terrno
));
tdCloseTFile
(
&
tFile
);
tdCloseTFile
(
&
tFile
);
tdRemoveTFile
(
&
tFile
);
tdRemoveTFile
(
&
tFile
);
return
NULL
;
goto
_err
;
}
else
{
smaDebug
(
"vgId:%d, succeed to update tfile %s header"
,
vid
,
TD_FILE_FULL_NAME
(
&
tFile
));
}
}
tdCloseTFile
(
&
tFile
);
tdCloseTFile
(
&
tFile
);
...
@@ -663,29 +762,59 @@ _end:
...
@@ -663,29 +762,59 @@ _end:
strncpy
(
newFName
,
TD_FILE_FULL_NAME
(
&
tFile
),
TSDB_FILENAME_LEN
);
strncpy
(
newFName
,
TD_FILE_FULL_NAME
(
&
tFile
),
TSDB_FILENAME_LEN
);
char
*
pos
=
strstr
(
newFName
,
tdQTaskInfoFname
[
TD_QTASK_TMP_FILE
]);
char
*
pos
=
strstr
(
newFName
,
tdQTaskInfoFname
[
TD_QTASK_TMP_FILE
]);
strncpy
(
pos
,
tdQTaskInfoFname
[
TD_QTASK_CUR_FILE
],
TSDB_FILENAME_LEN
-
POINTER_DISTANCE
(
pos
,
newFName
));
strncpy
(
pos
,
tdQTaskInfoFname
[
TD_QTASK_CUR_FILE
],
TSDB_FILENAME_LEN
-
POINTER_DISTANCE
(
pos
,
newFName
));
taosRenameFile
(
TD_FILE_FULL_NAME
(
&
tFile
),
newFName
);
if
(
taosRenameFile
(
TD_FILE_FULL_NAME
(
&
tFile
),
newFName
)
!=
0
)
{
smaError
(
"vgId:%d, failed to rename %s to %s"
,
vid
,
TD_FILE_FULL_NAME
(
&
tFile
),
newFName
);
atomic_store_8
(
&
pRSmaStat
->
tmrStat
,
TASK_TRIGGER_STATUS__ACTIVE
);
goto
_err
;
return
NULL
;
}
else
{
smaDebug
(
"vgId:%d, succeed to rename %s to %s"
,
vid
,
TD_FILE_FULL_NAME
(
&
tFile
),
newFName
);
}
}
goto
_end
;
_err:
_err:
atomic_store_8
(
&
pRSmaStat
->
tmrStat
,
TASK_TRIGGER_STATUS__ACTIVE
);
if
(
isFileCreated
)
{
// remove the .tmp file
tdRemoveTFile
(
&
tFile
);
}
_end:
if
(
TASK_TRIGGER_STAT_INACTIVE
==
atomic_val_compare_exchange_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
),
TASK_TRIGGER_STAT_INACTIVE
,
TASK_TRIGGER_STAT_ACTIVE
))
{
smaDebug
(
"vgId:%d, persist task is active again"
,
vid
);
}
else
if
(
TASK_TRIGGER_STAT_CANCELLED
==
atomic_val_compare_exchange_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
),
TASK_TRIGGER_STAT_CANCELLED
,
TASK_TRIGGER_STAT_FINISHED
))
{
smaDebug
(
"vgId:%d, persist task is cancelled"
,
vid
);
}
else
{
smaWarn
(
"vgId:%d, persist task in abnormal stat %"
PRIi8
,
vid
,
atomic_load_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
)));
ASSERT
(
0
);
}
atomic_store_8
(
RSMA_RUNNING_STAT
(
pRSmaStat
),
0
);
taosThreadExit
(
NULL
);
return
NULL
;
return
NULL
;
}
}
static
void
tdRSmaPersistTask
(
SRSmaStat
*
pRSmaStat
)
{
static
void
tdRSmaPersistTask
(
SRSmaStat
*
pRSmaStat
)
{
smaWarn
(
"%s:%d entry "
,
__func__
,
__LINE__
);
TdThread
threadId
;
TdThreadAttr
thAttr
;
TdThreadAttr
thAttr
;
taosThreadAttrInit
(
&
thAttr
);
taosThreadAttrInit
(
&
thAttr
);
taosThreadAttrSetDetachState
(
&
thAttr
,
PTHREAD_CREATE_DETACHED
);
taosThreadAttrSetDetachState
(
&
thAttr
,
PTHREAD_CREATE_DETACHED
);
TdThread
tid
;
if
(
taosThreadCreate
(
&
threadId
,
&
thAttr
,
tdRSmaPersistExec
,
pRSmaStat
)
!=
0
)
{
smaError
(
"failed to create thread to persist rsma qTaskInfo since %s"
,
strerror
(
errno
));
if
(
taosThreadCreate
(
&
tid
,
&
thAttr
,
tdRSmaPersistExec
,
pRSmaStat
)
!=
0
)
{
if
(
TASK_TRIGGER_STAT_INACTIVE
==
atomic_val_compare_exchange_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
),
TASK_TRIGGER_STAT_INACTIVE
,
TASK_TRIGGER_STAT_ACTIVE
))
{
smaDebug
(
"persist task is active again"
);
}
else
if
(
TASK_TRIGGER_STAT_CANCELLED
==
atomic_val_compare_exchange_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
),
TASK_TRIGGER_STAT_CANCELLED
,
TASK_TRIGGER_STAT_FINISHED
))
{
smaDebug
(
" persist task is cancelled and set finished"
);
}
else
{
smaWarn
(
"persist task in abnormal stat %"
PRIi8
,
atomic_load_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
)));
ASSERT
(
0
);
}
atomic_store_8
(
RSMA_RUNNING_STAT
(
pRSmaStat
),
0
);
}
}
taosThreadAttrDestroy
(
&
thAttr
);
taosThreadAttrDestroy
(
&
thAttr
);
smaWarn
(
"%s:%d end "
,
__func__
,
__LINE__
);
}
}
/**
/**
...
@@ -696,17 +825,33 @@ static void tdRSmaPersistTask(SRSmaStat *pRSmaStat) {
...
@@ -696,17 +825,33 @@ static void tdRSmaPersistTask(SRSmaStat *pRSmaStat) {
*/
*/
static
void
tdRSmaPersistTrigger
(
void
*
param
,
void
*
tmrId
)
{
static
void
tdRSmaPersistTrigger
(
void
*
param
,
void
*
tmrId
)
{
SRSmaStat
*
pRSmaStat
=
param
;
SRSmaStat
*
pRSmaStat
=
param
;
int8_t
tmrStat
=
if
(
atomic_load_8
(
&
pRSmaStat
->
tmrStat
)
==
TASK_TRIGGER_STATUS__ACTIVE
)
{
atomic_val_compare_exchange_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
),
TASK_TRIGGER_STAT_ACTIVE
,
TASK_TRIGGER_STAT_INACTIVE
);
smaWarn
(
"%s:%d THREAD:%"
PRIi64
" rsma persistence start since active"
,
__func__
,
__LINE__
,
taosGetSelfPthreadId
());
switch
(
tmrStat
)
{
atomic_store_8
(
&
pRSmaStat
->
tmrStat
,
TASK_TRIGGER_STATUS__IN_ACTIVE
);
case
TASK_TRIGGER_STAT_ACTIVE
:
{
atomic_store_8
(
RSMA_RUNNING_STAT
(
pRSmaStat
),
1
);
// execution
if
(
TASK_TRIGGER_STAT_CANCELLED
!=
atomic_val_compare_exchange_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
),
TASK_TRIGGER_STAT_CANCELLED
,
TASK_TRIGGER_STAT_FINISHED
))
{
smaDebug
(
"%s:%d rsma persistence start since active"
,
__func__
,
__LINE__
);
tdRSmaPersistTask
(
pRSmaStat
);
tdRSmaPersistTask
(
pRSmaStat
);
taosTmrReset
(
tdRSmaPersistTrigger
,
RSMA_QTASK_PERSIST_MS
,
pRSmaStat
,
pRSmaStat
->
tmrHandle
,
&
pRSmaStat
->
tmrId
);
}
else
{
}
else
{
smaWarn
(
"%s:%d THREAD:%"
PRIi64
" rsma persistence not start since inactive"
,
__func__
,
__LINE__
,
atomic_store_8
(
RSMA_RUNNING_STAT
(
pRSmaStat
),
0
);
taosGetSelfPthreadId
());
}
}
break
;
case
TASK_TRIGGER_STAT_CANCELLED
:
{
atomic_store_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
),
TASK_TRIGGER_STAT_FINISHED
);
smaDebug
(
"%s:%d rsma persistence not start since cancelled and finished"
,
__func__
,
__LINE__
);
}
break
;
case
TASK_TRIGGER_STAT_INACTIVE
:
{
smaDebug
(
"%s:%d rsma persistence not start since inactive"
,
__func__
,
__LINE__
);
}
break
;
case
TASK_TRIGGER_STAT_INIT
:
{
smaDebug
(
"%s:%d rsma persistence not start since init"
,
__func__
,
__LINE__
);
}
break
;
default:
{
smaWarn
(
"%s:%d rsma persistence not start since unknown stat %"
PRIi8
,
__func__
,
__LINE__
,
tmrStat
);
}
break
;
}
}
taosTmrReset
(
tdRSmaPersistTrigger
,
3600000
,
pRSmaStat
,
pRSmaStat
->
tmrHandle
,
&
pRSmaStat
->
tmrId
);
}
}
\ No newline at end of file
source/dnode/vnode/src/sma/smaUtil.c
浏览文件 @
0597f3b9
...
@@ -15,6 +15,8 @@
...
@@ -15,6 +15,8 @@
#include "sma.h"
#include "sma.h"
// smaFileUtil ================
#define TD_FILE_HEAD_SIZE 512
#define TD_FILE_HEAD_SIZE 512
#define TD_FILE_STATE_OK 0
#define TD_FILE_STATE_OK 0
...
@@ -32,7 +34,7 @@ static int32_t tdEncodeTFInfo(void **buf, STFInfo *pInfo) {
...
@@ -32,7 +34,7 @@ static int32_t tdEncodeTFInfo(void **buf, STFInfo *pInfo) {
tlen
+=
taosEncodeFixedU32
(
buf
,
pInfo
->
magic
);
tlen
+=
taosEncodeFixedU32
(
buf
,
pInfo
->
magic
);
tlen
+=
taosEncodeFixedU32
(
buf
,
pInfo
->
ftype
);
tlen
+=
taosEncodeFixedU32
(
buf
,
pInfo
->
ftype
);
tlen
+=
taosEncodeFixedU32
(
buf
,
pInfo
->
fver
);
tlen
+=
taosEncodeFixedU32
(
buf
,
pInfo
->
fver
);
tlen
+=
taosEncodeFixed
U
64
(
buf
,
pInfo
->
fsize
);
tlen
+=
taosEncodeFixed
I
64
(
buf
,
pInfo
->
fsize
);
return
tlen
;
return
tlen
;
}
}
...
@@ -41,7 +43,7 @@ static void *tdDecodeTFInfo(void *buf, STFInfo *pInfo) {
...
@@ -41,7 +43,7 @@ static void *tdDecodeTFInfo(void *buf, STFInfo *pInfo) {
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pInfo
->
magic
));
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pInfo
->
magic
));
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pInfo
->
ftype
));
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pInfo
->
ftype
));
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pInfo
->
fver
));
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pInfo
->
fver
));
buf
=
taosDecodeFixed
U
64
(
buf
,
&
(
pInfo
->
fsize
));
buf
=
taosDecodeFixed
I
64
(
buf
,
&
(
pInfo
->
fsize
));
return
buf
;
return
buf
;
}
}
...
@@ -236,3 +238,6 @@ int32_t tdCreateTFile(STFile *pTFile, STfs *pTfs, bool updateHeader, int8_t fTyp
...
@@ -236,3 +238,6 @@ int32_t tdCreateTFile(STFile *pTFile, STfs *pTfs, bool updateHeader, int8_t fTyp
}
}
int32_t
tdRemoveTFile
(
STFile
*
pTFile
)
{
return
tfsRemoveFile
(
TD_FILE_F
(
pTFile
));
}
int32_t
tdRemoveTFile
(
STFile
*
pTFile
)
{
return
tfsRemoveFile
(
TD_FILE_F
(
pTFile
));
}
// smaXXXUtil ================
// ...
\ No newline at end of file
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
0597f3b9
...
@@ -2876,6 +2876,30 @@ int32_t tsdbGetCtbIdList(SMeta* pMeta, int64_t suid, SArray* list) {
...
@@ -2876,6 +2876,30 @@ int32_t tsdbGetCtbIdList(SMeta* pMeta, int64_t suid, SArray* list) {
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
/**
* @brief Get all suids since suid
*
* @param pMeta
* @param suid return all suids in one vnode if suid is 0
* @param list
* @return int32_t
*/
int32_t
tsdbGetStbIdList
(
SMeta
*
pMeta
,
int64_t
suid
,
SArray
*
list
)
{
SMStbCursor
*
pCur
=
metaOpenStbCursor
(
pMeta
,
suid
);
while
(
1
)
{
tb_uid_t
id
=
metaStbCursorNext
(
pCur
);
if
(
id
==
0
)
{
break
;
}
taosArrayPush
(
list
,
&
id
);
}
metaCloseStbCursor
(
pCur
);
return
TSDB_CODE_SUCCESS
;
}
static
void
destroyHelper
(
void
*
param
)
{
static
void
destroyHelper
(
void
*
param
)
{
if
(
param
==
NULL
)
{
if
(
param
==
NULL
)
{
return
;
return
;
...
...
source/dnode/vnode/src/vnd/vnodeCommit.c
浏览文件 @
0597f3b9
...
@@ -72,7 +72,12 @@ int vnodeBegin(SVnode *pVnode) {
...
@@ -72,7 +72,12 @@ int vnodeBegin(SVnode *pVnode) {
return
0
;
return
0
;
}
}
int
vnodeShouldCommit
(
SVnode
*
pVnode
)
{
return
pVnode
->
inUse
->
size
>
pVnode
->
config
.
szBuf
/
3
;
}
int
vnodeShouldCommit
(
SVnode
*
pVnode
)
{
if
(
pVnode
->
inUse
)
{
return
pVnode
->
inUse
->
size
>
pVnode
->
config
.
szBuf
/
3
;
}
return
false
;
}
int
vnodeSaveInfo
(
const
char
*
dir
,
const
SVnodeInfo
*
pInfo
)
{
int
vnodeSaveInfo
(
const
char
*
dir
,
const
SVnodeInfo
*
pInfo
)
{
char
fname
[
TSDB_FILENAME_LEN
];
char
fname
[
TSDB_FILENAME_LEN
];
...
...
source/dnode/vnode/src/vnd/vnodeOpen.c
浏览文件 @
0597f3b9
...
@@ -152,12 +152,13 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
...
@@ -152,12 +152,13 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
return
pVnode
;
return
pVnode
;
_err:
_err:
if
(
pVnode
->
pSma
)
smaClose
(
pVnode
->
pSma
);
if
(
pVnode
->
pQuery
)
vnodeQueryClose
(
pVnode
);
if
(
pVnode
->
pQuery
)
vnodeQueryClose
(
pVnode
);
if
(
pVnode
->
pTq
)
tqClose
(
pVnode
->
pTq
);
if
(
pVnode
->
pTq
)
tqClose
(
pVnode
->
pTq
);
if
(
pVnode
->
pWal
)
walClose
(
pVnode
->
pWal
);
if
(
pVnode
->
pWal
)
walClose
(
pVnode
->
pWal
);
if
(
pVnode
->
pTsdb
)
tsdbClose
(
&
pVnode
->
pTsdb
);
if
(
pVnode
->
pTsdb
)
tsdbClose
(
&
pVnode
->
pTsdb
);
if
(
pVnode
->
pMeta
)
metaClose
(
pVnode
->
pMeta
);
if
(
pVnode
->
pMeta
)
metaClose
(
pVnode
->
pMeta
);
if
(
pVnode
->
pSma
)
smaClose
(
pVnode
->
pSma
);
tsem_destroy
(
&
(
pVnode
->
canCommit
));
tsem_destroy
(
&
(
pVnode
->
canCommit
));
taosMemoryFree
(
pVnode
);
taosMemoryFree
(
pVnode
);
...
@@ -166,13 +167,14 @@ _err:
...
@@ -166,13 +167,14 @@ _err:
void
vnodeClose
(
SVnode
*
pVnode
)
{
void
vnodeClose
(
SVnode
*
pVnode
)
{
if
(
pVnode
)
{
if
(
pVnode
)
{
smaCloseEnv
(
pVnode
->
pSma
);
vnodeCommit
(
pVnode
);
vnodeCommit
(
pVnode
);
vnodeSyncClose
(
pVnode
);
vnodeSyncClose
(
pVnode
);
vnodeQueryClose
(
pVnode
);
vnodeQueryClose
(
pVnode
);
walClose
(
pVnode
->
pWal
);
walClose
(
pVnode
->
pWal
);
smaCloseEx
(
pVnode
->
pSma
);
tqClose
(
pVnode
->
pTq
);
tqClose
(
pVnode
->
pTq
);
if
(
pVnode
->
pTsdb
)
tsdbClose
(
&
pVnode
->
pTsdb
);
if
(
pVnode
->
pTsdb
)
tsdbClose
(
&
pVnode
->
pTsdb
);
smaClose
(
pVnode
->
pSma
);
metaClose
(
pVnode
->
pMeta
);
metaClose
(
pVnode
->
pMeta
);
vnodeCloseBufPool
(
pVnode
);
vnodeCloseBufPool
(
pVnode
);
// destroy handle
// destroy handle
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
0597f3b9
...
@@ -808,6 +808,7 @@ int32_t getMaximumIdleDurationSec();
...
@@ -808,6 +808,7 @@ int32_t getMaximumIdleDurationSec();
* ops: root operator
* ops: root operator
* data: *data save the result of encode, need to be freed by caller
* data: *data save the result of encode, need to be freed by caller
* length: *length save the length of *data
* length: *length save the length of *data
* nOptrWithVal: *nOptrWithVal save the number of optr with value
* return: result code, 0 means success
* return: result code, 0 means success
*/
*/
int32_t
encodeOperator
(
SOperatorInfo
*
ops
,
char
**
data
,
int32_t
*
length
);
int32_t
encodeOperator
(
SOperatorInfo
*
ops
,
char
**
data
,
int32_t
*
length
);
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
0597f3b9
...
@@ -2929,6 +2929,13 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len
...
@@ -2929,6 +2929,13 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len
int32_t
totalSize
=
int32_t
totalSize
=
sizeof
(
int32_t
)
+
sizeof
(
int32_t
)
+
size
*
(
sizeof
(
int32_t
)
+
keyLen
+
sizeof
(
int32_t
)
+
pSup
->
resultRowSize
);
sizeof
(
int32_t
)
+
sizeof
(
int32_t
)
+
size
*
(
sizeof
(
int32_t
)
+
keyLen
+
sizeof
(
int32_t
)
+
pSup
->
resultRowSize
);
// no result
if
(
getTotalBufSize
(
pSup
->
pResultBuf
)
==
0
)
{
*
result
=
NULL
;
*
length
=
0
;
return
TSDB_CODE_SUCCESS
;
}
*
result
=
(
char
*
)
taosMemoryCalloc
(
1
,
totalSize
);
*
result
=
(
char
*
)
taosMemoryCalloc
(
1
,
totalSize
);
if
(
*
result
==
NULL
)
{
if
(
*
result
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
return
TSDB_CODE_OUT_OF_MEMORY
;
...
@@ -4483,6 +4490,8 @@ int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length) {
...
@@ -4483,6 +4490,8 @@ int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length) {
return
code
;
return
code
;
}
}
ASSERT
(
currLength
>=
0
);
if
(
*
result
==
NULL
)
{
if
(
*
result
==
NULL
)
{
*
result
=
(
char
*
)
taosMemoryCalloc
(
1
,
currLength
+
sizeof
(
int32_t
));
*
result
=
(
char
*
)
taosMemoryCalloc
(
1
,
currLength
+
sizeof
(
int32_t
));
if
(
*
result
==
NULL
)
{
if
(
*
result
==
NULL
)
{
...
@@ -4507,7 +4516,6 @@ int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length) {
...
@@ -4507,7 +4516,6 @@ int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length) {
taosMemoryFree
(
pCurrent
);
taosMemoryFree
(
pCurrent
);
*
length
=
*
(
int32_t
*
)(
*
result
);
*
length
=
*
(
int32_t
*
)(
*
result
);
}
}
for
(
int32_t
i
=
0
;
i
<
ops
->
numOfDownstream
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
ops
->
numOfDownstream
;
++
i
)
{
code
=
encodeOperator
(
ops
->
pDownstream
[
i
],
result
,
length
);
code
=
encodeOperator
(
ops
->
pDownstream
[
i
],
result
,
length
);
if
(
code
!=
TDB_CODE_SUCCESS
)
{
if
(
code
!=
TDB_CODE_SUCCESS
)
{
...
...
source/os/src/osFile.c
浏览文件 @
0597f3b9
...
@@ -157,14 +157,14 @@ int32_t taosRenameFile(const char *oldName, const char *newName) {
...
@@ -157,14 +157,14 @@ int32_t taosRenameFile(const char *oldName, const char *newName) {
#ifdef WINDOWS
#ifdef WINDOWS
bool
code
=
MoveFileEx
(
oldName
,
newName
,
MOVEFILE_REPLACE_EXISTING
|
MOVEFILE_COPY_ALLOWED
);
bool
code
=
MoveFileEx
(
oldName
,
newName
,
MOVEFILE_REPLACE_EXISTING
|
MOVEFILE_COPY_ALLOWED
);
if
(
!
code
)
{
if
(
!
code
)
{
printf
(
"failed to rename file %s to %s, reason:%s"
,
oldName
,
newName
,
strerror
(
errno
));
printf
(
"failed to rename file %s to %s, reason:%s
\n
"
,
oldName
,
newName
,
strerror
(
errno
));
}
}
return
!
code
;
return
!
code
;
#else
#else
int32_t
code
=
rename
(
oldName
,
newName
);
int32_t
code
=
rename
(
oldName
,
newName
);
if
(
code
<
0
)
{
if
(
code
<
0
)
{
printf
(
"failed to rename file %s to %s, reason:%s"
,
oldName
,
newName
,
strerror
(
errno
));
printf
(
"failed to rename file %s to %s, reason:%s
\n
"
,
oldName
,
newName
,
strerror
(
errno
));
}
}
return
code
;
return
code
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录