Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
4b96bc71
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
4b96bc71
编写于
7月 19, 2023
作者:
M
Minglei Jin
提交者:
GitHub
7月 19, 2023
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into enh/tsdb_optimize
上级
185081a9
c3e5375f
变更
42
隐藏空白更改
内联
并排
Showing
42 changed file
with
547 addition
and
147 deletion
+547
-147
docs/en/28-releases/01-tdengine.md
docs/en/28-releases/01-tdengine.md
+4
-0
docs/zh/28-releases/01-tdengine.md
docs/zh/28-releases/01-tdengine.md
+4
-0
include/common/tglobal.h
include/common/tglobal.h
+1
-0
include/libs/tfs/tfs.h
include/libs/tfs/tfs.h
+27
-1
include/os/osSysinfo.h
include/os/osSysinfo.h
+1
-1
include/util/taoserror.h
include/util/taoserror.h
+1
-1
source/common/CMakeLists.txt
source/common/CMakeLists.txt
+4
-0
source/common/src/tglobal.c
source/common/src/tglobal.c
+8
-6
source/common/src/tmisce.c
source/common/src/tmisce.c
+1
-1
source/dnode/mgmt/exe/dmMain.c
source/dnode/mgmt/exe/dmMain.c
+5
-1
source/dnode/mgmt/mgmt_vnode/inc/vmInt.h
source/dnode/mgmt/mgmt_vnode/inc/vmInt.h
+3
-0
source/dnode/mgmt/mgmt_vnode/src/vmFile.c
source/dnode/mgmt/mgmt_vnode/src/vmFile.c
+3
-0
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
+17
-8
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
+64
-4
source/dnode/mgmt/node_mgmt/src/dmEnv.c
source/dnode/mgmt/node_mgmt/src/dmEnv.c
+56
-9
source/dnode/mnode/impl/src/mndCluster.c
source/dnode/mnode/impl/src/mndCluster.c
+0
-1
source/dnode/mnode/impl/src/mndDnode.c
source/dnode/mnode/impl/src/mndDnode.c
+43
-2
source/dnode/mnode/impl/src/mndStb.c
source/dnode/mnode/impl/src/mndStb.c
+3
-0
source/dnode/mnode/impl/src/mndTelem.c
source/dnode/mnode/impl/src/mndTelem.c
+1
-1
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+7
-5
source/dnode/vnode/src/inc/metaTtl.h
source/dnode/vnode/src/inc/metaTtl.h
+4
-1
source/dnode/vnode/src/inc/vnd.h
source/dnode/vnode/src/inc/vnd.h
+1
-1
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+2
-0
source/dnode/vnode/src/meta/metaOpen.c
source/dnode/vnode/src/meta/metaOpen.c
+4
-2
source/dnode/vnode/src/meta/metaTable.c
source/dnode/vnode/src/meta/metaTable.c
+8
-1
source/dnode/vnode/src/meta/metaTtl.c
source/dnode/vnode/src/meta/metaTtl.c
+53
-38
source/dnode/vnode/src/sma/smaUtil.c
source/dnode/vnode/src/sma/smaUtil.c
+1
-1
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+4
-3
source/dnode/vnode/src/tsdb/tsdbCache.c
source/dnode/vnode/src/tsdb/tsdbCache.c
+1
-1
source/dnode/vnode/src/tsdb/tsdbFS.c
source/dnode/vnode/src/tsdb/tsdbFS.c
+2
-2
source/dnode/vnode/src/tsdb/tsdbFile.c
source/dnode/vnode/src/tsdb/tsdbFile.c
+2
-1
source/dnode/vnode/src/vnd/vnodeCommit.c
source/dnode/vnode/src/vnd/vnodeCommit.c
+4
-4
source/dnode/vnode/src/vnd/vnodeOpen.c
source/dnode/vnode/src/vnd/vnodeOpen.c
+51
-19
source/dnode/vnode/src/vnd/vnodeSnapshot.c
source/dnode/vnode/src/vnd/vnodeSnapshot.c
+5
-5
source/libs/command/src/command.c
source/libs/command/src/command.c
+25
-0
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+32
-5
source/libs/stream/src/streamRecover.c
source/libs/stream/src/streamRecover.c
+1
-0
source/libs/tfs/src/tfs.c
source/libs/tfs/src/tfs.c
+66
-8
source/libs/tfs/test/tfsTest.cpp
source/libs/tfs/test/tfsTest.cpp
+3
-3
source/os/src/osSysinfo.c
source/os/src/osSysinfo.c
+23
-10
source/os/test/osTests.cpp
source/os/test/osTests.cpp
+1
-1
source/util/src/terror.c
source/util/src/terror.c
+1
-0
未找到文件。
docs/en/28-releases/01-tdengine.md
浏览文件 @
4b96bc71
...
...
@@ -10,6 +10,10 @@ For TDengine 2.x installation packages by version, please visit [here](https://w
import Release from "/components/ReleaseV3";
## 3.0.7.1
<Release
type=
"tdengine"
version=
"3.0.7.1"
/>
## 3.0.7.0
<Release
type=
"tdengine"
version=
"3.0.7.0"
/>
...
...
docs/zh/28-releases/01-tdengine.md
浏览文件 @
4b96bc71
...
...
@@ -10,6 +10,10 @@ TDengine 2.x 各版本安装包请访问[这里](https://www.taosdata.com/all-do
import Release from "/components/ReleaseV3";
## 3.0.7.1
<Release
type=
"tdengine"
version=
"3.0.7.1"
/>
## 3.0.7.0
<Release
type=
"tdengine"
version=
"3.0.7.0"
/>
...
...
include/common/tglobal.h
浏览文件 @
4b96bc71
...
...
@@ -34,6 +34,7 @@ extern char tsFirst[];
extern
char
tsSecond
[];
extern
char
tsLocalFqdn
[];
extern
char
tsLocalEp
[];
extern
char
tsVersionName
[];
extern
uint16_t
tsServerPort
;
extern
int32_t
tsVersion
;
extern
int32_t
tsStatusInterval
;
...
...
include/libs/tfs/tfs.h
浏览文件 @
4b96bc71
...
...
@@ -69,6 +69,13 @@ void tfsUpdateSize(STfs *pTfs);
*/
SDiskSize
tfsGetSize
(
STfs
*
pTfs
);
/**
* @brief Get the number of disks at level of multi-tier storage.
*
* @param pTfs
* @return int32_t
*/
int32_t
tfsGetDisksAtLevel
(
STfs
*
pTfs
,
int32_t
level
);
/**
* @brief Get level of multi-tier storage.
*
...
...
@@ -123,6 +130,15 @@ int32_t tfsMkdir(STfs *pTfs, const char *rname);
*/
int32_t
tfsMkdirAt
(
STfs
*
pTfs
,
const
char
*
rname
,
SDiskID
diskId
);
/**
* @brief Recursive make directory at all levels in tfs.
*
* @param pTfs The fs object.
* @param rname The rel name of directory.
* @return int32_t 0 for success, -1 for failure.
*/
int32_t
tfsMkdirRecur
(
STfs
*
pTfs
,
const
char
*
rname
);
/**
* @brief Recursive create directories in tfs.
*
...
...
@@ -160,7 +176,17 @@ int32_t tfsRmdir(STfs *pTfs, const char *rname);
* @param nrname The rel name of new file.
* @return int32_t 0 for success, -1 for failure.
*/
int32_t
tfsRename
(
STfs
*
pTfs
,
const
char
*
orname
,
const
char
*
nrname
);
int32_t
tfsRename
(
STfs
*
pTfs
,
int32_t
diskPrimary
,
const
char
*
orname
,
const
char
*
nrname
);
/**
* @brief Search fname in level of tfs
*
* @param pTfs The fs object.
* @param level The level to search on
* @param fname The relative file name to be searched
* @param int32_t diskId for successs, -1 for failure
*/
int32_t
tfsSearch
(
STfs
*
pTfs
,
int32_t
level
,
const
char
*
fname
);
/**
* @brief Init file object in tfs.
...
...
include/os/osSysinfo.h
浏览文件 @
4b96bc71
...
...
@@ -36,7 +36,7 @@ typedef struct {
bool
taosCheckSystemIsLittleEnd
();
void
taosGetSystemInfo
();
int32_t
taosGetEmail
(
char
*
email
,
int32_t
maxLen
);
int32_t
taosGetOsReleaseName
(
char
*
releaseName
,
int32_t
maxLen
);
int32_t
taosGetOsReleaseName
(
char
*
releaseName
,
char
*
sName
,
char
*
ver
,
int32_t
maxLen
);
int32_t
taosGetCpuInfo
(
char
*
cpuModel
,
int32_t
maxLen
,
float
*
numOfCores
);
int32_t
taosGetCpuCores
(
float
*
numOfCores
);
void
taosGetCpuUsage
(
double
*
cpu_system
,
double
*
cpu_engine
);
...
...
include/util/taoserror.h
浏览文件 @
4b96bc71
...
...
@@ -416,7 +416,7 @@ int32_t* taosGetErrno();
// #define TSDB_CODE_VND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0501) // 2.x
// #define TSDB_CODE_VND_ACTION_NEED_REPROCESS. TAOS_DEF_ERROR_CODE(0, 0x0502) // 2.x
#define TSDB_CODE_VND_INVALID_VGROUP_ID TAOS_DEF_ERROR_CODE(0, 0x0503)
// #define TSDB_CODE_VND_INIT_FAILED TAOS_DEF_ERROR_CODE(0, 0x0504) // 2.x
#define TSDB_CODE_VND_INIT_FAILED TAOS_DEF_ERROR_CODE(0, 0x0504)
// #define TSDB_CODE_VND_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x0505) // 2.x
// #define TSDB_CODE_VND_NO_DISK_PERMISSIONS TAOS_DEF_ERROR_CODE(0, 0x0506) // 2.x
// #define TSDB_CODE_VND_NO_SUCH_FILE_OR_DIR TAOS_DEF_ERROR_CODE(0, 0x0507) // 2.x
...
...
source/common/CMakeLists.txt
浏览文件 @
4b96bc71
aux_source_directory
(
src COMMON_SRC
)
IF
(
TD_ENTERPRISE
)
LIST
(
APPEND COMMON_SRC
${
TD_ENTERPRISE_DIR
}
/src/plugins/common/src/tglobal.c
)
ENDIF
()
add_library
(
common STATIC
${
COMMON_SRC
}
)
if
(
DEFINED GRANT_CFG_INCLUDE_DIR
)
...
...
source/common/src/tglobal.c
浏览文件 @
4b96bc71
...
...
@@ -34,6 +34,7 @@ char tsFirst[TSDB_EP_LEN] = {0};
char
tsSecond
[
TSDB_EP_LEN
]
=
{
0
};
char
tsLocalFqdn
[
TSDB_FQDN_LEN
]
=
{
0
};
char
tsLocalEp
[
TSDB_EP_LEN
]
=
{
0
};
// Local End Point, hostname:port
char
tsVersionName
[
16
]
=
"community"
;
uint16_t
tsServerPort
=
6030
;
int32_t
tsVersion
=
30000000
;
int32_t
tsStatusInterval
=
1
;
// second
...
...
@@ -938,6 +939,12 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
return
0
;
}
#ifndef TD_ENTERPRISE
static
int32_t
taosSetReleaseCfg
(
SConfig
*
pCfg
)
{
return
0
;
}
#else
int32_t
taosSetReleaseCfg
(
SConfig
*
pCfg
);
#endif
void
taosLocalCfgForbiddenToChange
(
char
*
name
,
bool
*
forbidden
)
{
int32_t
len
=
strlen
(
name
);
char
lowcaseName
[
CFG_NAME_MAX_LEN
+
1
]
=
{
0
};
...
...
@@ -1444,6 +1451,7 @@ int32_t taosInitCfg(const char *cfgDir, const char **envCmd, const char *envFile
if
(
taosSetClientCfg
(
tsCfg
))
return
-
1
;
if
(
taosUpdateServerCfg
(
tsCfg
))
return
-
1
;
if
(
taosSetServerCfg
(
tsCfg
))
return
-
1
;
if
(
taosSetReleaseCfg
(
tsCfg
))
return
-
1
;
if
(
taosSetTfsCfg
(
tsCfg
)
!=
0
)
return
-
1
;
}
taosSetSystemCfg
(
tsCfg
);
...
...
@@ -1490,14 +1498,8 @@ void taosCfgDynamicOptions(const char *option, const char *value) {
if
(
strcasecmp
(
option
,
"keepTimeOffset"
)
==
0
)
{
int32_t
newKeepTimeOffset
=
atoi
(
value
);
if
(
newKeepTimeOffset
<
0
||
newKeepTimeOffset
>
23
)
{
uError
(
"failed to set keepTimeOffset from %d to %d. Valid range: [0, 23]"
,
tsKeepTimeOffset
,
newKeepTimeOffset
);
return
;
}
uInfo
(
"keepTimeOffset set from %d to %d"
,
tsKeepTimeOffset
,
newKeepTimeOffset
);
tsKeepTimeOffset
=
newKeepTimeOffset
;
return
;
}
...
...
source/common/src/tmisce.c
浏览文件 @
4b96bc71
...
...
@@ -109,7 +109,7 @@ int32_t taosGenCrashJsonMsg(int signum, char** pMsg, int64_t clusterId, int64_t
taosGetAppName
(
tmp
,
NULL
);
tjsonAddStringToObject
(
pJson
,
"appName"
,
tmp
);
if
(
taosGetOsReleaseName
(
tmp
,
sizeof
(
tmp
))
==
0
)
{
if
(
taosGetOsReleaseName
(
tmp
,
NULL
,
NULL
,
sizeof
(
tmp
))
==
0
)
{
tjsonAddStringToObject
(
pJson
,
"os"
,
tmp
);
}
...
...
source/dnode/mgmt/exe/dmMain.c
浏览文件 @
4b96bc71
...
...
@@ -359,7 +359,11 @@ int mainWindows(int argc, char **argv) {
taosCleanupArgs
();
if
(
dmInit
()
!=
0
)
{
dError
(
"failed to init dnode since %s"
,
terrstr
());
if
(
terrno
==
TSDB_CODE_NOT_FOUND
)
{
dError
(
"failed to init dnode since unsupported platform, please visit https://www.taosdata.com for support"
);
}
else
{
dError
(
"failed to init dnode since %s"
,
terrstr
());
}
taosCleanupCfg
();
taosCloseLog
();
...
...
source/dnode/mgmt/mgmt_vnode/inc/vmInt.h
浏览文件 @
4b96bc71
...
...
@@ -46,6 +46,7 @@ typedef struct {
int32_t
vgId
;
int32_t
vgVersion
;
int8_t
dropped
;
int32_t
diskPrimary
;
int32_t
toVgId
;
char
path
[
PATH_MAX
+
20
];
}
SWrapperCfg
;
...
...
@@ -56,6 +57,7 @@ typedef struct {
int32_t
refCount
;
int8_t
dropped
;
int8_t
disable
;
int32_t
diskPrimary
;
int32_t
toVgId
;
char
*
path
;
SVnode
*
pImpl
;
...
...
@@ -81,6 +83,7 @@ typedef struct {
}
SVnodeThread
;
// vmInt.c
int32_t
vmAllocPrimaryDisk
(
SVnodeMgmt
*
pMgmt
,
int32_t
vgId
);
SVnodeObj
*
vmAcquireVnode
(
SVnodeMgmt
*
pMgmt
,
int32_t
vgId
);
void
vmReleaseVnode
(
SVnodeMgmt
*
pMgmt
,
SVnodeObj
*
pVnode
);
int32_t
vmOpenVnode
(
SVnodeMgmt
*
pMgmt
,
SWrapperCfg
*
pCfg
,
SVnode
*
pImpl
);
...
...
source/dnode/mgmt/mgmt_vnode/src/vmFile.c
浏览文件 @
4b96bc71
...
...
@@ -71,6 +71,8 @@ static int32_t vmDecodeVnodeList(SJson *pJson, SVnodeMgmt *pMgmt, SWrapperCfg **
if
(
code
<
0
)
goto
_OVER
;
tjsonGetInt32ValueFromDouble
(
vnode
,
"vgVersion"
,
pCfg
->
vgVersion
,
code
);
if
(
code
<
0
)
goto
_OVER
;
tjsonGetInt32ValueFromDouble
(
vnode
,
"diskPrimary"
,
pCfg
->
diskPrimary
,
code
);
if
(
code
<
0
)
goto
_OVER
;
tjsonGetInt32ValueFromDouble
(
vnode
,
"toVgId"
,
pCfg
->
toVgId
,
code
);
if
(
code
<
0
)
goto
_OVER
;
...
...
@@ -167,6 +169,7 @@ static int32_t vmEncodeVnodeList(SJson *pJson, SVnodeObj **ppVnodes, int32_t num
if
(
tjsonAddDoubleToObject
(
vnode
,
"vgId"
,
pVnode
->
vgId
)
<
0
)
return
-
1
;
if
(
tjsonAddDoubleToObject
(
vnode
,
"dropped"
,
pVnode
->
dropped
)
<
0
)
return
-
1
;
if
(
tjsonAddDoubleToObject
(
vnode
,
"vgVersion"
,
pVnode
->
vgVersion
)
<
0
)
return
-
1
;
if
(
tjsonAddDoubleToObject
(
vnode
,
"diskPrimary"
,
pVnode
->
diskPrimary
)
<
0
)
return
-
1
;
if
(
pVnode
->
toVgId
&&
tjsonAddDoubleToObject
(
vnode
,
"toVgId"
,
pVnode
->
toVgId
)
<
0
)
return
-
1
;
if
(
tjsonAddItemToArray
(
vnodes
,
vnode
)
<
0
)
return
-
1
;
}
...
...
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
浏览文件 @
4b96bc71
...
...
@@ -263,16 +263,19 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return
0
;
}
wrapperCfg
.
diskPrimary
=
vmAllocPrimaryDisk
(
pMgmt
,
vnodeCfg
.
vgId
);
int32_t
diskPrimary
=
wrapperCfg
.
diskPrimary
;
snprintf
(
path
,
TSDB_FILENAME_LEN
,
"vnode%svnode%d"
,
TD_DIRSEP
,
vnodeCfg
.
vgId
);
if
(
vnodeCreate
(
path
,
&
vnodeCfg
,
pMgmt
->
pTfs
)
<
0
)
{
if
(
vnodeCreate
(
path
,
&
vnodeCfg
,
diskPrimary
,
pMgmt
->
pTfs
)
<
0
)
{
tFreeSCreateVnodeReq
(
&
req
);
dError
(
"vgId:%d, failed to create vnode since %s"
,
req
.
vgId
,
terrstr
());
code
=
terrno
;
goto
_OVER
;
}
SVnode
*
pImpl
=
vnodeOpen
(
path
,
pMgmt
->
pTfs
,
pMgmt
->
msgCb
);
SVnode
*
pImpl
=
vnodeOpen
(
path
,
diskPrimary
,
pMgmt
->
pTfs
,
pMgmt
->
msgCb
);
if
(
pImpl
==
NULL
)
{
dError
(
"vgId:%d, failed to open vnode since %s"
,
req
.
vgId
,
terrstr
());
code
=
terrno
;
...
...
@@ -403,21 +406,23 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
.
dropped
=
pVnode
->
dropped
,
.
vgId
=
pVnode
->
vgId
,
.
vgVersion
=
pVnode
->
vgVersion
,
.
diskPrimary
=
pVnode
->
diskPrimary
,
};
tstrncpy
(
wrapperCfg
.
path
,
pVnode
->
path
,
sizeof
(
wrapperCfg
.
path
));
vmCloseVnode
(
pMgmt
,
pVnode
,
false
);
int32_t
diskPrimary
=
wrapperCfg
.
diskPrimary
;
char
path
[
TSDB_FILENAME_LEN
]
=
{
0
};
snprintf
(
path
,
TSDB_FILENAME_LEN
,
"vnode%svnode%d"
,
TD_DIRSEP
,
vgId
);
dInfo
(
"vgId:%d, start to alter vnode replica at %s"
,
vgId
,
path
);
if
(
vnodeAlterReplica
(
path
,
&
req
,
pMgmt
->
pTfs
)
<
0
)
{
if
(
vnodeAlterReplica
(
path
,
&
req
,
diskPrimary
,
pMgmt
->
pTfs
)
<
0
)
{
dError
(
"vgId:%d, failed to alter vnode at %s since %s"
,
vgId
,
path
,
terrstr
());
return
-
1
;
}
dInfo
(
"vgId:%d, begin to open vnode"
,
vgId
);
SVnode
*
pImpl
=
vnodeOpen
(
path
,
pMgmt
->
pTfs
,
pMgmt
->
msgCb
);
SVnode
*
pImpl
=
vnodeOpen
(
path
,
diskPrimary
,
pMgmt
->
pTfs
,
pMgmt
->
msgCb
);
if
(
pImpl
==
NULL
)
{
dError
(
"vgId:%d, failed to open vnode at %s since %s"
,
vgId
,
path
,
terrstr
());
return
-
1
;
...
...
@@ -490,6 +495,7 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
.
dropped
=
pVnode
->
dropped
,
.
vgId
=
dstVgId
,
.
vgVersion
=
pVnode
->
vgVersion
,
.
diskPrimary
=
pVnode
->
diskPrimary
,
};
tstrncpy
(
wrapperCfg
.
path
,
pVnode
->
path
,
sizeof
(
wrapperCfg
.
path
));
...
...
@@ -503,19 +509,20 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
dInfo
(
"vgId:%d, close vnode"
,
srcVgId
);
vmCloseVnode
(
pMgmt
,
pVnode
,
true
);
int32_t
diskPrimary
=
wrapperCfg
.
diskPrimary
;
char
srcPath
[
TSDB_FILENAME_LEN
]
=
{
0
};
char
dstPath
[
TSDB_FILENAME_LEN
]
=
{
0
};
snprintf
(
srcPath
,
TSDB_FILENAME_LEN
,
"vnode%svnode%d"
,
TD_DIRSEP
,
srcVgId
);
snprintf
(
dstPath
,
TSDB_FILENAME_LEN
,
"vnode%svnode%d"
,
TD_DIRSEP
,
dstVgId
);
dInfo
(
"vgId:%d, alter vnode hashrange at %s"
,
srcVgId
,
srcPath
);
if
(
vnodeAlterHashRange
(
srcPath
,
dstPath
,
&
req
,
pMgmt
->
pTfs
)
<
0
)
{
if
(
vnodeAlterHashRange
(
srcPath
,
dstPath
,
&
req
,
diskPrimary
,
pMgmt
->
pTfs
)
<
0
)
{
dError
(
"vgId:%d, failed to alter vnode hashrange since %s"
,
srcVgId
,
terrstr
());
return
-
1
;
}
dInfo
(
"vgId:%d, open vnode"
,
dstVgId
);
SVnode
*
pImpl
=
vnodeOpen
(
dstPath
,
pMgmt
->
pTfs
,
pMgmt
->
msgCb
);
SVnode
*
pImpl
=
vnodeOpen
(
dstPath
,
diskPrimary
,
pMgmt
->
pTfs
,
pMgmt
->
msgCb
);
if
(
pImpl
==
NULL
)
{
dError
(
"vgId:%d, failed to open vnode at %s since %s"
,
dstVgId
,
dstPath
,
terrstr
());
return
-
1
;
...
...
@@ -602,21 +609,23 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
.
dropped
=
pVnode
->
dropped
,
.
vgId
=
pVnode
->
vgId
,
.
vgVersion
=
pVnode
->
vgVersion
,
.
diskPrimary
=
pVnode
->
diskPrimary
,
};
tstrncpy
(
wrapperCfg
.
path
,
pVnode
->
path
,
sizeof
(
wrapperCfg
.
path
));
vmCloseVnode
(
pMgmt
,
pVnode
,
false
);
int32_t
diskPrimary
=
wrapperCfg
.
diskPrimary
;
char
path
[
TSDB_FILENAME_LEN
]
=
{
0
};
snprintf
(
path
,
TSDB_FILENAME_LEN
,
"vnode%svnode%d"
,
TD_DIRSEP
,
vgId
);
dInfo
(
"vgId:%d, start to alter vnode replica at %s"
,
vgId
,
path
);
if
(
vnodeAlterReplica
(
path
,
&
alterReq
,
pMgmt
->
pTfs
)
<
0
)
{
if
(
vnodeAlterReplica
(
path
,
&
alterReq
,
diskPrimary
,
pMgmt
->
pTfs
)
<
0
)
{
dError
(
"vgId:%d, failed to alter vnode at %s since %s"
,
vgId
,
path
,
terrstr
());
return
-
1
;
}
dInfo
(
"vgId:%d, begin to open vnode"
,
vgId
);
SVnode
*
pImpl
=
vnodeOpen
(
path
,
pMgmt
->
pTfs
,
pMgmt
->
msgCb
);
SVnode
*
pImpl
=
vnodeOpen
(
path
,
diskPrimary
,
pMgmt
->
pTfs
,
pMgmt
->
msgCb
);
if
(
pImpl
==
NULL
)
{
dError
(
"vgId:%d, failed to open vnode at %s since %s"
,
vgId
,
path
,
terrstr
());
return
-
1
;
...
...
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
浏览文件 @
4b96bc71
...
...
@@ -15,8 +15,64 @@
#define _DEFAULT_SOURCE
#include "vmInt.h"
#include "tfs.h"
#include "vnd.h"
int32_t
vmAllocPrimaryDisk
(
SVnodeMgmt
*
pMgmt
,
int32_t
vgId
)
{
STfs
*
pTfs
=
pMgmt
->
pTfs
;
int32_t
diskId
=
0
;
if
(
!
pTfs
)
{
return
diskId
;
}
// search fs
char
vnodePath
[
TSDB_FILENAME_LEN
]
=
{
0
};
snprintf
(
vnodePath
,
TSDB_FILENAME_LEN
-
1
,
"vnode%svnode%d"
,
TD_DIRSEP
,
vgId
);
char
fname
[
TSDB_FILENAME_LEN
]
=
{
0
};
char
fnameTmp
[
TSDB_FILENAME_LEN
]
=
{
0
};
snprintf
(
fname
,
TSDB_FILENAME_LEN
-
1
,
"%s%s%s"
,
vnodePath
,
TD_DIRSEP
,
VND_INFO_FNAME
);
snprintf
(
fnameTmp
,
TSDB_FILENAME_LEN
-
1
,
"%s%s%s"
,
vnodePath
,
TD_DIRSEP
,
VND_INFO_FNAME_TMP
);
diskId
=
tfsSearch
(
pTfs
,
0
,
fname
);
if
(
diskId
>=
0
)
{
return
diskId
;
}
diskId
=
tfsSearch
(
pTfs
,
0
,
fnameTmp
);
if
(
diskId
>=
0
)
{
return
diskId
;
}
// alloc
int32_t
disks
[
TFS_MAX_DISKS_PER_TIER
]
=
{
0
};
int32_t
numOfVnodes
=
0
;
SVnodeObj
**
ppVnodes
=
vmGetVnodeListFromHash
(
pMgmt
,
&
numOfVnodes
);
for
(
int32_t
v
=
0
;
v
<
numOfVnodes
;
v
++
)
{
SVnodeObj
*
pVnode
=
ppVnodes
[
v
];
disks
[
pVnode
->
diskPrimary
]
+=
1
;
}
int32_t
minVal
=
INT_MAX
;
int32_t
ndisk
=
tfsGetDisksAtLevel
(
pTfs
,
0
);
diskId
=
0
;
for
(
int32_t
id
=
0
;
id
<
ndisk
;
id
++
)
{
if
(
minVal
>
disks
[
id
])
{
minVal
=
disks
[
id
];
diskId
=
id
;
}
}
for
(
int32_t
i
=
0
;
i
<
numOfVnodes
;
++
i
)
{
if
(
ppVnodes
==
NULL
||
ppVnodes
[
i
]
==
NULL
)
continue
;
vmReleaseVnode
(
pMgmt
,
ppVnodes
[
i
]);
}
if
(
ppVnodes
!=
NULL
)
{
taosMemoryFree
(
ppVnodes
);
}
dInfo
(
"vgId:%d, alloc disk:%d of level 0. ndisk:%d, vnodes: %d"
,
vgId
,
diskId
,
ndisk
,
numOfVnodes
);
return
diskId
;
}
SVnodeObj
*
vmAcquireVnode
(
SVnodeMgmt
*
pMgmt
,
int32_t
vgId
)
{
SVnodeObj
*
pVnode
=
NULL
;
...
...
@@ -52,6 +108,7 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
pVnode
->
vgId
=
pCfg
->
vgId
;
pVnode
->
vgVersion
=
pCfg
->
vgVersion
;
pVnode
->
diskPrimary
=
pCfg
->
diskPrimary
;
pVnode
->
refCount
=
0
;
pVnode
->
dropped
=
0
;
pVnode
->
path
=
taosStrdup
(
pCfg
->
path
);
...
...
@@ -169,7 +226,8 @@ static int32_t vmRestoreVgroupId(SWrapperCfg *pCfg, STfs *pTfs) {
snprintf
(
srcPath
,
TSDB_FILENAME_LEN
,
"vnode%svnode%d"
,
TD_DIRSEP
,
srcVgId
);
snprintf
(
dstPath
,
TSDB_FILENAME_LEN
,
"vnode%svnode%d"
,
TD_DIRSEP
,
dstVgId
);
int32_t
vgId
=
vnodeRestoreVgroupId
(
srcPath
,
dstPath
,
srcVgId
,
dstVgId
,
pTfs
);
int32_t
diskPrimary
=
pCfg
->
diskPrimary
;
int32_t
vgId
=
vnodeRestoreVgroupId
(
srcPath
,
dstPath
,
srcVgId
,
dstVgId
,
diskPrimary
,
pTfs
);
if
(
vgId
<=
0
)
{
dError
(
"vgId:%d, failed to restore vgroup id. srcPath: %s"
,
pCfg
->
vgId
,
srcPath
);
return
-
1
;
...
...
@@ -205,11 +263,12 @@ static void *vmOpenVnodeInThread(void *param) {
pThread
->
updateVnodesList
=
true
;
}
int32_t
diskPrimary
=
pCfg
->
diskPrimary
;
snprintf
(
path
,
TSDB_FILENAME_LEN
,
"vnode%svnode%d"
,
TD_DIRSEP
,
pCfg
->
vgId
);
SVnode
*
pImpl
=
vnodeOpen
(
path
,
pMgmt
->
pTfs
,
pMgmt
->
msgCb
);
SVnode
*
pImpl
=
vnodeOpen
(
path
,
diskPrimary
,
pMgmt
->
pTfs
,
pMgmt
->
msgCb
);
if
(
pImpl
==
NULL
)
{
dError
(
"vgId:%d, failed to open vnode by thread:%d
"
,
pCfg
->
vgId
,
pThread
->
threadIndex
);
dError
(
"vgId:%d, failed to open vnode by thread:%d
since %s"
,
pCfg
->
vgId
,
pThread
->
threadIndex
,
terrstr
()
);
pThread
->
failed
++
;
continue
;
}
...
...
@@ -296,6 +355,7 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
if
(
pMgmt
->
state
.
openVnodes
!=
pMgmt
->
state
.
totalVnodes
)
{
dError
(
"there are total vnodes:%d, opened:%d"
,
pMgmt
->
state
.
totalVnodes
,
pMgmt
->
state
.
openVnodes
);
terrno
=
TSDB_CODE_VND_INIT_FAILED
;
return
-
1
;
}
...
...
@@ -518,7 +578,7 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
tmsgReportStartup
(
"vnode-worker"
,
"initialized"
);
if
(
vmOpenVnodes
(
pMgmt
)
!=
0
)
{
dError
(
"failed to open
vnode
since %s"
,
terrstr
());
dError
(
"failed to open
all vnodes
since %s"
,
terrstr
());
goto
_OVER
;
}
tmsgReportStartup
(
"vnode-vnodes"
,
"initialized"
);
...
...
source/dnode/mgmt/node_mgmt/src/dmEnv.c
浏览文件 @
4b96bc71
...
...
@@ -16,7 +16,33 @@
#define _DEFAULT_SOURCE
#include "dmMgmt.h"
static
SDnode
globalDnode
=
{
0
};
#define STR_CASE_CMP(s, d) (0 == strcasecmp((s), (d)))
#define STR_STR_CMP(s, d) (strstr((s), (d)))
#define STR_INT_CMP(s, d, c) (taosStr2Int32(s, 0, 10) c(d))
#define STR_STR_SIGN ("ia")
#define DM_INIT_MON() \
do { \
code = (int32_t)(2147483648 | 298); \
strncpy(stName, tsVersionName, 64); \
monCfg.maxLogs = tsMonitorMaxLogs; \
monCfg.port = tsMonitorPort; \
monCfg.server = tsMonitorFqdn; \
monCfg.comp = tsMonitorComp; \
if (monInit(&monCfg) != 0) { \
if (terrno != 0) code = terrno; \
goto _exit; \
} \
} while (0)
#define DM_ERR_RTN(c) \
do { \
code = (c); \
goto _exit; \
} while (0)
static
SDnode
globalDnode
=
{
0
};
static
const
char
*
dmOS
[
10
]
=
{
"Ubuntu"
,
"CentOS Linux"
,
"Red Hat"
,
"Debian GNU"
,
"CoreOS"
,
"FreeBSD"
,
"openSUSE"
,
"SLES"
,
"Fedora"
,
"MacOS"
};
SDnode
*
dmInstance
()
{
return
&
globalDnode
;
}
...
...
@@ -37,16 +63,37 @@ static int32_t dmInitSystem() {
}
static
int32_t
dmInitMonitor
()
{
int32_t
code
=
0
;
SMonCfg
monCfg
=
{
0
};
monCfg
.
maxLogs
=
tsMonitorMaxLogs
;
monCfg
.
port
=
tsMonitorPort
;
monCfg
.
server
=
tsMonitorFqdn
;
monCfg
.
comp
=
tsMonitorComp
;
if
(
monInit
(
&
monCfg
)
!=
0
)
{
dError
(
"failed to init monitor since %s"
,
terrstr
());
return
-
1
;
char
reName
[
64
]
=
{
0
};
char
stName
[
64
]
=
{
0
};
char
ver
[
64
]
=
{
0
};
DM_INIT_MON
();
if
(
STR_STR_CMP
(
stName
,
STR_STR_SIGN
))
{
DM_ERR_RTN
(
0
);
}
return
0
;
if
(
taosGetOsReleaseName
(
reName
,
stName
,
ver
,
64
)
!=
0
)
{
DM_ERR_RTN
(
code
);
}
if
(
STR_CASE_CMP
(
stName
,
dmOS
[
0
]))
{
if
(
STR_INT_CMP
(
ver
,
17
,
>
))
{
DM_ERR_RTN
(
0
);
}
}
else
if
(
STR_CASE_CMP
(
stName
,
dmOS
[
1
]))
{
if
(
STR_INT_CMP
(
ver
,
6
,
>
))
{
DM_ERR_RTN
(
0
);
}
}
else
if
(
STR_STR_CMP
(
stName
,
dmOS
[
2
])
||
STR_STR_CMP
(
stName
,
dmOS
[
3
])
||
STR_STR_CMP
(
stName
,
dmOS
[
4
])
||
STR_STR_CMP
(
stName
,
dmOS
[
5
])
||
STR_STR_CMP
(
stName
,
dmOS
[
6
])
||
STR_STR_CMP
(
stName
,
dmOS
[
7
])
||
STR_STR_CMP
(
stName
,
dmOS
[
8
])
||
STR_STR_CMP
(
stName
,
dmOS
[
9
]))
{
DM_ERR_RTN
(
0
);
}
_exit:
if
(
code
)
terrno
=
code
;
return
code
;
}
static
bool
dmCheckDiskSpace
()
{
...
...
source/dnode/mnode/impl/src/mndCluster.c
浏览文件 @
4b96bc71
...
...
@@ -20,7 +20,6 @@
#define CLUSTER_VER_NUMBE 1
#define CLUSTER_RESERVE_SIZE 60
char
tsVersionName
[
16
]
=
"community"
;
int64_t
tsExpireTime
=
0
;
static
SSdbRaw
*
mndClusterActionEncode
(
SClusterObj
*
pCluster
);
...
...
source/dnode/mnode/impl/src/mndDnode.c
浏览文件 @
4b96bc71
...
...
@@ -70,6 +70,8 @@ static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter);
static
int32_t
mndRetrieveDnodes
(
SRpcMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
);
static
void
mndCancelGetNextDnode
(
SMnode
*
pMnode
,
void
*
pIter
);
static
int32_t
mndMCfgGetValInt32
(
SMCfgDnodeReq
*
pInMCfgReq
,
int32_t
opLen
,
int32_t
*
pOutValue
);
int32_t
mndInitDnode
(
SMnode
*
pMnode
)
{
SSdbTable
table
=
{
.
sdbType
=
SDB_DNODE
,
...
...
@@ -947,7 +949,7 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
goto
_OVER
;
}
mInfo
(
"dnode:%d, start to drop, ep:%s:%d, force:%s, unsafe:%s"
,
mInfo
(
"dnode:%d, start to drop, ep:%s:%d, force:%s, unsafe:%s"
,
dropReq
.
dnodeId
,
dropReq
.
fqdn
,
dropReq
.
port
,
dropReq
.
force
?
"true"
:
"false"
,
dropReq
.
unsafe
?
"true"
:
"false"
);
if
(
mndCheckOperPrivilege
(
pMnode
,
pReq
->
info
.
conn
.
user
,
MND_OPER_DROP_MNODE
)
!=
0
)
{
goto
_OVER
;
...
...
@@ -987,7 +989,7 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
int32_t
numOfVnodes
=
mndGetVnodesNum
(
pMnode
,
pDnode
->
id
);
bool
isonline
=
mndIsDnodeOnline
(
pDnode
,
taosGetTimestampMs
());
if
(
isonline
&&
force
)
{
terrno
=
TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE
;
mError
(
"dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d"
,
pDnode
->
id
,
terrstr
(),
...
...
@@ -1060,6 +1062,20 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
strcpy
(
dcfgReq
.
config
,
"monitor"
);
snprintf
(
dcfgReq
.
value
,
TSDB_DNODE_VALUE_LEN
,
"%d"
,
flag
);
}
else
if
(
strncasecmp
(
cfgReq
.
config
,
"keeptimeoffset"
,
14
)
==
0
)
{
int32_t
optLen
=
strlen
(
"keeptimeoffset"
);
int32_t
flag
=
-
1
;
int32_t
code
=
mndMCfgGetValInt32
(
&
cfgReq
,
optLen
,
&
flag
);
if
(
code
<
0
)
return
code
;
if
(
flag
<
0
||
flag
>
23
)
{
mError
(
"dnode:%d, failed to config keepTimeOffset since value:%d. Valid range: [0, 23]"
,
cfgReq
.
dnodeId
,
flag
);
terrno
=
TSDB_CODE_INVALID_CFG
;
return
-
1
;
}
strcpy
(
dcfgReq
.
config
,
"keeptimeoffset"
);
snprintf
(
dcfgReq
.
value
,
TSDB_DNODE_VALUE_LEN
,
"%d"
,
flag
);
#ifdef TD_ENTERPRISE
}
else
if
(
strncasecmp
(
cfgReq
.
config
,
"activeCode"
,
10
)
==
0
||
strncasecmp
(
cfgReq
.
config
,
"cActiveCode"
,
11
)
==
0
)
{
int8_t
opt
=
strncasecmp
(
cfgReq
.
config
,
"a"
,
1
)
==
0
?
DND_ACTIVE_CODE
:
DND_CONN_ACTIVE_CODE
;
...
...
@@ -1292,3 +1308,28 @@ static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter) {
SSdb
*
pSdb
=
pMnode
->
pSdb
;
sdbCancelFetch
(
pSdb
,
pIter
);
}
// get int32_t value from 'SMCfgDnodeReq'
static
int32_t
mndMCfgGetValInt32
(
SMCfgDnodeReq
*
pMCfgReq
,
int32_t
opLen
,
int32_t
*
pOutValue
)
{
terrno
=
0
;
if
(
' '
!=
pMCfgReq
->
config
[
opLen
]
&&
0
!=
pMCfgReq
->
config
[
opLen
])
{
goto
_err
;
}
if
(
' '
==
pMCfgReq
->
config
[
opLen
])
{
// 'key value'
if
(
strlen
(
pMCfgReq
->
value
)
!=
0
)
goto
_err
;
*
pOutValue
=
atoi
(
pMCfgReq
->
config
+
opLen
+
1
);
}
else
{
// 'key' 'value'
if
(
strlen
(
pMCfgReq
->
value
)
==
0
)
goto
_err
;
*
pOutValue
=
atoi
(
pMCfgReq
->
value
);
}
return
0
;
_err:
mError
(
"dnode:%d, failed to config keeptimeoffset since invalid conf:%s"
,
pMCfgReq
->
dnodeId
,
pMCfgReq
->
config
);
terrno
=
TSDB_CODE_INVALID_CFG
;
return
-
1
;
}
source/dnode/mnode/impl/src/mndStb.c
浏览文件 @
4b96bc71
...
...
@@ -1738,6 +1738,7 @@ static int32_t mndBuildStbSchemaImp(SDbObj *pDb, SStbObj *pStb, const char *tbNa
SSchema
*
pSrcSchema
=
&
pStb
->
pColumns
[
i
];
memcpy
(
pSchema
->
name
,
pSrcSchema
->
name
,
TSDB_COL_NAME_LEN
);
pSchema
->
type
=
pSrcSchema
->
type
;
pSchema
->
flags
=
pSrcSchema
->
flags
;
pSchema
->
colId
=
pSrcSchema
->
colId
;
pSchema
->
bytes
=
pSrcSchema
->
bytes
;
}
...
...
@@ -1788,6 +1789,7 @@ static int32_t mndBuildStbCfgImp(SDbObj *pDb, SStbObj *pStb, const char *tbName,
SSchema
*
pSrcSchema
=
&
pStb
->
pColumns
[
i
];
memcpy
(
pSchema
->
name
,
pSrcSchema
->
name
,
TSDB_COL_NAME_LEN
);
pSchema
->
type
=
pSrcSchema
->
type
;
pSchema
->
flags
=
pSrcSchema
->
flags
;
pSchema
->
colId
=
pSrcSchema
->
colId
;
pSchema
->
bytes
=
pSrcSchema
->
bytes
;
}
...
...
@@ -1797,6 +1799,7 @@ static int32_t mndBuildStbCfgImp(SDbObj *pDb, SStbObj *pStb, const char *tbName,
SSchema
*
pSrcSchema
=
&
pStb
->
pTags
[
i
];
memcpy
(
pSchema
->
name
,
pSrcSchema
->
name
,
TSDB_COL_NAME_LEN
);
pSchema
->
type
=
pSrcSchema
->
type
;
pSchema
->
flags
=
pSrcSchema
->
flags
;
pSchema
->
colId
=
pSrcSchema
->
colId
;
pSchema
->
bytes
=
pSrcSchema
->
bytes
;
}
...
...
source/dnode/mnode/impl/src/mndTelem.c
浏览文件 @
4b96bc71
...
...
@@ -94,7 +94,7 @@ static char* mndBuildTelemetryReport(SMnode* pMnode) {
tjsonAddStringToObject
(
pJson
,
"instanceId"
,
clusterName
);
tjsonAddDoubleToObject
(
pJson
,
"reportVersion"
,
1
);
if
(
taosGetOsReleaseName
(
tmp
,
sizeof
(
tmp
))
==
0
)
{
if
(
taosGetOsReleaseName
(
tmp
,
NULL
,
NULL
,
sizeof
(
tmp
))
==
0
)
{
tjsonAddStringToObject
(
pJson
,
"os"
,
tmp
);
}
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
4b96bc71
...
...
@@ -51,12 +51,14 @@ extern const SVnodeCfg vnodeCfgDefault;
int32_t
vnodeInit
(
int32_t
nthreads
);
void
vnodeCleanup
();
int32_t
vnodeCreate
(
const
char
*
path
,
SVnodeCfg
*
pCfg
,
STfs
*
pTfs
);
int32_t
vnodeAlterReplica
(
const
char
*
path
,
SAlterVnodeReplicaReq
*
pReq
,
STfs
*
pTfs
);
int32_t
vnodeAlterHashRange
(
const
char
*
srcPath
,
const
char
*
dstPath
,
SAlterVnodeHashRangeReq
*
pReq
,
STfs
*
pTfs
);
int32_t
vnodeRestoreVgroupId
(
const
char
*
srcPath
,
const
char
*
dstPath
,
int32_t
srcVgId
,
int32_t
dstVgId
,
STfs
*
pTfs
);
int32_t
vnodeCreate
(
const
char
*
path
,
SVnodeCfg
*
pCfg
,
int32_t
diskPrimary
,
STfs
*
pTfs
);
int32_t
vnodeAlterReplica
(
const
char
*
path
,
SAlterVnodeReplicaReq
*
pReq
,
int32_t
diskPrimary
,
STfs
*
pTfs
);
int32_t
vnodeAlterHashRange
(
const
char
*
srcPath
,
const
char
*
dstPath
,
SAlterVnodeHashRangeReq
*
pReq
,
int32_t
diskPrimary
,
STfs
*
pTfs
);
int32_t
vnodeRestoreVgroupId
(
const
char
*
srcPath
,
const
char
*
dstPath
,
int32_t
srcVgId
,
int32_t
dstVgId
,
int32_t
diskPrimary
,
STfs
*
pTfs
);
void
vnodeDestroy
(
const
char
*
path
,
STfs
*
pTfs
);
SVnode
*
vnodeOpen
(
const
char
*
path
,
STfs
*
pTfs
,
SMsgCb
msgCb
);
SVnode
*
vnodeOpen
(
const
char
*
path
,
int32_t
diskPrimary
,
STfs
*
pTfs
,
SMsgCb
msgCb
);
void
vnodePreClose
(
SVnode
*
pVnode
);
void
vnodePostClose
(
SVnode
*
pVnode
);
void
vnodeSyncCheckTimeout
(
SVnode
*
pVnode
);
...
...
source/dnode/vnode/src/inc/metaTtl.h
浏览文件 @
4b96bc71
...
...
@@ -38,6 +38,8 @@ typedef struct STtlManger {
SHashObj
*
pTtlCache
;
// key: tuid, value: {ttl, ctime}
SHashObj
*
pDirtyUids
;
// dirty tuid
TTB
*
pTtlIdx
;
// btree<{deleteTime, tuid}, ttl>
char
*
logPrefix
;
}
STtlManger
;
typedef
struct
{
...
...
@@ -77,9 +79,10 @@ typedef struct {
typedef
struct
{
tb_uid_t
uid
;
TXN
*
pTxn
;
int64_t
ttlDays
;
}
STtlDelTtlCtx
;
int
ttlMgrOpen
(
STtlManger
**
ppTtlMgr
,
TDB
*
pEnv
,
int8_t
rollback
);
int
ttlMgrOpen
(
STtlManger
**
ppTtlMgr
,
TDB
*
pEnv
,
int8_t
rollback
,
const
char
*
logPrefix
);
void
ttlMgrClose
(
STtlManger
*
pTtlMgr
);
int
ttlMgrPostOpen
(
STtlManger
*
pTtlMgr
,
void
*
pMeta
);
...
...
source/dnode/vnode/src/inc/vnd.h
浏览文件 @
4b96bc71
...
...
@@ -88,7 +88,7 @@ void vnodeBufPoolAddToFreeList(SVBufPool* pPool);
int32_t
vnodeBufPoolRecycle
(
SVBufPool
*
pPool
);
// vnodeOpen.c
int32_t
vnodeGetPrimaryDir
(
const
char
*
relPath
,
STfs
*
pTfs
,
char
*
buf
,
size_t
bufLen
);
int32_t
vnodeGetPrimaryDir
(
const
char
*
relPath
,
int32_t
diskPrimary
,
STfs
*
pTfs
,
char
*
buf
,
size_t
bufLen
);
// vnodeQuery.c
int32_t
vnodeQueryOpen
(
SVnode
*
pVnode
);
...
...
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
4b96bc71
...
...
@@ -93,6 +93,7 @@ typedef struct SQueryNode SQueryNode;
#define VNODE_BUFPOOL_SEGMENTS 3
#define VND_INFO_FNAME "vnode.json"
#define VND_INFO_FNAME_TMP "vnode_tmp.json"
// vnd.h
typedef
int32_t
(
*
_query_reseek_func_t
)(
void
*
pQHandle
);
...
...
@@ -385,6 +386,7 @@ struct SVnode {
SVState
state
;
SVStatis
statis
;
STfs
*
pTfs
;
int32_t
diskPrimary
;
SMsgCb
msgCb
;
// Buffer Pool
...
...
source/dnode/vnode/src/meta/metaOpen.c
浏览文件 @
4b96bc71
...
...
@@ -41,7 +41,7 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
*
ppMeta
=
NULL
;
// create handle
vnodeGetPrimaryDir
(
pVnode
->
path
,
pVnode
->
pTfs
,
path
,
TSDB_FILENAME_LEN
);
vnodeGetPrimaryDir
(
pVnode
->
path
,
pVnode
->
diskPrimary
,
pVnode
->
pTfs
,
path
,
TSDB_FILENAME_LEN
);
offset
=
strlen
(
path
);
snprintf
(
path
+
offset
,
TSDB_FILENAME_LEN
-
offset
-
1
,
"%s%s"
,
TD_DIRSEP
,
VNODE_META_DIR
);
...
...
@@ -128,7 +128,9 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
}
// open pTtlMgr ("ttlv1.idx")
ret
=
ttlMgrOpen
(
&
pMeta
->
pTtlMgr
,
pMeta
->
pEnv
,
0
);
char
logPrefix
[
128
]
=
{
0
};
sprintf
(
logPrefix
,
"vgId:%d"
,
TD_VID
(
pVnode
));
ret
=
ttlMgrOpen
(
&
pMeta
->
pTtlMgr
,
pMeta
->
pEnv
,
0
,
logPrefix
);
if
(
ret
<
0
)
{
metaError
(
"vgId:%d, failed to open meta ttl index since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
goto
_err
;
...
...
source/dnode/vnode/src/meta/metaTable.c
浏览文件 @
4b96bc71
...
...
@@ -974,7 +974,15 @@ static int metaBuildNColIdxKey(SNcolIdxKey *ncolKey, const SMetaEntry *pME) {
}
static
int
metaDeleteTtl
(
SMeta
*
pMeta
,
const
SMetaEntry
*
pME
)
{
if
(
pME
->
type
!=
TSDB_CHILD_TABLE
&&
pME
->
type
!=
TSDB_NORMAL_TABLE
)
return
0
;
STtlDelTtlCtx
ctx
=
{.
uid
=
pME
->
uid
,
.
pTxn
=
pMeta
->
txn
};
if
(
pME
->
type
==
TSDB_CHILD_TABLE
)
{
ctx
.
ttlDays
=
pME
->
ctbEntry
.
ttlDays
;
}
else
{
ctx
.
ttlDays
=
pME
->
ntbEntry
.
ttlDays
;
}
return
ttlMgrDeleteTtl
(
pMeta
->
pTtlMgr
,
&
ctx
);
}
...
...
@@ -1968,7 +1976,6 @@ static int metaUpdateTtl(SMeta *pMeta, const SMetaEntry *pME) {
if
(
pME
->
type
!=
TSDB_CHILD_TABLE
&&
pME
->
type
!=
TSDB_NORMAL_TABLE
)
return
0
;
STtlUpdTtlCtx
ctx
=
{.
uid
=
pME
->
uid
};
if
(
pME
->
type
==
TSDB_CHILD_TABLE
)
{
ctx
.
ttlDays
=
pME
->
ctbEntry
.
ttlDays
;
ctx
.
changeTimeMs
=
pME
->
ctbEntry
.
btime
;
...
...
source/dnode/vnode/src/meta/metaTtl.c
浏览文件 @
4b96bc71
...
...
@@ -39,8 +39,8 @@ static int32_t ttlMgrULock(STtlManger *pTtlMgr);
const
char
*
ttlTbname
=
"ttl.idx"
;
const
char
*
ttlV1Tbname
=
"ttlv1.idx"
;
int
ttlMgrOpen
(
STtlManger
**
ppTtlMgr
,
TDB
*
pEnv
,
int8_t
rollback
)
{
int
ret
=
TSDB_CODE_SUCCESS
;
int
ttlMgrOpen
(
STtlManger
**
ppTtlMgr
,
TDB
*
pEnv
,
int8_t
rollback
,
const
char
*
logPrefix
)
{
int
ret
=
TSDB_CODE_SUCCESS
;
int64_t
startNs
=
taosGetTimestampNs
();
*
ppTtlMgr
=
NULL
;
...
...
@@ -48,9 +48,17 @@ int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback) {
STtlManger
*
pTtlMgr
=
(
STtlManger
*
)
tdbOsCalloc
(
1
,
sizeof
(
*
pTtlMgr
));
if
(
pTtlMgr
==
NULL
)
return
TSDB_CODE_OUT_OF_MEMORY
;
char
*
logBuffer
=
(
char
*
)
tdbOsCalloc
(
1
,
strlen
(
logPrefix
)
+
1
);
if
(
logBuffer
==
NULL
)
{
tdbOsFree
(
pTtlMgr
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
strcpy
(
logBuffer
,
logPrefix
);
pTtlMgr
->
logPrefix
=
logBuffer
;
ret
=
tdbTbOpen
(
ttlV1Tbname
,
TDB_VARIANT_LEN
,
TDB_VARIANT_LEN
,
ttlIdxKeyV1Cmpr
,
pEnv
,
&
pTtlMgr
->
pTtlIdx
,
rollback
);
if
(
ret
<
0
)
{
metaError
(
"
failed to open %s since %s"
,
ttlV1Tbname
,
tstrerror
(
terrno
));
metaError
(
"
%s, failed to open %s since %s"
,
pTtlMgr
->
logPrefix
,
ttlV1Tbname
,
tstrerror
(
terrno
));
tdbOsFree
(
pTtlMgr
);
return
ret
;
}
...
...
@@ -62,14 +70,14 @@ int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback) {
ret
=
ttlMgrFillCache
(
pTtlMgr
);
if
(
ret
<
0
)
{
metaError
(
"
failed to fill hash since %s"
,
tstrerror
(
terrno
));
metaError
(
"
%s, failed to fill hash since %s"
,
pTtlMgr
->
logPrefix
,
tstrerror
(
terrno
));
ttlMgrCleanup
(
pTtlMgr
);
return
ret
;
}
int64_t
endNs
=
taosGetTimestampNs
();
metaInfo
(
"
ttl mgr open end, hash size: %d, time consumed: %"
PRId64
" ns"
,
taosHashGetSize
(
pTtlMgr
->
pTtlCache
)
,
endNs
-
startNs
);
metaInfo
(
"
%s, ttl mgr open end, hash size: %d, time consumed: %"
PRId64
" ns"
,
pTtlMgr
->
logPrefix
,
taosHashGetSize
(
pTtlMgr
->
pTtlCache
),
endNs
-
startNs
);
*
ppTtlMgr
=
pTtlMgr
;
return
TSDB_CODE_SUCCESS
;
...
...
@@ -91,37 +99,37 @@ int ttlMgrUpgrade(STtlManger *pTtlMgr, void *pMeta) {
if
(
!
tdbTbExist
(
ttlTbname
,
meta
->
pEnv
))
return
TSDB_CODE_SUCCESS
;
metaInfo
(
"
ttl mgr start upgrade"
);
metaInfo
(
"
%s, ttl mgr start upgrade"
,
pTtlMgr
->
logPrefix
);
int64_t
startNs
=
taosGetTimestampNs
();
ret
=
tdbTbOpen
(
ttlTbname
,
sizeof
(
STtlIdxKey
),
0
,
ttlIdxKeyCmpr
,
meta
->
pEnv
,
&
pTtlMgr
->
pOldTtlIdx
,
0
);
if
(
ret
<
0
)
{
metaError
(
"
failed to open %s index since %s"
,
ttlTbname
,
tstrerror
(
terrno
));
metaError
(
"
%s, failed to open %s index since %s"
,
pTtlMgr
->
logPrefix
,
ttlTbname
,
tstrerror
(
terrno
));
goto
_out
;
}
ret
=
ttlMgrConvert
(
pTtlMgr
->
pOldTtlIdx
,
pTtlMgr
->
pTtlIdx
,
pMeta
);
if
(
ret
<
0
)
{
metaError
(
"
failed to convert ttl index since %s"
,
tstrerror
(
terrno
));
metaError
(
"
%s, failed to convert ttl index since %s"
,
pTtlMgr
->
logPrefix
,
tstrerror
(
terrno
));
goto
_out
;
}
ret
=
tdbTbDropByName
(
ttlTbname
,
meta
->
pEnv
,
meta
->
txn
);
if
(
ret
<
0
)
{
metaError
(
"
failed to drop old ttl index since %s"
,
tstrerror
(
terrno
));
metaError
(
"
%s, failed to drop old ttl index since %s"
,
pTtlMgr
->
logPrefix
,
tstrerror
(
terrno
));
goto
_out
;
}
ret
=
ttlMgrFillCache
(
pTtlMgr
);
if
(
ret
<
0
)
{
metaError
(
"
failed to fill hash since %s"
,
tstrerror
(
terrno
));
metaError
(
"
%s, failed to fill hash since %s"
,
pTtlMgr
->
logPrefix
,
tstrerror
(
terrno
));
goto
_out
;
}
int64_t
endNs
=
taosGetTimestampNs
();
metaInfo
(
"
ttl mgr upgrade end, hash size: %d, time consumed: %"
PRId64
" ns"
,
taosHashGetSize
(
pTtlMgr
->
pTtlCache
)
,
endNs
-
startNs
);
metaInfo
(
"
%s, ttl mgr upgrade end, hash size: %d, time consumed: %"
PRId64
" ns"
,
pTtlMgr
->
logPrefix
,
taosHashGetSize
(
pTtlMgr
->
pTtlCache
),
endNs
-
startNs
);
_out:
tdbTbClose
(
pTtlMgr
->
pOldTtlIdx
);
pTtlMgr
->
pOldTtlIdx
=
NULL
;
...
...
@@ -130,11 +138,12 @@ _out:
}
static
void
ttlMgrCleanup
(
STtlManger
*
pTtlMgr
)
{
taosMemoryFree
(
pTtlMgr
->
logPrefix
);
taosHashCleanup
(
pTtlMgr
->
pTtlCache
);
taosHashCleanup
(
pTtlMgr
->
pDirtyUids
);
tdbTbClose
(
pTtlMgr
->
pTtlIdx
);
taosThreadRwlockDestroy
(
&
pTtlMgr
->
lock
);
t
dbOs
Free
(
pTtlMgr
);
t
aosMemory
Free
(
pTtlMgr
);
}
static
void
ttlMgrBuildKey
(
STtlIdxKeyV1
*
pTtlKey
,
int64_t
ttlDays
,
int64_t
changeTimeMs
,
tb_uid_t
uid
)
{
...
...
@@ -250,13 +259,13 @@ int ttlMgrInsertTtl(STtlManger *pTtlMgr, const STtlUpdTtlCtx *updCtx) {
int
ret
=
taosHashPut
(
pTtlMgr
->
pTtlCache
,
&
updCtx
->
uid
,
sizeof
(
updCtx
->
uid
),
&
cacheEntry
,
sizeof
(
cacheEntry
));
if
(
ret
<
0
)
{
metaError
(
"
ttlMgr insert failed to update ttl cache since %s"
,
tstrerror
(
terrno
));
metaError
(
"
%s, ttlMgr insert failed to update ttl cache since %s"
,
pTtlMgr
->
logPrefix
,
tstrerror
(
terrno
));
goto
_out
;
}
ret
=
taosHashPut
(
pTtlMgr
->
pDirtyUids
,
&
updCtx
->
uid
,
sizeof
(
updCtx
->
uid
),
&
dirtryEntry
,
sizeof
(
dirtryEntry
));
if
(
ret
<
0
)
{
metaError
(
"
ttlMgr insert failed to update ttl dirty uids since %s"
,
tstrerror
(
terrno
));
metaError
(
"
%s, ttlMgr insert failed to update ttl dirty uids since %s"
,
pTtlMgr
->
logPrefix
,
tstrerror
(
terrno
));
goto
_out
;
}
...
...
@@ -264,20 +273,21 @@ int ttlMgrInsertTtl(STtlManger *pTtlMgr, const STtlUpdTtlCtx *updCtx) {
_out:
ttlMgrULock
(
pTtlMgr
);
metaDebug
(
"
ttl mgr insert ttl, uid: %"
PRId64
", ctime: %"
PRId64
", ttlDays: %"
PRId64
,
updCtx
->
uid
,
updCtx
->
changeTimeMs
,
updCtx
->
ttlDays
);
metaDebug
(
"
%s, ttl mgr insert ttl, uid: %"
PRId64
", ctime: %"
PRId64
", ttlDays: %"
PRId64
,
pTtlMgr
->
logPrefix
,
updCtx
->
uid
,
updCtx
->
changeTimeMs
,
updCtx
->
ttlDays
);
return
ret
;
}
int
ttlMgrDeleteTtl
(
STtlManger
*
pTtlMgr
,
const
STtlDelTtlCtx
*
delCtx
)
{
if
(
delCtx
->
ttlDays
==
0
)
return
0
;
ttlMgrWLock
(
pTtlMgr
);
STtlDirtyEntry
dirtryEntry
=
{.
type
=
ENTRY_TYPE_DEL
};
int
ret
=
taosHashPut
(
pTtlMgr
->
pDirtyUids
,
&
delCtx
->
uid
,
sizeof
(
delCtx
->
uid
),
&
dirtryEntry
,
sizeof
(
dirtryEntry
));
if
(
ret
<
0
)
{
metaError
(
"
ttlMgr del failed to update ttl dirty uids since %s"
,
tstrerror
(
terrno
));
metaError
(
"
%s, ttlMgr del failed to update ttl dirty uids since %s"
,
pTtlMgr
->
logPrefix
,
tstrerror
(
terrno
));
goto
_out
;
}
...
...
@@ -285,7 +295,7 @@ int ttlMgrDeleteTtl(STtlManger *pTtlMgr, const STtlDelTtlCtx *delCtx) {
_out:
ttlMgrULock
(
pTtlMgr
);
metaDebug
(
"
ttl mgr delete ttl, uid: %"
PRId64
,
delCtx
->
uid
);
metaDebug
(
"
%s, ttl mgr delete ttl, uid: %"
PRId64
,
pTtlMgr
->
logPrefix
,
delCtx
->
uid
);
return
ret
;
}
...
...
@@ -293,6 +303,8 @@ _out:
int
ttlMgrUpdateChangeTime
(
STtlManger
*
pTtlMgr
,
const
STtlUpdCtimeCtx
*
pUpdCtimeCtx
)
{
ttlMgrWLock
(
pTtlMgr
);
int
ret
=
0
;
STtlCacheEntry
*
oldData
=
taosHashGet
(
pTtlMgr
->
pTtlCache
,
&
pUpdCtimeCtx
->
uid
,
sizeof
(
pUpdCtimeCtx
->
uid
));
if
(
oldData
==
NULL
)
{
goto
_out
;
...
...
@@ -301,17 +313,17 @@ int ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtime
STtlCacheEntry
cacheEntry
=
{.
ttlDays
=
oldData
->
ttlDays
,
.
changeTimeMs
=
pUpdCtimeCtx
->
changeTimeMs
};
STtlDirtyEntry
dirtryEntry
=
{.
type
=
ENTRY_TYPE_UPSERT
};
int
ret
=
taosHashPut
(
pTtlMgr
->
pTtlCache
,
&
pUpdCtimeCtx
->
uid
,
sizeof
(
pUpdCtimeCtx
->
uid
),
&
cacheEntry
,
sizeof
(
cacheEntry
));
ret
=
taosHashPut
(
pTtlMgr
->
pTtlCache
,
&
pUpdCtimeCtx
->
uid
,
sizeof
(
pUpdCtimeCtx
->
uid
),
&
cacheEntry
,
sizeof
(
cacheEntry
));
if
(
ret
<
0
)
{
metaError
(
"
ttlMgr update ctime failed to update ttl cache since %s"
,
tstrerror
(
terrno
));
metaError
(
"
%s, ttlMgr update ctime failed to update ttl cache since %s"
,
pTtlMgr
->
logPrefix
,
tstrerror
(
terrno
));
goto
_out
;
}
ret
=
taosHashPut
(
pTtlMgr
->
pDirtyUids
,
&
pUpdCtimeCtx
->
uid
,
sizeof
(
pUpdCtimeCtx
->
uid
),
&
dirtryEntry
,
sizeof
(
dirtryEntry
));
if
(
ret
<
0
)
{
metaError
(
"ttlMgr update ctime failed to update ttl dirty uids since %s"
,
tstrerror
(
terrno
));
metaError
(
"%s, ttlMgr update ctime failed to update ttl dirty uids since %s"
,
pTtlMgr
->
logPrefix
,
tstrerror
(
terrno
));
goto
_out
;
}
...
...
@@ -319,7 +331,8 @@ int ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtime
_out:
ttlMgrULock
(
pTtlMgr
);
metaDebug
(
"ttl mgr update ctime, uid: %"
PRId64
", ctime: %"
PRId64
,
pUpdCtimeCtx
->
uid
,
pUpdCtimeCtx
->
changeTimeMs
);
metaDebug
(
"%s, ttl mgr update ctime, uid: %"
PRId64
", ctime: %"
PRId64
,
pTtlMgr
->
logPrefix
,
pUpdCtimeCtx
->
uid
,
pUpdCtimeCtx
->
changeTimeMs
);
return
ret
;
}
...
...
@@ -366,7 +379,7 @@ _out:
int
ttlMgrFlush
(
STtlManger
*
pTtlMgr
,
TXN
*
pTxn
)
{
ttlMgrWLock
(
pTtlMgr
);
metaInfo
(
"
ttl mgr flush start."
);
metaInfo
(
"
%s, ttl mgr flush start. dirty uids:%d"
,
pTtlMgr
->
logPrefix
,
taosHashGetSize
(
pTtlMgr
->
pDirtyUids
)
);
int
ret
=
-
1
;
...
...
@@ -377,9 +390,9 @@ int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
STtlCacheEntry
*
cacheEntry
=
taosHashGet
(
pTtlMgr
->
pTtlCache
,
pUid
,
sizeof
(
*
pUid
));
if
(
cacheEntry
==
NULL
)
{
metaError
(
"
ttlMgr flush failed to get ttl cache since %s, uid: %"
PRId64
", type: %d"
,
tstrerror
(
terrno
),
*
pUid
,
pEntry
->
type
);
goto
_out
;
metaError
(
"
%s, ttlMgr flush failed to get ttl cache since %s, uid: %"
PRId64
", type: %d"
,
pTtlMgr
->
logPrefix
,
tstrerror
(
terrno
),
*
pUid
,
pEntry
->
type
);
continue
;
}
STtlIdxKeyV1
ttlKey
;
...
...
@@ -389,27 +402,29 @@ int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
ret
=
tdbTbUpsert
(
pTtlMgr
->
pTtlIdx
,
&
ttlKey
,
sizeof
(
ttlKey
),
&
cacheEntry
->
ttlDays
,
sizeof
(
cacheEntry
->
ttlDays
),
pTxn
);
if
(
ret
<
0
)
{
metaError
(
"
ttlMgr flush failed to flush ttl cache upsert since %s"
,
tstrerror
(
terrno
));
metaError
(
"
%s, ttlMgr flush failed to flush ttl cache upsert since %s"
,
pTtlMgr
->
logPrefix
,
tstrerror
(
terrno
));
goto
_out
;
}
}
else
if
(
pEntry
->
type
==
ENTRY_TYPE_DEL
)
{
ret
=
tdbTbDelete
(
pTtlMgr
->
pTtlIdx
,
&
ttlKey
,
sizeof
(
ttlKey
),
pTxn
);
if
(
ret
<
0
)
{
metaError
(
"
ttlMgr flush failed to flush ttl cache del since %s"
,
tstrerror
(
terrno
));
metaError
(
"
%s, ttlMgr flush failed to flush ttl cache del since %s"
,
pTtlMgr
->
logPrefix
,
tstrerror
(
terrno
));
goto
_out
;
}
ret
=
taosHashRemove
(
pTtlMgr
->
pTtlCache
,
pUid
,
sizeof
(
*
pUid
));
if
(
ret
<
0
)
{
metaError
(
"
ttlMgr flush failed to delete ttl cache since %s"
,
tstrerror
(
terrno
));
metaError
(
"
%s, ttlMgr flush failed to delete ttl cache since %s"
,
pTtlMgr
->
logPrefix
,
tstrerror
(
terrno
));
goto
_out
;
}
}
else
{
metaError
(
"
ttlMgr flush failed to flush ttl cache, unknown type: %d"
,
pEntry
->
type
);
metaError
(
"
%s, ttlMgr flush failed to flush ttl cache, unknown type: %d"
,
pTtlMgr
->
logPrefix
,
pEntry
->
type
);
goto
_out
;
}
pIter
=
taosHashIterate
(
pTtlMgr
->
pDirtyUids
,
pIter
);
void
*
pIterTmp
=
pIter
;
pIter
=
taosHashIterate
(
pTtlMgr
->
pDirtyUids
,
pIterTmp
);
taosHashRemove
(
pTtlMgr
->
pDirtyUids
,
pUid
,
sizeof
(
tb_uid_t
));
}
taosHashClear
(
pTtlMgr
->
pDirtyUids
);
...
...
@@ -418,7 +433,7 @@ int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
_out:
ttlMgrULock
(
pTtlMgr
);
metaInfo
(
"
ttl mgr flush end."
);
metaInfo
(
"
%s, ttl mgr flush end."
,
pTtlMgr
->
logPrefix
);
return
ret
;
}
...
...
@@ -426,7 +441,7 @@ _out:
static
int32_t
ttlMgrRLock
(
STtlManger
*
pTtlMgr
)
{
int32_t
ret
=
0
;
metaTrace
(
"
ttlMgr rlock %p"
,
&
pTtlMgr
->
lock
);
metaTrace
(
"
%s, ttlMgr rlock %p"
,
pTtlMgr
->
logPrefix
,
&
pTtlMgr
->
lock
);
ret
=
taosThreadRwlockRdlock
(
&
pTtlMgr
->
lock
);
...
...
@@ -436,7 +451,7 @@ static int32_t ttlMgrRLock(STtlManger *pTtlMgr) {
static
int32_t
ttlMgrWLock
(
STtlManger
*
pTtlMgr
)
{
int32_t
ret
=
0
;
metaTrace
(
"
ttlMgr wlock %p"
,
&
pTtlMgr
->
lock
);
metaTrace
(
"
%s, ttlMgr wlock %p"
,
pTtlMgr
->
logPrefix
,
&
pTtlMgr
->
lock
);
ret
=
taosThreadRwlockWrlock
(
&
pTtlMgr
->
lock
);
...
...
@@ -446,7 +461,7 @@ static int32_t ttlMgrWLock(STtlManger *pTtlMgr) {
static
int32_t
ttlMgrULock
(
STtlManger
*
pTtlMgr
)
{
int32_t
ret
=
0
;
metaTrace
(
"
ttlMgr ulock %p"
,
&
pTtlMgr
->
lock
);
metaTrace
(
"
%s, ttlMgr ulock %p"
,
pTtlMgr
->
logPrefix
,
&
pTtlMgr
->
lock
);
ret
=
taosThreadRwlockUnlock
(
&
pTtlMgr
->
lock
);
...
...
source/dnode/vnode/src/sma/smaUtil.c
浏览文件 @
4b96bc71
...
...
@@ -26,7 +26,7 @@ void tdRSmaGetDirName(SVnode *pVnode, STfs *pTfs, bool endWithSep, char *outputN
int32_t
offset
=
0
;
// vnode
vnodeGetPrimaryDir
(
pVnode
->
path
,
pTfs
,
outputName
,
TSDB_FILENAME_LEN
);
vnodeGetPrimaryDir
(
pVnode
->
path
,
p
Vnode
->
diskPrimary
,
p
Tfs
,
outputName
,
TSDB_FILENAME_LEN
);
offset
=
strlen
(
outputName
);
// rsma
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
4b96bc71
...
...
@@ -1137,14 +1137,15 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
// wait for the stream task get ready for scan history data
while
(((
pStreamTask
->
status
.
downstreamReady
==
0
)
&&
(
pStreamTask
->
status
.
taskStatus
!=
TASK_STATUS__STOP
))
||
pStreamTask
->
status
.
taskStatus
==
TASK_STATUS__SCAN_HISTORY
)
{
tqDebug
(
"s-task:%s level:%d related stream task:%s not ready for halt, wait for it and recheck in 100ms"
,
pId
,
pTask
->
info
.
taskLevel
,
pId
);
tqDebug
(
"s-task:%s level:%d related stream task:%s(status:%s) not ready for halt, wait for it and recheck in 100ms"
,
pId
,
pTask
->
info
.
taskLevel
,
pStreamTask
->
id
.
idStr
,
streamGetTaskStatusStr
(
pStreamTask
->
status
.
taskStatus
));
taosMsleep
(
100
);
}
// now we can stop the stream task execution
pStreamTask
->
status
.
taskStatus
=
TASK_STATUS__HALT
;
tqDebug
(
"s-task:%s level:%d status is set to halt by history scan task:%s"
,
p
Id
,
tqDebug
(
"s-task:%s level:%d status is set to halt by history scan task:%s"
,
p
StreamTask
->
id
.
idStr
,
pStreamTask
->
info
.
taskLevel
,
pId
);
// if it's an source task, extract the last version in wal.
...
...
source/dnode/vnode/src/tsdb/tsdbCache.c
浏览文件 @
4b96bc71
...
...
@@ -61,7 +61,7 @@ typedef struct {
static
void
tsdbGetRocksPath
(
STsdb
*
pTsdb
,
char
*
path
)
{
SVnode
*
pVnode
=
pTsdb
->
pVnode
;
vnodeGetPrimaryDir
(
pTsdb
->
path
,
pVnode
->
pTfs
,
path
,
TSDB_FILENAME_LEN
);
vnodeGetPrimaryDir
(
pTsdb
->
path
,
pVnode
->
diskPrimary
,
pVnode
->
pTfs
,
path
,
TSDB_FILENAME_LEN
);
int32_t
offset
=
strlen
(
path
);
snprintf
(
path
+
offset
,
TSDB_FILENAME_LEN
-
offset
-
1
,
"%scache.rdb"
,
TD_DIRSEP
);
...
...
source/dnode/vnode/src/tsdb/tsdbFS.c
浏览文件 @
4b96bc71
...
...
@@ -276,14 +276,14 @@ void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t) {
// CURRENT
if
(
current
)
{
vnodeGetPrimaryDir
(
pTsdb
->
path
,
pVnode
->
pTfs
,
current
,
TSDB_FILENAME_LEN
);
vnodeGetPrimaryDir
(
pTsdb
->
path
,
pVnode
->
diskPrimary
,
pVnode
->
pTfs
,
current
,
TSDB_FILENAME_LEN
);
offset
=
strlen
(
current
);
snprintf
(
current
+
offset
,
TSDB_FILENAME_LEN
-
offset
-
1
,
"%sCURRENT"
,
TD_DIRSEP
);
}
// CURRENT.t
if
(
current_t
)
{
vnodeGetPrimaryDir
(
pTsdb
->
path
,
pVnode
->
pTfs
,
current_t
,
TSDB_FILENAME_LEN
);
vnodeGetPrimaryDir
(
pTsdb
->
path
,
pVnode
->
diskPrimary
,
pVnode
->
pTfs
,
current_t
,
TSDB_FILENAME_LEN
);
offset
=
strlen
(
current_t
);
snprintf
(
current_t
+
offset
,
TSDB_FILENAME_LEN
-
offset
-
1
,
"%sCURRENT.t"
,
TD_DIRSEP
);
}
...
...
source/dnode/vnode/src/tsdb/tsdbFile.c
浏览文件 @
4b96bc71
...
...
@@ -284,8 +284,9 @@ int32_t tGetDFileSet(uint8_t *p, SDFileSet *pSet) {
// SDelFile ===============================================
void
tsdbDelFileName
(
STsdb
*
pTsdb
,
SDelFile
*
pFile
,
char
fname
[])
{
int32_t
offset
=
0
;
SVnode
*
pVnode
=
pTsdb
->
pVnode
;
vnodeGetPrimaryDir
(
pTsdb
->
path
,
p
Tsdb
->
pVnode
->
pTfs
,
fname
,
TSDB_FILENAME_LEN
);
vnodeGetPrimaryDir
(
pTsdb
->
path
,
p
Vnode
->
diskPrimary
,
pVnode
->
pTfs
,
fname
,
TSDB_FILENAME_LEN
);
offset
=
strlen
(
fname
);
snprintf
((
char
*
)
fname
+
offset
,
TSDB_FILENAME_LEN
-
offset
-
1
,
"%sv%dver%"
PRId64
".del"
,
TD_DIRSEP
,
TD_VID
(
pTsdb
->
pVnode
),
pFile
->
commitID
);
...
...
source/dnode/vnode/src/vnd/vnodeCommit.c
浏览文件 @
4b96bc71
...
...
@@ -295,7 +295,7 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
pInfo
->
txn
=
metaGetTxn
(
pVnode
->
pMeta
);
// save info
vnodeGetPrimaryDir
(
pVnode
->
path
,
pVnode
->
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
vnodeGetPrimaryDir
(
pVnode
->
path
,
pVnode
->
diskPrimary
,
pVnode
->
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
vDebug
(
"vgId:%d, save config while prepare commit"
,
TD_VID
(
pVnode
));
if
(
vnodeSaveInfo
(
dir
,
&
pInfo
->
info
)
<
0
)
{
...
...
@@ -433,7 +433,7 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) {
return
-
1
;
}
vnodeGetPrimaryDir
(
pVnode
->
path
,
pVnode
->
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
vnodeGetPrimaryDir
(
pVnode
->
path
,
pVnode
->
diskPrimary
,
pVnode
->
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
syncBeginSnapshot
(
pVnode
->
sync
,
pInfo
->
info
.
state
.
committed
);
...
...
@@ -496,7 +496,7 @@ bool vnodeShouldRollback(SVnode *pVnode) {
char
tFName
[
TSDB_FILENAME_LEN
]
=
{
0
};
int32_t
offset
=
0
;
vnodeGetPrimaryDir
(
pVnode
->
path
,
pVnode
->
pTfs
,
tFName
,
TSDB_FILENAME_LEN
);
vnodeGetPrimaryDir
(
pVnode
->
path
,
pVnode
->
diskPrimary
,
pVnode
->
pTfs
,
tFName
,
TSDB_FILENAME_LEN
);
offset
=
strlen
(
tFName
);
snprintf
(
tFName
+
offset
,
TSDB_FILENAME_LEN
-
offset
-
1
,
"%s%s"
,
TD_DIRSEP
,
VND_INFO_FNAME_TMP
);
...
...
@@ -507,7 +507,7 @@ void vnodeRollback(SVnode *pVnode) {
char
tFName
[
TSDB_FILENAME_LEN
]
=
{
0
};
int32_t
offset
=
0
;
vnodeGetPrimaryDir
(
pVnode
->
path
,
pVnode
->
pTfs
,
tFName
,
TSDB_FILENAME_LEN
);
vnodeGetPrimaryDir
(
pVnode
->
path
,
pVnode
->
diskPrimary
,
pVnode
->
pTfs
,
tFName
,
TSDB_FILENAME_LEN
);
offset
=
strlen
(
tFName
);
snprintf
(
tFName
+
offset
,
TSDB_FILENAME_LEN
-
offset
-
1
,
"%s%s"
,
TD_DIRSEP
,
VND_INFO_FNAME_TMP
);
...
...
source/dnode/vnode/src/vnd/vnodeOpen.c
浏览文件 @
4b96bc71
...
...
@@ -15,9 +15,11 @@
#include "vnd.h"
int32_t
vnodeGetPrimaryDir
(
const
char
*
relPath
,
STfs
*
pTfs
,
char
*
buf
,
size_t
bufLen
)
{
int32_t
vnodeGetPrimaryDir
(
const
char
*
relPath
,
int32_t
diskPrimary
,
STfs
*
pTfs
,
char
*
buf
,
size_t
bufLen
)
{
if
(
pTfs
)
{
snprintf
(
buf
,
bufLen
-
1
,
"%s%s%s"
,
tfsGetPrimaryPath
(
pTfs
),
TD_DIRSEP
,
relPath
);
SDiskID
diskId
=
{
0
};
diskId
.
id
=
diskPrimary
;
snprintf
(
buf
,
bufLen
-
1
,
"%s%s%s"
,
tfsGetDiskPath
(
pTfs
,
diskId
),
TD_DIRSEP
,
relPath
);
}
else
{
snprintf
(
buf
,
bufLen
-
1
,
"%s"
,
relPath
);
}
...
...
@@ -25,7 +27,15 @@ int32_t vnodeGetPrimaryDir(const char *relPath, STfs *pTfs, char *buf, size_t bu
return
0
;
}
int32_t
vnodeCreate
(
const
char
*
path
,
SVnodeCfg
*
pCfg
,
STfs
*
pTfs
)
{
static
int32_t
vnodeMkDir
(
STfs
*
pTfs
,
const
char
*
path
)
{
if
(
pTfs
)
{
return
tfsMkdirRecur
(
pTfs
,
path
);
}
else
{
return
taosMkDir
(
path
);
}
}
int32_t
vnodeCreate
(
const
char
*
path
,
SVnodeCfg
*
pCfg
,
int32_t
diskPrimary
,
STfs
*
pTfs
)
{
SVnodeInfo
info
=
{
0
};
char
dir
[
TSDB_FILENAME_LEN
]
=
{
0
};
...
...
@@ -36,10 +46,11 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
}
// create vnode env
vnodeGetPrimaryDir
(
path
,
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
if
(
taosMkDir
(
dir
))
{
if
(
vnodeMkDir
(
pTfs
,
path
))
{
vError
(
"vgId:%d, failed to prepare vnode dir since %s, path: %s"
,
pCfg
->
vgId
,
strerror
(
errno
),
path
);
return
TAOS_SYSTEM_ERROR
(
errno
);
}
vnodeGetPrimaryDir
(
path
,
diskPrimary
,
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
if
(
pCfg
)
{
info
.
config
=
*
pCfg
;
...
...
@@ -60,12 +71,12 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
return
0
;
}
int32_t
vnodeAlterReplica
(
const
char
*
path
,
SAlterVnodeReplicaReq
*
pReq
,
STfs
*
pTfs
)
{
int32_t
vnodeAlterReplica
(
const
char
*
path
,
SAlterVnodeReplicaReq
*
pReq
,
int32_t
diskPrimary
,
STfs
*
pTfs
)
{
SVnodeInfo
info
=
{
0
};
char
dir
[
TSDB_FILENAME_LEN
]
=
{
0
};
int32_t
ret
=
0
;
vnodeGetPrimaryDir
(
path
,
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
vnodeGetPrimaryDir
(
path
,
diskPrimary
,
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
ret
=
vnodeLoadInfo
(
dir
,
&
info
);
if
(
ret
<
0
)
{
...
...
@@ -133,7 +144,8 @@ static int32_t vnodeVgroupIdLen(int32_t vgId) {
return
strlen
(
tmp
);
}
int32_t
vnodeRenameVgroupId
(
const
char
*
srcPath
,
const
char
*
dstPath
,
int32_t
srcVgId
,
int32_t
dstVgId
,
STfs
*
pTfs
)
{
int32_t
vnodeRenameVgroupId
(
const
char
*
srcPath
,
const
char
*
dstPath
,
int32_t
srcVgId
,
int32_t
dstVgId
,
int32_t
diskPrimary
,
STfs
*
pTfs
)
{
int32_t
ret
=
0
;
char
oldRname
[
TSDB_FILENAME_LEN
]
=
{
0
};
...
...
@@ -164,7 +176,7 @@ int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t sr
snprintf
(
newRname
,
TSDB_FILENAME_LEN
,
"%s%d%s"
,
oldRname
,
dstVgId
,
tsdbFileSurfixPos
);
vInfo
(
"vgId:%d, rename file from %s to %s"
,
dstVgId
,
tsdbFile
->
rname
,
newRname
);
ret
=
tfsRename
(
pTfs
,
tsdbFile
->
rname
,
newRname
);
ret
=
tfsRename
(
pTfs
,
diskPrimary
,
tsdbFile
->
rname
,
newRname
);
if
(
ret
!=
0
)
{
vError
(
"vgId:%d, failed to rename file from %s to %s since %s"
,
dstVgId
,
tsdbFile
->
rname
,
newRname
,
terrstr
());
tfsClosedir
(
tsdbDir
);
...
...
@@ -176,19 +188,20 @@ int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t sr
tfsClosedir
(
tsdbDir
);
vInfo
(
"vgId:%d, rename dir from %s to %s"
,
dstVgId
,
srcPath
,
dstPath
);
ret
=
tfsRename
(
pTfs
,
srcPath
,
dstPath
);
ret
=
tfsRename
(
pTfs
,
diskPrimary
,
srcPath
,
dstPath
);
if
(
ret
!=
0
)
{
vError
(
"vgId:%d, failed to rename dir from %s to %s since %s"
,
dstVgId
,
srcPath
,
dstPath
,
terrstr
());
}
return
ret
;
}
int32_t
vnodeAlterHashRange
(
const
char
*
srcPath
,
const
char
*
dstPath
,
SAlterVnodeHashRangeReq
*
pReq
,
STfs
*
pTfs
)
{
int32_t
vnodeAlterHashRange
(
const
char
*
srcPath
,
const
char
*
dstPath
,
SAlterVnodeHashRangeReq
*
pReq
,
int32_t
diskPrimary
,
STfs
*
pTfs
)
{
SVnodeInfo
info
=
{
0
};
char
dir
[
TSDB_FILENAME_LEN
]
=
{
0
};
int32_t
ret
=
0
;
vnodeGetPrimaryDir
(
srcPath
,
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
vnodeGetPrimaryDir
(
srcPath
,
diskPrimary
,
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
ret
=
vnodeLoadInfo
(
dir
,
&
info
);
if
(
ret
<
0
)
{
...
...
@@ -232,7 +245,7 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod
}
vInfo
(
"vgId:%d, rename %s to %s"
,
pReq
->
dstVgId
,
srcPath
,
dstPath
);
ret
=
vnodeRenameVgroupId
(
srcPath
,
dstPath
,
pReq
->
srcVgId
,
pReq
->
dstVgId
,
pTfs
);
ret
=
vnodeRenameVgroupId
(
srcPath
,
dstPath
,
pReq
->
srcVgId
,
pReq
->
dstVgId
,
diskPrimary
,
pTfs
);
if
(
ret
<
0
)
{
vError
(
"vgId:%d, failed to rename vnode from %s to %s since %s"
,
pReq
->
dstVgId
,
srcPath
,
dstPath
,
tstrerror
(
terrno
));
...
...
@@ -243,11 +256,12 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod
return
0
;
}
int32_t
vnodeRestoreVgroupId
(
const
char
*
srcPath
,
const
char
*
dstPath
,
int32_t
srcVgId
,
int32_t
dstVgId
,
STfs
*
pTfs
)
{
int32_t
vnodeRestoreVgroupId
(
const
char
*
srcPath
,
const
char
*
dstPath
,
int32_t
srcVgId
,
int32_t
dstVgId
,
int32_t
diskPrimary
,
STfs
*
pTfs
)
{
SVnodeInfo
info
=
{
0
};
char
dir
[
TSDB_FILENAME_LEN
]
=
{
0
};
vnodeGetPrimaryDir
(
dstPath
,
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
vnodeGetPrimaryDir
(
dstPath
,
diskPrimary
,
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
if
(
vnodeLoadInfo
(
dir
,
&
info
)
==
0
)
{
if
(
info
.
config
.
vgId
!=
dstVgId
)
{
vError
(
"vgId:%d, unexpected vnode config.vgId:%d"
,
dstVgId
,
info
.
config
.
vgId
);
...
...
@@ -256,7 +270,7 @@ int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t s
return
dstVgId
;
}
vnodeGetPrimaryDir
(
srcPath
,
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
vnodeGetPrimaryDir
(
srcPath
,
diskPrimary
,
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
if
(
vnodeLoadInfo
(
dir
,
&
info
)
<
0
)
{
vError
(
"vgId:%d, failed to read vnode config from %s since %s"
,
srcVgId
,
srcPath
,
tstrerror
(
terrno
));
return
-
1
;
...
...
@@ -271,7 +285,7 @@ int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t s
}
vInfo
(
"vgId:%d, rename %s to %s"
,
dstVgId
,
srcPath
,
dstPath
);
if
(
vnodeRenameVgroupId
(
srcPath
,
dstPath
,
srcVgId
,
dstVgId
,
pTfs
)
<
0
)
{
if
(
vnodeRenameVgroupId
(
srcPath
,
dstPath
,
srcVgId
,
dstVgId
,
diskPrimary
,
pTfs
)
<
0
)
{
vError
(
"vgId:%d, failed to rename vnode from %s to %s since %s"
,
dstVgId
,
srcPath
,
dstPath
,
tstrerror
(
terrno
));
return
-
1
;
}
...
...
@@ -284,14 +298,31 @@ void vnodeDestroy(const char *path, STfs *pTfs) {
tfsRmdir
(
pTfs
,
path
);
}
SVnode
*
vnodeOpen
(
const
char
*
path
,
STfs
*
pTfs
,
SMsgCb
msgCb
)
{
static
int32_t
vnodeCheckDisk
(
int32_t
diskPrimary
,
STfs
*
pTfs
)
{
int32_t
ndisk
=
1
;
if
(
pTfs
)
{
ndisk
=
tfsGetDisksAtLevel
(
pTfs
,
0
);
}
if
(
diskPrimary
<
0
||
diskPrimary
>=
ndisk
)
{
vError
(
"disk:%d is unavailable from the %d disks mounted at level 0"
,
diskPrimary
,
ndisk
);
terrno
=
TSDB_CODE_FS_INVLD_CFG
;
return
-
1
;
}
return
0
;
}
SVnode
*
vnodeOpen
(
const
char
*
path
,
int32_t
diskPrimary
,
STfs
*
pTfs
,
SMsgCb
msgCb
)
{
SVnode
*
pVnode
=
NULL
;
SVnodeInfo
info
=
{
0
};
char
dir
[
TSDB_FILENAME_LEN
]
=
{
0
};
char
tdir
[
TSDB_FILENAME_LEN
*
2
]
=
{
0
};
int32_t
ret
=
0
;
vnodeGetPrimaryDir
(
path
,
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
if
(
vnodeCheckDisk
(
diskPrimary
,
pTfs
))
{
vError
(
"failed to open vnode from %s since %s. diskPrimary:%d"
,
path
,
terrstr
(),
diskPrimary
);
return
NULL
;
}
vnodeGetPrimaryDir
(
path
,
diskPrimary
,
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
info
.
config
=
vnodeCfgDefault
;
...
...
@@ -334,6 +365,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
pVnode
->
state
.
applied
=
info
.
state
.
committed
;
pVnode
->
state
.
applyTerm
=
info
.
state
.
commitTerm
;
pVnode
->
pTfs
=
pTfs
;
pVnode
->
diskPrimary
=
diskPrimary
;
pVnode
->
msgCb
=
msgCb
;
taosThreadMutexInit
(
&
pVnode
->
lock
,
NULL
);
pVnode
->
blocked
=
false
;
...
...
source/dnode/vnode/src/vnd/vnodeSnapshot.c
浏览文件 @
4b96bc71
...
...
@@ -86,6 +86,7 @@ void vnodeSnapReaderClose(SVSnapReader *pReader) {
int32_t
vnodeSnapRead
(
SVSnapReader
*
pReader
,
uint8_t
**
ppData
,
uint32_t
*
nData
)
{
int32_t
code
=
0
;
SVnode
*
pVnode
=
pReader
->
pVnode
;
// CONFIG ==============
// FIXME: if commit multiple times and the config changed?
...
...
@@ -93,7 +94,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
char
fName
[
TSDB_FILENAME_LEN
];
int32_t
offset
=
0
;
vnodeGetPrimaryDir
(
p
Reader
->
pVnode
->
path
,
pReader
->
pVnode
->
pTfs
,
fName
,
TSDB_FILENAME_LEN
);
vnodeGetPrimaryDir
(
p
Vnode
->
path
,
pVnode
->
diskPrimary
,
pVnode
->
pTfs
,
fName
,
TSDB_FILENAME_LEN
);
offset
=
strlen
(
fName
);
snprintf
(
fName
+
offset
,
TSDB_FILENAME_LEN
-
offset
-
1
,
"%s%s"
,
TD_DIRSEP
,
VND_INFO_FNAME
);
...
...
@@ -343,7 +344,7 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
.
applyTerm
=
pWriter
->
info
.
state
.
commitTerm
};
pVnode
->
statis
=
pWriter
->
info
.
statis
;
char
dir
[
TSDB_FILENAME_LEN
]
=
{
0
};
vnodeGetPrimaryDir
(
pVnode
->
path
,
pVnode
->
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
vnodeGetPrimaryDir
(
pVnode
->
path
,
pVnode
->
diskPrimary
,
pVnode
->
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
vnodeCommitInfo
(
dir
);
}
else
{
...
...
@@ -381,7 +382,7 @@ _exit:
static
int32_t
vnodeSnapWriteInfo
(
SVSnapWriter
*
pWriter
,
uint8_t
*
pData
,
uint32_t
nData
)
{
int32_t
code
=
0
;
SVnode
*
pVnode
=
pWriter
->
pVnode
;
SSnapDataHdr
*
pHdr
=
(
SSnapDataHdr
*
)
pData
;
// decode info
...
...
@@ -395,10 +396,9 @@ static int32_t vnodeSnapWriteInfo(SVSnapWriter *pWriter, uint8_t *pData, uint32_
// modify info as needed
char
dir
[
TSDB_FILENAME_LEN
]
=
{
0
};
vnodeGetPrimaryDir
(
p
Writer
->
pVnode
->
path
,
pWriter
->
pVnode
->
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
vnodeGetPrimaryDir
(
p
Vnode
->
path
,
pVnode
->
diskPrimary
,
pVnode
->
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
SVnodeStats
vndStats
=
pWriter
->
info
.
config
.
vndStats
;
SVnode
*
pVnode
=
pWriter
->
pVnode
;
pWriter
->
info
.
config
=
pVnode
->
config
;
pWriter
->
info
.
config
.
vndStats
=
vndStats
;
vDebug
(
"vgId:%d, save config while write snapshot"
,
pWriter
->
pVnode
->
config
.
vgId
);
...
...
source/libs/command/src/command.c
浏览文件 @
4b96bc71
...
...
@@ -615,6 +615,31 @@ void appendTableOptions(char* buf, int32_t* len, SDbCfgInfo* pDbCfg, STableCfg*
if
(
pCfg
->
ttl
>
0
)
{
*
len
+=
sprintf
(
buf
+
VARSTR_HEADER_SIZE
+
*
len
,
" TTL %d"
,
pCfg
->
ttl
);
}
if
(
TSDB_SUPER_TABLE
==
pCfg
->
tableType
||
TSDB_NORMAL_TABLE
==
pCfg
->
tableType
)
{
int32_t
nSma
=
0
;
for
(
int32_t
i
=
0
;
i
<
pCfg
->
numOfColumns
;
++
i
)
{
if
(
IS_BSMA_ON
(
pCfg
->
pSchemas
+
i
))
{
++
nSma
;
}
}
if
(
nSma
<
pCfg
->
numOfColumns
)
{
bool
smaOn
=
false
;
*
len
+=
sprintf
(
buf
+
VARSTR_HEADER_SIZE
+
*
len
,
" SMA("
);
for
(
int32_t
i
=
0
;
i
<
pCfg
->
numOfColumns
;
++
i
)
{
if
(
IS_BSMA_ON
(
pCfg
->
pSchemas
+
i
))
{
if
(
smaOn
)
{
*
len
+=
sprintf
(
buf
+
VARSTR_HEADER_SIZE
+
*
len
,
",`%s`"
,
(
pCfg
->
pSchemas
+
i
)
->
name
);
}
else
{
smaOn
=
true
;
*
len
+=
sprintf
(
buf
+
VARSTR_HEADER_SIZE
+
*
len
,
"`%s`"
,
(
pCfg
->
pSchemas
+
i
)
->
name
);
}
}
}
*
len
+=
sprintf
(
buf
+
VARSTR_HEADER_SIZE
+
*
len
,
")"
);
}
}
}
static
int32_t
setCreateTBResultIntoDataBlock
(
SSDataBlock
*
pBlock
,
SDbCfgInfo
*
pDbCfg
,
char
*
tbName
,
STableCfg
*
pCfg
)
{
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
4b96bc71
...
...
@@ -4036,7 +4036,7 @@ bool isEqualStateKey(SStateWindowInfo* pWin, char* pKeyData) {
bool
compareStateKey
(
void
*
data
,
void
*
key
)
{
if
(
!
data
||
!
key
)
{
return
tru
e
;
return
fals
e
;
}
SStateKeys
*
stateKey
=
(
SStateKeys
*
)
key
;
stateKey
->
pData
=
(
char
*
)
key
+
sizeof
(
SStateKeys
);
...
...
@@ -4062,7 +4062,13 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
if
(
code
==
TSDB_CODE_SUCCESS
&&
!
inWinRange
(
&
pAggSup
->
winRange
,
&
pCurWin
->
winInfo
.
sessionWin
.
win
))
{
code
=
TSDB_CODE_FAILED
;
releaseOutputBuf
(
pAggSup
->
pState
,
NULL
,
(
SResultRow
*
)
pCurWin
->
winInfo
.
pOutputBuf
,
&
pAggSup
->
pSessionAPI
->
stateStore
);
pCurWin
->
winInfo
.
pOutputBuf
=
taosMemoryMalloc
(
size
);
pCurWin
->
winInfo
.
pOutputBuf
=
taosMemoryCalloc
(
1
,
size
);
pCurWin
->
pStateKey
=
(
SStateKeys
*
)((
char
*
)
pCurWin
->
winInfo
.
pOutputBuf
+
(
pAggSup
->
resultRowSize
-
pAggSup
->
stateKeySize
));
pCurWin
->
pStateKey
->
bytes
=
pAggSup
->
stateKeySize
-
sizeof
(
SStateKeys
);
pCurWin
->
pStateKey
->
type
=
pAggSup
->
stateKeyType
;
pCurWin
->
pStateKey
->
pData
=
(
char
*
)
pCurWin
->
pStateKey
+
sizeof
(
SStateKeys
);
pCurWin
->
pStateKey
->
isNull
=
false
;
}
if
(
code
==
TSDB_CODE_SUCCESS
)
{
...
...
@@ -4076,11 +4082,19 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
}
pNextWin
->
winInfo
.
sessionWin
=
pCurWin
->
winInfo
.
sessionWin
;
pNextWin
->
winInfo
.
pOutputBuf
=
NULL
;
SStreamStateCur
*
pCur
=
pAggSup
->
stateStore
.
streamStateSessionSeekKeyNext
(
pAggSup
->
pState
,
&
pCurWin
->
winInfo
.
sessionWin
)
;
code
=
pAggSup
->
stateStore
.
streamStateSessionGetKVByCur
(
pCur
,
&
pNextWin
->
winInfo
.
sessionWin
,
NULL
,
0
);
SStreamStateCur
*
pCur
=
pAggSup
->
stateStore
.
streamStateSessionSeekKeyNext
(
pAggSup
->
pState
,
&
pNextWin
->
winInfo
.
sessionWin
)
;
int32_t
nextSize
=
pAggSup
->
resultRowSize
;
code
=
pAggSup
->
stateStore
.
streamStateSessionGetKVByCur
(
pCur
,
&
pNextWin
->
winInfo
.
sessionWin
,
&
pNextWin
->
winInfo
.
pOutputBuf
,
&
nextSize
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
SET_SESSION_WIN_INVALID
(
pNextWin
->
winInfo
);
}
else
{
pNextWin
->
pStateKey
=
(
SStateKeys
*
)((
char
*
)
pNextWin
->
winInfo
.
pOutputBuf
+
(
pAggSup
->
resultRowSize
-
pAggSup
->
stateKeySize
));
pNextWin
->
pStateKey
->
bytes
=
pAggSup
->
stateKeySize
-
sizeof
(
SStateKeys
);
pNextWin
->
pStateKey
->
type
=
pAggSup
->
stateKeyType
;
pNextWin
->
pStateKey
->
pData
=
(
char
*
)
pNextWin
->
pStateKey
+
sizeof
(
SStateKeys
);
pNextWin
->
pStateKey
->
isNull
=
false
;
pNextWin
->
winInfo
.
isOutput
=
true
;
}
pAggSup
->
stateStore
.
streamStateFreeCur
(
pCur
);
}
...
...
@@ -4156,6 +4170,9 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
SStateWindowInfo
curWin
=
{
0
};
SStateWindowInfo
nextWin
=
{
0
};
setStateOutputBuf
(
pAggSup
,
tsCols
[
i
],
groupId
,
pKeyData
,
&
curWin
,
&
nextWin
);
if
(
IS_VALID_SESSION_WIN
(
nextWin
.
winInfo
))
{
releaseOutputBuf
(
pAggSup
->
pState
,
NULL
,
(
SResultRow
*
)
nextWin
.
winInfo
.
pOutputBuf
,
&
pAPI
->
stateStore
);
}
setSessionWinOutputInfo
(
pSeUpdated
,
&
curWin
.
winInfo
);
winRows
=
updateStateWindowInfo
(
&
curWin
,
&
nextWin
,
tsCols
,
groupId
,
pKeyColInfo
,
rows
,
i
,
&
allEqual
,
pAggSup
->
pResultRows
,
pSeUpdated
,
pStDeleted
);
...
...
@@ -4346,9 +4363,19 @@ void streamStateReloadState(SOperatorInfo* pOperator) {
for
(
int32_t
i
=
0
;
i
<
num
;
i
++
)
{
SStateWindowInfo
curInfo
=
{
0
};
SStateWindowInfo
nextInfo
=
{
0
};
SStateWindowInfo
dummy
=
{
0
};
setStateOutputBuf
(
pAggSup
,
pSeKeyBuf
[
i
].
win
.
skey
,
pSeKeyBuf
[
i
].
groupId
,
NULL
,
&
curInfo
,
&
nextInfo
);
if
(
compareStateKey
(
curInfo
.
pStateKey
,
nextInfo
.
pStateKey
))
{
compactStateWindow
(
pOperator
,
&
curInfo
.
winInfo
,
&
nextInfo
.
winInfo
,
pInfo
->
pStUpdated
,
pInfo
->
pStDeleted
);
saveResult
(
curInfo
.
winInfo
,
pInfo
->
pStUpdated
);
}
if
(
IS_VALID_SESSION_WIN
(
curInfo
.
winInfo
))
{
releaseOutputBuf
(
pAggSup
->
pState
,
NULL
,
(
SResultRow
*
)
curInfo
.
winInfo
.
pOutputBuf
,
&
pAggSup
->
pSessionAPI
->
stateStore
);
}
if
(
IS_VALID_SESSION_WIN
(
nextInfo
.
winInfo
))
{
releaseOutputBuf
(
pAggSup
->
pState
,
NULL
,
(
SResultRow
*
)
nextInfo
.
winInfo
.
pOutputBuf
,
&
pAggSup
->
pSessionAPI
->
stateStore
);
}
}
taosMemoryFree
(
pBuf
);
...
...
source/libs/stream/src/streamRecover.c
浏览文件 @
4b96bc71
...
...
@@ -201,6 +201,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
if
(
left
==
0
)
{
taosArrayDestroy
(
pTask
->
checkReqIds
);
pTask
->
checkReqIds
=
NULL
;
pTask
->
status
.
downstreamReady
=
1
;
if
(
pTask
->
status
.
taskStatus
==
TASK_STATUS__SCAN_HISTORY
)
{
qDebug
(
"s-task:%s all %d downstream tasks are ready, now enter into scan-history-data stage, status:%s"
,
id
,
...
...
source/libs/tfs/src/tfs.c
浏览文件 @
4b96bc71
...
...
@@ -113,6 +113,15 @@ SDiskSize tfsGetSize(STfs *pTfs) {
return
size
;
}
int32_t
tfsGetDisksAtLevel
(
STfs
*
pTfs
,
int32_t
level
)
{
if
(
level
<
0
||
level
>=
pTfs
->
nlevel
)
{
return
0
;
}
STfsTier
*
pTier
=
TFS_TIER_AT
(
pTfs
,
level
);
return
pTier
->
ndisk
;
}
int32_t
tfsGetLevel
(
STfs
*
pTfs
)
{
return
pTfs
->
nlevel
;
}
int32_t
tfsAllocDisk
(
STfs
*
pTfs
,
int32_t
expLevel
,
SDiskID
*
pDiskId
)
{
...
...
@@ -272,6 +281,20 @@ int32_t tfsMkdirRecurAt(STfs *pTfs, const char *rname, SDiskID diskId) {
return
0
;
}
int32_t
tfsMkdirRecur
(
STfs
*
pTfs
,
const
char
*
rname
)
{
for
(
int32_t
level
=
0
;
level
<
pTfs
->
nlevel
;
level
++
)
{
STfsTier
*
pTier
=
TFS_TIER_AT
(
pTfs
,
level
);
for
(
int32_t
id
=
0
;
id
<
pTier
->
ndisk
;
id
++
)
{
SDiskID
did
=
{.
id
=
id
,
.
level
=
level
};
if
(
tfsMkdirRecurAt
(
pTfs
,
rname
,
did
)
<
0
)
{
return
-
1
;
}
}
}
return
0
;
}
int32_t
tfsMkdir
(
STfs
*
pTfs
,
const
char
*
rname
)
{
for
(
int32_t
level
=
0
;
level
<
pTfs
->
nlevel
;
level
++
)
{
STfsTier
*
pTier
=
TFS_TIER_AT
(
pTfs
,
level
);
...
...
@@ -314,25 +337,60 @@ int32_t tfsRmdir(STfs *pTfs, const char *rname) {
return
0
;
}
int32_t
tfsRename
(
STfs
*
pTfs
,
const
char
*
orname
,
const
char
*
nrname
)
{
static
int32_t
tfsRenameAt
(
STfs
*
pTfs
,
SDiskID
diskId
,
const
char
*
orname
,
const
char
*
nrname
)
{
char
oaname
[
TMPNAME_LEN
]
=
"
\0
"
;
char
naname
[
TMPNAME_LEN
]
=
"
\0
"
;
int32_t
level
=
diskId
.
level
;
int32_t
id
=
diskId
.
id
;
STfsTier
*
pTier
=
TFS_TIER_AT
(
pTfs
,
level
);
STfsDisk
*
pDisk
=
pTier
->
disks
[
id
];
snprintf
(
oaname
,
TMPNAME_LEN
,
"%s%s%s"
,
pDisk
->
path
,
TD_DIRSEP
,
orname
);
snprintf
(
naname
,
TMPNAME_LEN
,
"%s%s%s"
,
pDisk
->
path
,
TD_DIRSEP
,
nrname
);
if
(
taosRenameFile
(
oaname
,
naname
)
!=
0
&&
errno
!=
ENOENT
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
fError
(
"failed to rename %s to %s since %s"
,
oaname
,
naname
,
terrstr
());
return
-
1
;
}
return
0
;
}
int32_t
tfsRename
(
STfs
*
pTfs
,
int32_t
diskPrimary
,
const
char
*
orname
,
const
char
*
nrname
)
{
for
(
int32_t
level
=
pTfs
->
nlevel
-
1
;
level
>=
0
;
level
--
)
{
STfsTier
*
pTier
=
TFS_TIER_AT
(
pTfs
,
level
);
for
(
int32_t
id
=
pTier
->
ndisk
-
1
;
id
>=
0
;
id
--
)
{
STfsDisk
*
pDisk
=
pTier
->
disks
[
id
];
snprintf
(
oaname
,
TMPNAME_LEN
,
"%s%s%s"
,
pDisk
->
path
,
TD_DIRSEP
,
orname
)
;
snprintf
(
naname
,
TMPNAME_LEN
,
"%s%s%s"
,
pDisk
->
path
,
TD_DIRSEP
,
nrname
);
if
(
taosRenameFile
(
oaname
,
naname
)
!=
0
&&
errno
!=
ENOENT
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
)
;
fError
(
"failed to rename %s to %s since %s"
,
oaname
,
naname
,
terrstr
());
if
(
level
==
0
&&
id
==
diskPrimary
)
{
continue
;
}
SDiskID
diskId
=
{.
level
=
level
,
.
id
=
id
}
;
if
(
tfsRenameAt
(
pTfs
,
diskId
,
orname
,
nrname
))
{
return
-
1
;
}
}
}
return
0
;
SDiskID
diskId
=
{.
level
=
0
,
.
id
=
diskPrimary
};
return
tfsRenameAt
(
pTfs
,
diskId
,
orname
,
nrname
);
}
int32_t
tfsSearch
(
STfs
*
pTfs
,
int32_t
level
,
const
char
*
fname
)
{
if
(
level
<
0
||
level
>=
pTfs
->
nlevel
)
{
return
-
1
;
}
char
path
[
TMPNAME_LEN
]
=
{
0
};
STfsTier
*
pTier
=
TFS_TIER_AT
(
pTfs
,
level
);
for
(
int32_t
id
=
0
;
id
<
pTier
->
ndisk
;
id
++
)
{
STfsDisk
*
pDisk
=
pTier
->
disks
[
id
];
snprintf
(
path
,
TMPNAME_LEN
-
1
,
"%s%s%s"
,
pDisk
->
path
,
TD_DIRSEP
,
fname
);
if
(
taosCheckExistFile
(
path
))
{
return
id
;
}
}
return
-
1
;
}
STfsDir
*
tfsOpendir
(
STfs
*
pTfs
,
const
char
*
rname
)
{
...
...
source/libs/tfs/test/tfsTest.cpp
浏览文件 @
4b96bc71
...
...
@@ -156,7 +156,7 @@ TEST_F(TfsTest, 03_Dir) {
EXPECT_NE
(
taosDirExist
(
ap4
),
1
);
EXPECT_EQ
(
tfsMkdirRecurAt
(
pTfs
,
p4
,
did
),
0
);
EXPECT_EQ
(
taosDirExist
(
ap4
),
1
);
EXPECT_EQ
(
tfsRename
(
pTfs
,
p44
,
p45
),
0
);
EXPECT_EQ
(
tfsRename
(
pTfs
,
0
,
p44
,
p45
),
0
);
EXPECT_EQ
(
tfsRmdir
(
pTfs
,
p4
),
0
);
EXPECT_NE
(
taosDirExist
(
ap4
),
1
);
...
...
@@ -609,7 +609,7 @@ TEST_F(TfsTest, 05_MultiDisk) {
EXPECT_NE
(
taosDirExist
(
_ap22
),
1
);
EXPECT_EQ
(
tfsMkdirRecurAt
(
pTfs
,
p4
,
did
),
0
);
EXPECT_EQ
(
taosDirExist
(
_ap22
),
1
);
EXPECT_EQ
(
tfsRename
(
pTfs
,
p44
,
p45
),
0
);
EXPECT_EQ
(
tfsRename
(
pTfs
,
0
,
p44
,
p45
),
0
);
EXPECT_EQ
(
tfsRmdir
(
pTfs
,
p4
),
0
);
EXPECT_NE
(
taosDirExist
(
_ap22
),
1
);
}
...
...
@@ -721,4 +721,4 @@ TEST_F(TfsTest, 05_MultiDisk) {
}
tfsClose
(
pTfs
);
}
\ No newline at end of file
}
source/os/src/osSysinfo.c
浏览文件 @
4b96bc71
...
...
@@ -327,17 +327,19 @@ bool getWinVersionReleaseName(char *releaseName, int32_t maxLen) {
}
#endif
int32_t
taosGetOsReleaseName
(
char
*
releaseName
,
int32_t
maxLen
)
{
int32_t
taosGetOsReleaseName
(
char
*
releaseName
,
char
*
sName
,
char
*
ver
,
int32_t
maxLen
)
{
#ifdef WINDOWS
if
(
!
getWinVersionReleaseName
(
releaseName
,
maxLen
))
{
snprintf
(
releaseName
,
maxLen
,
"Windows"
);
}
if
(
sName
)
snprintf
(
sName
,
maxLen
,
"Windows"
);
return
0
;
#elif defined(_TD_DARWIN_64)
char
osversion
[
32
];
size_t
osversion_len
=
sizeof
(
osversion
)
-
1
;
int
osversion_name
[]
=
{
CTL_KERN
,
KERN_OSRELEASE
};
if
(
sName
)
snprintf
(
sName
,
maxLen
,
"macOS"
);
if
(
sysctl
(
osversion_name
,
2
,
osversion
,
&
osversion_len
,
NULL
,
0
)
==
-
1
)
{
return
-
1
;
}
...
...
@@ -357,24 +359,35 @@ int32_t taosGetOsReleaseName(char *releaseName, int32_t maxLen) {
return
0
;
#else
char
line
[
1024
];
char
*
dest
=
NULL
;
size_t
size
=
0
;
int32_t
code
=
-
1
;
int32_t
cnt
=
0
;
TdFilePtr
pFile
=
taosOpenFile
(
"/etc/os-release"
,
TD_FILE_READ
|
TD_FILE_STREAM
);
if
(
pFile
==
NULL
)
return
fals
e
;
if
(
pFile
==
NULL
)
return
cod
e
;
while
((
size
=
taosGetsFile
(
pFile
,
sizeof
(
line
),
line
))
!=
-
1
)
{
line
[
size
-
1
]
=
'\0'
;
if
(
strncmp
(
line
,
"PRETTY_NAME"
,
11
)
==
0
)
{
const
char
*
p
=
strchr
(
line
,
'='
)
+
1
;
if
(
*
p
==
'"'
)
{
p
++
;
line
[
size
-
2
]
=
0
;
}
tstrncpy
(
releaseName
,
p
,
maxLen
);
if
(
strncmp
(
line
,
"NAME"
,
4
)
==
0
)
{
dest
=
sName
;
}
else
if
(
strncmp
(
line
,
"PRETTY_NAME"
,
11
)
==
0
)
{
dest
=
releaseName
;
code
=
0
;
break
;
}
else
if
(
strncmp
(
line
,
"VERSION_ID"
,
10
)
==
0
)
{
dest
=
ver
;
}
else
{
continue
;
}
if
(
!
dest
)
continue
;
const
char
*
p
=
strchr
(
line
,
'='
)
+
1
;
if
(
*
p
==
'"'
)
{
p
++
;
line
[
size
-
2
]
=
0
;
}
tstrncpy
(
dest
,
p
,
maxLen
);
if
(
++
cnt
>=
3
)
break
;
}
taosCloseFile
(
&
pFile
);
...
...
source/os/test/osTests.cpp
浏览文件 @
4b96bc71
...
...
@@ -37,7 +37,7 @@ TEST(osTest, osSystem) {
const
int
sysLen
=
64
;
char
osSysName
[
sysLen
];
int
ret
=
taosGetOsReleaseName
(
osSysName
,
sysLen
);
int
ret
=
taosGetOsReleaseName
(
osSysName
,
NULL
,
NULL
,
sysLen
);
printf
(
"os systeme name:%s
\n
"
,
osSysName
);
ASSERT_EQ
(
ret
,
0
);
}
...
...
source/util/src/terror.c
浏览文件 @
4b96bc71
...
...
@@ -329,6 +329,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE, "Please use this comma
// vnode
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_INVALID_VGROUP_ID
,
"Vnode is closed or removed"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_INIT_FAILED
,
"Vnode init failure"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_NO_WRITE_AUTH
,
"Database write operation denied"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_NOT_EXIST
,
"Vnode not exist"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_ALREADY_EXIST
,
"Vnode already exist"
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录