Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
b66caf41
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
b66caf41
编写于
6月 28, 2022
作者:
C
Cary Xu
提交者:
GitHub
6月 28, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #14317 from taosdata/feature/TD-11274-3.0
refactor: rsma restore
上级
364250b8
df417c7f
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
330 addition
and
319 deletion
+330
-319
source/dnode/vnode/src/inc/sma.h
source/dnode/vnode/src/inc/sma.h
+10
-10
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+1
-1
source/dnode/vnode/src/sma/smaOpen.c
source/dnode/vnode/src/sma/smaOpen.c
+1
-7
source/dnode/vnode/src/sma/smaRollup.c
source/dnode/vnode/src/sma/smaRollup.c
+293
-280
source/dnode/vnode/src/sma/smaUtil.c
source/dnode/vnode/src/sma/smaUtil.c
+23
-19
source/dnode/vnode/src/vnd/vnodeOpen.c
source/dnode/vnode/src/vnd/vnodeOpen.c
+2
-2
未找到文件。
source/dnode/vnode/src/inc/sma.h
浏览文件 @
b66caf41
...
@@ -205,16 +205,16 @@ struct STFile {
...
@@ -205,16 +205,16 @@ struct STFile {
uint8_t
state
;
uint8_t
state
;
};
};
#define TD_FILE_F(tf) (&((tf)->f))
#define TD_
T
FILE_F(tf) (&((tf)->f))
#define TD_FILE_PFILE(tf) ((tf)->pFile)
#define TD_
T
FILE_PFILE(tf) ((tf)->pFile)
#define TD_
FILE_OPENED(tf) (TD_
FILE_PFILE(tf) != NULL)
#define TD_
TFILE_OPENED(tf) (TD_T
FILE_PFILE(tf) != NULL)
#define TD_
FILE_FULL_NAME(tf) (TD_
FILE_F(tf)->aname)
#define TD_
TFILE_FULL_NAME(tf) (TD_T
FILE_F(tf)->aname)
#define TD_
FILE_REL_NAME(tf) (TD_
FILE_F(tf)->rname)
#define TD_
TFILE_REL_NAME(tf) (TD_T
FILE_F(tf)->rname)
#define TD_
FILE_OPENED(tf) (TD_
FILE_PFILE(tf) != NULL)
#define TD_
TFILE_OPENED(tf) (TD_T
FILE_PFILE(tf) != NULL)
#define TD_
FILE_CLOSED(tf) (!TD_
FILE_OPENED(tf))
#define TD_
TFILE_CLOSED(tf) (!TD_T
FILE_OPENED(tf))
#define TD_
FILE_SET_CLOSED(f) (TD_
FILE_PFILE(f) = NULL)
#define TD_
TFILE_SET_CLOSED(f) (TD_T
FILE_PFILE(f) = NULL)
#define TD_FILE_SET_STATE(tf, s) ((tf)->state = (s))
#define TD_
T
FILE_SET_STATE(tf, s) ((tf)->state = (s))
#define TD_
FILE_DID(tf) (TD_
FILE_F(tf)->did)
#define TD_
TFILE_DID(tf) (TD_T
FILE_F(tf)->did)
int32_t
tdInitTFile
(
STFile
*
pTFile
,
STfs
*
pTfs
,
const
char
*
fname
);
int32_t
tdInitTFile
(
STFile
*
pTFile
,
STfs
*
pTfs
,
const
char
*
fname
);
int32_t
tdCreateTFile
(
STFile
*
pTFile
,
STfs
*
pTfs
,
bool
updateHeader
,
int8_t
fType
);
int32_t
tdCreateTFile
(
STFile
*
pTFile
,
STfs
*
pTfs
,
bool
updateHeader
,
int8_t
fType
);
...
...
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
b66caf41
...
@@ -64,6 +64,7 @@ typedef struct STsdbSnapshotReader STsdbSnapshotReader;
...
@@ -64,6 +64,7 @@ typedef struct STsdbSnapshotReader STsdbSnapshotReader;
#define VNODE_TQ_DIR "tq"
#define VNODE_TQ_DIR "tq"
#define VNODE_WAL_DIR "wal"
#define VNODE_WAL_DIR "wal"
#define VNODE_TSMA_DIR "tsma"
#define VNODE_TSMA_DIR "tsma"
#define VNODE_RSMA_DIR "rsma"
#define VNODE_RSMA0_DIR "tsdb"
#define VNODE_RSMA0_DIR "tsdb"
#define VNODE_RSMA1_DIR "rsma1"
#define VNODE_RSMA1_DIR "rsma1"
#define VNODE_RSMA2_DIR "rsma2"
#define VNODE_RSMA2_DIR "rsma2"
...
@@ -161,7 +162,6 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool
...
@@ -161,7 +162,6 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool
// sma
// sma
int32_t
smaOpen
(
SVnode
*
pVnode
);
int32_t
smaOpen
(
SVnode
*
pVnode
);
int32_t
smaClose
(
SSma
*
pSma
);
int32_t
smaCloseEnv
(
SSma
*
pSma
);
int32_t
smaCloseEnv
(
SSma
*
pSma
);
int32_t
smaCloseEx
(
SSma
*
pSma
);
int32_t
smaCloseEx
(
SSma
*
pSma
);
...
...
source/dnode/vnode/src/sma/smaOpen.c
浏览文件 @
b66caf41
...
@@ -123,7 +123,7 @@ int32_t smaOpen(SVnode *pVnode) {
...
@@ -123,7 +123,7 @@ int32_t smaOpen(SVnode *pVnode) {
}
}
// restore the rsma
// restore the rsma
#if
0
#if
1
if
(
rsmaRestore
(
pSma
)
<
0
)
{
if
(
rsmaRestore
(
pSma
)
<
0
)
{
goto
_err
;
goto
_err
;
}
}
...
@@ -154,12 +154,6 @@ int32_t smaCloseEx(SSma *pSma) {
...
@@ -154,12 +154,6 @@ int32_t smaCloseEx(SSma *pSma) {
return
0
;
return
0
;
}
}
int32_t
smaClose
(
SSma
*
pSma
)
{
smaCloseEnv
(
pSma
);
smaCloseEx
(
pSma
);
return
0
;
}
/**
/**
* @brief rsma env restore
* @brief rsma env restore
*
*
...
...
source/dnode/vnode/src/sma/smaRollup.c
浏览文件 @
b66caf41
...
@@ -17,11 +17,12 @@
...
@@ -17,11 +17,12 @@
#define RSMA_QTASKINFO_PERSIST_MS 7200000
#define RSMA_QTASKINFO_PERSIST_MS 7200000
#define RSMA_QTASKINFO_BUFSIZE 32768
#define RSMA_QTASKINFO_BUFSIZE 32768
#define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid
typedef
enum
{
TD_QTASK_TMP_FILE
=
0
,
TD_QTASK_CUR_FILE
}
TD_QTASK_FILE_T
;
typedef
enum
{
TD_QTASK_TMP_FILE
=
0
,
TD_QTASK_CUR_FILE
}
TD_QTASK_FILE_T
;
static
const
char
*
tdQTaskInfoFname
[]
=
{
"qtaskinfo.t"
,
"qtaskinfo"
};
static
const
char
*
tdQTaskInfoFname
[]
=
{
"qtaskinfo.t"
,
"qtaskinfo"
};
typedef
struct
SRSmaQTaskInfoItem
SRSmaQTaskInfoItem
;
typedef
struct
SRSmaQTaskInfoItem
SRSmaQTaskInfoItem
;
typedef
struct
SRSmaQTask
FIter
SRSmaQTaskF
Iter
;
typedef
struct
SRSmaQTask
InfoIter
SRSmaQTaskInfo
Iter
;
static
int32_t
tdUidStorePut
(
STbUidStore
*
pStore
,
tb_uid_t
suid
,
tb_uid_t
*
uid
);
static
int32_t
tdUidStorePut
(
STbUidStore
*
pStore
,
tb_uid_t
suid
,
tb_uid_t
*
uid
);
static
int32_t
tdUpdateTbUidListImpl
(
SSma
*
pSma
,
tb_uid_t
*
suid
,
SArray
*
tbUids
);
static
int32_t
tdUpdateTbUidListImpl
(
SSma
*
pSma
,
tb_uid_t
*
suid
,
SArray
*
tbUids
);
...
@@ -32,11 +33,11 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType
...
@@ -32,11 +33,11 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType
static
void
tdRSmaFetchTrigger
(
void
*
param
,
void
*
tmrId
);
static
void
tdRSmaFetchTrigger
(
void
*
param
,
void
*
tmrId
);
static
void
tdRSmaPersistTrigger
(
void
*
param
,
void
*
tmrId
);
static
void
tdRSmaPersistTrigger
(
void
*
param
,
void
*
tmrId
);
static
void
*
tdRSmaPersistExec
(
void
*
param
);
static
void
*
tdRSmaPersistExec
(
void
*
param
);
static
void
tdRSmaQTaskGetFName
(
int32_t
vid
,
int8_t
ftype
,
char
*
outputName
);
static
void
tdRSmaQTask
Info
GetFName
(
int32_t
vid
,
int8_t
ftype
,
char
*
outputName
);
static
int32_t
tdRSmaQTaskInfoIterInit
(
SRSmaQTask
F
Iter
*
pIter
,
STFile
*
pTFile
);
static
int32_t
tdRSmaQTaskInfoIterInit
(
SRSmaQTask
Info
Iter
*
pIter
,
STFile
*
pTFile
);
static
int32_t
tdRSmaQTaskInfoIterNextBlock
(
SRSmaQTask
F
Iter
*
pIter
,
bool
*
isFinish
);
static
int32_t
tdRSmaQTaskInfoIterNextBlock
(
SRSmaQTask
Info
Iter
*
pIter
,
bool
*
isFinish
);
static
int32_t
tdRSmaQTaskInfo
IterNext
(
SRSmaQTaskFIter
*
pIter
,
SRSmaQTaskInfoItem
*
pItem
,
bool
*
isEnd
);
static
int32_t
tdRSmaQTaskInfo
Restore
(
SSma
*
pSma
,
SRSmaQTaskInfoIter
*
pIter
);
static
int32_t
tdRSmaQTaskInfoItemRestore
(
SSma
*
pSma
,
const
SRSmaQTaskInfoItem
*
infoItem
);
static
int32_t
tdRSmaQTaskInfoItemRestore
(
SSma
*
pSma
,
const
SRSmaQTaskInfoItem
*
infoItem
);
struct
SRSmaInfoItem
{
struct
SRSmaInfoItem
{
...
@@ -63,22 +64,23 @@ struct SRSmaQTaskInfoItem {
...
@@ -63,22 +64,23 @@ struct SRSmaQTaskInfoItem {
void
*
qTaskInfo
;
void
*
qTaskInfo
;
};
};
struct
SRSmaQTask
F
Iter
{
struct
SRSmaQTask
Info
Iter
{
STFile
*
pTFile
;
STFile
*
pTFile
;
int64_t
offset
;
int64_t
offset
;
int64_t
fsize
;
int64_t
fsize
;
int32_t
nBytes
;
int32_t
nBytes
;
int32_t
nAlloc
;
int32_t
nAlloc
;
char
*
b
uf
;
char
*
pB
uf
;
// ------------
// ------------
char
*
qBuf
;
// for iterator
int32_t
nBufPos
;
int32_t
nBufPos
;
};
};
static
FORCE_INLINE
int32_t
tdRSmaQTaskInfoContLen
(
int32_t
lenWithHead
)
{
static
FORCE_INLINE
int32_t
tdRSmaQTaskInfoContLen
(
int32_t
lenWithHead
)
{
return
lenWithHead
-
sizeof
(
int32_t
)
-
sizeof
(
int8_t
)
-
sizeof
(
int64_t
)
;
return
lenWithHead
-
RSMA_QTASKINFO_HEAD_LEN
;
}
}
static
FORCE_INLINE
void
tdRSmaQTaskInfoIterDestroy
(
SRSmaQTask
FIter
*
pIter
)
{
taosMemoryFreeClear
(
pIter
->
b
uf
);
}
static
FORCE_INLINE
void
tdRSmaQTaskInfoIterDestroy
(
SRSmaQTask
InfoIter
*
pIter
)
{
taosMemoryFreeClear
(
pIter
->
pB
uf
);
}
static
FORCE_INLINE
void
tdFreeTaskHandle
(
qTaskInfo_t
*
taskHandle
,
int32_t
vgId
,
int32_t
level
)
{
static
FORCE_INLINE
void
tdFreeTaskHandle
(
qTaskInfo_t
*
taskHandle
,
int32_t
vgId
,
int32_t
level
)
{
// Note: free/kill may in RC
// Note: free/kill may in RC
...
@@ -294,7 +296,7 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
...
@@ -294,7 +296,7 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
pRSmaInfo
=
taosHashGet
(
RSMA_INFO_HASH
(
pStat
),
&
suid
,
sizeof
(
tb_uid_t
));
pRSmaInfo
=
taosHashGet
(
RSMA_INFO_HASH
(
pStat
),
&
suid
,
sizeof
(
tb_uid_t
));
if
(
pRSmaInfo
)
{
if
(
pRSmaInfo
)
{
ASSERT
(
0
);
// TODO: free original pRSmaInfo i
s
exists abnormally
ASSERT
(
0
);
// TODO: free original pRSmaInfo i
f
exists abnormally
smaDebug
(
"vgId:%d, rsma info already exists for table %s, %"
PRIi64
,
SMA_VID
(
pSma
),
tbName
,
suid
);
smaDebug
(
"vgId:%d, rsma info already exists for table %s, %"
PRIi64
,
SMA_VID
(
pSma
),
tbName
,
suid
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
...
@@ -338,10 +340,10 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
...
@@ -338,10 +340,10 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
if
(
taosHashPut
(
RSMA_INFO_HASH
(
pStat
),
&
suid
,
sizeof
(
tb_uid_t
),
&
pRSmaInfo
,
sizeof
(
pRSmaInfo
))
<
0
)
{
if
(
taosHashPut
(
RSMA_INFO_HASH
(
pStat
),
&
suid
,
sizeof
(
tb_uid_t
),
&
pRSmaInfo
,
sizeof
(
pRSmaInfo
))
<
0
)
{
goto
_err
;
goto
_err
;
}
else
{
smaDebug
(
"vgId:%d, register rsma info succeed for suid:%"
PRIi64
,
SMA_VID
(
pSma
),
suid
);
}
}
smaDebug
(
"vgId:%d, register rsma info succeed for suid:%"
PRIi64
,
SMA_VID
(
pSma
),
suid
);
// start the persist timer
// start the persist timer
if
(
TASK_TRIGGER_STAT_INIT
==
if
(
TASK_TRIGGER_STAT_INIT
==
atomic_val_compare_exchange_8
(
RSMA_TRIGGER_STAT
(
pStat
),
TASK_TRIGGER_STAT_INIT
,
TASK_TRIGGER_STAT_ACTIVE
))
{
atomic_val_compare_exchange_8
(
RSMA_TRIGGER_STAT
(
pStat
),
TASK_TRIGGER_STAT_INIT
,
TASK_TRIGGER_STAT_ACTIVE
))
{
...
@@ -356,10 +358,9 @@ _err:
...
@@ -356,10 +358,9 @@ _err:
}
}
/**
/**
* @brief Check and init qTaskInfo_t, only applicable to stable with SRSmaParam
.
* @brief Check and init qTaskInfo_t, only applicable to stable with SRSmaParam
currently
*
*
* @param pTsdb
* @param pVnode
* @param pMeta
* @param pReq
* @param pReq
* @return int32_t
* @return int32_t
*/
*/
...
@@ -695,331 +696,127 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
...
@@ -695,331 +696,127 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
static
void
tdRSmaQTaskGetFName
(
int32_t
vid
,
int8_t
ftype
,
char
*
outputName
)
{
tdGetVndFileName
(
vid
,
"rsma"
,
tdQTaskInfoFname
[
ftype
],
outputName
);
}
static
void
*
tdRSmaPersistExec
(
void
*
param
)
{
setThreadName
(
"rsma-task-persist"
);
SRSmaStat
*
pRSmaStat
=
param
;
SSma
*
pSma
=
pRSmaStat
->
pSma
;
STfs
*
pTfs
=
pSma
->
pVnode
->
pTfs
;
int64_t
toffset
=
0
;
bool
isFileCreated
=
false
;
if
(
TASK_TRIGGER_STAT_CANCELLED
==
atomic_load_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
)))
{
goto
_end
;
}
void
*
infoHash
=
taosHashIterate
(
RSMA_INFO_HASH
(
pRSmaStat
),
NULL
);
if
(
!
infoHash
)
{
goto
_end
;
}
STFile
tFile
=
{
0
};
int32_t
vid
=
SMA_VID
(
pSma
);
while
(
infoHash
)
{
SRSmaInfo
*
pRSmaInfo
=
*
(
SRSmaInfo
**
)
infoHash
;
#if 0
smaDebug("table %" PRIi64 " sleep 15s start ...", pRSmaInfo->items[0].pRsmaInfo->suid);
for (int32_t i = 15; i > 0; --i) {
taosSsleep(1);
smaDebug("table %" PRIi64 " countdown %d", pRSmaInfo->items[0].pRsmaInfo->suid, i);
}
smaDebug("table %" PRIi64 " sleep 15s end ...", pRSmaInfo->items[0].pRsmaInfo->suid);
#endif
for
(
int32_t
i
=
0
;
i
<
TSDB_RETENTION_L2
;
++
i
)
{
qTaskInfo_t
taskInfo
=
pRSmaInfo
->
items
[
i
].
taskInfo
;
if
(
!
taskInfo
)
{
smaDebug
(
"vgId:%d, table %"
PRIi64
" level %d qTaskInfo is NULL"
,
vid
,
pRSmaInfo
->
suid
,
i
+
1
);
continue
;
}
char
*
pOutput
=
NULL
;
int32_t
len
=
0
;
int8_t
type
=
(
int8_t
)(
i
+
1
);
if
(
qSerializeTaskStatus
(
taskInfo
,
&
pOutput
,
&
len
)
<
0
)
{
smaError
(
"vgId:%d, table %"
PRIi64
" level %d serialize rsma task failed since %s"
,
vid
,
pRSmaInfo
->
suid
,
i
+
1
,
terrstr
(
terrno
));
goto
_err
;
}
else
{
if
(
!
pOutput
)
{
smaDebug
(
"vgId:%d, table %"
PRIi64
" level %d serialize rsma task success but no output(len %d) and no need to persist"
,
vid
,
pRSmaInfo
->
suid
,
i
+
1
,
len
);
continue
;
}
else
if
(
len
<=
0
)
{
smaDebug
(
"vgId:%d, table %"
PRIi64
" level %d serialize rsma task success with len %d and no need to persist"
,
vid
,
pRSmaInfo
->
suid
,
i
+
1
,
len
);
taosMemoryFree
(
pOutput
);
}
smaDebug
(
"vgId:%d, table %"
PRIi64
" level %d serialize rsma task success with len %d and need persist"
,
vid
,
pRSmaInfo
->
suid
,
i
+
1
,
len
);
#if 1
if
(
qDeserializeTaskStatus
(
taskInfo
,
pOutput
,
len
)
<
0
)
{
smaError
(
"vgId:%d, table %"
PRIi64
"level %d deserialize rsma task failed since %s"
,
vid
,
pRSmaInfo
->
suid
,
i
+
1
,
terrstr
(
terrno
));
}
else
{
smaDebug
(
"vgId:%d, table %"
PRIi64
" level %d deserialize rsma task success"
,
vid
,
pRSmaInfo
->
suid
,
i
+
1
);
}
#endif
}
if
(
!
isFileCreated
)
{
char
qTaskInfoFName
[
TSDB_FILENAME_LEN
];
tdRSmaQTaskGetFName
(
vid
,
TD_QTASK_TMP_FILE
,
qTaskInfoFName
);
tdInitTFile
(
&
tFile
,
pTfs
,
qTaskInfoFName
);
tdCreateTFile
(
&
tFile
,
pTfs
,
true
,
-
1
);
isFileCreated
=
true
;
}
len
+=
(
sizeof
(
len
)
+
sizeof
(
type
)
+
sizeof
(
pRSmaInfo
->
suid
));
tdAppendTFile
(
&
tFile
,
&
len
,
sizeof
(
len
),
&
toffset
);
tdAppendTFile
(
&
tFile
,
&
type
,
sizeof
(
type
),
&
toffset
);
tdAppendTFile
(
&
tFile
,
&
pRSmaInfo
->
suid
,
sizeof
(
pRSmaInfo
->
suid
),
&
toffset
);
tdAppendTFile
(
&
tFile
,
pOutput
,
len
,
&
toffset
);
taosMemoryFree
(
pOutput
);
}
infoHash
=
taosHashIterate
(
RSMA_INFO_HASH
(
pRSmaStat
),
infoHash
);
}
_normal:
if
(
isFileCreated
)
{
if
(
tdUpdateTFileHeader
(
&
tFile
)
<
0
)
{
smaError
(
"vgId:%d, failed to update tfile %s header since %s"
,
vid
,
TD_FILE_FULL_NAME
(
&
tFile
),
tstrerror
(
terrno
));
tdCloseTFile
(
&
tFile
);
tdRemoveTFile
(
&
tFile
);
goto
_err
;
}
else
{
smaDebug
(
"vgId:%d, succeed to update tfile %s header"
,
vid
,
TD_FILE_FULL_NAME
(
&
tFile
));
}
tdCloseTFile
(
&
tFile
);
char
newFName
[
TSDB_FILENAME_LEN
];
strncpy
(
newFName
,
TD_FILE_FULL_NAME
(
&
tFile
),
TSDB_FILENAME_LEN
);
char
*
pos
=
strstr
(
newFName
,
tdQTaskInfoFname
[
TD_QTASK_TMP_FILE
]);
strncpy
(
pos
,
tdQTaskInfoFname
[
TD_QTASK_CUR_FILE
],
TSDB_FILENAME_LEN
-
POINTER_DISTANCE
(
pos
,
newFName
));
if
(
taosRenameFile
(
TD_FILE_FULL_NAME
(
&
tFile
),
newFName
)
!=
0
)
{
smaError
(
"vgId:%d, failed to rename %s to %s"
,
vid
,
TD_FILE_FULL_NAME
(
&
tFile
),
newFName
);
goto
_err
;
}
else
{
smaDebug
(
"vgId:%d, succeed to rename %s to %s"
,
vid
,
TD_FILE_FULL_NAME
(
&
tFile
),
newFName
);
}
}
goto
_end
;
_err:
if
(
isFileCreated
)
{
tdRemoveTFile
(
&
tFile
);
}
_end:
if
(
TASK_TRIGGER_STAT_INACTIVE
==
atomic_val_compare_exchange_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
),
TASK_TRIGGER_STAT_INACTIVE
,
TASK_TRIGGER_STAT_ACTIVE
))
{
smaDebug
(
"vgId:%d, persist task is active again"
,
vid
);
}
else
if
(
TASK_TRIGGER_STAT_CANCELLED
==
atomic_val_compare_exchange_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
),
TASK_TRIGGER_STAT_CANCELLED
,
TASK_TRIGGER_STAT_FINISHED
))
{
smaDebug
(
"vgId:%d, persist task is cancelled"
,
vid
);
}
else
{
smaWarn
(
"vgId:%d, persist task in abnormal stat %"
PRIi8
,
vid
,
atomic_load_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
)));
ASSERT
(
0
);
}
atomic_store_8
(
RSMA_RUNNING_STAT
(
pRSmaStat
),
0
);
taosThreadExit
(
NULL
);
return
NULL
;
}
static
void
tdRSmaPersistTask
(
SRSmaStat
*
pRSmaStat
)
{
TdThreadAttr
thAttr
;
taosThreadAttrInit
(
&
thAttr
);
taosThreadAttrSetDetachState
(
&
thAttr
,
PTHREAD_CREATE_DETACHED
);
TdThread
tid
;
if
(
taosThreadCreate
(
&
tid
,
&
thAttr
,
tdRSmaPersistExec
,
pRSmaStat
)
!=
0
)
{
if
(
TASK_TRIGGER_STAT_INACTIVE
==
atomic_val_compare_exchange_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
),
TASK_TRIGGER_STAT_INACTIVE
,
TASK_TRIGGER_STAT_ACTIVE
))
{
smaDebug
(
"persist task is active again"
);
}
else
if
(
TASK_TRIGGER_STAT_CANCELLED
==
atomic_val_compare_exchange_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
),
TASK_TRIGGER_STAT_CANCELLED
,
TASK_TRIGGER_STAT_FINISHED
))
{
smaDebug
(
" persist task is cancelled and set finished"
);
}
else
{
smaWarn
(
"persist task in abnormal stat %"
PRIi8
,
atomic_load_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
)));
ASSERT
(
0
);
}
atomic_store_8
(
RSMA_RUNNING_STAT
(
pRSmaStat
),
0
);
}
taosThreadAttrDestroy
(
&
thAttr
);
}
/**
* @brief trigger to persist rsma qTaskInfo
*
* @param param
* @param tmrId
*/
static
void
tdRSmaPersistTrigger
(
void
*
param
,
void
*
tmrId
)
{
SRSmaStat
*
pRSmaStat
=
param
;
int8_t
tmrStat
=
atomic_val_compare_exchange_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
),
TASK_TRIGGER_STAT_ACTIVE
,
TASK_TRIGGER_STAT_INACTIVE
);
switch
(
tmrStat
)
{
case
TASK_TRIGGER_STAT_ACTIVE
:
{
atomic_store_8
(
RSMA_RUNNING_STAT
(
pRSmaStat
),
1
);
if
(
TASK_TRIGGER_STAT_CANCELLED
!=
atomic_val_compare_exchange_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
),
TASK_TRIGGER_STAT_CANCELLED
,
TASK_TRIGGER_STAT_FINISHED
))
{
smaDebug
(
"rsma persistence start since active"
);
// start persist task
tdRSmaPersistTask
(
pRSmaStat
);
taosTmrReset
(
tdRSmaPersistTrigger
,
RSMA_QTASKINFO_PERSIST_MS
,
pRSmaStat
,
pRSmaStat
->
tmrHandle
,
&
pRSmaStat
->
tmrId
);
}
else
{
atomic_store_8
(
RSMA_RUNNING_STAT
(
pRSmaStat
),
0
);
}
}
break
;
case
TASK_TRIGGER_STAT_CANCELLED
:
{
atomic_store_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
),
TASK_TRIGGER_STAT_FINISHED
);
smaDebug
(
"rsma persistence not start since cancelled and finished"
);
}
break
;
case
TASK_TRIGGER_STAT_INACTIVE
:
{
smaDebug
(
"rsma persistence not start since inactive"
);
}
break
;
case
TASK_TRIGGER_STAT_INIT
:
{
smaDebug
(
"rsma persistence not start since init"
);
}
break
;
default:
{
smaWarn
(
"rsma persistence not start since unknown stat %"
PRIi8
,
tmrStat
);
ASSERT
(
0
);
}
break
;
}
}
int32_t
tdProcessRSmaRestoreImpl
(
SSma
*
pSma
)
{
int32_t
tdProcessRSmaRestoreImpl
(
SSma
*
pSma
)
{
SVnode
*
pVnode
=
pSma
->
pVnode
;
SVnode
*
pVnode
=
pSma
->
pVnode
;
// step 1: iterate all stables to restore the rsma env
// step 1: iterate all stables to restore the rsma env
SArray
*
suidList
=
taosArrayInit
(
1
,
sizeof
(
tb_uid_t
));
SArray
*
suidList
=
taosArrayInit
(
1
,
sizeof
(
tb_uid_t
));
if
(
tsdbGetStbIdList
(
SMA_META
(
pSma
),
0
,
suidList
)
<
0
)
{
if
(
tsdbGetStbIdList
(
SMA_META
(
pSma
),
0
,
suidList
)
<
0
)
{
smaError
(
"vgId:%d, failed to restore rsma since get stb id list error: %s"
,
TD_VID
(
pVnode
),
terrstr
());
taosArrayDestroy
(
suidList
);
smaError
(
"vgId:%d, failed to restore rsma env since get stb id list error: %s"
,
TD_VID
(
pVnode
),
terrstr
());
return
TSDB_CODE_FAILED
;
return
TSDB_CODE_FAILED
;
}
}
if
(
taosArrayGetSize
(
suidList
)
==
0
)
{
int32_t
arrSize
=
taosArrayGetSize
(
suidList
);
smaDebug
(
"vgId:%d no need to restore rsma since empty stb id list"
,
TD_VID
(
pVnode
));
if
(
arrSize
==
0
)
{
taosArrayDestroy
(
suidList
);
smaDebug
(
"vgId:%d, no need to restore rsma env since empty stb id list"
,
TD_VID
(
pVnode
));
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
SMetaReader
mr
=
{
0
};
SMetaReader
mr
=
{
0
};
metaReaderInit
(
&
mr
,
SMA_META
(
pSma
),
0
);
metaReaderInit
(
&
mr
,
SMA_META
(
pSma
),
0
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
suidList
)
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
arrSize
;
++
i
)
{
tb_uid_t
suid
=
*
(
tb_uid_t
*
)
taosArrayGet
(
suidList
,
i
);
tb_uid_t
suid
=
*
(
tb_uid_t
*
)
taosArrayGet
(
suidList
,
i
);
smaDebug
(
"
suid [%d] is %"
PRIi64
,
i
,
suid
);
smaDebug
(
"
vgId:%d, rsma restore, suid[%d] is %"
PRIi64
,
TD_VID
(
pVnode
)
,
i
,
suid
);
if
(
metaGetTableEntryByUid
(
&
mr
,
suid
)
<
0
)
{
if
(
metaGetTableEntryByUid
(
&
mr
,
suid
)
<
0
)
{
smaError
(
"vgId:%d failed to get table meta for %"
PRIi64
" since %s"
,
TD_VID
(
pVnode
),
suid
,
terrstr
());
smaError
(
"vgId:%d, rsma restore, failed to get table meta for %"
PRIi64
" since %s"
,
TD_VID
(
pVnode
),
suid
,
terrstr
());
goto
_err
;
goto
_err
;
}
}
ASSERT
(
mr
.
me
.
type
==
TSDB_SUPER_TABLE
);
ASSERT
(
mr
.
me
.
type
==
TSDB_SUPER_TABLE
);
ASSERT
(
mr
.
me
.
uid
==
suid
);
ASSERT
(
mr
.
me
.
uid
==
suid
);
if
(
TABLE_IS_ROLLUP
(
mr
.
me
.
flags
))
{
if
(
TABLE_IS_ROLLUP
(
mr
.
me
.
flags
))
{
SRSmaParam
*
param
=
&
mr
.
me
.
stbEntry
.
rsmaParam
;
SRSmaParam
*
param
=
&
mr
.
me
.
stbEntry
.
rsmaParam
;
for
(
int
i
=
0
;
i
<
2
;
++
i
)
{
for
(
int
i
=
0
;
i
<
TSDB_RETENTION_L2
;
++
i
)
{
smaDebug
(
"vgId: %d table:%"
PRIi64
" maxdelay[%d]:%"
PRIi64
" watermark[%d]:%"
PRIi64
,
TD_VID
(
pSma
->
pVnode
),
smaDebug
(
"vgId:%d, rsma restore, table:%"
PRIi64
" level:%d, maxdelay:%"
PRIi64
" watermark:%"
PRIi64
suid
,
i
,
param
->
maxdelay
[
i
],
i
,
param
->
watermark
[
i
]);
" qmsgLen:%"
PRIi32
,
TD_VID
(
pVnode
),
suid
,
i
,
param
->
maxdelay
[
i
],
param
->
watermark
[
i
],
param
->
qmsgLen
[
i
]);
}
}
if
(
tdProcessRSmaCreateImpl
(
pSma
,
&
mr
.
me
.
stbEntry
.
rsmaParam
,
suid
,
mr
.
me
.
name
)
<
0
)
{
if
(
tdProcessRSmaCreateImpl
(
pSma
,
&
mr
.
me
.
stbEntry
.
rsmaParam
,
suid
,
mr
.
me
.
name
)
<
0
)
{
smaError
(
"vgId:%d
failed to retore rsma env
for %"
PRIi64
" since %s"
,
TD_VID
(
pVnode
),
suid
,
terrstr
());
smaError
(
"vgId:%d
, rsma restore env failed
for %"
PRIi64
" since %s"
,
TD_VID
(
pVnode
),
suid
,
terrstr
());
goto
_err
;
goto
_err
;
}
}
smaDebug
(
"vgId:%d, rsma restore env success for %"
PRIi64
,
TD_VID
(
pVnode
),
suid
);
}
}
}
}
// step 2: retrieve qtaskinfo
object from the rsma/qtaskinfo file
and restore
// step 2: retrieve qtaskinfo
items from the persistence file(rsma/qtaskinfo)
and restore
STFile
tFile
=
{
0
};
STFile
tFile
=
{
0
};
char
qTaskInfoFName
[
TSDB_FILENAME_LEN
];
char
qTaskInfoFName
[
TSDB_FILENAME_LEN
];
tdRSmaQTaskGetFName
(
TD_VID
(
pVnode
),
TD_QTASK_CUR_FILE
,
qTaskInfoFName
);
tdRSmaQTask
Info
GetFName
(
TD_VID
(
pVnode
),
TD_QTASK_CUR_FILE
,
qTaskInfoFName
);
if
(
tdInitTFile
(
&
tFile
,
pVnode
->
pTfs
,
qTaskInfoFName
)
<
0
)
{
if
(
tdInitTFile
(
&
tFile
,
pVnode
->
pTfs
,
qTaskInfoFName
)
<
0
)
{
goto
_err
;
goto
_err
;
}
}
if
(
!
taosCheckExistFile
(
TD_TFILE_FULL_NAME
(
&
tFile
)))
{
metaReaderClear
(
&
mr
);
taosArrayDestroy
(
suidList
);
return
TSDB_CODE_SUCCESS
;
}
if
(
tdOpenTFile
(
&
tFile
,
TD_FILE_READ
)
<
0
)
{
if
(
tdOpenTFile
(
&
tFile
,
TD_FILE_READ
)
<
0
)
{
goto
_err
;
goto
_err
;
}
}
SRSmaQTaskFIter
fIter
=
{
0
};
SRSmaQTaskInfoIter
fIter
=
{
0
};
if
(
tdRSmaQTaskInfoIterInit
(
&
fIter
,
&
tFile
)
<
0
)
{
if
(
tdRSmaQTaskInfoIterInit
(
&
fIter
,
&
tFile
)
<
0
)
{
goto
_err
;
goto
_err
;
}
}
SRSmaQTaskInfoItem
infoItem
=
{
0
};
SRSmaQTaskInfoItem
infoItem
=
{
0
};
bool
isEnd
=
false
;
if
(
tdRSmaQTaskInfoRestore
(
pSma
,
&
fIter
)
<
0
)
{
int32_t
code
=
0
;
tdRSmaQTaskInfoIterDestroy
(
&
fIter
);
while
((
code
=
tdRSmaQTaskInfoIterNext
(
&
fIter
,
&
infoItem
,
&
isEnd
))
==
0
)
{
if
(
isEnd
)
{
break
;
}
if
((
code
=
tdRSmaQTaskInfoItemRestore
(
pSma
,
&
infoItem
))
<
0
)
{
break
;
}
}
tdRSmaQTaskInfoIterDestroy
(
&
fIter
);
if
(
code
<
0
)
{
goto
_err
;
goto
_err
;
}
}
tdRSmaQTaskInfoIterDestroy
(
&
fIter
);
metaReaderClear
(
&
mr
);
metaReaderClear
(
&
mr
);
taosArrayDestroy
(
suidList
);
taosArrayDestroy
(
suidList
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
_err:
_err:
ASSERT
(
0
);
metaReaderClear
(
&
mr
);
metaReaderClear
(
&
mr
);
taosArrayDestroy
(
suidList
);
taosArrayDestroy
(
suidList
);
smaError
(
"failed to restore rsma
info
since %s"
,
terrstr
());
smaError
(
"failed to restore rsma
task
since %s"
,
terrstr
());
return
TSDB_CODE_FAILED
;
return
TSDB_CODE_FAILED
;
}
}
static
int32_t
tdRSmaQTaskInfoItemRestore
(
SSma
*
pSma
,
const
SRSmaQTaskInfoItem
*
info
Item
)
{
static
int32_t
tdRSmaQTaskInfoItemRestore
(
SSma
*
pSma
,
const
SRSmaQTaskInfoItem
*
p
Item
)
{
SRSmaStat
*
pStat
=
(
SRSmaStat
*
)
SMA_ENV_STAT
((
SSmaEnv
*
)
pSma
->
pRSmaEnv
);
SRSmaStat
*
pStat
=
(
SRSmaStat
*
)
SMA_ENV_STAT
((
SSmaEnv
*
)
pSma
->
pRSmaEnv
);
SRSmaInfo
*
pRSmaInfo
=
NULL
;
SRSmaInfo
*
pRSmaInfo
=
NULL
;
void
*
qTaskInfo
=
NULL
;
void
*
qTaskInfo
=
NULL
;
pRSmaInfo
=
taosHashGet
(
RSMA_INFO_HASH
(
pStat
),
&
infoItem
->
suid
,
sizeof
(
info
Item
->
suid
));
pRSmaInfo
=
taosHashGet
(
RSMA_INFO_HASH
(
pStat
),
&
pItem
->
suid
,
sizeof
(
p
Item
->
suid
));
if
(
!
pRSmaInfo
||
!
(
pRSmaInfo
=
*
(
SRSmaInfo
**
)
pRSmaInfo
))
{
if
(
!
pRSmaInfo
||
!
(
pRSmaInfo
=
*
(
SRSmaInfo
**
)
pRSmaInfo
))
{
smaDebug
(
"vgId:%d, no restore as no rsma info for
suid:%"
PRIu64
,
SMA_VID
(
pSma
),
info
Item
->
suid
);
smaDebug
(
"vgId:%d, no restore as no rsma info for
table:%"
PRIu64
,
SMA_VID
(
pSma
),
p
Item
->
suid
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
if
(
info
Item
->
type
==
1
)
{
if
(
p
Item
->
type
==
1
)
{
qTaskInfo
=
pRSmaInfo
->
items
[
0
].
taskInfo
;
qTaskInfo
=
pRSmaInfo
->
items
[
0
].
taskInfo
;
}
else
if
(
info
Item
->
type
==
2
)
{
}
else
if
(
p
Item
->
type
==
2
)
{
qTaskInfo
=
pRSmaInfo
->
items
[
1
].
taskInfo
;
qTaskInfo
=
pRSmaInfo
->
items
[
1
].
taskInfo
;
}
else
{
}
else
{
ASSERT
(
0
);
ASSERT
(
0
);
}
}
if
(
!
qTaskInfo
)
{
if
(
!
qTaskInfo
)
{
smaDebug
(
"vgId:%d, no restore as NULL rsma qTaskInfo for
suid:%"
PRIu64
,
SMA_VID
(
pSma
),
info
Item
->
suid
);
smaDebug
(
"vgId:%d, no restore as NULL rsma qTaskInfo for
table:%"
PRIu64
,
SMA_VID
(
pSma
),
p
Item
->
suid
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
if
(
qDeserializeTaskStatus
(
qTaskInfo
,
infoItem
->
qTaskInfo
,
info
Item
->
len
)
<
0
)
{
if
(
qDeserializeTaskStatus
(
qTaskInfo
,
pItem
->
qTaskInfo
,
p
Item
->
len
)
<
0
)
{
smaError
(
"vgId:%d, restore rsma
failed for suid:%"
PRIi64
" level %d since %s"
,
SMA_VID
(
pSma
),
info
Item
->
suid
,
smaError
(
"vgId:%d, restore rsma
task failed for table:%"
PRIi64
" level %d since %s"
,
SMA_VID
(
pSma
),
p
Item
->
suid
,
info
Item
->
type
,
terrstr
(
terrno
));
p
Item
->
type
,
terrstr
(
terrno
));
return
TSDB_CODE_FAILED
;
return
TSDB_CODE_FAILED
;
}
}
smaDebug
(
"vgId:%d, restore rsma success for suid:%"
PRIi64
" level %d"
,
SMA_VID
(
pSma
),
infoItem
->
suid
,
smaDebug
(
"vgId:%d, restore rsma task success for table:%"
PRIi64
" level %d"
,
SMA_VID
(
pSma
),
pItem
->
suid
,
pItem
->
type
);
infoItem
->
type
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
static
int32_t
tdRSmaQTaskInfoIterInit
(
SRSmaQTask
F
Iter
*
pIter
,
STFile
*
pTFile
)
{
static
int32_t
tdRSmaQTaskInfoIterInit
(
SRSmaQTask
Info
Iter
*
pIter
,
STFile
*
pTFile
)
{
memset
(
pIter
,
0
,
sizeof
(
*
pIter
));
memset
(
pIter
,
0
,
sizeof
(
*
pIter
));
pIter
->
pTFile
=
pTFile
;
pIter
->
pTFile
=
pTFile
;
pIter
->
offset
=
TD_FILE_HEAD_SIZE
;
pIter
->
offset
=
TD_FILE_HEAD_SIZE
;
...
@@ -1038,16 +835,17 @@ static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskFIter *pIter, STFile *pTFile) {
...
@@ -1038,16 +835,17 @@ static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskFIter *pIter, STFile *pTFile) {
pIter
->
nAlloc
=
TD_FILE_HEAD_SIZE
;
pIter
->
nAlloc
=
TD_FILE_HEAD_SIZE
;
}
}
pIter
->
b
uf
=
taosMemoryMalloc
(
pIter
->
nAlloc
);
pIter
->
pB
uf
=
taosMemoryMalloc
(
pIter
->
nAlloc
);
if
(
!
pIter
->
b
uf
)
{
if
(
!
pIter
->
pB
uf
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
TSDB_CODE_FAILED
;
return
TSDB_CODE_FAILED
;
}
}
pIter
->
qBuf
=
pIter
->
pBuf
;
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
static
int32_t
tdRSmaQTaskInfoIterNextBlock
(
SRSmaQTask
F
Iter
*
pIter
,
bool
*
isFinish
)
{
static
int32_t
tdRSmaQTaskInfoIterNextBlock
(
SRSmaQTask
Info
Iter
*
pIter
,
bool
*
isFinish
)
{
STFile
*
pTFile
=
pIter
->
pTFile
;
STFile
*
pTFile
=
pIter
->
pTFile
;
int64_t
nBytes
=
RSMA_QTASKINFO_BUFSIZE
;
int64_t
nBytes
=
RSMA_QTASKINFO_BUFSIZE
;
...
@@ -1065,22 +863,23 @@ static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskFIter *pIter, bool *isFini
...
@@ -1065,22 +863,23 @@ static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskFIter *pIter, bool *isFini
return
TSDB_CODE_FAILED
;
return
TSDB_CODE_FAILED
;
}
}
if
(
tdReadTFile
(
pTFile
,
pIter
->
b
uf
,
nBytes
)
!=
nBytes
)
{
if
(
tdReadTFile
(
pTFile
,
pIter
->
qB
uf
,
nBytes
)
!=
nBytes
)
{
ASSERT
(
0
);
ASSERT
(
0
);
return
TSDB_CODE_FAILED
;
return
TSDB_CODE_FAILED
;
}
}
int32_t
infoLen
=
0
;
int32_t
infoLen
=
0
;
taosDecodeFixedI32
(
pIter
->
b
uf
,
&
infoLen
);
taosDecodeFixedI32
(
pIter
->
qB
uf
,
&
infoLen
);
if
(
infoLen
>
nBytes
)
{
if
(
infoLen
>
nBytes
)
{
ASSERT
(
infoLen
>
RSMA_QTASKINFO_BUFSIZE
);
ASSERT
(
infoLen
>
RSMA_QTASKINFO_BUFSIZE
);
pIter
->
nAlloc
=
infoLen
;
pIter
->
nAlloc
=
infoLen
;
void
*
pBuf
=
taosMemoryRealloc
(
pIter
->
b
uf
,
infoLen
);
void
*
pBuf
=
taosMemoryRealloc
(
pIter
->
pB
uf
,
infoLen
);
if
(
!
pBuf
)
{
if
(
!
pBuf
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
TSDB_CODE_FAILED
;
return
TSDB_CODE_FAILED
;
}
}
pIter
->
buf
=
pBuf
;
pIter
->
pBuf
=
pBuf
;
pIter
->
qBuf
=
pIter
->
pBuf
;
nBytes
=
infoLen
;
nBytes
=
infoLen
;
if
(
tdSeekTFile
(
pTFile
,
pIter
->
offset
,
SEEK_SET
))
{
if
(
tdSeekTFile
(
pTFile
,
pIter
->
offset
,
SEEK_SET
))
{
...
@@ -1088,7 +887,7 @@ static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskFIter *pIter, bool *isFini
...
@@ -1088,7 +887,7 @@ static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskFIter *pIter, bool *isFini
return
TSDB_CODE_FAILED
;
return
TSDB_CODE_FAILED
;
}
}
if
(
tdReadTFile
(
pTFile
,
pIter
->
b
uf
,
nBytes
)
!=
nBytes
)
{
if
(
tdReadTFile
(
pTFile
,
pIter
->
pB
uf
,
nBytes
)
!=
nBytes
)
{
ASSERT
(
0
);
ASSERT
(
0
);
return
TSDB_CODE_FAILED
;
return
TSDB_CODE_FAILED
;
}
}
...
@@ -1101,7 +900,7 @@ static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskFIter *pIter, bool *isFini
...
@@ -1101,7 +900,7 @@ static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskFIter *pIter, bool *isFini
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
static
int32_t
tdRSmaQTaskInfo
IterNext
(
SRSmaQTaskFIter
*
pIter
,
SRSmaQTaskInfoItem
*
pItem
,
bool
*
isEnd
)
{
static
int32_t
tdRSmaQTaskInfo
Restore
(
SSma
*
pSma
,
SRSmaQTaskInfoIter
*
pIter
)
{
while
(
1
)
{
while
(
1
)
{
// block iter
// block iter
bool
isFinish
=
false
;
bool
isFinish
=
false
;
...
@@ -1110,30 +909,39 @@ static int32_t tdRSmaQTaskInfoIterNext(SRSmaQTaskFIter *pIter, SRSmaQTaskInfoIte
...
@@ -1110,30 +909,39 @@ static int32_t tdRSmaQTaskInfoIterNext(SRSmaQTaskFIter *pIter, SRSmaQTaskInfoIte
return
TSDB_CODE_FAILED
;
return
TSDB_CODE_FAILED
;
}
}
if
(
isFinish
)
{
if
(
isFinish
)
{
*
isEnd
=
true
;
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
// consume the block
// consume the block
int32_t
qTaskInfoLenWithHead
=
0
;
int32_t
qTaskInfoLenWithHead
=
0
;
pIter
->
buf
=
taosDecodeFixedI32
(
pIter
->
b
uf
,
&
qTaskInfoLenWithHead
);
pIter
->
qBuf
=
taosDecodeFixedI32
(
pIter
->
qB
uf
,
&
qTaskInfoLenWithHead
);
if
(
qTaskInfoLenWithHead
<
0
)
{
if
(
qTaskInfoLenWithHead
<
RSMA_QTASKINFO_HEAD_LEN
)
{
terrno
=
TSDB_CODE_TDB_FILE_CORRUPTED
;
terrno
=
TSDB_CODE_TDB_FILE_CORRUPTED
;
return
TSDB_CODE_FAILED
;
return
TSDB_CODE_FAILED
;
}
}
while
(
1
)
{
while
(
1
)
{
if
((
pIter
->
nBufPos
+
qTaskInfoLenWithHead
)
<=
pIter
->
nBytes
)
{
if
((
pIter
->
nBufPos
+
qTaskInfoLenWithHead
)
<=
pIter
->
nBytes
)
{
pIter
->
buf
=
taosDecodeFixedI8
(
pIter
->
buf
,
&
pItem
->
type
);
SRSmaQTaskInfoItem
infoItem
=
{
0
};
pIter
->
buf
=
taosDecodeFixedI64
(
pIter
->
buf
,
&
pItem
->
suid
);
pIter
->
qBuf
=
taosDecodeFixedI8
(
pIter
->
qBuf
,
&
infoItem
.
type
);
pItem
->
qTaskInfo
=
pIter
->
buf
;
pIter
->
qBuf
=
taosDecodeFixedI64
(
pIter
->
qBuf
,
&
infoItem
.
suid
);
pItem
->
len
=
tdRSmaQTaskInfoContLen
(
qTaskInfoLenWithHead
);
infoItem
.
qTaskInfo
=
pIter
->
qBuf
;
infoItem
.
len
=
tdRSmaQTaskInfoContLen
(
qTaskInfoLenWithHead
);
// do the restore job
// do the restore job
printf
(
"%s:%d ###### restore the qtask info offset:%"
PRIi64
"
\n
"
,
__func__
,
__LINE__
,
pIter
->
offset
);
smaDebug
(
"vgId:%d, restore the qtask info %s offset:%"
PRIi64
"
\n
"
,
SMA_VID
(
pSma
),
TD_TFILE_FULL_NAME
(
pIter
->
pTFile
),
pIter
->
offset
-
pIter
->
nBytes
+
pIter
->
nBufPos
);
tdRSmaQTaskInfoItemRestore
(
pSma
,
&
infoItem
);
pIter
->
buf
=
POINTER_SHIFT
(
pIter
->
buf
,
pItem
->
len
);
pIter
->
qBuf
=
POINTER_SHIFT
(
pIter
->
qBuf
,
infoItem
.
len
);
pIter
->
nBufPos
+=
qTaskInfoLenWithHead
;
pIter
->
nBufPos
+=
qTaskInfoLenWithHead
;
pIter
->
buf
=
taosDecodeFixedI32
(
pIter
->
buf
,
&
qTaskInfoLenWithHead
);
if
((
pIter
->
nBufPos
+
RSMA_QTASKINFO_HEAD_LEN
)
>=
pIter
->
nBytes
)
{
// prepare and load next block in the file
pIter
->
offset
-=
(
pIter
->
nBytes
-
pIter
->
nBufPos
);
break
;
}
pIter
->
qBuf
=
taosDecodeFixedI32
(
pIter
->
qBuf
,
&
qTaskInfoLenWithHead
);
continue
;
continue
;
}
}
// prepare and load next block in the file
// prepare and load next block in the file
...
@@ -1144,3 +952,208 @@ static int32_t tdRSmaQTaskInfoIterNext(SRSmaQTaskFIter *pIter, SRSmaQTaskInfoIte
...
@@ -1144,3 +952,208 @@ static int32_t tdRSmaQTaskInfoIterNext(SRSmaQTaskFIter *pIter, SRSmaQTaskInfoIte
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
static
void
tdRSmaQTaskInfoGetFName
(
int32_t
vid
,
int8_t
ftype
,
char
*
outputName
)
{
tdGetVndFileName
(
vid
,
VNODE_RSMA_DIR
,
tdQTaskInfoFname
[
ftype
],
outputName
);
}
static
void
*
tdRSmaPersistExec
(
void
*
param
)
{
setThreadName
(
"rsma-task-persist"
);
SRSmaStat
*
pRSmaStat
=
param
;
SSma
*
pSma
=
pRSmaStat
->
pSma
;
STfs
*
pTfs
=
pSma
->
pVnode
->
pTfs
;
int32_t
vid
=
SMA_VID
(
pSma
);
int64_t
toffset
=
0
;
bool
isFileCreated
=
false
;
if
(
TASK_TRIGGER_STAT_CANCELLED
==
atomic_load_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
)))
{
goto
_end
;
}
void
*
infoHash
=
taosHashIterate
(
RSMA_INFO_HASH
(
pRSmaStat
),
NULL
);
if
(
!
infoHash
)
{
goto
_end
;
}
STFile
tFile
=
{
0
};
while
(
infoHash
)
{
SRSmaInfo
*
pRSmaInfo
=
*
(
SRSmaInfo
**
)
infoHash
;
for
(
int32_t
i
=
0
;
i
<
TSDB_RETENTION_L2
;
++
i
)
{
qTaskInfo_t
taskInfo
=
pRSmaInfo
->
items
[
i
].
taskInfo
;
if
(
!
taskInfo
)
{
smaDebug
(
"vgId:%d, table %"
PRIi64
" level %d qTaskInfo is NULL"
,
vid
,
pRSmaInfo
->
suid
,
i
+
1
);
continue
;
}
char
*
pOutput
=
NULL
;
int32_t
len
=
0
;
int8_t
type
=
(
int8_t
)(
i
+
1
);
if
(
qSerializeTaskStatus
(
taskInfo
,
&
pOutput
,
&
len
)
<
0
)
{
smaError
(
"vgId:%d, table %"
PRIi64
" level %d serialize rsma task failed since %s"
,
vid
,
pRSmaInfo
->
suid
,
i
+
1
,
terrstr
(
terrno
));
goto
_err
;
}
if
(
!
pOutput
||
len
<=
0
)
{
smaDebug
(
"vgId:%d, table %"
PRIi64
" level %d serialize rsma task success but no output(len %d), not persist"
,
vid
,
pRSmaInfo
->
suid
,
i
+
1
,
len
);
taosMemoryFreeClear
(
pOutput
);
continue
;
}
smaDebug
(
"vgId:%d, table %"
PRIi64
" level %d serialize rsma task success with len %d, need persist"
,
vid
,
pRSmaInfo
->
suid
,
i
+
1
,
len
);
#if 0
if (qDeserializeTaskStatus(taskInfo, pOutput, len) < 0) {
smaError("vgId:%d, table %" PRIi64 "level %d deserialize rsma task failed since %s", vid, pRSmaInfo->suid,
i + 1, terrstr(terrno));
} else {
smaDebug("vgId:%d, table %" PRIi64 " level %d deserialize rsma task success", vid, pRSmaInfo->suid, i + 1);
}
#endif
if
(
!
isFileCreated
)
{
char
qTaskInfoFName
[
TSDB_FILENAME_LEN
];
tdRSmaQTaskInfoGetFName
(
vid
,
TD_QTASK_TMP_FILE
,
qTaskInfoFName
);
tdInitTFile
(
&
tFile
,
pTfs
,
qTaskInfoFName
);
tdCreateTFile
(
&
tFile
,
pTfs
,
true
,
-
1
);
isFileCreated
=
true
;
}
char
tmpBuf
[
RSMA_QTASKINFO_HEAD_LEN
]
=
{
0
};
void
*
pTmpBuf
=
&
tmpBuf
;
int32_t
headLen
=
0
;
headLen
+=
taosEncodeFixedI32
(
&
pTmpBuf
,
len
+
RSMA_QTASKINFO_HEAD_LEN
);
headLen
+=
taosEncodeFixedI8
(
&
pTmpBuf
,
type
);
headLen
+=
taosEncodeFixedI64
(
&
pTmpBuf
,
pRSmaInfo
->
suid
);
ASSERT
(
headLen
<=
RSMA_QTASKINFO_HEAD_LEN
);
tdAppendTFile
(
&
tFile
,
(
void
*
)
&
tmpBuf
,
headLen
,
&
toffset
);
smaDebug
(
"vgId:%d, table %"
PRIi64
" level %d head part len:%d appended to offset:%"
PRIi64
,
vid
,
pRSmaInfo
->
suid
,
i
+
1
,
headLen
,
toffset
);
tdAppendTFile
(
&
tFile
,
pOutput
,
len
,
&
toffset
);
smaDebug
(
"vgId:%d, table %"
PRIi64
" level %d body part len:%d appended to offset:%"
PRIi64
,
vid
,
pRSmaInfo
->
suid
,
i
+
1
,
len
,
toffset
);
taosMemoryFree
(
pOutput
);
}
infoHash
=
taosHashIterate
(
RSMA_INFO_HASH
(
pRSmaStat
),
infoHash
);
}
_normal:
if
(
isFileCreated
)
{
if
(
tdUpdateTFileHeader
(
&
tFile
)
<
0
)
{
smaError
(
"vgId:%d, failed to update tfile %s header since %s"
,
vid
,
TD_TFILE_FULL_NAME
(
&
tFile
),
tstrerror
(
terrno
));
tdCloseTFile
(
&
tFile
);
tdRemoveTFile
(
&
tFile
);
goto
_err
;
}
else
{
smaDebug
(
"vgId:%d, succeed to update tfile %s header"
,
vid
,
TD_TFILE_FULL_NAME
(
&
tFile
));
}
tdCloseTFile
(
&
tFile
);
char
newFName
[
TSDB_FILENAME_LEN
];
strncpy
(
newFName
,
TD_TFILE_FULL_NAME
(
&
tFile
),
TSDB_FILENAME_LEN
);
char
*
pos
=
strstr
(
newFName
,
tdQTaskInfoFname
[
TD_QTASK_TMP_FILE
]);
strncpy
(
pos
,
tdQTaskInfoFname
[
TD_QTASK_CUR_FILE
],
TSDB_FILENAME_LEN
-
POINTER_DISTANCE
(
pos
,
newFName
));
if
(
taosRenameFile
(
TD_TFILE_FULL_NAME
(
&
tFile
),
newFName
)
!=
0
)
{
smaError
(
"vgId:%d, failed to rename %s to %s"
,
vid
,
TD_TFILE_FULL_NAME
(
&
tFile
),
newFName
);
goto
_err
;
}
else
{
smaDebug
(
"vgId:%d, succeed to rename %s to %s"
,
vid
,
TD_TFILE_FULL_NAME
(
&
tFile
),
newFName
);
}
}
goto
_end
;
_err:
if
(
isFileCreated
)
{
tdRemoveTFile
(
&
tFile
);
}
_end:
if
(
TASK_TRIGGER_STAT_INACTIVE
==
atomic_val_compare_exchange_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
),
TASK_TRIGGER_STAT_INACTIVE
,
TASK_TRIGGER_STAT_ACTIVE
))
{
smaDebug
(
"vgId:%d, persist task is active again"
,
vid
);
}
else
if
(
TASK_TRIGGER_STAT_CANCELLED
==
atomic_val_compare_exchange_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
),
TASK_TRIGGER_STAT_CANCELLED
,
TASK_TRIGGER_STAT_FINISHED
))
{
smaDebug
(
"vgId:%d, persist task is cancelled"
,
vid
);
}
else
{
smaWarn
(
"vgId:%d, persist task in abnormal stat %"
PRIi8
,
vid
,
atomic_load_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
)));
ASSERT
(
0
);
}
atomic_store_8
(
RSMA_RUNNING_STAT
(
pRSmaStat
),
0
);
taosThreadExit
(
NULL
);
return
NULL
;
}
static
void
tdRSmaPersistTask
(
SRSmaStat
*
pRSmaStat
)
{
TdThreadAttr
thAttr
;
taosThreadAttrInit
(
&
thAttr
);
taosThreadAttrSetDetachState
(
&
thAttr
,
PTHREAD_CREATE_DETACHED
);
TdThread
tid
;
if
(
taosThreadCreate
(
&
tid
,
&
thAttr
,
tdRSmaPersistExec
,
pRSmaStat
)
!=
0
)
{
if
(
TASK_TRIGGER_STAT_INACTIVE
==
atomic_val_compare_exchange_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
),
TASK_TRIGGER_STAT_INACTIVE
,
TASK_TRIGGER_STAT_ACTIVE
))
{
smaDebug
(
"vgId:%d, persist task is active again"
,
SMA_VID
(
pRSmaStat
->
pSma
));
}
else
if
(
TASK_TRIGGER_STAT_CANCELLED
==
atomic_val_compare_exchange_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
),
TASK_TRIGGER_STAT_CANCELLED
,
TASK_TRIGGER_STAT_FINISHED
))
{
smaDebug
(
"vgId:%d, persist task is cancelled and set finished"
,
SMA_VID
(
pRSmaStat
->
pSma
));
}
else
{
smaWarn
(
"vgId:%d, persist task in abnormal stat %"
PRIi8
,
atomic_load_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
)),
SMA_VID
(
pRSmaStat
->
pSma
));
ASSERT
(
0
);
}
atomic_store_8
(
RSMA_RUNNING_STAT
(
pRSmaStat
),
0
);
}
taosThreadAttrDestroy
(
&
thAttr
);
}
/**
* @brief trigger to persist rsma qTaskInfo
*
* @param param
* @param tmrId
*/
static
void
tdRSmaPersistTrigger
(
void
*
param
,
void
*
tmrId
)
{
SRSmaStat
*
pRSmaStat
=
param
;
int8_t
tmrStat
=
atomic_val_compare_exchange_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
),
TASK_TRIGGER_STAT_ACTIVE
,
TASK_TRIGGER_STAT_INACTIVE
);
switch
(
tmrStat
)
{
case
TASK_TRIGGER_STAT_ACTIVE
:
{
atomic_store_8
(
RSMA_RUNNING_STAT
(
pRSmaStat
),
1
);
if
(
TASK_TRIGGER_STAT_CANCELLED
!=
atomic_val_compare_exchange_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
),
TASK_TRIGGER_STAT_CANCELLED
,
TASK_TRIGGER_STAT_FINISHED
))
{
smaDebug
(
"vgId:%d, rsma persistence start since active"
,
SMA_VID
(
pRSmaStat
->
pSma
));
// start persist task
tdRSmaPersistTask
(
pRSmaStat
);
taosTmrReset
(
tdRSmaPersistTrigger
,
RSMA_QTASKINFO_PERSIST_MS
,
pRSmaStat
,
pRSmaStat
->
tmrHandle
,
&
pRSmaStat
->
tmrId
);
}
else
{
atomic_store_8
(
RSMA_RUNNING_STAT
(
pRSmaStat
),
0
);
}
}
break
;
case
TASK_TRIGGER_STAT_CANCELLED
:
{
atomic_store_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
),
TASK_TRIGGER_STAT_FINISHED
);
smaDebug
(
"rsma persistence not start since cancelled and finished"
);
}
break
;
case
TASK_TRIGGER_STAT_INACTIVE
:
{
smaDebug
(
"rsma persistence not start since inactive"
);
}
break
;
case
TASK_TRIGGER_STAT_INIT
:
{
smaDebug
(
"rsma persistence not start since init"
);
}
break
;
default:
{
smaWarn
(
"rsma persistence not start since unknown stat %"
PRIi8
,
tmrStat
);
}
break
;
}
}
source/dnode/vnode/src/sma/smaUtil.c
浏览文件 @
b66caf41
...
@@ -22,7 +22,6 @@
...
@@ -22,7 +22,6 @@
#define TD_FILE_INIT_MAGIC 0xFFFFFFFF
#define TD_FILE_INIT_MAGIC 0xFFFFFFFF
static
int32_t
tdEncodeTFInfo
(
void
**
buf
,
STFInfo
*
pInfo
);
static
int32_t
tdEncodeTFInfo
(
void
**
buf
,
STFInfo
*
pInfo
);
static
void
*
tdDecodeTFInfo
(
void
*
buf
,
STFInfo
*
pInfo
);
static
void
*
tdDecodeTFInfo
(
void
*
buf
,
STFInfo
*
pInfo
);
...
@@ -46,7 +45,7 @@ static void *tdDecodeTFInfo(void *buf, STFInfo *pInfo) {
...
@@ -46,7 +45,7 @@ static void *tdDecodeTFInfo(void *buf, STFInfo *pInfo) {
}
}
int64_t
tdWriteTFile
(
STFile
*
pTFile
,
void
*
buf
,
int64_t
nbyte
)
{
int64_t
tdWriteTFile
(
STFile
*
pTFile
,
void
*
buf
,
int64_t
nbyte
)
{
ASSERT
(
TD_FILE_OPENED
(
pTFile
));
ASSERT
(
TD_
T
FILE_OPENED
(
pTFile
));
int64_t
nwrite
=
taosWriteFile
(
pTFile
->
pFile
,
buf
,
nbyte
);
int64_t
nwrite
=
taosWriteFile
(
pTFile
->
pFile
,
buf
,
nbyte
);
if
(
nwrite
<
nbyte
)
{
if
(
nwrite
<
nbyte
)
{
...
@@ -58,9 +57,9 @@ int64_t tdWriteTFile(STFile *pTFile, void *buf, int64_t nbyte) {
...
@@ -58,9 +57,9 @@ int64_t tdWriteTFile(STFile *pTFile, void *buf, int64_t nbyte) {
}
}
int64_t
tdSeekTFile
(
STFile
*
pTFile
,
int64_t
offset
,
int
whence
)
{
int64_t
tdSeekTFile
(
STFile
*
pTFile
,
int64_t
offset
,
int
whence
)
{
ASSERT
(
TD_FILE_OPENED
(
pTFile
));
ASSERT
(
TD_
T
FILE_OPENED
(
pTFile
));
int64_t
loffset
=
taosLSeekFile
(
TD_FILE_PFILE
(
pTFile
),
offset
,
whence
);
int64_t
loffset
=
taosLSeekFile
(
TD_
T
FILE_PFILE
(
pTFile
),
offset
,
whence
);
if
(
loffset
<
0
)
{
if
(
loffset
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
return
-
1
;
...
@@ -70,12 +69,12 @@ int64_t tdSeekTFile(STFile *pTFile, int64_t offset, int whence) {
...
@@ -70,12 +69,12 @@ int64_t tdSeekTFile(STFile *pTFile, int64_t offset, int whence) {
}
}
int64_t
tdGetTFileSize
(
STFile
*
pTFile
,
int64_t
*
size
)
{
int64_t
tdGetTFileSize
(
STFile
*
pTFile
,
int64_t
*
size
)
{
ASSERT
(
TD_FILE_OPENED
(
pTFile
));
ASSERT
(
TD_
T
FILE_OPENED
(
pTFile
));
return
taosFStatFile
(
pTFile
->
pFile
,
size
,
NULL
);
return
taosFStatFile
(
pTFile
->
pFile
,
size
,
NULL
);
}
}
int64_t
tdReadTFile
(
STFile
*
pTFile
,
void
*
buf
,
int64_t
nbyte
)
{
int64_t
tdReadTFile
(
STFile
*
pTFile
,
void
*
buf
,
int64_t
nbyte
)
{
ASSERT
(
TD_FILE_OPENED
(
pTFile
));
ASSERT
(
TD_
T
FILE_OPENED
(
pTFile
));
int64_t
nread
=
taosReadFile
(
pTFile
->
pFile
,
buf
,
nbyte
);
int64_t
nread
=
taosReadFile
(
pTFile
->
pFile
,
buf
,
nbyte
);
if
(
nread
<
0
)
{
if
(
nread
<
0
)
{
...
@@ -108,7 +107,7 @@ int32_t tdLoadTFileHeader(STFile *pTFile, STFInfo *pInfo) {
...
@@ -108,7 +107,7 @@ int32_t tdLoadTFileHeader(STFile *pTFile, STFInfo *pInfo) {
char
buf
[
TD_FILE_HEAD_SIZE
]
=
"
\0
"
;
char
buf
[
TD_FILE_HEAD_SIZE
]
=
"
\0
"
;
uint32_t
_version
;
uint32_t
_version
;
ASSERT
(
TD_FILE_OPENED
(
pTFile
));
ASSERT
(
TD_
T
FILE_OPENED
(
pTFile
));
if
(
tdSeekTFile
(
pTFile
,
0
,
SEEK_SET
)
<
0
)
{
if
(
tdSeekTFile
(
pTFile
,
0
,
SEEK_SET
)
<
0
)
{
return
-
1
;
return
-
1
;
...
@@ -133,7 +132,7 @@ void tdUpdateTFileMagic(STFile *pTFile, void *pCksm) {
...
@@ -133,7 +132,7 @@ void tdUpdateTFileMagic(STFile *pTFile, void *pCksm) {
}
}
int64_t
tdAppendTFile
(
STFile
*
pTFile
,
void
*
buf
,
int64_t
nbyte
,
int64_t
*
offset
)
{
int64_t
tdAppendTFile
(
STFile
*
pTFile
,
void
*
buf
,
int64_t
nbyte
,
int64_t
*
offset
)
{
ASSERT
(
TD_FILE_OPENED
(
pTFile
));
ASSERT
(
TD_
T
FILE_OPENED
(
pTFile
));
int64_t
toffset
;
int64_t
toffset
;
...
@@ -141,6 +140,11 @@ int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset)
...
@@ -141,6 +140,11 @@ int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset)
return
-
1
;
return
-
1
;
}
}
#if 1
smaDebug
(
"append to file %s, offset:%"
PRIi64
" + nbyte:%"
PRIi64
" =%"
PRIi64
,
TD_TFILE_FULL_NAME
(
pTFile
),
toffset
,
nbyte
,
toffset
+
nbyte
);
#endif
ASSERT
(
pTFile
->
info
.
fsize
==
toffset
);
ASSERT
(
pTFile
->
info
.
fsize
==
toffset
);
if
(
offset
)
{
if
(
offset
)
{
...
@@ -157,9 +161,9 @@ int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset)
...
@@ -157,9 +161,9 @@ int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset)
}
}
int32_t
tdOpenTFile
(
STFile
*
pTFile
,
int
flags
)
{
int32_t
tdOpenTFile
(
STFile
*
pTFile
,
int
flags
)
{
ASSERT
(
!
TD_FILE_OPENED
(
pTFile
));
ASSERT
(
!
TD_
T
FILE_OPENED
(
pTFile
));
pTFile
->
pFile
=
taosOpenFile
(
TD_FILE_FULL_NAME
(
pTFile
),
flags
);
pTFile
->
pFile
=
taosOpenFile
(
TD_
T
FILE_FULL_NAME
(
pTFile
),
flags
);
if
(
pTFile
->
pFile
==
NULL
)
{
if
(
pTFile
->
pFile
==
NULL
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
return
-
1
;
...
@@ -169,9 +173,9 @@ int32_t tdOpenTFile(STFile *pTFile, int flags) {
...
@@ -169,9 +173,9 @@ int32_t tdOpenTFile(STFile *pTFile, int flags) {
}
}
void
tdCloseTFile
(
STFile
*
pTFile
)
{
void
tdCloseTFile
(
STFile
*
pTFile
)
{
if
(
TD_FILE_OPENED
(
pTFile
))
{
if
(
TD_
T
FILE_OPENED
(
pTFile
))
{
taosCloseFile
(
&
pTFile
->
pFile
);
taosCloseFile
(
&
pTFile
->
pFile
);
TD_FILE_SET_CLOSED
(
pTFile
);
TD_
T
FILE_SET_CLOSED
(
pTFile
);
}
}
}
}
...
@@ -183,8 +187,8 @@ int32_t tdInitTFile(STFile *pTFile, STfs *pTfs, const char *fname) {
...
@@ -183,8 +187,8 @@ int32_t tdInitTFile(STFile *pTFile, STfs *pTfs, const char *fname) {
char
fullname
[
TSDB_FILENAME_LEN
];
char
fullname
[
TSDB_FILENAME_LEN
];
SDiskID
did
=
{
0
};
SDiskID
did
=
{
0
};
TD_FILE_SET_STATE
(
pTFile
,
TD_FILE_STATE_OK
);
TD_
T
FILE_SET_STATE
(
pTFile
,
TD_FILE_STATE_OK
);
TD_FILE_SET_CLOSED
(
pTFile
);
TD_
T
FILE_SET_CLOSED
(
pTFile
);
memset
(
&
(
pTFile
->
info
),
0
,
sizeof
(
pTFile
->
info
));
memset
(
&
(
pTFile
->
info
),
0
,
sizeof
(
pTFile
->
info
));
pTFile
->
info
.
magic
=
TD_FILE_INIT_MAGIC
;
pTFile
->
info
.
magic
=
TD_FILE_INIT_MAGIC
;
...
@@ -202,18 +206,18 @@ int32_t tdInitTFile(STFile *pTFile, STfs *pTfs, const char *fname) {
...
@@ -202,18 +206,18 @@ int32_t tdInitTFile(STFile *pTFile, STfs *pTfs, const char *fname) {
int32_t
tdCreateTFile
(
STFile
*
pTFile
,
STfs
*
pTfs
,
bool
updateHeader
,
int8_t
fType
)
{
int32_t
tdCreateTFile
(
STFile
*
pTFile
,
STfs
*
pTfs
,
bool
updateHeader
,
int8_t
fType
)
{
ASSERT
(
pTFile
->
info
.
fsize
==
0
&&
pTFile
->
info
.
magic
==
TD_FILE_INIT_MAGIC
);
ASSERT
(
pTFile
->
info
.
fsize
==
0
&&
pTFile
->
info
.
magic
==
TD_FILE_INIT_MAGIC
);
pTFile
->
pFile
=
taosOpenFile
(
TD_FILE_FULL_NAME
(
pTFile
),
TD_FILE_CREATE
|
TD_FILE_WRITE
|
TD_FILE_TRUNC
);
pTFile
->
pFile
=
taosOpenFile
(
TD_
T
FILE_FULL_NAME
(
pTFile
),
TD_FILE_CREATE
|
TD_FILE_WRITE
|
TD_FILE_TRUNC
);
if
(
pTFile
->
pFile
==
NULL
)
{
if
(
pTFile
->
pFile
==
NULL
)
{
if
(
errno
==
ENOENT
)
{
if
(
errno
==
ENOENT
)
{
// Try to create directory recursively
// Try to create directory recursively
char
*
s
=
strdup
(
TD_FILE_REL_NAME
(
pTFile
));
char
*
s
=
strdup
(
TD_
T
FILE_REL_NAME
(
pTFile
));
if
(
tfsMkdirRecurAt
(
pTfs
,
taosDirName
(
s
),
TD_FILE_DID
(
pTFile
))
<
0
)
{
if
(
tfsMkdirRecurAt
(
pTfs
,
taosDirName
(
s
),
TD_
T
FILE_DID
(
pTFile
))
<
0
)
{
taosMemoryFreeClear
(
s
);
taosMemoryFreeClear
(
s
);
return
-
1
;
return
-
1
;
}
}
taosMemoryFreeClear
(
s
);
taosMemoryFreeClear
(
s
);
pTFile
->
pFile
=
taosOpenFile
(
TD_FILE_FULL_NAME
(
pTFile
),
TD_FILE_CREATE
|
TD_FILE_WRITE
|
TD_FILE_TRUNC
);
pTFile
->
pFile
=
taosOpenFile
(
TD_
T
FILE_FULL_NAME
(
pTFile
),
TD_FILE_CREATE
|
TD_FILE_WRITE
|
TD_FILE_TRUNC
);
if
(
pTFile
->
pFile
==
NULL
)
{
if
(
pTFile
->
pFile
==
NULL
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
return
-
1
;
...
@@ -240,7 +244,7 @@ int32_t tdCreateTFile(STFile *pTFile, STfs *pTfs, bool updateHeader, int8_t fTyp
...
@@ -240,7 +244,7 @@ int32_t tdCreateTFile(STFile *pTFile, STfs *pTfs, bool updateHeader, int8_t fTyp
return
0
;
return
0
;
}
}
int32_t
tdRemoveTFile
(
STFile
*
pTFile
)
{
return
tfsRemoveFile
(
TD_FILE_F
(
pTFile
));
}
int32_t
tdRemoveTFile
(
STFile
*
pTFile
)
{
return
tfsRemoveFile
(
TD_
T
FILE_F
(
pTFile
));
}
// smaXXXUtil ================
// smaXXXUtil ================
// ...
// ...
\ No newline at end of file
source/dnode/vnode/src/vnd/vnodeOpen.c
浏览文件 @
b66caf41
...
@@ -152,14 +152,14 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
...
@@ -152,14 +152,14 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
return
pVnode
;
return
pVnode
;
_err:
_err:
if
(
pVnode
->
pSma
)
smaClose
(
pVnode
->
pSma
);
if
(
pVnode
->
pSma
)
smaClose
Env
(
pVnode
->
pSma
);
if
(
pVnode
->
pQuery
)
vnodeQueryClose
(
pVnode
);
if
(
pVnode
->
pQuery
)
vnodeQueryClose
(
pVnode
);
if
(
pVnode
->
pTq
)
tqClose
(
pVnode
->
pTq
);
if
(
pVnode
->
pTq
)
tqClose
(
pVnode
->
pTq
);
if
(
pVnode
->
pWal
)
walClose
(
pVnode
->
pWal
);
if
(
pVnode
->
pWal
)
walClose
(
pVnode
->
pWal
);
if
(
pVnode
->
pTsdb
)
tsdbClose
(
&
pVnode
->
pTsdb
);
if
(
pVnode
->
pTsdb
)
tsdbClose
(
&
pVnode
->
pTsdb
);
if
(
pVnode
->
pSma
)
smaCloseEx
(
pVnode
->
pSma
);
if
(
pVnode
->
pMeta
)
metaClose
(
pVnode
->
pMeta
);
if
(
pVnode
->
pMeta
)
metaClose
(
pVnode
->
pMeta
);
tsem_destroy
(
&
(
pVnode
->
canCommit
));
tsem_destroy
(
&
(
pVnode
->
canCommit
));
taosMemoryFree
(
pVnode
);
taosMemoryFree
(
pVnode
);
return
NULL
;
return
NULL
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录