Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d53e1982
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
d53e1982
编写于
6月 30, 2022
作者:
C
Cary Xu
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor: tref used for rsma fetch task
上级
6600c472
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
57 addition
and
53 deletion
+57
-53
source/dnode/vnode/src/inc/sma.h
source/dnode/vnode/src/inc/sma.h
+6
-7
source/dnode/vnode/src/sma/smaEnv.c
source/dnode/vnode/src/sma/smaEnv.c
+1
-1
source/dnode/vnode/src/sma/smaRollup.c
source/dnode/vnode/src/sma/smaRollup.c
+50
-45
未找到文件。
source/dnode/vnode/src/inc/sma.h
浏览文件 @
d53e1982
...
...
@@ -48,7 +48,6 @@ struct SSmaEnv {
typedef
struct
{
int32_t
smaRef
;
int32_t
refId
;
}
SSmaMgmt
;
#define SMA_ENV_LOCK(env) ((env)->lock)
...
...
@@ -63,12 +62,12 @@ struct STSmaStat {
struct
SRSmaStat
{
SSma
*
pSma
;
int64_t
refId
;
void
*
tmrHandle
;
tmr_h
tmrId
;
int32_t
tmrSeconds
;
int8_t
triggerStat
;
int8_t
runningStat
;
int64_t
refId
;
// shared by persistence/fetch tasks
void
*
tmrHandle
;
// for persistence task
tmr_h
tmrId
;
// for persistence task
int32_t
tmrSeconds
;
// for persistence task
int8_t
triggerStat
;
// for persistence task
int8_t
runningStat
;
// for persistence task
SHashObj
*
rsmaInfoHash
;
// key: stbUid, value: SRSmaInfo;
};
...
...
source/dnode/vnode/src/sma/smaEnv.c
浏览文件 @
d53e1982
...
...
@@ -135,7 +135,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
// init smaMgmt
smaMgmt
.
smaRef
=
taosOpenRef
(
SMA_MGMT_REF_NUM
,
tdDestroyRSmaStat
);
if
(
smaMgmt
.
refId
<
0
)
{
if
(
smaMgmt
.
smaRef
<
0
)
{
smaError
(
"init smaRef failed, num:%d"
,
SMA_MGMT_REF_NUM
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
TSDB_CODE_FAILED
;
...
...
source/dnode/vnode/src/sma/smaRollup.c
浏览文件 @
d53e1982
...
...
@@ -50,6 +50,7 @@ static int32_t tdRSmaRestoreTSDataReload(SSma *pSma);
struct
SRSmaInfoItem
{
SRSmaInfo
*
pRsmaInfo
;
int64_t
refId
;
void
*
taskInfo
;
// qTaskInfo_t
tmr_h
tmrId
;
int8_t
level
;
...
...
@@ -60,11 +61,14 @@ struct SRSmaInfoItem {
struct
SRSmaInfo
{
STSchema
*
pTSchema
;
S
Sma
*
pSma
;
S
RSmaStat
*
pStat
;
int64_t
suid
;
SRSmaInfoItem
items
[
TSDB_RETENTION_L2
];
};
#define RSMA_INFO_SMA(r) ((r)->pStat->pSma)
#define RSMA_INFO_STAT(r) ((r)->pStat)
struct
SRSmaQTaskInfoItem
{
int32_t
len
;
int8_t
type
;
...
...
@@ -107,22 +111,21 @@ static FORCE_INLINE void tdFreeTaskHandle(qTaskInfo_t *taskHandle, int32_t vgId,
void
*
tdFreeRSmaInfo
(
SRSmaInfo
*
pInfo
)
{
if
(
pInfo
)
{
SSma
*
pSma
=
RSMA_INFO_SMA
(
pInfo
);
for
(
int32_t
i
=
0
;
i
<
TSDB_RETENTION_L2
;
++
i
)
{
SRSmaInfoItem
*
pItem
=
&
pInfo
->
items
[
i
];
if
(
pItem
->
taskInfo
)
{
smaDebug
(
"vgId:%d, stb %"
PRIi64
" stop fetch-timer %p level %d"
,
SMA_VID
(
p
Info
->
pSma
),
pInfo
->
sui
d
,
pItem
->
tmrId
,
i
+
1
);
smaDebug
(
"vgId:%d, stb %"
PRIi64
" stop fetch-timer %p level %d"
,
SMA_VID
(
p
Sma
),
pInfo
->
suid
,
pItem
->
tmrI
d
,
i
+
1
);
taosTmrStopA
(
&
pItem
->
tmrId
);
tdFreeTaskHandle
(
&
pItem
->
taskInfo
,
SMA_VID
(
p
Info
->
p
Sma
),
i
+
1
);
tdFreeTaskHandle
(
&
pItem
->
taskInfo
,
SMA_VID
(
pSma
),
i
+
1
);
}
else
{
smaDebug
(
"vgId:%d, stb %"
PRIi64
" no need to destroy rsma info level %d since empty taskInfo"
,
SMA_VID
(
pInfo
->
pSma
),
pInfo
->
suid
,
i
+
1
);
smaDebug
(
"vgId:%d, stb %"
PRIi64
" no need to destroy rsma info level %d since empty taskInfo"
,
SMA_VID
(
pSma
),
pInfo
->
suid
,
i
+
1
);
}
}
taosMemoryFree
(
pInfo
->
pTSchema
);
taosMemoryFree
(
pInfo
);
}
else
{
smaDebug
(
"vgId:%d, stb %"
PRIi64
" no need to destroy rsma info since empty"
,
SMA_VID
(
pInfo
->
pSma
),
pInfo
->
suid
);
}
return
NULL
;
...
...
@@ -255,6 +258,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaInfo
if
(
param
->
qmsg
[
idx
])
{
SRSmaInfoItem
*
pItem
=
&
(
pRSmaInfo
->
items
[
idx
]);
pItem
->
refId
=
RSMA_REF_ID
(
pRSmaInfo
->
pStat
);
pItem
->
pRsmaInfo
=
pRSmaInfo
;
pItem
->
taskInfo
=
qCreateStreamExecTaskInfo
(
param
->
qmsg
[
idx
],
pReadHandle
);
if
(
!
pItem
->
taskInfo
)
{
...
...
@@ -340,7 +344,7 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
goto
_err
;
}
pRSmaInfo
->
pTSchema
=
pTSchema
;
pRSmaInfo
->
pS
ma
=
pSma
;
pRSmaInfo
->
pS
tat
=
pStat
;
pRSmaInfo
->
suid
=
suid
;
if
(
tdSetRSmaInfoItemParams
(
pSma
,
param
,
pRSmaInfo
,
&
handle
,
0
)
<
0
)
{
...
...
@@ -522,7 +526,7 @@ static void tdDestroySDataBlockArray(SArray *pArray) {
static
int32_t
tdFetchAndSubmitRSmaResult
(
SRSmaInfoItem
*
pItem
,
int8_t
blkType
)
{
SArray
*
pResult
=
NULL
;
SRSmaInfo
*
pRSmaInfo
=
pItem
->
pRsmaInfo
;
SSma
*
pSma
=
pRSmaInfo
->
pSma
;
SSma
*
pSma
=
RSMA_INFO_SMA
(
pRSmaInfo
)
;
while
(
1
)
{
SSDataBlock
*
output
=
NULL
;
...
...
@@ -585,21 +589,29 @@ _err:
*/
static
void
tdRSmaFetchTrigger
(
void
*
param
,
void
*
tmrId
)
{
SRSmaInfoItem
*
pItem
=
param
;
SSma
*
pSma
=
pItem
->
pRsmaInfo
->
pSma
;
SRSmaStat
*
pStat
=
(
SRSmaStat
*
)
SMA_ENV_STAT
((
SSmaEnv
*
)
pSma
->
pRSmaEnv
);
SSma
*
pSma
=
NULL
;
SRSmaStat
*
pStat
=
(
SRSmaStat
*
)
taosAcquireRef
(
smaMgmt
.
smaRef
,
pItem
->
refId
);
if
(
!
pStat
)
{
smaDebug
(
"rsma fetch task not start since already destroyed"
);
return
;
}
pSma
=
RSMA_INFO_SMA
(
pItem
->
pRsmaInfo
);
// if rsma trigger stat in cancelled or finished, not start fetch task anymore
int8_t
rsmaTriggerStat
=
atomic_load_8
(
RSMA_TRIGGER_STAT
(
pStat
));
if
(
rsmaTriggerStat
==
TASK_TRIGGER_STAT_CANCELLED
||
rsmaTriggerStat
==
TASK_TRIGGER_STAT_FINISHED
)
{
smaDebug
(
"vgId:%d, level %"
PRIi8
" not fetch since stat is cancelled for table suid:%"
PRIi64
,
SMA_VID
(
pSma
),
pItem
->
level
,
pItem
->
pRsmaInfo
->
suid
);
taosReleaseRef
(
smaMgmt
.
smaRef
,
pItem
->
refId
);
smaDebug
(
"vgId:%d, not fetch rsma level %"
PRIi8
" data for table:%"
PRIi64
" since stat is cancelled"
,
SMA_VID
(
pSma
),
pItem
->
level
,
pItem
->
pRsmaInfo
->
suid
);
return
;
}
int8_t
fetchTriggerStat
=
atomic_val_compare_exchange_8
(
&
pItem
->
triggerStat
,
TASK_TRIGGER_STAT_ACTIVE
,
TASK_TRIGGER_STAT_INACTIVE
);
if
(
fetchTriggerStat
==
TASK_TRIGGER_STAT_ACTIVE
)
{
smaDebug
(
"vgId:%d,
level %"
PRIi8
" stat is active for table suid:%"
PRIi64
,
SMA_VID
(
pSma
),
pItem
->
level
,
pItem
->
pRsmaInfo
->
suid
);
smaDebug
(
"vgId:%d,
fetch rsma level %"
PRIi8
" data for table:%"
PRIi64
" since stat is active"
,
SMA_VID
(
pSma
)
,
pItem
->
level
,
pItem
->
pRsmaInfo
->
suid
);
tdRefSmaStat
(
pSma
,
(
SSmaStat
*
)
pStat
);
...
...
@@ -610,9 +622,11 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
tdUnRefSmaStat
(
pSma
,
(
SSmaStat
*
)
pStat
);
}
else
{
smaDebug
(
"vgId:%d,
level %"
PRIi8
" stat is inactive for table suid:%"
PRIi64
,
SMA_VID
(
pSma
),
pItem
->
level
,
pItem
->
pRsmaInfo
->
suid
);
smaDebug
(
"vgId:%d,
not fetch rsma level %"
PRIi8
" data for table:%"
PRIi64
" since stat is inactive"
,
SMA_VID
(
pSma
),
pItem
->
level
,
pItem
->
pRsmaInfo
->
suid
);
}
_end:
taosReleaseRef
(
smaMgmt
.
smaRef
,
pItem
->
refId
);
}
static
int32_t
tdExecuteRSmaImpl
(
SSma
*
pSma
,
const
void
*
pMsg
,
int32_t
inputType
,
SRSmaInfoItem
*
pItem
,
tb_uid_t
suid
,
...
...
@@ -632,7 +646,6 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType
tdFetchAndSubmitRSmaResult
(
pItem
,
STREAM_DATA_TYPE_SUBMIT_BLOCK
);
atomic_store_8
(
&
pItem
->
triggerStat
,
TASK_TRIGGER_STAT_ACTIVE
);
smaDebug
(
"vgId:%d, process rsma insert"
,
SMA_VID
(
pSma
));
SSmaEnv
*
pEnv
=
SMA_RSMA_ENV
(
pSma
);
SRSmaStat
*
pStat
=
SMA_RSMA_STAT
(
pEnv
->
pStat
);
...
...
@@ -1036,7 +1049,7 @@ static void *tdRSmaPersistExec(void *param) {
for
(
int32_t
i
=
0
;
i
<
TSDB_RETENTION_L2
;
++
i
)
{
qTaskInfo_t
taskInfo
=
pRSmaInfo
->
items
[
i
].
taskInfo
;
if
(
!
taskInfo
)
{
smaDebug
(
"vgId:%d, table %"
PRIi64
" level %d qTaskInfo is NULL"
,
vid
,
pRSmaInfo
->
suid
,
i
+
1
);
smaDebug
(
"vgId:%d,
rsma,
table %"
PRIi64
" level %d qTaskInfo is NULL"
,
vid
,
pRSmaInfo
->
suid
,
i
+
1
);
continue
;
}
...
...
@@ -1044,27 +1057,20 @@ static void *tdRSmaPersistExec(void *param) {
int32_t
len
=
0
;
int8_t
type
=
(
int8_t
)(
i
+
1
);
if
(
qSerializeTaskStatus
(
taskInfo
,
&
pOutput
,
&
len
)
<
0
)
{
smaError
(
"vgId:%d,
table %"
PRIi64
" level %d serialize rsma task failed since %s"
,
vid
,
pRSmaInfo
->
suid
,
i
+
1
,
terrstr
(
terrno
));
smaError
(
"vgId:%d,
rsma, table %"
PRIi64
" level %d serialize qTaskInfo failed since %s"
,
vid
,
pRSmaInfo
->
suid
,
i
+
1
,
terrstr
(
terrno
));
goto
_err
;
}
if
(
!
pOutput
||
len
<=
0
)
{
smaDebug
(
"vgId:%d, table %"
PRIi64
" level %d serialize rsma task success but no output(len %d), not persist"
,
smaDebug
(
"vgId:%d, rsma, table %"
PRIi64
" level %d serialize qTaskInfo success but no output(len %d), not persist"
,
vid
,
pRSmaInfo
->
suid
,
i
+
1
,
len
);
taosMemoryFreeClear
(
pOutput
);
continue
;
}
smaDebug
(
"vgId:%d,
table %"
PRIi64
" level %d serialize rsma task
success with len %d, need persist"
,
vid
,
smaDebug
(
"vgId:%d,
rsma, table %"
PRIi64
" level %d serialize qTaskInfo
success with len %d, need persist"
,
vid
,
pRSmaInfo
->
suid
,
i
+
1
,
len
);
#if 0
if (qDeserializeTaskStatus(taskInfo, pOutput, len) < 0) {
smaError("vgId:%d, table %" PRIi64 "level %d deserialize rsma task failed since %s", vid, pRSmaInfo->suid,
i + 1, terrstr(terrno));
} else {
smaDebug("vgId:%d, table %" PRIi64 " level %d deserialize rsma task success", vid, pRSmaInfo->suid, i + 1);
}
#endif
if
(
!
isFileCreated
)
{
char
qTaskInfoFName
[
TSDB_FILENAME_LEN
];
...
...
@@ -1084,11 +1090,11 @@ static void *tdRSmaPersistExec(void *param) {
ASSERT
(
headLen
<=
RSMA_QTASKINFO_HEAD_LEN
);
tdAppendTFile
(
&
tFile
,
(
void
*
)
&
tmpBuf
,
headLen
,
&
toffset
);
smaDebug
(
"vgId:%d, table %"
PRIi64
" level %d head part(len:%d) appended to offset:%"
PRIi64
,
vid
,
smaDebug
(
"vgId:%d,
rsma,
table %"
PRIi64
" level %d head part(len:%d) appended to offset:%"
PRIi64
,
vid
,
pRSmaInfo
->
suid
,
i
+
1
,
headLen
,
toffset
);
tdAppendTFile
(
&
tFile
,
pOutput
,
len
,
&
toffset
);
smaDebug
(
"vgId:%d,
table %"
PRIi64
" level %d body part len:%d appended to offset:%"
PRIi64
,
vid
,
pRSmaInfo
->
su
id
,
i
+
1
,
len
,
toffset
);
smaDebug
(
"vgId:%d,
rsma, table %"
PRIi64
" level %d body part len:%d appended to offset:%"
PRIi64
,
v
id
,
pRSmaInfo
->
suid
,
i
+
1
,
len
,
toffset
);
taosMemoryFree
(
pOutput
);
}
...
...
@@ -1098,13 +1104,13 @@ static void *tdRSmaPersistExec(void *param) {
_normal:
if
(
isFileCreated
)
{
if
(
tdUpdateTFileHeader
(
&
tFile
)
<
0
)
{
smaError
(
"vgId:%d, failed to update tfile %s header since %s"
,
vid
,
TD_TFILE_FULL_NAME
(
&
tFile
),
smaError
(
"vgId:%d,
rsma,
failed to update tfile %s header since %s"
,
vid
,
TD_TFILE_FULL_NAME
(
&
tFile
),
tstrerror
(
terrno
));
tdCloseTFile
(
&
tFile
);
tdRemoveTFile
(
&
tFile
);
goto
_err
;
}
else
{
smaDebug
(
"vgId:%d, succeed to update tfile %s header"
,
vid
,
TD_TFILE_FULL_NAME
(
&
tFile
));
smaDebug
(
"vgId:%d,
rsma,
succeed to update tfile %s header"
,
vid
,
TD_TFILE_FULL_NAME
(
&
tFile
));
}
tdCloseTFile
(
&
tFile
);
...
...
@@ -1114,10 +1120,10 @@ _normal:
char
*
pos
=
strstr
(
newFName
,
tdQTaskInfoFname
[
TD_QTASK_TMP_F
]);
strncpy
(
pos
,
tdQTaskInfoFname
[
TD_QTASK_TMP_F
],
TSDB_FILENAME_LEN
-
POINTER_DISTANCE
(
pos
,
newFName
));
if
(
taosRenameFile
(
TD_TFILE_FULL_NAME
(
&
tFile
),
newFName
)
!=
0
)
{
smaError
(
"vgId:%d, failed to rename %s to %s"
,
vid
,
TD_TFILE_FULL_NAME
(
&
tFile
),
newFName
);
smaError
(
"vgId:%d,
rsma,
failed to rename %s to %s"
,
vid
,
TD_TFILE_FULL_NAME
(
&
tFile
),
newFName
);
goto
_err
;
}
else
{
smaDebug
(
"vgId:%d, succeed to rename %s to %s"
,
vid
,
TD_TFILE_FULL_NAME
(
&
tFile
),
newFName
);
smaDebug
(
"vgId:%d,
rsma,
succeed to rename %s to %s"
,
vid
,
TD_TFILE_FULL_NAME
(
&
tFile
),
newFName
);
}
}
goto
_end
;
...
...
@@ -1129,13 +1135,13 @@ _end:
if
(
TASK_TRIGGER_STAT_INACTIVE
==
atomic_val_compare_exchange_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
),
TASK_TRIGGER_STAT_INACTIVE
,
TASK_TRIGGER_STAT_ACTIVE
))
{
smaDebug
(
"vgId:%d, persist task is active again"
,
vid
);
smaDebug
(
"vgId:%d,
rsma
persist task is active again"
,
vid
);
}
else
if
(
TASK_TRIGGER_STAT_CANCELLED
==
atomic_val_compare_exchange_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
),
TASK_TRIGGER_STAT_CANCELLED
,
TASK_TRIGGER_STAT_FINISHED
))
{
smaDebug
(
"vgId:%d, persist task is cancelled"
,
vid
);
smaDebug
(
"vgId:%d,
rsma
persist task is cancelled"
,
vid
);
}
else
{
smaWarn
(
"vgId:%d, persist task in abnormal stat %"
PRIi8
,
vid
,
atomic_load_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
)));
smaWarn
(
"vgId:%d,
rsma
persist task in abnormal stat %"
PRIi8
,
vid
,
atomic_load_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
)));
ASSERT
(
0
);
}
atomic_store_8
(
RSMA_RUNNING_STAT
(
pRSmaStat
),
0
);
...
...
@@ -1179,9 +1185,8 @@ static void tdRSmaPersistTask(SRSmaStat *pRSmaStat) {
*/
static
void
tdRSmaPersistTrigger
(
void
*
param
,
void
*
tmrId
)
{
SRSmaStat
*
rsmaStat
=
param
;
int64_t
refId
=
rsmaStat
->
refId
;
SRSmaStat
*
pRSmaStat
=
(
SRSmaStat
*
)
taosAcquireRef
(
smaMgmt
.
smaRef
,
rsmaStat
->
refId
)
;
SRSmaStat
*
pRSmaStat
=
(
SRSmaStat
*
)
taosAcquireRef
(
smaMgmt
.
smaRef
,
refId
);
if
(
!
pRSmaStat
)
{
smaDebug
(
"rsma persistence task not start since already destroyed"
);
return
;
...
...
@@ -1221,5 +1226,5 @@ static void tdRSmaPersistTrigger(void *param, void *tmrId) {
smaWarn
(
"rsma persistence not start since unknown stat %"
PRIi8
,
tmrStat
);
}
break
;
}
taosReleaseRef
(
smaMgmt
.
smaRef
,
refId
);
taosReleaseRef
(
smaMgmt
.
smaRef
,
r
smaStat
->
r
efId
);
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录