Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
29d80b1c
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
29d80b1c
编写于
4月 11, 2023
作者:
dengyihao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix invalid free
上级
a3836b23
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
105 addition
and
58 deletion
+105
-58
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+37
-37
source/libs/stream/src/streamStateRocksdb.c
source/libs/stream/src/streamStateRocksdb.c
+0
-1
source/libs/stream/src/tstreamFileState.c
source/libs/stream/src/tstreamFileState.c
+68
-20
未找到文件。
source/libs/executor/src/timewindowoperator.c
浏览文件 @
29d80b1c
...
...
@@ -13,7 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executorimpl.h"
#include "tglobal.h"
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
...
...
@@ -21,6 +20,7 @@
#include "tcompare.h"
#include "tdatablock.h"
#include "tfill.h"
#include "tglobal.h"
#include "tlog.h"
#include "ttime.h"
...
...
@@ -1367,12 +1367,12 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDa
}
uint64_t
winGpId
=
pGpDatas
[
i
];
SWinKey
winRes
=
{.
ts
=
win
.
skey
,
.
groupId
=
winGpId
};
void
*
chIds
=
taosHashGet
(
pInfo
->
pPullDataMap
,
&
winRes
,
sizeof
(
SWinKey
));
void
*
chIds
=
taosHashGet
(
pInfo
->
pPullDataMap
,
&
winRes
,
sizeof
(
SWinKey
));
if
(
chIds
)
{
getNextTimeWindow
(
pInterval
,
pInterval
->
precision
,
TSDB_ORDER_ASC
,
&
win
);
continue
;
}
bool
res
=
doDeleteWindow
(
pOperator
,
win
.
skey
,
winGpId
);
bool
res
=
doDeleteWindow
(
pOperator
,
win
.
skey
,
winGpId
);
if
(
pUpWins
&&
res
)
{
taosArrayPush
(
pUpWins
,
&
winRes
);
}
...
...
@@ -2096,12 +2096,11 @@ void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3
}
}
bool
hasIntervalWindow
(
SStreamState
*
pState
,
SWinKey
*
pKey
)
{
return
streamStateCheck
(
pState
,
pKey
);
}
bool
hasIntervalWindow
(
SStreamState
*
pState
,
SWinKey
*
pKey
)
{
return
streamStateCheck
(
pState
,
pKey
);
}
int32_t
setIntervalOutputBuf
(
SStreamState
*
pState
,
STimeWindow
*
win
,
SRowBuffPos
**
pResult
,
int64_t
groupId
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
int32_t
*
rowEntryInfoOffset
,
SAggSupporter
*
pAggSup
)
{
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
int32_t
*
rowEntryInfoOffset
,
SAggSupporter
*
pAggSup
)
{
SWinKey
key
=
{
.
ts
=
win
->
skey
,
.
groupId
=
groupId
,
...
...
@@ -2115,7 +2114,7 @@ int32_t setIntervalOutputBuf(SStreamState* pState, STimeWindow* win, SRowBuffPos
*
pResult
=
(
SRowBuffPos
*
)
value
;
SResultRow
*
res
=
(
SResultRow
*
)((
*
pResult
)
->
pRowBuff
);
// set time window for current result
res
->
win
=
(
*
win
);
res
->
win
=
(
*
win
);
setResultRowInitCtx
(
res
,
pCtx
,
numOfOutput
,
rowEntryInfoOffset
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -2148,21 +2147,20 @@ static void rebuildIntervalWindow(SOperatorInfo* pOperator, SArray* pWinArray, S
continue
;
}
if
(
num
==
0
)
{
int32_t
code
=
setIntervalOutputBuf
(
pInfo
->
pState
,
&
parentWin
,
&
pCurResPos
,
pWinRes
->
groupId
,
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
pInfo
->
aggSup
);
int32_t
code
=
setIntervalOutputBuf
(
pInfo
->
pState
,
&
parentWin
,
&
pCurResPos
,
pWinRes
->
groupId
,
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
pInfo
->
aggSup
);
ASSERT
(
pCurResPos
!=
NULL
);
pCurResult
=
(
SResultRow
*
)
pCurResPos
->
pRowBuff
;
pCurResult
=
(
SResultRow
*
)
pCurResPos
->
pRowBuff
;
if
(
code
!=
TSDB_CODE_SUCCESS
||
pCurResult
==
NULL
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
TSDB_CODE_OUT_OF_MEMORY
);
}
}
num
++
;
SRowBuffPos
*
pChResPos
=
NULL
;
SResultRow
*
pChResult
=
NULL
;
setIntervalOutputBuf
(
pChInfo
->
pState
,
&
parentWin
,
&
pChResPos
,
pWinRes
->
groupId
,
pChildSup
->
pCtx
,
pChildSup
->
numOfExprs
,
pChildSup
->
rowEntryInfoOffset
,
&
pChInfo
->
aggSup
);
pChResult
=
(
SResultRow
*
)
pChResPos
->
pRowBuff
;
setIntervalOutputBuf
(
pChInfo
->
pState
,
&
parentWin
,
&
pChResPos
,
pWinRes
->
groupId
,
pChildSup
->
pCtx
,
pChildSup
->
numOfExprs
,
pChildSup
->
rowEntryInfoOffset
,
&
pChInfo
->
aggSup
);
pChResult
=
(
SResultRow
*
)
pChResPos
->
pRowBuff
;
updateTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
parentWin
,
true
);
compactFunctions
(
pSup
->
pCtx
,
pChildSup
->
pCtx
,
numOfOutput
,
pTaskInfo
,
&
pInfo
->
twAggSup
.
timeWindowData
);
}
...
...
@@ -2412,7 +2410,7 @@ static int32_t getNextQualifiedFinalWindow(SInterval* pInterval, STimeWindow* pN
return
startPos
;
}
static
void
setStreamDataVersion
(
SExecTaskInfo
*
pTaskInfo
,
int64_t
version
,
int64_t
ckId
)
{
static
void
setStreamDataVersion
(
SExecTaskInfo
*
pTaskInfo
,
int64_t
version
,
int64_t
ckId
)
{
pTaskInfo
->
streamInfo
.
dataVersion
=
version
;
pTaskInfo
->
streamInfo
.
checkPointId
=
ckId
;
}
...
...
@@ -2493,8 +2491,8 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
}
int32_t
code
=
setIntervalOutputBuf
(
pInfo
->
pState
,
&
nextWin
,
&
pResPos
,
groupId
,
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
pInfo
->
aggSup
);
pResult
=
(
SResultRow
*
)
pResPos
->
pRowBuff
;
pSup
->
rowEntryInfoOffset
,
&
pInfo
->
aggSup
);
pResult
=
(
SResultRow
*
)
pResPos
->
pRowBuff
;
if
(
code
!=
TSDB_CODE_SUCCESS
||
pResult
==
NULL
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
TSDB_CODE_OUT_OF_MEMORY
);
}
...
...
@@ -2506,8 +2504,8 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
}
SWinKey
key
=
{
.
ts
=
pResult
->
win
.
skey
,
.
groupId
=
groupId
,
.
ts
=
pResult
->
win
.
skey
,
.
groupId
=
groupId
,
};
if
(
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_AT_ONCE
&&
pUpdatedMap
)
{
saveWinResult
(
&
key
,
pResPos
,
pUpdatedMap
);
...
...
@@ -2554,8 +2552,8 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
static
inline
int
winPosCmprImpl
(
const
void
*
pKey1
,
const
void
*
pKey2
)
{
SRowBuffPos
*
pos1
=
*
(
SRowBuffPos
**
)
pKey1
;
SRowBuffPos
*
pos2
=
*
(
SRowBuffPos
**
)
pKey2
;
SWinKey
*
pWin1
=
(
SWinKey
*
)
pos1
->
pKey
;
SWinKey
*
pWin2
=
(
SWinKey
*
)
pos2
->
pKey
;
SWinKey
*
pWin1
=
(
SWinKey
*
)
pos1
->
pKey
;
SWinKey
*
pWin2
=
(
SWinKey
*
)
pos2
->
pKey
;
if
(
pWin1
->
groupId
>
pWin2
->
groupId
)
{
return
1
;
...
...
@@ -2757,7 +2755,7 @@ int64_t getDeleteMark(SIntervalPhysiNode* pIntervalPhyNode) {
}
TSKEY
compareTs
(
void
*
pKey
)
{
SWinKey
*
pWinKey
=
(
SWinKey
*
)
pKey
;
SWinKey
*
pWinKey
=
(
SWinKey
*
)
pKey
;
return
pWinKey
->
ts
;
}
...
...
@@ -2786,7 +2784,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
.
deleteMarkSaved
=
0
,
.
calTriggerSaved
=
0
,
.
checkPointTs
=
0
,
.
checkPointInterval
=
convertTimePrecision
(
tsCheckpointInterval
,
TSDB_TIME_PRECISION_MILLI
,
pInfo
->
interval
.
precision
),
.
checkPointInterval
=
convertTimePrecision
(
tsCheckpointInterval
,
TSDB_TIME_PRECISION_MILLI
,
pInfo
->
interval
.
precision
),
};
ASSERTS
(
pInfo
->
twAggSup
.
calTrigger
!=
STREAM_TRIGGER_MAX_DELAY
,
"trigger type should not be max delay"
);
pInfo
->
primaryTsIndex
=
((
SColumnNode
*
)
pIntervalPhyNode
->
window
.
pTspk
)
->
slotId
;
...
...
@@ -2869,8 +2868,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo
->
numOfDatapack
=
0
;
pInfo
->
pUpdated
=
NULL
;
pInfo
->
pUpdatedMap
=
NULL
;
pInfo
->
pState
->
pFileState
=
streamFileStateInit
(
tsStreamBufferSize
,
sizeof
(
SWinKey
),
pInfo
->
aggSup
.
resultRowSize
,
compareTs
,
pInfo
->
pState
,
pInfo
->
twAggSup
.
deleteMark
);
pInfo
->
pState
->
pFileState
=
streamFileStateInit
(
tsStreamBufferSize
,
sizeof
(
SWinKey
),
pInfo
->
aggSup
.
resultRowSize
,
compareTs
,
pInfo
->
pState
,
pInfo
->
twAggSup
.
deleteMark
);
pInfo
->
dataVersion
=
0
;
pOperator
->
operatorType
=
pPhyNode
->
type
;
...
...
@@ -4969,23 +4968,24 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
SExprInfo
*
pExprInfo
=
createExprInfo
(
pIntervalPhyNode
->
window
.
pFuncs
,
NULL
,
&
numOfCols
);
SSDataBlock
*
pResBlock
=
createDataBlockFromDescNode
(
pPhyNode
->
pOutputDataBlockDesc
);
pInfo
->
interval
=
(
SInterval
)
{
.
interval
=
pIntervalPhyNode
->
interval
,
.
sliding
=
pIntervalPhyNode
->
sliding
,
.
intervalUnit
=
pIntervalPhyNode
->
intervalUnit
,
.
slidingUnit
=
pIntervalPhyNode
->
slidingUnit
,
.
offset
=
pIntervalPhyNode
->
offset
,
.
precision
=
((
SColumnNode
*
)
pIntervalPhyNode
->
window
.
pTspk
)
->
node
.
resType
.
precision
,
pInfo
->
interval
=
(
SInterval
){
.
interval
=
pIntervalPhyNode
->
interval
,
.
sliding
=
pIntervalPhyNode
->
sliding
,
.
intervalUnit
=
pIntervalPhyNode
->
intervalUnit
,
.
slidingUnit
=
pIntervalPhyNode
->
slidingUnit
,
.
offset
=
pIntervalPhyNode
->
offset
,
.
precision
=
((
SColumnNode
*
)
pIntervalPhyNode
->
window
.
pTspk
)
->
node
.
resType
.
precision
,
};
pInfo
->
twAggSup
=
(
STimeWindowAggSupp
)
{
pInfo
->
twAggSup
=
(
STimeWindowAggSupp
){
.
waterMark
=
pIntervalPhyNode
->
window
.
watermark
,
.
calTrigger
=
pIntervalPhyNode
->
window
.
triggerType
,
.
maxTs
=
INT64_MIN
,
.
minTs
=
INT64_MAX
,
.
deleteMark
=
getDeleteMark
(
pIntervalPhyNode
),
.
checkPointTs
=
0
,
.
checkPointInterval
=
convertTimePrecision
(
tsCheckpointInterval
,
TSDB_TIME_PRECISION_MILLI
,
pInfo
->
interval
.
precision
),
.
checkPointInterval
=
convertTimePrecision
(
tsCheckpointInterval
,
TSDB_TIME_PRECISION_MILLI
,
pInfo
->
interval
.
precision
),
};
ASSERTS
(
pInfo
->
twAggSup
.
calTrigger
!=
STREAM_TRIGGER_MAX_DELAY
,
"trigger type should not be max delay"
);
...
...
@@ -5042,8 +5042,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo
->
numOfDatapack
=
0
;
pInfo
->
pUpdated
=
NULL
;
pInfo
->
pUpdatedMap
=
NULL
;
pInfo
->
pState
->
pFileState
=
streamFileStateInit
(
tsStreamBufferSize
,
sizeof
(
SWinKey
),
pInfo
->
aggSup
.
resultRowSize
,
compareTs
,
pInfo
->
pState
,
pInfo
->
twAggSup
.
deleteMark
);
pInfo
->
pState
->
pFileState
=
streamFileStateInit
(
tsStreamBufferSize
,
sizeof
(
SWinKey
),
pInfo
->
aggSup
.
resultRowSize
,
compareTs
,
pInfo
->
pState
,
pInfo
->
twAggSup
.
deleteMark
);
setOperatorInfo
(
pOperator
,
"StreamIntervalOperator"
,
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
,
true
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
...
...
source/libs/stream/src/streamStateRocksdb.c
浏览文件 @
29d80b1c
...
...
@@ -16,7 +16,6 @@
#include "rocksdb/c.h"
#include "streamBackendRocksdb.h"
#include "tcommon.h"
#include "tlog.h"
int
defaultKeyComp
(
void
*
state
,
const
char
*
aBuf
,
size_t
aLen
,
const
char
*
bBuf
,
size_t
bLen
)
{
//
...
...
source/libs/stream/src/tstreamFileState.c
浏览文件 @
29d80b1c
...
...
@@ -313,15 +313,23 @@ SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) {
return
pFileState
->
usedBuffs
;
}
void
streamFileStateDecode
(
SStreamFileState
*
pFileState
,
void
*
pBuff
,
int32_t
len
)
{
pBuff
=
taosDecodeFixedI64
(
pBuff
,
&
pFileState
->
flushMark
);
}
void
streamFileStateEncode
(
SStreamFileState
*
pFileState
,
void
**
pVal
,
int32_t
*
pLen
)
{
// void streamFileStateDecode(SStreamFileState* pFileState, void* pBuff, int32_t len) {
// pBuff = taosDecodeFixedI64(pBuff, &pFileState->flushMark);
// }
// void streamFileStateEncode(SStreamFileState* pFileState, void** pVal, int32_t* pLen) {
// *pLen = sizeof(TSKEY);
// (*pVal) = taosMemoryCalloc(1, *pLen);
// void* buff = *pVal;
// taosEncodeFixedI64(&buff, pFileState->flushMark);
// }
void
streamFileStateDecode
(
TSKEY
*
key
,
void
*
pBuff
,
int32_t
len
)
{
pBuff
=
taosDecodeFixedI64
(
pBuff
,
key
);
}
void
streamFileStateEncode
(
TSKEY
*
key
,
void
**
pVal
,
int32_t
*
pLen
)
{
*
pLen
=
sizeof
(
TSKEY
);
(
*
pVal
)
=
taosMemoryCalloc
(
1
,
*
pLen
);
void
*
buff
=
*
pVal
;
taosEncodeFixedI64
(
&
buff
,
pFileState
->
flushMark
);
taosEncodeFixedI64
(
&
buff
,
*
key
);
}
int32_t
flushSnapshot
(
SStreamFileState
*
pFileState
,
SStreamSnapshot
*
pSnapshot
,
bool
flushState
)
{
...
...
@@ -349,12 +357,26 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
}
if
(
flushState
)
{
int32_t
len
=
0
;
void
*
buff
=
NULL
;
streamFileStateEncode
(
pFileState
,
&
buff
,
&
len
);
SWinKey
key
=
{.
ts
=
-
1
,
.
groupId
=
0
};
// dengyihao
streamStatePut_rocksdb
(
pFileState
->
pFileStore
,
&
key
,
buff
,
len
);
taosMemoryFree
(
buff
);
const
char
*
taskKey
=
"streamFileState"
;
{
char
keyBuf
[
128
]
=
{
0
};
void
*
valBuf
=
NULL
;
int32_t
len
=
0
;
sprintf
(
keyBuf
,
"%s:%"
PRId64
""
,
taskKey
,
((
SStreamState
*
)
pFileState
->
pFileStore
)
->
checkPointId
);
streamFileStateEncode
(
&
pFileState
->
flushMark
,
&
valBuf
,
&
len
);
streamStatePutBatch
(
pFileState
->
pFileStore
,
"default"
,
batch
,
keyBuf
,
valBuf
,
len
);
taosMemoryFree
(
valBuf
);
}
{
char
keyBuf
[
128
]
=
{
0
};
char
valBuf
[
64
]
=
{
0
};
int32_t
len
=
0
;
sprintf
(
keyBuf
,
"%s:%"
PRId64
""
,
taskKey
,
INT64_MIN
);
sprintf
(
valBuf
,
"%"
PRId64
""
,
((
SStreamState
*
)
pFileState
->
pFileStore
)
->
checkPointId
);
streamStatePutBatch
(
pFileState
->
pFileStore
,
"default"
,
batch
,
keyBuf
,
valBuf
,
strlen
(
valBuf
));
}
streamStatePutBatch_rocksdb
(
pFileState
->
pFileStore
,
batch
);
}
streamStateDestroyBatch
(
batch
);
...
...
@@ -362,16 +384,42 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
}
int32_t
recoverSnapshot
(
SStreamFileState
*
pFileState
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
SWinKey
stkey
=
{.
ts
=
-
1
,
.
groupId
=
0
};
// dengyihao
int32_t
code
=
TSDB_CODE_SUCCESS
;
const
char
*
taskKey
=
"streamFileState"
;
int64_t
maxCheckPointId
=
0
;
{
char
buf
[
128
]
=
{
0
};
void
*
val
=
NULL
;
int32_t
len
=
0
;
sprintf
(
buf
,
"%s:%"
PRId64
""
,
taskKey
,
INT64_MIN
);
code
=
streamDefaultGet_rocksdb
(
pFileState
->
pFileStore
,
buf
,
&
val
,
&
len
);
if
(
code
!=
0
)
{
return
TSDB_CODE_FAILED
;
}
sscanf
(
val
,
"%"
PRId64
""
,
&
maxCheckPointId
);
taosMemoryFree
(
val
);
}
for
(
int64_t
i
=
maxCheckPointId
;
i
>
0
;
i
++
)
{
char
buf
[
128
]
=
{
0
};
void
*
val
=
0
;
int32_t
len
=
0
;
sprintf
(
buf
,
"%s:%"
PRId64
""
,
taskKey
,
i
);
code
=
streamDefaultGet_rocksdb
(
pFileState
->
pFileStore
,
buf
,
&
val
,
&
len
);
if
(
code
!=
0
)
{
return
TSDB_CODE_FAILED
;
}
TSKEY
ts
;
sscanf
(
val
,
"%"
PRId64
""
,
&
ts
);
taosMemoryFree
(
val
);
if
(
ts
<
pFileState
->
flushMark
)
{
// forceRemoveCheckPoint(pFileState->pFileStore, i);
}
else
{
break
;
}
}
void
*
pStVal
=
NULL
;
int32_t
len
=
0
;
code
=
streamStateGet_rocksdb
(
pFileState
->
pFileStore
,
&
stkey
,
&
pStVal
,
&
len
);
if
(
code
==
TSDB_CODE_SUCCESS
)
{
streamFileStateDecode
(
pFileState
,
pStVal
,
len
);
}
else
{
return
TSDB_CODE_FAILED
;
}
SWinKey
key
=
{.
groupId
=
0
,
.
ts
=
0
};
SStreamStateCur
*
pCur
=
streamStateGetCur_rocksdb
(
pFileState
->
pFileStore
,
&
key
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录