Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
4d0ce625
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
4d0ce625
编写于
6月 01, 2023
作者:
Y
yihaoDeng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of
https://github.com/taosdata/TDengine
into feature/TD-19042
上级
56782a5d
ee8a9856
变更
13
隐藏空白更改
内联
并排
Showing
13 changed file
with
139 addition
and
53 deletion
+139
-53
include/common/tglobal.h
include/common/tglobal.h
+1
-1
source/common/src/tglobal.c
source/common/src/tglobal.c
+10
-6
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+4
-2
source/dnode/vnode/src/tq/tqSink.c
source/dnode/vnode/src/tq/tqSink.c
+5
-7
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+55
-7
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+4
-1
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+7
-5
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+6
-5
source/libs/stream/src/stream.c
source/libs/stream/src/stream.c
+25
-12
source/libs/stream/src/streamBackendRocksdb.c
source/libs/stream/src/streamBackendRocksdb.c
+11
-0
source/libs/stream/src/streamDispatch.c
source/libs/stream/src/streamDispatch.c
+6
-2
source/libs/stream/src/streamExec.c
source/libs/stream/src/streamExec.c
+4
-4
source/libs/stream/src/streamState.c
source/libs/stream/src/streamState.c
+1
-1
未找到文件。
include/common/tglobal.h
浏览文件 @
4d0ce625
...
...
@@ -29,7 +29,6 @@ extern "C" {
#define SLOW_LOG_TYPE_OTHERS 0x4
#define SLOW_LOG_TYPE_ALL 0xFFFFFFFF
// cluster
extern
char
tsFirst
[];
extern
char
tsSecond
[];
...
...
@@ -181,6 +180,7 @@ extern bool tsDisableStream;
extern
int64_t
tsStreamBufferSize
;
extern
int64_t
tsCheckpointInterval
;
extern
bool
tsFilterScalarMode
;
extern
int32_t
tsMaxStreamBackendCache
;
// #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
...
...
source/common/src/tglobal.c
浏览文件 @
4d0ce625
...
...
@@ -60,6 +60,7 @@ int32_t tsNumOfQnodeQueryThreads = 4;
int32_t
tsNumOfQnodeFetchThreads
=
1
;
int32_t
tsNumOfSnodeStreamThreads
=
4
;
int32_t
tsNumOfSnodeWriteThreads
=
1
;
int32_t
tsMaxStreamBackendCache
=
128
;
// M
// sync raft
int32_t
tsElectInterval
=
25
*
1000
;
...
...
@@ -105,7 +106,7 @@ int32_t tsQueryPolicy = 1;
int32_t
tsQueryRspPolicy
=
0
;
int64_t
tsQueryMaxConcurrentTables
=
200
;
// unit is TSDB_TABLE_NUM_UNIT
bool
tsEnableQueryHb
=
false
;
bool
tsEnableScience
=
false
;
// on taos-cli show float and doulbe with scientific notation if true
bool
tsEnableScience
=
false
;
// on taos-cli show float and doulbe with scientific notation if true
int32_t
tsQuerySmaOptimize
=
0
;
int32_t
tsQueryRsmaTolerance
=
1000
;
// the tolerance time (ms) to judge from which level to query rsma data.
bool
tsQueryPlannerTrace
=
false
;
...
...
@@ -117,8 +118,8 @@ int32_t tsRedirectFactor = 2;
int32_t
tsRedirectMaxPeriod
=
1000
;
int32_t
tsMaxRetryWaitTime
=
10000
;
bool
tsUseAdapter
=
false
;
int32_t
tsMetaCacheMaxSize
=
-
1
;
// MB
int32_t
tsSlowLogThreshold
=
3
;
// seconds
int32_t
tsMetaCacheMaxSize
=
-
1
;
// MB
int32_t
tsSlowLogThreshold
=
3
;
// seconds
int32_t
tsSlowLogScope
=
SLOW_LOG_TYPE_ALL
;
/*
...
...
@@ -349,7 +350,8 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if
(
cfgAddInt32
(
pCfg
,
"maxRetryWaitTime"
,
tsMaxRetryWaitTime
,
0
,
86400000
,
0
)
!=
0
)
return
-
1
;
if
(
cfgAddBool
(
pCfg
,
"useAdapter"
,
tsUseAdapter
,
true
)
!=
0
)
return
-
1
;
if
(
cfgAddBool
(
pCfg
,
"crashReporting"
,
tsEnableCrashReport
,
true
)
!=
0
)
return
-
1
;
if
(
cfgAddInt64
(
pCfg
,
"queryMaxConcurrentTables"
,
tsQueryMaxConcurrentTables
,
INT64_MIN
,
INT64_MAX
,
1
)
!=
0
)
return
-
1
;
if
(
cfgAddInt64
(
pCfg
,
"queryMaxConcurrentTables"
,
tsQueryMaxConcurrentTables
,
INT64_MIN
,
INT64_MAX
,
1
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"metaCacheMaxSize"
,
tsMetaCacheMaxSize
,
-
1
,
INT32_MAX
,
1
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"slowLogThreshold"
,
tsSlowLogThreshold
,
0
,
INT32_MAX
,
true
)
!=
0
)
return
-
1
;
if
(
cfgAddString
(
pCfg
,
"slowLogScope"
,
""
,
true
)
!=
0
)
return
-
1
;
...
...
@@ -524,6 +526,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if
(
cfgAddInt32
(
pCfg
,
"cacheLazyLoadThreshold"
,
tsCacheLazyLoadThreshold
,
0
,
100000
,
0
)
!=
0
)
return
-
1
;
if
(
cfgAddBool
(
pCfg
,
"filterScalarMode"
,
tsFilterScalarMode
,
0
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"maxStreamBackendCache"
,
tsMaxStreamBackendCache
,
16
,
1024
,
0
)
!=
0
)
return
-
1
;
GRANT_CFG_ADD
;
return
0
;
...
...
@@ -781,7 +784,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tsNumOfTaskQueueThreads
=
cfgGetItem
(
pCfg
,
"numOfTaskQueueThreads"
)
->
i32
;
tsQueryPolicy
=
cfgGetItem
(
pCfg
,
"queryPolicy"
)
->
i32
;
tsEnableQueryHb
=
cfgGetItem
(
pCfg
,
"enableQueryHb"
)
->
bval
;
tsEnableScience
=
cfgGetItem
(
pCfg
,
"enableScience"
)
->
bval
;
tsEnableScience
=
cfgGetItem
(
pCfg
,
"enableScience"
)
->
bval
;
tsQuerySmaOptimize
=
cfgGetItem
(
pCfg
,
"querySmaOptimize"
)
->
i32
;
tsQueryPlannerTrace
=
cfgGetItem
(
pCfg
,
"queryPlannerTrace"
)
->
bval
;
tsQueryNodeChunkSize
=
cfgGetItem
(
pCfg
,
"queryNodeChunkSize"
)
->
i32
;
...
...
@@ -902,7 +905,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsCheckpointInterval
=
cfgGetItem
(
pCfg
,
"checkpointInterval"
)
->
i64
;
tsFilterScalarMode
=
cfgGetItem
(
pCfg
,
"filterScalarMode"
)
->
bval
;
tsMaxStreamBackendCache
=
cfgGetItem
(
pCfg
,
"maxStreamBackendCache"
)
->
i32
;
GRANT_CFG_GET
;
return
0
;
}
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
4d0ce625
...
...
@@ -1291,13 +1291,15 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamDispatchRsp
*
pRsp
=
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
));
int32_t
taskId
=
ntohl
(
pRsp
->
upstreamTaskId
);
SStreamTask
*
pTask
=
streamMetaAcquireTask
(
pTq
->
pStreamMeta
,
taskId
);
tqDebug
(
"recv dispatch rsp, code:%x"
,
pMsg
->
code
);
int32_t
vgId
=
pTq
->
pStreamMeta
->
vgId
;
if
(
pTask
)
{
streamProcessDispatchRsp
(
pTask
,
pRsp
,
pMsg
->
code
);
streamMetaReleaseTask
(
pTq
->
pStreamMeta
,
pTask
);
return
0
;
}
else
{
return
-
1
;
tqDebug
(
"vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed"
,
vgId
,
taskId
);
return
TSDB_CODE_INVALID_MSG
;
}
}
...
...
source/dnode/vnode/src/tq/tqSink.c
浏览文件 @
4d0ce625
...
...
@@ -137,7 +137,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
int32_t
blockSz
=
taosArrayGetSize
(
pBlocks
);
tqDebug
(
"vgId:%d, s-task:%s write results
blocks:%d
into table"
,
TD_VID
(
pVnode
),
pTask
->
id
.
idStr
,
blockSz
);
tqDebug
(
"vgId:%d, s-task:%s write results
%d blocks
into table"
,
TD_VID
(
pVnode
),
pTask
->
id
.
idStr
,
blockSz
);
void
*
pBuf
=
NULL
;
SArray
*
tagArray
=
NULL
;
...
...
@@ -482,17 +482,15 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
tEncoderClear
(
&
encoder
);
tDestroySubmitReq
(
&
submitReq
,
TSDB_MSG_FLG_ENCODE
);
SRpcMsg
msg
=
{
.
msgType
=
TDMT_VND_SUBMIT
,
.
pCont
=
pBuf
,
.
contLen
=
len
,
};
SRpcMsg
msg
=
{
.
msgType
=
TDMT_VND_SUBMIT
,
.
pCont
=
pBuf
,
.
contLen
=
len
};
if
(
tmsgPutToQueue
(
&
pVnode
->
msgCb
,
WRITE_QUEUE
,
&
msg
)
!=
0
)
{
tqDebug
(
"failed to put into write-queue since %s"
,
terrstr
());
}
}
}
tqDebug
(
"vgId:%d, s-task:%s write results completed"
,
TD_VID
(
pVnode
),
pTask
->
id
.
idStr
);
_end:
taosArrayDestroy
(
tagArray
);
taosArrayDestroy
(
pVals
);
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
4d0ce625
...
...
@@ -1121,6 +1121,27 @@ static int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData
endPos
=
doBinarySearchKey
(
pBlockData
->
aTSKEY
,
pBlock
->
nRow
,
pos
,
key
,
pReader
->
order
);
}
if
((
pReader
->
verRange
.
maxVer
>=
pBlock
->
minVer
&&
pReader
->
verRange
.
maxVer
<
pBlock
->
maxVer
)
||
(
pReader
->
verRange
.
minVer
<=
pBlock
->
maxVer
&&
pReader
->
verRange
.
minVer
>
pBlock
->
minVer
))
{
int32_t
i
=
endPos
;
if
(
asc
)
{
for
(;
i
>=
0
;
--
i
)
{
if
(
pBlockData
->
aVersion
[
i
]
<=
pReader
->
verRange
.
maxVer
)
{
break
;
}
}
}
else
{
for
(;
i
<
pBlock
->
nRow
;
++
i
)
{
if
(
pBlockData
->
aVersion
[
i
]
>=
pReader
->
verRange
.
minVer
)
{
break
;
}
}
}
endPos
=
i
;
}
return
endPos
;
}
...
...
@@ -1260,10 +1281,11 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
return
0
;
}
// row index of dump info remain the initial position, let's find the appropriate start position.
if
((
pDumpInfo
->
rowIndex
==
0
&&
asc
)
||
(
pDumpInfo
->
rowIndex
==
pBlock
->
nRow
-
1
&&
(
!
asc
)))
{
if
(
asc
&&
pReader
->
window
.
skey
<=
pBlock
->
minKey
.
ts
)
{
if
(
asc
&&
pReader
->
window
.
skey
<=
pBlock
->
minKey
.
ts
&&
pReader
->
verRange
.
minVer
<=
pBlock
->
minVer
)
{
// pDumpInfo->rowIndex = 0;
}
else
if
(
!
asc
&&
pReader
->
window
.
ekey
>=
pBlock
->
maxKey
.
ts
)
{
}
else
if
(
!
asc
&&
pReader
->
window
.
ekey
>=
pBlock
->
maxKey
.
ts
&&
pReader
->
verRange
.
maxVer
>=
pBlock
->
maxVer
)
{
// pDumpInfo->rowIndex = pBlock->nRow - 1;
}
else
{
// find the appropriate the start position in current block, and set it to be the current rowIndex
int32_t
pos
=
asc
?
pBlock
->
nRow
-
1
:
0
;
...
...
@@ -1279,6 +1301,29 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
pBlock
->
maxVer
,
pReader
->
idStr
);
return
TSDB_CODE_INVALID_PARA
;
}
ASSERT
(
pReader
->
verRange
.
minVer
<=
pBlock
->
maxVer
&&
pReader
->
verRange
.
maxVer
>=
pBlock
->
minVer
);
// find the appropriate start position that satisfies the version requirement.
if
((
pReader
->
verRange
.
maxVer
>=
pBlock
->
minVer
&&
pReader
->
verRange
.
maxVer
<
pBlock
->
maxVer
)
||
(
pReader
->
verRange
.
minVer
<=
pBlock
->
maxVer
&&
pReader
->
verRange
.
minVer
>
pBlock
->
minVer
))
{
int32_t
i
=
pDumpInfo
->
rowIndex
;
if
(
asc
)
{
for
(;
i
<
pBlock
->
nRow
;
++
i
)
{
if
(
pBlockData
->
aVersion
[
i
]
>=
pReader
->
verRange
.
minVer
)
{
break
;
}
}
}
else
{
for
(;
i
>=
0
;
--
i
)
{
if
(
pBlockData
->
aVersion
[
i
]
<=
pReader
->
verRange
.
maxVer
)
{
break
;
}
}
}
pDumpInfo
->
rowIndex
=
i
;
}
}
}
...
...
@@ -1293,6 +1338,9 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
int32_t
dumpedRows
=
asc
?
(
endIndex
-
pDumpInfo
->
rowIndex
)
:
(
pDumpInfo
->
rowIndex
-
endIndex
);
if
(
dumpedRows
>
pReader
->
resBlockInfo
.
capacity
)
{
// output buffer check
dumpedRows
=
pReader
->
resBlockInfo
.
capacity
;
}
else
if
(
dumpedRows
<=
0
)
{
// no qualified rows in current data block, abort directly.
setBlockAllDumped
(
pDumpInfo
,
pReader
->
window
.
ekey
,
pReader
->
order
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
i
=
0
;
...
...
@@ -1848,7 +1896,7 @@ static bool isCleanFileDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pBloc
SDataBlockToLoadInfo
info
=
{
0
};
getBlockToLoadInfo
(
&
info
,
pBlockInfo
,
pBlock
,
pScanInfo
,
keyInBuf
,
pLastBlockReader
,
pReader
);
bool
isCleanFileBlock
=
!
(
info
.
overlapWithNeighborBlock
||
info
.
hasDupTs
||
info
.
overlapWithKeyInBuf
||
info
.
overlapWithDelInfo
||
info
.
overlapWithLastBlock
||
info
.
partiallyRequired
);
info
.
overlapWithDelInfo
||
info
.
overlapWithLastBlock
);
return
isCleanFileBlock
;
}
...
...
@@ -2809,7 +2857,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
// it is a clean block, load it directly
if
(
isCleanFileDataBlock
(
pReader
,
pBlockInfo
,
pBlock
,
pBlockScanInfo
,
keyInBuf
,
pLastBlockReader
)
&&
pBlock
->
nRow
<=
pReader
->
resBlockInfo
.
capacity
)
{
if
(
asc
||
(
(
!
asc
)
&&
(
!
hasDataInLastBlock
(
pLastBlockReader
)
)))
{
if
(
asc
||
(
!
hasDataInLastBlock
(
pLastBlockReader
)))
{
code
=
copyBlockDataToSDataBlock
(
pReader
);
if
(
code
)
{
goto
_end
;
...
...
@@ -2828,7 +2876,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
}
}
SBlockData
*
pBlockData
=
&
pReader
->
status
.
fileBlockData
;
SBlockData
*
pBlockData
=
&
pReader
->
status
.
fileBlockData
;
while
(
1
)
{
bool
hasBlockData
=
false
;
...
...
@@ -2842,7 +2890,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
pDumpInfo
->
rowIndex
+=
step
;
SDataBlk
*
pBlock
=
getCurrentBlock
(
&
pReader
->
status
.
blockIter
);
pBlock
=
getCurrentBlock
(
&
pReader
->
status
.
blockIter
);
if
(
pDumpInfo
->
rowIndex
>=
pBlock
->
nRow
||
pDumpInfo
->
rowIndex
<
0
)
{
pBlockInfo
=
getCurrentBlockInfo
(
&
pReader
->
status
.
blockIter
);
// NOTE: get the new block info
...
...
@@ -2870,7 +2918,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
// currently loaded file data block is consumed
if
((
pBlockData
->
nRow
>
0
)
&&
(
pDumpInfo
->
rowIndex
>=
pBlockData
->
nRow
||
pDumpInfo
->
rowIndex
<
0
))
{
SDataBlk
*
pBlock
=
getCurrentBlock
(
&
pReader
->
status
.
blockIter
);
pBlock
=
getCurrentBlock
(
&
pReader
->
status
.
blockIter
);
setBlockAllDumped
(
pDumpInfo
,
pBlock
->
maxKey
.
ts
,
pReader
->
order
);
break
;
}
...
...
source/libs/executor/src/executor.c
浏览文件 @
4d0ce625
...
...
@@ -132,7 +132,8 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
pOperator
->
status
=
OP_NOT_OPENED
;
SStreamScanInfo
*
pInfo
=
pOperator
->
info
;
qDebug
(
"s-task:%s set source blocks:%d"
,
id
,
(
int32_t
)
numOfBlocks
);
qDebug
(
"s-task:%s in this batch, all %d blocks need to be processed and dump results"
,
id
,
(
int32_t
)
numOfBlocks
);
ASSERT
(
pInfo
->
validBlockIndex
==
0
&&
taosArrayGetSize
(
pInfo
->
pBlockLists
)
==
0
);
if
(
type
==
STREAM_INPUT__MERGED_SUBMIT
)
{
...
...
@@ -140,6 +141,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
SPackedData
*
pReq
=
POINTER_SHIFT
(
input
,
i
*
sizeof
(
SPackedData
));
taosArrayPush
(
pInfo
->
pBlockLists
,
pReq
);
}
pInfo
->
blockType
=
STREAM_INPUT__DATA_SUBMIT
;
}
else
if
(
type
==
STREAM_INPUT__DATA_SUBMIT
)
{
taosArrayPush
(
pInfo
->
pBlockLists
,
input
);
...
...
@@ -150,6 +152,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
SPackedData
tmp
=
{
.
pDataBlock
=
pDataBlock
};
taosArrayPush
(
pInfo
->
pBlockLists
,
&
tmp
);
}
pInfo
->
blockType
=
STREAM_INPUT__DATA_BLOCK
;
}
else
{
ASSERT
(
0
);
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
4d0ce625
...
...
@@ -1797,9 +1797,10 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo)
static
SSDataBlock
*
doStreamScan
(
SOperatorInfo
*
pOperator
)
{
// NOTE: this operator does never check if current status is done or not
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SStorageAPI
*
pAPI
=
&
pTaskInfo
->
storageAPI
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
const
char
*
id
=
GET_TASKID
(
pTaskInfo
)
;
SStorageAPI
*
pAPI
=
&
pTaskInfo
->
storageAPI
;
SStreamScanInfo
*
pInfo
=
pOperator
->
info
;
qDebug
(
"stream scan started, %s"
,
GET_TASKID
(
pTaskInfo
));
...
...
@@ -1883,7 +1884,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
if
(
pInfo
->
pRecoverRes
!=
NULL
)
{
pInfo
->
blockRecoverContiCnt
++
;
calBlockTbName
(
pInfo
,
pInfo
->
pRecoverRes
);
if
(
pInfo
->
pUpdateInfo
)
{
if
(
!
pInfo
->
igCheckUpdate
&&
pInfo
->
pUpdateInfo
)
{
if
(
pTaskInfo
->
streamInfo
.
recoverStep
==
STREAM_RECOVER_STEP__SCAN1
)
{
TSKEY
maxTs
=
pAPI
->
stateStore
.
updateInfoFillBlockData
(
pInfo
->
pUpdateInfo
,
pInfo
->
pRecoverRes
,
pInfo
->
primaryTsIndex
);
pInfo
->
twAggSup
.
maxTs
=
TMAX
(
pInfo
->
twAggSup
.
maxTs
,
maxTs
);
...
...
@@ -1922,7 +1923,9 @@ FETCH_NEXT_BLOCK:
return
NULL
;
}
int32_t
current
=
pInfo
->
validBlockIndex
++
;
int32_t
current
=
pInfo
->
validBlockIndex
++
;
qDebug
(
"process %d/%d input data blocks, %s"
,
current
,
(
int32_t
)
total
,
id
);
SPackedData
*
pPacked
=
taosArrayGet
(
pInfo
->
pBlockLists
,
current
);
SSDataBlock
*
pBlock
=
pPacked
->
pDataBlock
;
if
(
pBlock
->
info
.
parTbName
[
0
])
{
...
...
@@ -2057,7 +2060,6 @@ FETCH_NEXT_BLOCK:
return
pInfo
->
pUpdateRes
;
}
const
char
*
id
=
GET_TASKID
(
pTaskInfo
);
SSDataBlock
*
pBlock
=
pInfo
->
pRes
;
SDataBlockInfo
*
pBlockInfo
=
&
pBlock
->
info
;
int32_t
totalBlocks
=
taosArrayGetSize
(
pInfo
->
pBlockLists
);
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
4d0ce625
...
...
@@ -1623,7 +1623,7 @@ void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamInt
SStreamScanInfo
*
pScanInfo
=
downstream
->
info
;
pScanInfo
->
windowSup
.
parentType
=
type
;
pScanInfo
->
windowSup
.
pIntervalAggSup
=
&
pInfo
->
aggSup
;
if
(
!
pScanInfo
->
igCheckUpdate
&&
!
pScanInfo
->
pUpdateInfo
)
{
if
(
!
pScanInfo
->
pUpdateInfo
)
{
pScanInfo
->
pUpdateInfo
=
pAPI
->
updateInfoInitP
(
&
pInfo
->
interval
,
pInfo
->
twAggSup
.
waterMark
);
}
...
...
@@ -2150,28 +2150,29 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB
}
void
processPullOver
(
SSDataBlock
*
pBlock
,
SHashObj
*
pMap
,
SInterval
*
pInterval
)
{
SColumnInfoData
*
pStartCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
SColumnInfoData
*
pStartCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
CALCULATE_
START_TS_COLUMN_INDEX
);
TSKEY
*
tsData
=
(
TSKEY
*
)
pStartCol
->
pData
;
SColumnInfoData
*
pEndCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
END_TS_COLUMN_INDEX
);
SColumnInfoData
*
pEndCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
CALCULATE_
END_TS_COLUMN_INDEX
);
TSKEY
*
tsEndData
=
(
TSKEY
*
)
pEndCol
->
pData
;
SColumnInfoData
*
pGroupCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
GROUPID_COLUMN_INDEX
);
uint64_t
*
groupIdData
=
(
uint64_t
*
)
pGroupCol
->
pData
;
int32_t
chId
=
getChildIndex
(
pBlock
);
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
i
++
)
{
TSKEY
winTs
=
tsData
[
i
];
while
(
winTs
<
tsEndData
[
i
])
{
while
(
winTs
<
=
tsEndData
[
i
])
{
SWinKey
winRes
=
{.
ts
=
winTs
,
.
groupId
=
groupIdData
[
i
]};
void
*
chIds
=
taosHashGet
(
pMap
,
&
winRes
,
sizeof
(
SWinKey
));
if
(
chIds
)
{
SArray
*
chArray
=
*
(
SArray
**
)
chIds
;
int32_t
index
=
taosArraySearchIdx
(
chArray
,
&
chId
,
compareInt32Val
,
TD_EQ
);
if
(
index
!=
-
1
)
{
qDebug
(
"===stream===window %"
PRId64
" delete child id %d"
,
winRes
.
ts
,
chId
);
qDebug
(
"===stream===
retrive
window %"
PRId64
" delete child id %d"
,
winRes
.
ts
,
chId
);
taosArrayRemove
(
chArray
,
index
);
if
(
taosArrayGetSize
(
chArray
)
==
0
)
{
// pull data is over
taosArrayDestroy
(
chArray
);
taosHashRemove
(
pMap
,
&
winRes
,
sizeof
(
SWinKey
));
qDebug
(
"===stream===retrive pull data over.window %"
PRId64
,
winRes
.
ts
);
}
}
}
...
...
source/libs/stream/src/stream.c
浏览文件 @
4d0ce625
...
...
@@ -126,14 +126,12 @@ int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pR
if
(
pBlock
==
NULL
)
{
streamTaskInputFail
(
pTask
);
status
=
TASK_INPUT_STATUS__FAILED
;
q
Debug
(
"vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory"
,
pTask
->
pMeta
->
vgId
,
q
Error
(
"vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory"
,
pTask
->
pMeta
->
vgId
,
pTask
->
id
.
idStr
);
}
else
{
int32_t
code
=
tAppendDataToInputQueue
(
pTask
,
(
SStreamQueueItem
*
)
pBlock
);
// input queue is full, upstream is blocked now
status
=
(
code
==
TSDB_CODE_SUCCESS
)
?
TASK_INPUT_STATUS__NORMAL
:
TASK_INPUT_STATUS__BLOCKED
;
}
// rsp by input status
...
...
@@ -235,12 +233,11 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
}
int32_t
streamProcessDispatchRsp
(
SStreamTask
*
pTask
,
SStreamDispatchRsp
*
pRsp
,
int32_t
code
)
{
ASSERT
(
pRsp
->
inputStatus
==
TASK_OUTPUT_STATUS__NORMAL
||
pRsp
->
inputStatus
==
TASK_OUTPUT_STATUS__BLOCKED
);
qDebug
(
"s-task:%s receive dispatch rsp, code: %x"
,
pTask
->
id
.
idStr
,
code
);
qDebug
(
"s-task:%s receive dispatch rsp, output status:%d code:%d"
,
pTask
->
id
.
idStr
,
pRsp
->
inputStatus
,
code
);
if
(
pTask
->
outputType
==
TASK_OUTPUT__SHUFFLE_DISPATCH
)
{
int32_t
leftRsp
=
atomic_sub_fetch_32
(
&
pTask
->
shuffleDispatcher
.
waitingRspCnt
,
1
);
qDebug
(
"
task %d is shuffle, left waiting rsp %d"
,
pTask
->
id
.
taskId
,
leftRsp
);
qDebug
(
"
s-task:%s is shuffle, left waiting rsp %d"
,
pTask
->
id
.
idStr
,
leftRsp
);
if
(
leftRsp
>
0
)
{
return
0
;
}
...
...
@@ -248,13 +245,20 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
int8_t
old
=
atomic_exchange_8
(
&
pTask
->
outputStatus
,
pRsp
->
inputStatus
);
ASSERT
(
old
==
TASK_OUTPUT_STATUS__WAIT
);
// the input queue of the (down stream) task that receive the output data is full, so the TASK_INPUT_STATUS_BLOCKED is rsp
// todo we need to send EMPTY PACKAGE to detect if the input queue is available for output of upstream task, every 50 ms.
if
(
pRsp
->
inputStatus
==
TASK_INPUT_STATUS__BLOCKED
)
{
// TODO: init recover timer
ASSERT
(
0
);
qError
(
"s-task:%s inputQ of downstream task:0x%x is full, need to block output"
,
pTask
->
id
.
idStr
,
pRsp
->
downstreamTaskId
);
atomic_store_8
(
&
pTask
->
outputStatus
,
TASK_OUTPUT_STATUS__NORMAL
);
qError
(
"s-task:%s ignore error, and reset task output status:%d"
,
pTask
->
id
.
idStr
,
pTask
->
outputStatus
);
return
0
;
}
//
continue dispatch one block to down stream
in pipeline
//
otherwise, continue dispatch the first block to down stream task
in pipeline
streamDispatchStreamBlock
(
pTask
);
return
0
;
}
...
...
@@ -304,23 +308,32 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
return
-
1
;
}
taosWriteQitem
(
pTask
->
inputQueue
->
queue
,
pItem
);
int32_t
code
=
taosWriteQitem
(
pTask
->
inputQueue
->
queue
,
pItem
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
streamDataSubmitDestroy
(
px
);
taosFreeQitem
(
pItem
);
return
code
;
}
}
else
if
(
type
==
STREAM_INPUT__DATA_BLOCK
||
type
==
STREAM_INPUT__DATA_RETRIEVE
||
type
==
STREAM_INPUT__REF_DATA_BLOCK
)
{
if
(
(
pTask
->
taskLevel
==
TASK_LEVEL__SOURCE
)
&&
(
tInputQueueIsFull
(
pTask
)))
{
if
(
/*(pTask->taskLevel == TASK_LEVEL__SOURCE) && */
(
tInputQueueIsFull
(
pTask
)))
{
qError
(
"s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort"
,
pTask
->
id
.
idStr
,
STREAM_TASK_INPUT_QUEUEU_CAPACITY
,
STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE
,
total
,
size
);
destroyStreamDataBlock
((
SStreamDataBlock
*
)
pItem
);
taosFreeQitem
(
pItem
);
return
-
1
;
}
qDebug
(
"s-task:%s data block enqueue, current(blocks:%d, size:%.2fMiB)"
,
pTask
->
id
.
idStr
,
total
,
size
);
taosWriteQitem
(
pTask
->
inputQueue
->
queue
,
pItem
);
int32_t
code
=
taosWriteQitem
(
pTask
->
inputQueue
->
queue
,
pItem
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
destroyStreamDataBlock
((
SStreamDataBlock
*
)
pItem
);
return
code
;
}
}
else
if
(
type
==
STREAM_INPUT__CHECKPOINT
)
{
taosWriteQitem
(
pTask
->
inputQueue
->
queue
,
pItem
);
}
else
if
(
type
==
STREAM_INPUT__GET_RES
)
{
// use the default memory limit, refactor later.
taosWriteQitem
(
pTask
->
inputQueue
->
queue
,
pItem
);
qDebug
(
"s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)"
,
pTask
->
id
.
idStr
,
total
,
size
);
}
...
...
source/libs/stream/src/streamBackendRocksdb.c
浏览文件 @
4d0ce625
...
...
@@ -38,6 +38,15 @@ typedef struct {
rocksdb_comparator_t
**
pCompares
;
}
RocksdbCfInst
;
uint32_t
nextPow2
(
uint32_t
x
)
{
x
=
x
-
1
;
x
=
x
|
(
x
>>
1
);
x
=
x
|
(
x
>>
2
);
x
=
x
|
(
x
>>
4
);
x
=
x
|
(
x
>>
8
);
x
=
x
|
(
x
>>
16
);
return
x
+
1
;
}
int32_t
streamStateOpenBackendCf
(
void
*
backend
,
char
*
name
,
char
**
cfs
,
int32_t
nCf
);
void
destroyRocksdbCfInst
(
RocksdbCfInst
*
inst
);
...
...
@@ -92,6 +101,8 @@ void* streamBackendInit(const char* path) {
rocksdb_options_set_recycle_log_file_num
(
opts
,
6
);
rocksdb_options_set_max_write_buffer_number
(
opts
,
2
);
rocksdb_options_set_info_log_level
(
opts
,
0
);
uint32_t
dbLimit
=
nextPow2
(
tsMaxStreamBackendCache
);
rocksdb_options_set_db_write_buffer_size
(
opts
,
dbLimit
<<
20
);
pHandle
->
env
=
env
;
pHandle
->
dbOpt
=
opts
;
...
...
source/libs/stream/src/streamDispatch.c
浏览文件 @
4d0ce625
...
...
@@ -510,14 +510,17 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
int8_t
old
=
atomic_val_compare_exchange_8
(
&
pTask
->
outputStatus
,
TASK_OUTPUT_STATUS__NORMAL
,
TASK_OUTPUT_STATUS__WAIT
);
if
(
old
!=
TASK_OUTPUT_STATUS__NORMAL
)
{
qDebug
(
"s-task:%s task wait for dispatch rsp, not dispatch now
"
,
pTask
->
id
.
idStr
);
qDebug
(
"s-task:%s task wait for dispatch rsp, not dispatch now
, output status:%d"
,
pTask
->
id
.
idStr
,
old
);
return
0
;
}
qDebug
(
"s-task:%s start to dispatch msg, output status:%d"
,
pTask
->
id
.
idStr
,
pTask
->
outputStatus
);
SStreamDataBlock
*
pDispatchedBlock
=
streamQueueNextItem
(
pTask
->
outputQueue
);
if
(
pDispatchedBlock
==
NULL
)
{
qDebug
(
"s-task:%s stop dispatching since no output in output queue"
,
pTask
->
id
.
idStr
);
atomic_store_8
(
&
pTask
->
outputStatus
,
TASK_OUTPUT_STATUS__NORMAL
);
qDebug
(
"s-task:%s stop dispatching since no output in output queue, output status:%d"
,
pTask
->
id
.
idStr
,
pTask
->
outputStatus
);
return
0
;
}
...
...
@@ -527,6 +530,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
streamQueueProcessFail
(
pTask
->
outputQueue
);
atomic_store_8
(
&
pTask
->
outputStatus
,
TASK_OUTPUT_STATUS__NORMAL
);
qDebug
(
"s-task:%s failed to dispatch msg to downstream, output status:%d"
,
pTask
->
id
.
idStr
,
pTask
->
outputStatus
);
}
// this block can be freed only when it has been pushed to down stream.
...
...
source/libs/stream/src/streamExec.c
浏览文件 @
4d0ce625
...
...
@@ -16,9 +16,9 @@
#include "streamInc.h"
// maximum allowed processed block batches. One block may include several submit blocks
#define MAX_STREAM_EXEC_BATCH_NUM
128
#define MIN_STREAM_EXEC_BATCH_NUM
16
#define MAX_STREAM_RESULT_DUMP_THRESHOLD 100
0
#define MAX_STREAM_EXEC_BATCH_NUM
32
#define MIN_STREAM_EXEC_BATCH_NUM
8
#define MAX_STREAM_RESULT_DUMP_THRESHOLD 100
static
int32_t
updateCheckPointInfo
(
SStreamTask
*
pTask
);
...
...
@@ -385,7 +385,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
if
(
pTask
->
taskLevel
==
TASK_LEVEL__SINK
)
{
ASSERT
(
pInput
->
type
==
STREAM_INPUT__DATA_BLOCK
);
qDebug
(
"s-task:%s sink
node start to sink result. numOfBlocks:%d
"
,
pTask
->
id
.
idStr
,
batchSize
);
qDebug
(
"s-task:%s sink
task start to sink %d blocks
"
,
pTask
->
id
.
idStr
,
batchSize
);
streamTaskOutputResultBlock
(
pTask
,
(
SStreamDataBlock
*
)
pInput
);
continue
;
}
...
...
source/libs/stream/src/streamState.c
浏览文件 @
4d0ce625
...
...
@@ -1062,7 +1062,7 @@ _end:
}
int32_t
streamStatePutParName
(
SStreamState
*
pState
,
int64_t
groupId
,
const
char
tbname
[
TSDB_TABLE_NAME_LEN
])
{
q
Warn
(
"try to write to cf parname"
);
q
Debug
(
"try to write to cf parname"
);
#ifdef USE_ROCKSDB
if
(
tSimpleHashGetSize
(
pState
->
parNameMap
)
>
MAX_TABLE_NAME_NUM
)
{
if
(
tSimpleHashGet
(
pState
->
parNameMap
,
&
groupId
,
sizeof
(
int64_t
))
==
NULL
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录