Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
b8e451d2
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
b8e451d2
编写于
10月 31, 2022
作者:
5
54liuyao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix(stream): scalar stream update data
上级
5202ccfb
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
167 addition
and
71 deletion
+167
-71
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+10
-8
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+60
-61
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+3
-2
tests/script/tsim/stream/scalar.sim
tests/script/tsim/stream/scalar.sim
+94
-0
未找到文件。
source/libs/executor/src/groupoperator.c
浏览文件 @
b8e451d2
...
...
@@ -62,7 +62,7 @@ static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char**
int32_t
numOfGroupCols
=
taosArrayGetSize
(
pGroupColList
);
for
(
int32_t
i
=
0
;
i
<
numOfGroupCols
;
++
i
)
{
SColumn
*
pCol
=
(
SColumn
*
)
taosArrayGet
(
pGroupColList
,
i
);
SColumn
*
pCol
=
(
SColumn
*
)
taosArrayGet
(
pGroupColList
,
i
);
(
*
keyLen
)
+=
pCol
->
bytes
;
// actual data + null_flag
SGroupKeys
key
=
{
0
};
...
...
@@ -397,7 +397,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
return
buildGroupResultDataBlock
(
pOperator
);
}
SOperatorInfo
*
createGroupOperatorInfo
(
SOperatorInfo
*
downstream
,
SAggPhysiNode
*
pAggNode
,
SExecTaskInfo
*
pTaskInfo
)
{
SOperatorInfo
*
createGroupOperatorInfo
(
SOperatorInfo
*
downstream
,
SAggPhysiNode
*
pAggNode
,
SExecTaskInfo
*
pTaskInfo
)
{
SGroupbyOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SGroupbyOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
...
...
@@ -442,8 +442,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
hashGroupbyAggregate
,
NULL
,
NULL
,
destroyGroupOperatorInfo
,
NULL
);
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
hashGroupbyAggregate
,
NULL
,
NULL
,
destroyGroupOperatorInfo
,
NULL
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
...
...
@@ -765,7 +765,6 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
goto
_error
;
}
int32_t
numOfCols
=
0
;
SExprInfo
*
pExprInfo
=
createExprInfo
(
pPartNode
->
pTargets
,
NULL
,
&
numOfCols
);
pInfo
->
pGroupCols
=
extractPartitionColInfo
(
pPartNode
->
pPartitionKeys
);
...
...
@@ -819,8 +818,8 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
hashPartition
,
NULL
,
NULL
,
destroyPartitionOperatorInfo
,
NULL
);
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
hashPartition
,
NULL
,
NULL
,
destroyPartitionOperatorInfo
,
NULL
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
return
pOperator
;
...
...
@@ -965,6 +964,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
case
STREAM_DELETE_DATA
:
{
copyDataBlock
(
pInfo
->
pDelRes
,
pBlock
);
pInfo
->
pDelRes
->
info
.
type
=
STREAM_DELETE_RESULT
;
printDataBlock
(
pInfo
->
pDelRes
,
"stream partitionby delete"
);
return
pInfo
->
pDelRes
;
}
break
;
default:
...
...
@@ -1014,6 +1014,9 @@ void initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup
SStreamScanInfo
*
pScanInfo
=
downstream
->
info
;
pScanInfo
->
partitionSup
=
*
pParSup
;
pScanInfo
->
pPartScalarSup
=
pExpr
;
if
(
!
pScanInfo
->
pUpdateInfo
)
{
pScanInfo
->
pUpdateInfo
=
updateInfoInit
(
60000
,
TSDB_TIME_PRECISION_MILLI
,
0
);
}
}
SOperatorInfo
*
createStreamPartitionOperatorInfo
(
SOperatorInfo
*
downstream
,
SStreamPartitionPhysiNode
*
pPartNode
,
...
...
@@ -1108,7 +1111,6 @@ _error:
return
NULL
;
}
SArray
*
extractColumnInfo
(
SNodeList
*
pNodeList
)
{
size_t
numOfCols
=
LIST_LENGTH
(
pNodeList
);
SArray
*
pList
=
taosArrayInit
(
numOfCols
,
sizeof
(
SColumn
));
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
b8e451d2
...
...
@@ -363,7 +363,8 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo
if
(
pLimitInfo
->
remainOffset
>=
pBlock
->
info
.
rows
)
{
pLimitInfo
->
remainOffset
-=
pBlock
->
info
.
rows
;
pBlock
->
info
.
rows
=
0
;
qDebug
(
"current block ignore due to offset, current:%"
PRId64
", %s"
,
pLimitInfo
->
remainOffset
,
GET_TASKID
(
pTaskInfo
));
qDebug
(
"current block ignore due to offset, current:%"
PRId64
", %s"
,
pLimitInfo
->
remainOffset
,
GET_TASKID
(
pTaskInfo
));
}
else
{
blockDataTrimFirstNRows
(
pBlock
,
pLimitInfo
->
remainOffset
);
pLimitInfo
->
remainOffset
=
0
;
...
...
@@ -376,7 +377,7 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo
int32_t
keep
=
pBlock
->
info
.
rows
-
overflowRows
;
blockDataKeepFirstNRows
(
pBlock
,
keep
);
qDebug
(
"output limit %"
PRId64
" has reached, %s"
,
pLimit
->
limit
,
GET_TASKID
(
pTaskInfo
));
qDebug
(
"output limit %"
PRId64
" has reached, %s"
,
pLimit
->
limit
,
GET_TASKID
(
pTaskInfo
));
pOperator
->
status
=
OP_EXEC_DONE
;
}
}
...
...
@@ -748,13 +749,13 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
return
NULL
;
}
int32_t
num
=
0
;
int32_t
num
=
0
;
STableKeyInfo
*
pList
=
NULL
;
tableListGetGroupList
(
pTaskInfo
->
pTableInfoList
,
pInfo
->
currentGroupId
,
&
pList
,
&
num
);
ASSERT
(
pInfo
->
dataReader
==
NULL
);
int32_t
code
=
tsdbReaderOpen
(
pInfo
->
readHandle
.
vnode
,
&
pInfo
->
cond
,
pList
,
num
,
(
STsdbReader
**
)
&
pInfo
->
dataReader
,
GET_TASKID
(
pTaskInfo
));
int32_t
code
=
tsdbReaderOpen
(
pInfo
->
readHandle
.
vnode
,
&
pInfo
->
cond
,
pList
,
num
,
(
STsdbReader
**
)
&
pInfo
->
dataReader
,
GET_TASKID
(
pTaskInfo
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
}
...
...
@@ -776,7 +777,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
pInfo
->
limitInfo
.
numOfOutputRows
=
0
;
pInfo
->
limitInfo
.
remainOffset
=
pInfo
->
limitInfo
.
limit
.
offset
;
int32_t
num
=
0
;
int32_t
num
=
0
;
STableKeyInfo
*
pList
=
NULL
;
tableListGetGroupList
(
pTaskInfo
->
pTableInfoList
,
pInfo
->
currentGroupId
,
&
pList
,
&
num
);
...
...
@@ -819,8 +820,8 @@ static void destroyTableScanOperatorInfo(void* param) {
taosMemoryFreeClear
(
param
);
}
SOperatorInfo
*
createTableScanOperatorInfo
(
STableScanPhysiNode
*
pTableScanNode
,
SReadHandle
*
readHandle
,
SExecTaskInfo
*
pTaskInfo
)
{
SOperatorInfo
*
createTableScanOperatorInfo
(
STableScanPhysiNode
*
pTableScanNode
,
SReadHandle
*
readHandle
,
SExecTaskInfo
*
pTaskInfo
)
{
STableScanInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
STableScanInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
...
...
@@ -884,7 +885,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
pOperator
->
cost
.
openCost
=
0
;
return
pOperator
;
_error:
_error:
if
(
pInfo
!=
NULL
)
{
destroyTableScanOperatorInfo
(
pInfo
);
}
...
...
@@ -1024,7 +1025,8 @@ static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pC
return
TSDB_CODE_SUCCESS
;
}
SOperatorInfo
*
createDataBlockInfoScanOperator
(
SReadHandle
*
readHandle
,
SBlockDistScanPhysiNode
*
pBlockScanNode
,
SExecTaskInfo
*
pTaskInfo
)
{
SOperatorInfo
*
createDataBlockInfoScanOperator
(
SReadHandle
*
readHandle
,
SBlockDistScanPhysiNode
*
pBlockScanNode
,
SExecTaskInfo
*
pTaskInfo
)
{
SBlockDistInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SBlockDistInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
...
...
@@ -1041,8 +1043,8 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi
}
STableListInfo
*
pTableListInfo
=
pTaskInfo
->
pTableInfoList
;
size_t
num
=
tableListGetSize
(
pTableListInfo
);
void
*
pList
=
tableListGetInfo
(
pTableListInfo
,
0
);
size_t
num
=
tableListGetSize
(
pTableListInfo
);
void
*
pList
=
tableListGetInfo
(
pTableListInfo
,
0
);
tsdbReaderOpen
(
readHandle
->
vnode
,
&
cond
,
pList
,
num
,
&
pInfo
->
pHandle
,
pTaskInfo
->
id
.
str
);
cleanupQueryTableDataCond
(
&
cond
);
...
...
@@ -1070,7 +1072,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi
createOperatorFpSet
(
operatorDummyOpenFn
,
doBlockInfoScan
,
NULL
,
NULL
,
destroyBlockDistScanOperatorInfo
,
NULL
);
return
pOperator
;
_error:
_error:
taosMemoryFreeClear
(
pInfo
);
taosMemoryFreeClear
(
pOperator
);
return
NULL
;
...
...
@@ -1135,8 +1137,8 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
blockDataCleanup
(
pBlock
);
STsdbReader
*
pReader
=
NULL
;
int32_t
code
=
tsdbReaderOpen
(
pTableScanInfo
->
readHandle
.
vnode
,
&
cond
,
&
tblInfo
,
1
,
(
STsdbReader
**
)
&
pReader
,
GET_TASKID
(
pTaskInfo
));
int32_t
code
=
tsdbReaderOpen
(
pTableScanInfo
->
readHandle
.
vnode
,
&
cond
,
&
tblInfo
,
1
,
(
STsdbReader
**
)
&
pReader
,
GET_TASKID
(
pTaskInfo
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
code
;
return
NULL
;
...
...
@@ -1162,7 +1164,8 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
tsdbReaderClose
(
pReader
);
qDebug
(
"retrieve prev rows:%d, skey:%"
PRId64
", ekey:%"
PRId64
" uid:%"
PRIu64
", max ver:%"
PRId64
", suid:%"
PRIu64
,
pBlock
->
info
.
rows
,
startTs
,
endTs
,
tbUid
,
maxVersion
,
cond
.
suid
);
", suid:%"
PRIu64
,
pBlock
->
info
.
rows
,
startTs
,
endTs
,
tbUid
,
maxVersion
,
cond
.
suid
);
return
pBlock
->
info
.
rows
>
0
?
pBlock
:
NULL
;
}
...
...
@@ -1178,12 +1181,12 @@ static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts,
static
uint64_t
getGroupIdByUid
(
SStreamScanInfo
*
pInfo
,
uint64_t
uid
)
{
return
getTableGroupId
(
pInfo
->
pTableScanOp
->
pTaskInfo
->
pTableInfoList
,
uid
);
// SHashObj* map = pInfo->pTableScanOp->pTaskInfo->pTableInfoList.map;
// uint64_t* groupId = taosHashGet(map, &uid, sizeof(int64_t));
// if (groupId) {
// return *groupId;
// }
// return 0;
// SHashObj* map = pInfo->pTableScanOp->pTaskInfo->pTableInfoList.map;
// uint64_t* groupId = taosHashGet(map, &uid, sizeof(int64_t));
// if (groupId) {
// return *groupId;
// }
// return 0;
}
static
uint64_t
getGroupIdByData
(
SStreamScanInfo
*
pInfo
,
uint64_t
uid
,
TSKEY
ts
,
int64_t
maxVersion
)
{
...
...
@@ -1380,7 +1383,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
if
(
rows
==
0
)
{
return
TSDB_CODE_SUCCESS
;
}
int32_t
code
=
blockDataEnsureCapacity
(
pDestBlock
,
rows
*
2
);
int32_t
code
=
blockDataEnsureCapacity
(
pDestBlock
,
rows
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
...
...
@@ -1423,39 +1426,33 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
}
static
int32_t
generateDeleteResultBlock
(
SStreamScanInfo
*
pInfo
,
SSDataBlock
*
pSrcBlock
,
SSDataBlock
*
pDestBlock
)
{
if
(
pSrcBlock
->
info
.
rows
==
0
)
{
blockDataCleanup
(
pDestBlock
);
int32_t
rows
=
pSrcBlock
->
info
.
rows
;
if
(
rows
==
0
)
{
return
TSDB_CODE_SUCCESS
;
}
blockDataCleanup
(
pDestBlock
);
int32_t
code
=
blockDataEnsureCapacity
(
pDestBlock
,
pSrcBlock
->
info
.
rows
);
int32_t
code
=
blockDataEnsureCapacity
(
pDestBlock
,
rows
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
ASSERT
(
taosArrayGetSize
(
pSrcBlock
->
pDataBlock
)
>=
3
);
SColumnInfoData
*
pStartTsCol
=
taosArrayGet
(
pSrcBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
TSKEY
*
startData
=
(
TSKEY
*
)
pStartTsCol
->
pData
;
SColumnInfoData
*
pEndTsCol
=
taosArrayGet
(
pSrcBlock
->
pDataBlock
,
END_TS_COLUMN_INDEX
);
TSKEY
*
endData
=
(
TSKEY
*
)
pEndTsCol
->
pData
;
SColumnInfoData
*
pUidCol
=
taosArrayGet
(
pSrcBlock
->
pDataBlock
,
UID_COLUMN_INDEX
);
uint64_t
*
uidCol
=
(
uint64_t
*
)
pUidCol
->
pData
;
SColumnInfoData
*
pDestStartCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
SColumnInfoData
*
pDestEndCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
END_TS_COLUMN_INDEX
);
SColumnInfoData
*
pDestUidCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
UID_COLUMN_INDEX
);
SColumnInfoData
*
pDestGpCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
GROUPID_COLUMN_INDEX
);
SColumnInfoData
*
pDestCalStartTsCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
CALCULATE_START_TS_COLUMN_INDEX
);
SColumnInfoData
*
pDestCalEndTsCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
CALCULATE_END_TS_COLUMN_INDEX
);
int32_t
dummy
=
0
;
int64_t
version
=
pSrcBlock
->
info
.
version
-
1
;
SColumnInfoData
*
pSrcStartTsCol
=
(
SColumnInfoData
*
)
taosArrayGet
(
pSrcBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
SColumnInfoData
*
pSrcEndTsCol
=
(
SColumnInfoData
*
)
taosArrayGet
(
pSrcBlock
->
pDataBlock
,
END_TS_COLUMN_INDEX
);
SColumnInfoData
*
pSrcUidCol
=
taosArrayGet
(
pSrcBlock
->
pDataBlock
,
UID_COLUMN_INDEX
);
uint64_t
*
srcUidData
=
(
uint64_t
*
)
pSrcUidCol
->
pData
;
SColumnInfoData
*
pSrcGpCol
=
taosArrayGet
(
pSrcBlock
->
pDataBlock
,
GROUPID_COLUMN_INDEX
);
uint64_t
*
srcGp
=
(
uint64_t
*
)
pSrcGpCol
->
pData
;
ASSERT
(
pSrcStartTsCol
->
info
.
type
==
TSDB_DATA_TYPE_TIMESTAMP
);
TSKEY
*
srcStartTsCol
=
(
TSKEY
*
)
pSrcStartTsCol
->
pData
;
TSKEY
*
srcEndTsCol
=
(
TSKEY
*
)
pSrcEndTsCol
->
pData
;
int64_t
version
=
pSrcBlock
->
info
.
version
-
1
;
for
(
int32_t
i
=
0
;
i
<
pSrcBlock
->
info
.
rows
;
i
++
)
{
uint64_t
groupId
=
getGroupIdByData
(
pInfo
,
uidCol
[
i
],
startData
[
i
],
version
);
colDataAppend
(
pDestStartCol
,
i
,
(
const
char
*
)(
startData
+
i
),
false
);
colDataAppend
(
pDestEndCol
,
i
,
(
const
char
*
)(
endData
+
i
),
false
);
colDataAppendNULL
(
pDestUidCol
,
i
);
colDataAppend
(
pDestGpCol
,
i
,
(
const
char
*
)
&
groupId
,
false
);
colDataAppendNULL
(
pDestCalStartTsCol
,
i
);
colDataAppendNULL
(
pDestCalEndTsCol
,
i
);
pDestBlock
->
info
.
rows
++
;
uint64_t
srcUid
=
srcUidData
[
i
];
uint64_t
groupId
=
srcGp
[
i
];
if
(
groupId
==
0
)
{
groupId
=
getGroupIdByData
(
pInfo
,
srcUid
,
srcStartTsCol
[
i
],
version
);
}
appendOneRowToStreamSpecialBlock
(
pDestBlock
,
srcStartTsCol
+
i
,
srcEndTsCol
+
i
,
srcUidData
+
i
,
&
groupId
,
NULL
);
}
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1466,6 +1463,8 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock,
code
=
generateIntervalScanRange
(
pInfo
,
pSrcBlock
,
pDestBlock
);
}
else
if
(
isSessionWindow
(
pInfo
)
||
isStateWindow
(
pInfo
))
{
code
=
generateSessionScanRange
(
pInfo
,
pSrcBlock
,
pDestBlock
);
}
else
{
code
=
generateDeleteResultBlock
(
pInfo
,
pSrcBlock
,
pDestBlock
);
}
pDestBlock
->
info
.
type
=
STREAM_CLEAR
;
pDestBlock
->
info
.
version
=
pSrcBlock
->
info
.
version
;
...
...
@@ -1894,8 +1893,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
#endif
size_t
total
=
taosArrayGetSize
(
pInfo
->
pBlockLists
);
// TODO: refactor
FETCH_NEXT_BLOCK:
// TODO: refactor
FETCH_NEXT_BLOCK:
if
(
pInfo
->
blockType
==
STREAM_INPUT__DATA_BLOCK
)
{
if
(
pInfo
->
validBlockIndex
>=
total
)
{
doClearBufferedBlocks
(
pInfo
);
...
...
@@ -2022,7 +2021,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
int32_t
totBlockNum
=
taosArrayGetSize
(
pInfo
->
pBlockLists
);
NEXT_SUBMIT_BLK:
NEXT_SUBMIT_BLK:
while
(
1
)
{
if
(
pInfo
->
tqReader
->
pMsg
==
NULL
)
{
if
(
pInfo
->
validBlockIndex
>=
totBlockNum
)
{
...
...
@@ -2278,7 +2277,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT
pOperator
->
fpSet
=
createOperatorFpSet
(
NULL
,
doRawScan
,
NULL
,
NULL
,
destroyRawScanOperatorInfo
,
NULL
);
return
pOperator
;
_end:
_end:
taosMemoryFree
(
pInfo
);
taosMemoryFree
(
pOperator
);
pTaskInfo
->
code
=
code
;
...
...
@@ -2388,7 +2387,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
}
STableKeyInfo
*
pList
=
NULL
;
int32_t
num
=
0
;
int32_t
num
=
0
;
tableListGetGroupList
(
pTaskInfo
->
pTableInfoList
,
0
,
&
pList
,
&
num
);
if
(
pHandle
->
initTableReader
)
{
...
...
@@ -2467,7 +2466,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
return
pOperator
;
_error:
_error:
if
(
pColIds
!=
NULL
)
{
taosArrayDestroy
(
pColIds
);
}
...
...
@@ -4102,7 +4101,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
return
pOperator
;
_error:
_error:
taosMemoryFreeClear
(
pInfo
);
taosMemoryFreeClear
(
pOperator
);
terrno
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
...
...
@@ -4241,7 +4240,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
return
pOperator
;
_error:
_error:
taosMemoryFree
(
pInfo
);
taosMemoryFree
(
pOperator
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -4264,8 +4263,8 @@ int32_t createMultipleDataReaders2(SQueryTableDataCond* pQueryCond, SReadHandle*
STableListInfo
*
pTableListInfo
,
int32_t
tableStartIdx
,
int32_t
tableEndIdx
,
STsdbReader
**
ppReader
,
const
char
*
idstr
)
{
STsdbReader
*
pReader
=
NULL
;
void
*
pStart
=
tableListGetInfo
(
pTableListInfo
,
tableStartIdx
);
int32_t
num
=
tableEndIdx
-
tableStartIdx
+
1
;
void
*
pStart
=
tableListGetInfo
(
pTableListInfo
,
tableStartIdx
);
int32_t
num
=
tableEndIdx
-
tableStartIdx
+
1
;
int32_t
code
=
tsdbReaderOpen
(
pHandle
->
vnode
,
pQueryCond
,
pStart
,
num
,
&
pReader
,
idstr
);
if
(
code
!=
0
)
{
...
...
@@ -4426,7 +4425,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
bool
allColumnsHaveAgg
=
true
;
SColumnDataAgg
**
pColAgg
=
NULL
;
// STsdbReader* reader = pTableScanInfo->pReader; // taosArrayGetP(pTableScanInfo->dataReaders, readerIdx);
// STsdbReader* reader = pTableScanInfo->pReader; // taosArrayGetP(pTableScanInfo->dataReaders, readerIdx);
if
(
allColumnsHaveAgg
==
true
)
{
int32_t
numOfCols
=
taosArrayGetSize
(
pBlock
->
pDataBlock
);
...
...
@@ -4524,7 +4523,7 @@ static SSDataBlock* getTableDataBlockTemp(void* param) {
int64_t
st
=
taosGetTimestampUs
();
void
*
p
=
tableListGetInfo
(
pInfo
->
tableListInfo
,
readIdx
+
pInfo
->
tableStartIndex
);
void
*
p
=
tableListGetInfo
(
pInfo
->
tableListInfo
,
readIdx
+
pInfo
->
tableStartIndex
);
SReadHandle
*
pHandle
=
&
pInfo
->
readHandle
;
tsdbReaderOpen
(
pHandle
->
vnode
,
pQueryCond
,
p
,
1
,
&
pInfo
->
pReader
,
GET_TASKID
(
pTaskInfo
));
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
b8e451d2
...
...
@@ -1687,7 +1687,9 @@ void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SAggSuppor
SStreamScanInfo
*
pScanInfo
=
downstream
->
info
;
pScanInfo
->
windowSup
.
parentType
=
type
;
pScanInfo
->
windowSup
.
pIntervalAggSup
=
pSup
;
pScanInfo
->
pUpdateInfo
=
updateInfoInitP
(
pInterval
,
pTwSup
->
waterMark
);
if
(
!
pScanInfo
->
pUpdateInfo
)
{
pScanInfo
->
pUpdateInfo
=
updateInfoInitP
(
pInterval
,
pTwSup
->
waterMark
);
}
pScanInfo
->
interval
=
*
pInterval
;
pScanInfo
->
twAggSup
=
*
pTwSup
;
}
...
...
@@ -2453,7 +2455,6 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
}
else
{
// non-linear interpolation
pSliceInfo
->
current
=
taosTimeAdd
(
pSliceInfo
->
current
,
pInterval
->
interval
,
pInterval
->
intervalUnit
,
pInterval
->
precision
);
}
}
...
...
tests/script/tsim/stream/scalar.sim
0 → 100644
浏览文件 @
b8e451d2
$loop_all = 0
looptest:
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
sql drop database if exists test;
sql create database test vgroups 1;
sql use test;
sql create stable st(ts timestamp,a int,b int,c int, d double) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams0 into streamt0 as select ts c1, a, abs(b) c4 from t1 partition by a;
sql create stream streams1 into streamt1 as select ts c1, a, abs(b) c4 from t1;
sql create stream streams2 into streamt2 as select ts c1, a, abs(b) c4 from st partition by tbname;
sql insert into t1 values(1648791213000,1,1,1,1);
sql insert into t1 values(1648791213001,1,1,1,1);
sql insert into t1 values(1648791213002,1,1,1,1);
sql insert into t2 values(1648791213000,1,2,2,2);
sql insert into t2 values(1648791213001,1,1,1,1);
sql insert into t2 values(1648791213002,1,1,1,1);
sql insert into t1 values(1648791213001,2,11,11,11);
$loop_count = 0
loop1:
sleep 200
sql select * from streamt0 order by a desc;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 3 then
print ======streamt0=rows=$rows
goto loop1
endi
if $data01 != 2 then
print ======streamt0=data01=$data01
goto loop1
endi
if $data02 != 11 then
print ======streamt0=data02=$data02
goto loop1
endi
sql select * from streamt1 order by a desc;
if $rows != 3 then
print ======streamt1=rows=$rows
goto loop1
endi
if $data01 != 2 then
print ======streamt1=data01=$data01
goto loop1
endi
if $data02 != 11 then
print ======streamt1=data02=$data02
goto loop1
endi
sql select * from streamt2 order by a desc;
if $rows != 6 then
print ======streamt2=rows=$rows
goto loop1
endi
if $data01 != 2 then
print ======streamt2=data01=$data01
goto loop1
endi
if $data02 != 11 then
print ======streamt2=data02=$data02
goto loop1
endi
system sh/stop_dnodes.sh
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录