Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
b1643228
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
b1643228
编写于
7月 14, 2023
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/enh/triggerCheckPoint2' into enh/triggerCheckPoint2
上级
eb7e6152
44be57b3
变更
12
隐藏空白更改
内联
并排
Showing
12 changed file
with
591 addition
and
58 deletion
+591
-58
source/dnode/mnode/impl/src/mndStream.c
source/dnode/mnode/impl/src/mndStream.c
+101
-7
source/dnode/mnode/impl/src/mndTrans.c
source/dnode/mnode/impl/src/mndTrans.c
+16
-17
source/dnode/vnode/src/meta/metaTable.c
source/dnode/vnode/src/meta/metaTable.c
+2
-2
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+4
-1
source/libs/stream/src/streamBackendRocksdb.c
source/libs/stream/src/streamBackendRocksdb.c
+17
-15
source/libs/stream/src/tstreamFileState.c
source/libs/stream/src/tstreamFileState.c
+2
-0
tests/parallel_test/cases.task
tests/parallel_test/cases.task
+1
-0
tests/script/tsim/stream/checkpointInterval0.sim
tests/script/tsim/stream/checkpointInterval0.sim
+61
-16
tests/script/tsim/stream/checkpointInterval1.sim
tests/script/tsim/stream/checkpointInterval1.sim
+104
-0
tests/script/tsim/stream/checkpointSession0.sim
tests/script/tsim/stream/checkpointSession0.sim
+178
-0
tests/script/tsim/stream/checkpointSession1.sim
tests/script/tsim/stream/checkpointSession1.sim
+104
-0
tests/script/win-test-file
tests/script/win-test-file
+1
-0
未找到文件。
source/dnode/mnode/impl/src/mndStream.c
浏览文件 @
b1643228
...
@@ -1009,7 +1009,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
...
@@ -1009,7 +1009,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
/*A(pTask->info.nodeId > 0);*/
/*A(pTask->info.nodeId > 0);*/
SVgObj
*
pVgObj
=
mndAcquireVgroup
(
pMnode
,
pTask
->
info
.
nodeId
);
SVgObj
*
pVgObj
=
mndAcquireVgroup
(
pMnode
,
pTask
->
info
.
nodeId
);
if
(
pVgObj
==
NULL
)
{
if
(
pVgObj
==
NULL
)
{
taos
R
UnLockLatch
(
&
pStream
->
lock
);
taos
W
UnLockLatch
(
&
pStream
->
lock
);
mndTransDrop
(
pTrans
);
mndTransDrop
(
pTrans
);
return
-
1
;
return
-
1
;
}
}
...
@@ -1019,7 +1019,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
...
@@ -1019,7 +1019,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
if
(
mndBuildStreamCheckpointSourceReq2
(
&
buf
,
&
tlen
,
pTask
->
info
.
nodeId
,
checkpointId
,
pTask
->
id
.
streamId
,
if
(
mndBuildStreamCheckpointSourceReq2
(
&
buf
,
&
tlen
,
pTask
->
info
.
nodeId
,
checkpointId
,
pTask
->
id
.
streamId
,
pTask
->
id
.
taskId
)
<
0
)
{
pTask
->
id
.
taskId
)
<
0
)
{
mndReleaseVgroup
(
pMnode
,
pVgObj
);
mndReleaseVgroup
(
pMnode
,
pVgObj
);
taos
R
UnLockLatch
(
&
pStream
->
lock
);
taos
W
UnLockLatch
(
&
pStream
->
lock
);
mndTransDrop
(
pTrans
);
mndTransDrop
(
pTrans
);
return
-
1
;
return
-
1
;
}
}
...
@@ -1034,7 +1034,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
...
@@ -1034,7 +1034,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
taosMemoryFree
(
buf
);
taosMemoryFree
(
buf
);
taos
R
UnLockLatch
(
&
pStream
->
lock
);
taos
W
UnLockLatch
(
&
pStream
->
lock
);
mndReleaseStream
(
pMnode
,
pStream
);
mndReleaseStream
(
pMnode
,
pStream
);
mndTransDrop
(
pTrans
);
mndTransDrop
(
pTrans
);
return
-
1
;
return
-
1
;
...
@@ -1079,6 +1079,78 @@ _ERR:
...
@@ -1079,6 +1079,78 @@ _ERR:
mndTransDrop
(
pTrans
);
mndTransDrop
(
pTrans
);
return
-
1
;
return
-
1
;
}
}
static
int32_t
mndAddStreamCheckpointToTrans
(
STrans
*
pTrans
,
SStreamObj
*
pStream
,
SMnode
*
pMnode
,
int64_t
checkpointId
)
{
taosWLockLatch
(
&
pStream
->
lock
);
int32_t
totLevel
=
taosArrayGetSize
(
pStream
->
tasks
);
for
(
int32_t
i
=
0
;
i
<
totLevel
;
i
++
)
{
SArray
*
pLevel
=
taosArrayGetP
(
pStream
->
tasks
,
i
);
SStreamTask
*
pTask
=
taosArrayGetP
(
pLevel
,
0
);
if
(
pTask
->
info
.
taskLevel
==
TASK_LEVEL__SOURCE
)
{
int32_t
sz
=
taosArrayGetSize
(
pLevel
);
for
(
int32_t
j
=
0
;
j
<
sz
;
j
++
)
{
SStreamTask
*
pTask
=
taosArrayGetP
(
pLevel
,
j
);
/*A(pTask->info.nodeId > 0);*/
SVgObj
*
pVgObj
=
mndAcquireVgroup
(
pMnode
,
pTask
->
info
.
nodeId
);
if
(
pVgObj
==
NULL
)
{
taosWUnLockLatch
(
&
pStream
->
lock
);
return
-
1
;
}
void
*
buf
;
int32_t
tlen
;
if
(
mndBuildStreamCheckpointSourceReq2
(
&
buf
,
&
tlen
,
pTask
->
info
.
nodeId
,
checkpointId
,
pTask
->
id
.
streamId
,
pTask
->
id
.
taskId
)
<
0
)
{
mndReleaseVgroup
(
pMnode
,
pVgObj
);
taosWUnLockLatch
(
&
pStream
->
lock
);
return
-
1
;
}
STransAction
action
=
{
0
};
action
.
epSet
=
mndGetVgroupEpset
(
pMnode
,
pVgObj
);
action
.
pCont
=
buf
;
action
.
contLen
=
tlen
;
action
.
msgType
=
TDMT_VND_STREAM_CHECK_POINT_SOURCE
;
mndReleaseVgroup
(
pMnode
,
pVgObj
);
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
taosMemoryFree
(
buf
);
taosWUnLockLatch
(
&
pStream
->
lock
);
return
-
1
;
}
}
}
}
pStream
->
checkpointFreq
=
checkpointId
;
pStream
->
checkpointId
=
checkpointId
;
pStream
->
checkpointFreq
=
taosGetTimestampMs
();
atomic_store_64
(
&
pStream
->
currentTick
,
0
);
// 3. commit log: stream checkpoint info
pStream
->
version
=
pStream
->
version
+
1
;
taosWUnLockLatch
(
&
pStream
->
lock
);
SSdbRaw
*
pCommitRaw
=
mndStreamActionEncode
(
pStream
);
if
(
pCommitRaw
==
NULL
)
{
mError
(
"failed to prepare trans rebalance since %s"
,
terrstr
());
return
-
1
;
}
if
(
mndTransAppendCommitlog
(
pTrans
,
pCommitRaw
)
!=
0
)
{
sdbFreeRaw
(
pCommitRaw
);
mError
(
"failed to prepare trans rebalance since %s"
,
terrstr
());
return
-
1
;
}
if
(
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_READY
)
!=
0
)
{
sdbFreeRaw
(
pCommitRaw
);
mError
(
"failed to prepare trans rebalance since %s"
,
terrstr
());
return
-
1
;
}
return
0
;
}
static
int32_t
mndProcessStreamDoCheckpoint
(
SRpcMsg
*
pReq
)
{
static
int32_t
mndProcessStreamDoCheckpoint
(
SRpcMsg
*
pReq
)
{
SMnode
*
pMnode
=
pReq
->
info
.
node
;
SMnode
*
pMnode
=
pReq
->
info
.
node
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
...
@@ -1089,16 +1161,38 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
...
@@ -1089,16 +1161,38 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
SMStreamDoCheckpointMsg
*
pMsg
=
(
SMStreamDoCheckpointMsg
*
)
pReq
->
pCont
;
SMStreamDoCheckpointMsg
*
pMsg
=
(
SMStreamDoCheckpointMsg
*
)
pReq
->
pCont
;
int64_t
checkpointId
=
pMsg
->
checkpointId
;
int64_t
checkpointId
=
pMsg
->
checkpointId
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_CONFLICT_DB_INSIDE
,
NULL
,
"stream-checkpoint"
);
if
(
pTrans
==
NULL
)
{
mError
(
"failed to trigger checkpoint, reason: %s"
,
tstrerror
(
TSDB_CODE_OUT_OF_MEMORY
));
return
-
1
;
}
mDebug
(
"start to trigger checkpoint, checkpointId: %"
PRId64
""
,
checkpointId
);
mndTransSetDbName
(
pTrans
,
"checkpoint"
,
"checkpoint"
);
if
(
mndTransCheckConflict
(
pMnode
,
pTrans
)
!=
0
)
{
mError
(
"failed to trigger checkpoint, checkpointId: %"
PRId64
", reason:%s"
,
checkpointId
,
tstrerror
(
TSDB_CODE_MND_TRANS_CONFLICT
));
mndTransDrop
(
pTrans
);
return
-
1
;
}
while
(
1
)
{
while
(
1
)
{
pIter
=
sdbFetch
(
pSdb
,
SDB_STREAM
,
pIter
,
(
void
**
)
&
pStream
);
pIter
=
sdbFetch
(
pSdb
,
SDB_STREAM
,
pIter
,
(
void
**
)
&
pStream
);
if
(
pIter
==
NULL
)
break
;
if
(
pIter
==
NULL
)
break
;
code
=
mndProcessStreamCheckpointTrans
(
pMnode
,
pStream
,
checkpointId
);
code
=
mndAddStreamCheckpointToTrans
(
pTrans
,
pStream
,
pMnode
,
checkpointId
);
sdbRelease
(
pSdb
,
pStream
);
if
(
code
==
-
1
)
{
if
(
code
==
-
1
)
{
mInfo
(
"stream:%s failed to do checkpoint, reason: last checkpoint not finished"
,
pStream
->
name
)
;
break
;
}
}
sdbRelease
(
pSdb
,
pStream
);
}
}
return
0
;
if
(
code
==
0
)
{
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
{
mError
(
"failed to prepre trans rebalance since %s"
,
terrstr
());
}
}
mndTransDrop
(
pTrans
);
return
code
;
}
}
static
int32_t
mndProcessDropStreamReq
(
SRpcMsg
*
pReq
)
{
static
int32_t
mndProcessDropStreamReq
(
SRpcMsg
*
pReq
)
{
...
...
source/dnode/mnode/impl/src/mndTrans.c
浏览文件 @
b1643228
...
@@ -28,9 +28,9 @@
...
@@ -28,9 +28,9 @@
#define TRANS_ARRAY_SIZE 8
#define TRANS_ARRAY_SIZE 8
#define TRANS_RESERVE_SIZE 48
#define TRANS_RESERVE_SIZE 48
static
int32_t
mndTransActionInsert
(
SSdb
*
pSdb
,
STrans
*
pTrans
);
static
int32_t
mndTransActionInsert
(
SSdb
*
pSdb
,
STrans
*
pTrans
);
static
int32_t
mndTransActionUpdate
(
SSdb
*
pSdb
,
STrans
*
OldTrans
,
STrans
*
pOld
);
static
int32_t
mndTransActionUpdate
(
SSdb
*
pSdb
,
STrans
*
OldTrans
,
STrans
*
pOld
);
static
int32_t
mndTransDelete
(
SSdb
*
pSdb
,
STrans
*
pTrans
,
bool
callFunc
);
static
int32_t
mndTransDelete
(
SSdb
*
pSdb
,
STrans
*
pTrans
,
bool
callFunc
);
static
int32_t
mndTransAppendLog
(
SArray
*
pArray
,
SSdbRaw
*
pRaw
);
static
int32_t
mndTransAppendLog
(
SArray
*
pArray
,
SSdbRaw
*
pRaw
);
static
int32_t
mndTransAppendAction
(
SArray
*
pArray
,
STransAction
*
pAction
);
static
int32_t
mndTransAppendAction
(
SArray
*
pArray
,
STransAction
*
pAction
);
...
@@ -100,10 +100,9 @@ static int32_t mndTransGetActionsSize(SArray *pArray) {
...
@@ -100,10 +100,9 @@ static int32_t mndTransGetActionsSize(SArray *pArray) {
return
rawDataLen
;
return
rawDataLen
;
}
}
static
int32_t
mndTransEncodeAction
(
SSdbRaw
*
pRaw
,
int32_t
*
offset
,
SArray
*
pActions
,
int32_t
actionsNum
)
{
static
int32_t
mndTransEncodeAction
(
SSdbRaw
*
pRaw
,
int32_t
*
offset
,
SArray
*
pActions
,
int32_t
actionsNum
)
{
int32_t
dataPos
=
*
offset
;
int32_t
dataPos
=
*
offset
;
int8_t
unused
=
0
;
int8_t
unused
=
0
;
int32_t
ret
=
-
1
;
int32_t
ret
=
-
1
;
for
(
int32_t
i
=
0
;
i
<
actionsNum
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
actionsNum
;
++
i
)
{
...
@@ -266,16 +265,16 @@ _OVER:
...
@@ -266,16 +265,16 @@ _OVER:
SSdbRow
*
mndTransDecode
(
SSdbRaw
*
pRaw
)
{
SSdbRow
*
mndTransDecode
(
SSdbRaw
*
pRaw
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
terrno
=
TSDB_CODE_INVALID_MSG
;
SSdbRow
*
pRow
=
NULL
;
SSdbRow
*
pRow
=
NULL
;
STrans
*
pTrans
=
NULL
;
STrans
*
pTrans
=
NULL
;
char
*
pData
=
NULL
;
char
*
pData
=
NULL
;
int32_t
dataLen
=
0
;
int32_t
dataLen
=
0
;
int8_t
sver
=
0
;
int8_t
sver
=
0
;
int32_t
prepareActionNum
=
0
;
int32_t
prepareActionNum
=
0
;
int32_t
redoActionNum
=
0
;
int32_t
redoActionNum
=
0
;
int32_t
undoActionNum
=
0
;
int32_t
undoActionNum
=
0
;
int32_t
commitActionNum
=
0
;
int32_t
commitActionNum
=
0
;
int32_t
dataPos
=
0
;
int32_t
dataPos
=
0
;
if
(
sdbGetRawSoftVer
(
pRaw
,
&
sver
)
!=
0
)
goto
_OVER
;
if
(
sdbGetRawSoftVer
(
pRaw
,
&
sver
)
!=
0
)
goto
_OVER
;
...
@@ -577,7 +576,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict,
...
@@ -577,7 +576,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict,
pTrans
->
undoActions
=
taosArrayInit
(
TRANS_ARRAY_SIZE
,
sizeof
(
STransAction
));
pTrans
->
undoActions
=
taosArrayInit
(
TRANS_ARRAY_SIZE
,
sizeof
(
STransAction
));
pTrans
->
commitActions
=
taosArrayInit
(
TRANS_ARRAY_SIZE
,
sizeof
(
STransAction
));
pTrans
->
commitActions
=
taosArrayInit
(
TRANS_ARRAY_SIZE
,
sizeof
(
STransAction
));
pTrans
->
pRpcArray
=
taosArrayInit
(
1
,
sizeof
(
SRpcHandleInfo
));
pTrans
->
pRpcArray
=
taosArrayInit
(
1
,
sizeof
(
SRpcHandleInfo
));
pTrans
->
mTraceId
=
pReq
?
TRACE_GET_ROOTID
(
&
pReq
->
info
.
traceId
)
:
0
;
pTrans
->
mTraceId
=
pReq
?
TRACE_GET_ROOTID
(
&
pReq
->
info
.
traceId
)
:
tGenIdPI64
()
;
taosInitRWLatch
(
&
pTrans
->
lockRpcArray
);
taosInitRWLatch
(
&
pTrans
->
lockRpcArray
);
taosThreadMutexInit
(
&
pTrans
->
mutex
,
NULL
);
taosThreadMutexInit
(
&
pTrans
->
mutex
,
NULL
);
...
@@ -1327,7 +1326,7 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
...
@@ -1327,7 +1326,7 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
}
}
bool
mndTransPerformPrepareStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
bool
mndTransPerformPrepareStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
bool
continueExec
=
true
;
bool
continueExec
=
true
;
int32_t
code
=
0
;
int32_t
code
=
0
;
int32_t
numOfActions
=
taosArrayGetSize
(
pTrans
->
prepareActions
);
int32_t
numOfActions
=
taosArrayGetSize
(
pTrans
->
prepareActions
);
...
...
source/dnode/vnode/src/meta/metaTable.c
浏览文件 @
b1643228
...
@@ -452,7 +452,7 @@ int metaAddIndexToSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
...
@@ -452,7 +452,7 @@ int metaAddIndexToSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
}
}
}
}
if
(
diffIdx
==
-
1
&&
diffIdx
==
0
)
{
if
(
diffIdx
==
-
1
||
diffIdx
==
0
)
{
goto
_err
;
goto
_err
;
}
}
...
@@ -939,7 +939,7 @@ int metaTtlDropTable(SMeta *pMeta, int64_t timePointMs, SArray *tbUids) {
...
@@ -939,7 +939,7 @@ int metaTtlDropTable(SMeta *pMeta, int64_t timePointMs, SArray *tbUids) {
return
0
;
return
0
;
}
}
metaInfo
(
"ttl find expired table count: %zu"
,
TARRAY_SIZE
(
tbUids
));
metaInfo
(
"ttl find expired table count: %zu"
,
TARRAY_SIZE
(
tbUids
));
metaDropTables
(
pMeta
,
tbUids
);
metaDropTables
(
pMeta
,
tbUids
);
return
0
;
return
0
;
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
b1643228
...
@@ -1788,10 +1788,13 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo)
...
@@ -1788,10 +1788,13 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo)
buf
=
decodeSTimeWindowAggSupp
(
buf
,
&
pInfo
->
twAggSup
);
buf
=
decodeSTimeWindowAggSupp
(
buf
,
&
pInfo
->
twAggSup
);
int32_t
tlen
=
len
-
(
pBuff
-
buf
);
int32_t
tlen
=
len
-
(
pBuff
-
buf
);
void
*
pUpInfo
=
pInfo
->
stateStore
.
updateInfoInit
(
0
,
TSDB_TIME_PRECISION_MILLI
,
0
,
pInfo
->
igCheckUpdate
);
void
*
pUpInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SUpdateInfo
)
);
int32_t
code
=
pInfo
->
stateStore
.
updateInfoDeserialize
(
buf
,
tlen
,
pUpInfo
);
int32_t
code
=
pInfo
->
stateStore
.
updateInfoDeserialize
(
buf
,
tlen
,
pUpInfo
);
if
(
code
==
TSDB_CODE_SUCCESS
)
{
if
(
code
==
TSDB_CODE_SUCCESS
)
{
pInfo
->
stateStore
.
updateInfoDestroy
(
pInfo
->
pUpdateInfo
);
pInfo
->
pUpdateInfo
=
pUpInfo
;
pInfo
->
pUpdateInfo
=
pUpInfo
;
}
else
{
taosMemoryFree
(
pUpInfo
);
}
}
}
}
...
...
source/libs/stream/src/streamBackendRocksdb.c
浏览文件 @
b1643228
...
@@ -40,16 +40,8 @@ typedef struct {
...
@@ -40,16 +40,8 @@ typedef struct {
rocksdb_comparator_t
**
pCompares
;
rocksdb_comparator_t
**
pCompares
;
}
RocksdbCfInst
;
}
RocksdbCfInst
;
uint32_t
nextPow2
(
uint32_t
x
)
{
uint32_t
nextPow2
(
uint32_t
x
);
if
(
x
<=
1
)
return
2
;
x
=
x
-
1
;
x
=
x
|
(
x
>>
1
);
x
=
x
|
(
x
>>
2
);
x
=
x
|
(
x
>>
4
);
x
=
x
|
(
x
>>
8
);
x
=
x
|
(
x
>>
16
);
return
x
+
1
;
}
int32_t
streamStateOpenBackendCf
(
void
*
backend
,
char
*
name
,
char
**
cfs
,
int32_t
nCf
);
int32_t
streamStateOpenBackendCf
(
void
*
backend
,
char
*
name
,
char
**
cfs
,
int32_t
nCf
);
void
destroyRocksdbCfInst
(
RocksdbCfInst
*
inst
);
void
destroyRocksdbCfInst
(
RocksdbCfInst
*
inst
);
...
@@ -262,8 +254,8 @@ void streamBackendCleanup(void* arg) {
...
@@ -262,8 +254,8 @@ void streamBackendCleanup(void* arg) {
taosThreadMutexDestroy
(
&
pHandle
->
cfMutex
);
taosThreadMutexDestroy
(
&
pHandle
->
cfMutex
);
taosMemoryFree
(
pHandle
);
qDebug
(
"destroy stream backend backend:%p"
,
pHandle
);
qDebug
(
"destroy stream backend backend:%p"
,
pHandle
);
taosMemoryFree
(
pHandle
);
return
;
return
;
}
}
void
streamBackendHandleCleanup
(
void
*
arg
)
{
void
streamBackendHandleCleanup
(
void
*
arg
)
{
...
@@ -986,8 +978,8 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
...
@@ -986,8 +978,8 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
char
suffix
[
64
]
=
{
0
};
char
suffix
[
64
]
=
{
0
};
rocksdb_options_t
**
cfOpts
=
taosMemoryCalloc
(
nCf
,
sizeof
(
rocksdb_options_t
*
));
rocksdb_options_t
**
cfOpts
=
taosMemoryCalloc
(
nCf
,
sizeof
(
rocksdb_options_t
*
));
RocksdbCfParam
*
params
=
taosMemoryCalloc
(
nCf
,
sizeof
(
RocksdbCfParam
*
));
RocksdbCfParam
*
params
=
taosMemoryCalloc
(
nCf
,
sizeof
(
RocksdbCfParam
));
rocksdb_comparator_t
**
pCompare
=
taosMemoryCalloc
(
nCf
,
sizeof
(
rocksdb_comparator_t
*
*
));
rocksdb_comparator_t
**
pCompare
=
taosMemoryCalloc
(
nCf
,
sizeof
(
rocksdb_comparator_t
*
));
rocksdb_column_family_handle_t
**
cfHandle
=
taosMemoryCalloc
(
nCf
,
sizeof
(
rocksdb_column_family_handle_t
*
));
rocksdb_column_family_handle_t
**
cfHandle
=
taosMemoryCalloc
(
nCf
,
sizeof
(
rocksdb_column_family_handle_t
*
));
for
(
int
i
=
0
;
i
<
nCf
;
i
++
)
{
for
(
int
i
=
0
;
i
<
nCf
;
i
++
)
{
...
@@ -1153,7 +1145,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
...
@@ -1153,7 +1145,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
param
[
i
].
tableOpt
=
tableOpt
;
param
[
i
].
tableOpt
=
tableOpt
;
};
};
rocksdb_comparator_t
**
pCompare
=
taosMemoryCalloc
(
cfLen
,
sizeof
(
rocksdb_comparator_t
*
*
));
rocksdb_comparator_t
**
pCompare
=
taosMemoryCalloc
(
cfLen
,
sizeof
(
rocksdb_comparator_t
*
));
for
(
int
i
=
0
;
i
<
cfLen
;
i
++
)
{
for
(
int
i
=
0
;
i
<
cfLen
;
i
++
)
{
SCfInit
*
cf
=
&
ginitDict
[
i
];
SCfInit
*
cf
=
&
ginitDict
[
i
];
...
@@ -1294,8 +1286,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* pChk
...
@@ -1294,8 +1286,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* pChk
int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \
int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \
rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \
rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \
if (err != NULL) { \
if (err != NULL) { \
taosMemoryFree(err); \
qError("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \
qError("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \
taosMemoryFree(err); \
code = -1; \
code = -1; \
} else { \
} else { \
qTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d", toString, funcname, vLen, ttlVLen); \
qTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d", toString, funcname, vLen, ttlVLen); \
...
@@ -2361,3 +2353,13 @@ int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) {
...
@@ -2361,3 +2353,13 @@ int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) {
}
}
return
0
;
return
0
;
}
}
uint32_t
nextPow2
(
uint32_t
x
)
{
if
(
x
<=
1
)
return
2
;
x
=
x
-
1
;
x
=
x
|
(
x
>>
1
);
x
=
x
|
(
x
>>
2
);
x
=
x
|
(
x
>>
4
);
x
=
x
|
(
x
>>
8
);
x
=
x
|
(
x
>>
16
);
return
x
+
1
;
}
source/libs/stream/src/tstreamFileState.c
浏览文件 @
b1643228
...
@@ -507,10 +507,12 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
...
@@ -507,10 +507,12 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
destroyRowBuffPos
(
pNewPos
);
destroyRowBuffPos
(
pNewPos
);
SListNode
*
pNode
=
tdListPopTail
(
pFileState
->
usedBuffs
);
SListNode
*
pNode
=
tdListPopTail
(
pFileState
->
usedBuffs
);
taosMemoryFreeClear
(
pNode
);
taosMemoryFreeClear
(
pNode
);
taosMemoryFreeClear
(
pVal
);
break
;
break
;
}
}
ASSERT
(
pVLen
==
pFileState
->
rowSize
);
ASSERT
(
pVLen
==
pFileState
->
rowSize
);
memcpy
(
pNewPos
->
pRowBuff
,
pVal
,
pVLen
);
memcpy
(
pNewPos
->
pRowBuff
,
pVal
,
pVLen
);
taosMemoryFreeClear
(
pVal
);
code
=
tSimpleHashPut
(
pFileState
->
rowBuffMap
,
pNewPos
->
pKey
,
pFileState
->
keyLen
,
&
pNewPos
,
POINTER_BYTES
);
code
=
tSimpleHashPut
(
pFileState
->
rowBuffMap
,
pNewPos
->
pKey
,
pFileState
->
keyLen
,
&
pNewPos
,
POINTER_BYTES
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
destroyRowBuffPos
(
pNewPos
);
destroyRowBuffPos
(
pNewPos
);
...
...
tests/parallel_test/cases.task
浏览文件 @
b1643228
...
@@ -997,6 +997,7 @@
...
@@ -997,6 +997,7 @@
,,y,script,./test.sh -f tsim/stream/basic2.sim
,,y,script,./test.sh -f tsim/stream/basic2.sim
,,y,script,./test.sh -f tsim/stream/basic3.sim
,,y,script,./test.sh -f tsim/stream/basic3.sim
,,y,script,./test.sh -f tsim/stream/basic4.sim
,,y,script,./test.sh -f tsim/stream/basic4.sim
,,y,script,./test.sh -f tsim/stream/checkpointInterval0.sim
,,y,script,./test.sh -f tsim/stream/checkStreamSTable1.sim
,,y,script,./test.sh -f tsim/stream/checkStreamSTable1.sim
,,y,script,./test.sh -f tsim/stream/checkStreamSTable.sim
,,y,script,./test.sh -f tsim/stream/checkStreamSTable.sim
,,y,script,./test.sh -f tsim/stream/deleteInterval.sim
,,y,script,./test.sh -f tsim/stream/deleteInterval.sim
...
...
tests/script/tsim/stream/checkpoint0.sim
→
tests/script/tsim/stream/checkpoint
Interval
0.sim
浏览文件 @
b1643228
...
@@ -4,20 +4,16 @@ system sh/exec.sh -n dnode1 -s start
...
@@ -4,20 +4,16 @@ system sh/exec.sh -n dnode1 -s start
sleep 50
sleep 50
sql connect
sql connect
print step 1
print =============== create database
print =============== create database
sql create database test vgroups 1;
sql create database test vgroups 1;
sql select * from information_schema.ins_databases
if $rows != 3 then
return -1
endi
print $data00 $data01 $data02
sql use test;
sql use test;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams
1
trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, sum(a) from t1 interval(10s);
sql create stream streams
0
trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, sum(a) from t1 interval(10s);
sql insert into t1 values(1648791213000,1,2,3,1.0);
sql insert into t1 values(1648791213000,1,2,3,1.0);
sql insert into t1 values(1648791213001,2,2,3,1.1);
sql insert into t1 values(1648791213001,2,2,3,1.1);
...
@@ -29,7 +25,7 @@ sleep 1000
...
@@ -29,7 +25,7 @@ sleep 1000
sql select * from streamt;
sql select * from streamt;
$loop_count = $loop_count + 1
$loop_count = $loop_count + 1
if $loop_count ==
2
0 then
if $loop_count ==
1
0 then
return -1
return -1
endi
endi
...
@@ -49,13 +45,13 @@ if $data02 != 3 then
...
@@ -49,13 +45,13 @@ if $data02 != 3 then
goto loop0
goto loop0
endi
endi
print waiting for checkpoint generation ......
print waiting for checkpoint generation
1
......
sleep 25000
sleep 25000
print restart taosd
print restart taosd
01 ......
system sh/
exec.sh -n dnode1 -s stop -x SIGINT
system sh/
stop_dnodes.sh
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode1 -s start
...
@@ -69,7 +65,7 @@ sleep 1000
...
@@ -69,7 +65,7 @@ sleep 1000
sql select * from streamt;
sql select * from streamt;
$loop_count = $loop_count + 1
$loop_count = $loop_count + 1
if $loop_count ==
2
0 then
if $loop_count ==
1
0 then
return -1
return -1
endi
endi
...
@@ -89,7 +85,7 @@ if $data02 != 6 then
...
@@ -89,7 +85,7 @@ if $data02 != 6 then
goto loop1
goto loop1
endi
endi
sql insert into t1 values(164879122300
2
,4,2,3,1.1);
sql insert into t1 values(164879122300
3
,4,2,3,1.1);
$loop_count = 0
$loop_count = 0
...
@@ -99,7 +95,7 @@ sleep 1000
...
@@ -99,7 +95,7 @@ sleep 1000
sql select * from streamt;
sql select * from streamt;
$loop_count = $loop_count + 1
$loop_count = $loop_count + 1
if $loop_count ==
2
0 then
if $loop_count ==
1
0 then
return -1
return -1
endi
endi
...
@@ -121,13 +117,62 @@ endi
...
@@ -121,13 +117,62 @@ endi
# row 1
# row 1
if $data11 != 1 then
if $data11 != 1 then
print =====data
01=$data0
1
print =====data
11=$data1
1
goto loop2
goto loop2
endi
endi
if $data12 != 4 then
if $data12 != 4 then
print =====data
02=$data0
2
print =====data
12=$data1
2
goto loop2
goto loop2
endi
endi
print step 2
print restart taosd 02 ......
system sh/stop_dnodes.sh
system sh/exec.sh -n dnode1 -s start
sql insert into t1 values(1648791223004,5,2,3,1.1);
loop20:
sleep 1000
sql select * from streamt;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 2 then
print =====rows=$rows expect 2
goto loop20
endi
# row 0
if $data01 != 3 then
print =====data01=$data01
goto loop20
endi
if $data02 != 6 then
print =====data02=$data02
goto loop20
endi
# row 1
if $data11 != 2 then
print =====data11=$data11
goto loop20
endi
if $data12 != 9 then
print =====data12=$data12
goto loop20
endi
print end---------------------------------
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
tests/script/tsim/stream/checkpointInterval1.sim
0 → 100644
浏览文件 @
b1643228
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1 -v debugFlag 135
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print step 1
sql create database test vgroups 4;
sql use test;
sql create stable st(ts timestamp,a int,b int,c int, d double) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, sum(a) from st interval(10s);
sql insert into t1 values(1648791213000,1,2,3,1.0);
sql insert into t2 values(1648791213001,2,2,3,1.1);
$loop_count = 0
loop0:
sleep 1000
sql select * from streamt;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 1 then
print =====rows=$rows expect 1
goto loop0
endi
# row 0
if $data01 != 2 then
print =====data01=$data01
goto loop0
endi
if $data02 != 3 then
print =====data02=$data02
goto loop0
endi
print waiting for checkpoint generation 1 ......
sleep 25000
print restart taosd
system sh/stop_dnodes.sh
system sh/exec.sh -n dnode1 -s start
sql insert into t1 values(1648791213002,3,2,3,1.1);
sql insert into t2 values(1648791223003,4,2,3,1.1);
$loop_count = 0
loop1:
sleep 1000
sql select * from streamt;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 2 then
print =====rows=$rows expect 2
goto loop1
endi
# row 0
if $data01 != 3 then
print =====data01=$data01
goto loop1
endi
if $data02 != 6 then
print =====data02=$data02
goto loop1
endi
# row 1
if $data11 != 1 then
print =====data11=$data11
goto loop1
endi
if $data12 != 4 then
print =====data12=$data12
goto loop1
endi
print end---------------------------------
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
tests/script/tsim/stream/checkpointSession0.sim
0 → 100644
浏览文件 @
b1643228
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1 -v debugFlag 135
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print step 1
print =============== create database
sql create database test vgroups 1;
sql use test;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, sum(a) from t1 session(ts, 10s);
sql insert into t1 values(1648791213000,1,2,3,1.0);
sql insert into t1 values(1648791213001,2,2,3,1.1);
$loop_count = 0
loop0:
sleep 1000
sql select * from streamt;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 1 then
print =====rows=$rows expect 1
goto loop0
endi
# row 0
if $data01 != 2 then
print =====data01=$data01
goto loop0
endi
if $data02 != 3 then
print =====data02=$data02
goto loop0
endi
print waiting for checkpoint generation 1 ......
sleep 25000
print restart taosd 01 ......
system sh/stop_dnodes.sh
system sh/exec.sh -n dnode1 -s start
sql insert into t1 values(1648791213002,3,2,3,1.1);
$loop_count = 0
loop1:
sleep 1000
sql select * from streamt;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 1 then
print =====rows=$rows expect 1
goto loop1
endi
# row 0
if $data01 != 3 then
print =====data01=$data01
goto loop1
endi
if $data02 != 6 then
print =====data02=$data02
goto loop1
endi
sql insert into t1 values(1648791233003,4,2,3,1.1);
$loop_count = 0
loop2:
sleep 1000
sql select * from streamt;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 2 then
print =====rows=$rows expect 2
goto loop2
endi
# row 0
if $data01 != 3 then
print =====data01=$data01
goto loop2
endi
if $data02 != 6 then
print =====data02=$data02
goto loop2
endi
# row 1
if $data11 != 1 then
print =====data11=$data11
goto loop2
endi
if $data12 != 4 then
print =====data12=$data12
goto loop2
endi
print step 2
print restart taosd 02 ......
system sh/stop_dnodes.sh
system sh/exec.sh -n dnode1 -s start
sql insert into t1 values(1648791233004,5,2,3,1.1);
loop20:
sleep 1000
sql select * from streamt;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 2 then
print =====rows=$rows expect 2
goto loop20
endi
# row 0
if $data01 != 3 then
print =====data01=$data01
goto loop20
endi
if $data02 != 6 then
print =====data02=$data02
goto loop20
endi
# row 1
if $data11 != 2 then
print =====data11=$data11
goto loop20
endi
if $data12 != 9 then
print =====data12=$data12
goto loop20
endi
print end---------------------------------
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
tests/script/tsim/stream/checkpointSession1.sim
0 → 100644
浏览文件 @
b1643228
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1 -v debugFlag 135
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print step 1
sql create database test vgroups 4;
sql use test;
sql create stable st(ts timestamp,a int,b int,c int, d double) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, sum(a) from st session(ts, 10s);
sql insert into t1 values(1648791213000,1,2,3,1.0);
sql insert into t2 values(1648791213001,2,2,3,1.1);
$loop_count = 0
loop0:
sleep 1000
sql select * from streamt;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 1 then
print =====rows=$rows expect 1
goto loop0
endi
# row 0
if $data01 != 2 then
print =====data01=$data01
goto loop0
endi
if $data02 != 3 then
print =====data02=$data02
goto loop0
endi
print waiting for checkpoint generation 1 ......
sleep 25000
print restart taosd
system sh/stop_dnodes.sh
system sh/exec.sh -n dnode1 -s start
sql insert into t1 values(1648791213002,3,2,3,1.1);
sql insert into t2 values(1648791233003,4,2,3,1.1);
$loop_count = 0
loop1:
sleep 1000
sql select * from streamt;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 2 then
print =====rows=$rows expect 2
goto loop1
endi
# row 0
if $data01 != 3 then
print =====data01=$data01
goto loop1
endi
if $data02 != 6 then
print =====data02=$data02
goto loop1
endi
# row 1
if $data11 != 1 then
print =====data11=$data11
goto loop1
endi
if $data12 != 4 then
print =====data12=$data12
goto loop1
endi
print end---------------------------------
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
tests/script/win-test-file
浏览文件 @
b1643228
...
@@ -240,6 +240,7 @@
...
@@ -240,6 +240,7 @@
./test.sh -f tsim/stream/basic2.sim
./test.sh -f tsim/stream/basic2.sim
./test.sh -f tsim/stream/basic3.sim
./test.sh -f tsim/stream/basic3.sim
./test.sh -f tsim/stream/basic4.sim
./test.sh -f tsim/stream/basic4.sim
./test.sh -f tsim/stream/checkpointInterval0.sim
./test.sh -f tsim/stream/checkStreamSTable1.sim
./test.sh -f tsim/stream/checkStreamSTable1.sim
./test.sh -f tsim/stream/checkStreamSTable.sim
./test.sh -f tsim/stream/checkStreamSTable.sim
./test.sh -f tsim/stream/deleteInterval.sim
./test.sh -f tsim/stream/deleteInterval.sim
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录