Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
1b255231
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
1b255231
编写于
4月 14, 2023
作者:
dengyihao
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'enh/rocksRevert' of
https://github.com/taosdata/TDengine
into enh/rocksRevert
上级
c0ebdb92
4906855a
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
28 addition
and
98 deletion
+28
-98
include/libs/stream/tstreamUpdate.h
include/libs/stream/tstreamUpdate.h
+1
-5
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+4
-3
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+9
-46
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+10
-9
source/libs/stream/src/streamState.c
source/libs/stream/src/streamState.c
+1
-0
source/libs/stream/src/streamUpdate.c
source/libs/stream/src/streamUpdate.c
+3
-35
未找到文件。
include/libs/stream/tstreamUpdate.h
浏览文件 @
1b255231
...
...
@@ -40,9 +40,7 @@ typedef struct SUpdateInfo {
TSKEY
minTS
;
SScalableBf
*
pCloseWinSBF
;
SHashObj
*
pMap
;
STimeWindow
scanWindow
;
uint64_t
scanGroupId
;
uint64_t
maxVersion
;
uint64_t
maxDataVersion
;
}
SUpdateInfo
;
SUpdateInfo
*
updateInfoInitP
(
SInterval
*
pInterval
,
int64_t
watermark
);
...
...
@@ -50,8 +48,6 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma
TSKEY
updateInfoFillBlockData
(
SUpdateInfo
*
pInfo
,
SSDataBlock
*
pBlock
,
int32_t
primaryTsCol
);
bool
updateInfoIsUpdated
(
SUpdateInfo
*
pInfo
,
uint64_t
tableId
,
TSKEY
ts
);
bool
updateInfoIsTableInserted
(
SUpdateInfo
*
pInfo
,
int64_t
tbUid
);
void
updateInfoSetScanRange
(
SUpdateInfo
*
pInfo
,
STimeWindow
*
pWin
,
uint64_t
groupId
,
uint64_t
version
);
bool
updateInfoIgnore
(
SUpdateInfo
*
pInfo
,
STimeWindow
*
pWin
,
uint64_t
groupId
,
uint64_t
version
);
void
updateInfoDestroy
(
SUpdateInfo
*
pInfo
);
void
updateInfoAddCloseWindowSBF
(
SUpdateInfo
*
pInfo
);
void
updateInfoDestoryColseWinSBF
(
SUpdateInfo
*
pInfo
);
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
1b255231
...
...
@@ -483,9 +483,10 @@ typedef struct SStreamScanInfo {
int32_t
blockRecoverTotCnt
;
SSDataBlock
*
pRecoverRes
;
SSDataBlock
*
pCreateTbRes
;
int8_t
igCheckUpdate
;
int8_t
igExpired
;
SSDataBlock
*
pCreateTbRes
;
int8_t
igCheckUpdate
;
int8_t
igExpired
;
SStreamState
*
pState
;
}
SStreamScanInfo
;
typedef
struct
{
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
1b255231
...
...
@@ -1022,8 +1022,9 @@ static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t grou
pInfo
->
groupId
=
groupCol
[
rowIndex
];
}
void
resetTableScanInfo
(
STableScanInfo
*
pTableScanInfo
,
STimeWindow
*
pWin
)
{
void
resetTableScanInfo
(
STableScanInfo
*
pTableScanInfo
,
STimeWindow
*
pWin
,
uint64_t
version
)
{
pTableScanInfo
->
base
.
cond
.
twindows
=
*
pWin
;
pTableScanInfo
->
base
.
cond
.
endVersion
=
version
;
pTableScanInfo
->
scanTimes
=
0
;
pTableScanInfo
->
currentGroupId
=
-
1
;
tsdbReaderClose
(
pTableScanInfo
->
base
.
dataReader
);
...
...
@@ -1142,7 +1143,7 @@ static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_
break
;
}
resetTableScanInfo
(
pInfo
->
pTableScanOp
->
info
,
&
win
);
resetTableScanInfo
(
pInfo
->
pTableScanOp
->
info
,
&
win
,
pInfo
->
pUpdateInfo
->
maxDataVersion
);
pInfo
->
pTableScanOp
->
status
=
OP_OPENED
;
return
true
;
}
...
...
@@ -1446,39 +1447,8 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock,
return
code
;
}
#if 0
void calBlockTag(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
SExprSupp* pTagCalSup = &pInfo->tagCalSup;
SStreamState* pState = pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState;
if (pTagCalSup == NULL || pTagCalSup->numOfExprs == 0) return;
if (pBlock == NULL || pBlock->info.rows == 0) return;
void* tag = NULL;
int32_t tagLen = 0;
if (streamStateGetParTag(pState, pBlock->info.id.groupId, &tag, &tagLen) == 0) {
pBlock->info.tagLen = tagLen;
void* pTag = taosMemoryRealloc(pBlock->info.pTag, tagLen);
if (pTag == NULL) {
tdbFree(tag);
taosMemoryFree(pBlock->info.pTag);
pBlock->info.pTag = NULL;
pBlock->info.tagLen = 0;
return;
}
pBlock->info.pTag = pTag;
memcpy(pBlock->info.pTag, tag, tagLen);
tdbFree(tag);
return;
} else {
pBlock->info.pTag = NULL;
}
tdbFree(tag);
}
#endif
static
void
calBlockTbName
(
SStreamScanInfo
*
pInfo
,
SSDataBlock
*
pBlock
)
{
SExprSupp
*
pTbNameCalSup
=
&
pInfo
->
tbnameCalSup
;
SStreamState
*
pState
=
pInfo
->
pStreamScanOp
->
pTaskInfo
->
streamInfo
.
pState
;
blockDataCleanup
(
pInfo
->
pCreateTbRes
);
if
(
pInfo
->
tbnameCalSup
.
numOfExprs
==
0
&&
pInfo
->
tagCalSup
.
numOfExprs
==
0
)
{
pBlock
->
info
.
parTbName
[
0
]
=
0
;
...
...
@@ -1534,7 +1504,7 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
bool
update
=
updateInfoIsUpdated
(
pInfo
->
pUpdateInfo
,
pBlock
->
info
.
id
.
uid
,
tsCol
[
rowId
]);
bool
closedWin
=
isClosed
&&
isSignleIntervalWindow
(
pInfo
)
&&
isDeletedStreamWindow
(
&
win
,
pBlock
->
info
.
id
.
groupId
,
pInfo
->
p
TableScanOp
->
pTaskInfo
->
streamInfo
.
p
State
,
&
pInfo
->
twAggSup
);
pInfo
->
pState
,
&
pInfo
->
twAggSup
);
if
((
update
||
closedWin
)
&&
out
)
{
qDebug
(
"stream update check not pass, update %d, closedWin %d"
,
update
,
closedWin
);
uint64_t
gpId
=
0
;
...
...
@@ -1784,6 +1754,7 @@ static void setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
static
void
doCheckUpdate
(
SStreamScanInfo
*
pInfo
,
TSKEY
endKey
,
SSDataBlock
*
pBlock
)
{
if
(
pInfo
->
pUpdateInfo
)
{
pInfo
->
pUpdateInfo
->
maxDataVersion
=
TMAX
(
pInfo
->
pUpdateInfo
->
maxDataVersion
,
pBlock
->
info
.
version
);
checkUpdateData
(
pInfo
,
true
,
pBlock
,
true
);
pInfo
->
twAggSup
.
maxTs
=
TMAX
(
pInfo
->
twAggSup
.
maxTs
,
endKey
);
if
(
pInfo
->
pUpdateDataRes
->
info
.
rows
>
0
)
{
...
...
@@ -1845,7 +1816,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pTaskInfo
->
streamInfo
.
recoverStep
=
STREAM_RECOVER_STEP__SCAN2
;
}
/*resetTableScanInfo(pTSInfo, pWin);*/
tsdbReaderClose
(
pTSInfo
->
base
.
dataReader
);
qDebug
(
"4"
);
...
...
@@ -1891,8 +1861,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
SSDataBlock
*
pSDB
=
doRangeScan
(
pInfo
,
pInfo
->
pUpdateRes
,
pInfo
->
primaryTsIndex
,
&
pInfo
->
updateResIndex
);
if
(
pSDB
)
{
STableScanInfo
*
pTableScanInfo
=
pInfo
->
pTableScanOp
->
info
;
uint64_t
version
=
getReaderMaxVersion
(
pTableScanInfo
->
base
.
dataReader
);
updateInfoSetScanRange
(
pInfo
->
pUpdateInfo
,
&
pTableScanInfo
->
base
.
cond
.
twindows
,
pInfo
->
groupId
,
version
);
pSDB
->
info
.
type
=
pInfo
->
scanMode
==
STREAM_SCAN_FROM_DATAREADER_RANGE
?
STREAM_NORMAL
:
STREAM_PULL_DATA
;
checkUpdateData
(
pInfo
,
true
,
pSDB
,
false
);
printDataBlock
(
pSDB
,
"scan recover update"
);
...
...
@@ -1961,6 +1929,9 @@ FETCH_NEXT_BLOCK:
pBlock
->
info
.
calWin
.
skey
=
INT64_MIN
;
pBlock
->
info
.
calWin
.
ekey
=
INT64_MAX
;
pBlock
->
info
.
dataLoad
=
1
;
if
(
pInfo
->
pUpdateInfo
)
{
pInfo
->
pUpdateInfo
->
maxDataVersion
=
TMAX
(
pInfo
->
pUpdateInfo
->
maxDataVersion
,
pBlock
->
info
.
version
);
}
blockDataUpdateTsWindow
(
pBlock
,
0
);
switch
(
pBlock
->
info
.
type
)
{
case
STREAM_NORMAL
:
...
...
@@ -2058,8 +2029,6 @@ FETCH_NEXT_BLOCK:
SSDataBlock
*
pSDB
=
doRangeScan
(
pInfo
,
pInfo
->
pUpdateRes
,
pInfo
->
primaryTsIndex
,
&
pInfo
->
updateResIndex
);
if
(
pSDB
)
{
STableScanInfo
*
pTableScanInfo
=
pInfo
->
pTableScanOp
->
info
;
uint64_t
version
=
getReaderMaxVersion
(
pTableScanInfo
->
base
.
dataReader
);
updateInfoSetScanRange
(
pInfo
->
pUpdateInfo
,
&
pTableScanInfo
->
base
.
cond
.
twindows
,
pInfo
->
groupId
,
version
);
pSDB
->
info
.
type
=
pInfo
->
scanMode
==
STREAM_SCAN_FROM_DATAREADER_RANGE
?
STREAM_NORMAL
:
STREAM_PULL_DATA
;
checkUpdateData
(
pInfo
,
true
,
pSDB
,
false
);
printDataBlock
(
pSDB
,
"stream scan update"
);
...
...
@@ -2125,13 +2094,6 @@ FETCH_NEXT_BLOCK:
setBlockIntoRes
(
pInfo
,
&
block
,
false
);
if
(
updateInfoIgnore
(
pInfo
->
pUpdateInfo
,
&
pInfo
->
pRes
->
info
.
window
,
pInfo
->
pRes
->
info
.
id
.
groupId
,
pInfo
->
pRes
->
info
.
version
))
{
printDataBlock
(
pInfo
->
pRes
,
"stream scan ignore"
);
blockDataCleanup
(
pInfo
->
pRes
);
continue
;
}
if
(
pInfo
->
pCreateTbRes
->
info
.
rows
>
0
)
{
pInfo
->
scanMode
=
STREAM_SCAN_FROM_RES
;
return
pInfo
->
pCreateTbRes
;
...
...
@@ -2541,6 +2503,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo
->
igCheckUpdate
=
pTableScanNode
->
igCheckUpdate
;
pInfo
->
igExpired
=
pTableScanNode
->
igExpired
;
pInfo
->
twAggSup
.
maxTs
=
INT64_MIN
;
pInfo
->
pState
=
NULL
;
// todo(liuyao) get buff from rocks db;
void
*
buff
=
NULL
;
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
1b255231
...
...
@@ -27,6 +27,7 @@
#define IS_FINAL_OP(op) ((op)->isFinal)
#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL);
typedef
struct
SSessionAggOperatorInfo
{
SOptrBasicInfo
binfo
;
SAggSupporter
aggSup
;
...
...
@@ -1612,20 +1613,20 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt
return
needed
;
}
void
initIntervalDownStream
(
SOperatorInfo
*
downstream
,
uint16_t
type
,
SAggSupporter
*
pSup
,
SInterval
*
pInterval
,
STimeWindowAggSupp
*
pTwSup
)
{
void
initIntervalDownStream
(
SOperatorInfo
*
downstream
,
uint16_t
type
,
SStreamIntervalOperatorInfo
*
pInfo
)
{
if
(
downstream
->
operatorType
!=
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
initIntervalDownStream
(
downstream
->
pDownstream
[
0
],
type
,
p
Sup
,
pInterval
,
pTwSup
);
initIntervalDownStream
(
downstream
->
pDownstream
[
0
],
type
,
p
Info
);
return
;
}
SStreamScanInfo
*
pScanInfo
=
downstream
->
info
;
pScanInfo
->
windowSup
.
parentType
=
type
;
pScanInfo
->
windowSup
.
pIntervalAggSup
=
p
Sup
;
pScanInfo
->
windowSup
.
pIntervalAggSup
=
&
pInfo
->
agg
Sup
;
if
(
!
pScanInfo
->
igCheckUpdate
&&
!
pScanInfo
->
pUpdateInfo
)
{
pScanInfo
->
pUpdateInfo
=
updateInfoInitP
(
pInterval
,
pTwSup
->
waterMark
);
pScanInfo
->
pUpdateInfo
=
updateInfoInitP
(
&
pInfo
->
interval
,
pInfo
->
twAggSup
.
waterMark
);
}
pScanInfo
->
interval
=
*
pInterval
;
pScanInfo
->
twAggSup
=
*
pTwSup
;
pScanInfo
->
interval
=
pInfo
->
interval
;
pScanInfo
->
twAggSup
=
pInfo
->
twAggSup
;
pScanInfo
->
pState
=
pInfo
->
pState
;
}
void
initStreamFunciton
(
SqlFunctionCtx
*
pCtx
,
int32_t
numOfExpr
)
{
...
...
@@ -2761,7 +2762,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pOperator
->
fpSet
=
createOperatorFpSet
(
NULL
,
doStreamFinalIntervalAgg
,
NULL
,
destroyStreamFinalIntervalOperatorInfo
,
optrDefaultBufFn
,
NULL
);
if
(
pPhyNode
->
type
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL
)
{
initIntervalDownStream
(
downstream
,
pPhyNode
->
type
,
&
pInfo
->
aggSup
,
&
pInfo
->
interval
,
&
pInfo
->
twAggSup
);
initIntervalDownStream
(
downstream
,
pPhyNode
->
type
,
pInfo
);
}
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -4930,7 +4931,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pOperator
->
fpSet
=
createOperatorFpSet
(
optrDummyOpenFn
,
doStreamIntervalAgg
,
NULL
,
destroyStreamFinalIntervalOperatorInfo
,
optrDefaultBufFn
,
NULL
);
initIntervalDownStream
(
downstream
,
pPhyNode
->
type
,
&
pInfo
->
aggSup
,
&
pInfo
->
interval
,
&
pInfo
->
twAggSup
);
initIntervalDownStream
(
downstream
,
pPhyNode
->
type
,
pInfo
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
...
...
source/libs/stream/src/streamState.c
浏览文件 @
1b255231
...
...
@@ -1066,6 +1066,7 @@ void streamStateDestroy(SStreamState* pState) {
#ifdef USE_ROCKSDB
streamFileStateDestroy
(
pState
->
pFileState
);
streamStateDestroy_rocksdb
(
pState
);
taosMemoryFreeClear
(
pState
->
parNameMap
);
// do nothong
#endif
taosMemoryFreeClear
(
pState
->
pTdbState
);
...
...
source/libs/stream/src/streamUpdate.c
浏览文件 @
1b255231
...
...
@@ -128,9 +128,7 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma
pInfo
->
pCloseWinSBF
=
NULL
;
_hash_fn_t
hashFn
=
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_UBIGINT
);
pInfo
->
pMap
=
taosHashInit
(
DEFAULT_MAP_CAPACITY
,
hashFn
,
true
,
HASH_NO_LOCK
);
pInfo
->
maxVersion
=
0
;
pInfo
->
scanGroupId
=
0
;
pInfo
->
scanWindow
=
(
STimeWindow
){.
skey
=
INT64_MIN
,
.
ekey
=
INT64_MAX
};
pInfo
->
maxDataVersion
=
0
;
return
pInfo
;
}
...
...
@@ -242,29 +240,6 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) {
return
true
;
}
void
updateInfoSetScanRange
(
SUpdateInfo
*
pInfo
,
STimeWindow
*
pWin
,
uint64_t
groupId
,
uint64_t
version
)
{
qDebug
(
"===stream===groupId:%"
PRIu64
", startTs:%"
PRIu64
", endTs:%"
PRIu64
", version:%"
PRIu64
,
groupId
,
pWin
->
skey
,
pWin
->
ekey
,
version
);
pInfo
->
scanWindow
=
*
pWin
;
pInfo
->
scanGroupId
=
groupId
;
pInfo
->
maxVersion
=
version
;
}
bool
updateInfoIgnore
(
SUpdateInfo
*
pInfo
,
STimeWindow
*
pWin
,
uint64_t
groupId
,
uint64_t
version
)
{
if
(
!
pInfo
)
{
return
false
;
}
qDebug
(
"===stream===check groupId:%"
PRIu64
", startTs:%"
PRIu64
", endTs:%"
PRIu64
", version:%"
PRIu64
,
groupId
,
pWin
->
skey
,
pWin
->
ekey
,
version
);
if
(
pInfo
->
scanGroupId
==
groupId
&&
pInfo
->
scanWindow
.
skey
<=
pWin
->
skey
&&
pWin
->
ekey
<=
pInfo
->
scanWindow
.
ekey
&&
version
<=
pInfo
->
maxVersion
)
{
qDebug
(
"===stream===ignore groupId:%"
PRIu64
", startTs:%"
PRIu64
", endTs:%"
PRIu64
", version:%"
PRIu64
,
groupId
,
pWin
->
skey
,
pWin
->
ekey
,
version
);
return
true
;
}
return
false
;
}
void
updateInfoDestroy
(
SUpdateInfo
*
pInfo
)
{
if
(
pInfo
==
NULL
)
{
return
;
...
...
@@ -337,10 +312,7 @@ int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo)
if
(
tEncodeI64
(
&
encoder
,
*
(
TSKEY
*
)
pIte
)
<
0
)
return
-
1
;
}
if
(
tEncodeI64
(
&
encoder
,
pInfo
->
scanWindow
.
skey
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pInfo
->
scanWindow
.
ekey
)
<
0
)
return
-
1
;
if
(
tEncodeU64
(
&
encoder
,
pInfo
->
scanGroupId
)
<
0
)
return
-
1
;
if
(
tEncodeU64
(
&
encoder
,
pInfo
->
maxVersion
)
<
0
)
return
-
1
;
if
(
tEncodeU64
(
&
encoder
,
pInfo
->
maxDataVersion
)
<
0
)
return
-
1
;
tEndEncode
(
&
encoder
);
...
...
@@ -393,11 +365,7 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) {
taosHashPut
(
pInfo
->
pMap
,
&
uid
,
sizeof
(
uint64_t
),
&
ts
,
sizeof
(
TSKEY
));
}
ASSERT
(
mapSize
==
taosHashGetSize
(
pInfo
->
pMap
));
if
(
tDecodeI64
(
&
decoder
,
&
pInfo
->
scanWindow
.
skey
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pInfo
->
scanWindow
.
ekey
)
<
0
)
return
-
1
;
if
(
tDecodeU64
(
&
decoder
,
&
pInfo
->
scanGroupId
)
<
0
)
return
-
1
;
if
(
tDecodeU64
(
&
decoder
,
&
pInfo
->
maxVersion
)
<
0
)
return
-
1
;
if
(
tDecodeU64
(
&
decoder
,
&
pInfo
->
maxDataVersion
)
<
0
)
return
-
1
;
tEndDecode
(
&
decoder
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录