Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d3eb1a74
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
d3eb1a74
编写于
12月 26, 2022
作者:
D
dapan1121
提交者:
GitHub
12月 26, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #19137 from taosdata/feature/TD-19148-D
feat: rsma snapshot refactor for stream state
上级
fd4515cf
3db53d9f
变更
19
展开全部
隐藏空白更改
内联
并排
Showing
19 changed file
with
1271 addition
and
1043 deletion
+1271
-1043
include/util/taoserror.h
include/util/taoserror.h
+4
-1
source/dnode/vnode/src/inc/sma.h
source/dnode/vnode/src/inc/sma.h
+25
-73
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+2
-4
source/dnode/vnode/src/sma/smaCommit.c
source/dnode/vnode/src/sma/smaCommit.c
+30
-143
source/dnode/vnode/src/sma/smaEnv.c
source/dnode/vnode/src/sma/smaEnv.c
+1
-4
source/dnode/vnode/src/sma/smaFS.c
source/dnode/vnode/src/sma/smaFS.c
+646
-183
source/dnode/vnode/src/sma/smaOpen.c
source/dnode/vnode/src/sma/smaOpen.c
+11
-8
source/dnode/vnode/src/sma/smaRollup.c
source/dnode/vnode/src/sma/smaRollup.c
+144
-119
source/dnode/vnode/src/sma/smaSnapshot.c
source/dnode/vnode/src/sma/smaSnapshot.c
+290
-228
source/dnode/vnode/src/sma/smaTimeRange.c
source/dnode/vnode/src/sma/smaTimeRange.c
+29
-11
source/dnode/vnode/src/sma/smaUtil.c
source/dnode/vnode/src/sma/smaUtil.c
+51
-244
source/dnode/vnode/src/tq/tqExec.c
source/dnode/vnode/src/tq/tqExec.c
+2
-2
source/dnode/vnode/src/tsdb/tsdbSnapshot.c
source/dnode/vnode/src/tsdb/tsdbSnapshot.c
+3
-3
source/dnode/vnode/src/vnd/vnodeCommit.c
source/dnode/vnode/src/vnd/vnodeCommit.c
+14
-8
source/dnode/vnode/src/vnd/vnodeQuery.c
source/dnode/vnode/src/vnd/vnodeQuery.c
+7
-7
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+4
-1
source/libs/sync/src/syncAppendEntriesReply.c
source/libs/sync/src/syncAppendEntriesReply.c
+1
-1
source/libs/wal/src/walRead.c
source/libs/wal/src/walRead.c
+3
-3
source/util/src/terror.c
source/util/src/terror.c
+4
-0
未找到文件。
include/util/taoserror.h
浏览文件 @
d3eb1a74
...
...
@@ -700,7 +700,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_RSMA_INVALID_ENV TAOS_DEF_ERROR_CODE(0, 0x3150)
#define TSDB_CODE_RSMA_INVALID_STAT TAOS_DEF_ERROR_CODE(0, 0x3151)
#define TSDB_CODE_RSMA_QTASKINFO_CREATE TAOS_DEF_ERROR_CODE(0, 0x3152)
// #define TSDB_CODE_RSMA_FILE_CORRUPTED
TAOS_DEF_ERROR_CODE(0, 0x3153)
#define TSDB_CODE_RSMA_FS_COMMIT
TAOS_DEF_ERROR_CODE(0, 0x3153)
#define TSDB_CODE_RSMA_REMOVE_EXISTS TAOS_DEF_ERROR_CODE(0, 0x3154)
#define TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP TAOS_DEF_ERROR_CODE(0, 0x3155)
#define TSDB_CODE_RSMA_EMPTY_INFO TAOS_DEF_ERROR_CODE(0, 0x3156)
...
...
@@ -708,6 +708,9 @@ int32_t* taosGetErrno();
#define TSDB_CODE_RSMA_REGEX_MATCH TAOS_DEF_ERROR_CODE(0, 0x3158)
#define TSDB_CODE_RSMA_STREAM_STATE_OPEN TAOS_DEF_ERROR_CODE(0, 0x3159)
#define TSDB_CODE_RSMA_STREAM_STATE_COMMIT TAOS_DEF_ERROR_CODE(0, 0x3160)
#define TSDB_CODE_RSMA_FS_REF TAOS_DEF_ERROR_CODE(0, 0x3161)
#define TSDB_CODE_RSMA_FS_SYNC TAOS_DEF_ERROR_CODE(0, 0x3162)
#define TSDB_CODE_RSMA_FS_UPDATE TAOS_DEF_ERROR_CODE(0, 0x3163)
//index
#define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200)
...
...
source/dnode/vnode/src/inc/sma.h
浏览文件 @
d3eb1a74
...
...
@@ -44,7 +44,6 @@ typedef struct SRSmaInfoItem SRSmaInfoItem;
typedef
struct
SRSmaFS
SRSmaFS
;
typedef
struct
SQTaskFile
SQTaskFile
;
typedef
struct
SQTaskFReader
SQTaskFReader
;
typedef
struct
SQTaskFWriter
SQTaskFWriter
;
struct
SSmaEnv
{
SRWLatch
lock
;
...
...
@@ -85,22 +84,20 @@ struct STSmaStat {
struct
SQTaskFile
{
volatile
int32_t
nRef
;
int32_t
padding
;
int8_t
level
;
int64_t
suid
;
int64_t
version
;
int64_t
size
;
int64_t
mtime
;
};
struct
SQTaskFReader
{
SSma
*
pSma
;
int8_t
level
;
int64_t
suid
;
int64_t
version
;
TdFilePtr
pReadH
;
};
struct
SQTaskFWriter
{
SSma
*
pSma
;
int64_t
version
;
TdFilePtr
pWriteH
;
char
*
fname
;
};
struct
SRSmaFS
{
SArray
*
aQTaskInf
;
// array of SQTaskFile
...
...
@@ -214,85 +211,40 @@ static FORCE_INLINE void tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) {
// rsma
void
*
tdFreeRSmaInfo
(
SSma
*
pSma
,
SRSmaInfo
*
pInfo
,
bool
isDeepFree
);
int32_t
tdRSmaFSOpen
(
SSma
*
pSma
,
int64_t
version
);
int32_t
tdRSmaFSOpen
(
SSma
*
pSma
,
int64_t
version
,
int8_t
rollback
);
void
tdRSmaFSClose
(
SRSmaFS
*
fs
);
int32_t
tdRSmaFSRef
(
SSma
*
pSma
,
SRSmaStat
*
pStat
,
int64_t
version
);
void
tdRSmaFSUnRef
(
SSma
*
pSma
,
SRSmaStat
*
pStat
,
int64_t
version
);
int64_t
tdRSmaFSMaxVer
(
SSma
*
pSma
,
SRSmaStat
*
pStat
);
int32_t
tdRSmaFSUpsertQTaskFile
(
SRSmaFS
*
pFS
,
SQTaskFile
*
qTaskFile
);
int32_t
tdRSmaRestore
(
SSma
*
pSma
,
int8_t
type
,
int64_t
committedVer
);
int32_t
tdRSmaFSPrepareCommit
(
SSma
*
pSma
,
SRSmaFS
*
pFSNew
);
int32_t
tdRSmaFSCommit
(
SSma
*
pSma
);
int32_t
tdRSmaFSFinishCommit
(
SSma
*
pSma
);
int32_t
tdRSmaFSCopy
(
SSma
*
pSma
,
SRSmaFS
*
pFS
);
int32_t
tdRSmaFSTakeSnapshot
(
SSma
*
pSma
,
SRSmaFS
*
pFS
);
int32_t
tdRSmaFSRef
(
SSma
*
pSma
,
SRSmaFS
*
pFS
);
void
tdRSmaFSUnRef
(
SSma
*
pSma
,
SRSmaFS
*
pFS
);
int32_t
tdRSmaFSUpsertQTaskFile
(
SSma
*
pSma
,
SRSmaFS
*
pFS
,
SQTaskFile
*
qTaskFile
,
int32_t
nSize
);
int32_t
tdRSmaFSRollback
(
SSma
*
pSma
);
int32_t
tdRSmaRestore
(
SSma
*
pSma
,
int8_t
type
,
int64_t
committedVer
,
int8_t
rollback
);
int32_t
tdRSmaProcessCreateImpl
(
SSma
*
pSma
,
SRSmaParam
*
param
,
int64_t
suid
,
const
char
*
tbName
);
int32_t
tdRSmaProcessExecImpl
(
SSma
*
pSma
,
ERsmaExecType
type
);
int32_t
tdRSmaPersistExecImpl
(
SRSmaStat
*
pRSmaStat
,
SHashObj
*
pInfoHash
);
int32_t
tdRSmaProcessRestoreImpl
(
SSma
*
pSma
,
int8_t
type
,
int64_t
qtaskFileVer
);
void
tdRSmaQTaskInfoGetFileName
(
int32_t
vgId
,
int64_t
version
,
char
*
outputName
);
void
tdRSmaQTaskInfoGetFullName
(
int32_t
vgId
,
int64_t
version
,
const
char
*
path
,
char
*
outputName
);
int32_t
tdRSmaProcessRestoreImpl
(
SSma
*
pSma
,
int8_t
type
,
int64_t
qtaskFileVer
,
int8_t
rollback
);
void
tdRSmaQTaskInfoGetFileName
(
int32_t
vgId
,
int64_t
suid
,
int8_t
level
,
int64_t
version
,
char
*
outputName
);
void
tdRSmaQTaskInfoGetFullName
(
int32_t
vgId
,
int64_t
suid
,
int8_t
level
,
int64_t
version
,
const
char
*
path
,
char
*
outputName
);
void
tdRSmaQTaskInfoGetFullPath
(
int32_t
vgId
,
int8_t
level
,
const
char
*
path
,
char
*
outputName
);
void
tdRSmaQTaskInfoGetFullPathEx
(
int32_t
vgId
,
tb_uid_t
suid
,
int8_t
level
,
const
char
*
path
,
char
*
outputName
);
static
FORCE_INLINE
void
tdRefRSmaInfo
(
SSma
*
pSma
,
SRSmaInfo
*
pRSmaInfo
)
{
int32_t
ref
=
T_REF_INC
(
pRSmaInfo
);
sma
Debug
(
"vgId:%d, ref rsma info:%p, val:%d"
,
SMA_VID
(
pSma
),
pRSmaInfo
,
ref
);
sma
Trace
(
"vgId:%d, ref rsma info:%p, val:%d"
,
SMA_VID
(
pSma
),
pRSmaInfo
,
ref
);
}
static
FORCE_INLINE
void
tdUnRefRSmaInfo
(
SSma
*
pSma
,
SRSmaInfo
*
pRSmaInfo
)
{
int32_t
ref
=
T_REF_DEC
(
pRSmaInfo
);
sma
Debug
(
"vgId:%d, unref rsma info:%p, val:%d"
,
SMA_VID
(
pSma
),
pRSmaInfo
,
ref
);
sma
Trace
(
"vgId:%d, unref rsma info:%p, val:%d"
,
SMA_VID
(
pSma
),
pRSmaInfo
,
ref
);
}
// smaFileUtil ================
#define TD_FILE_HEAD_SIZE 512
typedef
struct
STFInfo
STFInfo
;
typedef
struct
STFile
STFile
;
struct
STFInfo
{
// common fields
uint32_t
magic
;
uint32_t
ftype
;
uint32_t
fver
;
int64_t
fsize
;
};
enum
{
TD_FTYPE_RSMA_QTASKINFO
=
0
,
};
#if 0
struct STFile {
uint8_t state;
STFInfo info;
char *fname;
TdFilePtr pFile;
};
#define TD_TFILE_PFILE(tf) ((tf)->pFile)
#define TD_TFILE_OPENED(tf) (TD_TFILE_PFILE(tf) != NULL)
#define TD_TFILE_FULL_NAME(tf) ((tf)->fname)
#define TD_TFILE_OPENED(tf) (TD_TFILE_PFILE(tf) != NULL)
#define TD_TFILE_CLOSED(tf) (!TD_TFILE_OPENED(tf))
#define TD_TFILE_SET_CLOSED(f) (TD_TFILE_PFILE(f) = NULL)
#define TD_TFILE_SET_STATE(tf, s) ((tf)->state = (s))
int32_t tdInitTFile(STFile *pTFile, const char *dname, const char *fname);
int32_t tdCreateTFile(STFile *pTFile, bool updateHeader, int8_t fType);
int32_t tdOpenTFile(STFile *pTFile, int flags);
int64_t tdReadTFile(STFile *pTFile, void *buf, int64_t nbyte);
int64_t tdSeekTFile(STFile *pTFile, int64_t offset, int whence);
int64_t tdWriteTFile(STFile *pTFile, void *buf, int64_t nbyte);
int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset);
int64_t tdGetTFileSize(STFile *pTFile, int64_t *size);
int32_t tdRemoveTFile(STFile *pTFile);
int32_t tdLoadTFileHeader(STFile *pTFile, STFInfo *pInfo);
int32_t tdUpdateTFileHeader(STFile *pTFile);
void tdUpdateTFileMagic(STFile *pTFile, void *pCksm);
void tdCloseTFile(STFile *pTFile);
void tdDestroyTFile(STFile *pTFile);
#endif
void
tdGetVndFileName
(
int32_t
vgId
,
const
char
*
pdname
,
const
char
*
dname
,
const
char
*
fname
,
int64_t
version
,
char
*
outputName
);
void
tdGetVndDirName
(
int32_t
vgId
,
const
char
*
pdname
,
const
char
*
dname
,
bool
endWithSep
,
char
*
outputName
);
void
tdRSmaGetFileName
(
int32_t
vgId
,
const
char
*
pdname
,
const
char
*
dname
,
const
char
*
fname
,
int64_t
suid
,
int8_t
level
,
int64_t
version
,
char
*
outputName
);
void
tdRSmaGetDirName
(
int32_t
vgId
,
const
char
*
pdname
,
const
char
*
dname
,
bool
endWithSep
,
char
*
outputName
);
#ifdef __cplusplus
}
...
...
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
d3eb1a74
...
...
@@ -211,9 +211,6 @@ void smaCleanUp();
int32_t
smaOpen
(
SVnode
*
pVnode
,
int8_t
rollback
);
int32_t
smaClose
(
SSma
*
pSma
);
int32_t
smaBegin
(
SSma
*
pSma
);
int32_t
smaSyncPreCommit
(
SSma
*
pSma
);
int32_t
smaSyncCommit
(
SSma
*
pSma
);
int32_t
smaSyncPostCommit
(
SSma
*
pSma
);
int32_t
smaPrepareAsyncCommit
(
SSma
*
pSma
);
int32_t
smaCommit
(
SSma
*
pSma
,
SCommitInfo
*
pInfo
);
int32_t
smaFinishCommit
(
SSma
*
pSma
);
...
...
@@ -228,7 +225,6 @@ int32_t tdProcessRSmaSubmit(SSma* pSma, void* pMsg, int32_t inputType);
int32_t
tdProcessRSmaDrop
(
SSma
*
pSma
,
SVDropStbReq
*
pReq
);
int32_t
tdFetchTbUidList
(
SSma
*
pSma
,
STbUidStore
**
ppStore
,
tb_uid_t
suid
,
tb_uid_t
uid
);
int32_t
tdUpdateTbUidList
(
SSma
*
pSma
,
STbUidStore
*
pUidStore
,
bool
isAdd
);
void
tdUidStoreDestory
(
STbUidStore
*
pStore
);
void
*
tdUidStoreFree
(
STbUidStore
*
pStore
);
// SMetaSnapReader ========================================
...
...
@@ -275,6 +271,7 @@ int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData);
// SRSmaSnapWriter ========================================
int32_t
rsmaSnapWriterOpen
(
SSma
*
pSma
,
int64_t
sver
,
int64_t
ever
,
SRSmaSnapWriter
**
ppWriter
);
int32_t
rsmaSnapWrite
(
SRSmaSnapWriter
*
pWriter
,
uint8_t
*
pData
,
uint32_t
nData
);
int32_t
rsmaSnapWriterPrepareClose
(
SRSmaSnapWriter
*
pWriter
);
int32_t
rsmaSnapWriterClose
(
SRSmaSnapWriter
**
ppWriter
,
int8_t
rollback
);
typedef
struct
{
...
...
@@ -418,6 +415,7 @@ enum {
struct
SSnapDataHdr
{
int8_t
type
;
int8_t
flag
;
int64_t
index
;
int64_t
size
;
uint8_t
data
[];
...
...
source/dnode/vnode/src/sma/smaCommit.c
浏览文件 @
d3eb1a74
...
...
@@ -17,42 +17,11 @@
extern
SSmaMgmt
smaMgmt
;
#if 0
static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma);
static int32_t tdProcessRSmaSyncCommitImpl(SSma *pSma);
static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma);
#endif
static
int32_t
tdProcessRSmaAsyncPreCommitImpl
(
SSma
*
pSma
);
static
int32_t
tdProcessRSmaAsyncCommitImpl
(
SSma
*
pSma
,
SCommitInfo
*
pInfo
);
static
int32_t
tdProcessRSmaAsyncPostCommitImpl
(
SSma
*
pSma
);
static
int32_t
tdUpdateQTaskInfoFiles
(
SSma
*
pSma
,
SRSmaStat
*
pRSmaStat
);
#if 0
/**
* @brief Only applicable to Rollup SMA
*
* @param pSma
* @return int32_t
*/
int32_t smaSyncPreCommit(SSma *pSma) { return tdProcessRSmaSyncPreCommitImpl(pSma); }
/**
* @brief Only applicable to Rollup SMA
*
* @param pSma
* @return int32_t
*/
int32_t smaSyncCommit(SSma *pSma) { return tdProcessRSmaSyncCommitImpl(pSma); }
/**
* @brief Only applicable to Rollup SMA
*
* @param pSma
* @return int32_t
*/
int32_t smaSyncPostCommit(SSma *pSma) { return tdProcessRSmaSyncPostCommitImpl(pSma); }
#endif
/**
* @brief async commit, only applicable to Rollup SMA
*
...
...
@@ -128,84 +97,24 @@ _exit:
int32_t
smaFinishCommit
(
SSma
*
pSma
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
SVnode
*
pVnode
=
pSma
->
pVnode
;
code
=
tdRSmaFSFinishCommit
(
pSma
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
if
(
VND_RSMA1
(
pVnode
)
&&
(
code
=
tsdbFinishCommit
(
VND_RSMA1
(
pVnode
)))
<
0
)
{
smaError
(
"vgId:%d, failed to finish commit tsdb rsma1 since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
code
));
goto
_exit
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
if
(
VND_RSMA2
(
pVnode
)
&&
(
code
=
tsdbFinishCommit
(
VND_RSMA2
(
pVnode
)))
<
0
)
{
smaError
(
"vgId:%d, failed to finish commit tsdb rsma2 since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
code
));
goto
_exit
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
_exit:
terrno
=
code
;
return
code
;
}
#if 0
/**
* @brief pre-commit for rollup sma(sync commit).
* 1) set trigger stat of rsma timer TASK_TRIGGER_STAT_PAUSED.
* 2) wait for all triggered fetch tasks to finish
* 3) perform persist task for qTaskInfo
*
* @param pSma
* @return int32_t
*/
static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma) {
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
if (!pSmaEnv) {
return TSDB_CODE_SUCCESS;
}
SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv);
SRSmaStat *pRSmaStat = SMA_STAT_RSMA(pStat);
// step 1: set rsma stat paused
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED);
// step 2: wait for all triggered fetch tasks to finish
int32_t nLoops = 0;
while (1) {
if (T_REF_VAL_GET(pStat) == 0) {
smaDebug("vgId:%d, rsma fetch tasks are all finished", SMA_VID(pSma));
break;
} else {
smaDebug("vgId:%d, rsma fetch tasks are not all finished yet", SMA_VID(pSma));
}
++nLoops;
if (nLoops > 1000) {
sched_yield();
nLoops = 0;
}
}
// step 3: perform persist task for qTaskInfo
pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied;
tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat));
smaDebug("vgId:%d, rsma pre commit success", SMA_VID(pSma));
return TSDB_CODE_SUCCESS;
}
/**
* @brief commit for rollup sma
*
* @param pSma
* @return int32_t
*/
static int32_t tdProcessRSmaSyncCommitImpl(SSma *pSma) {
#if 0
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
if (!pSmaEnv) {
return TSDB_CODE_SUCCESS;
if
(
code
)
{
smaError
(
"vgId:%d, %s failed at line %d since %s"
,
TD_VID
(
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
#endif
return TSDB_CODE_SUCCESS;
return
code
;
}
#endif
// SQTaskFile ======================================================
...
...
@@ -218,6 +127,7 @@ static int32_t tdProcessRSmaSyncCommitImpl(SSma *pSma) {
* @return int32_t
*/
static
int32_t
tdUpdateQTaskInfoFiles
(
SSma
*
pSma
,
SRSmaStat
*
pStat
)
{
#if 0
SVnode *pVnode = pSma->pVnode;
SRSmaFS *pFS = RSMA_FS(pStat);
int64_t committed = pStat->commitAppliedVer;
...
...
@@ -264,31 +174,10 @@ static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) {
}
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
#endif
return
TSDB_CODE_SUCCESS
;
}
#if 0
/**
* @brief post-commit for rollup sma
* 1) clean up the outdated qtaskinfo files
*
* @param pSma
* @return int32_t
*/
static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma) {
SVnode *pVnode = pSma->pVnode;
if (!VND_IS_RSMA(pVnode)) {
return TSDB_CODE_SUCCESS;
}
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma);
tdUpdateQTaskInfoFiles(pSma, pRSmaStat);
return TSDB_CODE_SUCCESS;
}
#endif
/**
* @brief Rsma async commit implementation(only do some necessary light weighted task)
* 1) set rsma stat TASK_TRIGGER_STAT_PAUSED
...
...
@@ -298,9 +187,11 @@ static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma) {
* @return int32_t
*/
static
int32_t
tdProcessRSmaAsyncPreCommitImpl
(
SSma
*
pSma
)
{
int32_t
code
=
0
;
SSmaEnv
*
pEnv
=
SMA_RSMA_ENV
(
pSma
);
if
(
!
pEnv
)
{
return
TSDB_CODE_SUCCESS
;
return
code
;
}
SSmaStat
*
pStat
=
SMA_ENV_STAT
(
pEnv
);
...
...
@@ -317,7 +208,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
}
}
pRSmaStat
->
commitAppliedVer
=
pSma
->
pVnode
->
state
.
applied
;
ASSERT
(
pRSmaStat
->
commitAppliedVer
>
0
);
//
ASSERT(pRSmaStat->commitAppliedVer > 0);
// step 2: wait for all triggered fetch tasks to finish
nLoops
=
0
;
...
...
@@ -351,8 +242,8 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
}
}
smaInfo
(
"vgId:%d, rsma commit, all items are consumed, TID:%p"
,
SMA_VID
(
pSma
),
(
void
*
)
taosGetSelfPthreadId
());
if
(
tdRSmaPersistExecImpl
(
pRSmaStat
,
RSMA_INFO_HASH
(
pRSmaStat
))
<
0
)
{
return
TSDB_CODE_FAILED
;
if
(
(
code
=
tdRSmaPersistExecImpl
(
pRSmaStat
,
RSMA_INFO_HASH
(
pRSmaStat
)))
!=
0
)
{
return
code
;
}
smaInfo
(
"vgId:%d, rsma commit, operator state committed, TID:%p"
,
SMA_VID
(
pSma
),
(
void
*
)
taosGetSelfPthreadId
());
...
...
@@ -383,7 +274,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
if
((
pTsdb
=
VND_RSMA1
(
pSma
->
pVnode
)))
tsdbPrepareCommit
(
pTsdb
);
if
((
pTsdb
=
VND_RSMA2
(
pSma
->
pVnode
)))
tsdbPrepareCommit
(
pTsdb
);
return
TSDB_CODE_SUCCESS
;
return
code
;
}
/**
...
...
@@ -394,26 +285,22 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
*/
static
int32_t
tdProcessRSmaAsyncCommitImpl
(
SSma
*
pSma
,
SCommitInfo
*
pInfo
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
SVnode
*
pVnode
=
pSma
->
pVnode
;
#if 0
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
// perform persist task for qTaskInfo operator
if (tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)) < 0) {
return TSDB_CODE_FAILED;
}
#endif
code
=
tdRSmaFSCommit
(
pSma
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
tsdbCommit
(
VND_RSMA1
(
pVnode
),
pInfo
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
tsdbCommit
(
VND_RSMA2
(
pVnode
),
pInfo
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
if
((
code
=
tsdbCommit
(
VND_RSMA1
(
pVnode
),
pInfo
))
<
0
)
{
smaError
(
"vgId:%d, failed to commit tsdb rsma1 since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
code
));
goto
_exit
;
}
if
((
code
=
tsdbCommit
(
VND_RSMA2
(
pVnode
),
pInfo
))
<
0
)
{
smaError
(
"vgId:%d, failed to commit tsdb rsma2 since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
code
));
goto
_exit
;
}
_exit:
terrno
=
code
;
if
(
code
)
{
smaError
(
"vgId:%d, %s failed at line %d since %s"
,
TD_VID
(
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
return
code
;
}
...
...
source/dnode/vnode/src/sma/smaEnv.c
浏览文件 @
d3eb1a74
...
...
@@ -243,10 +243,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
return
TSDB_CODE_FAILED
;
}
if
(
!
(
RSMA_FS
(
pRSmaStat
)
->
aQTaskInf
=
taosArrayInit
(
1
,
sizeof
(
SQTaskFile
))))
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
TSDB_CODE_FAILED
;
}
taosInitRWLatch
(
RSMA_FS_LOCK
(
pRSmaStat
));
}
else
if
(
smaType
==
TSDB_SMA_TYPE_TIME_RANGE
)
{
// TODO
}
else
{
...
...
source/dnode/vnode/src/sma/smaFS.c
浏览文件 @
d3eb1a74
此差异已折叠。
点击以展开。
source/dnode/vnode/src/sma/smaOpen.c
浏览文件 @
d3eb1a74
...
...
@@ -121,8 +121,6 @@ int smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int ty
int32_t
smaOpen
(
SVnode
*
pVnode
,
int8_t
rollback
)
{
STsdbCfg
*
pCfg
=
&
pVnode
->
config
.
tsdbCfg
;
ASSERT
(
!
pVnode
->
pSma
);
SSma
*
pSma
=
taosMemoryCalloc
(
1
,
sizeof
(
SSma
));
if
(
!
pSma
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -137,7 +135,7 @@ int32_t smaOpen(SVnode *pVnode, int8_t rollback) {
if
(
VND_IS_RSMA
(
pVnode
))
{
STsdbKeepCfg
keepCfg
=
{
0
};
for
(
int
i
=
0
;
i
<
TSDB_RETENTION_MAX
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
TSDB_RETENTION_MAX
;
++
i
)
{
if
(
i
==
TSDB_RETENTION_L0
)
{
SMA_OPEN_RSMA_IMPL
(
pVnode
,
0
);
}
else
if
(
i
==
TSDB_RETENTION_L1
)
{
...
...
@@ -145,12 +143,14 @@ int32_t smaOpen(SVnode *pVnode, int8_t rollback) {
}
else
if
(
i
==
TSDB_RETENTION_L2
)
{
SMA_OPEN_RSMA_IMPL
(
pVnode
,
2
);
}
else
{
ASSERT
(
0
);
terrno
=
TSDB_CODE_APP_ERROR
;
smaError
(
"vgId:%d, sma open failed since %s, level:%d"
,
TD_VID
(
pVnode
),
terrstr
(),
i
);
goto
_err
;
}
}
// restore the rsma
if
(
tdRSmaRestore
(
pSma
,
RSMA_RESTORE_REBOOT
,
pVnode
->
state
.
committed
)
<
0
)
{
if
(
tdRSmaRestore
(
pSma
,
RSMA_RESTORE_REBOOT
,
pVnode
->
state
.
committed
,
rollback
)
<
0
)
{
goto
_err
;
}
}
...
...
@@ -181,8 +181,11 @@ int32_t smaClose(SSma *pSma) {
* @param committedVer
* @return int32_t
*/
int32_t
tdRSmaRestore
(
SSma
*
pSma
,
int8_t
type
,
int64_t
committedVer
)
{
ASSERT
(
VND_IS_RSMA
(
pSma
->
pVnode
));
int32_t
tdRSmaRestore
(
SSma
*
pSma
,
int8_t
type
,
int64_t
committedVer
,
int8_t
rollback
)
{
if
(
!
VND_IS_RSMA
(
pSma
->
pVnode
))
{
terrno
=
TSDB_CODE_RSMA_INVALID_ENV
;
return
TSDB_CODE_FAILED
;
}
return
tdRSmaProcessRestoreImpl
(
pSma
,
type
,
committedVer
);
return
tdRSmaProcessRestoreImpl
(
pSma
,
type
,
committedVer
,
rollback
);
}
source/dnode/vnode/src/sma/smaRollup.c
浏览文件 @
d3eb1a74
...
...
@@ -21,17 +21,17 @@
#define RSMA_FETCH_ACTIVE_MAX (1000) // ms
#define RSMA_FETCH_INTERVAL (5000) // ms
#define RSMA_NEED_FETCH(r) (RSMA_INFO_ITEM((r), 0)->fetchLevel || RSMA_INFO_ITEM((r), 1)->fetchLevel)
SSmaMgmt
smaMgmt
=
{
.
inited
=
0
,
.
rsetId
=
-
1
,
};
#define TD_QTASKINFO_FNAME_PREFIX "qinf.v"
typedef
struct
SRSmaQTaskInfoItem
SRSmaQTaskInfoItem
;
typedef
struct
SRSmaQTaskInfoIter
SRSmaQTaskInfoIter
;
static
int32_t
tdUidStorePut
(
STbUidStore
*
pStore
,
tb_uid_t
suid
,
tb_uid_t
*
uid
);
static
void
tdUidStoreDestory
(
STbUidStore
*
pStore
);
static
int32_t
tdUpdateTbUidListImpl
(
SSma
*
pSma
,
tb_uid_t
*
suid
,
SArray
*
tbUids
,
bool
isAdd
);
static
int32_t
tdSetRSmaInfoItemParams
(
SSma
*
pSma
,
SRSmaParam
*
param
,
SRSmaStat
*
pStat
,
SRSmaInfo
*
pRSmaInfo
,
int8_t
idx
);
...
...
@@ -57,38 +57,6 @@ struct SRSmaQTaskInfoItem {
void
*
qTaskInfo
;
};
struct
SRSmaQTaskInfoIter
{
STFile
*
pTFile
;
int64_t
offset
;
int64_t
fsize
;
int32_t
nBytes
;
int32_t
nAlloc
;
char
*
pBuf
;
// ------------
char
*
qBuf
;
// for iterator
int32_t
nBufPos
;
};
void
tdRSmaQTaskInfoGetFileName
(
int32_t
vgId
,
int64_t
version
,
char
*
outputName
)
{
tdGetVndFileName
(
vgId
,
NULL
,
VNODE_RSMA_DIR
,
TD_QTASKINFO_FNAME_PREFIX
,
version
,
outputName
);
}
void
tdRSmaQTaskInfoGetFullName
(
int32_t
vgId
,
int64_t
version
,
const
char
*
path
,
char
*
outputName
)
{
tdGetVndFileName
(
vgId
,
path
,
VNODE_RSMA_DIR
,
TD_QTASKINFO_FNAME_PREFIX
,
version
,
outputName
);
}
void
tdRSmaQTaskInfoGetFullPath
(
int32_t
vgId
,
int8_t
level
,
const
char
*
path
,
char
*
outputName
)
{
tdGetVndDirName
(
vgId
,
path
,
VNODE_RSMA_DIR
,
true
,
outputName
);
int32_t
rsmaLen
=
strlen
(
outputName
);
snprintf
(
outputName
+
rsmaLen
,
TSDB_FILENAME_LEN
-
rsmaLen
,
"%"
PRIi8
,
level
);
}
void
tdRSmaQTaskInfoGetFullPathEx
(
int32_t
vgId
,
tb_uid_t
suid
,
int8_t
level
,
const
char
*
path
,
char
*
outputName
)
{
tdGetVndDirName
(
vgId
,
path
,
VNODE_RSMA_DIR
,
true
,
outputName
);
int32_t
rsmaLen
=
strlen
(
outputName
);
snprintf
(
outputName
+
rsmaLen
,
TSDB_FILENAME_LEN
-
rsmaLen
,
"%"
PRIi64
"%s%"
PRIi8
,
suid
,
TD_DIRSEP
,
level
);
}
static
void
tdRSmaQTaskInfoFree
(
qTaskInfo_t
*
taskHandle
,
int32_t
vgId
,
int32_t
level
)
{
// Note: free/kill may in RC
if
(
!
taskHandle
||
!
(
*
taskHandle
))
return
;
...
...
@@ -363,10 +331,12 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
return
TSDB_CODE_SUCCESS
;
}
#if 0
if (tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_ROLLUP) != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_TDB_INIT_FAILED;
return TSDB_CODE_FAILED;
}
#endif
SSmaEnv
*
pEnv
=
SMA_RSMA_ENV
(
pSma
);
SRSmaStat
*
pStat
=
(
SRSmaStat
*
)
SMA_ENV_STAT
(
pEnv
);
...
...
@@ -374,13 +344,8 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
pRSmaInfo
=
taosHashGet
(
RSMA_INFO_HASH
(
pStat
),
&
suid
,
sizeof
(
tb_uid_t
));
if
(
pRSmaInfo
)
{
// TODO: free original pRSmaInfo if exists abnormally
tdFreeRSmaInfo
(
pSma
,
*
(
SRSmaInfo
**
)
pRSmaInfo
,
true
);
if
(
taosHashRemove
(
RSMA_INFO_HASH
(
pStat
),
&
suid
,
sizeof
(
tb_uid_t
))
<
0
)
{
terrno
=
TSDB_CODE_RSMA_REMOVE_EXISTS
;
goto
_err
;
}
smaWarn
(
"vgId:%d, remove the rsma info already exists for table %s, %"
PRIi64
,
SMA_VID
(
pSma
),
tbName
,
suid
);
smaInfo
(
"vgId:%d, rsma info already exists for table %s, %"
PRIi64
,
SMA_VID
(
pSma
),
tbName
,
suid
);
return
TSDB_CODE_SUCCESS
;
}
// from write queue: single thead
...
...
@@ -449,8 +414,8 @@ int32_t tdProcessRSmaCreate(SSma *pSma, SVCreateStbReq *pReq) {
}
if
(
!
VND_IS_RSMA
(
pVnode
))
{
sma
Trace
(
"vgId:%d, not create rsma for stable %s %"
PRIi64
" since vnd is not rsma"
,
TD_VID
(
pVnode
),
pReq
->
name
,
pReq
->
suid
);
sma
Warn
(
"vgId:%d, not create rsma for stable %s %"
PRIi64
" since vnd is not rsma"
,
TD_VID
(
pVnode
),
pReq
->
name
,
pReq
->
suid
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -494,9 +459,8 @@ int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) {
tdReleaseRSmaInfo
(
pSma
,
pRSmaInfo
);
// save to file
// TODO
smaDebug
(
"vgId:%d, drop rsma for table %"
PRIi64
" succeed"
,
TD_VID
(
pVnode
),
pReq
->
suid
);
// no need to save to file as triggered by dropping stable
smaDebug
(
"vgId:%d, drop rsma for stable %"
PRIi64
" succeed"
,
TD_VID
(
pVnode
),
pReq
->
suid
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -561,7 +525,7 @@ static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid)
return
TSDB_CODE_SUCCESS
;
}
void
tdUidStoreDestory
(
STbUidStore
*
pStore
)
{
static
void
tdUidStoreDestory
(
STbUidStore
*
pStore
)
{
if
(
pStore
)
{
if
(
pStore
->
uidHash
)
{
if
(
pStore
->
tbUids
)
{
...
...
@@ -602,7 +566,7 @@ static int32_t tdProcessSubmitReq(STsdb *pTsdb, int64_t version, void *pReq) {
}
SSubmitReq
*
pSubmitReq
=
(
SSubmitReq
*
)
pReq
;
// TODO: spin lock for race condition
d
// TODO: spin lock for race condition
if
(
tsdbInsertData
(
pTsdb
,
version
,
pSubmitReq
,
NULL
)
<
0
)
{
return
TSDB_CODE_FAILED
;
}
...
...
@@ -708,8 +672,8 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
#endif
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pResList
);
++
i
)
{
SSDataBlock
*
output
=
taosArrayGetP
(
pResList
,
i
);
smaDebug
(
"result block, uid:%"
PRIu64
", groupid:%"
PRIu64
", rows:%d"
,
output
->
info
.
id
.
uid
,
output
->
info
.
id
.
groupId
,
output
->
info
.
rows
);
smaDebug
(
"result block, uid:%"
PRIu64
", groupid:%"
PRIu64
", rows:%d"
,
output
->
info
.
id
.
uid
,
output
->
info
.
id
.
groupId
,
output
->
info
.
rows
);
STsdb
*
sinkTsdb
=
(
pItem
->
level
==
TSDB_RETENTION_L1
?
pSma
->
pRSmaTsdb
[
0
]
:
pSma
->
pRSmaTsdb
[
1
]);
SSubmitReq
*
pReq
=
NULL
;
...
...
@@ -840,12 +804,13 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize,
return
TSDB_CODE_SUCCESS
;
}
if
(
!
pInfo
->
pTSchema
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
smaWarn
(
"vgId:%d, no schema to execute rsma %"
PRIi8
" task for suid:%"
PRIu64
,
SMA_VID
(
pSma
),
level
,
pInfo
->
suid
);
return
TSDB_CODE_FAILED
;
}
smaDebug
(
"vgId:%d, execute rsma %"
PRIi8
" task for qTaskInfo:%p suid:%"
PRIu64
,
SMA_VID
(
pSma
),
level
,
RSMA_INFO_QTASK
(
pInfo
,
idx
),
pInfo
->
suid
);
smaDebug
(
"vgId:%d, execute rsma %"
PRIi8
" task for qTaskInfo:%p suid:%"
PRIu64
" nMsg:%d"
,
SMA_VID
(
pSma
),
level
,
RSMA_INFO_QTASK
(
pInfo
,
idx
),
pInfo
->
suid
,
msgSize
);
#if 0
for (int32_t i = 0; i < msgSize; ++i) {
...
...
@@ -854,7 +819,7 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize,
tdRsmaPrintSubmitReq(pSma, pReq);
}
#endif
if
(
qSetSMAInput
(
qTaskInfo
,
pMsg
,
msgSize
,
inputType
)
<
0
)
{
if
(
(
terrno
=
qSetSMAInput
(
qTaskInfo
,
pMsg
,
msgSize
,
inputType
)
)
<
0
)
{
smaError
(
"vgId:%d, rsma %"
PRIi8
" qSetStreamInput failed since %s"
,
SMA_VID
(
pSma
),
level
,
tstrerror
(
terrno
));
return
TSDB_CODE_FAILED
;
}
...
...
@@ -871,6 +836,12 @@ static int32_t tdCloneQTaskInfo(SSma *pSma, qTaskInfo_t dstTaskInfo, qTaskInfo_t
char
*
pOutput
=
NULL
;
int32_t
len
=
0
;
if
(
!
srcTaskInfo
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
smaWarn
(
"vgId:%d, rsma clone, table %"
PRIi64
", no need since srcTaskInfo is NULL"
,
TD_VID
(
pVnode
),
suid
);
return
TSDB_CODE_FAILED
;
}
if
((
terrno
=
qSerializeTaskStatus
(
srcTaskInfo
,
&
pOutput
,
&
len
))
<
0
)
{
smaError
(
"vgId:%d, rsma clone, table %"
PRIi64
" serialize qTaskInfo failed since %s"
,
TD_VID
(
pVnode
),
suid
,
terrstr
());
...
...
@@ -1051,12 +1022,8 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
// only applicable when rsma env exists
return
TSDB_CODE_SUCCESS
;
}
STbUidStore
uidStore
=
{
0
};
SRetention
*
pRetention
=
SMA_RETENTION
(
pSma
);
if
(
!
RETENTION_VALID
(
pRetention
+
1
))
{
// return directly if retention level 1 is invalid
return
TSDB_CODE_SUCCESS
;
}
if
(
inputType
==
STREAM_INPUT__DATA_SUBMIT
)
{
if
(
tdFetchSubmitReqSuids
(
pMsg
,
&
uidStore
)
<
0
)
{
...
...
@@ -1186,18 +1153,21 @@ _err:
}
/**
* @brief reload ts data from checkpoint
*
* @param pSma
* @return int32_t
* N.B. the data would be restored from the unified WAL replay procedure
*/
static
int32_t
tdRSmaRestoreTSDataReload
(
SSma
*
pSma
)
{
// NOTHING TODO: the data would be restored from the unified WAL replay procedure
return
TSDB_CODE_SUCCESS
;
}
int32_t
tdRSmaProcessRestoreImpl
(
SSma
*
pSma
,
int8_t
type
,
int64_t
qtaskFileVer
,
int8_t
rollback
)
{
// step 1: init env
if
(
tdCheckAndInitSmaEnv
(
pSma
,
TSDB_SMA_TYPE_ROLLUP
)
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
TSDB_CODE_TDB_INIT_FAILED
;
return
TSDB_CODE_FAILED
;
}
int32_t
tdRSmaProcessRestoreImpl
(
SSma
*
pSma
,
int8_t
type
,
int64_t
qtaskFileVer
)
{
// step 1: iterate all stables to restore the rsma env
// step 2: open SRSmaFS for qTaskFiles
if
(
tdRSmaFSOpen
(
pSma
,
qtaskFileVer
,
rollback
)
<
0
)
{
goto
_err
;
}
// step 3: iterate all stables to restore the rsma env
int64_t
nTables
=
0
;
if
(
tdRSmaRestoreQTaskInfoInit
(
pSma
,
&
nTables
)
<
0
)
{
goto
_err
;
...
...
@@ -1207,16 +1177,6 @@ int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer)
return
TSDB_CODE_SUCCESS
;
}
// step 2: reload ts data from checkpoint
if
(
tdRSmaRestoreTSDataReload
(
pSma
)
<
0
)
{
goto
_err
;
}
// step 3: open SRSmaFS for qTaskFiles
if
(
tdRSmaFSOpen
(
pSma
,
qtaskFileVer
)
<
0
)
{
goto
_err
;
}
smaInfo
(
"vgId:%d, restore rsma task %"
PRIi8
" from qtaskf %"
PRIi64
" succeed"
,
SMA_VID
(
pSma
),
type
,
qtaskFileVer
);
return
TSDB_CODE_SUCCESS
;
_err:
...
...
@@ -1226,19 +1186,26 @@ _err:
}
int32_t
tdRSmaPersistExecImpl
(
SRSmaStat
*
pRSmaStat
,
SHashObj
*
pInfoHash
)
{
SSma
*
pSma
=
pRSmaStat
->
pSma
;
SVnode
*
pVnode
=
pSma
->
pVnode
;
int32_t
vid
=
SMA_VID
(
pSma
);
int32_t
code
=
0
;
int32_t
lino
=
0
;
SSma
*
pSma
=
pRSmaStat
->
pSma
;
SVnode
*
pVnode
=
pSma
->
pVnode
;
SArray
*
qTaskFArray
=
NULL
;
int64_t
version
=
pRSmaStat
->
commitAppliedVer
;
TdFilePtr
pOutFD
=
NULL
;
TdFilePtr
pInFD
=
NULL
;
char
fname
[
TSDB_FILENAME_LEN
];
char
fnameVer
[
TSDB_FILENAME_LEN
];
SRSmaFS
fs
=
{
0
};
if
(
taosHashGetSize
(
pInfoHash
)
<=
0
)
{
return
TSDB_CODE_SUCCESS
;
}
int64_t
fsMaxVer
=
tdRSmaFSMaxVer
(
pSma
,
pRSmaStat
);
if
(
pRSmaStat
->
commitAppliedVer
<=
fsMaxVer
)
{
smaDebug
(
"vgId:%d, rsma persist, no need as applied %"
PRIi64
" not larger than fsMaxVer %"
PRIi64
,
vid
,
pRSmaStat
->
commitAppliedVer
,
fsMaxVer
);
return
TSDB_CODE_SUCCESS
;
qTaskFArray
=
taosArrayInit
(
taosHashGetSize
(
pInfoHash
)
<<
1
,
sizeof
(
SQTaskFile
));
if
(
!
qTaskFArray
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
void
*
infoHash
=
NULL
;
...
...
@@ -1253,19 +1220,80 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
SRSmaInfoItem
*
pItem
=
RSMA_INFO_ITEM
(
pRSmaInfo
,
i
);
if
(
pItem
&&
pItem
->
pStreamState
)
{
if
(
streamStateCommit
(
pItem
->
pStreamState
)
<
0
)
{
terrno
=
TSDB_CODE_RSMA_STREAM_STATE_COMMIT
;
goto
_err
;
code
=
TSDB_CODE_RSMA_STREAM_STATE_COMMIT
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
smaDebug
(
"vgId:%d, rsma persist, stream state commit success, table %"
PRIi64
", level %d"
,
TD_VID
(
pVnode
),
pRSmaInfo
->
suid
,
i
+
1
);
// qTaskInfo file
tdRSmaQTaskInfoGetFullName
(
TD_VID
(
pVnode
),
pRSmaInfo
->
suid
,
i
+
1
,
-
1
,
tfsGetPrimaryPath
(
pVnode
->
pTfs
),
fname
);
tdRSmaQTaskInfoGetFullName
(
TD_VID
(
pVnode
),
pRSmaInfo
->
suid
,
i
+
1
,
version
,
tfsGetPrimaryPath
(
pVnode
->
pTfs
),
fnameVer
);
if
(
taosCheckExistFile
(
fnameVer
))
{
smaWarn
(
"vgId:%d, rsma persist, duplicate file %s exist"
,
TD_VID
(
pVnode
),
fnameVer
);
}
pOutFD
=
taosCreateFile
(
fnameVer
,
TD_FILE_WRITE
|
TD_FILE_CREATE
|
TD_FILE_TRUNC
);
if
(
pOutFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
pInFD
=
taosOpenFile
(
fname
,
TD_FILE_READ
);
if
(
pInFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
int64_t
size
=
0
;
uint32_t
mtime
=
0
;
if
(
taosFStatFile
(
pInFD
,
&
size
,
&
mtime
)
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
ASSERT
(
size
>
0
);
int64_t
offset
=
0
;
if
(
taosFSendFile
(
pOutFD
,
pInFD
,
&
offset
,
size
)
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
smaError
(
"vgId:%d, rsma persist, send qtaskinfo file %s to %s failed since %s"
,
TD_VID
(
pVnode
),
fname
,
fnameVer
,
tstrerror
(
code
));
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
smaDebug
(
"vgId:%d, rsma persist, stream state commit success, table %"
PRIi64
" level %d"
,
vid
,
pRSmaInfo
->
suid
,
i
+
1
);
taosCloseFile
(
&
pOutFD
);
taosCloseFile
(
&
pInFD
);
SQTaskFile
qTaskF
=
{
.
nRef
=
1
,
.
level
=
i
+
1
,
.
suid
=
pRSmaInfo
->
suid
,
.
version
=
version
,
.
size
=
size
,
.
mtime
=
mtime
};
taosArrayPush
(
qTaskFArray
,
&
qTaskF
);
}
}
}
return
TSDB_CODE_SUCCESS
;
_err:
smaError
(
"vgId:%d, rsma persist failed since %s"
,
vid
,
terrstr
());
return
TSDB_CODE_FAILED
;
// prepare
code
=
tdRSmaFSCopy
(
pSma
,
&
fs
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
tdRSmaFSUpsertQTaskFile
(
pSma
,
&
fs
,
qTaskFArray
->
pData
,
taosArrayGetSize
(
qTaskFArray
));
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
tdRSmaFSPrepareCommit
(
pSma
,
&
fs
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
_exit:
taosArrayDestroy
(
fs
.
aQTaskInf
);
taosArrayDestroy
(
qTaskFArray
);
if
(
code
)
{
if
(
pOutFD
)
taosCloseFile
(
&
pOutFD
);
if
(
pInFD
)
taosCloseFile
(
&
pInFD
);
smaError
(
"vgId:%d, %s failed at line %d since %s"
,
TD_VID
(
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
terrno
=
code
;
return
code
;
}
/**
...
...
@@ -1352,12 +1380,8 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
tsem_post
(
&
(
pStat
->
notEmpty
));
}
}
break
;
case
TASK_TRIGGER_STAT_PAUSED
:
{
smaDebug
(
"vgId:%d, rsma fetch task not start for level:%"
PRIi8
" suid:%"
PRIi64
" since stat is paused"
,
SMA_VID
(
pSma
),
pItem
->
level
,
pRSmaInfo
->
suid
);
}
break
;
case
TASK_TRIGGER_STAT_INACTIVE
:
{
smaDebug
(
"vgId:%d, rsma fetch task not start for level:%"
PRIi8
" suid:%"
PRIi64
" since stat is inactive"
,
smaDebug
(
"vgId:%d, rsma fetch task not start for level:%"
PRIi8
" suid:%"
PRIi64
" since stat is inactive
"
,
SMA_VID
(
pSma
),
pItem
->
level
,
pRSmaInfo
->
suid
);
}
break
;
case
TASK_TRIGGER_STAT_INIT
:
{
...
...
@@ -1365,8 +1389,9 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
SMA_VID
(
pSma
),
pItem
->
level
,
pRSmaInfo
->
suid
);
}
break
;
default:
{
smaDebug
(
"vgId:%d, rsma fetch task not start for level:%"
PRIi8
" suid:%"
PRIi64
" since stat is unknown"
,
SMA_VID
(
pSma
),
pItem
->
level
,
pRSmaInfo
->
suid
);
smaDebug
(
"vgId:%d, rsma fetch task not start for level:%"
PRIi8
" suid:%"
PRIi64
" since stat:%"
PRIi8
" is unknown"
,
SMA_VID
(
pSma
),
pItem
->
level
,
pRSmaInfo
->
suid
,
fetchTriggerStat
);
}
break
;
}
...
...
@@ -1448,7 +1473,7 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA
taosGetQitem
(
qall
,
(
void
**
)
&
msg
);
if
(
msg
)
{
if
(
!
taosArrayPush
(
pSubmitArr
,
&
msg
))
{
t
dFreeRSmaSubmitItems
(
pSubmitArr
)
;
t
errno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
}
else
{
...
...
@@ -1460,7 +1485,6 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA
if
(
size
>
0
)
{
for
(
int32_t
i
=
1
;
i
<=
TSDB_RETENTION_L2
;
++
i
)
{
if
(
tdExecuteRSmaImpl
(
pSma
,
pSubmitArr
->
pData
,
size
,
STREAM_INPUT__MERGED_SUBMIT
,
pInfo
,
type
,
i
)
<
0
)
{
tdFreeRSmaSubmitItems
(
pSubmitArr
);
goto
_err
;
}
}
...
...
@@ -1468,6 +1492,9 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA
}
return
TSDB_CODE_SUCCESS
;
_err:
smaError
(
"vgId:%d, batch exec for suid:%"
PRIi64
" execType:%d size:%d failed since %s"
,
SMA_VID
(
pSma
),
pInfo
->
suid
,
type
,
(
int32_t
)
taosArrayGetSize
(
pSubmitArr
),
terrstr
());
tdFreeRSmaSubmitItems
(
pSubmitArr
);
while
(
1
)
{
void
*
msg
=
NULL
;
taosGetQitem
(
qall
,
(
void
**
)
&
msg
);
...
...
@@ -1514,8 +1541,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
while
((
pIter
=
taosHashIterate
(
infoHash
,
pIter
)))
{
SRSmaInfo
*
pInfo
=
*
(
SRSmaInfo
**
)
pIter
;
if
(
atomic_val_compare_exchange_8
(
&
pInfo
->
assigned
,
0
,
1
)
==
0
)
{
if
((
taosQueueItemSize
(
pInfo
->
queue
)
>
0
)
||
RSMA_INFO_ITEM
(
pInfo
,
0
)
->
fetchLevel
||
RSMA_INFO_ITEM
(
pInfo
,
1
)
->
fetchLevel
)
{
if
((
taosQueueItemSize
(
pInfo
->
queue
)
>
0
)
||
RSMA_NEED_FETCH
(
pInfo
))
{
int32_t
batchCnt
=
-
1
;
int32_t
batchMax
=
taosHashGetSize
(
infoHash
)
/
tsNumOfVnodeRsmaThreads
;
bool
occupied
=
(
batchMax
<=
1
);
...
...
@@ -1531,13 +1557,20 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
smaDebug
(
"vgId:%d, batchSize:%d, execType:%"
PRIi32
,
SMA_VID
(
pSma
),
qallItemSize
,
type
);
}
if
(
RSMA_
INFO_ITEM
(
pInfo
,
0
)
->
fetchLevel
||
RSMA_INFO_ITEM
(
pInfo
,
1
)
->
fetchLevel
)
{
if
(
RSMA_
NEED_FETCH
(
pInfo
)
)
{
int8_t
oldStat
=
atomic_val_compare_exchange_8
(
RSMA_COMMIT_STAT
(
pRSmaStat
),
0
,
2
);
if
(
oldStat
==
0
||
((
oldStat
==
2
)
&&
atomic_load_8
(
RSMA_TRIGGER_STAT
(
pRSmaStat
))
<
TASK_TRIGGER_STAT_PAUSED
))
{
int32_t
oldVal
=
atomic_fetch_add_32
(
&
pRSmaStat
->
nFetchAll
,
1
);
ASSERT
(
oldVal
>=
0
);
tdRSmaFetchAllResult
(
pSma
,
pInfo
);
int8_t
curStat
=
atomic_load_8
(
RSMA_COMMIT_STAT
(
pRSmaStat
));
if
(
curStat
==
1
)
{
smaDebug
(
"vgId:%d, fetch all not exec as commit stat is %"
PRIi8
,
SMA_VID
(
pSma
),
curStat
);
}
else
{
tdRSmaFetchAllResult
(
pSma
,
pInfo
);
}
if
(
0
==
atomic_sub_fetch_32
(
&
pRSmaStat
->
nFetchAll
,
1
))
{
atomic_store_8
(
RSMA_COMMIT_STAT
(
pRSmaStat
),
0
);
}
...
...
@@ -1547,17 +1580,9 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
if
(
qallItemSize
>
0
)
{
atomic_fetch_sub_64
(
&
pRSmaStat
->
nBufItems
,
qallItemSize
);
continue
;
}
else
if
(
RSMA_INFO_ITEM
(
pInfo
,
0
)
->
fetchLevel
||
RSMA_INFO_ITEM
(
pInfo
,
1
)
->
fetchLevel
)
{
if
(
atomic_load_8
(
RSMA_COMMIT_STAT
(
pRSmaStat
))
==
0
)
{
continue
;
}
for
(
int32_t
j
=
0
;
j
<
TSDB_RETENTION_L2
;
++
j
)
{
SRSmaInfoItem
*
pItem
=
RSMA_INFO_ITEM
(
pInfo
,
j
);
if
(
pItem
->
fetchLevel
)
{
pItem
->
fetchLevel
=
0
;
taosTmrReset
(
tdRSmaFetchTrigger
,
RSMA_FETCH_INTERVAL
,
pItem
,
smaMgmt
.
tmrHandle
,
&
pItem
->
tmrId
);
}
}
}
if
(
RSMA_NEED_FETCH
(
pInfo
))
{
continue
;
}
break
;
...
...
source/dnode/vnode/src/sma/smaSnapshot.c
浏览文件 @
d3eb1a74
此差异已折叠。
点击以展开。
source/dnode/vnode/src/sma/smaTimeRange.c
浏览文件 @
d3eb1a74
...
...
@@ -111,35 +111,48 @@ _err:
* @return int32_t
*/
static
int32_t
tdProcessTSmaCreateImpl
(
SSma
*
pSma
,
int64_t
version
,
const
char
*
pMsg
)
{
SSmaCfg
*
pCfg
=
(
SSmaCfg
*
)
pMsg
;
int32_t
code
=
0
;
int32_t
lino
=
0
;
SSmaCfg
*
pCfg
=
(
SSmaCfg
*
)
pMsg
;
SName
stbFullName
=
{
0
};
SVCreateStbReq
pReq
=
{
0
};
if
(
TD_VID
(
pSma
->
pVnode
)
==
pCfg
->
dstVgId
)
{
// create tsma meta in dstVgId
if
(
metaCreateTSma
(
SMA_META
(
pSma
),
version
,
pCfg
)
<
0
)
{
return
-
1
;
code
=
terrno
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
// create stable to save tsma result in dstVgId
SName
stbFullName
=
{
0
};
tNameFromString
(
&
stbFullName
,
pCfg
->
dstTbName
,
T_NAME_ACCT
|
T_NAME_DB
|
T_NAME_TABLE
);
SVCreateStbReq
pReq
=
{
0
};
pReq
.
name
=
(
char
*
)
tNameGetTableName
(
&
stbFullName
);
pReq
.
suid
=
pCfg
->
dstTbUid
;
pReq
.
schemaRow
=
pCfg
->
schemaRow
;
pReq
.
schemaTag
=
pCfg
->
schemaTag
;
if
(
metaCreateSTable
(
SMA_META
(
pSma
),
version
,
&
pReq
)
<
0
)
{
return
-
1
;
code
=
terrno
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
}
else
{
code
=
terrno
=
TSDB_CODE_TSMA_INVALID_STAT
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
_exit:
if
(
code
)
{
smaError
(
"vgId:%d, failed at line %d to create sma index %s %"
PRIi64
" on stb:%"
PRIi64
", dstSuid:%"
PRIi64
" dstTb:%s dstVg:%d"
,
SMA_VID
(
pSma
),
lino
,
pCfg
->
indexName
,
pCfg
->
indexUid
,
pCfg
->
tableUid
,
pCfg
->
dstTbUid
,
pReq
.
name
,
pCfg
->
dstVgId
);
}
else
{
smaDebug
(
"vgId:%d, success to create sma index %s %"
PRIi64
" on stb:%"
PRIi64
", dstSuid:%"
PRIi64
" dstTb:%s dstVg:%d"
,
SMA_VID
(
pSma
),
pCfg
->
indexName
,
pCfg
->
indexUid
,
pCfg
->
tableUid
,
pCfg
->
dstTbUid
,
pReq
.
name
,
pCfg
->
dstVgId
);
}
else
{
ASSERT
(
0
);
}
return
0
;
return
code
;
}
/**
...
...
@@ -174,7 +187,7 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char
STSmaStat
*
pTsmaStat
=
NULL
;
if
(
!
pEnv
||
!
(
pStat
=
SMA_ENV_STAT
(
pEnv
)))
{
terrno
=
TSDB_CODE_TSMA_INVALID_
STAT
;
terrno
=
TSDB_CODE_TSMA_INVALID_
ENV
;
return
TSDB_CODE_FAILED
;
}
...
...
@@ -216,9 +229,14 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char
indexUid
,
tstrerror
(
terrno
));
goto
_err
;
}
#if 0
ASSERT(!strncasecmp("td.tsma.rst.tb", pTsmaStat->pTSma->dstTbName, 14));
if (!strncasecmp("td.tsma.rst.tb", pTsmaStat->pTSma->dstTbName, 14)) {
terrno = TSDB_CODE_APP_ERROR;
smaError("vgId:%d, tsma insert for smaIndex %" PRIi64 " failed since %s, %s", SMA_VID(pSma), indexUid,
pTsmaStat->pTSma->indexUid, tstrerror(terrno), pTsmaStat->pTSma->dstTbName);
goto _err;
}
#endif
SRpcMsg
submitReqMsg
=
{
...
...
source/dnode/vnode/src/sma/smaUtil.c
浏览文件 @
d3eb1a74
...
...
@@ -15,197 +15,72 @@
#include "sma.h"
// smaFileUtil ================
#if 0
#define TD_FILE_STATE_OK 0
#define TD_FILE_STATE_BAD 1
#define TD_QTASKINFO_FNAME_PREFIX "main.tdb"
#define TD_FILE_INIT_MAGIC 0xFFFFFFFF
static int32_t tdEncodeTFInfo(void **buf, STFInfo *pInfo);
static void *tdDecodeTFInfo(void *buf, STFInfo *pInfo);
static int32_t tdEncodeTFInfo(void **buf, STFInfo *pInfo) {
int32_t tlen = 0;
tlen += taosEncodeFixedU32(buf, pInfo->magic);
tlen += taosEncodeFixedU32(buf, pInfo->ftype);
tlen += taosEncodeFixedU32(buf, pInfo->fver);
tlen += taosEncodeFixedI64(buf, pInfo->fsize);
return tlen;
}
static void *tdDecodeTFInfo(void *buf, STFInfo *pInfo) {
buf = taosDecodeFixedU32(buf, &(pInfo->magic));
buf = taosDecodeFixedU32(buf, &(pInfo->ftype));
buf = taosDecodeFixedU32(buf, &(pInfo->fver));
buf = taosDecodeFixedI64(buf, &(pInfo->fsize));
return buf;
}
int64_t tdWriteTFile(STFile *pTFile, void *buf, int64_t nbyte) {
ASSERT(TD_TFILE_OPENED(pTFile));
int64_t nwrite = taosWriteFile(pTFile->pFile, buf, nbyte);
if (nwrite < nbyte) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return nwrite;
}
int64_t tdSeekTFile(STFile *pTFile, int64_t offset, int whence) {
ASSERT(TD_TFILE_OPENED(pTFile));
int64_t loffset = taosLSeekFile(TD_TFILE_PFILE(pTFile), offset, whence);
if (loffset < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return loffset;
}
int64_t tdGetTFileSize(STFile *pTFile, int64_t *size) {
ASSERT(TD_TFILE_OPENED(pTFile));
return taosFStatFile(pTFile->pFile, size, NULL);
}
int64_t tdReadTFile(STFile *pTFile, void *buf, int64_t nbyte) {
ASSERT(TD_TFILE_OPENED(pTFile));
int64_t nread = taosReadFile(pTFile->pFile, buf, nbyte);
if (nread < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return nread;
}
int32_t tdUpdateTFileHeader(STFile *pTFile) {
char buf[TD_FILE_HEAD_SIZE] = "\0";
if (tdSeekTFile(pTFile, 0, SEEK_SET) < 0) {
return -1;
}
void *ptr = buf;
tdEncodeTFInfo(&ptr, &(pTFile->info));
taosCalcChecksumAppend(0, (uint8_t *)buf, TD_FILE_HEAD_SIZE);
if (tdWriteTFile(pTFile, buf, TD_FILE_HEAD_SIZE) < 0) {
return -1;
}
return 0;
}
int32_t tdLoadTFileHeader(STFile *pTFile, STFInfo *pInfo) {
char buf[TD_FILE_HEAD_SIZE] = "\0";
uint32_t _version;
ASSERT(TD_TFILE_OPENED(pTFile));
if (tdSeekTFile(pTFile, 0, SEEK_SET) < 0) {
return -1;
}
if (tdReadTFile(pTFile, buf, TD_FILE_HEAD_SIZE) < 0) {
return -1;
}
if (!taosCheckChecksumWhole((uint8_t *)buf, TD_FILE_HEAD_SIZE)) {
terrno = TSDB_CODE_FILE_CORRUPTED;
return -1;
}
void *pBuf = buf;
pBuf = tdDecodeTFInfo(pBuf, pInfo);
return 0;
}
void tdUpdateTFileMagic(STFile *pTFile, void *pCksm) {
pTFile->info.magic = taosCalcChecksum(pTFile->info.magic, (uint8_t *)(pCksm), sizeof(TSCKSUM));
void
tdRSmaQTaskInfoGetFileName
(
int32_t
vgId
,
int64_t
suid
,
int8_t
level
,
int64_t
version
,
char
*
outputName
)
{
tdRSmaGetFileName
(
vgId
,
NULL
,
VNODE_RSMA_DIR
,
TD_QTASKINFO_FNAME_PREFIX
,
suid
,
level
,
version
,
outputName
);
}
int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset) {
ASSERT(TD_TFILE_OPENED(pTFile));
int64_t toffset;
if ((toffset = tdSeekTFile(pTFile, 0, SEEK_END)) < 0) {
return -1;
}
#if 0
smaDebug("append to file %s, offset:%" PRIi64 " nbyte:%" PRIi64 " fsize:%" PRIi64, TD_TFILE_FULL_NAME(pTFile),
toffset, nbyte, toffset + nbyte);
#endif
ASSERT(pTFile->info.fsize == toffset);
if (offset) {
*offset = toffset;
}
if (tdWriteTFile(pTFile, buf, nbyte) < 0) {
return -1;
}
pTFile->info.fsize += nbyte;
return nbyte;
void
tdRSmaQTaskInfoGetFullName
(
int32_t
vgId
,
int64_t
suid
,
int8_t
level
,
int64_t
version
,
const
char
*
path
,
char
*
outputName
)
{
tdRSmaGetFileName
(
vgId
,
path
,
VNODE_RSMA_DIR
,
TD_QTASKINFO_FNAME_PREFIX
,
suid
,
level
,
version
,
outputName
);
}
int32_t tdOpenTFile(STFile *pTFile, int flags) {
ASSERT(!TD_TFILE_OPENED(pTFile));
pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), flags);
if (pTFile->pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return 0;
void
tdRSmaQTaskInfoGetFullPath
(
int32_t
vgId
,
int8_t
level
,
const
char
*
path
,
char
*
outputName
)
{
tdRSmaGetDirName
(
vgId
,
path
,
VNODE_RSMA_DIR
,
true
,
outputName
);
int32_t
rsmaLen
=
strlen
(
outputName
);
snprintf
(
outputName
+
rsmaLen
,
TSDB_FILENAME_LEN
-
rsmaLen
,
"%"
PRIi8
,
level
);
}
void tdCloseTFile(STFile *pTFile) {
if (TD_TFILE_OPENED(pTFile)) {
taosCloseFile(&pTFile->pFile);
TD_TFILE_SET_CLOSED(pTFile);
}
void
tdRSmaQTaskInfoGetFullPathEx
(
int32_t
vgId
,
tb_uid_t
suid
,
int8_t
level
,
const
char
*
path
,
char
*
outputName
)
{
tdRSmaGetDirName
(
vgId
,
path
,
VNODE_RSMA_DIR
,
true
,
outputName
);
int32_t
rsmaLen
=
strlen
(
outputName
);
snprintf
(
outputName
+
rsmaLen
,
TSDB_FILENAME_LEN
-
rsmaLen
,
"%"
PRIi8
"%s%"
PRIi64
,
level
,
TD_DIRSEP
,
suid
);
}
void tdDestroyTFile(STFile *pTFile) { taosMemoryFreeClear(TD_TFILE_FULL_NAME(pTFile)); }
#endif
void
tdGetVndFileName
(
int32_t
vgId
,
const
char
*
pdname
,
const
char
*
dname
,
const
char
*
fname
,
int64_t
version
,
char
*
outputName
)
{
if
(
version
<
0
)
{
if
(
pdname
)
{
snprintf
(
outputName
,
TSDB_FILENAME_LEN
,
"%s%svnode%svnode%d%s%s%sv%d%s"
,
pdname
,
TD_DIRSEP
,
TD_DIRSEP
,
vgId
,
TD_DIRSEP
,
dname
,
TD_DIRSEP
,
vgId
,
fname
);
void
tdRSmaGetFileName
(
int32_t
vgId
,
const
char
*
pdname
,
const
char
*
dname
,
const
char
*
fname
,
int64_t
suid
,
int8_t
level
,
int64_t
version
,
char
*
outputName
)
{
if
(
level
>=
0
&&
suid
>
0
)
{
if
(
version
>=
0
)
{
if
(
pdname
)
{
snprintf
(
outputName
,
TSDB_FILENAME_LEN
,
"%s%svnode%svnode%d%s%s%s%"
PRIi8
"%s%"
PRIi64
"%s%s.%"
PRIi64
,
pdname
,
TD_DIRSEP
,
TD_DIRSEP
,
vgId
,
TD_DIRSEP
,
dname
,
TD_DIRSEP
,
level
,
TD_DIRSEP
,
suid
,
TD_DIRSEP
,
fname
,
version
);
}
else
{
snprintf
(
outputName
,
TSDB_FILENAME_LEN
,
"vnode%svnode%d%s%s%s%"
PRIi8
"%s%"
PRIi64
"%s%s.%"
PRIi64
,
TD_DIRSEP
,
vgId
,
TD_DIRSEP
,
dname
,
TD_DIRSEP
,
level
,
TD_DIRSEP
,
suid
,
TD_DIRSEP
,
fname
,
version
);
}
}
else
{
snprintf
(
outputName
,
TSDB_FILENAME_LEN
,
"vnode%svnode%d%s%s%sv%d%s"
,
TD_DIRSEP
,
vgId
,
TD_DIRSEP
,
dname
,
TD_DIRSEP
,
vgId
,
fname
);
if
(
pdname
)
{
snprintf
(
outputName
,
TSDB_FILENAME_LEN
,
"%s%svnode%svnode%d%s%s%s%"
PRIi8
"%s%"
PRIi64
"%s%s"
,
pdname
,
TD_DIRSEP
,
TD_DIRSEP
,
vgId
,
TD_DIRSEP
,
dname
,
TD_DIRSEP
,
level
,
TD_DIRSEP
,
suid
,
TD_DIRSEP
,
fname
);
}
else
{
snprintf
(
outputName
,
TSDB_FILENAME_LEN
,
"vnode%svnode%d%s%s%s%"
PRIi8
"%s%"
PRIi64
"%s%s"
,
TD_DIRSEP
,
vgId
,
TD_DIRSEP
,
dname
,
TD_DIRSEP
,
level
,
TD_DIRSEP
,
suid
,
TD_DIRSEP
,
fname
);
}
}
}
else
{
if
(
pdname
)
{
snprintf
(
outputName
,
TSDB_FILENAME_LEN
,
"%s%svnode%svnode%d%s%s%sv%d%s%"
PRIi64
,
pdname
,
TD_DIRSEP
,
TD_DIRSEP
,
vgId
,
TD_DIRSEP
,
dname
,
TD_DIRSEP
,
vgId
,
fname
,
version
);
if
(
version
>=
0
)
{
if
(
pdname
)
{
snprintf
(
outputName
,
TSDB_FILENAME_LEN
,
"%s%svnode%svnode%d%s%s%sv%d%s%"
PRIi64
,
pdname
,
TD_DIRSEP
,
TD_DIRSEP
,
vgId
,
TD_DIRSEP
,
dname
,
TD_DIRSEP
,
vgId
,
fname
,
version
);
}
else
{
snprintf
(
outputName
,
TSDB_FILENAME_LEN
,
"vnode%svnode%d%s%s%sv%d%s%"
PRIi64
,
TD_DIRSEP
,
vgId
,
TD_DIRSEP
,
dname
,
TD_DIRSEP
,
vgId
,
fname
,
version
);
}
}
else
{
snprintf
(
outputName
,
TSDB_FILENAME_LEN
,
"vnode%svnode%d%s%s%sv%d%s%"
PRIi64
,
TD_DIRSEP
,
vgId
,
TD_DIRSEP
,
dname
,
TD_DIRSEP
,
vgId
,
fname
,
version
);
if
(
pdname
)
{
snprintf
(
outputName
,
TSDB_FILENAME_LEN
,
"%s%svnode%svnode%d%s%s%sv%d%s"
,
pdname
,
TD_DIRSEP
,
TD_DIRSEP
,
vgId
,
TD_DIRSEP
,
dname
,
TD_DIRSEP
,
vgId
,
fname
);
}
else
{
snprintf
(
outputName
,
TSDB_FILENAME_LEN
,
"vnode%svnode%d%s%s%sv%d%s"
,
TD_DIRSEP
,
vgId
,
TD_DIRSEP
,
dname
,
TD_DIRSEP
,
vgId
,
fname
);
}
}
}
}
void
td
GetVnd
DirName
(
int32_t
vgId
,
const
char
*
pdname
,
const
char
*
dname
,
bool
endWithSep
,
char
*
outputName
)
{
void
td
RSmaGet
DirName
(
int32_t
vgId
,
const
char
*
pdname
,
const
char
*
dname
,
bool
endWithSep
,
char
*
outputName
)
{
if
(
pdname
)
{
if
(
endWithSep
)
{
snprintf
(
outputName
,
TSDB_FILENAME_LEN
,
"%s%svnode%svnode%d%s%s%s"
,
pdname
,
TD_DIRSEP
,
TD_DIRSEP
,
vgId
,
TD_DIRSEP
,
...
...
@@ -223,81 +98,13 @@ void tdGetVndDirName(int32_t vgId, const char *pdname, const char *dname, bool e
}
}
#if 0
int32_t tdInitTFile(STFile *pTFile, const char *dname, const char *fname) {
TD_TFILE_SET_STATE(pTFile, TD_FILE_STATE_OK);
TD_TFILE_SET_CLOSED(pTFile);
memset(&(pTFile->info), 0, sizeof(pTFile->info));
pTFile->info.magic = TD_FILE_INIT_MAGIC;
char tmpName[TSDB_FILENAME_LEN * 2 + 32] = {0};
snprintf(tmpName, TSDB_FILENAME_LEN * 2 + 32, "%s%s%s", dname, TD_DIRSEP, fname);
int32_t tmpNameLen = strlen(tmpName) + 1;
pTFile->fname = taosMemoryMalloc(tmpNameLen);
if (!pTFile->fname) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
tstrncpy(pTFile->fname, tmpName, tmpNameLen);
return 0;
}
int32_t tdCreateTFile(STFile *pTFile, bool updateHeader, int8_t fType) {
ASSERT(pTFile->info.fsize == 0 && pTFile->info.magic == TD_FILE_INIT_MAGIC);
pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pTFile->pFile == NULL) {
if (errno == ENOENT) {
// Try to create directory recursively
char *s = strdup(TD_TFILE_FULL_NAME(pTFile));
if (taosMulMkDir(taosDirName(s)) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
taosMemoryFree(s);
return -1;
}
taosMemoryFree(s);
pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pTFile->pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
}
}
if (!updateHeader) {
return 0;
}
pTFile->info.fsize += TD_FILE_HEAD_SIZE;
pTFile->info.fver = 0;
if (tdUpdateTFileHeader(pTFile) < 0) {
tdCloseTFile(pTFile);
tdRemoveTFile(pTFile);
return -1;
}
return 0;
}
int32_t tdRemoveTFile(STFile *pTFile) {
if (taosRemoveFile(TD_TFILE_FULL_NAME(pTFile)) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
};
return 0;
}
#endif
// smaXXXUtil ================
void
*
tdAcquireSmaRef
(
int32_t
rsetId
,
int64_t
refId
)
{
void
*
pResult
=
taosAcquireRef
(
rsetId
,
refId
);
if
(
!
pResult
)
{
smaWarn
(
"rsma acquire ref for rsetId:%d refId:%"
PRIi64
" failed since %s"
,
rsetId
,
refId
,
terrstr
());
}
else
{
sma
Debug
(
"rsma acquire ref for rsetId:%d refId:%"
PRIi64
" success"
,
rsetId
,
refId
);
sma
Trace
(
"rsma acquire ref for rsetId:%d refId:%"
PRIi64
" success"
,
rsetId
,
refId
);
}
return
pResult
;
}
...
...
@@ -307,7 +114,7 @@ int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId) {
smaWarn
(
"rsma release ref for rsetId:%d refId:%"
PRIi64
" failed since %s"
,
rsetId
,
refId
,
terrstr
());
return
TSDB_CODE_FAILED
;
}
sma
Debug
(
"rsma release ref for rsetId:%d refId:%"
PRIi64
" success"
,
rsetId
,
refId
);
sma
Trace
(
"rsma release ref for rsetId:%d refId:%"
PRIi64
" success"
,
rsetId
,
refId
);
return
TSDB_CODE_SUCCESS
;
}
\ No newline at end of file
source/dnode/vnode/src/tq/tqExec.c
浏览文件 @
d3eb1a74
...
...
@@ -85,7 +85,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
uint64_t
ts
=
0
;
tqDebug
(
"vgId:%d, tmq task start to execute"
,
pTq
->
pVnode
->
config
.
vgId
);
if
(
qExecTask
(
task
,
&
pDataBlock
,
&
ts
)
<
0
)
{
tqError
(
"vgId:%d task exec error since %s"
,
pTq
->
pVnode
->
config
.
vgId
,
terrstr
());
tqError
(
"vgId:%d
,
task exec error since %s"
,
pTq
->
pVnode
->
config
.
vgId
,
terrstr
());
return
-
1
;
}
tqDebug
(
"vgId:%d, tmq task executed, get %p"
,
pTq
->
pVnode
->
config
.
vgId
,
pDataBlock
);
...
...
@@ -150,7 +150,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
uint64_t
ts
=
0
;
tqDebug
(
"tmqsnap task start to execute"
);
if
(
qExecTask
(
task
,
&
pDataBlock
,
&
ts
)
<
0
)
{
tqError
(
"vgId:%d task exec error since %s"
,
pTq
->
pVnode
->
config
.
vgId
,
terrstr
());
tqError
(
"vgId:%d
,
task exec error since %s"
,
pTq
->
pVnode
->
config
.
vgId
,
terrstr
());
return
-
1
;
}
tqDebug
(
"tmqsnap task execute end, get %p"
,
pDataBlock
);
...
...
source/dnode/vnode/src/tsdb/tsdbSnapshot.c
浏览文件 @
d3eb1a74
...
...
@@ -1370,7 +1370,7 @@ _exit:
taosMemoryFree
(
pWriter
);
}
}
else
{
tsdbInfo
(
"vgId:%d %s done"
,
TD_VID
(
pTsdb
->
pVnode
),
__func__
);
tsdbInfo
(
"vgId:%d
,
%s done"
,
TD_VID
(
pTsdb
->
pVnode
),
__func__
);
*
ppWriter
=
pWriter
;
}
return
code
;
...
...
@@ -1391,7 +1391,7 @@ int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter) {
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d %s failed since %s"
,
TD_VID
(
pWriter
->
pTsdb
->
pVnode
),
__func__
,
tstrerror
(
code
));
tsdbError
(
"vgId:%d
,
%s failed since %s"
,
TD_VID
(
pWriter
->
pTsdb
->
pVnode
),
__func__
,
tstrerror
(
code
));
}
return
code
;
}
...
...
@@ -1442,7 +1442,7 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
for
(
int32_t
iBuf
=
0
;
iBuf
<
sizeof
(
pWriter
->
aBuf
)
/
sizeof
(
uint8_t
*
);
iBuf
++
)
{
tFree
(
pWriter
->
aBuf
[
iBuf
]);
}
tsdbInfo
(
"vgId:%d %s done"
,
TD_VID
(
pWriter
->
pTsdb
->
pVnode
),
__func__
);
tsdbInfo
(
"vgId:%d
,
%s done"
,
TD_VID
(
pWriter
->
pTsdb
->
pVnode
),
__func__
);
taosMemoryFree
(
pWriter
);
*
ppWriter
=
NULL
;
return
code
;
...
...
source/dnode/vnode/src/vnd/vnodeCommit.c
浏览文件 @
d3eb1a74
...
...
@@ -184,16 +184,21 @@ _err:
return
-
1
;
}
static
void
vnodePrepareCommit
(
SVnode
*
pVnode
)
{
static
int32_t
vnodePrepareCommit
(
SVnode
*
pVnode
)
{
int32_t
code
=
0
;
tsem_wait
(
&
pVnode
->
canCommit
);
tsdbPrepareCommit
(
pVnode
->
pTsdb
);
metaPrepareAsyncCommit
(
pVnode
->
pMeta
);
smaPrepareAsyncCommit
(
pVnode
->
pSma
);
code
=
smaPrepareAsyncCommit
(
pVnode
->
pSma
);
if
(
code
)
goto
_exit
;
_exit:
vnodeBufPoolUnRef
(
pVnode
->
inUse
);
pVnode
->
inUse
=
NULL
;
return
code
;
}
static
int32_t
vnodeCommitTask
(
void
*
arg
)
{
int32_t
code
=
0
;
...
...
@@ -203,10 +208,9 @@ static int32_t vnodeCommitTask(void *arg) {
code
=
vnodeCommitImpl
(
pInfo
);
if
(
code
)
goto
_exit
;
_exit:
// end commit
tsem_post
(
&
pInfo
->
pVnode
->
canCommit
);
_exit:
taosMemoryFree
(
pInfo
);
return
code
;
}
...
...
@@ -214,7 +218,8 @@ int vnodeAsyncCommit(SVnode *pVnode) {
int32_t
code
=
0
;
// prepare to commit
vnodePrepareCommit
(
pVnode
);
code
=
vnodePrepareCommit
(
pVnode
);
if
(
code
)
goto
_exit
;
// schedule the task
pVnode
->
state
.
commitTerm
=
pVnode
->
state
.
applyTerm
;
...
...
@@ -230,14 +235,15 @@ int vnodeAsyncCommit(SVnode *pVnode) {
pInfo
->
info
.
state
.
commitID
=
pVnode
->
state
.
commitID
;
pInfo
->
pVnode
=
pVnode
;
pInfo
->
txn
=
metaGetTxn
(
pVnode
->
pMeta
);
vnodeScheduleTask
(
vnodeCommitTask
,
pInfo
);
code
=
vnodeScheduleTask
(
vnodeCommitTask
,
pInfo
);
_exit:
if
(
code
)
{
vError
(
"vgId:%d %s failed since %s, commit id:%"
PRId64
,
TD_VID
(
pVnode
),
__func__
,
tstrerror
(
code
),
tsem_post
(
&
pVnode
->
canCommit
);
vError
(
"vgId:%d, %s failed since %s, commit id:%"
PRId64
,
TD_VID
(
pVnode
),
__func__
,
tstrerror
(
code
),
pVnode
->
state
.
commitID
);
}
else
{
vDebug
(
"vgId:%d %s done"
,
TD_VID
(
pVnode
),
__func__
);
vDebug
(
"vgId:%d
,
%s done"
,
TD_VID
(
pVnode
),
__func__
);
}
return
code
;
}
...
...
source/dnode/vnode/src/vnd/vnodeQuery.c
浏览文件 @
d3eb1a74
...
...
@@ -15,13 +15,13 @@
#include "vnd.h"
#define VNODE_GET_LOAD_RESET_VALS(pVar, oVal, vType, tags) \
do { \
int##vType##_t newVal = atomic_sub_fetch_##vType(&(pVar), (oVal)); \
ASSERT(newVal >= 0); \
if (newVal < 0) { \
vWarn("vgId:%d %s, abnormal val:%" PRIi64 ", old val:%" PRIi64, TD_VID(pVnode), tags, newVal, (oVal)); \
} \
#define VNODE_GET_LOAD_RESET_VALS(pVar, oVal, vType, tags)
\
do {
\
int##vType##_t newVal = atomic_sub_fetch_##vType(&(pVar), (oVal));
\
ASSERT(newVal >= 0);
\
if (newVal < 0) {
\
vWarn("vgId:%d
,
%s, abnormal val:%" PRIi64 ", old val:%" PRIi64, TD_VID(pVnode), tags, newVal, (oVal)); \
}
\
} while (0)
int
vnodeQueryOpen
(
SVnode
*
pVnode
)
{
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
d3eb1a74
...
...
@@ -317,7 +317,10 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
// commit if need
if
(
vnodeShouldCommit
(
pVnode
))
{
vInfo
(
"vgId:%d, commit at version %"
PRId64
,
TD_VID
(
pVnode
),
version
);
vnodeAsyncCommit
(
pVnode
);
if
(
vnodeAsyncCommit
(
pVnode
)
<
0
)
{
vError
(
"vgId:%d, failed to vnode async commit since %s."
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
goto
_err
;
}
// start a new one
if
(
vnodeBegin
(
pVnode
)
<
0
)
{
...
...
source/libs/sync/src/syncAppendEntriesReply.c
浏览文件 @
d3eb1a74
...
...
@@ -64,7 +64,7 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
ASSERT
(
pMsg
->
term
==
ths
->
pRaftStore
->
currentTerm
);
sTrace
(
"vgId:%d received append entries reply. srcId:0x%016"
PRIx64
", term:%"
PRId64
", matchIndex:%"
PRId64
""
,
sTrace
(
"vgId:%d
,
received append entries reply. srcId:0x%016"
PRIx64
", term:%"
PRId64
", matchIndex:%"
PRId64
""
,
pMsg
->
vgId
,
pMsg
->
srcId
.
addr
,
pMsg
->
term
,
pMsg
->
matchIndex
);
if
(
pMsg
->
success
)
{
...
...
source/libs/wal/src/walRead.c
浏览文件 @
d3eb1a74
...
...
@@ -339,7 +339,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
int64_t
contLen
;
bool
seeked
=
false
;
wDebug
(
"vgId:%d try to fetch ver %"
PRId64
", first ver:%"
PRId64
", commit ver:%"
PRId64
", last ver:%"
PRId64
wDebug
(
"vgId:%d
,
try to fetch ver %"
PRId64
", first ver:%"
PRId64
", commit ver:%"
PRId64
", last ver:%"
PRId64
", applied ver:%"
PRId64
,
pRead
->
pWal
->
cfg
.
vgId
,
ver
,
pRead
->
pWal
->
vers
.
firstVer
,
pRead
->
pWal
->
vers
.
commitVer
,
pRead
->
pWal
->
vers
.
lastVer
,
pRead
->
pWal
->
vers
.
appliedVer
);
...
...
@@ -393,7 +393,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
int32_t
walSkipFetchBody
(
SWalReader
*
pRead
,
const
SWalCkHead
*
pHead
)
{
int64_t
code
;
wDebug
(
"vgId:%d skip fetch body %"
PRId64
", first ver:%"
PRId64
", commit ver:%"
PRId64
", last ver:%"
PRId64
wDebug
(
"vgId:%d
,
skip fetch body %"
PRId64
", first ver:%"
PRId64
", commit ver:%"
PRId64
", last ver:%"
PRId64
", applied ver:%"
PRId64
,
pRead
->
pWal
->
cfg
.
vgId
,
pHead
->
head
.
version
,
pRead
->
pWal
->
vers
.
firstVer
,
pRead
->
pWal
->
vers
.
commitVer
,
pRead
->
pWal
->
vers
.
lastVer
,
pRead
->
pWal
->
vers
.
appliedVer
);
...
...
@@ -414,7 +414,7 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
SWalCont
*
pReadHead
=
&
((
*
ppHead
)
->
head
);
int64_t
ver
=
pReadHead
->
version
;
wDebug
(
"vgId:%d fetch body %"
PRId64
", first ver:%"
PRId64
", commit ver:%"
PRId64
", last ver:%"
PRId64
wDebug
(
"vgId:%d
,
fetch body %"
PRId64
", first ver:%"
PRId64
", commit ver:%"
PRId64
", last ver:%"
PRId64
", applied ver:%"
PRId64
,
pRead
->
pWal
->
cfg
.
vgId
,
ver
,
pRead
->
pWal
->
vers
.
firstVer
,
pRead
->
pWal
->
vers
.
commitVer
,
pRead
->
pWal
->
vers
.
lastVer
,
pRead
->
pWal
->
vers
.
appliedVer
);
...
...
source/util/src/terror.c
浏览文件 @
d3eb1a74
...
...
@@ -585,6 +585,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_NO_INDEX_IN_CACHE, "No tsma index in ca
TAOS_DEFINE_ERROR
(
TSDB_CODE_RSMA_INVALID_ENV
,
"Invalid rsma env"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_RSMA_INVALID_STAT
,
"Invalid rsma state"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_RSMA_QTASKINFO_CREATE
,
"Rsma qtaskinfo creation error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_RSMA_FS_COMMIT
,
"Rsma fs commit error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_RSMA_REMOVE_EXISTS
,
"Rsma remove exists"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP
,
"Rsma fetch msg is messed up"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_RSMA_EMPTY_INFO
,
"Rsma info is empty"
)
...
...
@@ -592,6 +593,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_SCHEMA, "Rsma invalid schema
TAOS_DEFINE_ERROR
(
TSDB_CODE_RSMA_REGEX_MATCH
,
"Rsma regex match"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_RSMA_STREAM_STATE_OPEN
,
"Rsma stream state open"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_RSMA_STREAM_STATE_COMMIT
,
"Rsma stream state commit"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_RSMA_FS_REF
,
"Rsma fs ref error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_RSMA_FS_SYNC
,
"Rsma fs sync error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_RSMA_FS_UPDATE
,
"Rsma fs update error"
)
//index
TAOS_DEFINE_ERROR
(
TSDB_CODE_INDEX_REBUILDING
,
"Index is rebuilding"
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录