Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
7a532284
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
7a532284
编写于
11月 01, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
11月 01, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #17784 from taosdata/fix/liao_cov
enh(query): improve the aggregate performance.
上级
2791f626
b78249c5
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
209 addition
and
239 deletion
+209
-239
source/common/src/tdatablock.c
source/common/src/tdatablock.c
+5
-0
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+1
-1
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+57
-60
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+1
-1
source/libs/executor/src/cachescanoperator.c
source/libs/executor/src/cachescanoperator.c
+9
-8
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+63
-57
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+4
-51
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+68
-60
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+1
-1
未找到文件。
source/common/src/tdatablock.c
浏览文件 @
7a532284
...
...
@@ -1132,6 +1132,10 @@ void blockDataCleanup(SSDataBlock* pDataBlock) {
pDataBlock
->
info
.
window
.
ekey
=
0
;
pDataBlock
->
info
.
window
.
skey
=
0
;
if
(
pDataBlock
->
info
.
capacity
==
0
)
{
return
;
}
size_t
numOfCols
=
taosArrayGetSize
(
pDataBlock
->
pDataBlock
);
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColumnInfoData
*
p
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
i
);
...
...
@@ -1186,6 +1190,7 @@ static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo*
}
void
colInfoDataCleanup
(
SColumnInfoData
*
pColumn
,
uint32_t
numOfRows
)
{
pColumn
->
hasNull
=
false
;
if
(
IS_VAR_DATA_TYPE
(
pColumn
->
info
.
type
))
{
pColumn
->
varmeta
.
length
=
0
;
if
(
pColumn
->
varmeta
.
offset
!=
NULL
)
{
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
7a532284
...
...
@@ -159,7 +159,7 @@ int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableL
void
tsdbReaderClose
(
STsdbReader
*
pReader
);
bool
tsdbNextDataBlock
(
STsdbReader
*
pReader
);
bool
tsdbTableNextDataBlock
(
STsdbReader
*
pReader
,
uint64_t
uid
);
void
tsdbRetrieveDataBlockInfo
(
STsdbReader
*
pReader
,
SDataBlockInfo
*
pDataBlockInfo
);
void
tsdbRetrieveDataBlockInfo
(
const
STsdbReader
*
pReader
,
int32_t
*
rows
,
uint64_t
*
uid
,
STimeWindow
*
pWindow
);
int32_t
tsdbRetrieveDatablockSMA
(
STsdbReader
*
pReader
,
SColumnDataAgg
***
pBlockStatis
,
bool
*
allHave
);
SArray
*
tsdbRetrieveDataBlock
(
STsdbReader
*
pTsdbReadHandle
,
SArray
*
pColumnIdList
);
int32_t
tsdbReaderReset
(
STsdbReader
*
pReader
,
SQueryTableDataCond
*
pCond
);
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
7a532284
...
...
@@ -35,11 +35,17 @@ typedef struct {
int32_t
numOfLastFiles
;
}
SBlockNumber
;
typedef
struct
SBlockIndex
{
int32_t
ordinalIndex
;
int64_t
inFileOffset
;
STimeWindow
window
;
}
SBlockIndex
;
typedef
struct
STableBlockScanInfo
{
uint64_t
uid
;
TSKEY
lastKey
;
SMapData
mapData
;
// block info (compressed)
SArray
*
pBlockList
;
// block data index list
SArray
*
pBlockList
;
// block data index list
, SArray<SBlockIndex>
SIterInfo
iter
;
// mem buffer skip list iterator
SIterInfo
iiter
;
// imem buffer skip list iterator
SArray
*
delSkyline
;
// delete info for this table
...
...
@@ -568,7 +574,7 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader,
STableBlockScanInfo
*
pScanInfo
=
p
;
if
(
pScanInfo
->
pBlockList
==
NULL
)
{
pScanInfo
->
pBlockList
=
taosArrayInit
(
4
,
sizeof
(
int32_t
));
pScanInfo
->
pBlockList
=
taosArrayInit
(
4
,
sizeof
(
SBlockIndex
));
}
taosArrayPush
(
pIndexList
,
pBlockIdx
);
...
...
@@ -630,7 +636,10 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
continue
;
}
void
*
p
=
taosArrayPush
(
pScanInfo
->
pBlockList
,
&
j
);
SBlockIndex
bIndex
=
{.
ordinalIndex
=
j
,
.
inFileOffset
=
block
.
aSubBlock
->
offset
};
bIndex
.
window
=
(
STimeWindow
)
{.
skey
=
block
.
minKey
.
ts
,
.
ekey
=
block
.
maxKey
.
ts
};
void
*
p
=
taosArrayPush
(
pScanInfo
->
pBlockList
,
&
bIndex
);
if
(
p
==
NULL
)
{
tMapDataClear
(
&
pScanInfo
->
mapData
);
return
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -1061,8 +1070,8 @@ static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr)
return
TSDB_CODE_INVALID_PARA
;
}
int32_t
*
mapData
Index
=
taosArrayGet
(
pScanInfo
->
pBlockList
,
pBlockInfo
->
tbBlockIdx
);
tMapDataGetItemByIdx
(
&
pScanInfo
->
mapData
,
*
mapData
Index
,
&
pBlockIter
->
block
,
tGetDataBlk
);
SBlockIndex
*
p
Index
=
taosArrayGet
(
pScanInfo
->
pBlockList
,
pBlockInfo
->
tbBlockIdx
);
tMapDataGetItemByIdx
(
&
pScanInfo
->
mapData
,
pIndex
->
ordinal
Index
,
&
pBlockIter
->
block
,
tGetDataBlk
);
}
#if 0
...
...
@@ -1075,6 +1084,7 @@ static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr)
static
int32_t
initBlockIterator
(
STsdbReader
*
pReader
,
SDataBlockIter
*
pBlockIter
,
int32_t
numOfBlocks
)
{
bool
asc
=
ASCENDING_TRAVERSE
(
pReader
->
order
);
SBlockOrderSupporter
sup
=
{
0
};
pBlockIter
->
numOfBlocks
=
numOfBlocks
;
taosArrayClear
(
pBlockIter
->
blockList
);
pBlockIter
->
pTableMap
=
pReader
->
status
.
pTableMap
;
...
...
@@ -1083,9 +1093,7 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
int32_t
numOfTables
=
(
int32_t
)
taosHashGetSize
(
pReader
->
status
.
pTableMap
);
int64_t
st
=
taosGetTimestampUs
();
SBlockOrderSupporter
sup
=
{
0
};
int32_t
code
=
initBlockOrderSupporter
(
&
sup
,
numOfTables
);
int32_t
code
=
initBlockOrderSupporter
(
&
sup
,
numOfTables
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
...
...
@@ -1113,17 +1121,11 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
}
sup
.
pDataBlockInfo
[
sup
.
numOfTables
]
=
(
SBlockOrderWrapper
*
)
buf
;
SDataBlk
block
=
{
0
};
for
(
int32_t
k
=
0
;
k
<
num
;
++
k
)
{
SBlockOrderWrapper
wrapper
=
{
0
};
int32_t
*
mapDataIndex
=
taosArrayGet
(
pTableScanInfo
->
pBlockList
,
k
);
tMapDataGetItemByIdx
(
&
pTableScanInfo
->
mapData
,
*
mapDataIndex
,
&
block
,
tGetDataBlk
);
wrapper
.
uid
=
pTableScanInfo
->
uid
;
wrapper
.
offset
=
block
.
aSubBlock
[
0
].
offset
;
sup
.
pDataBlockInfo
[
sup
.
numOfTables
][
k
]
=
wrapper
;
for
(
int32_t
k
=
0
;
k
<
num
;
++
k
)
{
SBlockIndex
*
pIndex
=
taosArrayGet
(
pTableScanInfo
->
pBlockList
,
k
);
sup
.
pDataBlockInfo
[
sup
.
numOfTables
][
k
]
=
(
SBlockOrderWrapper
){.
uid
=
pTableScanInfo
->
uid
,
.
offset
=
pIndex
->
inFileOffset
};
cnt
++
;
}
...
...
@@ -1214,25 +1216,22 @@ static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* p
(
pVerRange
->
maxVer
<
pBlock
->
maxVer
&&
pVerRange
->
maxVer
>=
pBlock
->
minVer
);
}
static
SDataBlk
*
getNeighborBlockOfSameTable
(
SFileDataBlockInfo
*
pBlockInfo
,
STableBlockScanInfo
*
pTableBlockScanInfo
,
int32_t
*
nextIndex
,
int32_t
order
)
{
static
bool
getNeighborBlockOfSameTable
(
SFileDataBlockInfo
*
pBlockInfo
,
STableBlockScanInfo
*
pTableBlockScanInfo
,
int32_t
*
nextIndex
,
int32_t
order
,
SBlockIndex
*
pBlockIndex
)
{
bool
asc
=
ASCENDING_TRAVERSE
(
order
);
if
(
asc
&&
pBlockInfo
->
tbBlockIdx
>=
taosArrayGetSize
(
pTableBlockScanInfo
->
pBlockList
)
-
1
)
{
return
NULL
;
return
false
;
}
if
(
!
asc
&&
pBlockInfo
->
tbBlockIdx
==
0
)
{
return
NULL
;
return
false
;
}
int32_t
step
=
asc
?
1
:
-
1
;
*
nextIndex
=
pBlockInfo
->
tbBlockIdx
+
step
;
SDataBlk
*
pBlock
=
taosMemoryCalloc
(
1
,
sizeof
(
SDataBlk
));
int32_t
*
indexInMapdata
=
taosArrayGet
(
pTableBlockScanInfo
->
pBlockList
,
*
nextIndex
);
tMapDataGetItemByIdx
(
&
pTableBlockScanInfo
->
mapData
,
*
indexInMapdata
,
pBlock
,
tGetDataBlk
);
return
pBlock
;
*
pBlockIndex
=
*
(
SBlockIndex
*
)
taosArrayGet
(
pTableBlockScanInfo
->
pBlockList
,
*
nextIndex
);
// tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, pIndex->ordinalIndex, pBlock, tGetDataBlk);
return
true
;
}
static
int32_t
findFileBlockInfoIndex
(
SDataBlockIter
*
pBlockIter
,
SFileDataBlockInfo
*
pFBlockInfo
)
{
...
...
@@ -1274,12 +1273,12 @@ static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t
return
TSDB_CODE_SUCCESS
;
}
static
bool
overlapWithNeighborBlock
(
SDataBlk
*
pBlock
,
S
DataBlk
*
pNeighbor
,
int32_t
order
)
{
static
bool
overlapWithNeighborBlock
(
SDataBlk
*
pBlock
,
S
BlockIndex
*
pNeighborBlockIndex
,
int32_t
order
)
{
// it is the last block in current file, no chance to overlap with neighbor blocks.
if
(
ASCENDING_TRAVERSE
(
order
))
{
return
pBlock
->
maxKey
.
ts
==
pNeighbor
->
minKey
.
ts
;
return
pBlock
->
maxKey
.
ts
==
pNeighbor
BlockIndex
->
window
.
skey
;
}
else
{
return
pBlock
->
minKey
.
ts
==
pNeighbor
->
maxKey
.
ts
;
return
pBlock
->
minKey
.
ts
==
pNeighbor
BlockIndex
->
window
.
ekey
;
}
}
...
...
@@ -1366,13 +1365,14 @@ typedef struct {
static
void
getBlockToLoadInfo
(
SDataBlockToLoadInfo
*
pInfo
,
SFileDataBlockInfo
*
pBlockInfo
,
SDataBlk
*
pBlock
,
STableBlockScanInfo
*
pScanInfo
,
TSDBKEY
keyInBuf
,
SLastBlockReader
*
pLastBlockReader
,
STsdbReader
*
pReader
)
{
int32_t
neighborIndex
=
0
;
SDataBlk
*
pNeighbor
=
getNeighborBlockOfSameTable
(
pBlockInfo
,
pScanInfo
,
&
neighborIndex
,
pReader
->
order
);
int32_t
neighborIndex
=
0
;
SBlockIndex
bIndex
=
{
0
};
bool
hasNeighbor
=
getNeighborBlockOfSameTable
(
pBlockInfo
,
pScanInfo
,
&
neighborIndex
,
pReader
->
order
,
&
bIndex
);
// overlap with neighbor
if
(
pNeighbor
)
{
pInfo
->
overlapWithNeighborBlock
=
overlapWithNeighborBlock
(
pBlock
,
pNeighbor
,
pReader
->
order
);
taosMemoryFree
(
pNeighbor
);
if
(
hasNeighbor
)
{
pInfo
->
overlapWithNeighborBlock
=
overlapWithNeighborBlock
(
pBlock
,
&
bIndex
,
pReader
->
order
);
}
// has duplicated ts of different version in this block
...
...
@@ -3069,15 +3069,15 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn
*
state
=
CHECK_FILEBLOCK_QUIT
;
int32_t
step
=
ASCENDING_TRAVERSE
(
pReader
->
order
)
?
1
:
-
1
;
int32_t
nextIndex
=
-
1
;
SDataBlk
*
pNeighborBlock
=
getNeighborBlockOfSameTable
(
pFBlock
,
pScanInfo
,
&
nextIndex
,
pReader
->
order
);
if
(
pNeighborBlock
==
NULL
)
{
// do nothing
int32_t
nextIndex
=
-
1
;
SBlockIndex
bIndex
=
{
0
};
bool
hasNeighbor
=
getNeighborBlockOfSameTable
(
pFBlock
,
pScanInfo
,
&
nextIndex
,
pReader
->
order
,
&
bIndex
);
if
(
!
hasNeighbor
)
{
// do nothing
return
0
;
}
bool
overlap
=
overlapWithNeighborBlock
(
pBlock
,
pNeighborBlock
,
pReader
->
order
);
taosMemoryFree
(
pNeighborBlock
);
bool
overlap
=
overlapWithNeighborBlock
(
pBlock
,
&
bIndex
,
pReader
->
order
);
if
(
overlap
)
{
// load next block
SReaderStatus
*
pStatus
=
&
pReader
->
status
;
SDataBlockIter
*
pBlockIter
=
&
pStatus
->
blockIter
;
...
...
@@ -3498,7 +3498,6 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) {
initFilesetIterator
(
&
pReader
->
status
.
fileIter
,
pReader
->
pReadSnap
->
fs
.
aDFileSet
,
pReader
);
resetDataBlockIterator
(
&
pReader
->
status
.
blockIter
,
pReader
->
order
);
// resetDataBlockScanInfo(pReader->status.pTableMap, pReader->window.skey);
// no data in files, let's try buffer in memory
if
(
pReader
->
status
.
fileIter
.
numOfFiles
==
0
)
{
...
...
@@ -3801,24 +3800,24 @@ bool tsdbTableNextDataBlock(STsdbReader* pReader, uint64_t uid) {
return
true
;
}
static
void
setBlockInfo
(
STsdbReader
*
pReader
,
SDataBlockInfo
*
pDataBlockInfo
)
{
ASSERT
(
p
DataBlockInfo
!=
NULL
&&
p
Reader
!=
NULL
);
pDataBlockInfo
->
rows
=
pReader
->
pResBlock
->
info
.
rows
;
pDataBlockInfo
->
uid
=
pReader
->
pResBlock
->
info
.
uid
;
pDataBlockInfo
->
w
indow
=
pReader
->
pResBlock
->
info
.
window
;
static
void
setBlockInfo
(
const
STsdbReader
*
pReader
,
int32_t
*
rows
,
uint64_t
*
uid
,
STimeWindow
*
pWindow
)
{
ASSERT
(
pReader
!=
NULL
);
*
rows
=
pReader
->
pResBlock
->
info
.
rows
;
*
uid
=
pReader
->
pResBlock
->
info
.
uid
;
*
pW
indow
=
pReader
->
pResBlock
->
info
.
window
;
}
void
tsdbRetrieveDataBlockInfo
(
STsdbReader
*
pReader
,
SDataBlockInfo
*
pDataBlockInfo
)
{
void
tsdbRetrieveDataBlockInfo
(
const
STsdbReader
*
pReader
,
int32_t
*
rows
,
uint64_t
*
uid
,
STimeWindow
*
pWindow
)
{
if
(
pReader
->
type
==
TIMEWINDOW_RANGE_EXTERNAL
)
{
if
(
pReader
->
step
==
EXTERNAL_ROWS_MAIN
)
{
setBlockInfo
(
pReader
,
pDataBlockInfo
);
setBlockInfo
(
pReader
,
rows
,
uid
,
pWindow
);
}
else
if
(
pReader
->
step
==
EXTERNAL_ROWS_PREV
)
{
setBlockInfo
(
pReader
->
innerReader
[
0
],
pDataBlockInfo
);
setBlockInfo
(
pReader
->
innerReader
[
0
],
rows
,
uid
,
pWindow
);
}
else
{
setBlockInfo
(
pReader
->
innerReader
[
1
],
pDataBlockInfo
);
setBlockInfo
(
pReader
->
innerReader
[
1
],
rows
,
uid
,
pWindow
);
}
}
else
{
setBlockInfo
(
pReader
,
pDataBlockInfo
);
setBlockInfo
(
pReader
,
rows
,
uid
,
pWindow
);
}
}
...
...
@@ -3840,7 +3839,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS
SFileDataBlockInfo
*
pFBlock
=
getCurrentBlockInfo
(
&
pReader
->
status
.
blockIter
);
SDataBlk
*
pBlock
=
getCurrentBlock
(
&
pReader
->
status
.
blockIter
);
int64_t
stime
=
taosGetTimestampUs
();
//
int64_t stime = taosGetTimestampUs();
SBlockLoadSuppInfo
*
pSup
=
&
pReader
->
suppInfo
;
...
...
@@ -3871,7 +3870,9 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS
size_t
numOfCols
=
blockDataGetNumOfCols
(
pReader
->
pResBlock
);
int32_t
i
=
0
,
j
=
0
;
while
(
j
<
numOfCols
&&
i
<
taosArrayGetSize
(
pSup
->
pColAgg
))
{
size_t
size
=
taosArrayGetSize
(
pSup
->
pColAgg
);
while
(
j
<
numOfCols
&&
i
<
size
)
{
SColumnDataAgg
*
pAgg
=
taosArrayGet
(
pSup
->
pColAgg
,
i
);
if
(
pAgg
->
colId
==
pSup
->
colIds
[
j
])
{
if
(
IS_BSMA_ON
(
&
(
pReader
->
pSchema
->
columns
[
i
])))
{
...
...
@@ -3888,14 +3889,10 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS
}
}
double
elapsed
=
(
taosGetTimestampUs
()
-
stime
)
/
1000
.
0
;
pReader
->
cost
.
smaLoadTime
+=
elapsed
;
pReader
->
cost
.
smaDataLoad
+=
1
;
*
pBlockStatis
=
pSup
->
plist
;
tsdbDebug
(
"vgId:%d, succeed to load block SMA for uid %"
PRIu64
", elapsed time:%.2f ms, %s"
,
0
,
pFBlock
->
uid
,
elapsed
,
pReader
->
idStr
);
tsdbDebug
(
"vgId:%d, succeed to load block SMA for uid %"
PRIu64
", %s"
,
0
,
pFBlock
->
uid
,
pReader
->
idStr
);
return
code
;
}
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
7a532284
...
...
@@ -921,7 +921,7 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaul
void
doSetOperatorCompleted
(
SOperatorInfo
*
pOperator
);
void
doFilter
(
const
SNode
*
pFilterNode
,
SSDataBlock
*
pBlock
,
SColMatchInfo
*
pColMatchInfo
,
SFilterInfo
*
pFilterInfo
);
int32_t
addTagPseudoColumnData
(
SReadHandle
*
pHandle
,
SExprInfo
*
pPseudoExpr
,
int32_t
numOfPseudoExpr
,
SSDataBlock
*
pBlock
,
const
char
*
idStr
);
SSDataBlock
*
pBlock
,
int32_t
rows
,
const
char
*
idStr
);
void
cleanupAggSup
(
SAggSupporter
*
pAggSup
);
void
appendOneRowToDataBlock
(
SSDataBlock
*
pBlock
,
STupleHandle
*
pTupleHandle
);
...
...
source/libs/executor/src/cachescanoperator.c
浏览文件 @
7a532284
...
...
@@ -152,25 +152,27 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
pInfo
->
indexOfBufferedRes
=
0
;
}
SSDataBlock
*
pRes
=
pInfo
->
pRes
;
if
(
pInfo
->
indexOfBufferedRes
<
pInfo
->
pBufferredRes
->
info
.
rows
)
{
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pInfo
->
matchInfo
.
pList
);
++
i
)
{
SColMatchItem
*
pMatchInfo
=
taosArrayGet
(
pInfo
->
matchInfo
.
pList
,
i
);
int32_t
slotId
=
pMatchInfo
->
dstSlotId
;
SColumnInfoData
*
pSrc
=
taosArrayGet
(
pInfo
->
pBufferredRes
->
pDataBlock
,
slotId
);
SColumnInfoData
*
pDst
=
taosArrayGet
(
p
Info
->
p
Res
->
pDataBlock
,
slotId
);
SColumnInfoData
*
pDst
=
taosArrayGet
(
pRes
->
pDataBlock
,
slotId
);
char
*
p
=
colDataGetData
(
pSrc
,
pInfo
->
indexOfBufferedRes
);
bool
isNull
=
colDataIsNull_s
(
pSrc
,
pInfo
->
indexOfBufferedRes
);
colDataAppend
(
pDst
,
0
,
p
,
isNull
);
}
p
Info
->
p
Res
->
info
.
uid
=
*
(
tb_uid_t
*
)
taosArrayGet
(
pInfo
->
pUidList
,
pInfo
->
indexOfBufferedRes
);
p
Info
->
p
Res
->
info
.
rows
=
1
;
pRes
->
info
.
uid
=
*
(
tb_uid_t
*
)
taosArrayGet
(
pInfo
->
pUidList
,
pInfo
->
indexOfBufferedRes
);
pRes
->
info
.
rows
=
1
;
if
(
pInfo
->
pseudoExprSup
.
numOfExprs
>
0
)
{
SExprSupp
*
pSup
=
&
pInfo
->
pseudoExprSup
;
int32_t
code
=
addTagPseudoColumnData
(
&
pInfo
->
readHandle
,
pSup
->
pExprInfo
,
pSup
->
numOfExprs
,
p
Info
->
pRe
s
,
int32_t
code
=
addTagPseudoColumnData
(
&
pInfo
->
readHandle
,
pSup
->
pExprInfo
,
pSup
->
numOfExprs
,
p
Res
,
pRes
->
info
.
row
s
,
GET_TASKID
(
pTaskInfo
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pTaskInfo
->
code
=
code
;
...
...
@@ -178,10 +180,9 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
}
}
pInfo
->
pRes
->
info
.
groupId
=
getTableGroupId
(
pTableList
,
pInfo
->
pRes
->
info
.
uid
);
pRes
->
info
.
groupId
=
getTableGroupId
(
pTableList
,
pRes
->
info
.
uid
);
pInfo
->
indexOfBufferedRes
+=
1
;
return
p
Info
->
p
Res
;
return
pRes
;
}
else
{
doSetOperatorCompleted
(
pOperator
);
return
NULL
;
...
...
@@ -221,7 +222,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
ASSERT
((
pInfo
->
retrieveType
&
CACHESCAN_RETRIEVE_LAST_ROW
)
==
CACHESCAN_RETRIEVE_LAST_ROW
);
pInfo
->
pRes
->
info
.
uid
=
*
(
tb_uid_t
*
)
taosArrayGet
(
pInfo
->
pUidList
,
0
);
code
=
addTagPseudoColumnData
(
&
pInfo
->
readHandle
,
pSup
->
pExprInfo
,
pSup
->
numOfExprs
,
pInfo
->
pRes
,
code
=
addTagPseudoColumnData
(
&
pInfo
->
readHandle
,
pSup
->
pExprInfo
,
pSup
->
numOfExprs
,
pInfo
->
pRes
,
pInfo
->
pRes
->
info
.
rows
,
GET_TASKID
(
pTaskInfo
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pTaskInfo
->
code
=
code
;
...
...
source/libs/executor/src/executil.c
浏览文件 @
7a532284
...
...
@@ -775,12 +775,67 @@ end:
return
code
;
}
static
int32_t
nameComparFn
(
const
void
*
p1
,
const
void
*
p2
)
{
const
char
*
pName1
=
*
(
const
char
**
)
p1
;
const
char
*
pName2
=
*
(
const
char
**
)
p2
;
int32_t
ret
=
strcmp
(
pName1
,
pName2
);
if
(
ret
==
0
)
{
return
0
;
}
else
{
return
(
ret
>
0
)
?
1
:
-
1
;
}
}
static
SArray
*
getTableNameList
(
const
SNodeListNode
*
pList
)
{
int32_t
len
=
LIST_LENGTH
(
pList
->
pNodeList
);
SListCell
*
cell
=
pList
->
pNodeList
->
pHead
;
SArray
*
pTbList
=
taosArrayInit
(
len
,
POINTER_BYTES
);
for
(
int
i
=
0
;
i
<
pList
->
pNodeList
->
length
;
i
++
)
{
SValueNode
*
valueNode
=
(
SValueNode
*
)
cell
->
pNode
;
if
(
!
IS_VAR_DATA_TYPE
(
valueNode
->
node
.
resType
.
type
))
{
terrno
=
TSDB_CODE_INVALID_PARA
;
taosArrayDestroy
(
pTbList
);
return
NULL
;
}
char
*
name
=
varDataVal
(
valueNode
->
datum
.
p
);
taosArrayPush
(
pTbList
,
&
name
);
cell
=
cell
->
pNext
;
}
size_t
numOfTables
=
taosArrayGetSize
(
pTbList
);
// order the name
taosArraySort
(
pTbList
,
nameComparFn
);
// remove the duplicates
SArray
*
pNewList
=
taosArrayInit
(
taosArrayGetSize
(
pTbList
),
sizeof
(
void
*
));
taosArrayPush
(
pNewList
,
taosArrayGet
(
pTbList
,
0
));
for
(
int32_t
i
=
1
;
i
<
numOfTables
;
++
i
)
{
char
**
name
=
taosArrayGetLast
(
pNewList
);
char
**
nameInOldList
=
taosArrayGet
(
pTbList
,
i
);
if
(
strcmp
(
*
name
,
*
nameInOldList
)
==
0
)
{
continue
;
}
taosArrayPush
(
pNewList
,
nameInOldList
);
}
taosArrayDestroy
(
pTbList
);
return
pNewList
;
}
static
int
tableUidCompare
(
const
void
*
a
,
const
void
*
b
)
{
int64_t
u1
=
*
(
uint64_t
*
)
a
;
int64_t
u2
=
*
(
uint64_t
*
)
b
;
uint64_t
u1
=
*
(
uint64_t
*
)
a
;
uint64_t
u2
=
*
(
uint64_t
*
)
b
;
if
(
u1
==
u2
)
{
return
0
;
}
return
u1
<
u2
?
-
1
:
1
;
}
...
...
@@ -803,7 +858,9 @@ static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list
SNodeList
*
pList
=
(
SNodeList
*
)
pNode
->
pParameterList
;
int32_t
len
=
LIST_LENGTH
(
pList
);
if
(
len
<=
0
)
return
ret
;
if
(
len
<=
0
)
{
return
ret
;
}
SListCell
*
cell
=
pList
->
pHead
;
for
(
int
i
=
0
;
i
<
len
;
i
++
)
{
...
...
@@ -814,6 +871,7 @@ static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list
}
cell
=
cell
->
pNext
;
}
taosArraySort
(
list
,
tableUidCompare
);
taosArrayRemoveDuplicate
(
list
,
tableUidCompare
,
NULL
);
...
...
@@ -821,6 +879,7 @@ static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list
ret
=
metaGetTableTagsByUids
(
metaHandle
,
suid
,
list
,
tags
);
removeInvalidTable
(
list
,
tags
);
}
return
ret
;
}
...
...
@@ -844,59 +903,6 @@ static int32_t removeInvalidTable(SArray* uids, SHashObj* tags) {
return
0
;
}
static
int32_t
nameComparFn
(
const
void
*
p1
,
const
void
*
p2
)
{
const
char
*
pName1
=
*
(
const
char
**
)
p1
;
const
char
*
pName2
=
*
(
const
char
**
)
p2
;
int32_t
ret
=
strcmp
(
pName1
,
pName2
);
if
(
ret
==
0
)
{
return
0
;
}
else
{
return
(
ret
>
0
)
?
1
:
-
1
;
}
}
static
SArray
*
getTableNameList
(
const
SNodeListNode
*
pList
)
{
int32_t
len
=
LIST_LENGTH
(
pList
->
pNodeList
);
SListCell
*
cell
=
pList
->
pNodeList
->
pHead
;
SArray
*
pTbList
=
taosArrayInit
(
len
,
POINTER_BYTES
);
for
(
int
i
=
0
;
i
<
pList
->
pNodeList
->
length
;
i
++
)
{
SValueNode
*
valueNode
=
(
SValueNode
*
)
cell
->
pNode
;
if
(
!
IS_VAR_DATA_TYPE
(
valueNode
->
node
.
resType
.
type
))
{
terrno
=
TSDB_CODE_INVALID_PARA
;
taosArrayDestroy
(
pTbList
);
return
NULL
;
}
char
*
name
=
varDataVal
(
valueNode
->
datum
.
p
);
taosArrayPush
(
pTbList
,
&
name
);
cell
=
cell
->
pNext
;
}
size_t
numOfTables
=
taosArrayGetSize
(
pTbList
);
// order the name
taosArraySort
(
pTbList
,
nameComparFn
);
// remove the duplicates
SArray
*
pNewList
=
taosArrayInit
(
taosArrayGetSize
(
pTbList
),
sizeof
(
void
*
));
taosArrayPush
(
pNewList
,
taosArrayGet
(
pTbList
,
0
));
for
(
int32_t
i
=
1
;
i
<
numOfTables
;
++
i
)
{
char
**
name
=
taosArrayGetLast
(
pNewList
);
char
**
nameInOldList
=
taosArrayGet
(
pTbList
,
i
);
if
(
strcmp
(
*
name
,
*
nameInOldList
)
==
0
)
{
continue
;
}
taosArrayPush
(
pNewList
,
nameInOldList
);
}
taosArrayDestroy
(
pTbList
);
return
pNewList
;
}
static
int32_t
optimizeTbnameInCondImpl
(
void
*
metaHandle
,
int64_t
suid
,
SArray
*
list
,
SNode
*
pTagCond
)
{
if
(
nodeType
(
pTagCond
)
!=
QUERY_NODE_OPERATOR
)
{
return
-
1
;
...
...
@@ -1757,7 +1763,7 @@ STableListInfo* tableListCreate() {
goto
_error
;
}
pListInfo
->
map
=
taosHashInit
(
32
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BI
NARY
),
false
,
HASH_ENTRY_LOCK
);
pListInfo
->
map
=
taosHashInit
(
32
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BI
GINT
),
false
,
HASH_ENTRY_LOCK
);
if
(
pListInfo
->
map
==
NULL
)
{
goto
_error
;
}
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
7a532284
...
...
@@ -574,6 +574,9 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
// if the source equals to the destination, it is to create a new column as the result of scalar
// function or some operators.
bool
createNewColModel
=
(
pResult
==
pSrcBlock
);
if
(
createNewColModel
)
{
blockDataEnsureCapacity
(
pResult
,
pResult
->
info
.
rows
);
}
int32_t
numOfRows
=
0
;
...
...
@@ -623,6 +626,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
int32_t
startOffset
=
createNewColModel
?
0
:
pResult
->
info
.
rows
;
ASSERT
(
pResult
->
info
.
capacity
>
0
);
colDataMergeCol
(
pResColData
,
startOffset
,
(
int32_t
*
)
&
pResult
->
info
.
capacity
,
&
idata
,
dest
.
numOfRows
);
colDataDestroy
(
&
idata
);
...
...
@@ -844,57 +848,6 @@ STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int
return
win
;
}
#if 0
static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockInfo) {
STimeWindow w = {0};
TSKEY sk = TMIN(pQueryAttr->window.skey, pQueryAttr->window.ekey);
TSKEY ek = TMAX(pQueryAttr->window.skey, pQueryAttr->window.ekey);
if (true) {
// getAlignQueryTimeWindow(pQueryAttr, pBlockInfo->window.skey, sk, ek, &w);
assert(w.ekey >= pBlockInfo->window.skey);
if (w.ekey < pBlockInfo->window.ekey) {
return true;
}
while (1) {
// getNextTimeWindow(pQueryAttr, &w);
if (w.skey > pBlockInfo->window.ekey) {
break;
}
assert(w.ekey > pBlockInfo->window.ekey);
if (w.skey <= pBlockInfo->window.ekey && w.skey > pBlockInfo->window.skey) {
return true;
}
}
} else {
// getAlignQueryTimeWindow(pQueryAttr, pBlockInfo->window.ekey, sk, ek, &w);
assert(w.skey <= pBlockInfo->window.ekey);
if (w.skey > pBlockInfo->window.skey) {
return true;
}
while (1) {
// getNextTimeWindow(pQueryAttr, &w);
if (w.ekey < pBlockInfo->window.skey) {
break;
}
assert(w.skey < pBlockInfo->window.skey);
if (w.ekey < pBlockInfo->window.ekey && w.ekey >= pBlockInfo->window.skey) {
return true;
}
}
}
return false;
}
#endif
int32_t
loadDataBlockOnDemand
(
SExecTaskInfo
*
pTaskInfo
,
STableScanInfo
*
pTableScanInfo
,
SSDataBlock
*
pBlock
,
uint32_t
*
status
)
{
*
status
=
BLK_DATA_NOT_LOAD
;
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
7a532284
...
...
@@ -331,7 +331,8 @@ static bool doLoadBlockSMA(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
}
}
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pTableScanInfo
->
matchInfo
.
pList
);
++
i
)
{
size_t
num
=
taosArrayGetSize
(
pTableScanInfo
->
matchInfo
.
pList
);
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
SColMatchItem
*
pColMatchInfo
=
taosArrayGet
(
pTableScanInfo
->
matchInfo
.
pList
,
i
);
if
(
!
pColMatchInfo
->
needOutput
)
{
continue
;
...
...
@@ -343,11 +344,11 @@ static bool doLoadBlockSMA(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
return
true
;
}
static
void
doSetTagColumnData
(
STableScanInfo
*
pTableScanInfo
,
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
)
{
static
void
doSetTagColumnData
(
STableScanInfo
*
pTableScanInfo
,
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
,
int32_t
rows
)
{
if
(
pTableScanInfo
->
pseudoSup
.
numOfExprs
>
0
)
{
SExprSupp
*
pSup
=
&
pTableScanInfo
->
pseudoSup
;
int32_t
code
=
addTagPseudoColumnData
(
&
pTableScanInfo
->
readHandle
,
pSup
->
pExprInfo
,
pSup
->
numOfExprs
,
pBlock
,
int32_t
code
=
addTagPseudoColumnData
(
&
pTableScanInfo
->
readHandle
,
pSup
->
pExprInfo
,
pSup
->
numOfExprs
,
pBlock
,
rows
,
GET_TASKID
(
pTaskInfo
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
...
...
@@ -382,6 +383,17 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo
}
}
static
void
ensureBlockCapacity
(
SSDataBlock
*
pBlock
,
int32_t
capacity
)
{
// keep the value of rows temporarily
int32_t
rows
=
pBlock
->
info
.
rows
;
pBlock
->
info
.
rows
=
0
;
blockDataEnsureCapacity
(
pBlock
,
capacity
);
// restore the rows number
pBlock
->
info
.
rows
=
rows
;
}
static
int32_t
loadDataBlock
(
SOperatorInfo
*
pOperator
,
STableScanInfo
*
pTableScanInfo
,
SSDataBlock
*
pBlock
,
uint32_t
*
status
)
{
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
...
...
@@ -413,7 +425,10 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
qDebug
(
"%s data block skipped, brange:%"
PRId64
"-%"
PRId64
", rows:%d"
,
GET_TASKID
(
pTaskInfo
),
pBlockInfo
->
window
.
skey
,
pBlockInfo
->
window
.
ekey
,
pBlockInfo
->
rows
);
doSetTagColumnData
(
pTableScanInfo
,
pBlock
,
pTaskInfo
);
if
(
pTableScanInfo
->
pseudoSup
.
numOfExprs
>
0
)
{
ensureBlockCapacity
(
pBlock
,
pBlock
->
info
.
rows
);
}
doSetTagColumnData
(
pTableScanInfo
,
pBlock
,
pTaskInfo
,
1
);
pCost
->
skipBlocks
+=
1
;
return
TSDB_CODE_SUCCESS
;
}
else
if
(
*
status
==
FUNC_DATA_REQUIRED_STATIS_LOAD
)
{
...
...
@@ -423,7 +438,11 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
if
(
success
)
{
// failed to load the block sma data, data block statistics does not exist, load data block instead
qDebug
(
"%s data block SMA loaded, brange:%"
PRId64
"-%"
PRId64
", rows:%d"
,
GET_TASKID
(
pTaskInfo
),
pBlockInfo
->
window
.
skey
,
pBlockInfo
->
window
.
ekey
,
pBlockInfo
->
rows
);
doSetTagColumnData
(
pTableScanInfo
,
pBlock
,
pTaskInfo
);
if
(
pTableScanInfo
->
pseudoSup
.
numOfExprs
>
0
)
{
ensureBlockCapacity
(
pBlock
,
pBlock
->
info
.
rows
);
}
doSetTagColumnData
(
pTableScanInfo
,
pBlock
,
pTaskInfo
,
1
);
return
TSDB_CODE_SUCCESS
;
}
else
{
qDebug
(
"%s failed to load SMA, since not all columns have SMA"
,
GET_TASKID
(
pTaskInfo
));
...
...
@@ -472,8 +491,9 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
return
terrno
;
}
ensureBlockCapacity
(
pBlock
,
pBlock
->
info
.
rows
);
relocateColumnData
(
pBlock
,
pTableScanInfo
->
matchInfo
.
pList
,
pCols
,
true
);
doSetTagColumnData
(
pTableScanInfo
,
pBlock
,
pTaskInfo
);
doSetTagColumnData
(
pTableScanInfo
,
pBlock
,
pTaskInfo
,
pBlock
->
info
.
rows
);
// restore the previous value
pCost
->
totalRows
-=
pBlock
->
info
.
rows
;
...
...
@@ -513,27 +533,30 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction
}
int32_t
addTagPseudoColumnData
(
SReadHandle
*
pHandle
,
SExprInfo
*
pPseudoExpr
,
int32_t
numOfPseudoExpr
,
SSDataBlock
*
pBlock
,
const
char
*
idStr
)
{
SSDataBlock
*
pBlock
,
int32_t
rows
,
const
char
*
idStr
)
{
// currently only the tbname pseudo column
if
(
numOfPseudoExpr
==
0
)
{
return
TSDB_CODE_SUCCESS
;
}
// backup the rows
int32_t
backupRows
=
pBlock
->
info
.
rows
;
pBlock
->
info
.
rows
=
rows
;
SMetaReader
mr
=
{
0
};
metaReaderInit
(
&
mr
,
pHandle
->
meta
,
0
);
int32_t
code
=
metaGetTableEntryByUid
(
&
mr
,
pBlock
->
info
.
uid
);
metaReaderReleaseLock
(
&
mr
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"failed to get table meta, uid:0x%"
PRIx64
", code:%s, %s"
,
pBlock
->
info
.
uid
,
tstrerror
(
terrno
),
idStr
);
metaReaderClear
(
&
mr
);
return
terrno
;
}
metaReaderReleaseLock
(
&
mr
);
for
(
int32_t
j
=
0
;
j
<
numOfPseudoExpr
;
++
j
)
{
SExprInfo
*
pExpr
=
&
pPseudoExpr
[
j
];
int32_t
dstSlotId
=
pExpr
->
base
.
resSchema
.
slotId
;
int32_t
dstSlotId
=
pExpr
->
base
.
resSchema
.
slotId
;
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
dstSlotId
);
colInfoDataCleanup
(
pColInfoData
,
pBlock
->
info
.
rows
);
...
...
@@ -572,6 +595,9 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int
}
metaReaderClear
(
&
mr
);
// restore the rows
pBlock
->
info
.
rows
=
backupRows
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -616,14 +642,10 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
blockDataCleanup
(
pBlock
);
SDataBlockInfo
binfo
=
pBlock
->
info
;
tsdbRetrieveDataBlockInfo
(
pTableScanInfo
->
dataReader
,
&
binfo
);
SDataBlockInfo
*
pBInfo
=
&
pBlock
->
info
;
tsdbRetrieveDataBlockInfo
(
pTableScanInfo
->
dataReader
,
&
pBInfo
->
rows
,
&
pBInfo
->
uid
,
&
pBInfo
->
window
);
binfo
.
capacity
=
binfo
.
rows
;
blockDataEnsureCapacity
(
pBlock
,
binfo
.
rows
);
pBlock
->
info
=
binfo
;
ASSERT
(
binfo
.
uid
!=
0
);
ASSERT
(
pBInfo
->
uid
!=
0
);
pBlock
->
info
.
groupId
=
getTableGroupId
(
pTaskInfo
->
pTableInfoList
,
pBlock
->
info
.
uid
);
uint32_t
status
=
0
;
...
...
@@ -1146,20 +1168,19 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
bool
hasBlock
=
tsdbNextDataBlock
(
pReader
);
if
(
hasBlock
)
{
SDataBlockInfo
binfo
=
{
0
};
tsdbRetrieveDataBlockInfo
(
pReader
,
&
binfo
);
SDataBlockInfo
*
pBInfo
=
&
pBlock
->
info
;
SArray
*
pCols
=
tsdbRetrieveDataBlock
(
pReader
,
NULL
)
;
blockDataEnsureCapacity
(
pBlock
,
binfo
.
rows
);
int32_t
rows
=
0
;
tsdbRetrieveDataBlockInfo
(
pReader
,
&
rows
,
&
pBInfo
->
uid
,
&
pBInfo
->
window
);
pBlock
->
info
.
window
=
binfo
.
window
;
pBlock
->
info
.
uid
=
binfo
.
uid
;
pBlock
->
info
.
rows
=
binfo
.
rows
;
SArray
*
pCols
=
tsdbRetrieveDataBlock
(
pReader
,
NULL
)
;
blockDataEnsureCapacity
(
pBlock
,
rows
)
;
pBlock
->
info
.
rows
=
rows
;
relocateColumnData
(
pBlock
,
pTableScanInfo
->
matchInfo
.
pList
,
pCols
,
true
);
doSetTagColumnData
(
pTableScanInfo
,
pBlock
,
pTaskInfo
);
doSetTagColumnData
(
pTableScanInfo
,
pBlock
,
pTaskInfo
,
rows
);
pBlock
->
info
.
groupId
=
getTableGroupId
(
pTaskInfo
->
pTableInfoList
,
binfo
.
uid
);
pBlock
->
info
.
groupId
=
getTableGroupId
(
pTaskInfo
->
pTableInfoList
,
pBInfo
->
uid
);
}
tsdbReaderClose
(
pReader
);
...
...
@@ -1181,12 +1202,6 @@ static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts,
static
uint64_t
getGroupIdByUid
(
SStreamScanInfo
*
pInfo
,
uint64_t
uid
)
{
return
getTableGroupId
(
pInfo
->
pTableScanOp
->
pTaskInfo
->
pTableInfoList
,
uid
);
// SHashObj* map = pInfo->pTableScanOp->pTaskInfo->pTableInfoList.map;
// uint64_t* groupId = taosHashGet(map, &uid, sizeof(int64_t));
// if (groupId) {
// return *groupId;
// }
// return 0;
}
static
uint64_t
getGroupIdByData
(
SStreamScanInfo
*
pInfo
,
uint64_t
uid
,
TSKEY
ts
,
int64_t
maxVersion
)
{
...
...
@@ -1631,7 +1646,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
// currently only the tbname pseudo column
if
(
pInfo
->
numOfPseudoExpr
>
0
)
{
int32_t
code
=
addTagPseudoColumnData
(
&
pInfo
->
readHandle
,
pInfo
->
pPseudoExpr
,
pInfo
->
numOfPseudoExpr
,
pInfo
->
pRes
,
GET_TASKID
(
pTaskInfo
));
pInfo
->
pRes
->
info
.
rows
,
GET_TASKID
(
pTaskInfo
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
blockDataFreeRes
((
SSDataBlock
*
)
pBlock
);
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
...
...
@@ -1641,11 +1656,11 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
if
(
filter
)
{
doFilter
(
pInfo
->
pCondition
,
pInfo
->
pRes
,
NULL
,
NULL
);
}
blockDataUpdateTsWindow
(
pInfo
->
pRes
,
pInfo
->
primaryTsIndex
);
blockDataFreeRes
((
SSDataBlock
*
)
pBlock
);
calBlockTbName
(
&
pInfo
->
tbnameCalSup
,
pInfo
->
pRes
);
return
0
;
}
...
...
@@ -2142,7 +2157,9 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
longjmp
(
pTaskInfo
->
env
,
TSDB_CODE_TSC_QUERY_CANCELLED
);
}
tsdbRetrieveDataBlockInfo
(
pInfo
->
dataReader
,
&
pBlock
->
info
);
int32_t
rows
=
0
;
tsdbRetrieveDataBlockInfo
(
pInfo
->
dataReader
,
&
rows
,
&
pBlock
->
info
.
uid
,
&
pBlock
->
info
.
window
);
pBlock
->
info
.
rows
=
rows
;
SArray
*
pCols
=
tsdbRetrieveDataBlock
(
pInfo
->
dataReader
,
NULL
);
pBlock
->
pDataBlock
=
pCols
;
...
...
@@ -4359,7 +4376,7 @@ static int32_t loadDataBlockFromOneTable2(SOperatorInfo* pOperator, STableMergeS
// currently only the tbname pseudo column
if
(
pTableScanInfo
->
pseudoSup
.
numOfExprs
>
0
)
{
int32_t
code
=
addTagPseudoColumnData
(
&
pTableScanInfo
->
readHandle
,
pTableScanInfo
->
pseudoSup
.
pExprInfo
,
pTableScanInfo
->
pseudoSup
.
numOfExprs
,
pBlock
,
GET_TASKID
(
pTaskInfo
));
pTableScanInfo
->
pseudoSup
.
numOfExprs
,
pBlock
,
pBlock
->
info
.
rows
,
GET_TASKID
(
pTaskInfo
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
}
...
...
@@ -4476,7 +4493,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
// currently only the tbname pseudo column
if
(
pTableScanInfo
->
pseudoSup
.
numOfExprs
>
0
)
{
int32_t
code
=
addTagPseudoColumnData
(
&
pTableScanInfo
->
readHandle
,
pTableScanInfo
->
pseudoSup
.
pExprInfo
,
pTableScanInfo
->
pseudoSup
.
numOfExprs
,
pBlock
,
GET_TASKID
(
pTaskInfo
));
pTableScanInfo
->
pseudoSup
.
numOfExprs
,
pBlock
,
pBlock
->
info
.
rows
,
GET_TASKID
(
pTaskInfo
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
}
...
...
@@ -4540,14 +4557,11 @@ static SSDataBlock* getTableDataBlockTemp(void* param) {
}
blockDataCleanup
(
pBlock
);
SDataBlockInfo
binfo
=
pBlock
->
info
;
tsdbRetrieveDataBlockInfo
(
reader
,
&
binfo
);
blockDataEnsureCapacity
(
pBlock
,
binfo
.
rows
);
pBlock
->
info
.
type
=
binfo
.
type
;
pBlock
->
info
.
uid
=
binfo
.
uid
;
pBlock
->
info
.
window
=
binfo
.
window
;
pBlock
->
info
.
rows
=
binfo
.
rows
;
int32_t
rows
=
0
;
tsdbRetrieveDataBlockInfo
(
reader
,
&
rows
,
&
pBlock
->
info
.
uid
,
&
pBlock
->
info
.
window
);
blockDataEnsureCapacity
(
pBlock
,
rows
);
pBlock
->
info
.
rows
=
rows
;
if
(
pQueryCond
->
order
==
TSDB_ORDER_ASC
)
{
pQueryCond
->
twindows
.
skey
=
pBlock
->
info
.
window
.
ekey
+
1
;
...
...
@@ -4603,14 +4617,11 @@ static SSDataBlock* getTableDataBlock2(void* param) {
}
blockDataCleanup
(
pBlock
);
SDataBlockInfo
binfo
=
pBlock
->
info
;
tsdbRetrieveDataBlockInfo
(
reader
,
&
binfo
);
blockDataEnsureCapacity
(
pBlock
,
binfo
.
rows
);
pBlock
->
info
.
type
=
binfo
.
type
;
pBlock
->
info
.
uid
=
binfo
.
uid
;
pBlock
->
info
.
window
=
binfo
.
window
;
pBlock
->
info
.
rows
=
binfo
.
rows
;
int32_t
rows
=
0
;
tsdbRetrieveDataBlockInfo
(
reader
,
&
rows
,
&
pBlock
->
info
.
uid
,
&
pBlock
->
info
.
window
);
blockDataEnsureCapacity
(
pBlock
,
rows
);
pBlock
->
info
.
rows
=
rows
;
uint32_t
status
=
0
;
int32_t
code
=
loadDataBlockFromOneTable2
(
pOperator
,
pTableScanInfo
,
pBlock
,
&
status
);
...
...
@@ -4656,14 +4667,11 @@ static SSDataBlock* getTableDataBlock(void* param) {
}
blockDataCleanup
(
pBlock
);
SDataBlockInfo
binfo
=
pBlock
->
info
;
tsdbRetrieveDataBlockInfo
(
reader
,
&
binfo
);
blockDataEnsureCapacity
(
pBlock
,
binfo
.
rows
);
pBlock
->
info
.
type
=
binfo
.
type
;
pBlock
->
info
.
uid
=
binfo
.
uid
;
pBlock
->
info
.
window
=
binfo
.
window
;
pBlock
->
info
.
rows
=
binfo
.
rows
;
int32_t
rows
=
0
;
tsdbRetrieveDataBlockInfo
(
reader
,
&
rows
,
&
pBlock
->
info
.
uid
,
&
pBlock
->
info
.
window
);
blockDataEnsureCapacity
(
pBlock
,
rows
);
pBlock
->
info
.
rows
=
rows
;
uint32_t
status
=
0
;
int32_t
code
=
loadDataBlockFromOneTable
(
pOperator
,
pTableScanInfo
,
readerIdx
,
pBlock
,
&
status
);
...
...
source/libs/function/src/builtinsimpl.c
浏览文件 @
7a532284
...
...
@@ -497,7 +497,7 @@ bool getCountFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
return
true
;
}
static
FORCE_INLINE
int32_t
getNumOfElems
(
SqlFunctionCtx
*
pCtx
)
{
static
int32_t
getNumOfElems
(
SqlFunctionCtx
*
pCtx
)
{
int32_t
numOfElem
=
0
;
/*
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录