Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
82c4b62c
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
82c4b62c
编写于
7月 11, 2022
作者:
C
Cary Xu
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
enh: rsma code optimization and support drop
上级
82890ee5
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
50 addition
and
157 deletion
+50
-157
source/dnode/vnode/src/inc/sma.h
source/dnode/vnode/src/inc/sma.h
+1
-3
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+2
-1
source/dnode/vnode/src/sma/smaEnv.c
source/dnode/vnode/src/sma/smaEnv.c
+3
-22
source/dnode/vnode/src/sma/smaRollup.c
source/dnode/vnode/src/sma/smaRollup.c
+38
-130
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+6
-1
未找到文件。
source/dnode/vnode/src/inc/sma.h
浏览文件 @
82c4b62c
...
...
@@ -67,7 +67,6 @@ struct SRSmaStat {
int64_t
submitVer
;
int64_t
refId
;
// shared by fetch tasks
int8_t
triggerStat
;
// shared by fetch tasks
int8_t
runningStat
;
// for persistence task
SHashObj
*
rsmaInfoHash
;
// key: stbUid, value: SRSmaInfo;
};
...
...
@@ -83,7 +82,6 @@ struct SSmaStat {
#define SMA_RSMA_STAT(s) (&(s)->rsmaStat)
#define RSMA_INFO_HASH(r) ((r)->rsmaInfoHash)
#define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat)
#define RSMA_RUNNING_STAT(r) (&(r)->runningStat)
#define RSMA_REF_ID(r) ((r)->refId)
#define RSMA_SUBMIT_VER(r) ((r)->submitVer)
...
...
@@ -93,7 +91,7 @@ enum {
TASK_TRIGGER_STAT_INACTIVE
=
2
,
TASK_TRIGGER_STAT_PAUSED
=
3
,
TASK_TRIGGER_STAT_CANCELLED
=
4
,
TASK_TRIGGER_STAT_
FINISH
ED
=
5
,
TASK_TRIGGER_STAT_
DROPP
ED
=
5
,
};
void
tdDestroySmaEnv
(
SSmaEnv
*
pSmaEnv
);
...
...
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
82c4b62c
...
...
@@ -171,8 +171,9 @@ int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
int32_t
tdProcessTSmaInsert
(
SSma
*
pSma
,
int64_t
indexUid
,
const
char
*
msg
);
int64_t
tdRSmaGetMaxSubmitVer
(
SSma
*
pSma
,
int8_t
level
);
int32_t
tdProcessRSmaCreate
(
S
Vnode
*
pVnode
,
SVCreateStbReq
*
pReq
);
int32_t
tdProcessRSmaCreate
(
S
Sma
*
pSma
,
SVCreateStbReq
*
pReq
);
int32_t
tdProcessRSmaSubmit
(
SSma
*
pSma
,
void
*
pMsg
,
int32_t
inputType
);
int32_t
tdProcessRSmaDrop
(
SSma
*
pSma
,
SVDropStbReq
*
pReq
);
int32_t
tdFetchTbUidList
(
SSma
*
pSma
,
STbUidStore
**
ppStore
,
tb_uid_t
suid
,
tb_uid_t
uid
);
int32_t
tdUpdateTbUidList
(
SSma
*
pSma
,
STbUidStore
*
pUidStore
);
void
tdUidStoreDestory
(
STbUidStore
*
pStore
);
...
...
source/dnode/vnode/src/sma/smaEnv.c
浏览文件 @
82c4b62c
...
...
@@ -254,26 +254,7 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
// step 1: set rsma trigger stat cancelled
atomic_store_8
(
RSMA_TRIGGER_STAT
(
pStat
),
TASK_TRIGGER_STAT_CANCELLED
);
// step 2: wait the persistence thread to finish
int32_t
nLoops
=
0
;
if
(
atomic_load_8
(
RSMA_RUNNING_STAT
(
pStat
))
==
1
)
{
while
(
1
)
{
if
(
atomic_load_8
(
RSMA_TRIGGER_STAT
(
pStat
))
==
TASK_TRIGGER_STAT_FINISHED
)
{
smaDebug
(
"vgId:%d, rsma persist task finished already"
,
SMA_VID
(
pSma
));
break
;
}
else
{
smaDebug
(
"vgId:%d, rsma persist task not finished yet since rsma stat in %"
PRIi8
,
SMA_VID
(
pSma
),
atomic_load_8
(
RSMA_TRIGGER_STAT
(
pStat
)));
}
++
nLoops
;
if
(
nLoops
>
1000
)
{
sched_yield
();
nLoops
=
0
;
}
}
}
// step 3: destroy the rsma info and associated fetch tasks
// step 2: destroy the rsma info and associated fetch tasks
// TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready.
if
(
taosHashGetSize
(
RSMA_INFO_HASH
(
pStat
))
>
0
)
{
void
*
infoHash
=
taosHashIterate
(
RSMA_INFO_HASH
(
pStat
),
NULL
);
...
...
@@ -285,8 +266,8 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
}
taosHashCleanup
(
RSMA_INFO_HASH
(
pStat
));
// step
5
: wait all triggered fetch tasks finished
nLoops
=
0
;
// step
3
: wait all triggered fetch tasks finished
int32_t
nLoops
=
0
;
while
(
1
)
{
if
(
T_REF_VAL_GET
((
SSmaStat
*
)
pStat
)
==
0
)
{
smaDebug
(
"vgId:%d, rsma fetch tasks all finished"
,
SMA_VID
(
pSma
));
...
...
source/dnode/vnode/src/sma/smaRollup.c
浏览文件 @
82c4b62c
...
...
@@ -37,8 +37,6 @@ static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid);
static
int32_t
tdRSmaFetchAndSubmitResult
(
SRSmaInfoItem
*
pItem
,
STSchema
*
pTSchema
,
int64_t
suid
,
SRSmaStat
*
pStat
,
int8_t
blkType
);
static
void
tdRSmaFetchTrigger
(
void
*
param
,
void
*
tmrId
);
static
void
tdRSmaPersistTrigger
(
void
*
param
,
void
*
tmrId
);
static
void
*
tdRSmaPersistExec
(
void
*
param
);
static
void
tdRSmaQTaskInfoGetFName
(
int32_t
vid
,
int64_t
version
,
char
*
outputName
);
static
int32_t
tdRSmaQTaskInfoIterInit
(
SRSmaQTaskInfoIter
*
pIter
,
STFile
*
pTFile
);
...
...
@@ -68,8 +66,8 @@ struct SRSmaInfo {
static
SRSmaInfo
*
tdGetRSmaInfoByItem
(
SRSmaInfoItem
*
pItem
)
{
// adapt accordingly if definition of SRSmaInfo update
int32_t
rsmaInfoHeadLen
=
sizeof
(
int64_t
)
+
sizeof
(
STSchema
*
);
ASSERT
(
pItem
->
level
==
1
||
pItem
->
level
==
2
);
return
(
SRSmaInfo
*
)
POINTER_SHIFT
(
pItem
,
-
sizeof
(
SRSmaInfoItem
)
*
(
pItem
->
level
-
1
)
-
rsmaInfoHeadLen
);
ASSERT
(
pItem
->
level
==
0
||
pItem
->
level
==
1
);
return
(
SRSmaInfo
*
)
POINTER_SHIFT
(
pItem
,
-
sizeof
(
SRSmaInfoItem
)
*
pItem
->
level
-
rsmaInfoHeadLen
);
}
struct
SRSmaQTaskInfoItem
{
...
...
@@ -375,20 +373,48 @@ _err:
/**
* @brief Check and init qTaskInfo_t, only applicable to stable with SRSmaParam currently
*
* @param p
Vnode
* @param p
Sma
* @param pReq
* @return int32_t
*/
int32_t
tdProcessRSmaCreate
(
S
Vnode
*
pVnode
,
SVCreateStbReq
*
pReq
)
{
S
Sma
*
pSma
=
pVnode
->
pSma
;
int32_t
tdProcessRSmaCreate
(
S
Sma
*
pSma
,
SVCreateStbReq
*
pReq
)
{
S
Vnode
*
pVnode
=
pSma
->
pVnode
;
if
(
!
pReq
->
rollup
)
{
smaTrace
(
"vgId:%d, return directly since no rollup for stable %s %"
PRIi64
,
SMA_VID
(
pSma
),
pReq
->
name
,
pReq
->
suid
);
smaTrace
(
"vgId:%d, not create rsma for stable %s %"
PRIi64
" since no rollup in req"
,
TD_VID
(
pVnode
),
pReq
->
name
,
pReq
->
suid
);
return
TSDB_CODE_SUCCESS
;
}
if
(
!
VND_IS_RSMA
(
pVnode
))
{
smaTrace
(
"vgId:%d, not create rsma for stable %s %"
PRIi64
" since vnd is not rsma"
,
TD_VID
(
pVnode
),
pReq
->
name
,
pReq
->
suid
);
return
TSDB_CODE_SUCCESS
;
}
return
tdProcessRSmaCreateImpl
(
pSma
,
&
pReq
->
rsmaParam
,
pReq
->
suid
,
pReq
->
name
);
}
/**
* @brief drop cache for stb
*
* @param pSma
* @param pReq
* @return int32_t
*/
int32_t
tdProcessRSmaDrop
(
SSma
*
pSma
,
SVDropStbReq
*
pReq
)
{
SVnode
*
pVnode
=
pSma
->
pVnode
;
if
(
!
VND_IS_RSMA
(
pVnode
))
{
smaTrace
(
"vgId:%d, not create rsma for stable %s %"
PRIi64
" since vnd is not rsma"
,
TD_VID
(
pVnode
),
pReq
->
name
,
pReq
->
suid
);
return
TSDB_CODE_SUCCESS
;
}
smaDebug
(
"vgId:%d, drop rsma for table %"
PRIi64
" succeed"
,
TD_VID
(
pVnode
),
pReq
->
suid
);
return
TSDB_CODE_SUCCESS
;
}
/**
* @brief store suid/[uids], prefer to use array and then hash
*
...
...
@@ -667,8 +693,8 @@ static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb
}
if
(
inputType
==
STREAM_INPUT__DATA_SUBMIT
)
{
tdExecuteRSmaImpl
(
pSma
,
pMsg
,
inputType
,
&
pRSmaInfo
->
items
[
0
],
pRSmaInfo
->
pTSchema
,
suid
,
TSDB_RETENTION_L1
);
tdExecuteRSmaImpl
(
pSma
,
pMsg
,
inputType
,
&
pRSmaInfo
->
items
[
1
],
pRSmaInfo
->
pTSchema
,
suid
,
TSDB_RETENTION_L2
);
tdExecuteRSmaImpl
(
pSma
,
pMsg
,
inputType
,
&
pRSmaInfo
->
items
[
0
],
pRSmaInfo
->
pTSchema
,
suid
,
0
);
tdExecuteRSmaImpl
(
pSma
,
pMsg
,
inputType
,
&
pRSmaInfo
->
items
[
1
],
pRSmaInfo
->
pTSchema
,
suid
,
1
);
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -1174,123 +1200,6 @@ _err:
return
TSDB_CODE_FAILED
;
}
static
void
*
tdRSmaPersistExec
(
void
*
param
)
{
setThreadName
(
"rsma-task-persist"
);
SRSmaStat
*
pRSmaStat
=
param
;
SSma
*
pSma
=
pRSmaStat
->
pSma
;
int8_t
triggerStat
=
atomic_load_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
));
if
(
TASK_TRIGGER_STAT_CANCELLED
==
triggerStat
||
TASK_TRIGGER_STAT_PAUSED
==
triggerStat
)
{
goto
_end
;
}
// execution
tdRSmaPersistExecImpl
(
pRSmaStat
);
_end:
if
(
TASK_TRIGGER_STAT_INACTIVE
==
atomic_val_compare_exchange_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
),
TASK_TRIGGER_STAT_INACTIVE
,
TASK_TRIGGER_STAT_ACTIVE
))
{
smaDebug
(
"vgId:%d, rsma persist task is active again"
,
SMA_VID
(
pSma
));
}
else
if
(
TASK_TRIGGER_STAT_CANCELLED
==
atomic_val_compare_exchange_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
),
TASK_TRIGGER_STAT_CANCELLED
,
TASK_TRIGGER_STAT_FINISHED
))
{
smaDebug
(
"vgId:%d, rsma persist task is cancelled"
,
SMA_VID
(
pSma
));
}
else
{
smaWarn
(
"vgId:%d, rsma persist task in stat %"
PRIi8
,
SMA_VID
(
pSma
),
atomic_load_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
)));
}
atomic_store_8
(
RSMA_RUNNING_STAT
(
pRSmaStat
),
0
);
smaDebug
(
"vgId:%d, release rsetId rsetId:%"
PRIi64
" refId:%d"
,
SMA_VID
(
pSma
),
smaMgmt
.
rsetId
,
pRSmaStat
->
refId
);
tdReleaseSmaRef
(
smaMgmt
.
rsetId
,
pRSmaStat
->
refId
,
__func__
,
__LINE__
);
taosThreadExit
(
NULL
);
return
NULL
;
}
static
void
tdRSmaPersistTask
(
SRSmaStat
*
pRSmaStat
)
{
TdThreadAttr
thAttr
;
taosThreadAttrInit
(
&
thAttr
);
taosThreadAttrSetDetachState
(
&
thAttr
,
PTHREAD_CREATE_DETACHED
);
TdThread
tid
;
if
(
taosThreadCreate
(
&
tid
,
&
thAttr
,
tdRSmaPersistExec
,
pRSmaStat
)
!=
0
)
{
if
(
TASK_TRIGGER_STAT_INACTIVE
==
atomic_val_compare_exchange_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
),
TASK_TRIGGER_STAT_INACTIVE
,
TASK_TRIGGER_STAT_ACTIVE
))
{
smaDebug
(
"vgId:%d, persist task is active again"
,
SMA_VID
(
pRSmaStat
->
pSma
));
}
else
if
(
TASK_TRIGGER_STAT_CANCELLED
==
atomic_val_compare_exchange_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
),
TASK_TRIGGER_STAT_CANCELLED
,
TASK_TRIGGER_STAT_FINISHED
))
{
smaDebug
(
"vgId:%d, persist task is cancelled and set finished"
,
SMA_VID
(
pRSmaStat
->
pSma
));
}
else
{
smaWarn
(
"vgId:%d, persist task in abnormal stat %"
PRIi8
,
SMA_VID
(
pRSmaStat
->
pSma
),
atomic_load_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
)));
}
atomic_store_8
(
RSMA_RUNNING_STAT
(
pRSmaStat
),
0
);
smaDebug
(
"vgId:%d, release rsetId rsetId:%"
PRIi64
" refId:%d)"
,
SMA_VID
(
pRSmaStat
->
pSma
),
smaMgmt
.
rsetId
,
pRSmaStat
->
refId
);
tdReleaseSmaRef
(
smaMgmt
.
rsetId
,
pRSmaStat
->
refId
,
__func__
,
__LINE__
);
}
taosThreadAttrDestroy
(
&
thAttr
);
}
/**
* @brief trigger to persist rsma qTaskInfo
*
* @param param
* @param tmrId
*/
static
void
tdRSmaPersistTrigger
(
void
*
param
,
void
*
tmrId
)
{
SRSmaStat
*
rsmaStat
=
param
;
SRSmaStat
*
pRSmaStat
=
(
SRSmaStat
*
)
taosAcquireRef
(
smaMgmt
.
rsetId
,
rsmaStat
->
refId
);
ASSERT
(
0
);
if
(
!
pRSmaStat
)
{
smaDebug
(
"rsma persistence task not start since already destroyed"
);
return
;
}
int8_t
tmrStat
=
atomic_val_compare_exchange_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
),
TASK_TRIGGER_STAT_ACTIVE
,
TASK_TRIGGER_STAT_INACTIVE
);
switch
(
tmrStat
)
{
case
TASK_TRIGGER_STAT_ACTIVE
:
{
atomic_store_8
(
RSMA_RUNNING_STAT
(
pRSmaStat
),
1
);
if
(
TASK_TRIGGER_STAT_CANCELLED
!=
atomic_val_compare_exchange_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
),
TASK_TRIGGER_STAT_CANCELLED
,
TASK_TRIGGER_STAT_FINISHED
))
{
smaDebug
(
"vgId:%d, rsma persistence start since active"
,
SMA_VID
(
pRSmaStat
->
pSma
));
// start persist task
tdRSmaPersistTask
(
pRSmaStat
);
// taosTmrReset(tdRSmaPersistTrigger, 5000, pRSmaStat, pRSmaStat->tmrHandle,
// RSMA_TMR_ID(pRSmaStat));
}
else
{
atomic_store_8
(
RSMA_RUNNING_STAT
(
pRSmaStat
),
0
);
}
return
;
}
break
;
case
TASK_TRIGGER_STAT_CANCELLED
:
{
atomic_store_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
),
TASK_TRIGGER_STAT_FINISHED
);
smaDebug
(
"rsma persistence not start since cancelled and finished"
);
}
break
;
case
TASK_TRIGGER_STAT_PAUSED
:
{
smaDebug
(
"rsma persistence not start since paused"
);
}
break
;
case
TASK_TRIGGER_STAT_INACTIVE
:
{
smaDebug
(
"rsma persistence not start since inactive"
);
}
break
;
case
TASK_TRIGGER_STAT_INIT
:
{
smaDebug
(
"rsma persistence not start since init"
);
}
break
;
default:
{
smaWarn
(
"rsma persistence not start since unknown stat %"
PRIi8
,
tmrStat
);
}
break
;
}
taosReleaseRef
(
smaMgmt
.
rsetId
,
rsmaStat
->
refId
);
}
/**
* @brief trigger to get rsma result
*
...
...
@@ -1314,8 +1223,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
int8_t
rsmaTriggerStat
=
atomic_load_8
(
RSMA_TRIGGER_STAT
(
pStat
));
switch
(
rsmaTriggerStat
)
{
case
TASK_TRIGGER_STAT_PAUSED
:
case
TASK_TRIGGER_STAT_CANCELLED
:
case
TASK_TRIGGER_STAT_FINISHED
:
{
case
TASK_TRIGGER_STAT_CANCELLED
:
{
tdReleaseSmaRef
(
smaMgmt
.
rsetId
,
pItem
->
refId
,
__func__
,
__LINE__
);
smaDebug
(
"vgId:%d, not fetch rsma level %"
PRIi8
" data since stat is %"
PRIi8
", rsetId rsetId:%"
PRIi64
" refId:%d"
,
...
...
@@ -1328,7 +1236,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
SRSmaInfo
*
pRSmaInfo
=
tdGetRSmaInfoByItem
(
pItem
);
ASSERT
(
pRSmaInfo
->
suid
>
0
);
ASSERT
(
pRSmaInfo
->
items
[
pItem
->
level
].
level
==
pItem
->
level
);
int8_t
fetchTriggerStat
=
atomic_val_compare_exchange_8
(
&
pItem
->
triggerStat
,
TASK_TRIGGER_STAT_ACTIVE
,
TASK_TRIGGER_STAT_INACTIVE
);
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
82c4b62c
...
...
@@ -388,7 +388,7 @@ static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *p
goto
_err
;
}
if
(
tdProcessRSmaCreate
(
pVnode
,
&
req
)
<
0
)
{
if
(
tdProcessRSmaCreate
(
pVnode
->
pSma
,
&
req
)
<
0
)
{
pRsp
->
code
=
terrno
;
goto
_err
;
}
...
...
@@ -544,6 +544,11 @@ static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pRe
goto
_exit
;
}
if
(
tdProcessRSmaDrop
(
pVnode
->
pSma
,
&
req
)
<
0
)
{
rcode
=
terrno
;
goto
_exit
;
}
// return rsp
_exit:
pRsp
->
code
=
rcode
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录