Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
1547d8da
T
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
1547d8da
编写于
8月 08, 2022
作者:
C
Cary Xu
提交者:
GitHub
8月 08, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #15817 from taosdata/feature/TD-11274-3.0
fix: tsma query data
上级
4f66c427
c2c7ccf7
变更
11
隐藏空白更改
内联
并排
Showing
11 changed file
with
291 addition
and
195 deletion
+291
-195
include/common/tmsg.h
include/common/tmsg.h
+28
-0
include/util/tlockfree.h
include/util/tlockfree.h
+3
-3
source/dnode/mnode/impl/src/mndSma.c
source/dnode/mnode/impl/src/mndSma.c
+4
-1
source/dnode/vnode/src/inc/sma.h
source/dnode/vnode/src/inc/sma.h
+15
-11
source/dnode/vnode/src/meta/metaTable.c
source/dnode/vnode/src/meta/metaTable.c
+2
-0
source/dnode/vnode/src/sma/smaCommit.c
source/dnode/vnode/src/sma/smaCommit.c
+50
-13
source/dnode/vnode/src/sma/smaEnv.c
source/dnode/vnode/src/sma/smaEnv.c
+2
-5
source/dnode/vnode/src/sma/smaRollup.c
source/dnode/vnode/src/sma/smaRollup.c
+151
-128
source/dnode/vnode/src/sma/smaTimeRange.c
source/dnode/vnode/src/sma/smaTimeRange.c
+9
-1
source/dnode/vnode/src/sma/smaUtil.c
source/dnode/vnode/src/sma/smaUtil.c
+26
-32
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+1
-1
未找到文件。
include/common/tmsg.h
浏览文件 @
1547d8da
...
...
@@ -2657,6 +2657,34 @@ typedef struct {
SEpSet
epSet
;
}
SVgEpSet
;
typedef
struct
{
int64_t
refId
;
int64_t
suid
;
int8_t
level
;
}
SRSmaFetchMsg
;
static
FORCE_INLINE
int32_t
tEncodeSRSmaFetchMsg
(
SEncoder
*
pCoder
,
const
SRSmaFetchMsg
*
pReq
)
{
if
(
tStartEncode
(
pCoder
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pCoder
,
pReq
->
refId
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pCoder
,
pReq
->
suid
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pCoder
,
pReq
->
level
)
<
0
)
return
-
1
;
tEndEncode
(
pCoder
);
return
0
;
}
static
FORCE_INLINE
int32_t
tDecodeSRSmaFetchMsg
(
SDecoder
*
pCoder
,
SRSmaFetchMsg
*
pReq
)
{
if
(
tStartDecode
(
pCoder
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pCoder
,
&
pReq
->
refId
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pCoder
,
&
pReq
->
suid
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pCoder
,
&
pReq
->
level
)
<
0
)
return
-
1
;
tEndDecode
(
pCoder
);
return
0
;
}
typedef
struct
{
int8_t
version
;
// for compatibility(default 0)
int8_t
intervalUnit
;
// MACRO: TIME_UNIT_XXX
...
...
include/util/tlockfree.h
浏览文件 @
1547d8da
...
...
@@ -25,9 +25,9 @@ extern "C" {
// reference counting
typedef
void
(
*
_ref_fn_t
)(
const
void
*
pObj
);
#define T_REF_DECLARE() \
struct { \
int32_t val;
\
#define T_REF_DECLARE()
\
struct {
\
volatile int32_t val;
\
} _ref;
#define T_REF_REGISTER_FUNC(s, e) \
...
...
source/dnode/mnode/impl/src/mndSma.c
浏览文件 @
1547d8da
...
...
@@ -489,7 +489,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
smaObj
.
uid
=
mndGenerateUid
(
pCreate
->
name
,
TSDB_TABLE_FNAME_LEN
);
ASSERT
(
smaObj
.
uid
!=
0
);
char
resultTbName
[
TSDB_TABLE_FNAME_LEN
+
16
]
=
{
0
};
snprintf
(
resultTbName
,
TSDB_TABLE_FNAME_LEN
+
16
,
"
td.tsma.rst.tb.%s"
,
pCreate
->
name
);
snprintf
(
resultTbName
,
TSDB_TABLE_FNAME_LEN
+
16
,
"
%s_td_tsma_rst_tb"
,
pCreate
->
name
);
memcpy
(
smaObj
.
dstTbName
,
resultTbName
,
TSDB_TABLE_FNAME_LEN
);
smaObj
.
dstTbUid
=
mndGenerateUid
(
smaObj
.
dstTbName
,
TSDB_TABLE_FNAME_LEN
);
smaObj
.
stbUid
=
pStb
->
uid
;
...
...
@@ -603,6 +603,9 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
if
(
mndPersistStream
(
pMnode
,
pTrans
,
&
streamObj
)
!=
0
)
goto
_OVER
;
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
mDebug
(
"mndSma: create sma index %s %"
PRIi64
" on stb:%"
PRIi64
", dstSuid:%"
PRIi64
" dstTb:%s dstVg:%d"
,
pCreate
->
name
,
smaObj
.
uid
,
smaObj
.
stbUid
,
smaObj
.
dstTbUid
,
smaObj
.
dstTbName
,
smaObj
.
dstVgId
);
code
=
0
;
_OVER:
...
...
source/dnode/vnode/src/inc/sma.h
浏览文件 @
1547d8da
...
...
@@ -115,24 +115,29 @@ struct SSmaStat {
#define RSMA_FS_LOCK(r) (&(r)->lock)
struct
SRSmaInfoItem
{
void
*
taskInfo
;
// qTaskInfo_t
int64_t
refId
;
tmr_h
tmrId
;
int32_t
maxDelay
;
int8_t
level
;
int8_t
triggerStat
;
int32_t
maxDelay
;
tmr_h
tmrId
;
};
struct
SRSmaInfo
{
STSchema
*
pTSchema
;
int64_t
suid
;
int64_t
refId
;
// refId of SRSmaStat
int8_t
delFlag
;
T_REF_DECLARE
()
SRSmaInfoItem
items
[
TSDB_RETENTION_L2
];
void
*
taskInfo
[
TSDB_RETENTION_L2
];
// qTaskInfo_t
void
*
iTaskInfo
[
TSDB_RETENTION_L2
];
// immutable
};
#define RSMA_INFO_HEAD_LEN 24
#define RSMA_INFO_IS_DEL(r) ((r)->delFlag == 1)
#define RSMA_INFO_SET_DEL(r) ((r)->delFlag = 1)
#define RSMA_INFO_HEAD_LEN 32
#define RSMA_INFO_IS_DEL(r) ((r)->delFlag == 1)
#define RSMA_INFO_SET_DEL(r) ((r)->delFlag = 1)
#define RSMA_INFO_QTASK(r, i) ((r)->taskInfo[i])
#define RSMA_INFO_IQTASK(r, i) ((r)->iTaskInfo[i])
#define RSMA_INFO_ITEM(r, i) (&(r)->items[i])
enum
{
TASK_TRIGGER_STAT_INIT
=
0
,
...
...
@@ -168,8 +173,8 @@ int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat);
int32_t
tdRefRSmaInfo
(
SSma
*
pSma
,
SRSmaInfo
*
pRSmaInfo
);
int32_t
tdUnRefRSmaInfo
(
SSma
*
pSma
,
SRSmaInfo
*
pRSmaInfo
);
void
*
tdAcquireSmaRef
(
int32_t
rsetId
,
int64_t
refId
,
const
char
*
tags
,
int32_t
ln
);
int32_t
tdReleaseSmaRef
(
int32_t
rsetId
,
int64_t
refId
,
const
char
*
tags
,
int32_t
ln
);
void
*
tdAcquireSmaRef
(
int32_t
rsetId
,
int64_t
refId
);
int32_t
tdReleaseSmaRef
(
int32_t
rsetId
,
int64_t
refId
);
int32_t
tdCheckAndInitSmaEnv
(
SSma
*
pSma
,
int8_t
smaType
);
...
...
@@ -223,12 +228,11 @@ static FORCE_INLINE void tdSmaStatSetDropped(STSmaStat *pTStat) {
void
tdRSmaQTaskInfoGetFileName
(
int32_t
vid
,
int64_t
version
,
char
*
outputName
);
void
tdRSmaQTaskInfoGetFullName
(
int32_t
vid
,
int64_t
version
,
const
char
*
path
,
char
*
outputName
);
int32_t
tdCloneRSmaInfo
(
SSma
*
pSma
,
SRSmaInfo
*
pDest
,
SRSmaInfo
*
pSrc
);
int32_t
tdCloneRSmaInfo
(
SSma
*
pSma
,
SRSmaInfo
*
*
pDest
,
SRSmaInfo
*
pSrc
);
void
tdFreeQTaskInfo
(
qTaskInfo_t
*
taskHandle
,
int32_t
vgId
,
int32_t
level
);
static
int32_t
tdDestroySmaState
(
SSmaStat
*
pSmaStat
,
int8_t
smaType
);
void
*
tdFreeSmaState
(
SSmaStat
*
pSmaStat
,
int8_t
smaType
);
void
*
tdFreeRSmaInfo
(
SSma
*
pSma
,
SRSmaInfo
*
pInfo
,
bool
isDeepFree
);
void
tdRemoveRSmaInfoBySuid
(
SSma
*
pSma
,
int64_t
suid
);
int32_t
tdRSmaPersistExecImpl
(
SRSmaStat
*
pRSmaStat
,
SHashObj
*
pInfoHash
);
int32_t
tdProcessRSmaCreateImpl
(
SSma
*
pSma
,
SRSmaParam
*
param
,
int64_t
suid
,
const
char
*
tbName
);
...
...
source/dnode/vnode/src/meta/metaTable.c
浏览文件 @
1547d8da
...
...
@@ -381,6 +381,8 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) {
terrno
=
TSDB_CODE_TDB_TABLE_ALREADY_EXIST
;
metaReaderClear
(
&
mr
);
return
-
1
;
}
else
if
(
terrno
==
TSDB_CODE_PAR_TABLE_NOT_EXIST
)
{
terrno
=
TSDB_CODE_SUCCESS
;
}
metaReaderClear
(
&
mr
);
...
...
source/dnode/vnode/src/sma/smaCommit.c
浏览文件 @
1547d8da
...
...
@@ -308,12 +308,12 @@ static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma) {
* @return int32_t
*/
static
int32_t
tdProcessRSmaAsyncPreCommitImpl
(
SSma
*
pSma
)
{
SSmaEnv
*
p
Sma
Env
=
SMA_RSMA_ENV
(
pSma
);
if
(
!
p
Sma
Env
)
{
SSmaEnv
*
pEnv
=
SMA_RSMA_ENV
(
pSma
);
if
(
!
pEnv
)
{
return
TSDB_CODE_SUCCESS
;
}
SSmaStat
*
pStat
=
SMA_ENV_STAT
(
p
Sma
Env
);
SSmaStat
*
pStat
=
SMA_ENV_STAT
(
pEnv
);
SRSmaStat
*
pRSmaStat
=
SMA_RSMA_STAT
(
pStat
);
// step 1: set rsma stat
...
...
@@ -337,18 +337,26 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
}
// step 3: swap rsmaInfoHash and iRsmaInfoHash
ASSERT
(
!
RSMA_IMU_INFO_HASH
(
pRSmaStat
));
// lock
taosWLockLatch
(
SMA_ENV_LOCK
(
pEnv
));
ASSERT
(
RSMA_INFO_HASH
(
pRSmaStat
));
ASSERT
(
!
RSMA_IMU_INFO_HASH
(
pRSmaStat
));
RSMA_IMU_INFO_HASH
(
pRSmaStat
)
=
RSMA_INFO_HASH
(
pRSmaStat
);
RSMA_INFO_HASH
(
pRSmaStat
)
=
taosHashInit
(
RSMA_TASK_INFO_HASH_SLOT
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
HASH_ENTRY_LOCK
);
if
(
!
RSMA_INFO_HASH
(
pRSmaStat
))
{
// unlock
taosWUnLockLatch
(
SMA_ENV_LOCK
(
pEnv
));
smaError
(
"vgId:%d, rsma async commit failed since %s"
,
SMA_VID
(
pSma
),
terrstr
());
return
TSDB_CODE_FAILED
;
}
// unlock
taosWUnLockLatch
(
SMA_ENV_LOCK
(
pEnv
));
// step 4: others
pRSmaStat
->
commitAppliedVer
=
pSma
->
pVnode
->
state
.
applied
;
...
...
@@ -383,26 +391,52 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) {
* @return int32_t
*/
static
int32_t
tdProcessRSmaAsyncPostCommitImpl
(
SSma
*
pSma
)
{
SSmaEnv
*
p
Sma
Env
=
SMA_RSMA_ENV
(
pSma
);
if
(
!
p
Sma
Env
)
{
SSmaEnv
*
pEnv
=
SMA_RSMA_ENV
(
pSma
);
if
(
!
pEnv
)
{
return
TSDB_CODE_SUCCESS
;
}
SSmaStat
*
pStat
=
SMA_ENV_STAT
(
p
Sma
Env
);
SSmaStat
*
pStat
=
SMA_ENV_STAT
(
pEnv
);
SRSmaStat
*
pRSmaStat
=
SMA_RSMA_STAT
(
pStat
);
// step 1: merge rsmaInfoHash and iRsmaInfoHash
taosWLockLatch
(
SMA_ENV_LOCK
(
pSmaEnv
));
// lock
taosWLockLatch
(
SMA_ENV_LOCK
(
pEnv
));
#if 0
if (taosHashGetSize(RSMA_INFO_HASH(pRSmaStat)) <= 0) {
// TODO: optimization - just switch the hash pointer if rsmaInfoHash is empty
}
// just switch the hash pointer if rsmaInfoHash is empty
if (taosHashGetSize(RSMA_IMU_INFO_HASH(pRSmaStat)) > 0) {
SHashObj *infoHash = RSMA_INFO_HASH(pRSmaStat);
RSMA_INFO_HASH(pRSmaStat) = RSMA_IMU_INFO_HASH(pRSmaStat);
RSMA_IMU_INFO_HASH(pRSmaStat) = infoHash;
}
} else {
#endif
#if 1
void
*
pIter
=
taosHashIterate
(
RSMA_IMU_INFO_HASH
(
pRSmaStat
),
NULL
);
while
(
pIter
)
{
tb_uid_t
*
pSuid
=
(
tb_uid_t
*
)
taosHashGetKey
(
pIter
,
NULL
);
if
(
!
taosHashGet
(
RSMA_INFO_HASH
(
pRSmaStat
),
pSuid
,
sizeof
(
tb_uid_t
)))
{
SRSmaInfo
*
pRSmaInfo
=
*
(
SRSmaInfo
**
)
pIter
;
if
(
RSMA_INFO_IS_DEL
(
pRSmaInfo
))
{
int32_t
refVal
=
T_REF_VAL_GET
(
pRSmaInfo
);
if
(
refVal
==
0
)
{
tdFreeRSmaInfo
(
pSma
,
pRSmaInfo
,
true
);
smaDebug
(
"vgId:%d, rsma async post commit, free rsma info since already deleted and ref is 0 for "
"table:%"
PRIi64
,
SMA_VID
(
pSma
),
*
pSuid
);
}
else
{
smaDebug
(
"vgId:%d, rsma async post commit, not free rsma info since ref is %d although already deleted for "
"table:%"
PRIi64
,
SMA_VID
(
pSma
),
refVal
,
*
pSuid
);
}
pIter
=
taosHashIterate
(
RSMA_IMU_INFO_HASH
(
pRSmaStat
),
pIter
);
continue
;
}
taosHashPut
(
RSMA_INFO_HASH
(
pRSmaStat
),
pSuid
,
sizeof
(
tb_uid_t
),
pIter
,
sizeof
(
pIter
));
smaDebug
(
"vgId:%d, rsma async post commit, migrated from iRsmaInfoHash for table:%"
PRIi64
,
SMA_VID
(
pSma
),
*
pSuid
);
...
...
@@ -416,11 +450,14 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
pIter
=
taosHashIterate
(
RSMA_IMU_INFO_HASH
(
pRSmaStat
),
pIter
);
}
#endif
// }
taosHashCleanup
(
RSMA_IMU_INFO_HASH
(
pRSmaStat
));
RSMA_IMU_INFO_HASH
(
pRSmaStat
)
=
NULL
;
taosWUnLockLatch
(
SMA_ENV_LOCK
(
pSmaEnv
));
// unlock
taosWUnLockLatch
(
SMA_ENV_LOCK
(
pEnv
));
// step 2: cleanup outdated qtaskinfo files
tdCleanupQTaskInfoFiles
(
pSma
,
pRSmaStat
);
...
...
source/dnode/vnode/src/sma/smaEnv.c
浏览文件 @
1547d8da
...
...
@@ -17,7 +17,7 @@
typedef
struct
SSmaStat
SSmaStat
;
#define SMA_MGMT_REF_NUM
10240
#define SMA_MGMT_REF_NUM 10240
extern
SSmaMgmt
smaMgmt
;
...
...
@@ -171,7 +171,7 @@ int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) {
int32_t
tdRefRSmaInfo
(
SSma
*
pSma
,
SRSmaInfo
*
pRSmaInfo
)
{
if
(
!
pRSmaInfo
)
return
0
;
int
ref
=
T_REF_INC
(
pRSmaInfo
);
smaDebug
(
"vgId:%d, ref rsma info:%p, val:%d"
,
SMA_VID
(
pSma
),
pRSmaInfo
,
ref
);
return
0
;
...
...
@@ -183,9 +183,6 @@ int32_t tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
int
ref
=
T_REF_DEC
(
pRSmaInfo
);
smaDebug
(
"vgId:%d, unref rsma info:%p, val:%d"
,
SMA_VID
(
pSma
),
pRSmaInfo
,
ref
);
if
(
ref
==
0
)
{
tdRemoveRSmaInfoBySuid
(
pSma
,
pRSmaInfo
->
suid
);
}
return
0
;
}
...
...
source/dnode/vnode/src/sma/smaRollup.c
浏览文件 @
1547d8da
...
...
@@ -32,12 +32,14 @@ static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *ui
static
int32_t
tdUpdateTbUidListImpl
(
SSma
*
pSma
,
tb_uid_t
*
suid
,
SArray
*
tbUids
);
static
int32_t
tdSetRSmaInfoItemParams
(
SSma
*
pSma
,
SRSmaParam
*
param
,
SRSmaStat
*
pStat
,
SRSmaInfo
*
pRSmaInfo
,
int8_t
idx
);
static
int32_t
tdExecuteRSmaImpl
(
SSma
*
pSma
,
const
void
*
pMsg
,
int32_t
inputType
,
SRSmaInfoItem
*
rsmaItem
,
STSchema
*
pTSchema
,
tb_uid_t
suid
,
int8_t
level
);
static
SRSmaInfo
*
tdGetRSmaInfoBySuid
(
SSma
*
pSma
,
int64_t
suid
);
static
int32_t
tdRSmaFetchAndSubmitResult
(
SRSmaInfoItem
*
pItem
,
STSchema
*
pTSchema
,
int64_t
suid
,
SRSmaStat
*
pStat
,
int8_t
blkType
);
static
void
tdRSmaFetchTrigger
(
void
*
param
,
void
*
tmrId
);
static
int32_t
tdExecuteRSmaImpl
(
SSma
*
pSma
,
const
void
*
pMsg
,
int32_t
inputType
,
SRSmaInfo
*
pInfo
,
tb_uid_t
suid
,
int8_t
level
);
static
SRSmaInfo
*
tdAcquireRSmaInfoBySuid
(
SSma
*
pSma
,
int64_t
suid
);
static
void
tdReleaseRSmaInfo
(
SSma
*
pSma
,
SRSmaInfo
*
pInfo
);
static
int32_t
tdRSmaFetchAndSubmitResult
(
qTaskInfo_t
taskInfo
,
SRSmaInfoItem
*
pItem
,
STSchema
*
pTSchema
,
int64_t
suid
,
SRSmaStat
*
pStat
,
int8_t
blkType
);
static
void
tdRSmaFetchTrigger
(
void
*
param
,
void
*
tmrId
);
static
int32_t
tdRSmaQTaskInfoIterInit
(
SRSmaQTaskInfoIter
*
pIter
,
STFile
*
pTFile
);
static
int32_t
tdRSmaQTaskInfoIterNextBlock
(
SRSmaQTaskInfoIter
*
pIter
,
bool
*
isFinish
);
...
...
@@ -115,17 +117,26 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) {
if
(
pInfo
)
{
for
(
int32_t
i
=
0
;
i
<
TSDB_RETENTION_L2
;
++
i
)
{
SRSmaInfoItem
*
pItem
=
&
pInfo
->
items
[
i
];
if
(
pItem
->
taskInfo
)
{
if
(
isDeepFree
&&
pItem
->
tmrId
)
{
smaDebug
(
"vgId:%d, stop fetch timer %p for table %"
PRIi64
" level %d"
,
SMA_VID
(
pSma
),
pInfo
->
suid
,
pItem
->
tmrId
,
i
+
1
);
taosTmrStopA
(
&
pItem
->
tmrId
);
}
tdFreeQTaskInfo
(
&
pItem
->
taskInfo
,
SMA_VID
(
pSma
),
i
+
1
);
if
(
isDeepFree
&&
pItem
->
tmrId
)
{
smaDebug
(
"vgId:%d, stop fetch timer %p for table %"
PRIi64
" level %d"
,
SMA_VID
(
pSma
),
pInfo
->
suid
,
pItem
->
tmrId
,
i
+
1
);
taosTmrStopA
(
&
pItem
->
tmrId
);
}
if
(
isDeepFree
&&
pInfo
->
taskInfo
[
i
])
{
tdFreeQTaskInfo
(
&
pInfo
->
taskInfo
[
i
],
SMA_VID
(
pSma
),
i
+
1
);
}
else
{
smaDebug
(
"vgId:%d, table %"
PRIi64
" no need to destroy rsma info level %d since empty taskInfo"
,
SMA_VID
(
pSma
),
pInfo
->
suid
,
i
+
1
);
}
if
(
pInfo
->
iTaskInfo
[
i
])
{
tdFreeQTaskInfo
(
&
pInfo
->
iTaskInfo
[
i
],
SMA_VID
(
pSma
),
i
+
1
);
}
else
{
smaDebug
(
"vgId:%d, table %"
PRIi64
" no need to destroy rsma info level %d since empty iTaskInfo"
,
SMA_VID
(
pSma
),
pInfo
->
suid
,
i
+
1
);
}
}
if
(
isDeepFree
)
{
taosMemoryFreeClear
(
pInfo
->
pTSchema
);
...
...
@@ -155,7 +166,12 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids)
return
TSDB_CODE_FAILED
;
}
pRSmaInfo
=
tdGetRSmaInfoBySuid
(
pSma
,
*
suid
);
if
(
!
taosArrayGetSize
(
tbUids
))
{
smaDebug
(
"vgId:%d, no need to update tbUidList for suid:%"
PRIi64
" since Empty tbUids"
,
SMA_VID
(
pSma
),
*
suid
);
return
TSDB_CODE_SUCCESS
;
}
pRSmaInfo
=
tdAcquireRSmaInfoBySuid
(
pSma
,
*
suid
);
if
(
!
pRSmaInfo
)
{
smaError
(
"vgId:%d, failed to get rsma info for uid:%"
PRIi64
,
SMA_VID
(
pSma
),
*
suid
);
...
...
@@ -163,26 +179,21 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids)
return
TSDB_CODE_FAILED
;
}
if
(
pRSmaInfo
->
items
[
0
].
taskInfo
)
{
if
((
qUpdateQualifiedTableId
(
pRSmaInfo
->
items
[
0
].
taskInfo
,
tbUids
,
true
)
<
0
))
{
smaError
(
"vgId:%d, update tbUidList failed for uid:%"
PRIi64
" since %s"
,
SMA_VID
(
pSma
),
*
suid
,
terrstr
());
return
TSDB_CODE_FAILED
;
}
else
{
smaDebug
(
"vgId:%d, update tbUidList succeed for qTaskInfo:%p with suid:%"
PRIi64
", uid:%"
PRIi64
,
SMA_VID
(
pSma
),
pRSmaInfo
->
items
[
0
].
taskInfo
,
*
suid
,
*
(
int64_t
*
)
taosArrayGet
(
tbUids
,
0
));
}
}
if
(
pRSmaInfo
->
items
[
1
].
taskInfo
)
{
if
((
qUpdateQualifiedTableId
(
pRSmaInfo
->
items
[
1
].
taskInfo
,
tbUids
,
true
)
<
0
))
{
smaError
(
"vgId:%d, update tbUidList failed for uid:%"
PRIi64
" since %s"
,
SMA_VID
(
pSma
),
*
suid
,
terrstr
());
return
TSDB_CODE_FAILED
;
}
else
{
smaDebug
(
"vgId:%d, update tbUidList succeed for qTaskInfo:%p with suid:%"
PRIi64
", uid:%"
PRIi64
,
SMA_VID
(
pSma
),
pRSmaInfo
->
items
[
1
].
taskInfo
,
*
suid
,
*
(
int64_t
*
)
taosArrayGet
(
tbUids
,
0
));
for
(
int32_t
i
=
0
;
i
<
TSDB_RETENTION_L2
;
++
i
)
{
if
(
pRSmaInfo
->
taskInfo
[
i
])
{
if
((
qUpdateQualifiedTableId
(
pRSmaInfo
->
taskInfo
[
i
],
tbUids
,
true
)
<
0
))
{
tdReleaseRSmaInfo
(
pSma
,
pRSmaInfo
);
smaError
(
"vgId:%d, update tbUidList failed for uid:%"
PRIi64
" level %d since %s"
,
SMA_VID
(
pSma
),
*
suid
,
i
,
terrstr
());
return
TSDB_CODE_FAILED
;
}
else
{
smaDebug
(
"vgId:%d, update tbUidList succeed for qTaskInfo:%p with suid:%"
PRIi64
" uid:%"
PRIi64
" level %d"
,
SMA_VID
(
pSma
),
pRSmaInfo
->
taskInfo
[
0
],
*
suid
,
*
(
int64_t
*
)
taosArrayGet
(
tbUids
,
0
),
i
);
}
}
}
tdReleaseRSmaInfo
(
pSma
,
pRSmaInfo
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -267,13 +278,12 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
.
initTqReader
=
1
,
};
SRSmaInfoItem
*
pItem
=
&
(
pRSmaInfo
->
items
[
idx
]);
pItem
->
refId
=
RSMA_REF_ID
(
pStat
);
pItem
->
taskInfo
=
qCreateStreamExecTaskInfo
(
param
->
qmsg
[
idx
],
&
handle
);
if
(
!
pItem
->
taskInfo
)
{
pRSmaInfo
->
taskInfo
[
idx
]
=
qCreateStreamExecTaskInfo
(
param
->
qmsg
[
idx
],
&
handle
);
if
(
!
pRSmaInfo
->
taskInfo
[
idx
])
{
terrno
=
TSDB_CODE_RSMA_QTASKINFO_CREATE
;
return
TSDB_CODE_FAILED
;
}
SRSmaInfoItem
*
pItem
=
&
(
pRSmaInfo
->
items
[
idx
]);
pItem
->
triggerStat
=
TASK_TRIGGER_STAT_INACTIVE
;
if
(
param
->
maxdelay
[
idx
]
<
TSDB_MIN_ROLLUP_MAX_DELAY
)
{
int64_t
msInterval
=
...
...
@@ -342,6 +352,7 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
}
pRSmaInfo
->
pTSchema
=
pTSchema
;
pRSmaInfo
->
suid
=
suid
;
pRSmaInfo
->
refId
=
RSMA_REF_ID
(
pStat
);
T_REF_INIT_VAL
(
pRSmaInfo
,
1
);
if
(
tdSetRSmaInfoItemParams
(
pSma
,
param
,
pStat
,
pRSmaInfo
,
0
)
<
0
)
{
...
...
@@ -411,7 +422,7 @@ int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) {
SSmaStat
*
pStat
=
SMA_ENV_STAT
(
pSmaEnv
);
SRSmaStat
*
pRSmaStat
=
SMA_RSMA_STAT
(
pStat
);
SRSmaInfo
*
pRSmaInfo
=
td
Get
RSmaInfoBySuid
(
pSma
,
pReq
->
suid
);
SRSmaInfo
*
pRSmaInfo
=
td
Acquire
RSmaInfoBySuid
(
pSma
,
pReq
->
suid
);
if
(
!
pRSmaInfo
)
{
smaWarn
(
"vgId:%d, drop rsma for stable %s %"
PRIi64
" failed no rsma in hash"
,
TD_VID
(
pVnode
),
pReq
->
name
,
...
...
@@ -423,8 +434,10 @@ int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) {
RSMA_INFO_SET_DEL
(
pRSmaInfo
);
tdUnRefRSmaInfo
(
pSma
,
pRSmaInfo
);
// save to file
tdReleaseRSmaInfo
(
pSma
,
pRSmaInfo
);
// save to file
// TODO
smaDebug
(
"vgId:%d, drop rsma for table %"
PRIi64
" succeed"
,
TD_VID
(
pVnode
),
pReq
->
suid
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -567,8 +580,32 @@ static void tdDestroySDataBlockArray(SArray *pArray) {
taosArrayDestroy
(
pArray
);
}
static
int32_t
tdRSmaFetchAndSubmitResult
(
SRSmaInfoItem
*
pItem
,
STSchema
*
pTSchema
,
int64_t
suid
,
SRSmaStat
*
pStat
,
int8_t
blkType
)
{
/**
* @brief retention of rsma1/rsma2
*
* @param pSma
* @param now
* @return int32_t
*/
int32_t
smaDoRetention
(
SSma
*
pSma
,
int64_t
now
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
(
VND_IS_RSMA
(
pSma
->
pVnode
))
{
return
code
;
}
for
(
int32_t
i
=
0
;
i
<
TSDB_RETENTION_L2
;
++
i
)
{
if
(
pSma
->
pRSmaTsdb
[
i
])
{
code
=
tsdbDoRetention
(
pSma
->
pRSmaTsdb
[
i
],
now
);
if
(
code
)
goto
_end
;
}
}
_end:
return
code
;
}
static
int32_t
tdRSmaFetchAndSubmitResult
(
qTaskInfo_t
taskInfo
,
SRSmaInfoItem
*
pItem
,
STSchema
*
pTSchema
,
int64_t
suid
,
SRSmaStat
*
pStat
,
int8_t
blkType
)
{
SArray
*
pResult
=
NULL
;
SSma
*
pSma
=
pStat
->
pSma
;
...
...
@@ -576,7 +613,7 @@ static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSche
SSDataBlock
*
output
=
NULL
;
uint64_t
ts
;
int32_t
code
=
qExecTask
(
pItem
->
taskInfo
,
&
output
,
&
ts
);
int32_t
code
=
qExecTask
(
taskInfo
,
&
output
,
&
ts
);
if
(
code
<
0
)
{
smaError
(
"vgId:%d, qExecTask for rsma table %"
PRIi64
" level %"
PRIi8
" failed since %s"
,
SMA_VID
(
pSma
),
suid
,
pItem
->
level
,
terrstr
(
code
));
...
...
@@ -637,29 +674,32 @@ _err:
return
TSDB_CODE_FAILED
;
}
static
int32_t
tdExecuteRSmaImpl
(
SSma
*
pSma
,
const
void
*
pMsg
,
int32_t
inputType
,
SRSmaInfoItem
*
pItem
,
STSchema
*
pTSchema
,
tb_uid_t
suid
,
int8_t
level
)
{
if
(
!
pItem
||
!
pItem
->
taskInfo
)
{
static
int32_t
tdExecuteRSmaImpl
(
SSma
*
pSma
,
const
void
*
pMsg
,
int32_t
inputType
,
SRSmaInfo
*
pInfo
,
tb_uid_t
suid
,
int8_t
level
)
{
int32_t
idx
=
level
-
1
;
if
(
!
pInfo
||
!
RSMA_INFO_QTASK
(
pInfo
,
idx
))
{
smaDebug
(
"vgId:%d, no qTaskInfo to execute rsma %"
PRIi8
" task for suid:%"
PRIu64
,
SMA_VID
(
pSma
),
level
,
suid
);
return
TSDB_CODE_SUCCESS
;
}
if
(
!
pTSchema
)
{
if
(
!
p
Info
->
p
TSchema
)
{
smaWarn
(
"vgId:%d, no schema to execute rsma %"
PRIi8
" task for suid:%"
PRIu64
,
SMA_VID
(
pSma
),
level
,
suid
);
return
TSDB_CODE_FAILED
;
}
smaDebug
(
"vgId:%d, execute rsma %"
PRIi8
" task for qTaskInfo:%p suid:%"
PRIu64
,
SMA_VID
(
pSma
),
level
,
pItem
->
taskInfo
,
suid
);
RSMA_INFO_QTASK
(
pInfo
,
idx
)
,
suid
);
if
(
qSetMultiStreamInput
(
pItem
->
taskInfo
,
pMsg
,
1
,
inputType
)
<
0
)
{
// INPUT__DATA_SUBMIT
if
(
qSetMultiStreamInput
(
RSMA_INFO_QTASK
(
pInfo
,
idx
)
,
pMsg
,
1
,
inputType
)
<
0
)
{
// INPUT__DATA_SUBMIT
smaError
(
"vgId:%d, rsma %"
PRIi8
" qSetStreamInput failed since %s"
,
SMA_VID
(
pSma
),
level
,
tstrerror
(
terrno
));
return
TSDB_CODE_FAILED
;
}
SSmaEnv
*
pEnv
=
SMA_RSMA_ENV
(
pSma
);
SRSmaStat
*
pStat
=
SMA_RSMA_STAT
(
pEnv
->
pStat
);
SSmaEnv
*
pEnv
=
SMA_RSMA_ENV
(
pSma
);
SRSmaStat
*
pStat
=
SMA_RSMA_STAT
(
pEnv
->
pStat
);
SRSmaInfoItem
*
pItem
=
RSMA_INFO_ITEM
(
pInfo
,
idx
);
tdRSmaFetchAndSubmitResult
(
pItem
,
pTSchema
,
suid
,
pStat
,
STREAM_INPUT__DATA_SUBMIT
);
tdRSmaFetchAndSubmitResult
(
RSMA_INFO_QTASK
(
pInfo
,
idx
),
pItem
,
pInfo
->
pTSchema
,
suid
,
pStat
,
STREAM_INPUT__DATA_SUBMIT
);
atomic_store_8
(
&
pItem
->
triggerStat
,
TASK_TRIGGER_STAT_ACTIVE
);
if
(
smaMgmt
.
tmrHandle
)
{
...
...
@@ -678,7 +718,7 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType
* @param suid
* @return SRSmaInfo*
*/
static
SRSmaInfo
*
td
Get
RSmaInfoBySuid
(
SSma
*
pSma
,
int64_t
suid
)
{
static
SRSmaInfo
*
td
Acquire
RSmaInfoBySuid
(
SSma
*
pSma
,
int64_t
suid
)
{
SSmaEnv
*
pEnv
=
SMA_RSMA_ENV
(
pSma
);
SRSmaStat
*
pStat
=
NULL
;
SRSmaInfo
*
pRSmaInfo
=
NULL
;
...
...
@@ -692,94 +732,86 @@ static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid) {
return
NULL
;
}
taosRLockLatch
(
SMA_ENV_LOCK
(
pEnv
));
pRSmaInfo
=
taosHashGet
(
RSMA_INFO_HASH
(
pStat
),
&
suid
,
sizeof
(
tb_uid_t
));
if
(
pRSmaInfo
&&
(
pRSmaInfo
=
*
(
SRSmaInfo
**
)
pRSmaInfo
))
{
if
(
RSMA_INFO_IS_DEL
(
pRSmaInfo
))
{
taosRUnLockLatch
(
SMA_ENV_LOCK
(
pEnv
));
return
NULL
;
}
tdRefRSmaInfo
(
pSma
,
pRSmaInfo
);
taosRUnLockLatch
(
SMA_ENV_LOCK
(
pEnv
));
return
pRSmaInfo
;
}
if
(
RSMA_COMMIT_STAT
(
pStat
)
==
0
)
{
if
(
RSMA_COMMIT_STAT
(
pStat
)
==
0
)
{
// return NULL if not in committing stat
taosRUnLockLatch
(
SMA_ENV_LOCK
(
pEnv
));
return
NULL
;
}
taosRUnLockLatch
(
SMA_ENV_LOCK
(
pEnv
));
// clone the SRSmaInfo from iRsmaInfoHash to rsmaInfoHash if in committing stat
SRSmaInfo
*
pCowRSmaInfo
=
NULL
;
// lock
taosWLockLatch
(
SMA_ENV_LOCK
(
pEnv
));
if
(
!
taosHashGet
(
RSMA_INFO_HASH
(
pStat
),
&
suid
,
sizeof
(
tb_uid_t
)))
{
// 2-phase lock
if
(
!
(
pCowRSmaInfo
=
taosHashGet
(
RSMA_INFO_HASH
(
pStat
),
&
suid
,
sizeof
(
tb_uid_t
)
)))
{
// 2-phase lock
void
*
iRSmaInfo
=
taosHashGet
(
RSMA_IMU_INFO_HASH
(
pStat
),
&
suid
,
sizeof
(
tb_uid_t
));
if
(
iRSmaInfo
)
{
SRSmaInfo
*
pIRSmaInfo
=
*
(
SRSmaInfo
**
)
iRSmaInfo
;
if
(
pIRSmaInfo
)
{
if
(
tdCloneRSmaInfo
(
pSma
,
pCowRSmaInfo
,
pIRSmaInfo
)
<
0
)
{
if
(
pIRSmaInfo
&&
!
RSMA_INFO_IS_DEL
(
pIRSmaInfo
))
{
if
(
tdCloneRSmaInfo
(
pSma
,
&
pCowRSmaInfo
,
pIRSmaInfo
)
<
0
)
{
// unlock
taosWUnLockLatch
(
SMA_ENV_LOCK
(
pEnv
));
smaError
(
"vgId:%d, clone rsma info failed for suid:%"
PRIu64
" since %s"
,
SMA_VID
(
pSma
),
suid
,
terrstr
());
return
NULL
;
}
smaDebug
(
"vgId:%d, clone rsma info succeed for suid:%"
PRIu64
,
SMA_VID
(
pSma
),
suid
);
if
(
taosHashPut
(
RSMA_INFO_HASH
(
pStat
),
&
suid
,
sizeof
(
tb_uid_t
),
&
pCowRSmaInfo
,
sizeof
(
pCowRSmaInfo
))
<
0
)
{
// unlock
taosWUnLockLatch
(
SMA_ENV_LOCK
(
pEnv
));
smaError
(
"vgId:%d, clone rsma info failed for suid:%"
PRIu64
" since %s"
,
SMA_VID
(
pSma
),
suid
,
terrstr
());
return
NULL
;
}
}
}
}
else
{
pCowRSmaInfo
=
*
(
SRSmaInfo
**
)
pCowRSmaInfo
;
ASSERT
(
!
pCowRSmaInfo
);
}
if
(
pCowRSmaInfo
)
{
tdRefRSmaInfo
(
pSma
,
pCowRSmaInfo
);
}
// unlock
taosWUnLockLatch
(
SMA_ENV_LOCK
(
pEnv
));
return
pCowRSmaInfo
;
}
/**
* @brief During the drop procedure, only need to delete the object in rsmaInfoHash.
*
* @param pSma
* @param suid
* @return SRSmaInfo*
*/
void
tdRemoveRSmaInfoBySuid
(
SSma
*
pSma
,
int64_t
suid
)
{
SSmaEnv
*
pEnv
=
SMA_RSMA_ENV
(
pSma
);
SRSmaStat
*
pStat
=
NULL
;
SRSmaInfo
*
pRSmaInfo
=
NULL
;
if
(
!
pEnv
)
{
return
;
}
pStat
=
(
SRSmaStat
*
)
SMA_ENV_STAT
(
pEnv
);
if
(
!
pStat
||
!
RSMA_INFO_HASH
(
pStat
))
{
return
;
}
pRSmaInfo
=
taosHashGet
(
RSMA_INFO_HASH
(
pStat
),
&
suid
,
sizeof
(
tb_uid_t
));
if
(
pRSmaInfo
)
{
if
((
pRSmaInfo
=
*
(
SRSmaInfo
**
)
pRSmaInfo
))
{
tdFreeRSmaInfo
(
pSma
,
pRSmaInfo
,
true
);
}
taosHashRemove
(
RSMA_INFO_HASH
(
pStat
),
&
suid
,
sizeof
(
tb_uid_t
));
smaDebug
(
"vgId:%d, remove from infoHash for table:%"
PRIu64
" succeed"
,
SMA_VID
(
pSma
),
suid
);
static
FORCE_INLINE
void
tdReleaseRSmaInfo
(
SSma
*
pSma
,
SRSmaInfo
*
pInfo
)
{
if
(
pInfo
)
{
tdUnRefRSmaInfo
(
pSma
,
pInfo
);
}
}
static
int32_t
tdExecuteRSma
(
SSma
*
pSma
,
const
void
*
pMsg
,
int32_t
inputType
,
tb_uid_t
suid
)
{
SRSmaInfo
*
pRSmaInfo
=
td
Get
RSmaInfoBySuid
(
pSma
,
suid
);
SRSmaInfo
*
pRSmaInfo
=
td
Acquire
RSmaInfoBySuid
(
pSma
,
suid
);
if
(
!
pRSmaInfo
)
{
smaDebug
(
"vgId:%d, execute rsma, no rsma info for suid:%"
PRIu64
,
SMA_VID
(
pSma
),
suid
);
return
TSDB_CODE_SUCCESS
;
}
if
(
!
pRSmaInfo
->
items
[
0
].
taskInfo
)
{
if
(
!
RSMA_INFO_QTASK
(
pRSmaInfo
,
0
))
{
tdReleaseRSmaInfo
(
pSma
,
pRSmaInfo
);
smaDebug
(
"vgId:%d, execute rsma, no rsma qTaskInfo for suid:%"
PRIu64
,
SMA_VID
(
pSma
),
suid
);
return
TSDB_CODE_SUCCESS
;
}
if
(
inputType
==
STREAM_INPUT__DATA_SUBMIT
)
{
tdRefRSmaInfo
(
pSma
,
pRSmaInfo
);
tdExecuteRSmaImpl
(
pSma
,
pMsg
,
inputType
,
&
pRSmaInfo
->
items
[
0
],
pRSmaInfo
->
pTSchema
,
suid
,
TSDB_RETENTION_L1
);
tdExecuteRSmaImpl
(
pSma
,
pMsg
,
inputType
,
&
pRSmaInfo
->
items
[
1
],
pRSmaInfo
->
pTSchema
,
suid
,
TSDB_RETENTION_L2
);
tdUnRefRSmaInfo
(
pSma
,
pRSmaInfo
);
tdExecuteRSmaImpl
(
pSma
,
pMsg
,
inputType
,
pRSmaInfo
,
suid
,
TSDB_RETENTION_L1
);
tdExecuteRSmaImpl
(
pSma
,
pMsg
,
inputType
,
pRSmaInfo
,
suid
,
TSDB_RETENTION_L2
);
}
tdReleaseRSmaInfo
(
pSma
,
pRSmaInfo
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -990,26 +1022,28 @@ static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *
SRSmaInfo
*
pRSmaInfo
=
NULL
;
void
*
qTaskInfo
=
NULL
;
pRSmaInfo
=
td
Get
RSmaInfoBySuid
(
pSma
,
pItem
->
suid
);
pRSmaInfo
=
td
Acquire
RSmaInfoBySuid
(
pSma
,
pItem
->
suid
);
if
(
!
pRSmaInfo
)
{
smaDebug
(
"vgId:%d, no restore as no rsma info for table:%"
PRIu64
,
SMA_VID
(
pSma
),
pItem
->
suid
);
return
TSDB_CODE_SUCCESS
;
}
if
(
pItem
->
type
==
TSDB_RETENTION_L1
)
{
qTaskInfo
=
pRSmaInfo
->
items
[
0
].
taskInfo
;
qTaskInfo
=
RSMA_INFO_QTASK
(
pRSmaInfo
,
0
)
;
}
else
if
(
pItem
->
type
==
TSDB_RETENTION_L2
)
{
qTaskInfo
=
pRSmaInfo
->
items
[
1
].
taskInfo
;
qTaskInfo
=
RSMA_INFO_QTASK
(
pRSmaInfo
,
1
)
;
}
else
{
ASSERT
(
0
);
}
if
(
!
qTaskInfo
)
{
tdReleaseRSmaInfo
(
pSma
,
pRSmaInfo
);
smaDebug
(
"vgId:%d, no restore as NULL rsma qTaskInfo for table:%"
PRIu64
,
SMA_VID
(
pSma
),
pItem
->
suid
);
return
TSDB_CODE_SUCCESS
;
}
if
(
qDeserializeTaskStatus
(
qTaskInfo
,
pItem
->
qTaskInfo
,
pItem
->
len
)
<
0
)
{
tdReleaseRSmaInfo
(
pSma
,
pRSmaInfo
);
smaError
(
"vgId:%d, restore rsma task failed for table:%"
PRIi64
" level %d since %s"
,
SMA_VID
(
pSma
),
pItem
->
suid
,
pItem
->
type
,
terrstr
());
return
TSDB_CODE_FAILED
;
...
...
@@ -1017,6 +1051,7 @@ static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *
smaDebug
(
"vgId:%d, restore rsma task success for table:%"
PRIi64
" level %d"
,
SMA_VID
(
pSma
),
pItem
->
suid
,
pItem
->
type
);
tdReleaseRSmaInfo
(
pSma
,
pRSmaInfo
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1195,8 +1230,14 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
while
(
infoHash
)
{
SRSmaInfo
*
pRSmaInfo
=
*
(
SRSmaInfo
**
)
infoHash
;
if
(
RSMA_INFO_IS_DEL
(
pRSmaInfo
))
{
infoHash
=
taosHashIterate
(
pInfoHash
,
infoHash
);
continue
;
}
for
(
int32_t
i
=
0
;
i
<
TSDB_RETENTION_L2
;
++
i
)
{
qTaskInfo_t
taskInfo
=
pRSmaInfo
->
items
[
i
].
taskInfo
;
qTaskInfo_t
taskInfo
=
RSMA_INFO_QTASK
(
pRSmaInfo
,
i
)
;
if
(
!
taskInfo
)
{
smaDebug
(
"vgId:%d, rsma, table %"
PRIi64
" level %d qTaskInfo is NULL"
,
vid
,
pRSmaInfo
->
suid
,
i
+
1
);
continue
;
...
...
@@ -1290,11 +1331,17 @@ _err:
static
void
tdRSmaFetchTrigger
(
void
*
param
,
void
*
tmrId
)
{
SRSmaInfoItem
*
pItem
=
param
;
SSma
*
pSma
=
NULL
;
SRSmaStat
*
pStat
=
(
SRSmaStat
*
)
tdAcquireSmaRef
(
smaMgmt
.
rsetId
,
pItem
->
refId
,
__func__
,
__LINE__
);
SRSmaInfo
*
pRSmaInfo
=
tdGetRSmaInfoByItem
(
pItem
);
if
(
RSMA_INFO_IS_DEL
(
pRSmaInfo
))
{
return
;
}
SRSmaStat
*
pStat
=
(
SRSmaStat
*
)
tdAcquireSmaRef
(
smaMgmt
.
rsetId
,
pRSmaInfo
->
refId
);
if
(
!
pStat
)
{
smaDebug
(
"rsma fetch task not start since already destroyed, rsetId rsetId:%"
PRIi64
" refId:%d)"
,
smaMgmt
.
rsetId
,
p
Item
->
refId
);
p
RSmaInfo
->
refId
);
return
;
}
...
...
@@ -1305,10 +1352,10 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
switch
(
rsmaTriggerStat
)
{
case
TASK_TRIGGER_STAT_PAUSED
:
case
TASK_TRIGGER_STAT_CANCELLED
:
{
tdReleaseSmaRef
(
smaMgmt
.
rsetId
,
p
Item
->
refId
,
__func__
,
__LINE__
);
tdReleaseSmaRef
(
smaMgmt
.
rsetId
,
p
RSmaInfo
->
refId
);
smaDebug
(
"vgId:%d, not fetch rsma level %"
PRIi8
" data since stat is %"
PRIi8
", rsetId rsetId:%"
PRIi64
" refId:%d"
,
SMA_VID
(
pSma
),
pItem
->
level
,
rsmaTriggerStat
,
smaMgmt
.
rsetId
,
p
Item
->
refId
);
SMA_VID
(
pSma
),
pItem
->
level
,
rsmaTriggerStat
,
smaMgmt
.
rsetId
,
p
RSmaInfo
->
refId
);
if
(
rsmaTriggerStat
==
TASK_TRIGGER_STAT_PAUSED
)
{
taosTmrReset
(
tdRSmaFetchTrigger
,
pItem
->
maxDelay
>
5000
?
5000
:
pItem
->
maxDelay
,
pItem
,
smaMgmt
.
tmrHandle
,
&
pItem
->
tmrId
);
...
...
@@ -1319,11 +1366,6 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
break
;
}
SRSmaInfo
*
pRSmaInfo
=
tdGetRSmaInfoByItem
(
pItem
);
if
(
RSMA_INFO_IS_DEL
(
pRSmaInfo
))
{
goto
_end
;
}
int8_t
fetchTriggerStat
=
atomic_val_compare_exchange_8
(
&
pItem
->
triggerStat
,
TASK_TRIGGER_STAT_ACTIVE
,
TASK_TRIGGER_STAT_INACTIVE
);
switch
(
fetchTriggerStat
)
{
...
...
@@ -1332,16 +1374,14 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
pItem
->
level
,
pRSmaInfo
->
suid
);
// sync procedure => async process
tdRefRSmaInfo
(
pSma
,
pRSmaInfo
);
SSDataBlock
dataBlock
=
{.
info
.
type
=
STREAM_GET_ALL
};
qSetMultiStreamInput
(
pItem
->
taskInfo
,
&
dataBlock
,
1
,
STREAM_INPUT__DATA_BLOCK
);
tdRSmaFetchAndSubmitResult
(
pItem
,
pRSmaInfo
->
pTSchema
,
pRSmaInfo
->
suid
,
pStat
,
STREAM_INPUT__DATA_BLOCK
);
tdCleanupStreamInputDataBlock
(
pItem
->
taskInfo
);
qTaskInfo_t
taskInfo
=
pRSmaInfo
->
taskInfo
[
pItem
->
level
-
1
];
qSetMultiStreamInput
(
taskInfo
,
&
dataBlock
,
1
,
STREAM_INPUT__DATA_BLOCK
);
tdRSmaFetchAndSubmitResult
(
taskInfo
,
pItem
,
pRSmaInfo
->
pTSchema
,
pRSmaInfo
->
suid
,
pStat
,
STREAM_INPUT__DATA_BLOCK
);
tdCleanupStreamInputDataBlock
(
taskInfo
);
tdUnRefRSmaInfo
(
pSma
,
pRSmaInfo
);
// atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
// taosTmrReset(tdRSmaFetchTrigger, 5000, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
}
break
;
case
TASK_TRIGGER_STAT_PAUSED
:
{
smaDebug
(
"vgId:%d, not fetch rsma level %"
PRIi8
" data for table:%"
PRIi64
" since stat is paused"
,
...
...
@@ -1362,22 +1402,5 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
}
_end:
tdReleaseSmaRef
(
smaMgmt
.
rsetId
,
p
Item
->
refId
,
__func__
,
__LINE__
);
tdReleaseSmaRef
(
smaMgmt
.
rsetId
,
p
RSmaInfo
->
refId
);
}
int32_t
smaDoRetention
(
SSma
*
pSma
,
int64_t
now
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
(
VND_IS_RSMA
(
pSma
->
pVnode
))
{
return
code
;
}
for
(
int32_t
i
=
0
;
i
<
TSDB_RETENTION_L2
;
++
i
)
{
if
(
pSma
->
pRSmaTsdb
[
i
])
{
code
=
tsdbDoRetention
(
pSma
->
pRSmaTsdb
[
i
],
now
);
if
(
code
)
goto
_end
;
}
}
_end:
return
code
;
}
\ No newline at end of file
source/dnode/vnode/src/sma/smaTimeRange.c
浏览文件 @
1547d8da
...
...
@@ -116,8 +116,10 @@ int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg) {
}
// create stable to save tsma result in dstVgId
SName
stbFullName
=
{
0
};
tNameFromString
(
&
stbFullName
,
pCfg
->
dstTbName
,
T_NAME_ACCT
|
T_NAME_DB
|
T_NAME_TABLE
);
SVCreateStbReq
pReq
=
{
0
};
pReq
.
name
=
pCfg
->
dstTbName
;
pReq
.
name
=
(
char
*
)
tNameGetTableName
(
&
stbFullName
)
;
pReq
.
suid
=
pCfg
->
dstTbUid
;
pReq
.
schemaRow
=
pCfg
->
schemaRow
;
pReq
.
schemaTag
=
pCfg
->
schemaTag
;
...
...
@@ -125,6 +127,12 @@ int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg) {
if
(
metaCreateSTable
(
SMA_META
(
pSma
),
version
,
&
pReq
)
<
0
)
{
return
-
1
;
}
smaDebug
(
"vgId:%d, success to create sma index %s %"
PRIi64
" on stb:%"
PRIi64
", dstSuid:%"
PRIi64
" dstTb:%s dstVg:%d"
,
SMA_VID
(
pSma
),
pCfg
->
indexName
,
pCfg
->
indexUid
,
pCfg
->
tableUid
,
pCfg
->
dstTbUid
,
pReq
.
name
,
pCfg
->
dstVgId
);
}
else
{
ASSERT
(
0
);
}
return
0
;
...
...
source/dnode/vnode/src/sma/smaUtil.c
浏览文件 @
1547d8da
...
...
@@ -287,22 +287,22 @@ int32_t tdRemoveTFile(STFile *pTFile) {
}
// smaXXXUtil ================
void
*
tdAcquireSmaRef
(
int32_t
rsetId
,
int64_t
refId
,
const
char
*
tags
,
int32_t
ln
)
{
void
*
tdAcquireSmaRef
(
int32_t
rsetId
,
int64_t
refId
)
{
void
*
pResult
=
taosAcquireRef
(
rsetId
,
refId
);
if
(
!
pResult
)
{
smaWarn
(
"
%s:%d taosAcquireRef for rsetId:%"
PRIi64
" refId:%d failed since %s"
,
tags
,
ln
,
rsetId
,
refId
,
terrstr
());
smaWarn
(
"
rsma acquire ref for rsetId:%"
PRIi64
" refId:%d failed since %s"
,
rsetId
,
refId
,
terrstr
());
}
else
{
smaDebug
(
"
%s:%d taosAcquireRef for rsetId:%"
PRIi64
" refId:%d success"
,
tags
,
ln
,
rsetId
,
refId
);
smaDebug
(
"
rsma acquire ref for rsetId:%"
PRIi64
" refId:%d success"
,
rsetId
,
refId
);
}
return
pResult
;
}
int32_t
tdReleaseSmaRef
(
int32_t
rsetId
,
int64_t
refId
,
const
char
*
tags
,
int32_t
ln
)
{
int32_t
tdReleaseSmaRef
(
int32_t
rsetId
,
int64_t
refId
)
{
if
(
taosReleaseRef
(
rsetId
,
refId
)
<
0
)
{
smaWarn
(
"
%s:%d taosReleaseRef for rsetId:%"
PRIi64
" refId:%d failed since %s"
,
tags
,
ln
,
rsetId
,
refId
,
terrstr
());
smaWarn
(
"
rsma release ref for rsetId:%"
PRIi64
" refId:%d failed since %s"
,
rsetId
,
refId
,
terrstr
());
return
TSDB_CODE_FAILED
;
}
smaDebug
(
"
%s:%d taosReleaseRef for rsetId:%"
PRIi64
" refId:%d success"
,
tags
,
ln
,
rsetId
,
refId
);
smaDebug
(
"
rsma release ref for rsetId:%"
PRIi64
" refId:%d success"
,
rsetId
,
refId
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -313,7 +313,7 @@ static int32_t tdCloneQTaskInfo(SSma *pSma, qTaskInfo_t dstTaskInfo, qTaskInfo_t
char
*
pOutput
=
NULL
;
int32_t
len
=
0
;
if
(
qSerializeTaskStatus
(
srcTaskInfo
,
&
pOutput
,
&
len
)
<
0
)
{
if
(
(
terrno
=
qSerializeTaskStatus
(
srcTaskInfo
,
&
pOutput
,
&
len
)
)
<
0
)
{
smaError
(
"vgId:%d, rsma clone, table %"
PRIi64
" serialize qTaskInfo failed since %s"
,
TD_VID
(
pVnode
),
suid
,
terrstr
());
goto
_err
;
...
...
@@ -337,41 +337,34 @@ static int32_t tdCloneQTaskInfo(SSma *pSma, qTaskInfo_t dstTaskInfo, qTaskInfo_t
goto
_err
;
}
sma
Error
(
"vgId:%d, rsma clone, restore rsma task for table:%"
PRIi64
" succeed"
,
TD_VID
(
pVnode
),
suid
);
sma
Debug
(
"vgId:%d, rsma clone, restore rsma task for table:%"
PRIi64
" succeed"
,
TD_VID
(
pVnode
),
suid
);
taosMemoryFreeClear
(
pOutput
);
return
TSDB_CODE_SUCCESS
;
_err:
taosMemoryFreeClear
(
pOutput
);
tdFreeQTaskInfo
(
dstTaskInfo
,
TD_VID
(
pVnode
),
idx
+
1
);
smaError
(
"vgId:%d, rsma clone, restore rsma task for table:%"
PRIi64
" failed since %s"
,
TD_VID
(
pVnode
),
suid
,
terrstr
());
return
TSDB_CODE_FAILED
;
}
/**
* @brief pTSchema is shared
*
* @param pSma
* @param pDest
* @param pSrc
* @return int32_t
*
* @param pSma
* @param pDest
* @param pSrc
* @return int32_t
*/
int32_t
tdCloneRSmaInfo
(
SSma
*
pSma
,
SRSmaInfo
*
pDest
,
SRSmaInfo
*
pSrc
)
{
int32_t
tdCloneRSmaInfo
(
SSma
*
pSma
,
SRSmaInfo
*
*
pDest
,
SRSmaInfo
*
pSrc
)
{
SVnode
*
pVnode
=
pSma
->
pVnode
;
SRSmaParam
*
param
=
NULL
;
if
(
!
pSrc
)
{
*
pDest
=
NULL
;
return
TSDB_CODE_SUCCESS
;
}
if
(
!
pDest
)
{
pDest
=
taosMemoryCalloc
(
1
,
sizeof
(
SRSmaInfo
));
if
(
!
pDest
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
TSDB_CODE_FAILED
;
}
}
memcpy
(
pDest
,
pSrc
,
sizeof
(
SRSmaInfo
));
SMetaReader
mr
=
{
0
};
metaReaderInit
(
&
mr
,
SMA_META
(
pSma
),
0
);
smaDebug
(
"vgId:%d, rsma clone, suid is %"
PRIi64
,
TD_VID
(
pVnode
),
pSrc
->
suid
);
...
...
@@ -384,21 +377,22 @@ int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo *pDest, SRSmaInfo *pSrc) {
ASSERT
(
mr
.
me
.
uid
==
pSrc
->
suid
);
if
(
TABLE_IS_ROLLUP
(
mr
.
me
.
flags
))
{
param
=
&
mr
.
me
.
stbEntry
.
rsmaParam
;
for
(
int
i
=
0
;
i
<
TSDB_RETENTION_L2
;
++
i
)
{
SRSmaInfoItem
*
pItem
=
&
pSrc
->
items
[
i
];
if
(
pItem
->
taskInfo
)
{
tdCloneQTaskInfo
(
pSma
,
pDest
->
items
[
i
].
taskInfo
,
pItem
->
taskInfo
,
param
,
pSrc
->
suid
,
i
);
for
(
int32_t
i
=
0
;
i
<
TSDB_RETENTION_L2
;
++
i
)
{
if
(
tdCloneQTaskInfo
(
pSma
,
pSrc
->
iTaskInfo
[
i
],
pSrc
->
taskInfo
[
i
],
param
,
pSrc
->
suid
,
i
)
<
0
)
{
goto
_err
;
}
}
smaDebug
(
"vgId:%d, rsma clone env success for %"
PRIi64
,
TD_VID
(
pVnode
),
pSrc
->
suid
);
}
metaReaderClear
(
&
mr
);
*
pDest
=
pSrc
;
// pointer copy
return
TSDB_CODE_SUCCESS
;
_err:
*
pDest
=
NULL
;
metaReaderClear
(
&
mr
);
tdFreeRSmaInfo
(
pSma
,
pDest
,
false
);
smaError
(
"vgId:%d, rsma clone env failed for %"
PRIi64
" since %s"
,
TD_VID
(
pVnode
),
pSrc
->
suid
,
terrstr
()
);
return
TSDB_CODE_FAILED
;
}
// ...
\ No newline at end of file
}
\ No newline at end of file
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
1547d8da
...
...
@@ -351,7 +351,6 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
// TODO: remove the function
void
smaHandleRes
(
void
*
pVnode
,
int64_t
smaId
,
const
SArray
*
data
)
{
// TODO
blockDebugShowDataBlocks
(
data
,
__func__
);
tdProcessTSmaInsert
(((
SVnode
*
)
pVnode
)
->
pSma
,
smaId
,
(
const
char
*
)
data
);
}
...
...
@@ -852,6 +851,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
if
(
metaCreateTable
(
pVnode
->
pMeta
,
version
,
&
createTbReq
)
<
0
)
{
if
(
terrno
!=
TSDB_CODE_TDB_TABLE_ALREADY_EXIST
)
{
submitBlkRsp
.
code
=
terrno
;
pRsp
->
code
=
terrno
;
tDecoderClear
(
&
decoder
);
taosArrayDestroy
(
createTbReq
.
ctb
.
tagName
);
goto
_exit
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录