Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d8c2a68f
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
d8c2a68f
编写于
6月 15, 2022
作者:
5
54liuyao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feat(stream): update data fo partition by
上级
84e6f48b
变更
5
显示空白变更内容
内联
并排
Showing
5 changed file
with
145 addition
and
60 deletion
+145
-60
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+4
-0
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+115
-39
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+3
-1
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+1
-8
source/libs/planner/src/planOptimizer.c
source/libs/planner/src/planOptimizer.c
+22
-12
未找到文件。
source/libs/executor/inc/executorimpl.h
浏览文件 @
d8c2a68f
...
...
@@ -391,7 +391,9 @@ typedef struct SStreamBlockScanInfo {
void
*
streamBlockReader
;
// stream block reader handle
SArray
*
pColMatchInfo
;
//
SNode
*
pCondition
;
int32_t
tsArrayIndex
;
SArray
*
tsArray
;
uint64_t
groupId
;
SUpdateInfo
*
pUpdateInfo
;
SExprInfo
*
pPseudoExpr
;
...
...
@@ -582,6 +584,7 @@ typedef struct SPartitionOperatorInfo {
int32_t
*
columnOffset
;
// start position for each column data
void
*
pGroupIter
;
// group iterator
int32_t
pageIndex
;
// page index of current group
SSDataBlock
*
pUpdateRes
;
}
SPartitionOperatorInfo
;
typedef
struct
SWindowRowsSup
{
...
...
@@ -907,6 +910,7 @@ int32_t compareTimeWindow(const void* p1, const void* p2, const void* param);
int32_t
finalizeResultRowIntoResultDataBlock
(
SDiskbasedBuf
*
pBuf
,
SResultRowPosition
*
resultRowPosition
,
SqlFunctionCtx
*
pCtx
,
SExprInfo
*
pExprInfo
,
int32_t
numOfExprs
,
const
int32_t
*
rowCellOffset
,
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
);
void
copyUpdateDataBlock
(
SSDataBlock
*
pDest
,
SSDataBlock
*
pSource
,
int32_t
tsColIndex
);
#ifdef __cplusplus
}
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
d8c2a68f
...
...
@@ -750,7 +750,42 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
return
true
;
}
static
void
copyOneRow
(
SSDataBlock
*
dest
,
SSDataBlock
*
source
,
int32_t
sourceRowId
)
{
for
(
int32_t
j
=
0
;
j
<
source
->
info
.
numOfCols
;
j
++
)
{
SColumnInfoData
*
pDestCol
=
(
SColumnInfoData
*
)
taosArrayGet
(
dest
->
pDataBlock
,
j
);
SColumnInfoData
*
pSourceCol
=
(
SColumnInfoData
*
)
taosArrayGet
(
source
->
pDataBlock
,
j
);
if
(
colDataIsNull_s
(
pSourceCol
,
sourceRowId
))
{
colDataAppendNULL
(
pDestCol
,
dest
->
info
.
rows
);
}
else
{
colDataAppend
(
pDestCol
,
dest
->
info
.
rows
,
colDataGetData
(
pSourceCol
,
sourceRowId
),
false
);
}
}
dest
->
info
.
rows
++
;
}
static
uint64_t
getGroupId
(
SOperatorInfo
*
pOperator
,
SSDataBlock
*
pBlock
,
int32_t
rowId
)
{
uint64_t
*
groupId
=
taosHashGet
(
pOperator
->
pTaskInfo
->
tableqinfoList
.
map
,
&
pBlock
->
info
.
uid
,
sizeof
(
int64_t
));
if
(
groupId
)
{
return
*
groupId
;
}
return
0
;
/* Todo(liuyao) for partition by column
recordNewGroupKeys(pTableScanInfo->pGroupCols, pTableScanInfo->pGroupColVals, pBlock, rowId);
int32_t len = buildGroupKeys(pTableScanInfo->keyBuf, pTableScanInfo->pGroupColVals);
uint64_t resId = 0;
uint64_t* groupId = taosHashGet(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len);
if (groupId) {
return *groupId;
} else if (len != 0) {
resId = calcGroupId(pTableScanInfo->keyBuf, len);
taosHashPut(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len, &resId, sizeof(uint64_t));
}
return resId;
*/
}
static
SSDataBlock
*
doDataScan
(
SStreamBlockScanInfo
*
pInfo
)
{
while
(
1
)
{
SSDataBlock
*
pResult
=
NULL
;
pResult
=
doTableScan
(
pInfo
->
pOperatorDumy
);
if
(
pResult
==
NULL
)
{
...
...
@@ -759,7 +794,59 @@ static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo) {
pResult
=
doTableScan
(
pInfo
->
pOperatorDumy
);
}
}
if
(
!
pResult
)
{
return
NULL
;
}
if
(
pResult
->
info
.
groupId
==
pInfo
->
groupId
)
{
return
pResult
;
}
}
/* Todo(liuyao) for partition by column
SSDataBlock* pBlock = createOneDataBlock(pResult, true);
blockDataCleanup(pResult);
for (int32_t i = 0; i < pBlock->info.rows; i++) {
uint64_t id = getGroupId(pInfo->pOperatorDumy, pBlock, i);
if (id == pInfo->groupId) {
copyOneRow(pResult, pBlock, i);
}
}
return pResult;
*/
}
static
void
setUpdateData
(
SStreamBlockScanInfo
*
pInfo
,
SSDataBlock
*
pBlock
,
SSDataBlock
*
pUpdateBlock
)
{
blockDataCleanup
(
pUpdateBlock
);
int32_t
size
=
taosArrayGetSize
(
pInfo
->
tsArray
);
if
(
pInfo
->
tsArrayIndex
<
size
)
{
SColumnInfoData
*
pCol
=
(
SColumnInfoData
*
)
taosArrayGet
(
pUpdateBlock
->
pDataBlock
,
pInfo
->
primaryTsIndex
);
ASSERT
(
pCol
->
info
.
type
==
TSDB_DATA_TYPE_TIMESTAMP
);
blockDataEnsureCapacity
(
pUpdateBlock
,
size
);
ASSERT
(
pBlock
->
info
.
numOfCols
==
pUpdateBlock
->
info
.
numOfCols
);
int32_t
rowId
=
*
(
int32_t
*
)
taosArrayGet
(
pInfo
->
tsArray
,
pInfo
->
tsArrayIndex
);
pInfo
->
groupId
=
getGroupId
(
pInfo
->
pOperatorDumy
,
pBlock
,
rowId
);
int32_t
i
=
0
;
for
(
;
i
<
size
;
i
++
)
{
rowId
=
*
(
int32_t
*
)
taosArrayGet
(
pInfo
->
tsArray
,
i
+
pInfo
->
tsArrayIndex
);
uint64_t
id
=
getGroupId
(
pInfo
->
pOperatorDumy
,
pBlock
,
rowId
);
if
(
pInfo
->
groupId
!=
id
)
{
break
;
}
copyOneRow
(
pUpdateBlock
,
pBlock
,
rowId
);
}
pUpdateBlock
->
info
.
rows
=
i
;
pInfo
->
tsArrayIndex
+=
i
;
pUpdateBlock
->
info
.
groupId
=
pInfo
->
groupId
;
pUpdateBlock
->
info
.
type
=
STREAM_REPROCESS
;
blockDataUpdateTsWindow
(
pUpdateBlock
,
0
);
}
// all rows have same group id
ASSERT
(
pInfo
->
tsArrayIndex
>=
size
);
if
(
size
>
0
&&
pInfo
->
tsArrayIndex
==
size
)
{
taosArrayClear
(
pInfo
->
tsArray
);
}
}
static
void
getUpdateDataBlock
(
SStreamBlockScanInfo
*
pInfo
,
bool
invertible
,
SSDataBlock
*
pBlock
,
...
...
@@ -767,41 +854,21 @@ static void getUpdateDataBlock(SStreamBlockScanInfo* pInfo, bool invertible, SSD
SColumnInfoData
*
pColDataInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
pInfo
->
primaryTsIndex
);
ASSERT
(
pColDataInfo
->
info
.
type
==
TSDB_DATA_TYPE_TIMESTAMP
);
TSKEY
*
ts
=
(
TSKEY
*
)
pColDataInfo
->
pData
;
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
i
++
)
{
if
(
updateInfoIsUpdated
(
pInfo
->
pUpdateInfo
,
pBlock
->
info
.
uid
,
ts
[
i
]))
{
taosArrayPush
(
pInfo
->
tsArray
,
ts
+
i
);
for
(
int32_t
rowId
=
0
;
rowId
<
pBlock
->
info
.
rows
;
rowId
++
)
{
if
(
updateInfoIsUpdated
(
pInfo
->
pUpdateInfo
,
pBlock
->
info
.
uid
,
ts
[
rowId
]))
{
taosArrayPush
(
pInfo
->
tsArray
,
&
rowId
);
}
}
if
(
!
pUpdateBlock
)
{
taosArrayClear
(
pInfo
->
tsArray
);
return
;
}
int32_t
size
=
taosArrayGetSize
(
pInfo
->
tsArray
);
if
(
size
>
0
&&
invertible
)
{
setUpdateData
(
pInfo
,
pBlock
,
pUpdateBlock
);
// Todo(liuyao) get from tsdb
// SSDataBlock* p = createOneDataBlock(pBlock, true);
// p->info.type = STREAM_INVERT;
// taosArrayClear(pInfo->tsArray);
// return p;
SColumnInfoData
*
pCol
=
(
SColumnInfoData
*
)
taosArrayGet
(
pUpdateBlock
->
pDataBlock
,
pInfo
->
primaryTsIndex
);
ASSERT
(
pCol
->
info
.
type
==
TSDB_DATA_TYPE_TIMESTAMP
);
blockDataEnsureCapacity
(
pUpdateBlock
,
size
);
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
TSKEY
*
pTs
=
(
TSKEY
*
)
taosArrayGet
(
pInfo
->
tsArray
,
i
);
colDataAppend
(
pCol
,
i
,
(
char
*
)
pTs
,
false
);
}
for
(
int32_t
i
=
0
;
i
<
pUpdateBlock
->
info
.
numOfCols
;
i
++
)
{
if
(
i
==
pInfo
->
primaryTsIndex
)
{
continue
;
}
SColumnInfoData
*
pCol
=
(
SColumnInfoData
*
)
taosArrayGet
(
pUpdateBlock
->
pDataBlock
,
i
);
colDataAppendNNULL
(
pCol
,
0
,
size
);
}
pUpdateBlock
->
info
.
rows
=
size
;
pUpdateBlock
->
info
.
type
=
STREAM_REPROCESS
;
blockDataUpdateTsWindow
(
pUpdateBlock
,
0
);
taosArrayClear
(
pInfo
->
tsArray
);
}
}
static
SSDataBlock
*
doStreamBlockScan
(
SOperatorInfo
*
pOperator
)
{
...
...
@@ -833,7 +900,6 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
pInfo
->
scanMode
=
STREAM_SCAN_FROM_READERHANDLE
;
return
pInfo
->
pRes
;
}
else
if
(
pInfo
->
scanMode
==
STREAM_SCAN_FROM_UPDATERES
)
{
blockDataCleanup
(
pInfo
->
pRes
);
pInfo
->
scanMode
=
STREAM_SCAN_FROM_DATAREADER
;
if
(
!
isStateWindow
(
pInfo
))
{
prepareDataScan
(
pInfo
);
...
...
@@ -848,7 +914,15 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
if
(
pInfo
->
scanMode
==
STREAM_SCAN_FROM_DATAREADER
)
{
SSDataBlock
*
pSDB
=
doDataScan
(
pInfo
);
if
(
pSDB
==
NULL
)
{
setUpdateData
(
pInfo
,
pInfo
->
pRes
,
pInfo
->
pUpdateRes
);
if
(
pInfo
->
pUpdateRes
->
info
.
rows
>
0
)
{
if
(
!
isStateWindow
(
pInfo
))
{
prepareDataScan
(
pInfo
);
}
return
pInfo
->
pUpdateRes
;
}
else
{
pInfo
->
scanMode
=
STREAM_SCAN_FROM_READERHANDLE
;
}
}
else
{
getUpdateDataBlock
(
pInfo
,
true
,
pSDB
,
NULL
);
return
pSDB
;
...
...
@@ -941,7 +1015,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
if
(
rows
==
0
)
{
pOperator
->
status
=
OP_EXEC_DONE
;
}
else
if
(
pInfo
->
pUpdateInfo
)
{
blockDataCleanup
(
pInfo
->
pUpdateRes
)
;
pInfo
->
tsArrayIndex
=
0
;
getUpdateDataBlock
(
pInfo
,
true
,
pInfo
->
pRes
,
pInfo
->
pUpdateRes
);
if
(
pInfo
->
pUpdateRes
->
info
.
rows
>
0
)
{
if
(
pInfo
->
pUpdateRes
->
info
.
type
==
STREAM_REPROCESS
)
{
...
...
@@ -1020,7 +1094,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
goto
_error
;
}
pInfo
->
tsArray
=
taosArrayInit
(
4
,
sizeof
(
TSKEY
));
pInfo
->
tsArray
=
taosArrayInit
(
4
,
sizeof
(
int32_t
));
if
(
pInfo
->
tsArray
==
NULL
)
{
goto
_error
;
}
...
...
@@ -1047,6 +1121,8 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
pInfo
->
pOperatorDumy
=
pTableScanDummy
;
pInfo
->
interval
=
pSTInfo
->
interval
;
pInfo
->
sessionSup
=
(
SessionWindowSupporter
){.
pStreamAggSup
=
NULL
,
.
gap
=
-
1
};
pInfo
->
groupId
=
0
;
pOperator
->
name
=
"StreamBlockScanOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
;
pOperator
->
blocking
=
false
;
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
d8c2a68f
...
...
@@ -1985,7 +1985,7 @@ static void clearUpdateDataBlock(SSDataBlock* pBlock) {
blockDataCleanup
(
pBlock
);
}
static
void
copyUpdateDataBlock
(
SSDataBlock
*
pDest
,
SSDataBlock
*
pSource
,
int32_t
tsColIndex
)
{
void
copyUpdateDataBlock
(
SSDataBlock
*
pDest
,
SSDataBlock
*
pSource
,
int32_t
tsColIndex
)
{
ASSERT
(
pDest
->
info
.
capacity
>=
pSource
->
info
.
rows
);
clearUpdateDataBlock
(
pDest
);
SColumnInfoData
*
pDestCol
=
taosArrayGet
(
pDest
->
pDataBlock
,
0
);
...
...
@@ -1997,6 +1997,8 @@ static void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_
colDataAppendNNULL
(
pCol
,
0
,
pSource
->
info
.
rows
);
}
pDest
->
info
.
rows
=
pSource
->
info
.
rows
;
pDest
->
info
.
groupId
=
pSource
->
info
.
groupId
;
pDest
->
info
.
type
=
pSource
->
info
.
type
;
blockDataUpdateTsWindow
(
pDest
,
0
);
}
...
...
source/libs/function/src/builtinsimpl.c
浏览文件 @
d8c2a68f
...
...
@@ -2345,14 +2345,7 @@ int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx)
SResultRowEntryInfo
*
pSResInfo
=
GET_RES_INFO
(
pSourceCtx
);
SAPercentileInfo
*
pSBuf
=
GET_ROWCELL_INTERBUF
(
pSResInfo
);
ASSERT
(
pDBuf
->
algo
==
pSBuf
->
algo
);
if
(
pDBuf
->
algo
==
APERCT_ALGO_TDIGEST
)
{
tdigestMerge
(
pDBuf
->
pTDigest
,
pSBuf
->
pTDigest
);
}
else
{
SHistogramInfo
*
pTmp
=
tHistogramMerge
(
pDBuf
->
pHisto
,
pSBuf
->
pHisto
,
MAX_HISTOGRAM_BIN
);
memcpy
(
pDBuf
->
pHisto
,
pTmp
,
sizeof
(
SHistogramInfo
)
+
sizeof
(
SHistBin
)
*
(
MAX_HISTOGRAM_BIN
+
1
));
pDBuf
->
pHisto
->
elems
=
(
SHistBin
*
)((
char
*
)
pDBuf
->
pHisto
+
sizeof
(
SHistogramInfo
));
tHistogramDestroy
(
&
pTmp
);
}
apercentileTransferInfo
(
pSBuf
,
pDBuf
);
pDResInfo
->
numOfRes
=
TMAX
(
pDResInfo
->
numOfRes
,
pSResInfo
->
numOfRes
);
return
TSDB_CODE_SUCCESS
;
}
...
...
source/libs/planner/src/planOptimizer.c
浏览文件 @
d8c2a68f
...
...
@@ -104,10 +104,14 @@ static bool osdMayBeOptimized(SLogicNode* pNode) {
return
false
;
}
if
(
NULL
==
pNode
->
pParent
||
(
QUERY_NODE_LOGIC_PLAN_WINDOW
!=
nodeType
(
pNode
->
pParent
)
&&
QUERY_NODE_LOGIC_PLAN_AGG
!=
nodeType
(
pNode
->
pParent
)))
{
QUERY_NODE_LOGIC_PLAN_AGG
!=
nodeType
(
pNode
->
pParent
)
&&
QUERY_NODE_LOGIC_PLAN_PARTITION
!=
nodeType
(
pNode
->
pParent
)))
{
return
false
;
}
if
(
QUERY_NODE_LOGIC_PLAN_WINDOW
==
nodeType
(
pNode
->
pParent
))
{
if
(
QUERY_NODE_LOGIC_PLAN_WINDOW
==
nodeType
(
pNode
->
pParent
)
||
(
QUERY_NODE_LOGIC_PLAN_PARTITION
==
nodeType
(
pNode
->
pParent
)
&&
pNode
->
pParent
->
pParent
&&
QUERY_NODE_LOGIC_PLAN_WINDOW
==
nodeType
(
pNode
->
pParent
->
pParent
))
)
{
return
true
;
}
return
!
osdHaveNormalCol
(((
SAggLogicNode
*
)
pNode
->
pParent
)
->
pGroupKeys
);
...
...
@@ -217,16 +221,22 @@ static int32_t osdGetDataRequired(SNodeList* pFuncs) {
}
static
void
setScanWindowInfo
(
SScanLogicNode
*
pScan
)
{
if
(
QUERY_NODE_LOGIC_PLAN_WINDOW
==
nodeType
(
pScan
->
node
.
pParent
))
{
pScan
->
interval
=
((
SWindowLogicNode
*
)
pScan
->
node
.
pParent
)
->
interval
;
pScan
->
offset
=
((
SWindowLogicNode
*
)
pScan
->
node
.
pParent
)
->
offset
;
pScan
->
sliding
=
((
SWindowLogicNode
*
)
pScan
->
node
.
pParent
)
->
sliding
;
pScan
->
intervalUnit
=
((
SWindowLogicNode
*
)
pScan
->
node
.
pParent
)
->
intervalUnit
;
pScan
->
slidingUnit
=
((
SWindowLogicNode
*
)
pScan
->
node
.
pParent
)
->
slidingUnit
;
pScan
->
triggerType
=
((
SWindowLogicNode
*
)
pScan
->
node
.
pParent
)
->
triggerType
;
pScan
->
watermark
=
((
SWindowLogicNode
*
)
pScan
->
node
.
pParent
)
->
watermark
;
pScan
->
tsColId
=
((
SColumnNode
*
)((
SWindowLogicNode
*
)
pScan
->
node
.
pParent
)
->
pTspk
)
->
colId
;
pScan
->
filesFactor
=
((
SWindowLogicNode
*
)
pScan
->
node
.
pParent
)
->
filesFactor
;
SLogicNode
*
pParent
=
pScan
->
node
.
pParent
;
if
(
QUERY_NODE_LOGIC_PLAN_PARTITION
==
nodeType
(
pParent
)
&&
pParent
->
pParent
&&
QUERY_NODE_LOGIC_PLAN_WINDOW
==
nodeType
(
pParent
->
pParent
))
{
pParent
=
pParent
->
pParent
;
}
if
(
QUERY_NODE_LOGIC_PLAN_WINDOW
==
nodeType
(
pParent
))
{
pScan
->
interval
=
((
SWindowLogicNode
*
)
pParent
)
->
interval
;
pScan
->
offset
=
((
SWindowLogicNode
*
)
pParent
)
->
offset
;
pScan
->
sliding
=
((
SWindowLogicNode
*
)
pParent
)
->
sliding
;
pScan
->
intervalUnit
=
((
SWindowLogicNode
*
)
pParent
)
->
intervalUnit
;
pScan
->
slidingUnit
=
((
SWindowLogicNode
*
)
pParent
)
->
slidingUnit
;
pScan
->
triggerType
=
((
SWindowLogicNode
*
)
pParent
)
->
triggerType
;
pScan
->
watermark
=
((
SWindowLogicNode
*
)
pParent
)
->
watermark
;
pScan
->
tsColId
=
((
SColumnNode
*
)((
SWindowLogicNode
*
)
pParent
)
->
pTspk
)
->
colId
;
pScan
->
filesFactor
=
((
SWindowLogicNode
*
)
pParent
)
->
filesFactor
;
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录