Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
c4e71ccd
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
c4e71ccd
编写于
6月 29, 2022
作者:
C
Cary Xu
提交者:
GitHub
6月 29, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #14373 from taosdata/feature/TD-11274-3.0
refactor: add tref to solve the timer not stopped problem
上级
3f00a6db
6600c472
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
96 addition
and
35 deletion
+96
-35
source/dnode/vnode/src/inc/sma.h
source/dnode/vnode/src/inc/sma.h
+17
-1
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+1
-0
source/dnode/vnode/src/sma/smaEnv.c
source/dnode/vnode/src/sma/smaEnv.c
+41
-13
source/dnode/vnode/src/sma/smaRollup.c
source/dnode/vnode/src/sma/smaRollup.c
+33
-17
source/dnode/vnode/src/sma/smaUtil.c
source/dnode/vnode/src/sma/smaUtil.c
+4
-4
未找到文件。
source/dnode/vnode/src/inc/sma.h
浏览文件 @
c4e71ccd
...
...
@@ -46,6 +46,11 @@ struct SSmaEnv {
SSmaStat
*
pStat
;
};
typedef
struct
{
int32_t
smaRef
;
int32_t
refId
;
}
SSmaMgmt
;
#define SMA_ENV_LOCK(env) ((env)->lock)
#define SMA_ENV_TYPE(env) ((env)->type)
#define SMA_ENV_STAT(env) ((env)->pStat)
...
...
@@ -58,6 +63,7 @@ struct STSmaStat {
struct
SRSmaStat
{
SSma
*
pSma
;
int64_t
refId
;
void
*
tmrHandle
;
tmr_h
tmrId
;
int32_t
tmrSeconds
;
...
...
@@ -73,6 +79,7 @@ struct SSmaStat {
};
T_REF_DECLARE
()
};
#define SMA_TSMA_STAT(s) (&(s)->tsmaStat)
#define SMA_RSMA_STAT(s) (&(s)->rsmaStat)
#define RSMA_INFO_HASH(r) ((r)->rsmaInfoHash)
...
...
@@ -80,6 +87,7 @@ struct SSmaStat {
#define RSMA_TMR_HANDLE(r) ((r)->tmrHandle)
#define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat)
#define RSMA_RUNNING_STAT(r) (&(r)->runningStat)
#define RSMA_REF_ID(r) ((r)->refId)
enum
{
TASK_TRIGGER_STAT_INIT
=
0
,
...
...
@@ -192,10 +200,18 @@ typedef struct STFInfo STFInfo;
typedef
struct
STFile
STFile
;
struct
STFInfo
{
// common fields
uint32_t
magic
;
uint32_t
ftype
;
uint32_t
fver
;
int64_t
fsize
;
// specific fields
union
{
struct
{
int64_t
applyVer
[
2
];
}
qTaskInfo
;
};
};
struct
STFile
{
...
...
@@ -230,7 +246,7 @@ int32_t tdUpdateTFileHeader(STFile *pTFile);
void
tdUpdateTFileMagic
(
STFile
*
pTFile
,
void
*
pCksm
);
void
tdCloseTFile
(
STFile
*
pTFile
);
void
tdGetVndFileName
(
int32_t
v
i
d
,
const
char
*
dname
,
const
char
*
fname
,
char
*
outputName
);
void
tdGetVndFileName
(
int32_t
v
gI
d
,
const
char
*
dname
,
const
char
*
fname
,
char
*
outputName
);
#ifdef __cplusplus
}
...
...
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
c4e71ccd
...
...
@@ -27,6 +27,7 @@
#include "tdatablock.h"
#include "tdb.h"
#include "tencode.h"
#include "tref.h"
#include "tfs.h"
#include "tglobal.h"
#include "tjson.h"
...
...
source/dnode/vnode/src/sma/smaEnv.c
浏览文件 @
c4e71ccd
...
...
@@ -18,6 +18,9 @@
typedef
struct
SSmaStat
SSmaStat
;
#define RSMA_TASK_INFO_HASH_SLOT 8
#define SMA_MGMT_REF_NUM 1024
extern
SSmaMgmt
smaMgmt
;
// declaration of static functions
...
...
@@ -25,6 +28,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *p
static
SSmaEnv
*
tdNewSmaEnv
(
const
SSma
*
pSma
,
int8_t
smaType
,
const
char
*
path
);
static
int32_t
tdInitSmaEnv
(
SSma
*
pSma
,
int8_t
smaType
,
const
char
*
path
,
SSmaEnv
**
pEnv
);
static
void
*
tdFreeTSmaStat
(
STSmaStat
*
pStat
);
static
void
tdDestroyRSmaStat
(
void
*
pRSmaStat
);
// implementation
...
...
@@ -128,6 +132,22 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
if
(
smaType
==
TSDB_SMA_TYPE_ROLLUP
)
{
SRSmaStat
*
pRSmaStat
=
(
SRSmaStat
*
)(
*
pSmaStat
);
pRSmaStat
->
pSma
=
(
SSma
*
)
pSma
;
// init smaMgmt
smaMgmt
.
smaRef
=
taosOpenRef
(
SMA_MGMT_REF_NUM
,
tdDestroyRSmaStat
);
if
(
smaMgmt
.
refId
<
0
)
{
smaError
(
"init smaRef failed, num:%d"
,
SMA_MGMT_REF_NUM
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
TSDB_CODE_FAILED
;
}
int64_t
refId
=
taosAddRef
(
smaMgmt
.
smaRef
,
pRSmaStat
);
if
(
refId
<
0
)
{
smaError
(
"taosAddRef smaRef failed, since:%s"
,
tstrerror
(
terrno
));
return
TSDB_CODE_FAILED
;
}
pRSmaStat
->
refId
=
refId
;
// init timer
RSMA_TMR_HANDLE
(
pRSmaStat
)
=
taosTmrInit
(
10000
,
100
,
10000
,
"RSMA"
);
if
(
!
RSMA_TMR_HANDLE
(
pRSmaStat
))
{
...
...
@@ -169,9 +189,10 @@ static void *tdFreeTSmaStat(STSmaStat *pStat) {
return
NULL
;
}
static
void
tdDestroyRSmaStat
(
SRSmaStat
*
pStat
)
{
if
(
pStat
)
{
smaDebug
(
"vgId:%d destroy rsma stat"
,
SMA_VID
(
pStat
->
pSma
));
static
void
tdDestroyRSmaStat
(
void
*
pRSmaStat
)
{
if
(
pRSmaStat
)
{
SRSmaStat
*
pStat
=
(
SRSmaStat
*
)
pRSmaStat
;
smaDebug
(
"vgId:%d %s:%d destroy rsma stat %p"
,
SMA_VID
(
pStat
->
pSma
),
__func__
,
__LINE__
,
pRSmaStat
);
// step 1: set persistence task cancelled
atomic_store_8
(
RSMA_TRIGGER_STAT
(
pStat
),
TASK_TRIGGER_STAT_CANCELLED
);
...
...
@@ -183,9 +204,11 @@ static void tdDestroyRSmaStat(SRSmaStat *pStat) {
if
(
atomic_load_8
(
RSMA_RUNNING_STAT
(
pStat
))
==
1
)
{
while
(
1
)
{
if
(
atomic_load_8
(
RSMA_TRIGGER_STAT
(
pStat
))
==
TASK_TRIGGER_STAT_FINISHED
)
{
smaDebug
(
"rsma, persist task finished already"
);
break
;
}
else
{
smaDebug
(
"not destroyed since rsma stat in %"
PRIi8
,
atomic_load_8
(
RSMA_TRIGGER_STAT
(
pStat
)));
smaDebug
(
"rsma, persist task not finished yet since rsma stat in %"
PRIi8
,
atomic_load_8
(
RSMA_TRIGGER_STAT
(
pStat
)));
}
++
nLoops
;
if
(
nLoops
>
1000
)
{
...
...
@@ -209,7 +232,10 @@ static void tdDestroyRSmaStat(SRSmaStat *pStat) {
nLoops
=
0
;
while
(
1
)
{
if
(
T_REF_VAL_GET
((
SSmaStat
*
)
pStat
)
==
0
)
{
smaDebug
(
"rsma, all fetch task finished already"
);
break
;
}
else
{
smaDebug
(
"rsma, fetch tasks not all finished yet"
);
}
++
nLoops
;
if
(
nLoops
>
1000
)
{
...
...
@@ -225,15 +251,13 @@ static void tdDestroyRSmaStat(SRSmaStat *pStat) {
}
}
static
void
*
tdFreeRSmaStat
(
SRSmaStat
*
pStat
)
{
tdDestroyRSmaStat
(
pStat
);
taosMemoryFreeClear
(
pStat
);
return
NULL
;
}
void
*
tdFreeSmaState
(
SSmaStat
*
pSmaStat
,
int8_t
smaType
)
{
tdDestroySmaState
(
pSmaStat
,
smaType
);
taosMemoryFreeClear
(
pSmaStat
);
if
(
smaType
==
TSDB_SMA_TYPE_TIME_RANGE
)
{
taosMemoryFreeClear
(
pSmaStat
);
}
// tref used to free rsma stat
return
NULL
;
}
...
...
@@ -243,17 +267,21 @@ void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) {
* @param pSmaStat
* @return int32_t
*/
int32_t
tdDestroySmaState
(
SSmaStat
*
pSmaStat
,
int8_t
smaType
)
{
if
(
pSmaStat
)
{
if
(
smaType
==
TSDB_SMA_TYPE_TIME_RANGE
)
{
tdDestroyTSmaStat
(
SMA_TSMA_STAT
(
pSmaStat
));
}
else
if
(
smaType
==
TSDB_SMA_TYPE_ROLLUP
)
{
tdDestroyRSmaStat
(
SMA_RSMA_STAT
(
pSmaStat
));
SRSmaStat
*
pRSmaStat
=
SMA_RSMA_STAT
(
pSmaStat
);
if
(
taosRemoveRef
(
smaMgmt
.
smaRef
,
RSMA_REF_ID
(
pRSmaStat
))
<
0
)
{
smaError
(
"remove refId from smaRef failed, refId:0x%"
PRIx64
,
RSMA_REF_ID
(
pRSmaStat
));
}
}
else
{
ASSERT
(
0
);
}
}
return
TSDB_CODE_SUCCESS
;
return
0
;
}
int32_t
tdLockSma
(
SSma
*
pSma
)
{
...
...
source/dnode/vnode/src/sma/smaRollup.c
浏览文件 @
c4e71ccd
...
...
@@ -18,9 +18,13 @@
#define RSMA_QTASKINFO_PERSIST_MS 7200000
#define RSMA_QTASKINFO_BUFSIZE 32768
#define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid
typedef
enum
{
TD_QTASK_TMP_FILE
=
0
,
TD_QTASK_CUR_FILE
}
TD_QTASK_FILE_T
;
static
const
char
*
tdQTaskInfoFname
[]
=
{
"qtaskinfo.t"
,
"qtaskinfo"
};
SSmaMgmt
smaMgmt
=
{
.
smaRef
=
-
1
,
};
typedef
enum
{
TD_QTASK_TMP_F
=
0
,
TD_QTASK_CUR_F
}
TD_QTASK_FILE_T
;
static
const
char
*
tdQTaskInfoFname
[]
=
{
"qtaskinfo.t"
,
"qtaskinfo"
};
typedef
struct
SRSmaQTaskInfoItem
SRSmaQTaskInfoItem
;
typedef
struct
SRSmaQTaskInfoIter
SRSmaQTaskInfoIter
;
...
...
@@ -80,6 +84,10 @@ struct SRSmaQTaskInfoIter {
int32_t
nBufPos
;
};
static
void
tdRSmaQTaskInfoGetFName
(
int32_t
vgId
,
int8_t
ftype
,
char
*
outputName
)
{
tdGetVndFileName
(
vgId
,
VNODE_RSMA_DIR
,
tdQTaskInfoFname
[
ftype
],
outputName
);
}
static
FORCE_INLINE
int32_t
tdRSmaQTaskInfoContLen
(
int32_t
lenWithHead
)
{
return
lenWithHead
-
RSMA_QTASKINFO_HEAD_LEN
;
}
...
...
@@ -761,7 +769,7 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma) {
STFile
tFile
=
{
0
};
char
qTaskInfoFName
[
TSDB_FILENAME_LEN
];
tdRSmaQTaskInfoGetFName
(
TD_VID
(
pVnode
),
TD_QTASK_
CUR_FILE
,
qTaskInfoFName
);
tdRSmaQTaskInfoGetFName
(
TD_VID
(
pVnode
),
TD_QTASK_
TMP_F
,
qTaskInfoFName
);
if
(
tdInitTFile
(
&
tFile
,
pVnode
->
pTfs
,
qTaskInfoFName
)
<
0
)
{
goto
_err
;
}
...
...
@@ -797,9 +805,9 @@ _err:
/**
* @brief reload ts data from checkpoint
*
* @param pSma
* @return int32_t
*
* @param pSma
* @return int32_t
*/
static
int32_t
tdRSmaRestoreTSDataReload
(
SSma
*
pSma
)
{
// TODO
...
...
@@ -861,7 +869,8 @@ static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *
pItem
->
type
,
terrstr
(
terrno
));
return
TSDB_CODE_FAILED
;
}
smaDebug
(
"vgId:%d, restore rsma task success for table:%"
PRIi64
" level %d"
,
SMA_VID
(
pSma
),
pItem
->
suid
,
pItem
->
type
);
smaDebug
(
"vgId:%d, restore rsma task success for table:%"
PRIi64
" level %d"
,
SMA_VID
(
pSma
),
pItem
->
suid
,
pItem
->
type
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1003,10 +1012,6 @@ static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter) {
return
TSDB_CODE_SUCCESS
;
}
static
void
tdRSmaQTaskInfoGetFName
(
int32_t
vid
,
int8_t
ftype
,
char
*
outputName
)
{
tdGetVndFileName
(
vid
,
VNODE_RSMA_DIR
,
tdQTaskInfoFname
[
ftype
],
outputName
);
}
static
void
*
tdRSmaPersistExec
(
void
*
param
)
{
setThreadName
(
"rsma-task-persist"
);
SRSmaStat
*
pRSmaStat
=
param
;
...
...
@@ -1063,7 +1068,7 @@ static void *tdRSmaPersistExec(void *param) {
if
(
!
isFileCreated
)
{
char
qTaskInfoFName
[
TSDB_FILENAME_LEN
];
tdRSmaQTaskInfoGetFName
(
vid
,
TD_QTASK_TMP_F
ILE
,
qTaskInfoFName
);
tdRSmaQTaskInfoGetFName
(
vid
,
TD_QTASK_TMP_F
,
qTaskInfoFName
);
tdInitTFile
(
&
tFile
,
pTfs
,
qTaskInfoFName
);
tdCreateTFile
(
&
tFile
,
pTfs
,
true
,
-
1
);
...
...
@@ -1079,8 +1084,8 @@ static void *tdRSmaPersistExec(void *param) {
ASSERT
(
headLen
<=
RSMA_QTASKINFO_HEAD_LEN
);
tdAppendTFile
(
&
tFile
,
(
void
*
)
&
tmpBuf
,
headLen
,
&
toffset
);
smaDebug
(
"vgId:%d, table %"
PRIi64
" level %d head part
len:%d appended to offset:%"
PRIi64
,
vid
,
pRSmaInfo
->
su
id
,
i
+
1
,
headLen
,
toffset
);
smaDebug
(
"vgId:%d, table %"
PRIi64
" level %d head part
(len:%d) appended to offset:%"
PRIi64
,
v
id
,
pRSmaInfo
->
suid
,
i
+
1
,
headLen
,
toffset
);
tdAppendTFile
(
&
tFile
,
pOutput
,
len
,
&
toffset
);
smaDebug
(
"vgId:%d, table %"
PRIi64
" level %d body part len:%d appended to offset:%"
PRIi64
,
vid
,
pRSmaInfo
->
suid
,
i
+
1
,
len
,
toffset
);
...
...
@@ -1106,8 +1111,8 @@ _normal:
char
newFName
[
TSDB_FILENAME_LEN
];
strncpy
(
newFName
,
TD_TFILE_FULL_NAME
(
&
tFile
),
TSDB_FILENAME_LEN
);
char
*
pos
=
strstr
(
newFName
,
tdQTaskInfoFname
[
TD_QTASK_TMP_F
ILE
]);
strncpy
(
pos
,
tdQTaskInfoFname
[
TD_QTASK_
CUR_FILE
],
TSDB_FILENAME_LEN
-
POINTER_DISTANCE
(
pos
,
newFName
));
char
*
pos
=
strstr
(
newFName
,
tdQTaskInfoFname
[
TD_QTASK_TMP_F
]);
strncpy
(
pos
,
tdQTaskInfoFname
[
TD_QTASK_
TMP_F
],
TSDB_FILENAME_LEN
-
POINTER_DISTANCE
(
pos
,
newFName
));
if
(
taosRenameFile
(
TD_TFILE_FULL_NAME
(
&
tFile
),
newFName
)
!=
0
)
{
smaError
(
"vgId:%d, failed to rename %s to %s"
,
vid
,
TD_TFILE_FULL_NAME
(
&
tFile
),
newFName
);
goto
_err
;
...
...
@@ -1134,6 +1139,7 @@ _end:
ASSERT
(
0
);
}
atomic_store_8
(
RSMA_RUNNING_STAT
(
pRSmaStat
),
0
);
taosReleaseRef
(
smaMgmt
.
smaRef
,
pRSmaStat
->
refId
);
taosThreadExit
(
NULL
);
return
NULL
;
}
...
...
@@ -1159,6 +1165,7 @@ static void tdRSmaPersistTask(SRSmaStat *pRSmaStat) {
ASSERT
(
0
);
}
atomic_store_8
(
RSMA_RUNNING_STAT
(
pRSmaStat
),
0
);
taosReleaseRef
(
smaMgmt
.
smaRef
,
pRSmaStat
->
refId
);
}
taosThreadAttrDestroy
(
&
thAttr
);
...
...
@@ -1171,7 +1178,14 @@ static void tdRSmaPersistTask(SRSmaStat *pRSmaStat) {
* @param tmrId
*/
static
void
tdRSmaPersistTrigger
(
void
*
param
,
void
*
tmrId
)
{
SRSmaStat
*
pRSmaStat
=
param
;
SRSmaStat
*
rsmaStat
=
param
;
int64_t
refId
=
rsmaStat
->
refId
;
SRSmaStat
*
pRSmaStat
=
(
SRSmaStat
*
)
taosAcquireRef
(
smaMgmt
.
smaRef
,
refId
);
if
(
!
pRSmaStat
)
{
smaDebug
(
"rsma persistence task not start since already destroyed"
);
return
;
}
int8_t
tmrStat
=
atomic_val_compare_exchange_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
),
TASK_TRIGGER_STAT_ACTIVE
,
TASK_TRIGGER_STAT_INACTIVE
);
...
...
@@ -1191,6 +1205,7 @@ static void tdRSmaPersistTrigger(void *param, void *tmrId) {
}
else
{
atomic_store_8
(
RSMA_RUNNING_STAT
(
pRSmaStat
),
0
);
}
return
;
}
break
;
case
TASK_TRIGGER_STAT_CANCELLED
:
{
atomic_store_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
),
TASK_TRIGGER_STAT_FINISHED
);
...
...
@@ -1206,4 +1221,5 @@ static void tdRSmaPersistTrigger(void *param, void *tmrId) {
smaWarn
(
"rsma persistence not start since unknown stat %"
PRIi8
,
tmrStat
);
}
break
;
}
taosReleaseRef
(
smaMgmt
.
smaRef
,
refId
);
}
source/dnode/vnode/src/sma/smaUtil.c
浏览文件 @
c4e71ccd
...
...
@@ -141,8 +141,8 @@ int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset)
}
#if 1
smaDebug
(
"append to file %s, offset:%"
PRIi64
"
+ nbyte:%"
PRIi64
" =%"
PRIi64
,
TD_TFILE_FULL_NAME
(
pTFile
),
toffset
,
nbyte
,
toffset
+
nbyte
);
smaDebug
(
"append to file %s, offset:%"
PRIi64
"
nbyte:%"
PRIi64
" fsize:%"
PRIi64
,
TD_TFILE_FULL_NAME
(
pTFile
)
,
toffset
,
nbyte
,
toffset
+
nbyte
);
#endif
ASSERT
(
pTFile
->
info
.
fsize
==
toffset
);
...
...
@@ -179,8 +179,8 @@ void tdCloseTFile(STFile *pTFile) {
}
}
void
tdGetVndFileName
(
int32_t
v
i
d
,
const
char
*
dname
,
const
char
*
fname
,
char
*
outputName
)
{
snprintf
(
outputName
,
TSDB_FILENAME_LEN
,
"vnode/vnode%d/%s/%s"
,
v
i
d
,
dname
,
fname
);
void
tdGetVndFileName
(
int32_t
v
gI
d
,
const
char
*
dname
,
const
char
*
fname
,
char
*
outputName
)
{
snprintf
(
outputName
,
TSDB_FILENAME_LEN
,
"vnode/vnode%d/%s/%s"
,
v
gI
d
,
dname
,
fname
);
}
int32_t
tdInitTFile
(
STFile
*
pTFile
,
STfs
*
pTfs
,
const
char
*
fname
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录