Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
0e30d177
T
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
“1b1215d5e954726b142d5408dfd6d7153b3abe8b”上不存在“drivers/ide/ide-cd_verbose.c”
提交
0e30d177
编写于
10月 19, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into enh/TD-19623
上级
96ca18e6
c58daa70
变更
44
展开全部
隐藏空白更改
内联
并排
Showing
44 changed file
with
1060 addition
and
846 deletion
+1060
-846
cmake/taostools_CMakeLists.txt.in
cmake/taostools_CMakeLists.txt.in
+1
-1
docs/zh/21-tdinternal/01-arch.md
docs/zh/21-tdinternal/01-arch.md
+1
-1
include/util/tutil.h
include/util/tutil.h
+6
-0
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+1
-1
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
+66
-3
source/dnode/mnode/impl/src/mndStb.c
source/dnode/mnode/impl/src/mndStb.c
+1
-1
source/dnode/vnode/src/inc/tsdb.h
source/dnode/vnode/src/inc/tsdb.h
+4
-11
source/dnode/vnode/src/inc/vnd.h
source/dnode/vnode/src/inc/vnd.h
+2
-0
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+9
-6
source/dnode/vnode/src/meta/metaCommit.c
source/dnode/vnode/src/meta/metaCommit.c
+1
-0
source/dnode/vnode/src/meta/metaOpen.c
source/dnode/vnode/src/meta/metaOpen.c
+12
-12
source/dnode/vnode/src/meta/metaSnapshot.c
source/dnode/vnode/src/meta/metaSnapshot.c
+2
-0
source/dnode/vnode/src/sma/smaCommit.c
source/dnode/vnode/src/sma/smaCommit.c
+24
-9
source/dnode/vnode/src/sma/smaOpen.c
source/dnode/vnode/src/sma/smaOpen.c
+14
-14
source/dnode/vnode/src/tq/tqMeta.c
source/dnode/vnode/src/tq/tqMeta.c
+3
-3
source/dnode/vnode/src/tsdb/tsdbCommit.c
source/dnode/vnode/src/tsdb/tsdbCommit.c
+55
-26
source/dnode/vnode/src/tsdb/tsdbFS.c
source/dnode/vnode/src/tsdb/tsdbFS.c
+622
-643
source/dnode/vnode/src/tsdb/tsdbOpen.c
source/dnode/vnode/src/tsdb/tsdbOpen.c
+2
-2
source/dnode/vnode/src/tsdb/tsdbRetention.c
source/dnode/vnode/src/tsdb/tsdbRetention.c
+2
-2
source/dnode/vnode/src/tsdb/tsdbSnapshot.c
source/dnode/vnode/src/tsdb/tsdbSnapshot.c
+2
-2
source/dnode/vnode/src/vnd/vnodeCommit.c
source/dnode/vnode/src/vnd/vnodeCommit.c
+47
-50
source/dnode/vnode/src/vnd/vnodeOpen.c
source/dnode/vnode/src/vnd/vnodeOpen.c
+9
-7
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+1
-1
source/libs/executor/src/dataDeleter.c
source/libs/executor/src/dataDeleter.c
+7
-1
source/libs/executor/src/dataInserter.c
source/libs/executor/src/dataInserter.c
+1
-0
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+28
-2
source/libs/executor/src/tfill.c
source/libs/executor/src/tfill.c
+2
-0
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+13
-1
source/libs/stream/src/streamExec.c
source/libs/stream/src/streamExec.c
+2
-0
source/libs/stream/src/streamMeta.c
source/libs/stream/src/streamMeta.c
+5
-3
source/libs/stream/src/streamState.c
source/libs/stream/src/streamState.c
+5
-5
source/libs/stream/src/streamTask.c
source/libs/stream/src/streamTask.c
+4
-1
source/libs/tdb/inc/tdb.h
source/libs/tdb/inc/tdb.h
+4
-2
source/libs/tdb/src/db/tdbDb.c
source/libs/tdb/src/db/tdbDb.c
+17
-2
source/libs/tdb/src/db/tdbPager.c
source/libs/tdb/src/db/tdbPager.c
+22
-0
source/libs/tdb/src/db/tdbTable.c
source/libs/tdb/src/db/tdbTable.c
+10
-5
source/libs/tdb/src/inc/tdbInt.h
source/libs/tdb/src/inc/tdbInt.h
+2
-0
source/libs/tdb/test/tdbExOVFLTest.cpp
source/libs/tdb/test/tdbExOVFLTest.cpp
+8
-8
source/libs/tdb/test/tdbTest.cpp
source/libs/tdb/test/tdbTest.cpp
+13
-13
source/os/src/osFile.c
source/os/src/osFile.c
+1
-0
source/util/src/tsched.c
source/util/src/tsched.c
+22
-1
tests/script/jenkins/basic.txt
tests/script/jenkins/basic.txt
+1
-1
tests/script/tsim/sma/tsmaCreateInsertQuery.sim
tests/script/tsim/sma/tsmaCreateInsertQuery.sim
+4
-4
tests/system-test/fulltest.sh
tests/system-test/fulltest.sh
+2
-2
未找到文件。
cmake/taostools_CMakeLists.txt.in
浏览文件 @
0e30d177
...
...
@@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG
9284147
GIT_TAG
cc973e0
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
...
...
docs/zh/21-tdinternal/01-arch.md
浏览文件 @
0e30d177
...
...
@@ -240,7 +240,7 @@ dataDir /mnt/data6 2 0
## 数据查询
TDengine 提供了多种多样针对表和超级表的查询处理功能,除了常规的聚合查询之外,还提供针对时序数据的窗口查询、统计聚合等功能。TDengine 的查询处理需要客户端、vnode、qnode、mnode 节点协同完成,一个复杂的超级表聚合查询可能需要多个 vnode 和 qnode 节点
公共
分担查询和计算任务。
TDengine 提供了多种多样针对表和超级表的查询处理功能,除了常规的聚合查询之外,还提供针对时序数据的窗口查询、统计聚合等功能。TDengine 的查询处理需要客户端、vnode、qnode、mnode 节点协同完成,一个复杂的超级表聚合查询可能需要多个 vnode 和 qnode 节点
共同
分担查询和计算任务。
### 查询基本流程
...
...
include/util/tutil.h
浏览文件 @
0e30d177
...
...
@@ -83,6 +83,12 @@ static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen,
}
}
#define TSDB_CHECK_CODE(CODE, LINO, LABEL) \
if (CODE) { \
LINO = __LINE__; \
goto LABEL; \
}
#ifdef __cplusplus
}
#endif
...
...
source/client/src/clientImpl.c
浏览文件 @
0e30d177
...
...
@@ -199,7 +199,7 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param,
if
(
tsQueryUseNodeAllocator
&&
!
qIsInsertValuesSql
((
*
pRequest
)
->
sqlstr
,
(
*
pRequest
)
->
sqlLen
))
{
if
(
TSDB_CODE_SUCCESS
!=
nodesCreateAllocator
((
*
pRequest
)
->
requestId
,
tsQueryNodeChunkSize
,
&
((
*
pRequest
)
->
allocatorRefId
)))
{
tscError
(
"%
d
failed to create node allocator, reqId:0x%"
PRIx64
", conn:%"
PRId64
", %s"
,
(
*
pRequest
)
->
self
,
tscError
(
"%
"
PRId64
"
failed to create node allocator, reqId:0x%"
PRIx64
", conn:%"
PRId64
", %s"
,
(
*
pRequest
)
->
self
,
(
*
pRequest
)
->
requestId
,
pTscObj
->
id
,
sql
);
destroyRequest
(
*
pRequest
);
...
...
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
浏览文件 @
0e30d177
...
...
@@ -217,17 +217,80 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
}
}
static
void
*
vmCloseVnodeInThread
(
void
*
param
)
{
SVnodeThread
*
pThread
=
param
;
SVnodeMgmt
*
pMgmt
=
pThread
->
pMgmt
;
dInfo
(
"thread:%d, start to close %d vnodes"
,
pThread
->
threadIndex
,
pThread
->
vnodeNum
);
setThreadName
(
"close-vnodes"
);
for
(
int32_t
v
=
0
;
v
<
pThread
->
vnodeNum
;
++
v
)
{
SVnodeObj
*
pVnode
=
pThread
->
ppVnodes
[
v
];
char
stepDesc
[
TSDB_STEP_DESC_LEN
]
=
{
0
};
snprintf
(
stepDesc
,
TSDB_STEP_DESC_LEN
,
"vgId:%d, start to close, %d of %d have been closed"
,
pVnode
->
vgId
,
pMgmt
->
state
.
openVnodes
,
pMgmt
->
state
.
totalVnodes
);
tmsgReportStartup
(
"vnode-close"
,
stepDesc
);
vmCloseVnode
(
pMgmt
,
pVnode
);
}
dInfo
(
"thread:%d, numOfVnodes:%d is closed"
,
pThread
->
threadIndex
,
pThread
->
vnodeNum
);
return
NULL
;
}
static
void
vmCloseVnodes
(
SVnodeMgmt
*
pMgmt
)
{
dInfo
(
"start to close all vnodes"
);
int32_t
numOfVnodes
=
0
;
SVnodeObj
**
ppVnodes
=
vmGetVnodeListFromHash
(
pMgmt
,
&
numOfVnodes
);
for
(
int32_t
i
=
0
;
i
<
numOfVnodes
;
++
i
)
{
if
(
ppVnodes
==
NULL
||
ppVnodes
[
i
]
==
NULL
)
continue
;
vmCloseVnode
(
pMgmt
,
ppVnodes
[
i
]);
int32_t
threadNum
=
tsNumOfCores
/
2
;
if
(
threadNum
<
1
)
threadNum
=
1
;
int32_t
vnodesPerThread
=
numOfVnodes
/
threadNum
+
1
;
SVnodeThread
*
threads
=
taosMemoryCalloc
(
threadNum
,
sizeof
(
SVnodeThread
));
for
(
int32_t
t
=
0
;
t
<
threadNum
;
++
t
)
{
threads
[
t
].
threadIndex
=
t
;
threads
[
t
].
pMgmt
=
pMgmt
;
threads
[
t
].
ppVnodes
=
taosMemoryCalloc
(
vnodesPerThread
,
sizeof
(
SVnode
*
));
}
for
(
int32_t
v
=
0
;
v
<
numOfVnodes
;
++
v
)
{
int32_t
t
=
v
%
threadNum
;
SVnodeThread
*
pThread
=
&
threads
[
t
];
if
(
pThread
->
ppVnodes
!=
NULL
&&
ppVnodes
!=
NULL
)
{
pThread
->
ppVnodes
[
pThread
->
vnodeNum
++
]
=
ppVnodes
[
v
];
}
}
pMgmt
->
state
.
openVnodes
=
0
;
dInfo
(
"close %d vnodes with %d threads"
,
numOfVnodes
,
threadNum
);
for
(
int32_t
t
=
0
;
t
<
threadNum
;
++
t
)
{
SVnodeThread
*
pThread
=
&
threads
[
t
];
if
(
pThread
->
vnodeNum
==
0
)
continue
;
TdThreadAttr
thAttr
;
taosThreadAttrInit
(
&
thAttr
);
taosThreadAttrSetDetachState
(
&
thAttr
,
PTHREAD_CREATE_JOINABLE
);
if
(
taosThreadCreate
(
&
pThread
->
thread
,
&
thAttr
,
vmCloseVnodeInThread
,
pThread
)
!=
0
)
{
dError
(
"thread:%d, failed to create thread to close vnode since %s"
,
pThread
->
threadIndex
,
strerror
(
errno
));
}
taosThreadAttrDestroy
(
&
thAttr
);
}
for
(
int32_t
t
=
0
;
t
<
threadNum
;
++
t
)
{
SVnodeThread
*
pThread
=
&
threads
[
t
];
if
(
pThread
->
vnodeNum
>
0
&&
taosCheckPthreadValid
(
pThread
->
thread
))
{
taosThreadJoin
(
pThread
->
thread
,
NULL
);
taosThreadClear
(
&
pThread
->
thread
);
}
taosMemoryFree
(
pThread
->
ppVnodes
);
}
taosMemoryFree
(
threads
);
if
(
ppVnodes
!=
NULL
)
{
taosMemoryFree
(
ppVnodes
);
}
...
...
source/dnode/mnode/impl/src/mndStb.c
浏览文件 @
0e30d177
...
...
@@ -2555,7 +2555,7 @@ static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
int32_t
rollupNum
=
(
int32_t
)
taosArrayGetSize
(
pStb
->
pFuncs
);
char
*
sep
=
", "
;
int32_t
sepLen
=
strlen
(
sep
);
int32_t
rollupLen
=
sizeof
(
rollup
)
-
2
;
int32_t
rollupLen
=
sizeof
(
rollup
)
-
VARSTR_HEADER_SIZE
-
2
;
for
(
int32_t
i
=
0
;
i
<
rollupNum
;
++
i
)
{
char
*
funcName
=
taosArrayGet
(
pStb
->
pFuncs
,
i
);
if
(
i
)
{
...
...
source/dnode/vnode/src/inc/tsdb.h
浏览文件 @
0e30d177
...
...
@@ -32,12 +32,6 @@ extern "C" {
#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSD ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on
#define TSDB_CHECK_CODE(CODE, LINO, LABEL) \
if (CODE) { \
LINO = __LINE__; \
goto LABEL; \
}
typedef
struct
TSDBROW
TSDBROW
;
typedef
struct
TABLEID
TABLEID
;
typedef
struct
TSDBKEY
TSDBKEY
;
...
...
@@ -247,18 +241,17 @@ void tsdbSmaFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSmaFile *pSmaF, ch
// SDelFile
void
tsdbDelFileName
(
STsdb
*
pTsdb
,
SDelFile
*
pFile
,
char
fname
[]);
// tsdbFS.c ==============================================================================================
int32_t
tsdbFSOpen
(
STsdb
*
pTsdb
);
int32_t
tsdbFSOpen
(
STsdb
*
pTsdb
,
int8_t
rollback
);
int32_t
tsdbFSClose
(
STsdb
*
pTsdb
);
int32_t
tsdbFSCopy
(
STsdb
*
pTsdb
,
STsdbFS
*
pFS
);
void
tsdbFSDestroy
(
STsdbFS
*
pFS
);
int32_t
tDFileSetCmprFn
(
const
void
*
p1
,
const
void
*
p2
);
int32_t
tsdbFSCommit1
(
STsdb
*
pTsdb
,
STsdbFS
*
pFS
);
int32_t
tsdbFSCommit2
(
STsdb
*
pTsdb
,
STsdbFS
*
pFS
);
int32_t
tsdbFSCommit
(
STsdb
*
pTsdb
);
int32_t
tsdbFSRollback
(
STsdb
*
pTsdb
);
int32_t
tsdbFSPrepareCommit
(
STsdb
*
pTsdb
,
STsdbFS
*
pFS
);
int32_t
tsdbFSRef
(
STsdb
*
pTsdb
,
STsdbFS
*
pFS
);
void
tsdbFSUnref
(
STsdb
*
pTsdb
,
STsdbFS
*
pFS
);
int32_t
tsdbFSRollback
(
STsdbFS
*
pFS
);
int32_t
tsdbFSUpsertFSet
(
STsdbFS
*
pFS
,
SDFileSet
*
pSet
);
int32_t
tsdbFSUpsertDelFile
(
STsdbFS
*
pFS
,
SDelFile
*
pDelFile
);
// tsdbReaderWriter.c ==============================================================================================
...
...
source/dnode/vnode/src/inc/vnd.h
浏览文件 @
0e30d177
...
...
@@ -87,11 +87,13 @@ int32_t vnodeGetBatchMeta(SVnode* pVnode, SRpcMsg* pMsg);
int32_t
vnodeBegin
(
SVnode
*
pVnode
);
int32_t
vnodeShouldCommit
(
SVnode
*
pVnode
);
int32_t
vnodeCommit
(
SVnode
*
pVnode
);
void
vnodeRollback
(
SVnode
*
pVnode
);
int32_t
vnodeSaveInfo
(
const
char
*
dir
,
const
SVnodeInfo
*
pCfg
);
int32_t
vnodeCommitInfo
(
const
char
*
dir
,
const
SVnodeInfo
*
pInfo
);
int32_t
vnodeLoadInfo
(
const
char
*
dir
,
SVnodeInfo
*
pInfo
);
int32_t
vnodeSyncCommit
(
SVnode
*
pVnode
);
int32_t
vnodeAsyncCommit
(
SVnode
*
pVnode
);
bool
vnodeShouldRollback
(
SVnode
*
pVnode
);
// vnodeSync.c
int32_t
vnodeSyncOpen
(
SVnode
*
pVnode
,
char
*
path
);
...
...
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
0e30d177
...
...
@@ -97,10 +97,11 @@ typedef struct SMCtbCursor SMCtbCursor;
typedef
struct
SMStbCursor
SMStbCursor
;
typedef
struct
STbUidStore
STbUidStore
;
int
metaOpen
(
SVnode
*
pVnode
,
SMeta
**
ppMeta
);
int
metaOpen
(
SVnode
*
pVnode
,
SMeta
**
ppMeta
,
int8_t
rollback
);
int
metaClose
(
SMeta
*
pMeta
);
int
metaBegin
(
SMeta
*
pMeta
,
int8_t
fromSys
);
int
metaCommit
(
SMeta
*
pMeta
);
int
metaFinishCommit
(
SMeta
*
pMeta
);
int
metaCreateSTable
(
SMeta
*
pMeta
,
int64_t
version
,
SVCreateStbReq
*
pReq
);
int
metaAlterSTable
(
SMeta
*
pMeta
,
int64_t
version
,
SVCreateStbReq
*
pReq
);
int
metaDropSTable
(
SMeta
*
pMeta
,
int64_t
verison
,
SVDropStbReq
*
pReq
,
SArray
*
tbUidList
);
...
...
@@ -149,10 +150,12 @@ typedef struct {
int32_t
metaGetStbStats
(
SMeta
*
pMeta
,
int64_t
uid
,
SMetaStbStats
*
pInfo
);
// tsdb
int
tsdbOpen
(
SVnode
*
pVnode
,
STsdb
**
ppTsdb
,
const
char
*
dir
,
STsdbKeepCfg
*
pKeepCfg
);
int
tsdbOpen
(
SVnode
*
pVnode
,
STsdb
**
ppTsdb
,
const
char
*
dir
,
STsdbKeepCfg
*
pKeepCfg
,
int8_t
rollback
);
int
tsdbClose
(
STsdb
**
pTsdb
);
int32_t
tsdbBegin
(
STsdb
*
pTsdb
);
int32_t
tsdbCommit
(
STsdb
*
pTsdb
);
int32_t
tsdbFinishCommit
(
STsdb
*
pTsdb
);
int32_t
tsdbRollbackCommit
(
STsdb
*
pTsdb
);
int32_t
tsdbDoRetention
(
STsdb
*
pTsdb
,
int64_t
now
);
int
tsdbScanAndConvertSubmitMsg
(
STsdb
*
pTsdb
,
SSubmitReq
*
pMsg
);
int
tsdbInsertData
(
STsdb
*
pTsdb
,
int64_t
version
,
SSubmitReq
*
pMsg
,
SSubmitRsp
*
pRsp
);
...
...
@@ -200,15 +203,15 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
// sma
int32_t
smaInit
();
void
smaCleanUp
();
int32_t
smaOpen
(
SVnode
*
pVnode
);
int32_t
smaOpen
(
SVnode
*
pVnode
,
int8_t
rollback
);
int32_t
smaClose
(
SSma
*
pSma
);
int32_t
smaBegin
(
SSma
*
pSma
);
int32_t
smaSyncPreCommit
(
SSma
*
pSma
);
int32_t
smaSyncCommit
(
SSma
*
pSma
);
int32_t
smaSyncPostCommit
(
SSma
*
pSma
);
int32_t
sma
Async
PreCommit
(
SSma
*
pSma
);
int32_t
sma
Async
Commit
(
SSma
*
pSma
);
int32_t
sma
Async
PostCommit
(
SSma
*
pSma
);
int32_t
smaPreCommit
(
SSma
*
pSma
);
int32_t
smaCommit
(
SSma
*
pSma
);
int32_t
smaPostCommit
(
SSma
*
pSma
);
int32_t
smaDoRetention
(
SSma
*
pSma
,
int64_t
now
);
int32_t
tdProcessTSmaCreate
(
SSma
*
pSma
,
int64_t
version
,
const
char
*
msg
);
...
...
source/dnode/vnode/src/meta/metaCommit.c
浏览文件 @
0e30d177
...
...
@@ -34,6 +34,7 @@ int metaBegin(SMeta *pMeta, int8_t fromSys) {
// commit the meta txn
int
metaCommit
(
SMeta
*
pMeta
)
{
return
tdbCommit
(
pMeta
->
pEnv
,
&
pMeta
->
txn
);
}
int
metaFinishCommit
(
SMeta
*
pMeta
)
{
return
tdbPostCommit
(
pMeta
->
pEnv
,
&
pMeta
->
txn
);
}
// abort the meta txn
int
metaAbort
(
SMeta
*
pMeta
)
{
return
tdbAbort
(
pMeta
->
pEnv
,
&
pMeta
->
txn
);
}
source/dnode/vnode/src/meta/metaOpen.c
浏览文件 @
0e30d177
...
...
@@ -27,7 +27,7 @@ static int taskIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int k
static
int32_t
metaInitLock
(
SMeta
*
pMeta
)
{
return
taosThreadRwlockInit
(
&
pMeta
->
lock
,
NULL
);
}
static
int32_t
metaDestroyLock
(
SMeta
*
pMeta
)
{
return
taosThreadRwlockDestroy
(
&
pMeta
->
lock
);
}
int
metaOpen
(
SVnode
*
pVnode
,
SMeta
**
ppMeta
)
{
int
metaOpen
(
SVnode
*
pVnode
,
SMeta
**
ppMeta
,
int8_t
rollback
)
{
SMeta
*
pMeta
=
NULL
;
int
ret
;
int
slen
;
...
...
@@ -60,49 +60,49 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
taosMkDir
(
pMeta
->
path
);
// open env
ret
=
tdbOpen
(
pMeta
->
path
,
pVnode
->
config
.
szPage
,
pVnode
->
config
.
szCache
,
&
pMeta
->
pEnv
);
ret
=
tdbOpen
(
pMeta
->
path
,
pVnode
->
config
.
szPage
,
pVnode
->
config
.
szCache
,
&
pMeta
->
pEnv
,
rollback
);
if
(
ret
<
0
)
{
metaError
(
"vgId:%d, failed to open meta env since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
goto
_err
;
}
// open pTbDb
ret
=
tdbTbOpen
(
"table.db"
,
sizeof
(
STbDbKey
),
-
1
,
tbDbKeyCmpr
,
pMeta
->
pEnv
,
&
pMeta
->
pTbDb
);
ret
=
tdbTbOpen
(
"table.db"
,
sizeof
(
STbDbKey
),
-
1
,
tbDbKeyCmpr
,
pMeta
->
pEnv
,
&
pMeta
->
pTbDb
,
0
);
if
(
ret
<
0
)
{
metaError
(
"vgId:%d, failed to open meta table db since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
goto
_err
;
}
// open pSkmDb
ret
=
tdbTbOpen
(
"schema.db"
,
sizeof
(
SSkmDbKey
),
-
1
,
skmDbKeyCmpr
,
pMeta
->
pEnv
,
&
pMeta
->
pSkmDb
);
ret
=
tdbTbOpen
(
"schema.db"
,
sizeof
(
SSkmDbKey
),
-
1
,
skmDbKeyCmpr
,
pMeta
->
pEnv
,
&
pMeta
->
pSkmDb
,
0
);
if
(
ret
<
0
)
{
metaError
(
"vgId:%d, failed to open meta schema db since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
goto
_err
;
}
// open pUidIdx
ret
=
tdbTbOpen
(
"uid.idx"
,
sizeof
(
tb_uid_t
),
sizeof
(
SUidIdxVal
),
uidIdxKeyCmpr
,
pMeta
->
pEnv
,
&
pMeta
->
pUidIdx
);
ret
=
tdbTbOpen
(
"uid.idx"
,
sizeof
(
tb_uid_t
),
sizeof
(
SUidIdxVal
),
uidIdxKeyCmpr
,
pMeta
->
pEnv
,
&
pMeta
->
pUidIdx
,
0
);
if
(
ret
<
0
)
{
metaError
(
"vgId:%d, failed to open meta uid idx since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
goto
_err
;
}
// open pNameIdx
ret
=
tdbTbOpen
(
"name.idx"
,
-
1
,
sizeof
(
tb_uid_t
),
NULL
,
pMeta
->
pEnv
,
&
pMeta
->
pNameIdx
);
ret
=
tdbTbOpen
(
"name.idx"
,
-
1
,
sizeof
(
tb_uid_t
),
NULL
,
pMeta
->
pEnv
,
&
pMeta
->
pNameIdx
,
0
);
if
(
ret
<
0
)
{
metaError
(
"vgId:%d, failed to open meta name index since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
goto
_err
;
}
// open pCtbIdx
ret
=
tdbTbOpen
(
"ctb.idx"
,
sizeof
(
SCtbIdxKey
),
-
1
,
ctbIdxKeyCmpr
,
pMeta
->
pEnv
,
&
pMeta
->
pCtbIdx
);
ret
=
tdbTbOpen
(
"ctb.idx"
,
sizeof
(
SCtbIdxKey
),
-
1
,
ctbIdxKeyCmpr
,
pMeta
->
pEnv
,
&
pMeta
->
pCtbIdx
,
0
);
if
(
ret
<
0
)
{
metaError
(
"vgId:%d, failed to open meta child table index since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
goto
_err
;
}
// open pSuidIdx
ret
=
tdbTbOpen
(
"suid.idx"
,
sizeof
(
tb_uid_t
),
0
,
uidIdxKeyCmpr
,
pMeta
->
pEnv
,
&
pMeta
->
pSuidIdx
);
ret
=
tdbTbOpen
(
"suid.idx"
,
sizeof
(
tb_uid_t
),
0
,
uidIdxKeyCmpr
,
pMeta
->
pEnv
,
&
pMeta
->
pSuidIdx
,
0
);
if
(
ret
<
0
)
{
metaError
(
"vgId:%d, failed to open meta super table index since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
goto
_err
;
...
...
@@ -119,27 +119,27 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
goto
_err
;
}
ret
=
tdbTbOpen
(
"tag.idx"
,
-
1
,
0
,
tagIdxKeyCmpr
,
pMeta
->
pEnv
,
&
pMeta
->
pTagIdx
);
ret
=
tdbTbOpen
(
"tag.idx"
,
-
1
,
0
,
tagIdxKeyCmpr
,
pMeta
->
pEnv
,
&
pMeta
->
pTagIdx
,
0
);
if
(
ret
<
0
)
{
metaError
(
"vgId:%d, failed to open meta tag index since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
goto
_err
;
}
// open pTtlIdx
ret
=
tdbTbOpen
(
"ttl.idx"
,
sizeof
(
STtlIdxKey
),
0
,
ttlIdxKeyCmpr
,
pMeta
->
pEnv
,
&
pMeta
->
pTtlIdx
);
ret
=
tdbTbOpen
(
"ttl.idx"
,
sizeof
(
STtlIdxKey
),
0
,
ttlIdxKeyCmpr
,
pMeta
->
pEnv
,
&
pMeta
->
pTtlIdx
,
0
);
if
(
ret
<
0
)
{
metaError
(
"vgId:%d, failed to open meta ttl index since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
goto
_err
;
}
// open pSmaIdx
ret
=
tdbTbOpen
(
"sma.idx"
,
sizeof
(
SSmaIdxKey
),
0
,
smaIdxKeyCmpr
,
pMeta
->
pEnv
,
&
pMeta
->
pSmaIdx
);
ret
=
tdbTbOpen
(
"sma.idx"
,
sizeof
(
SSmaIdxKey
),
0
,
smaIdxKeyCmpr
,
pMeta
->
pEnv
,
&
pMeta
->
pSmaIdx
,
0
);
if
(
ret
<
0
)
{
metaError
(
"vgId:%d, failed to open meta sma index since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
goto
_err
;
}
ret
=
tdbTbOpen
(
"stream.task.db"
,
sizeof
(
int64_t
),
-
1
,
taskIdxKeyCmpr
,
pMeta
->
pEnv
,
&
pMeta
->
pStreamDb
);
ret
=
tdbTbOpen
(
"stream.task.db"
,
sizeof
(
int64_t
),
-
1
,
taskIdxKeyCmpr
,
pMeta
->
pEnv
,
&
pMeta
->
pStreamDb
,
0
);
if
(
ret
<
0
)
{
metaError
(
"vgId:%d, failed to open meta stream task index since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
goto
_err
;
...
...
source/dnode/vnode/src/meta/metaSnapshot.c
浏览文件 @
0e30d177
...
...
@@ -165,6 +165,8 @@ int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback) {
}
else
{
code
=
metaCommit
(
pWriter
->
pMeta
);
if
(
code
)
goto
_err
;
code
=
metaFinishCommit
(
pWriter
->
pMeta
);
if
(
code
)
goto
_err
;
}
taosMemoryFree
(
pWriter
);
*
ppWriter
=
NULL
;
...
...
source/dnode/vnode/src/sma/smaCommit.c
浏览文件 @
0e30d177
...
...
@@ -54,28 +54,28 @@ int32_t smaSyncPostCommit(SSma *pSma) { return tdProcessRSmaSyncPostCommitImpl(p
#endif
/**
* @brief
O
nly applicable to Rollup SMA
* @brief
async commit, o
nly applicable to Rollup SMA
*
* @param pSma
* @return int32_t
*/
int32_t
sma
Async
PreCommit
(
SSma
*
pSma
)
{
return
tdProcessRSmaAsyncPreCommitImpl
(
pSma
);
}
int32_t
smaPreCommit
(
SSma
*
pSma
)
{
return
tdProcessRSmaAsyncPreCommitImpl
(
pSma
);
}
/**
* @brief
O
nly applicable to Rollup SMA
* @brief
async commit, o
nly applicable to Rollup SMA
*
* @param pSma
* @return int32_t
*/
int32_t
sma
Async
Commit
(
SSma
*
pSma
)
{
return
tdProcessRSmaAsyncCommitImpl
(
pSma
);
}
int32_t
smaCommit
(
SSma
*
pSma
)
{
return
tdProcessRSmaAsyncCommitImpl
(
pSma
);
}
/**
* @brief
O
nly applicable to Rollup SMA
* @brief
async commit, o
nly applicable to Rollup SMA
*
* @param pSma
* @return int32_t
*/
int32_t
sma
Async
PostCommit
(
SSma
*
pSma
)
{
return
tdProcessRSmaAsyncPostCommitImpl
(
pSma
);
}
int32_t
smaPostCommit
(
SSma
*
pSma
)
{
return
tdProcessRSmaAsyncPostCommitImpl
(
pSma
);
}
/**
* @brief set rsma trigger stat active
...
...
@@ -366,9 +366,11 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
* @return int32_t
*/
static
int32_t
tdProcessRSmaAsyncCommitImpl
(
SSma
*
pSma
)
{
int32_t
code
=
0
;
SVnode
*
pVnode
=
pSma
->
pVnode
;
SSmaEnv
*
pSmaEnv
=
SMA_RSMA_ENV
(
pSma
);
if
(
!
pSmaEnv
)
{
return
TSDB_CODE_SUCCESS
;
goto
_exit
;
}
#if 0
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
...
...
@@ -378,8 +380,21 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) {
return TSDB_CODE_FAILED;
}
#endif
return
TSDB_CODE_SUCCESS
;
if
((
code
=
tsdbCommit
(
VND_RSMA0
(
pVnode
)))
<
0
)
{
smaError
(
"vgId:%d, failed to commit tsdb rsma0 since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
code
));
goto
_exit
;
}
if
((
code
=
tsdbCommit
(
VND_RSMA1
(
pVnode
)))
<
0
)
{
smaError
(
"vgId:%d, failed to commit tsdb rsma1 since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
code
));
goto
_exit
;
}
if
((
code
=
tsdbCommit
(
VND_RSMA2
(
pVnode
)))
<
0
)
{
smaError
(
"vgId:%d, failed to commit tsdb rsma2 since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
code
));
goto
_exit
;
}
_exit:
terrno
=
code
;
return
code
;
}
/**
...
...
source/dnode/vnode/src/sma/smaOpen.c
浏览文件 @
0e30d177
...
...
@@ -29,19 +29,19 @@ static int32_t rsmaRestore(SSma *pSma);
pKeepCfg->days = smaEvalDays(v, pCfg->retentions, l, pCfg->precision, pCfg->days); \
} while (0)
#define SMA_OPEN_RSMA_IMPL(v, l) \
do { \
SRetention *r = (SRetention *)VND_RETENTIONS(v) + l; \
if (!RETENTION_VALID(r)) { \
if (l == 0) { \
goto _err; \
} \
break; \
} \
smaSetKeepCfg(v, &keepCfg, pCfg, TSDB_TYPE_RSMA_L##l); \
if (tsdbOpen(v, &SMA_RSMA_TSDB##l(pSma), VNODE_RSMA##l##_DIR, &keepCfg) < 0) { \
goto _err; \
} \
#define SMA_OPEN_RSMA_IMPL(v, l)
\
do {
\
SRetention *r = (SRetention *)VND_RETENTIONS(v) + l;
\
if (!RETENTION_VALID(r)) {
\
if (l == 0) {
\
goto _err;
\
}
\
break;
\
}
\
smaSetKeepCfg(v, &keepCfg, pCfg, TSDB_TYPE_RSMA_L##l);
\
if (tsdbOpen(v, &SMA_RSMA_TSDB##l(pSma), VNODE_RSMA##l##_DIR, &keepCfg
, rollback
) < 0) { \
goto _err;
\
}
\
} while (0)
/**
...
...
@@ -119,7 +119,7 @@ int smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int ty
return
0
;
}
int32_t
smaOpen
(
SVnode
*
pVnode
)
{
int32_t
smaOpen
(
SVnode
*
pVnode
,
int8_t
rollback
)
{
STsdbCfg
*
pCfg
=
&
pVnode
->
config
.
tsdbCfg
;
ASSERT
(
!
pVnode
->
pSma
);
...
...
source/dnode/vnode/src/tq/tqMeta.c
浏览文件 @
0e30d177
...
...
@@ -70,17 +70,17 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
}
int32_t
tqMetaOpen
(
STQ
*
pTq
)
{
if
(
tdbOpen
(
pTq
->
path
,
16
*
1024
,
1
,
&
pTq
->
pMetaDB
)
<
0
)
{
if
(
tdbOpen
(
pTq
->
path
,
16
*
1024
,
1
,
&
pTq
->
pMetaDB
,
0
)
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
if
(
tdbTbOpen
(
"tq.db"
,
-
1
,
-
1
,
NULL
,
pTq
->
pMetaDB
,
&
pTq
->
pExecStore
)
<
0
)
{
if
(
tdbTbOpen
(
"tq.db"
,
-
1
,
-
1
,
NULL
,
pTq
->
pMetaDB
,
&
pTq
->
pExecStore
,
0
)
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
if
(
tdbTbOpen
(
"tq.check.db"
,
-
1
,
-
1
,
NULL
,
pTq
->
pMetaDB
,
&
pTq
->
pCheckStore
)
<
0
)
{
if
(
tdbTbOpen
(
"tq.check.db"
,
-
1
,
-
1
,
NULL
,
pTq
->
pMetaDB
,
&
pTq
->
pCheckStore
,
0
)
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
...
...
source/dnode/vnode/src/tsdb/tsdbCommit.c
浏览文件 @
0e30d177
...
...
@@ -1041,38 +1041,20 @@ _exit:
static
int32_t
tsdbEndCommit
(
SCommitter
*
pCommitter
,
int32_t
eno
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
STsdb
*
pTsdb
=
pCommitter
->
pTsdb
;
STsdb
*
pTsdb
=
pCommitter
->
pTsdb
;
SMemTable
*
pMemTable
=
pTsdb
->
imem
;
ASSERT
(
eno
==
0
&&
"tsdbCommit failure"
"Restart taosd"
);
code
=
tsdbFSCommit1
(
pTsdb
,
&
pCommitter
->
fs
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
// lock
taosThreadRwlockWrlock
(
&
pTsdb
->
rwLock
);
// commit or rollback
code
=
tsdbFSCommit2
(
pTsdb
,
&
pCommitter
->
fs
);
if
(
code
)
{
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
if
(
eno
)
{
code
=
eno
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
else
{
code
=
tsdbFSPrepareCommit
(
pCommitter
->
pTsdb
,
&
pCommitter
->
fs
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
pTsdb
->
imem
=
NULL
;
// unlock
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
tsdbUnrefMemTable
(
pMemTable
);
_exit:
tsdbFSDestroy
(
&
pCommitter
->
fs
);
taosArrayDestroy
(
pCommitter
->
aTbDataP
);
_exit:
if
(
code
)
{
if
(
code
||
eno
)
{
tsdbError
(
"vgId:%d %s failed at line %d since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
else
{
tsdbInfo
(
"vgId:%d tsdb end commit"
,
TD_VID
(
pTsdb
->
pVnode
));
...
...
@@ -1646,3 +1628,50 @@ _exit:
}
return
code
;
}
int32_t
tsdbFinishCommit
(
STsdb
*
pTsdb
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
SMemTable
*
pMemTable
=
pTsdb
->
imem
;
// lock
taosThreadRwlockWrlock
(
&
pTsdb
->
rwLock
);
code
=
tsdbFSCommit
(
pTsdb
);
if
(
code
)
{
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
pTsdb
->
imem
=
NULL
;
// unlock
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
if
(
pMemTable
)
{
tsdbUnrefMemTable
(
pMemTable
);
}
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d %s failed at line %d since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
else
{
tsdbInfo
(
"vgId:%d tsdb finish commit"
,
TD_VID
(
pTsdb
->
pVnode
));
}
return
code
;
}
int32_t
tsdbRollbackCommit
(
STsdb
*
pTsdb
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
code
=
tsdbFSRollback
(
pTsdb
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d %s failed at line %d since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
else
{
tsdbInfo
(
"vgId:%d tsdb rollback commit"
,
TD_VID
(
pTsdb
->
pVnode
));
}
return
code
;
}
\ No newline at end of file
source/dnode/vnode/src/tsdb/tsdbFS.c
浏览文件 @
0e30d177
此差异已折叠。
点击以展开。
source/dnode/vnode/src/tsdb/tsdbOpen.c
浏览文件 @
0e30d177
...
...
@@ -33,7 +33,7 @@ int32_t tsdbSetKeepCfg(STsdb *pTsdb, STsdbCfg *pCfg) {
* @param dir
* @return int
*/
int
tsdbOpen
(
SVnode
*
pVnode
,
STsdb
**
ppTsdb
,
const
char
*
dir
,
STsdbKeepCfg
*
pKeepCfg
)
{
int
tsdbOpen
(
SVnode
*
pVnode
,
STsdb
**
ppTsdb
,
const
char
*
dir
,
STsdbKeepCfg
*
pKeepCfg
,
int8_t
rollback
)
{
STsdb
*
pTsdb
=
NULL
;
int
slen
=
0
;
...
...
@@ -66,7 +66,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee
}
// open tsdb
if
(
tsdbFSOpen
(
pTsdb
)
<
0
)
{
if
(
tsdbFSOpen
(
pTsdb
,
rollback
)
<
0
)
{
goto
_err
;
}
...
...
source/dnode/vnode/src/tsdb/tsdbRetention.c
浏览文件 @
0e30d177
...
...
@@ -86,12 +86,12 @@ int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) {
}
// do change fs
code
=
tsdbFS
Commit1
(
pTsdb
,
&
fs
);
code
=
tsdbFS
PrepareCommit
(
pTsdb
,
&
fs
);
if
(
code
)
goto
_err
;
taosThreadRwlockWrlock
(
&
pTsdb
->
rwLock
);
code
=
tsdbFSCommit
2
(
pTsdb
,
&
fs
);
code
=
tsdbFSCommit
(
pTsdb
);
if
(
code
)
{
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
goto
_err
;
...
...
source/dnode/vnode/src/tsdb/tsdbSnapshot.c
浏览文件 @
0e30d177
...
...
@@ -1380,13 +1380,13 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
code
=
tsdbSnapWriteDelEnd
(
pWriter
);
if
(
code
)
goto
_err
;
code
=
tsdbFS
Commit1
(
pWriter
->
pTsdb
,
&
pWriter
->
fs
);
code
=
tsdbFS
PrepareCommit
(
pWriter
->
pTsdb
,
&
pWriter
->
fs
);
if
(
code
)
goto
_err
;
// lock
taosThreadRwlockWrlock
(
&
pTsdb
->
rwLock
);
code
=
tsdbFSCommit
2
(
pWriter
->
pTsdb
,
&
pWriter
->
fs
);
code
=
tsdbFSCommit
(
pWriter
->
pTsdb
);
if
(
code
)
{
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
goto
_err
;
...
...
source/dnode/vnode/src/vnd/vnodeCommit.c
浏览文件 @
0e30d177
...
...
@@ -20,8 +20,6 @@
static
int
vnodeEncodeInfo
(
const
SVnodeInfo
*
pInfo
,
char
**
ppData
);
static
int
vnodeDecodeInfo
(
uint8_t
*
pData
,
SVnodeInfo
*
pInfo
);
static
int
vnodeStartCommit
(
SVnode
*
pVnode
);
static
int
vnodeEndCommit
(
SVnode
*
pVnode
);
static
int
vnodeCommitImpl
(
void
*
arg
);
static
void
vnodeWaitCommit
(
SVnode
*
pVnode
);
...
...
@@ -215,6 +213,8 @@ int vnodeSyncCommit(SVnode *pVnode) {
}
int
vnodeCommit
(
SVnode
*
pVnode
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
SVnodeInfo
info
=
{
0
};
char
dir
[
TSDB_FILENAME_LEN
];
...
...
@@ -234,15 +234,13 @@ int vnodeCommit(SVnode *pVnode) {
snprintf
(
dir
,
TSDB_FILENAME_LEN
,
"%s"
,
pVnode
->
path
);
}
if
(
vnodeSaveInfo
(
dir
,
&
info
)
<
0
)
{
vError
(
"vgId:%d, failed to save vnode info since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
))
;
return
-
1
;
code
=
terrno
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
)
;
}
walBeginSnapshot
(
pVnode
->
pWal
,
pVnode
->
state
.
applied
);
// preCommit
// smaSyncPreCommit(pVnode->pSma);
if
(
smaAsyncPreCommit
(
pVnode
->
pSma
)
<
0
){
vError
(
"vgId:%d, failed to async pre-commit sma since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
if
(
smaPreCommit
(
pVnode
->
pSma
)
<
0
)
{
vError
(
"vgId:%d, failed to pre-commit sma since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
return
-
1
;
}
...
...
@@ -251,64 +249,73 @@ int vnodeCommit(SVnode *pVnode) {
// commit each sub-system
if
(
metaCommit
(
pVnode
->
pMeta
)
<
0
)
{
vError
(
"vgId:%d, failed to commit meta since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
))
;
return
-
1
;
code
=
TSDB_CODE_FAILED
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
)
;
}
if
(
VND_IS_RSMA
(
pVnode
))
{
if
(
smaAsyncCommit
(
pVnode
->
pSma
)
<
0
)
{
vError
(
"vgId:%d, failed to async commit sma since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
return
-
1
;
}
if
(
tsdbCommit
(
VND_RSMA0
(
pVnode
))
<
0
)
{
vError
(
"vgId:%d, failed to commit tsdb rsma0 since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
return
-
1
;
}
if
(
tsdbCommit
(
VND_RSMA1
(
pVnode
))
<
0
)
{
vError
(
"vgId:%d, failed to commit tsdb rsma1 since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
return
-
1
;
}
if
(
tsdbCommit
(
VND_RSMA2
(
pVnode
))
<
0
)
{
vError
(
"vgId:%d, failed to commit tsdb rsma2 since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
if
(
smaCommit
(
pVnode
->
pSma
)
<
0
)
{
vError
(
"vgId:%d, failed to commit sma since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
return
-
1
;
}
}
else
{
if
(
tsdbCommit
(
pVnode
->
pTsdb
)
<
0
)
{
vError
(
"vgId:%d, failed to commit tsdb since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
return
-
1
;
}
code
=
tsdbCommit
(
pVnode
->
pTsdb
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
if
(
tqCommit
(
pVnode
->
pTq
)
<
0
)
{
vError
(
"vgId:%d, failed to commit tq since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
))
;
return
-
1
;
code
=
TSDB_CODE_FAILED
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
)
;
}
// walCommit (TODO)
// commit info
if
(
vnodeCommitInfo
(
dir
,
&
info
)
<
0
)
{
vError
(
"vgId:%d, failed to commit vnode info since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
return
-
1
;
code
=
terrno
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
tsdbFinishCommit
(
pVnode
->
pTsdb
);
if
(
metaFinishCommit
(
pVnode
->
pMeta
)
<
0
)
{
code
=
terrno
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
pVnode
->
state
.
committed
=
info
.
state
.
committed
;
// postCommit
// smaSyncPostCommit(pVnode->pSma);
if
(
smaAsyncPostCommit
(
pVnode
->
pSma
)
<
0
)
{
vError
(
"vgId:%d, failed to async post-commit sma since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
if
(
smaPostCommit
(
pVnode
->
pSma
)
<
0
)
{
vError
(
"vgId:%d, failed to post-commit sma since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
return
-
1
;
}
// apply the commit (TODO)
walEndSnapshot
(
pVnode
->
pWal
);
vInfo
(
"vgId:%d, commit end"
,
TD_VID
(
pVnode
));
_exit:
if
(
code
)
{
vError
(
"vgId:%d %s failed at line %d since %s"
,
TD_VID
(
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
else
{
vInfo
(
"vgId:%d, commit end"
,
TD_VID
(
pVnode
));
}
return
0
;
}
bool
vnodeShouldRollback
(
SVnode
*
pVnode
)
{
char
tFName
[
TSDB_FILENAME_LEN
]
=
{
0
};
snprintf
(
tFName
,
TSDB_FILENAME_LEN
,
"%s%s%s%s%s"
,
tfsGetPrimaryPath
(
pVnode
->
pTfs
),
TD_DIRSEP
,
pVnode
->
path
,
TD_DIRSEP
,
VND_INFO_FNAME_TMP
);
return
taosCheckExistFile
(
tFName
);
}
void
vnodeRollback
(
SVnode
*
pVnode
)
{
char
tFName
[
TSDB_FILENAME_LEN
]
=
{
0
};
snprintf
(
tFName
,
TSDB_FILENAME_LEN
,
"%s%s%s%s%s"
,
tfsGetPrimaryPath
(
pVnode
->
pTfs
),
TD_DIRSEP
,
pVnode
->
path
,
TD_DIRSEP
,
VND_INFO_FNAME_TMP
);
(
void
)
taosRemoveFile
(
tFName
);
}
static
int
vnodeCommitImpl
(
void
*
arg
)
{
SVnode
*
pVnode
=
(
SVnode
*
)
arg
;
...
...
@@ -321,16 +328,6 @@ static int vnodeCommitImpl(void *arg) {
return
0
;
}
static
int
vnodeStartCommit
(
SVnode
*
pVnode
)
{
// TODO
return
0
;
}
static
int
vnodeEndCommit
(
SVnode
*
pVnode
)
{
// TODO
return
0
;
}
static
FORCE_INLINE
void
vnodeWaitCommit
(
SVnode
*
pVnode
)
{
tsem_wait
(
&
pVnode
->
canCommit
);
}
static
int
vnodeEncodeState
(
const
void
*
pObj
,
SJson
*
pJson
)
{
...
...
source/dnode/vnode/src/vnd/vnodeOpen.c
浏览文件 @
0e30d177
...
...
@@ -110,6 +110,8 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
taosThreadMutexInit
(
&
pVnode
->
mutex
,
NULL
);
taosThreadCondInit
(
&
pVnode
->
poolNotEmpty
,
NULL
);
int8_t
rollback
=
vnodeShouldRollback
(
pVnode
);
// open buffer pool
if
(
vnodeOpenBufPool
(
pVnode
)
<
0
)
{
vError
(
"vgId:%d, failed to open vnode buffer pool since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
...
...
@@ -117,19 +119,19 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
}
// open meta
if
(
metaOpen
(
pVnode
,
&
pVnode
->
pMeta
)
<
0
)
{
if
(
metaOpen
(
pVnode
,
&
pVnode
->
pMeta
,
rollback
)
<
0
)
{
vError
(
"vgId:%d, failed to open vnode meta since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
goto
_err
;
}
// open tsdb
if
(
!
VND_IS_RSMA
(
pVnode
)
&&
tsdbOpen
(
pVnode
,
&
VND_TSDB
(
pVnode
),
VNODE_TSDB_DIR
,
NULL
)
<
0
)
{
if
(
!
VND_IS_RSMA
(
pVnode
)
&&
tsdbOpen
(
pVnode
,
&
VND_TSDB
(
pVnode
),
VNODE_TSDB_DIR
,
NULL
,
rollback
)
<
0
)
{
vError
(
"vgId:%d, failed to open vnode tsdb since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
goto
_err
;
}
// open sma
if
(
smaOpen
(
pVnode
))
{
if
(
smaOpen
(
pVnode
,
rollback
))
{
vError
(
"vgId:%d, failed to open vnode sma since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
goto
_err
;
}
...
...
@@ -153,14 +155,12 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
goto
_err
;
}
#if !VNODE_AS_LIB
// open query
if
(
vnodeQueryOpen
(
pVnode
))
{
vError
(
"vgId:%d, failed to open vnode query since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
#endif
// vnode begin
if
(
vnodeBegin
(
pVnode
)
<
0
)
{
...
...
@@ -169,13 +169,15 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
goto
_err
;
}
#if !VNODE_AS_LIB
// open sync
if
(
vnodeSyncOpen
(
pVnode
,
dir
))
{
vError
(
"vgId:%d, failed to open sync since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
goto
_err
;
}
#endif
if
(
rollback
)
{
vnodeRollback
(
pVnode
);
}
return
pVnode
;
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
0e30d177
...
...
@@ -1095,7 +1095,7 @@ void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput);
int32_t
buildDataBlockFromGroupRes
(
SOperatorInfo
*
pOperator
,
SStreamState
*
pState
,
SSDataBlock
*
pBlock
,
SExprSupp
*
pSup
,
SGroupResInfo
*
pGroupResInfo
);
int32_t
saveSessionDiscBuf
(
SStreamState
*
pState
,
SSessionKey
*
key
,
void
*
buf
,
int32_t
size
);
int32_t
buildSessionResultDataBlock
(
S
ExecTaskInfo
*
pTaskInfo
,
SStreamState
*
pState
,
SSDataBlock
*
pBlock
,
int32_t
buildSessionResultDataBlock
(
S
OperatorInfo
*
pOperator
,
SStreamState
*
pState
,
SSDataBlock
*
pBlock
,
SExprSupp
*
pSup
,
SGroupResInfo
*
pGroupResInfo
);
int32_t
setOutputBuf
(
SStreamState
*
pState
,
STimeWindow
*
win
,
SResultRow
**
pResult
,
int64_t
tableGroupId
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
int32_t
*
rowEntryInfoOffset
,
SAggSupporter
*
pAggSup
);
...
...
source/libs/executor/src/dataDeleter.c
浏览文件 @
0e30d177
...
...
@@ -150,9 +150,15 @@ static int32_t getStatus(SDataDeleterHandle* pDeleter) {
static
int32_t
putDataBlock
(
SDataSinkHandle
*
pHandle
,
const
SInputData
*
pInput
,
bool
*
pContinue
)
{
SDataDeleterHandle
*
pDeleter
=
(
SDataDeleterHandle
*
)
pHandle
;
SDataDeleterBuf
*
pBuf
=
taosAllocateQitem
(
sizeof
(
SDataDeleterBuf
),
DEF_QITEM
);
if
(
NULL
==
pBuf
||
!
allocBuf
(
pDeleter
,
pInput
,
pBuf
)
)
{
if
(
NULL
==
pBuf
)
{
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
if
(
!
allocBuf
(
pDeleter
,
pInput
,
pBuf
))
{
taosFreeQitem
(
pBuf
);
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
toDataCacheEntry
(
pDeleter
,
pInput
,
pBuf
);
taosWriteQitem
(
pDeleter
->
pDataBlocks
,
pBuf
);
*
pContinue
=
(
DS_BUF_LOW
==
updateStatus
(
pDeleter
)
?
true
:
false
);
...
...
source/libs/executor/src/dataInserter.c
浏览文件 @
0e30d177
...
...
@@ -324,6 +324,7 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
tsdbGetTableSchema
(
inserter
->
pParam
->
readHandle
->
vnode
,
pInserterNode
->
tableId
,
&
inserter
->
pSchema
,
&
suid
);
if
(
code
)
{
destroyDataSinker
((
SDataSinkHandle
*
)
inserter
);
taosMemoryFree
(
inserter
);
return
code
;
}
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
0e30d177
...
...
@@ -4312,8 +4312,9 @@ int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, in
return
TSDB_CODE_SUCCESS
;
}
int32_t
buildSessionResultDataBlock
(
S
ExecTaskInfo
*
pTaskInfo
,
SStreamState
*
pState
,
SSDataBlock
*
pBlock
,
int32_t
buildSessionResultDataBlock
(
S
OperatorInfo
*
pOperator
,
SStreamState
*
pState
,
SSDataBlock
*
pBlock
,
SExprSupp
*
pSup
,
SGroupResInfo
*
pGroupResInfo
)
{
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SExprInfo
*
pExprInfo
=
pSup
->
pExprInfo
;
int32_t
numOfExprs
=
pSup
->
numOfExprs
;
int32_t
*
rowEntryOffset
=
pSup
->
rowEntryInfoOffset
;
...
...
@@ -4338,6 +4339,31 @@ int32_t buildSessionResultDataBlock(SExecTaskInfo* pTaskInfo, SStreamState* pSta
if
(
pBlock
->
info
.
groupId
==
0
)
{
pBlock
->
info
.
groupId
=
pKey
->
groupId
;
if
(
pOperator
->
operatorType
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE
)
{
SStreamStateAggOperatorInfo
*
pInfo
=
pOperator
->
info
;
char
*
tbname
=
taosHashGet
(
pInfo
->
pGroupIdTbNameMap
,
&
pBlock
->
info
.
groupId
,
sizeof
(
int64_t
));
if
(
tbname
!=
NULL
)
{
memcpy
(
pBlock
->
info
.
parTbName
,
tbname
,
TSDB_TABLE_NAME_LEN
);
}
else
{
pBlock
->
info
.
parTbName
[
0
]
=
0
;
}
}
else
if
(
pOperator
->
operatorType
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION
||
pOperator
->
operatorType
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION
||
pOperator
->
operatorType
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION
)
{
SStreamSessionAggOperatorInfo
*
pInfo
=
pOperator
->
info
;
char
*
tbname
=
taosHashGet
(
pInfo
->
pGroupIdTbNameMap
,
&
pBlock
->
info
.
groupId
,
sizeof
(
int64_t
));
if
(
tbname
!=
NULL
)
{
memcpy
(
pBlock
->
info
.
parTbName
,
tbname
,
TSDB_TABLE_NAME_LEN
);
}
else
{
pBlock
->
info
.
parTbName
[
0
]
=
0
;
}
}
else
{
ASSERT
(
0
);
}
}
else
{
// current value belongs to different group, it can't be packed into one datablock
if
(
pBlock
->
info
.
groupId
!=
pKey
->
groupId
)
{
...
...
@@ -4383,4 +4409,4 @@ int32_t buildSessionResultDataBlock(SExecTaskInfo* pTaskInfo, SStreamState* pSta
}
blockDataUpdateTsWindow
(
pBlock
,
0
);
return
TSDB_CODE_SUCCESS
;
}
\ No newline at end of file
}
source/libs/executor/src/tfill.c
浏览文件 @
0e30d177
...
...
@@ -1492,6 +1492,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
case
STREAM_NORMAL
:
case
STREAM_INVALID
:
{
doApplyStreamScalarCalculation
(
pOperator
,
pBlock
,
pInfo
->
pSrcBlock
);
memcpy
(
pInfo
->
pSrcBlock
->
info
.
parTbName
,
pBlock
->
info
.
parTbName
,
TSDB_TABLE_NAME_LEN
);
pInfo
->
srcRowIndex
=
0
;
}
break
;
default:
...
...
@@ -1502,6 +1503,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
doStreamFillImpl
(
pOperator
);
doFilter
(
pInfo
->
pCondition
,
pInfo
->
pRes
,
pInfo
->
pColMatchColInfo
,
NULL
);
memcpy
(
pInfo
->
pRes
->
info
.
parTbName
,
pInfo
->
pSrcBlock
->
info
.
parTbName
,
TSDB_TABLE_NAME_LEN
);
pOperator
->
resultInfo
.
totalRows
+=
pInfo
->
pRes
->
info
.
rows
;
if
(
pInfo
->
pRes
->
info
.
rows
>
0
)
{
break
;
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
0e30d177
...
...
@@ -4044,7 +4044,7 @@ void doBuildSessionResult(SOperatorInfo* pOperator, SStreamState* pState, SGroup
// clear the existed group id
pBlock
->
info
.
groupId
=
0
;
buildSessionResultDataBlock
(
p
TaskInfo
,
pState
,
pBlock
,
&
pOperator
->
exprSupp
,
pGroupResInfo
);
buildSessionResultDataBlock
(
p
Operator
,
pState
,
pBlock
,
&
pOperator
->
exprSupp
,
pGroupResInfo
);
}
static
SSDataBlock
*
doStreamSessionAgg
(
SOperatorInfo
*
pOperator
)
{
...
...
@@ -4088,6 +4088,12 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
/*printf("\n\n put tbname %s\n\n", pBlock->info.parTbName);*/
}
if
(
pBlock
->
info
.
parTbName
[
0
])
{
taosHashPut
(
pInfo
->
pGroupIdTbNameMap
,
&
pBlock
->
info
.
groupId
,
sizeof
(
int64_t
),
&
pBlock
->
info
.
parTbName
,
TSDB_TABLE_NAME_LEN
);
/*printf("\n\n put tbname %s\n\n", pBlock->info.parTbName);*/
}
if
(
pBlock
->
info
.
type
==
STREAM_DELETE_DATA
||
pBlock
->
info
.
type
==
STREAM_DELETE_RESULT
||
pBlock
->
info
.
type
==
STREAM_CLEAR
)
{
SArray
*
pWins
=
taosArrayInit
(
16
,
sizeof
(
SSessionKey
));
...
...
@@ -4617,6 +4623,12 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
}
printDataBlock
(
pBlock
,
"single state recv"
);
if
(
pBlock
->
info
.
parTbName
[
0
])
{
taosHashPut
(
pInfo
->
pGroupIdTbNameMap
,
&
pBlock
->
info
.
groupId
,
sizeof
(
int64_t
),
&
pBlock
->
info
.
parTbName
,
TSDB_TABLE_NAME_LEN
);
/*printf("\n\n put tbname %s\n\n", pBlock->info.parTbName);*/
}
if
(
pBlock
->
info
.
type
==
STREAM_DELETE_DATA
||
pBlock
->
info
.
type
==
STREAM_DELETE_RESULT
||
pBlock
->
info
.
type
==
STREAM_CLEAR
)
{
SArray
*
pWins
=
taosArrayInit
(
16
,
sizeof
(
SSessionKey
));
...
...
source/libs/stream/src/streamExec.c
浏览文件 @
0e30d177
...
...
@@ -137,6 +137,8 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch)
if
(
pTask
->
outputType
==
TASK_OUTPUT__FIXED_DISPATCH
||
pTask
->
outputType
==
TASK_OUTPUT__SHUFFLE_DISPATCH
)
{
streamDispatch
(
pTask
);
}
}
else
{
taosArrayDestroyEx
(
pRes
,
(
FDelete
)
blockDataFreeRes
);
}
}
...
...
source/libs/stream/src/streamMeta.c
浏览文件 @
0e30d177
...
...
@@ -27,7 +27,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
char
*
streamPath
=
taosMemoryCalloc
(
1
,
len
);
sprintf
(
streamPath
,
"%s/%s"
,
path
,
"stream"
);
pMeta
->
path
=
strdup
(
streamPath
);
if
(
tdbOpen
(
pMeta
->
path
,
16
*
1024
,
1
,
&
pMeta
->
db
)
<
0
)
{
if
(
tdbOpen
(
pMeta
->
path
,
16
*
1024
,
1
,
&
pMeta
->
db
,
0
)
<
0
)
{
taosMemoryFree
(
streamPath
);
goto
_err
;
}
...
...
@@ -36,11 +36,11 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
taosMulModeMkDir
(
streamPath
,
0755
);
taosMemoryFree
(
streamPath
);
if
(
tdbTbOpen
(
"task.db"
,
sizeof
(
int32_t
),
-
1
,
NULL
,
pMeta
->
db
,
&
pMeta
->
pTaskDb
)
<
0
)
{
if
(
tdbTbOpen
(
"task.db"
,
sizeof
(
int32_t
),
-
1
,
NULL
,
pMeta
->
db
,
&
pMeta
->
pTaskDb
,
0
)
<
0
)
{
goto
_err
;
}
if
(
tdbTbOpen
(
"checkpoint.db"
,
sizeof
(
int32_t
),
-
1
,
NULL
,
pMeta
->
db
,
&
pMeta
->
pCheckpointDb
)
<
0
)
{
if
(
tdbTbOpen
(
"checkpoint.db"
,
sizeof
(
int32_t
),
-
1
,
NULL
,
pMeta
->
db
,
&
pMeta
->
pCheckpointDb
,
0
)
<
0
)
{
goto
_err
;
}
...
...
@@ -262,12 +262,14 @@ int32_t streamLoadTasks(SStreamMeta* pMeta) {
if
(
pMeta
->
expandFunc
(
pMeta
->
ahandle
,
pTask
)
<
0
)
{
tdbFree
(
pKey
);
tdbFree
(
pVal
);
tdbTbcClose
(
pCur
);
return
-
1
;
}
if
(
taosHashPut
(
pMeta
->
pTasks
,
&
pTask
->
taskId
,
sizeof
(
int32_t
),
&
pTask
,
sizeof
(
void
*
))
<
0
)
{
tdbFree
(
pKey
);
tdbFree
(
pVal
);
tdbTbcClose
(
pCur
);
return
-
1
;
}
}
...
...
source/libs/stream/src/streamState.c
浏览文件 @
0e30d177
...
...
@@ -99,26 +99,26 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
memset
(
statePath
,
0
,
300
);
tstrncpy
(
statePath
,
path
,
300
);
}
if
(
tdbOpen
(
statePath
,
szPage
,
pages
,
&
pState
->
db
)
<
0
)
{
if
(
tdbOpen
(
statePath
,
szPage
,
pages
,
&
pState
->
db
,
0
)
<
0
)
{
goto
_err
;
}
// open state storage backend
if
(
tdbTbOpen
(
"state.db"
,
sizeof
(
SStateKey
),
-
1
,
stateKeyCmpr
,
pState
->
db
,
&
pState
->
pStateDb
)
<
0
)
{
if
(
tdbTbOpen
(
"state.db"
,
sizeof
(
SStateKey
),
-
1
,
stateKeyCmpr
,
pState
->
db
,
&
pState
->
pStateDb
,
0
)
<
0
)
{
goto
_err
;
}
// todo refactor
if
(
tdbTbOpen
(
"fill.state.db"
,
sizeof
(
SWinKey
),
-
1
,
winKeyCmpr
,
pState
->
db
,
&
pState
->
pFillStateDb
)
<
0
)
{
if
(
tdbTbOpen
(
"fill.state.db"
,
sizeof
(
SWinKey
),
-
1
,
winKeyCmpr
,
pState
->
db
,
&
pState
->
pFillStateDb
,
0
)
<
0
)
{
goto
_err
;
}
if
(
tdbTbOpen
(
"session.state.db"
,
sizeof
(
SStateSessionKey
),
-
1
,
stateSessionKeyCmpr
,
pState
->
db
,
&
pState
->
pSessionStateDb
)
<
0
)
{
&
pState
->
pSessionStateDb
,
0
)
<
0
)
{
goto
_err
;
}
if
(
tdbTbOpen
(
"func.state.db"
,
sizeof
(
STupleKey
),
-
1
,
STupleKeyCmpr
,
pState
->
db
,
&
pState
->
pFuncStateDb
)
<
0
)
{
if
(
tdbTbOpen
(
"func.state.db"
,
sizeof
(
STupleKey
),
-
1
,
STupleKeyCmpr
,
pState
->
db
,
&
pState
->
pFuncStateDb
,
0
)
<
0
)
{
goto
_err
;
}
...
...
source/libs/stream/src/streamTask.c
浏览文件 @
0e30d177
...
...
@@ -119,7 +119,10 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
for
(
int32_t
i
=
0
;
i
<
epSz
;
i
++
)
{
SStreamChildEpInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamChildEpInfo
));
if
(
pInfo
==
NULL
)
return
-
1
;
if
(
tDecodeStreamEpInfo
(
pDecoder
,
pInfo
)
<
0
)
return
-
1
;
if
(
tDecodeStreamEpInfo
(
pDecoder
,
pInfo
)
<
0
)
{
taosMemoryFreeClear
(
pInfo
);
return
-
1
;
}
taosArrayPush
(
pTask
->
childEpInfo
,
&
pInfo
);
}
...
...
source/libs/tdb/inc/tdb.h
浏览文件 @
0e30d177
...
...
@@ -31,15 +31,17 @@ typedef struct STBC TBC;
typedef
struct
STxn
TXN
;
// TDB
int32_t
tdbOpen
(
const
char
*
dbname
,
int
szPage
,
int
pages
,
TDB
**
ppDb
);
int32_t
tdbOpen
(
const
char
*
dbname
,
int
szPage
,
int
pages
,
TDB
**
ppDb
,
int8_t
rollback
);
int32_t
tdbClose
(
TDB
*
pDb
);
int32_t
tdbBegin
(
TDB
*
pDb
,
TXN
*
pTxn
);
int32_t
tdbCommit
(
TDB
*
pDb
,
TXN
*
pTxn
);
int32_t
tdbPostCommit
(
TDB
*
pDb
,
TXN
*
pTxn
);
int32_t
tdbAbort
(
TDB
*
pDb
,
TXN
*
pTxn
);
int32_t
tdbAlter
(
TDB
*
pDb
,
int
pages
);
// TTB
int32_t
tdbTbOpen
(
const
char
*
tbname
,
int
keyLen
,
int
valLen
,
tdb_cmpr_fn_t
keyCmprFn
,
TDB
*
pEnv
,
TTB
**
ppTb
);
int32_t
tdbTbOpen
(
const
char
*
tbname
,
int
keyLen
,
int
valLen
,
tdb_cmpr_fn_t
keyCmprFn
,
TDB
*
pEnv
,
TTB
**
ppTb
,
int8_t
rollback
);
int32_t
tdbTbClose
(
TTB
*
pTb
);
int32_t
tdbTbDrop
(
TTB
*
pTb
);
int32_t
tdbTbInsert
(
TTB
*
pTb
,
const
void
*
pKey
,
int
keyLen
,
const
void
*
pVal
,
int
valLen
,
TXN
*
pTxn
);
...
...
source/libs/tdb/src/db/tdbDb.c
浏览文件 @
0e30d177
...
...
@@ -15,7 +15,7 @@
#include "tdbInt.h"
int32_t
tdbOpen
(
const
char
*
dbname
,
int32_t
szPage
,
int32_t
pages
,
TDB
**
ppDb
)
{
int32_t
tdbOpen
(
const
char
*
dbname
,
int32_t
szPage
,
int32_t
pages
,
TDB
**
ppDb
,
int8_t
rollback
)
{
TDB
*
pDb
;
int
dsize
;
int
zsize
;
...
...
@@ -66,7 +66,7 @@ int32_t tdbOpen(const char *dbname, int32_t szPage, int32_t pages, TDB **ppDb) {
#ifdef USE_MAINDB
// open main db
ret
=
tdbTbOpen
(
TDB_MAINDB_NAME
,
-
1
,
sizeof
(
SBtInfo
),
NULL
,
pDb
,
&
pDb
->
pMainDb
);
ret
=
tdbTbOpen
(
TDB_MAINDB_NAME
,
-
1
,
sizeof
(
SBtInfo
),
NULL
,
pDb
,
&
pDb
->
pMainDb
,
rollback
);
if
(
ret
<
0
)
{
return
-
1
;
}
...
...
@@ -129,6 +129,21 @@ int32_t tdbCommit(TDB *pDb, TXN *pTxn) {
return
0
;
}
int32_t
tdbPostCommit
(
TDB
*
pDb
,
TXN
*
pTxn
)
{
SPager
*
pPager
;
int
ret
;
for
(
pPager
=
pDb
->
pgrList
;
pPager
;
pPager
=
pPager
->
pNext
)
{
ret
=
tdbPagerPostCommit
(
pPager
,
pTxn
);
if
(
ret
<
0
)
{
tdbError
(
"failed to commit pager since %s. dbName:%s, txnId:%d"
,
tstrerror
(
terrno
),
pDb
->
dbName
,
pTxn
->
txnId
);
return
-
1
;
}
}
return
0
;
}
int32_t
tdbAbort
(
TDB
*
pDb
,
TXN
*
pTxn
)
{
SPager
*
pPager
;
int
ret
;
...
...
source/libs/tdb/src/db/tdbPager.c
浏览文件 @
0e30d177
...
...
@@ -305,6 +305,18 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
return
0
;
}
int
tdbPagerPostCommit
(
SPager
*
pPager
,
TXN
*
pTxn
)
{
if
(
tdbOsRemove
(
pPager
->
jFileName
)
<
0
&&
errno
!=
ENOENT
)
{
tdbError
(
"failed to remove file due to %s. file:%s"
,
strerror
(
errno
),
pPager
->
jFileName
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
pPager
->
inTran
=
0
;
return
0
;
}
// recovery dirty pages
int
tdbPagerAbort
(
SPager
*
pPager
,
TXN
*
pTxn
)
{
SPage
*
pPage
;
...
...
@@ -657,3 +669,13 @@ int tdbPagerRestore(SPager *pPager, SBTree *pBt) {
return
0
;
}
int
tdbPagerRollback
(
SPager
*
pPager
)
{
if
(
tdbOsRemove
(
pPager
->
jFileName
)
<
0
&&
errno
!=
ENOENT
)
{
tdbError
(
"failed to remove file due to %s. jFileName:%s"
,
strerror
(
errno
),
pPager
->
jFileName
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
return
0
;
}
source/libs/tdb/src/db/tdbTable.c
浏览文件 @
0e30d177
...
...
@@ -24,7 +24,8 @@ struct STBC {
SBTC
btc
;
};
int
tdbTbOpen
(
const
char
*
tbname
,
int
keyLen
,
int
valLen
,
tdb_cmpr_fn_t
keyCmprFn
,
TDB
*
pEnv
,
TTB
**
ppTb
)
{
int
tdbTbOpen
(
const
char
*
tbname
,
int
keyLen
,
int
valLen
,
tdb_cmpr_fn_t
keyCmprFn
,
TDB
*
pEnv
,
TTB
**
ppTb
,
int8_t
rollback
)
{
TTB
*
pTb
;
SPager
*
pPager
;
int
ret
;
...
...
@@ -110,10 +111,14 @@ int tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprF
return
-
1
;
}
ret
=
tdbPagerRestore
(
pPager
,
pTb
->
pBt
);
if
(
ret
<
0
)
{
tdbOsFree
(
pTb
);
return
-
1
;
if
(
rollback
)
{
tdbPagerRollback
(
pPager
);
}
else
{
ret
=
tdbPagerRestore
(
pPager
,
pTb
->
pBt
);
if
(
ret
<
0
)
{
tdbOsFree
(
pTb
);
return
-
1
;
}
}
*
ppTb
=
pTb
;
...
...
source/libs/tdb/src/inc/tdbInt.h
浏览文件 @
0e30d177
...
...
@@ -190,12 +190,14 @@ int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate, SBTree *pBt);
int
tdbPagerWrite
(
SPager
*
pPager
,
SPage
*
pPage
);
int
tdbPagerBegin
(
SPager
*
pPager
,
TXN
*
pTxn
);
int
tdbPagerCommit
(
SPager
*
pPager
,
TXN
*
pTxn
);
int
tdbPagerPostCommit
(
SPager
*
pPager
,
TXN
*
pTxn
);
int
tdbPagerAbort
(
SPager
*
pPager
,
TXN
*
pTxn
);
int
tdbPagerFetchPage
(
SPager
*
pPager
,
SPgno
*
ppgno
,
SPage
**
ppPage
,
int
(
*
initPage
)(
SPage
*
,
void
*
,
int
),
void
*
arg
,
TXN
*
pTxn
);
void
tdbPagerReturnPage
(
SPager
*
pPager
,
SPage
*
pPage
,
TXN
*
pTxn
);
int
tdbPagerAllocPage
(
SPager
*
pPager
,
SPgno
*
ppgno
);
int
tdbPagerRestore
(
SPager
*
pPager
,
SBTree
*
pBt
);
int
tdbPagerRollback
(
SPager
*
pPager
);
// tdbPCache.c ====================================
#define TDB_PCACHE_PAGE \
...
...
source/libs/tdb/test/tdbExOVFLTest.cpp
浏览文件 @
0e30d177
...
...
@@ -140,7 +140,7 @@ static void generateBigVal(char *val, int valLen) {
static
TDB
*
openEnv
(
char
const
*
envName
,
int
const
pageSize
,
int
const
pageNum
)
{
TDB
*
pEnv
=
NULL
;
int
ret
=
tdbOpen
(
envName
,
pageSize
,
pageNum
,
&
pEnv
);
int
ret
=
tdbOpen
(
envName
,
pageSize
,
pageNum
,
&
pEnv
,
0
);
if
(
ret
)
{
pEnv
=
NULL
;
}
...
...
@@ -162,8 +162,8 @@ static void insertOfp(void) {
// open db
TTB
*
pDb
=
NULL
;
tdb_cmpr_fn_t
compFunc
=
tKeyCmpr
;
// ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb);
ret
=
tdbTbOpen
(
"ofp_insert.db"
,
12
,
-
1
,
compFunc
,
pEnv
,
&
pDb
);
// ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb
, 0
);
ret
=
tdbTbOpen
(
"ofp_insert.db"
,
12
,
-
1
,
compFunc
,
pEnv
,
&
pDb
,
0
);
GTEST_ASSERT_EQ
(
ret
,
0
);
// open the pool
...
...
@@ -211,8 +211,8 @@ TEST(TdbOVFLPagesTest, TbGetTest) {
// open db
TTB
*
pDb
=
NULL
;
tdb_cmpr_fn_t
compFunc
=
tKeyCmpr
;
// int ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb);
int
ret
=
tdbTbOpen
(
"ofp_insert.db"
,
12
,
-
1
,
compFunc
,
pEnv
,
&
pDb
);
// int ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb
, 0
);
int
ret
=
tdbTbOpen
(
"ofp_insert.db"
,
12
,
-
1
,
compFunc
,
pEnv
,
&
pDb
,
0
);
GTEST_ASSERT_EQ
(
ret
,
0
);
// generate value payload
...
...
@@ -253,7 +253,7 @@ TEST(TdbOVFLPagesTest, TbDeleteTest) {
// open db
TTB
*
pDb
=
NULL
;
tdb_cmpr_fn_t
compFunc
=
tKeyCmpr
;
ret
=
tdbTbOpen
(
"ofp_insert.db"
,
-
1
,
-
1
,
compFunc
,
pEnv
,
&
pDb
);
ret
=
tdbTbOpen
(
"ofp_insert.db"
,
-
1
,
-
1
,
compFunc
,
pEnv
,
&
pDb
,
0
);
GTEST_ASSERT_EQ
(
ret
,
0
);
// open the pool
...
...
@@ -354,12 +354,12 @@ TEST(tdb_test, simple_insert1) {
taosRemoveDir
(
"tdb"
);
// Open Env
ret
=
tdbOpen
(
"tdb"
,
pageSize
,
64
,
&
pEnv
);
ret
=
tdbOpen
(
"tdb"
,
pageSize
,
64
,
&
pEnv
,
0
);
GTEST_ASSERT_EQ
(
ret
,
0
);
// Create a database
compFunc
=
tKeyCmpr
;
ret
=
tdbTbOpen
(
"db.db"
,
-
1
,
-
1
,
compFunc
,
pEnv
,
&
pDb
);
ret
=
tdbTbOpen
(
"db.db"
,
-
1
,
-
1
,
compFunc
,
pEnv
,
&
pDb
,
0
);
GTEST_ASSERT_EQ
(
ret
,
0
);
{
...
...
source/libs/tdb/test/tdbTest.cpp
浏览文件 @
0e30d177
...
...
@@ -130,12 +130,12 @@ TEST(tdb_test, DISABLED_simple_insert1) {
taosRemoveDir
(
"tdb"
);
// Open Env
ret
=
tdbOpen
(
"tdb"
,
4096
,
64
,
&
pEnv
);
ret
=
tdbOpen
(
"tdb"
,
4096
,
64
,
&
pEnv
,
0
);
GTEST_ASSERT_EQ
(
ret
,
0
);
// Create a database
compFunc
=
tKeyCmpr
;
ret
=
tdbTbOpen
(
"db.db"
,
-
1
,
-
1
,
compFunc
,
pEnv
,
&
pDb
);
ret
=
tdbTbOpen
(
"db.db"
,
-
1
,
-
1
,
compFunc
,
pEnv
,
&
pDb
,
0
);
GTEST_ASSERT_EQ
(
ret
,
0
);
{
...
...
@@ -250,12 +250,12 @@ TEST(tdb_test, DISABLED_simple_insert2) {
taosRemoveDir
(
"tdb"
);
// Open Env
ret
=
tdbOpen
(
"tdb"
,
1024
,
10
,
&
pEnv
);
ret
=
tdbOpen
(
"tdb"
,
1024
,
10
,
&
pEnv
,
0
);
GTEST_ASSERT_EQ
(
ret
,
0
);
// Create a database
compFunc
=
tDefaultKeyCmpr
;
ret
=
tdbTbOpen
(
"db.db"
,
-
1
,
-
1
,
compFunc
,
pEnv
,
&
pDb
);
ret
=
tdbTbOpen
(
"db.db"
,
-
1
,
-
1
,
compFunc
,
pEnv
,
&
pDb
,
0
);
GTEST_ASSERT_EQ
(
ret
,
0
);
{
...
...
@@ -346,11 +346,11 @@ TEST(tdb_test, DISABLED_simple_delete1) {
pPool
=
openPool
();
// open env
ret
=
tdbOpen
(
"tdb"
,
1024
,
256
,
&
pEnv
);
ret
=
tdbOpen
(
"tdb"
,
1024
,
256
,
&
pEnv
,
0
);
GTEST_ASSERT_EQ
(
ret
,
0
);
// open database
ret
=
tdbTbOpen
(
"db.db"
,
-
1
,
-
1
,
tKeyCmpr
,
pEnv
,
&
pDb
);
ret
=
tdbTbOpen
(
"db.db"
,
-
1
,
-
1
,
tKeyCmpr
,
pEnv
,
&
pDb
,
0
);
GTEST_ASSERT_EQ
(
ret
,
0
);
tdbTxnOpen
(
&
txn
,
0
,
poolMalloc
,
poolFree
,
pPool
,
TDB_TXN_WRITE
|
TDB_TXN_READ_UNCOMMITTED
);
...
...
@@ -435,11 +435,11 @@ TEST(tdb_test, DISABLED_simple_upsert1) {
taosRemoveDir
(
"tdb"
);
// open env
ret
=
tdbOpen
(
"tdb"
,
4096
,
64
,
&
pEnv
);
ret
=
tdbOpen
(
"tdb"
,
4096
,
64
,
&
pEnv
,
0
);
GTEST_ASSERT_EQ
(
ret
,
0
);
// open database
ret
=
tdbTbOpen
(
"db.db"
,
-
1
,
-
1
,
NULL
,
pEnv
,
&
pDb
);
ret
=
tdbTbOpen
(
"db.db"
,
-
1
,
-
1
,
NULL
,
pEnv
,
&
pDb
,
0
);
GTEST_ASSERT_EQ
(
ret
,
0
);
pPool
=
openPool
();
...
...
@@ -497,12 +497,12 @@ TEST(tdb_test, multi_thread_query) {
taosRemoveDir
(
"tdb"
);
// Open Env
ret
=
tdbOpen
(
"tdb"
,
4096
,
10
,
&
pEnv
);
ret
=
tdbOpen
(
"tdb"
,
4096
,
10
,
&
pEnv
,
0
);
GTEST_ASSERT_EQ
(
ret
,
0
);
// Create a database
compFunc
=
tKeyCmpr
;
ret
=
tdbTbOpen
(
"db.db"
,
-
1
,
-
1
,
compFunc
,
pEnv
,
&
pDb
);
ret
=
tdbTbOpen
(
"db.db"
,
-
1
,
-
1
,
compFunc
,
pEnv
,
&
pDb
,
0
);
GTEST_ASSERT_EQ
(
ret
,
0
);
char
key
[
64
];
...
...
@@ -614,10 +614,10 @@ TEST(tdb_test, DISABLED_multi_thread1) {
taosRemoveDir("tdb");
// Open Env
ret = tdbOpen("tdb", 512, 1, &pDb);
ret = tdbOpen("tdb", 512, 1, &pDb
, 0
);
GTEST_ASSERT_EQ(ret, 0);
ret = tdbTbOpen("db.db", -1, -1, NULL, pDb, &pTb);
ret = tdbTbOpen("db.db", -1, -1, NULL, pDb, &pTb
, 0
);
GTEST_ASSERT_EQ(ret, 0);
auto insert = [](TDB *pDb, TTB *pTb, int nData, int *stop, std::shared_timed_mutex *mu) {
...
...
@@ -726,4 +726,4 @@ TEST(tdb_test, DISABLED_multi_thread1) {
ret = tdbClose(pDb);
GTEST_ASSERT_EQ(ret, 0);
#endif
}
\ No newline at end of file
}
source/os/src/osFile.c
浏览文件 @
0e30d177
...
...
@@ -343,6 +343,7 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) {
TdFilePtr
pFile
=
(
TdFilePtr
)
taosMemoryMalloc
(
sizeof
(
TdFile
));
if
(
pFile
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
if
(
fd
>=
0
)
close
(
fd
);
if
(
fp
!=
NULL
)
fclose
(
fp
);
return
NULL
;
...
...
source/util/src/tsched.c
浏览文件 @
0e30d177
...
...
@@ -26,19 +26,25 @@ static void *taosProcessSchedQueue(void *param);
static
void
taosDumpSchedulerStatus
(
void
*
qhandle
,
void
*
tmrId
);
void
*
taosInitScheduler
(
int32_t
queueSize
,
int32_t
numOfThreads
,
const
char
*
label
,
SSchedQueue
*
pSched
)
{
bool
schedMalloced
=
false
;
if
(
NULL
==
pSched
)
{
pSched
=
(
SSchedQueue
*
)
taosMemoryCalloc
(
sizeof
(
SSchedQueue
),
1
);
if
(
pSched
==
NULL
)
{
uError
(
"%s: no enough memory for pSched"
,
label
);
return
NULL
;
}
schedMalloced
=
true
;
}
pSched
->
queue
=
(
SSchedMsg
*
)
taosMemoryCalloc
(
sizeof
(
SSchedMsg
),
queueSize
);
if
(
pSched
->
queue
==
NULL
)
{
uError
(
"%s: no enough memory for queue"
,
label
);
taosCleanUpScheduler
(
pSched
);
taosMemoryFree
(
pSched
);
if
(
schedMalloced
)
{
taosMemoryFree
(
pSched
);
}
return
NULL
;
}
...
...
@@ -46,6 +52,9 @@ void *taosInitScheduler(int32_t queueSize, int32_t numOfThreads, const char *lab
if
(
pSched
->
qthread
==
NULL
)
{
uError
(
"%s: no enough memory for qthread"
,
label
);
taosCleanUpScheduler
(
pSched
);
if
(
schedMalloced
)
{
taosMemoryFree
(
pSched
);
}
return
NULL
;
}
...
...
@@ -58,18 +67,27 @@ void *taosInitScheduler(int32_t queueSize, int32_t numOfThreads, const char *lab
if
(
taosThreadMutexInit
(
&
pSched
->
queueMutex
,
NULL
)
<
0
)
{
uError
(
"init %s:queueMutex failed(%s)"
,
label
,
strerror
(
errno
));
taosCleanUpScheduler
(
pSched
);
if
(
schedMalloced
)
{
taosMemoryFree
(
pSched
);
}
return
NULL
;
}
if
(
tsem_init
(
&
pSched
->
emptySem
,
0
,
(
uint32_t
)
pSched
->
queueSize
)
!=
0
)
{
uError
(
"init %s:empty semaphore failed(%s)"
,
label
,
strerror
(
errno
));
taosCleanUpScheduler
(
pSched
);
if
(
schedMalloced
)
{
taosMemoryFree
(
pSched
);
}
return
NULL
;
}
if
(
tsem_init
(
&
pSched
->
fullSem
,
0
,
0
)
!=
0
)
{
uError
(
"init %s:full semaphore failed(%s)"
,
label
,
strerror
(
errno
));
taosCleanUpScheduler
(
pSched
);
if
(
schedMalloced
)
{
taosMemoryFree
(
pSched
);
}
return
NULL
;
}
...
...
@@ -83,6 +101,9 @@ void *taosInitScheduler(int32_t queueSize, int32_t numOfThreads, const char *lab
if
(
code
!=
0
)
{
uError
(
"%s: failed to create rpc thread(%s)"
,
label
,
strerror
(
errno
));
taosCleanUpScheduler
(
pSched
);
if
(
schedMalloced
)
{
taosMemoryFree
(
pSched
);
}
return
NULL
;
}
++
pSched
->
numOfThreads
;
...
...
tests/script/jenkins/basic.txt
浏览文件 @
0e30d177
...
...
@@ -301,7 +301,7 @@
# --- sma
./test.sh -f tsim/sma/drop_sma.sim
#
./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim
./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim
# temp disable
./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim
./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim
...
...
tests/script/tsim/sma/tsmaCreateInsertQuery.sim
浏览文件 @
0e30d177
...
...
@@ -5,7 +5,7 @@ sleep 50
sql connect
print =============== create database
sql create database d1 vgroups 1
sql create database d1
keep 36500d
vgroups 1
sql use d1
print =============== create super table, include column type for count/sum/min/max/first
...
...
@@ -25,8 +25,8 @@ if $rows != 1 then
endi
print =============== insert data, mode1: one row one table in sql
sql insert into ct1 values(
now+0s
, 10, 2.0, 3.0)
sql insert into ct1 values(
now+1s, 11, 2.1, 3.1)(now+2s, -12, -2.2, -3.2)(now+3s
, -13, -2.3, -3.3)
sql insert into ct1 values(
'2022-10-19 09:55:45.682'
, 10, 2.0, 3.0)
sql insert into ct1 values(
'2022-10-19 09:55:46.682', 11, 2.1, 3.1)('2022-10-19 09:55:47.682', -12, -2.2, -3.2)('2022-10-19 09:55:48.682'
, -13, -2.3, -3.3)
print =============== create sma index from super table
...
...
@@ -34,7 +34,7 @@ sql create sma index sma_index_name1 on stb function(max(c1),max(c2),min(c1)) in
print $data00 $data01 $data02 $data03
print =============== trigger stream to execute sma aggr task and insert sma data into sma store
sql insert into ct1 values(
now+5s
, 20, 20.0, 30.0)
sql insert into ct1 values(
'2022-10-19 09:55:50.682'
, 20, 20.0, 30.0)
#===================================================================
print =============== show streams ================================
...
...
tests/system-test/fulltest.sh
浏览文件 @
0e30d177
...
...
@@ -232,7 +232,7 @@ python3 ./test.py -f 6-cluster/5dnode2mnode.py -N 5 -M 3
python3 ./test.py
-f
6-cluster/5dnode3mnodeStop.py
-N
5
-M
3
python3 ./test.py
-f
6-cluster/5dnode3mnodeStop2Follower.py
-N
5
-M
3
python3 ./test.py
-f
6-cluster/5dnode3mnodeStopLoop.py
-N
5
-M
3
#
python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py -N 5 -M 3
python3 ./test.py
-f
6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py
-N
5
-M
3
python3 ./test.py
-f
6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py
-N
5
-M
3
python3 ./test.py
-f
6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py
-N
5
-M
3
python3 ./test.py
-f
6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDbRep3.py
-N
5
-M
3
...
...
@@ -248,7 +248,7 @@ python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 5 -
python3 ./test.py
-f
6-cluster/5dnode3mnodeAdd1Ddnoe.py
-N
6
-M
3
-C
5
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeStopInsert.py
#
python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5
python3 ./test.py
-f
6-cluster/5dnode3mnodeDrop.py
-N
5
python3 test.py
-f
6-cluster/5dnode3mnodeStopConnect.py
-N
5
-M
3
python3 ./test.py
-f
6-cluster/5dnode3mnodeRecreateMnode.py
-N
5
-M
3
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录