Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
a32c6502
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
a32c6502
编写于
7月 19, 2023
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/enh/triggerCheckPoint2' into enh/triggerCheckPoint2
上级
2ae95217
fd02b9a7
变更
28
隐藏空白更改
内联
并排
Showing
28 changed file
with
374 addition
and
89 deletion
+374
-89
include/libs/tfs/tfs.h
include/libs/tfs/tfs.h
+27
-1
include/util/taoserror.h
include/util/taoserror.h
+1
-1
source/common/src/tglobal.c
source/common/src/tglobal.c
+0
-6
source/dnode/mgmt/mgmt_vnode/inc/vmInt.h
source/dnode/mgmt/mgmt_vnode/inc/vmInt.h
+3
-0
source/dnode/mgmt/mgmt_vnode/src/vmFile.c
source/dnode/mgmt/mgmt_vnode/src/vmFile.c
+3
-0
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
+17
-8
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
+64
-4
source/dnode/mnode/impl/src/mndDnode.c
source/dnode/mnode/impl/src/mndDnode.c
+43
-2
source/dnode/mnode/impl/src/mndStb.c
source/dnode/mnode/impl/src/mndStb.c
+3
-0
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+7
-5
source/dnode/vnode/src/inc/vnd.h
source/dnode/vnode/src/inc/vnd.h
+1
-1
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+2
-0
source/dnode/vnode/src/meta/metaOpen.c
source/dnode/vnode/src/meta/metaOpen.c
+1
-1
source/dnode/vnode/src/sma/smaUtil.c
source/dnode/vnode/src/sma/smaUtil.c
+1
-1
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+5
-4
source/dnode/vnode/src/tsdb/tsdbCache.c
source/dnode/vnode/src/tsdb/tsdbCache.c
+1
-1
source/dnode/vnode/src/tsdb/tsdbFS.c
source/dnode/vnode/src/tsdb/tsdbFS.c
+2
-2
source/dnode/vnode/src/tsdb/tsdbFile.c
source/dnode/vnode/src/tsdb/tsdbFile.c
+2
-1
source/dnode/vnode/src/vnd/vnodeCommit.c
source/dnode/vnode/src/vnd/vnodeCommit.c
+4
-6
source/dnode/vnode/src/vnd/vnodeOpen.c
source/dnode/vnode/src/vnd/vnodeOpen.c
+51
-19
source/dnode/vnode/src/vnd/vnodeRetention.c
source/dnode/vnode/src/vnd/vnodeRetention.c
+2
-2
source/dnode/vnode/src/vnd/vnodeSnapshot.c
source/dnode/vnode/src/vnd/vnodeSnapshot.c
+5
-5
source/libs/command/src/command.c
source/libs/command/src/command.c
+25
-0
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+33
-8
source/libs/stream/src/streamRecover.c
source/libs/stream/src/streamRecover.c
+1
-0
source/libs/tfs/src/tfs.c
source/libs/tfs/src/tfs.c
+66
-8
source/libs/tfs/test/tfsTest.cpp
source/libs/tfs/test/tfsTest.cpp
+3
-3
source/util/src/terror.c
source/util/src/terror.c
+1
-0
未找到文件。
include/libs/tfs/tfs.h
浏览文件 @
a32c6502
...
...
@@ -69,6 +69,13 @@ void tfsUpdateSize(STfs *pTfs);
*/
SDiskSize
tfsGetSize
(
STfs
*
pTfs
);
/**
* @brief Get the number of disks at level of multi-tier storage.
*
* @param pTfs
* @return int32_t
*/
int32_t
tfsGetDisksAtLevel
(
STfs
*
pTfs
,
int32_t
level
);
/**
* @brief Get level of multi-tier storage.
*
...
...
@@ -123,6 +130,15 @@ int32_t tfsMkdir(STfs *pTfs, const char *rname);
*/
int32_t
tfsMkdirAt
(
STfs
*
pTfs
,
const
char
*
rname
,
SDiskID
diskId
);
/**
* @brief Recursive make directory at all levels in tfs.
*
* @param pTfs The fs object.
* @param rname The rel name of directory.
* @return int32_t 0 for success, -1 for failure.
*/
int32_t
tfsMkdirRecur
(
STfs
*
pTfs
,
const
char
*
rname
);
/**
* @brief Recursive create directories in tfs.
*
...
...
@@ -160,7 +176,17 @@ int32_t tfsRmdir(STfs *pTfs, const char *rname);
* @param nrname The rel name of new file.
* @return int32_t 0 for success, -1 for failure.
*/
int32_t
tfsRename
(
STfs
*
pTfs
,
const
char
*
orname
,
const
char
*
nrname
);
int32_t
tfsRename
(
STfs
*
pTfs
,
int32_t
diskPrimary
,
const
char
*
orname
,
const
char
*
nrname
);
/**
* @brief Search fname in level of tfs
*
* @param pTfs The fs object.
* @param level The level to search on
* @param fname The relative file name to be searched
* @param int32_t diskId for successs, -1 for failure
*/
int32_t
tfsSearch
(
STfs
*
pTfs
,
int32_t
level
,
const
char
*
fname
);
/**
* @brief Init file object in tfs.
...
...
include/util/taoserror.h
浏览文件 @
a32c6502
...
...
@@ -416,7 +416,7 @@ int32_t* taosGetErrno();
// #define TSDB_CODE_VND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0501) // 2.x
// #define TSDB_CODE_VND_ACTION_NEED_REPROCESS. TAOS_DEF_ERROR_CODE(0, 0x0502) // 2.x
#define TSDB_CODE_VND_INVALID_VGROUP_ID TAOS_DEF_ERROR_CODE(0, 0x0503)
// #define TSDB_CODE_VND_INIT_FAILED TAOS_DEF_ERROR_CODE(0, 0x0504) // 2.x
#define TSDB_CODE_VND_INIT_FAILED TAOS_DEF_ERROR_CODE(0, 0x0504)
// #define TSDB_CODE_VND_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x0505) // 2.x
// #define TSDB_CODE_VND_NO_DISK_PERMISSIONS TAOS_DEF_ERROR_CODE(0, 0x0506) // 2.x
// #define TSDB_CODE_VND_NO_SUCH_FILE_OR_DIR TAOS_DEF_ERROR_CODE(0, 0x0507) // 2.x
...
...
source/common/src/tglobal.c
浏览文件 @
a32c6502
...
...
@@ -1495,14 +1495,8 @@ void taosCfgDynamicOptions(const char *option, const char *value) {
if
(
strcasecmp
(
option
,
"keepTimeOffset"
)
==
0
)
{
int32_t
newKeepTimeOffset
=
atoi
(
value
);
if
(
newKeepTimeOffset
<
0
||
newKeepTimeOffset
>
23
)
{
uError
(
"failed to set keepTimeOffset from %d to %d. Valid range: [0, 23]"
,
tsKeepTimeOffset
,
newKeepTimeOffset
);
return
;
}
uInfo
(
"keepTimeOffset set from %d to %d"
,
tsKeepTimeOffset
,
newKeepTimeOffset
);
tsKeepTimeOffset
=
newKeepTimeOffset
;
return
;
}
...
...
source/dnode/mgmt/mgmt_vnode/inc/vmInt.h
浏览文件 @
a32c6502
...
...
@@ -46,6 +46,7 @@ typedef struct {
int32_t
vgId
;
int32_t
vgVersion
;
int8_t
dropped
;
int32_t
diskPrimary
;
int32_t
toVgId
;
char
path
[
PATH_MAX
+
20
];
}
SWrapperCfg
;
...
...
@@ -56,6 +57,7 @@ typedef struct {
int32_t
refCount
;
int8_t
dropped
;
int8_t
disable
;
int32_t
diskPrimary
;
int32_t
toVgId
;
char
*
path
;
SVnode
*
pImpl
;
...
...
@@ -81,6 +83,7 @@ typedef struct {
}
SVnodeThread
;
// vmInt.c
int32_t
vmAllocPrimaryDisk
(
SVnodeMgmt
*
pMgmt
,
int32_t
vgId
);
SVnodeObj
*
vmAcquireVnode
(
SVnodeMgmt
*
pMgmt
,
int32_t
vgId
);
void
vmReleaseVnode
(
SVnodeMgmt
*
pMgmt
,
SVnodeObj
*
pVnode
);
int32_t
vmOpenVnode
(
SVnodeMgmt
*
pMgmt
,
SWrapperCfg
*
pCfg
,
SVnode
*
pImpl
);
...
...
source/dnode/mgmt/mgmt_vnode/src/vmFile.c
浏览文件 @
a32c6502
...
...
@@ -71,6 +71,8 @@ static int32_t vmDecodeVnodeList(SJson *pJson, SVnodeMgmt *pMgmt, SWrapperCfg **
if
(
code
<
0
)
goto
_OVER
;
tjsonGetInt32ValueFromDouble
(
vnode
,
"vgVersion"
,
pCfg
->
vgVersion
,
code
);
if
(
code
<
0
)
goto
_OVER
;
tjsonGetInt32ValueFromDouble
(
vnode
,
"diskPrimary"
,
pCfg
->
diskPrimary
,
code
);
if
(
code
<
0
)
goto
_OVER
;
tjsonGetInt32ValueFromDouble
(
vnode
,
"toVgId"
,
pCfg
->
toVgId
,
code
);
if
(
code
<
0
)
goto
_OVER
;
...
...
@@ -167,6 +169,7 @@ static int32_t vmEncodeVnodeList(SJson *pJson, SVnodeObj **ppVnodes, int32_t num
if
(
tjsonAddDoubleToObject
(
vnode
,
"vgId"
,
pVnode
->
vgId
)
<
0
)
return
-
1
;
if
(
tjsonAddDoubleToObject
(
vnode
,
"dropped"
,
pVnode
->
dropped
)
<
0
)
return
-
1
;
if
(
tjsonAddDoubleToObject
(
vnode
,
"vgVersion"
,
pVnode
->
vgVersion
)
<
0
)
return
-
1
;
if
(
tjsonAddDoubleToObject
(
vnode
,
"diskPrimary"
,
pVnode
->
diskPrimary
)
<
0
)
return
-
1
;
if
(
pVnode
->
toVgId
&&
tjsonAddDoubleToObject
(
vnode
,
"toVgId"
,
pVnode
->
toVgId
)
<
0
)
return
-
1
;
if
(
tjsonAddItemToArray
(
vnodes
,
vnode
)
<
0
)
return
-
1
;
}
...
...
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
浏览文件 @
a32c6502
...
...
@@ -262,16 +262,19 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return
0
;
}
wrapperCfg
.
diskPrimary
=
vmAllocPrimaryDisk
(
pMgmt
,
vnodeCfg
.
vgId
);
int32_t
diskPrimary
=
wrapperCfg
.
diskPrimary
;
snprintf
(
path
,
TSDB_FILENAME_LEN
,
"vnode%svnode%d"
,
TD_DIRSEP
,
vnodeCfg
.
vgId
);
if
(
vnodeCreate
(
path
,
&
vnodeCfg
,
pMgmt
->
pTfs
)
<
0
)
{
if
(
vnodeCreate
(
path
,
&
vnodeCfg
,
diskPrimary
,
pMgmt
->
pTfs
)
<
0
)
{
tFreeSCreateVnodeReq
(
&
req
);
dError
(
"vgId:%d, failed to create vnode since %s"
,
req
.
vgId
,
terrstr
());
code
=
terrno
;
goto
_OVER
;
}
SVnode
*
pImpl
=
vnodeOpen
(
path
,
pMgmt
->
pTfs
,
pMgmt
->
msgCb
);
SVnode
*
pImpl
=
vnodeOpen
(
path
,
diskPrimary
,
pMgmt
->
pTfs
,
pMgmt
->
msgCb
);
if
(
pImpl
==
NULL
)
{
dError
(
"vgId:%d, failed to open vnode since %s"
,
req
.
vgId
,
terrstr
());
code
=
terrno
;
...
...
@@ -400,21 +403,23 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
.
dropped
=
pVnode
->
dropped
,
.
vgId
=
pVnode
->
vgId
,
.
vgVersion
=
pVnode
->
vgVersion
,
.
diskPrimary
=
pVnode
->
diskPrimary
,
};
tstrncpy
(
wrapperCfg
.
path
,
pVnode
->
path
,
sizeof
(
wrapperCfg
.
path
));
vmCloseVnode
(
pMgmt
,
pVnode
,
false
);
int32_t
diskPrimary
=
wrapperCfg
.
diskPrimary
;
char
path
[
TSDB_FILENAME_LEN
]
=
{
0
};
snprintf
(
path
,
TSDB_FILENAME_LEN
,
"vnode%svnode%d"
,
TD_DIRSEP
,
vgId
);
dInfo
(
"vgId:%d, start to alter vnode replica at %s"
,
vgId
,
path
);
if
(
vnodeAlterReplica
(
path
,
&
req
,
pMgmt
->
pTfs
)
<
0
)
{
if
(
vnodeAlterReplica
(
path
,
&
req
,
diskPrimary
,
pMgmt
->
pTfs
)
<
0
)
{
dError
(
"vgId:%d, failed to alter vnode at %s since %s"
,
vgId
,
path
,
terrstr
());
return
-
1
;
}
dInfo
(
"vgId:%d, begin to open vnode"
,
vgId
);
SVnode
*
pImpl
=
vnodeOpen
(
path
,
pMgmt
->
pTfs
,
pMgmt
->
msgCb
);
SVnode
*
pImpl
=
vnodeOpen
(
path
,
diskPrimary
,
pMgmt
->
pTfs
,
pMgmt
->
msgCb
);
if
(
pImpl
==
NULL
)
{
dError
(
"vgId:%d, failed to open vnode at %s since %s"
,
vgId
,
path
,
terrstr
());
return
-
1
;
...
...
@@ -487,6 +492,7 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
.
dropped
=
pVnode
->
dropped
,
.
vgId
=
dstVgId
,
.
vgVersion
=
pVnode
->
vgVersion
,
.
diskPrimary
=
pVnode
->
diskPrimary
,
};
tstrncpy
(
wrapperCfg
.
path
,
pVnode
->
path
,
sizeof
(
wrapperCfg
.
path
));
...
...
@@ -500,19 +506,20 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
dInfo
(
"vgId:%d, close vnode"
,
srcVgId
);
vmCloseVnode
(
pMgmt
,
pVnode
,
true
);
int32_t
diskPrimary
=
wrapperCfg
.
diskPrimary
;
char
srcPath
[
TSDB_FILENAME_LEN
]
=
{
0
};
char
dstPath
[
TSDB_FILENAME_LEN
]
=
{
0
};
snprintf
(
srcPath
,
TSDB_FILENAME_LEN
,
"vnode%svnode%d"
,
TD_DIRSEP
,
srcVgId
);
snprintf
(
dstPath
,
TSDB_FILENAME_LEN
,
"vnode%svnode%d"
,
TD_DIRSEP
,
dstVgId
);
dInfo
(
"vgId:%d, alter vnode hashrange at %s"
,
srcVgId
,
srcPath
);
if
(
vnodeAlterHashRange
(
srcPath
,
dstPath
,
&
req
,
pMgmt
->
pTfs
)
<
0
)
{
if
(
vnodeAlterHashRange
(
srcPath
,
dstPath
,
&
req
,
diskPrimary
,
pMgmt
->
pTfs
)
<
0
)
{
dError
(
"vgId:%d, failed to alter vnode hashrange since %s"
,
srcVgId
,
terrstr
());
return
-
1
;
}
dInfo
(
"vgId:%d, open vnode"
,
dstVgId
);
SVnode
*
pImpl
=
vnodeOpen
(
dstPath
,
pMgmt
->
pTfs
,
pMgmt
->
msgCb
);
SVnode
*
pImpl
=
vnodeOpen
(
dstPath
,
diskPrimary
,
pMgmt
->
pTfs
,
pMgmt
->
msgCb
);
if
(
pImpl
==
NULL
)
{
dError
(
"vgId:%d, failed to open vnode at %s since %s"
,
dstVgId
,
dstPath
,
terrstr
());
return
-
1
;
...
...
@@ -598,21 +605,23 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
.
dropped
=
pVnode
->
dropped
,
.
vgId
=
pVnode
->
vgId
,
.
vgVersion
=
pVnode
->
vgVersion
,
.
diskPrimary
=
pVnode
->
diskPrimary
,
};
tstrncpy
(
wrapperCfg
.
path
,
pVnode
->
path
,
sizeof
(
wrapperCfg
.
path
));
vmCloseVnode
(
pMgmt
,
pVnode
,
false
);
int32_t
diskPrimary
=
wrapperCfg
.
diskPrimary
;
char
path
[
TSDB_FILENAME_LEN
]
=
{
0
};
snprintf
(
path
,
TSDB_FILENAME_LEN
,
"vnode%svnode%d"
,
TD_DIRSEP
,
vgId
);
dInfo
(
"vgId:%d, start to alter vnode replica at %s"
,
vgId
,
path
);
if
(
vnodeAlterReplica
(
path
,
&
alterReq
,
pMgmt
->
pTfs
)
<
0
)
{
if
(
vnodeAlterReplica
(
path
,
&
alterReq
,
diskPrimary
,
pMgmt
->
pTfs
)
<
0
)
{
dError
(
"vgId:%d, failed to alter vnode at %s since %s"
,
vgId
,
path
,
terrstr
());
return
-
1
;
}
dInfo
(
"vgId:%d, begin to open vnode"
,
vgId
);
SVnode
*
pImpl
=
vnodeOpen
(
path
,
pMgmt
->
pTfs
,
pMgmt
->
msgCb
);
SVnode
*
pImpl
=
vnodeOpen
(
path
,
diskPrimary
,
pMgmt
->
pTfs
,
pMgmt
->
msgCb
);
if
(
pImpl
==
NULL
)
{
dError
(
"vgId:%d, failed to open vnode at %s since %s"
,
vgId
,
path
,
terrstr
());
return
-
1
;
...
...
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
浏览文件 @
a32c6502
...
...
@@ -15,8 +15,64 @@
#define _DEFAULT_SOURCE
#include "vmInt.h"
#include "tfs.h"
#include "vnd.h"
int32_t
vmAllocPrimaryDisk
(
SVnodeMgmt
*
pMgmt
,
int32_t
vgId
)
{
STfs
*
pTfs
=
pMgmt
->
pTfs
;
int32_t
diskId
=
0
;
if
(
!
pTfs
)
{
return
diskId
;
}
// search fs
char
vnodePath
[
TSDB_FILENAME_LEN
]
=
{
0
};
snprintf
(
vnodePath
,
TSDB_FILENAME_LEN
-
1
,
"vnode%svnode%d"
,
TD_DIRSEP
,
vgId
);
char
fname
[
TSDB_FILENAME_LEN
]
=
{
0
};
char
fnameTmp
[
TSDB_FILENAME_LEN
]
=
{
0
};
snprintf
(
fname
,
TSDB_FILENAME_LEN
-
1
,
"%s%s%s"
,
vnodePath
,
TD_DIRSEP
,
VND_INFO_FNAME
);
snprintf
(
fnameTmp
,
TSDB_FILENAME_LEN
-
1
,
"%s%s%s"
,
vnodePath
,
TD_DIRSEP
,
VND_INFO_FNAME_TMP
);
diskId
=
tfsSearch
(
pTfs
,
0
,
fname
);
if
(
diskId
>=
0
)
{
return
diskId
;
}
diskId
=
tfsSearch
(
pTfs
,
0
,
fnameTmp
);
if
(
diskId
>=
0
)
{
return
diskId
;
}
// alloc
int32_t
disks
[
TFS_MAX_DISKS_PER_TIER
]
=
{
0
};
int32_t
numOfVnodes
=
0
;
SVnodeObj
**
ppVnodes
=
vmGetVnodeListFromHash
(
pMgmt
,
&
numOfVnodes
);
for
(
int32_t
v
=
0
;
v
<
numOfVnodes
;
v
++
)
{
SVnodeObj
*
pVnode
=
ppVnodes
[
v
];
disks
[
pVnode
->
diskPrimary
]
+=
1
;
}
int32_t
minVal
=
INT_MAX
;
int32_t
ndisk
=
tfsGetDisksAtLevel
(
pTfs
,
0
);
diskId
=
0
;
for
(
int32_t
id
=
0
;
id
<
ndisk
;
id
++
)
{
if
(
minVal
>
disks
[
id
])
{
minVal
=
disks
[
id
];
diskId
=
id
;
}
}
for
(
int32_t
i
=
0
;
i
<
numOfVnodes
;
++
i
)
{
if
(
ppVnodes
==
NULL
||
ppVnodes
[
i
]
==
NULL
)
continue
;
vmReleaseVnode
(
pMgmt
,
ppVnodes
[
i
]);
}
if
(
ppVnodes
!=
NULL
)
{
taosMemoryFree
(
ppVnodes
);
}
dInfo
(
"vgId:%d, alloc disk:%d of level 0. ndisk:%d, vnodes: %d"
,
vgId
,
diskId
,
ndisk
,
numOfVnodes
);
return
diskId
;
}
SVnodeObj
*
vmAcquireVnode
(
SVnodeMgmt
*
pMgmt
,
int32_t
vgId
)
{
SVnodeObj
*
pVnode
=
NULL
;
...
...
@@ -52,6 +108,7 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
pVnode
->
vgId
=
pCfg
->
vgId
;
pVnode
->
vgVersion
=
pCfg
->
vgVersion
;
pVnode
->
diskPrimary
=
pCfg
->
diskPrimary
;
pVnode
->
refCount
=
0
;
pVnode
->
dropped
=
0
;
pVnode
->
path
=
taosStrdup
(
pCfg
->
path
);
...
...
@@ -169,7 +226,8 @@ static int32_t vmRestoreVgroupId(SWrapperCfg *pCfg, STfs *pTfs) {
snprintf
(
srcPath
,
TSDB_FILENAME_LEN
,
"vnode%svnode%d"
,
TD_DIRSEP
,
srcVgId
);
snprintf
(
dstPath
,
TSDB_FILENAME_LEN
,
"vnode%svnode%d"
,
TD_DIRSEP
,
dstVgId
);
int32_t
vgId
=
vnodeRestoreVgroupId
(
srcPath
,
dstPath
,
srcVgId
,
dstVgId
,
pTfs
);
int32_t
diskPrimary
=
pCfg
->
diskPrimary
;
int32_t
vgId
=
vnodeRestoreVgroupId
(
srcPath
,
dstPath
,
srcVgId
,
dstVgId
,
diskPrimary
,
pTfs
);
if
(
vgId
<=
0
)
{
dError
(
"vgId:%d, failed to restore vgroup id. srcPath: %s"
,
pCfg
->
vgId
,
srcPath
);
return
-
1
;
...
...
@@ -205,11 +263,12 @@ static void *vmOpenVnodeInThread(void *param) {
pThread
->
updateVnodesList
=
true
;
}
int32_t
diskPrimary
=
pCfg
->
diskPrimary
;
snprintf
(
path
,
TSDB_FILENAME_LEN
,
"vnode%svnode%d"
,
TD_DIRSEP
,
pCfg
->
vgId
);
SVnode
*
pImpl
=
vnodeOpen
(
path
,
pMgmt
->
pTfs
,
pMgmt
->
msgCb
);
SVnode
*
pImpl
=
vnodeOpen
(
path
,
diskPrimary
,
pMgmt
->
pTfs
,
pMgmt
->
msgCb
);
if
(
pImpl
==
NULL
)
{
dError
(
"vgId:%d, failed to open vnode by thread:%d
"
,
pCfg
->
vgId
,
pThread
->
threadIndex
);
dError
(
"vgId:%d, failed to open vnode by thread:%d
since %s"
,
pCfg
->
vgId
,
pThread
->
threadIndex
,
terrstr
()
);
pThread
->
failed
++
;
continue
;
}
...
...
@@ -296,6 +355,7 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
if
(
pMgmt
->
state
.
openVnodes
!=
pMgmt
->
state
.
totalVnodes
)
{
dError
(
"there are total vnodes:%d, opened:%d"
,
pMgmt
->
state
.
totalVnodes
,
pMgmt
->
state
.
openVnodes
);
terrno
=
TSDB_CODE_VND_INIT_FAILED
;
return
-
1
;
}
...
...
@@ -518,7 +578,7 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
tmsgReportStartup
(
"vnode-worker"
,
"initialized"
);
if
(
vmOpenVnodes
(
pMgmt
)
!=
0
)
{
dError
(
"failed to open
vnode
since %s"
,
terrstr
());
dError
(
"failed to open
all vnodes
since %s"
,
terrstr
());
goto
_OVER
;
}
tmsgReportStartup
(
"vnode-vnodes"
,
"initialized"
);
...
...
source/dnode/mnode/impl/src/mndDnode.c
浏览文件 @
a32c6502
...
...
@@ -70,6 +70,8 @@ static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter);
static
int32_t
mndRetrieveDnodes
(
SRpcMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
);
static
void
mndCancelGetNextDnode
(
SMnode
*
pMnode
,
void
*
pIter
);
static
int32_t
mndMCfgGetValInt32
(
SMCfgDnodeReq
*
pInMCfgReq
,
int32_t
opLen
,
int32_t
*
pOutValue
);
int32_t
mndInitDnode
(
SMnode
*
pMnode
)
{
SSdbTable
table
=
{
.
sdbType
=
SDB_DNODE
,
...
...
@@ -947,7 +949,7 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
goto
_OVER
;
}
mInfo
(
"dnode:%d, start to drop, ep:%s:%d, force:%s, unsafe:%s"
,
mInfo
(
"dnode:%d, start to drop, ep:%s:%d, force:%s, unsafe:%s"
,
dropReq
.
dnodeId
,
dropReq
.
fqdn
,
dropReq
.
port
,
dropReq
.
force
?
"true"
:
"false"
,
dropReq
.
unsafe
?
"true"
:
"false"
);
if
(
mndCheckOperPrivilege
(
pMnode
,
pReq
->
info
.
conn
.
user
,
MND_OPER_DROP_MNODE
)
!=
0
)
{
goto
_OVER
;
...
...
@@ -987,7 +989,7 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
int32_t
numOfVnodes
=
mndGetVnodesNum
(
pMnode
,
pDnode
->
id
);
bool
isonline
=
mndIsDnodeOnline
(
pDnode
,
taosGetTimestampMs
());
if
(
isonline
&&
force
)
{
terrno
=
TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE
;
mError
(
"dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d"
,
pDnode
->
id
,
terrstr
(),
...
...
@@ -1060,6 +1062,20 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
strcpy
(
dcfgReq
.
config
,
"monitor"
);
snprintf
(
dcfgReq
.
value
,
TSDB_DNODE_VALUE_LEN
,
"%d"
,
flag
);
}
else
if
(
strncasecmp
(
cfgReq
.
config
,
"keeptimeoffset"
,
14
)
==
0
)
{
int32_t
optLen
=
strlen
(
"keeptimeoffset"
);
int32_t
flag
=
-
1
;
int32_t
code
=
mndMCfgGetValInt32
(
&
cfgReq
,
optLen
,
&
flag
);
if
(
code
<
0
)
return
code
;
if
(
flag
<
0
||
flag
>
23
)
{
mError
(
"dnode:%d, failed to config keepTimeOffset since value:%d. Valid range: [0, 23]"
,
cfgReq
.
dnodeId
,
flag
);
terrno
=
TSDB_CODE_INVALID_CFG
;
return
-
1
;
}
strcpy
(
dcfgReq
.
config
,
"keeptimeoffset"
);
snprintf
(
dcfgReq
.
value
,
TSDB_DNODE_VALUE_LEN
,
"%d"
,
flag
);
#ifdef TD_ENTERPRISE
}
else
if
(
strncasecmp
(
cfgReq
.
config
,
"activeCode"
,
10
)
==
0
||
strncasecmp
(
cfgReq
.
config
,
"cActiveCode"
,
11
)
==
0
)
{
int8_t
opt
=
strncasecmp
(
cfgReq
.
config
,
"a"
,
1
)
==
0
?
DND_ACTIVE_CODE
:
DND_CONN_ACTIVE_CODE
;
...
...
@@ -1292,3 +1308,28 @@ static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter) {
SSdb
*
pSdb
=
pMnode
->
pSdb
;
sdbCancelFetch
(
pSdb
,
pIter
);
}
// get int32_t value from 'SMCfgDnodeReq'
static
int32_t
mndMCfgGetValInt32
(
SMCfgDnodeReq
*
pMCfgReq
,
int32_t
opLen
,
int32_t
*
pOutValue
)
{
terrno
=
0
;
if
(
' '
!=
pMCfgReq
->
config
[
opLen
]
&&
0
!=
pMCfgReq
->
config
[
opLen
])
{
goto
_err
;
}
if
(
' '
==
pMCfgReq
->
config
[
opLen
])
{
// 'key value'
if
(
strlen
(
pMCfgReq
->
value
)
!=
0
)
goto
_err
;
*
pOutValue
=
atoi
(
pMCfgReq
->
config
+
opLen
+
1
);
}
else
{
// 'key' 'value'
if
(
strlen
(
pMCfgReq
->
value
)
==
0
)
goto
_err
;
*
pOutValue
=
atoi
(
pMCfgReq
->
value
);
}
return
0
;
_err:
mError
(
"dnode:%d, failed to config keeptimeoffset since invalid conf:%s"
,
pMCfgReq
->
dnodeId
,
pMCfgReq
->
config
);
terrno
=
TSDB_CODE_INVALID_CFG
;
return
-
1
;
}
source/dnode/mnode/impl/src/mndStb.c
浏览文件 @
a32c6502
...
...
@@ -1740,6 +1740,7 @@ static int32_t mndBuildStbSchemaImp(SDbObj *pDb, SStbObj *pStb, const char *tbNa
SSchema
*
pSrcSchema
=
&
pStb
->
pColumns
[
i
];
memcpy
(
pSchema
->
name
,
pSrcSchema
->
name
,
TSDB_COL_NAME_LEN
);
pSchema
->
type
=
pSrcSchema
->
type
;
pSchema
->
flags
=
pSrcSchema
->
flags
;
pSchema
->
colId
=
pSrcSchema
->
colId
;
pSchema
->
bytes
=
pSrcSchema
->
bytes
;
}
...
...
@@ -1790,6 +1791,7 @@ static int32_t mndBuildStbCfgImp(SDbObj *pDb, SStbObj *pStb, const char *tbName,
SSchema
*
pSrcSchema
=
&
pStb
->
pColumns
[
i
];
memcpy
(
pSchema
->
name
,
pSrcSchema
->
name
,
TSDB_COL_NAME_LEN
);
pSchema
->
type
=
pSrcSchema
->
type
;
pSchema
->
flags
=
pSrcSchema
->
flags
;
pSchema
->
colId
=
pSrcSchema
->
colId
;
pSchema
->
bytes
=
pSrcSchema
->
bytes
;
}
...
...
@@ -1799,6 +1801,7 @@ static int32_t mndBuildStbCfgImp(SDbObj *pDb, SStbObj *pStb, const char *tbName,
SSchema
*
pSrcSchema
=
&
pStb
->
pTags
[
i
];
memcpy
(
pSchema
->
name
,
pSrcSchema
->
name
,
TSDB_COL_NAME_LEN
);
pSchema
->
type
=
pSrcSchema
->
type
;
pSchema
->
flags
=
pSrcSchema
->
flags
;
pSchema
->
colId
=
pSrcSchema
->
colId
;
pSchema
->
bytes
=
pSrcSchema
->
bytes
;
}
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
a32c6502
...
...
@@ -51,12 +51,14 @@ extern const SVnodeCfg vnodeCfgDefault;
int32_t
vnodeInit
(
int32_t
nthreads
);
void
vnodeCleanup
();
int32_t
vnodeCreate
(
const
char
*
path
,
SVnodeCfg
*
pCfg
,
STfs
*
pTfs
);
int32_t
vnodeAlterReplica
(
const
char
*
path
,
SAlterVnodeReplicaReq
*
pReq
,
STfs
*
pTfs
);
int32_t
vnodeAlterHashRange
(
const
char
*
srcPath
,
const
char
*
dstPath
,
SAlterVnodeHashRangeReq
*
pReq
,
STfs
*
pTfs
);
int32_t
vnodeRestoreVgroupId
(
const
char
*
srcPath
,
const
char
*
dstPath
,
int32_t
srcVgId
,
int32_t
dstVgId
,
STfs
*
pTfs
);
int32_t
vnodeCreate
(
const
char
*
path
,
SVnodeCfg
*
pCfg
,
int32_t
diskPrimary
,
STfs
*
pTfs
);
int32_t
vnodeAlterReplica
(
const
char
*
path
,
SAlterVnodeReplicaReq
*
pReq
,
int32_t
diskPrimary
,
STfs
*
pTfs
);
int32_t
vnodeAlterHashRange
(
const
char
*
srcPath
,
const
char
*
dstPath
,
SAlterVnodeHashRangeReq
*
pReq
,
int32_t
diskPrimary
,
STfs
*
pTfs
);
int32_t
vnodeRestoreVgroupId
(
const
char
*
srcPath
,
const
char
*
dstPath
,
int32_t
srcVgId
,
int32_t
dstVgId
,
int32_t
diskPrimary
,
STfs
*
pTfs
);
void
vnodeDestroy
(
const
char
*
path
,
STfs
*
pTfs
);
SVnode
*
vnodeOpen
(
const
char
*
path
,
STfs
*
pTfs
,
SMsgCb
msgCb
);
SVnode
*
vnodeOpen
(
const
char
*
path
,
int32_t
diskPrimary
,
STfs
*
pTfs
,
SMsgCb
msgCb
);
void
vnodePreClose
(
SVnode
*
pVnode
);
void
vnodePostClose
(
SVnode
*
pVnode
);
void
vnodeSyncCheckTimeout
(
SVnode
*
pVnode
);
...
...
source/dnode/vnode/src/inc/vnd.h
浏览文件 @
a32c6502
...
...
@@ -87,7 +87,7 @@ void vnodeBufPoolAddToFreeList(SVBufPool* pPool);
int32_t
vnodeBufPoolRecycle
(
SVBufPool
*
pPool
);
// vnodeOpen.c
int32_t
vnodeGetPrimaryDir
(
const
char
*
relPath
,
STfs
*
pTfs
,
char
*
buf
,
size_t
bufLen
);
int32_t
vnodeGetPrimaryDir
(
const
char
*
relPath
,
int32_t
diskPrimary
,
STfs
*
pTfs
,
char
*
buf
,
size_t
bufLen
);
// vnodeQuery.c
int32_t
vnodeQueryOpen
(
SVnode
*
pVnode
);
...
...
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
a32c6502
...
...
@@ -94,6 +94,7 @@ typedef struct SQueryNode SQueryNode;
#define VNODE_BUFPOOL_SEGMENTS 3
#define VND_INFO_FNAME "vnode.json"
#define VND_INFO_FNAME_TMP "vnode_tmp.json"
// vnd.h
typedef
int32_t
(
*
_query_reseek_func_t
)(
void
*
pQHandle
);
...
...
@@ -404,6 +405,7 @@ struct SVnode {
SVState
state
;
SVStatis
statis
;
STfs
*
pTfs
;
int32_t
diskPrimary
;
SMsgCb
msgCb
;
// Buffer Pool
...
...
source/dnode/vnode/src/meta/metaOpen.c
浏览文件 @
a32c6502
...
...
@@ -41,7 +41,7 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
*
ppMeta
=
NULL
;
// create handle
vnodeGetPrimaryDir
(
pVnode
->
path
,
pVnode
->
pTfs
,
path
,
TSDB_FILENAME_LEN
);
vnodeGetPrimaryDir
(
pVnode
->
path
,
pVnode
->
diskPrimary
,
pVnode
->
pTfs
,
path
,
TSDB_FILENAME_LEN
);
offset
=
strlen
(
path
);
snprintf
(
path
+
offset
,
TSDB_FILENAME_LEN
-
offset
-
1
,
"%s%s"
,
TD_DIRSEP
,
VNODE_META_DIR
);
...
...
source/dnode/vnode/src/sma/smaUtil.c
浏览文件 @
a32c6502
...
...
@@ -26,7 +26,7 @@ void tdRSmaGetDirName(SVnode *pVnode, STfs *pTfs, bool endWithSep, char *outputN
int32_t
offset
=
0
;
// vnode
vnodeGetPrimaryDir
(
pVnode
->
path
,
pTfs
,
outputName
,
TSDB_FILENAME_LEN
);
vnodeGetPrimaryDir
(
pVnode
->
path
,
p
Vnode
->
diskPrimary
,
p
Tfs
,
outputName
,
TSDB_FILENAME_LEN
);
offset
=
strlen
(
outputName
);
// rsma
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
a32c6502
...
...
@@ -1158,15 +1158,16 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
// wait for the stream task get ready for scan history data
while
(((
pStreamTask
->
status
.
downstreamReady
==
0
)
&&
(
pStreamTask
->
status
.
taskStatus
!=
TASK_STATUS__STOP
))
||
pStreamTask
->
status
.
taskStatus
==
TASK_STATUS__SCAN_HISTORY
)
{
tqDebug
(
"s-task:%s level:%d related stream task:%s not ready for halt, wait for it and recheck in 100ms"
,
pId
,
pTask
->
info
.
taskLevel
,
pId
);
tqDebug
(
"s-task:%s level:%d related stream task:%s(status:%s) not ready for halt, wait for it and recheck in 100ms"
,
pId
,
pTask
->
info
.
taskLevel
,
pStreamTask
->
id
.
idStr
,
streamGetTaskStatusStr
(
pStreamTask
->
status
.
taskStatus
));
taosMsleep
(
100
);
}
// now we can stop the stream task execution
pStreamTask
->
status
.
taskStatus
=
TASK_STATUS__HALT
;
tqDebug
(
"s-task:%s level:%d status is set to halt by history scan task:%s"
,
p
Id
,
pStreamTask
->
info
.
taskLevel
,
pId
);
tqDebug
(
"s-task:%s level:%d status is set to halt by history scan task:%s"
,
p
StreamTask
->
id
.
idStr
,
p
StreamTask
->
info
.
taskLevel
,
p
Id
);
// if it's an source task, extract the last version in wal.
streamHistoryTaskSetVerRangeStep2
(
pTask
);
...
...
source/dnode/vnode/src/tsdb/tsdbCache.c
浏览文件 @
a32c6502
...
...
@@ -59,7 +59,7 @@ typedef struct {
static
void
tsdbGetRocksPath
(
STsdb
*
pTsdb
,
char
*
path
)
{
SVnode
*
pVnode
=
pTsdb
->
pVnode
;
vnodeGetPrimaryDir
(
pTsdb
->
path
,
pVnode
->
pTfs
,
path
,
TSDB_FILENAME_LEN
);
vnodeGetPrimaryDir
(
pTsdb
->
path
,
pVnode
->
diskPrimary
,
pVnode
->
pTfs
,
path
,
TSDB_FILENAME_LEN
);
int32_t
offset
=
strlen
(
path
);
snprintf
(
path
+
offset
,
TSDB_FILENAME_LEN
-
offset
-
1
,
"%scache.rdb"
,
TD_DIRSEP
);
...
...
source/dnode/vnode/src/tsdb/tsdbFS.c
浏览文件 @
a32c6502
...
...
@@ -276,14 +276,14 @@ static void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t) {
// CURRENT
if
(
current
)
{
vnodeGetPrimaryDir
(
pTsdb
->
path
,
pVnode
->
pTfs
,
current
,
TSDB_FILENAME_LEN
);
vnodeGetPrimaryDir
(
pTsdb
->
path
,
pVnode
->
diskPrimary
,
pVnode
->
pTfs
,
current
,
TSDB_FILENAME_LEN
);
offset
=
strlen
(
current
);
snprintf
(
current
+
offset
,
TSDB_FILENAME_LEN
-
offset
-
1
,
"%sCURRENT"
,
TD_DIRSEP
);
}
// CURRENT.t
if
(
current_t
)
{
vnodeGetPrimaryDir
(
pTsdb
->
path
,
pVnode
->
pTfs
,
current_t
,
TSDB_FILENAME_LEN
);
vnodeGetPrimaryDir
(
pTsdb
->
path
,
pVnode
->
diskPrimary
,
pVnode
->
pTfs
,
current_t
,
TSDB_FILENAME_LEN
);
offset
=
strlen
(
current_t
);
snprintf
(
current_t
+
offset
,
TSDB_FILENAME_LEN
-
offset
-
1
,
"%sCURRENT.t"
,
TD_DIRSEP
);
}
...
...
source/dnode/vnode/src/tsdb/tsdbFile.c
浏览文件 @
a32c6502
...
...
@@ -284,8 +284,9 @@ int32_t tGetDFileSet(uint8_t *p, SDFileSet *pSet) {
// SDelFile ===============================================
void
tsdbDelFileName
(
STsdb
*
pTsdb
,
SDelFile
*
pFile
,
char
fname
[])
{
int32_t
offset
=
0
;
SVnode
*
pVnode
=
pTsdb
->
pVnode
;
vnodeGetPrimaryDir
(
pTsdb
->
path
,
p
Tsdb
->
pVnode
->
pTfs
,
fname
,
TSDB_FILENAME_LEN
);
vnodeGetPrimaryDir
(
pTsdb
->
path
,
p
Vnode
->
diskPrimary
,
pVnode
->
pTfs
,
fname
,
TSDB_FILENAME_LEN
);
offset
=
strlen
(
fname
);
snprintf
((
char
*
)
fname
+
offset
,
TSDB_FILENAME_LEN
-
offset
-
1
,
"%sv%dver%"
PRId64
".del"
,
TD_DIRSEP
,
TD_VID
(
pTsdb
->
pVnode
),
pFile
->
commitID
);
...
...
source/dnode/vnode/src/vnd/vnodeCommit.c
浏览文件 @
a32c6502
...
...
@@ -16,8 +16,6 @@
#include "vnd.h"
#include "vnodeInt.h"
#define VND_INFO_FNAME_TMP "vnode_tmp.json"
static
int
vnodeEncodeInfo
(
const
SVnodeInfo
*
pInfo
,
char
**
ppData
);
static
int
vnodeCommitImpl
(
SCommitInfo
*
pInfo
);
...
...
@@ -290,7 +288,7 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
pInfo
->
txn
=
metaGetTxn
(
pVnode
->
pMeta
);
// save info
vnodeGetPrimaryDir
(
pVnode
->
path
,
pVnode
->
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
vnodeGetPrimaryDir
(
pVnode
->
path
,
pVnode
->
diskPrimary
,
pVnode
->
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
vDebug
(
"vgId:%d, save config while prepare commit"
,
TD_VID
(
pVnode
));
if
(
vnodeSaveInfo
(
dir
,
&
pInfo
->
info
)
<
0
)
{
...
...
@@ -428,7 +426,7 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) {
return
-
1
;
}
vnodeGetPrimaryDir
(
pVnode
->
path
,
pVnode
->
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
vnodeGetPrimaryDir
(
pVnode
->
path
,
pVnode
->
diskPrimary
,
pVnode
->
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
syncBeginSnapshot
(
pVnode
->
sync
,
pInfo
->
info
.
state
.
committed
);
...
...
@@ -492,7 +490,7 @@ bool vnodeShouldRollback(SVnode *pVnode) {
char
tFName
[
TSDB_FILENAME_LEN
]
=
{
0
};
int32_t
offset
=
0
;
vnodeGetPrimaryDir
(
pVnode
->
path
,
pVnode
->
pTfs
,
tFName
,
TSDB_FILENAME_LEN
);
vnodeGetPrimaryDir
(
pVnode
->
path
,
pVnode
->
diskPrimary
,
pVnode
->
pTfs
,
tFName
,
TSDB_FILENAME_LEN
);
offset
=
strlen
(
tFName
);
snprintf
(
tFName
+
offset
,
TSDB_FILENAME_LEN
-
offset
-
1
,
"%s%s"
,
TD_DIRSEP
,
VND_INFO_FNAME_TMP
);
...
...
@@ -503,7 +501,7 @@ void vnodeRollback(SVnode *pVnode) {
char
tFName
[
TSDB_FILENAME_LEN
]
=
{
0
};
int32_t
offset
=
0
;
vnodeGetPrimaryDir
(
pVnode
->
path
,
pVnode
->
pTfs
,
tFName
,
TSDB_FILENAME_LEN
);
vnodeGetPrimaryDir
(
pVnode
->
path
,
pVnode
->
diskPrimary
,
pVnode
->
pTfs
,
tFName
,
TSDB_FILENAME_LEN
);
offset
=
strlen
(
tFName
);
snprintf
(
tFName
+
offset
,
TSDB_FILENAME_LEN
-
offset
-
1
,
"%s%s"
,
TD_DIRSEP
,
VND_INFO_FNAME_TMP
);
...
...
source/dnode/vnode/src/vnd/vnodeOpen.c
浏览文件 @
a32c6502
...
...
@@ -15,9 +15,11 @@
#include "vnd.h"
int32_t
vnodeGetPrimaryDir
(
const
char
*
relPath
,
STfs
*
pTfs
,
char
*
buf
,
size_t
bufLen
)
{
int32_t
vnodeGetPrimaryDir
(
const
char
*
relPath
,
int32_t
diskPrimary
,
STfs
*
pTfs
,
char
*
buf
,
size_t
bufLen
)
{
if
(
pTfs
)
{
snprintf
(
buf
,
bufLen
-
1
,
"%s%s%s"
,
tfsGetPrimaryPath
(
pTfs
),
TD_DIRSEP
,
relPath
);
SDiskID
diskId
=
{
0
};
diskId
.
id
=
diskPrimary
;
snprintf
(
buf
,
bufLen
-
1
,
"%s%s%s"
,
tfsGetDiskPath
(
pTfs
,
diskId
),
TD_DIRSEP
,
relPath
);
}
else
{
snprintf
(
buf
,
bufLen
-
1
,
"%s"
,
relPath
);
}
...
...
@@ -25,7 +27,15 @@ int32_t vnodeGetPrimaryDir(const char *relPath, STfs *pTfs, char *buf, size_t bu
return
0
;
}
int32_t
vnodeCreate
(
const
char
*
path
,
SVnodeCfg
*
pCfg
,
STfs
*
pTfs
)
{
static
int32_t
vnodeMkDir
(
STfs
*
pTfs
,
const
char
*
path
)
{
if
(
pTfs
)
{
return
tfsMkdirRecur
(
pTfs
,
path
);
}
else
{
return
taosMkDir
(
path
);
}
}
int32_t
vnodeCreate
(
const
char
*
path
,
SVnodeCfg
*
pCfg
,
int32_t
diskPrimary
,
STfs
*
pTfs
)
{
SVnodeInfo
info
=
{
0
};
char
dir
[
TSDB_FILENAME_LEN
]
=
{
0
};
...
...
@@ -36,10 +46,11 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
}
// create vnode env
vnodeGetPrimaryDir
(
path
,
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
if
(
taosMkDir
(
dir
))
{
if
(
vnodeMkDir
(
pTfs
,
path
))
{
vError
(
"vgId:%d, failed to prepare vnode dir since %s, path: %s"
,
pCfg
->
vgId
,
strerror
(
errno
),
path
);
return
TAOS_SYSTEM_ERROR
(
errno
);
}
vnodeGetPrimaryDir
(
path
,
diskPrimary
,
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
if
(
pCfg
)
{
info
.
config
=
*
pCfg
;
...
...
@@ -60,12 +71,12 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
return
0
;
}
int32_t
vnodeAlterReplica
(
const
char
*
path
,
SAlterVnodeReplicaReq
*
pReq
,
STfs
*
pTfs
)
{
int32_t
vnodeAlterReplica
(
const
char
*
path
,
SAlterVnodeReplicaReq
*
pReq
,
int32_t
diskPrimary
,
STfs
*
pTfs
)
{
SVnodeInfo
info
=
{
0
};
char
dir
[
TSDB_FILENAME_LEN
]
=
{
0
};
int32_t
ret
=
0
;
vnodeGetPrimaryDir
(
path
,
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
vnodeGetPrimaryDir
(
path
,
diskPrimary
,
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
ret
=
vnodeLoadInfo
(
dir
,
&
info
);
if
(
ret
<
0
)
{
...
...
@@ -133,7 +144,8 @@ static int32_t vnodeVgroupIdLen(int32_t vgId) {
return
strlen
(
tmp
);
}
int32_t
vnodeRenameVgroupId
(
const
char
*
srcPath
,
const
char
*
dstPath
,
int32_t
srcVgId
,
int32_t
dstVgId
,
STfs
*
pTfs
)
{
int32_t
vnodeRenameVgroupId
(
const
char
*
srcPath
,
const
char
*
dstPath
,
int32_t
srcVgId
,
int32_t
dstVgId
,
int32_t
diskPrimary
,
STfs
*
pTfs
)
{
int32_t
ret
=
0
;
char
oldRname
[
TSDB_FILENAME_LEN
]
=
{
0
};
...
...
@@ -164,7 +176,7 @@ int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t sr
snprintf
(
newRname
,
TSDB_FILENAME_LEN
,
"%s%d%s"
,
oldRname
,
dstVgId
,
tsdbFileSurfixPos
);
vInfo
(
"vgId:%d, rename file from %s to %s"
,
dstVgId
,
tsdbFile
->
rname
,
newRname
);
ret
=
tfsRename
(
pTfs
,
tsdbFile
->
rname
,
newRname
);
ret
=
tfsRename
(
pTfs
,
diskPrimary
,
tsdbFile
->
rname
,
newRname
);
if
(
ret
!=
0
)
{
vError
(
"vgId:%d, failed to rename file from %s to %s since %s"
,
dstVgId
,
tsdbFile
->
rname
,
newRname
,
terrstr
());
tfsClosedir
(
tsdbDir
);
...
...
@@ -176,19 +188,20 @@ int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t sr
tfsClosedir
(
tsdbDir
);
vInfo
(
"vgId:%d, rename dir from %s to %s"
,
dstVgId
,
srcPath
,
dstPath
);
ret
=
tfsRename
(
pTfs
,
srcPath
,
dstPath
);
ret
=
tfsRename
(
pTfs
,
diskPrimary
,
srcPath
,
dstPath
);
if
(
ret
!=
0
)
{
vError
(
"vgId:%d, failed to rename dir from %s to %s since %s"
,
dstVgId
,
srcPath
,
dstPath
,
terrstr
());
}
return
ret
;
}
int32_t
vnodeAlterHashRange
(
const
char
*
srcPath
,
const
char
*
dstPath
,
SAlterVnodeHashRangeReq
*
pReq
,
STfs
*
pTfs
)
{
int32_t
vnodeAlterHashRange
(
const
char
*
srcPath
,
const
char
*
dstPath
,
SAlterVnodeHashRangeReq
*
pReq
,
int32_t
diskPrimary
,
STfs
*
pTfs
)
{
SVnodeInfo
info
=
{
0
};
char
dir
[
TSDB_FILENAME_LEN
]
=
{
0
};
int32_t
ret
=
0
;
vnodeGetPrimaryDir
(
srcPath
,
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
vnodeGetPrimaryDir
(
srcPath
,
diskPrimary
,
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
ret
=
vnodeLoadInfo
(
dir
,
&
info
);
if
(
ret
<
0
)
{
...
...
@@ -232,7 +245,7 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod
}
vInfo
(
"vgId:%d, rename %s to %s"
,
pReq
->
dstVgId
,
srcPath
,
dstPath
);
ret
=
vnodeRenameVgroupId
(
srcPath
,
dstPath
,
pReq
->
srcVgId
,
pReq
->
dstVgId
,
pTfs
);
ret
=
vnodeRenameVgroupId
(
srcPath
,
dstPath
,
pReq
->
srcVgId
,
pReq
->
dstVgId
,
diskPrimary
,
pTfs
);
if
(
ret
<
0
)
{
vError
(
"vgId:%d, failed to rename vnode from %s to %s since %s"
,
pReq
->
dstVgId
,
srcPath
,
dstPath
,
tstrerror
(
terrno
));
...
...
@@ -243,11 +256,12 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod
return
0
;
}
int32_t
vnodeRestoreVgroupId
(
const
char
*
srcPath
,
const
char
*
dstPath
,
int32_t
srcVgId
,
int32_t
dstVgId
,
STfs
*
pTfs
)
{
int32_t
vnodeRestoreVgroupId
(
const
char
*
srcPath
,
const
char
*
dstPath
,
int32_t
srcVgId
,
int32_t
dstVgId
,
int32_t
diskPrimary
,
STfs
*
pTfs
)
{
SVnodeInfo
info
=
{
0
};
char
dir
[
TSDB_FILENAME_LEN
]
=
{
0
};
vnodeGetPrimaryDir
(
dstPath
,
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
vnodeGetPrimaryDir
(
dstPath
,
diskPrimary
,
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
if
(
vnodeLoadInfo
(
dir
,
&
info
)
==
0
)
{
if
(
info
.
config
.
vgId
!=
dstVgId
)
{
vError
(
"vgId:%d, unexpected vnode config.vgId:%d"
,
dstVgId
,
info
.
config
.
vgId
);
...
...
@@ -256,7 +270,7 @@ int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t s
return
dstVgId
;
}
vnodeGetPrimaryDir
(
srcPath
,
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
vnodeGetPrimaryDir
(
srcPath
,
diskPrimary
,
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
if
(
vnodeLoadInfo
(
dir
,
&
info
)
<
0
)
{
vError
(
"vgId:%d, failed to read vnode config from %s since %s"
,
srcVgId
,
srcPath
,
tstrerror
(
terrno
));
return
-
1
;
...
...
@@ -271,7 +285,7 @@ int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t s
}
vInfo
(
"vgId:%d, rename %s to %s"
,
dstVgId
,
srcPath
,
dstPath
);
if
(
vnodeRenameVgroupId
(
srcPath
,
dstPath
,
srcVgId
,
dstVgId
,
pTfs
)
<
0
)
{
if
(
vnodeRenameVgroupId
(
srcPath
,
dstPath
,
srcVgId
,
dstVgId
,
diskPrimary
,
pTfs
)
<
0
)
{
vError
(
"vgId:%d, failed to rename vnode from %s to %s since %s"
,
dstVgId
,
srcPath
,
dstPath
,
tstrerror
(
terrno
));
return
-
1
;
}
...
...
@@ -284,14 +298,31 @@ void vnodeDestroy(const char *path, STfs *pTfs) {
tfsRmdir
(
pTfs
,
path
);
}
SVnode
*
vnodeOpen
(
const
char
*
path
,
STfs
*
pTfs
,
SMsgCb
msgCb
)
{
static
int32_t
vnodeCheckDisk
(
int32_t
diskPrimary
,
STfs
*
pTfs
)
{
int32_t
ndisk
=
1
;
if
(
pTfs
)
{
ndisk
=
tfsGetDisksAtLevel
(
pTfs
,
0
);
}
if
(
diskPrimary
<
0
||
diskPrimary
>=
ndisk
)
{
vError
(
"disk:%d is unavailable from the %d disks mounted at level 0"
,
diskPrimary
,
ndisk
);
terrno
=
TSDB_CODE_FS_INVLD_CFG
;
return
-
1
;
}
return
0
;
}
SVnode
*
vnodeOpen
(
const
char
*
path
,
int32_t
diskPrimary
,
STfs
*
pTfs
,
SMsgCb
msgCb
)
{
SVnode
*
pVnode
=
NULL
;
SVnodeInfo
info
=
{
0
};
char
dir
[
TSDB_FILENAME_LEN
]
=
{
0
};
char
tdir
[
TSDB_FILENAME_LEN
*
2
]
=
{
0
};
int32_t
ret
=
0
;
vnodeGetPrimaryDir
(
path
,
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
if
(
vnodeCheckDisk
(
diskPrimary
,
pTfs
))
{
vError
(
"failed to open vnode from %s since %s. diskPrimary:%d"
,
path
,
terrstr
(),
diskPrimary
);
return
NULL
;
}
vnodeGetPrimaryDir
(
path
,
diskPrimary
,
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
info
.
config
=
vnodeCfgDefault
;
...
...
@@ -334,6 +365,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
pVnode
->
state
.
applied
=
info
.
state
.
committed
;
pVnode
->
state
.
applyTerm
=
info
.
state
.
commitTerm
;
pVnode
->
pTfs
=
pTfs
;
pVnode
->
diskPrimary
=
diskPrimary
;
pVnode
->
msgCb
=
msgCb
;
taosThreadMutexInit
(
&
pVnode
->
lock
,
NULL
);
pVnode
->
blocked
=
false
;
...
...
source/dnode/vnode/src/vnd/vnodeRetention.c
浏览文件 @
a32c6502
...
...
@@ -35,7 +35,7 @@ static int32_t vnodePrepareRentention(SVnode *pVnode, SRetentionInfo *pInfo) {
pInfo
->
commitID
=
++
pVnode
->
state
.
commitID
;
char
dir
[
TSDB_FILENAME_LEN
]
=
{
0
};
vnodeGetPrimaryDir
(
pVnode
->
path
,
pVnode
->
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
vnodeGetPrimaryDir
(
pVnode
->
path
,
pVnode
->
diskPrimary
,
pVnode
->
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
if
(
vnodeLoadInfo
(
dir
,
&
pInfo
->
info
)
<
0
)
{
code
=
terrno
;
...
...
@@ -60,7 +60,7 @@ static int32_t vnodeRetentionTask(void *param) {
SVnode
*
pVnode
=
pInfo
->
pVnode
;
char
dir
[
TSDB_FILENAME_LEN
]
=
{
0
};
vnodeGetPrimaryDir
(
pVnode
->
path
,
pVnode
->
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
vnodeGetPrimaryDir
(
pVnode
->
path
,
pVnode
->
diskPrimary
,
pVnode
->
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
// save info
pInfo
->
info
.
state
.
commitID
=
pInfo
->
commitID
;
...
...
source/dnode/vnode/src/vnd/vnodeSnapshot.c
浏览文件 @
a32c6502
...
...
@@ -86,6 +86,7 @@ void vnodeSnapReaderClose(SVSnapReader *pReader) {
int32_t
vnodeSnapRead
(
SVSnapReader
*
pReader
,
uint8_t
**
ppData
,
uint32_t
*
nData
)
{
int32_t
code
=
0
;
SVnode
*
pVnode
=
pReader
->
pVnode
;
// CONFIG ==============
// FIXME: if commit multiple times and the config changed?
...
...
@@ -93,7 +94,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
char
fName
[
TSDB_FILENAME_LEN
];
int32_t
offset
=
0
;
vnodeGetPrimaryDir
(
p
Reader
->
pVnode
->
path
,
pReader
->
pVnode
->
pTfs
,
fName
,
TSDB_FILENAME_LEN
);
vnodeGetPrimaryDir
(
p
Vnode
->
path
,
pVnode
->
diskPrimary
,
pVnode
->
pTfs
,
fName
,
TSDB_FILENAME_LEN
);
offset
=
strlen
(
fName
);
snprintf
(
fName
+
offset
,
TSDB_FILENAME_LEN
-
offset
-
1
,
"%s%s"
,
TD_DIRSEP
,
VND_INFO_FNAME
);
...
...
@@ -382,7 +383,7 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
.
applyTerm
=
pWriter
->
info
.
state
.
commitTerm
};
pVnode
->
statis
=
pWriter
->
info
.
statis
;
char
dir
[
TSDB_FILENAME_LEN
]
=
{
0
};
vnodeGetPrimaryDir
(
pVnode
->
path
,
pVnode
->
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
vnodeGetPrimaryDir
(
pVnode
->
path
,
pVnode
->
diskPrimary
,
pVnode
->
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
vnodeCommitInfo
(
dir
);
}
else
{
...
...
@@ -430,7 +431,7 @@ _exit:
static
int32_t
vnodeSnapWriteInfo
(
SVSnapWriter
*
pWriter
,
uint8_t
*
pData
,
uint32_t
nData
)
{
int32_t
code
=
0
;
SVnode
*
pVnode
=
pWriter
->
pVnode
;
SSnapDataHdr
*
pHdr
=
(
SSnapDataHdr
*
)
pData
;
// decode info
...
...
@@ -444,10 +445,9 @@ static int32_t vnodeSnapWriteInfo(SVSnapWriter *pWriter, uint8_t *pData, uint32_
// modify info as needed
char
dir
[
TSDB_FILENAME_LEN
]
=
{
0
};
vnodeGetPrimaryDir
(
p
Writer
->
pVnode
->
path
,
pWriter
->
pVnode
->
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
vnodeGetPrimaryDir
(
p
Vnode
->
path
,
pVnode
->
diskPrimary
,
pVnode
->
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
SVnodeStats
vndStats
=
pWriter
->
info
.
config
.
vndStats
;
SVnode
*
pVnode
=
pWriter
->
pVnode
;
pWriter
->
info
.
config
=
pVnode
->
config
;
pWriter
->
info
.
config
.
vndStats
=
vndStats
;
vDebug
(
"vgId:%d, save config while write snapshot"
,
pWriter
->
pVnode
->
config
.
vgId
);
...
...
source/libs/command/src/command.c
浏览文件 @
a32c6502
...
...
@@ -615,6 +615,31 @@ void appendTableOptions(char* buf, int32_t* len, SDbCfgInfo* pDbCfg, STableCfg*
if
(
pCfg
->
ttl
>
0
)
{
*
len
+=
sprintf
(
buf
+
VARSTR_HEADER_SIZE
+
*
len
,
" TTL %d"
,
pCfg
->
ttl
);
}
if
(
TSDB_SUPER_TABLE
==
pCfg
->
tableType
||
TSDB_NORMAL_TABLE
==
pCfg
->
tableType
)
{
int32_t
nSma
=
0
;
for
(
int32_t
i
=
0
;
i
<
pCfg
->
numOfColumns
;
++
i
)
{
if
(
IS_BSMA_ON
(
pCfg
->
pSchemas
+
i
))
{
++
nSma
;
}
}
if
(
nSma
<
pCfg
->
numOfColumns
)
{
bool
smaOn
=
false
;
*
len
+=
sprintf
(
buf
+
VARSTR_HEADER_SIZE
+
*
len
,
" SMA("
);
for
(
int32_t
i
=
0
;
i
<
pCfg
->
numOfColumns
;
++
i
)
{
if
(
IS_BSMA_ON
(
pCfg
->
pSchemas
+
i
))
{
if
(
smaOn
)
{
*
len
+=
sprintf
(
buf
+
VARSTR_HEADER_SIZE
+
*
len
,
",`%s`"
,
(
pCfg
->
pSchemas
+
i
)
->
name
);
}
else
{
smaOn
=
true
;
*
len
+=
sprintf
(
buf
+
VARSTR_HEADER_SIZE
+
*
len
,
"`%s`"
,
(
pCfg
->
pSchemas
+
i
)
->
name
);
}
}
}
*
len
+=
sprintf
(
buf
+
VARSTR_HEADER_SIZE
+
*
len
,
")"
);
}
}
}
static
int32_t
setCreateTBResultIntoDataBlock
(
SSDataBlock
*
pBlock
,
SDbCfgInfo
*
pDbCfg
,
char
*
tbName
,
STableCfg
*
pCfg
)
{
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
a32c6502
...
...
@@ -4450,7 +4450,7 @@ bool isEqualStateKey(SStateWindowInfo* pWin, char* pKeyData) {
bool
compareStateKey
(
void
*
data
,
void
*
key
)
{
if
(
!
data
||
!
key
)
{
return
tru
e
;
return
fals
e
;
}
SStateKeys
*
stateKey
=
(
SStateKeys
*
)
key
;
stateKey
->
pData
=
(
char
*
)
key
+
sizeof
(
SStateKeys
);
...
...
@@ -4475,9 +4475,14 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
if
(
code
==
TSDB_CODE_SUCCESS
&&
!
inWinRange
(
&
pAggSup
->
winRange
,
&
pCurWin
->
winInfo
.
sessionWin
.
win
))
{
code
=
TSDB_CODE_FAILED
;
releaseOutputBuf
(
pAggSup
->
pState
,
NULL
,
(
SResultRow
*
)
pCurWin
->
winInfo
.
pOutputBuf
,
&
pAggSup
->
pSessionAPI
->
stateStore
);
pCurWin
->
winInfo
.
pOutputBuf
=
taosMemoryMalloc
(
size
);
releaseOutputBuf
(
pAggSup
->
pState
,
NULL
,
(
SResultRow
*
)
pCurWin
->
winInfo
.
pOutputBuf
,
&
pAggSup
->
pSessionAPI
->
stateStore
);
pCurWin
->
winInfo
.
pOutputBuf
=
taosMemoryCalloc
(
1
,
size
);
pCurWin
->
pStateKey
=
(
SStateKeys
*
)((
char
*
)
pCurWin
->
winInfo
.
pOutputBuf
+
(
pAggSup
->
resultRowSize
-
pAggSup
->
stateKeySize
));
pCurWin
->
pStateKey
->
bytes
=
pAggSup
->
stateKeySize
-
sizeof
(
SStateKeys
);
pCurWin
->
pStateKey
->
type
=
pAggSup
->
stateKeyType
;
pCurWin
->
pStateKey
->
pData
=
(
char
*
)
pCurWin
->
pStateKey
+
sizeof
(
SStateKeys
);
pCurWin
->
pStateKey
->
isNull
=
false
;
}
if
(
code
==
TSDB_CODE_SUCCESS
)
{
...
...
@@ -4491,12 +4496,19 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
}
pNextWin
->
winInfo
.
sessionWin
=
pCurWin
->
winInfo
.
sessionWin
;
pNextWin
->
winInfo
.
pOutputBuf
=
NULL
;
SStreamStateCur
*
pCur
=
pAggSup
->
stateStore
.
streamStateSessionSeekKeyNext
(
pAggSup
->
pState
,
&
pCurWin
->
winInfo
.
sessionWin
);
code
=
pAggSup
->
stateStore
.
streamStateSessionGetKVByCur
(
pCur
,
&
pNextWin
->
winInfo
.
sessionWin
,
NULL
,
0
);
SStreamStateCur
*
pCur
=
pAggSup
->
stateStore
.
streamStateSessionSeekKeyNext
(
pAggSup
->
pState
,
&
pNextWin
->
winInfo
.
sessionWin
);
int32_t
nextSize
=
pAggSup
->
resultRowSize
;
code
=
pAggSup
->
stateStore
.
streamStateSessionGetKVByCur
(
pCur
,
&
pNextWin
->
winInfo
.
sessionWin
,
&
pNextWin
->
winInfo
.
pOutputBuf
,
&
nextSize
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
SET_SESSION_WIN_INVALID
(
pNextWin
->
winInfo
);
}
else
{
pNextWin
->
pStateKey
=
(
SStateKeys
*
)((
char
*
)
pNextWin
->
winInfo
.
pOutputBuf
+
(
pAggSup
->
resultRowSize
-
pAggSup
->
stateKeySize
));
pNextWin
->
pStateKey
->
bytes
=
pAggSup
->
stateKeySize
-
sizeof
(
SStateKeys
);
pNextWin
->
pStateKey
->
type
=
pAggSup
->
stateKeyType
;
pNextWin
->
pStateKey
->
pData
=
(
char
*
)
pNextWin
->
pStateKey
+
sizeof
(
SStateKeys
);
pNextWin
->
pStateKey
->
isNull
=
false
;
pNextWin
->
winInfo
.
isOutput
=
true
;
}
pAggSup
->
stateStore
.
streamStateFreeCur
(
pCur
);
}
...
...
@@ -4572,6 +4584,9 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
SStateWindowInfo
curWin
=
{
0
};
SStateWindowInfo
nextWin
=
{
0
};
setStateOutputBuf
(
pAggSup
,
tsCols
[
i
],
groupId
,
pKeyData
,
&
curWin
,
&
nextWin
);
if
(
IS_VALID_SESSION_WIN
(
nextWin
.
winInfo
))
{
releaseOutputBuf
(
pAggSup
->
pState
,
NULL
,
(
SResultRow
*
)
nextWin
.
winInfo
.
pOutputBuf
,
&
pAPI
->
stateStore
);
}
setSessionWinOutputInfo
(
pSeUpdated
,
&
curWin
.
winInfo
);
winRows
=
updateStateWindowInfo
(
&
curWin
,
&
nextWin
,
tsCols
,
groupId
,
pKeyColInfo
,
rows
,
i
,
&
allEqual
,
pAggSup
->
pResultRows
,
pSeUpdated
,
pStDeleted
);
...
...
@@ -4872,9 +4887,19 @@ void streamStateReloadState(SOperatorInfo* pOperator) {
for
(
int32_t
i
=
0
;
i
<
num
;
i
++
)
{
SStateWindowInfo
curInfo
=
{
0
};
SStateWindowInfo
nextInfo
=
{
0
};
SStateWindowInfo
dummy
=
{
0
};
setStateOutputBuf
(
pAggSup
,
pSeKeyBuf
[
i
].
win
.
skey
,
pSeKeyBuf
[
i
].
groupId
,
NULL
,
&
curInfo
,
&
nextInfo
);
if
(
compareStateKey
(
curInfo
.
pStateKey
,
nextInfo
.
pStateKey
))
{
compactStateWindow
(
pOperator
,
&
curInfo
.
winInfo
,
&
nextInfo
.
winInfo
,
pInfo
->
pStUpdated
,
pInfo
->
pStDeleted
);
saveResult
(
curInfo
.
winInfo
,
pInfo
->
pStUpdated
);
}
if
(
IS_VALID_SESSION_WIN
(
curInfo
.
winInfo
))
{
releaseOutputBuf
(
pAggSup
->
pState
,
NULL
,
(
SResultRow
*
)
curInfo
.
winInfo
.
pOutputBuf
,
&
pAggSup
->
pSessionAPI
->
stateStore
);
}
if
(
IS_VALID_SESSION_WIN
(
nextInfo
.
winInfo
))
{
releaseOutputBuf
(
pAggSup
->
pState
,
NULL
,
(
SResultRow
*
)
nextInfo
.
winInfo
.
pOutputBuf
,
&
pAggSup
->
pSessionAPI
->
stateStore
);
}
}
taosMemoryFree
(
pBuf
);
...
...
source/libs/stream/src/streamRecover.c
浏览文件 @
a32c6502
...
...
@@ -204,6 +204,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
if
(
left
==
0
)
{
taosArrayDestroy
(
pTask
->
checkReqIds
);
pTask
->
checkReqIds
=
NULL
;
pTask
->
status
.
downstreamReady
=
1
;
if
(
pTask
->
status
.
taskStatus
==
TASK_STATUS__SCAN_HISTORY
)
{
qDebug
(
"s-task:%s all %d downstream tasks are ready, now enter into scan-history-data stage, status:%s"
,
id
,
...
...
source/libs/tfs/src/tfs.c
浏览文件 @
a32c6502
...
...
@@ -113,6 +113,15 @@ SDiskSize tfsGetSize(STfs *pTfs) {
return
size
;
}
int32_t
tfsGetDisksAtLevel
(
STfs
*
pTfs
,
int32_t
level
)
{
if
(
level
<
0
||
level
>=
pTfs
->
nlevel
)
{
return
0
;
}
STfsTier
*
pTier
=
TFS_TIER_AT
(
pTfs
,
level
);
return
pTier
->
ndisk
;
}
int32_t
tfsGetLevel
(
STfs
*
pTfs
)
{
return
pTfs
->
nlevel
;
}
int32_t
tfsAllocDisk
(
STfs
*
pTfs
,
int32_t
expLevel
,
SDiskID
*
pDiskId
)
{
...
...
@@ -272,6 +281,20 @@ int32_t tfsMkdirRecurAt(STfs *pTfs, const char *rname, SDiskID diskId) {
return
0
;
}
int32_t
tfsMkdirRecur
(
STfs
*
pTfs
,
const
char
*
rname
)
{
for
(
int32_t
level
=
0
;
level
<
pTfs
->
nlevel
;
level
++
)
{
STfsTier
*
pTier
=
TFS_TIER_AT
(
pTfs
,
level
);
for
(
int32_t
id
=
0
;
id
<
pTier
->
ndisk
;
id
++
)
{
SDiskID
did
=
{.
id
=
id
,
.
level
=
level
};
if
(
tfsMkdirRecurAt
(
pTfs
,
rname
,
did
)
<
0
)
{
return
-
1
;
}
}
}
return
0
;
}
int32_t
tfsMkdir
(
STfs
*
pTfs
,
const
char
*
rname
)
{
for
(
int32_t
level
=
0
;
level
<
pTfs
->
nlevel
;
level
++
)
{
STfsTier
*
pTier
=
TFS_TIER_AT
(
pTfs
,
level
);
...
...
@@ -314,25 +337,60 @@ int32_t tfsRmdir(STfs *pTfs, const char *rname) {
return
0
;
}
int32_t
tfsRename
(
STfs
*
pTfs
,
const
char
*
orname
,
const
char
*
nrname
)
{
static
int32_t
tfsRenameAt
(
STfs
*
pTfs
,
SDiskID
diskId
,
const
char
*
orname
,
const
char
*
nrname
)
{
char
oaname
[
TMPNAME_LEN
]
=
"
\0
"
;
char
naname
[
TMPNAME_LEN
]
=
"
\0
"
;
int32_t
level
=
diskId
.
level
;
int32_t
id
=
diskId
.
id
;
STfsTier
*
pTier
=
TFS_TIER_AT
(
pTfs
,
level
);
STfsDisk
*
pDisk
=
pTier
->
disks
[
id
];
snprintf
(
oaname
,
TMPNAME_LEN
,
"%s%s%s"
,
pDisk
->
path
,
TD_DIRSEP
,
orname
);
snprintf
(
naname
,
TMPNAME_LEN
,
"%s%s%s"
,
pDisk
->
path
,
TD_DIRSEP
,
nrname
);
if
(
taosRenameFile
(
oaname
,
naname
)
!=
0
&&
errno
!=
ENOENT
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
fError
(
"failed to rename %s to %s since %s"
,
oaname
,
naname
,
terrstr
());
return
-
1
;
}
return
0
;
}
int32_t
tfsRename
(
STfs
*
pTfs
,
int32_t
diskPrimary
,
const
char
*
orname
,
const
char
*
nrname
)
{
for
(
int32_t
level
=
pTfs
->
nlevel
-
1
;
level
>=
0
;
level
--
)
{
STfsTier
*
pTier
=
TFS_TIER_AT
(
pTfs
,
level
);
for
(
int32_t
id
=
pTier
->
ndisk
-
1
;
id
>=
0
;
id
--
)
{
STfsDisk
*
pDisk
=
pTier
->
disks
[
id
];
snprintf
(
oaname
,
TMPNAME_LEN
,
"%s%s%s"
,
pDisk
->
path
,
TD_DIRSEP
,
orname
)
;
snprintf
(
naname
,
TMPNAME_LEN
,
"%s%s%s"
,
pDisk
->
path
,
TD_DIRSEP
,
nrname
);
if
(
taosRenameFile
(
oaname
,
naname
)
!=
0
&&
errno
!=
ENOENT
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
)
;
fError
(
"failed to rename %s to %s since %s"
,
oaname
,
naname
,
terrstr
());
if
(
level
==
0
&&
id
==
diskPrimary
)
{
continue
;
}
SDiskID
diskId
=
{.
level
=
level
,
.
id
=
id
}
;
if
(
tfsRenameAt
(
pTfs
,
diskId
,
orname
,
nrname
))
{
return
-
1
;
}
}
}
return
0
;
SDiskID
diskId
=
{.
level
=
0
,
.
id
=
diskPrimary
};
return
tfsRenameAt
(
pTfs
,
diskId
,
orname
,
nrname
);
}
int32_t
tfsSearch
(
STfs
*
pTfs
,
int32_t
level
,
const
char
*
fname
)
{
if
(
level
<
0
||
level
>=
pTfs
->
nlevel
)
{
return
-
1
;
}
char
path
[
TMPNAME_LEN
]
=
{
0
};
STfsTier
*
pTier
=
TFS_TIER_AT
(
pTfs
,
level
);
for
(
int32_t
id
=
0
;
id
<
pTier
->
ndisk
;
id
++
)
{
STfsDisk
*
pDisk
=
pTier
->
disks
[
id
];
snprintf
(
path
,
TMPNAME_LEN
-
1
,
"%s%s%s"
,
pDisk
->
path
,
TD_DIRSEP
,
fname
);
if
(
taosCheckExistFile
(
path
))
{
return
id
;
}
}
return
-
1
;
}
STfsDir
*
tfsOpendir
(
STfs
*
pTfs
,
const
char
*
rname
)
{
...
...
source/libs/tfs/test/tfsTest.cpp
浏览文件 @
a32c6502
...
...
@@ -156,7 +156,7 @@ TEST_F(TfsTest, 03_Dir) {
EXPECT_NE
(
taosDirExist
(
ap4
),
1
);
EXPECT_EQ
(
tfsMkdirRecurAt
(
pTfs
,
p4
,
did
),
0
);
EXPECT_EQ
(
taosDirExist
(
ap4
),
1
);
EXPECT_EQ
(
tfsRename
(
pTfs
,
p44
,
p45
),
0
);
EXPECT_EQ
(
tfsRename
(
pTfs
,
0
,
p44
,
p45
),
0
);
EXPECT_EQ
(
tfsRmdir
(
pTfs
,
p4
),
0
);
EXPECT_NE
(
taosDirExist
(
ap4
),
1
);
...
...
@@ -609,7 +609,7 @@ TEST_F(TfsTest, 05_MultiDisk) {
EXPECT_NE
(
taosDirExist
(
_ap22
),
1
);
EXPECT_EQ
(
tfsMkdirRecurAt
(
pTfs
,
p4
,
did
),
0
);
EXPECT_EQ
(
taosDirExist
(
_ap22
),
1
);
EXPECT_EQ
(
tfsRename
(
pTfs
,
p44
,
p45
),
0
);
EXPECT_EQ
(
tfsRename
(
pTfs
,
0
,
p44
,
p45
),
0
);
EXPECT_EQ
(
tfsRmdir
(
pTfs
,
p4
),
0
);
EXPECT_NE
(
taosDirExist
(
_ap22
),
1
);
}
...
...
@@ -721,4 +721,4 @@ TEST_F(TfsTest, 05_MultiDisk) {
}
tfsClose
(
pTfs
);
}
\ No newline at end of file
}
source/util/src/terror.c
浏览文件 @
a32c6502
...
...
@@ -329,6 +329,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE, "Please use this comma
// vnode
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_INVALID_VGROUP_ID
,
"Vnode is closed or removed"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_INIT_FAILED
,
"Vnode init failure"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_NO_WRITE_AUTH
,
"Database write operation denied"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_NOT_EXIST
,
"Vnode not exist"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_ALREADY_EXIST
,
"Vnode already exist"
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录